119 changes: 85 additions & 34 deletions aggregator/src/aggregator/queue.rs
@@ -1,5 +1,6 @@
use itertools::Itertools;
use janus_aggregator_core::TIME_HISTOGRAM_BOUNDARIES;
use janus_core::time::{InstantClock, InstantLike, RealInstantClock};
use opentelemetry::{
KeyValue,
metrics::{Histogram, Meter},
@@ -11,7 +12,6 @@ use std::{
Arc,
atomic::{AtomicU64, Ordering},
},
time::Instant,
};
use tokio::{
select,
@@ -38,7 +38,7 @@ use super::Error;
/// because the order that request futures are scheduled and executed in is essentially
/// non-deterministic.
#[derive(Debug)]
pub struct LIFORequestQueue {
pub struct LIFORequestQueue<C: InstantClock = RealInstantClock> {
/// Sends messages to the dispatcher task.
dispatcher_tx: mpsc::UnboundedSender<DispatcherMessage>,

@@ -48,11 +48,13 @@ pub struct LIFORequestQueue {
/// counter.
id_counter: AtomicU64,

metrics: Metrics,
metrics: Metrics<C>,

instant_clock: C,
}

impl LIFORequestQueue {
/// Creates a new [`Self`].
/// Creates a new [`Self`] with the real clock.
///
/// `concurrency` must be greater than zero.
///
@@ -63,6 +65,24 @@
depth: usize,
meter: &Meter,
meter_prefix: &str,
) -> Result<Self, Error> {
Self::with_instant_clock(concurrency, depth, meter, meter_prefix, RealInstantClock)
}
}

impl<C: InstantClock> LIFORequestQueue<C> {
/// Creates a new [`Self`] with a custom instant clock.
///
/// `concurrency` must be greater than zero.
///
/// `meter_prefix` is a string to disambiguate one queue from another in the metrics, while
/// using the same meter. All metric names will be prefixed with this string.
pub fn with_instant_clock(
concurrency: u32,
depth: usize,
meter: &Meter,
meter_prefix: &str,
instant_clock: C,
) -> Result<Self, Error> {
if concurrency < 1 {
return Err(Error::InvalidConfiguration(
@@ -82,6 +102,7 @@ impl LIFORequestQueue {
id_counter,
dispatcher_tx: message_tx,
metrics,
instant_clock,
})
}

@@ -98,7 +119,7 @@
mut dispatcher_rx: mpsc::UnboundedReceiver<DispatcherMessage>,
concurrency: u32,
depth: usize,
metrics: Metrics,
metrics: Metrics<C>,
) -> JoinHandle<()> {
tokio::spawn(async move {
// Use a BTreeMap to allow for cancellation (i.e. removal) of waiting requests in
@@ -184,7 +205,7 @@ impl LIFORequestQueue {
let id = self.id_counter.fetch_add(1, Ordering::Relaxed);
let (permit_tx, permit_rx) = oneshot::channel();

let enqueue_time = Instant::now();
let enqueue_time = self.instant_clock.now();
self.dispatcher_tx
.send(DispatcherMessage::Enqueue(id, PermitTx(permit_tx)))
// We don't necessarily panic because the dispatcher task could be shutdown as part of
@@ -193,20 +214,20 @@

/// Sends a cancellation message over the given channel when the guard is dropped, unless
/// [`Self::disarm`] is called.
struct CancelDropGuard {
struct CancelDropGuard<C: InstantClock, I: InstantLike> {
id: u64,
sender: mpsc::UnboundedSender<DispatcherMessage>,
armed: bool,
metrics: Metrics,
enqueue_time: Instant,
metrics: Metrics<C>,
enqueue_time: I,
}

impl CancelDropGuard {
impl<C: InstantClock, I: InstantLike> CancelDropGuard<C, I> {
fn new(
id: u64,
sender: mpsc::UnboundedSender<DispatcherMessage>,
metrics: Metrics,
enqueue_time: Instant,
metrics: Metrics<C>,
enqueue_time: I,
) -> Self {
Self {
id,
@@ -222,7 +243,7 @@ impl LIFORequestQueue {
}
}

impl Drop for CancelDropGuard {
impl<C: InstantClock, I: InstantLike> Drop for CancelDropGuard<C, I> {
fn drop(&mut self) {
if self.armed {
self.metrics.wait_time_histogram.record(
@@ -237,11 +258,11 @@
}
}

let mut drop_guard = CancelDropGuard::new(
let mut drop_guard = CancelDropGuard::<C, _>::new(
id,
self.dispatcher_tx.clone(),
self.metrics.clone(),
enqueue_time,
enqueue_time.clone(),
);
let permit = permit_rx.await;
drop_guard.disarm();
@@ -288,14 +309,14 @@ impl PermitTx {
///
/// Multiple request handlers can share a queue, by cloning the [`Arc`] that wraps the queue.
#[derive(Handler)]
pub struct LIFOQueueHandler<H> {
pub struct LIFOQueueHandler<H, C: InstantClock = RealInstantClock> {
#[handler(except = [run])]
handler: H,
queue: Arc<LIFORequestQueue>,
queue: Arc<LIFORequestQueue<C>>,
}

impl<H: Handler> LIFOQueueHandler<H> {
pub fn new(queue: Arc<LIFORequestQueue>, handler: H) -> Self {
impl<H: Handler, C: InstantClock> LIFOQueueHandler<H, C> {
pub fn new(queue: Arc<LIFORequestQueue<C>>, handler: H) -> Self {
Self { handler, queue }
}

@@ -311,12 +332,15 @@ impl<H: Handler> LIFOQueueHandler<H> {
}

/// Convenience function for wrapping a handler with a [`LIFOQueueHandler`].
pub fn queued_lifo<H: Handler>(queue: Arc<LIFORequestQueue>, handler: H) -> impl Handler {
pub fn queued_lifo<H: Handler, C: InstantClock>(
queue: Arc<LIFORequestQueue<C>>,
handler: H,
) -> LIFOQueueHandler<H, C> {
LIFOQueueHandler::new(queue, handler)
}

#[derive(Clone, Debug)]
struct Metrics {
struct Metrics<C: InstantClock> {
/// The approximate number of requests currently being serviced by the queue. It's approximate
/// since the queue length may have changed before the measurement is taken. In practice, the
/// error should only be +/- 1. It is also more or less suitable for synchronization during
@@ -325,9 +349,11 @@ struct Metrics {

/// Histogram measuring how long a queue item waited before being dequeued.
wait_time_histogram: Histogram<f64>,

_phantom: std::marker::PhantomData<C>,
}

impl Metrics {
impl<C: InstantClock> Metrics<C> {
const OUTSTANDING_REQUESTS_METRIC_NAME: &'static str = "outstanding_requests";
const MAX_OUTSTANDING_REQUESTS_METRIC_NAME: &'static str = "max_outstanding_requests";
const WAIT_TIME_METRIC_NAME: &'static str = "lifo_queue_wait_time";
@@ -378,6 +404,7 @@ impl Metrics {
Ok(Self {
outstanding_requests,
wait_time_histogram,
_phantom: std::marker::PhantomData,
})
}
}
@@ -396,14 +423,17 @@ mod tests {
use backon::{BackoffBuilder, ExponentialBuilder, Retryable};
use futures::{Future, future::join_all};
use janus_aggregator_core::test_util::noop_meter;
use janus_core::test_util::install_test_trace_subscriber;
use janus_core::{
test_util::install_test_trace_subscriber,
time::{MockInstantClock, RealInstantClock},
};
use opentelemetry_sdk::metrics::data::Gauge;
use quickcheck::{Arbitrary, TestResult, quickcheck};
use tokio::{
runtime::Builder as RuntimeBuilder,
sync::Notify,
task::{JoinHandle, yield_now},
time::{sleep, timeout},
time::timeout,
};
use tracing::debug;
use trillium::{Conn, Handler, Status};
@@ -430,9 +460,9 @@ mod tests {
.await
// The metric may not be immediately available when we need it, so return an Option
// instead of unwrapping.
.get(&Metrics::metric_name(
.get(&Metrics::<RealInstantClock>::metric_name(
meter_prefix,
Metrics::OUTSTANDING_REQUESTS_METRIC_NAME,
Metrics::<RealInstantClock>::OUTSTANDING_REQUESTS_METRIC_NAME,
))?
.data
.as_any()
@@ -455,8 +485,8 @@
.map(|metric| !condition(metric))
.unwrap_or(true)
{
// Nominal sleep to prevent this loop from being too tight.
sleep(Duration::from_millis(3)).await;
// Yield to allow other tasks to run.
yield_now().await;
}
}

@@ -635,9 +665,16 @@
let meter_prefix = "test";
let metrics = InMemoryMetricInfrastructure::new();
let unhang = Arc::new(Notify::new());
let instant_clock = MockInstantClock::new();
let queue = Arc::new(
LIFORequestQueue::new(concurrency, depth, &metrics.meter, meter_prefix)
.unwrap(),
LIFORequestQueue::with_instant_clock(
concurrency,
depth,
&metrics.meter,
meter_prefix,
instant_clock,
)
.unwrap(),
);
let handler = Arc::new(queued_lifo(
Arc::clone(&queue),
@@ -716,9 +753,16 @@
let unhang = Arc::new(Notify::new());
let meter_prefix = "test";
let metrics = InMemoryMetricInfrastructure::new();
let instant_clock = MockInstantClock::new();
let queue = Arc::new(
LIFORequestQueue::new(concurrency, depth, &metrics.meter, meter_prefix)
.unwrap(),
LIFORequestQueue::with_instant_clock(
concurrency,
depth,
&metrics.meter,
meter_prefix,
instant_clock,
)
.unwrap(),
);
let handler = Arc::new(queued_lifo(
Arc::clone(&queue),
@@ -781,9 +825,16 @@
let unhang = Arc::new(Notify::new());
let meter_prefix = "test";
let metrics = InMemoryMetricInfrastructure::new();
let instant_clock = MockInstantClock::new();
let queue = Arc::new(
LIFORequestQueue::new(concurrency, depth, &metrics.meter, meter_prefix)
.unwrap(),
LIFORequestQueue::with_instant_clock(
concurrency,
depth,
&metrics.meter,
meter_prefix,
instant_clock,
)
.unwrap(),
);
let handler = Arc::new(queued_lifo(
Arc::clone(&queue),
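Taken together, the queue changes thread the clock through construction (`with_instant_clock`), the `CancelDropGuard`, and `Metrics`, with `RealInstantClock` as the default so existing callers of `LIFORequestQueue::new` are untouched. A minimal sketch of the resulting test wiring, using only items visible in this diff; `some_handler()` is a placeholder, and since no API for advancing `MockInstantClock` is shown here, assertions on recorded wait times are left as a comment:

```rust
use std::sync::Arc;

use janus_core::time::MockInstantClock;

#[tokio::test]
async fn lifo_queue_with_mock_clock() {
    let metrics = InMemoryMetricInfrastructure::new();
    // Substitute a mock clock so enqueue-to-dequeue wait times become
    // deterministic instead of depending on std::time::Instant.
    let queue = Arc::new(
        LIFORequestQueue::with_instant_clock(
            /* concurrency */ 1,
            /* depth */ 8,
            &metrics.meter,
            "test",
            MockInstantClock::new(),
        )
        .unwrap(),
    );
    // Wrap any trillium handler; wait times recorded to the
    // `lifo_queue_wait_time` histogram now come from the mock clock.
    let _handler = queued_lifo(Arc::clone(&queue), some_handler());
    // ... drive requests through the handler, then assert on the histogram.
}
```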
8 changes: 6 additions & 2 deletions aggregator/src/binary_utils.rs
@@ -139,7 +139,7 @@ pub async fn datastore<C: Clock>(
datastore_keys: &[String],
check_schema_version: bool,
max_transaction_retries: u64,
) -> Result<Datastore<C>> {
) -> Result<Datastore<C, janus_core::time::RealInstantClock>> {
let datastore_keys = datastore_keys
.iter()
.filter(|k| !k.is_empty())
@@ -165,21 +165,25 @@
}

let datastore = if check_schema_version {
Datastore::new(
use janus_core::time::RealInstantClock;
Datastore::with_instant_clock(
pool,
Crypter::new(datastore_keys),
clock,
meter,
max_transaction_retries,
RealInstantClock,
)
.await?
} else {
use janus_core::time::RealInstantClock;
Datastore::new_without_supported_versions(
pool,
Crypter::new(datastore_keys),
clock,
meter,
max_transaction_retries,
RealInstantClock,
)
.await
};
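Note the two spellings: `LIFORequestQueue<C: InstantClock = RealInstantClock>` relies on a defaulted type parameter so code naming the queue without arguments keeps compiling, while the `datastore` return type above writes `Datastore<C, janus_core::time::RealInstantClock>` out explicitly. A self-contained toy sketch of the defaulted-parameter pattern (toy types, not Janus code):

```rust
// Toy illustration of the defaulted-type-parameter pattern used by
// `LIFORequestQueue<C: InstantClock = RealInstantClock>` in this PR.
trait InstantClock {}

struct RealInstantClock;
impl InstantClock for RealInstantClock {}

struct MockInstantClock;
impl InstantClock for MockInstantClock {}

// `Queue` written without type arguments elaborates to
// `Queue<RealInstantClock>`, so existing call sites keep compiling.
struct Queue<C: InstantClock = RealInstantClock> {
    clock: C,
}

fn main() {
    let _production: Queue = Queue { clock: RealInstantClock };
    let _test = Queue { clock: MockInstantClock };
}
```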