Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 119 additions & 107 deletions nexus/reconfigurator/execution/src/omicron_zones.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,22 @@ use anyhow::Context;
use anyhow::bail;
use cockroach_admin_client::types::NodeDecommission;
use cockroach_admin_client::types::NodeId;
use futures::Stream;
use futures::StreamExt;
use futures::future::Either;
use futures::stream;
use internal_dns_resolver::Resolver;
use internal_dns_types::names::ServiceName;
use nexus_db_queries::context::OpContext;
use nexus_db_queries::db::DataStore;
use nexus_db_queries::db::datastore::CollectorReassignment;
use nexus_types::deployment::Blueprint;
use nexus_types::deployment::BlueprintZoneConfig;
use nexus_types::deployment::BlueprintZoneDisposition;
use nexus_types::deployment::BlueprintZoneType;
use omicron_common::address::COCKROACH_ADMIN_PORT;
use omicron_uuid_kinds::GenericUuid;
use omicron_uuid_kinds::OmicronZoneUuid;
use omicron_uuid_kinds::SledUuid;
use sled_agent_types::inventory::ZoneKind;
use slog::Logger;
use slog::info;
use slog::warn;
Expand All @@ -45,96 +46,116 @@ pub(crate) async fn clean_up_expunged_zones<R: CleanupResolver>(
// cockroach-admin knows how to gate decommissioning correctly.
let decommission_cockroach = false;

clean_up_expunged_zones_impl(
opctx,
datastore,
resolver,
decommission_cockroach,
blueprint
.all_omicron_zones(BlueprintZoneDisposition::is_ready_for_cleanup),
)
.await
}
let cockroach_cleanups = if decommission_cockroach {
Either::Left(clean_up_expunged_cockroach_zones(
opctx, datastore, resolver, blueprint,
))
} else {
Either::Right(stream::empty())
};

async fn clean_up_expunged_zones_impl<R: CleanupResolver>(
opctx: &OpContext,
datastore: &DataStore,
resolver: &R,
decommission_cockroach: bool,
zones_to_clean_up: impl Iterator<Item = (SledUuid, &BlueprintZoneConfig)>,
) -> Result<(), Vec<anyhow::Error>> {
let errors: Vec<anyhow::Error> = stream::iter(zones_to_clean_up)
.filter_map(async |(sled_id, config)| {
let log = opctx.log.new(slog::o!(
"sled_id" => sled_id.to_string(),
"zone_id" => config.id.to_string(),
"zone_type" => config.zone_type.kind().report_str(),
));

let result = match &config.zone_type {
// Zones which need cleanup after expungement.
BlueprintZoneType::Nexus(_) => Some(
datastore
.database_nexus_access_delete(&opctx, config.id)
.await
.map_err(|err| anyhow::anyhow!(err)),
),
BlueprintZoneType::CockroachDb(_) => {
if decommission_cockroach {
Some(
decommission_cockroachdb_node(
opctx, datastore, resolver, config.id, &log,
)
.await,
)
} else {
None
}
}
BlueprintZoneType::Oximeter(_) => Some(
oximeter_cleanup(opctx, datastore, config.id, &log).await,
),

// Zones that may or may not need cleanup work - we haven't
// gotten to these yet!
BlueprintZoneType::BoundaryNtp(_)
| BlueprintZoneType::Clickhouse(_)
| BlueprintZoneType::ClickhouseKeeper(_)
| BlueprintZoneType::ClickhouseServer(_)
| BlueprintZoneType::Crucible(_)
| BlueprintZoneType::CruciblePantry(_)
| BlueprintZoneType::ExternalDns(_)
| BlueprintZoneType::InternalDns(_)
| BlueprintZoneType::InternalNtp(_) => {
warn!(
log,
"unsupported zone type for expungement cleanup; \
not performing any cleanup";
);
None
}
}?;

match result {
Err(error) => {
warn!(
log, "failed to clean up expunged zone";
"error" => #%error,
);
Some(error)
}
Ok(()) => {
info!(log, "successfully cleaned up after expunged zone");
None
}
let errors = cockroach_cleanups
.chain(clean_up_expunged_nexus_zones(opctx, datastore, blueprint))
.chain(clean_up_expunged_oximeter_zones(opctx, datastore, blueprint))
.filter_map(async |(log, result)| match result {
Ok(()) => {
info!(log, "successfully cleaned up after expunged zone");
None
}
Err(error) => {
warn!(
log, "failed to clean up expunged zone";
InlineErrorChain::new(&*error),
);
Some(error)
}
})
.collect()
.collect::<Vec<_>>()
.await;

if errors.is_empty() { Ok(()) } else { Err(errors) }
}

fn make_cleanup_logger(
opctx: &OpContext,
sled_id: SledUuid,
zone_id: OmicronZoneUuid,
zone_kind: ZoneKind,
) -> Logger {
opctx.log.new(slog::o!(
"sled_id" => sled_id.to_string(),
"zone_id" => zone_id.to_string(),
"zone_type" => zone_kind.report_str(),
))
}

fn clean_up_expunged_nexus_zones<'a>(
opctx: &'a OpContext,
datastore: &'a DataStore,
blueprint: &'a Blueprint,
) -> impl Stream<Item = (Logger, anyhow::Result<()>)> + 'a {
let zones_to_clean_up = blueprint
.all_omicron_zones(BlueprintZoneDisposition::is_ready_for_cleanup)
.filter(|(_sled_id, zone)| zone.zone_type.is_nexus());

stream::iter(zones_to_clean_up).then(move |(sled_id, zone)| async move {
let zone_kind = zone.zone_type.kind();
assert_eq!(zone_kind, ZoneKind::Nexus);

let log = make_cleanup_logger(opctx, sled_id, zone.id, zone_kind);
let result = datastore
.database_nexus_access_delete(&opctx, zone.id)
.await
.map_err(|err| anyhow::anyhow!(err));

(log, result)
})
}

fn clean_up_expunged_cockroach_zones<'a, R: CleanupResolver>(
opctx: &'a OpContext,
datastore: &'a DataStore,
resolver: &'a R,
blueprint: &'a Blueprint,
) -> impl Stream<Item = (Logger, anyhow::Result<()>)> + 'a {
let zones_to_clean_up = blueprint
.all_omicron_zones(BlueprintZoneDisposition::is_ready_for_cleanup)
.filter(|(_sled_id, zone)| zone.zone_type.is_cockroach());

stream::iter(zones_to_clean_up).then(move |(sled_id, zone)| async move {
let zone_kind = zone.zone_type.kind();
assert_eq!(zone_kind, ZoneKind::CockroachDb);

let log = make_cleanup_logger(opctx, sled_id, zone.id, zone_kind);
let result = decommission_cockroachdb_node(
opctx, datastore, resolver, zone.id, &log,
)
.await;

(log, result)
})
}

fn clean_up_expunged_oximeter_zones<'a>(
opctx: &'a OpContext,
datastore: &'a DataStore,
blueprint: &'a Blueprint,
) -> impl Stream<Item = (Logger, anyhow::Result<()>)> + 'a {
let zones_to_clean_up = blueprint
.all_omicron_zones(BlueprintZoneDisposition::is_ready_for_cleanup)
.filter(|(_sled_id, zone)| zone.zone_type.is_oximeter());

stream::iter(zones_to_clean_up).then(move |(sled_id, zone)| async move {
let zone_kind = zone.zone_type.kind();
assert_eq!(zone_kind, ZoneKind::Oximeter);

let log = make_cleanup_logger(opctx, sled_id, zone.id, zone_kind);
let result = oximeter_cleanup(opctx, datastore, zone.id, &log).await;

(log, result)
})
}

async fn oximeter_cleanup(
opctx: &OpContext,
datastore: &DataStore,
Expand Down Expand Up @@ -307,15 +328,14 @@ mod test {
use httptest::responders::{json_encoded, status_code};
use nexus_test_utils_macros::nexus_test;
use nexus_types::deployment::{
BlueprintZoneImageSource, blueprint_zone_type,
BlueprintZoneConfig, BlueprintZoneImageSource, BlueprintZoneType,
blueprint_zone_type,
};
use omicron_common::api::external::Generation;
use omicron_common::zpool_name::ZpoolName;
use omicron_uuid_kinds::OmicronZoneUuid;
use omicron_uuid_kinds::SledUuid;
use omicron_uuid_kinds::ZpoolUuid;
use sled_agent_types::inventory::OmicronZoneDataset;
use std::iter;
use uuid::Uuid;

type ControlPlaneTestContext =
Expand All @@ -325,11 +345,6 @@ mod test {
async fn test_clean_up_cockroach_zones(
cptestctx: &ControlPlaneTestContext,
) {
// The whole point of this test is to check that we send decommissioning
// requests; enable that. (See the real `clean_up_expunged_zones()` for
// more context.)
let decommission_cockroach = true;

// Test setup boilerplate.
let nexus = &cptestctx.server.server_context().nexus;
let datastore = nexus.datastore();
Expand All @@ -339,7 +354,6 @@ mod test {
);

// Construct the cockroach zone we're going to try to clean up.
let any_sled_id = SledUuid::new_v4();
let crdb_zone = BlueprintZoneConfig {
disposition: BlueprintZoneDisposition::Expunged {
as_of_generation: Generation::new(),
Expand Down Expand Up @@ -379,12 +393,12 @@ mod test {
// otherwise succeed, without attempting to contact our mock admin
// server. (We don't have a good way to confirm the warning was logged,
// so we'll just check for an Ok return and no contact to mock_admin.)
clean_up_expunged_zones_impl(
decommission_cockroachdb_node(
&opctx,
datastore,
&fake_resolver,
decommission_cockroach,
iter::once((any_sled_id, &crdb_zone)),
crdb_zone.id,
&opctx.log,
)
.await
.expect("unknown node ID: no cleanup");
Expand Down Expand Up @@ -426,12 +440,12 @@ mod test {
);
};
add_decommission_expecation(&mut mock_admin);
clean_up_expunged_zones_impl(
decommission_cockroachdb_node(
&opctx,
datastore,
&fake_resolver,
decommission_cockroach,
iter::once((any_sled_id, &crdb_zone)),
crdb_zone.id,
&opctx.log,
)
.await
.expect("decommissioned test node");
Expand All @@ -458,17 +472,15 @@ mod test {
add_decommission_failure_expecation(&mut mock_bad2);
let mut fake_resolver =
FixedResolver(vec![mock_bad1.addr(), mock_bad2.addr()]);
let mut err = clean_up_expunged_zones_impl(
let err = decommission_cockroachdb_node(
&opctx,
datastore,
&fake_resolver,
decommission_cockroach,
iter::once((any_sled_id, &crdb_zone)),
crdb_zone.id,
&opctx.log,
)
.await
.expect_err("no successful response should result in failure");
assert_eq!(err.len(), 1);
let err = err.pop().unwrap();
assert_eq!(
err.to_string(),
format!(
Expand All @@ -487,12 +499,12 @@ mod test {
add_decommission_failure_expecation(&mut mock_bad2);
add_decommission_expecation(&mut mock_admin);
fake_resolver.0.push(mock_admin.addr());
clean_up_expunged_zones_impl(
decommission_cockroachdb_node(
&opctx,
datastore,
&fake_resolver,
decommission_cockroach,
iter::once((any_sled_id, &crdb_zone)),
crdb_zone.id,
&opctx.log,
)
.await
.expect("decommissioned test node");
Expand Down
5 changes: 5 additions & 0 deletions nexus/types/src/deployment/zone_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,11 @@ impl BlueprintZoneType {
matches!(self, BlueprintZoneType::Clickhouse(_))
}

/// Identifies whether this is an Oximeter zone
pub fn is_oximeter(&self) -> bool {
matches!(self, BlueprintZoneType::Oximeter(_))
}

/// Returns the durable dataset associated with this zone, if any exists.
pub fn durable_dataset(&self) -> Option<DurableDataset<'_>> {
let (dataset, kind) = match self {
Expand Down
Loading