From 72e9d99b1aaf534ab96382c016aaba79975c05d1 Mon Sep 17 00:00:00 2001 From: Bradley Erickson Date: Fri, 18 Apr 2025 08:16:24 -0400 Subject: [PATCH 1/2] added rate limiting --- VERSION | 2 +- learning_observer/VERSION | 2 +- learning_observer/learning_observer/cache.py | 3 +- .../learning_observer/rate_limiting.py | 73 +++++++++++++++++++ modules/wo_bulk_essay_analysis/VERSION | 2 +- .../wo_bulk_essay_analysis/gpt.py | 8 +- modules/writing_observer/VERSION | 2 +- .../writing_observer/module.py | 7 +- 8 files changed, 89 insertions(+), 10 deletions(-) create mode 100644 learning_observer/learning_observer/rate_limiting.py diff --git a/VERSION b/VERSION index c7ed6994..246bcbeb 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.1.0+2025.04.16T16.05.04.940Z.74ace276.berickson.202504.process.metrics +0.1.0+2025.04.18T12.16.24.803Z.1e9da34c.berickson.202504.llm.limiting diff --git a/learning_observer/VERSION b/learning_observer/VERSION index 995fecb1..246bcbeb 100644 --- a/learning_observer/VERSION +++ b/learning_observer/VERSION @@ -1 +1 @@ -0.1.0+2025.04.10T17.37.57.685Z.95ba1c24.berickson.202504.process.metrics +0.1.0+2025.04.18T12.16.24.803Z.1e9da34c.berickson.202504.llm.limiting diff --git a/learning_observer/learning_observer/cache.py b/learning_observer/learning_observer/cache.py index ab11259c..96018500 100644 --- a/learning_observer/learning_observer/cache.py +++ b/learning_observer/learning_observer/cache.py @@ -37,7 +37,8 @@ async def wrapper(*args, **kwargs): return await func(*args, **kwargs) # process item if the cache is present - key = create_key_from_args(func, args, kwargs) + kwargs_no_runtime = {k: v for k, v in kwargs.items() if k != 'runtime'} + key = create_key_from_args(func, args, kwargs_no_runtime) if key in await cache_backend.keys(): return await cache_backend[key] result = await func(*args, **kwargs) diff --git a/learning_observer/learning_observer/rate_limiting.py b/learning_observer/learning_observer/rate_limiting.py new file mode 100644 index 00000000..a65d1bec --- /dev/null +++ b/learning_observer/learning_observer/rate_limiting.py @@ -0,0 +1,73 @@ +''' +This file defines a rate limit decorator function. +''' +import asyncio +import collections +import functools +import inspect +import time + +import learning_observer.auth +import learning_observer.constants + +RATE_LIMITERS = {} +RL_LOCK = asyncio.Lock() + + +def create_rate_limiter(service_name): + '''Factory function for rate limiters with closure over service name''' + async def check_rate_limit(user_id): + '''Reusable rate limiter with service-specific settings''' + # TODO fetch from pmss/define appropriate window + max_requests = 2 + window_seconds = 60 + + async with RL_LOCK: + now = time.time() + + # Initialize user/service tracking + key = f'rate_limit:{service_name}:{user_id}' + if key not in RATE_LIMITERS: + RATE_LIMITERS[key] = collections.deque() + + # Expire old requests + timestamps = RATE_LIMITERS[key] + while timestamps and (now - timestamps[0]) > window_seconds: + timestamps.popleft() + + if len(timestamps) >= max_requests: + return False + + timestamps.append(now) + return True + + return check_rate_limit + + +def rate_limited(service_name): + '''Decorator for async functions needing rate limiting''' + def decorator(func): + @functools.wraps(func) + async def wrapper(*args, **kwargs): + # Get the appropriate rate limiter + if 'runtime' not in kwargs: + raise TypeError(f'`{func.__name__}` requires `runtime` keyword argument for checking rate limits.') + + runtime = kwargs['runtime'] + + check_limit = create_rate_limiter(service_name) + + # Check rate limits before execution + request = runtime.get_request() + user = await learning_observer.auth.get_active_user(request) + user_id = user[learning_observer.constants.USER_ID] + if not await check_limit(user_id): + raise PermissionError(f'Rate limit exceeded for {service_name} service') + + function_signature = inspect.signature(func) + if 'runtime' not in function_signature.parameters: + kwargs = {k: v for k, v in kwargs.items() if k != 'runtime'} + + return await func(*args, **kwargs) + return wrapper + return decorator diff --git a/modules/wo_bulk_essay_analysis/VERSION b/modules/wo_bulk_essay_analysis/VERSION index c7ed6994..246bcbeb 100644 --- a/modules/wo_bulk_essay_analysis/VERSION +++ b/modules/wo_bulk_essay_analysis/VERSION @@ -1 +1 @@ -0.1.0+2025.04.16T16.05.04.940Z.74ace276.berickson.202504.process.metrics +0.1.0+2025.04.18T12.16.24.803Z.1e9da34c.berickson.202504.llm.limiting diff --git a/modules/wo_bulk_essay_analysis/wo_bulk_essay_analysis/gpt.py b/modules/wo_bulk_essay_analysis/wo_bulk_essay_analysis/gpt.py index 7562f9ef..5cc273c4 100644 --- a/modules/wo_bulk_essay_analysis/wo_bulk_essay_analysis/gpt.py +++ b/modules/wo_bulk_essay_analysis/wo_bulk_essay_analysis/gpt.py @@ -1,7 +1,6 @@ import learning_observer.communication_protocol.integration import learning_observer.cache -import learning_observer.prestartup -import learning_observer.settings +import learning_observer.rate_limiting import lo_gpt.gpt @@ -10,7 +9,7 @@ @learning_observer.communication_protocol.integration.publish_function('wo_bulk_essay_analysis.gpt_essay_prompt') -async def process_student_essay(text, prompt, system_prompt, tags): +async def process_student_essay(text, prompt, system_prompt, tags, runtime): ''' This method processes text with a prompt through GPT. @@ -19,6 +18,7 @@ async def process_student_essay(text, prompt, system_prompt, tags): copy_tags = tags.copy() @learning_observer.cache.async_memoization() + @learning_observer.rate_limiting.rate_limited('LLM') async def gpt(gpt_prompt): completion = await lo_gpt.gpt.gpt_responder.chat_completion(gpt_prompt, system_prompt) return completion @@ -41,7 +41,7 @@ async def gpt(gpt_prompt): output = { 'text': text, - 'feedback': await gpt(formatted_prompt), + 'feedback': await gpt(formatted_prompt, runtime=runtime), 'prompt': prompt } return output diff --git a/modules/writing_observer/VERSION b/modules/writing_observer/VERSION index c309e668..246bcbeb 100644 --- a/modules/writing_observer/VERSION +++ b/modules/writing_observer/VERSION @@ -1 +1 @@ -0.1.0+2025.04.07T20.48.55.419Z.3bdcc7c9.berickson.202504.process.metrics +0.1.0+2025.04.18T12.16.24.803Z.1e9da34c.berickson.202504.llm.limiting diff --git a/modules/writing_observer/writing_observer/module.py b/modules/writing_observer/writing_observer/module.py index dad2f01a..aa26ceeb 100644 --- a/modules/writing_observer/writing_observer/module.py +++ b/modules/writing_observer/writing_observer/module.py @@ -151,7 +151,12 @@ gpt_bulk_essay, values=q.variable('docs'), value_path='text', - func_kwargs={'prompt': q.parameter('gpt_prompt'), 'system_prompt': q.parameter('system_prompt'), 'tags': q.parameter('tags', required=False, default={})}, + func_kwargs={ + 'prompt': q.parameter('gpt_prompt'), + 'system_prompt': q.parameter('system_prompt'), + 'tags': q.parameter('tags', required=False, default={}), + 'runtime': q.parameter('runtime') + }, parallel=True ), 'gpt_bulk': q.join(LEFT=q.variable('gpt_map'), LEFT_ON='provenance.provenance.provenance.STUDENT.value.user_id', RIGHT=q.variable('roster'), RIGHT_ON='user_id'), From cc3763ada6c75c0ed25fba9f9cb5a376c0c27e6a Mon Sep 17 00:00:00 2001 From: Bradley Erickson Date: Wed, 30 Apr 2025 09:42:23 -0400 Subject: [PATCH 2/2] some pr feedback, still need to address more --- VERSION | 2 +- learning_observer/VERSION | 2 +- .../learning_observer/rate_limiting.py | 37 +++++++++++-------- 3 files changed, 23 insertions(+), 18 deletions(-) diff --git a/VERSION b/VERSION index 246bcbeb..fe2e3abc 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.1.0+2025.04.18T12.16.24.803Z.1e9da34c.berickson.202504.llm.limiting +0.1.0+2025.04.30T13.42.23.840Z.72e9d99b.berickson.202504.llm.limiting diff --git a/learning_observer/VERSION b/learning_observer/VERSION index 246bcbeb..fe2e3abc 100644 --- a/learning_observer/VERSION +++ b/learning_observer/VERSION @@ -1 +1 @@ -0.1.0+2025.04.18T12.16.24.803Z.1e9da34c.berickson.202504.llm.limiting +0.1.0+2025.04.30T13.42.23.840Z.72e9d99b.berickson.202504.llm.limiting diff --git a/learning_observer/learning_observer/rate_limiting.py b/learning_observer/learning_observer/rate_limiting.py index a65d1bec..c9e1259b 100644 --- a/learning_observer/learning_observer/rate_limiting.py +++ b/learning_observer/learning_observer/rate_limiting.py @@ -15,32 +15,37 @@ def create_rate_limiter(service_name): - '''Factory function for rate limiters with closure over service name''' - async def check_rate_limit(user_id): - '''Reusable rate limiter with service-specific settings''' + '''Creates a rate limiter for a specific service. + ''' + async def check_rate_limit(id): + '''Check if id has hit the rate limit. + + Uses sliding window to track recent requests. Returns + whether the current request should be allowed based on + the configured limits. + ''' # TODO fetch from pmss/define appropriate window - max_requests = 2 + max_requests_per_window = 2 window_seconds = 60 async with RL_LOCK: now = time.time() - # Initialize user/service tracking - key = f'rate_limit:{service_name}:{user_id}' - if key not in RATE_LIMITERS: - RATE_LIMITERS[key] = collections.deque() + # Initialize id/service tracking + limiter_key = f'rate_limit:{service_name}:{id}' + if limiter_key not in RATE_LIMITERS: + RATE_LIMITERS[limiter_key] = collections.deque() # Expire old requests - timestamps = RATE_LIMITERS[key] - while timestamps and (now - timestamps[0]) > window_seconds: - timestamps.popleft() + prior_requests_timestamps = RATE_LIMITERS[limiter_key] + while prior_requests_timestamps and (now - prior_requests_timestamps[0]) > window_seconds: + prior_requests_timestamps.popleft() - if len(timestamps) >= max_requests: + if len(prior_requests_timestamps) >= max_requests_per_window: return False - timestamps.append(now) + prior_requests_timestamps.append(now) return True - return check_rate_limit @@ -55,13 +60,13 @@ async def wrapper(*args, **kwargs): runtime = kwargs['runtime'] - check_limit = create_rate_limiter(service_name) + check_rate_limit = create_rate_limiter(service_name) # Check rate limits before execution request = runtime.get_request() user = await learning_observer.auth.get_active_user(request) user_id = user[learning_observer.constants.USER_ID] - if not await check_limit(user_id): + if not await check_rate_limit(user_id): raise PermissionError(f'Rate limit exceeded for {service_name} service') function_signature = inspect.signature(func)