diff --git a/bin/run-response.sh b/bin/run-response.sh
new file mode 100755
index 0000000..865446a
--- /dev/null
+++ b/bin/run-response.sh
@@ -0,0 +1,58 @@
+#!/bin/bash
+
+ROOT=`git rev-parse --show-toplevel`
+
+export PYTHONPATH=$ROOT
+
+_repetition=5
+_default_model=gpt-4o-mini
+
+while getopts 'n:p:d:g:m:e:ch' option; do
+    case $option in
+        n) _repetition=$OPTARG ;;
+        p) _prompts=$OPTARG ;;
+        d) _documents=$OPTARG ;;
+        g) _gt=$OPTARG ;;
+        m) _models=( ${_models[@]} --model $OPTARG ) ;;
+        e) _extra=( ${_extra[@]} --extra-info $OPTARG ) ;;
+        h)
+            cat <
+def load_and_flatten_jsonl(file_path: Path) -> pd.DataFrame:
+    df = pd.read_json(file_path, lines=True)
+    # Flatten the nested response field
+    response_data = pd.json_normalize(df['response'].apply(lambda r: r[0] if isinstance(r, list) and r else {}))
+    df_flat = pd.concat([df.drop(columns='response'), response_data], axis=1)
+    return df_flat
+
+def compute_latency_stats(df: pd.DataFrame) -> pd.DataFrame:
+    latencies = df['latency']
+    stats = {
+        'Total Responses': len(latencies),
+        'Total Time (minutes)': latencies.sum() / 60,
+        'Min Latency (seconds)': latencies.min(),
+        'Max Latency (seconds)': latencies.max(),
+        'Average Latency (seconds)': latencies.mean(),
+        'Median Latency (seconds)': latencies.median()
+    }
+    return pd.DataFrame(list(stats.items()), columns=['Metric', 'Value'])
+
+def plot_latency_stats(stats_df: pd.DataFrame, output_path: Path):
+    plt.figure(figsize=(10, 6))
+    bars = plt.barh(stats_df['Metric'], stats_df['Value'], color='skyblue')
+    plt.xlabel('Value')
+    plt.title('Latency Summary')
+    plt.grid(True, linestyle='dotted', axis='x', alpha=0.5)
+
+    for bar in bars:
+        width = bar.get_width()
+        plt.text(width, bar.get_y() + bar.get_height() / 2, f'{width:.2f}', va='center')
+
+    plt.tight_layout()
+    plt.savefig(output_path)
+    print(f"Saved plot to {output_path}")
+
+if __name__ == '__main__':
+    parser = ArgumentParser(description="Plot latency summary from a JSONL file.")
+    parser.add_argument('--input', type=Path, required=True, help="Path to the .jsonl file")
+    parser.add_argument('--output', type=Path, default=Path("latency_summary.png"), help="Path to save the output plot")
+    args = parser.parse_args()
+
+    df = load_and_flatten_jsonl(args.input)
+    stats_df = compute_latency_stats(df)
+    plot_latency_stats(stats_df, args.output)
diff --git a/src/prompt/response.py b/src/prompt/response.py
new file mode 100644
index 0000000..a14cc96
--- /dev/null
+++ b/src/prompt/response.py
@@ -0,0 +1,302 @@
+import sys
+import json
+import math
+import time
+import operator as op
+import itertools as it
+from uuid import uuid4
+from pathlib import Path
+from argparse import ArgumentParser
+from dataclasses import dataclass, astuple, asdict
+from multiprocessing import Pool, Queue
+
+import pandas as pd
+from openai import OpenAI, OpenAIError, NotFoundError
+
+from mylib import Logger, ExperimentResponse, FileIterator
+
+@dataclass(frozen=True)
+class Resource:
+    response: str
+    vector_store: str
+
+@dataclass(frozen=True)
+class Job:
+    resource: Resource
+    model: str
+    config: dict
+
+def vs_ls(vector_store_id, client):
+    kwargs = {}
+    while True:
+        page = client.vector_stores.files.list(
+            vector_store_id=vector_store_id,
+            **kwargs,
+        )
+        yield from page
+        if not page.has_more:
+            break
+        kwargs['after'] = page.last_id
+
+def scanp(config, root, ptype):
+    return (root
+            .joinpath(ptype, config[ptype])
+            .read_text())
+
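+# Cleanup helpers: each cleaner deletes one kind of remote resource, retrying
+# up to `retries` times and logging an error if the delete never succeeds.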
+class ResourceCleaner:
+    def __init__(self, resource):
+        self.resource = resource
+
+    def __call__(self, client, retries=1):
+        for _ in range(retries):
+            try:
+                self.clean(client)
+                break
+            except NotFoundError:
+                pass
+        else:
+            Logger.error('Cannot clean %s', type(self).__name__)
+
+    def clean(self, client):
+        raise NotImplementedError()
+
+class VectorStoreCleaner(ResourceCleaner):
+    def clean(self, client):
+        for i in vs_ls(self.resource, client):
+            client.files.delete(i.id)
+        client.vector_stores.delete(self.resource)
+
+class ResponseCleaner(ResourceCleaner):
+    def clean(self, client):
+        client.responses.delete(self.resource)
+
+class ResourceCreator:
+    def __init__(self, client, args):
+        self.client = client
+        self.args = args
+
+    def __call__(self, config, **kwargs):
+        handle = self.create(config, **kwargs)
+        return handle.id
+
+    def create(self, config, **kwargs):
+        raise NotImplementedError()
+
+class VectorStoreCreator(ResourceCreator):
+    def __init__(self, client, args):
+        super().__init__(client, args)
+        self.ls = FileIterator(self.args.upload_batch_size)
+
+    def create(self, config, **kwargs):
+        documents = self.args.document_root.joinpath(config['docs'])
+        vector_store = self.client.vector_stores.create()
+
+        for paths in self.ls(documents):
+            Logger.info('Uploading %d', len(paths))
+
+            files = [ x.open('rb') for x in paths ]
+            file_batch = (self
+                          .client
+                          .vector_stores
+                          .file_batches.upload_and_poll(
+                              vector_store_id=vector_store.id,
+                              files=files,
+                          ))
+            for i in files:
+                i.close()
+            self.raise_for_status(file_batch, vector_store, paths)
+
+        return vector_store
+
+    def raise_for_status(self, response, vector_store, paths):
+        assert response.file_counts.total == len(paths)
+
+        if response.file_counts.completed != response.file_counts.total:
+            paths = { str(x.name): x for x in paths }
+
+            for i in vs_ls(vector_store.id, self.client):
+                if i.last_error is None:
+                    document = self.client.files.retrieve(i.id)
+                    paths.pop(document.filename)
+            for i in paths.values():
+                Logger.error('Upload error: %s', i)
+
+            vector_store_cleaner = VectorStoreCleaner(vector_store.id)
+            vector_store_cleaner(self.client, self.args.cleanup_attempts)
+
+            raise IndexError('Upload failure ({} of {}): {}'.format(
+                response.file_counts.failed,
+                response.file_counts.total,
+                ', '.join(map(str, paths.values())),
+            ))
+
+class ResponseCreator(ResourceCreator):
+    _kwargs = (
+        'model',
+        'vector_store',
+        'question',
+    )
+
+    def create(self, config, **kwargs):
+        model, vector_store_id, question = map(kwargs.get, self._kwargs)
+        instructions = scanp(config, self.args.prompt_root, 'system')
+
+        response = self.client.responses.create(
+            model=model,
+            instructions=instructions,
+            tools=[
+                {
+                    "type": "file_search",
+                    "vector_store_ids": [vector_store_id],
+                    "max_num_results": 20,
+                }
+            ],
+            temperature=0.1,
+            input=[{"role": "user", "content": question}],
+            include=["file_search_call.results"],
+        )
+
+        return response
+
+@dataclass(frozen=True)
+class ResourceKey:
+    docs: str
+    model: str
+
+class OpenAIResources:
+    _resources = (
+        (ResponseCreator, ResponseCleaner),
+        (VectorStoreCreator, VectorStoreCleaner),
+    )
+
+    def __init__(self, args):
+        self.args = args
+
+        self.client = OpenAI()
+        self.resources = {}
+        (self.r_creator, self.v_creator) = (
+            x(self.client, self.args) for (x, _) in self._resources
+        )
+
+    def __enter__(self):
+        self.resources.clear()
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        cleaners = list(map(op.itemgetter(1), self._resources))
+        for resource in self.resources.values():
+            for (MyCleaner, r) in zip(cleaners, astuple(resource)):
+                cleaner = MyCleaner(r)
+                cleaner(self.client, self.args.cleanup_attempts)
+
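+    # Read JSONL experiment configs from fp and yield one Job per (docs, model)
+    # pair, creating the response and vector store lazily and caching them in
+    # self.resources so later configs with the same key reuse them.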
+    def __call__(self, fp):
+        for line in fp:
+            config = json.loads(line)
+            docs = config['docs']
+            question = scanp(config, self.args.prompt_root, 'user')
+            for model in self.args.model:
+                key = ResourceKey(docs, model)
+                resource = self.resources.get(key)
+                if resource is None:
+                    vector_store = self.v_creator(config)
+                    response = self.r_creator(
+                        config,
+                        model=model,
+                        vector_store=vector_store,
+                        question=question,
+                    )
+                    resource = Resource(response, vector_store)
+                    self.resources[key] = resource
+
+                yield Job(resource, model, config)
+
+MAX_LATENCY = 90 # seconds
+
+def now():
+    return time.strftime('%Y-%m-%d %H:%M:%S')
+
+def func(incoming, outgoing, session_id, args):
+    import datetime
+    import concurrent.futures
+
+    client = OpenAI()
+    creator = ResponseCreator(client, args)
+
+    while True:
+        job = incoming.get()
+        Logger.info('[%s] Received job | Config: %s | Model: %s', now(), job.config, job.model)
+
+        question = scanp(job.config, args.prompt_root, 'user')
+
+        def generate_response():
+            return creator.create(
+                job.config,
+                model=job.model,
+                vector_store=job.resource.vector_store,
+                question=question,
+            )
+
+        try:
+            t_start = time.perf_counter()
+            with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
+                future = executor.submit(generate_response)
+                response = future.result(timeout=MAX_LATENCY)
+            t_end = time.perf_counter()
+        except concurrent.futures.TimeoutError:
+            Logger.error('[%s] Response generation timed out after %ds | Config: %s | Model: %s',
+                         now(), MAX_LATENCY, job.config, job.model)
+            continue
+        except Exception as e:
+            Logger.critical('[%s] Error during response generation: %s | Config: %s', now(), e, job.config)
+            continue
+
+        latency = t_end - t_start
+
+        Logger.info('[%s] Response generated | Latency: %.2fs | Model: %s | Response ID: %s',
+                    now(), latency, job.model, response.id)
+
+        outgoing.put({
+            "system": job.config["system"],
+            "user": job.config["user"],
+            "docs": job.config["docs"],
+            "sequence": job.config.get("sequence", 0),
+            "response": [
+                {
+                    "message": response.output_text,
+                    "model": job.model,
+                    "latency": latency,
+                    "response_id": response.id,
+                    "date": datetime.datetime.now().ctime()
+                }
+            ]
+        })
+
+
+if __name__ == '__main__':
+    arguments = ArgumentParser()
+    arguments.add_argument('--prompt-root', type=Path)
+    arguments.add_argument('--document-root', type=Path)
+    arguments.add_argument('--model', action='append')
+    arguments.add_argument('--cleanup-attempts', type=int, default=3)
+    arguments.add_argument('--upload-batch-size', type=int, default=20)
+    arguments.add_argument('--workers', type=int)
+    args = arguments.parse_args()
+
+    incoming = Queue()
+    outgoing = Queue()
+    initargs = (
+        incoming,
+        outgoing,
+        str(uuid4()),
+        args,
+    )
+
+    with Pool(args.workers, func, initargs):
+        with OpenAIResources(args) as resources:
+            jobs = 0
+            for i in resources(sys.stdin):
+                incoming.put(i)
+                jobs += 1
+
+            for _ in range(jobs):
+                result = outgoing.get()
+                print(json.dumps(result))
diff --git a/src/prompt/run.py b/src/prompt/run.py
index 4050677..83830c8
--- a/src/prompt/run.py
+++ b/src/prompt/run.py
@@ -9,6 +9,7 @@
 from argparse import ArgumentParser
 from dataclasses import dataclass, astuple, asdict
 from multiprocessing import Pool, Queue
+import concurrent.futures
 
 import pandas as pd
 from openai import OpenAI, OpenAIError, NotFoundError
@@ -241,6 +242,19 @@ def __call__(self, fp):
 #
 #
 #
+
+
+MAX_LATENCY = 90 # seconds, cap total time for each run
+
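+# Module-level wrapper around the blocking create_and_poll call so it can be
+# submitted to a ThreadPoolExecutor and bounded by MAX_LATENCY.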
+def timeout_run(client, thread, assistant_id):
+    return client.beta.threads.runs.create_and_poll(
+        thread_id=thread.id,
+        assistant_id=assistant_id,
+    )
+
+def now():  # Utility for nicer timestamp logging
+    return time.strftime('%Y-%m-%d %H:%M:%S')
+
 class ThreadRunner:
     @staticmethod
     def parse_wait_time(err):
@@ -251,7 +265,6 @@ def parse_wait_time(err):
         return (pd
                 .to_timedelta(wait)
                 .total_seconds())
-
         raise TypeError(err.code)
 
     def __init__(self, client, response_id):
@@ -262,15 +275,21 @@ def __call__(self, job, thread):
         for i in it.count():
             try:
                 t_start = time.perf_counter()
-                run = self.client.beta.threads.runs.create_and_poll(
-                    thread_id=thread.id,
-                    assistant_id=job.resource.assistant,
-                )
+                with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
+                    future = executor.submit(timeout_run, self.client, thread, job.resource.assistant)
+                    run = future.result(timeout=MAX_LATENCY)
                 t_end = time.perf_counter()
+            except concurrent.futures.TimeoutError:
+                Logger.error('[%s] Run timed out after %ds | Config: %s | Assistant: %s',
+                             now(), MAX_LATENCY, job.config, job.resource.assistant)
+                return None
             except OpenAIError as err:
-                Logger.critical(err)
+                Logger.critical('[%s] OpenAIError: %s | Config: %s', now(), err, job.config)
                 continue
 
+            Logger.info('[%s] Run completed | Status: %s | Time: %.2fs | Model: %s | Config: %s | Run ID: %s',
+                        now(), run.status, t_end - t_start, job.model, job.config, run.id)
+
             if run.status == 'completed':
                 break
             if run.status == 'is_expired':
@@ -285,10 +304,12 @@ def __call__(self, job, thread):
                 rest = None
             if rest is not None:
                 rest = math.ceil(rest)
-                Logger.warning('Sleeping %ds', rest)
+                Logger.warning('[%s] Rate limit hit. Sleeping %ds | Error: %s',
+                               now(), rest, run.last_error.message)
                 time.sleep(rest)
 
-            Logger.error('%d / %s / %s', i, job.config, run)
+            Logger.error('[%s] Retry %d | Config: %s | Run Status: %s',
+                         now(), i, job.config, run.status)
 
         latency = t_end - t_start
         response = self.client.beta.threads.messages.list(
@@ -303,7 +324,6 @@ def __call__(self, job, thread):
             latency=latency,
             response_id=self.response_id,
         )
-
 #
 #
 #