From 376285281d21fd4cce721b5e61241e886875122a Mon Sep 17 00:00:00 2001
From: sheyhanxlalani <123619461+sheyhanxlalani@users.noreply.github.com>
Date: Wed, 23 Jul 2025 13:49:14 +1000
Subject: [PATCH 01/12] Upload LangSmith run metrics to ClickHouse

---
 llm/langchain_metrics.py | 206 +++++++++++++++++++++++++++++++--------
 1 file changed, 164 insertions(+), 42 deletions(-)

diff --git a/llm/langchain_metrics.py b/llm/langchain_metrics.py
index c9390c0..6b2cb93 100644
--- a/llm/langchain_metrics.py
+++ b/llm/langchain_metrics.py
@@ -1,51 +1,173 @@
 from langsmith import Client
 import os
+import json
 from datetime import datetime, timedelta
+import csv
+import clickhouse_connect
+from dotenv import load_dotenv
+
+load_dotenv()
 
 # Check if API key is set from environment variable
 print("Current system time:", datetime.now())
 
-# Initialize client (uses LANGSMITH_API_KEY env var)
-client = Client(api_key="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")
-
-def get_recent_runs():
-    # Get runs from the last hour
-    print("Getting recent runs...")
-    try:
-        recent_runs = list(client.list_runs(
-            project_name="default", # Your LangSmith project
-            limit=50,
-        ))
-        print(f"Found {len(recent_runs)} runs")
-
-        # Get metrics from recent runs
-        for run in recent_runs:
-            print(f"Run ID: {run.id}")
-            print(f"Name: {run.name}")
-            print(f"Status: {run.status}")
-            print(f"Tokens: {run.total_tokens}")
-            print(f"Cost: ${run.total_cost}")
-            print(f"Start time: {run.start_time}")
-            print(f"End time: {run.end_time}")
-            print(f"Duration: {run.end_time - run.start_time}")
-            print(f"Input: {run.inputs}")
-            print(f"Output: {run.outputs}")
-            print(f"Error: {run.error}")
-            print(f"Tags: {run.tags}")
-            print(f"Metadata: {run.metadata}")
-            print(f"Parent Run ID: {run.parent_run_id}")
-            print(f"Child Runs: {run.child_runs}")
-
-            # get the run metrics
-            run_metrics = client.get_run_metrics(run.id)
-            print(f"Run Metrics: {run_metrics}")
-
-            # get the run events
-            run_events = client.get_run_events(run.id)
-            print(f"Run Events: {run_events}")
-            print("---")
-    except Exception as e:
-        print(f"Error: {e}")
+
+class LangchainMetrics:
+    def __init__(self):
+        self.API_KEY = os.getenv("LANGSMITH_API_KEY")
+        # Initialize client (uses LANGSMITH_API_KEY env var)
+        self.client = Client(api_key=self.API_KEY)
+
+
+    def connect_clickhouse(self):
+        """Connect to ClickHouse database"""
+        try:
+            self.clickhouse_client = clickhouse_connect.get_client(
+                host='10.0.100.92',
+                port=8123,
+                username='user',
+                password='default',
+                database='guardian'
+            )
+            print("Connected to ClickHouse successfully")
+            return True
+        except Exception as e:
+            print(f"Failed to connect to ClickHouse: {e}")
+            self.clickhouse_client = None
+            return False
+
+    def save_to_clickhouse(self, run):
+        # Upload data to ClickHouse table (assumes connection already established)
+        table_name = "langchain_metrics"
+
+        query = f"""
+        CREATE TABLE IF NOT EXISTS {table_name} (
+            id String,
+            name String,
+            status String,
+            total_tokens Int32,
+            total_cost Float64,
+            start_time DateTime,
+            end_time DateTime,
+            duration Float64,
+            inputs String,
+            outputs String,
+            error String,
+            tags String,
+            metadata String,
+            parent_run_id String,
+            child_runs String
+        ) ENGINE = MergeTree()
+        ORDER BY id
+        """
+        # Create table if not exists (simple schema)
+        self.clickhouse_client.command(query)
+
+        # Prepare row for insertion
+        row = [
+            str(run.id),
+            str(run.name),
+            str(run.status),
+            int(run.total_tokens) if run.total_tokens is not None else 0,
+            float(run.total_cost) if run.total_cost is not None else 0.0,
run.start_time, + run.end_time, + float((run.end_time - run.start_time).total_seconds()) if run.end_time and run.start_time else 0.0, + json.dumps(run.inputs), + json.dumps(run.outputs), + str(run.error), + json.dumps(run.tags), + json.dumps(run.metadata), + str(run.parent_run_id), + json.dumps(run.child_runs) + ] + + self.clickhouse_client.insert( + table_name, + [row], # a list of rows + column_names=[ + "id", "name", "status", "total_tokens", "total_cost", "start_time", "end_time", + "duration", "inputs", "outputs", "error", "tags", "metadata", "parent_run_id", "child_runs" + ] + ) + print(f"Run {run.id} saved to ClickHouse") + + def get_recent_runs(self): + # Get runs from the last hour + print("Getting recent runs...") + try: + recent_runs = list(self.client.list_runs( + project_name="default", # Your LangSmith project + limit=1, + )) + print(f"Found {len(recent_runs)} runs") + + # Get metrics from recent runs and convert to a json + for run in recent_runs: + print(f"Run ID: {run.id}") + print(f"Name: {run.name}") + print(f"Status: {run.status}") + print(f"Tokens: {run.total_tokens}") + print(f"Cost: ${run.total_cost}") + print(f"Start time: {run.start_time}") + print(f"End time: {run.end_time}") + print(f"Duration: {run.end_time - run.start_time}") + print(f"Input: {run.inputs}") + print(f"Output: {run.outputs}") + print(f"Error: {run.error}") + print(f"Tags: {run.tags}") + print(f"Metadata: {run.metadata}") + print(f"Parent Run ID: {run.parent_run_id}") + print(f"Child Runs: {run.child_runs}") + print("---") + + + + # csv_file = "recent_runs.csv" + # fieldnames = [ + # "id", "name", "status", "total_tokens", "total_cost", "start_time", "end_time", + # "duration", "inputs", "outputs", "error", "tags", "metadata", "parent_run_id", "child_runs" + # ] + # # Check if file exists to write header only once + # write_header = not os.path.exists(csv_file) + # with open(csv_file, mode="a", newline="", encoding="utf-8") as f: + # writer = csv.DictWriter(f, fieldnames=fieldnames) + # if write_header: + # writer.writeheader() + # writer.writerow({ + # "id": run.id, + # "name": run.name, + # "status": run.status, + # "total_tokens": run.total_tokens, + # "total_cost": run.total_cost, + # "start_time": run.start_time, + # "end_time": run.end_time, + # "duration": (run.end_time - run.start_time) if run.end_time and run.start_time else None, + # "inputs": json.dumps(run.inputs), + # "outputs": json.dumps(run.outputs), + # "error": run.error, + # "tags": json.dumps(run.tags), + # "metadata": json.dumps(run.metadata), + # "parent_run_id": run.parent_run_id, + # "child_runs": json.dumps(run.child_runs), + # }) + return recent_runs + + + + except Exception as e: + print(f"Error: {e}") if __name__ == "__main__": - get_recent_runs() \ No newline at end of file + langchain_metrics = LangchainMetrics() + langchain_metrics.connect_clickhouse() + runs = langchain_metrics.get_recent_runs() + for run in runs: + langchain_metrics.save_to_clickhouse(run=run) + + + + + + + From 872eca965810ea5852e3a1ab5757151026efdfc9 Mon Sep 17 00:00:00 2001 From: Srijith Gomattam Date: Thu, 24 Jul 2025 09:34:43 +1000 Subject: [PATCH 02/12] additional metrics tracking --- llm/langchain_metrics2.py | 147 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 147 insertions(+) create mode 100644 llm/langchain_metrics2.py diff --git a/llm/langchain_metrics2.py b/llm/langchain_metrics2.py new file mode 100644 index 0000000..1417bd2 --- /dev/null +++ b/llm/langchain_metrics2.py @@ -0,0 +1,147 @@ +from langsmith import Client +import 
os +import csv +from io import StringIO +from datetime import datetime, timedelta +from typing import List, Optional, Dict, Any, Union +from dotenv import load_dotenv +load_dotenv() + + +class LangChainMetrics: + def __init__(self, project_name: str = "default"): + self.api_key = os.getenv("LANGSMITH_API_KEY") + self.project_name = project_name + self.client = Client(api_key=self.api_key) + + def get_latest_runs(self, limit: int = 50, hours_back: int = 24) -> List[Any]: + try: + start_time = datetime.now() - timedelta(hours=hours_back) + return list(self.client.list_runs( + project_name=self.project_name, + start_time=start_time, + limit=limit + )) + except Exception as e: + print(f"Error getting latest runs: {e}") + return [] + + def get_runs_by_id(self, run_ids: List[str]) -> List[Any]: + runs = [] + for run_id in run_ids: + try: + runs.append(self.client.read_run(run_id)) + except Exception as e: + print(f"Error retrieving run {run_id}: {e}") + return runs + + def get_runs_by_tags(self, tags: List[str], match_all: bool = True, limit: int = 100) -> List[Any]: + try: + operator = " and " if match_all else " or " + filter_str = operator.join([f'has(tags, "{tag}")' for tag in tags]) + + return list(self.client.list_runs( + project_name=self.project_name, + filter=filter_str, + limit=limit + )) + except Exception as e: + print(f"Error searching runs by tags {tags}: {e}") + return [] + + def get_runs_by_start_time( + self, + start_time_gte: datetime, + start_time_lte: Optional[datetime] = None, + limit: int = 100 + ) -> List[Any]: + try: + filters = [f'start_time >= "{start_time_gte.isoformat()}"'] + if start_time_lte: + filters.append(f'start_time <= "{start_time_lte.isoformat()}"') + + return list(self.client.list_runs( + project_name=self.project_name, + filter=" and ".join(filters), + limit=limit + )) + except Exception as e: + print(f"Error searching runs by start time: {e}") + return [] + + def pretty_print_metrics(self, runs: List[Any], group_by: str = "parent_id") -> None: + # Group runs + grouped_runs = {} + if group_by == "parent_id": + for run in runs: + parent_id = str(run.parent_run_id) if run.parent_run_id else str(run.id) + if parent_id not in grouped_runs: + grouped_runs[parent_id] = [] + grouped_runs[parent_id].append(run) + elif group_by == "tag": + for run in runs: + tags = run.tags if run.tags else ["no_tag"] + for tag in tags: + if tag not in grouped_runs: + grouped_runs[tag] = [] + grouped_runs[tag].append(run) + + # CSV format output + csv_buffer = StringIO() + + # Summary CSV + print("=== SUMMARY CSV ===") + summary_writer = csv.writer(csv_buffer) + summary_writer.writerow([ + "group_key", "group_type", "total_runs", "total_cost", "total_tokens", + "success_count", "error_count", "success_rate" + ]) + + for group_key, group_runs in grouped_runs.items(): + success_count = sum(1 for run in group_runs if run.status == "success") + summary_writer.writerow([ + group_key, + group_by, + len(group_runs), + sum(float(run.total_cost or 0) for run in group_runs), + sum(int(run.total_tokens or 0) for run in group_runs), + success_count, + len(group_runs) - success_count, + f"{(success_count / len(group_runs) * 100):.1f}%" if group_runs else "0%" + ]) + + #print(csv_buffer.getvalue()) + csv_buffer.seek(0) + csv_buffer.truncate(0) + + # Detailed runs CSV + print("\n=== DETAILED RUNS CSV ===") + detail_writer = csv.writer(csv_buffer) + detail_writer.writerow([ + "group_key", "run_id", "name", "status", "total_tokens", "total_cost", + "start_time", "end_time", "duration_seconds", 
"tags", "parent_run_id", "error" + ]) + + for group_key, group_runs in grouped_runs.items(): + for run in group_runs: + detail_writer.writerow([ + group_key, + str(run.id) if run.id else "", + str(run.name) if run.name else "", + str(run.status) if run.status else "", + int(run.total_tokens) if run.total_tokens else 0, + float(run.total_cost) if run.total_cost else 0.0, + run.start_time.isoformat() if run.start_time else "", + run.end_time.isoformat() if run.end_time else "", + (run.end_time - run.start_time).total_seconds() if run.start_time and run.end_time else "", + "|".join(run.tags) if run.tags else "", + str(run.parent_run_id) if run.parent_run_id else "", + str(run.error) if run.error else "" + ]) + + #print(csv_buffer.getvalue()) + +if __name__ == "__main__": + metrics = LangChainMetrics() + runs = metrics.get_latest_runs(limit=10) + metrics.pretty_print_metrics(runs, group_by="parent_id") \ No newline at end of file From b8c7436ebcf1d75c88d1b921743bf761c6145035 Mon Sep 17 00:00:00 2001 From: sheyhanxlalani <123619461+sheyhanxlalani@users.noreply.github.com> Date: Fri, 25 Jul 2025 10:50:57 +1000 Subject: [PATCH 03/12] uploads to clickhouse on langchain_pipeline run --- llm/langchain_metrics.py | 5 +++-- llm/langchain_pipeline.py | 8 ++++++++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/llm/langchain_metrics.py b/llm/langchain_metrics.py index 6b2cb93..9fcbb44 100644 --- a/llm/langchain_metrics.py +++ b/llm/langchain_metrics.py @@ -92,7 +92,7 @@ def save_to_clickhouse(self, run): ) print(f"Run {run.id} saved to ClickHouse") - def get_recent_runs(self): + def get_runs(self): # Get runs from the last hour print("Getting recent runs...") try: @@ -157,11 +157,12 @@ def get_recent_runs(self): except Exception as e: print(f"Error: {e}") + return [] if __name__ == "__main__": langchain_metrics = LangchainMetrics() langchain_metrics.connect_clickhouse() - runs = langchain_metrics.get_recent_runs() + runs = langchain_metrics.get_runs() for run in runs: langchain_metrics.save_to_clickhouse(run=run) diff --git a/llm/langchain_pipeline.py b/llm/langchain_pipeline.py index bc2c77c..16b02ab 100644 --- a/llm/langchain_pipeline.py +++ b/llm/langchain_pipeline.py @@ -8,6 +8,8 @@ from langchain.prompts import PromptTemplate from pydantic import SecretStr import requests +from langchain_metrics import LangchainMetrics + # === LangGraph imports === from langgraph.graph import StateGraph, START @@ -54,6 +56,7 @@ def generate(state: State, app: "RAGApplication") -> Dict[str, Any]: ) prompt_str = app.rag_prompt.format(question=state["question"], context=ctx) response = app.llm.invoke(prompt_str) + return {"answer": response.content} @@ -141,3 +144,8 @@ def answer_question(self, question: str) -> Dict[str, Any]: print(f"Answer: {res['answer']}") print(f"Articles used: {res['articles_used']}") print("-" * 60) + langchain_metrics = LangchainMetrics() + langchain_metrics.connect_clickhouse() + runs = langchain_metrics.get_runs() + for run in runs: + langchain_metrics.save_to_clickhouse(run=run) From ad33982d86e17d5d0b270947a23cc4dcc9c2c6bb Mon Sep 17 00:00:00 2001 From: sheyhanxlalani <123619461+sheyhanxlalani@users.noreply.github.com> Date: Mon, 28 Jul 2025 14:45:33 +1000 Subject: [PATCH 04/12] Adding live metrics --- llm/langchain_metrics.py | 91 +++++++---- llm/llm_utils/langchain_pipeline.py | 108 +++++++++---- llm/llm_utils/langchain_pipeline_2.py | 211 ++++++++++++++++++++++++++ 3 files changed, 351 insertions(+), 59 deletions(-) create mode 100644 
llm/llm_utils/langchain_pipeline_2.py diff --git a/llm/langchain_metrics.py b/llm/langchain_metrics.py index 9fcbb44..c071761 100644 --- a/llm/langchain_metrics.py +++ b/llm/langchain_metrics.py @@ -5,7 +5,8 @@ import csv import clickhouse_connect from dotenv import load_dotenv - +from typing import List, Any +from pytz import timezone, utc load_dotenv() @@ -36,6 +37,26 @@ def connect_clickhouse(self): self.clickhouse_client = None return False + def get_runs_by_id(self, run_ids: List[str]) -> List[Any]: + # Convert UUID objects to strings if needed + run_ids = [str(run_id) for run_id in run_ids] + runs = list(self.client.list_runs(run_ids=run_ids)) + return runs[0] if runs else None + + def get_runs_by_id_safe(self, run_ids: List[str]) -> Any: + """Safe version that returns None instead of raising errors""" + try: + return self.get_runs_by_id(run_ids) + except Exception as e: + print(f"Error getting run by ID: {e}") + return None + + def find_root_run_id(self, run_id): + current_run = self.get_runs_by_id([run_id]) + while current_run and current_run.parent_run_id: + current_run = self.get_runs_by_id([current_run.parent_run_id]) + return current_run.id if current_run else None + def save_to_clickhouse(self, run): # Upload data to ClickHouse table (assumes connection already established) table_name = "langchain_metrics" @@ -56,55 +77,71 @@ def save_to_clickhouse(self, run): tags String, metadata String, parent_run_id String, - child_runs String + child_runs String, ) ENGINE = MergeTree() ORDER BY id """ # Create table if not exists (simple schema) self.clickhouse_client.command(query) - # Prepare row for insertion + # Prepare row for insertion with safe datetime handling + start_time = run.start_time if run.start_time else datetime.now() + # end_time might be None if run is still in progress + end_time = run.end_time if run.end_time else datetime.now() + duration = (end_time - start_time).total_seconds() if end_time and start_time else 0.0 + + # make sure start and end time are in the same timezone + start_time = start_time.astimezone(timezone('Australia/Sydney')) + end_time = end_time.astimezone(timezone('Australia/Sydney')) + row = [ str(run.id), - str(run.name), - str(run.status), + str(run.name) if run.name else "", + str(run.status) if run.status else "", int(run.total_tokens) if run.total_tokens is not None else 0, float(run.total_cost) if run.total_cost is not None else 0.0, - run.start_time, - run.end_time, - float((run.end_time - run.start_time).total_seconds()) if run.end_time and run.start_time else 0.0, - json.dumps(run.inputs), - json.dumps(run.outputs), - str(run.error), - json.dumps(run.tags), - json.dumps(run.metadata), - str(run.parent_run_id), - json.dumps(run.child_runs) + start_time, + end_time, + float(duration), + json.dumps(run.inputs) if run.inputs else "{}", + json.dumps(run.outputs) if run.outputs else "{}", + str(run.error) if run.error else "", + json.dumps(run.tags) if run.tags else "[]", + json.dumps(run.metadata) if run.metadata else "{}", + str(run.parent_run_id) if run.parent_run_id else "", + json.dumps(run.child_runs) if run.child_runs else "[]", ] - self.clickhouse_client.insert( - table_name, - [row], # a list of rows - column_names=[ - "id", "name", "status", "total_tokens", "total_cost", "start_time", "end_time", - "duration", "inputs", "outputs", "error", "tags", "metadata", "parent_run_id", "child_runs" - ] - ) - print(f"Run {run.id} saved to ClickHouse") - - def get_runs(self): + # Only insert if we have valid data + if start_time: + 
self.clickhouse_client.insert( + table_name, + [row], # a list of rows + column_names=[ + "id", "name", "status", "total_tokens", "total_cost", "start_time", "end_time", + "duration", "inputs", "outputs", "error", "tags", "metadata", "parent_run_id", "child_runs" + ] + ) + print(f"Run {run.id} saved to ClickHouse") + else: + print(f"Run {run.id} not saved - missing start_time") + + def get_runs(self, num_runs: int = 1, run_ids: List[str] = None, run_name: str = None): # Get runs from the last hour print("Getting recent runs...") try: recent_runs = list(self.client.list_runs( project_name="default", # Your LangSmith project - limit=1, + limit = num_runs, + run_ids = run_ids, + name = run_name, # Filter by run name )) print(f"Found {len(recent_runs)} runs") # Get metrics from recent runs and convert to a json for run in recent_runs: print(f"Run ID: {run.id}") + print(f"Trace ID: {run.trace_id}") print(f"Name: {run.name}") print(f"Status: {run.status}") print(f"Tokens: {run.total_tokens}") diff --git a/llm/llm_utils/langchain_pipeline.py b/llm/llm_utils/langchain_pipeline.py index 9319b54..74b0b73 100644 --- a/llm/llm_utils/langchain_pipeline.py +++ b/llm/llm_utils/langchain_pipeline.py @@ -8,8 +8,8 @@ from langchain.prompts import PromptTemplate from pydantic import SecretStr import requests -from langchain_metrics import LangchainMetrics - +from llm.langchain_metrics import LangchainMetrics +from langchain_core.callbacks.base import BaseCallbackHandler # === LangGraph imports === from langgraph.graph import StateGraph, START @@ -24,14 +24,78 @@ class State(TypedDict): context: List[Document] answer: str +class RunIdCollector(BaseCallbackHandler): + def __init__(self): + self.run_ids = [] + self.last_run_id = None # Add this line + self.chain_run_ids = [] + + def on_llm_start(self, serialized, prompts, *, run_id, parent_run_id=None, **kwargs): + self.run_ids.append(run_id) + print(f"LLM run started with run_id: {run_id}") + # Store run_id for later processing in on_llm_end + self.current_run_id = run_id + + def on_llm_end(self, response, *, run_id, parent_run_id=None, **kwargs): + print(f"LLM run ended with run_id: {run_id}") + # Process the run after it's completed + import time + time.sleep(1) # Give LangSmith time to save the run + langchain_metrics = LangchainMetrics() + langchain_metrics.connect_clickhouse() # Connect to ClickHouse + # run = langchain_metrics.get_runs_by_id_safe([run_id]) + # if run: + # # Process the run here + # langchain_metrics.save_to_clickhouse(run) + # else: + # print(f"No run found for run_id: {run_id}") + + run_name = "retrieve" + runs = langchain_metrics.get_runs(num_runs=1, run_ids=None, run_name=run_name) + print(f"Runs: {runs}") + + if runs and len(runs) > 0: + # Get the first run from the list + run = runs[0] + print(f"Found run with name: {run.name}") + #save run to clickhouse + langchain_metrics.save_to_clickhouse(run) + else: + print(f"No run found for run_name: {run_name}") + + + # def on_chain_start(self, inputs, *, run_id, parent_run_id=None, **kwargs): + # print(f"Chain run started with run_id: {run_id}") + # self.chain_run_ids.append(run_id) + + # def on_chain_end(self, outputs, *, run_id, parent_run_id=None, **kwargs): + # print(f"Chain run ended with run_id: {run_id}") + # # Process the run here + # langchain_metrics = LangchainMetrics() + # langchain_metrics.connect_clickhouse() # Connect to ClickHouse + # run = langchain_metrics.get_runs_by_id_safe([run_id]) + # if run: + # print(f"Found run: {run.trace_id}") + + + + + # 2. 
Step 1: retrieve relevant articles def retrieve(state: State) -> Dict[str, Any]: # Choose port based on database type - if os.getenv("DATABASE_TYPE", "").lower() == "clickhouse": + database_type = os.getenv("DATABASE_TYPE", "") + print(f"DEBUG: DATABASE_TYPE is '{database_type}'") + + if database_type.lower() == "clickhouse": port = 8000 # Port from clickhouse docker-compose - elif os.getenv("DATABASE_TYPE", "").lower() == "postgres": + print(f"DEBUG: Using ClickHouse port {port}") + elif database_type.lower() == "postgres": port = 8001 # Port from postgres docker-compose + print(f"DEBUG: Using PostgreSQL port {port}") + else: + raise ValueError(f"DATABASE_TYPE must be either clickhouse or postgres, got '{database_type}'") docs = requests.get(f"http://localhost:{port}/related-articles?query={state['question']}").json() # convert to LangChain Documents @@ -63,20 +127,6 @@ def generate(state: State, app: "RAGApplication") -> Dict[str, Any]: response = app.llm.invoke(prompt_str) return {"answer": response.content} -======= - - # ctx = "\n\n".join( - # f"Title: {doc.metadata['title']}\n" - # f"Date: {doc.metadata['publication_date']}\n" - # f"Content: {doc.page_content}" - # for doc in state["context"] - # ) - # prompt_str = app.rag_prompt.format(question=state["question"], context=ctx) - # response = app.llm.invoke(prompt_str) - # return {"answer": response.content} - - return {"answer": "lorem ipsum dolor sit amet \n\n ******* this is a sample response so that we dont call the LLM and waste money. If you want to see the real response, uncomment the code in def generate in langchain_pipeline.py"} ->>>>>>> 3817806ba64e4d258b41ebde6e06725516fabfcb:llm/llm_utils/langchain_pipeline.py class RAGApplication: @@ -87,13 +137,16 @@ def __init__(self, max_articles: int = 5): api_key = os.getenv("ANTHROPIC_API_KEY") if not api_key: raise ValueError("ANTHROPIC_API_KEY is required") + + collector = RunIdCollector() self.llm = ChatAnthropic( model_name="claude-3-5-sonnet-latest", api_key=SecretStr(api_key), temperature=0.1, timeout=60, - stop=[] + stop=[], + callbacks=[collector] ) self.rag_prompt = PromptTemplate( input_variables=["question", "context"], @@ -108,9 +161,7 @@ def __init__(self, max_articles: int = 5): Please provide a comprehensive answer based on the context above. If the context doesn't contain enough information to answer the question, say so. Use the Guardian articles as your primary source of information. Answer:""" - ) - - # 4. Build the LangGraph orchestration + ) # 4. 
Build the LangGraph orchestration builder = StateGraph(State).add_sequence([ retrieve, lambda state: generate(state, self) @@ -142,6 +193,8 @@ def answer_question(self, question: str) -> Dict[str, Any]: for d in docs ] } + + except Exception as e: logging.error(f"RAG pipeline failed: {e}") return { @@ -156,15 +209,6 @@ def answer_question(self, question: str) -> Dict[str, Any]: if __name__ == "__main__": state_app = RAGApplication(max_articles=5) for q in [ - "Give me the latest on news corp columnist Lucy Zelić.", + "Give me the latest on Trump.", ]: res = state_app.answer_question(q) - print(f"\nQuestion: {res['question']}") - print(f"Answer: {res['answer']}") - print(f"Articles used: {res['articles_used']}") - print("-" * 60) - langchain_metrics = LangchainMetrics() - langchain_metrics.connect_clickhouse() - runs = langchain_metrics.get_runs() - for run in runs: - langchain_metrics.save_to_clickhouse(run=run) diff --git a/llm/llm_utils/langchain_pipeline_2.py b/llm/llm_utils/langchain_pipeline_2.py new file mode 100644 index 0000000..bf513dd --- /dev/null +++ b/llm/llm_utils/langchain_pipeline_2.py @@ -0,0 +1,211 @@ +import os +import logging +from dotenv import load_dotenv +from typing import List, Dict, Any +from anthropic import Anthropic +from langchain_anthropic import ChatAnthropic +from langchain.schema import Document +from langchain.prompts import PromptTemplate +from pydantic import SecretStr +import requests +from llm.langchain_metrics import LangchainMetrics +from langchain_core.callbacks.base import BaseCallbackHandler +from langchain_core.runnables import RunnableSequence +from typing_extensions import TypedDict + +load_dotenv() + + +# 1. Define the RAG chain +class RAGChain: + """Custom RAG chain that combines retrieval and generation.""" + + def __init__(self, llm: ChatAnthropic, rag_prompt: PromptTemplate): + self.llm = llm + self.rag_prompt = rag_prompt + + def _retrieve_documents(self, question: str) -> List[Document]: + """Retrieve relevant documents.""" + if os.getenv("DATABASE_TYPE", "").lower() == "clickhouse": + port = 8000 + elif os.getenv("DATABASE_TYPE", "").lower() == "postgres": + port = 8001 + else: + raise ValueError("DATABASE_TYPE must be either clickhouse or postgres") + + docs = requests.get(f"http://localhost:{port}/related-articles?query={question}").json() + return [ + Document( + page_content=body, + metadata={ + "url": url, + "title": title, + "publication_date": pub_date, + "similarity_score": score + } + ) + for url, title, body, pub_date, score in docs + ] + + def invoke(self, inputs: Dict[str, Any], callbacks=None) -> Dict[str, Any]: + """Invoke the RAG chain with proper callback handling.""" + question = inputs["question"] + + # Step 1: Retrieve documents + docs = self._retrieve_documents(question) + + # Step 2: Generate answer using LLMChain for proper callback support + ctx = "\n\n".join( + f"Title: {doc.metadata['title']}\n" + f"Date: {doc.metadata['publication_date']}\n" + f"Content: {doc.page_content}" + for doc in docs + ) + + # Create a temporary runnable for the generation step + runnable = self.rag_prompt | self.llm + response = runnable.invoke( + {"question": question, "context": ctx}, + config={"callbacks": callbacks} + ) + + return { + "answer": response.content, + "context": [ + { + "title": d.metadata["title"], + "url": d.metadata["url"], + "publication_date": d.metadata["publication_date"], + "similarity_score": d.metadata["similarity_score"], + "snippet": (d.page_content[:200] + "...") if len(d.page_content) > 200 else 
d.page_content + } + for d in docs + ], + "articles_used": len(docs) + } + +class RunIdCollector(BaseCallbackHandler): + def __init__(self): + self.run_ids = [] + self.last_run_id = None # Add this line + self.chain_run_ids = [] + + def on_llm_start(self, serialized, prompts, *, run_id, parent_run_id=None, **kwargs): + self.run_ids.append(run_id) + print(f"LLM run started with run_id: {run_id}") + # Store run_id for later processing in on_llm_end + self.current_run_id = run_id + + def on_llm_end(self, response, *, run_id, parent_run_id=None, **kwargs): + print(f"LLM run ended with run_id: {run_id}") + # Process the run after it's completed + import time + time.sleep(1) # Give LangSmith time to save the run + langchain_metrics = LangchainMetrics() + langchain_metrics.connect_clickhouse() # Connect to ClickHouse + run = langchain_metrics.get_runs_by_id_safe([run_id]) + if run: + print(f"Found run: {run.trace_id}") + # Process the run here + langchain_metrics.save_to_clickhouse(run) + else: + print(f"No run found for run_id: {run_id}") + + def on_chain_start(self, serialized, inputs, *, run_id, parent_run_id=None, **kwargs): + print(f"Chain run started with run_id: {run_id}") + print(f"Inputs: {inputs}") + self.chain_run_ids.append(run_id) + + def on_chain_end(self, outputs, *, run_id, parent_run_id=None, **kwargs): + print(f"Chain run ended with run_id: {run_id}") + print(f"Outputs: {outputs}") + # Process the run here + import time + time.sleep(1) # Give LangSmith time to save the run + langchain_metrics = LangchainMetrics() + langchain_metrics.connect_clickhouse() # Connect to ClickHouse + run = langchain_metrics.get_runs_by_id_safe([run_id]) + if run: + print(f"Found run: {run.trace_id}") + langchain_metrics.save_to_clickhouse(run) + else: + print(f"No run found for run_id: {run_id}") + + def on_llm_start(self, serialized, prompts, *, run_id, parent_run_id=None, **kwargs): + print(f"LLM run started with run_id: {run_id}") + self.run_ids.append(run_id) + + def on_llm_end(self, response, *, run_id, parent_run_id=None, **kwargs): + print(f"LLM run ended with run_id: {run_id}") + self.last_run_id = run_id + + + + +# These functions are now integrated into the RAGChain class + + +class RAGApplication: + def __init__(self, max_articles: int = 5): + self.max_articles = max_articles + self.anthropic = Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY")) + api_key = os.getenv("ANTHROPIC_API_KEY") + if not api_key: + raise ValueError("ANTHROPIC_API_KEY is required") + + self.llm = ChatAnthropic( + model_name="claude-3-5-sonnet-latest", + api_key=SecretStr(api_key), + temperature=0.1, + timeout=60, + stop=[] + ) + self.rag_prompt = PromptTemplate( + input_variables=["question", "context"], + template=""" + You are a helpful AI assistant that answers questions based on the provided context from Guardian articles. + + Context from Guardian articles: + {context} + + Question: {question} + + Please provide a comprehensive answer based on the context above. If the context doesn't contain enough information to answer the question, say so. Use the Guardian articles as your primary source of information. 
+ + Answer:""" + ) + + # Create the RAG chain + self.rag_chain = RAGChain(llm=self.llm, rag_prompt=self.rag_prompt) + + def answer_question(self, question: str) -> Dict[str, Any]: + """Invoke the RAG chain with callbacks.""" + try: + collector = RunIdCollector() + # Invoke the chain with callbacks + result = self.rag_chain.invoke({"question": question}, callbacks=[collector]) + + return { + "question": question, + "answer": result["answer"], + "articles_used": result["articles_used"], + "context": result["context"] + } + + except Exception as e: + logging.error(f"RAG pipeline failed: {e}") + return { + "question": question, + "answer": f"Error: {e}", + "context": [], + "articles_used": 0 + } + + +# === Example usage === +if __name__ == "__main__": + state_app = RAGApplication(max_articles=5) + for q in [ + "Give me the latest on Epstein.", + ]: + res = state_app.answer_question(q) From b0b900448c71bca66701559252616c8244ccaf1e Mon Sep 17 00:00:00 2001 From: sheyhanxlalani <123619461+sheyhanxlalani@users.noreply.github.com> Date: Mon, 28 Jul 2025 14:47:45 +1000 Subject: [PATCH 05/12] Adding live metrics --- .gitignore | 1 + README.md | 50 ++++- run.bat => dev/run.bat | 2 +- run.sh => dev/run.sh | 0 test_response.py => dev/test_response.py | 0 llm/dashboard.json | 249 +++++++++++++++++++++ llm/docker-compose.yml | 13 ++ llm/langchain_gui.py | 19 +- llm/llm_utils/langchain_pipeline.py | 23 +- models/__init__.py | 1 - requirements.txt | 23 +- routes/__init__.py | 1 - scripts/pull_docs_cassandra.py | 120 ++++++++++ services/cassandra/Dockerfile | 30 +++ services/cassandra/cassandra_controller.py | 26 +++ services/cassandra/cassandra_dao.py | 76 +++++++ services/cassandra/docker-compose.yml | 43 ++++ services/cassandra/init/01-schema.cql | 18 ++ services/clickhouse/Dockerfile | 7 +- services/clickhouse/docker-compose.yaml | 3 +- 20 files changed, 679 insertions(+), 26 deletions(-) rename run.bat => dev/run.bat (64%) rename run.sh => dev/run.sh (100%) mode change 100755 => 100644 rename test_response.py => dev/test_response.py (100%) create mode 100644 llm/dashboard.json create mode 100644 llm/docker-compose.yml delete mode 100644 models/__init__.py delete mode 100644 routes/__init__.py create mode 100644 scripts/pull_docs_cassandra.py create mode 100644 services/cassandra/Dockerfile create mode 100644 services/cassandra/cassandra_controller.py create mode 100644 services/cassandra/cassandra_dao.py create mode 100644 services/cassandra/docker-compose.yml create mode 100644 services/cassandra/init/01-schema.cql diff --git a/.gitignore b/.gitignore index 8470558..45a99d5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ .env .venv +venv .DS_STORE scripts/.idea .idea diff --git a/README.md b/README.md index d18f86f..ae36760 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,8 @@ POSTGRES_HOST="localhost" POSTGRES_PORT=5432 GUARDIAN_API_KEY="" + +HOST="localhost" or shared IP address holding the container ``` 3. Install PostgreSQL; ensure you **write down your login somewhere safe**. @@ -58,23 +60,63 @@ Use the `POST` endpoint on the Docker container to upload articles to either dat If you're hosting a shared database, run `docker compose up --build`; this loads up the app with uvicorn at `0.0.0.0:8000`. +#### Streamlit (Docker) + +Run `docker compose build`(if not built previously) & `docker compose up` in rag/services/streamlit + +#### Streamlit (Locally) + +Ensure you've pip installed requirements in your local virtual env. 
+ +On Mac, run the script at dev/run.sh + +On Windows, run the script at dev/run.bat (by clicking it in file explorer) + + ## Local PostgreSQL -Run `uvicorn services.postgres_controller:app --reload`. +Run `uvicorn services.postgres_controller:app --reload --port 8001`. + +(You will need to activate your venv & `pip install -r requirements.txt`) + ## Local Clickhouse -Run `uvicorn services.clickhouse_controller:app --reload`. +Run `uvicorn services.clickhouse_controller:app --reload --port 8000`. + +(You will need to activate your venv & `pip install -r requirements.txt`) + +## Local Cassandra + +Run `uvicorn services.cassandra.cassandra_controller:app --reload`. + +## Local Grafana + +1. `cd` into the `llm` folder. +2. Run `docker-compose up`; this will create a Docker container for Grafana on port `3000`. +3. In your browser, open `localhost:3000`, and login using username `admin` and password `admin`. +4. Add a new data source with the following parameters: + 1. **Server address**: the IP of your container. + 2. **Server port**: `8123` + 3. **Protocol**: `HTTP` + 4. **Skip TLS Verify**: `true` + 5. **Username**: `user` + 6. **Password**: `default` + 7. **Default database**: `guardian` + 8. **Default table**: `langchain_metrics` +5. Save and test the data source, and verify it connects successfully. +6. Import `dashboard.json` from the `llm` folder. +7. You should now have a local Grafana with metrics data; the metrics tab on the Streamlit application should now also work. # Endpoints ## GET -`http://localhost:8000/related-articles?query=` +curl "http://localhost:8000/related-articles?query=" ## POST -`http://localhost:8000/upload-articles` +curl -X POST "http://localhost:8000/upload-articles" # License diff --git a/run.bat b/dev/run.bat similarity index 64% rename from run.bat rename to dev/run.bat index da5abad..6122679 100644 --- a/run.bat +++ b/dev/run.bat @@ -1,4 +1,4 @@ @echo off -call venv\Scripts\activate +call .venv\Scripts\activate streamlit run llm/langchain_gui.py pause \ No newline at end of file diff --git a/run.sh b/dev/run.sh old mode 100755 new mode 100644 similarity index 100% rename from run.sh rename to dev/run.sh diff --git a/test_response.py b/dev/test_response.py similarity index 100% rename from test_response.py rename to dev/test_response.py diff --git a/llm/dashboard.json b/llm/dashboard.json new file mode 100644 index 0000000..4e3adb5 --- /dev/null +++ b/llm/dashboard.json @@ -0,0 +1,249 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": 1, + "links": [], + "panels": [ + { + "datasource": { + "type": "grafana-clickhouse-datasource", + "uid": "aesy917veeltsd" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "fillOpacity": 80, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineWidth": 1, + "scaleDistribution": { + "type": "linear" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + 
} + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 0 + }, + "id": 2, + "options": { + "barRadius": 0, + "barWidth": 0.97, + "fullHighlight": false, + "groupWidth": 0.7, + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "orientation": "auto", + "showValue": "always", + "stacking": "none", + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + }, + "xField": "total_tokens", + "xTickLabelRotation": 0, + "xTickLabelSpacing": 0 + }, + "pluginVersion": "12.1.0", + "targets": [ + { + "editorType": "sql", + "format": 1, + "meta": { + "builderOptions": { + "columns": [], + "database": "", + "limit": 1000, + "mode": "list", + "queryType": "table", + "table": "" + } + }, + "pluginVersion": "4.10.1", + "queryType": "table", + "rawSql": "SELECT parent_run_id, total_tokens, total_cost, status\r\nFROM \"guardian\".\"langchain_metrics\"\r\nORDER BY start_time", + "refId": "A" + } + ], + "title": "Token Metrics", + "type": "barchart" + }, + { + "datasource": { + "type": "grafana-clickhouse-datasource", + "uid": "aesy917veeltsd" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "continuous-BlYlRd" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "fillOpacity": 80, + "gradientMode": "hue", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineWidth": 1, + "scaleDistribution": { + "type": "linear" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 8 + }, + "id": 1, + "options": { + "barRadius": 0, + "barWidth": 0.97, + "colorByField": "duration", + "fullHighlight": false, + "groupWidth": 0.7, + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "orientation": "auto", + "showValue": "auto", + "stacking": "none", + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + }, + "xTickLabelRotation": 0, + "xTickLabelSpacing": 0 + }, + "pluginVersion": "12.1.0", + "targets": [ + { + "datasource": { + "type": "grafana-clickhouse-datasource", + "uid": "aesy917veeltsd" + }, + "editorType": "sql", + "format": 1, + "meta": { + "builderOptions": { + "columns": [], + "database": "", + "limit": 1000, + "mode": "list", + "queryType": "table", + "table": "" + } + }, + "pluginVersion": "4.10.1", + "queryType": "table", + "rawSql": "SELECT parent_run_id, duration, status, outputs\r\nFROM \"guardian\".\"langchain_metrics\"\r\nWHERE parent_run_id != 'None'\r\nORDER BY start_time ASC\r\nLIMIT 1000", + "refId": "A" + } + ], + "title": "Duration Metrics", + "transparent": true, + "type": "barchart" + } + ], + "preload": false, + "schemaVersion": 41, + "tags": [], + "templating": { + "list": [] + }, + "time": { + "from": "2025-07-27T10:59:29.712Z", + "to": "2025-07-28T10:59:29.712Z" + }, + "timepicker": {}, + "timezone": "browser", + "title": "DB Metrics Visualization", + "uid": "90ced2bd-5ea8-42c5-b87b-be9e1a8cdb4c", + "version": 11 +} \ No newline at end of file diff --git a/llm/docker-compose.yml b/llm/docker-compose.yml new file mode 100644 index 0000000..a70a8b3 --- /dev/null +++ b/llm/docker-compose.yml @@ -0,0 +1,13 @@ +version: "3.8" + +services: + grafana: + image: 
grafana/grafana-oss + container_name: grafana + ports: + - "3000:3000" + environment: + - GF_SECURITY_ALLOW_EMBEDDING=true + - GF_AUTH_ANONYMOUS_ENABLED=true + - GF_AUTH_ANONYMOUS_ORG_ROLE=Viewer + restart: unless-stopped \ No newline at end of file diff --git a/llm/langchain_gui.py b/llm/langchain_gui.py index ecaa0d5..e74922d 100644 --- a/llm/langchain_gui.py +++ b/llm/langchain_gui.py @@ -6,6 +6,8 @@ LOGO_URL = "https://cdn.brandfetch.io/idEaoqZ5uv/w/400/h/400/theme/dark/icon.png?c=1dxbfHSJFAPEGdCLU4o5B" LOADING_URL = "https://cdn.pixabay.com/animation/2025/04/08/09/08/09-08-31-655_512.gif" +DURATION_METRICS_URL = "http://localhost:3000/d-solo/90ced2bd-5ea8-42c5-b87b-be9e1a8cdb4c/db-metrics-visualization?orgId=1&from=1753395832378&to=1753417432378&timezone=browser&panelId=1&__feature.dashboardSceneSolo=true" +TOKEN_METRICS_URL = "http://localhost:3000/d-solo/90ced2bd-5ea8-42c5-b87b-be9e1a8cdb4c/db-metrics-visualization?orgId=1&from=1753645594350&to=1753667194350&timezone=browser&panelId=2&__feature.dashboardSceneSolo=true" # --- Custom styles --- st.markdown(f""" @@ -203,7 +205,7 @@ def run_api(): - uvicorn.run(controller.app, host="0.0.0.0", port=8001, log_level="info") + uvicorn.run(controller.app, host="0.0.0.0", port=8002, log_level="info") api_thread = threading.Thread(target=run_api, daemon=True) @@ -211,7 +213,7 @@ def run_api(): st.title("RAGuardian") -tab1, tab2, tab3 = st.tabs(["Single Query", "Bulk Query", "Multi Query"]) +tab1, tab2, tab3, tab4 = st.tabs(["Single Query", "Bulk Query", "Multi Query", "Metrics"]) with tab1: @@ -397,3 +399,16 @@ def run_api(): st.markdown(context_html, unsafe_allow_html=True) else: st.warning("Please enter at least one valid query.") + +with tab4: + tab4a, tab4b = st.tabs(["Duration Metrics", "Token Metrics"]) + + with tab4a: + st.components.v1.html( + f'', + height=600) + + with tab4b: + st.components.v1.html( + f'', + height=600) diff --git a/llm/llm_utils/langchain_pipeline.py b/llm/llm_utils/langchain_pipeline.py index 74b0b73..4d408db 100644 --- a/llm/llm_utils/langchain_pipeline.py +++ b/llm/llm_utils/langchain_pipeline.py @@ -8,8 +8,12 @@ from langchain.prompts import PromptTemplate from pydantic import SecretStr import requests +<<<<<<< HEAD from llm.langchain_metrics import LangchainMetrics from langchain_core.callbacks.base import BaseCallbackHandler +======= +from enum import Enum +>>>>>>> 67719a3242105152268b66ef0a7192122bf13992 # === LangGraph imports === from langgraph.graph import StateGraph, START @@ -17,6 +21,14 @@ load_dotenv() +class AvailableRAGDatabases(Enum): + CLICKHOUSE = "clickhouse" + POSTGRES = "postgres" + +PORT_MAPPING = { + AvailableRAGDatabases.CLICKHOUSE: 8000, + AvailableRAGDatabases.POSTGRES: 8001 +} # 1. Define the shared state for orchestration class State(TypedDict): @@ -85,6 +97,7 @@ def on_llm_end(self, response, *, run_id, parent_run_id=None, **kwargs): # 2. 
Step 1: retrieve relevant articles def retrieve(state: State) -> Dict[str, Any]: # Choose port based on database type +<<<<<<< HEAD database_type = os.getenv("DATABASE_TYPE", "") print(f"DEBUG: DATABASE_TYPE is '{database_type}'") @@ -96,8 +109,14 @@ def retrieve(state: State) -> Dict[str, Any]: print(f"DEBUG: Using PostgreSQL port {port}") else: raise ValueError(f"DATABASE_TYPE must be either clickhouse or postgres, got '{database_type}'") - - docs = requests.get(f"http://localhost:{port}/related-articles?query={state['question']}").json() +======= + port = PORT_MAPPING[AvailableRAGDatabases(os.getenv("DATABASE_TYPE", "").lower())] + logging.info(f"Using port {port} for {AvailableRAGDatabases(os.getenv('DATABASE_TYPE', '').lower())}") +>>>>>>> 67719a3242105152268b66ef0a7192122bf13992 + + # hostname is based on local machine or docker + hostname = "localhost" if os.getenv("LOCAL_STREAMLIT_SERVER", False) else "host.docker.internal" + docs = requests.get(f"http://{hostname}:{port}/related-articles?query={state['question']}").json() # convert to LangChain Documents documents = [ Document( diff --git a/models/__init__.py b/models/__init__.py deleted file mode 100644 index 73e5111..0000000 --- a/models/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# Models package for RAG application \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index c9f2978..0bbe95e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,16 +1,15 @@ -clickhouse_connect==0.8.18 - # Postgres dependencies -fastapi==0.116.1 -pgvector==0.4.1 -psycopg>=3.0 -pydantic==2.11.7 -python-dotenv==1.1.1 -Requests==2.32.4 -sentence_transformers==5.0.0 -uvicorn==0.35.0 +#fastapi==0.116.1 +#pgvector==0.4.1 +#psycopg>=3.0 +#no? pydantic==2.11.7 +#python-dotenv==1.1.1 +#Requests==2.32.4 +#sentence_transformers==5.0.0 +#uvicorn==0.35.0 # LangChain dependencies +#clickhouse_connect==0.8.18 langchain~=0.3.26 langchain-community langchain-core @@ -22,6 +21,9 @@ langchain-anthropic beautifulsoup4~=4.13.4 anthropic~=0.58.2 +# Cassandra dependencies +cassandra-driver + numpy~=2.0.2 pillow~=11.3.0 pip~=22.3.1 @@ -63,3 +65,4 @@ pgvector~=0.4.1 streamlit~=1.47.0 tornado~=6.5.1 pandas~=2.3.1 + diff --git a/routes/__init__.py b/routes/__init__.py deleted file mode 100644 index 61ed7fc..0000000 --- a/routes/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# Routes package for RAG application API endpoints \ No newline at end of file diff --git a/scripts/pull_docs_cassandra.py b/scripts/pull_docs_cassandra.py new file mode 100644 index 0000000..6cfc5c0 --- /dev/null +++ b/scripts/pull_docs_cassandra.py @@ -0,0 +1,120 @@ +import os +import logging +import requests +from sentence_transformers import SentenceTransformer +from dotenv import load_dotenv +from cassandra.cluster import Cluster +from cassandra.query import SimpleStatement + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' +) + + +def pull_docs(total_needed: int = 1000, page_size: int = 1): + + load_dotenv() + API_KEY = os.getenv("GUARDIAN_API_KEY") + BASE = "https://content.guardianapis.com/search" + all_articles = [] + pages = total_needed // page_size + + logging.info(f"Starting to fetch {total_needed} articles with page_size={page_size}") + logging.info(f"API Key present: {'Yes' if API_KEY else 'No'}") + + cluster = Cluster([os.getenv("CASSANDRA_HOST", "127.0.0.1")], port=int(os.getenv("CASSANDRA_PORT", 9042))) + session = cluster.connect() + + keyspace = 
os.getenv("CASSANDRA_KEYSPACE", "vectorembeds") + session.execute(f""" + CREATE KEYSPACE IF NOT EXISTS {keyspace} + WITH replication = {{'class':'SimpleStrategy', 'replication_factor':1}}; + """) + session.set_keyspace(keyspace) + + session.execute(f""" + CREATE TABLE IF NOT EXISTS articles ( + url text PRIMARY KEY, + title text, + body text, + publication_date text, + vector vector + ); + """) + + model = SentenceTransformer("all-MiniLM-L6-v2") + articles_inserted = 0 + articles_skipped = 0 + + try: + for page in range(1, pages + 1): + params = { + "api-key": API_KEY, + "order-by": "newest", + "page-size": page_size, + "page": page, + "show-fields": "all", + } + + logging.info(f"--- Fetching page {page} ---") + logging.debug(f"Request URL: {BASE}") + + resp = requests.get(BASE, params=params) + + data = resp.json().get("response", {}) + results = data.get("results", []) + all_articles.extend(results) + + logging.info(f"Fetched {len(results)} items from page {page}") + + if not results: + logging.warning("No results returned, stopping...") + break + + # Process each article + for i, result in enumerate(results): + article = result['fields'] + url = article['shortUrl'] + title = article['headline'] + body = article['bodyText'] + publication_date = article['firstPublicationDate'] + + logging.info(f" Title: {title[:100]}...") + + embedding = model.encode(body) + embedding_list = [float(x) for x in embedding] + + insert_cql = SimpleStatement(f""" + INSERT INTO articles (url, title, body, publication_date, vector) + VALUES (%s, %s, %s, %s, %s) + IF NOT EXISTS; + """) + + result = session.execute(insert_cql, (url, title, body, publication_date, embedding_list)) + + if result.was_applied: + articles_inserted += 1 + logging.info(f" ✅ INSERTED: {title[:50]}...") + else: + articles_skipped += 1 + logging.info(f" ⏭️ SKIPPED (duplicate): {title[:50]}...") + + logging.info(f"Page {page} summary: {articles_inserted} inserted, {articles_skipped} skipped") + + logging.info("=== FINAL SUMMARY ===") + logging.info(f"Articles inserted: {articles_inserted}") + logging.info(f"Articles skipped (duplicates): {articles_skipped}") + logging.info(f"Total processed: {articles_inserted + articles_skipped}") + + cluster.shutdown() + return True + except Exception as e: + logging.error(f"❌ Pipeline failed: {e}") + cluster.shutdown() + return False + +if __name__ == "__main__": + pull_docs(total_needed=50, page_size=10) \ No newline at end of file diff --git a/services/cassandra/Dockerfile b/services/cassandra/Dockerfile new file mode 100644 index 0000000..0e7e061 --- /dev/null +++ b/services/cassandra/Dockerfile @@ -0,0 +1,30 @@ +FROM python:3.11 + +WORKDIR /app + +RUN echo "deb https://deb.debian.org/debian bookworm main" > /etc/apt/sources.list && \ + echo "deb https://deb.debian.org/debian-security bookworm-security main" >> /etc/apt/sources.list && \ + echo "deb https://deb.debian.org/debian bookworm-updates main" >> /etc/apt/sources.list && \ + apt-get update && \ + apt-get install -y --no-install-recommends gcc && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* +# Install system dependencies including PostgreSQL client libraries +RUN apt-get update && \ + apt-get install -y --no-install-recommends \ + libpq-dev \ + gcc \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +# Debug: List files in build context +RUN ls -la + +# Install dependencies +COPY requirements.txt . 
+RUN pip install --upgrade pip +RUN pip install --no-cache-dir -r requirements.txt +RUN pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu + +# Default command if not overridden by docker-compose +CMD ["python", "-m", "uvicorn", "services.cassandra.cassandra_controller:app", "--host", "0.0.0.0", "--port", "8002"] diff --git a/services/cassandra/cassandra_controller.py b/services/cassandra/cassandra_controller.py new file mode 100644 index 0000000..80d7643 --- /dev/null +++ b/services/cassandra/cassandra_controller.py @@ -0,0 +1,26 @@ +import time +from fastapi import FastAPI +from services.cassandra.cassandra_dao import CassandraDao +from scripts.pull_docs_cassandra import pull_docs +import logging + +app = FastAPI() + +# Create controller instance +cassandra_dao = CassandraDao() + +@app.get("/related-articles") +async def related_articles(query: str): + start_time = time.time() + result = cassandra_dao.related_articles(query) + end_time = time.time() + logging.info(f"GET Time taken: {end_time - start_time} seconds...you got that!") + return result + +@app.post("/upload-articles") +async def upload_articles(): + start_time = time.time() + result = pull_docs(10) + end_time = time.time() + logging.info(f"POST Time taken: {end_time - start_time} seconds...you posted up!") + return result \ No newline at end of file diff --git a/services/cassandra/cassandra_dao.py b/services/cassandra/cassandra_dao.py new file mode 100644 index 0000000..6916de3 --- /dev/null +++ b/services/cassandra/cassandra_dao.py @@ -0,0 +1,76 @@ +from http.client import HTTPException + +from sentence_transformers import SentenceTransformer +import os +import logging +from dotenv import load_dotenv +from cassandra.cluster import Cluster + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s %(levelname)s %(message)s', + handlers=[ + logging.StreamHandler() + ] +) + +load_dotenv() + +class CassandraDao: + def __init__(self): + self.API_KEY = os.getenv("GUARDIAN_API_KEY") + self.BASE = "https://content.guardianapis.com/search" + self.model = SentenceTransformer('all-MiniLM-L6-v2') + self.client = None + logging.info("DAO initialized.") + + def connect_cassandra(self): + """Connect to Cassandra database""" + try: + cassandra_host = os.getenv("CASSANDRA_HOST", "localhost") + cassandra_port = int(os.getenv("CASSANDRA_PORT", 9042)) + cassandra_keyspace = os.getenv("CASSANDRA_KEYSPACE", "your_keyspace") + + cluster = Cluster([cassandra_host], port=cassandra_port) + self.client = cluster.connect(cassandra_keyspace) + + logging.info("Connected to Cassandra successfully.") + print("Connected to Cassandra successfully") + return True + except Exception as e: + logging.error(f"Failed to connect to Cassandra: {e}") + print(f"Failed to connects to Cassandra: {e}") + self.client = None + return False + + def related_articles(self, query: str, limit: int = 5): + try: + if not self.connect_cassandra(): + raise HTTPException(500, "Failed to connect to database") + conn = self.client + if conn is None: + raise HTTPException(500, "Database connection is None") + + emb = self.model.encode(query).tolist() + + query_cql = """ + SELECT url, title, body, publication_date + FROM articles + ORDER BY vector ANN OF ? + LIMIT ? 
+ """ + + prepared = self.client.prepare(query_cql) + rows = self.client.execute(prepared, (emb, limit)) + + results = [(row.url, row.title, row.body, row.publication_date, "No Similarity Score") for row in rows] + + if not results: + raise HTTPException(404, "No matches found") + + return results + + except Exception as e: + logging.error(f"Exception in /search endpoint: {e}", exc_info=True) + raise HTTPException(500, str(e)) diff --git a/services/cassandra/docker-compose.yml b/services/cassandra/docker-compose.yml new file mode 100644 index 0000000..3243641 --- /dev/null +++ b/services/cassandra/docker-compose.yml @@ -0,0 +1,43 @@ +services: + db: + image: cassandra:latest + container_name: rag-cassandra-db + restart: unless-stopped + volumes: + - cassandra_data:/var/lib/cassandra + - ./init:/init + ports: + - 19042:9042 + environment: + - CASSANDRA_CLUSTER_NAME=Test Cluster + ulimits: + nofile: + soft: 262144 + hard: 262144 + + app: + build: + context: ../.. + dockerfile: services/cassandra/Dockerfile + container_name: rag-cassandra-app + depends_on: + - db + ports: + - 8003:8003 + environment: + - DATABASE_TYPE=cassandra + - CASSANDRA_HOST=db # refers to service name 'db' here + - CASSANDRA_PORT=9042 + - CASSANDRA_KEYSPACE=vectorembeds + volumes: + - ../../scripts/:/app/scripts + - ../../requirements.txt:/app/requirements.txt + - ../../services/cassandra:/app/services/cassandra + - ../../.env:/app/.env + + working_dir: /app + command: [ "uvicorn", "services.cassandra.cassandra_controller:app", "--host", "0.0.0.0", "--port", "8003", "--reload" ] + restart: unless-stopped + +volumes: + cassandra_data: \ No newline at end of file diff --git a/services/cassandra/init/01-schema.cql b/services/cassandra/init/01-schema.cql new file mode 100644 index 0000000..d2cce64 --- /dev/null +++ b/services/cassandra/init/01-schema.cql @@ -0,0 +1,18 @@ +-- Keyspace: like a database in PostgreSQL +CREATE KEYSPACE IF NOT EXISTS vectorembeds + WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}; + +-- Use the keyspace +USE vectorembeds; + +-- Create the articles table +CREATE TABLE IF NOT EXISTS articles ( + url TEXT PRIMARY KEY, + title TEXT, + body TEXT, + publication_date TIMESTAMP, + vector VECTOR +); + +CREATE CUSTOM INDEX IF NOT EXISTS ann_index ON articles(vector) + USING 'StorageAttachedIndex'; \ No newline at end of file diff --git a/services/clickhouse/Dockerfile b/services/clickhouse/Dockerfile index dc035fe..d903b5f 100644 --- a/services/clickhouse/Dockerfile +++ b/services/clickhouse/Dockerfile @@ -2,12 +2,13 @@ FROM python:3.11-slim WORKDIR /app -COPY requirements.txt . -COPY .env . +# Copy the local requirements.txt from the clickhouse service directory +COPY services/clickhouse/requirements.txt . RUN pip install --no-cache-dir -r requirements.txt -COPY ./services/clickhouse . +# Copy the clickhouse service files +COPY services/clickhouse/ . # This is where Uvicorn runs your FastAPI app! 
CMD ["uvicorn", "clickhouse_controller:app", "--host", "0.0.0.0", "--port", "80"] \ No newline at end of file diff --git a/services/clickhouse/docker-compose.yaml b/services/clickhouse/docker-compose.yaml index eadb4c1..d991467 100644 --- a/services/clickhouse/docker-compose.yaml +++ b/services/clickhouse/docker-compose.yaml @@ -1,4 +1,3 @@ -version: "3.8" services: clickhouse: @@ -42,7 +41,7 @@ services: - "8000:80" volumes: - .:/app - - ../requirements.txt:/app/requirements.txt + - ./requirements.txt:/app/requirements.txt - ../.env:/app/.env networks: - clickhouse-network From d451ff18ceb395d0d0a9e419f6f588ef67b96146 Mon Sep 17 00:00:00 2001 From: sheyhanxlalani <123619461+sheyhanxlalani@users.noreply.github.com> Date: Mon, 28 Jul 2025 15:49:40 +1000 Subject: [PATCH 06/12] live metrics --- llm/llm_utils/langchain_pipeline.py | 23 +++-------------------- 1 file changed, 3 insertions(+), 20 deletions(-) diff --git a/llm/llm_utils/langchain_pipeline.py b/llm/llm_utils/langchain_pipeline.py index 4d408db..7507cb3 100644 --- a/llm/llm_utils/langchain_pipeline.py +++ b/llm/llm_utils/langchain_pipeline.py @@ -8,16 +8,13 @@ from langchain.prompts import PromptTemplate from pydantic import SecretStr import requests -<<<<<<< HEAD -from llm.langchain_metrics import LangchainMetrics -from langchain_core.callbacks.base import BaseCallbackHandler -======= from enum import Enum ->>>>>>> 67719a3242105152268b66ef0a7192122bf13992 # === LangGraph imports === from langgraph.graph import StateGraph, START from typing_extensions import TypedDict +from llm.langchain_metrics import LangchainMetrics +from langchain_core.callbacks.base import BaseCallbackHandler load_dotenv() @@ -97,25 +94,11 @@ def on_llm_end(self, response, *, run_id, parent_run_id=None, **kwargs): # 2. 
Step 1: retrieve relevant articles def retrieve(state: State) -> Dict[str, Any]: # Choose port based on database type -<<<<<<< HEAD - database_type = os.getenv("DATABASE_TYPE", "") - print(f"DEBUG: DATABASE_TYPE is '{database_type}'") - - if database_type.lower() == "clickhouse": - port = 8000 # Port from clickhouse docker-compose - print(f"DEBUG: Using ClickHouse port {port}") - elif database_type.lower() == "postgres": - port = 8001 # Port from postgres docker-compose - print(f"DEBUG: Using PostgreSQL port {port}") - else: - raise ValueError(f"DATABASE_TYPE must be either clickhouse or postgres, got '{database_type}'") -======= port = PORT_MAPPING[AvailableRAGDatabases(os.getenv("DATABASE_TYPE", "").lower())] logging.info(f"Using port {port} for {AvailableRAGDatabases(os.getenv('DATABASE_TYPE', '').lower())}") ->>>>>>> 67719a3242105152268b66ef0a7192122bf13992 # hostname is based on local machine or docker - hostname = "localhost" if os.getenv("LOCAL_STREAMLIT_SERVER", False) else "host.docker.internal" + hostname = "localhost" if os.getenv("LOCAL_STREAMLIT_SERVER", "true") == "true" else "host.docker.internal" docs = requests.get(f"http://{hostname}:{port}/related-articles?query={state['question']}").json() # convert to LangChain Documents documents = [ From 43753c11ead96d9bfda42b99268315365cd0b230 Mon Sep 17 00:00:00 2001 From: sheyhanxlalani <123619461+sheyhanxlalani@users.noreply.github.com> Date: Mon, 28 Jul 2025 15:50:26 +1000 Subject: [PATCH 07/12] live metrics --- README.md | 26 ++++++++++++++++++-------- llm/llm_utils/langchain_pipeline.py | 4 ++++ scripts/pull_docs_cassandra.py | 16 ++++++++++++++++ services/__init__.py | 0 services/cassandra/Dockerfile | 8 ++++++++ services/cassandra/docker-compose.yml | 4 ++++ services/cassandra/requirements.txt | 9 +++++++++ 7 files changed, 59 insertions(+), 8 deletions(-) create mode 100644 services/__init__.py create mode 100644 services/cassandra/requirements.txt diff --git a/README.md b/README.md index ae36760..ffce01e 100644 --- a/README.md +++ b/README.md @@ -19,29 +19,39 @@ This repository can be used to measure the metrics of various vector databases. 2. Create a `.env` file at the project root in the same directory as the `.git` folder with the following fields: ``` -# PSQL CREDENTIALS +# PostgreSQL POSTGRES_DB="guardian" POSTGRES_USER="postgres" POSTGRES_PASSWORD="" POSTGRES_HOST="localhost" POSTGRES_PORT=5432 +# Clickhouse +CLICKHOUSE_DB=guardian +CLICKHOUSE_USER=default +CLICKHOUSE_PASSWORD="" + GUARDIAN_API_KEY="" +<<<<<<< HEAD HOST="localhost" or shared IP address holding the container +======= +ANTHROPIC_API_KEY="" +LANGSMITH_TRACING="true" +LANGSMITH_API_KEY="" + +DATABASE_TYPE="" +HOST="localhost" or shared IPv4 address +>>>>>>> 9634c8a0ef870fa079b7b6de8463d67f3425a819 ``` 3. Install PostgreSQL; ensure you **write down your login somewhere safe**. -4. Create a PostgreSQL database on `localhost:5432` named `guardian` by running `psql -U postgres -h localhost -p 5432`. - -5. In pgAdmin, run `CREATE DATABASE guardian;`. - -6. Connect to the database with `\c guardian` in your Powershell. +4. build Docker image for PostgreSQL using 'docker-compose build --no-cache" -7. Run Docker for PostgreSQL using `docker-compose -f docker/docker-compose.yml up -d` +5. Run Docker for PostgreSQL using `docker-compose up` -8. Install the project requirements using `pip install requirements.txt`; if you do not have a virtual environment set up, do that first! +6. 
When done, close Docker using `docker-compose down` # Running diff --git a/llm/llm_utils/langchain_pipeline.py b/llm/llm_utils/langchain_pipeline.py index 7507cb3..73ad51a 100644 --- a/llm/llm_utils/langchain_pipeline.py +++ b/llm/llm_utils/langchain_pipeline.py @@ -98,7 +98,11 @@ def retrieve(state: State) -> Dict[str, Any]: logging.info(f"Using port {port} for {AvailableRAGDatabases(os.getenv('DATABASE_TYPE', '').lower())}") # hostname is based on local machine or docker +<<<<<<< HEAD hostname = "localhost" if os.getenv("LOCAL_STREAMLIT_SERVER", "true") == "true" else "host.docker.internal" +======= + hostname = "localhost" if os.getenv("LOCAL_STREAMLIT_SERVER", False) else "host.docker.internal" +>>>>>>> 9634c8a0ef870fa079b7b6de8463d67f3425a819 docs = requests.get(f"http://{hostname}:{port}/related-articles?query={state['question']}").json() # convert to LangChain Documents documents = [ diff --git a/scripts/pull_docs_cassandra.py b/scripts/pull_docs_cassandra.py index 6cfc5c0..f8b1e4f 100644 --- a/scripts/pull_docs_cassandra.py +++ b/scripts/pull_docs_cassandra.py @@ -45,6 +45,22 @@ def pull_docs(total_needed: int = 1000, page_size: int = 1): ); """) +<<<<<<< HEAD +======= + rows = session.execute(f""" + SELECT index_name FROM system_schema.indexes + WHERE keyspace_name = '{keyspace}' AND table_name = 'articles'; + """) + + existing_indexes = [row.index_name for row in rows] + + if 'ann_index' not in existing_indexes: + session.execute(""" + CREATE CUSTOM INDEX IF NOT EXISTS ann_index ON articles(vector) + USING 'StorageAttachedIndex'; + """) + +>>>>>>> 9634c8a0ef870fa079b7b6de8463d67f3425a819 model = SentenceTransformer("all-MiniLM-L6-v2") articles_inserted = 0 articles_skipped = 0 diff --git a/services/__init__.py b/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/services/cassandra/Dockerfile b/services/cassandra/Dockerfile index 0e7e061..9eccf75 100644 --- a/services/cassandra/Dockerfile +++ b/services/cassandra/Dockerfile @@ -21,10 +21,18 @@ RUN apt-get update && \ RUN ls -la # Install dependencies +<<<<<<< HEAD COPY requirements.txt . +======= +COPY services/cassandra/requirements.txt . 
+>>>>>>> 9634c8a0ef870fa079b7b6de8463d67f3425a819 RUN pip install --upgrade pip RUN pip install --no-cache-dir -r requirements.txt RUN pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu # Default command if not overridden by docker-compose +<<<<<<< HEAD CMD ["python", "-m", "uvicorn", "services.cassandra.cassandra_controller:app", "--host", "0.0.0.0", "--port", "8002"] +======= +CMD ["python", "-m", "uvicorn", "services.cassandra.cassandra_controller:app", "--host", "0.0.0.0", "--port", "8003"] +>>>>>>> 9634c8a0ef870fa079b7b6de8463d67f3425a819 diff --git a/services/cassandra/docker-compose.yml b/services/cassandra/docker-compose.yml index 3243641..70fcc91 100644 --- a/services/cassandra/docker-compose.yml +++ b/services/cassandra/docker-compose.yml @@ -30,9 +30,13 @@ services: - CASSANDRA_PORT=9042 - CASSANDRA_KEYSPACE=vectorembeds volumes: +<<<<<<< HEAD - ../../scripts/:/app/scripts - ../../requirements.txt:/app/requirements.txt - ../../services/cassandra:/app/services/cassandra +======= + - ../..:/app +>>>>>>> 9634c8a0ef870fa079b7b6de8463d67f3425a819 - ../../.env:/app/.env working_dir: /app diff --git a/services/cassandra/requirements.txt b/services/cassandra/requirements.txt new file mode 100644 index 0000000..250d8df --- /dev/null +++ b/services/cassandra/requirements.txt @@ -0,0 +1,9 @@ +# Cassandra dependencies +cassandra-driver +fastapi==0.116.1 +pydantic==2.11.7 +python-dotenv==1.1.1 +requests==2.32.4 +sentence_transformers==5.0.0 +services==0.1.1 +uvicorn==0.35.0 From 1591f98147c1cdd2ccf3f14fb8578ac048c7309f Mon Sep 17 00:00:00 2001 From: sheyhanxlalani <123619461+sheyhanxlalani@users.noreply.github.com> Date: Mon, 28 Jul 2025 16:19:30 +1000 Subject: [PATCH 08/12] Pulling recent changes --- README.md | 6 ++++++ llm/llm_utils/langchain_pipeline.py | 4 ++++ scripts/pull_docs_cassandra.py | 6 ++++++ services/cassandra/Dockerfile | 8 ++++++++ services/cassandra/docker-compose.yml | 4 ++++ 5 files changed, 28 insertions(+) diff --git a/README.md b/README.md index ffce01e..1556eab 100644 --- a/README.md +++ b/README.md @@ -33,15 +33,21 @@ CLICKHOUSE_PASSWORD="" GUARDIAN_API_KEY="" +<<<<<<< HEAD <<<<<<< HEAD HOST="localhost" or shared IP address holding the container ======= +======= +>>>>>>> 9634c8a0ef870fa079b7b6de8463d67f3425a819 ANTHROPIC_API_KEY="" LANGSMITH_TRACING="true" LANGSMITH_API_KEY="" DATABASE_TYPE="" HOST="localhost" or shared IPv4 address +<<<<<<< HEAD +>>>>>>> 9634c8a0ef870fa079b7b6de8463d67f3425a819 +======= >>>>>>> 9634c8a0ef870fa079b7b6de8463d67f3425a819 ``` diff --git a/llm/llm_utils/langchain_pipeline.py b/llm/llm_utils/langchain_pipeline.py index 73ad51a..d27baea 100644 --- a/llm/llm_utils/langchain_pipeline.py +++ b/llm/llm_utils/langchain_pipeline.py @@ -98,8 +98,12 @@ def retrieve(state: State) -> Dict[str, Any]: logging.info(f"Using port {port} for {AvailableRAGDatabases(os.getenv('DATABASE_TYPE', '').lower())}") # hostname is based on local machine or docker +<<<<<<< HEAD <<<<<<< HEAD hostname = "localhost" if os.getenv("LOCAL_STREAMLIT_SERVER", "true") == "true" else "host.docker.internal" +======= + hostname = "localhost" if os.getenv("LOCAL_STREAMLIT_SERVER", False) else "host.docker.internal" +>>>>>>> 9634c8a0ef870fa079b7b6de8463d67f3425a819 ======= hostname = "localhost" if os.getenv("LOCAL_STREAMLIT_SERVER", False) else "host.docker.internal" >>>>>>> 9634c8a0ef870fa079b7b6de8463d67f3425a819 diff --git a/scripts/pull_docs_cassandra.py b/scripts/pull_docs_cassandra.py index f8b1e4f..d04a3f2 100644 --- 
a/scripts/pull_docs_cassandra.py +++ b/scripts/pull_docs_cassandra.py @@ -46,7 +46,10 @@ def pull_docs(total_needed: int = 1000, page_size: int = 1): """) <<<<<<< HEAD +<<<<<<< HEAD +======= ======= +>>>>>>> 9634c8a0ef870fa079b7b6de8463d67f3425a819 rows = session.execute(f""" SELECT index_name FROM system_schema.indexes WHERE keyspace_name = '{keyspace}' AND table_name = 'articles'; @@ -60,6 +63,9 @@ def pull_docs(total_needed: int = 1000, page_size: int = 1): USING 'StorageAttachedIndex'; """) +<<<<<<< HEAD +>>>>>>> 9634c8a0ef870fa079b7b6de8463d67f3425a819 +======= >>>>>>> 9634c8a0ef870fa079b7b6de8463d67f3425a819 model = SentenceTransformer("all-MiniLM-L6-v2") articles_inserted = 0 diff --git a/services/cassandra/Dockerfile b/services/cassandra/Dockerfile index 9eccf75..25ee564 100644 --- a/services/cassandra/Dockerfile +++ b/services/cassandra/Dockerfile @@ -22,17 +22,25 @@ RUN ls -la # Install dependencies <<<<<<< HEAD +<<<<<<< HEAD COPY requirements.txt . ======= COPY services/cassandra/requirements.txt . >>>>>>> 9634c8a0ef870fa079b7b6de8463d67f3425a819 +======= +COPY services/cassandra/requirements.txt . +>>>>>>> 9634c8a0ef870fa079b7b6de8463d67f3425a819 RUN pip install --upgrade pip RUN pip install --no-cache-dir -r requirements.txt RUN pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu # Default command if not overridden by docker-compose <<<<<<< HEAD +<<<<<<< HEAD CMD ["python", "-m", "uvicorn", "services.cassandra.cassandra_controller:app", "--host", "0.0.0.0", "--port", "8002"] ======= CMD ["python", "-m", "uvicorn", "services.cassandra.cassandra_controller:app", "--host", "0.0.0.0", "--port", "8003"] >>>>>>> 9634c8a0ef870fa079b7b6de8463d67f3425a819 +======= +CMD ["python", "-m", "uvicorn", "services.cassandra.cassandra_controller:app", "--host", "0.0.0.0", "--port", "8003"] +>>>>>>> 9634c8a0ef870fa079b7b6de8463d67f3425a819 diff --git a/services/cassandra/docker-compose.yml b/services/cassandra/docker-compose.yml index 70fcc91..8ac4a5d 100644 --- a/services/cassandra/docker-compose.yml +++ b/services/cassandra/docker-compose.yml @@ -30,10 +30,14 @@ services: - CASSANDRA_PORT=9042 - CASSANDRA_KEYSPACE=vectorembeds volumes: +<<<<<<< HEAD <<<<<<< HEAD - ../../scripts/:/app/scripts - ../../requirements.txt:/app/requirements.txt - ../../services/cassandra:/app/services/cassandra +======= + - ../..:/app +>>>>>>> 9634c8a0ef870fa079b7b6de8463d67f3425a819 ======= - ../..:/app >>>>>>> 9634c8a0ef870fa079b7b6de8463d67f3425a819 From 89f71df660d63ab4d411ed11449e6af091e76266 Mon Sep 17 00:00:00 2001 From: sheyhanxlalani <123619461+sheyhanxlalani@users.noreply.github.com> Date: Mon, 28 Jul 2025 16:28:46 +1000 Subject: [PATCH 09/12] Pulling recent changes --- services/cassandra/Dockerfile | 8 ++++++++ services/cassandra/docker-compose.yml | 4 ++++ 2 files changed, 12 insertions(+) diff --git a/services/cassandra/Dockerfile b/services/cassandra/Dockerfile index 25ee564..2a15dc4 100644 --- a/services/cassandra/Dockerfile +++ b/services/cassandra/Dockerfile @@ -23,6 +23,7 @@ RUN ls -la # Install dependencies <<<<<<< HEAD <<<<<<< HEAD +<<<<<<< HEAD COPY requirements.txt . ======= COPY services/cassandra/requirements.txt . @@ -30,6 +31,9 @@ COPY services/cassandra/requirements.txt . ======= COPY services/cassandra/requirements.txt . >>>>>>> 9634c8a0ef870fa079b7b6de8463d67f3425a819 +======= +COPY services/cassandra/requirements.txt . 
+>>>>>>> 9634c8a0ef870fa079b7b6de8463d67f3425a819 RUN pip install --upgrade pip RUN pip install --no-cache-dir -r requirements.txt RUN pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu @@ -37,6 +41,7 @@ RUN pip install torch torchvision torchaudio --index-url https://download.pytorc # Default command if not overridden by docker-compose <<<<<<< HEAD <<<<<<< HEAD +<<<<<<< HEAD CMD ["python", "-m", "uvicorn", "services.cassandra.cassandra_controller:app", "--host", "0.0.0.0", "--port", "8002"] ======= CMD ["python", "-m", "uvicorn", "services.cassandra.cassandra_controller:app", "--host", "0.0.0.0", "--port", "8003"] @@ -44,3 +49,6 @@ CMD ["python", "-m", "uvicorn", "services.cassandra.cassandra_controller:app", " ======= CMD ["python", "-m", "uvicorn", "services.cassandra.cassandra_controller:app", "--host", "0.0.0.0", "--port", "8003"] >>>>>>> 9634c8a0ef870fa079b7b6de8463d67f3425a819 +======= +CMD ["python", "-m", "uvicorn", "services.cassandra.cassandra_controller:app", "--host", "0.0.0.0", "--port", "8003"] +>>>>>>> 9634c8a0ef870fa079b7b6de8463d67f3425a819 diff --git a/services/cassandra/docker-compose.yml b/services/cassandra/docker-compose.yml index 8ac4a5d..9659555 100644 --- a/services/cassandra/docker-compose.yml +++ b/services/cassandra/docker-compose.yml @@ -31,6 +31,7 @@ services: - CASSANDRA_KEYSPACE=vectorembeds volumes: <<<<<<< HEAD +<<<<<<< HEAD <<<<<<< HEAD - ../../scripts/:/app/scripts - ../../requirements.txt:/app/requirements.txt @@ -38,6 +39,9 @@ services: ======= - ../..:/app >>>>>>> 9634c8a0ef870fa079b7b6de8463d67f3425a819 +======= + - ../..:/app +>>>>>>> 9634c8a0ef870fa079b7b6de8463d67f3425a819 ======= - ../..:/app >>>>>>> 9634c8a0ef870fa079b7b6de8463d67f3425a819 From 86bc583ae8634746bde45e4d32db2ee7711eefb0 Mon Sep 17 00:00:00 2001 From: sheyhanxlalani <123619461+sheyhanxlalani@users.noreply.github.com> Date: Mon, 28 Jul 2025 18:21:42 +1000 Subject: [PATCH 10/12] Adding live metrics streamlit --- services/streamlit/docker-compose.yaml | 2 ++ services/streamlit/provisioner/dashboard.json | 6 +++--- .../provisioner/provision_grafana.py | 19 +++++++++++++------ services/streamlit/requirements.txt | 1 + 4 files changed, 19 insertions(+), 9 deletions(-) diff --git a/services/streamlit/docker-compose.yaml b/services/streamlit/docker-compose.yaml index 8232eb9..dc83a8a 100644 --- a/services/streamlit/docker-compose.yaml +++ b/services/streamlit/docker-compose.yaml @@ -40,3 +40,5 @@ services: - GRAFANA_USER=admin - GRAFANA_PASS=admin restart: "no" + networks: + - default diff --git a/services/streamlit/provisioner/dashboard.json b/services/streamlit/provisioner/dashboard.json index f47d5b0..bb920ff 100644 --- a/services/streamlit/provisioner/dashboard.json +++ b/services/streamlit/provisioner/dashboard.json @@ -24,7 +24,7 @@ { "datasource": { "type": "grafana-clickhouse-datasource", - "uid": "__datasource__" + "uid": "ClickHouse" }, "fieldConfig": { "defaults": { @@ -126,7 +126,7 @@ { "datasource": { "type": "grafana-clickhouse-datasource", - "uid": "__datasource__" + "uid": "ClickHouse" }, "fieldConfig": { "defaults": { @@ -206,7 +206,7 @@ { "datasource": { "type": "grafana-clickhouse-datasource", - "uid": "__datasource__" + "uid": "ClickHouse" }, "editorType": "sql", "format": 1, diff --git a/services/streamlit/provisioner/provision_grafana.py b/services/streamlit/provisioner/provision_grafana.py index e47a72e..2e6f442 100644 --- a/services/streamlit/provisioner/provision_grafana.py +++ 
b/services/streamlit/provisioner/provision_grafana.py @@ -8,6 +8,13 @@ PASSWORD = os.getenv("GRAFANA_PASS", "admin") DASHBOARD_PATH = "dashboard.json" +# ClickHouse connection details +CLICKHOUSE_HOST = os.getenv("CLICKHOUSE_HOST", "10.0.100.92") +CLICKHOUSE_PORT = os.getenv("CLICKHOUSE_PORT", "8123") +CLICKHOUSE_DATABASE = os.getenv("CLICKHOUSE_DATABASE", "guardian") +CLICKHOUSE_USER = os.getenv("CLICKHOUSE_USER", "user") +CLICKHOUSE_PASSWORD = os.getenv("CLICKHOUSE_PASSWORD", "default") + MAX_RETRIES = 10 RETRY_DELAY = 3 # seconds @@ -34,19 +41,19 @@ def create_datasource(session): "name": "ClickHouse", "type": "grafana-clickhouse-datasource", "access": "proxy", - "url": "http://10.0.100.92:8123", # Adjust as needed + "url": f"http://{CLICKHOUSE_HOST}:{CLICKHOUSE_PORT}", "basicAuth": False, "jsonData": { - "defaultDatabase": "guardian", - "port": 8123, - "username": "user", - "server": "10.0.100.92", # 👈 required + "defaultDatabase": CLICKHOUSE_DATABASE, + "port": int(CLICKHOUSE_PORT), + "username": CLICKHOUSE_USER, + "server": CLICKHOUSE_HOST, "secure": False, "protocol": "http", "skip-tls-verify": True }, "secureJsonData": { - "password": "default" + "password": CLICKHOUSE_PASSWORD }, "isDefault": True } diff --git a/services/streamlit/requirements.txt b/services/streamlit/requirements.txt index 84682d5..3ae4a9e 100644 --- a/services/streamlit/requirements.txt +++ b/services/streamlit/requirements.txt @@ -1,5 +1,6 @@ # Streamlit service requirements anthropic==0.59.0 +clickhouse_connect==0.8.18 fastapi==0.116.1 langchain==0.3.27 langchain_anthropic==0.3.17 From 801aad646a61d966553cd87f9a8ab5b258f0dff5 Mon Sep 17 00:00:00 2001 From: sheyhanxlalani <123619461+sheyhanxlalani@users.noreply.github.com> Date: Tue, 29 Jul 2025 09:29:25 +1000 Subject: [PATCH 11/12] adding metrics to streamlit --- llm/langchain_gui.py | 17 +++++++++++++++-- llm/langchain_metrics.py | 7 ++++++- llm/llm_utils/langchain_pipeline.py | 5 ++++- llm/llm_utils/langchain_pipeline_2.py | 2 +- requirements.txt | 2 +- 5 files changed, 27 insertions(+), 6 deletions(-) diff --git a/llm/langchain_gui.py b/llm/langchain_gui.py index e74922d..602eb51 100644 --- a/llm/langchain_gui.py +++ b/llm/langchain_gui.py @@ -8,6 +8,8 @@ LOADING_URL = "https://cdn.pixabay.com/animation/2025/04/08/09/08/09-08-31-655_512.gif" DURATION_METRICS_URL = "http://localhost:3000/d-solo/90ced2bd-5ea8-42c5-b87b-be9e1a8cdb4c/db-metrics-visualization?orgId=1&from=1753395832378&to=1753417432378&timezone=browser&panelId=1&__feature.dashboardSceneSolo=true" TOKEN_METRICS_URL = "http://localhost:3000/d-solo/90ced2bd-5ea8-42c5-b87b-be9e1a8cdb4c/db-metrics-visualization?orgId=1&from=1753645594350&to=1753667194350&timezone=browser&panelId=2&__feature.dashboardSceneSolo=true" +CONTEXT_METRICS_URL = "http://localhost:3000/d-solo/90ced2bd-5ea8-42c5-b87b-be9e1a8cdb4c/db-metrics-visualization?orgId=1&from=1753613969712&to=1753700369712&timezone=browser&panelId=3&__feature.dashboardSceneSolo=true" +MOST_RECENT_RUNS_URL = "http://localhost:3000/d-solo/90ced2bd-5ea8-42c5-b87b-be9e1a8cdb4c/db-metrics-visualization?orgId=1&from=1753613969712&to=1753700369712&timezone=browser&panelId=4&__feature.dashboardSceneSolo=true" # --- Custom styles --- st.markdown(f""" @@ -401,14 +403,25 @@ def run_api(): st.warning("Please enter at least one valid query.") with tab4: - tab4a, tab4b = st.tabs(["Duration Metrics", "Token Metrics"]) + tab4a, tab4b, tab4c, tab4d = st.tabs(["Most Recent Run", "Duration Metrics", "Token Metrics", "Context Metrics"]) with tab4a: + 
st.components.v1.html( + f'', + height=800) + + with tab4b: st.components.v1.html( f'', height=600) - with tab4b: + with tab4c: st.components.v1.html( f'', height=600) + + with tab4d: + st.components.v1.html( + f'', + height=800) + diff --git a/llm/langchain_metrics.py b/llm/langchain_metrics.py index c071761..d121f3e 100644 --- a/llm/langchain_metrics.py +++ b/llm/langchain_metrics.py @@ -94,9 +94,14 @@ def save_to_clickhouse(self, run): start_time = start_time.astimezone(timezone('Australia/Sydney')) end_time = end_time.astimezone(timezone('Australia/Sydney')) + # Handle lambda names by replacing with "retrieve" + run_name = str(run.name) if run.name else "" + if run_name == "": + run_name = "retrieve" + row = [ str(run.id), - str(run.name) if run.name else "", + run_name, str(run.status) if run.status else "", int(run.total_tokens) if run.total_tokens is not None else 0, float(run.total_cost) if run.total_cost is not None else 0.0, diff --git a/llm/llm_utils/langchain_pipeline.py b/llm/llm_utils/langchain_pipeline.py index 40cd05d..e81a024 100644 --- a/llm/llm_utils/langchain_pipeline.py +++ b/llm/llm_utils/langchain_pipeline.py @@ -13,7 +13,10 @@ # === LangGraph imports === from langgraph.graph import StateGraph, START from typing_extensions import TypedDict -from llm.langchain_metrics import LangchainMetrics +import sys +import os +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from langchain_metrics import LangchainMetrics from langchain_core.callbacks.base import BaseCallbackHandler load_dotenv() diff --git a/llm/llm_utils/langchain_pipeline_2.py b/llm/llm_utils/langchain_pipeline_2.py index bf513dd..10c1c25 100644 --- a/llm/llm_utils/langchain_pipeline_2.py +++ b/llm/llm_utils/langchain_pipeline_2.py @@ -8,7 +8,7 @@ from langchain.prompts import PromptTemplate from pydantic import SecretStr import requests -from llm.langchain_metrics import LangchainMetrics +from langchain_metrics import LangchainMetrics from langchain_core.callbacks.base import BaseCallbackHandler from langchain_core.runnables import RunnableSequence from typing_extensions import TypedDict diff --git a/requirements.txt b/requirements.txt index 0bbe95e..4d1fc2f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,7 +9,7 @@ #uvicorn==0.35.0 # LangChain dependencies -#clickhouse_connect==0.8.18 +clickhouse_connect==0.8.18 langchain~=0.3.26 langchain-community langchain-core From 49e5369251fe5778e962187d4839b82681bc270b Mon Sep 17 00:00:00 2001 From: sheyhanxlalani <123619461+sheyhanxlalani@users.noreply.github.com> Date: Tue, 29 Jul 2025 10:26:56 +1000 Subject: [PATCH 12/12] added new table in streamlit --- llm/langchain_gui.py | 6 ++++++ services/clickhouse/docker-compose.yaml | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/llm/langchain_gui.py b/llm/langchain_gui.py index 4241687..8ff3783 100644 --- a/llm/langchain_gui.py +++ b/llm/langchain_gui.py @@ -15,6 +15,7 @@ DURATION_METRICS_URL = "http://localhost:3000/d-solo/90ced2bd-5ea8-42c5-b87b-be9e1a8cdb4c/db-metrics-visualization?orgId=1&from=1753395832378&to=1753417432378&timezone=browser&panelId=1&__feature.dashboardSceneSolo=true" TOKEN_METRICS_URL = "http://localhost:3000/d-solo/90ced2bd-5ea8-42c5-b87b-be9e1a8cdb4c/db-metrics-visualization?orgId=1&from=1753645594350&to=1753667194350&timezone=browser&panelId=2&__feature.dashboardSceneSolo=true" MOST_RECENT_RUNS_URL = 
"http://localhost:3000/d-solo/90ced2bd-5ea8-42c5-b87b-be9e1a8cdb4c/db-metrics-visualization?orgId=1&from=1753613969712&to=1753700369712&timezone=browser&panelId=4&__feature.dashboardSceneSolo=true" +CONTEXT_METRICS_URL = "http://localhost:3000/d-solo/90ced2bd-5ea8-42c5-b87b-be9e1a8cdb4c/db-metrics-visualization?orgId=1&from=1753645594350&to=1753667194350&timezone=browser&panelId=3&__feature.dashboardSceneSolo=true" # Database options - customize these based on your available databases DATABASE_OPTIONS = [ db.value[0] for db in Database @@ -683,4 +684,9 @@ def log_user_interaction(interaction_type, details): with tab4c: st.components.v1.html( f'', + height=600) + + with tab4d: + st.components.v1.html( + f'', height=600) \ No newline at end of file diff --git a/services/clickhouse/docker-compose.yaml b/services/clickhouse/docker-compose.yaml index 61ed390..b5ae685 100644 --- a/services/clickhouse/docker-compose.yaml +++ b/services/clickhouse/docker-compose.yaml @@ -4,7 +4,7 @@ services: container_name: clickhouse-server hostname: clickhouse ports: - - "8124:8123" # HTTP interface + - "8123:8123" # HTTP interface - "9001:9000" # Native TCP interface volumes: - ./user_directories/config.xml:/etc/clickhouse-server/user_directories/config.xml