Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 206 additions & 4 deletions src/controllers/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{collections::HashSet, sync::Arc, time::Duration};
use jiff::{SignedDuration, Timestamp};
use k8s_openapi::{
api::{
apps::v1::Deployment,
batch::v1::Job,
core::v1::{Pod, Secret},
},
Expand All @@ -25,6 +26,7 @@ use super::{
jobs::{JobStatus, classify_job},
postgres,
postgres::DEFAULT_PG_VERSION,
restore::{build_credential_reset_job, credential_reset_job_name},
};
use crate::{
context::Context,
Expand Down Expand Up @@ -844,6 +846,166 @@ pub async fn reconcile(replica: Arc<PostgresPhysicalReplica>, ctx: Arc<Context>)
///
/// Returns `Ok(true)` when migration is complete (or skipped), `Ok(false)` when
/// in progress, or `Err` on permanent failure.
/// Returns true if a tokio_postgres error is an authentication failure.
/// We detect this by inspecting the SQLSTATE code (28P01 = invalid_password,
/// 28000 = invalid_authorization_specification) and the error message text as
/// a fallback for cases where the code is not surfaced.
fn is_auth_error(e: &Error) -> bool {
let Error::Postgres(pg_err) = e else {
return false;
};
if let Some(db_err) = pg_err.as_db_error() {
let code = db_err.code();
return code == &tokio_postgres::error::SqlState::INVALID_PASSWORD
|| code == &tokio_postgres::error::SqlState::INVALID_AUTHORIZATION_SPECIFICATION;
}
// Fallback: match the error message text that Postgres emits for auth failures.
let msg = pg_err.to_string().to_lowercase();
msg.contains("password authentication failed") || msg.contains("no password assigned")
}

/// Ensure the credential-reset Job exists and has run for the given restore.
///
/// The sequence is:
/// 1. Scale the restore deployment to 0 (so the PVC is free).
/// 2. Create a Job that uses `postgres --single` to ALTER ROLE.
/// 3. While the job is running, return `Ok(false)` (caller retries).
/// 4. When the job succeeds, scale the deployment back to 1 and return
/// `Ok(true)` so the caller re-attempts the connection on the next loop.
/// 5. If the job fails, log a warning and return `Ok(false)` — it will be
/// retried on the next reconcile once the job has been deleted.
///
/// Idempotent: safe to call repeatedly while the job is in flight.
async fn ensure_credential_reset(
client: &Client,
namespace: &str,
restore: &PostgresPhysicalRestore,
replica: &PostgresPhysicalReplica,
) -> Result<bool> {
let restore_name = restore.name_any();
let job_name = credential_reset_job_name(&restore_name);
let jobs: Api<Job> = Api::namespaced(client.clone(), namespace);
let deployments: Api<Deployment> = Api::namespaced(client.clone(), namespace);

// Check for an existing reset job first.
if let Some(job) = jobs.get_opt(&job_name).await? {
match classify_job(&job) {
JobStatus::Active => {
// Still running — nothing to do yet.
return Ok(false);
}
JobStatus::Succeeded => {
info!(
Comment thread
passcod marked this conversation as resolved.
restore = %restore_name,
job = %job_name,
"credential reset job succeeded, scaling deployment back up"
);

// Scale the deployment back to 1.
let scale_patch = serde_json::json!({
"spec": { "replicas": 1 }
});
if let Err(e) = deployments
.patch(
&restore_name,
&PatchParams::apply("postgres-restore-operator"),
&Patch::Merge(&scale_patch),
)
.await
{
warn!(restore = %restore_name, error = %e, "failed to scale deployment back up after credential reset");
}

// Delete the completed job.
if let Err(e) = jobs.delete(&job_name, &Default::default()).await {
warn!(job = %job_name, error = %e, "failed to delete completed credential reset job");
}

// Return true: credentials are fixed, caller should retry connection.
return Ok(true);
}
JobStatus::Failed => {
warn!(
restore = %restore_name,
job = %job_name,
"credential reset job failed; will retry on next reconcile"
);

// Delete the failed job so we create a fresh one next time.
if let Err(e) = jobs.delete(&job_name, &Default::default()).await {
warn!(job = %job_name, error = %e, "failed to delete failed credential reset job");
}

// Scale back up so the restore remains accessible in the meantime.
let scale_patch = serde_json::json!({
"spec": { "replicas": 1 }
});
if let Err(e) = deployments
.patch(
&restore_name,
&PatchParams::apply("postgres-restore-operator"),
&Patch::Merge(&scale_patch),
)
.await
{
warn!(restore = %restore_name, error = %e, "failed to scale deployment back up after failed credential reset");
}

return Ok(false);
}
}
}

// No job exists yet. Scale down the deployment first so the PVC is free.
info!(
restore = %restore_name,
"auth failure detected; scaling deployment to 0 for credential reset"
);
let scale_patch = serde_json::json!({
"spec": { "replicas": 0 }
});
deployments
.patch(
&restore_name,
&PatchParams::apply("postgres-restore-operator"),
&Patch::Merge(&scale_patch),
)
.await?;

// Wait for the pod to terminate so the PVC is released before we mount it
// in the job. We check once; if it's not gone yet we'll be called again on
// the next reconcile and will skip straight to the job-exists branch above.
let pods: Api<Pod> = Api::namespaced(client.clone(), namespace);
let label = format!("pgro.bes.au/restore={restore_name}");
let running_pods = pods
.list(&kube::api::ListParams::default().labels(&label).limit(2))
.await?;
Comment thread
review-hero[bot] marked this conversation as resolved.
let any_running = running_pods.items.iter().any(|p| {
p.status
.as_ref()
.and_then(|s| s.phase.as_deref())
.is_some_and(|ph| ph == "Running" || ph == "Pending")
});
if any_running {
info!(
restore = %restore_name,
"waiting for pod to terminate before creating credential reset job"
);
return Ok(false);
}

// Pod is gone — create the reset job.
info!(
restore = %restore_name,
job = %job_name,
"creating credential reset job"
);
let job = build_credential_reset_job(restore, replica, &job_name, namespace)?;
jobs.create(&PostParams::default(), &job).await?;

Ok(false)
}

async fn reconcile_schema_migration(
client: &Client,
ctx: &Arc<Context>,
Expand Down Expand Up @@ -987,15 +1149,35 @@ async fn reconcile_schema_migration(
let reader_user = postgres::read_secret_field(&reader_secret, "username")?;
let reader_password = postgres::read_secret_field(&reader_secret, "password")?;

let source_dbname = postgres::discover_restore_database(
let source_dbname = match postgres::discover_restore_database(
client,
namespace,
&old_restore_name,
&reader_user,
&reader_password,
ctx.use_port_forward(),
)
.await?;
.await
{
Ok(db) => db,
Err(e) if is_auth_error(&e) => {
warn!(
replica = %replica_name,
Comment thread
review-hero[bot] marked this conversation as resolved.
restore = %old_restore_name,
error = %e,
"auth failure connecting to active restore; triggering credential reset"
);
if ensure_credential_reset(client, namespace, old_restore, replica).await? {
info!(
replica = %replica_name,
restore = %old_restore_name,
"credential reset complete, retrying on next reconcile"
);
}
return Ok(false);
}
Err(e) => return Err(e),
};

// Filter out schemas that don't exist on the source. This happens when the
// user adds a schema to persistent_schemas before actually creating it.
Expand Down Expand Up @@ -1081,15 +1263,35 @@ async fn reconcile_schema_migration(
)
.await?;

let target_dbname = postgres::discover_restore_database(
let target_dbname = match postgres::discover_restore_database(
client,
namespace,
&new_restore_name,
&reader_user,
&reader_password,
ctx.use_port_forward(),
)
.await?;
.await
{
Ok(db) => db,
Err(e) if is_auth_error(&e) => {
warn!(
replica = %replica_name,
restore = %new_restore_name,
error = %e,
"auth failure connecting to switching restore; triggering credential reset"
);
if ensure_credential_reset(client, namespace, new_restore, replica).await? {
info!(
replica = %replica_name,
restore = %new_restore_name,
"credential reset complete, retrying on next reconcile"
);
}
return Ok(false);
}
Err(e) => return Err(e),
};

// Check that none of the persistent schemas already exist in the snapshot.
// If they do, the pg_dump|psql migration would conflict, so we must fail
Expand Down
2 changes: 2 additions & 0 deletions src/controllers/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ use crate::{

mod builders;

pub(crate) use builders::{build_credential_reset_job, credential_reset_job_name};

#[cfg(test)]
mod tests;

Expand Down
Loading