From 1405d67adf3c90958cde669a7a9cdacd92281d94 Mon Sep 17 00:00:00 2001
From: nishika26
Date: Thu, 12 Jun 2025 09:39:49 +0530
Subject: [PATCH 1/2] adding responses api to script

---
 bin/run-response.sh    |  58 +++++++++
 src/prompt/response.py | 278 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 336 insertions(+)
 create mode 100755 bin/run-response.sh
 create mode 100644 src/prompt/response.py

diff --git a/bin/run-response.sh b/bin/run-response.sh
new file mode 100755
index 0000000..865446a
--- /dev/null
+++ b/bin/run-response.sh
@@ -0,0 +1,58 @@
+#!/bin/bash
+
+ROOT=`git rev-parse --show-toplevel`
+
+export PYTHONPATH=$ROOT
+
+_repetition=5
+_default_model=gpt-4o-mini
+
+while getopts 'n:p:d:g:m:e:ch' option; do
+    case $option in
+        n) _repetition=$OPTARG ;;
+        p) _prompts=$OPTARG ;;
+        d) _documents=$OPTARG ;;
+        g) _gt=$OPTARG ;;
+        m) _models=( ${_models[@]} --model $OPTARG ) ;;
+        e) _extra=( ${_extra[@]} --extra-info $OPTARG ) ;;
+        h)
+            cat <

Date: Thu, 12 Jun 2025 09:45:22 +0530
Subject: [PATCH 2/2] adding max latency stopper

---
 src/analysis/response-scores/latency.py | 48 +++++++++++++++++++++++++
 src/prompt/response.py                  | 44 +++++++++++++++++------
 src/prompt/run.py                       | 38 +++++++++++++++-----
 3 files changed, 111 insertions(+), 19 deletions(-)
 create mode 100644 src/analysis/response-scores/latency.py

diff --git a/src/analysis/response-scores/latency.py b/src/analysis/response-scores/latency.py
new file mode 100644
index 0000000..932d11e
--- /dev/null
+++ b/src/analysis/response-scores/latency.py
@@ -0,0 +1,48 @@
+import pandas as pd
+import matplotlib.pyplot as plt
+from argparse import ArgumentParser
+from pathlib import Path
+
+def load_and_flatten_jsonl(file_path: Path) -> pd.DataFrame:
+    df = pd.read_json(file_path, lines=True)
+    # Flatten the nested response field
+    response_data = pd.json_normalize(df['response'].apply(lambda r: r[0] if isinstance(r, list) and r else {}))
+    df_flat = pd.concat([df.drop(columns='response'), response_data], axis=1)
+    return df_flat
+
+def compute_latency_stats(df: pd.DataFrame) -> pd.DataFrame:
+    latencies = df['latency']
+    stats = {
+        'Total Responses': len(latencies),
+        'Total Time (minutes)': latencies.sum() / 60,
+        'Min Latency (seconds)': latencies.min(),
+        'Max Latency (seconds)': latencies.max(),
+        'Average Latency (seconds)': latencies.mean(),
+        'Median Latency (seconds)': latencies.median()
+    }
+    return pd.DataFrame(list(stats.items()), columns=['Metric', 'Value'])
+
+def plot_latency_stats(stats_df: pd.DataFrame, output_path: Path):
+    plt.figure(figsize=(10, 6))
+    bars = plt.barh(stats_df['Metric'], stats_df['Value'], color='skyblue')
+    plt.xlabel('Value')
+    plt.title('Latency Summary')
+    plt.grid(True, linestyle='dotted', axis='x', alpha=0.5)
+
+    for bar in bars:
+        width = bar.get_width()
+        plt.text(width, bar.get_y() + bar.get_height() / 2, f'{width:.2f}', va='center')
+
+    plt.tight_layout()
+    plt.savefig(output_path)
+    print(f"Saved plot to {output_path}")
+
+if __name__ == '__main__':
+    parser = ArgumentParser(description="Plot latency summary from a JSONL file.")
+    parser.add_argument('--input', type=Path, required=True, help="Path to the .jsonl file")
+    parser.add_argument('--output', type=Path, default=Path("latency_summary.png"), help="Path to save the output plot")
+    args = parser.parse_args()
+
+    df = load_and_flatten_jsonl(args.input)
+    stats_df = compute_latency_stats(df)
+    plot_latency_stats(stats_df, args.output)
diff --git a/src/prompt/response.py b/src/prompt/response.py
index 0c6b77f..a14cc96 100644
--- a/src/prompt/response.py
+++ b/src/prompt/response.py
@@ -210,26 +210,49 @@ def __call__(self, fp):
         yield Job(resource, model, config)
 
+MAX_LATENCY = 90 # seconds
+
+def now():
+    return time.strftime('%Y-%m-%d %H:%M:%S')
+
 
 def func(incoming, outgoing, session_id, args):
     import datetime
+    import concurrent.futures
 
     client = OpenAI()
     creator = ResponseCreator(client, args)
 
     while True:
         job = incoming.get()
-        Logger.info(job)
+        Logger.info('[%s] Received job | Config: %s | Model: %s', now(), job.config, job.model)
 
         question = scanp(job.config, args.prompt_root, 'user')
-        start = time.time()
-        response = creator.create(
-            job.config,
-            model=job.model,
-            vector_store=job.resource.vector_store,
-            question=question,
-        )
-
-        latency = time.time() - start
+        def generate_response():
+            return creator.create(
+                job.config,
+                model=job.model,
+                vector_store=job.resource.vector_store,
+                question=question,
+            )
+
+        try:
+            t_start = time.perf_counter()
+            with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
+                future = executor.submit(generate_response)
+                response = future.result(timeout=MAX_LATENCY)
+            t_end = time.perf_counter()
+        except concurrent.futures.TimeoutError:
+            Logger.error('[%s] Response generation timed out after %ds | Config: %s | Model: %s',
+                         now(), MAX_LATENCY, job.config, job.model)
+            continue
+        except Exception as e:
+            Logger.critical('[%s] Error during response generation: %s | Config: %s', now(), e, job.config)
+            continue
+
+        latency = t_end - t_start
+
+        Logger.info('[%s] Response generated | Latency: %.2fs | Model: %s | Response ID: %s',
+                    now(), latency, job.model, response.id)
 
         outgoing.put({
             "system": job.config["system"],
@@ -247,6 +270,7 @@ def func(incoming, outgoing, session_id, args):
             ]
         })
 
+
 if __name__ == '__main__':
     arguments = ArgumentParser()
     arguments.add_argument('--prompt-root', type=Path)
diff --git a/src/prompt/run.py b/src/prompt/run.py
index 4050677..83830c8 100644
--- a/src/prompt/run.py
+++ b/src/prompt/run.py
@@ -9,6 +9,7 @@
 from argparse import ArgumentParser
 from dataclasses import dataclass, astuple, asdict
 from multiprocessing import Pool, Queue
+import concurrent.futures
 
 import pandas as pd
 from openai import OpenAI, OpenAIError, NotFoundError
@@ -241,6 +242,19 @@ def __call__(self, fp):
 #
 #
 #
+
+
+MAX_LATENCY = 90 # seconds, cap total time for each run
+
+def timeout_run(client, thread, assistant_id):
+    return client.beta.threads.runs.create_and_poll(
+        thread_id=thread.id,
+        assistant_id=assistant_id,
+    )
+
+def now(): # Utility for nicer timestamp logging
+    return time.strftime('%Y-%m-%d %H:%M:%S')
+
 class ThreadRunner:
     @staticmethod
     def parse_wait_time(err):
@@ -251,7 +265,6 @@ def parse_wait_time(err):
             return (pd
                     .to_timedelta(wait)
                     .total_seconds())
-
         raise TypeError(err.code)
 
     def __init__(self, client, response_id):
@@ -262,15 +275,21 @@ def __call__(self, job, thread):
         for i in it.count():
             try:
                 t_start = time.perf_counter()
-                run = self.client.beta.threads.runs.create_and_poll(
-                    thread_id=thread.id,
-                    assistant_id=job.resource.assistant,
-                )
+                with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
+                    future = executor.submit(timeout_run, self.client, thread, job.resource.assistant)
+                    run = future.result(timeout=MAX_LATENCY)
                 t_end = time.perf_counter()
+            except concurrent.futures.TimeoutError:
+                Logger.error('[%s] Run timed out after %ds | Config: %s | Assistant: %s',
+                             now(), MAX_LATENCY, job.config, job.resource.assistant)
+                return None
             except OpenAIError as err:
-                Logger.critical(err)
+                Logger.critical('[%s] OpenAIError: %s | Config: %s', now(), err, job.config)
                 continue
 
+            Logger.info('[%s] Run completed | Status: %s | Time: %.2fs | Model: %s | Config: %s | Run ID: %s',
+                        now(), run.status, t_end - t_start, job.model, job.config, run.id)
+
             if run.status == 'completed':
                 break
             if run.status == 'is_expired':
@@ -285,10 +304,12 @@ def __call__(self, job, thread):
                 rest = None
             if rest is not None:
                 rest = math.ceil(rest)
-                Logger.warning('Sleeping %ds', rest)
+                Logger.warning('[%s] Rate limit hit. Sleeping %ds | Error: %s',
+                               now(), rest, run.last_error.message)
                 time.sleep(rest)
 
-            Logger.error('%d / %s / %s', i, job.config, run)
+            Logger.error('[%s] Retry %d | Config: %s | Run Status: %s',
+                         now(), i, job.config, run.status)
 
         latency = t_end - t_start
         response = self.client.beta.threads.messages.list(
@@ -303,7 +324,6 @@ def __call__(self, job, thread):
             latency=latency,
             response_id=self.response_id,
         )
-
 #
 #
 #
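
Both patched call sites bound a slow API request the same way: submit the call to a single-worker ThreadPoolExecutor and wait on the future with a timeout. The sketch below isolates that pattern outside the diff; the helper name call_with_timeout and the slow_call stand-in are illustrative only and are not part of the patch.

    import time
    import concurrent.futures

    MAX_LATENCY = 90  # seconds; same cap the patches apply per call

    def call_with_timeout(fn, *args, **kwargs):
        # Run fn in a worker thread; stop waiting after MAX_LATENCY seconds.
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(fn, *args, **kwargs)
            return future.result(timeout=MAX_LATENCY)

    if __name__ == '__main__':
        def slow_call(seconds):
            # Stand-in for creator.create(...) or timeout_run(...).
            time.sleep(seconds)
            return 'done'

        start = time.perf_counter()
        try:
            print(call_with_timeout(slow_call, 1))
        except concurrent.futures.TimeoutError:
            print('timed out after %ds' % MAX_LATENCY)
        print('latency: %.2fs' % (time.perf_counter() - start))

One caveat with this pattern: future.result(timeout=...) only stops waiting; the worker thread keeps running, and leaving the with block calls executor.shutdown(wait=True), which blocks until the submitted call actually returns. The timeout therefore caps how long the caller waits for a usable result rather than the wall-clock time of the underlying API request.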