From 21f21e34c9445cfd96349ae68ac573f23497587e Mon Sep 17 00:00:00 2001 From: buua436 Date: Thu, 4 Dec 2025 15:21:10 +0800 Subject: [PATCH 01/13] Feat:update webhook component --- agent/canvas.py | 2 +- agent/component/base.py | 2 +- agent/component/webhook.py | 20 ++ api/apps/sdk/agents.py | 647 +++++++++++++++++++++++++++++++++++-- 4 files changed, 636 insertions(+), 35 deletions(-) diff --git a/agent/canvas.py b/agent/canvas.py index b693ed43499..e3feee0161a 100644 --- a/agent/canvas.py +++ b/agent/canvas.py @@ -160,7 +160,7 @@ def get_tenant_id(self): return self._tenant_id def get_value_with_variable(self,value: str) -> Any: - pat = re.compile(r"\{* *\{([a-zA-Z:0-9]+@[A-Za-z0-9_.]+|sys\.[A-Za-z0-9_.]+|env\.[A-Za-z0-9_.]+)\} *\}*") + pat = re.compile(r"\{* *\{([a-zA-Z:0-9]+@[A-Za-z0-9_.-]+|sys\.[A-Za-z0-9_.]+|env\.[A-Za-z0-9_.]+)\} *\}*") out_parts = [] last = 0 diff --git a/agent/component/base.py b/agent/component/base.py index 6ac95e09a97..dcb3b0136bb 100644 --- a/agent/component/base.py +++ b/agent/component/base.py @@ -394,7 +394,7 @@ def _warn_to_deprecate_param(self, param_name, descr, new_param): class ComponentBase(ABC): component_name: str thread_limiter = trio.CapacityLimiter(int(os.environ.get('MAX_CONCURRENT_CHATS', 10))) - variable_ref_patt = r"\{* *\{([a-zA-Z:0-9]+@[A-Za-z0-9_.]+|sys\.[A-Za-z0-9_.]+|env\.[A-Za-z0-9_.]+)\} *\}*" + variable_ref_patt = r"\{* *\{([a-zA-Z:0-9]+@[A-Za-z0-9_.-]+|sys\.[A-Za-z0-9_.]+|env\.[A-Za-z0-9_.]+)\} *\}*" def __str__(self): """ diff --git a/agent/component/webhook.py b/agent/component/webhook.py index c707d455626..61caaa0bc65 100644 --- a/agent/component/webhook.py +++ b/agent/component/webhook.py @@ -23,6 +23,26 @@ class WebhookParam(ComponentParamBase): """ def __init__(self): super().__init__() + self.security = { + "auth_type": "none", + } + self.schema = {} + self.execution_mode = "Immediately" + self.response = {} + self.outputs = { + "query": { + "value": {}, + "type": "object" + }, + "headers": { + "value": {}, + "type": "object" + }, + "body": { + "value": {}, + "type": "object" + } + } def get_input_form(self) -> dict[str, dict]: return getattr(self, "inputs") diff --git a/api/apps/sdk/agents.py b/api/apps/sdk/agents.py index 20e897388ff..13a4798baa7 100644 --- a/api/apps/sdk/agents.py +++ b/api/apps/sdk/agents.py @@ -14,14 +14,22 @@ # limitations under the License. # +import asyncio +import hashlib +import hmac +import ipaddress import json import logging +import re import time from typing import Any, cast +import jwt + from agent.canvas import Canvas from api.db import CanvasCategory from api.db.services.canvas_service import UserCanvasService +from api.db.services.file_service import FileService from api.db.services.user_canvas_version import UserCanvasVersionService from common.constants import RetCode from common.misc_utils import get_uuid @@ -132,48 +140,621 @@ def delete_agent(tenant_id: str, agent_id: str): UserCanvasService.delete_by_id(agent_id) return get_json_result(data=True) +_rate_limit_cache = {} -@manager.route('/webhook/', methods=['POST']) # noqa: F821 -@token_required -async def webhook(tenant_id: str, agent_id: str): - req = await get_request_json() - if not UserCanvasService.accessible(req["id"], tenant_id): - return get_json_result( - data=False, message='Only owner of canvas authorized for this operation.', - code=RetCode.OPERATING_ERROR) - - e, cvs = UserCanvasService.get_by_id(req["id"]) - if not e: - return get_data_error_result(message="canvas not found.") - - if not isinstance(cvs.dsl, str): - cvs.dsl = json.dumps(cvs.dsl, ensure_ascii=False) +@manager.route('/webhook/', methods=["POST", "GET", "PUT", "PATCH", "DELETE", "HEAD"]) # noqa: F821 +async def webhook(agent_id: str): + # 1. Fetch canvas by agent_id + exists, cvs = UserCanvasService.get_by_id(agent_id) + if not exists: + return get_data_error_result(message="Canvas not found.") + # 2. Check canvas category if cvs.canvas_category == CanvasCategory.DataFlow: return get_data_error_result(message="Dataflow can not be triggered by webhook.") + # 3. Load DSL from canvas + dsl = getattr(cvs, "dsl", None) + if not isinstance(dsl, dict): + return get_data_error_result(message="Invalid DSL format.") + + # 4. Check webhook configuration in DSL + webhook_cfg = dsl.get("webhook", {}) + if not webhook_cfg: + return get_data_error_result(message="Webhook not configured for this agent.") + + # 5. Validate request method against webhook_cfg.methods + allowed_methods = webhook_cfg.get("methods", []) + request_method = request.method.upper() + if allowed_methods and request_method not in allowed_methods: + return get_data_error_result( + message=f"HTTP method '{request_method}' not allowed for this webhook." + ) + + # 6. Validate webhook security + async def validate_webhook_security(security_cfg: dict): + """Validate webhook security rules based on security configuration.""" + + if not security_cfg: + return # No security config → allowed by default + + # 1. Validate max body size + await _validate_max_body_size(security_cfg) + + # 2. Validate IP whitelist + _validate_ip_whitelist(security_cfg) + + # # 3. Validate rate limiting + _validate_rate_limit(security_cfg) + + # 4. Validate authentication + auth_type = security_cfg.get("auth_type", "none") + + if auth_type == "none": + return + + if auth_type == "token": + _validate_token_auth(security_cfg) + + elif auth_type == "basic": + _validate_basic_auth(security_cfg) + + elif auth_type == "jwt": + _validate_jwt_auth(security_cfg) + + elif auth_type == "hmac": + await _validate_hmac_auth(security_cfg) + + else: + raise Exception(f"Unsupported auth_type: {auth_type}") + + async def _validate_max_body_size(security_cfg): + """Check request size does not exceed max_body_size.""" + max_size = security_cfg.get("max_body_size") + if not max_size: + return + + # Convert "10MB" → bytes + units = {"kb": 1024, "mb": 1024**2, "gb": 1024**3} + size_str = max_size.lower() + + for suffix, factor in units.items(): + if size_str.endswith(suffix): + limit = int(size_str.replace(suffix, "")) * factor + break + else: + raise Exception("Invalid max_body_size format") + + content_length = request.content_length or 0 + if content_length > limit: + raise Exception(f"Request body too large: {content_length} > {limit}") + + def _validate_ip_whitelist(security_cfg): + """Allow only IPs listed in ip_whitelist.""" + whitelist = security_cfg.get("ip_whitelist", []) + if not whitelist: + return + + client_ip = request.remote_addr + + + for rule in whitelist: + if "/" in rule: + # CIDR notation + if ipaddress.ip_address(client_ip) in ipaddress.ip_network(rule, strict=False): + return + else: + # Single IP + if client_ip == rule: + return + + raise Exception(f"IP {client_ip} is not allowed by whitelist") + + def _validate_rate_limit(security_cfg): + """Simple in-memory rate limiting.""" + rl = security_cfg.get("rate_limit") + if not rl: + return + + limit = rl.get("limit", 60) + per = rl.get("per", "minute") + + window = {"second": 1, "minute": 60, "hour": 3600}.get(per, 60) + key = f"rl:{agent_id}" + + now = int(time.time()) + bucket = _rate_limit_cache.get(key, {"ts": now, "count": 0}) + + # Reset window + if now - bucket["ts"] > window: + bucket = {"ts": now, "count": 0} + + bucket["count"] += 1 + _rate_limit_cache[key] = bucket + + if bucket["count"] > limit: + raise Exception("Too many requests (rate limit exceeded)") + + def _validate_token_auth(security_cfg): + """Validate header-based token authentication.""" + header = security_cfg.get("token_header") + token_value = security_cfg.get("token_value") + + provided = request.headers.get(header) + if provided != token_value: + raise Exception("Invalid token authentication") + + def _validate_basic_auth(security_cfg): + """Validate HTTP Basic Auth credentials.""" + auth_cfg = security_cfg.get("basic_auth", {}) + username = auth_cfg.get("username") + password = auth_cfg.get("password") + + auth = request.authorization + if not auth or auth.username != username or auth.password != password: + raise Exception("Invalid Basic Auth credentials") + + def _validate_jwt_auth(security_cfg): + """Validate JWT token in Authorization header.""" + jwt_cfg = security_cfg.get("jwt", {}) + secret = jwt_cfg.get("secret") + required_claims = jwt_cfg.get("required_claims", []) + + auth_header = request.headers.get("Authorization", "") + if not auth_header.startswith("Bearer "): + raise Exception("Missing Bearer token") + + token = auth_header.replace("Bearer ", "") + + try: + decoded = jwt.decode( + token, + secret, + algorithms=[jwt_cfg.get("algorithm", "HS256")], + audience=jwt_cfg.get("audience"), + issuer=jwt_cfg.get("issuer"), + ) + except Exception as e: + raise Exception(f"Invalid JWT: {str(e)}") + + for claim in required_claims: + if claim not in decoded: + raise Exception(f"Missing JWT claim: {claim}") + + async def _validate_hmac_auth(security_cfg): + """Validate HMAC signature from header.""" + hmac_cfg = security_cfg.get("hmac", {}) + header = hmac_cfg.get("header") + secret = hmac_cfg.get("secret") + algorithm = hmac_cfg.get("algorithm", "sha256") + + provided_sig = request.headers.get(header) + if not provided_sig: + raise Exception("Missing HMAC signature header") + + body = await request.get_data() + if body is None: + body = b"" + elif isinstance(body, str): + body = body.encode("utf-8") + + computed = hmac.new(secret.encode(), body, getattr(hashlib, algorithm)).hexdigest() + + if not hmac.compare_digest(provided_sig, computed): + raise Exception("Invalid HMAC signature") + + try: + await validate_webhook_security(webhook_cfg.get("security", {})) + except Exception as e: + return get_data_error_result(message=str(e)) + + # 7. Parse request body + async def parse_webhook_request(): + """Parse request based on content-type and return structured data.""" + + # 1. Parse query parameters + query_data = {} + for k, v in request.args.items(): + query_data[k] = v + + # 2. Parse headers + header_data = {} + for k, v in request.headers.items(): + header_data[k] = v + + # 3. Parse body based on content-type + ctype = request.headers.get("Content-Type", "").split(";")[0].strip() + raw_files = {} + + if ctype == "application/json": + try: + body_data = await request.get_json() + except: + body_data = None + + elif ctype == "multipart/form-data": + form = await request.form + files = await request.files + raw_files = {name: file for name, file in files.items()} + body_data = { + "form": dict(form), + "files": {name: file.filename for name, file in files.items()}, + } + + elif ctype == "application/x-www-form-urlencoded": + form = await request.form + body_data = dict(form) + + elif ctype == "text/plain": + body_data = (await request.get_data()).decode() + + elif ctype == "application/octet-stream": + body_data = await request.get_data() # raw binary + + else: + # unknown content type → raw body + body_data = await request.get_data() + + return { + "query": query_data, + "headers": header_data, + "body": body_data, + "content_type": ctype, + "raw_files": raw_files + } + + def extract_by_schema(data, schema, name="section"): + """ + Extract only fields defined in schema. + Required fields must exist. + Optional fields default to type-based default values. + Type validation included. + """ + if schema.get("type") != "object": + return {} + + props = schema.get("properties", {}) + required = schema.get("required", []) + + extracted = {} + + for field, field_schema in props.items(): + field_type = field_schema.get("type") + + # 1. Required field missing + if field in required and field not in data: + raise Exception(f"{name} missing required field: {field}") + + # 2. Optional → default value + if field not in data: + extracted[field] = default_for_type(field_type) + continue + + raw_value = data[field] + + # 3. Auto convert value + try: + value = auto_cast_value(raw_value, field_type) + except Exception as e: + raise Exception(f"{name}.{field} auto-cast failed: {str(e)}") + + # 4. Type validation + if not validate_type(value, field_type): + raise Exception( + f"{name}.{field} type mismatch: expected {field_type}, got {type(value).__name__}" + ) + + extracted[field] = value + + return extracted + + + def default_for_type(t): + """Return default value for the given schema type.""" + if t == "file": + return "" + if t == "object": + return {} + if t == "boolean": + return False + if t == "number": + return 0 + if t == "string": + return "" + if t and t.startswith("array"): + return [] + if t == "null": + return None + return None + + def auto_cast_value(value, expected_type): + """Convert string values into schema type when possible.""" + + # Non-string values already good + if not isinstance(value, str): + return value + + v = value.strip() + + # Boolean + if expected_type == "boolean": + if v.lower() in ["true", "1"]: + return True + if v.lower() in ["false", "0"]: + return False + raise Exception(f"Cannot convert '{value}' to boolean") + + # Number + if expected_type == "number": + # integer + if v.isdigit() or (v.startswith("-") and v[1:].isdigit()): + return int(v) + + # float + try: + return float(v) + except: + raise Exception(f"Cannot convert '{value}' to number") + + # Object + if expected_type == "object": + try: + parsed = json.loads(v) + if isinstance(parsed, dict): + return parsed + else: + raise Exception("JSON is not an object") + except: + raise Exception(f"Cannot convert '{value}' to object") + + # Array + if expected_type.startswith("array"): + try: + parsed = json.loads(v) + if isinstance(parsed, list): + return parsed + else: + raise Exception("JSON is not an array") + except: + raise Exception(f"Cannot convert '{value}' to array") + + # String (accept original) + if expected_type == "string": + return value + + # File + if expected_type == "file": + return value + # Default: do nothing + return value + + + def validate_type(value, t): + """Validate value type against schema type t.""" + if t == "file": + return isinstance(value, str) + + if t == "string": + return isinstance(value, str) + + if t == "number": + return isinstance(value, (int, float)) + + if t == "boolean": + return isinstance(value, bool) + + if t == "object": + return isinstance(value, dict) + + # array / array / array + if t.startswith("array"): + if not isinstance(value, list): + return False + + if "<" in t and ">" in t: + inner = t[t.find("<") + 1 : t.find(">")] + + # Check each element type + for item in value: + if not validate_type(item, inner): + return False + + return True + + return True + + def extract_files_by_schema(raw_files, schema, name="files"): + """ + Extract and validate files based on schema. + Only supports type = file (single file). + Does NOT support array. + """ + + if schema.get("type") != "object": + return {} + + props = schema.get("properties", {}) + required = schema.get("required", []) + + cleaned = [] + + for field, field_schema in props.items(): + field_type = field_schema.get("type") + + # 1. Required field must exist + if field in required and field not in raw_files: + raise Exception(f"{name} missing required file field: {field}") + + # 2. Ignore fields that are not file + if field_type != "file": + continue + + # 3. Extract single file + file_obj = raw_files.get(field) + + if file_obj: + cleaned.append({ + "field": field, + "file": file_obj + }) + return cleaned + + parsed = await parse_webhook_request() + SCHEMA = webhook_cfg.get("schema", {"query": {}, "headers": {}, "body": {}}) + + # Extract strictly by schema + query_clean = extract_by_schema(parsed["query"], SCHEMA.get("query", {}), name="query") + header_clean = extract_by_schema(parsed["headers"], SCHEMA.get("headers", {}), name="headers") + body_clean = extract_by_schema(parsed["body"], SCHEMA.get("body", {}), name="body") + files_clean = extract_files_by_schema(parsed["raw_files"], SCHEMA.get("body", {}), name="files") + + uploaded_files = [] + for item in files_clean: # each {field, file} + file_obj = item["file"] + desc = FileService.upload_info( + cvs.user_id, # user + file_obj, # FileStorage + None # url (None for webhook) + ) + uploaded_files.append(desc) + + clean_request = { + "query": query_clean, + "headers": header_clean, + "body": body_clean + } + + if not isinstance(cvs.dsl, str): + dsl = json.dumps(cvs.dsl, ensure_ascii=False) try: - canvas = Canvas(cvs.dsl, tenant_id, agent_id) + canvas = Canvas(dsl, cvs.user_id, agent_id) except Exception as e: return get_json_result( data=False, message=str(e), code=RetCode.EXCEPTION_ERROR) - async def sse(): - nonlocal canvas - try: - async for ans in canvas.run(query=req.get("query", ""), files=req.get("files", []), user_id=req.get("user_id", tenant_id), webhook_payload=req): - yield "data:" + json.dumps(ans, ensure_ascii=False) + "\n\n" + execution_mode = webhook_cfg.get("execution_mode", "Immediately") + response_cfg = webhook_cfg.get("response", {}) - cvs.dsl = json.loads(str(canvas)) - UserCanvasService.update_by_id(req["id"], cvs.to_dict()) - except Exception as e: - logging.exception(e) - yield "data:" + json.dumps({"code": 500, "message": str(e), "data": False}, ensure_ascii=False) + "\n\n" - - resp = Response(sse(), mimetype="text/event-stream") - resp.headers.add_header("Cache-control", "no-cache") - resp.headers.add_header("Connection", "keep-alive") - resp.headers.add_header("X-Accel-Buffering", "no") - resp.headers.add_header("Content-Type", "text/event-stream; charset=utf-8") - return resp + if execution_mode == "Immediately": + status = response_cfg.get("status", 200) + headers_tpl = response_cfg.get("headers_template", {}) + body_tpl = response_cfg.get("body_template", {}) + + + placeholder_pattern = re.compile(r"\{* *\{([a-zA-Z:0-9]+@[A-Za-z0-9_.-]+|sys\.[A-Za-z0-9_.]+|env\.[A-Za-z0-9_.]+)\} *\}*") + def extract_placeholder_value(placeholder: str, clean_request: dict): + """ + Extract values from clean_request using placeholders like: + {webhook@body.payload} + {webhook@query.event} + {webhook@headers.X-Trace-ID} + """ + + # Example placeholder: webhook@body.payload + if "@" in placeholder: + prefix, path = placeholder.split("@", 1) + + return get_from_request_path(clean_request, path) + + # sys.xxx / env.xxx handled by canvas, do not resolve here + return None + + def get_from_request_path(clean_request: dict, path: str): + """ + Resolve path like: + body.payload + query.event + headers.X-Token + """ + + parts = path.split(".") + if len(parts) == 0: + return None + + root = parts[0] # body / query / headers + + if root not in clean_request: + return None + + value = clean_request[root] + + for p in parts[1:]: + if isinstance(value, dict) and p in value: + value = value[p] + else: + return None + + return value + + def render_template(text: str, clean_request: dict): + matches = placeholder_pattern.findall(text) + results = {} + + for m in matches: + val = extract_placeholder_value(m, clean_request) + results[m] = val + + return results + # Render "{xxx@query.xxx}" syntax + headers = render_template(headers_tpl, clean_request) + body = render_template(body_tpl, clean_request) + + resp = Response( + json.dumps(body, ensure_ascii=False), + status=status, + content_type="application/json" + ) + + # Add custom headers + for k, v in headers.items(): + resp.headers[k] = v + async def background_run(): + try: + async for _ in canvas.run( + query="", + files=uploaded_files, + user_id=cvs.user_id, + webhook_payload=clean_request + ): + pass # or log/save ans + + cvs.dsl = json.loads(str(canvas)) + UserCanvasService.update_by_id(cvs.user_id, cvs.to_dict()) + + except Exception as e: + logging.exception(f"Webhook background run failed: {e}") + + asyncio.create_task(background_run()) + return resp + else: + + async def sse(): + nonlocal canvas + + try: + async for ans in canvas.run( + query="", + files=uploaded_files, + user_id=cvs.user_id, + webhook_payload=clean_request + ): + yield "data:" + json.dumps(ans, ensure_ascii=False) + "\n\n" + + # save updated canvas + cvs.dsl = json.loads(str(canvas)) + UserCanvasService.update_by_id(cvs.user_id, cvs.to_dict()) + + except Exception as e: + logging.exception(e) + yield "data:" + json.dumps( + {"code": 500, "message": str(e), "data": False}, + ensure_ascii=False + ) + "\n\n" + + resp = Response(sse(), mimetype="text/event-stream") + resp.headers.add_header("Cache-control", "no-cache") + resp.headers.add_header("Connection", "keep-alive") + resp.headers.add_header("X-Accel-Buffering", "no") + resp.headers.add_header("Content-Type", "text/event-stream; charset=utf-8") + return resp From 40389244156465d1352abbfc373982e7f6076296 Mon Sep 17 00:00:00 2001 From: buua436 Date: Thu, 11 Dec 2025 19:01:45 +0800 Subject: [PATCH 02/13] update --- agent/component/begin.py | 2 +- api/apps/sdk/agents.py | 37 +++++++++++++++++++++++++++---------- 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/agent/component/begin.py b/agent/component/begin.py index 1314aff748c..bcbfdbf24b7 100644 --- a/agent/component/begin.py +++ b/agent/component/begin.py @@ -28,7 +28,7 @@ def __init__(self): self.prologue = "Hi! I'm your smart assistant. What can I do for you?" def check(self): - self.check_valid_value(self.mode, "The 'mode' should be either `conversational` or `task`", ["conversational", "task"]) + self.check_valid_value(self.mode, "The 'mode' should be either `conversational` or `task`", ["conversational", "task","Webhook"]) def get_input_form(self) -> dict[str, dict]: return getattr(self, "inputs") diff --git a/api/apps/sdk/agents.py b/api/apps/sdk/agents.py index 13a4798baa7..2f5186a31b9 100644 --- a/api/apps/sdk/agents.py +++ b/api/apps/sdk/agents.py @@ -159,7 +159,12 @@ async def webhook(agent_id: str): return get_data_error_result(message="Invalid DSL format.") # 4. Check webhook configuration in DSL - webhook_cfg = dsl.get("webhook", {}) + components = dsl.get("components", {}) + for k, _ in components.items(): + cpn_obj = components[k]["obj"] + if cpn_obj["component_name"].lower() == "begin" and cpn_obj["params"]["mode"] == "Webhook": + webhook_cfg = cpn_obj["params"] + if not webhook_cfg: return get_data_error_result(message="Webhook not configured for this agent.") @@ -259,7 +264,7 @@ def _validate_rate_limit(security_cfg): limit = rl.get("limit", 60) per = rl.get("per", "minute") - window = {"second": 1, "minute": 60, "hour": 3600}.get(per, 60) + window = {"second": 1, "minute": 60, "hour": 3600, "day": 86400}.get(per, 60) key = f"rl:{agent_id}" now = int(time.time()) @@ -277,8 +282,9 @@ def _validate_rate_limit(security_cfg): def _validate_token_auth(security_cfg): """Validate header-based token authentication.""" - header = security_cfg.get("token_header") - token_value = security_cfg.get("token_value") + token_cfg = security_cfg.get("token",{}) + header = token_cfg.get("token_header") + token_value = token_cfg.get("token_value") provided = request.headers.get(header) if provided != token_value: @@ -344,7 +350,8 @@ async def _validate_hmac_auth(security_cfg): raise Exception("Invalid HMAC signature") try: - await validate_webhook_security(webhook_cfg.get("security", {})) + security_config=webhook_cfg.get("security", {}) + await validate_webhook_security(security_config) except Exception as e: return get_data_error_result(message=str(e)) @@ -687,15 +694,25 @@ def get_from_request_path(clean_request: dict, path: str): return value - def render_template(text: str, clean_request: dict): - matches = placeholder_pattern.findall(text) - results = {} + def render_template(tpl, clean_request: dict): + if isinstance(tpl, dict): + return {k: render_template(v, clean_request) for k, v in tpl.items()} + + if isinstance(tpl, list): + return [render_template(item, clean_request) for item in tpl] + + if not isinstance(tpl, str): + return tpl + + matches = placeholder_pattern.findall(tpl) + rendered = tpl for m in matches: val = extract_placeholder_value(m, clean_request) - results[m] = val + rendered = rendered.replace(f"{{{m}}}", str(val)) + + return rendered - return results # Render "{xxx@query.xxx}" syntax headers = render_template(headers_tpl, clean_request) body = render_template(body_tpl, clean_request) From f9239a4c827616aa3f23d3dcb685a018e2e5be4c Mon Sep 17 00:00:00 2001 From: buua436 Date: Mon, 15 Dec 2025 09:35:51 +0800 Subject: [PATCH 03/13] update --- api/apps/sdk/agents.py | 140 +++++++++++++++++------------------------ 1 file changed, 59 insertions(+), 81 deletions(-) diff --git a/api/apps/sdk/agents.py b/api/apps/sdk/agents.py index 2f5186a31b9..bc83a1cbb43 100644 --- a/api/apps/sdk/agents.py +++ b/api/apps/sdk/agents.py @@ -310,22 +310,61 @@ def _validate_jwt_auth(security_cfg): if not auth_header.startswith("Bearer "): raise Exception("Missing Bearer token") - token = auth_header.replace("Bearer ", "") + token = auth_header[len("Bearer "):].strip() + if not token: + raise Exception("Empty Bearer token") + alg = (jwt_cfg.get("algorithm") or "HS256").upper() + + decode_kwargs = { + "key": secret, + "algorithms": [alg], + } + options = {} + if jwt_cfg.get("audience"): + decode_kwargs["audience"] = jwt_cfg["audience"] + options["verify_aud"] = True + else: + options["verify_aud"] = False + + if jwt_cfg.get("issuer"): + decode_kwargs["issuer"] = jwt_cfg["issuer"] + options["verify_iss"] = True + else: + options["verify_iss"] = False try: decoded = jwt.decode( token, - secret, - algorithms=[jwt_cfg.get("algorithm", "HS256")], - audience=jwt_cfg.get("audience"), - issuer=jwt_cfg.get("issuer"), + options=options, + **decode_kwargs, ) except Exception as e: raise Exception(f"Invalid JWT: {str(e)}") - for claim in required_claims: - if claim not in decoded: - raise Exception(f"Missing JWT claim: {claim}") + raw_required_claims = jwt_cfg.get("required_claims", []) + if isinstance(raw_required_claims, str): + required_claims = [raw_required_claims] + elif isinstance(raw_required_claims, (list, tuple, set)): + required_claims = list(raw_required_claims) + else: + required_claims = [] + + required_claims = [ + c for c in required_claims + if isinstance(c, str) and c.strip() + ] + + # RESERVED_CLAIMS = {"exp", "sub", "aud", "iss", "nbf", "iat"} + # for claim in required_claims: + # if claim in RESERVED_CLAIMS: + # raise Exception(f"Reserved JWT claim cannot be required: {claim}") + + # for claim in required_claims: + # if claim not in decoded: + # raise Exception(f"Missing JWT claim: {claim}") + + return decoded + async def _validate_hmac_auth(security_cfg): """Validate HMAC signature from header.""" @@ -645,87 +684,26 @@ def extract_files_by_schema(raw_files, schema, name="files"): if execution_mode == "Immediately": status = response_cfg.get("status", 200) - headers_tpl = response_cfg.get("headers_template", {}) - body_tpl = response_cfg.get("body_template", {}) - + body_tpl = response_cfg.get("body_template", "") - placeholder_pattern = re.compile(r"\{* *\{([a-zA-Z:0-9]+@[A-Za-z0-9_.-]+|sys\.[A-Za-z0-9_.]+|env\.[A-Za-z0-9_.]+)\} *\}*") - def extract_placeholder_value(placeholder: str, clean_request: dict): - """ - Extract values from clean_request using placeholders like: - {webhook@body.payload} - {webhook@query.event} - {webhook@headers.X-Trace-ID} - """ - - # Example placeholder: webhook@body.payload - if "@" in placeholder: - prefix, path = placeholder.split("@", 1) - - return get_from_request_path(clean_request, path) - - # sys.xxx / env.xxx handled by canvas, do not resolve here - return None + def parse_body(body: str): + if not body: + return None, "application/json" - def get_from_request_path(clean_request: dict, path: str): - """ - Resolve path like: - body.payload - query.event - headers.X-Token - """ - - parts = path.split(".") - if len(parts) == 0: - return None - - root = parts[0] # body / query / headers - - if root not in clean_request: - return None - - value = clean_request[root] - - for p in parts[1:]: - if isinstance(value, dict) and p in value: - value = value[p] - else: - return None - - return value - - def render_template(tpl, clean_request: dict): - if isinstance(tpl, dict): - return {k: render_template(v, clean_request) for k, v in tpl.items()} - - if isinstance(tpl, list): - return [render_template(item, clean_request) for item in tpl] - - if not isinstance(tpl, str): - return tpl - - matches = placeholder_pattern.findall(tpl) - rendered = tpl - - for m in matches: - val = extract_placeholder_value(m, clean_request) - rendered = rendered.replace(f"{{{m}}}", str(val)) - - return rendered + try: + parsed = json.loads(body) + return parsed, "application/json" + except (json.JSONDecodeError, TypeError): + return body, "text/plain" - # Render "{xxx@query.xxx}" syntax - headers = render_template(headers_tpl, clean_request) - body = render_template(body_tpl, clean_request) + body, content_type = parse_body(body_tpl) resp = Response( - json.dumps(body, ensure_ascii=False), + json.dumps(body, ensure_ascii=False) if content_type == "application/json" else body, status=status, - content_type="application/json" + content_type=content_type, ) - # Add custom headers - for k, v in headers.items(): - resp.headers[k] = v async def background_run(): try: async for _ in canvas.run( From 9e904c254a1b4506e6445e1f5dc0b81e03128090 Mon Sep 17 00:00:00 2001 From: buua436 Date: Mon, 15 Dec 2025 13:26:03 +0800 Subject: [PATCH 04/13] update --- agent/canvas.py | 2 +- api/apps/sdk/agents.py | 216 +++++++++++++---------------------------- 2 files changed, 71 insertions(+), 147 deletions(-) diff --git a/agent/canvas.py b/agent/canvas.py index 15936f52680..70ea6e45cc8 100644 --- a/agent/canvas.py +++ b/agent/canvas.py @@ -368,7 +368,7 @@ async def run(self, **kwargs): if kwargs.get("webhook_payload"): for k, cpn in self.components.items(): - if self.components[k]["obj"].component_name.lower() == "webhook": + if self.components[k]["obj"].component_name.lower() == "begin" and self.components[k]["obj"]._param.mode == "Webhook": for kk, vv in kwargs["webhook_payload"].items(): self.components[k]["obj"].set_output(kk, vv) diff --git a/api/apps/sdk/agents.py b/api/apps/sdk/agents.py index bc83a1cbb43..a3426c39993 100644 --- a/api/apps/sdk/agents.py +++ b/api/apps/sdk/agents.py @@ -15,12 +15,9 @@ # import asyncio -import hashlib -import hmac import ipaddress import json import logging -import re import time from typing import Any, cast @@ -207,9 +204,6 @@ async def validate_webhook_security(security_cfg: dict): elif auth_type == "jwt": _validate_jwt_auth(security_cfg) - elif auth_type == "hmac": - await _validate_hmac_auth(security_cfg) - else: raise Exception(f"Unsupported auth_type: {auth_type}") @@ -365,88 +359,85 @@ def _validate_jwt_auth(security_cfg): return decoded - - async def _validate_hmac_auth(security_cfg): - """Validate HMAC signature from header.""" - hmac_cfg = security_cfg.get("hmac", {}) - header = hmac_cfg.get("header") - secret = hmac_cfg.get("secret") - algorithm = hmac_cfg.get("algorithm", "sha256") - - provided_sig = request.headers.get(header) - if not provided_sig: - raise Exception("Missing HMAC signature header") - - body = await request.get_data() - if body is None: - body = b"" - elif isinstance(body, str): - body = body.encode("utf-8") - - computed = hmac.new(secret.encode(), body, getattr(hashlib, algorithm)).hexdigest() - - if not hmac.compare_digest(provided_sig, computed): - raise Exception("Invalid HMAC signature") - try: security_config=webhook_cfg.get("security", {}) await validate_webhook_security(security_config) except Exception as e: return get_data_error_result(message=str(e)) + if not isinstance(cvs.dsl, str): + dsl = json.dumps(cvs.dsl, ensure_ascii=False) + try: + canvas = Canvas(dsl, cvs.user_id, agent_id) + except Exception as e: + return get_json_result( + data=False, message=str(e), + code=RetCode.EXCEPTION_ERROR) # 7. Parse request body - async def parse_webhook_request(): + async def parse_webhook_request(content_type): """Parse request based on content-type and return structured data.""" - # 1. Parse query parameters - query_data = {} - for k, v in request.args.items(): - query_data[k] = v + # 1. Query + query_data = {k: v for k, v in request.args.items()} - # 2. Parse headers - header_data = {} - for k, v in request.headers.items(): - header_data[k] = v + # 2. Headers + header_data = {k: v for k, v in request.headers.items()} - # 3. Parse body based on content-type + # 3. Body ctype = request.headers.get("Content-Type", "").split(";")[0].strip() - raw_files = {} + if ctype != content_type: + raise ValueError( + f"Invalid Content-Type: expect '{content_type}', got '{ctype or 'empty'}'" + ) - if ctype == "application/json": - try: - body_data = await request.get_json() - except: - body_data = None + body_data: dict = {} - elif ctype == "multipart/form-data": - form = await request.form - files = await request.files - raw_files = {name: file for name, file in files.items()} - body_data = { - "form": dict(form), - "files": {name: file.filename for name, file in files.items()}, - } + try: + if ctype == "application/json": + body_data = await request.get_json() or {} - elif ctype == "application/x-www-form-urlencoded": - form = await request.form - body_data = dict(form) + elif ctype == "multipart/form-data": + nonlocal canvas + form = await request.form + files = await request.files - elif ctype == "text/plain": - body_data = (await request.get_data()).decode() + body_data = {} - elif ctype == "application/octet-stream": - body_data = await request.get_data() # raw binary + for key, value in form.items(): + body_data[key] = value - else: - # unknown content type → raw body - body_data = await request.get_data() + for key, file in files.items(): + desc = FileService.upload_info( + cvs.user_id, # user + file, # FileStorage + None # url (None for webhook) + ) + file_parsed= await canvas.get_files_async([desc]) + body_data[key] = file_parsed + + elif ctype == "application/x-www-form-urlencoded": + form = await request.form + body_data = dict(form) + + else: + # text/plain / octet-stream / empty / unknown + raw = await request.get_data() + if raw: + try: + body_data = json.loads(raw.decode("utf-8")) + except Exception: + body_data = {} + else: + body_data = {} + + except Exception: + body_data = {} return { "query": query_data, "headers": header_data, "body": body_data, "content_type": ctype, - "raw_files": raw_files } def extract_by_schema(data, schema, name="section"): @@ -456,9 +447,6 @@ def extract_by_schema(data, schema, name="section"): Optional fields default to type-based default values. Type validation included. """ - if schema.get("type") != "object": - return {} - props = schema.get("properties", {}) required = schema.get("required", []) @@ -498,7 +486,7 @@ def extract_by_schema(data, schema, name="section"): def default_for_type(t): """Return default value for the given schema type.""" if t == "file": - return "" + return [] if t == "object": return {} if t == "boolean": @@ -578,7 +566,7 @@ def auto_cast_value(value, expected_type): def validate_type(value, t): """Validate value type against schema type t.""" if t == "file": - return isinstance(value, str) + return isinstance(value, list) if t == "string": return isinstance(value, str) @@ -609,60 +597,13 @@ def validate_type(value, t): return True - def extract_files_by_schema(raw_files, schema, name="files"): - """ - Extract and validate files based on schema. - Only supports type = file (single file). - Does NOT support array. - """ - - if schema.get("type") != "object": - return {} - - props = schema.get("properties", {}) - required = schema.get("required", []) - - cleaned = [] - - for field, field_schema in props.items(): - field_type = field_schema.get("type") - - # 1. Required field must exist - if field in required and field not in raw_files: - raise Exception(f"{name} missing required file field: {field}") - - # 2. Ignore fields that are not file - if field_type != "file": - continue - - # 3. Extract single file - file_obj = raw_files.get(field) - - if file_obj: - cleaned.append({ - "field": field, - "file": file_obj - }) - return cleaned - - parsed = await parse_webhook_request() + parsed = await parse_webhook_request(webhook_cfg.get("content_types")) SCHEMA = webhook_cfg.get("schema", {"query": {}, "headers": {}, "body": {}}) # Extract strictly by schema query_clean = extract_by_schema(parsed["query"], SCHEMA.get("query", {}), name="query") header_clean = extract_by_schema(parsed["headers"], SCHEMA.get("headers", {}), name="headers") body_clean = extract_by_schema(parsed["body"], SCHEMA.get("body", {}), name="body") - files_clean = extract_files_by_schema(parsed["raw_files"], SCHEMA.get("body", {}), name="files") - - uploaded_files = [] - for item in files_clean: # each {field, file} - file_obj = item["file"] - desc = FileService.upload_info( - cvs.user_id, # user - file_obj, # FileStorage - None # url (None for webhook) - ) - uploaded_files.append(desc) clean_request = { "query": query_clean, @@ -670,15 +611,6 @@ def extract_files_by_schema(raw_files, schema, name="files"): "body": body_clean } - if not isinstance(cvs.dsl, str): - dsl = json.dumps(cvs.dsl, ensure_ascii=False) - try: - canvas = Canvas(dsl, cvs.user_id, agent_id) - except Exception as e: - return get_json_result( - data=False, message=str(e), - code=RetCode.EXCEPTION_ERROR) - execution_mode = webhook_cfg.get("execution_mode", "Immediately") response_cfg = webhook_cfg.get("response", {}) @@ -708,7 +640,6 @@ async def background_run(): try: async for _ in canvas.run( query="", - files=uploaded_files, user_id=cvs.user_id, webhook_payload=clean_request ): @@ -723,33 +654,26 @@ async def background_run(): asyncio.create_task(background_run()) return resp else: - async def sse(): nonlocal canvas + contents: list[str] = [] try: async for ans in canvas.run( query="", - files=uploaded_files, user_id=cvs.user_id, - webhook_payload=clean_request + webhook_payload=clean_request, ): - yield "data:" + json.dumps(ans, ensure_ascii=False) + "\n\n" + if ans.get("event") == "message": + content = (ans.get("data") or {}).get("content") + if content: + contents.append(content) - # save updated canvas - cvs.dsl = json.loads(str(canvas)) - UserCanvasService.update_by_id(cvs.user_id, cvs.to_dict()) + final_content = "".join(contents) + yield json.dumps(final_content, ensure_ascii=False) except Exception as e: - logging.exception(e) - yield "data:" + json.dumps( - {"code": 500, "message": str(e), "data": False}, - ensure_ascii=False - ) + "\n\n" - - resp = Response(sse(), mimetype="text/event-stream") - resp.headers.add_header("Cache-control", "no-cache") - resp.headers.add_header("Connection", "keep-alive") - resp.headers.add_header("X-Accel-Buffering", "no") - resp.headers.add_header("Content-Type", "text/event-stream; charset=utf-8") - return resp + yield json.dumps({"code": 500, "message": str(e)}, ensure_ascii=False) + + resp = Response(sse(), mimetype="application/json") + return resp \ No newline at end of file From d2ef9099a7c76eec3f0995c1c6cc29cabb9237b7 Mon Sep 17 00:00:00 2001 From: buua436 Date: Mon, 15 Dec 2025 13:31:58 +0800 Subject: [PATCH 05/13] update --- api/apps/sdk/agents.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/api/apps/sdk/agents.py b/api/apps/sdk/agents.py index a3426c39993..fc589a6dfe9 100644 --- a/api/apps/sdk/agents.py +++ b/api/apps/sdk/agents.py @@ -348,14 +348,14 @@ def _validate_jwt_auth(security_cfg): if isinstance(c, str) and c.strip() ] - # RESERVED_CLAIMS = {"exp", "sub", "aud", "iss", "nbf", "iat"} - # for claim in required_claims: - # if claim in RESERVED_CLAIMS: - # raise Exception(f"Reserved JWT claim cannot be required: {claim}") - - # for claim in required_claims: - # if claim not in decoded: - # raise Exception(f"Missing JWT claim: {claim}") + RESERVED_CLAIMS = {"exp", "sub", "aud", "iss", "nbf", "iat"} + for claim in required_claims: + if claim in RESERVED_CLAIMS: + raise Exception(f"Reserved JWT claim cannot be required: {claim}") + + for claim in required_claims: + if claim not in decoded: + raise Exception(f"Missing JWT claim: {claim}") return decoded From 7f40dfbf337adbfaeef078013002ac71c81433b9 Mon Sep 17 00:00:00 2001 From: buua436 Date: Mon, 15 Dec 2025 13:36:21 +0800 Subject: [PATCH 06/13] update --- api/apps/sdk/agents.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/api/apps/sdk/agents.py b/api/apps/sdk/agents.py index fc589a6dfe9..5bb487c5d97 100644 --- a/api/apps/sdk/agents.py +++ b/api/apps/sdk/agents.py @@ -527,7 +527,7 @@ def auto_cast_value(value, expected_type): # float try: return float(v) - except: + except Exception: raise Exception(f"Cannot convert '{value}' to number") # Object @@ -538,7 +538,7 @@ def auto_cast_value(value, expected_type): return parsed else: raise Exception("JSON is not an object") - except: + except Exception: raise Exception(f"Cannot convert '{value}' to object") # Array @@ -549,7 +549,7 @@ def auto_cast_value(value, expected_type): return parsed else: raise Exception("JSON is not an array") - except: + except Exception: raise Exception(f"Cannot convert '{value}' to array") # String (accept original) From 4753e3c49cf36c9cfe3e63689e082675be1e34a2 Mon Sep 17 00:00:00 2001 From: buua436 Date: Mon, 15 Dec 2025 16:50:51 +0800 Subject: [PATCH 07/13] update --- api/apps/sdk/agents.py | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/api/apps/sdk/agents.py b/api/apps/sdk/agents.py index 5bb487c5d97..89f58d33844 100644 --- a/api/apps/sdk/agents.py +++ b/api/apps/sdk/agents.py @@ -144,16 +144,16 @@ async def webhook(agent_id: str): # 1. Fetch canvas by agent_id exists, cvs = UserCanvasService.get_by_id(agent_id) if not exists: - return get_data_error_result(message="Canvas not found.") + return get_data_error_result(code=RetCode.BAD_REQUEST,message="Canvas not found."),RetCode.BAD_REQUEST # 2. Check canvas category if cvs.canvas_category == CanvasCategory.DataFlow: - return get_data_error_result(message="Dataflow can not be triggered by webhook.") + return get_data_error_result(code=RetCode.BAD_REQUEST,message="Dataflow can not be triggered by webhook."),RetCode.BAD_REQUEST # 3. Load DSL from canvas dsl = getattr(cvs, "dsl", None) if not isinstance(dsl, dict): - return get_data_error_result(message="Invalid DSL format.") + return get_data_error_result(code=RetCode.BAD_REQUEST,message="Invalid DSL format."),RetCode.BAD_REQUEST # 4. Check webhook configuration in DSL components = dsl.get("components", {}) @@ -163,15 +163,15 @@ async def webhook(agent_id: str): webhook_cfg = cpn_obj["params"] if not webhook_cfg: - return get_data_error_result(message="Webhook not configured for this agent.") + return get_data_error_result(code=RetCode.BAD_REQUEST,message="Webhook not configured for this agent."),RetCode.BAD_REQUEST # 5. Validate request method against webhook_cfg.methods allowed_methods = webhook_cfg.get("methods", []) request_method = request.method.upper() if allowed_methods and request_method not in allowed_methods: return get_data_error_result( - message=f"HTTP method '{request_method}' not allowed for this webhook." - ) + code=RetCode.BAD_REQUEST,message=f"HTTP method '{request_method}' not allowed for this webhook." + ),RetCode.BAD_REQUEST # 6. Validate webhook security async def validate_webhook_security(security_cfg: dict): @@ -363,7 +363,7 @@ def _validate_jwt_auth(security_cfg): security_config=webhook_cfg.get("security", {}) await validate_webhook_security(security_config) except Exception as e: - return get_data_error_result(message=str(e)) + return get_data_error_result(code=RetCode.BAD_REQUEST,message=str(e)),RetCode.BAD_REQUEST if not isinstance(cvs.dsl, str): dsl = json.dumps(cvs.dsl, ensure_ascii=False) try: @@ -371,7 +371,7 @@ def _validate_jwt_auth(security_cfg): except Exception as e: return get_json_result( data=False, message=str(e), - code=RetCode.EXCEPTION_ERROR) + code=RetCode.EXCEPTION_ERROR), RetCode.EXCEPTION_ERROR # 7. Parse request body async def parse_webhook_request(content_type): @@ -601,9 +601,12 @@ def validate_type(value, t): SCHEMA = webhook_cfg.get("schema", {"query": {}, "headers": {}, "body": {}}) # Extract strictly by schema - query_clean = extract_by_schema(parsed["query"], SCHEMA.get("query", {}), name="query") - header_clean = extract_by_schema(parsed["headers"], SCHEMA.get("headers", {}), name="headers") - body_clean = extract_by_schema(parsed["body"], SCHEMA.get("body", {}), name="body") + try: + query_clean = extract_by_schema(parsed["query"], SCHEMA.get("query", {}), name="query") + header_clean = extract_by_schema(parsed["headers"], SCHEMA.get("headers", {}), name="headers") + body_clean = extract_by_schema(parsed["body"], SCHEMA.get("body", {}), name="body") + except Exception as e: + return get_data_error_result(code=RetCode.BAD_REQUEST,message=str(e)),RetCode.BAD_REQUEST clean_request = { "query": query_clean, @@ -664,8 +667,12 @@ async def sse(): user_id=cvs.user_id, webhook_payload=clean_request, ): - if ans.get("event") == "message": - content = (ans.get("data") or {}).get("content") + if ans["event"] == "message": + content = ans["data"]["content"] + if ans["data"].get("start_to_think", False): + content = "" + elif ans["data"].get("end_to_think", False): + content = "" if content: contents.append(content) From 4c1de20b96c6a3bd873b9b3c76fd406a68ea5330 Mon Sep 17 00:00:00 2001 From: buua436 Date: Tue, 16 Dec 2025 11:24:06 +0800 Subject: [PATCH 08/13] update webhook trace --- api/apps/sdk/agents.py | 193 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 188 insertions(+), 5 deletions(-) diff --git a/api/apps/sdk/agents.py b/api/apps/sdk/agents.py index 89f58d33844..596f1fc55f1 100644 --- a/api/apps/sdk/agents.py +++ b/api/apps/sdk/agents.py @@ -33,6 +33,7 @@ from api.utils.api_utils import get_data_error_result, get_error_data_result, get_json_result, get_request_json, token_required from api.utils.api_utils import get_result from quart import request, Response +from rag.utils.redis_conn import REDIS_CONN @manager.route('/agents', methods=['GET']) # noqa: F821 @@ -137,10 +138,17 @@ def delete_agent(tenant_id: str, agent_id: str): UserCanvasService.delete_by_id(agent_id) return get_json_result(data=True) + _rate_limit_cache = {} -@manager.route('/webhook/', methods=["POST", "GET", "PUT", "PATCH", "DELETE", "HEAD"]) # noqa: F821 +@manager.route("/webhook/", methods=["POST", "GET", "PUT", "PATCH", "DELETE", "HEAD"]) # noqa: F821 +@manager.route("/webhook_test/",methods=["POST", "GET", "PUT", "PATCH", "DELETE", "HEAD"],) async def webhook(agent_id: str): + is_test = request.path.startswith("/api/v1/webhook_test") + print("request.path",request.path) + print("is_test",is_test) + start_ts = time.time() + # 1. Fetch canvas by agent_id exists, cvs = UserCanvasService.get_by_id(agent_id) if not exists: @@ -617,6 +625,34 @@ def validate_type(value, t): execution_mode = webhook_cfg.get("execution_mode", "Immediately") response_cfg = webhook_cfg.get("response", {}) + def append_webhook_trace(agent_id: str, start_ts: float,event: dict, ttl=600): + key = f"webhook-trace-{agent_id}-logs" + + raw = REDIS_CONN.get(key) + obj = json.loads(raw) if raw else {"webhooks": {}} + + ws = obj["webhooks"].setdefault( + str(start_ts), + {"start_ts": start_ts, "events": []} + ) + + ws["events"].append({ + "ts": time.time(), + **event + }) + + REDIS_CONN.set_obj(key, obj, ttl) + + + def normalize_ans(ans): + return { + "task_id": ans.get("task_id"), + "component_id": ans.get("data", {}).get("component_id"), + "content": ans.get("data", {}).get("content"), + "start_to_think": ans.get("data", {}).get("start_to_think"), + "end_to_think": ans.get("data", {}).get("end_to_think"), + } + if execution_mode == "Immediately": status = response_cfg.get("status", 200) body_tpl = response_cfg.get("body_template", "") @@ -641,12 +677,29 @@ def parse_body(body: str): async def background_run(): try: - async for _ in canvas.run( + async for ans in canvas.run( query="", user_id=cvs.user_id, webhook_payload=clean_request ): - pass # or log/save ans + if is_test: + append_webhook_trace( + agent_id, + start_ts, + { + "event": ans.get("event"), + "data": normalize_ans(ans), + } + ) + if is_test: + append_webhook_trace( + agent_id, + start_ts, + { + "event": "finished", + "elapsed_time": time.time() - start_ts, + } + ) cvs.dsl = json.loads(str(canvas)) UserCanvasService.update_by_id(cvs.user_id, cvs.to_dict()) @@ -675,7 +728,24 @@ async def sse(): content = "" if content: contents.append(content) - + if is_test: + append_webhook_trace( + agent_id, + start_ts, + { + "event": ans.get("event"), + "data": normalize_ans(ans), + } + ) + if is_test: + append_webhook_trace( + agent_id, + start_ts, + { + "event": "finished", + "elapsed_time": time.time() - start_ts, + } + ) final_content = "".join(contents) yield json.dumps(final_content, ensure_ascii=False) @@ -683,4 +753,117 @@ async def sse(): yield json.dumps({"code": 500, "message": str(e)}, ensure_ascii=False) resp = Response(sse(), mimetype="application/json") - return resp \ No newline at end of file + return resp + + +@manager.route("/webhook_trace/", methods=["GET"]) +async def webhook_trace(agent_id: str): + since_ts = request.args.get("since_ts", type=float) + webhook_id = request.args.get("webhook_id") + + key = f"webhook-trace-{agent_id}-logs" + raw = REDIS_CONN.get(key) + + if since_ts is None: + now = time.time() + return Response( + json.dumps( + { + "webhook_id": None, + "events": [], + "next_since_ts": now, + "finished": False, + }, + ensure_ascii=False, + ), + content_type="application/json; charset=utf-8", + ) + + if not raw: + return Response( + json.dumps( + { + "webhook_id": None, + "events": [], + "next_since_ts": since_ts, + "finished": False, + }, + ensure_ascii=False, + ), + content_type="application/json; charset=utf-8", + ) + + obj = json.loads(raw) + webhooks = obj.get("webhooks", {}) + + if webhook_id is None: + candidates = [ + float(k) for k in webhooks.keys() if float(k) > since_ts + ] + + if not candidates: + return Response( + json.dumps( + { + "webhook_id": None, + "events": [], + "next_since_ts": since_ts, + "finished": False, + }, + ensure_ascii=False, + ), + content_type="application/json; charset=utf-8", + ) + + start_ts = min(candidates) + webhook_id = str(start_ts) + + return Response( + json.dumps( + { + "webhook_id": webhook_id, + "events": [], + "next_since_ts": start_ts, + "finished": False, + }, + ensure_ascii=False, + ), + content_type="application/json; charset=utf-8", + ) + + ws = webhooks.get(str(webhook_id)) + if not ws: + return Response( + json.dumps( + { + "webhook_id": webhook_id, + "events": [], + "next_since_ts": since_ts, + "finished": True, + }, + ensure_ascii=False, + ), + content_type="application/json; charset=utf-8", + ) + + events = ws.get("events", []) + new_events = [e for e in events if e.get("ts", 0) > since_ts] + + next_ts = since_ts + for e in new_events: + next_ts = max(next_ts, e["ts"]) + + finished = any(e.get("event") == "finished" for e in new_events) + + return Response( + json.dumps( + { + "webhook_id": webhook_id, + "events": new_events, + "next_since_ts": next_ts, + "finished": finished, + }, + ensure_ascii=False, + ), + content_type="application/json; charset=utf-8", + ) From 2833c86ec960ecf3273266bf8eb84d71c01f82fd Mon Sep 17 00:00:00 2001 From: buua436 Date: Tue, 16 Dec 2025 11:33:02 +0800 Subject: [PATCH 09/13] update --- api/apps/sdk/agents.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/api/apps/sdk/agents.py b/api/apps/sdk/agents.py index 596f1fc55f1..58e838802bb 100644 --- a/api/apps/sdk/agents.py +++ b/api/apps/sdk/agents.py @@ -142,11 +142,9 @@ def delete_agent(tenant_id: str, agent_id: str): _rate_limit_cache = {} @manager.route("/webhook/", methods=["POST", "GET", "PUT", "PATCH", "DELETE", "HEAD"]) # noqa: F821 -@manager.route("/webhook_test/",methods=["POST", "GET", "PUT", "PATCH", "DELETE", "HEAD"],) +@manager.route("/webhook_test/",methods=["POST", "GET", "PUT", "PATCH", "DELETE", "HEAD"],) # noqa: F821 async def webhook(agent_id: str): is_test = request.path.startswith("/api/v1/webhook_test") - print("request.path",request.path) - print("is_test",is_test) start_ts = time.time() # 1. Fetch canvas by agent_id @@ -756,7 +754,7 @@ async def sse(): return resp -@manager.route("/webhook_trace/", methods=["GET"]) +@manager.route("/webhook_trace/", methods=["GET"]) # noqa: F821 async def webhook_trace(agent_id: str): since_ts = request.args.get("since_ts", type=float) webhook_id = request.args.get("webhook_id") From fff1eaf21453acf57a5dd934e8ba9273139c3a5a Mon Sep 17 00:00:00 2001 From: buua436 Date: Thu, 18 Dec 2025 15:22:32 +0800 Subject: [PATCH 10/13] Enhance webhook handling and rate limiting logic --- agent/canvas.py | 7 +- api/apps/sdk/agents.py | 222 +++++++++++++++++++++------------------- rag/utils/redis_conn.py | 43 ++++++++ 3 files changed, 165 insertions(+), 107 deletions(-) diff --git a/agent/canvas.py b/agent/canvas.py index 70ea6e45cc8..953beb7e38c 100644 --- a/agent/canvas.py +++ b/agent/canvas.py @@ -369,7 +369,12 @@ async def run(self, **kwargs): if kwargs.get("webhook_payload"): for k, cpn in self.components.items(): if self.components[k]["obj"].component_name.lower() == "begin" and self.components[k]["obj"]._param.mode == "Webhook": - for kk, vv in kwargs["webhook_payload"].items(): + payload = kwargs.get("webhook_payload", {}) + if "input" in payload: + self.components[k]["obj"].set_input_value("request", payload["input"]) + for kk, vv in payload.items(): + if kk == "input": + continue self.components[k]["obj"].set_output(kk, vv) for k in kwargs.keys(): diff --git a/api/apps/sdk/agents.py b/api/apps/sdk/agents.py index 58e838802bb..fbe6a0b2e31 100644 --- a/api/apps/sdk/agents.py +++ b/api/apps/sdk/agents.py @@ -15,6 +15,9 @@ # import asyncio +import base64 +import hashlib +import hmac import ipaddress import json import logging @@ -138,9 +141,6 @@ def delete_agent(tenant_id: str, agent_id: str): UserCanvasService.delete_by_id(agent_id) return get_json_result(data=True) - -_rate_limit_cache = {} - @manager.route("/webhook/", methods=["POST", "GET", "PUT", "PATCH", "DELETE", "HEAD"]) # noqa: F821 @manager.route("/webhook_test/",methods=["POST", "GET", "PUT", "PATCH", "DELETE", "HEAD"],) # noqa: F821 async def webhook(agent_id: str): @@ -220,7 +220,7 @@ async def _validate_max_body_size(security_cfg): return # Convert "10MB" → bytes - units = {"kb": 1024, "mb": 1024**2, "gb": 1024**3} + units = {"kb": 1024, "mb": 1024**2} size_str = max_size.lower() for suffix, factor in units.items(): @@ -229,6 +229,9 @@ async def _validate_max_body_size(security_cfg): break else: raise Exception("Invalid max_body_size format") + MAX_LIMIT = 10 * 1024 * 1024 # 10MB + if limit > MAX_LIMIT: + raise Exception("max_body_size exceeds maximum allowed size (10MB)") content_length = request.content_length or 0 if content_length > limit: @@ -261,24 +264,41 @@ def _validate_rate_limit(security_cfg): if not rl: return - limit = rl.get("limit", 60) + limit = int(rl.get("limit", 60)) + if limit <= 0: + raise Exception("rate_limit.limit must be > 0") per = rl.get("per", "minute") - window = {"second": 1, "minute": 60, "hour": 3600, "day": 86400}.get(per, 60) - key = f"rl:{agent_id}" + window = { + "second": 1, + "minute": 60, + "hour": 3600, + "day": 86400, + }.get(per) + + if not window: + raise Exception(f"Invalid rate_limit.per: {per}") - now = int(time.time()) - bucket = _rate_limit_cache.get(key, {"ts": now, "count": 0}) + capacity = limit + rate = limit / window + cost = 1 + + key = f"rl:tb:{agent_id}" + now = time.time() - # Reset window - if now - bucket["ts"] > window: - bucket = {"ts": now, "count": 0} + try: + res = REDIS_CONN.lua_token_bucket( + keys=[key], + args=[capacity, rate, now, cost], + client=REDIS_CONN.REDIS, + ) - bucket["count"] += 1 - _rate_limit_cache[key] = bucket + allowed = int(res[0]) + if allowed != 1: + raise Exception("Too many requests (rate limit exceeded)") - if bucket["count"] > limit: - raise Exception("Too many requests (rate limit exceeded)") + except Exception as e: + raise Exception(f"Rate limit error: {e}") def _validate_token_auth(security_cfg): """Validate header-based token authentication.""" @@ -304,6 +324,8 @@ def _validate_jwt_auth(security_cfg): """Validate JWT token in Authorization header.""" jwt_cfg = security_cfg.get("jwt", {}) secret = jwt_cfg.get("secret") + if not secret: + raise Exception("JWT secret not configured") required_claims = jwt_cfg.get("required_claims", []) auth_header = request.headers.get("Authorization", "") @@ -391,9 +413,9 @@ async def parse_webhook_request(content_type): # 3. Body ctype = request.headers.get("Content-Type", "").split(";")[0].strip() - if ctype != content_type: + if ctype and ctype != content_type: raise ValueError( - f"Invalid Content-Type: expect '{content_type}', got '{ctype or 'empty'}'" + f"Invalid Content-Type: expect '{content_type}', got '{ctype}'" ) body_data: dict = {} @@ -412,6 +434,8 @@ async def parse_webhook_request(content_type): for key, value in form.items(): body_data[key] = value + if len(files) > 10: + raise Exception("Too many uploaded files") for key, file in files.items(): desc = FileService.upload_info( cvs.user_id, # user @@ -617,7 +641,8 @@ def validate_type(value, t): clean_request = { "query": query_clean, "headers": header_clean, - "body": body_clean + "body": body_clean, + "input": parsed } execution_mode = webhook_cfg.get("execution_mode", "Immediately") @@ -641,18 +666,16 @@ def append_webhook_trace(agent_id: str, start_ts: float,event: dict, ttl=600): REDIS_CONN.set_obj(key, obj, ttl) - - def normalize_ans(ans): - return { - "task_id": ans.get("task_id"), - "component_id": ans.get("data", {}).get("component_id"), - "content": ans.get("data", {}).get("content"), - "start_to_think": ans.get("data", {}).get("start_to_think"), - "end_to_think": ans.get("data", {}).get("end_to_think"), - } - if execution_mode == "Immediately": status = response_cfg.get("status", 200) + try: + status = int(status) + except (TypeError, ValueError): + return get_data_error_result(code=RetCode.BAD_REQUEST,message=str(f"Invalid response status code: {status}")),RetCode.BAD_REQUEST + + if not (200 <= status <= 399): + return get_data_error_result(code=RetCode.BAD_REQUEST,message=str(f"Invalid response status code: {status}, must be between 200 and 399")),RetCode.BAD_REQUEST + body_tpl = response_cfg.get("body_template", "") def parse_body(body: str): @@ -684,10 +707,7 @@ async def background_run(): append_webhook_trace( agent_id, start_ts, - { - "event": ans.get("event"), - "data": normalize_ans(ans), - } + ans ) if is_test: append_webhook_trace( @@ -730,10 +750,7 @@ async def sse(): append_webhook_trace( agent_id, start_ts, - { - "event": ans.get("event"), - "data": normalize_ans(ans), - } + ans ) if is_test: append_webhook_trace( @@ -756,6 +773,20 @@ async def sse(): @manager.route("/webhook_trace/", methods=["GET"]) # noqa: F821 async def webhook_trace(agent_id: str): + def encode_webhook_id(start_ts: str) -> str: + WEBHOOK_ID_SECRET = "webhook_id_secret" + sig = hmac.new( + WEBHOOK_ID_SECRET.encode("utf-8"), + start_ts.encode("utf-8"), + hashlib.sha256, + ).digest() + return base64.urlsafe_b64encode(sig).decode("utf-8").rstrip("=") + + def decode_webhook_id(enc_id: str, webhooks: dict) -> str | None: + for ts in webhooks.keys(): + if encode_webhook_id(ts) == enc_id: + return ts + return None since_ts = request.args.get("since_ts", type=float) webhook_id = request.args.get("webhook_id") @@ -764,31 +795,23 @@ async def webhook_trace(agent_id: str): if since_ts is None: now = time.time() - return Response( - json.dumps( - { - "webhook_id": None, - "events": [], - "next_since_ts": now, - "finished": False, - }, - ensure_ascii=False, - ), - content_type="application/json; charset=utf-8", + return get_json_result( + data={ + "webhook_id": None, + "events": [], + "next_since_ts": now, + "finished": False, + } ) if not raw: - return Response( - json.dumps( - { - "webhook_id": None, - "events": [], - "next_since_ts": since_ts, - "finished": False, - }, - ensure_ascii=False, - ), - content_type="application/json; charset=utf-8", + return get_json_result( + data={ + "webhook_id": None, + "events": [], + "next_since_ts": since_ts, + "finished": False, + } ) obj = json.loads(raw) @@ -800,50 +823,41 @@ async def webhook_trace(agent_id: str): ] if not candidates: - return Response( - json.dumps( - { - "webhook_id": None, - "events": [], - "next_since_ts": since_ts, - "finished": False, - }, - ensure_ascii=False, - ), - content_type="application/json; charset=utf-8", + return get_json_result( + data={ + "webhook_id": None, + "events": [], + "next_since_ts": since_ts, + "finished": False, + } ) start_ts = min(candidates) - webhook_id = str(start_ts) + real_id = str(start_ts) + webhook_id = encode_webhook_id(real_id) - return Response( - json.dumps( - { - "webhook_id": webhook_id, - "events": [], - "next_since_ts": start_ts, - "finished": False, - }, - ensure_ascii=False, - ), - content_type="application/json; charset=utf-8", + return get_json_result( + data={ + "webhook_id": webhook_id, + "events": [], + "next_since_ts": start_ts, + "finished": False, + } ) - ws = webhooks.get(str(webhook_id)) - if not ws: - return Response( - json.dumps( - { - "webhook_id": webhook_id, - "events": [], - "next_since_ts": since_ts, - "finished": True, - }, - ensure_ascii=False, - ), - content_type="application/json; charset=utf-8", + real_id = decode_webhook_id(webhook_id, webhooks) + + if not real_id: + return get_json_result( + data={ + "webhook_id": webhook_id, + "events": [], + "next_since_ts": since_ts, + "finished": True, + } ) + ws = webhooks.get(str(real_id)) events = ws.get("events", []) new_events = [e for e in events if e.get("ts", 0) > since_ts] @@ -853,15 +867,11 @@ async def webhook_trace(agent_id: str): finished = any(e.get("event") == "finished" for e in new_events) - return Response( - json.dumps( - { - "webhook_id": webhook_id, - "events": new_events, - "next_since_ts": next_ts, - "finished": finished, - }, - ensure_ascii=False, - ), - content_type="application/json; charset=utf-8", + return get_json_result( + data={ + "webhook_id": webhook_id, + "events": new_events, + "next_since_ts": next_ts, + "finished": finished, + } ) diff --git a/rag/utils/redis_conn.py b/rag/utils/redis_conn.py index 5a8aece1de6..d7f0dcd9dfa 100644 --- a/rag/utils/redis_conn.py +++ b/rag/utils/redis_conn.py @@ -59,6 +59,7 @@ def get_msg_id(self): @singleton class RedisDB: lua_delete_if_equal = None + lua_token_bucket = None LUA_DELETE_IF_EQUAL_SCRIPT = """ local current_value = redis.call('get', KEYS[1]) if current_value and current_value == ARGV[1] then @@ -68,6 +69,47 @@ class RedisDB: return 0 """ + LUA_TOKEN_BUCKET_SCRIPT = """ + -- KEYS[1] = rate limit key + -- ARGV[1] = capacity + -- ARGV[2] = rate + -- ARGV[3] = now + -- ARGV[4] = cost + + local key = KEYS[1] + local capacity = tonumber(ARGV[1]) + local rate = tonumber(ARGV[2]) + local now = tonumber(ARGV[3]) + local cost = tonumber(ARGV[4]) + + local data = redis.call("HMGET", key, "tokens", "timestamp") + local tokens = tonumber(data[1]) + local last_ts = tonumber(data[2]) + + if tokens == nil then + tokens = capacity + last_ts = now + end + + local delta = math.max(0, now - last_ts) + tokens = math.min(capacity, tokens + delta * rate) + + if tokens < cost then + return {0, tokens} + end + + tokens = tokens - cost + + redis.call("HMSET", key, + "tokens", tokens, + "timestamp", now + ) + + redis.call("EXPIRE", key, math.ceil(capacity / rate * 2)) + + return {1, tokens} + """ + def __init__(self): self.REDIS = None self.config = REDIS @@ -77,6 +119,7 @@ def register_scripts(self) -> None: cls = self.__class__ client = self.REDIS cls.lua_delete_if_equal = client.register_script(cls.LUA_DELETE_IF_EQUAL_SCRIPT) + cls.lua_token_bucket = client.register_script(cls.LUA_TOKEN_BUCKET_SCRIPT) def __open__(self): try: From 6e4b4215c76aa281993ca76abf50009b20f6f290 Mon Sep 17 00:00:00 2001 From: buua436 Date: Thu, 18 Dec 2025 17:08:16 +0800 Subject: [PATCH 11/13] Refactor webhook error handling and improve logging for background tasks --- api/apps/sdk/agents.py | 60 +++++++++++++++++++++++++++++++++--------- 1 file changed, 48 insertions(+), 12 deletions(-) diff --git a/api/apps/sdk/agents.py b/api/apps/sdk/agents.py index fbe6a0b2e31..87681f18382 100644 --- a/api/apps/sdk/agents.py +++ b/api/apps/sdk/agents.py @@ -396,11 +396,8 @@ def _validate_jwt_auth(security_cfg): dsl = json.dumps(cvs.dsl, ensure_ascii=False) try: canvas = Canvas(dsl, cvs.user_id, agent_id) - except Exception as e: - return get_json_result( - data=False, message=str(e), - code=RetCode.EXCEPTION_ERROR), RetCode.EXCEPTION_ERROR - + except Exception: + raise # 7. Parse request body async def parse_webhook_request(content_type): """Parse request based on content-type and return structured data.""" @@ -626,7 +623,6 @@ def validate_type(value, t): return True return True - parsed = await parse_webhook_request(webhook_cfg.get("content_types")) SCHEMA = webhook_cfg.get("schema", {"query": {}, "headers": {}, "body": {}}) @@ -704,11 +700,8 @@ async def background_run(): webhook_payload=clean_request ): if is_test: - append_webhook_trace( - agent_id, - start_ts, - ans - ) + append_webhook_trace(agent_id, start_ts, ans) + if is_test: append_webhook_trace( agent_id, @@ -716,6 +709,7 @@ async def background_run(): { "event": "finished", "elapsed_time": time.time() - start_ts, + "success": True, } ) @@ -723,7 +717,29 @@ async def background_run(): UserCanvasService.update_by_id(cvs.user_id, cvs.to_dict()) except Exception as e: - logging.exception(f"Webhook background run failed: {e}") + logging.exception("Webhook background run failed") + if is_test: + try: + append_webhook_trace( + agent_id, + start_ts, + { + "event": "error", + "message": str(e), + "error_type": type(e).__name__, + } + ) + append_webhook_trace( + agent_id, + start_ts, + { + "event": "finished", + "elapsed_time": time.time() - start_ts, + "success": False, + } + ) + except Exception: + logging.exception("Failed to append webhook trace") asyncio.create_task(background_run()) return resp @@ -759,12 +775,32 @@ async def sse(): { "event": "finished", "elapsed_time": time.time() - start_ts, + "success": True, } ) final_content = "".join(contents) yield json.dumps(final_content, ensure_ascii=False) except Exception as e: + if is_test: + append_webhook_trace( + agent_id, + start_ts, + { + "event": "error", + "message": str(e), + "error_type": type(e).__name__, + } + ) + append_webhook_trace( + agent_id, + start_ts, + { + "event": "finished", + "elapsed_time": time.time() - start_ts, + "success": False, + } + ) yield json.dumps({"code": 500, "message": str(e)}, ensure_ascii=False) resp = Response(sse(), mimetype="application/json") From a1f5e121a45aeb7ee3c143a4f1d1401867c696a5 Mon Sep 17 00:00:00 2001 From: buua436 Date: Thu, 18 Dec 2025 18:55:43 +0800 Subject: [PATCH 12/13] Fix: Improve error handling in webhook function to return proper response --- api/apps/sdk/agents.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/api/apps/sdk/agents.py b/api/apps/sdk/agents.py index 87681f18382..821a577e7c5 100644 --- a/api/apps/sdk/agents.py +++ b/api/apps/sdk/agents.py @@ -33,7 +33,7 @@ from api.db.services.user_canvas_version import UserCanvasVersionService from common.constants import RetCode from common.misc_utils import get_uuid -from api.utils.api_utils import get_data_error_result, get_error_data_result, get_json_result, get_request_json, token_required +from api.utils.api_utils import get_data_error_result, get_error_data_result, get_json_result, get_request_json, server_error_response, token_required from api.utils.api_utils import get_result from quart import request, Response from rag.utils.redis_conn import REDIS_CONN @@ -396,8 +396,11 @@ def _validate_jwt_auth(security_cfg): dsl = json.dumps(cvs.dsl, ensure_ascii=False) try: canvas = Canvas(dsl, cvs.user_id, agent_id) - except Exception: - raise + except Exception as e: + resp=get_data_error_result(code=RetCode.BAD_REQUEST,message=str(e)) + resp.status_code = RetCode.BAD_REQUEST + return resp + # 7. Parse request body async def parse_webhook_request(content_type): """Parse request based on content-type and return structured data.""" From d73040d00e455827e2881d5a04411df190a11c55 Mon Sep 17 00:00:00 2001 From: buua436 Date: Thu, 18 Dec 2025 19:03:17 +0800 Subject: [PATCH 13/13] update --- agent/component/webhook.py | 58 -------------------------------------- api/apps/sdk/agents.py | 2 +- 2 files changed, 1 insertion(+), 59 deletions(-) delete mode 100644 agent/component/webhook.py diff --git a/agent/component/webhook.py b/agent/component/webhook.py deleted file mode 100644 index 61caaa0bc65..00000000000 --- a/agent/component/webhook.py +++ /dev/null @@ -1,58 +0,0 @@ -# -# Copyright 2024 The InfiniFlow Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -from agent.component.base import ComponentParamBase, ComponentBase - - -class WebhookParam(ComponentParamBase): - - """ - Define the Begin component parameters. - """ - def __init__(self): - super().__init__() - self.security = { - "auth_type": "none", - } - self.schema = {} - self.execution_mode = "Immediately" - self.response = {} - self.outputs = { - "query": { - "value": {}, - "type": "object" - }, - "headers": { - "value": {}, - "type": "object" - }, - "body": { - "value": {}, - "type": "object" - } - } - - def get_input_form(self) -> dict[str, dict]: - return getattr(self, "inputs") - - -class Webhook(ComponentBase): - component_name = "Webhook" - - def _invoke(self, **kwargs): - pass - - def thoughts(self) -> str: - return "" diff --git a/api/apps/sdk/agents.py b/api/apps/sdk/agents.py index 821a577e7c5..2a6e539a069 100644 --- a/api/apps/sdk/agents.py +++ b/api/apps/sdk/agents.py @@ -33,7 +33,7 @@ from api.db.services.user_canvas_version import UserCanvasVersionService from common.constants import RetCode from common.misc_utils import get_uuid -from api.utils.api_utils import get_data_error_result, get_error_data_result, get_json_result, get_request_json, server_error_response, token_required +from api.utils.api_utils import get_data_error_result, get_error_data_result, get_json_result, get_request_json, token_required from api.utils.api_utils import get_result from quart import request, Response from rag.utils.redis_conn import REDIS_CONN