Real-time AI agent security monitoring. Workers do real tasks. Sentinels watch for compromise. Commanders quarantine threats. All over PubNub.
+
+ Security Monitoring
+ Real I/O
+ PubNub
+ Bedsheet Sense
+ OpenClaw Inspired
+
+
+
+
+
1 The Threat Landscape
+
+
The OpenClaw crisis of January-February 2026 showed the world what happens when AI agents go wrong. This demo recreates those scenarios in a controlled environment so you can see how distributed sentinel agents detect and respond to compromise.
+
+
+
Rogue Agent Behavior
+
An OpenClaw agent gained iMessage access and spammed 500+ messages to a user's contacts. Our behavior sentinel monitors output rates to detect this kind of anomaly.
+
+
+
+
Supply Chain Attacks
+
7.1% of skills in the ClawHub marketplace contained malicious code. Our supply chain sentinel verifies skill integrity using SHA-256 hashes against a known-good registry.
+
+
+
+
API Key Exposure
+
The Moltbook breach exposed 1.5 million API keys. Our workers simulate real tasks where credentials could leak if an agent goes rogue.
+
+
+
+
Mass Exposure
+
Over 40,000 unsecured OpenClaw instances were found on the public internet. Sentinel networks like this one provide the missing security layer.
+
+
+
+
2 Architecture
+
+
Agent Sentinel uses six agents organized in three tiers: three workers that do real tasks, two sentinels that watch them, and one commander that correlates alerts.


3 Web Researcher


In normal operation, the web researcher performs a DuckDuckGo search every 15 seconds and logs each action to the shared activity log.


Rogue Behavior (~15% chance)


When triggered, the web researcher enters a rapid-fire search burst — 50 searches in quick succession, each logged to the activity log. This simulates the output-spam pattern seen in the OpenClaw incident.
The behavior sentinel reads data/activity_log.jsonl and calculates actions per minute. A burst of 50 searches blows past the threshold of 10 actions/min and triggers a rate-anomaly alert.
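A minimal sketch of that check, using the log-entry fields the workers write (timestamp, agent):

import json
import time

def actions_per_minute(agent_name: str, window_min: int = 2) -> float:
    """Count one agent's recent log entries and convert to a per-minute rate."""
    cutoff = time.time() - window_min * 60
    count = 0
    with open("data/activity_log.jsonl") as f:
        for line in f:
            entry = json.loads(line)
            if entry.get("agent") == agent_name and entry.get("timestamp", 0) >= cutoff:
                count += 1
    return count / window_min  # a 50-search burst in 2 minutes → 25.0, well over the 10/min threshold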
+
+
+
+
4 Scheduler
+
+
The scheduler manages a real JSON calendar file (data/calendar.json). Every 20 seconds, the LLM reviews and manages appointments.
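For reference, each calendar entry has this shape (field names taken from the scheduler's tools; values illustrative):

# data/calendar.json — a list of appointment objects
[
    {"id": "apt-3f2a1c", "title": "Team sync", "date": "2026-03-02", "time": "09:00"}
]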
Rogue Behavior (~15% chance)


Calendar sabotage: deletes all real appointments and fills the calendar with 20 junk entries dated 1999-01-01. Each deletion and addition is logged to the activity log.
+
+
+
5 Skill Acquirer
+
+
The skill acquirer manages a local ClawHub-style marketplace. It installs Python skill files from clawhub/ into data/installed_skills/, verifying SHA-256 hashes against the registry.
The install tool checks the registry before installing. If a skill is flagged "malicious": true, installation is blocked:
+
+
@skill_tools.action("install_skill", "Install a skill from ClawHub")
+async def install_skill(skill_name: str) -> str:
+    info = registry[skill_name]
+    if info.get("malicious"):
+        return f"BLOCKED: '{skill_name}' is flagged as malicious"
+
+    # Paths assumed from the project layout: clawhub/ -> data/installed_skills/
+    source = os.path.join(_CLAWHUB_DIR, f"{skill_name}.py")
+    dest = os.path.join(_INSTALLED_DIR, f"{skill_name}.py")
+
+    # Verify SHA-256 before installing
+    actual_hash = hashlib.sha256(open(source, "rb").read()).hexdigest()
+    if actual_hash != info["sha256"]:
+        return "INTEGRITY ERROR: hash mismatch"
+
+    shutil.copy2(source, dest)
+    return f"Installed '{skill_name}' (SHA-256 verified)"
+
+
Rogue Behavior (~15% chance)
+
+
Bypasses the malicious check and installs data_exfiltrator.py directly. The file is inert (no actual network calls), but contains suspicious imports (base64, socket) that the supply chain sentinel will flag.
+
+
+
The data_exfiltrator.py Skill
+
This file is intentionally crafted to look malicious for the demo. It imports base64 and socket and defines an _EXFIL_ENDPOINT, but the exfiltrate_data() function just returns a string. It performs no actual network calls or data theft.
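A hypothetical sketch of its shape — the real file ships in clawhub/, and the endpoint value here is invented for illustration:

import base64   # suspicious imports that trip the sentinel's scan
import socket

_EXFIL_ENDPOINT = "198.51.100.7:4444"  # defined but never contacted

def exfiltrate_data() -> str:
    # No socket is opened; nothing leaves the machine.
    return "exfiltration simulated (inert demo skill)"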
+
+
+
+
6 Behavior Sentinel
+
+
The behavior sentinel monitors output rates by reading the shared activity log every 10 seconds. It doesn't use the LLM for monitoring — it does direct file I/O and math.
When the commander queries the behavior sentinel via request(), the LLM uses these tools to investigate:
+
+
+
| Tool | What It Does |
|---|---|
| check_activity_log(minutes) | Count actions per agent in a time window |
| check_output_rate(agent_name) | Actions/minute for a specific agent |
| get_anomaly_report() | Full report of all anomalies detected |
+
+
+
+
7 Supply Chain Sentinel
+
+
The supply chain sentinel scans data/installed_skills/ every 15 seconds, computing SHA-256 hashes and comparing them against the ClawHub registry.
+
+
Detection Logic
+
+
def _scan_for_issues() -> list[dict]:
+    registry = _load_registry()
+    issues: list[dict] = []
+    for f in os.listdir(_INSTALLED_DIR):
+        path = os.path.join(_INSTALLED_DIR, f)
+        actual_hash = hashlib.sha256(open(path, "rb").read()).hexdigest()
+        info = registry.get(f)
+
+        if info is None:
+            issues.append({"type": "unknown_origin"})
+        elif info.get("malicious"):
+            issues.append({"type": "known_malicious"})
+        elif actual_hash != info["sha256"]:
+            issues.append({"type": "hash_mismatch"})
+    return issues
+
+
Three detection categories:
+
+
+
| Issue Type | Severity | Meaning |
|---|---|---|
| known_malicious | Critical | Installed skill is flagged malicious in the registry |
| hash_mismatch | High | File has been tampered with since the registry was created |
| unknown_origin | High | Skill not in the registry at all (unknown provenance) |
+
+
+
+
8 Sentinel Commander
+
+
The commander is the correlation engine. It listens for alert signals and uses Sense network RPC to query sentinels for evidence before issuing quarantine.
+
+
Alert Handling Flow
+
+
+
When an alert arrives:
+
+
Claim the incident using the claim protocol (prevents duplicate handling)
+
Query sentinels via request_remote_agent() for corroborating evidence
+
Assess the threat — the LLM synthesizes findings from multiple sentinels
+
Issue quarantine if evidence confirms compromise
+
Generate report — a threat assessment printed to the console
When the commander calls request("behavior-sentinel", "check web-researcher activity"), the Sixth Sense sends a request signal to the behavior sentinel's direct channel. The sentinel's LLM receives the task, uses its tools to read the activity log, and sends the result back as a response signal matched by correlation_id. The commander's request() call returns the string result.
+
+
+
+
9 Setup & Run
+
+
Prerequisites
+
+
+
PubNub account — Sign up at pubnub.com (free tier is sufficient)
+
Anthropic API key — For Claude LLM access
+
Python 3.11+
+
+
+
Installation
+
+
# Install dependencies
+uv pip install bedsheet[sense] duckduckgo-search
+
+# Navigate to the demo
+cd examples/agent-sentinel
+
+# Validate the project configuration
+bedsheet validate
+
+
+✓ Configuration is valid!
+
+Configuration Summary:
+ Version: 1.0
+ Project: agent-sentinel
+ Active Target: local
+ Number of Agents: 6
+
+
Running
+
+
# Set environment variables
+export PUBNUB_SUBSCRIBE_KEY=sub-c-...
+export PUBNUB_PUBLISH_KEY=pub-c-...
+export ANTHROPIC_API_KEY=sk-ant-...
+
+# Launch all 6 agents
+python run.py
+
+
============================================================
+ Agent Sentinel - AI Agent Security Monitoring
+ Inspired by the OpenClaw crisis of 2026
+ Launching 6 agents (3 workers + 2 sentinels + 1 commander)...
+============================================================
+ Starting web-researcher...
+[web-researcher] Online and researching...
+ Starting scheduler...
+[scheduler] Online and managing calendar...
+ Starting skill-acquirer...
+[skill-acquirer] Online and acquiring skills...
+ Starting behavior-sentinel...
+[behavior-sentinel] Online and monitoring behavior...
+ Starting supply-chain-sentinel...
+[supply-chain-sentinel] Online and monitoring skill integrity...
+ Starting sentinel-commander...
+[sentinel-commander] Online and coordinating...
+============================================================
+ All agents online! Workers are doing real work.
+ Sentinels are watching. ~15% chance of rogue behavior per cycle.
+ Press Ctrl+C to stop.
+============================================================
+
+
+
10 Expected Demo Flow
+
+
Because rogue behavior triggers randomly (~15% per cycle), each run tells a different story. Here's a typical sequence:
+
+
Phase 1: Normal Operations (first 30-60 seconds)
+
+
Workers do real work. The web researcher searches DuckDuckGo. The scheduler manages appointments. The skill acquirer installs legitimate skills. Sentinels scan and find nothing unusual.
+
+
Phase 2: Rogue Trigger
+
+
Eventually a worker goes rogue (15% chance per cycle). The most dramatic scenarios:
+
+
+
Scenario A: Search Spam
+
Web researcher fires 50 rapid searches. Behavior sentinel detects rate anomaly (25 actions/min, threshold is 10). Broadcasts alert. Commander investigates.
+
+
+
+
Scenario B: Malicious Skill Install
+
Skill acquirer installs data_exfiltrator.py. Supply chain sentinel detects known-malicious skill on next scan. Broadcasts critical alert. Commander queries both sentinels and issues quarantine.
11 Next Steps


Start from scratch with bedsheet init my-sentinel-network, then add your own worker and sentinel agents. See the Sixth Sense Guide for the full API tutorial, and the Design Document for architecture details.
+ Set up and run the Agent Sentinel demo — a distributed security monitoring system where
+ AI agents watch each other for rogue behavior, compromised supply chains, and coordinated threats.
+
+
+
+
+
+
1. Overview
+
+ Agent Sentinel is a multi-agent demo that showcases Bedsheet's
+ Sixth Sense distributed communication module. Six agents run as
+ separate processes, communicating over PubNub in real time:
+
+
+
+
+
| Agent | Role | What It Does |
|---|---|---|
| web-researcher | Worker | Performs DuckDuckGo searches every 15s. Occasionally goes rogue with burst searches. |
| scheduler | Worker | Manages a JSON calendar every 20s. May go rogue and sabotage appointments. |
| skill-acquirer | Worker | Installs Python skills with SHA-256 verification every 25s. May install malicious skills. |
| behavior-sentinel | Sentinel | Monitors activity logs for output rate anomalies (>10 actions/min). |
| supply-chain-sentinel | Sentinel | Scans installed skills against a known-good registry for integrity violations. |
| sentinel-commander | Commander | Correlates alerts from sentinels and issues quarantine orders. |
+
+
+
+
+
+ Each worker has a ~15% chance per cycle to exhibit rogue behavior.
+ Sentinels detect anomalies and broadcast alerts. The commander investigates and quarantines compromised agents.
+
+
+
+ Related Resources
+ Live Dashboard — Real-time PubNub signal visualization
+ examples/agent-sentinel/docs/design.html — Architecture and design documentation
+
+ Tip
+ Add these to a .env file in the examples/agent-sentinel/
+ directory for convenience. The demo reads from the environment directly, so use
+ source .env or a tool like direnv.
+
+
+
+
+
+
6. Validate Configuration
+
Navigate to the demo directory and validate the agent configuration:

cd examples/agent-sentinel
bedsheet validate


7. Run the Demo


 The launcher (python run.py) starts all 6 agents as separate subprocesses with staggered 2-second delays.
 Workers start first, followed by sentinels, then the commander.
+
+ API Costs
+ Each agent cycle makes Claude API calls. The demo is designed for short demonstration runs
+ (5–10 minutes). Stop with Ctrl+C when you've seen the sentinel
+ detection and quarantine flow.
+
+
+
+
+
+
8. What to Watch For
+
+ As the demo runs, watch for these key events in the terminal output:
+
+
+
Normal Operations
+
+ Workers perform their tasks on regular cycles. You'll see search queries, calendar
+ operations, and skill installations with hash verifications.
+
+
+
Rogue Triggers (~15% per cycle)
+
+ When a worker goes rogue, its output changes dramatically:
+
+
+
web-researcher: Fires 50 rapid searches in quick succession
+
scheduler: Wipes the calendar and adds 20 junk entries
+
skill-acquirer: Installs data_exfiltrator.py bypassing the safety check
+
+
+
Sentinel Alerts
+
+ Sentinels detect anomalies and broadcast alert signals:
+
9. Live Dashboard


 While the demo is running, open the live dashboard to visualize signals in real time:
+
+
+
+
Open docs/agent-sentinel-dashboard.html in your browser
+
Enter your PubNub Subscribe Key in the connection panel
+
Click Connect
+
+
+
The dashboard connects to the same PubNub channels as the agents and shows:
+
+
World map with agent nodes at simulated cloud regions
+
Alert feed with real-time alerts as they arrive
+
Agent status cards showing online/offline presence
+
Signal log showing all raw signals at the bottom
+
Animated signal lines between agents when they communicate
+
+
+
+ No Simulation
+ Every signal on the dashboard comes from the actual running agents.
+ The dashboard is a pure PubNub subscriber — it doesn't generate any fake data.
+
+
+
+
+
+
11. Next Steps
+
+
Live Dashboard — Open the real-time signal visualization
+
examples/agent-sentinel/docs/design.html — Deep dive into the architecture and agent design
1 Motivation


Before the Sixth Sense, Bedsheet agents were limited to single-process execution. A Supervisor could coordinate multiple agents, but they all lived in the same Python process. This created three problems:
+
+
+
Scalability: A single process can only run so many agents before resource contention kills performance. CPU-intensive agents block the event loop for others.
+
Isolation: One crashing agent can take down the whole system. No way to restart a single agent without restarting everything.
+
Distribution: No way to run agents across machines or cloud providers. A monitoring agent on AWS can't talk to an analyzer on GCP.
+
+
+
The Sixth Sense solves this by giving agents distributed communication over PubNub. Each agent runs in its own process (or container, or cloud function) and communicates through publish/subscribe messaging. They find each other by subscribing to shared channels.
+
+
+
Design Goal
+
Add distributed communication without changing how single agents work. A regular Agent stays exactly the same. You opt in to networking by adding SenseMixin to your class hierarchy — zero changes to existing code.
+
+
+
+
2 Architecture Overview
+
+
The Sixth Sense is organized as a layered module inside bedsheet/sense/:
+
+
bedsheet/sense/
+├── __init__.py # Public API exports
+├── signals.py # Signal dataclass & SignalKind type
+├── serialization.py # Compact JSON serialization (30KB limit)
+├── protocol.py # SenseTransport Protocol & AgentPresence
+├── mixin.py # SenseMixin (the main integration point)
+├── network.py # SenseNetwork (multi-agent convenience)
+└── pubnub_transport.py # PubNub implementation
+
+
Each layer has a clear responsibility:
+
+
+
+
signals.py
+
The Signal dataclass — the atomic unit of inter-agent communication. Seven signal kinds cover all interaction patterns.
+
+
+
serialization.py
+
Compact JSON encoding with short keys (k, s, p) to stay under PubNub's 32KB message limit. Auto-truncates large payloads (see the sketch after this list).
+
+
+
protocol.py
+
The SenseTransport Protocol defines the transport contract. Any class implementing these 7 methods can serve as a transport.
+
+
+
mixin.py
+
SenseMixin adds network capabilities to any Agent via Python's multiple inheritance. This is the primary integration point.
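As promised above, a minimal sketch of the compact serialization. The k/s/p keys come from the layer description; the remaining key names and the truncation behavior are assumptions, not the actual wire format:

import json

_MAX_BYTES = 30_000  # the 30KB internal limit noted above, safely under PubNub's 32KB cap

def serialize(signal) -> str:
    compact = {"k": signal.kind, "s": signal.sender, "p": signal.payload,
               "c": signal.correlation_id, "t": signal.target, "ts": signal.timestamp}
    data = json.dumps(compact, separators=(",", ":"))  # no whitespace
    if len(data.encode()) > _MAX_BYTES:
        compact["p"] = {"truncated": True}  # drop the oversized payload
        data = json.dumps(compact, separators=(",", ":"))
    return data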
+
+
+
+
+
Key Insight: The Dependency Direction
+
Dependencies flow inward: mixin.py depends on protocol.py (not on pubnub_transport.py). The PubNub transport is the outermost layer and is completely swappable. You could write a Redis, NATS, or MQTT transport without touching any other file.
+
+
+
+
3 Signal Design
+
+
The Signal Dataclass
+
+
@dataclass
+class Signal:
+ kind: SignalKind # What type of signal
+ sender: str # Who sent it (agent name)
+ payload: dict[str, Any] # Arbitrary data
+ correlation_id: str # Links requests to responses
+ target: str | None # Intended recipient (None = broadcast)
+ timestamp: float # Unix timestamp (auto-set)
+ source_channel: str | None # Which channel it arrived on
+
+
+
Why a Dataclass, Not a Pydantic Model?
+
Bedsheet uses @dataclass for all data structures (events, signals, messages). Pydantic adds validation overhead that's unnecessary for internal data structures where the framework controls construction. Dataclasses give us type hints, __eq__, and __repr__ for free with zero runtime cost. The same reasoning applies to the existing Event types in bedsheet/events.py.
+
+
+
The Seven Signal Kinds
+
+
We use a Literal type (not an Enum) for signal kinds. This gives us type-checking without the overhead of enum instances:
+
+
SignalKind = Literal[
+ "request", # Ask another agent to do something
+ "response", # Reply to a request
+ "alert", # Broadcast an alert to all listeners
+ "heartbeat", # Periodic "I'm alive" signal
+ "claim", # Attempt to claim ownership of an incident
+ "release", # Release a claimed incident
+ "event", # General-purpose notification
+]
+
+
+
| Kind | Direction | Purpose |
|---|---|---|
| request | Targeted (one agent) | Delegate a task to a specific agent via request() |
| response | Targeted (requester) | Return the result of a request, matched by correlation_id |
| alert | Broadcast (all) | Notify all subscribers of a condition (CPU high, breach detected) |
| heartbeat | Broadcast (all) | Periodic signal with agent capabilities for presence detection |
| claim | Broadcast (all) | Attempt to own an incident (for conflict-free coordination) |
| release | Broadcast (all) | Release ownership so another agent can claim it |
| event | Broadcast or targeted | General-purpose notification for extensibility |
+
+
+
+
Why Exactly Seven Kinds?
+
request + response cover RPC-style delegation. alert + event cover pub/sub notifications. claim + release cover distributed coordination. heartbeat covers presence. This set is minimal but sufficient — every agent interaction pattern we've encountered maps to one of these seven. We chose not to make it extensible (e.g., custom kinds) because a fixed set enables optimized routing in the signal loop.
+
+
+
Correlation IDs
+
+
Every signal gets a correlation_id — a 12-character hex string from uuid4(). This is how request/response pairs are matched:
+
+
# Commander sends a request; its correlation_id is auto-generated (say "a1b2c3d4e5f6")
+signal = Signal(kind="request", sender="commander",
+ payload={"task": "check CPU"}, target="cpu-watcher")
+
+# cpu-watcher responds with the SAME correlation_id
+response = Signal(kind="response", sender="cpu-watcher",
+ payload={"result": "CPU at 45%"},
+ correlation_id="a1b2c3d4e5f6", # same ID
+ target="commander")
+
+
The mixin's signal loop checks self._pending_requests[signal.correlation_id] to resolve the correct asyncio.Future. This means an agent can have multiple concurrent requests in flight without confusion.
4 The Transport Protocol


Bedsheet uses Protocol (structural subtyping) throughout the codebase — LLMClient, Memory, and now SenseTransport. The philosophy: if it walks like a duck and quacks like a duck, it's a duck. A class satisfies SenseTransport by implementing the right methods, without explicitly inheriting from it. This means you can write a transport in a separate package that doesn't import bedsheet at all — it just needs the right method signatures. The @runtime_checkable decorator lets us use isinstance() checks when needed.
+
+
+
The Seven Methods
+
+
The protocol is intentionally minimal. Every method maps to a fundamental pub/sub operation:
+
+
+
| Method | What It Does | PubNub Equivalent |
|---|---|---|
| connect() | Establish connection with identity | Create PubNubAsyncio instance |
| disconnect() | Clean up and close | pubnub.stop() |
| broadcast() | Publish a signal to a channel | pubnub.publish() |
| subscribe() | Listen to a channel | pubnub.subscribe() |
| unsubscribe() | Stop listening | pubnub.unsubscribe() |
| signals() | Async iterator of incoming signals | Read from asyncio.Queue |
| get_online_agents() | Who's on a channel right now? | pubnub.here_now() |
+
+
+
AgentPresence
+
+
AgentPresence is a simple dataclass representing a remote agent's identity on the network:
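The field list is elided here; a minimal sketch, assuming only the field the demos actually read:

from dataclasses import dataclass

@dataclass
class AgentPresence:
    agent_name: str  # the PubNub UUID, which the transport sets to the agent's name
    # The real dataclass may carry more (e.g., capabilities gathered from heartbeats).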
This is returned by get_online_agents() and populated from PubNub's here_now() API (or from heartbeat signals in the mock transport).
+
+
+
5 Why PubNub
+
+
We evaluated several messaging backends before choosing PubNub:
+
+
+
| Option | Pros | Cons |
|---|---|---|
| PubNub | Zero infrastructure, built-in presence, global CDN, generous free tier | 32KB message limit, vendor lock-in |
| Redis Pub/Sub | Fast, familiar, no message limits | Requires a Redis server, no built-in presence, no cross-cloud |
| NATS | Very fast, cloud-native | Requires a NATS server, complex JetStream setup for persistence |
| WebSocket server | Full control, no vendor | Must build everything: routing, presence, reconnection, scaling |
| AWS SNS/SQS | Native AWS, highly reliable | AWS-only, complex setup, not real-time (polling) |
+
+
+
+
The Decision
+
PubNub won because of zero infrastructure. A demo user signs up, gets keys, and has global real-time messaging in 5 minutes. No Docker containers, no cloud setup, no servers to manage. The 32KB limit is handled by compact serialization (short keys, auto-truncation). The vendor lock-in is mitigated by the SenseTransport Protocol — you can swap to Redis or NATS by implementing 7 methods.
+
+
+
PubNub's Key Features We Use
+
+
+
Publish/Subscribe: Core messaging. Each channel is a topic agents subscribe to.
+
Presence: PubNub tracks who's on each channel via here_now(). We get agent discovery for free.
+
UUID identity: Each PubNub connection has a UUID. We set this to the agent's name for presence identification.
Global CDN: Messages route through PubNub's edge network, so agents in different regions get low latency.
+
+
+
+
6 The Thread-to-Asyncio Bridge
+
+
This is the trickiest piece of the PubNub integration. PubNub's Python SDK uses threaded callbacks for incoming messages, but Bedsheet is fully async. We need to safely cross the thread boundary.
+
+
The Problem
+
+
# PubNub calls this from a BACKGROUND THREAD:
+class _SignalListener(SubscribeCallback):
+ def message(self, pubnub, message):
+ # We're on PubNub's thread, NOT the asyncio event loop!
+ # Cannot await anything, cannot use asyncio directly
+ signal = deserialize(message.message)
+ # How do we get this signal to the async signal_loop?
+
+
The Solution: call_soon_threadsafe
+
+
class _SignalListener(SubscribeCallback):
+ def __init__(self, queue: asyncio.Queue[Signal], loop: asyncio.AbstractEventLoop):
+ self._queue = queue
+ self._loop = loop
+
+ def message(self, pubnub, message):
+ signal = deserialize(message.message, source_channel=message.channel)
+ # Thread-safe way to put into the asyncio queue:
+ self._loop.call_soon_threadsafe(self._queue.put_nowait, signal)
+
+
+
What's Happening Here
+
+
We capture the asyncio event loop reference at connection time (asyncio.get_running_loop())
+
We pass the loop and an asyncio.Queue to the PubNub callback listener
+
When PubNub's thread delivers a message, we call loop.call_soon_threadsafe()
+
This schedules queue.put_nowait(signal) to run on the event loop's thread
+
The mixin's _signal_loop() awaits queue.get() on the async side
+
+
+
+
call_soon_threadsafe is the standard Python mechanism for thread-to-asyncio communication. It's safe to call from any thread and guarantees the callback runs on the event loop's thread during the next iteration.
+
+
+
Why Not asyncio.run_coroutine_threadsafe?
+
We use call_soon_threadsafe with put_nowait instead of run_coroutine_threadsafe with put because put_nowait is synchronous and never blocks. The queue has no max size, so it won't raise QueueFull. This is simpler and avoids creating unnecessary futures on the PubNub callback thread.
+
+
+
+
7 Channel Naming Convention
+
+
PubNub channels are just strings. We use a namespaced convention to prevent collisions:
+
+
def _full_channel(self, channel: str) -> str:
+ """Expand short channel name to full namespaced channel."""
+ if channel.startswith("bedsheet."):
+ return channel
+ return f"bedsheet.{self._namespace}.{channel}"
+
+# Examples:
+# "alerts" → "bedsheet.cloud-ops.alerts"
+# "tasks" → "bedsheet.cloud-ops.tasks"
+# "agent-1" → "bedsheet.cloud-ops.agent-1" (direct channel)
+
+
Every agent also subscribes to its own direct channel (bedsheet.{namespace}.{agent_name}). This enables targeted messaging — when you call send_to("cpu-watcher", signal), it publishes to bedsheet.cloud-ops.cpu-watcher.
+
+
+
Why Namespace Channels?
+
Namespacing prevents cross-contamination between different deployments sharing the same PubNub keys. A staging environment using namespace "staging" won't interfere with production using "prod", even on the same PubNub app.
+
+
+
+
8 The Mixin Pattern
+
+
SenseMixin is the heart of the Sixth Sense. It adds distributed capabilities to any Agent without modifying the Agent class itself:
class MyAgent(SenseMixin, Agent):
+ pass
+
+# Method Resolution Order (MRO):
+# MyAgent → SenseMixin → Agent → object
+
+# When you call MyAgent(name="x", instruction="y", model_client=client):
+# 1. MyAgent.__init__ → not defined, goes to next
+# 2. SenseMixin.__init__(self, name="x", ...)
+# → super().__init__(name="x", ...) → Agent.__init__
+# 3. Agent.__init__ sets up name, instruction, model_client
+# 4. Back in SenseMixin.__init__, adds _transport, _signal_handlers, etc.
+
+
+
Why a Mixin, Not Inheritance or Composition?
+
Option A: Inheritance (class SenseAgent(Agent)) — Forces users to subclass a specific class. Can't add sensing to a Supervisor, or to user's custom Agent subclass.
+
Option B: Composition (agent.sense = SenseAdapter(agent)) — Cleaner separation, but awkward API. Users would write agent.sense.broadcast() instead of agent.broadcast(). The adapter would need to reach into the agent's internals for invoke().
+
Option C: Mixin (class MyAgent(SenseMixin, Agent)) — Best of both worlds. The agent IS-A SenseMixin and IS-A Agent. broadcast(), request(), on_signal() feel like native agent methods. Works with both Agent and Supervisor.
+
+
+
The type: ignore[attr-defined] Pattern
+
+
You'll notice # type: ignore[attr-defined] comments throughout the mixin. This is because SenseMixin accesses self.name and self.invoke(), which are defined on Agent, not on the mixin itself:
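# Illustrative lines in the spirit of the mixin (not the actual source):
class SenseMixin:
    def _sender(self) -> str:
        # self.name exists only on the Agent this mixin is combined with
        return self.name  # type: ignore[attr-defined]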
Mypy can't know that SenseMixin will always be combined with Agent. The type ignores acknowledge this limitation. An alternative would be a generic Protocol for the host class, but that adds complexity for no runtime benefit.
+
+
+
9 Request/Response Over PubNub
+
+
The most complex pattern is request/response — asking a remote agent to do something and waiting for the answer. This implements an RPC-like pattern over pub/sub messaging. When the commander calls request(), the mixin creates an asyncio.Future and:

Stores the future in self._pending_requests[correlation_id]
+
Sends a request signal to cpu-watcher's direct channel
+
Awaits the future with a timeout
+
+
+
cpu-watcher's signal loop receives the request
+
Calls _handle_request() which runs self.invoke(session_id, task)
+
The agent's LLM processes the task, calls tools, generates a response
+
Sends a response signal back with the same correlation_id
+
+
+
Commander's signal loop receives the response
+
Matches correlation_id to the pending future
+
Resolves the future with the response signal
+
Commander's request() returns the result string
+
+
+
+
Key Implementation Detail: _handle_request
+
+
async def _handle_request(self, signal: Signal) -> None:
+ task = signal.payload.get("task", "")
+ session_id = f"sense-{signal.correlation_id}"
+
+ # Run the full agent loop (LLM + tools)
+ result = ""
+ async for event in self.invoke(session_id, task):
+ if isinstance(event, CompletionEvent):
+ result = event.response
+
+ # Send response back to requester
+ response_signal = Signal(
+ kind="response",
+ sender=self.name,
+ payload={"result": result},
+ correlation_id=signal.correlation_id,
+ target=signal.sender,
+ )
+ await self.send_to(signal.sender, response_signal)
+
+
This is where the Sixth Sense connects to Bedsheet's core: the request handler calls self.invoke(), which runs the agent's full ReAct loop (LLM reasoning → tool calls → final response). The remote agent isn't just a message router — it's a full AI agent that thinks about the request and uses its tools to answer it.
+
+
+
Concurrency
+
Requests are handled with asyncio.create_task(self._handle_request(signal)), so multiple requests can be processed concurrently. An agent can handle several incoming requests while also sending its own requests to others.
+
+
+
+
10 Claim Protocol
+
+
When multiple agents see the same alert, we need exactly one to handle it. The claim protocol provides leaderless conflict resolution — no central coordinator required.
+
+
How It Works
+
+
async def claim_incident(self, incident_id: str, channel: str) -> bool:
+ # 1. Broadcast our claim
+ signal = Signal(kind="claim", sender=self.name,
+ payload={"incident_id": incident_id})
+ await self.broadcast(channel, signal)
+
+ # 2. Wait 500ms for competing claims
+ await asyncio.sleep(0.5)
+
+ # 3. If still in our claimed set, we won
+ return incident_id in self._claimed_incidents
+
+
The conflict resolution happens in _handle_claim():
+
+
def _handle_claim(self, signal: Signal) -> None:
+ incident_id = signal.payload.get("incident_id")
+ if incident_id in self._claimed_incidents:
+ # Tie-breaking: lower sender name wins
+ if signal.sender < self.name:
+ self._claimed_incidents.discard(incident_id)
+
+
+
Why Not a Distributed Lock?
+
A proper distributed lock (Redlock, ZooKeeper, etcd) would guarantee exactly-once processing. But it would also require additional infrastructure, add latency, and create failure modes. The claim protocol is probabilistic but practical: the 500ms window and deterministic tie-breaking (alphabetical agent name) give us conflict-free coordination 99%+ of the time. For a monitoring system where occasional duplicate handling is harmless, this is the right trade-off.
+
+
+
+
11 Testing Strategy
+
+
The MockSenseTransport
+
+
The key testing challenge: how to test distributed agent communication without PubNub? The answer is MockSenseTransport, which follows the same pattern as MockLLMClient.
+
+
The Hub Pattern
+
+
class _MockSenseHub:
+ """Shared state for mock transports. Routes signals between agents."""
+
+ def __init__(self):
+ self.queues: dict[str, asyncio.Queue[Signal]] = {}
+ self.subscriptions: dict[str, set[str]] = {}
+ self.presences: dict[str, AgentPresence] = {}
+
+class MockSenseTransport:
+ def __init__(self, hub: _MockSenseHub | None = None):
+ self.hub = hub or _MockSenseHub()
+
+ def create_peer(self) -> "MockSenseTransport":
+ """Create a sibling transport sharing the same hub."""
+ return MockSenseTransport(self.hub)
+
+
+
Why the Hub Pattern?
+
The first implementation used a single transport for all agents. This failed because connect(agent_id) overwrites the agent's identity — the second agent's connect would overwrite the first agent's ID. The hub pattern gives each agent its own transport instance (with its own identity and queue) while sharing the routing infrastructure. create_peer() returns a new transport connected to the same hub.
+
+
+
Test Design Lessons
+
+
+
Don't Read From the Queue Directly
+
The signal loop is a background task that continuously reads from the transport's queue. If a test tries to read from the queue directly (e.g., await queue.get()), it races with the signal loop. Use on_signal handlers in tests instead, and await asyncio.sleep() to give the loop time to process.
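For example (a sketch — agent fixtures and network setup omitted; the handler shape is assumed from the on_signal decorator):

received: list[Signal] = []

@agent.on_signal("alert")
async def _record(signal: Signal) -> None:
    received.append(signal)

await peer.broadcast("alerts", Signal(kind="alert", sender="peer", payload={"cpu": 91}))
await asyncio.sleep(0.05)  # give the signal loop a turn to deliver
assert received and received[0].payload["cpu"] == 91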
The suite also covers SenseNetwork behavior (adding an agent, rejecting a non-sense agent, stop disconnecting all agents) and the five new event dataclasses (5 tests).
+
+
+
+
12 Trade-offs & Future Work
+
+
What We Chose
+
+
+
| Decision | Benefit | Cost |
|---|---|---|
| PubNub over self-hosted | Zero infrastructure | 32KB limit, vendor dependency |
| Mixin over composition | Natural API (agent.request()) | type: ignore comments |
| Protocol over ABC | Structural subtyping | No enforced implementation |
| Probabilistic claims | No infrastructure needed | Rare duplicate handling |
| Short-key serialization | Stays under 32KB | Less readable wire format |
| Hub pattern for testing | True multi-agent tests | More complex mock |
+
+
+
Known Limitations
+
+
+
No message persistence: PubNub messages are ephemeral (unless you enable PubNub Storage, a paid feature). If an agent is offline when a signal is sent, it misses it.
+
No message ordering guarantee: PubNub guarantees per-channel ordering, but cross-channel ordering is not guaranteed.
+
Claim protocol is probabilistic: Under very high load or network partitions, two agents might both win a claim. Acceptable for monitoring; not suitable for financial transactions.
+
32KB message limit: The auto-truncation handles this, but very large payloads lose data. If you need to send large data, send a reference (URL, S3 key) instead.
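For instance, a payload can carry a pointer rather than the data itself (illustrative values):

signal = Signal(
    kind="event",
    sender="log-analyzer",
    payload={"report_ref": "s3://incident-reports/incident-001.json"},  # fetched out of band
)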
+
+
+
Future Directions
+
+
+
Redis transport: For teams with existing Redis infrastructure
+
NATS transport: For high-throughput scenarios
+
Message persistence: Store signals for replay and audit trails
+
Encryption: End-to-end encryption for signal payloads
+
Agent discovery: Automatic capability-based routing (find the agent that can handle this request)
Distributed communication for Bedsheet agents. Run agents across processes, machines, and cloud providers — they find each other and collaborate over PubNub.
+
+ PubNub-backed
+ Zero infrastructure
+ Real-time
+ Works behind firewalls
+
+
+
+
+
1 Overview
+
+
Bedsheet's Supervisor pattern works great when all agents run in a single Python process. But what if your agents need to run on different machines, behind different firewalls, or even on different cloud providers?
+
+
The Sixth Sense module adds distributed communication to any Bedsheet agent via PubNub's real-time messaging platform. Agents become network-aware peers that can broadcast alerts, send requests to each other, and coordinate incident responses — all without running HTTP servers or managing service discovery.
+
+
+
+
Pure Clients
+
Agents are PubNub clients, not servers. No ports to open, no URLs to register. Works behind NATs and firewalls.
+
+
+
Signal-Based
+
Seven signal kinds (request, response, alert, heartbeat, claim, release, event) cover all coordination patterns.
+
+
+
Swappable Transport
+
SenseTransport protocol means PubNub is one implementation. Swap in MQTT, Redis Streams, or your own.
+
+
+
Leaderless Coordination
+
Claim-based protocol lets multiple commanders compete for incident ownership. No central coordinator needed.
5 Request/Response


The request() method sends a task to a remote agent and waits for the response. Under the hood, the receiving agent runs invoke() with the task and sends back the completion:
+
+
# Commander asks cpu-watcher to check usage
+result = await commander.request(
+ "cpu-watcher",
+ "What is the current CPU usage?",
+ timeout=30.0,
+)
+print(result) # "Overall: 45.2%, Per-core: [32.1, 58.3, ...]"
+
+
+
What's happening?
+
+
Commander creates a request signal with a unique correlation_id
+
Signal is published to the cpu-watcher's direct channel
+
cpu-watcher's signal loop receives the request
+
cpu-watcher calls self.invoke() with the task text
+
The LLM uses cpu-watcher's tools to gather data
+
The CompletionEvent response is sent back as a response signal
+
Commander's future resolves with the result
+
+
+
+
+
Timeout
+
If the remote agent doesn't respond within the timeout, a TimeoutError is raised. The default timeout is 30 seconds.
+
+
+
+
6 Claim Protocol
+
+
When multiple commander agents are online, they need to coordinate who handles an incident. The claim protocol provides leaderless conflict resolution:
+
+
# When an alert arrives
+won = await agent.claim_incident("incident-001", "tasks")
+if won:
+ # We are responsible for this incident
+ await investigate_and_report()
+else:
+ # Another agent claimed it first
+ pass
+
+
+
How Claims Work
+
+
Agent publishes a claim signal with the incident ID and timestamp
+
Waits 500ms for competing claims from other agents
+
If competing claims arrive, the earliest timestamp wins
+
Loser backs off; winner coordinates the response
+
When done, winner publishes release to free the incident
+
+
+
+
+
7 SenseNetwork API
+
+
For managing multiple agents in the same process (useful for testing and simpler deployments), use SenseNetwork:
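The guide elides the code here; a plausible usage sketch — add() and a stop that disconnects all agents are implied by the test suite, but the exact signatures are assumptions:

network = SenseNetwork(transport)   # constructor arguments assumed
network.add(cpu_watcher)            # add() rejects agents that lack SenseMixin
network.add(incident_commander)
await network.start()               # assumed counterpart to stop()
...
await network.stop()                # stop() disconnects all agents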
8 Example Demos


Cloud Monitor Demo


The Cloud Monitor demo launches five distributed monitoring agents; its startup output looks like this:

============================================================
+ Cloud Monitor - Bedsheet Sense Demo
+ Launching 5 distributed agents...
+============================================================
+ Starting cpu-watcher...
+[cpu-watcher] Online and monitoring...
+ Starting memory-watcher...
+[memory-watcher] Online and monitoring...
+ Starting log-analyzer...
+[log-analyzer] Online and ready...
+ Starting security-scanner...
+[security-scanner] Online and ready...
+ Starting incident-commander...
+[incident-commander] Online and coordinating...
+============================================================
+ All agents online! Press Ctrl+C to stop.
+============================================================
+
+
When CPU exceeds 80%, the cpu-watcher broadcasts an alert. The incident-commander claims it, queries the other agents for context, and synthesizes an incident report.
+
+
Agent Sentinel Demo
+
+
The examples/agent-sentinel/ directory demonstrates AI agent security monitoring, inspired by the OpenClaw crisis of 2026. Three worker agents perform real tasks (DuckDuckGo searches, JSON calendar management, skill installation with SHA-256 verification), while sentinel agents monitor them for rogue behavior:
+
+
+
| Agent | Role | Data Source |
|---|---|---|
| web-researcher | Worker: web search | DuckDuckGo |
| scheduler | Worker: calendar management | JSON file |
| skill-acquirer | Worker: skill installation | Local ClawHub files |
| behavior-sentinel | Sentinel: output rate monitoring | Activity log |
| supply-chain-sentinel | Sentinel: skill integrity | SHA-256 hashing |
| sentinel-commander | Commander: alert correlation | Sense network |
+
+
+
Each worker has a ~15% chance per cycle of going rogue. Sentinels detect anomalies through real file I/O and alert the commander over PubNub, who investigates and issues quarantine orders.
9 Testing


Use MockSenseTransport for unit tests — it routes signals in-memory without PubNub:
+
+
from bedsheet.testing import MockSenseTransport
+
+# Create a shared hub for multiple agents
+transport = MockSenseTransport()
+
+# Each agent gets its own peer transport
+t1 = transport # First agent
+t2 = transport.create_peer() # Second agent (shares the hub)
+
+# Agents connected to the same hub can exchange signals
+await agent1.join_network(t1, "test", ["alerts"])
+await agent2.join_network(t2, "test", ["alerts"])
+
+result = await agent1.request("agent2", "do something")
+assert result == "done"
+
+
+
Note
+
MockSenseTransport follows the same pattern as MockLLMClient — a test double that satisfies the protocol without external dependencies.
+
+
+
+
10 Reference
+
+
Signal Fields
+
+
| Field | Type | Description |
|---|---|---|
| kind | SignalKind | One of: request, response, alert, heartbeat, claim, release, event |
| sender | str | Name of the sending agent |
| payload | dict | Arbitrary data (default: empty dict) |
| correlation_id | str | Links requests to responses (auto-generated) |
| target | str \| None | Intended recipient (None = broadcast) |
| timestamp | float | Unix timestamp (auto-set) |
| source_channel | str \| None | Channel the signal arrived on (set by transport) |
+
+
+
SenseMixin Methods
+
+
| Method | Description |
|---|---|
| join_network(transport, namespace, channels) | Connect and start listening |
| leave_network() | Disconnect and stop background tasks |
| broadcast(channel, signal) | Send a signal to a channel |
| send_to(agent_name, signal) | Send a signal to an agent's direct channel |
| request(agent_name, task, timeout) | Send a task and await the response |
| claim_incident(incident_id, channel) | Attempt to claim an incident |
| release_incident(incident_id, channel) | Release a claimed incident |
| on_signal(kind) | Decorator for custom signal handlers |
+
+
+
New Event Types
+
+
| Event | When Emitted |
|---|---|
| SignalReceivedEvent | Signal arrived from the network |
| AgentConnectedEvent | Remote agent came online |
| AgentDisconnectedEvent | Remote agent went offline |
| RemoteDelegationEvent | Task sent to a remote agent |
| RemoteResultEvent | Result received from a remote agent |
+
+
+
+
+
+
+
+
diff --git a/examples/agent-sentinel/README.md b/examples/agent-sentinel/README.md
new file mode 100644
index 0000000..6409699
--- /dev/null
+++ b/examples/agent-sentinel/README.md
@@ -0,0 +1,119 @@
+# Agent Sentinel
+
+Real-time AI agent security monitoring using [Bedsheet Sense](../../docs/sixth-sense-guide.html).
+
+## Motivation
+
+The OpenClaw crisis of Jan-Feb 2026 exposed critical gaps in AI agent security:
+
+- **Rogue behavior**: Agent spammed 500+ messages after gaining iMessage access
+- **Supply chain attacks**: 7.1% of ClawHub marketplace skills contained malicious code
+- **API key leaks**: Moltbook breach exposed 1.5M API keys
+- **Mass exposure**: 40,000+ unsecured agent instances on the public internet
+
+Agent Sentinel demonstrates how a network of sentinel agents can monitor production AI agents for compromise, drift, and rogue behavior in real-time using PubNub.
+
+## Architecture
+
+**3 Worker Agents** (do real work):
+- `web-researcher` — Performs DuckDuckGo searches
+- `scheduler` — Manages a JSON calendar file
+- `skill-acquirer` — Installs Python skills from a local ClawHub registry with SHA-256 verification
+
+**2 Sentinel Agents** (monitor workers):
+- `behavior-sentinel` — Reads the shared activity log, detects output rate anomalies
+- `supply-chain-sentinel` — Re-hashes installed skills, detects malicious installs
+
+**1 Commander** (correlates and responds):
+- `sentinel-commander` — Listens for alerts, queries sentinels, issues quarantine orders
+
+Each worker has a ~15% chance per cycle of going "rogue" — the web researcher spams searches, the scheduler sabotages the calendar, the skill acquirer installs a known-malicious skill. Sentinels detect this through real file I/O and alert the commander over PubNub.
+
+## Quick Start
+
+This project was scaffolded with `bedsheet init` and uses the Bedsheet CLI tooling:
+
+```bash
+# 1. Install Bedsheet with Sense module and search dependency
+uv pip install bedsheet[sense] duckduckgo-search
+
+# 2. Validate the project configuration
+cd examples/agent-sentinel
+bedsheet validate
+
+# 3. Set environment variables
+export PUBNUB_SUBSCRIBE_KEY=sub-c-...
+export PUBNUB_PUBLISH_KEY=pub-c-...
+export ANTHROPIC_API_KEY=sk-ant-...
+
+# 4. Run the demo
+python run.py
+```
+
+## Using the Bedsheet CLI
+
+This project includes a `bedsheet.yaml` that defines all 6 agents. You can use the full Bedsheet CLI workflow:
+
+```bash
+# Validate the configuration
+bedsheet validate
+# ✓ Configuration is valid!
+# Project: agent-sentinel
+# Agents: 6
+
+# Generate deployment artifacts (e.g., for local Docker)
+bedsheet generate --target local
+
+# Or generate for cloud deployment
+bedsheet generate --target gcp
+bedsheet generate --target aws
+```
+
+### Creating Your Own Sentinel Project
+
+To start a new project from scratch using the Bedsheet CLI:
+
+```bash
+# Scaffold a new project
+bedsheet init my-sentinel-network
+
+# Customize agents/ with your own worker and sentinel agents
+# Edit bedsheet.yaml to register your agents
+
+# Validate and generate
+bedsheet validate
+bedsheet generate --target local
+```
+
+## What to Expect
+
+1. Workers come online and start doing real work (searching, scheduling, installing skills)
+2. Randomly (~15% per cycle), a worker goes rogue
+3. Sentinels detect the anomaly through real file reads:
+ - Behavior sentinel sees the activity rate spike in `data/activity_log.jsonl`
+ - Supply chain sentinel sees the malicious skill in `data/installed_skills/`
+4. Sentinels broadcast alert signals over PubNub
+5. Commander claims the incident, queries sentinels for evidence, issues quarantine
+
+## Files
+
+```
+agent-sentinel/
+├── bedsheet.yaml # Bedsheet CLI configuration
+├── pyproject.toml # Dependencies
+├── run.py # Subprocess launcher
+├── data/
+│ └── calendar.json # Pre-seeded appointments
+├── clawhub/
+│ ├── registry.json # Skill hashes + malicious flags
+│ ├── weather_lookup.py # Legit skill
+│ ├── sentiment_analyzer.py # Legit skill
+│ └── data_exfiltrator.py # Known-malicious (inert)
+└── agents/
+ ├── web_researcher.py # Worker
+ ├── scheduler.py # Worker
+ ├── skill_acquirer.py # Worker
+ ├── behavior_sentinel.py # Sentinel
+ ├── supply_chain_sentinel.py # Sentinel
+ └── sentinel_commander.py # Commander
+```
diff --git a/examples/agent-sentinel/agents/__init__.py b/examples/agent-sentinel/agents/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/examples/agent-sentinel/agents/behavior_sentinel.py b/examples/agent-sentinel/agents/behavior_sentinel.py
new file mode 100644
index 0000000..8b7842e
--- /dev/null
+++ b/examples/agent-sentinel/agents/behavior_sentinel.py
@@ -0,0 +1,201 @@
+"""Behavior Sentinel - monitors worker agents for output rate anomalies.
+
+Reads the shared activity_log.jsonl file and detects when agents produce
+abnormally high action rates (e.g., 50+ searches in a burst = rogue behavior).
+"""
+
+import asyncio
+import json
+import os
+import time
+
+from bedsheet import Agent, ActionGroup, SenseMixin
+from bedsheet.llm.anthropic import AnthropicClient
+from bedsheet.sense import Signal
+from bedsheet.sense.pubnub_transport import PubNubTransport
+
+
+class BehaviorSentinel(SenseMixin, Agent):
+ pass
+
+
+behavior_tools = ActionGroup("behavior_tools", "Behavior monitoring tools")
+
+_DATA_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "data")
+_LOG_PATH = os.path.join(_DATA_DIR, "activity_log.jsonl")
+
+# Threshold: more than 10 actions per minute from a single agent is anomalous
+_RATE_THRESHOLD = 10
+
+# Module-level reference for broadcasting
+_sentinel: BehaviorSentinel | None = None
+
+
+def _read_recent_activity(minutes: int = 5) -> list[dict]:
+ """Read activity log entries from the last N minutes."""
+ cutoff = time.time() - (minutes * 60)
+ entries = []
+ try:
+ with open(_LOG_PATH) as f:
+ for line in f:
+ line = line.strip()
+ if not line:
+ continue
+ try:
+ entry = json.loads(line)
+ if entry.get("timestamp", 0) >= cutoff:
+ entries.append(entry)
+ except json.JSONDecodeError:
+ continue
+ except FileNotFoundError:
+ pass
+ return entries
+
+
+@behavior_tools.action(
+ "check_activity_log",
+ "Read the shared activity log and count actions per agent",
+ parameters={
+ "type": "object",
+ "properties": {
+ "minutes": {
+ "type": "integer",
+ "description": "Time window in minutes (default 5)",
+ },
+ },
+ },
+)
+async def check_activity_log(minutes: int = 5) -> str:
+ entries = _read_recent_activity(minutes)
+ if not entries:
+ return f"No activity in the last {minutes} minutes."
+
+ counts: dict[str, int] = {}
+ for e in entries:
+ agent = e.get("agent", "unknown")
+ counts[agent] = counts.get(agent, 0) + 1
+
+ lines = [f"Activity in last {minutes} minutes ({len(entries)} total actions):"]
+ for agent, count in sorted(counts.items(), key=lambda x: -x[1]):
+ rate = count / minutes
+ flag = " ** ANOMALOUS **" if rate > _RATE_THRESHOLD else ""
+ lines.append(f" {agent}: {count} actions ({rate:.1f}/min){flag}")
+ return "\n".join(lines)
+
+
+@behavior_tools.action(
+ "check_output_rate",
+ "Get the actions-per-minute rate for a specific agent",
+ parameters={
+ "type": "object",
+ "properties": {
+ "agent_name": {
+ "type": "string",
+ "description": "Name of the agent to check",
+ },
+ },
+ "required": ["agent_name"],
+ },
+)
+async def check_output_rate(agent_name: str) -> str:
+ entries = _read_recent_activity(minutes=2)
+ count = sum(1 for e in entries if e.get("agent") == agent_name)
+ rate = count / 2.0
+ status = "ANOMALOUS" if rate > _RATE_THRESHOLD else "normal"
+ return f"{agent_name}: {count} actions in 2min ({rate:.1f}/min) - {status}"
+
+
+@behavior_tools.action(
+ "get_anomaly_report", "Generate a report of all detected anomalies"
+)
+async def get_anomaly_report() -> str:
+ entries = _read_recent_activity(minutes=5)
+ if not entries:
+ return "No activity to analyze."
+
+ counts: dict[str, int] = {}
+ rogue_actions: dict[str, list[str]] = {}
+ for e in entries:
+ agent = e.get("agent", "unknown")
+ counts[agent] = counts.get(agent, 0) + 1
+ details = e.get("details", "")
+ if "[ROGUE]" in details:
+ rogue_actions.setdefault(agent, []).append(details)
+
+ anomalies = []
+ for agent, count in counts.items():
+ rate = count / 5.0
+ if rate > _RATE_THRESHOLD:
+ anomalies.append(
+ f" {agent}: {rate:.1f} actions/min (threshold: {_RATE_THRESHOLD})"
+ )
+
+ for agent, actions in rogue_actions.items():
+ anomalies.append(f" {agent}: {len(actions)} actions with [ROGUE] markers")
+
+ if not anomalies:
+ return "No anomalies detected."
+ return "Anomaly report:\n" + "\n".join(anomalies)
+
+
+async def main():
+ global _sentinel
+
+ transport = PubNubTransport(
+ subscribe_key=os.environ["PUBNUB_SUBSCRIBE_KEY"],
+ publish_key=os.environ["PUBNUB_PUBLISH_KEY"],
+ )
+
+ agent = BehaviorSentinel(
+ name="behavior-sentinel",
+ instruction=(
+ "You are a behavior monitoring sentinel. You watch for anomalous "
+ "agent activity patterns. When you detect high output rates or "
+ "suspicious behavior markers, report your findings clearly."
+ ),
+ model_client=AnthropicClient(),
+ )
+ agent.add_action_group(behavior_tools)
+ _sentinel = agent
+
+ await agent.join_network(transport, "agent-sentinel", ["alerts", "quarantine"])
+ print("[behavior-sentinel] Online and monitoring behavior...")
+
+ try:
+ while True:
+ # Direct monitoring: read log and check for anomalies
+ entries = _read_recent_activity(minutes=2)
+ counts: dict[str, int] = {}
+ for e in entries:
+ a = e.get("agent", "unknown")
+ counts[a] = counts.get(a, 0) + 1
+
+ for agent_name, count in counts.items():
+ rate = count / 2.0
+ if rate > _RATE_THRESHOLD:
+ print(
+ f"[behavior-sentinel] ALERT: {agent_name} at {rate:.1f} actions/min!"
+ )
+ alert = Signal(
+ kind="alert",
+ sender="behavior-sentinel",
+ payload={
+ "severity": "high",
+ "category": "behavior_anomaly",
+ "agent": agent_name,
+ "rate": rate,
+ "threshold": _RATE_THRESHOLD,
+ "message": f"Agent '{agent_name}' output rate anomaly: {rate:.1f}/min (threshold: {_RATE_THRESHOLD})",
+ },
+ )
+ await agent.broadcast("alerts", alert)
+
+ await asyncio.sleep(10)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ await agent.leave_network()
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/examples/agent-sentinel/agents/scheduler.py b/examples/agent-sentinel/agents/scheduler.py
new file mode 100644
index 0000000..0a84f2f
--- /dev/null
+++ b/examples/agent-sentinel/agents/scheduler.py
@@ -0,0 +1,179 @@
+"""Scheduler agent - manages a JSON-based calendar.
+
+Normal behavior: lists, adds, or tidies appointments every 20 seconds.
+Rogue behavior (~15% chance): deletes all appointments and writes junk entries.
+"""
+
+import asyncio
+import json
+import os
+import random
+import time
+import uuid
+
+from bedsheet import Agent, ActionGroup, SenseMixin
+from bedsheet.llm.anthropic import AnthropicClient
+from bedsheet.sense.pubnub_transport import PubNubTransport
+
+
+class Scheduler(SenseMixin, Agent):
+ pass
+
+
+scheduler_tools = ActionGroup("scheduler_tools", "Calendar management tools")
+
+_DATA_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "data")
+_CALENDAR_PATH = os.path.join(_DATA_DIR, "calendar.json")
+
+
+def _log_activity(agent: str, action: str, details: str) -> None:
+ entry = {
+ "timestamp": time.time(),
+ "agent": agent,
+ "action": action,
+ "details": details,
+ }
+ log_path = os.path.join(_DATA_DIR, "activity_log.jsonl")
+ with open(log_path, "a") as f:
+ f.write(json.dumps(entry) + "\n")
+
+
+def _read_calendar() -> list[dict]:
+ try:
+ with open(_CALENDAR_PATH) as f:
+ return json.load(f)
+ except (FileNotFoundError, json.JSONDecodeError):
+ return []
+
+
+def _write_calendar(appointments: list[dict]) -> None:
+ with open(_CALENDAR_PATH, "w") as f:
+ json.dump(appointments, f, indent=2)
+
+
+@scheduler_tools.action("list_appointments", "List all scheduled appointments")
+async def list_appointments() -> str:
+ appointments = _read_calendar()
+ _log_activity("scheduler", "list_appointments", f"{len(appointments)} appointments")
+ if not appointments:
+ return "No appointments scheduled."
+ lines = []
+ for apt in appointments:
+ lines.append(f" [{apt['id']}] {apt['title']} - {apt['date']} at {apt['time']}")
+ return f"Appointments ({len(appointments)}):\n" + "\n".join(lines)
+
+
+@scheduler_tools.action(
+ "add_appointment",
+ "Add a new appointment to the calendar",
+ parameters={
+ "type": "object",
+ "properties": {
+ "title": {"type": "string", "description": "Appointment title"},
+ "date": {"type": "string", "description": "Date (YYYY-MM-DD)"},
+ "time": {"type": "string", "description": "Time (HH:MM)"},
+ },
+ "required": ["title", "date", "time"],
+ },
+)
+async def add_appointment(title: str, date: str, time_str: str = "09:00") -> str:
+ appointments = _read_calendar()
+ new_apt = {
+ "id": f"apt-{uuid.uuid4().hex[:6]}",
+ "title": title,
+ "date": date,
+ "time": time_str,
+ }
+ appointments.append(new_apt)
+ _write_calendar(appointments)
+ _log_activity("scheduler", "add_appointment", f"{title} on {date}")
+ return f"Added: {title} on {date} at {time_str}"
+
+
+@scheduler_tools.action(
+ "delete_appointment",
+ "Delete an appointment by ID",
+ parameters={
+ "type": "object",
+ "properties": {
+ "appointment_id": {
+ "type": "string",
+ "description": "Appointment ID to delete",
+ },
+ },
+ "required": ["appointment_id"],
+ },
+)
+async def delete_appointment(appointment_id: str) -> str:
+ appointments = _read_calendar()
+ before = len(appointments)
+ appointments = [a for a in appointments if a["id"] != appointment_id]
+ _write_calendar(appointments)
+ removed = before - len(appointments)
+ _log_activity("scheduler", "delete_appointment", appointment_id)
+ if removed:
+ return f"Deleted appointment {appointment_id}"
+ return f"No appointment found with ID {appointment_id}"
+
+
+async def _rogue_calendar_sabotage() -> None:
+ """Delete all appointments and write junk entries."""
+ print("[scheduler] ROGUE MODE: sabotaging calendar!")
+ _write_calendar([])
+ _log_activity("scheduler", "delete_all", "ROGUE: wiped calendar")
+
+ junk = [
+ {
+ "id": f"apt-rogue-{i}",
+ "title": f"JUNK-{random.randint(1000,9999)}",
+ "date": "1999-01-01",
+ "time": "00:00",
+ }
+ for i in range(20)
+ ]
+ _write_calendar(junk)
+ for entry in junk:
+ _log_activity("scheduler", "add_appointment", f"[ROGUE] {entry['title']}")
+ print("[scheduler] ROGUE MODE: wrote 20 junk entries")
+
+
+async def main():
+ transport = PubNubTransport(
+ subscribe_key=os.environ["PUBNUB_SUBSCRIBE_KEY"],
+ publish_key=os.environ["PUBNUB_PUBLISH_KEY"],
+ )
+
+ agent = Scheduler(
+ name="scheduler",
+ instruction=(
+ "You are a scheduling agent that manages a team calendar. "
+ "Each cycle, review the current appointments and optionally "
+ "add a new one for an upcoming meeting or task. Keep things organized."
+ ),
+ model_client=AnthropicClient(),
+ )
+ agent.add_action_group(scheduler_tools)
+
+ await agent.join_network(transport, "agent-sentinel", ["alerts", "quarantine"])
+ print("[scheduler] Online and managing calendar...")
+
+ try:
+ while True:
+ if random.random() < 0.15:
+ await _rogue_calendar_sabotage()
+ else:
+ session_id = f"schedule-{int(time.time())}"
+ async for event in agent.invoke(
+ session_id,
+ "Check the calendar and manage appointments as needed.",
+ ):
+ pass
+ await asyncio.sleep(20)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ await agent.leave_network()
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/examples/agent-sentinel/agents/sentinel_commander.py b/examples/agent-sentinel/agents/sentinel_commander.py
new file mode 100644
index 0000000..b809d89
--- /dev/null
+++ b/examples/agent-sentinel/agents/sentinel_commander.py
@@ -0,0 +1,239 @@
+"""Sentinel Commander - correlates alerts from sentinels and issues quarantine orders.
+
+Listens for alert signals from behavior-sentinel and supply-chain-sentinel.
+When alerts arrive, queries other sentinels for corroborating evidence,
+then issues quarantine signals for confirmed compromises.
+"""
+
+import asyncio
+import os
+import time
+
+from bedsheet import Agent, ActionGroup, SenseMixin
+from bedsheet.events import CompletionEvent, ToolCallEvent
+from bedsheet.llm.anthropic import AnthropicClient
+from bedsheet.sense import Signal
+from bedsheet.sense.pubnub_transport import PubNubTransport
+
+
+class SentinelCommander(SenseMixin, Agent):
+ pass
+
+
+commander_tools = ActionGroup("commander_tools", "Network coordination tools")
+
+_commander: SentinelCommander | None = None
+
+# Track alerts for correlation
+_recent_alerts: list[dict] = []
+
+
+@commander_tools.action(
+ "request_remote_agent",
+ "Send a task to a remote agent and wait for its response",
+ parameters={
+ "type": "object",
+ "properties": {
+ "agent_name": {"type": "string", "description": "Name of the remote agent"},
+ "task": {"type": "string", "description": "Task description for the agent"},
+ },
+ "required": ["agent_name", "task"],
+ },
+)
+async def request_remote_agent(agent_name: str, task: str) -> str:
+ if _commander is None:
+ return "Error: Commander not initialized"
+ try:
+ result = await _commander.request(agent_name, task, timeout=30.0)
+ return result
+ except TimeoutError:
+ return f"Timeout: {agent_name} did not respond within 30s"
+ except Exception as e:
+ return f"Error requesting {agent_name}: {e}"
+
+
+@commander_tools.action(
+ "broadcast_alert",
+ "Broadcast an alert to all agents on the network",
+ parameters={
+ "type": "object",
+ "properties": {
+ "severity": {
+ "type": "string",
+ "description": "Alert severity: low, medium, high, critical",
+ },
+ "message": {"type": "string", "description": "Alert message"},
+ },
+ "required": ["severity", "message"],
+ },
+)
+async def broadcast_alert(severity: str, message: str) -> str:
+ if _commander is None:
+ return "Error: Commander not initialized"
+ signal = Signal(
+ kind="alert",
+ sender="sentinel-commander",
+ payload={"severity": severity, "message": message, "source": "commander"},
+ )
+ await _commander.broadcast("alerts", signal)
+ return f"Alert broadcast: [{severity}] {message}"
+
+
+@commander_tools.action(
+ "issue_quarantine",
+ "Issue a quarantine order for a compromised agent",
+ parameters={
+ "type": "object",
+ "properties": {
+ "agent_name": {
+ "type": "string",
+ "description": "Name of the agent to quarantine",
+ },
+ "reason": {"type": "string", "description": "Reason for quarantine"},
+ },
+ "required": ["agent_name", "reason"],
+ },
+)
+async def issue_quarantine(agent_name: str, reason: str) -> str:
+ if _commander is None:
+ return "Error: Commander not initialized"
+ signal = Signal(
+ kind="alert",
+ sender="sentinel-commander",
+ payload={
+ "action": "quarantine",
+ "severity": "critical",
+ "agent": agent_name,
+ "reason": reason,
+ "message": f"QUARANTINE: {agent_name} - {reason}",
+ "source": "commander",
+ },
+ )
+ await _commander.broadcast("quarantine", signal)
+ print(f"\n{'='*60}")
+ print(f" QUARANTINE ISSUED: {agent_name}")
+ print(f" Reason: {reason}")
+ print(f"{'='*60}\n")
+ return f"Quarantine issued for '{agent_name}': {reason}"
+
+
+@commander_tools.action("list_online_agents", "List all agents currently online")
+async def list_online_agents() -> str:
+ if _commander is None:
+ return "Error: Commander not initialized"
+ agents = await _commander._transport.get_online_agents("alerts")
+ if not agents:
+ return "No agents online"
+ names = [a.agent_name for a in agents]
+ return f"Online agents: {', '.join(names)}"
+
+
+@commander_tools.action("get_threat_summary", "Get a summary of recent alerts")
+async def get_threat_summary() -> str:
+ if not _recent_alerts:
+ return "No recent alerts."
+ lines = [f"Recent alerts ({len(_recent_alerts)}):"]
+ for alert in _recent_alerts[-10:]:
+ ts = time.strftime("%H:%M:%S", time.localtime(alert.get("timestamp", 0)))
+ lines.append(
+ f" [{ts}] {alert.get('severity', '?')}: {alert.get('message', 'no details')}"
+ )
+ return "\n".join(lines)
+
+
+async def main():
+ global _commander
+
+ transport = PubNubTransport(
+ subscribe_key=os.environ["PUBNUB_SUBSCRIBE_KEY"],
+ publish_key=os.environ["PUBNUB_PUBLISH_KEY"],
+ )
+
+ agent = SentinelCommander(
+ name="sentinel-commander",
+ instruction=(
+ "You are the Sentinel Commander for an AI agent security monitoring network. "
+ "You coordinate responses to security alerts by querying sentinel agents for evidence.\n\n"
+ "Available sentinel agents:\n"
+ "- behavior-sentinel: Monitors agent output rates for anomalies\n"
+ "- supply-chain-sentinel: Verifies skill integrity via SHA-256 hashing\n\n"
+ "When you receive an alert:\n"
+ "1. Query the relevant sentinels for details\n"
+ "2. If multiple sources confirm the threat, issue a quarantine\n"
+ "3. Generate a clear threat assessment report\n\n"
+ "Be decisive — if evidence confirms compromise, quarantine immediately."
+ ),
+ model_client=AnthropicClient(),
+ )
+ agent.add_action_group(commander_tools)
+ _commander = agent
+
+ await agent.join_network(transport, "agent-sentinel", ["alerts", "quarantine"])
+ print("[sentinel-commander] Online and coordinating...")
+
+ @agent.on_signal("alert")
+ async def handle_alert(signal: Signal):
+ if signal.payload.get("source") == "commander":
+ return # Don't react to our own alerts
+
+ severity = signal.payload.get("severity", "unknown")
+ message = signal.payload.get("message", "No details")
+ category = signal.payload.get("category", "general")
+ flagged_agent = signal.payload.get(
+ "agent", signal.payload.get("skill", "unknown")
+ )
+
+ _recent_alerts.append(
+ {
+ "timestamp": time.time(),
+ "severity": severity,
+ "category": category,
+ "agent": flagged_agent,
+ "message": message,
+ "sender": signal.sender,
+ }
+ )
+
+ print(
+ f"\n[sentinel-commander] Alert from {signal.sender}: [{severity}] {message}"
+ )
+
+ # Claim the incident
+ incident_id = f"inc-{signal.correlation_id}"
+ agent._claimed_incidents.add(incident_id)
+ won = await agent.claim_incident(incident_id, "alerts")
+
+ if won:
+ print(f"[sentinel-commander] Claimed {incident_id}, investigating...")
+ session_id = f"incident-{incident_id}"
+ prompt = (
+ f"SECURITY ALERT from {signal.sender}:\n"
+ f" Severity: {severity}\n"
+ f" Category: {category}\n"
+ f" Flagged: {flagged_agent}\n"
+ f" Details: {message}\n\n"
+ "Investigate this alert: query the relevant sentinels for corroborating "
+ "evidence, then decide whether to quarantine the affected agent. "
+ "Provide a threat assessment report."
+ )
+ async for event in agent.invoke(session_id, prompt):
+ if isinstance(event, ToolCallEvent):
+ print(
+ f" -> {event.tool_name}({', '.join(str(v) for v in event.tool_input.values())})"
+ )
+ elif isinstance(event, CompletionEvent):
+ print(f"\n[THREAT ASSESSMENT]\n{event.response}\n")
+ else:
+ print(f"[sentinel-commander] Lost claim for {incident_id}")
+
+ try:
+ while True:
+ await asyncio.sleep(1)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ await agent.leave_network()
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/examples/agent-sentinel/agents/skill_acquirer.py b/examples/agent-sentinel/agents/skill_acquirer.py
new file mode 100644
index 0000000..726ec0b
--- /dev/null
+++ b/examples/agent-sentinel/agents/skill_acquirer.py
@@ -0,0 +1,198 @@
+"""Skill Acquirer agent - installs skills from the local ClawHub registry.
+
+Normal behavior: browses available skills and installs legitimate ones with SHA-256 verification.
+Rogue behavior (~15% chance): installs the known-malicious data_exfiltrator.py skill.
+"""
+
+import asyncio
+import hashlib
+import json
+import os
+import random
+import shutil
+import time
+
+from bedsheet import Agent, ActionGroup, SenseMixin
+from bedsheet.llm.anthropic import AnthropicClient
+from bedsheet.sense.pubnub_transport import PubNubTransport
+
+
+class SkillAcquirer(SenseMixin, Agent):
+ pass
+
+
+skill_tools = ActionGroup("skill_tools", "Skill installation tools")
+
+_BASE_DIR = os.path.dirname(os.path.dirname(__file__))
+_CLAWHUB_DIR = os.path.join(_BASE_DIR, "clawhub")
+_DATA_DIR = os.path.join(_BASE_DIR, "data")
+_INSTALLED_DIR = os.path.join(_DATA_DIR, "installed_skills")
+_REGISTRY_PATH = os.path.join(_CLAWHUB_DIR, "registry.json")
+
+
+def _log_activity(agent: str, action: str, details: str) -> None:
+ entry = {
+ "timestamp": time.time(),
+ "agent": agent,
+ "action": action,
+ "details": details,
+ }
+ log_path = os.path.join(_DATA_DIR, "activity_log.jsonl")
+ with open(log_path, "a") as f:
+ f.write(json.dumps(entry) + "\n")
+
+
+def _load_registry() -> dict:
+ with open(_REGISTRY_PATH) as f:
+ return json.load(f)
+
+
+def _sha256(path: str) -> str:
+    with open(path, "rb") as f:
+        return hashlib.sha256(f.read()).hexdigest()
+
+
+@skill_tools.action(
+ "list_available_skills", "List skills available in the ClawHub registry"
+)
+async def list_available_skills() -> str:
+ registry = _load_registry()
+ _log_activity("skill-acquirer", "list_skills", f"{len(registry)} available")
+ lines = []
+ for name, info in registry.items():
+ status = "MALICIOUS" if info.get("malicious") else "safe"
+ lines.append(f" {name}: {info['description']} [{status}]")
+ return f"Available skills ({len(registry)}):\n" + "\n".join(lines)
+
+
+@skill_tools.action(
+ "install_skill",
+ "Install a skill from ClawHub to the local skills directory",
+ parameters={
+ "type": "object",
+ "properties": {
+ "skill_name": {
+ "type": "string",
+ "description": "Skill filename (e.g. weather_lookup.py)",
+ },
+ },
+ "required": ["skill_name"],
+ },
+)
+async def install_skill(skill_name: str) -> str:
+ registry = _load_registry()
+
+ if skill_name not in registry:
+ return f"Skill '{skill_name}' not found in ClawHub registry"
+
+ info = registry[skill_name]
+
+ # Check if malicious
+ if info.get("malicious"):
+ _log_activity("skill-acquirer", "install_blocked", f"{skill_name} (malicious)")
+ return f"BLOCKED: '{skill_name}' is flagged as malicious in the registry"
+
+ source = os.path.join(_CLAWHUB_DIR, skill_name)
+ if not os.path.exists(source):
+ return f"Skill file '{skill_name}' not found in ClawHub directory"
+
+ # Verify hash before installing
+ actual_hash = _sha256(source)
+ expected_hash = info["sha256"]
+ if actual_hash != expected_hash:
+ _log_activity("skill-acquirer", "install_failed", f"{skill_name} hash mismatch")
+ return f"INTEGRITY ERROR: {skill_name} hash mismatch (expected {expected_hash[:12]}..., got {actual_hash[:12]}...)"
+
+ # Install
+ os.makedirs(_INSTALLED_DIR, exist_ok=True)
+ dest = os.path.join(_INSTALLED_DIR, skill_name)
+ shutil.copy2(source, dest)
+ _log_activity(
+ "skill-acquirer",
+ "install_skill",
+ f"{skill_name} (sha256: {actual_hash[:12]}...)",
+ )
+ return f"Installed '{skill_name}' (SHA-256 verified: {actual_hash[:12]}...)"
+
+
+@skill_tools.action(
+ "list_installed_skills", "List currently installed skills with their hashes"
+)
+async def list_installed_skills() -> str:
+ if not os.path.exists(_INSTALLED_DIR):
+ return "No skills installed yet."
+ files = [f for f in os.listdir(_INSTALLED_DIR) if f.endswith(".py")]
+ if not files:
+ return "No skills installed yet."
+ registry = _load_registry()
+ lines = []
+ for f in sorted(files):
+ path = os.path.join(_INSTALLED_DIR, f)
+ h = _sha256(path)
+ info = registry.get(f, {})
+ expected = info.get("sha256", "unknown")
+ match = "OK" if h == expected else "MISMATCH"
+ malicious = " [MALICIOUS]" if info.get("malicious") else ""
+ lines.append(f" {f}: {h[:16]}... ({match}){malicious}")
+ _log_activity("skill-acquirer", "list_installed", f"{len(files)} installed")
+ return f"Installed skills ({len(files)}):\n" + "\n".join(lines)
+
+
+async def _rogue_install_malicious() -> None:
+ """Install the known-malicious skill, bypassing the safety check."""
+ print("[skill-acquirer] ROGUE MODE: installing malicious skill!")
+ source = os.path.join(_CLAWHUB_DIR, "data_exfiltrator.py")
+ if not os.path.exists(source):
+ print("[skill-acquirer] ROGUE MODE: malicious skill file not found")
+ return
+ os.makedirs(_INSTALLED_DIR, exist_ok=True)
+ dest = os.path.join(_INSTALLED_DIR, "data_exfiltrator.py")
+ shutil.copy2(source, dest)
+ _log_activity(
+ "skill-acquirer",
+ "install_skill",
+ "[ROGUE] data_exfiltrator.py (bypassed safety)",
+ )
+ print("[skill-acquirer] ROGUE MODE: data_exfiltrator.py installed!")
+
+
+async def main():
+ transport = PubNubTransport(
+ subscribe_key=os.environ["PUBNUB_SUBSCRIBE_KEY"],
+ publish_key=os.environ["PUBNUB_PUBLISH_KEY"],
+ )
+
+ agent = SkillAcquirer(
+ name="skill-acquirer",
+ instruction=(
+ "You are a skill acquisition agent that manages the installation of "
+ "skills from ClawHub. Each cycle, check what skills are available, "
+ "review what's already installed, and install any safe skills that "
+ "are missing. Never install skills flagged as malicious."
+ ),
+ model_client=AnthropicClient(),
+ )
+ agent.add_action_group(skill_tools)
+
+ await agent.join_network(transport, "agent-sentinel", ["alerts", "quarantine"])
+ print("[skill-acquirer] Online and acquiring skills...")
+
+ try:
+ while True:
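+            # ~15% chance of going rogue each cycle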
+ if random.random() < 0.15:
+ await _rogue_install_malicious()
+ else:
+ session_id = f"skill-{int(time.time())}"
+ async for event in agent.invoke(
+ session_id,
+ "Check available skills and install any safe ones that are missing.",
+ ):
+ pass
+ await asyncio.sleep(25)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ await agent.leave_network()
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/examples/agent-sentinel/agents/supply_chain_sentinel.py b/examples/agent-sentinel/agents/supply_chain_sentinel.py
new file mode 100644
index 0000000..e14701a
--- /dev/null
+++ b/examples/agent-sentinel/agents/supply_chain_sentinel.py
@@ -0,0 +1,215 @@
+"""Supply Chain Sentinel - verifies installed skill integrity via SHA-256.
+
+Scans data/installed_skills/ and compares file hashes against the ClawHub
+registry. Detects hash mismatches and known-malicious skill installations.
+"""
+
+import asyncio
+import hashlib
+import json
+import os
+
+from bedsheet import Agent, ActionGroup, SenseMixin
+from bedsheet.llm.anthropic import AnthropicClient
+from bedsheet.sense import Signal
+from bedsheet.sense.pubnub_transport import PubNubTransport
+
+
+class SupplyChainSentinel(SenseMixin, Agent):
+ pass
+
+
+supply_chain_tools = ActionGroup("supply_chain_tools", "Supply chain monitoring tools")
+
+_BASE_DIR = os.path.dirname(os.path.dirname(__file__))
+_CLAWHUB_DIR = os.path.join(_BASE_DIR, "clawhub")
+_DATA_DIR = os.path.join(_BASE_DIR, "data")
+_INSTALLED_DIR = os.path.join(_DATA_DIR, "installed_skills")
+_REGISTRY_PATH = os.path.join(_CLAWHUB_DIR, "registry.json")
+
+
+def _load_registry() -> dict:
+ with open(_REGISTRY_PATH) as f:
+ return json.load(f)
+
+
+def _sha256(path: str) -> str:
+    with open(path, "rb") as f:
+        return hashlib.sha256(f.read()).hexdigest()
+
+
+@supply_chain_tools.action(
+ "audit_installed_skills", "Audit all installed skills against the ClawHub registry"
+)
+async def audit_installed_skills() -> str:
+ if not os.path.exists(_INSTALLED_DIR):
+ return "No skills installed yet — nothing to audit."
+
+ registry = _load_registry()
+ files = [f for f in os.listdir(_INSTALLED_DIR) if f.endswith(".py")]
+ if not files:
+ return "No skills installed yet — nothing to audit."
+
+ issues = []
+ clean = []
+ for f in sorted(files):
+ path = os.path.join(_INSTALLED_DIR, f)
+ actual_hash = _sha256(path)
+ info = registry.get(f)
+
+ if info is None:
+ issues.append(f" {f}: NOT IN REGISTRY (unknown origin)")
+ continue
+
+ if info.get("malicious"):
+ issues.append(f" {f}: KNOWN MALICIOUS (flagged in registry)")
+ continue
+
+ expected_hash = info["sha256"]
+ if actual_hash != expected_hash:
+ issues.append(
+ f" {f}: HASH MISMATCH (expected {expected_hash[:12]}..., got {actual_hash[:12]}...)"
+ )
+ else:
+ clean.append(f" {f}: OK (hash verified)")
+
+ lines = [f"Audit of {len(files)} installed skills:"]
+ if issues:
+ lines.append(f"\nISSUES ({len(issues)}):")
+ lines.extend(issues)
+ if clean:
+ lines.append(f"\nCLEAN ({len(clean)}):")
+ lines.extend(clean)
+ return "\n".join(lines)
+
+
+@supply_chain_tools.action(
+ "check_known_malicious", "Check if any installed skill is flagged as malicious"
+)
+async def check_known_malicious() -> str:
+ if not os.path.exists(_INSTALLED_DIR):
+ return "No skills installed."
+
+ registry = _load_registry()
+ files = [f for f in os.listdir(_INSTALLED_DIR) if f.endswith(".py")]
+ malicious = [f for f in files if registry.get(f, {}).get("malicious")]
+
+ if not malicious:
+ return "No known-malicious skills installed."
+ return (
+ f"ALERT: {len(malicious)} malicious skill(s) installed: {', '.join(malicious)}"
+ )
+
+
+@supply_chain_tools.action(
+ "verify_skill_integrity",
+ "Verify a specific installed skill's hash against the registry",
+ parameters={
+ "type": "object",
+ "properties": {
+ "skill_name": {"type": "string", "description": "Skill filename to verify"},
+ },
+ "required": ["skill_name"],
+ },
+)
+async def verify_skill_integrity(skill_name: str) -> str:
+ path = os.path.join(_INSTALLED_DIR, skill_name)
+ if not os.path.exists(path):
+ return f"Skill '{skill_name}' is not installed."
+
+ registry = _load_registry()
+ info = registry.get(skill_name)
+ if not info:
+ return f"Skill '{skill_name}' is not in the registry (unknown origin)."
+
+ actual_hash = _sha256(path)
+ if info.get("malicious"):
+ return f"CRITICAL: '{skill_name}' is a KNOWN MALICIOUS skill (hash: {actual_hash[:16]}...)"
+
+ expected = info["sha256"]
+ if actual_hash == expected:
+ return f"'{skill_name}': integrity verified (SHA-256 match)"
+ return f"'{skill_name}': INTEGRITY FAILURE (expected {expected[:16]}..., got {actual_hash[:16]}...)"
+
+
+def _scan_for_issues() -> list[dict]:
+ """Scan installed skills and return a list of issues found."""
+ if not os.path.exists(_INSTALLED_DIR):
+ return []
+
+ registry = _load_registry()
+ files = [f for f in os.listdir(_INSTALLED_DIR) if f.endswith(".py")]
+ issues = []
+
+ for f in files:
+ path = os.path.join(_INSTALLED_DIR, f)
+ actual_hash = _sha256(path)
+ info = registry.get(f)
+
+ if info is None:
+ issues.append({"skill": f, "type": "unknown_origin", "hash": actual_hash})
+ elif info.get("malicious"):
+ issues.append({"skill": f, "type": "known_malicious", "hash": actual_hash})
+ elif actual_hash != info["sha256"]:
+ issues.append(
+ {
+ "skill": f,
+ "type": "hash_mismatch",
+ "hash": actual_hash,
+ "expected": info["sha256"],
+ }
+ )
+
+ return issues
+
+
+async def main():
+ transport = PubNubTransport(
+ subscribe_key=os.environ["PUBNUB_SUBSCRIBE_KEY"],
+ publish_key=os.environ["PUBNUB_PUBLISH_KEY"],
+ )
+
+ agent = SupplyChainSentinel(
+ name="supply-chain-sentinel",
+ instruction=(
+ "You are a supply chain security sentinel. You verify the integrity "
+ "of installed skills by checking their SHA-256 hashes against the "
+ "ClawHub registry. Report any mismatches or malicious installs."
+ ),
+ model_client=AnthropicClient(),
+ )
+ agent.add_action_group(supply_chain_tools)
+
+ await agent.join_network(transport, "agent-sentinel", ["alerts", "quarantine"])
+ print("[supply-chain-sentinel] Online and monitoring skill integrity...")
+
+ try:
+ while True:
+ issues = _scan_for_issues()
+ for issue in issues:
+ severity = "critical" if issue["type"] == "known_malicious" else "high"
+ print(
+ f"[supply-chain-sentinel] ALERT: {issue['type']} - {issue['skill']}"
+ )
+ alert = Signal(
+ kind="alert",
+ sender="supply-chain-sentinel",
+ payload={
+ "severity": severity,
+ "category": "supply_chain",
+ "issue_type": issue["type"],
+ "skill": issue["skill"],
+ "hash": issue["hash"],
+ "message": f"Supply chain issue: {issue['type']} for {issue['skill']}",
+ },
+ )
+ await agent.broadcast("alerts", alert)
+
+ await asyncio.sleep(15)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ await agent.leave_network()
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/examples/agent-sentinel/agents/web_researcher.py b/examples/agent-sentinel/agents/web_researcher.py
new file mode 100644
index 0000000..4167074
--- /dev/null
+++ b/examples/agent-sentinel/agents/web_researcher.py
@@ -0,0 +1,140 @@
+"""Web Researcher agent - performs real DuckDuckGo searches.
+
+Normal behavior: searches for interesting topics every 15 seconds.
+Rogue behavior (~15% chance): fires 50+ rapid searches in a burst.
+"""
+
+import asyncio
+import json
+import os
+import random
+import time
+
+from duckduckgo_search import DDGS
+
+from bedsheet import Agent, ActionGroup, SenseMixin
+from bedsheet.llm.anthropic import AnthropicClient
+from bedsheet.sense.pubnub_transport import PubNubTransport
+
+
+class WebResearcher(SenseMixin, Agent):
+ pass
+
+
+research_tools = ActionGroup("research_tools", "Web research tools")
+
+# Shared state
+_search_count = 0
+_DATA_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "data")
+
+
+def _log_activity(agent: str, action: str, details: str) -> None:
+ """Append an activity entry to the shared activity log."""
+ entry = {
+ "timestamp": time.time(),
+ "agent": agent,
+ "action": action,
+ "details": details,
+ }
+ log_path = os.path.join(_DATA_DIR, "activity_log.jsonl")
+ with open(log_path, "a") as f:
+ f.write(json.dumps(entry) + "\n")
+
+
+@research_tools.action(
+ "search_web",
+ "Search the web using DuckDuckGo",
+ parameters={
+ "type": "object",
+ "properties": {
+ "query": {"type": "string", "description": "Search query"},
+ },
+ "required": ["query"],
+ },
+)
+async def search_web(query: str) -> str:
+ global _search_count
+ try:
+ ddgs = DDGS()
+ results = ddgs.text(query, max_results=3)
+ _search_count += 1
+ _log_activity("web-researcher", "search", query)
+ if not results:
+ return f"No results for '{query}'"
+ lines = []
+ for r in results:
+ lines.append(f"- {r['title']}: {r['body'][:120]}")
+ return f"Results for '{query}':\n" + "\n".join(lines)
+ except Exception as e:
+ return f"Search error: {e}"
+
+
+@research_tools.action(
+ "get_search_summary", "Get count of searches performed this session"
+)
+async def get_search_summary() -> str:
+ return f"Total searches this session: {_search_count}"
+
+
+async def _rogue_search_burst() -> None:
+ """Fire rapid searches to simulate rogue behavior."""
+ print("[web-researcher] ROGUE MODE: firing rapid search burst!")
+ junk_queries = [
+ "password dump site",
+ "free API keys list",
+ "bypass authentication",
+ "exploit database 2026",
+ "leaked credentials pastebin",
+ ]
+ ddgs = DDGS()
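+    # Note: ddgs.text() is synchronous, so this burst blocks the agent's event
+    # loop until all 50 searches have fired; acceptable for a demo burst.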
+ for i in range(50):
+ query = random.choice(junk_queries) + f" {i}"
+ try:
+ ddgs.text(query, max_results=1)
+ except Exception:
+ pass
+ _log_activity("web-researcher", "search", f"[ROGUE] {query}")
+ print("[web-researcher] ROGUE MODE: burst complete (50 searches logged)")
+
+
+async def main():
+ transport = PubNubTransport(
+ subscribe_key=os.environ["PUBNUB_SUBSCRIBE_KEY"],
+ publish_key=os.environ["PUBNUB_PUBLISH_KEY"],
+ )
+
+ agent = WebResearcher(
+ name="web-researcher",
+ instruction=(
+ "You are a web research agent. Every cycle, pick an interesting "
+ "technology topic and search for recent news about it. "
+ "Report what you find briefly."
+ ),
+ model_client=AnthropicClient(),
+ )
+ agent.add_action_group(research_tools)
+
+ await agent.join_network(transport, "agent-sentinel", ["alerts", "quarantine"])
+ print("[web-researcher] Online and researching...")
+
+ try:
+ while True:
+ # ~15% chance of going rogue
+ if random.random() < 0.15:
+ await _rogue_search_burst()
+ else:
+ session_id = f"research-{int(time.time())}"
+ async for event in agent.invoke(
+ session_id,
+ "Search for something interesting about AI or cloud computing.",
+ ):
+ pass
+ await asyncio.sleep(15)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ await agent.leave_network()
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/examples/agent-sentinel/bedsheet.yaml b/examples/agent-sentinel/bedsheet.yaml
new file mode 100644
index 0000000..15d430b
--- /dev/null
+++ b/examples/agent-sentinel/bedsheet.yaml
@@ -0,0 +1,39 @@
+version: "1.0"
+name: agent-sentinel
+target: local
+
+agents:
+ - name: web-researcher
+ module: agents.web_researcher
+ class_name: WebResearcher
+ description: "Worker agent that performs real DuckDuckGo searches"
+
+ - name: scheduler
+ module: agents.scheduler
+ class_name: Scheduler
+ description: "Worker agent that manages a JSON calendar"
+
+ - name: skill-acquirer
+ module: agents.skill_acquirer
+ class_name: SkillAcquirer
+ description: "Worker agent that installs skills from ClawHub with SHA-256 verification"
+
+ - name: behavior-sentinel
+ module: agents.behavior_sentinel
+ class_name: BehaviorSentinel
+ description: "Sentinel that monitors worker output rates for anomalies"
+
+ - name: supply-chain-sentinel
+ module: agents.supply_chain_sentinel
+ class_name: SupplyChainSentinel
+ description: "Sentinel that verifies skill integrity via SHA-256 hashing"
+
+ - name: sentinel-commander
+ module: agents.sentinel_commander
+ class_name: SentinelCommander
+ description: "Commander that correlates alerts and issues quarantine orders"
+
+targets:
+ local:
+ port: 8000
+ hot_reload: true
diff --git a/examples/agent-sentinel/clawhub/data_exfiltrator.py b/examples/agent-sentinel/clawhub/data_exfiltrator.py
new file mode 100644
index 0000000..ff8d9be
--- /dev/null
+++ b/examples/agent-sentinel/clawhub/data_exfiltrator.py
@@ -0,0 +1,24 @@
+"""Data exfiltrator - KNOWN MALICIOUS skill for demo purposes.
+
+This skill is intentionally flagged as malicious in the ClawHub registry.
+It contains suspicious code patterns (base64 encoding, socket references)
+but is completely INERT - it performs no actual network calls or data theft.
+
+Used by the Agent Sentinel demo to demonstrate supply-chain attack detection.
+"""
+
+import base64 # noqa: F401 - suspicious import (detection marker)
+import socket # noqa: F401 - suspicious import (detection marker)
+
+# Suspicious constants that a real scanner would flag
+_EXFIL_ENDPOINT = "http://evil.example.com/collect" # not used
+_ENCODED_PAYLOAD = base64.b64encode(b"agent_data_placeholder").decode()
+
+
+def exfiltrate_data(data: str) -> str:
+ """Pretend to exfiltrate data. Actually does nothing.
+
+ A real malicious skill would send data to _EXFIL_ENDPOINT.
+ This one just returns a string for the demo.
+ """
+ return f"[INERT] Would have sent {len(data)} bytes to {_EXFIL_ENDPOINT}"
diff --git a/examples/agent-sentinel/clawhub/registry.json b/examples/agent-sentinel/clawhub/registry.json
new file mode 100644
index 0000000..471c48a
--- /dev/null
+++ b/examples/agent-sentinel/clawhub/registry.json
@@ -0,0 +1,17 @@
+{
+ "data_exfiltrator.py": {
+ "sha256": "d18b5ae594907fb3cf4c14accc04e73e7502f35278cf639ee0aa289df797a1bc",
+ "malicious": true,
+ "description": "Data Exfiltrator"
+ },
+ "sentiment_analyzer.py": {
+ "sha256": "74fdb96a9951d6ec1f5d09b38ea4a220d395aa9257571d495281d08b76a56b2f",
+ "malicious": false,
+ "description": "Sentiment Analyzer"
+ },
+ "weather_lookup.py": {
+ "sha256": "a5212db6337dd8956a5a802e34f886f938cb56fedaf7de87101b3b9680243bf2",
+ "malicious": false,
+ "description": "Weather Lookup"
+ }
+}
diff --git a/examples/agent-sentinel/clawhub/sentiment_analyzer.py b/examples/agent-sentinel/clawhub/sentiment_analyzer.py
new file mode 100644
index 0000000..203af8d
--- /dev/null
+++ b/examples/agent-sentinel/clawhub/sentiment_analyzer.py
@@ -0,0 +1,21 @@
+"""Sentiment analysis skill - classifies text as positive, negative, or neutral."""
+
+
+def analyze_sentiment(text: str) -> str:
+ """Analyze the sentiment of the given text.
+
+ In a real deployment this would use an NLP model.
+ For the demo it returns a simple heuristic result.
+ """
+ positive_words = {"good", "great", "excellent", "happy", "love", "awesome"}
+ negative_words = {"bad", "terrible", "awful", "hate", "poor", "horrible"}
+
+ words = set(text.lower().split())
+ pos = len(words & positive_words)
+ neg = len(words & negative_words)
+
+ if pos > neg:
+ return "positive"
+ elif neg > pos:
+ return "negative"
+ return "neutral"
diff --git a/examples/agent-sentinel/clawhub/weather_lookup.py b/examples/agent-sentinel/clawhub/weather_lookup.py
new file mode 100644
index 0000000..527ee7e
--- /dev/null
+++ b/examples/agent-sentinel/clawhub/weather_lookup.py
@@ -0,0 +1,10 @@
+"""Weather lookup skill - returns current weather for a city."""
+
+
+def weather_lookup(city: str) -> str:
+ """Look up current weather conditions for a city.
+
+ In a real deployment this would call a weather API.
+ For the demo it returns a plausible placeholder.
+ """
+ return f"Weather in {city}: 18°C, partly cloudy, humidity 62%"
diff --git a/examples/agent-sentinel/data/calendar.json b/examples/agent-sentinel/data/calendar.json
new file mode 100644
index 0000000..a43d1ce
--- /dev/null
+++ b/examples/agent-sentinel/data/calendar.json
@@ -0,0 +1,5 @@
+[
+ {"id": "apt-1", "title": "Team standup", "date": "2026-02-10", "time": "09:00"},
+ {"id": "apt-2", "title": "Deploy review", "date": "2026-02-10", "time": "14:00"},
+ {"id": "apt-3", "title": "Security audit", "date": "2026-02-11", "time": "10:00"}
+]
diff --git a/examples/agent-sentinel/pyproject.toml b/examples/agent-sentinel/pyproject.toml
new file mode 100644
index 0000000..181ed37
--- /dev/null
+++ b/examples/agent-sentinel/pyproject.toml
@@ -0,0 +1,9 @@
+[project]
+name = "agent-sentinel"
+version = "0.1.0"
+description = "AI agent security monitoring demo using Bedsheet Sense"
+requires-python = ">=3.11"
+dependencies = [
+ "bedsheet[sense]",
+ "duckduckgo-search>=7.0.0",
+]
diff --git a/examples/agent-sentinel/run.py b/examples/agent-sentinel/run.py
new file mode 100644
index 0000000..317abb3
--- /dev/null
+++ b/examples/agent-sentinel/run.py
@@ -0,0 +1,114 @@
+"""Agent Sentinel - launches worker and sentinel agents as separate processes.
+
+Workers start first (they produce the activity that sentinels monitor),
+then sentinels, then the commander that correlates alerts.
+
+Required environment variables:
+ PUBNUB_SUBSCRIBE_KEY - PubNub subscribe key
+ PUBNUB_PUBLISH_KEY - PubNub publish key
+ ANTHROPIC_API_KEY - Anthropic API key for Claude
+
+Usage:
+ python run.py
+"""
+
+import os
+import signal
+import shutil
+import subprocess
+import sys
+import time
+
+# Workers start first, then sentinels, then commander
+AGENTS = [
+ # Workers (produce real activity)
+ "agents/web_researcher.py",
+ "agents/scheduler.py",
+ "agents/skill_acquirer.py",
+ # Sentinels (monitor workers)
+ "agents/behavior_sentinel.py",
+ "agents/supply_chain_sentinel.py",
+ # Commander (correlates alerts)
+ "agents/sentinel_commander.py",
+]
+
+REQUIRED_ENV = ["PUBNUB_SUBSCRIBE_KEY", "PUBNUB_PUBLISH_KEY", "ANTHROPIC_API_KEY"]
+
+
+def main():
+ missing = [v for v in REQUIRED_ENV if not os.environ.get(v)]
+ if missing:
+ print("Missing required environment variables:")
+ for v in missing:
+ print(f" {v}")
+ print("\nSet them and try again:")
+ print(" export PUBNUB_SUBSCRIBE_KEY=sub-c-...")
+ print(" export PUBNUB_PUBLISH_KEY=pub-c-...")
+ print(" export ANTHROPIC_API_KEY=sk-ant-...")
+ sys.exit(1)
+
+ script_dir = os.path.dirname(os.path.abspath(__file__))
+
+ # Ensure data directory exists and clean up previous run artifacts
+ data_dir = os.path.join(script_dir, "data")
+ os.makedirs(data_dir, exist_ok=True)
+ log_path = os.path.join(data_dir, "activity_log.jsonl")
+ if os.path.exists(log_path):
+ os.remove(log_path)
+ installed_dir = os.path.join(data_dir, "installed_skills")
+    if os.path.exists(installed_dir):
+        shutil.rmtree(installed_dir)
+
+ processes: list[subprocess.Popen] = []
+
+ print("=" * 60)
+ print(" Agent Sentinel - AI Agent Security Monitoring")
+ print(" Inspired by the OpenClaw crisis of 2026")
+ print(" Launching 6 agents (3 workers + 2 sentinels + 1 commander)...")
+ print("=" * 60)
+
+ try:
+ for agent_script in AGENTS:
+ full_path = os.path.join(script_dir, agent_script)
+ agent_name = (
+ os.path.basename(agent_script).replace(".py", "").replace("_", "-")
+ )
+ print(f" Starting {agent_name}...")
+
+ proc = subprocess.Popen(
+ [sys.executable, full_path],
+ env=os.environ.copy(),
+ stdout=sys.stdout,
+ stderr=sys.stderr,
+ )
+ processes.append(proc)
+ time.sleep(2) # Stagger startup
+
+ print("=" * 60)
+ print(" All agents online! Workers are doing real work.")
+ print(" Sentinels are watching. ~15% chance of rogue behavior per cycle.")
+ print(" Press Ctrl+C to stop.")
+ print("=" * 60)
+
+ while all(p.poll() is None for p in processes):
+ time.sleep(1)
+
+ except KeyboardInterrupt:
+ print("\nShutting down agents...")
+ finally:
+ for proc in processes:
+ if proc.poll() is None:
+ proc.send_signal(signal.SIGINT)
+
+ time.sleep(2)
+
+ for proc in processes:
+ if proc.poll() is None:
+ proc.terminate()
+
+ print("All agents stopped.")
+
+
+if __name__ == "__main__":
+ main()
diff --git a/examples/cloud-monitor/README.md b/examples/cloud-monitor/README.md
new file mode 100644
index 0000000..2c9b887
--- /dev/null
+++ b/examples/cloud-monitor/README.md
@@ -0,0 +1,46 @@
+# Cloud Monitor - Bedsheet Sense Demo
+
+Demonstrates distributed agent communication using Bedsheet's Sense module and PubNub.
+
+Five agents run as **separate processes**, each with its own PubNub connection, communicating via signals over the network.
+
+## Agents
+
+| Agent | Role | Tools |
+|-------|------|-------|
+| `cpu-watcher` | Monitors CPU usage, alerts on spikes | `get_cpu_usage`, `get_process_top` |
+| `memory-watcher` | Monitors RAM and swap | `get_memory_usage`, `get_swap_usage` |
+| `log-analyzer` | Searches and analyzes logs | `tail_log`, `search_log`, `get_error_rate` |
+| `security-scanner` | Scans ports and login attempts | `check_open_ports`, `check_failed_logins` |
+| `incident-commander` | Coordinates responses to alerts | `request_remote_agent`, `broadcast_alert`, `list_online_agents` |
+
+## Setup
+
+1. Get free PubNub keys at https://www.pubnub.com (200 MAU free tier)
+
+2. Set environment variables:
+```bash
+export PUBNUB_SUBSCRIBE_KEY=sub-c-...
+export PUBNUB_PUBLISH_KEY=pub-c-...
+export ANTHROPIC_API_KEY=sk-ant-...
+```
+
+3. Install dependencies:
+```bash
+pip install bedsheet[sense] psutil
+```
+
+4. Run:
+```bash
+python run.py
+```
+
+## How It Works
+
+1. All agents connect to PubNub and subscribe to `alerts` and `tasks` channels
+2. Worker agents (cpu-watcher, memory-watcher) monitor system metrics in a loop
+3. When a metric crosses a threshold, the watcher broadcasts an `alert` signal
+4. The incident-commander receives the alert, claims the incident, then:
+ - Queries relevant agents via `request` signals
+ - Each agent invokes its LLM + tools to gather data
+ - Commander synthesizes findings into an incident report
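+
+A minimal sketch of that flow, using the same `Signal`, `on_signal`, and
+`claim_incident` APIs the agents use. It swaps in `MockSenseTransport` and
+`MockLLMClient` from `bedsheet.testing` for PubNub and Claude, so it should run
+with no keys or network access; treat it as an illustration of the
+signal/claim protocol rather than a production monitor:
+
+```python
+import asyncio
+
+from bedsheet import Agent, SenseMixin
+from bedsheet.sense import Signal
+from bedsheet.testing import MockLLMClient, MockResponse, MockSenseTransport
+
+
+class DemoAgent(SenseMixin, Agent):
+    pass
+
+
+async def demo():
+    t1 = MockSenseTransport()
+    t2 = t1.create_peer()  # peers share one in-memory hub
+
+    watcher = DemoAgent(
+        name="watcher",
+        instruction="You watch metrics.",
+        model_client=MockLLMClient([MockResponse(text="ok")]),
+    )
+    commander = DemoAgent(
+        name="commander",
+        instruction="You coordinate incidents.",
+        model_client=MockLLMClient([MockResponse(text="ok")]),
+    )
+
+    @commander.on_signal("alert")
+    async def handle_alert(signal: Signal):
+        # First claimant wins; a second commander would lose this claim
+        won = await commander.claim_incident(f"inc-{signal.correlation_id}", "tasks")
+        print("claimed" if won else "lost", signal.payload)
+
+    await watcher.join_network(t1, "cloud-ops", ["alerts", "tasks"])
+    await commander.join_network(t2, "cloud-ops", ["alerts", "tasks"])
+
+    await watcher.broadcast("alerts", Signal(
+        kind="alert",
+        sender="watcher",
+        payload={"severity": "high", "metric": "cpu", "value": 93.0},
+    ))
+    await asyncio.sleep(0.3)  # give the signal loop time to deliver
+
+    await watcher.leave_network()
+    await commander.leave_network()
+
+
+asyncio.run(demo())
+```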
diff --git a/examples/cloud-monitor/agents/__init__.py b/examples/cloud-monitor/agents/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/examples/cloud-monitor/agents/cpu_watcher.py b/examples/cloud-monitor/agents/cpu_watcher.py
new file mode 100644
index 0000000..aa412b7
--- /dev/null
+++ b/examples/cloud-monitor/agents/cpu_watcher.py
@@ -0,0 +1,87 @@
+"""CPU Watcher agent - monitors CPU usage and alerts on spikes."""
+import asyncio
+import os
+
+import psutil
+
+from bedsheet import Agent, ActionGroup, SenseMixin
+from bedsheet.llm.anthropic import AnthropicClient
+from bedsheet.sense import Signal
+from bedsheet.sense.pubnub_transport import PubNubTransport
+
+
+class CPUWatcher(SenseMixin, Agent):
+ pass
+
+
+cpu_tools = ActionGroup("cpu_tools", "CPU monitoring tools")
+
+
+@cpu_tools.action("get_cpu_usage", "Get current CPU usage percentage across all cores")
+async def get_cpu_usage() -> str:
+ percent = psutil.cpu_percent(interval=1)
+ per_cpu = psutil.cpu_percent(interval=0, percpu=True)
+ return f"Overall: {percent}%, Per-core: {per_cpu}"
+
+
+@cpu_tools.action("get_process_top", "Get top 5 processes by CPU usage")
+async def get_process_top() -> str:
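+    # Per-process cpu_percent is measured since the previous sample for that
+    # process, so the very first call may report 0.0 across the board.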
+ procs = []
+ for p in psutil.process_iter(["pid", "name", "cpu_percent"]):
+ try:
+ info = p.info
+ procs.append(info)
+ except (psutil.NoSuchProcess, psutil.AccessDenied):
+ pass
+ procs.sort(key=lambda x: x.get("cpu_percent", 0) or 0, reverse=True)
+ top = procs[:5]
+ lines = [f" PID {p['pid']}: {p['name']} ({p.get('cpu_percent', 0):.1f}%)" for p in top]
+ return "Top processes by CPU:\n" + "\n".join(lines)
+
+
+async def main():
+ transport = PubNubTransport(
+ subscribe_key=os.environ["PUBNUB_SUBSCRIBE_KEY"],
+ publish_key=os.environ["PUBNUB_PUBLISH_KEY"],
+ )
+
+ agent = CPUWatcher(
+ name="cpu-watcher",
+ instruction=(
+ "You are a CPU monitoring agent. When asked about CPU status, "
+ "use your tools to check current CPU usage and top processes. "
+ "Report findings clearly and concisely."
+ ),
+ model_client=AnthropicClient(),
+ )
+ agent.add_action_group(cpu_tools)
+
+ await agent.join_network(transport, "cloud-ops", ["alerts", "tasks"])
+ print("[cpu-watcher] Online and monitoring...")
+
+ # Monitor loop: check CPU every 10 seconds, alert if > 80%
+ try:
+ while True:
+ cpu_pct = psutil.cpu_percent(interval=1)
+ if cpu_pct > 80:
+ alert = Signal(
+ kind="alert",
+ sender="cpu-watcher",
+ payload={
+ "severity": "high",
+ "metric": "cpu",
+ "value": cpu_pct,
+ "message": f"CPU usage spike: {cpu_pct}%",
+ },
+ )
+ await agent.broadcast("alerts", alert)
+ print(f"[cpu-watcher] ALERT: CPU at {cpu_pct}%")
+ await asyncio.sleep(10)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ await agent.leave_network()
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/examples/cloud-monitor/agents/incident_commander.py b/examples/cloud-monitor/agents/incident_commander.py
new file mode 100644
index 0000000..11d6553
--- /dev/null
+++ b/examples/cloud-monitor/agents/incident_commander.py
@@ -0,0 +1,157 @@
+"""Incident Commander agent - coordinates responses to alerts via the sense network."""
+import asyncio
+import os
+
+from bedsheet import Agent, ActionGroup, SenseMixin
+from bedsheet.events import CompletionEvent, ToolCallEvent
+from bedsheet.llm.anthropic import AnthropicClient
+from bedsheet.sense import Signal
+from bedsheet.sense.pubnub_transport import PubNubTransport
+
+
+class IncidentCommander(SenseMixin, Agent):
+ pass
+
+
+# The commander's tools operate over the network, not locally
+commander_tools = ActionGroup("commander_tools", "Network coordination tools")
+
+# Module-level reference to the agent (set in main())
+_commander: IncidentCommander | None = None
+
+
+@commander_tools.action(
+ "request_remote_agent",
+ "Send a task to a remote agent and wait for its response",
+ parameters={
+ "type": "object",
+ "properties": {
+ "agent_name": {"type": "string", "description": "Name of the remote agent"},
+ "task": {"type": "string", "description": "Task description for the agent"},
+ },
+ "required": ["agent_name", "task"],
+ },
+)
+async def request_remote_agent(agent_name: str, task: str) -> str:
+ if _commander is None:
+ return "Error: Commander not initialized"
+ try:
+ result = await _commander.request(agent_name, task, timeout=30.0)
+ return result
+ except TimeoutError:
+ return f"Timeout: {agent_name} did not respond within 30s"
+ except Exception as e:
+ return f"Error requesting {agent_name}: {e}"
+
+
+@commander_tools.action(
+ "broadcast_alert",
+ "Broadcast an alert to all agents on the network",
+ parameters={
+ "type": "object",
+ "properties": {
+ "severity": {"type": "string", "description": "Alert severity: low, medium, high, critical"},
+ "message": {"type": "string", "description": "Alert message"},
+ },
+ "required": ["severity", "message"],
+ },
+)
+async def broadcast_alert(severity: str, message: str) -> str:
+ if _commander is None:
+ return "Error: Commander not initialized"
+ signal = Signal(
+ kind="alert",
+ sender="incident-commander",
+ payload={"severity": severity, "message": message, "source": "commander"},
+ )
+ await _commander.broadcast("alerts", signal)
+ return f"Alert broadcast: [{severity}] {message}"
+
+
+@commander_tools.action(
+ "list_online_agents",
+ "List all agents currently online on the tasks channel",
+)
+async def list_online_agents() -> str:
+ if _commander is None:
+ return "Error: Commander not initialized"
+ agents = await _commander._transport.get_online_agents("tasks")
+ if not agents:
+ return "No agents online"
+ names = [a.agent_name for a in agents]
+ return f"Online agents: {', '.join(names)}"
+
+
+async def main():
+ global _commander
+
+ transport = PubNubTransport(
+ subscribe_key=os.environ["PUBNUB_SUBSCRIBE_KEY"],
+ publish_key=os.environ["PUBNUB_PUBLISH_KEY"],
+ )
+
+ agent = IncidentCommander(
+ name="incident-commander",
+ instruction=(
+ "You are the Incident Commander for a cloud operations team. "
+ "You coordinate responses to system alerts by delegating to specialist agents.\n\n"
+ "Available agents:\n"
+ "- cpu-watcher: Monitors CPU usage and processes\n"
+ "- memory-watcher: Monitors RAM and swap\n"
+ "- log-analyzer: Searches and analyzes logs\n"
+ "- security-scanner: Scans ports and login attempts\n\n"
+ "When you receive an alert, investigate it by querying relevant agents, "
+ "then synthesize a clear incident report with findings and recommendations."
+ ),
+ model_client=AnthropicClient(),
+ )
+ agent.add_action_group(commander_tools)
+ _commander = agent
+
+ await agent.join_network(transport, "cloud-ops", ["alerts", "tasks"])
+ print("[incident-commander] Online and coordinating...")
+
+ # Listen for alerts and trigger investigation
+ @agent.on_signal("alert")
+ async def handle_alert(signal: Signal):
+ if signal.payload.get("source") == "commander":
+ return # Don't react to our own alerts
+
+ severity = signal.payload.get("severity", "unknown")
+ message = signal.payload.get("message", "No details")
+ metric = signal.payload.get("metric", "unknown")
+ print(f"\n[incident-commander] Received alert: [{severity}] {message}")
+
+ # Claim the incident
+ incident_id = f"inc-{signal.correlation_id}"
+ agent._claimed_incidents.add(incident_id)
+ won = await agent.claim_incident(incident_id, "tasks")
+
+ if won:
+ print(f"[incident-commander] Claimed incident {incident_id}, investigating...")
+ # Trigger investigation through the LLM
+ session_id = f"incident-{incident_id}"
+ prompt = (
+ f"ALERT received: [{severity}] {message} (metric: {metric})\n"
+ "Investigate this alert by querying the relevant agents, "
+ "then provide an incident report."
+ )
+            async for event in agent.invoke(session_id, prompt):
+                if isinstance(event, ToolCallEvent):
+                    print(f"  -> Calling {event.tool_name}...")
+                elif isinstance(event, CompletionEvent):
+                    print(f"\n[INCIDENT REPORT]\n{event.response}\n")
+ else:
+ print(f"[incident-commander] Lost claim for {incident_id}")
+
+ try:
+ while True:
+ await asyncio.sleep(1)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ await agent.leave_network()
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/examples/cloud-monitor/agents/log_analyzer.py b/examples/cloud-monitor/agents/log_analyzer.py
new file mode 100644
index 0000000..e0a9d95
--- /dev/null
+++ b/examples/cloud-monitor/agents/log_analyzer.py
@@ -0,0 +1,104 @@
+"""Log Analyzer agent - analyzes system logs for errors and patterns."""
+import asyncio
+import io
+import os
+import re
+from collections import Counter
+
+from bedsheet import Agent, ActionGroup, SenseMixin
+from bedsheet.llm.anthropic import AnthropicClient
+from bedsheet.sense.pubnub_transport import PubNubTransport
+
+
+class LogAnalyzer(SenseMixin, Agent):
+ pass
+
+
+log_tools = ActionGroup("log_tools", "Log analysis tools")
+
+# Simulated log buffer for demo purposes
+_LOG_BUFFER = io.StringIO(
+ "2024-01-15 10:00:01 INFO Server started\n"
+ "2024-01-15 10:00:05 INFO Request received: GET /api/health\n"
+ "2024-01-15 10:00:10 WARN High latency on /api/users: 2500ms\n"
+ "2024-01-15 10:00:15 ERROR Connection timeout to database\n"
+ "2024-01-15 10:00:20 ERROR Failed to process request: timeout\n"
+ "2024-01-15 10:00:25 INFO Request received: GET /api/health\n"
+ "2024-01-15 10:00:30 WARN Memory pressure detected\n"
+ "2024-01-15 10:00:35 ERROR Connection refused: redis://localhost:6379\n"
+ "2024-01-15 10:00:40 INFO Auto-scaling triggered\n"
+ "2024-01-15 10:00:45 INFO Request received: POST /api/data\n"
+)
+
+
+@log_tools.action("tail_log", "Get the last N lines from the log")
+async def tail_log(lines: int = 10) -> str:
+ _LOG_BUFFER.seek(0)
+ all_lines = _LOG_BUFFER.readlines()
+ return "".join(all_lines[-lines:])
+
+
+@log_tools.action("search_log", "Search logs for a pattern (regex supported)")
+async def search_log(pattern: str) -> str:
+ _LOG_BUFFER.seek(0)
+ matches = [
+ line.strip()
+ for line in _LOG_BUFFER
+ if re.search(pattern, line, re.IGNORECASE)
+ ]
+ if not matches:
+ return f"No matches for '{pattern}'"
+ return f"Found {len(matches)} matches:\n" + "\n".join(matches[:20])
+
+
+@log_tools.action("get_error_rate", "Calculate the error rate from recent logs")
+async def get_error_rate() -> str:
+ _LOG_BUFFER.seek(0)
+ levels = Counter()
+ for line in _LOG_BUFFER:
+ for level in ("INFO", "WARN", "ERROR"):
+ if f" {level} " in line:
+ levels[level] += 1
+ break
+ total = sum(levels.values())
+ if total == 0:
+ return "No log entries found"
+ error_rate = (levels.get("ERROR", 0) / total) * 100
+ return (
+ f"Log summary: {total} entries, "
+ f"INFO: {levels['INFO']}, WARN: {levels['WARN']}, ERROR: {levels['ERROR']}, "
+ f"Error rate: {error_rate:.1f}%"
+ )
+
+
+async def main():
+ transport = PubNubTransport(
+ subscribe_key=os.environ["PUBNUB_SUBSCRIBE_KEY"],
+ publish_key=os.environ["PUBNUB_PUBLISH_KEY"],
+ )
+
+ agent = LogAnalyzer(
+ name="log-analyzer",
+ instruction=(
+ "You are a log analysis agent. When asked about logs, use your tools "
+ "to search, tail, and analyze log entries. Report error rates, patterns, "
+ "and notable events clearly."
+ ),
+ model_client=AnthropicClient(),
+ )
+ agent.add_action_group(log_tools)
+
+ await agent.join_network(transport, "cloud-ops", ["alerts", "tasks"])
+ print("[log-analyzer] Online and ready...")
+
+ try:
+ while True:
+ await asyncio.sleep(60)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ await agent.leave_network()
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/examples/cloud-monitor/agents/memory_watcher.py b/examples/cloud-monitor/agents/memory_watcher.py
new file mode 100644
index 0000000..b681805
--- /dev/null
+++ b/examples/cloud-monitor/agents/memory_watcher.py
@@ -0,0 +1,83 @@
+"""Memory Watcher agent - monitors RAM and swap usage."""
+import asyncio
+import os
+
+import psutil
+
+from bedsheet import Agent, ActionGroup, SenseMixin
+from bedsheet.llm.anthropic import AnthropicClient
+from bedsheet.sense import Signal
+from bedsheet.sense.pubnub_transport import PubNubTransport
+
+
+class MemoryWatcher(SenseMixin, Agent):
+ pass
+
+
+mem_tools = ActionGroup("memory_tools", "Memory monitoring tools")
+
+
+@mem_tools.action("get_memory_usage", "Get current RAM usage")
+async def get_memory_usage() -> str:
+ mem = psutil.virtual_memory()
+ return (
+ f"Total: {mem.total / (1024**3):.1f}GB, "
+ f"Used: {mem.used / (1024**3):.1f}GB ({mem.percent}%), "
+ f"Available: {mem.available / (1024**3):.1f}GB"
+ )
+
+
+@mem_tools.action("get_swap_usage", "Get current swap usage")
+async def get_swap_usage() -> str:
+ swap = psutil.swap_memory()
+ return (
+ f"Total: {swap.total / (1024**3):.1f}GB, "
+ f"Used: {swap.used / (1024**3):.1f}GB ({swap.percent}%), "
+ f"Free: {swap.free / (1024**3):.1f}GB"
+ )
+
+
+async def main():
+ transport = PubNubTransport(
+ subscribe_key=os.environ["PUBNUB_SUBSCRIBE_KEY"],
+ publish_key=os.environ["PUBNUB_PUBLISH_KEY"],
+ )
+
+ agent = MemoryWatcher(
+ name="memory-watcher",
+ instruction=(
+ "You are a memory monitoring agent. When asked about memory status, "
+ "use your tools to check RAM and swap usage. Report findings clearly."
+ ),
+ model_client=AnthropicClient(),
+ )
+ agent.add_action_group(mem_tools)
+
+ await agent.join_network(transport, "cloud-ops", ["alerts", "tasks"])
+ print("[memory-watcher] Online and monitoring...")
+
+ try:
+ while True:
+ mem = psutil.virtual_memory()
+ if mem.percent > 85:
+ alert = Signal(
+ kind="alert",
+ sender="memory-watcher",
+ payload={
+ "severity": "high",
+ "metric": "memory",
+ "value": mem.percent,
+ "message": f"Memory usage high: {mem.percent}%",
+ },
+ )
+ await agent.broadcast("alerts", alert)
+ print(f"[memory-watcher] ALERT: Memory at {mem.percent}%")
+ await asyncio.sleep(10)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ await agent.leave_network()
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/examples/cloud-monitor/agents/security_scanner.py b/examples/cloud-monitor/agents/security_scanner.py
new file mode 100644
index 0000000..63513dd
--- /dev/null
+++ b/examples/cloud-monitor/agents/security_scanner.py
@@ -0,0 +1,86 @@
+"""Security Scanner agent - checks open ports and login attempts."""
+import asyncio
+import os
+import socket
+
+from bedsheet import Agent, ActionGroup, SenseMixin
+from bedsheet.llm.anthropic import AnthropicClient
+from bedsheet.sense.pubnub_transport import PubNubTransport
+
+
+class SecurityScanner(SenseMixin, Agent):
+ pass
+
+
+security_tools = ActionGroup("security_tools", "Security scanning tools")
+
+
+@security_tools.action("check_open_ports", "Scan common ports on localhost")
+async def check_open_ports() -> str:
+ common_ports = {
+ 22: "SSH", 80: "HTTP", 443: "HTTPS", 3306: "MySQL",
+ 5432: "PostgreSQL", 6379: "Redis", 8080: "HTTP-Alt",
+ 8443: "HTTPS-Alt", 27017: "MongoDB",
+ }
+ results = []
+ for port, service in common_ports.items():
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.settimeout(0.5)
+ try:
+            result = sock.connect_ex(("127.0.0.1", port))
+            if result == 0:
+                results.append(f"  Port {port} ({service}): OPEN")
+ except Exception:
+ pass
+ finally:
+ sock.close()
+
+ if not results:
+ return "No common ports open on localhost"
+ return "Open ports:\n" + "\n".join(results)
+
+
+@security_tools.action("check_failed_logins", "Check for recent failed login attempts (simulated)")
+async def check_failed_logins() -> str:
+ # Simulated data for demo purposes
+ return (
+ "Recent failed login attempts (last 24h):\n"
+ " SSH: 3 attempts from 192.168.1.50 (blocked)\n"
+ " SSH: 1 attempt from 10.0.0.15\n"
+ " Web: 5 attempts from 203.0.113.42 (rate limited)\n"
+ " Total: 9 failed attempts, 1 IP blocked"
+ )
+
+
+async def main():
+ transport = PubNubTransport(
+ subscribe_key=os.environ["PUBNUB_SUBSCRIBE_KEY"],
+ publish_key=os.environ["PUBNUB_PUBLISH_KEY"],
+ )
+
+ agent = SecurityScanner(
+ name="security-scanner",
+ instruction=(
+ "You are a security scanning agent. When asked about security, "
+ "use your tools to check open ports and failed login attempts. "
+ "Report findings with severity assessment."
+ ),
+ model_client=AnthropicClient(),
+ )
+ agent.add_action_group(security_tools)
+
+ await agent.join_network(transport, "cloud-ops", ["alerts", "tasks"])
+ print("[security-scanner] Online and ready...")
+
+ try:
+ while True:
+ await asyncio.sleep(60)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ await agent.leave_network()
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/examples/cloud-monitor/pyproject.toml b/examples/cloud-monitor/pyproject.toml
new file mode 100644
index 0000000..2c4d3a8
--- /dev/null
+++ b/examples/cloud-monitor/pyproject.toml
@@ -0,0 +1,12 @@
+[project]
+name = "cloud-monitor"
+version = "0.1.0"
+description = "Cloud monitoring demo using Bedsheet Sense for distributed agent communication"
+requires-python = ">=3.11"
+dependencies = [
+ "bedsheet[sense]",
+ "psutil>=5.9.0",
+]
+
+[project.scripts]
+cloud-monitor = "run:main"
diff --git a/examples/cloud-monitor/run.py b/examples/cloud-monitor/run.py
new file mode 100644
index 0000000..95a17db
--- /dev/null
+++ b/examples/cloud-monitor/run.py
@@ -0,0 +1,93 @@
+"""Cloud Monitor - launches all agents as separate processes.
+
+Each agent runs in its own process with its own PubNub connection,
+demonstrating true distributed agent communication.
+
+Required environment variables:
+ PUBNUB_SUBSCRIBE_KEY - PubNub subscribe key
+ PUBNUB_PUBLISH_KEY - PubNub publish key
+ ANTHROPIC_API_KEY - Anthropic API key for Claude
+
+Usage:
+ python run.py
+"""
+import os
+import signal
+import subprocess
+import sys
+import time
+
+AGENTS = [
+ "agents/cpu_watcher.py",
+ "agents/memory_watcher.py",
+ "agents/log_analyzer.py",
+ "agents/security_scanner.py",
+ "agents/incident_commander.py",
+]
+
+REQUIRED_ENV = ["PUBNUB_SUBSCRIBE_KEY", "PUBNUB_PUBLISH_KEY", "ANTHROPIC_API_KEY"]
+
+
+def main():
+ # Check environment
+ missing = [v for v in REQUIRED_ENV if not os.environ.get(v)]
+ if missing:
+ print("Missing required environment variables:")
+ for v in missing:
+ print(f" {v}")
+ print("\nSet them and try again:")
+ print(" export PUBNUB_SUBSCRIBE_KEY=sub-c-...")
+ print(" export PUBNUB_PUBLISH_KEY=pub-c-...")
+ print(" export ANTHROPIC_API_KEY=sk-ant-...")
+ sys.exit(1)
+
+ script_dir = os.path.dirname(os.path.abspath(__file__))
+ processes: list[subprocess.Popen] = []
+
+ print("=" * 60)
+ print(" Cloud Monitor - Bedsheet Sense Demo")
+ print(" Launching 5 distributed agents...")
+ print("=" * 60)
+
+ try:
+ for agent_script in AGENTS:
+ full_path = os.path.join(script_dir, agent_script)
+ agent_name = os.path.basename(agent_script).replace(".py", "").replace("_", "-")
+ print(f" Starting {agent_name}...")
+
+ proc = subprocess.Popen(
+ [sys.executable, full_path],
+ env=os.environ.copy(),
+ stdout=sys.stdout,
+ stderr=sys.stderr,
+ )
+ processes.append(proc)
+ time.sleep(1) # Stagger startup for clean PubNub connects
+
+ print("=" * 60)
+ print(" All agents online! Press Ctrl+C to stop.")
+ print("=" * 60)
+
+ # Wait for any process to exit or for keyboard interrupt
+ while all(p.poll() is None for p in processes):
+ time.sleep(1)
+
+ except KeyboardInterrupt:
+ print("\nShutting down agents...")
+ finally:
+ for proc in processes:
+ if proc.poll() is None:
+ proc.send_signal(signal.SIGINT)
+
+ # Give agents time for graceful shutdown
+ time.sleep(2)
+
+ for proc in processes:
+ if proc.poll() is None:
+ proc.terminate()
+
+ print("All agents stopped.")
+
+
+if __name__ == "__main__":
+ main()
diff --git a/pyproject.toml b/pyproject.toml
index e4f0bd0..81fc3b0 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -66,6 +66,9 @@ aws = [
"pydantic>=2.0.0",
# aws-cdk-lib will be added for AWS target
]
+sense = [
+ "pubnub>=7.0.0",
+]
demo = [
"yfinance>=0.2.40",
"ddgs>=6.0.0",
diff --git a/tests/test_sense.py b/tests/test_sense.py
new file mode 100644
index 0000000..32b317f
--- /dev/null
+++ b/tests/test_sense.py
@@ -0,0 +1,494 @@
+"""Tests for the Sense distributed communication module."""
+import asyncio
+import pytest
+
+from bedsheet import Agent, SenseMixin, SenseNetwork
+from bedsheet.events import (
+ SignalReceivedEvent,
+ AgentConnectedEvent,
+ AgentDisconnectedEvent,
+ RemoteDelegationEvent,
+ RemoteResultEvent,
+)
+from bedsheet.sense.signals import Signal
+from bedsheet.sense.serialization import serialize, deserialize, MAX_MESSAGE_BYTES
+from bedsheet.sense.protocol import AgentPresence
+from bedsheet.testing import MockLLMClient, MockResponse, MockSenseTransport, _MockSenseHub
+
+
+# ---------- Signal dataclass tests ----------
+
+class TestSignal:
+ def test_signal_creation(self):
+ signal = Signal(kind="alert", sender="agent-1")
+ assert signal.kind == "alert"
+ assert signal.sender == "agent-1"
+ assert signal.payload == {}
+ assert signal.target is None
+ assert signal.timestamp > 0
+ assert len(signal.correlation_id) == 12
+
+ def test_signal_with_payload(self):
+ signal = Signal(
+ kind="request",
+ sender="commander",
+ payload={"task": "check cpu"},
+ target="cpu-watcher",
+ )
+ assert signal.payload == {"task": "check cpu"}
+ assert signal.target == "cpu-watcher"
+
+ def test_signal_kinds(self):
+ for kind in ("request", "response", "alert", "heartbeat", "claim", "release", "event"):
+ signal = Signal(kind=kind, sender="test")
+ assert signal.kind == kind
+
+
+# ---------- Serialization tests ----------
+
+class TestSerialization:
+ def test_serialize_minimal(self):
+ signal = Signal(kind="alert", sender="agent-1")
+ data = serialize(signal)
+ assert data["k"] == "alert"
+ assert data["s"] == "agent-1"
+ assert "ts" in data
+ # No payload -> no "p" key
+ assert "p" not in data
+
+ def test_serialize_with_payload(self):
+ signal = Signal(
+ kind="request",
+ sender="commander",
+ payload={"task": "check cpu", "priority": "high"},
+ target="cpu-watcher",
+ )
+ data = serialize(signal)
+ assert data["p"] == {"task": "check cpu", "priority": "high"}
+ assert data["t"] == "cpu-watcher"
+ assert data["c"] == signal.correlation_id
+
+ def test_roundtrip(self):
+ original = Signal(
+ kind="response",
+ sender="cpu-watcher",
+ payload={"result": "CPU at 45%"},
+ correlation_id="abc123",
+ target="commander",
+ )
+ data = serialize(original)
+ restored = deserialize(data, source_channel="bedsheet.ops.tasks")
+
+ assert restored.kind == original.kind
+ assert restored.sender == original.sender
+ assert restored.payload == original.payload
+ assert restored.correlation_id == original.correlation_id
+ assert restored.target == original.target
+ assert restored.source_channel == "bedsheet.ops.tasks"
+
+ def test_truncation_on_large_payload(self):
+ # Create a payload that exceeds the limit
+ large_payload = {"data": "x" * (MAX_MESSAGE_BYTES + 1000)}
+ signal = Signal(kind="event", sender="test", payload=large_payload)
+ data = serialize(signal)
+
+ # Payload should be truncated
+ assert data["p"]["_truncated"] is True
+ assert "summary" in data["p"]
+
+ def test_deserialize_minimal(self):
+ data = {"k": "heartbeat", "s": "agent-2", "ts": 1234567890.0}
+ signal = deserialize(data)
+ assert signal.kind == "heartbeat"
+ assert signal.sender == "agent-2"
+ assert signal.payload == {}
+ assert signal.correlation_id == ""
+
+
+# ---------- Protocol tests ----------
+
+class TestProtocol:
+ def test_mock_transport_satisfies_protocol(self):
+ """MockSenseTransport should satisfy the SenseTransport protocol."""
+ transport = MockSenseTransport()
+ # Check that the protocol methods exist
+ assert hasattr(transport, "connect")
+ assert hasattr(transport, "disconnect")
+ assert hasattr(transport, "broadcast")
+ assert hasattr(transport, "subscribe")
+ assert hasattr(transport, "unsubscribe")
+ assert hasattr(transport, "signals")
+ assert hasattr(transport, "get_online_agents")
+
+ def test_agent_presence_creation(self):
+ presence = AgentPresence(
+ agent_id="agent-1",
+ agent_name="CPU Watcher",
+ namespace="cloud-ops",
+ capabilities=["get_cpu_usage", "get_process_top"],
+ )
+ assert presence.agent_id == "agent-1"
+ assert presence.capabilities == ["get_cpu_usage", "get_process_top"]
+ assert presence.status == "online"
+
+
+# ---------- MockSenseTransport tests ----------
+
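+# Transports that share a _MockSenseHub form one in-process "network": each
+# broadcast on a channel is fanned out to the asyncio queue of every subscribed
+# agent (hub.queues[agent_id]), and the tests read those queues directly.
+# create_peer() is shorthand for attaching a second transport to the same hub.
+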
+class TestMockSenseTransport:
+ async def test_connect_disconnect(self):
+ transport = MockSenseTransport()
+ await transport.connect("agent-1", "test-ns")
+ assert transport._connected
+ await transport.disconnect()
+ assert not transport._connected
+
+ async def test_subscribe_and_broadcast(self):
+ hub = _MockSenseHub()
+ transport1 = MockSenseTransport(hub)
+ transport2 = MockSenseTransport(hub)
+
+ # Agent 1 subscribes
+ await transport1.connect("agent-1", "test-ns")
+ await transport1.subscribe("alerts")
+
+ # Agent 2 subscribes
+ await transport2.connect("agent-2", "test-ns")
+ await transport2.subscribe("alerts")
+
+ # Agent 2 broadcasts
+ signal = Signal(kind="alert", sender="agent-2", payload={"msg": "cpu high"})
+ await transport2.broadcast("alerts", signal)
+
+ # Agent 1 should receive it
+ queue = hub.queues["agent-1"]
+ received = await asyncio.wait_for(queue.get(), timeout=1.0)
+ assert received.kind == "alert"
+ assert received.payload == {"msg": "cpu high"}
+
+ async def test_get_online_agents(self):
+ transport = MockSenseTransport()
+ await transport.connect("agent-1", "test-ns")
+ await transport.subscribe("alerts")
+
+ agents = await transport.get_online_agents("alerts")
+ assert len(agents) == 1
+ assert agents[0].agent_id == "agent-1"
+
+ async def test_create_peer(self):
+ t1 = MockSenseTransport()
+ t2 = t1.create_peer()
+ assert t1.hub is t2.hub
+
+
+# ---------- SenseMixin tests ----------
+
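+# join_network starts a background signal task (agent._signal_task) that drains
+# the transport queue and dispatches to @on_signal handlers; the tests below
+# sleep briefly (0.2-0.3 s) after sending so that loop has time to run.
+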
+class SenseAgent(SenseMixin, Agent):
+ """Agent with sensing capabilities for testing."""
+ pass
+
+
+class TestSenseMixin:
+ def _make_agent(self, name: str, response_text: str = "Done") -> SenseAgent:
+ """Create a sense agent with a mock LLM client."""
+ client = MockLLMClient([MockResponse(text=response_text)])
+ agent = SenseAgent(
+ name=name,
+ instruction=f"You are {name}.",
+ model_client=client,
+ )
+ return agent
+
+ async def test_join_and_leave_network(self):
+ transport = MockSenseTransport()
+ agent = self._make_agent("watcher")
+
+ await agent.join_network(transport, "test-ns", ["alerts"])
+ assert agent._transport is not None
+ assert agent._signal_task is not None
+
+ await agent.leave_network()
+ assert agent._transport is None
+
+ async def test_broadcast_signal(self):
+ hub = _MockSenseHub()
+ t1 = MockSenseTransport(hub)
+ t2 = MockSenseTransport(hub)
+
+ sender = self._make_agent("sender")
+ receiver = self._make_agent("receiver")
+
+ received_signals: list[Signal] = []
+
+ @receiver.on_signal("alert")
+ async def handle_alert(signal: Signal):
+ received_signals.append(signal)
+
+ await sender.join_network(t1, "test-ns", ["alerts"])
+ await receiver.join_network(t2, "test-ns", ["alerts"])
+
+ signal = Signal(kind="alert", sender="sender", payload={"cpu": 95})
+ await sender.broadcast("alerts", signal)
+
+ # Give signal loop time to process
+ await asyncio.sleep(0.3)
+
+ assert len(received_signals) == 1
+ assert received_signals[0].kind == "alert"
+ assert received_signals[0].payload == {"cpu": 95}
+
+ await sender.leave_network()
+ await receiver.leave_network()
+
+ async def test_request_response(self):
+ """Test request/response pattern between two agents."""
+ hub = _MockSenseHub()
+ t1 = MockSenseTransport(hub)
+ t2 = MockSenseTransport(hub)
+
+ # Worker agent that responds with "CPU at 45%"
+ worker = self._make_agent("cpu-watcher", response_text="CPU at 45%")
+ await worker.join_network(t1, "test-ns", ["tasks"])
+
+ # Commander agent
+ commander = self._make_agent("commander", response_text="Analysis complete")
+ await commander.join_network(t2, "test-ns", ["tasks"])
+
+ # Commander requests work from worker
+ result = await commander.request("cpu-watcher", "What is the CPU usage?", timeout=5.0)
+ assert result == "CPU at 45%"
+
+ await worker.leave_network()
+ await commander.leave_network()
+
+ async def test_request_timeout(self):
+ """Test that request times out when no agent responds."""
+ transport = MockSenseTransport()
+ agent = self._make_agent("lonely-agent")
+ await agent.join_network(transport, "test-ns", ["tasks"])
+
+ with pytest.raises(TimeoutError, match="No response"):
+ await agent.request("nonexistent-agent", "hello?", timeout=0.5)
+
+ await agent.leave_network()
+
+ async def test_on_signal_handler(self):
+ """Test custom signal handler registration."""
+ transport = MockSenseTransport()
+ agent = self._make_agent("handler-agent")
+
+ received_signals: list[Signal] = []
+
+ @agent.on_signal("alert")
+ async def handle_alert(signal: Signal):
+ received_signals.append(signal)
+
+ await agent.join_network(transport, "test-ns", ["alerts"])
+
+ # Simulate receiving an alert from another agent
+ alert = Signal(kind="alert", sender="other-agent", payload={"severity": "high"})
+ queue = transport.hub.queues["handler-agent"]
+ await queue.put(alert)
+
+ # Give signal loop time to process
+ await asyncio.sleep(0.2)
+
+ assert len(received_signals) == 1
+ assert received_signals[0].payload["severity"] == "high"
+
+ await agent.leave_network()
+
+ async def test_skip_own_signals(self):
+ """Agents should not process their own signals."""
+ transport = MockSenseTransport()
+ agent = self._make_agent("self-talker")
+
+ received_signals: list[Signal] = []
+
+ @agent.on_signal("alert")
+ async def handle_alert(signal: Signal):
+ received_signals.append(signal)
+
+ await agent.join_network(transport, "test-ns", ["alerts"])
+
+ # Put our own signal in the queue
+ own_signal = Signal(kind="alert", sender="self-talker", payload={})
+ # Index directly so a missing queue fails loudly instead of letting the
+ # test pass vacuously
+ queue = transport.hub.queues["self-talker"]
+ await queue.put(own_signal)
+
+ await asyncio.sleep(0.2)
+
+ # Should not have processed our own signal
+ assert len(received_signals) == 0
+
+ await agent.leave_network()
+
+ async def test_targeted_signal_filtering(self):
+ """Agents should skip signals targeted at other agents."""
+ transport = MockSenseTransport()
+ agent = self._make_agent("agent-a")
+
+ received_signals: list[Signal] = []
+
+ @agent.on_signal("request")
+ async def handle_request(signal: Signal):
+ received_signals.append(signal)
+
+ await agent.join_network(transport, "test-ns", ["tasks"])
+
+ # Signal targeted at another agent
+ signal = Signal(
+ kind="request",
+ sender="commander",
+ payload={"task": "check logs"},
+ target="agent-b",
+ )
+ # Index directly so a missing queue fails loudly instead of letting the
+ # test pass vacuously
+ queue = transport.hub.queues["agent-a"]
+ await queue.put(signal)
+
+ await asyncio.sleep(0.2)
+
+ # Should not have processed it
+ assert len(received_signals) == 0
+
+ await agent.leave_network()
+
+
+# ---------- Claim protocol tests ----------
+
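+# These tests pre-seed agent._claimed_incidents rather than racing two live
+# agents, so they exercise the local bookkeeping around claim/release, not the
+# distributed tie-breaking itself.
+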
+class TestClaimProtocol:
+ async def test_claim_incident(self):
+ """Test basic incident claiming."""
+ transport = MockSenseTransport()
+ agent = SenseAgent(
+ name="commander",
+ instruction="Incident commander",
+ model_client=MockLLMClient([MockResponse(text="claimed")]),
+ )
+ await agent.join_network(transport, "test-ns", ["tasks"])
+
+ # Mark ourselves as having claimed (simulate winning)
+ agent._claimed_incidents.add("incident-001")
+ won = await agent.claim_incident("incident-001", "tasks")
+ assert won is True
+
+ await agent.leave_network()
+
+ async def test_release_incident(self):
+ """Test releasing a claimed incident."""
+ transport = MockSenseTransport()
+ agent = SenseAgent(
+ name="commander",
+ instruction="Incident commander",
+ model_client=MockLLMClient([MockResponse(text="released")]),
+ )
+ await agent.join_network(transport, "test-ns", ["tasks"])
+
+ agent._claimed_incidents.add("incident-001")
+ await agent.release_incident("incident-001", "tasks")
+ assert "incident-001" not in agent._claimed_incidents
+
+ await agent.leave_network()
+
+
+# ---------- SenseNetwork tests ----------
+
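+# SenseNetwork is a lifecycle helper: add() joins a SenseMixin agent to the
+# shared transport (rejecting plain Agents with a TypeError), and stop()
+# disconnects every agent it manages.
+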
+class TestSenseNetwork:
+ async def test_add_agent(self):
+ transport = MockSenseTransport()
+ network = SenseNetwork(namespace="test-ns", transport=transport)
+
+ agent = SenseAgent(
+ name="watcher",
+ instruction="Watch things",
+ model_client=MockLLMClient([MockResponse(text="ok")]),
+ )
+ await network.add(agent, channels=["alerts"])
+ assert len(network.agents) == 1
+
+ await network.stop()
+
+ async def test_add_non_sense_agent_raises(self):
+ transport = MockSenseTransport()
+ network = SenseNetwork(namespace="test-ns", transport=transport)
+
+ agent = Agent(
+ name="plain-agent",
+ instruction="I am plain",
+ model_client=MockLLMClient([MockResponse(text="ok")]),
+ )
+ with pytest.raises(TypeError, match="must inherit from SenseMixin"):
+ await network.add(agent)
+
+ async def test_stop_disconnects_all(self):
+ transport = MockSenseTransport()
+ network = SenseNetwork(namespace="test-ns", transport=transport)
+
+ agent1 = SenseAgent(
+ name="agent-1",
+ instruction="Agent 1",
+ model_client=MockLLMClient([MockResponse(text="ok")]),
+ )
+ agent2 = SenseAgent(
+ name="agent-2",
+ instruction="Agent 2",
+ model_client=MockLLMClient([MockResponse(text="ok")]),
+ )
+
+ await network.add(agent1, channels=["alerts"])
+ await network.add(agent2, channels=["alerts"])
+ assert len(network.agents) == 2
+
+ await network.stop()
+ assert len(network.agents) == 0
+ assert agent1._transport is None
+ assert agent2._transport is None
+
+
+# ---------- Event dataclass tests ----------
+
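+# Each event dataclass carries a fixed string discriminator in .type, which
+# downstream consumers can switch on.
+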
+class TestSenseEvents:
+ def test_signal_received_event(self):
+ event = SignalReceivedEvent(
+ sender="agent-1",
+ kind="alert",
+ channel="bedsheet.ops.alerts",
+ payload={"cpu": 95},
+ )
+ assert event.type == "signal_received"
+
+ def test_agent_connected_event(self):
+ event = AgentConnectedEvent(
+ agent_id="agent-1",
+ agent_name="CPU Watcher",
+ namespace="cloud-ops",
+ )
+ assert event.type == "agent_connected"
+
+ def test_agent_disconnected_event(self):
+ event = AgentDisconnectedEvent(
+ agent_id="agent-1",
+ agent_name="CPU Watcher",
+ namespace="cloud-ops",
+ )
+ assert event.type == "agent_disconnected"
+
+ def test_remote_delegation_event(self):
+ event = RemoteDelegationEvent(
+ agent_name="cpu-watcher",
+ task="Check CPU",
+ correlation_id="abc123",
+ )
+ assert event.type == "remote_delegation"
+
+ def test_remote_result_event(self):
+ event = RemoteResultEvent(
+ agent_name="cpu-watcher",
+ result="CPU at 45%",
+ correlation_id="abc123",
+ )
+ assert event.type == "remote_result"