Skip to content

feat: add dynamic workers num and lock #84

Open
zhuyanhuazhuyanhua wants to merge 4 commits intojd-opensource:mainfrom
zhuyanhuazhuyanhua:main
Open

feat: add dynamic workers num and lock #84
zhuyanhuazhuyanhua wants to merge 4 commits intojd-opensource:mainfrom
zhuyanhuazhuyanhua:main

Conversation

@zhuyanhuazhuyanhua
Copy link
Contributor

@zhuyanhuazhuyanhua zhuyanhuazhuyanhua commented Dec 8, 2025

Summary

add dynamic workers num and lock

Changes

  • [ add dynamic workers with CPU num] Feature A added
  • [add lock]Feature B added
  • [add log]Feature C added
  • [clean up smoothly]Feature D added

@zhuyanhuazhuyanhua
Copy link
Contributor Author

test:

#!/usr/bin/env python3
"""Test script to verify FunctionHub improved cleanup functionality."""

import asyncio
import concurrent.futures
import time
import logging

# Configure logging to see cleanup messages
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

from oxygent.oxy.function_tools.function_hub import FunctionHub


def blocking_task(task_id, duration=1):
    """模拟阻塞任务"""
    print(f"  ⏳ Task {task_id} starting (duration: {duration}s)")
    time.sleep(duration)
    print(f"  ✅ Task {task_id} completed")
    return f"Task {task_id} result"


async def test_improved_cleanup():
    """测试改进后的清理功能"""
    print("=== Testing Improved FunctionHub Cleanup ===")
    
    print("1. Creating FunctionHub...")
    hub = FunctionHub(name="TestHub", desc="Test FunctionHub")
    
    print("2. Registering test function...")
    @hub.tool("Test function for cleanup")
    def test_func(task_id, duration=0.5):
        return blocking_task(task_id, duration)
    
    print("3. Using function to initialize thread pool...")
    result = await test_func(1)
    print(f"   Result: {result}")
    
    print("4. Checking thread pool status...")
    pool_info = hub.get_thread_pool_info()
    print(f"   Thread pool info: {pool_info}")
    print(f"   Is thread pool active: {hub.is_thread_pool_active()}")
    
    print("5. Performing cleanup...")
    await hub.cleanup()
    
    print("6. Checking status after cleanup...")
    pool_info = hub.get_thread_pool_info()
    print(f"   Thread pool info: {pool_info}")
    print(f"   Is thread pool active: {hub.is_thread_pool_active()}")
    
    if not hub.is_thread_pool_active():
        print("   ✅ Cleanup successful - thread pool properly shutdown")
    else:
        print("   ❌ Cleanup failed - thread pool still active")


async def test_context_manager():
    """测试上下文管理器"""
    print("\n=== Testing Context Manager ===")
    
    print("1. Using FunctionHub as async context manager...")
    async with FunctionHub(name="ContextHub", desc="Context manager test") as hub:
        print("2. Inside context - registering function...")
        @hub.tool("Context test function")
        def context_func(task_id):
            return blocking_task(task_id, 0.3)
        
        print("3. Using function...")
        result = await context_func(1)
        print(f"   Result: {result}")
        
        print("4. Checking thread pool status in context...")
        print(f"   Is active: {hub.is_thread_pool_active()}")
    
    print("5. Outside context - should be cleaned up...")
    print(f"   Is active: {hub.is_thread_pool_active()}")
    
    if not hub.is_thread_pool_active():
        print("   ✅ Context manager cleanup successful")
    else:
        print("   ❌ Context manager cleanup failed")


async def test_multiple_cleanup_calls():
    """测试多次清理调用"""
    print("\n=== Testing Multiple Cleanup Calls ===")
    
    hub = FunctionHub(name="MultiCleanupHub", desc="Multiple cleanup test")
    
    @hub.tool("Test function")
    def test_func(task_id):
        return blocking_task(task_id, 0.1)
    
    # Use function to initialize thread pool
    await test_func(1)
    
    print("1. First cleanup...")
    await hub.cleanup()
    
    print("2. Second cleanup (should be safe)...")
    await hub.cleanup()
    
    print("3. Third cleanup (should still be safe)...")
    await hub.cleanup()
    
    print("4. Final status check...")
    print(f"   Is active: {hub.is_thread_pool_active()}")
    
    if not hub.is_thread_pool_active():
        print("   ✅ Multiple cleanup calls handled correctly")
    else:
        print("   ❌ Multiple cleanup calls caused issues")


async def test_cleanup_with_concurrent_tasks():
    """测试有并发任务时的清理"""
    print("\n=== Testing Cleanup with Concurrent Tasks ===")
    
    hub = FunctionHub(name="ConcurrentHub", desc="Concurrent tasks test")
    
    @hub.tool("Concurrent function")
    def concurrent_func(task_id, duration):
        return blocking_task(task_id, duration)
    
    print("1. Starting multiple concurrent tasks...")
    tasks = [
        concurrent_func(1, 0.5),
        concurrent_func(2, 0.3),
        concurrent_func(3, 0.7),
    ]
    
    print("2. Executing tasks...")
    results = await asyncio.gather(*tasks)
    print(f"   Results: {results}")
    
    print("3. Performing cleanup after tasks complete...")
    await hub.cleanup()
    
    print("4. Checking status after cleanup...")
    print(f"   Is active: {hub.is_thread_pool_active()}")
    
    if not hub.is_thread_pool_active():
        print("   ✅ Cleanup successful after concurrent tasks")
    else:
        print("   ❌ Cleanup failed after concurrent tasks")


async def test_exception_handling():
    """测试异常处理"""
    print("\n=== Testing Exception Handling ===")
    
    hub = FunctionHub(name="ExceptionHub", desc="Exception handling test")
    
    @hub.tool("Test function")
    def test_func(task_id):
        return blocking_task(task_id, 0.2)
    
    # Use function to initialize thread pool
    await test_func(1)
    
    print("1. Performing cleanup...")
    try:
        await hub.cleanup()
        print("   ✅ Cleanup completed without exceptions")
    except Exception as e:
        print(f"   ❌ Unexpected exception during cleanup: {e}")
    
    print("2. Checking final status...")
    print(f"   Is active: {hub.is_thread_pool_active()}")
    
    if not hub.is_thread_pool_active():
        print("   ✅ Exception handling works correctly")
    else:
        print("   ❌ Exception handling failed")


async def main():
    """运行所有测试"""
    print("🚀 Starting FunctionHub cleanup tests...\n")
    
    try:
        await test_improved_cleanup()
        await test_context_manager()
        await test_multiple_cleanup_calls()
        await test_cleanup_with_concurrent_tasks()
        await test_exception_handling()
        
        print("\n🎉 All tests completed successfully!")
        
    except Exception as e:
        print(f"\n❌ Test failed with error: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    asyncio.run(main())

@zhuyanhuazhuyanhua
Copy link
Contributor Author

.venv) zhuyanhua.18@ZBMac-K4FR9HWJNM OxyGent % python test_function_hub_cleanup.py                 
🚀 Starting FunctionHub cleanup tests...

=== Testing Improved FunctionHub Cleanup ===
1. Creating FunctionHub...
2. Registering test function...
3. Using function to initialize thread pool...
  ⏳ Task 1 starting (duration: 0.5s)
  ✅ Task 1 completed
   Result: Task 1 result
4. Checking thread pool status...
   Thread pool info: {'initialized': True, 'workers': 32, 'shutdown': False}
   Is thread pool active: True
5. Performing cleanup...
2025-12-08 20:28:55,210 - oxygent.oxy.function_tools.function_hub - INFO - FunctionHub TestHub: Starting thread pool cleanup...
2025-12-08 20:28:55,211 - oxygent.oxy.function_tools.function_hub - INFO - FunctionHub TestHub: Thread pool shutdown completed
6. Checking status after cleanup...
   Thread pool info: None
   Is thread pool active: False
   ✅ Cleanup successful - thread pool properly shutdown

=== Testing Context Manager ===
1. Using FunctionHub as async context manager...
2. Inside context - registering function...
3. Using function...
  ⏳ Task 1 starting (duration: 0.3s)
  ✅ Task 1 completed
   Result: Task 1 result
4. Checking thread pool status in context...
   Is active: True
2025-12-08 20:28:55,515 - oxygent.oxy.function_tools.function_hub - INFO - FunctionHub ContextHub: Starting thread pool cleanup...
2025-12-08 20:28:55,515 - oxygent.oxy.function_tools.function_hub - INFO - FunctionHub ContextHub: Thread pool shutdown completed
5. Outside context - should be cleaned up...
   Is active: False
   ✅ Context manager cleanup successful

=== Testing Multiple Cleanup Calls ===
  ⏳ Task 1 starting (duration: 0.1s)
  ✅ Task 1 completed
1. First cleanup...
2025-12-08 20:28:55,620 - oxygent.oxy.function_tools.function_hub - INFO - FunctionHub MultiCleanupHub: Starting thread pool cleanup...
2025-12-08 20:28:55,620 - oxygent.oxy.function_tools.function_hub - INFO - FunctionHub MultiCleanupHub: Thread pool shutdown completed
2. Second cleanup (should be safe)...
3. Third cleanup (should still be safe)...
4. Final status check...
   Is active: False
   ✅ Multiple cleanup calls handled correctly

=== Testing Cleanup with Concurrent Tasks ===
1. Starting multiple concurrent tasks...
2. Executing tasks...
  ⏳ Task 1 starting (duration: 0.5s)
  ⏳ Task 2 starting (duration: 0.3s)
  ⏳ Task 3 starting (duration: 0.7s)
  ✅ Task 2 completed
  ✅ Task 1 completed
  ✅ Task 3 completed
   Results: ['Task 1 result', 'Task 2 result', 'Task 3 result']
3. Performing cleanup after tasks complete...
2025-12-08 20:28:56,328 - oxygent.oxy.function_tools.function_hub - INFO - FunctionHub ConcurrentHub: Starting thread pool cleanup...
2025-12-08 20:28:56,329 - oxygent.oxy.function_tools.function_hub - INFO - FunctionHub ConcurrentHub: Thread pool shutdown completed
4. Checking status after cleanup...
   Is active: False
   ✅ Cleanup successful after concurrent tasks

=== Testing Exception Handling ===
  ⏳ Task 1 starting (duration: 0.2s)
  ✅ Task 1 completed
1. Performing cleanup...
2025-12-08 20:28:56,531 - oxygent.oxy.function_tools.function_hub - INFO - FunctionHub ExceptionHub: Starting thread pool cleanup...
2025-12-08 20:28:56,531 - oxygent.oxy.function_tools.function_hub - INFO - FunctionHub ExceptionHub: Thread pool shutdown completed
   ✅ Cleanup completed without exceptions
2. Checking final status...
   Is active: False
   ✅ Exception handling works correctly

🎉 All tests completed successfully!

@zhuyanhuazhuyanhua
Copy link
Contributor Author

unitest result:

(.venv) zhuyanhua.18@ZBMac-K4FR9HWJNM Drop Box % pytest OxyGent/test/unittest/test_function_hub.py
=============================================================== test session starts ================================================================
platform darwin -- Python 3.10.19, pytest-9.0.1, pluggy-1.6.0
rootdir: /Users/zhuyanhua.18/Public/Drop Box/OxyGent
configfile: pytest.ini
plugins: anyio-4.11.0, asyncio-1.3.0
asyncio: mode=strict, debug=False, asyncio_default_fixture_loop_scope=None, asyncio_default_test_loop_scope=function
collected 10 items                                                                                                                                 

OxyGent/test/unittest/test_function_hub.py ..........                                                                                        [100%]

================================================================ 10 passed in 1.25s ================================================================

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant