Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.1.0+2025.04.16T16.05.04.940Z.74ace276.berickson.202504.process.metrics
0.1.0+2025.04.30T13.42.23.840Z.72e9d99b.berickson.202504.llm.limiting
2 changes: 1 addition & 1 deletion learning_observer/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.1.0+2025.04.10T17.37.57.685Z.95ba1c24.berickson.202504.process.metrics
0.1.0+2025.04.30T13.42.23.840Z.72e9d99b.berickson.202504.llm.limiting
3 changes: 2 additions & 1 deletion learning_observer/learning_observer/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ async def wrapper(*args, **kwargs):
return await func(*args, **kwargs)

# process item if the cache is present
key = create_key_from_args(func, args, kwargs)
kwargs_no_runtime = {k: v for k, v in kwargs.items() if k != 'runtime'}
key = create_key_from_args(func, args, kwargs_no_runtime)
if key in await cache_backend.keys():
return await cache_backend[key]
result = await func(*args, **kwargs)
Expand Down
78 changes: 78 additions & 0 deletions learning_observer/learning_observer/rate_limiting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
'''
This file defines a rate limit decorator function.
'''
import asyncio
import collections
import functools
import inspect
import time

import learning_observer.auth
import learning_observer.constants

# Shared sliding-window state: maps 'rate_limit:<service>:<requester>'
# to a deque of recent request timestamps. Module-wide so every
# limiter built for the same service/requester pair cooperates.
RATE_LIMITERS = {}
RL_LOCK = asyncio.Lock()


def create_rate_limiter(service_name):
    '''Create a sliding-window rate limit checker for `service_name`.

    Returns an async callable `check_rate_limit(requester_id)` that
    records the request and returns `True` when the request is
    allowed, or `False` when `requester_id` has exhausted its quota
    for the current window. A denied request is NOT recorded, so it
    does not extend the requester's lockout.
    '''
    async def check_rate_limit(requester_id):
        '''Return whether `requester_id` may make a request right now.

        Uses a sliding window over recent request timestamps, guarded
        by a module-level lock since the tracking dict is shared.
        '''
        # TODO fetch from pmss/define appropriate window
        max_requests_per_window = 2
        window_seconds = 60

        async with RL_LOCK:
            # Monotonic clock: immune to wall-clock adjustments, which
            # could otherwise shrink or stretch the window arbitrarily.
            now = time.monotonic()

            # One deque of timestamps per service/requester pair.
            timestamps = RATE_LIMITERS.setdefault(
                f'rate_limit:{service_name}:{requester_id}',
                collections.deque())

            # Expire requests that have aged out of the window.
            while timestamps and (now - timestamps[0]) > window_seconds:
                timestamps.popleft()

            if len(timestamps) >= max_requests_per_window:
                return False

            timestamps.append(now)
            return True
    return check_rate_limit


def rate_limited(service_name):
    '''Decorator applying per-user sliding-window rate limits to async
    functions.

    Protocol: the wrapped function MUST be called with a `runtime`
    keyword argument (a runtime object exposing `get_request()`);
    calls without it raise `TypeError`. The active user is resolved
    from the request, and the limit for `service_name` is checked
    before the function runs. When the limit is exceeded,
    `PermissionError` is raised and the wrapped function is never
    invoked.

    The wrapped function does not have to declare `runtime` itself —
    if its signature lacks the parameter, the keyword is stripped
    before the call:

        @rate_limited('LLM')
        async def gpt(prompt):
            ...

        await gpt(prompt, runtime=runtime)

    Raises:
        TypeError: the `runtime` keyword argument is missing.
        PermissionError: rate limit exceeded for `service_name`.
    '''
    def decorator(func):
        # Build the limiter and inspect the signature once at
        # decoration time rather than on every call.
        check_rate_limit = create_rate_limiter(service_name)
        accepts_runtime = 'runtime' in inspect.signature(func).parameters

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            if 'runtime' not in kwargs:
                raise TypeError(f'`{func.__name__}` requires `runtime` keyword argument for checking rate limits.')

            runtime = kwargs['runtime']

            # Check rate limits before execution
            request = runtime.get_request()
            user = await learning_observer.auth.get_active_user(request)
            user_id = user[learning_observer.constants.USER_ID]
            if not await check_rate_limit(user_id):
                raise PermissionError(f'Rate limit exceeded for {service_name} service')

            # Drop `runtime` for functions that do not declare it.
            if not accepts_runtime:
                kwargs = {k: v for k, v in kwargs.items() if k != 'runtime'}

            return await func(*args, **kwargs)
        return wrapper
    return decorator
2 changes: 1 addition & 1 deletion modules/wo_bulk_essay_analysis/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.1.0+2025.04.16T16.05.04.940Z.74ace276.berickson.202504.process.metrics
0.1.0+2025.04.18T12.16.24.803Z.1e9da34c.berickson.202504.llm.limiting
8 changes: 4 additions & 4 deletions modules/wo_bulk_essay_analysis/wo_bulk_essay_analysis/gpt.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import learning_observer.communication_protocol.integration
import learning_observer.cache
import learning_observer.prestartup
import learning_observer.settings
import learning_observer.rate_limiting

import lo_gpt.gpt

Expand All @@ -10,7 +9,7 @@


@learning_observer.communication_protocol.integration.publish_function('wo_bulk_essay_analysis.gpt_essay_prompt')
async def process_student_essay(text, prompt, system_prompt, tags):
async def process_student_essay(text, prompt, system_prompt, tags, runtime):
'''
This method processes text with a prompt through GPT.

Expand All @@ -19,6 +18,7 @@ async def process_student_essay(text, prompt, system_prompt, tags):
copy_tags = tags.copy()

@learning_observer.cache.async_memoization()
@learning_observer.rate_limiting.rate_limited('LLM')
async def gpt(gpt_prompt):
completion = await lo_gpt.gpt.gpt_responder.chat_completion(gpt_prompt, system_prompt)
return completion
Expand All @@ -41,7 +41,7 @@ async def gpt(gpt_prompt):

output = {
'text': text,
'feedback': await gpt(formatted_prompt),
'feedback': await gpt(formatted_prompt, runtime=runtime),
'prompt': prompt
}
return output
2 changes: 1 addition & 1 deletion modules/writing_observer/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.1.0+2025.04.07T20.48.55.419Z.3bdcc7c9.berickson.202504.process.metrics
0.1.0+2025.04.18T12.16.24.803Z.1e9da34c.berickson.202504.llm.limiting
7 changes: 6 additions & 1 deletion modules/writing_observer/writing_observer/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,12 @@
gpt_bulk_essay,
values=q.variable('docs'),
value_path='text',
func_kwargs={'prompt': q.parameter('gpt_prompt'), 'system_prompt': q.parameter('system_prompt'), 'tags': q.parameter('tags', required=False, default={})},
func_kwargs={
'prompt': q.parameter('gpt_prompt'),
'system_prompt': q.parameter('system_prompt'),
'tags': q.parameter('tags', required=False, default={}),
'runtime': q.parameter('runtime')
},
parallel=True
),
'gpt_bulk': q.join(LEFT=q.variable('gpt_map'), LEFT_ON='provenance.provenance.provenance.STUDENT.value.user_id', RIGHT=q.variable('roster'), RIGHT_ON='user_id'),
Expand Down
Loading