Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/langchain-decorator/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@
"cross-env": "^7.0.3",
"mocha": "^10.2.0",
"ts-node": "^10.9.1",
"typescript": "^5.0.4"
"typescript": "^5.0.4",
"zod": "^3.24.4"
},
"gitHead": "5f24bacd9131435188be15568d86ef4575f85636"
}
2 changes: 1 addition & 1 deletion plugin/controller/test/http/request.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ describe('plugin/controller/test/http/request.test.ts', () => {
});
const [ nodeMajor ] = process.versions.node.split('.').map(v => Number(v));
if (nodeMajor >= 16) {
it.only('Request should work', async () => {
it('Request should work', async () => {
app.mockCsrf();
const param = {
name: 'foo',
Expand Down
9 changes: 8 additions & 1 deletion plugin/langchain/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ import { CompiledStateGraphObject } from './lib/graph/CompiledStateGraphObject';
import { BoundModelObjectHook } from './lib/boundModel/BoundModelObjectHook';
import { GraphPrototypeHook } from './lib/graph/GraphPrototypeHook';
import { GraphBuildHook } from './lib/graph/GraphBuildHook';
import { AgentHttpLoadUnitLifecycleHook } from './lib/agent/AgentHttpLoadUnitLifecycleHook';

export default class ModuleLangChainHook implements IBoot {
readonly #app: Application;
readonly #graphObjectHook: GraphObjectHook;
readonly #graphLoadUnitHook: GraphLoadUnitHook;
readonly #boundModelObjectHook: BoundModelObjectHook;
readonly #graphPrototypeHook: GraphPrototypeHook;
#agentHttpLoadUnitHook: AgentHttpLoadUnitLifecycleHook;

constructor(app: Application) {
this.#app = app;
Expand All @@ -25,9 +27,11 @@ export default class ModuleLangChainHook implements IBoot {
}

configWillLoad() {
this.#agentHttpLoadUnitHook = new AgentHttpLoadUnitLifecycleHook(this.#app.moduleConfigs);
this.#app.loadUnitLifecycleUtil.registerLifecycle(this.#agentHttpLoadUnitHook);
this.#app.eggObjectLifecycleUtil.registerLifecycle(this.#graphObjectHook);
this.#app.eggObjectLifecycleUtil.registerLifecycle(this.#boundModelObjectHook);
this.#app.eggObjectFactory.registerEggObjectCreateMethod(CompiledStateGraphProto, CompiledStateGraphObject.createObject);
this.#app.eggObjectFactory.registerEggObjectCreateMethod(CompiledStateGraphProto, CompiledStateGraphObject.createObject(this.#app));
this.#app.eggPrototypeLifecycleUtil.registerLifecycle(this.#graphPrototypeHook);
}

Expand All @@ -36,6 +40,9 @@ export default class ModuleLangChainHook implements IBoot {
}

async beforeClose() {
if (this.#agentHttpLoadUnitHook) {
this.#app.loadUnitLifecycleUtil.deleteLifecycle(this.#agentHttpLoadUnitHook);
}
this.#app.eggObjectLifecycleUtil.deleteLifecycle(this.#graphObjectHook);
this.#app.eggObjectLifecycleUtil.deleteLifecycle(this.#boundModelObjectHook);
this.#app.loadUnitLifecycleUtil.deleteLifecycle(this.#graphLoadUnitHook);
Expand Down
140 changes: 140 additions & 0 deletions plugin/langchain/lib/agent/AgentHttpLoadUnitLifecycleHook.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import {
ConfigSourceQualifier,
Context,
HTTPBody,
HTTPController,
HTTPMethod,
HTTPMethodEnum,
LifecycleHook,
} from '@eggjs/tegg';
import { ClassProtoDescriptor, EggContainerFactory, EggPrototypeCreatorFactory, EggPrototypeFactory, ProtoDescriptorHelper } from '@eggjs/tegg/helper';
import type { LoadUnit, LoadUnitLifecycleContext } from '@eggjs/tegg-metadata';
import { ModuleConfig, ModuleReference } from 'egg';
import { LangChainConfigSchemaType } from 'typings';
import { Readable, Transform } from 'stream';
import { CompiledStateGraph } from '@langchain/langgraph';
import { AIMessage, HumanMessage, SystemMessage, ToolMessage } from '@langchain/core/messages';


export interface ModuleConfigHolder {
name: string;
config: ModuleConfig;
reference: ModuleReference;
}

type ValueOf<T> = T[keyof T];

export class AgentHttpLoadUnitLifecycleHook implements LifecycleHook<LoadUnitLifecycleContext, LoadUnit> {
readonly moduleConfigs: Record<string, ModuleConfigHolder>;

constructor(moduleConfigs: Record<string, ModuleConfigHolder>) {
this.moduleConfigs = moduleConfigs;
}

async preCreate(_: LoadUnitLifecycleContext, loadUnit: LoadUnit): Promise<void> {
const moduleConfigs = this.#getModuleConfig(loadUnit);
if (moduleConfigs.length > 0) {
for (const [ graphName, config ] of moduleConfigs) {
if (config?.type === 'http') {
const GraphHttpController = this.#createGraphHttpControllerClass(loadUnit, graphName, config);
const protoDescriptor = ProtoDescriptorHelper.createByInstanceClazz(GraphHttpController, {
moduleName: loadUnit.name,
unitPath: loadUnit.unitPath,
}) as ClassProtoDescriptor;

const proto = await EggPrototypeCreatorFactory.createProtoByDescriptor(protoDescriptor, loadUnit);
EggPrototypeFactory.instance.registerPrototype(proto, loadUnit);
}
}
}
}

#createGraphHttpControllerClass(loadUnit: LoadUnit, graphName: string, config: ValueOf<LangChainConfigSchemaType['agents']>) {
class GraphHttpController {
@HTTPMethod({
path: config.path!,
method: HTTPMethodEnum.POST,
timeout: config.timeout,
})
Comment on lines +54 to +58
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Add validation for config.path before using non-null assertion.

The path property in the agent configuration is optional, so config.path can be undefined. Using the non-null assertion (!) will cause a runtime error if the path is not provided. This should be validated before reaching this point.

The validation should occur in preCreate before calling #createGraphHttpControllerClass:

       for (const [ graphName, config ] of moduleConfigs) {
-        if (config?.type === 'http') {
+        if (config?.type === 'http' && config.path) {
           const GraphHttpController = this.#createGraphHttpControllerClass(loadUnit, graphName, config);

Or add an explicit error if path is missing:

   #createGraphHttpControllerClass(loadUnit: LoadUnit, graphName: string, config: ValueOf<LangChainConfigSchemaType['agents']>) {
+    if (!config.path) {
+      throw new Error(`Agent ${graphName} is configured as HTTP but missing 'path' field`);
+    }
     class GraphHttpController {

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In plugin/langchain/lib/agent/AgentHttpLoadUnitLifecycleHook.ts around lines 54
to 58, the code uses config.path! with a non-null assertion but config.path is
optional; add validation in preCreate (before calling
#createGraphHttpControllerClass) to check that config.path is defined and is a
non-empty string, and if not either supply a default path or throw a clear,
descriptive error (e.g., throw new Error("Agent HTTP config.path is required"))
so that #createGraphHttpControllerClass never receives an undefined path; ensure
the validated path is passed into the decorator call instead of using the
non-null assertion.

async invoke(@Context() ctx, @HTTPBody() args) {
const eggObj = await EggContainerFactory.getOrCreateEggObjectFromName(`compiled${graphName}`);
const invokeFunc = (eggObj.obj as CompiledStateGraph<any, any>).invoke;
const streamFunc = (eggObj.obj as CompiledStateGraph<any, any>).stream;
const genArgs = Object.entries(args).reduce((acc, [ key, value ]) => {
if (Array.isArray(value) && typeof value[0] === 'object') {
acc[key] = value.map(obj => {
switch (obj.role) {
case 'human':
return new HumanMessage(obj);
case 'ai':
return new AIMessage(obj);
case 'system':
return new SystemMessage(obj);
case 'tool':
return new ToolMessage(obj);
default:
throw new Error('unknown message type');
}
});
} else {
acc[key] = value;
}
return acc;
}, {});

const defaultConfig = {
configurable: {
thread_id: process.pid.toString(),
},
};
Comment on lines +85 to +89
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Critical: thread_id using process.pid is not unique per request.

Using process.pid as thread_id means all concurrent requests within the same process share the same conversation thread. This causes conversation state corruption when multiple requests run simultaneously. Use a request-specific identifier instead.

         const defaultConfig = {
           configurable: {
-            thread_id: process.pid.toString(),
+            thread_id: ctx.request.get('x-thread-id') || ctx.traceId || crypto.randomUUID(),
           },
         };

Add the import at the top of the file:

import crypto from 'node:crypto';
🤖 Prompt for AI Agents
In plugin/langchain/lib/agent/AgentHttpLoadUnitLifecycleHook.ts around lines 85
to 89, the defaultConfig sets configurable.thread_id to process.pid which is
shared across all requests; replace this with a request-unique identifier by
importing crypto (import crypto from 'node:crypto') at the top and setting
configurable.thread_id = crypto.randomUUID() (or
crypto.randomBytes(16).toString('hex')) when building the config per request
rather than using process.pid, ensuring the config is created inside the request
lifecycle so each request gets its own thread_id.


const res = await Reflect.apply(config.stream ? streamFunc : invokeFunc, (eggObj.obj as CompiledStateGraph<any, any>), [ genArgs, defaultConfig ]);

if (config.stream) {
ctx.set({
'content-type': 'text/event-stream',
'cache-control': 'no-cache',
'transfer-encoding': 'chunked',
'X-Accel-Buffering': 'no',
});
const transformStream = new Transform({
objectMode: true,
transform(chunk: any, _encoding: string, callback) {
try {
// 如果 chunk 是对象,转换为 JSON
let data: string;
if (typeof chunk === 'string') {
data = chunk;
} else if (typeof chunk === 'object') {
data = JSON.stringify(chunk);
} else {
data = String(chunk);
}

// 格式化为 SSE 格式
const sseFormatted = `data: ${data}\n\n`;
callback(null, sseFormatted);
} catch (error) {
callback(error);
}
Comment on lines +117 to +119
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

TypeScript type error: error is unknown in catch block.

The error variable is typed as unknown in the catch block, but callback expects Error | null | undefined. Add proper type handling.

             } catch (error) {
-              callback(error);
+              callback(error instanceof Error ? error : new Error(String(error)));
             }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
} catch (error) {
callback(error);
}
} catch (error) {
callback(error instanceof Error ? error : new Error(String(error)));
}
🤖 Prompt for AI Agents
In plugin/langchain/lib/agent/AgentHttpLoadUnitLifecycleHook.ts around lines 117
to 119, the catch block receives an unknown-typed error but calls callback with
that value; change the handling to narrow the type before calling callback: test
if error is an instance of Error and pass it directly, otherwise create a new
Error(String(error)) (or wrap with a descriptive message) and pass that to
callback so the callback receives Error | null | undefined as expected.

},
});
return Readable.fromWeb(res as any, { objectMode: true }).pipe(transformStream);
}
return res;
}
}
HTTPController({ controllerName: `${graphName}HttpController`, protoName: `${graphName}HttpController` })(GraphHttpController);
ConfigSourceQualifier(loadUnit.name)(GraphHttpController.prototype, 'moduleConfig');

return GraphHttpController;
}

#getModuleConfig(loadUnit: LoadUnit) {
const moduleConfig: LangChainConfigSchemaType = (this.moduleConfigs[loadUnit.name]?.config as any)?.langchain;
if (moduleConfig && Object.keys(moduleConfig?.agents || {}).length > 0) {
return Object.entries(moduleConfig.agents);
}
return [];
}
}
17 changes: 11 additions & 6 deletions plugin/langchain/lib/graph/CompiledStateGraphObject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { EggPrototype } from '@eggjs/tegg-metadata';
import { ChatCheckpointSaverInjectName, ChatCheckpointSaverQualifierAttribute, GRAPH_EDGE_METADATA, GRAPH_NODE_METADATA, GraphEdgeMetadata, GraphMetadata, GraphNodeMetadata, IGraph, IGraphEdge, IGraphNode, TeggToolNode } from '@eggjs/tegg-langchain-decorator';
import { LangGraphTracer } from '../tracing/LangGraphTracer';
import { BaseCheckpointSaver, CompiledStateGraph } from '@langchain/langgraph';
import { Application } from 'egg';

export class CompiledStateGraphObject implements EggObject {
private status: EggObjectStatus = EggObjectStatus.PENDING;
Expand All @@ -19,17 +20,19 @@ export class CompiledStateGraphObject implements EggObject {
readonly proto: CompiledStateGraphProto;
readonly ctx: EggContext;
readonly daoName: string;
private _obj: object;
_obj: object;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The _obj property was changed from public (default access modifier) to private. This is a good practice for encapsulation. However, in this PR, it has been changed back to public. Since it's only accessed within the class's methods (via a self closure in createHttpHandler), it can and should remain private to maintain proper encapsulation.

Suggested change
_obj: object;
private _obj: object;

readonly graphMetadata: GraphMetadata;
readonly graphName: string;
readonly app: Application;

constructor(name: EggObjectName, proto: CompiledStateGraphProto) {
constructor(name: EggObjectName, proto: CompiledStateGraphProto, app: Application) {
this.name = name;
this.proto = proto;
this.ctx = ContextHandler.getContext()!;
this.id = IdenticalUtil.createObjectId(this.proto.id, this.ctx?.id);
this.graphMetadata = proto.graphMetadata;
this.graphName = proto.graphName;
this.app = app;
}

async init() {
Expand Down Expand Up @@ -122,9 +125,11 @@ export class CompiledStateGraphObject implements EggObject {
return this._obj;
}

static async createObject(name: EggObjectName, proto: EggPrototype): Promise<CompiledStateGraphObject> {
const compiledStateGraphObject = new CompiledStateGraphObject(name, proto as CompiledStateGraphProto);
await compiledStateGraphObject.init();
return compiledStateGraphObject;
static createObject(app: Application) {
return async function(name: EggObjectName, proto: EggPrototype): Promise<CompiledStateGraphObject> {
const compiledStateGraphObject = new CompiledStateGraphObject(name, proto as CompiledStateGraphProto, app);
await compiledStateGraphObject.init();
return compiledStateGraphObject;
};
}
}
2 changes: 2 additions & 0 deletions plugin/langchain/lib/graph/CompiledStateGraphProto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ export class CompiledStateGraphProto implements EggPrototype {
readonly name: EggPrototypeName;
readonly graphMetadata: GraphMetadata;
readonly graphName: string;
readonly unitPath: string;

constructor(loadUnit: LoadUnit, protoName: string, graphName: string, graphMetadata: GraphMetadata) {
this.loadUnitId = loadUnit.id;
this.qualifiers = [];
this.name = protoName;
this.graphMetadata = graphMetadata;
this.graphName = graphName;
this.unitPath = loadUnit.unitPath;
this.id = IdenticalUtil.createProtoId(loadUnit.id, protoName);
}

Expand Down
2 changes: 1 addition & 1 deletion plugin/langchain/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
"typescript": true
},
"engines": {
"node": ">=18.0.0"
"node": ">=20.0.0"
},
"dependencies": {
"@eggjs/egg-module-common": "^3.64.2",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,11 @@ mcp:
clientName: barSse
version: 1.0.0
transportType: SSE
type: http
type: http

langchain:
agents:
FooGraph:
path: /graph/stream
type: http
stream: true
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ module.exports = function() {
enable: false,
},
},
bodyParser: {
enable: false,
},
};
return config;
};
55 changes: 55 additions & 0 deletions plugin/langchain/test/llm.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,60 @@ describe('plugin/langchain/test/llm.test.ts', () => {
.get('/llm/graph')
.expect(200, { value: 'hello graph toolhello world' });
});

it('should agent controller work', async () => {
const url = await app.httpRequest()
.post('/graph/stream').url;
const response = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ messages: [{ role: 'human', content: 'hello world' }] }),
});


if (!response.ok) {
throw new Error(`HTTP ${response.status}`);
}

if (!response.body) {
throw new Error('Response body is null');
}

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';

const messages: object[] = [];

try {
// eslint-disable-next-line no-constant-condition
while (true) {
const { done, value } = await reader.read();

if (done) break;

buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';

lines.forEach(line => {
if (line.startsWith('data: ')) {
const data = line.slice(6);
try {
const parsed = JSON.parse(data);
messages.push(parsed);
} catch (e) {
throw e;
}
}
});
}
} finally {
reader.releaseLock();
}
assert(messages.length === 3);
});
}
});
23 changes: 23 additions & 0 deletions plugin/langchain/typings/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,34 @@ export const ChatModelConfigModuleConfigSchema = Type.Object({
name: 'ChatModel',
});


export const LangChainConfigSchema = Type.Object({
agents: Type.Record(Type.String(), Type.Object({
path: Type.Optional(Type.String({
description: 'http path',
})),
stream: Type.Optional(Type.Boolean({
description: '是否流式返回',
})),
type: Type.Optional(Type.String({
description: 'Http',
})),
timeout: Type.Optional(Type.Number({
description: '接口超时时间',
})),
})),
}, {
title: 'langchain 设置',
name: 'langchain',
});

export type ChatModelConfigModuleConfigType = Static<typeof ChatModelConfigModuleConfigSchema>;
export type LangChainConfigSchemaType = Static<typeof LangChainConfigSchema>;

declare module '@eggjs/tegg' {
export type LangChainModuleConfig = {
ChatModel?: ChatModelConfigModuleConfigType;
langchain?: LangChainConfigSchema;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Type error: Use the type alias instead of the schema constant.

LangChainConfigSchema is a runtime schema object, not a type. The property should use LangChainConfigSchemaType to represent the inferred TypeScript type.

   export type LangChainModuleConfig = {
     ChatModel?: ChatModelConfigModuleConfigType;
-    langchain?: LangChainConfigSchema;
+    langchain?: LangChainConfigSchemaType;
   };
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
langchain?: LangChainConfigSchema;
langchain?: LangChainConfigSchemaType;
🤖 Prompt for AI Agents
In plugin/langchain/typings/index.d.ts around line 138, the declaration uses the
runtime schema constant LangChainConfigSchema where a TypeScript type is
required; replace LangChainConfigSchema with the inferred type alias
LangChainConfigSchemaType so the langchain? property is typed correctly,
updating the type annotation to use LangChainConfigSchemaType instead of the
schema constant.

};

export interface ModuleConfig extends LangChainModuleConfig {
Expand Down
Loading