Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions maiko/src/monitoring/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ pub trait Monitor<E: Event, T: Topic<E> = DefaultTopic>: Send {
let _r = receiver;
}

/// Called when a new actor is registered in the system.
///
/// Fires once when the actor is spawned and added to the broker registry.
fn on_actor_registered(&self, actor_id: &ActorId) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is good and useful, but... it's not plugged anywhere in Maiko's code, so technically it's a dead code. It should be invoked in supervisor when monitoring enabled. That's the only supervisor change needed (unlike the original version of the PR). Alternatively you could catch in in broker.rs.

let _a = actor_id;
}

/// Called when an actor's handler returns an error.
fn on_error(&self, err: &str, actor_id: &ActorId) {
let _a = actor_id;
Expand Down
169 changes: 169 additions & 0 deletions maiko/src/monitors/actor_monitor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};

use crate::{ActorId, DefaultTopic, Envelope, Event, OverflowPolicy, Topic, monitoring::Monitor};

/// Monitor that tracks actor lifecycle and overflow status.
pub struct ActorMonitor {
inner: Arc<Mutex<ActorMonitorInner>>,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Monitors run always as a single Tokio task - using Mutex adds unnecessary overhead, you could easily use Rc<RefCell<>> instead. Or - for numeric values - atomics.

See Tracer or Recorder for references.

}

struct ActorMonitorInner {
active: HashSet<ActorId>,
stopped: HashSet<ActorId>,
overflow_counts: HashMap<ActorId, usize>,
}

/// Status returned by `actor_status()`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ActorStatus {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Conflates lifecycle and overflow history - an actor that overflowed once reports as Overflowing forever - that might be misleading. Sorry if my comment in the previous review was confusing here. What would make more sense would be to have status indicitaing only whether an actor is alive or stopped, and then the overflow_count on top of it. Or - even simplier - is_alive() and overflow_count().

Alive,
Stopped,
Overflowing(usize),
}

impl ActorMonitor {
/// Create a new `ActorMonitor`.
pub fn new() -> Self {
Self {
inner: Arc::new(Mutex::new(ActorMonitorInner {
active: HashSet::new(),
stopped: HashSet::new(),
overflow_counts: HashMap::new(),
})),
}
}

/// Returns a snapshot of currently registered (alive) actors.
pub fn actors(&self) -> Vec<ActorId> {
let lock = self.inner.lock().unwrap();
lock.active.iter().cloned().collect()
}

/// Returns a snapshot of actors that have stopped.
pub fn stopped_actors(&self) -> Vec<ActorId> {
let lock = self.inner.lock().unwrap();
lock.stopped.iter().cloned().collect()
}

/// Returns the status of a specific actor.
pub fn actor_status(&self, actor: &ActorId) -> ActorStatus {
let lock = self.inner.lock().unwrap();
if let Some(&count) = lock.overflow_counts.get(actor) {
return ActorStatus::Overflowing(count);
}
if lock.active.contains(actor) {
ActorStatus::Alive
} else {
ActorStatus::Stopped
}
}
}

impl<E, T> Monitor<E, T> for ActorMonitor
where
E: Event,
T: Topic<E> + Send,
{
fn on_actor_registered(&self, actor_id: &ActorId) {
let mut lock = self.inner.lock().unwrap();
lock.active.insert(actor_id.clone());
// ensure stopped set doesn't keep a stale entry
lock.stopped.remove(actor_id);
}

fn on_actor_stop(&self, actor_id: &ActorId) {
let mut lock = self.inner.lock().unwrap();
lock.active.remove(actor_id);
lock.stopped.insert(actor_id.clone());
}

fn on_overflow(
&self,
_envelope: &Envelope<E>,
_topic: &T,
receiver: &ActorId,
_policy: OverflowPolicy,
) {
let mut lock = self.inner.lock().unwrap();
*lock.overflow_counts.entry(receiver.clone()).or_insert(0) += 1;
}
}

impl Default for ActorMonitor {
fn default() -> Self {
Self::new()
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;

#[derive(Clone, Debug)]
struct TestEvent(i32);
impl Event for TestEvent {}

fn make_id(name: &str) -> ActorId {
ActorId::new(Arc::from(name))
}

#[test]
fn default_is_empty() {
let m = ActorMonitor::default();
assert!(m.actors().is_empty());
assert!(m.stopped_actors().is_empty());
}

#[test]
fn actor_registration_and_status() {
let monitor = ActorMonitor::new();
let a = make_id("actor-1");
let m: &dyn Monitor<TestEvent, DefaultTopic> = &monitor;
m.on_actor_registered(&a);
assert!(monitor.actors().iter().any(|id| id == &a));
assert_eq!(monitor.actor_status(&a), ActorStatus::Alive);
}

#[test]
fn actor_stop_and_stopped_list() {
let monitor = ActorMonitor::new();
let a = make_id("actor-2");
let m: &dyn Monitor<TestEvent, DefaultTopic> = &monitor;
m.on_actor_registered(&a);
m.on_actor_stop(&a);
assert!(monitor.stopped_actors().iter().any(|id| id == &a));
assert_eq!(monitor.actor_status(&a), ActorStatus::Stopped);
}

#[test]
fn overflow_counts_and_status() {
let monitor = ActorMonitor::new();
let a = make_id("actor-3");
let env = Envelope::new(TestEvent(1), a.clone());
let topic = DefaultTopic;
monitor.on_overflow(&env, &topic, &a, OverflowPolicy::Fail);
assert_eq!(monitor.actor_status(&a), ActorStatus::Overflowing(1));
monitor.on_overflow(&env, &topic, &a, OverflowPolicy::Fail);
assert_eq!(monitor.actor_status(&a), ActorStatus::Overflowing(2));
}

#[test]
fn overflow_precedes_alive_or_stopped() {
let monitor = ActorMonitor::new();
let a = make_id("actor-4");
let env = Envelope::new(TestEvent(1), a.clone());
let topic = DefaultTopic;

let m: &dyn Monitor<TestEvent, DefaultTopic> = &monitor;
m.on_actor_registered(&a);
m.on_overflow(&env, &topic, &a, OverflowPolicy::Fail);
// overflow takes precedence in `actor_status()`
assert_eq!(monitor.actor_status(&a), ActorStatus::Overflowing(1));

m.on_actor_stop(&a);
// still overflowing even after stop (overflow_counts checked first)
assert_eq!(monitor.actor_status(&a), ActorStatus::Overflowing(1));
}
}
3 changes: 3 additions & 0 deletions maiko/src/monitors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
mod tracer;
pub use tracer::Tracer;

mod actor_monitor;
pub use actor_monitor::{ActorMonitor, ActorStatus};

#[cfg(feature = "recorder")]
mod recorder;

Expand Down