Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
[project]
name = "uipath-langchain"
version = "0.1.37"
version = "0.2.0"
description = "Python SDK that enables developers to build and deploy LangGraph agents to the UiPath Cloud Platform"
readme = { file = "README.md", content-type = "text/markdown" }
requires-python = ">=3.11"
dependencies = [
"uipath>=2.2.35, <2.3.0",
"uipath==2.3.0.dev1010343497",
"uipath-runtime>=0.3.0, <0.4.0",
"langgraph>=1.0.0, <2.0.0",
"langchain-core>=1.0.0, <2.0.0",
"aiosqlite==0.21.0",
Expand Down Expand Up @@ -124,3 +125,11 @@ name = "testpypi"
url = "https://test.pypi.org/simple/"
publish-url = "https://test.pypi.org/legacy/"
explicit = true

[tool.uv.sources]
uipath = { index = "testpypi" }

[tool.uv]
override-dependencies = [
"uipath>=2.3.0.dev1010340000,<2.3.0.dev1010350000",
]
1 change: 1 addition & 0 deletions src/uipath_langchain/runtime/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ async def _create_runtime_instance(
delegate=base_runtime,
storage=storage,
trigger_manager=trigger_manager,
runtime_id=runtime_id,
)

async def new_runtime(
Expand Down
187 changes: 142 additions & 45 deletions src/uipath_langchain/runtime/storage.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""SQLite implementation of UiPathResumableStorageProtocol."""

import json
from typing import cast
from typing import Any, cast

from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
from pydantic import BaseModel
Expand All @@ -14,25 +14,31 @@


class SqliteResumableStorage:
"""SQLite storage for resume triggers."""
"""SQLite storage for resume triggers and arbitrary kv pairs."""

def __init__(
self, memory: AsyncSqliteSaver, table_name: str = "__uipath_resume_triggers"
self,
memory: AsyncSqliteSaver,
rs_table_name: str = "__uipath_resume_triggers",
kv_table_name: str = "__uipath_runtime_kv",
):
self.memory = memory
self.table_name = table_name
self.rs_table_name = rs_table_name
self.kv_table_name = kv_table_name
self._initialized = False

async def _ensure_table(self) -> None:
"""Create table if needed."""
"""Create tables if needed."""
if self._initialized:
return

await self.memory.setup()
async with self.memory.lock, self.memory.conn.cursor() as cur:
await cur.execute(f"""
CREATE TABLE IF NOT EXISTS {self.table_name} (
await cur.execute(
f"""
CREATE TABLE IF NOT EXISTS {self.rs_table_name} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
runtime_id TEXT NOT NULL,
type TEXT NOT NULL,
name TEXT NOT NULL,
key TEXT,
Expand All @@ -41,75 +47,166 @@ async def _ensure_table(self) -> None:
payload TEXT,
timestamp DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S', 'now', 'utc'))
)
""")
"""
)

await cur.execute(
f"""
CREATE TABLE IF NOT EXISTS {self.kv_table_name} (
runtime_id TEXT NOT NULL,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curious why we are adding runtime ID? at a glance these DBs look to be per job (since I didn't see any kind of primary key before) - is this just good hygiene?

namespace TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT,
timestamp DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S', 'now', 'utc')),
PRIMARY KEY (runtime_id, namespace, key)
)
"""
)

await self.memory.conn.commit()
self._initialized = True

async def save_trigger(self, trigger: UiPathResumeTrigger) -> None:
"""Save resume trigger to database."""
self._initialized = True

async def save_trigger(self, runtime_id: str, trigger: UiPathResumeTrigger) -> None:
"""Save resume trigger to database (scoped by runtime_id)."""
await self._ensure_table()

trigger_key = (
trigger.api_resume.inbox_id if trigger.api_resume else trigger.item_key
)
payload = trigger.payload
if payload:
payload = (
(
payload.model_dump()
if isinstance(payload, BaseModel)
else json.dumps(payload)
)
if isinstance(payload, dict)
else str(payload)
)
payload_text = self._dump_value(trigger.payload)

async with self.memory.lock, self.memory.conn.cursor() as cur:
await cur.execute(
f"INSERT INTO {self.table_name} (type, key, name, payload, folder_path, folder_key) VALUES (?, ?, ?, ?, ?, ?)",
f"""
INSERT INTO {self.rs_table_name}
(runtime_id, type, key, name, payload, folder_path, folder_key)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(
runtime_id,
trigger.trigger_type.value,
trigger_key,
trigger.trigger_name.value,
payload,
payload_text,
trigger.folder_path,
trigger.folder_key,
),
)
await self.memory.conn.commit()

async def get_latest_trigger(self) -> UiPathResumeTrigger | None:
"""Get most recent trigger from database."""
async def get_latest_trigger(self, runtime_id: str) -> UiPathResumeTrigger | None:
"""Get most recent trigger for runtime_id from database."""
await self._ensure_table()

async with self.memory.lock, self.memory.conn.cursor() as cur:
await cur.execute(f"""
await cur.execute(
f"""
SELECT type, key, name, folder_path, folder_key, payload
FROM {self.table_name}
FROM {self.rs_table_name}
WHERE runtime_id = ?
ORDER BY timestamp DESC
LIMIT 1
""")
""",
(runtime_id,),
)
result = await cur.fetchone()

if not result:
return None
if not result:
return None

trigger_type, key, name, folder_path, folder_key, payload_text = cast(
tuple[str, str, str, str | None, str | None, str | None], tuple(result)
)

payload = self._load_value(payload_text)

trigger_type, key, name, folder_path, folder_key, payload = cast(
tuple[str, str, str, str, str, str], tuple(result)
resume_trigger = UiPathResumeTrigger(
trigger_type=UiPathResumeTriggerType(trigger_type),
trigger_name=UiPathResumeTriggerName(name),
item_key=key,
folder_path=folder_path,
folder_key=folder_key,
payload=payload,
)

if resume_trigger.trigger_type == UiPathResumeTriggerType.API:
resume_trigger.api_resume = UiPathApiTrigger(
inbox_id=resume_trigger.item_key,
request=resume_trigger.payload,
)

resume_trigger = UiPathResumeTrigger(
trigger_type=UiPathResumeTriggerType(trigger_type),
trigger_name=UiPathResumeTriggerName(name),
item_key=key,
folder_path=folder_path,
folder_key=folder_key,
payload=payload,
return resume_trigger

async def set_value(
self,
runtime_id: str,
namespace: str,
key: str,
value: Any,
) -> None:
"""Save arbitrary key-value pair to database."""
Copy link

Copilot AI Dec 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The documentation states "Save arbitrary key-value pair" but the implementation only accepts str, dict, or BaseModel types. Other common JSON-serializable types like int, float, bool, list, or tuple are rejected. Consider either expanding the accepted types to truly support arbitrary values, or updating the documentation to accurately reflect the restrictions.

Copilot uses AI. Check for mistakes.
if not (
isinstance(value, str)
or isinstance(value, dict)
or isinstance(value, BaseModel)
or value is None
):
raise TypeError("Value must be str, dict, BaseModel or None.")

await self._ensure_table()

value_text = self._dump_value(value)

async with self.memory.lock, self.memory.conn.cursor() as cur:
await cur.execute(
f"""
INSERT INTO {self.kv_table_name} (runtime_id, namespace, key, value)
VALUES (?, ?, ?, ?)
ON CONFLICT(runtime_id, namespace, key)
DO UPDATE SET
value = excluded.value,
timestamp = (strftime('%Y-%m-%d %H:%M:%S', 'now', 'utc'))
""",
(runtime_id, namespace, key, value_text),
)
await self.memory.conn.commit()

if resume_trigger.trigger_type == UiPathResumeTriggerType.API:
resume_trigger.api_resume = UiPathApiTrigger(
inbox_id=resume_trigger.item_key, request=resume_trigger.payload
)
async def get_value(self, runtime_id: str, namespace: str, key: str) -> Any:
"""Get arbitrary key-value pair from database (scoped by runtime_id + namespace)."""
await self._ensure_table()

return resume_trigger
async with self.memory.lock, self.memory.conn.cursor() as cur:
await cur.execute(
f"""
SELECT value
FROM {self.kv_table_name}
WHERE runtime_id = ? AND namespace = ? AND key = ?
LIMIT 1
""",
(runtime_id, namespace, key),
)
row = await cur.fetchone()

if not row:
return None

return self._load_value(cast(str | None, row[0]))

def _dump_value(self, value: str | dict[str, Any] | BaseModel | None) -> str | None:
if value is None:
return None
if isinstance(value, BaseModel):
return "j:" + json.dumps(value.model_dump())
if isinstance(value, dict):
return "j:" + json.dumps(value)
return "s:" + value

def _load_value(self, raw: str | None) -> Any:
if raw is None:
return None
if raw.startswith("s:"):
return raw[2:]
if raw.startswith("j:"):
return json.loads(raw[2:])
return raw
2 changes: 1 addition & 1 deletion tests/hitl/test_hitl_api_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ async def test_agent(
assert type == "Api"
assert name == "Api"
assert folder_path == folder_key is None
assert payload == "interrupt message"
assert payload == "s:interrupt message"
finally:
if conn:
conn.close()
Expand Down
23 changes: 14 additions & 9 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.