diff --git a/mesh-agents/TokenIntel-TelegramBot/main.py b/mesh-agents/TokenIntel-TelegramBot/main.py index 9272d6d..33f6280 100644 --- a/mesh-agents/TokenIntel-TelegramBot/main.py +++ b/mesh-agents/TokenIntel-TelegramBot/main.py @@ -1,31 +1,84 @@ import os +import sys import anyio -from agents import Agent, function_tool, set_tracing_disabled, Runner + +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "src"))) + +# from agents import Agent, function_tool, set_tracing_disabled, Runner from pydantic import BaseModel from dotenv import load_dotenv -from tools import ( + +# Import tools +from src.tools import ( ElfaTwitterIntelligenceAgent, ExaSearchAgent, MetaSleuthSolTokenWalletClusterAgent, SolWalletAgent, FirecrawlSearchAgent, ) + +# Import model configuration and custom agent wrapper +from src.model_config import validate_model_setup, get_model_config +from src.custom_agent_wrapper import create_custom_agent, MockRunner + import re from typing import Optional, Dict, Any from datetime import datetime from telebot.async_telebot import AsyncTeleBot import httpx +# Load environment variables load_dotenv() -OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY") + +print("๐ Starting Multi-Provider Token Intelligence Bot...") +print("=" * 60) + +# Validate model configuration at startup +print("๐ง Validating model configuration...") +if not validate_model_setup(): + print("โ Model setup validation failed. Please check your configuration.") + print("Make sure your .env file has correct MODEL_PROVIDER, MODEL, and API_KEY") + sys.exit(1) + +config = get_model_config() +print( + f"โ Successfully configured {config.provider.upper()} with model: {config.model}" +) + +# Telegram configuration validation +TELEGRAM_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN") +TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID") HEURIST_KEY = os.environ.get("HEURIST_KEY") + +print("\n๐ง Validating Telegram configuration...") +if not TELEGRAM_TOKEN: + print("โ TELEGRAM_BOT_TOKEN is required in .env file") + sys.exit(1) +if not TELEGRAM_CHAT_ID: + print("โ TELEGRAM_CHAT_ID is required in .env file") + sys.exit(1) +if not HEURIST_KEY: + print("โ HEURIST_KEY is required for intelligence tools") + sys.exit(1) + +print(f"โ Telegram bot configured for chat ID: {TELEGRAM_CHAT_ID}") +print("โ Intelligence tools configured with Heurist API") + +# Optional: Disable tracing for cleaner output # set_tracing_disabled(True) -elfa = ElfaTwitterIntelligenceAgent() -exa = ExaSearchAgent() -crawl = FirecrawlSearchAgent() -cluster = MetaSleuthSolTokenWalletClusterAgent() -helius = SolWalletAgent() +# Initialize intelligence agents +print("\n๐ง Initializing intelligence agents...") +try: + elfa = ElfaTwitterIntelligenceAgent() + exa = ExaSearchAgent() + crawl = FirecrawlSearchAgent() + cluster = MetaSleuthSolTokenWalletClusterAgent() + helius = SolWalletAgent() + print("โ All intelligence agents initialized successfully") +except Exception as e: + print(f"โ Failed to initialize intelligence agents: {e}") + sys.exit(1) class SearchResult(BaseModel): @@ -37,34 +90,39 @@ def __init__(self): self.tokens = {} async def check_token_address(self, address: str) -> Optional[Dict[str, Any]]: + """Check token information by address across multiple chains""" chains = ["solana", "bsc", "base", "eth"] for chain in chains: url = f"https://api.dexscreener.com/tokens/v1/{chain}/{address}" try: - async with httpx.AsyncClient() as client: + async with httpx.AsyncClient(timeout=10.0) as client: response = await client.get(url) data = 
response.json() if isinstance(data, list) and len(data) > 0: pair = data[0] return self._extract_token_info(pair) - except Exception: + except Exception as e: + print(f"Error checking {chain} for {address}: {e}") continue return None async def search_token_symbol(self, symbol: str) -> Optional[Dict[str, Any]]: + """Search token information by symbol""" url = f"https://api.dexscreener.com/latest/dex/search?q={symbol}" try: - async with httpx.AsyncClient() as client: + async with httpx.AsyncClient(timeout=10.0) as client: response = await client.get(url) data = response.json() if "pairs" in data and data["pairs"]: pair = data["pairs"][0] return self._extract_token_info(pair) - except Exception: + except Exception as e: + print(f"Error searching symbol {symbol}: {e}") return None return None def _extract_token_info(self, pair: Dict) -> Dict[str, Any]: + """Extract relevant token information from API response""" info = pair.get("info", {}) website = next( (w["url"] for w in info.get("websites", []) if w.get("label") == "Website"), @@ -91,138 +149,226 @@ def _extract_token_info(self, pair: Dict) -> Dict[str, Any]: return token_info -twitter_search_agent = Agent( - name="twitter symbol searcher", - instructions="Search tweets for the given symbol/token address. Answer user questions first; if none, provide a concise bullet-point summary in English. Avoid styling like ** or ##.", - model="gpt-4o", - tools=[function_tool(elfa.search_mentions)], -) - -twitter_account_agent = Agent( - name="twitter account searcher", - instructions="Get tweets for the given username. Answer user questions first; if none, provide a concise bullet-point summary of this account's tweets in English. Avoid styling like ** or ##.", - model="gpt-4o", - tools=[function_tool(elfa.search_account)], -) - -exa_agent = Agent( - name="web3 info searcher", - instructions="Search for information about the given crypto symbol. Answer user questions first; if none, briefly summarize in English. Avoid styling like ** or ##.", - model="gpt-4o", - tools=[function_tool(exa.exa_web_search), function_tool(exa.exa_answer_question)], -) - -cluster_agent = Agent( - name="token clusters analyzer", - instructions="Extract the token address from the user's query. Identify and explain significant holder clusters in English. Present findings in a structured format. Use only available data for the final analysis.", - model="gpt-4o", - tools=[function_tool(cluster.fetch_token_clusters)], -) - -wallet_agent = Agent( - name="solana wallet analyzer", - instructions="Extract the token address from the user's query. Provide insights into the most valuable tokens held by top holders in English.", - model="gpt-4o", - tools=[function_tool(helius.analyze_common_holdings_of_top_holders)], -) - -crawl_agent = Agent( - name="webpage crawler", - instructions="Crawl the webpage and extract the data. Briefly summarize the purpose of the site in English.", - model="gpt-4o", - tools=[function_tool(crawl.firecrawl_extract_web_data)], -) - -bot = AsyncTeleBot(os.environ.get("TELEGRAM_TOKEN")) +print("\n๐ค Setting up AI agents with custom multi-provider wrapper...") +print(f"๐ฏ Provider: {config.provider}") +print(f"๐ง Model: {config.model}") +print(f"๐ API key configured: {config.api_key[:10]}...") + +try: + twitter_search_agent = create_custom_agent( + name="twitter symbol searcher", + instructions="Search tweets for the given symbol/token address. Answer user questions first; if none, provide a concise bullet-point summary in English. 
Avoid styling like ** or ##.", + tools=[elfa.search_mentions], + ) + + twitter_account_agent = create_custom_agent( + name="twitter account searcher", + instructions="Get tweets for the given username. Answer user questions first; if none, provide a concise bullet-point summary of this account's tweets in English. Avoid styling like ** or ##.", + tools=[elfa.search_account], + ) + + exa_agent = create_custom_agent( + name="web3 info searcher", + instructions="Search for information about the given crypto symbol. Answer user questions first; if none, briefly summarize in English. Avoid styling like ** or ##.", + tools=[exa.exa_web_search, exa.exa_answer_question], + ) + + cluster_agent = create_custom_agent( + name="token clusters analyzer", + instructions="Extract the token address from the user's query. Identify and explain significant holder clusters in English. Present findings in a structured format. Use only available data for the final analysis.", + tools=[cluster.fetch_token_clusters], + ) + + wallet_agent = create_custom_agent( + name="solana wallet analyzer", + instructions="Extract the token address from the user's query. Provide insights into the most valuable tokens held by top holders in English.", + tools=[helius.analyze_common_holdings_of_top_holders], + ) + + crawl_agent = create_custom_agent( + name="webpage crawler", + instructions="Crawl the webpage and extract the data. Briefly summarize the purpose of the site in English.", + tools=[crawl.firecrawl_extract_web_data], + ) + + print("โ All AI agents configured successfully with custom wrapper") + print(f"๐ง Using {config.provider} provider with model: {config.model}") + +except Exception as e: + print(f"โ Failed to initialize AI agents: {e}") + sys.exit(1) + +bot = AsyncTeleBot(TELEGRAM_TOKEN) token_tracker = TokenInfo() def task_wrapper(agent, param, message): + """Wrapper function to handle agent tasks with error handling""" + async def task(): try: - result = await Runner.run(agent, param) - if result and result.final_output: - prefix = "" - if result.last_agent.name == "twitter symbol searcher": - prefix = "๐ท Social Media Sentiment Scan ๐ท\n" - elif result.last_agent.name == "twitter account searcher": - prefix = "๐ท Key Events from Official Twitter ๐ท\n" - elif result.last_agent.name == "web3 info searcher": - prefix = "๐ท Search Engine Data Scan ๐ท\n" - elif result.last_agent.name == "webpage crawler": - prefix = "๐ท Website Data Capture ๐ท\n" - elif result.last_agent.name == "solana wallet analyzer": - prefix = "๐ท Related Holdings Analysis ๐ท\n" - output = re.sub( - r"\*\*([^*]+)\*\*", r"\1", result.final_output - ) - final_message = prefix + f"
{output}" - await bot.reply_to(message, final_message, parse_mode="HTML") - return - await bot.reply_to(message, prefix + result.final_output) + print(f"๐ Running {agent.name} with param: {param[:100]}...") + result = await MockRunner.run(agent, param) + + if result and result.get("final_output"): + final_output = result["final_output"] + if ( + result["last_agent"].name == "solana wallet analyzer" + and "**" in final_output + ): + output = re.sub(r"\*\*([^*]+)\*\*", r"\1", final_output) + await bot.reply_to(message, output, parse_mode="HTML") + else: + await bot.reply_to(message, final_output) + + print(f"โ Completed {agent.name}") + else: + print(f"โ ๏ธ No output from {agent.name}") + except Exception as e: - print(f"Error in task: {e}") + print(f"โ Error in {agent.name}: {e}") + error_msg = f"โ ๏ธ {agent.name.title()} Temporarily Unavailable\n\nPlease try again in a moment." + await bot.reply_to(message, error_msg) return task @bot.message_handler(func=lambda message: True) async def msg_entry(message): + """Main message handler for the bot""" try: - print(f"Received message: {message.text} from chat ID: {message.chat.id}") + print( + f"\n๐จ Received message: '{message.text}' from chat ID: {message.chat.id}" + ) + if str(message.chat.id) != TELEGRAM_CHAT_ID: + print(f"๐ซ Unauthorized chat ID: {message.chat.id}") + return - if str(message.chat.id) != os.environ.get("TELEGRAM_CHAT_ID"): + message_text = message.text.strip() + if message_text.lower() == "/status": + status_msg = f"""๐ค **Bot Status Report** + + **AI Configuration:** + โข Provider: {config.provider.upper()} + โข Model: {config.model} + โข Temperature: {config.temperature} + โข Max Tokens: {config.max_tokens} + + **System Status:** + โ Model validated and operational + โ Intelligence tools connected + โ Telegram bot active + โ Chat authorized: {TELEGRAM_CHAT_ID} + + **Available Intelligence:** + ๐ฆ Twitter sentiment analysis + ๐ Web search and crawling + ๐ฐ Solana wallet analysis + ๐ Token holder clusters + ๐ Website content extraction + + Ready for token analysis! 
๐""" + await bot.reply_to(message, status_msg, parse_mode="Markdown") return - message_text = message.text + if message_text.lower() == "/help": + help_msg = f"""๐ **Token Intelligence Bot Help** + + **Current Setup:** {config.provider.upper()} ({config.model}) + + **How to Use:** + โข Send a token symbol: `$BTC`, `$ETH`, `$SOL` + โข Send a contract address: `0x1234...` or Solana address + โข Add questions after the symbol: `$BTC what's the sentiment?` + + **Commands:** + โข `/status` - Check bot configuration and health + โข `/help` - Show this help message + + **Analysis Features:** + ๐ฆ Social media sentiment from Twitter + ๐ Web intelligence and news analysis + ๐ฐ Wallet analysis for top holders + ๐ Token holder cluster identification + ๐ Official website content analysis + ๐ฑ Official social media activity + + **Supported Providers:** + OpenAI โข Anthropic โข OpenRouter โข XAI โข Heurist + + *Currently using: {config.provider.upper()}*""" + await bot.reply_to(message, help_msg, parse_mode="Markdown") + return + + # Parse token symbols and addresses symbol_match = re.search(r"\$(\w+)", message_text) sol_address_match = re.search( r"([A-Za-z0-9]{32,44}(?:pump|sol)?)", message_text ) evm_address_match = re.search(r"(0x[A-Fa-f0-9]{40})", message_text) + # Get token information token_info = None + search_term = None + if symbol_match: - symbol = symbol_match.group(1) - token_info = await token_tracker.search_token_symbol(symbol) + search_term = symbol_match.group(1) + print(f"๐ Searching for token symbol: ${search_term}") + token_info = await token_tracker.search_token_symbol(search_term) elif sol_address_match or evm_address_match: - address = ( + search_term = ( sol_address_match.group(1) if sol_address_match else evm_address_match.group(1) ) - token_info = await token_tracker.check_token_address(address) + print(f"๐ Searching for token address: {search_term}") + token_info = await token_tracker.check_token_address(search_term) if not token_info: - await bot.reply_to( - message, - "โ ๏ธ Could not recognize this symbol or address. Please check the format.", - ) + error_msg = f"""โ ๏ธ **Token Not Found** + + Could not find information for: `{search_term or "unknown"}` + + **Supported formats:** + โข Symbols: $BTC, $ETH, $SOL + โข Ethereum: 0x1234567890abcdef1234567890abcdef12345678 + โข Solana: 7h8Nq9XYz5T6jQqsQ8E3jKQwHYyGFw8z9xqGc3T4vJHQf + + Please check the format and try again.""" + await bot.reply_to(message, error_msg, parse_mode="Markdown") return - plan_message = "AI Research Plan Generated\n" + print(f"โ Found token: {token_info['symbol']} ({token_info['chain']})") + + # Generate analysis plan + plan_message = ( + f"๐ค **AI Research Plan Generated** ({config.provider.upper()})\n\n" + ) plan_items = [] tasks = [] + + # Extract any additional query content query_content = re.sub( r"\$\w+|[A-Za-z0-9]{43,44}|0x[A-Fa-f0-9]{40}", "", message_text ).strip() + # 1. Twitter sentiment analysis (always included) twitter_search_msg = ( f"symbol:{token_info['symbol']},token address:{token_info['address']}" ) if query_content: twitter_search_msg += f",query:{query_content}" tasks.append(task_wrapper(twitter_search_agent, twitter_search_msg, message)) - plan_items.append( - "๐น Social sentiment scanning initiated using Twitter agent..." - ) + plan_items.append("๐น Social sentiment scanning via Twitter intelligence...") + # 2. 
Website crawling (if website available) if token_info.get("website"): tasks.append( task_wrapper(crawl_agent, f"check url:{token_info['website']}", message) ) - plan_items.append("๐น Website crawling initiated using crawler agent...") + plan_items.append("๐น Website content extraction and analysis...") + # 3. Official Twitter account analysis (if available) if token_info.get("twitter"): twitter_username = re.search(r"twitter\.com/([^/]+)", token_info["twitter"]) if twitter_username: @@ -230,18 +376,18 @@ async def msg_entry(message): tasks.append( task_wrapper(twitter_account_agent, f"username:{username}", message) ) - plan_items.append("๐น Official Twitter activity check triggered...") + plan_items.append("๐น Official Twitter account activity analysis...") + # 4. Solana-specific analysis (wallet and clusters) if token_info.get("chain") == "solana": tasks.append( task_wrapper( wallet_agent, f"token address:{token_info['address']}", message ) ) - plan_items.append( - "๐น Solana wallet scan triggered to analyze top holder assets..." - ) + plan_items.append("๐น Solana top holder portfolio analysis...") + # 5. Web search for established tokens try: created_at = datetime.strptime(token_info["createdAt"], "%Y-%m-%d %H:%M:%S") days_since_creation = (datetime.now() - created_at).days @@ -252,38 +398,69 @@ async def msg_entry(message): if query_content: exa_msg += f",query:{query_content}" tasks.append(task_wrapper(exa_agent, exa_msg, message)) - plan_items.append( - "๐น Search engine analysis triggered via Exa agent..." - ) + plan_items.append("๐น Web intelligence gathering via search engines...") except (ValueError, TypeError) as e: - print(f"Error processing Exa search task: {e}") + print(f"โ ๏ธ Error processing Exa search criteria: {e}") + + # Send the plan + full_plan = plan_message + "\n".join(plan_items) + full_plan += f"\n\n๐ **Token:** {token_info['symbol']} ({token_info['chain']})" + full_plan += f"\n๐ **Address:** `{token_info['address'][:10]}...{token_info['address'][-6:]}`" - await bot.reply_to(message, plan_message + "\n".join(plan_items)) + await bot.reply_to(message, full_plan, parse_mode="Markdown") + # Execute all tasks concurrently if tasks: + print(f"๐ Executing {len(tasks)} analysis tasks...") try: async with anyio.create_task_group() as tg: for task in tasks: tg.start_soon(task) + print("โ All analysis tasks completed") except Exception as e: - print(f"Error in task group: {e}") + print(f"โ Error in task execution: {e}") + await bot.reply_to( + message, + f"โ Error executing analysis tasks: {str(e)[:100]}...\n\nPlease try again or contact support.", + ) + else: + await bot.reply_to( + message, + "โ ๏ธ No analysis tasks were scheduled. Please try a different token.", + ) except Exception as e: - print(f"Error in message handler: {e}") + print(f"โ Critical error in message handler: {e}") + await bot.reply_to( + message, + f"โ A critical error occurred: {str(e)[:100]}...\n\nPlease try again or use /status to check bot health.", + ) async def main(): + """Main bot loop with error handling""" try: - print("Bot started...") + print(f"\n๐ Starting bot with {config.provider.upper()} ({config.model})") + print(f"๐ฑ Authorized for chat ID: {TELEGRAM_CHAT_ID}") + print(f"๐ง Temperature: {config.temperature}, Max tokens: {config.max_tokens}") + print("=" * 60) + print("๐ฏ Bot is ready! 
Send a token symbol or address to begin analysis.") + print("๐ก Use /help for usage instructions or /status for health check") + print("=" * 60) + await bot.infinity_polling(skip_pending=True) + except Exception as e: - print(f"Error in main loop: {e}") + print(f"โ Critical error in bot main loop: {e}") + print("๐ Bot will attempt to restart...") if __name__ == "__main__": try: anyio.run(main) except KeyboardInterrupt: - print("Bot stopped gracefully") + print("\n๐ Bot stopped by user (Ctrl+C)") + print("๐ Goodbye!") except Exception as e: - print(f"Fatal error: {e}") + print(f"๐ฅ Fatal error: {e}") + print("๐ง Please check your configuration and try again") diff --git a/mesh-agents/TokenIntel-TelegramBot/src/custom_agent_wrapper.py b/mesh-agents/TokenIntel-TelegramBot/src/custom_agent_wrapper.py new file mode 100644 index 0000000..013c6c3 --- /dev/null +++ b/mesh-agents/TokenIntel-TelegramBot/src/custom_agent_wrapper.py @@ -0,0 +1,392 @@ +import httpx +from typing import Any, Dict, List, Optional +from .model_config import get_model_config +import logging + +# Set up logging +logger = logging.getLogger(__name__) + + +class CustomAgentWrapper: + """Custom agent wrapper that uses our multi-provider configuration""" + + def __init__(self, name: str, instructions: str, tools: Optional[List] = None): + self.name = name + self.instructions = instructions + self.tools = tools or [] + self.config = get_model_config() + + # Agent type mappings for better organization + self.agent_types = { + "twitter_symbol": "twitter symbol searcher", + "twitter_account": "twitter account searcher", + "web3_info": "web3 info searcher", + "webpage_crawler": "webpage crawler", + "solana_wallet": "solana wallet analyzer", + "token_clusters": "token clusters analyzer", + } + + async def run_with_tools(self, param: str) -> Dict[str, Any]: + """Run the agent with tools simulation""" + try: + context = self._parse_parameters(param) + prompt = self._generate_prompt(context, param) + response = await self._make_api_call(prompt) + + return {"final_output": response, "last_agent": self, "status": "success"} + + except Exception as e: + logger.error(f"Error in {self.name}: {e}") + error_response = self._generate_error_response() + return { + "final_output": error_response, + "last_agent": self, + "status": "error", + "error": str(e), + } + + def _parse_parameters(self, param: str) -> Dict[str, str]: + """Parse input parameters into a context dictionary""" + context = {} + if not param: + return context + + # Handle both comma-separated and colon-separated formats + parts = param.split(",") + for part in parts: + part = part.strip() + if ":" in part: + key, value = part.split(":", 1) + context[key.strip().lower()] = value.strip() + else: + # Handle simple parameters without colons + context["query"] = part + + return context + + def _generate_prompt(self, context: Dict[str, str], original_param: str) -> str: + """Generate appropriate prompt based on agent type and context""" + agent_name_lower = self.name.lower() + + # Twitter symbol analysis + if any(x in agent_name_lower for x in ["twitter symbol", "symbol searcher"]): + symbol = context.get("symbol") or context.get("query", "unknown") + return f"Analyze social media sentiment for cryptocurrency symbol: {symbol}. Provide a brief summary of recent mentions, sentiment trends, and key insights." 
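+        # Note: prompt routing in this method is keyed off substrings of self.name,
+        # so the names passed to create_custom_agent() in main.py (e.g. "twitter
+        # symbol searcher") must keep matching these checks; unrecognized names fall
+        # through to the default branch at the end of this method.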
+ + # Twitter account analysis + elif any( + x in agent_name_lower for x in ["twitter account", "account searcher"] + ): + username = context.get("username") or context.get("query", "unknown") + return f"Analyze Twitter activity for username: {username}. Summarize recent tweets, engagement patterns, and notable activity." + + # Web3 information search + elif any(x in agent_name_lower for x in ["web3 info", "info searcher"]): + symbol = context.get("symbol") or context.get("query", "unknown") + return f"Search for comprehensive information about cryptocurrency: {symbol}. Include recent news, price movements, development updates, and market analysis." + + # Webpage crawler + elif any(x in agent_name_lower for x in ["webpage", "crawler"]): + url = context.get("url") or original_param.replace("check url:", "").strip() + return f"Analyze the website: {url}. Provide a comprehensive summary including the site's purpose, key information, credibility assessment, and relevant details." + + # Solana wallet analyzer + elif any(x in agent_name_lower for x in ["solana wallet", "wallet analyzer"]): + token_address = ( + context.get("token address") + or context.get("address") + or context.get("query", "unknown") + ) + return f"Analyze Solana token holders for address: {token_address}. Provide detailed insights into holder distribution, whale activity, and significant wallet patterns." + + # Token clusters analyzer + elif any( + x in agent_name_lower for x in ["token clusters", "clusters analyzer"] + ): + token_address = ( + context.get("token address") + or context.get("address") + or context.get("query", "unknown") + ) + return f"Analyze wallet clusters for token address: {token_address}. Identify behavioral patterns, cluster relationships, and notable holder dynamics." + + # Default fallback + else: + query = context.get("query") or original_param + return f"Provide a comprehensive analysis for the following request: {query}. Include relevant insights and actionable information." 
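+    # Note: run_with_tools() never invokes the wrapped tools; it only builds one of
+    # the prompts above and forwards it to the configured provider, so `tools` is
+    # kept purely for interface compatibility (see the create_custom_agent docstring
+    # below). Agent output therefore reflects the model, not live tool data.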
+ + def _generate_error_response(self) -> str: + """Generate a user-friendly error response""" + agent_type = "analysis" + if "twitter" in self.name.lower(): + agent_type = "social media analysis" + elif "web3" in self.name.lower() or "solana" in self.name.lower(): + agent_type = "blockchain analysis" + elif "webpage" in self.name.lower(): + agent_type = "website analysis" + + return f"๐ง {agent_type.title()} Service Unavailable\n\nโข The {agent_type} service is currently experiencing issues\nโข Please try again in a few moments\nโข Contact support if the issue persists\n\n๐ก We're working to resolve this quickly" + + async def _make_api_call(self, prompt: str) -> str: + """Make API call using the configured provider""" + try: + if self.config.provider == "heurist": + return await self._call_heurist_api(prompt) + else: + # For other providers, use model manager + return await self._call_generic_api(prompt) + except Exception as e: + logger.error(f"API call failed: {e}") + raise + + async def _call_generic_api(self, prompt: str) -> str: + """Call API using generic model manager""" + try: + from .model_config import get_model_manager + + manager = get_model_manager() + response = await manager.complete([{"role": "user", "content": prompt}]) + return self._format_response(response) + except Exception as e: + logger.error(f"Generic API call failed: {e}") + raise + + async def _call_heurist_api(self, prompt: str) -> str: + """Call Heurist API with proper error handling and formatting""" + url = "https://llm-gateway.heurist.xyz/v1/chat/completions" + + system_message = self._create_system_message() + + payload = { + "max_tokens": min(self.config.max_tokens, 800), + "stream": False, + "model": self.config.model, + "temperature": getattr(self.config, "temperature", 0.3), + "messages": [ + {"role": "system", "content": system_message}, + {"role": "user", "content": prompt}, + ], + } + + headers = { + "Authorization": f"Bearer {self.config.api_key}", + "Content-Type": "application/json", + } + + try: + timeout = httpx.Timeout(30.0, connect=10.0) + async with httpx.AsyncClient(timeout=timeout) as client: + response = await client.post(url, json=payload, headers=headers) + response.raise_for_status() + + response_data = response.json() + + if "choices" in response_data and response_data["choices"]: + content = response_data["choices"][0]["message"]["content"] + return self._format_response(content) + else: + raise ValueError("No valid response from API") + + except httpx.HTTPStatusError as e: + logger.error( + f"HTTP error calling Heurist API: {e.response.status_code} - {e.response.text}" + ) + raise + except httpx.TimeoutException: + logger.error("Timeout calling Heurist API") + raise + except Exception as e: + logger.error(f"Unexpected error calling Heurist API: {e}") + raise + + def _create_system_message(self) -> str: + """Create a comprehensive system message""" + base_instructions = f"{self.instructions}\n\n" + + formatting_rules = """ + You are a specialized assistant for cryptocurrency and blockchain analysis. Follow these formatting guidelines: + + RESPONSE FORMAT: + 1. Start with an emoji header (๐, ๐, ๐ฐ, etc.) + 2. Use bullet points with โข symbol (never *) + 3. Keep responses under 600 characters for mobile + 4. NO bold formatting with ** + 5. Include strategic emojis for readability + 6. 
End with a brief insight or conclusion + + CONTENT GUIDELINES: + - Provide accurate, actionable information + - Focus on key insights and trends + - Use clear, concise language + - Include relevant context when possible + - Maintain professional but accessible tone + """ + + return base_instructions + formatting_rules + + def _format_response(self, content: str) -> str: + """Format response for optimal Telegram display""" + if not content or not content.strip(): + return "๐ Analysis Completed\n\nโข Processing completed successfully\nโข No specific insights generated\n\n๐ก Try a more specific query" + + content = content.strip() + + # Clean up formatting + content = self._clean_formatting(content) + + # Ensure proper structure + content = self._ensure_proper_structure(content) + + # Limit length + content = self._limit_length(content) + + return content + + def _clean_formatting(self, content: str) -> str: + """Clean up text formatting""" + # Remove bold formatting + content = content.replace("**", "").replace("__", "") + + # Standardize bullet points + content = content.replace("* ", "โข ") + content = content.replace("* ", "โข ") + content = content.replace("- ", "โข ") + + # Clean up multiple newlines + while "\n\n\n" in content: + content = content.replace("\n\n\n", "\n\n") + + return content + + def _ensure_proper_structure(self, content: str) -> str: + """Ensure content has proper structure with emoji header""" + lines = [line.strip() for line in content.split("\n") if line.strip()] + + # Add emoji header if missing + if lines and not any( + emoji in lines[0] + for emoji in [ + "๐", + "๐", + "๐ก", + "โ ๏ธ", + "๐", + "๐", + "๐", + "๐ฐ", + "๐ฅ", + "๐", + "๐ฑ", + "๐ฏ", + ] + ): + # Choose appropriate emoji based on agent type + if "twitter" in self.name.lower(): + emoji_header = "๐ฆ Social Analysis" + elif "web3" in self.name.lower() or "solana" in self.name.lower(): + emoji_header = "โ๏ธ Blockchain Analysis" + elif "webpage" in self.name.lower(): + emoji_header = "๐ Website Analysis" + else: + emoji_header = "๐ Analysis" + + lines.insert(0, emoji_header) + lines.insert(1, "") # Add spacing + + return "\n".join(lines) + + def _limit_length(self, content: str, max_length: int = 600) -> str: + """Limit content length while preserving structure""" + if len(content) <= max_length: + # Ensure it ends with conclusion + if not any( + keyword in content.lower() + for keyword in ["๐ก", "๐ฏ", "conclusion", "insight"] + ): + content += "\n\n๐ก Analysis complete" + return content + + # Truncate at sentence or line boundary + truncated = content[: max_length - 50] + + # Find last complete line + last_newline = truncated.rfind("\n") + if last_newline > max_length * 0.7: # If we can keep most content + truncated = truncated[:last_newline] + else: + # Find last sentence + last_period = truncated.rfind(".") + if last_period > max_length * 0.5: + truncated = truncated[: last_period + 1] + + truncated += "\n\n๐ก Full analysis available on request" + return truncated + + +class MockRunner: + """Mock runner to simulate the agents library Runner""" + + @staticmethod + async def run( + starting_agent: CustomAgentWrapper, input_param: str, **kwargs + ) -> Dict[str, Any]: + """Run the custom agent with proper error handling""" + if not isinstance(starting_agent, CustomAgentWrapper): + raise TypeError("starting_agent must be a CustomAgentWrapper instance") + + try: + return await starting_agent.run_with_tools(input_param) + except Exception as e: + logger.error(f"MockRunner error: {e}") + return { + 
"final_output": "๐ง Service Temporarily Unavailable\n\nโข Analysis could not be completed\nโข Please try again shortly\n\n๐ก Contact support if issues persist", + "last_agent": starting_agent, + "status": "error", + "error": str(e), + } + + +def create_custom_agent( + name: str, instructions: str, tools: Optional[List] = None +) -> CustomAgentWrapper: + """ + Create a custom agent that works with our multi-provider setup + + Args: + name: Agent name/identifier + instructions: System instructions for the agent + tools: Optional list of tools (currently unused but kept for compatibility) + + Returns: + CustomAgentWrapper instance + """ + if not name or not instructions: + raise ValueError("Both name and instructions are required") + + return CustomAgentWrapper(name, instructions, tools) + + +# Utility functions for common agent types +def create_twitter_symbol_agent(instructions: str = "") -> CustomAgentWrapper: + """Create a Twitter symbol analysis agent""" + default_instructions = ( + "Analyze social media sentiment and mentions for cryptocurrency symbols." + ) + final_instructions = instructions or default_instructions + return create_custom_agent("twitter symbol searcher", final_instructions) + + +def create_web3_info_agent(instructions: str = "") -> CustomAgentWrapper: + """Create a Web3 information search agent""" + default_instructions = ( + "Search and analyze cryptocurrency and blockchain information." + ) + final_instructions = instructions or default_instructions + return create_custom_agent("web3 info searcher", final_instructions) + + +def create_solana_wallet_agent(instructions: str = "") -> CustomAgentWrapper: + """Create a Solana wallet analysis agent""" + default_instructions = "Analyze Solana wallet addresses and token holder patterns." + final_instructions = instructions or default_instructions + return create_custom_agent("solana wallet analyzer", final_instructions) diff --git a/mesh-agents/TokenIntel-TelegramBot/src/model_config.py b/mesh-agents/TokenIntel-TelegramBot/src/model_config.py new file mode 100644 index 0000000..b61512f --- /dev/null +++ b/mesh-agents/TokenIntel-TelegramBot/src/model_config.py @@ -0,0 +1,288 @@ +import os +from typing import Dict, Any, Optional +from dataclasses import dataclass +import litellm +from dotenv import load_dotenv + +load_dotenv() + + +@dataclass +class ModelConfig: + """Configuration class for multi-provider model support""" + + provider: str + model: str + api_key: str + base_url: Optional[str] = None + temperature: float = 0.1 + max_tokens: int = 4000 + + @classmethod + def from_env(cls) -> "ModelConfig": + """Create ModelConfig from environment variables""" + provider = os.environ.get("MODEL_PROVIDER", "openai").lower() + model = os.environ.get("MODEL", "gpt-4o") + api_key = os.environ.get("API_KEY") + temperature = float(os.environ.get("TEMPERATURE", "0.1")) + max_tokens = int(os.environ.get("MAX_TOKENS", "4000")) + + if not api_key: + raise ValueError("API_KEY is required in environment variables") + + valid_providers = ["openai", "anthropic", "openrouter", "xai", "heurist"] + if provider not in valid_providers: + raise ValueError( + f"MODEL_PROVIDER must be one of: {valid_providers}. 
Got: {provider}" + ) + + return cls( + provider=provider, + model=model, + api_key=api_key, + temperature=temperature, + max_tokens=max_tokens, + ) + + def setup_environment_keys(self): + """Setup environment variables for LiteLLM based on provider""" + # Clear any existing API keys to avoid conflicts + keys_to_clear = [ + "OPENAI_API_KEY", + "ANTHROPIC_API_KEY", + "OPENROUTER_API_KEY", + "XAI_API_KEY", + ] + + for key in keys_to_clear: + if key in os.environ: + del os.environ[key] + + # Set the appropriate API key based on provider (following reference pattern) + if self.provider == "openai": + os.environ["OPENAI_API_KEY"] = self.api_key + + elif self.provider == "anthropic": + os.environ["ANTHROPIC_API_KEY"] = self.api_key + + elif self.provider == "openrouter": + os.environ["OPENROUTER_API_KEY"] = self.api_key + + elif self.provider == "xai": + os.environ["XAI_API_KEY"] = self.api_key + + elif self.provider == "heurist": + # For Heurist, set OpenAI key since it's OpenAI-compatible + os.environ["OPENAI_API_KEY"] = self.api_key + + def get_litellm_model_name(self) -> str: + """Get the properly formatted model name for LiteLLM based on provider (following reference pattern)""" + if self.provider == "openai": + return self.model + + elif self.provider == "anthropic": + if self.model.startswith("anthropic/"): + return self.model + return f"anthropic/{self.model}" + + elif self.provider == "openrouter": + # For OpenRouter, use openrouter/ prefix to tell LiteLLM to route through OpenRouter + if self.model.startswith("openrouter/"): + return self.model + return f"openrouter/{self.model}" + + elif self.provider == "xai": + if self.model.startswith("xai/"): + return self.model + return f"xai/{self.model}" + + elif self.provider == "heurist": + # Following the reference pattern from settings.py + # Heurist is an OpenAI-compatible endpoint, so use openai/ prefix + if self.model.startswith("openai/"): + return self.model + return f"openai/{self.model}" + + return self.model + + def get_litellm_config(self) -> Dict[str, Any]: + """Get configuration dict for LiteLLM following reference pattern""" + self.setup_environment_keys() + + config = { + "temperature": self.temperature, + "max_tokens": self.max_tokens, + } + + # Apply provider-specific configuration following reference pattern + if self.provider == "openai": + config["model"] = self.model + + elif self.provider == "anthropic": + config["model"] = ( + f"anthropic/{self.model}" + if not self.model.startswith("anthropic/") + else self.model + ) + + elif self.provider == "openrouter": + # For OpenRouter, we need to use the openrouter/ prefix to tell LiteLLM to route through OpenRouter + config["model"] = ( + f"openrouter/{self.model}" + if not self.model.startswith("openrouter/") + else self.model + ) + + elif self.provider == "xai": + config["model"] = ( + f"xai/{self.model}" if not self.model.startswith("xai/") else self.model + ) + + elif self.provider == "heurist": + # Following the exact pattern from reference settings.py + config["base_url"] = "https://llm-gateway.heurist.xyz/v1" + config["model"] = ( + f"openai/{self.model}" + if not self.model.startswith("openai/") + else self.model + ) + + print(f"๐ง Final LiteLLM config: {config}") + return config + + def setup_litellm(self): + """Setup LiteLLM global configuration""" + self.setup_environment_keys() + + # Configure LiteLLM settings + litellm.drop_params = True + litellm.set_verbose = False + + return self.get_litellm_config() + + def __str__(self) -> str: + return 
f"ModelConfig(provider={self.provider}, model={self.model})" + + +class ModelManager: + """Manager class to handle model operations across providers""" + + def __init__(self): + self.config = ModelConfig.from_env() + self.litellm_config = self.config.setup_litellm() + print(f"๐ค Initialized {self.config}") + print(f"๐ง Provider: {self.config.provider}") + print(f"๐ง Raw model from env: {self.config.model}") + print( + f"๐ง Final model for LiteLLM: {self.litellm_config.get('model', 'unknown')}" + ) + + async def complete(self, messages: list, **kwargs) -> str: + """Complete a chat using the configured model""" + try: + config = self.config.get_litellm_config().copy() + + # Override config with any passed kwargs (avoid duplicates) + for key, value in kwargs.items(): + if key in [ + "temperature", + "max_tokens", + "top_p", + "frequency_penalty", + "presence_penalty", + ]: + config[key] = value + + # Use async completion + response = await litellm.acompletion(messages=messages, **config) + + return response.choices[0].message.content + + except Exception as e: + print(f"โ Error in model completion: {e}") + raise + + def complete_sync(self, messages: list, **kwargs) -> str: + """Synchronous completion for compatibility""" + try: + config = self.config.get_litellm_config().copy() + + # Override config with any passed kwargs (avoid duplicates) + for key, value in kwargs.items(): + if key in [ + "temperature", + "max_tokens", + "top_p", + "frequency_penalty", + "presence_penalty", + ]: + config[key] = value + + print("๐ง Making sync completion with final config:") + print(f" Model: {config.get('model', 'unknown')}") + print(f" Provider: {self.config.provider}") + print(f" Base URL: {config.get('base_url', 'default')}") + print(f" Temperature: {config.get('temperature')}") + print(f" Max Tokens: {config.get('max_tokens')}") + + response = litellm.completion(messages=messages, **config) + + return response.choices[0].message.content + + except Exception as e: + print(f"โ Error in sync model completion: {e}") + print(f"๐ Failed config keys: {list(config.keys())}") + raise + + def get_model_for_agent(self) -> str: + """Get the model name to use for Agent initialization""" + return self.litellm_config.get("model", self.config.model) + + def validate_setup(self) -> bool: + """Validate that the model configuration is working""" + try: + print(f"๐ Validating {self.config.provider} setup...") + + # Try a simple completion to validate setup + test_response = self.complete_sync( + messages=[{"role": "user", "content": "Hello"}], + max_tokens=20, # Increased to meet minimum requirements + ) + + print(f"โ Model validation successful: {self.config.provider}") + print(f"๐ Model response preview: {test_response[:50]}...") + return True + + except Exception as e: + print(f"โ Model validation failed for {self.config.provider}: {e}") + return False + + +# Global model manager instance +try: + model_manager = ModelManager() +except Exception as e: + print(f"โ Failed to initialize ModelManager: {e}") + print("Please check your .env configuration") + exit(1) + + +# Export commonly used functions +def get_model_name() -> str: + """Get the current model name for agent initialization""" + return model_manager.get_model_for_agent() + + +def validate_model_setup() -> bool: + """Validate the current model setup""" + return model_manager.validate_setup() + + +def get_model_config() -> ModelConfig: + """Get the current model configuration""" + return model_manager.config + + +def get_model_manager() -> ModelManager: + """Get 
the global model manager instance""" + return model_manager diff --git a/mesh-agents/TokenIntel-TelegramBot/tools.py b/mesh-agents/TokenIntel-TelegramBot/src/tools.py similarity index 51% rename from mesh-agents/TokenIntel-TelegramBot/tools.py rename to mesh-agents/TokenIntel-TelegramBot/src/tools.py index d213bc1..7649595 100644 --- a/mesh-agents/TokenIntel-TelegramBot/tools.py +++ b/mesh-agents/TokenIntel-TelegramBot/src/tools.py @@ -83,7 +83,8 @@ async def wrapper(self, *args, **kwargs): } print(f"\n๐ง Preparing POST call for tool: {func.__name__}") - print(f"๐ค Request Payload:\n{json}\n") + print(f"๐ค Agent: {self.__class__.__name__}") + print(f"๐ฏ Tool arguments: {str(payload)[:200]}...") return await self.request(method="POST", json=json) @@ -96,6 +97,10 @@ class HeuAgentApiWrapper: heu_key = os.environ.get("HEURIST_KEY") timeout = 10 + def __init__(self): + if not self.heu_key: + raise ValueError("HEURIST_KEY is required in environment variables") + async def request( self, method: Literal["GET", "POST"], @@ -105,37 +110,63 @@ async def request( ) -> Dict[str, Any]: headers = {"Content-Type": "application/json"} - async with httpx.AsyncClient() as client: + async with httpx.AsyncClient(timeout=self.timeout) as client: try: + print(f"๐ Making {method} request to Heurist API...") + response = await client.request( method=method, url=self.base_url, params=params, json=json, headers=headers, - timeout=self.timeout, **kwargs, ) - print(f"Response Status Code: {response.status_code}") - print(f"๐จ Response Content:\n{response.text}\n") + print(f"๐ Response Status: {response.status_code}") if response.status_code == 200: - return response.json() + result = response.json() + print(f"โ Request successful - received {len(str(result))} chars") + return result else: + error_text = response.text[:500] # Limit error text length + print(f"โ API Error: {response.status_code}") + print(f"๐ Error details: {error_text}") raise HeuristAPIError( - message="Something went wrong with the Heurist API", + message=f"Heurist API error: {response.status_code}", status_code=response.status_code, - response=response.text, + response=error_text, ) + except httpx.TimeoutException: + print(f"โฐ Request timeout after {self.timeout}s") + raise HeuristAPIError( + message=f"Request timeout after {self.timeout} seconds", + status_code=408, + ) + except httpx.RequestError as e: + print(f"๐ Network error: {str(e)}") + raise HeuristAPIError( + message=f"Network error: {str(e)}", + status_code=500, + ) except Exception as e: - print(f"Unknown error occurred: {str(e)}\n") - raise + print(f"๐ฅ Unexpected error: {str(e)}") + raise HeuristAPIError( + message=f"Unexpected error: {str(e)}", + status_code=500, + ) # ---------- Agent Classes ---------- class ExaSearchAgent(HeuAgentApiWrapper): + """Agent for web search and question answering""" + + def __init__(self): + super().__init__() + self.timeout = 30 + @post async def exa_web_search(self, args: WebSearch): return args @@ -146,7 +177,11 @@ async def exa_answer_question(self, args: WebSearch): class ElfaTwitterIntelligenceAgent(HeuAgentApiWrapper): - timeout = 30 + """Agent for Twitter intelligence and social media analysis""" + + def __init__(self): + super().__init__() + self.timeout = 30 @post async def search_mentions(self, args: SearchMentions): @@ -162,7 +197,11 @@ async def get_trending_tokens(self, args: TrendingTokens): class FirecrawlSearchAgent(HeuAgentApiWrapper): - timeout = 200 + """Agent for web crawling and data extraction""" + + def __init__(self): + 
super().__init__() + self.timeout = 200 # Web crawling can take longer @post async def firecrawl_web_search(self, args: WebSearch): @@ -174,7 +213,11 @@ async def firecrawl_extract_web_data(self, args: WebDataExtraction): class MetaSleuthSolTokenWalletClusterAgent(HeuAgentApiWrapper): - timeout = 100 + """Agent for analyzing Solana token wallet clusters""" + + def __init__(self): + super().__init__() + self.timeout = 100 @post async def fetch_token_clusters(self, args: SearchTokenAddressClusters): @@ -182,7 +225,11 @@ async def fetch_token_clusters(self, args: SearchTokenAddressClusters): class SolWalletAgent(HeuAgentApiWrapper): - timeout = 120 + """Agent for Solana wallet analysis and holder insights""" + + def __init__(self): + super().__init__() + self.timeout = 120 @post async def analyze_common_holdings_of_top_holders(self, args: ScanTokenAddress): @@ -190,6 +237,12 @@ async def analyze_common_holdings_of_top_holders(self, args: ScanTokenAddress): class TwitterInsightAgent(HeuAgentApiWrapper): + """Agent for advanced Twitter insights and analytics""" + + def __init__(self): + super().__init__() + self.timeout = 60 + @post async def get_smart_followers_history(self, args: SmartFollowers): return args @@ -197,3 +250,49 @@ async def get_smart_followers_history(self, args: SmartFollowers): @post async def get_smart_mentions_feed(self, args: SmartMentions): return {"username": args["username"], "limit": str(args["limit"])} + + +# ---------- Utility Functions ---------- +def validate_tools_configuration(): + """Validate that all required environment variables are set""" + heurist_key = os.environ.get("HEURIST_KEY") + + if not heurist_key: + raise ValueError( + "HEURIST_KEY is required for intelligence tools. " + "Get your API key from https://heurist.xyz" + ) + + print("โ Tools configuration validated") + return True + + +def get_available_tools(): + """Get list of available intelligence tools""" + return [ + "ElfaTwitterIntelligenceAgent - Twitter sentiment and social media analysis", + "ExaSearchAgent - Web search and question answering", + "FirecrawlSearchAgent - Website crawling and data extraction", + "MetaSleuthSolTokenWalletClusterAgent - Solana token wallet cluster analysis", + "SolWalletAgent - Solana wallet analysis and holder insights", + "TwitterInsightAgent - Advanced Twitter analytics and insights", + ] + + +# Initialize and validate tools on import +if __name__ == "__main__": + try: + validate_tools_configuration() + tools = get_available_tools() + print("\n๐ ๏ธ Available Intelligence Tools:") + for i, tool in enumerate(tools, 1): + print(f"{i}. 
{tool}") + except Exception as e: + print(f"โ Tools validation failed: {e}") +else: + # Validate when imported + try: + validate_tools_configuration() + except Exception as e: + print(f"โ ๏ธ Tools configuration warning: {e}") + print("Some intelligence features may not work properly") diff --git a/mesh-agents/TokenIntel-TelegramBot/test_config.py b/mesh-agents/TokenIntel-TelegramBot/test_config.py new file mode 100644 index 0000000..083e375 --- /dev/null +++ b/mesh-agents/TokenIntel-TelegramBot/test_config.py @@ -0,0 +1,98 @@ +import sys +import os + +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "src"))) + +import litellm +from dotenv import load_dotenv +from src.model_config import ModelConfig + + +def test_provider_config(): + """Test the provider configuration logic""" + load_dotenv() + provider = os.environ.get("MODEL_PROVIDER") + model = os.environ.get("MODEL") + api_key = os.environ.get("API_KEY") + print("๐ Testing configuration:") + print(f" Provider: {provider}") + print(f" Model: {model}") + print(f" API Key: {api_key[:10]}..." if api_key else "None") + print() + + # Test the model configuration + try: + config = ModelConfig.from_env() + print("โ ModelConfig created successfully") + print(f" Provider: {config.provider}") + print(f" Model: {config.model}") + print() + litellm_model = config.get_litellm_model_name() + print(f"๐ง LiteLLM model name: {litellm_model}") + config.setup_environment_keys() + print(f"๐ Environment keys set for {config.provider}") + env_keys = [] + for key in [ + "OPENAI_API_KEY", + "ANTHROPIC_API_KEY", + "OPENROUTER_API_KEY", + "XAI_API_KEY", + ]: + if os.environ.get(key): + env_keys.append(key) + print(f"๐ Active environment keys: {env_keys}") + + litellm_config = config.get_litellm_config() + print("โ๏ธ LiteLLM config:") + for key, value in litellm_config.items(): + if key == "api_key": + print(f" {key}: {str(value)[:10]}...") + else: + print(f" {key}: {value}") + + return True + + except Exception as e: + print(f"โ Error: {e}") + return False + + +def test_simple_call(): + """Test a simple LiteLLM call""" + print("\n๐งช Testing simple LiteLLM call...") + + try: + config = ModelConfig.from_env() + litellm_config = config.get_litellm_config() + litellm.set_verbose = True + + print("๐ง Making test call with final config:") + for key, value in litellm_config.items(): + print(f" {key}: {value}") + test_config = litellm_config.copy() + test_config["max_tokens"] = 20 # Increased from 5 to meet OpenAI minimum + + response = litellm.completion( + messages=[{"role": "user", "content": "Say 'OK'"}], **test_config + ) + + print(f"โ Success! Response: {response.choices[0].message.content}") + return True + + except Exception as e: + print(f"โ LiteLLM call failed: {e}") + return False + + +if __name__ == "__main__": + print("๐ Provider Configuration Debug Test") + print("=" * 50) + + if test_provider_config(): + test_simple_call() + + print("\n" + "=" * 50) + print("๐ก If you're still getting errors:") + print("1. Check that MODEL doesn't have provider prefix for Heurist") + print("2. Verify your Heurist API key is correct") + print("3. Try a different model name like 'meta-llama/llama-3.1-70b-instruct'")