Open
Conversation
Contributor
Author
#!/usr/bin/env python3
"""Test script for OxyGent rate limiter functionality."""
import asyncio
import logging
import time
from oxygent.rate_limiter import TokenLimiter, RateLimitManager, get_rate_limit_manager
from oxygent.config import Config
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def test_token_limiter():
"""Test basic token limiter functionality."""
print("=== Testing TokenLimiter ===")
# Create a token limiter with 2 tokens per second, capacity of 5
limiter = TokenLimiter(rate=2.0, capacity=5)
# Test 1: Initial token acquisition
print("Test 1: Initial token acquisition")
for i in range(5):
result = limiter.acquire_sync(1)
print(f" Attempt {i+1}: {'Success' if result else 'Failed'}")
# Test 2: Should fail - no tokens left
print("Test 2: Should fail - no tokens left")
result = limiter.acquire_sync(1)
print(f" Result: {'Success' if result else 'Failed (expected)'}")
# Test 3: Wait for token refill
print("Test 3: Wait for token refill")
await asyncio.sleep(1) # Wait for 2 tokens to refill
result = limiter.acquire_sync(1)
print(f" After 1s wait: {'Success' if result else 'Failed'}")
# Test 4: Async token acquisition
print("Test 4: Async token acquisition")
async_result = await limiter.acquire_async(1)
print(f" Async result: {'Success' if async_result else 'Failed'}")
async def test_rate_limit_manager():
"""Test rate limit manager functionality."""
print("\n=== Testing RateLimitManager ===")
manager = RateLimitManager()
# Test 1: Basic functionality when disabled
print("Test 1: Basic functionality when disabled")
manager.disable()
result = manager.check_rate_limit("test_oxy", 1)
print(f" When disabled: {'Success' if result else 'Failed (unexpected)'}")
# Test 2: Enable and create limiters
print("Test 2: Enable and create limiters")
manager.enable()
manager.create_limiter("test_oxy", rate=1.0, capacity=3)
manager.create_limiter("test_agent", rate=2.0, capacity=5)
# Test 3: Check rate limits
print("Test 3: Check rate limits")
for i in range(4):
result = manager.check_rate_limit("test_oxy", 1)
print(f" test_oxy attempt {i+1}: {'Success' if result else 'Failed (expected for last)'}")
# Test 4: Async check
print("Test 4: Async check")
async_result = await manager.check_rate_limit_async("test_agent", 1)
print(f" test_agent async: {'Success' if async_result else 'Failed'}")
async def test_integration():
"""Test integration with configuration."""
print("\n=== Testing Integration with Config ===")
# Enable rate limiter in config
Config.set_rate_limiter_enabled(True)
Config.set_rate_limiter_default_rate(1.0)
Config.set_rate_limiter_default_capacity(2)
# Get global manager
manager = get_rate_limit_manager()
manager.enable()
# Create limiters
manager.create_limiter("integration_test", rate=1.0, capacity=2)
# Test with configuration
print("Test 1: Configuration-based limits")
for i in range(3):
result = manager.check_rate_limit("integration_test", 1)
print(f" Attempt {i+1}: {'Success' if result else 'Failed (expected for last)'}")
def test_sync_functionality():
"""Test synchronous functionality."""
print("\n=== Testing Synchronous Functionality ===")
limiter = TokenLimiter(rate=1.0, capacity=2)
# Test sync methods
print("Test 1: Sync acquire")
result1 = limiter.acquire_sync(1)
result2 = limiter.acquire_sync(1)
result3 = limiter.acquire_sync(1)
print(f" First acquire: {'Success' if result1 else 'Failed'}")
print(f" Second acquire: {'Success' if result2 else 'Failed'}")
print(f" Third acquire: {'Success' if result3 else 'Failed (expected)'}")
# Test utility methods
print("Test 2: Utility methods")
available = limiter.get_available_tokens()
wait_time = limiter.get_wait_time(1)
print(f" Available tokens: {available}")
print(f" Wait time for 1 token: {wait_time:.2f}s")
async def main():
"""Run all tests."""
print("Starting OxyGent Rate Limiter Tests")
print("=" * 50)
try:
# Run all tests
await test_token_limiter()
await test_rate_limit_manager()
await test_integration()
test_sync_functionality()
print("\n" + "=" * 50)
print("All tests completed successfully!")
except Exception as e:
print(f"\nTest failed with error: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
asyncio.run(main())result (.venv) zhuyanhua.18@ZBMac-K4FR9HWJNM OxyGent % python ./test_rate_limiter.py
Starting OxyGent Rate Limiter Tests
==================================================
=== Testing TokenLimiter ===
Test 1: Initial token acquisition
Attempt 1: Success
Attempt 2: Success
Attempt 3: Success
Attempt 4: Success
Attempt 5: Success
Test 2: Should fail - no tokens left
Result: Failed (expected)
Test 3: Wait for token refill
After 1s wait: Success
Test 4: Async token acquisition
Async result: Success
=== Testing RateLimitManager ===
Test 1: Basic functionality when disabled
INFO:oxygent.rate_limiter:Rate limiting disabled
When disabled: Success
Test 2: Enable and create limiters
INFO:oxygent.rate_limiter:Rate limiting enabled
INFO:oxygent.rate_limiter:Created token limiter 'test_oxy' with rate=1.0, capacity=3
INFO:oxygent.rate_limiter:Created token limiter 'test_agent' with rate=2.0, capacity=5
Test 3: Check rate limits
test_oxy attempt 1: Success
test_oxy attempt 2: Success
test_oxy attempt 3: Success
test_oxy attempt 4: Failed (expected for last)
Test 4: Async check
test_agent async: Success
=== Testing Integration with Config ===
INFO:oxygent.rate_limiter:Rate limiting enabled
INFO:oxygent.rate_limiter:Created token limiter 'integration_test' with rate=1.0, capacity=2
Test 1: Configuration-based limits
Attempt 1: Success
Attempt 2: Success
Attempt 3: Failed (expected for last)
=== Testing Synchronous Functionality ===
Test 1: Sync acquire
First acquire: Success
Second acquire: Success
Third acquire: Failed (expected)
Test 2: Utility methods
Available tokens: 2.09808349609375e-05
Wait time for 1 token: 1.00s
==================================================
All tests completed successfully! |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
add rate_limit with token bucket
easily set up with register
Changes
add class rate_limit
fix config.py to handle rate_limit in oxy register list