diff --git a/models/__init__.py b/models/__init__.py index b257501..d754679 100644 --- a/models/__init__.py +++ b/models/__init__.py @@ -215,7 +215,6 @@ CreateExecutorResponse, StopExecutorRequest, StopExecutorResponse, - DeleteExecutorResponse, ExecutorFilterRequest, ExecutorResponse, ExecutorDetailResponse, @@ -389,7 +388,6 @@ "CreateExecutorResponse", "StopExecutorRequest", "StopExecutorResponse", - "DeleteExecutorResponse", "ExecutorFilterRequest", "ExecutorResponse", "ExecutorDetailResponse", diff --git a/models/executors.py b/models/executors.py index 0822b39..a026cee 100644 --- a/models/executors.py +++ b/models/executors.py @@ -278,10 +278,6 @@ class ExecutorFilterRequest(PaginationParams): None, description="Filter by status (RUNNING, TERMINATED, etc.)" ) - include_completed: bool = Field( - default=False, - description="Include recently completed executors" - ) # ======================================== @@ -365,31 +361,25 @@ class StopExecutorResponse(BaseModel): class ExecutorsSummaryResponse(BaseModel): - """Summary of all executors.""" + """Summary of active executors.""" model_config = ConfigDict( json_schema_extra={ "example": { "total_active": 5, - "total_completed": 23, "total_pnl_quote": 1234.56, "total_volume_quote": 50000.00, "by_type": {"position_executor": 3, "grid_executor": 2}, "by_connector": {"binance_perpetual": 4, "binance": 1}, - "by_status": {"RUNNING": 5, "TERMINATED": 23} + "by_status": {"RUNNING": 5} } } ) total_active: int = Field(description="Number of active executors") - total_completed: int = Field(description="Number of completed executors") - total_pnl_quote: float = Field(description="Total PnL across all executors") - total_volume_quote: float = Field(description="Total volume across all executors") + total_pnl_quote: float = Field(description="Total PnL across active executors") + total_volume_quote: float = Field(description="Total volume across active executors") by_type: Dict[str, int] = Field(description="Executor count by type") by_connector: Dict[str, int] = Field(description="Executor count by connector") by_status: Dict[str, int] = Field(description="Executor count by status") -class DeleteExecutorResponse(BaseModel): - """Response after deleting an executor from tracking.""" - message: str = Field(description="Success message") - executor_id: str = Field(description="Executor identifier that was removed") diff --git a/routers/executors.py b/routers/executors.py index 4c20997..b606437 100644 --- a/routers/executors.py +++ b/routers/executors.py @@ -13,7 +13,6 @@ from models.executors import ( CreateExecutorRequest, CreateExecutorResponse, - DeleteExecutorResponse, ExecutorDetailResponse, ExecutorFilterRequest, ExecutorResponse, @@ -75,7 +74,9 @@ async def list_executors( executor_service: ExecutorService = Depends(get_executor_service) ): """ - Get list of active executors with optional filtering. + Get list of executors with optional filtering. + + Returns active executors from memory combined with completed executors from database. Filters: - `account_names`: Filter by specific accounts @@ -83,19 +84,17 @@ async def list_executors( - `trading_pairs`: Filter by trading pairs - `executor_types`: Filter by executor types - `status`: Filter by status (RUNNING, TERMINATED, etc.) - - `include_completed`: Include recently completed executors Returns paginated list of executor summaries. """ try: - # Get filtered executors - executors = executor_service.get_executors( + # Get filtered executors (active from memory + completed from DB) + executors = await executor_service.get_executors( account_name=filter_request.account_names[0] if filter_request.account_names else None, connector_name=filter_request.connector_names[0] if filter_request.connector_names else None, trading_pair=filter_request.trading_pairs[0] if filter_request.trading_pairs else None, executor_type=filter_request.executor_types[0] if filter_request.executor_types else None, - status=filter_request.status, - include_completed=filter_request.include_completed + status=filter_request.status ) # Apply additional multi-value filters @@ -167,13 +166,15 @@ async def get_executor( """ Get detailed information about a specific executor. + Checks active executors in memory first, then falls back to database for completed executors. + Returns full executor information including: - Current status and PnL - Full configuration - Executor-specific custom information """ try: - executor = executor_service.get_executor(executor_id) + executor = await executor_service.get_executor(executor_id) if not executor: raise HTTPException(status_code=404, detail=f"Executor {executor_id} not found") @@ -214,48 +215,6 @@ async def stop_executor( raise HTTPException(status_code=500, detail=f"Error stopping executor: {str(e)}") -@router.delete("/{executor_id}", response_model=DeleteExecutorResponse) -async def delete_executor( - executor_id: str, - executor_service: ExecutorService = Depends(get_executor_service) -): - """ - Remove an executor from tracking. - - The executor must be already stopped/completed. This removes it from - the active tracking list but preserves database records for historical queries. - - Returns success message if removed. - """ - try: - # Check if executor exists - executor = executor_service.get_executor(executor_id) - if not executor: - raise HTTPException(status_code=404, detail=f"Executor {executor_id} not found") - - # Check if still active - if executor.get("is_active", False): - raise HTTPException( - status_code=400, - detail=f"Cannot delete active executor. Stop it first using POST /executors/{executor_id}/stop" - ) - - # Remove from tracking - removed = executor_service.remove_completed_executor(executor_id) - if not removed: - raise HTTPException(status_code=404, detail=f"Executor {executor_id} not found in completed list") - - return DeleteExecutorResponse( - message=f"Executor {executor_id} removed from tracking", - executor_id=executor_id - ) - except HTTPException: - raise - except Exception as e: - logger.error(f"Error deleting executor {executor_id}: {e}", exc_info=True) - raise HTTPException(status_code=500, detail=f"Error deleting executor: {str(e)}") - - # ======================================== # Position Hold Endpoints # ======================================== diff --git a/services/executor_service.py b/services/executor_service.py index 2e8f6df..43f2adc 100644 --- a/services/executor_service.py +++ b/services/executor_service.py @@ -115,9 +115,6 @@ def __init__( # Executor metadata: executor_id -> metadata dict self._executor_metadata: Dict[str, Dict[str, Any]] = {} - # Completed executors (kept for a period for queries) - self._completed_executors: Dict[str, Dict[str, Any]] = {} - # Position holds: key = "account_name|connector_name|trading_pair" # Tracks aggregated positions from executors stopped with keep_position=True self._positions_held: Dict[str, PositionHold] = {} @@ -172,25 +169,42 @@ async def recover_positions_from_db(self): if executor_record.final_state: try: final_state = json.loads(executor_record.final_state) - # Extract buy/sell amounts from final state if available - if 'realized_buy_size_quote' in final_state: - buy_quote = Decimal(str(final_state.get('realized_buy_size_quote', 0))) - sell_quote = Decimal(str(final_state.get('realized_sell_size_quote', 0))) - # Estimate base amounts from quote (rough approximation) - # The actual fill data would be more accurate but this is a fallback - if buy_quote > 0 or sell_quote > 0: - position.buy_amount_quote += buy_quote - position.sell_amount_quote += sell_quote - if executor_record.executor_id not in position.executor_ids: - position.executor_ids.append(executor_record.executor_id) + + # Process held_position_orders (most accurate source) + held_orders = final_state.get("held_position_orders", []) + if held_orders: + buy_filled_base = Decimal("0") + buy_filled_quote = Decimal("0") + sell_filled_base = Decimal("0") + sell_filled_quote = Decimal("0") + + for order in held_orders: + if isinstance(order, dict): + trade_type = order.get("trade_type", "BUY") + exec_base = Decimal(str(order.get("executed_amount_base", 0))) + exec_quote = Decimal(str(order.get("executed_amount_quote", 0))) + + if trade_type == "BUY": + buy_filled_base += exec_base + buy_filled_quote += exec_quote + else: + sell_filled_base += exec_base + sell_filled_quote += exec_quote + + # Add fills using proper method + if buy_filled_base > 0: + position.add_fill("BUY", buy_filled_base, buy_filled_quote, executor_record.executor_id) + if sell_filled_base > 0: + position.add_fill("SELL", sell_filled_base, sell_filled_quote, executor_record.executor_id) + + logger.debug( + f"Recovered position from {executor_record.executor_id}: " + f"buy={buy_filled_base} base, sell={sell_filled_base} base" + ) + except (json.JSONDecodeError, TypeError) as e: logger.debug(f"Could not parse final_state for {executor_record.executor_id}: {e}") - # Use filled_amount_quote as fallback - elif executor_record.filled_amount_quote: - if executor_record.executor_id not in position.executor_ids: - position.executor_ids.append(executor_record.executor_id) - if self._positions_held: logger.info(f"Recovered {len(self._positions_held)} position holds from database") @@ -350,6 +364,14 @@ async def create_executor( # Persist to database await self._persist_executor_created(executor_id, executor) + # Capture created_at before potential cleanup + created_at = self._executor_metadata[executor_id]["created_at"].isoformat() + + # Check if executor terminated immediately (e.g., insufficient balance) + # If so, handle completion now rather than waiting for control loop + if executor.is_closed: + await self._handle_executor_completion(executor_id) + logger.info(f"Created {executor_type} executor {executor_id} for {connector_name}/{trading_pair}") return { @@ -358,35 +380,35 @@ async def create_executor( "connector_name": connector_name, "trading_pair": trading_pair, "status": executor.status.name, - "created_at": self._executor_metadata[executor_id]["created_at"].isoformat() + "created_at": created_at } - def get_executors( + async def get_executors( self, account_name: Optional[str] = None, connector_name: Optional[str] = None, trading_pair: Optional[str] = None, executor_type: Optional[str] = None, - status: Optional[str] = None, - include_completed: bool = False + status: Optional[str] = None ) -> List[Dict[str, Any]]: """ Get list of executors with optional filtering. + Combines active executors from memory with completed executors from database. + Args: account_name: Filter by account name connector_name: Filter by connector name trading_pair: Filter by trading pair executor_type: Filter by executor type status: Filter by status - include_completed: Include recently completed executors Returns: List of executor information dictionaries """ result = [] - # Process active executors + # Process active executors from memory for executor_id, executor in self._active_executors.items(): metadata = self._executor_metadata.get(executor_id, {}) @@ -404,42 +426,59 @@ def get_executors( result.append(self._format_executor_info(executor_id, executor)) - # Include completed executors if requested - if include_completed: - for executor_id, completed_info in self._completed_executors.items(): - # Apply same filters to completed executors - if account_name and completed_info.get("account_name") != account_name: - continue - if connector_name and completed_info.get("connector_name") != connector_name: - continue - if trading_pair and completed_info.get("trading_pair") != trading_pair: - continue - if executor_type and completed_info.get("executor_type") != executor_type: - continue - - result.append(completed_info) + # Get completed executors from database + if self.db_manager: + try: + async with self.db_manager.get_session_context() as session: + from database.repositories.executor_repository import ExecutorRepository + repo = ExecutorRepository(session) + + db_executors = await repo.get_executors( + account_name=account_name, + connector_name=connector_name, + trading_pair=trading_pair, + executor_type=executor_type, + status=status + ) + + for record in db_executors: + # Skip if already in active executors (safety check) + if record.executor_id not in self._active_executors: + result.append(self._format_db_record(record)) + except Exception as e: + logger.error(f"Error fetching executors from database: {e}") return result - def get_executor(self, executor_id: str) -> Optional[Dict[str, Any]]: + async def get_executor(self, executor_id: str) -> Optional[Dict[str, Any]]: """ Get detailed information about a specific executor. + Checks active executors in memory first, then falls back to database. + Args: executor_id: The executor ID Returns: Detailed executor information or None if not found """ - # Check active executors first + # Check active executors first (memory) executor = self._active_executors.get(executor_id) if executor: return self._format_executor_info(executor_id, executor) - # Check completed executors - completed_info = self._completed_executors.get(executor_id) - if completed_info: - return completed_info + # Fallback to database for completed executors + if self.db_manager: + try: + async with self.db_manager.get_session_context() as session: + from database.repositories.executor_repository import ExecutorRepository + repo = ExecutorRepository(session) + + record = await repo.get_executor_by_id(executor_id) + if record: + return self._format_db_record(record) + except Exception as e: + logger.error(f"Error fetching executor from database: {e}") return None @@ -488,13 +527,6 @@ async def _handle_executor_completion(self, executor_id: str): metadata = self._executor_metadata.get(executor_id, {}) - # Format final executor info - final_info = self._format_executor_info(executor_id, executor) - final_info["completed_at"] = datetime.now(timezone.utc).isoformat() - - # Store in completed executors - self._completed_executors[executor_id] = final_info - # Check if this is a POSITION_HOLD close type (keep_position=True) if executor.close_type == CloseType.POSITION_HOLD: await self._aggregate_position_hold(executor_id, executor, metadata) @@ -515,97 +547,77 @@ def _format_executor_info( executor_id: str, executor: ExecutorBase ) -> Dict[str, Any]: - """Format executor information for API response. - - Uses Pydantic's model_dump(mode='json') for automatic serialization - of Decimal, Enum, and other complex types. - """ + """Format executor information for API response.""" metadata = self._executor_metadata.get(executor_id, {}) + executor_type = metadata.get("executor_type") - try: - # Use model_dump() then our custom serializer to handle TrackedOrder etc. - executor_info = executor.executor_info - result = json.loads(json.dumps(executor_info.model_dump(), default=_json_default)) + # Get executor_info and serialize + executor_info = executor.executor_info + result = json.loads(json.dumps(executor_info.model_dump(), default=_json_default)) - # Add our metadata (not part of ExecutorInfo model) - result["executor_id"] = executor_id - result["executor_type"] = metadata.get("executor_type") - result["account_name"] = metadata.get("account_name") - result["created_at"] = metadata.get("created_at").isoformat() if metadata.get("created_at") else None + # Add metadata + result["executor_id"] = executor_id + result["executor_type"] = executor_type + result["account_name"] = metadata.get("account_name") + result["created_at"] = metadata.get("created_at").isoformat() if metadata.get("created_at") else None - # Ensure connector_name and trading_pair from metadata take precedence - if metadata.get("connector_name"): - result["connector_name"] = metadata.get("connector_name") - if metadata.get("trading_pair"): - result["trading_pair"] = metadata.get("trading_pair") + if metadata.get("connector_name"): + result["connector_name"] = metadata.get("connector_name") + if metadata.get("trading_pair"): + result["trading_pair"] = metadata.get("trading_pair") - return result + # Read status/close_type directly from executor + result["status"] = executor.status.name + result["close_type"] = executor.close_type.name if executor.close_type else None + result["is_active"] = not executor.is_closed - except Exception as e: - # Fallback when executor_info validation fails (e.g., timestamp=None) - logger.warning(f"Error accessing executor_info for {executor_id}: {e}") - - # Try to get real values directly from executor - try: - is_trading = executor.is_trading if hasattr(executor, 'is_trading') else False - except Exception: - is_trading = False + # For grid executors, filter out heavy fields from custom_info + if executor_type == "grid_executor" and result.get("custom_info"): + heavy_fields = {"levels_by_state", "filled_orders", "failed_orders", "canceled_orders"} + result["custom_info"] = {k: v for k, v in result["custom_info"].items() if k not in heavy_fields} - try: - raw_custom_info = executor.get_custom_info() if hasattr(executor, 'get_custom_info') else None - # Convert to JSON-safe format (handles Decimals, Enums, etc.) - if raw_custom_info: - custom_info = json.loads(json.dumps(raw_custom_info, default=_json_default)) - else: - custom_info = None - except Exception: - custom_info = None + return result - try: - net_pnl_quote = float(executor.net_pnl_quote) if hasattr(executor, 'net_pnl_quote') else 0.0 - net_pnl_pct = float(executor.net_pnl_pct) if hasattr(executor, 'net_pnl_pct') else 0.0 - cum_fees_quote = float(executor.cum_fees_quote) if hasattr(executor, 'cum_fees_quote') else 0.0 - filled_amount_quote = float(executor.filled_amount_quote) if hasattr(executor, 'filled_amount_quote') else 0.0 - except Exception: - net_pnl_quote = 0.0 - net_pnl_pct = 0.0 - cum_fees_quote = 0.0 - filled_amount_quote = 0.0 - - return { - "executor_id": executor_id, - "executor_type": metadata.get("executor_type"), - "account_name": metadata.get("account_name"), - "connector_name": metadata.get("connector_name"), - "trading_pair": metadata.get("trading_pair"), - "side": None, - "status": executor.status.name if hasattr(executor, 'status') else "UNKNOWN", - "is_active": not executor.is_closed if hasattr(executor, 'is_closed') else True, - "is_trading": is_trading, - "timestamp": None, - "created_at": metadata.get("created_at").isoformat() if metadata.get("created_at") else None, - "close_type": executor.close_type.name if hasattr(executor, 'close_type') and executor.close_type else None, - "close_timestamp": None, - "controller_id": None, - "net_pnl_quote": net_pnl_quote, - "net_pnl_pct": net_pnl_pct, - "cum_fees_quote": cum_fees_quote, - "filled_amount_quote": filled_amount_quote, - "config": metadata.get("config"), - "custom_info": custom_info, - } + def _format_db_record(self, record) -> Dict[str, Any]: + """Format a database ExecutorRecord for API response.""" + return { + "executor_id": record.executor_id, + "executor_type": record.executor_type, + "account_name": record.account_name, + "connector_name": record.connector_name, + "trading_pair": record.trading_pair, + "side": None, + "status": record.status, + "close_type": record.close_type, + "is_active": record.status == "RUNNING", + "is_trading": False, + "timestamp": None, + "created_at": record.created_at.isoformat() if record.created_at else None, + "close_timestamp": record.closed_at.timestamp() if record.closed_at else None, + "closed_at": record.closed_at.isoformat() if record.closed_at else None, + "controller_id": None, + "net_pnl_quote": float(record.net_pnl_quote) if record.net_pnl_quote else 0.0, + "net_pnl_pct": float(record.net_pnl_pct) if record.net_pnl_pct else 0.0, + "cum_fees_quote": float(record.cum_fees_quote) if record.cum_fees_quote else 0.0, + "filled_amount_quote": float(record.filled_amount_quote) if record.filled_amount_quote else 0.0, + "config": json.loads(record.config) if record.config else None, + "custom_info": json.loads(record.final_state) if record.final_state else None, + } def get_summary(self) -> Dict[str, Any]: """ - Get summary statistics for all executors. + Get summary statistics for active executors. Returns: - Dictionary with aggregate statistics + Dictionary with aggregate statistics for active executors only. """ - executors = self.get_executors(include_completed=True) + executors = [] + + # Get active executors from memory + for executor_id, executor in self._active_executors.items(): + executors.append(self._format_executor_info(executor_id, executor)) - active_count = sum(1 for e in executors if e.get("is_active", False)) - completed_count = len(executors) - active_count + active_count = len(executors) total_pnl = sum(e.get("net_pnl_quote", 0) for e in executors) total_volume = sum(e.get("filled_amount_quote", 0) for e in executors) @@ -624,7 +636,6 @@ def get_summary(self) -> Dict[str, Any]: return { "total_active": active_count, - "total_completed": completed_count, "total_pnl_quote": total_pnl, "total_volume_quote": total_volume, "by_type": by_type, @@ -665,26 +676,51 @@ async def _persist_executor_completed(self, executor_id: str, executor: Executor return try: - # Try to get executor_info, handle validation errors (e.g., timestamp=None) + # Read status/close_type directly from executor (most reliable) + status_name = executor.status.name + close_type = executor.close_type.name if executor.close_type else None + + # Get PnL values from executor_info try: executor_info = executor.executor_info - status_name = executor_info.status.name - close_type = executor_info.close_type.name if executor_info.close_type else None net_pnl_quote = executor_info.net_pnl_quote net_pnl_pct = executor_info.net_pnl_pct cum_fees_quote = executor_info.cum_fees_quote filled_amount_quote = executor_info.filled_amount_quote - custom_info = executor_info.custom_info except Exception as e: - # Fallback when executor_info validation fails logger.debug(f"Error accessing executor_info for persistence: {e}") - status_name = executor.status.name if hasattr(executor, 'status') else "UNKNOWN" - close_type = executor.close_type.name if hasattr(executor, 'close_type') and executor.close_type else None net_pnl_quote = Decimal("0") net_pnl_pct = Decimal("0") cum_fees_quote = Decimal("0") filled_amount_quote = Decimal("0") - custom_info = None + + # Get custom_info directly from executor to avoid Pydantic serialization issues + # with TrackedOrder and other complex types + custom_info = executor.get_custom_info() + # Serialize custom_info, fallback to None if serialization fails + final_state_json = None + metadata = self._executor_metadata.get(executor_id, {}) + executor_type = metadata.get("executor_type") + if executor_type == "grid_executor": + heavy_fields = { + "levels_by_state", + "filled_orders", + "failed_orders", + "canceled_orders", + } + custom_info = {k: v for k, v in custom_info.items() if k not in heavy_fields} + + try: + final_state_json = json.dumps(custom_info, default=_json_default) + except Exception as e: + logger.warning(f"Failed to serialize custom_info for {executor_id}: {e}") + # Try a simpler serialization without complex objects + try: + simple_info = {k: v for k, v in custom_info.items() + if isinstance(v, (str, int, float, bool, list, dict, type(None)))} + final_state_json = json.dumps(simple_info) + except Exception: + final_state_json = None async with self.db_manager.get_session_context() as session: from database.repositories.executor_repository import ExecutorRepository @@ -698,7 +734,7 @@ async def _persist_executor_completed(self, executor_id: str, executor: Executor net_pnl_pct=net_pnl_pct, cum_fees_quote=cum_fees_quote, filled_amount_quote=filled_amount_quote, - final_state=json.dumps(custom_info, default=_json_default) if custom_info else None + final_state=final_state_json ) logger.debug(f"Persisted executor {executor_id} completion to database") @@ -706,21 +742,6 @@ async def _persist_executor_completed(self, executor_id: str, executor: Executor except Exception as e: logger.error(f"Error persisting executor completion: {e}") - def remove_completed_executor(self, executor_id: str) -> bool: - """ - Remove a completed executor from tracking. - - Args: - executor_id: The executor ID to remove - - Returns: - True if removed, False if not found - """ - if executor_id in self._completed_executors: - del self._completed_executors[executor_id] - return True - return False - # ======================================== # Position Hold Tracking Methods # ======================================== @@ -795,16 +816,15 @@ async def _aggregate_position_hold( if filled_amount_quote == 0 and custom_info: filled_amount_quote = Decimal(str(custom_info.get("filled_amount_quote", 0))) - # For grid executors, aggregate from held_position_orders - if metadata.get("executor_type") == "grid_executor" and custom_info: + # Check for held_position_orders (used by grid_executor, position_executor, etc.) + held_orders = custom_info.get("held_position_orders", []) if custom_info else [] + + if held_orders: buy_filled_base = Decimal("0") buy_filled_quote = Decimal("0") sell_filled_base = Decimal("0") sell_filled_quote = Decimal("0") - # held_position_orders contains the orders kept when keep_position=True - held_orders = custom_info.get("held_position_orders", []) - for order in held_orders: if isinstance(order, dict): trade_type = order.get("trade_type", "BUY") @@ -825,7 +845,7 @@ async def _aggregate_position_hold( position.add_fill("SELL", sell_filled_base, sell_filled_quote, executor_id) logger.info( - f"Aggregated grid executor {executor_id} to position {position_key}: " + f"Aggregated executor {executor_id} to position {position_key}: " f"buy={buy_filled_base} base, sell={sell_filled_base} base" )