@@ -13,9 +13,12 @@ import { nanoid } from "nanoid";
1313
1414import { AsyncLocalStorage } from "node:async_hooks" ;
1515import { MCPClientManager } from "./mcp/client" ;
16+ import { genericObservability , type Observability } from "./observability" ;
1617
1718export type { Connection , WSMessage , ConnectionContext } from "partyserver" ;
1819
20+ export type { ObservabilityEvent , Observability } from "./observability" ;
21+
1922/**
2023 * RPC request message from client
2124 */
@@ -264,6 +267,11 @@ export class Agent<Env, State = unknown> extends Server<Env> {
264267 hibernate : true , // default to hibernate
265268 } ;
266269
270+ /**
271+ * The observability implementation to use for the Agent
272+ */
273+ observability ?: Observability = genericObservability ;
274+
267275 /**
268276 * Execute SQL queries against the Agent's database
269277 * @template T Type of the returned rows
@@ -369,6 +377,35 @@ export class Agent<Env, State = unknown> extends Server<Env> {
369377
370378 // For regular methods, execute and send response
371379 const result = await methodFn . apply ( this , args ) ;
380+
381+ const displayArgs = args . map ( ( a ) => {
382+ if ( typeof a === "object" ) {
383+ if ( Array . isArray ( a ) ) {
384+ return "[...]" ;
385+ }
386+
387+ return "{...}" ;
388+ }
389+
390+ return String ( a ) ;
391+ } ) ;
392+
393+ this . observability ?. emit (
394+ {
395+ id : nanoid ( ) ,
396+ type : "rpc" ,
397+ displayMessage : `RPC call to ${ method } args: ${ displayArgs . join ( ", " ) } ` ,
398+ timestamp : Date . now ( ) ,
399+ payload : {
400+ method,
401+ args,
402+ success : true ,
403+ streaming : metadata ?. streaming ,
404+ } ,
405+ } ,
406+ this . ctx
407+ ) ;
408+
372409 const response : RPCResponse = {
373410 type : "rpc" ,
374411 id,
@@ -413,6 +450,19 @@ export class Agent<Env, State = unknown> extends Server<Env> {
413450 } )
414451 ) ;
415452 }
453+
454+ this . observability ?. emit (
455+ {
456+ id : nanoid ( ) ,
457+ type : "connect" ,
458+ displayMessage : `Connection ${ connection . id } established` ,
459+ timestamp : Date . now ( ) ,
460+ payload : {
461+ connectionId : connection . id ,
462+ } ,
463+ } ,
464+ this . ctx
465+ ) ;
416466 return this . #tryCatch( ( ) => _onConnect ( connection , ctx ) ) ;
417467 } , 20 ) ;
418468 }
@@ -421,6 +471,7 @@ export class Agent<Env, State = unknown> extends Server<Env> {
421471 }
422472
423473 #setStateInternal( state : State , source : Connection | "server" = "server" ) {
474+ const previousState = this . #state;
424475 this . #state = state ;
425476 this . sql `
426477 INSERT OR REPLACE INTO cf_agents_state (id, state)
@@ -442,6 +493,19 @@ export class Agent<Env, State = unknown> extends Server<Env> {
442493 return agentContext . run (
443494 { agent : this , connection, request } ,
444495 async ( ) => {
496+ this . observability ?. emit (
497+ {
498+ id : nanoid ( ) ,
499+ type : "state:update" ,
500+ displayMessage : "State updated" ,
501+ timestamp : Date . now ( ) ,
502+ payload : {
503+ state,
504+ previousState,
505+ } ,
506+ } ,
507+ this . ctx
508+ ) ;
445509 return this . onStateUpdate ( state , source ) ;
446510 }
447511 ) ;
@@ -535,6 +599,18 @@ export class Agent<Env, State = unknown> extends Server<Env> {
535599 ) : Promise < Schedule < T > > {
536600 const id = nanoid ( 9 ) ;
537601
602+ const emitScheduleCreate = ( schedule : Schedule < T > ) =>
603+ this . observability ?. emit (
604+ {
605+ id : nanoid ( ) ,
606+ type : "schedule:create" ,
607+ displayMessage : `Schedule ${ schedule . id } created` ,
608+ timestamp : Date . now ( ) ,
609+ payload : schedule ,
610+ } ,
611+ this . ctx
612+ ) ;
613+
538614 if ( typeof callback !== "string" ) {
539615 throw new Error ( "Callback must be a string" ) ;
540616 }
@@ -554,13 +630,17 @@ export class Agent<Env, State = unknown> extends Server<Env> {
554630
555631 await this . #scheduleNextAlarm( ) ;
556632
557- return {
633+ const schedule : Schedule < T > = {
558634 id,
559635 callback : callback ,
560636 payload : payload as T ,
561637 time : timestamp ,
562638 type : "scheduled" ,
563639 } ;
640+
641+ emitScheduleCreate ( schedule ) ;
642+
643+ return schedule ;
564644 }
565645 if ( typeof when === "number" ) {
566646 const time = new Date ( Date . now ( ) + when * 1000 ) ;
@@ -575,14 +655,18 @@ export class Agent<Env, State = unknown> extends Server<Env> {
575655
576656 await this . #scheduleNextAlarm( ) ;
577657
578- return {
658+ const schedule : Schedule < T > = {
579659 id,
580660 callback : callback ,
581661 payload : payload as T ,
582662 delayInSeconds : when ,
583663 time : timestamp ,
584664 type : "delayed" ,
585665 } ;
666+
667+ emitScheduleCreate ( schedule ) ;
668+
669+ return schedule ;
586670 }
587671 if ( typeof when === "string" ) {
588672 const nextExecutionTime = getNextCronTime ( when ) ;
@@ -597,14 +681,18 @@ export class Agent<Env, State = unknown> extends Server<Env> {
597681
598682 await this . #scheduleNextAlarm( ) ;
599683
600- return {
684+ const schedule : Schedule < T > = {
601685 id,
602686 callback : callback ,
603687 payload : payload as T ,
604688 cron : when ,
605689 time : timestamp ,
606690 type : "cron" ,
607691 } ;
692+
693+ emitScheduleCreate ( schedule ) ;
694+
695+ return schedule ;
608696 }
609697 throw new Error ( "Invalid schedule type" ) ;
610698 }
@@ -680,6 +768,19 @@ export class Agent<Env, State = unknown> extends Server<Env> {
680768 * @returns true if the task was cancelled, false otherwise
681769 */
682770 async cancelSchedule ( id : string ) : Promise < boolean > {
771+ const schedule = await this . getSchedule ( id ) ;
772+ if ( schedule ) {
773+ this . observability ?. emit (
774+ {
775+ id : nanoid ( ) ,
776+ type : "schedule:delete" ,
777+ displayMessage : `Schedule ${ id } deleted` ,
778+ timestamp : Date . now ( ) ,
779+ payload : schedule ,
780+ } ,
781+ this . ctx
782+ ) ;
783+ }
683784 this . sql `DELETE FROM cf_agents_schedules WHERE id = ${ id } ` ;
684785
685786 await this . #scheduleNextAlarm( ) ;
@@ -724,6 +825,17 @@ export class Agent<Env, State = unknown> extends Server<Env> {
724825 { agent : this , connection : undefined , request : undefined } ,
725826 async ( ) => {
726827 try {
828+ this . observability ?. emit (
829+ {
830+ id : nanoid ( ) ,
831+ type : "schedule:execute" ,
832+ displayMessage : `Schedule ${ row . id } executed` ,
833+ timestamp : Date . now ( ) ,
834+ payload : row ,
835+ } ,
836+ this . ctx
837+ ) ;
838+
727839 await (
728840 callback as (
729841 payload : unknown ,
@@ -766,6 +878,17 @@ export class Agent<Env, State = unknown> extends Server<Env> {
766878 // delete all alarms
767879 await this . ctx . storage . deleteAlarm ( ) ;
768880 await this . ctx . storage . deleteAll ( ) ;
881+
882+ this . observability ?. emit (
883+ {
884+ id : nanoid ( ) ,
885+ type : "destroy" ,
886+ displayMessage : "Agent destroyed" ,
887+ timestamp : Date . now ( ) ,
888+ payload : { } ,
889+ } ,
890+ this . ctx
891+ ) ;
769892 }
770893
771894 /**
0 commit comments