'''
This file defines a rate limit decorator function.
'''
import asyncio
import collections
import functools
import inspect
import time

# Maps 'rate_limit:<service>:<id>' -> deque of recent request timestamps
# (monotonic seconds).  Shared by every limiter created in this module.
RATE_LIMITERS = {}
# Guards RATE_LIMITERS.  A single global lock is sufficient because the
# critical section only manipulates small in-memory deques.
RL_LOCK = asyncio.Lock()


def create_rate_limiter(service_name, max_requests_per_window=2, window_seconds=60):
    '''Create a sliding-window rate limiter for a specific service.

    :param service_name: name used to namespace limits per service
    :param max_requests_per_window: requests allowed inside one window
        (TODO fetch from pmss/define appropriate window)
    :param window_seconds: length of the sliding window, in seconds
    :return: async callable ``check_rate_limit(requester_id)`` that
        returns True (request allowed and recorded) or False (limit hit)
    '''
    async def check_rate_limit(requester_id):
        '''Check if requester_id has hit the rate limit.

        Uses sliding window to track recent requests. Returns
        whether the current request should be allowed based on
        the configured limits.
        '''
        async with RL_LOCK:
            # time.monotonic is immune to wall-clock adjustments, which
            # would otherwise stretch or shrink the rate window.
            now = time.monotonic()

            # One deque of timestamps per (service, requester) pair.
            limiter_key = f'rate_limit:{service_name}:{requester_id}'
            prior_requests_timestamps = RATE_LIMITERS.setdefault(
                limiter_key, collections.deque())

            # Expire old requests that fell out of the window
            while prior_requests_timestamps and (now - prior_requests_timestamps[0]) > window_seconds:
                prior_requests_timestamps.popleft()

            if len(prior_requests_timestamps) >= max_requests_per_window:
                return False

            prior_requests_timestamps.append(now)
            return True
    return check_rate_limit


def rate_limited(service_name):
    '''Decorator for async functions needing rate limiting.

    The wrapped function must be called with a ``runtime`` keyword
    argument; the active user is resolved from it and checked against
    the per-service limit.  ``runtime`` is stripped before calling the
    wrapped function unless its signature accepts it.

    :raises TypeError: if the call is missing the ``runtime`` kwarg
    :raises PermissionError: if the user exceeded the service's limit
    '''
    def decorator(func):
        # One limiter per decorated function, created once at decoration
        # time instead of a fresh closure on every single call.
        check_rate_limit = create_rate_limiter(service_name)
        # Whether the wrapped function accepts `runtime`; computed once
        # here rather than re-inspecting the signature per call.
        accepts_runtime = 'runtime' in inspect.signature(func).parameters

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            if 'runtime' not in kwargs:
                raise TypeError(f'`{func.__name__}` requires `runtime` keyword argument for checking rate limits.')

            runtime = kwargs['runtime']

            # Imported lazily so this module has no hard import-time
            # dependency on the wider learning_observer stack.
            import learning_observer.auth
            import learning_observer.constants

            # Check rate limits before execution
            request = runtime.get_request()
            user = await learning_observer.auth.get_active_user(request)
            user_id = user[learning_observer.constants.USER_ID]
            if not await check_rate_limit(user_id):
                raise PermissionError(f'Rate limit exceeded for {service_name} service')

            if not accepts_runtime:
                kwargs = {k: v for k, v in kwargs.items() if k != 'runtime'}

            return await func(*args, **kwargs)
        return wrapper
    return decorator
b/modules/wo_bulk_essay_analysis/VERSION @@ -1 +1 @@ -0.1.0+2025.04.16T16.05.04.940Z.74ace276.berickson.202504.process.metrics +0.1.0+2025.04.18T12.16.24.803Z.1e9da34c.berickson.202504.llm.limiting diff --git a/modules/wo_bulk_essay_analysis/wo_bulk_essay_analysis/gpt.py b/modules/wo_bulk_essay_analysis/wo_bulk_essay_analysis/gpt.py index 7562f9ef..5cc273c4 100644 --- a/modules/wo_bulk_essay_analysis/wo_bulk_essay_analysis/gpt.py +++ b/modules/wo_bulk_essay_analysis/wo_bulk_essay_analysis/gpt.py @@ -1,7 +1,6 @@ import learning_observer.communication_protocol.integration import learning_observer.cache -import learning_observer.prestartup -import learning_observer.settings +import learning_observer.rate_limiting import lo_gpt.gpt @@ -10,7 +9,7 @@ @learning_observer.communication_protocol.integration.publish_function('wo_bulk_essay_analysis.gpt_essay_prompt') -async def process_student_essay(text, prompt, system_prompt, tags): +async def process_student_essay(text, prompt, system_prompt, tags, runtime): ''' This method processes text with a prompt through GPT. 
@@ -19,6 +18,7 @@ async def process_student_essay(text, prompt, system_prompt, tags): copy_tags = tags.copy() @learning_observer.cache.async_memoization() + @learning_observer.rate_limiting.rate_limited('LLM') async def gpt(gpt_prompt): completion = await lo_gpt.gpt.gpt_responder.chat_completion(gpt_prompt, system_prompt) return completion @@ -41,7 +41,7 @@ async def gpt(gpt_prompt): output = { 'text': text, - 'feedback': await gpt(formatted_prompt), + 'feedback': await gpt(formatted_prompt, runtime=runtime), 'prompt': prompt } return output diff --git a/modules/writing_observer/VERSION b/modules/writing_observer/VERSION index c309e668..246bcbeb 100644 --- a/modules/writing_observer/VERSION +++ b/modules/writing_observer/VERSION @@ -1 +1 @@ -0.1.0+2025.04.07T20.48.55.419Z.3bdcc7c9.berickson.202504.process.metrics +0.1.0+2025.04.18T12.16.24.803Z.1e9da34c.berickson.202504.llm.limiting diff --git a/modules/writing_observer/writing_observer/module.py b/modules/writing_observer/writing_observer/module.py index dad2f01a..aa26ceeb 100644 --- a/modules/writing_observer/writing_observer/module.py +++ b/modules/writing_observer/writing_observer/module.py @@ -151,7 +151,12 @@ gpt_bulk_essay, values=q.variable('docs'), value_path='text', - func_kwargs={'prompt': q.parameter('gpt_prompt'), 'system_prompt': q.parameter('system_prompt'), 'tags': q.parameter('tags', required=False, default={})}, + func_kwargs={ + 'prompt': q.parameter('gpt_prompt'), + 'system_prompt': q.parameter('system_prompt'), + 'tags': q.parameter('tags', required=False, default={}), + 'runtime': q.parameter('runtime') + }, parallel=True ), 'gpt_bulk': q.join(LEFT=q.variable('gpt_map'), LEFT_ON='provenance.provenance.provenance.STUDENT.value.user_id', RIGHT=q.variable('roster'), RIGHT_ON='user_id'),