From c0793578e0828390344e76ce20eb74f5417f2ad1 Mon Sep 17 00:00:00 2001 From: Ahmad Wilson Date: Tue, 20 Jan 2026 14:21:59 -0600 Subject: [PATCH 1/3] fix: add missing cli v2 parser src --- Makefile | 2 +- src/binary_finder.py | 344 +++++++++++++++++++++++++++ src/command_executor.py | 285 ++++++++++++++++++++++ src/discover_binaries.py | 496 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 1126 insertions(+), 1 deletion(-) create mode 100755 src/binary_finder.py create mode 100755 src/command_executor.py create mode 100755 src/discover_binaries.py diff --git a/Makefile b/Makefile index 6890816..4c17a5e 100644 --- a/Makefile +++ b/Makefile @@ -24,7 +24,7 @@ run_script: clean: find . -type d -name "__pycache__" -exec rm -r {} + - rm -rf build dist *.egg-info result.json + rm -rf build dist *.egg-info help: @echo "Makefile targets:" diff --git a/src/binary_finder.py b/src/binary_finder.py new file mode 100755 index 0000000..ade9add --- /dev/null +++ b/src/binary_finder.py @@ -0,0 +1,344 @@ +#!/usr/bin/env python3 +""" +Binary Finder Module - Locate binaries in Docker containers and host systems + +Features: +- Efficient single-pass filesystem search using find command +- Substring matching for tool name discovery +- Caching to avoid repeated scans +- 2-minute timeout for large container images +""" +import os +import re +import subprocess +from typing import Optional, List, Tuple, Set +from pathlib import Path + + +class BinaryFinder: + """Find binaries in Docker containers and host systems""" + + # Cache for container scans to avoid repeated searches + _container_cache = {} + + # Timeout for container operations (2 minutes) + CONTAINER_TIMEOUT = 120 + + @staticmethod + def find_on_host(binary_name: str) -> Tuple[Optional[str], str]: + """ + Find binary on host system + + Args: + binary_name: Name of binary to find + + Returns: + Tuple of (binary_path, discovery_method) or (None, "not_found") + """ + # Try which command first (fastest) + try: + result = subprocess.run( + ['which', binary_name], + capture_output=True, + text=True, + timeout=5 + ) + if result.returncode == 0 and result.stdout.strip(): + return result.stdout.strip(), "which" + except (subprocess.TimeoutExpired, FileNotFoundError): + pass + + # Try common locations + common_paths = [ + '/usr/bin', + '/usr/local/bin', + '/bin', + '/opt/bin', + os.path.expanduser('~/.local/bin') + ] + + for path in common_paths: + binary_path = Path(path) / binary_name + if binary_path.exists() and os.access(binary_path, os.X_OK): + return str(binary_path), "common_path" + + return None, "not_found" + + @staticmethod + def find_in_container(docker_image: str, binary_name: str) -> Tuple[Optional[str], str]: + """ + Find binary in Docker container + + Args: + docker_image: Full Docker image name (e.g., "alpine:latest") + binary_name: Name of binary to find + + Returns: + Tuple of (binary_path, discovery_method) or (None, "not_found") + """ + # Try which command in container first (fastest) + try: + result = subprocess.run( + ['docker', 'run', '--rm', '--entrypoint', 'which', docker_image, binary_name], + capture_output=True, + text=True, + timeout=30 + ) + if result.returncode == 0 and result.stdout.strip(): + return result.stdout.strip(), "which_in_container" + except subprocess.TimeoutExpired: + pass + + # Try command -v (works in more minimal containers) + try: + result = subprocess.run( + ['docker', 'run', '--rm', '--entrypoint', 'sh', docker_image, + '-c', f'command -v {binary_name}'], + capture_output=True, + text=True, + timeout=30 + ) 
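+            # Note: `command -v` is a POSIX shell builtin, so this fallback also
+            # works in minimal (e.g. busybox-based) images that ship no `which`.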
+ if result.returncode == 0 and result.stdout.strip(): + return result.stdout.strip(), "command_v" + except subprocess.TimeoutExpired: + pass + + return None, "not_found" + + @staticmethod + def discover_all_executables(docker_image: str, use_cache: bool = True) -> List[str]: + """ + Discover all executable files in a Docker container using efficient find command + + Args: + docker_image: Full Docker image name + use_cache: Whether to use cached results + + Returns: + List of executable file paths found in container + """ + # Check cache first + if use_cache and docker_image in BinaryFinder._container_cache: + return BinaryFinder._container_cache[docker_image] + + print(f" → Scanning container filesystem for executables (timeout: {BinaryFinder.CONTAINER_TIMEOUT}s)...") + + # Efficient find command from root, excluding pseudo-filesystems + find_cmd = [ + 'docker', 'run', '--rm', '--entrypoint', 'sh', docker_image, + '-c', + 'find / -type f -executable ' + '-not -path "/proc/*" ' + '-not -path "/sys/*" ' + '-not -path "/dev/*" ' + '-not -path "/var/*" ' + '-not -path "*/.git/*" ' + '2>/dev/null || true' + ] + + try: + result = subprocess.run( + find_cmd, + capture_output=True, + text=True, + timeout=BinaryFinder.CONTAINER_TIMEOUT + ) + + if result.returncode in [0, 1]: # 0 = success, 1 = some files not found (OK) + executables = [line.strip() for line in result.stdout.split('\n') if line.strip()] + print(f" → Found {len(executables)} executables") + + # Cache results + BinaryFinder._container_cache[docker_image] = executables + return executables + else: + print(f" → Find command failed with exit code {result.returncode}") + return [] + + except subprocess.TimeoutExpired: + print(f" → Timeout after {BinaryFinder.CONTAINER_TIMEOUT}s (large image or slow filesystem)") + return [] + except Exception as e: + print(f" → Error scanning container: {e}") + return [] + + @staticmethod + def generate_candidates(tool_name: str, min_length: int = 3) -> Set[str]: + """ + Generate candidate binary names by simply splitting tool name into words + + Args: + tool_name: Tool name from config (e.g., "Apache Drill" or "kubectl") + min_length: Minimum word length to consider + + Returns: + Set of words to search for in binary names + """ + # Normalize and split into words + normalized = tool_name.lower() + words = re.split(r'[\s\-_]+', normalized) + + # Remove common prefix words that aren't part of binary names + ignore_words = {'the', 'a', 'an', 'apache', 'project', 'foundation'} + words = [w for w in words if w and w not in ignore_words and len(w) >= min_length] + + return set(words) + + @staticmethod + def verify_executable_responds_to_help(binary_path: str, docker_image: str) -> bool: + """ + Quick check if binary responds to basic help commands + + Args: + binary_path: Path to binary + docker_image: Docker image to test in + + Returns: + True if binary responds to --help, -h, or help + """ + import subprocess + + binary_name = os.path.basename(binary_path) + + # Try quick help variations (2 second timeout each) + help_variations = ['--help', '-h', 'help'] + + for help_arg in help_variations: + try: + result = subprocess.run( + ['docker', 'run', '--rm', docker_image, help_arg], + capture_output=True, + text=True, + timeout=2 + ) + + output = (result.stdout + result.stderr).strip() + + # Check if we got any meaningful output + if len(output) > 50 and result.returncode in [0, 1]: + return True + + except (subprocess.TimeoutExpired, Exception): + continue + + return False + + @staticmethod + def 
match_executables_to_candidates( + executables: List[str], + candidates: Set[str], + docker_image: Optional[str] = None, + verify_help: bool = False + ) -> List[Tuple[str, str, float]]: + """ + Match executables that contain any of the candidate words + + Args: + executables: List of full paths to executables + candidates: Set of words to search for + docker_image: Optional Docker image for verification + verify_help: If True, verify executable responds to help commands + + Returns: + List of (exe_path, match_type, confidence) tuples, sorted by match quality + """ + matches = [] + + # Skip common system binaries and script files + skip_binaries = {'sh', 'bash', 'ls', 'cat', 'echo', 'true', 'false', 'test', 'id', 'tr', 'ar', 'as'} + skip_extensions = {'.js', '.ts', '.d.ts', '.json', '.py', '.rb', '.pl', '.sh', '.txt', '.md', '.xml', '.html'} + + for exe_path in executables: + exe_name = os.path.basename(exe_path) + + # Skip system binaries and scripts + if exe_name in skip_binaries: + continue + if any(exe_name.endswith(ext) for ext in skip_extensions): + continue + + # Check if any candidate word is in the executable name + for word in candidates: + if word in exe_name: + # Simple confidence based on match quality + if exe_name == word: + confidence = 1.0 + match_type = 'exact' + elif exe_name.startswith(word): + confidence = 0.9 + match_type = 'starts_with' + else: + confidence = 0.7 + match_type = 'contains' + + # Bonus for binaries in standard locations + if '/usr/bin/' in exe_path or '/usr/local/bin/' in exe_path: + confidence = min(confidence + 0.1, 1.0) + + # Optional: Verify it responds to help (quick check) + if verify_help and docker_image: + if not BinaryFinder.verify_executable_responds_to_help(exe_path, docker_image): + # Penalize if it doesn't respond to help + confidence *= 0.5 + + matches.append((exe_path, match_type, confidence)) + break # Only count each executable once + + # Sort by confidence (highest first), then by path length (shorter preferred) + matches.sort(key=lambda x: (-x[2], len(x[0]))) + + return matches + + @staticmethod + def discover_binaries_for_tool(docker_image: str, tool_name: str) -> List[Tuple[str, str, float]]: + """ + Main discovery method - find all matching binaries for a tool + + Args: + docker_image: Docker image to search in + tool_name: Name of the tool + + Returns: + List of (binary_path, match_type, confidence) tuples + """ + print(f"\n Discovering binaries for: {tool_name}") + print(f" Image: {docker_image}") + + # Generate candidates + candidates = BinaryFinder.generate_candidates(tool_name) + print(f" → Generated {len(candidates)} candidates: {', '.join(sorted(candidates)[:10])}{'...' 
if len(candidates) > 10 else ''}") + + # Try quick direct lookup first + for candidate in sorted(candidates, key=len, reverse=True)[:5]: # Try top 5 most likely + binary_path, method = BinaryFinder.find_in_container(docker_image, candidate) + if binary_path: + print(f" → Quick match found: {binary_path} (method: {method})") + return [(binary_path, method, 1.0)] + + # Fall back to full filesystem scan + print(f" → Quick lookup failed, performing full scan...") + executables = BinaryFinder.discover_all_executables(docker_image) + + if not executables: + print(f" → No executables found in container") + return [] + + # Match executables to candidates (with help verification) + matches = BinaryFinder.match_executables_to_candidates( + executables, + candidates, + docker_image=docker_image, + verify_help=True # Enable help verification + ) + + if matches: + print(f" → Found {len(matches)} matches") + # Show top 5 matches + for exe_path, match_type, confidence in matches[:5]: + print(f" • {os.path.basename(exe_path)} ({match_type}, confidence: {confidence:.2f})") + else: + print(f" → No matches found") + + return matches + + diff --git a/src/command_executor.py b/src/command_executor.py new file mode 100755 index 0000000..9fa510d --- /dev/null +++ b/src/command_executor.py @@ -0,0 +1,285 @@ +#!/usr/bin/env python3 +""" +Command Executor Module - Execute commands in Docker containers with fallback strategies + +Features: +- Multiple help command variations (--help, -h, help, etc.) +- Version detection with fallbacks +- 2-minute timeout for slow commands +- Error handling and validation +""" +import subprocess +from typing import Optional, Dict, List, Tuple + + +class CommandExecutor: + """Execute commands in Docker containers and on host""" + + # Timeout for command execution (2 minutes) + COMMAND_TIMEOUT = 120 + + # Help command variations to try (in order of preference) + HELP_VARIATIONS = [ + ['--help'], + ['-h'], + ['help'], + ['-help'], + ['--usage'], + [], # No args - some tools print help by default + ] + + # Version command variations to try + VERSION_VARIATIONS = [ + ['--version'], + ['-v'], + ['version'], + ['-version'], + ['--v'], + ] + + @staticmethod + def execute_command( + binary_path: str, + args: List[str], + docker_image: Optional[str] = None, + timeout: int = None + ) -> Tuple[int, str, str]: + """ + Execute a command and return result + + Args: + binary_path: Path to binary + args: Command arguments + docker_image: Optional Docker image to run in + timeout: Timeout in seconds (default: COMMAND_TIMEOUT) + + Returns: + Tuple of (exit_code, stdout, stderr) + """ + if timeout is None: + timeout = CommandExecutor.COMMAND_TIMEOUT + + if docker_image: + # Run in Docker container + # For containers, we need to check if the binary is the entrypoint + # If it is, don't repeat it; if not, pass it explicitly + import os + binary_name = os.path.basename(binary_path) + + # Try without specifying binary (assumes it's in entrypoint) + cmd = ['docker', 'run', '--rm', docker_image] + args + else: + # Run on host + cmd = [binary_path] + args + + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=timeout + ) + return result.returncode, result.stdout, result.stderr + + except subprocess.TimeoutExpired: + return -1, '', f'Timeout after {timeout}s' + except FileNotFoundError: + return -1, '', f'Binary not found: {binary_path}' + except Exception as e: + return -1, '', str(e) + + @staticmethod + def is_valid_help_output(output: str, exit_code: int) -> bool: + """ + 
Check if output looks like valid help text + + Args: + output: Command output (stdout + stderr) + exit_code: Exit code from command + + Returns: + True if output appears to be valid help text + """ + # Exit code check - help commands typically return 0 or 1 + if exit_code not in [0, 1, 2]: + return False + + # Must have reasonable amount of content + if len(output) < 50: + return False + + # Check for help indicators (case-insensitive) + output_lower = output.lower() + help_indicators = [ + 'usage:', 'usage :', 'usage:', + 'options:', 'flags:', + 'commands:', 'subcommands:', + 'help', 'examples:', + 'arguments:', + 'synopsis', + 'description:', + ] + + has_indicator = any(indicator in output_lower for indicator in help_indicators) + + # Or has option-like patterns (--something or -x) + has_options = ('--' in output or ' -' in output) + + return has_indicator or has_options + + @staticmethod + def execute_help( + binary_path: str, + parent_command: Optional[str] = None, + docker_image: Optional[str] = None + ) -> Optional[str]: + """ + Execute help command with fallback strategies + + Args: + binary_path: Path to binary + parent_command: Optional parent command (e.g., "config" for "git config") + docker_image: Optional Docker image to run in + + Returns: + Help text or None if all attempts fail + """ + # Build command parts + if parent_command: + # For subcommands like "git config --help" + cmd_parts = parent_command.split() + else: + cmd_parts = [] + + # Try each help variation + for help_args in CommandExecutor.HELP_VARIATIONS: + full_args = cmd_parts + help_args + + exit_code, stdout, stderr = CommandExecutor.execute_command( + binary_path, + full_args, + docker_image, + timeout=10 # Shorter timeout for help commands (reduced from 30s) + ) + + # Combine stdout and stderr (some tools print help to stderr) + output = stdout + '\n' + stderr + output = output.strip() + + if CommandExecutor.is_valid_help_output(output, exit_code): + return output + + return None + + @staticmethod + def execute_version( + binary_path: str, + docker_image: Optional[str] = None + ) -> Optional[str]: + """ + Execute version command with fallback strategies + + Args: + binary_path: Path to binary + docker_image: Optional Docker image to run in + + Returns: + Version text or None if all attempts fail + """ + # Try each version variation + for version_args in CommandExecutor.VERSION_VARIATIONS: + exit_code, stdout, stderr = CommandExecutor.execute_command( + binary_path, + version_args, + docker_image, + timeout=10 # Shorter timeout for version commands (reduced from 30s) + ) + + # Combine stdout and stderr + output = stdout + '\n' + stderr + output = output.strip() + + # Version output typically has version numbers + if output and (exit_code in [0, 1]) and len(output) > 0: + # Check if it looks like version output (has numbers) + import re + if re.search(r'\d+\.\d+', output): + return output + + return None + + @staticmethod + def test_help_variations( + binary_path: str, + docker_image: Optional[str] = None + ) -> Dict[str, any]: + """ + Test all help command variations and return detailed results + + Args: + binary_path: Path to binary + docker_image: Optional Docker image to run in + + Returns: + Dict with test results for each variation + """ + results = { + 'binary': binary_path, + 'image': docker_image, + 'tests': [], + 'working_commands': [], + 'best_command': None + } + + for help_args in CommandExecutor.HELP_VARIATIONS: + exit_code, stdout, stderr = CommandExecutor.execute_command( + binary_path, + 
help_args, + docker_image, + timeout=30 + ) + + output = stdout + '\n' + stderr + output = output.strip() + + is_valid = CommandExecutor.is_valid_help_output(output, exit_code) + + cmd_str = ' '.join(help_args) if help_args else '(no args)' + + test_result = { + 'command': cmd_str, + 'args': help_args, + 'exit_code': exit_code, + 'output_length': len(output), + 'is_valid': is_valid + } + + results['tests'].append(test_result) + + if is_valid: + results['working_commands'].append(cmd_str) + if not results['best_command']: + results['best_command'] = cmd_str + + return results + + @staticmethod + def verify_binary( + binary_path: str, + docker_image: Optional[str] = None + ) -> bool: + """ + Verify that a binary exists and can be executed + + Args: + binary_path: Path to binary + docker_image: Optional Docker image to run in + + Returns: + True if binary can be executed + """ + # Try to get help text + help_text = CommandExecutor.execute_help(binary_path, docker_image=docker_image) + return help_text is not None and len(help_text) > 50 + + diff --git a/src/discover_binaries.py b/src/discover_binaries.py new file mode 100755 index 0000000..d74fd15 --- /dev/null +++ b/src/discover_binaries.py @@ -0,0 +1,496 @@ +#!/usr/bin/env python3 +""" +Binary Discovery Tool - Discover and validate binaries for CLI tools in Docker images + +This tool processes CLI tools that don't have binary names defined and attempts to: +1. Discover executables in their Docker images using efficient find from root (/) +2. Match executables to tool names using substring matching +3. Test help commands to verify binaries work +4. Update configuration with discovered binaries + +Features: +- Efficient single-pass filesystem search with 2-minute timeout +- Substring matching for fuzzy binary name discovery +- Validation that help commands work +- Dry-run mode for testing +- Detailed progress reporting +""" +import argparse +import json +import sys +import os +from pathlib import Path +from datetime import datetime +from typing import Dict, List, Optional +import subprocess + +# Add src to path +sys.path.insert(0, os.path.dirname(__file__)) + +from binary_finder import BinaryFinder +from command_executor import CommandExecutor + + +class BinaryDiscoveryTool: + """Discover binaries for tools without binary names""" + + def __init__(self, config_file: str, dry_run: bool = False): + """ + Initialize the discovery tool + + Args: + config_file: Path to cli_tools.json config file + dry_run: If True, don't update config file + """ + self.config_file = Path(config_file) + self.dry_run = dry_run + self.config = None + self.tools = [] + + self.stats = { + 'total': 0, + 'success': 0, + 'partial': 0, # Found binary but help doesn't work + 'failed': 0, + 'skipped': 0, + 'start_time': datetime.now() + } + + def log(self, message: str, level: str = "INFO"): + """Log message with timestamp""" + timestamp = datetime.now().strftime("%H:%M:%S") + print(f"[{timestamp}] {level}: {message}") + + def load_config(self) -> bool: + """Load configuration file""" + try: + with open(self.config_file, 'r') as f: + self.config = json.load(f) + + self.tools = self.config.get('cli_tools', []) + self.log(f"Loaded {len(self.tools)} tools from config") + return True + + except Exception as e: + self.log(f"Failed to load config: {e}", "ERROR") + return False + + def save_config(self): + """Save updated configuration file""" + if self.dry_run: + self.log("Dry-run mode: skipping config save", "INFO") + return + + try: + # Create backup + backup_path = 
self.config_file.with_suffix('.json.backup') + with open(backup_path, 'w') as f: + json.dump(self.config, f, indent=2) + self.log(f"Created backup: {backup_path}") + + # Save updated config + with open(self.config_file, 'w') as f: + json.dump(self.config, f, indent=2) + self.log(f"Updated config saved: {self.config_file}") + + except Exception as e: + self.log(f"Failed to save config: {e}", "ERROR") + + def check_docker_available(self) -> bool: + """Check if Docker is available""" + try: + result = subprocess.run( + ['docker', 'version'], + capture_output=True, + timeout=10 + ) + return result.returncode == 0 + except Exception: + return False + + def pull_image(self, image: str) -> bool: + """ + Pull Docker image if not available locally + + Args: + image: Full image name (e.g., "alpine:latest") + + Returns: + True if image is available + """ + self.log(f" Checking image availability: {image}") + + # Check if image exists locally + try: + result = subprocess.run( + ['docker', 'image', 'inspect', image], + capture_output=True, + timeout=10 + ) + if result.returncode == 0: + self.log(f" → Image available locally") + return True + except Exception: + pass + + # Try to pull image + self.log(f" → Pulling image (this may take a while)...") + try: + result = subprocess.run( + ['docker', 'pull', image], + capture_output=True, + text=True, + timeout=300 # 5 minutes for image pull + ) + if result.returncode == 0: + self.log(f" → Image pulled successfully") + return True + else: + self.log(f" → Failed to pull image: {result.stderr[:200]}", "WARN") + return False + except subprocess.TimeoutExpired: + self.log(f" → Timeout pulling image", "WARN") + return False + except Exception as e: + self.log(f" → Error pulling image: {e}", "WARN") + return False + + def discover_for_tool(self, tool: Dict) -> Optional[Dict]: + """ + Discover binaries for a single tool + + Args: + tool: Tool dictionary from config + + Returns: + Discovery result dict or None if failed + """ + tool_name = tool.get('name', 'Unknown') + self.log(f"\n{'='*80}") + self.log(f"Processing: {tool_name}") + self.log(f"{'='*80}") + + # Check if tool has image_repo + image_repo = tool.get('image_repo', {}) + if not image_repo or not image_repo.get('image'): + self.log(f" No image_repo defined, skipping", "WARN") + return None + + # Get image and tag + image_base = image_repo['image'] + image_tags = tool.get('image_tags', ['latest']) + image_tag = 'latest' if 'latest' in image_tags else image_tags[0] + docker_image = f"{image_base}:{image_tag}" + + self.log(f" Image: {docker_image}") + + # Check image availability + if not self.pull_image(docker_image): + return { + 'status': 'failed', + 'reason': 'image_unavailable', + 'image': docker_image + } + + # Discover binaries + matches = BinaryFinder.discover_binaries_for_tool(docker_image, tool_name) + + if not matches: + return { + 'status': 'failed', + 'reason': 'no_binaries_found', + 'image': docker_image + } + + # Test each match to find working binaries + working_binaries = [] + consecutive_failures = 0 + max_consecutive_failures = 3 # Stop after 3 consecutive failures + min_confidence = 0.40 # Minimum confidence to consider (lowered to catch more valid binaries) + + for binary_path, match_type, confidence in matches[:10]: # Test top 10 matches + # Skip low confidence matches + if confidence < min_confidence: + self.log(f" Skipping: {os.path.basename(binary_path)} (confidence {confidence:.2f} below threshold {min_confidence})") + consecutive_failures += 1 + if consecutive_failures >= 
max_consecutive_failures: + break + continue + binary_name = os.path.basename(binary_path) + self.log(f" Testing: {binary_name} (confidence: {confidence:.2f}, type: {match_type})") + + # Test help command + test_results = CommandExecutor.test_help_variations(binary_path, docker_image) + + if test_results['working_commands']: + self.log(f" ✓ Working help commands: {', '.join(test_results['working_commands'])}") + working_binaries.append({ + 'path': binary_path, + 'name': binary_name, + 'confidence': confidence, + 'match_type': match_type, + 'help_command': test_results['best_command'], + 'working_commands': test_results['working_commands'] + }) + consecutive_failures = 0 # Reset counter on success + else: + self.log(f" ✗ No working help commands found") + consecutive_failures += 1 + + # Stop early if too many consecutive failures + if consecutive_failures >= max_consecutive_failures: + self.log(f" → Stopping after {consecutive_failures} consecutive failures") + break + + if not working_binaries: + return { + 'status': 'partial', + 'reason': 'binary_found_no_help', + 'image': docker_image, + 'binaries_tested': len(matches[:10]) + } + + # Select primary binary (highest confidence with working help) + primary = working_binaries[0] + alternates = [b['name'] for b in working_binaries[1:] if b['name'] != primary['name']] + + self.log(f"\n ✓ Discovery successful!") + self.log(f" Primary binary: {primary['name']}") + self.log(f" Help command: {primary['help_command']}") + if alternates: + self.log(f" Alternate binaries: {', '.join(alternates[:5])}") + + return { + 'status': 'success', + 'binary': primary['name'], + 'alternate_binaries': alternates[:5], # Limit to 5 alternates + 'help_command': primary['help_command'], + 'confidence': primary['confidence'], + 'match_type': primary['match_type'], + 'image': docker_image, + 'discovery_metadata': { + 'discovered_at': datetime.now().isoformat(), + 'method': primary['match_type'], + 'confidence': primary['confidence'], + 'verified': True + } + } + + def update_tool_config(self, tool: Dict, discovery_result: Dict): + """ + Update tool configuration with discovery results + + Args: + tool: Original tool dict + discovery_result: Discovery result dict + """ + if discovery_result['status'] == 'success': + tool['binary'] = discovery_result['binary'] + tool['alternate_binaries'] = discovery_result['alternate_binaries'] + + # Add discovery metadata to notes + if 'notes' not in tool: + tool['notes'] = '' + + note = f"Binary discovered automatically on {datetime.now().strftime('%Y-%m-%d')} " \ + f"(method: {discovery_result['match_type']}, confidence: {discovery_result['confidence']:.2f})" + + if tool['notes']: + tool['notes'] += f" | {note}" + else: + tool['notes'] = note + + def process_tools( + self, + only_tools: Optional[List[str]] = None, + category: Optional[str] = None, + limit: Optional[int] = None + ): + """ + Process tools and discover binaries + + Args: + only_tools: If set, only process these specific tools + category: If set, only process tools in this category + limit: If set, limit number of tools to process + """ + self.log("="*80) + self.log("CLI Binary Discovery Tool") + self.log("="*80) + self.log(f"Config: {self.config_file}") + self.log(f"Dry-run: {self.dry_run}") + self.log(f"Timeout: 2 minutes per container scan") + self.log("="*80) + + # Check Docker + if not self.check_docker_available(): + self.log("Docker not available! 
Please install Docker.", "ERROR") + return + + self.log("✓ Docker is available") + + # Load config + if not self.load_config(): + return + + # Filter tools + filtered_tools = [] + for tool in self.tools: + # Skip if has binary already + if tool.get('binary'): + continue + + # Filter by category + if category and tool.get('category') != category: + continue + + # Filter by name + if only_tools and tool.get('name') not in only_tools: + continue + + # Must have image_repo + if not tool.get('image_repo', {}).get('image'): + continue + + filtered_tools.append(tool) + + if not filtered_tools: + self.log("No tools to process after filtering", "WARN") + return + + # Apply limit + if limit: + filtered_tools = filtered_tools[:limit] + + self.log(f"\nProcessing {len(filtered_tools)} tools") + self.log("="*80) + + # Process each tool + results = [] + for i, tool in enumerate(filtered_tools, 1): + self.stats['total'] += 1 + + self.log(f"\n[{i}/{len(filtered_tools)}]") + + result = self.discover_for_tool(tool) + + if result: + results.append({ + 'tool': tool['name'], + **result + }) + + if result['status'] == 'success': + self.stats['success'] += 1 + self.update_tool_config(tool, result) + elif result['status'] == 'partial': + self.stats['partial'] += 1 + else: + self.stats['failed'] += 1 + else: + self.stats['skipped'] += 1 + results.append({ + 'tool': tool['name'], + 'status': 'skipped', + 'reason': 'no_image_repo' + }) + + # Save config if changes were made + if self.stats['success'] > 0: + self.save_config() + + # Print summary + self.print_summary(results) + + def print_summary(self, results: List[Dict]): + """Print processing summary""" + duration = (datetime.now() - self.stats['start_time']).total_seconds() + + self.log("\n" + "="*80) + self.log("DISCOVERY SUMMARY") + self.log("="*80) + self.log(f"Duration: {duration:.1f}s ({duration/60:.1f} minutes)") + self.log(f"Total processed: {self.stats['total']}") + self.log(f"✓ Success: {self.stats['success']}") + self.log(f"⚠ Partial: {self.stats['partial']} (binary found but help doesn't work)") + self.log(f"✗ Failed: {self.stats['failed']}") + self.log(f"⊙ Skipped: {self.stats['skipped']}") + + # Show successful discoveries + successful = [r for r in results if r.get('status') == 'success'] + if successful: + self.log("\n" + "="*80) + self.log(f"SUCCESSFUL DISCOVERIES ({len(successful)})") + self.log("="*80) + for r in successful: + self.log(f" • {r['tool']:40s} → {r['binary']:20s} (confidence: {r.get('confidence', 0):.2f})") + + # Show failures + failed = [r for r in results if r.get('status') in ['failed', 'partial']] + if failed: + self.log("\n" + "="*80) + self.log(f"FAILED/PARTIAL ({len(failed)})") + self.log("="*80) + for r in failed: + reason = r.get('reason', 'unknown') + self.log(f" • {r['tool']:40s} → {reason}") + + success_rate = (self.stats['success'] / self.stats['total'] * 100) if self.stats['total'] > 0 else 0 + self.log("\n" + "="*80) + self.log(f"Success Rate: {success_rate:.1f}%") + self.log("="*80) + + +def main(): + """CLI entry point""" + parser = argparse.ArgumentParser( + description='Discover binaries for CLI tools in Docker images', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Dry run to see what would be discovered + python discover_binaries.py data/configs/cli_tools.json --dry-run + + # Discover and update config + python discover_binaries.py data/configs/cli_tools.json --update + + # Process specific category + python discover_binaries.py data/configs/cli_tools.json --category "Apache" 
--update + + # Process specific tools + python discover_binaries.py data/configs/cli_tools.json --only "act" "Airflow" --update + + # Limit to first 10 tools + python discover_binaries.py data/configs/cli_tools.json --limit 10 --update + """ + ) + + parser.add_argument('config', help='Path to cli_tools.json config file') + parser.add_argument('--update', action='store_true', help='Update config file with discoveries') + parser.add_argument('--dry-run', action='store_true', help='Dry run (no config updates)') + parser.add_argument('--category', help='Only process tools in this category') + parser.add_argument('--only', nargs='+', help='Only process these specific tools') + parser.add_argument('--limit', type=int, help='Limit number of tools to process') + + args = parser.parse_args() + + # Determine dry-run mode + dry_run = args.dry_run or not args.update + + if not args.update and not args.dry_run: + print("Note: Running in dry-run mode. Use --update to save changes to config.") + print() + + # Create and run discovery tool + discovery = BinaryDiscoveryTool(args.config, dry_run=dry_run) + discovery.process_tools( + only_tools=args.only, + category=args.category, + limit=args.limit + ) + + +if __name__ == '__main__': + main() + + From 07165587b34e05876cc68d5e1189c8b37b3e1c0f Mon Sep 17 00:00:00 2001 From: Ahmad Wilson Date: Tue, 20 Jan 2026 15:07:22 -0600 Subject: [PATCH 2/3] incorporate CR feedback --- src/binary_finder.py | 14 +++++++------- src/command_executor.py | 11 +++-------- src/discover_binaries.py | 10 +++++----- src/process_cli_tools_v2.py | 3 ++- 4 files changed, 17 insertions(+), 21 deletions(-) diff --git a/src/binary_finder.py b/src/binary_finder.py index ade9add..693ce39 100755 --- a/src/binary_finder.py +++ b/src/binary_finder.py @@ -198,15 +198,14 @@ def verify_executable_responds_to_help(binary_path: str, docker_image: str) -> b """ import subprocess - binary_name = os.path.basename(binary_path) - # Try quick help variations (2 second timeout each) help_variations = ['--help', '-h', 'help'] for help_arg in help_variations: try: + # Use --entrypoint to explicitly specify the binary we want to test result = subprocess.run( - ['docker', 'run', '--rm', docker_image, help_arg], + ['docker', 'run', '--rm', '--entrypoint', binary_path, docker_image, help_arg], capture_output=True, text=True, timeout=2 @@ -250,6 +249,7 @@ def match_executables_to_candidates( for exe_path in executables: exe_name = os.path.basename(exe_path) + exe_name_lower = exe_name.lower() # Skip system binaries and scripts if exe_name in skip_binaries: @@ -257,14 +257,14 @@ def match_executables_to_candidates( if any(exe_name.endswith(ext) for ext in skip_extensions): continue - # Check if any candidate word is in the executable name + # Check if any candidate word is in the executable name (case-insensitive) for word in candidates: - if word in exe_name: + if word in exe_name_lower: # Simple confidence based on match quality - if exe_name == word: + if exe_name_lower == word: confidence = 1.0 match_type = 'exact' - elif exe_name.startswith(word): + elif exe_name_lower.startswith(word): confidence = 0.9 match_type = 'starts_with' else: diff --git a/src/command_executor.py b/src/command_executor.py index 9fa510d..9fa4c9b 100755 --- a/src/command_executor.py +++ b/src/command_executor.py @@ -60,14 +60,9 @@ def execute_command( timeout = CommandExecutor.COMMAND_TIMEOUT if docker_image: - # Run in Docker container - # For containers, we need to check if the binary is the entrypoint - # If it is, don't repeat it; 
if not, pass it explicitly - import os - binary_name = os.path.basename(binary_path) - - # Try without specifying binary (assumes it's in entrypoint) - cmd = ['docker', 'run', '--rm', docker_image] + args + # Run in Docker container with explicit entrypoint + # Use --entrypoint to specify the binary, ensuring we test the right executable + cmd = ['docker', 'run', '--rm', '--entrypoint', binary_path, docker_image] + args else: # Run on host cmd = [binary_path] + args diff --git a/src/discover_binaries.py b/src/discover_binaries.py index d74fd15..1624796 100755 --- a/src/discover_binaries.py +++ b/src/discover_binaries.py @@ -17,6 +17,7 @@ """ import argparse import json +import shutil import sys import os from pathlib import Path @@ -82,10 +83,9 @@ def save_config(self): return try: - # Create backup + # Create backup of original file (before modifications) backup_path = self.config_file.with_suffix('.json.backup') - with open(backup_path, 'w') as f: - json.dump(self.config, f, indent=2) + shutil.copy(self.config_file, backup_path) self.log(f"Created backup: {backup_path}") # Save updated config @@ -176,9 +176,9 @@ def discover_for_tool(self, tool: Dict) -> Optional[Dict]: self.log(f" No image_repo defined, skipping", "WARN") return None - # Get image and tag + # Get image and tag (handle empty list with `or` fallback) image_base = image_repo['image'] - image_tags = tool.get('image_tags', ['latest']) + image_tags = tool.get('image_tags', ['latest']) or ['latest'] image_tag = 'latest' if 'latest' in image_tags else image_tags[0] docker_image = f"{image_base}:{image_tag}" diff --git a/src/process_cli_tools_v2.py b/src/process_cli_tools_v2.py index 23a6f25..e67a8f4 100644 --- a/src/process_cli_tools_v2.py +++ b/src/process_cli_tools_v2.py @@ -120,7 +120,8 @@ def process_tool(self, tool: Dict, category: str) -> Dict: if 'image_tag' in tool: docker_tag = tool['image_tag'] else: - image_tags = tool.get('image_tags', ['latest']) + # Handle empty list with `or` fallback to prevent IndexError + image_tags = tool.get('image_tags', ['latest']) or ['latest'] docker_tag = 'latest' if 'latest' in image_tags else image_tags[0] docker_image = f"{image_name}:{docker_tag}" From 72e8228f0398f6a38f259e770084cf4aacb6059f Mon Sep 17 00:00:00 2001 From: Ahmad Wilson Date: Tue, 20 Jan 2026 15:32:12 -0600 Subject: [PATCH 3/3] add pre-commit and fix tests --- .pre-commit-config.yaml | 12 + setup.py | 10 +- src/binary_finder.py | 128 +++++---- src/command_executor.py | 225 +++++++-------- src/discover_binaries.py | 155 ++++++----- src/parser.py | 532 ++++++++++++++++++++++-------------- src/parser_v2.py | 379 ------------------------- src/process_cli_tools_v2.py | 349 +++++++++++------------ tests/test_parser.py | 194 +++++++++---- 9 files changed, 887 insertions(+), 1097 deletions(-) create mode 100644 .pre-commit-config.yaml delete mode 100644 src/parser_v2.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..e9a8f86 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,12 @@ +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v5.0.0 + hooks: + - id: trailing-whitespace + - id: end-of-file-fixer + - id: check-yaml + - id: check-merge-conflict + - id: check-added-large-files + - id: check-ast # Python syntax check + - id: debug-statements # No debugger/breakpoint statements + - id: check-docstring-first # Docstring before code diff --git a/setup.py b/setup.py index 920c59e..736981d 100644 --- a/setup.py +++ b/setup.py @@ -1,13 +1,13 @@ -from 
setuptools import setup, find_packages +from setuptools import find_packages, setup setup( - name='cli_parser', - version='0.1.0', + name="cli_parser", + version="0.1.0", packages=find_packages(), install_requires=[], entry_points={ - 'console_scripts': [ - 'cli_parser=src.parser:main', + "console_scripts": [ + "cli_parser=src.parser:main", ], }, ) diff --git a/src/binary_finder.py b/src/binary_finder.py index 693ce39..5c53956 100755 --- a/src/binary_finder.py +++ b/src/binary_finder.py @@ -17,21 +17,21 @@ class BinaryFinder: """Find binaries in Docker containers and host systems""" - + # Cache for container scans to avoid repeated searches _container_cache = {} - + # Timeout for container operations (2 minutes) CONTAINER_TIMEOUT = 120 - + @staticmethod def find_on_host(binary_name: str) -> Tuple[Optional[str], str]: """ Find binary on host system - + Args: binary_name: Name of binary to find - + Returns: Tuple of (binary_path, discovery_method) or (None, "not_found") """ @@ -47,7 +47,7 @@ def find_on_host(binary_name: str) -> Tuple[Optional[str], str]: return result.stdout.strip(), "which" except (subprocess.TimeoutExpired, FileNotFoundError): pass - + # Try common locations common_paths = [ '/usr/bin', @@ -56,23 +56,23 @@ def find_on_host(binary_name: str) -> Tuple[Optional[str], str]: '/opt/bin', os.path.expanduser('~/.local/bin') ] - + for path in common_paths: binary_path = Path(path) / binary_name if binary_path.exists() and os.access(binary_path, os.X_OK): return str(binary_path), "common_path" - + return None, "not_found" - + @staticmethod def find_in_container(docker_image: str, binary_name: str) -> Tuple[Optional[str], str]: """ Find binary in Docker container - + Args: docker_image: Full Docker image name (e.g., "alpine:latest") binary_name: Name of binary to find - + Returns: Tuple of (binary_path, discovery_method) or (None, "not_found") """ @@ -88,11 +88,11 @@ def find_in_container(docker_image: str, binary_name: str) -> Tuple[Optional[str return result.stdout.strip(), "which_in_container" except subprocess.TimeoutExpired: pass - + # Try command -v (works in more minimal containers) try: result = subprocess.run( - ['docker', 'run', '--rm', '--entrypoint', 'sh', docker_image, + ['docker', 'run', '--rm', '--entrypoint', 'sh', docker_image, '-c', f'command -v {binary_name}'], capture_output=True, text=True, @@ -102,27 +102,27 @@ def find_in_container(docker_image: str, binary_name: str) -> Tuple[Optional[str return result.stdout.strip(), "command_v" except subprocess.TimeoutExpired: pass - + return None, "not_found" - + @staticmethod def discover_all_executables(docker_image: str, use_cache: bool = True) -> List[str]: """ Discover all executable files in a Docker container using efficient find command - + Args: docker_image: Full Docker image name use_cache: Whether to use cached results - + Returns: List of executable file paths found in container """ # Check cache first if use_cache and docker_image in BinaryFinder._container_cache: return BinaryFinder._container_cache[docker_image] - + print(f" → Scanning container filesystem for executables (timeout: {BinaryFinder.CONTAINER_TIMEOUT}s)...") - + # Efficient find command from root, excluding pseudo-filesystems find_cmd = [ 'docker', 'run', '--rm', '--entrypoint', 'sh', docker_image, @@ -135,7 +135,7 @@ def discover_all_executables(docker_image: str, use_cache: bool = True) -> List[ '-not -path "*/.git/*" ' '2>/dev/null || true' ] - + try: result = subprocess.run( find_cmd, @@ -143,64 +143,64 @@ def 
discover_all_executables(docker_image: str, use_cache: bool = True) -> List[ text=True, timeout=BinaryFinder.CONTAINER_TIMEOUT ) - + if result.returncode in [0, 1]: # 0 = success, 1 = some files not found (OK) executables = [line.strip() for line in result.stdout.split('\n') if line.strip()] print(f" → Found {len(executables)} executables") - + # Cache results BinaryFinder._container_cache[docker_image] = executables return executables else: print(f" → Find command failed with exit code {result.returncode}") return [] - + except subprocess.TimeoutExpired: print(f" → Timeout after {BinaryFinder.CONTAINER_TIMEOUT}s (large image or slow filesystem)") return [] except Exception as e: print(f" → Error scanning container: {e}") return [] - + @staticmethod - def generate_candidates(tool_name: str, min_length: int = 3) -> Set[str]: + def generate_candidates(tool_name: str, min_length: int = 2) -> Set[str]: """ Generate candidate binary names by simply splitting tool name into words - + Args: tool_name: Tool name from config (e.g., "Apache Drill" or "kubectl") - min_length: Minimum word length to consider - + min_length: Minimum word length to consider (default 2 for short tools like "go", "jq") + Returns: Set of words to search for in binary names """ # Normalize and split into words normalized = tool_name.lower() words = re.split(r'[\s\-_]+', normalized) - + # Remove common prefix words that aren't part of binary names ignore_words = {'the', 'a', 'an', 'apache', 'project', 'foundation'} words = [w for w in words if w and w not in ignore_words and len(w) >= min_length] - + return set(words) - + @staticmethod def verify_executable_responds_to_help(binary_path: str, docker_image: str) -> bool: """ Quick check if binary responds to basic help commands - + Args: binary_path: Path to binary docker_image: Docker image to test in - + Returns: True if binary responds to --help, -h, or help """ import subprocess - + # Try quick help variations (2 second timeout each) help_variations = ['--help', '-h', 'help'] - + for help_arg in help_variations: try: # Use --entrypoint to explicitly specify the binary we want to test @@ -210,53 +210,53 @@ def verify_executable_responds_to_help(binary_path: str, docker_image: str) -> b text=True, timeout=2 ) - + output = (result.stdout + result.stderr).strip() - + # Check if we got any meaningful output if len(output) > 50 and result.returncode in [0, 1]: return True - + except (subprocess.TimeoutExpired, Exception): continue - + return False - + @staticmethod def match_executables_to_candidates( - executables: List[str], + executables: List[str], candidates: Set[str], docker_image: Optional[str] = None, verify_help: bool = False ) -> List[Tuple[str, str, float]]: """ Match executables that contain any of the candidate words - + Args: executables: List of full paths to executables candidates: Set of words to search for docker_image: Optional Docker image for verification verify_help: If True, verify executable responds to help commands - + Returns: List of (exe_path, match_type, confidence) tuples, sorted by match quality """ matches = [] - + # Skip common system binaries and script files skip_binaries = {'sh', 'bash', 'ls', 'cat', 'echo', 'true', 'false', 'test', 'id', 'tr', 'ar', 'as'} skip_extensions = {'.js', '.ts', '.d.ts', '.json', '.py', '.rb', '.pl', '.sh', '.txt', '.md', '.xml', '.html'} - + for exe_path in executables: exe_name = os.path.basename(exe_path) exe_name_lower = exe_name.lower() - + # Skip system binaries and scripts if exe_name in skip_binaries: 
continue if any(exe_name.endswith(ext) for ext in skip_extensions): continue - + # Check if any candidate word is in the executable name (case-insensitive) for word in candidates: if word in exe_name_lower: @@ -270,67 +270,67 @@ def match_executables_to_candidates( else: confidence = 0.7 match_type = 'contains' - + # Bonus for binaries in standard locations if '/usr/bin/' in exe_path or '/usr/local/bin/' in exe_path: confidence = min(confidence + 0.1, 1.0) - + # Optional: Verify it responds to help (quick check) if verify_help and docker_image: if not BinaryFinder.verify_executable_responds_to_help(exe_path, docker_image): # Penalize if it doesn't respond to help confidence *= 0.5 - + matches.append((exe_path, match_type, confidence)) break # Only count each executable once - + # Sort by confidence (highest first), then by path length (shorter preferred) matches.sort(key=lambda x: (-x[2], len(x[0]))) - + return matches - + @staticmethod def discover_binaries_for_tool(docker_image: str, tool_name: str) -> List[Tuple[str, str, float]]: """ Main discovery method - find all matching binaries for a tool - + Args: docker_image: Docker image to search in tool_name: Name of the tool - + Returns: List of (binary_path, match_type, confidence) tuples """ print(f"\n Discovering binaries for: {tool_name}") print(f" Image: {docker_image}") - + # Generate candidates candidates = BinaryFinder.generate_candidates(tool_name) print(f" → Generated {len(candidates)} candidates: {', '.join(sorted(candidates)[:10])}{'...' if len(candidates) > 10 else ''}") - + # Try quick direct lookup first for candidate in sorted(candidates, key=len, reverse=True)[:5]: # Try top 5 most likely binary_path, method = BinaryFinder.find_in_container(docker_image, candidate) if binary_path: print(f" → Quick match found: {binary_path} (method: {method})") return [(binary_path, method, 1.0)] - + # Fall back to full filesystem scan print(f" → Quick lookup failed, performing full scan...") executables = BinaryFinder.discover_all_executables(docker_image) - + if not executables: print(f" → No executables found in container") return [] - + # Match executables to candidates (with help verification) matches = BinaryFinder.match_executables_to_candidates( - executables, + executables, candidates, docker_image=docker_image, verify_help=True # Enable help verification ) - + if matches: print(f" → Found {len(matches)} matches") # Show top 5 matches @@ -338,7 +338,5 @@ def discover_binaries_for_tool(docker_image: str, tool_name: str) -> List[Tuple[ print(f" • {os.path.basename(exe_path)} ({match_type}, confidence: {confidence:.2f})") else: print(f" → No matches found") - - return matches - + return matches diff --git a/src/command_executor.py b/src/command_executor.py index 9fa4c9b..7eb20d3 100755 --- a/src/command_executor.py +++ b/src/command_executor.py @@ -9,133 +9,128 @@ - Error handling and validation """ import subprocess -from typing import Optional, Dict, List, Tuple +from typing import Dict, List, Optional, Tuple class CommandExecutor: """Execute commands in Docker containers and on host""" - + # Timeout for command execution (2 minutes) COMMAND_TIMEOUT = 120 - + # Help command variations to try (in order of preference) HELP_VARIATIONS = [ - ['--help'], - ['-h'], - ['help'], - ['-help'], - ['--usage'], + ["--help"], + ["-h"], + ["help"], + ["-help"], + ["--usage"], [], # No args - some tools print help by default ] - + # Version command variations to try VERSION_VARIATIONS = [ - ['--version'], - ['-v'], - ['version'], - ['-version'], - 
['--v'], + ["--version"], + ["-v"], + ["version"], + ["-version"], + ["--v"], ] - + @staticmethod def execute_command( - binary_path: str, - args: List[str], - docker_image: Optional[str] = None, - timeout: int = None + binary_path: str, args: List[str], docker_image: Optional[str] = None, timeout: int = None ) -> Tuple[int, str, str]: """ Execute a command and return result - + Args: binary_path: Path to binary args: Command arguments docker_image: Optional Docker image to run in timeout: Timeout in seconds (default: COMMAND_TIMEOUT) - + Returns: Tuple of (exit_code, stdout, stderr) """ if timeout is None: timeout = CommandExecutor.COMMAND_TIMEOUT - + if docker_image: # Run in Docker container with explicit entrypoint # Use --entrypoint to specify the binary, ensuring we test the right executable - cmd = ['docker', 'run', '--rm', '--entrypoint', binary_path, docker_image] + args + cmd = ["docker", "run", "--rm", "--entrypoint", binary_path, docker_image] + args else: # Run on host cmd = [binary_path] + args - + try: - result = subprocess.run( - cmd, - capture_output=True, - text=True, - timeout=timeout - ) + result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout) return result.returncode, result.stdout, result.stderr - + except subprocess.TimeoutExpired: - return -1, '', f'Timeout after {timeout}s' + return -1, "", f"Timeout after {timeout}s" except FileNotFoundError: - return -1, '', f'Binary not found: {binary_path}' + return -1, "", f"Binary not found: {binary_path}" except Exception as e: - return -1, '', str(e) - + return -1, "", str(e) + @staticmethod def is_valid_help_output(output: str, exit_code: int) -> bool: """ Check if output looks like valid help text - + Args: output: Command output (stdout + stderr) exit_code: Exit code from command - + Returns: True if output appears to be valid help text """ # Exit code check - help commands typically return 0 or 1 if exit_code not in [0, 1, 2]: return False - + # Must have reasonable amount of content if len(output) < 50: return False - + # Check for help indicators (case-insensitive) output_lower = output.lower() help_indicators = [ - 'usage:', 'usage :', 'usage:', - 'options:', 'flags:', - 'commands:', 'subcommands:', - 'help', 'examples:', - 'arguments:', - 'synopsis', - 'description:', + "usage:", + "usage :", + "usage:", + "options:", + "flags:", + "commands:", + "subcommands:", + "help", + "examples:", + "arguments:", + "synopsis", + "description:", ] - + has_indicator = any(indicator in output_lower for indicator in help_indicators) - + # Or has option-like patterns (--something or -x) - has_options = ('--' in output or ' -' in output) - + has_options = "--" in output or " -" in output + return has_indicator or has_options - + @staticmethod def execute_help( - binary_path: str, - parent_command: Optional[str] = None, - docker_image: Optional[str] = None + binary_path: str, parent_command: Optional[str] = None, docker_image: Optional[str] = None ) -> Optional[str]: """ Execute help command with fallback strategies - + Args: binary_path: Path to binary parent_command: Optional parent command (e.g., "config" for "git config") docker_image: Optional Docker image to run in - + Returns: Help text or None if all attempts fail """ @@ -145,39 +140,33 @@ def execute_help( cmd_parts = parent_command.split() else: cmd_parts = [] - + # Try each help variation for help_args in CommandExecutor.HELP_VARIATIONS: full_args = cmd_parts + help_args - + exit_code, stdout, stderr = CommandExecutor.execute_command( - binary_path, 
- full_args, - docker_image, - timeout=10 # Shorter timeout for help commands (reduced from 30s) + binary_path, full_args, docker_image, timeout=10 # Shorter timeout for help commands (reduced from 30s) ) - + # Combine stdout and stderr (some tools print help to stderr) - output = stdout + '\n' + stderr + output = (stdout or "") + "\n" + (stderr or "") output = output.strip() - + if CommandExecutor.is_valid_help_output(output, exit_code): return output - + return None - + @staticmethod - def execute_version( - binary_path: str, - docker_image: Optional[str] = None - ) -> Optional[str]: + def execute_version(binary_path: str, docker_image: Optional[str] = None) -> Optional[str]: """ Execute version command with fallback strategies - + Args: binary_path: Path to binary docker_image: Optional Docker image to run in - + Returns: Version text or None if all attempts fail """ @@ -187,94 +176,84 @@ def execute_version( binary_path, version_args, docker_image, - timeout=10 # Shorter timeout for version commands (reduced from 30s) + timeout=10, # Shorter timeout for version commands (reduced from 30s) ) - + # Combine stdout and stderr - output = stdout + '\n' + stderr + output = stdout + "\n" + stderr output = output.strip() - + # Version output typically has version numbers if output and (exit_code in [0, 1]) and len(output) > 0: # Check if it looks like version output (has numbers) import re - if re.search(r'\d+\.\d+', output): + + if re.search(r"\d+\.\d+", output): return output - + return None - + @staticmethod - def test_help_variations( - binary_path: str, - docker_image: Optional[str] = None - ) -> Dict[str, any]: + def test_help_variations(binary_path: str, docker_image: Optional[str] = None) -> Dict[str, any]: """ Test all help command variations and return detailed results - + Args: binary_path: Path to binary docker_image: Optional Docker image to run in - + Returns: Dict with test results for each variation """ results = { - 'binary': binary_path, - 'image': docker_image, - 'tests': [], - 'working_commands': [], - 'best_command': None + "binary": binary_path, + "image": docker_image, + "tests": [], + "working_commands": [], + "best_command": None, } - + for help_args in CommandExecutor.HELP_VARIATIONS: exit_code, stdout, stderr = CommandExecutor.execute_command( - binary_path, - help_args, - docker_image, - timeout=30 + binary_path, help_args, docker_image, timeout=30 ) - - output = stdout + '\n' + stderr + + output = stdout + "\n" + stderr output = output.strip() - + is_valid = CommandExecutor.is_valid_help_output(output, exit_code) - - cmd_str = ' '.join(help_args) if help_args else '(no args)' - + + cmd_str = " ".join(help_args) if help_args else "(no args)" + test_result = { - 'command': cmd_str, - 'args': help_args, - 'exit_code': exit_code, - 'output_length': len(output), - 'is_valid': is_valid + "command": cmd_str, + "args": help_args, + "exit_code": exit_code, + "output_length": len(output), + "is_valid": is_valid, } - - results['tests'].append(test_result) - + + results["tests"].append(test_result) + if is_valid: - results['working_commands'].append(cmd_str) - if not results['best_command']: - results['best_command'] = cmd_str - + results["working_commands"].append(cmd_str) + if not results["best_command"]: + results["best_command"] = cmd_str + return results - + @staticmethod - def verify_binary( - binary_path: str, - docker_image: Optional[str] = None - ) -> bool: + def verify_binary(binary_path: str, docker_image: Optional[str] = None) -> bool: """ Verify that a binary 
exists and can be executed - + Args: binary_path: Path to binary docker_image: Optional Docker image to run in - + Returns: True if binary can be executed """ # Try to get help text help_text = CommandExecutor.execute_help(binary_path, docker_image=docker_image) return help_text is not None and len(help_text) > 50 - - diff --git a/src/discover_binaries.py b/src/discover_binaries.py index 1624796..b8dc9ff 100755 --- a/src/discover_binaries.py +++ b/src/discover_binaries.py @@ -34,11 +34,11 @@ class BinaryDiscoveryTool: """Discover binaries for tools without binary names""" - + def __init__(self, config_file: str, dry_run: bool = False): """ Initialize the discovery tool - + Args: config_file: Path to cli_tools.json config file dry_run: If True, don't update config file @@ -47,7 +47,7 @@ def __init__(self, config_file: str, dry_run: bool = False): self.dry_run = dry_run self.config = None self.tools = [] - + self.stats = { 'total': 0, 'success': 0, @@ -56,46 +56,46 @@ def __init__(self, config_file: str, dry_run: bool = False): 'skipped': 0, 'start_time': datetime.now() } - + def log(self, message: str, level: str = "INFO"): """Log message with timestamp""" timestamp = datetime.now().strftime("%H:%M:%S") print(f"[{timestamp}] {level}: {message}") - + def load_config(self) -> bool: """Load configuration file""" try: with open(self.config_file, 'r') as f: self.config = json.load(f) - + self.tools = self.config.get('cli_tools', []) self.log(f"Loaded {len(self.tools)} tools from config") return True - + except Exception as e: self.log(f"Failed to load config: {e}", "ERROR") return False - + def save_config(self): """Save updated configuration file""" if self.dry_run: self.log("Dry-run mode: skipping config save", "INFO") return - + try: # Create backup of original file (before modifications) backup_path = self.config_file.with_suffix('.json.backup') shutil.copy(self.config_file, backup_path) self.log(f"Created backup: {backup_path}") - + # Save updated config with open(self.config_file, 'w') as f: json.dump(self.config, f, indent=2) self.log(f"Updated config saved: {self.config_file}") - + except Exception as e: self.log(f"Failed to save config: {e}", "ERROR") - + def check_docker_available(self) -> bool: """Check if Docker is available""" try: @@ -107,19 +107,19 @@ def check_docker_available(self) -> bool: return result.returncode == 0 except Exception: return False - + def pull_image(self, image: str) -> bool: """ Pull Docker image if not available locally - + Args: image: Full image name (e.g., "alpine:latest") - + Returns: True if image is available """ self.log(f" Checking image availability: {image}") - + # Check if image exists locally try: result = subprocess.run( @@ -132,7 +132,7 @@ def pull_image(self, image: str) -> bool: return True except Exception: pass - + # Try to pull image self.log(f" → Pulling image (this may take a while)...") try: @@ -154,14 +154,14 @@ def pull_image(self, image: str) -> bool: except Exception as e: self.log(f" → Error pulling image: {e}", "WARN") return False - + def discover_for_tool(self, tool: Dict) -> Optional[Dict]: """ Discover binaries for a single tool - + Args: tool: Tool dictionary from config - + Returns: Discovery result dict or None if failed """ @@ -169,21 +169,21 @@ def discover_for_tool(self, tool: Dict) -> Optional[Dict]: self.log(f"\n{'='*80}") self.log(f"Processing: {tool_name}") self.log(f"{'='*80}") - + # Check if tool has image_repo image_repo = tool.get('image_repo', {}) if not image_repo or not image_repo.get('image'): self.log(f" No 
image_repo defined, skipping", "WARN") return None - + # Get image and tag (handle empty list with `or` fallback) image_base = image_repo['image'] image_tags = tool.get('image_tags', ['latest']) or ['latest'] image_tag = 'latest' if 'latest' in image_tags else image_tags[0] docker_image = f"{image_base}:{image_tag}" - + self.log(f" Image: {docker_image}") - + # Check image availability if not self.pull_image(docker_image): return { @@ -191,23 +191,23 @@ def discover_for_tool(self, tool: Dict) -> Optional[Dict]: 'reason': 'image_unavailable', 'image': docker_image } - + # Discover binaries matches = BinaryFinder.discover_binaries_for_tool(docker_image, tool_name) - + if not matches: return { 'status': 'failed', 'reason': 'no_binaries_found', 'image': docker_image } - + # Test each match to find working binaries working_binaries = [] consecutive_failures = 0 max_consecutive_failures = 3 # Stop after 3 consecutive failures min_confidence = 0.40 # Minimum confidence to consider (lowered to catch more valid binaries) - + for binary_path, match_type, confidence in matches[:10]: # Test top 10 matches # Skip low confidence matches if confidence < min_confidence: @@ -218,10 +218,10 @@ def discover_for_tool(self, tool: Dict) -> Optional[Dict]: continue binary_name = os.path.basename(binary_path) self.log(f" Testing: {binary_name} (confidence: {confidence:.2f}, type: {match_type})") - + # Test help command test_results = CommandExecutor.test_help_variations(binary_path, docker_image) - + if test_results['working_commands']: self.log(f" ✓ Working help commands: {', '.join(test_results['working_commands'])}") working_binaries.append({ @@ -236,12 +236,12 @@ def discover_for_tool(self, tool: Dict) -> Optional[Dict]: else: self.log(f" ✗ No working help commands found") consecutive_failures += 1 - + # Stop early if too many consecutive failures if consecutive_failures >= max_consecutive_failures: self.log(f" → Stopping after {consecutive_failures} consecutive failures") break - + if not working_binaries: return { 'status': 'partial', @@ -249,17 +249,17 @@ def discover_for_tool(self, tool: Dict) -> Optional[Dict]: 'image': docker_image, 'binaries_tested': len(matches[:10]) } - + # Select primary binary (highest confidence with working help) primary = working_binaries[0] alternates = [b['name'] for b in working_binaries[1:] if b['name'] != primary['name']] - + self.log(f"\n ✓ Discovery successful!") self.log(f" Primary binary: {primary['name']}") self.log(f" Help command: {primary['help_command']}") if alternates: self.log(f" Alternate binaries: {', '.join(alternates[:5])}") - + return { 'status': 'success', 'binary': primary['name'], @@ -275,11 +275,11 @@ def discover_for_tool(self, tool: Dict) -> Optional[Dict]: 'verified': True } } - + def update_tool_config(self, tool: Dict, discovery_result: Dict): """ Update tool configuration with discovery results - + Args: tool: Original tool dict discovery_result: Discovery result dict @@ -287,19 +287,19 @@ def update_tool_config(self, tool: Dict, discovery_result: Dict): if discovery_result['status'] == 'success': tool['binary'] = discovery_result['binary'] tool['alternate_binaries'] = discovery_result['alternate_binaries'] - + # Add discovery metadata to notes if 'notes' not in tool: tool['notes'] = '' - + note = f"Binary discovered automatically on {datetime.now().strftime('%Y-%m-%d')} " \ f"(method: {discovery_result['match_type']}, confidence: {discovery_result['confidence']:.2f})" - + if tool['notes']: tool['notes'] += f" | {note}" else: tool['notes'] = note 
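For reviewers, a self-contained sketch of the mutation update_tool_config performs on a successful discovery; the tool name, binary names, match_type label, and confidence below are hypothetical values, not taken from any real config:

from datetime import datetime

# Hypothetical tool entry and discovery result (values invented for illustration).
tool = {'name': 'example-tool', 'notes': ''}
discovery_result = {
    'status': 'success',
    'binary': 'extool',
    'alternate_binaries': ['extool-cli'],
    'match_type': 'exact_match',  # assumed label; actual values come from BinaryFinder
    'confidence': 0.95,
}

# Inlined mirror of the update logic shown above:
tool['binary'] = discovery_result['binary']
tool['alternate_binaries'] = discovery_result['alternate_binaries']
note = f"Binary discovered automatically on {datetime.now().strftime('%Y-%m-%d')} " \
       f"(method: {discovery_result['match_type']}, confidence: {discovery_result['confidence']:.2f})"
tool['notes'] = f"{tool['notes']} | {note}" if tool['notes'] else note
# tool['notes'] -> "Binary discovered automatically on <run date> (method: exact_match, confidence: 0.95)"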
- + def process_tools( self, only_tools: Optional[List[str]] = None, @@ -308,7 +308,7 @@ def process_tools( ): """ Process tools and discover binaries - + Args: only_tools: If set, only process these specific tools category: If set, only process tools in this category @@ -321,65 +321,66 @@ def process_tools( self.log(f"Dry-run: {self.dry_run}") self.log(f"Timeout: 2 minutes per container scan") self.log("="*80) - + # Check Docker if not self.check_docker_available(): self.log("Docker not available! Please install Docker.", "ERROR") return - + self.log("✓ Docker is available") - + # Load config if not self.load_config(): return - + # Filter tools filtered_tools = [] for tool in self.tools: # Skip if has binary already if tool.get('binary'): continue - + # Filter by category if category and tool.get('category') != category: continue - + # Filter by name if only_tools and tool.get('name') not in only_tools: continue - - # Must have image_repo - if not tool.get('image_repo', {}).get('image'): + + # Must have image_repo (handle None values explicitly) + image_repo = tool.get('image_repo') + if not image_repo or not image_repo.get('image'): continue - + filtered_tools.append(tool) - + if not filtered_tools: self.log("No tools to process after filtering", "WARN") return - + # Apply limit if limit: filtered_tools = filtered_tools[:limit] - + self.log(f"\nProcessing {len(filtered_tools)} tools") self.log("="*80) - + # Process each tool results = [] for i, tool in enumerate(filtered_tools, 1): self.stats['total'] += 1 - + self.log(f"\n[{i}/{len(filtered_tools)}]") - + result = self.discover_for_tool(tool) - + if result: results.append({ 'tool': tool['name'], **result }) - + if result['status'] == 'success': self.stats['success'] += 1 self.update_tool_config(tool, result) @@ -394,18 +395,18 @@ def process_tools( 'status': 'skipped', 'reason': 'no_image_repo' }) - + # Save config if changes were made if self.stats['success'] > 0: self.save_config() - + # Print summary self.print_summary(results) - + def print_summary(self, results: List[Dict]): """Print processing summary""" duration = (datetime.now() - self.stats['start_time']).total_seconds() - + self.log("\n" + "="*80) self.log("DISCOVERY SUMMARY") self.log("="*80) @@ -415,7 +416,7 @@ def print_summary(self, results: List[Dict]): self.log(f"⚠ Partial: {self.stats['partial']} (binary found but help doesn't work)") self.log(f"✗ Failed: {self.stats['failed']}") self.log(f"⊙ Skipped: {self.stats['skipped']}") - + # Show successful discoveries successful = [r for r in results if r.get('status') == 'success'] if successful: @@ -424,7 +425,7 @@ def print_summary(self, results: List[Dict]): self.log("="*80) for r in successful: self.log(f" • {r['tool']:40s} → {r['binary']:20s} (confidence: {r.get('confidence', 0):.2f})") - + # Show failures failed = [r for r in results if r.get('status') in ['failed', 'partial']] if failed: @@ -434,7 +435,7 @@ def print_summary(self, results: List[Dict]): for r in failed: reason = r.get('reason', 'unknown') self.log(f" • {r['tool']:40s} → {reason}") - + success_rate = (self.stats['success'] / self.stats['total'] * 100) if self.stats['total'] > 0 else 0 self.log("\n" + "="*80) self.log(f"Success Rate: {success_rate:.1f}%") @@ -450,37 +451,37 @@ def main(): Examples: # Dry run to see what would be discovered python discover_binaries.py data/configs/cli_tools.json --dry-run - + # Discover and update config python discover_binaries.py data/configs/cli_tools.json --update - + # Process specific category python 
discover_binaries.py data/configs/cli_tools.json --category "Apache" --update - + # Process specific tools python discover_binaries.py data/configs/cli_tools.json --only "act" "Airflow" --update - + # Limit to first 10 tools python discover_binaries.py data/configs/cli_tools.json --limit 10 --update """ ) - + parser.add_argument('config', help='Path to cli_tools.json config file') parser.add_argument('--update', action='store_true', help='Update config file with discoveries') parser.add_argument('--dry-run', action='store_true', help='Dry run (no config updates)') parser.add_argument('--category', help='Only process tools in this category') parser.add_argument('--only', nargs='+', help='Only process these specific tools') parser.add_argument('--limit', type=int, help='Limit number of tools to process') - + args = parser.parse_args() - + # Determine dry-run mode dry_run = args.dry_run or not args.update - + if not args.update and not args.dry_run: print("Note: Running in dry-run mode. Use --update to save changes to config.") print() - + # Create and run discovery tool discovery = BinaryDiscoveryTool(args.config, dry_run=dry_run) discovery.process_tools( @@ -492,5 +493,3 @@ def main(): if __name__ == '__main__': main() - - diff --git a/src/parser.py b/src/parser.py index a59d19e..d5a13ad 100644 --- a/src/parser.py +++ b/src/parser.py @@ -1,255 +1,381 @@ +#!/usr/bin/env python3 +""" +CLI Parser - Simplified, robust CLI documentation extractor + +This parser reliably extracts help documentation from CLI tools whether running +in Docker containers or on the host system. It handles complex tools like AWS CLI +as well as simple tools like ls. +""" import argparse import json import os -import subprocess -import tarfile -import zipfile +import sys +from typing import Dict, Optional -import requests -from bson import json_util -from pymongo import MongoClient - - -def download_file(url, dest): - response = requests.get(url, stream=True) - file_path = os.path.join(dest, os.path.basename(url)) - with open(file_path, 'wb') as file: - for chunk in response.iter_content(chunk_size=8192): - file.write(chunk) - return file_path - -def extract_file(file_path, dest): - if file_path.endswith(('.tar.gz', '.tgz')): - with tarfile.open(file_path, "r:gz") as tar: - tar.extractall(path=dest) - elif file_path.endswith('.zip'): - with zipfile.ZipFile(file_path, "r") as zip_ref: - zip_ref.extractall(dest) - else: - os.chmod(file_path, 0o755) - return file_path +# Add current directory to path for imports +sys.path.insert(0, os.path.dirname(__file__)) -def download_and_extract(url, dest="/usr/local/bin"): - """ - Downloads and extracts an archive file from the given URL to the specified destination. +from binary_finder import BinaryFinder +from command_executor import CommandExecutor - Args: - url (str): The URL of the file to download. - dest (str): The destination directory to extract the file to. Defaults to "/usr/local/bin". - Returns: - str: The file path of the downloaded and extracted file. - """ - file_path = download_file(url, dest) - return extract_file(file_path, dest) +# AI Analysis imports +import requests -DOCKER_IMAGE = None -def call_command(binary, commands): +def analyze_with_ai(help_text: str, prompt_type: str = "help") -> Optional[Dict]: """ - Calls the specified command for the binary and returns the output. + Analyze help or version text using AI Args: - binary (str): The name or path of the binary to call. - commands (list): List of commands to try. 
+ help_text: The text to analyze + prompt_type: Either "help" or "version" Returns: - str: The command output of the binary. - - Raises: - Exception: If all attempts to get command output fail. + Parsed structure as dict, or None on failure """ - for command in commands: - if DOCKER_IMAGE: - cmd = ["docker", "run", "--rm", DOCKER_IMAGE, binary] + command - else: - cmd = ([binary] if len(binary.split()) < 2 else binary.split()) + command - result = subprocess.run(cmd, capture_output=True, text=True) - output = result.stdout.strip() or result.stderr.strip() - if output: - return output - raise Exception(f"Failed to get output for {binary} with commands {commands}") - -def call_help(binary, command=None): - return call_command(f"{binary} {command}" if command else binary, [["--help"], ["-h"], ["help"]]) - -def call_version(binary): - return call_command(binary, [["--version"], ["-v"], ["version"]]) - -def get_prompt(prompt_type): prompts = { "help": ( - f"Parse the command-line help output into a JSON with 'subcommands' and 'options'. " - f"Subcommands can only begin with a lowercase letter; options start with '-' or '--'. " - f"Subcommands: {{'name': , 'description': , 'usage': }}." - f"Options: {{'option': <'--option'>, 'shortcut': <'-shortcut'>, 'description': , 'value': , 'default': , 'tags': []}}. " - f"Always include 'description', 'name' and usage details for the root command and subcommands. All commands should have a description. Exclude missing properties." - f"Sort subcommands and options alphabetically." + "Parse command-line help output into JSON with 'subcommands', 'options', and 'aliases'. " + "\n\nCRITICAL RULES:" + "\n\n1. IDENTIFY COMMAND SECTIONS - Look for section headers that indicate commands, services, or groups:" + "\n - Sections containing words: 'Command', 'Commands', 'Service', 'Services', 'Group', 'Groups' (case-insensitive)" + "\n - Man page format: sections in all-caps ending with 'COMMANDS', 'SERVICES', 'GROUPS'" + "\n - Ignore text within parentheses in headers (e.g., 'Basic Commands (Beginner):' → section header only)" + "\n" + "\n2. EXTRACT SUBCOMMANDS - From identified sections, extract ONLY the indented/listed items:" + "\n RULES:" + "\n - Extract the FIRST WORD from each indented line as the subcommand name" + "\n - Indented lines have leading whitespace (spaces or tabs)" + "\n - Strip special characters: 'buildx*' → 'buildx', '+o service' → 'service', '- item' → 'item'" + "\n - Include the description (remaining text on the same line after the command name)" + "\n - NEVER extract words from the section header line itself - only from indented items below it" + "\n" + "\n3. EXTRACT OPTIONS - Items starting with '-' or '--' in sections like 'Options:', 'Flags:', 'Global Options:'" + "\n" + "\n4. EXTRACT ALIASES - Only from 'Aliases:' sections (these are alternative names, NOT subcommands)" + "\n" + "\n5. EXCLUDE from subcommands:" + "\n - Section header lines (lines ending with ':' without leading whitespace)" + "\n - Text within parentheses in section headers" + "\n - Items in 'Arguments:', 'Positional Arguments:', 'Usage:', 'Examples:' sections" + "\n - Resource type abbreviations like 'pod (po)', 'service (svc)'" + "\n - Anything starting with '-' or '--' (these are options, not subcommands)" + "\n - All-caps section headers: 'DESCRIPTION', 'SYNOPSIS', 'EXAMPLES'" + "\n" + "\n6. COMPLETENESS - Extract ALL items from all command/service/group sections. Do not stop early." 
+ "\n\nOUTPUT FORMAT (JSON):" + "\n{" + '\n "subcommands": [{"name": "cmd", "description": "desc"}, ...],' + '\n "options": [{"option": "--flag", "shortcut": "-f", "description": "desc", "value": "val", "default": "def"}, ...],' + '\n "aliases": ["alias1", "alias2", ...]' + "\n}" ), - "version": ( - f"Extract and return the version number (including commit SHAs) within a JSON object from the following version output." - ) + "version": "Extract version number from the output and return as JSON: {'version': }" } - return prompts[prompt_type] -def analyze_output(binary, output, prompt_type): - """ - Analyzes the output of a binary command and returns it in JSON format or plain text. - - Args: - binary (str): The name or path of the binary to analyze. - output (str): The output of the binary command. - prompt_type (str): The type of prompt to use ('help' or 'version'). + api_key = os.getenv('OPENAI_API_KEY') + if not api_key: + print("Warning: OPENAI_API_KEY not set, skipping AI analysis") + return None - Returns: - dict or str: The parsed output in JSON format for help, or plain text for version. - """ - prompt = get_prompt(prompt_type) headers = { - 'Authorization': f"Bearer {os.getenv('OPENAI_API_KEY')}", + 'Authorization': f"Bearer {api_key}", 'Content-Type': 'application/json', } + json_data = { - 'model': 'gpt-4o', + 'model': 'gpt-4o-mini', 'messages': [ - {'role': 'system', 'content': 'You are a helpful CLI parser assistant that returns results in JSON.'}, - {'role': 'user', 'content': prompt}, - {'role': 'user', 'content': output} + {'role': 'system', 'content': 'You are a CLI parser that returns JSON.'}, + {'role': 'user', 'content': prompts[prompt_type]}, + {'role': 'user', 'content': help_text} ], - 'response_format': { 'type': "json_object" }, + 'response_format': {'type': "json_object"}, 'temperature': 0.7, } try: - response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=json_data) + response = requests.post( + "https://api.openai.com/v1/chat/completions", + headers=headers, + json=json_data, + timeout=300 # 5 minutes for large help texts like AWS CLI + ) response.raise_for_status() - print("AI Token Usage:", response.json()['usage']) - return response.json()['choices'][0]['message']['content'].strip() - except requests.exceptions.RequestException as e: - print(e.response.json() if e.response else str(e)) + + content = response.json()['choices'][0]['message']['content'] + return json.loads(content) + + except Exception as e: + print(f"AI analysis error: {e}") return None -def analyze_binary_help(binary, parent=None): + +def parse_command(binary_path: str, parent_command: Optional[str], docker_image: Optional[str], + depth: int, max_depth: int, parent_help_text: Optional[str] = None) -> Dict: """ - Analyzes the help output of a binary and returns it in JSON format, including subcommands and options. + Recursively parse a command and its subcommands Args: - binary (str): The name or path of the binary to analyze. - parent (str, optional): An additional command to append to the binary for deeper analysis. Defaults to None. + binary_path: Full path to the binary + parent_command: Parent command string (e.g., "config" for "git config") + docker_image: Optional Docker image to run in + depth: Current recursion depth + max_depth: Maximum recursion depth + parent_help_text: Help text from parent command for comparison Returns: - dict: The parsed help output in JSON format, including subcommands and options. 
+ Parsed command structure as dict """ - print(f"Analyzing Binary: {binary}, Parent: {parent}") - try: - help_output = call_help(binary, parent) - except Exception as e: - print(str(e)) - return {'name': f"{binary} {parent}" if parent else binary, 'subcommands': [], 'options': []} - - result = analyze_output(binary, help_output, "help") - if result: - try: - result = json.loads(result) - result['name'] = f"{binary} {parent}" if parent else binary - except json.JSONDecodeError as e: - print(f"JSON decoding error in AI response for {binary} {parent}: {e}") - return {'name': f"{binary} {parent}" if parent else binary, 'subcommands': [], 'options': []} - - # Analyze subcommands recursively - subcommands = [] - for command in result.get('subcommands', []): - if command['name'].lower() not in ["help", (parent.lower() if parent else ""), binary]: - subcommands.append(analyze_binary_help(result['name'], command['name'])) - result['subcommands'] = subcommands - - with open('result.json', 'a') as file: - json.dump(result, file) - file.write('\n') - - return result - return {'name': f"{binary} {parent}" if parent else binary, 'subcommands': [], 'options': []} - -def analyze_binary_version(binary): + # Build display name + binary_name = os.path.basename(binary_path) + if parent_command: + display_name = f"{binary_name} {parent_command}" + else: + display_name = binary_name + + print(f"\n[Depth {depth}] Analyzing: {display_name}") + + # Stop recursion if max depth reached + if depth >= max_depth: + print(f" → Max depth {max_depth} reached, stopping recursion") + return {'name': display_name, 'subcommands': [], 'options': [], 'aliases': []} + + # Get help text + help_text = CommandExecutor.execute_help(binary_path, parent_command, docker_image) + + if not help_text: + print(f" → No help text retrieved") + return {'name': display_name, 'subcommands': [], 'options': [], 'aliases': []} + + # Check if help text is identical to parent (indicates resource type, not real subcommand) + if parent_help_text and help_text == parent_help_text: + print(f" → Help text identical to parent (likely a resource type, not a subcommand)") + return {'name': display_name, 'subcommands': [], 'options': [], 'aliases': []} + + print(f" → Help text retrieved ({len(help_text)} chars)") + print(f" → Full help text:") + preview = help_text.replace('\n', '\n ') + print(f" {preview}") + + # Parse with AI + parsed = analyze_with_ai(help_text, "help") + + if not parsed: + print(f" → AI parsing failed") + return {'name': display_name, 'subcommands': [], 'options': [], 'aliases': [], 'raw_help_text': help_text} + + # Add metadata + parsed['name'] = display_name + parsed['raw_help_text'] = help_text + + # Ensure required fields exist + if 'subcommands' not in parsed: + parsed['subcommands'] = [] + if 'options' not in parsed: + parsed['options'] = [] + if 'aliases' not in parsed: + parsed['aliases'] = [] + + num_subcmds = len(parsed['subcommands']) + num_opts = len(parsed['options']) + num_aliases = len(parsed['aliases']) + + print(f" → Extracted: {num_subcmds} subcommands, {num_opts} options, {num_aliases} aliases") + + # Show first few subcommands and options + if num_subcmds > 0: + print(f" → First subcommands: {', '.join([s['name'] for s in parsed['subcommands'][:5]])}{' ...' if num_subcmds > 5 else ''}") + if num_opts > 0: + print(f" → First options: {', '.join([o.get('option', o.get('shortcut', '?')) for o in parsed['options'][:5]])}{' ...' 
if num_opts > 5 else ''}") + + # Recursively parse subcommands + if num_subcmds > 0 and depth < max_depth: + aliases_lower = [a.lower() for a in parsed['aliases']] + processed_subcommands = [] + + # Build set of words in current command path for redundancy checking + current_path_words = set() + if parent_command: + current_path_words = {w.lower() for w in parent_command.split()} + current_path_words.add(os.path.basename(binary_path).lower()) + + for subcmd in parsed['subcommands']: + subcmd_name = subcmd.get('name', '') + + # Skip if it's an alias + if subcmd_name.lower() in aliases_lower: + print(f" → Skipping '{subcmd_name}' (it's an alias)") + continue + + # Skip help commands + if subcmd_name.lower() in ['help', 'h', '--help', '-h']: + continue + + # Check for redundant words in subcommand name + subcmd_words = [w.lower() for w in subcmd_name.split()] + redundant_words = [w for w in subcmd_words if w in current_path_words] + + if redundant_words: + print(f" → Skipping '{subcmd_name}' (redundant words: {redundant_words})") + continue + + # Check for repeated words within the subcommand itself + if len(subcmd_words) != len(set(subcmd_words)): + print(f" → Skipping '{subcmd_name}' (contains repeated words)") + continue + + # Build next parent command + if parent_command: + next_parent = f"{parent_command} {subcmd_name}" + else: + next_parent = subcmd_name + + # Recursively parse + subcmd_parsed = parse_command( + binary_path, + next_parent, + docker_image, + depth + 1, + max_depth, + help_text # Pass current help text for comparison + ) + + # Skip subcommands that failed to retrieve help text (likely invalid commands) + # Check if it has no help text and no valid content + has_content = ( + subcmd_parsed.get('subcommands') or + subcmd_parsed.get('options') or + subcmd_parsed.get('raw_help_text') + ) + if not has_content: + print(f" → Skipping '{subcmd_name}' (no help text retrieved - likely invalid command)") + continue + + # Preserve original description + if 'description' not in subcmd_parsed and 'description' in subcmd: + subcmd_parsed['description'] = subcmd['description'] + + processed_subcommands.append(subcmd_parsed) + + parsed['subcommands'] = processed_subcommands + + return parsed + + +def parse_binary(binary_name: str, docker_image: Optional[str] = None, + max_depth: int = 20) -> Optional[Dict]: """ - Analyzes the version output of a binary and returns the version number. + Main entry point: parse a binary and extract all documentation Args: - binary (str): The name or path of the binary to analyze. + binary_name: Name or path of binary to parse + docker_image: Optional Docker image to run in + max_depth: Maximum recursion depth for subcommands Returns: - str: The version number of the binary. 
+ Complete parsed structure as dict, or None on failure """ - print(f"Analyzing Binary Version: {binary}") - try: - version_output = call_version(binary) - return json.loads(analyze_output(binary, version_output, "version"))['version'] - except Exception as e: - print(str(e)) - return None + print(f"\n{'='*80}") + print(f"Parsing: {binary_name}") + if docker_image: + print(f"Docker Image: {docker_image}") + print(f"Max Depth: {max_depth}") + print(f"{'='*80}") + + # Find binary path + if docker_image: + binary_path, method = BinaryFinder.find_in_container(docker_image, binary_name) + if not binary_path: + print(f"\n✗ Binary '{binary_name}' not found in container") + return None + print(f"\n✓ Binary found in container: {binary_path}") + print(f" Search method: {method}") + else: + # Check if it's already a full path + if binary_name.startswith('/'): + binary_path = binary_name + method = "provided-as-path" + else: + binary_path, method = BinaryFinder.find_on_host(binary_name) + if not binary_path: + print(f"\n✗ Binary '{binary_name}' not found on host") + return None + + print(f"\n✓ Binary found on host: {binary_path}") + print(f" Search method: {method}") + + # Parse the binary recursively + result = parse_command(binary_path, None, docker_image, depth=0, max_depth=max_depth) + + # Get version + print(f"\nGetting version information...") + version_text = CommandExecutor.execute_version(binary_path, docker_image) + + if version_text: + print(f" → Version text retrieved ({len(version_text)} chars)") + version_parsed = analyze_with_ai(version_text, "version") + if version_parsed and 'version' in version_parsed: + result['version'] = version_parsed['version'] + print(f" → Version: {result['version']}") + else: + result['version'] = None + print(f" → Could not parse version") + else: + result['version'] = None + print(f" → No version text retrieved") -def main(binary_name, url=None, mongodb_url=None, override=False, docker_image=None): - """ - Main function to analyze a binary's help output and optionally save the results to MongoDB. + return result - Args: - binary_name (str): The name of the binary to analyze. - url (str, optional): URL to download the binary or archive file. Defaults to None. - mongodb_url (str, optional): MongoDB connection string. Defaults to None. - override (bool, optional): Whether to override existing document if it exists. Defaults to False. - docker_image (str, optional): Docker image to run the binary in. Defaults to None. 
- """ - global DOCKER_IMAGE - DOCKER_IMAGE = docker_image - db = None - if mongodb_url: - client = MongoClient(mongodb_url) - db = client.cli_archive - - try: - existing_document = db.cli_archive.find_one({"name": binary_name}) - except Exception as e: - print(f"Error encountered accessing Mongo DB: {str(e)}") - existing_document = None - - if existing_document and not override: - print(f"Document for {binary_name} found in MongoDB:") - print(json.dumps(existing_document, indent=4, default=json_util.default)) - return - - if url: - binary_path = download_and_extract(url) - binary_name = os.path.basename(binary_path) - else: - binary_path = binary_name - result = analyze_binary_help(binary_path) - result['version'] = analyze_binary_version(binary_path) - print(json.dumps(result, indent=4)) +def main(): + """CLI entry point""" + arg_parser = argparse.ArgumentParser( + description='Parse CLI tool documentation', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Parse local binary + python parser.py ls --max-depth 1 - if db != None: - if existing_document and override: - db.cli_archive.replace_one({"name": binary_name}, result) - print(f"Results for {binary_name} overwritten in MongoDB") - else: - db.cli_archive.insert_one(result) - print(f"Results for {binary_name} inserted into MongoDB") + # Parse in Docker + python parser.py doctl --docker docker.io/digitalocean/doctl:latest + + # Parse with output file + python parser.py kubectl --docker bitnami/kubectl:latest --output kubectl.json + """ + ) + + arg_parser.add_argument('binary', help='Binary name or path to parse') + arg_parser.add_argument('--docker', help='Docker image to run binary in') + arg_parser.add_argument('--max-depth', type=int, default=20, help='Maximum recursion depth (default: 20)') + arg_parser.add_argument('--output', '-o', help='Output JSON file (default: stdout)') + + args = arg_parser.parse_args() + + # Parse the binary + result = parse_binary(args.binary, args.docker, args.max_depth) + + if not result: + print("\n✗ Parsing failed") + sys.exit(1) + + # Output results + json_output = json.dumps(result, indent=2) + + if args.output: + with open(args.output, 'w') as f: + f.write(json_output) + print(f"\n✓ Results written to: {args.output}") + else: + print("\n" + "="*80) + print("RESULTS:") + print("="*80) + print(json_output) + + print(f"\n✓ Parsing complete") - return result -if __name__ == "__main__": - parser = argparse.ArgumentParser(description="CLI Analyzer") - parser.add_argument("binary_name", type=str, help="The name of the binary to analyze") - parser.add_argument("--url", type=str, help="Optional URL to download the binary or archive file") - parser.add_argument("--override", action='store_true', help="Override existing document if it exists") - parser.add_argument("--mongodb-url", type=str, help="MongoDB connection string") - parser.add_argument("--docker", type=str, help="Docker image to run the binary in") - - args = parser.parse_args() - - if args.binary_name is None: - print("Error: Missing required argument 'binary_name'. 
Use --help for usage information.") - parser.print_help() - exit(1) - - main(args.binary_name, args.url, args.mongodb_url, args.override, args.docker) +if __name__ == '__main__': + main() diff --git a/src/parser_v2.py b/src/parser_v2.py deleted file mode 100644 index a78f556..0000000 --- a/src/parser_v2.py +++ /dev/null @@ -1,379 +0,0 @@ -#!/usr/bin/env python3 -""" -CLI Parser V2 - Simplified, robust CLI documentation extractor - -This parser reliably extracts help documentation from CLI tools whether running -in Docker containers or on the host system. It handles complex tools like AWS CLI -as well as simple tools like ls. -""" -import argparse -import json -import os -import sys -from typing import Dict, Optional - -# Add current directory to path for imports -sys.path.insert(0, os.path.dirname(__file__)) - -from binary_finder import BinaryFinder -from command_executor import CommandExecutor - - -# AI Analysis imports -import requests - - -def analyze_with_ai(help_text: str, prompt_type: str = "help") -> Optional[Dict]: - """ - Analyze help or version text using AI - - Args: - help_text: The text to analyze - prompt_type: Either "help" or "version" - - Returns: - Parsed structure as dict, or None on failure - """ - prompts = { - "help": ( - "Parse command-line help output into JSON with 'subcommands', 'options', and 'aliases'. " - "\n\nCRITICAL RULES:" - "\n\n1. IDENTIFY COMMAND SECTIONS - Look for section headers that indicate commands, services, or groups:" - "\n - Sections containing words: 'Command', 'Commands', 'Service', 'Services', 'Group', 'Groups' (case-insensitive)" - "\n - Man page format: sections in all-caps ending with 'COMMANDS', 'SERVICES', 'GROUPS'" - "\n - Ignore text within parentheses in headers (e.g., 'Basic Commands (Beginner):' → section header only)" - "\n" - "\n2. EXTRACT SUBCOMMANDS - From identified sections, extract ONLY the indented/listed items:" - "\n RULES:" - "\n - Extract the FIRST WORD from each indented line as the subcommand name" - "\n - Indented lines have leading whitespace (spaces or tabs)" - "\n - Strip special characters: 'buildx*' → 'buildx', '+o service' → 'service', '- item' → 'item'" - "\n - Include the description (remaining text on the same line after the command name)" - "\n - NEVER extract words from the section header line itself - only from indented items below it" - "\n" - "\n3. EXTRACT OPTIONS - Items starting with '-' or '--' in sections like 'Options:', 'Flags:', 'Global Options:'" - "\n" - "\n4. EXTRACT ALIASES - Only from 'Aliases:' sections (these are alternative names, NOT subcommands)" - "\n" - "\n5. EXCLUDE from subcommands:" - "\n - Section header lines (lines ending with ':' without leading whitespace)" - "\n - Text within parentheses in section headers" - "\n - Items in 'Arguments:', 'Positional Arguments:', 'Usage:', 'Examples:' sections" - "\n - Resource type abbreviations like 'pod (po)', 'service (svc)'" - "\n - Anything starting with '-' or '--' (these are options, not subcommands)" - "\n - All-caps section headers: 'DESCRIPTION', 'SYNOPSIS', 'EXAMPLES'" - "\n" - "\n6. COMPLETENESS - Extract ALL items from all command/service/group sections. Do not stop early." 
- "\n\nOUTPUT FORMAT (JSON):" - "\n{" - '\n "subcommands": [{"name": "cmd", "description": "desc"}, ...],' - '\n "options": [{"option": "--flag", "shortcut": "-f", "description": "desc", "value": "val", "default": "def"}, ...],' - '\n "aliases": ["alias1", "alias2", ...]' - "\n}" - ), - "version": "Extract version number from the output and return as JSON: {'version': }" - } - - api_key = os.getenv('OPENAI_API_KEY') - if not api_key: - print("Warning: OPENAI_API_KEY not set, skipping AI analysis") - return None - - headers = { - 'Authorization': f"Bearer {api_key}", - 'Content-Type': 'application/json', - } - - json_data = { - 'model': 'gpt-4o-mini', - 'messages': [ - {'role': 'system', 'content': 'You are a CLI parser that returns JSON.'}, - {'role': 'user', 'content': prompts[prompt_type]}, - {'role': 'user', 'content': help_text} - ], - 'response_format': {'type': "json_object"}, - 'temperature': 0.7, - } - - try: - response = requests.post( - "https://api.openai.com/v1/chat/completions", - headers=headers, - json=json_data, - timeout=300 # 5 minutes for large help texts like AWS CLI - ) - response.raise_for_status() - - content = response.json()['choices'][0]['message']['content'] - return json.loads(content) - - except Exception as e: - print(f"AI analysis error: {e}") - return None - - -def parse_command(binary_path: str, parent_command: Optional[str], docker_image: Optional[str], - depth: int, max_depth: int, parent_help_text: Optional[str] = None) -> Dict: - """ - Recursively parse a command and its subcommands - - Args: - binary_path: Full path to the binary - parent_command: Parent command string (e.g., "config" for "git config") - docker_image: Optional Docker image to run in - depth: Current recursion depth - max_depth: Maximum recursion depth - parent_help_text: Help text from parent command for comparison - - Returns: - Parsed command structure as dict - """ - # Build display name - binary_name = os.path.basename(binary_path) - if parent_command: - display_name = f"{binary_name} {parent_command}" - else: - display_name = binary_name - - print(f"\n[Depth {depth}] Analyzing: {display_name}") - - # Stop recursion if max depth reached - if depth >= max_depth: - print(f" → Max depth {max_depth} reached, stopping recursion") - return {'name': display_name, 'subcommands': [], 'options': [], 'aliases': []} - - # Get help text - help_text = CommandExecutor.execute_help(binary_path, parent_command, docker_image) - - if not help_text: - print(f" → No help text retrieved") - return {'name': display_name, 'subcommands': [], 'options': [], 'aliases': []} - - # Check if help text is identical to parent (indicates resource type, not real subcommand) - if parent_help_text and help_text == parent_help_text: - print(f" → Help text identical to parent (likely a resource type, not a subcommand)") - return {'name': display_name, 'subcommands': [], 'options': [], 'aliases': []} - - print(f" → Help text retrieved ({len(help_text)} chars)") - print(f" → Full help text:") - preview = help_text.replace('\n', '\n ') - print(f" {preview}") - - # Parse with AI - parsed = analyze_with_ai(help_text, "help") - - if not parsed: - print(f" → AI parsing failed") - return {'name': display_name, 'subcommands': [], 'options': [], 'aliases': [], 'raw_help_text': help_text} - - # Add metadata - parsed['name'] = display_name - parsed['raw_help_text'] = help_text - - # Ensure required fields exist - if 'subcommands' not in parsed: - parsed['subcommands'] = [] - if 'options' not in parsed: - parsed['options'] = [] - if 
'aliases' not in parsed: - parsed['aliases'] = [] - - num_subcmds = len(parsed['subcommands']) - num_opts = len(parsed['options']) - num_aliases = len(parsed['aliases']) - - print(f" → Extracted: {num_subcmds} subcommands, {num_opts} options, {num_aliases} aliases") - - # Show first few subcommands and options - if num_subcmds > 0: - print(f" → First subcommands: {', '.join([s['name'] for s in parsed['subcommands'][:5]])}{' ...' if num_subcmds > 5 else ''}") - if num_opts > 0: - print(f" → First options: {', '.join([o.get('option', o.get('shortcut', '?')) for o in parsed['options'][:5]])}{' ...' if num_opts > 5 else ''}") - - # Recursively parse subcommands - if num_subcmds > 0 and depth < max_depth: - aliases_lower = [a.lower() for a in parsed['aliases']] - processed_subcommands = [] - - # Build set of words in current command path for redundancy checking - current_path_words = set() - if parent_command: - current_path_words = {w.lower() for w in parent_command.split()} - current_path_words.add(os.path.basename(binary_path).lower()) - - for subcmd in parsed['subcommands']: - subcmd_name = subcmd.get('name', '') - - # Skip if it's an alias - if subcmd_name.lower() in aliases_lower: - print(f" → Skipping '{subcmd_name}' (it's an alias)") - continue - - # Skip help commands - if subcmd_name.lower() in ['help', 'h', '--help', '-h']: - continue - - # Check for redundant words in subcommand name - subcmd_words = [w.lower() for w in subcmd_name.split()] - redundant_words = [w for w in subcmd_words if w in current_path_words] - - if redundant_words: - print(f" → Skipping '{subcmd_name}' (redundant words: {redundant_words})") - continue - - # Check for repeated words within the subcommand itself - if len(subcmd_words) != len(set(subcmd_words)): - print(f" → Skipping '{subcmd_name}' (contains repeated words)") - continue - - # Build next parent command - if parent_command: - next_parent = f"{parent_command} {subcmd_name}" - else: - next_parent = subcmd_name - - # Recursively parse - subcmd_parsed = parse_command( - binary_path, - next_parent, - docker_image, - depth + 1, - max_depth, - help_text # Pass current help text for comparison - ) - - # Skip subcommands that failed to retrieve help text (likely invalid commands) - # Check if it has no help text and no valid content - has_content = ( - subcmd_parsed.get('subcommands') or - subcmd_parsed.get('options') or - subcmd_parsed.get('raw_help_text') - ) - if not has_content: - print(f" → Skipping '{subcmd_name}' (no help text retrieved - likely invalid command)") - continue - - # Preserve original description - if 'description' not in subcmd_parsed and 'description' in subcmd: - subcmd_parsed['description'] = subcmd['description'] - - processed_subcommands.append(subcmd_parsed) - - parsed['subcommands'] = processed_subcommands - - return parsed - - -def parse_binary(binary_name: str, docker_image: Optional[str] = None, - max_depth: int = 20) -> Optional[Dict]: - """ - Main entry point: parse a binary and extract all documentation - - Args: - binary_name: Name or path of binary to parse - docker_image: Optional Docker image to run in - max_depth: Maximum recursion depth for subcommands - - Returns: - Complete parsed structure as dict, or None on failure - """ - print(f"\n{'='*80}") - print(f"Parsing: {binary_name}") - if docker_image: - print(f"Docker Image: {docker_image}") - print(f"Max Depth: {max_depth}") - print(f"{'='*80}") - - # Find binary path - if docker_image: - binary_path, method = BinaryFinder.find_in_container(docker_image, binary_name) 
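# Note for reviewers: this deleted parser_v2 path never checked the result of
# find_in_container, so a missing binary fell through to the "Binary found"
# print below with binary_path=None. The replacement src/parser.py, added
# earlier in this patch, guards the same call:
#
#     if not binary_path:
#         print(f"\n✗ Binary '{binary_name}' not found in container")
#         return None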
- print(f"\n✓ Binary found in container: {binary_path}") - print(f" Search method: {method}") - else: - # Check if it's already a full path - if binary_name.startswith('/'): - binary_path = binary_name - method = "provided-as-path" - else: - binary_path, method = BinaryFinder.find_on_host(binary_name) - if not binary_path: - print(f"\n✗ Binary '{binary_name}' not found on host") - return None - - print(f"\n✓ Binary found on host: {binary_path}") - print(f" Search method: {method}") - - # Parse the binary recursively - result = parse_command(binary_path, None, docker_image, depth=0, max_depth=max_depth) - - # Get version - print(f"\nGetting version information...") - version_text = CommandExecutor.execute_version(binary_path, docker_image) - - if version_text: - print(f" → Version text retrieved ({len(version_text)} chars)") - version_parsed = analyze_with_ai(version_text, "version") - if version_parsed and 'version' in version_parsed: - result['version'] = version_parsed['version'] - print(f" → Version: {result['version']}") - else: - result['version'] = None - print(f" → Could not parse version") - else: - result['version'] = None - print(f" → No version text retrieved") - - return result - - -def main(): - """CLI entry point""" - parser = argparse.ArgumentParser( - description='Parse CLI tool documentation', - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=""" -Examples: - # Parse local binary - python parser_v2.py ls --max-depth 1 - - # Parse in Docker - python parser_v2.py doctl --docker docker.io/digitalocean/doctl:latest - - # Parse with output file - python parser_v2.py kubectl --docker bitnami/kubectl:latest --output kubectl.json - """ - ) - - parser.add_argument('binary', help='Binary name or path to parse') - parser.add_argument('--docker', help='Docker image to run binary in') - parser.add_argument('--max-depth', type=int, default=20, help='Maximum recursion depth (default: 20)') - parser.add_argument('--output', '-o', help='Output JSON file (default: stdout)') - - args = parser.parse_args() - - # Parse the binary - result = parse_binary(args.binary, args.docker, args.max_depth) - - if not result: - print("\n✗ Parsing failed") - sys.exit(1) - - # Output results - json_output = json.dumps(result, indent=2) - - if args.output: - with open(args.output, 'w') as f: - f.write(json_output) - print(f"\n✓ Results written to: {args.output}") - else: - print("\n" + "="*80) - print("RESULTS:") - print("="*80) - print(json_output) - - print(f"\n✓ Parsing complete") - - -if __name__ == '__main__': - main() - diff --git a/src/process_cli_tools_v2.py b/src/process_cli_tools_v2.py index e67a8f4..0660987 100644 --- a/src/process_cli_tools_v2.py +++ b/src/process_cli_tools_v2.py @@ -11,54 +11,49 @@ """ import argparse import json -import sys import os -from pathlib import Path -from datetime import datetime +import sys from concurrent.futures import ProcessPoolExecutor, as_completed +from datetime import datetime +from pathlib import Path from typing import Dict, List, Optional # Add src to path sys.path.insert(0, os.path.dirname(__file__)) +import parser + from binary_finder import BinaryFinder -import parser_v2 class ToolsProcessor: """Process multiple CLI tools from configuration""" - + def __init__(self, config_file: str, output_dir: str, max_depth: int = 20, skip_existing: bool = True): self.config_file = Path(config_file) self.output_dir = Path(output_dir) self.max_depth = max_depth self.skip_existing = skip_existing - - self.stats = { - 'total': 0, - 'success': 0, - 
'failed': 0, - 'skipped': 0, - 'start_time': datetime.now() - } - + + self.stats = {"total": 0, "success": 0, "failed": 0, "skipped": 0, "start_time": datetime.now()} + def log(self, message: str, level: str = "INFO"): """Log message with timestamp""" timestamp = datetime.now().strftime("%H:%M:%S") print(f"[{timestamp}] {level}: {message}") - + def load_config(self) -> Dict: """Load tools configuration from JSON file""" - with open(self.config_file, 'r') as f: + with open(self.config_file, "r") as f: config = json.load(f) - + # Determine category key - if 'dev_tools' in config: - category_key = 'dev_tools' - elif 'blockchain_tools' in config: - category_key = 'blockchain_tools' - elif 'cncf_tools' in config: - category_key = 'cncf_tools' + if "dev_tools" in config: + category_key = "dev_tools" + elif "blockchain_tools" in config: + category_key = "blockchain_tools" + elif "cncf_tools" in config: + category_key = "cncf_tools" else: # Find first array key for key in config.keys(): @@ -67,312 +62,284 @@ def load_config(self) -> Dict: break else: raise ValueError("No tools array found in config") - + return config, category_key - + def filter_tools(self, tools: List[Dict], only_tools: Optional[List[str]] = None) -> List[Dict]: """Filter tools based on criteria""" filtered = [] - + for tool in tools: - name = tool.get('name', 'Unknown') - + name = tool.get("name", "Unknown") + # Skip if only_tools specified and not in list if only_tools and name not in only_tools: continue - + # Skip deprecated - if tool.get('deprecated', False): + if tool.get("deprecated", False): self.log(f"Skipping {name}: deprecated") continue - + # Must have required fields - if not tool.get('image_name') or not tool.get('docker_help_command'): + if not tool.get("image_name") or not tool.get("docker_help_command"): self.log(f"Skipping {name}: missing required fields") continue - + filtered.append(tool) - + return filtered - + def get_output_path(self, tool_name: str, category: str, image_tag: str) -> Path: """Get output file path for a tool""" - safe_name = tool_name.replace(' ', '_').replace('/', '_').lower() + safe_name = tool_name.replace(" ", "_").replace("/", "_").lower() filename = f"{safe_name}-{image_tag}.json" - + category_dir = self.output_dir / category category_dir.mkdir(parents=True, exist_ok=True) - + return category_dir / filename - + def process_tool(self, tool: Dict, category: str) -> Dict: """ Process a single tool - + Returns: Result dict with status and details """ - name = tool['name'] - image_name = tool['image_name'] - docker_help_command = tool['docker_help_command'] - + name = tool["name"] + image_name = tool["image_name"] + docker_help_command = tool["docker_help_command"] + # Handle both image_tag (string) and image_tags (array) - if 'image_tag' in tool: - docker_tag = tool['image_tag'] + if "image_tag" in tool: + docker_tag = tool["image_tag"] else: # Handle empty list with `or` fallback to prevent IndexError - image_tags = tool.get('image_tags', ['latest']) or ['latest'] - docker_tag = 'latest' if 'latest' in image_tags else image_tags[0] - + image_tags = tool.get("image_tags", ["latest"]) or ["latest"] + docker_tag = "latest" if "latest" in image_tags else image_tags[0] + docker_image = f"{image_name}:{docker_tag}" - + # Extract binary name from help command binary_name = docker_help_command.split()[0] - + # Get output path output_path = self.get_output_path(name, category, docker_tag) - + # Check if already exists if self.skip_existing and output_path.exists(): - return { - 'name': name, - 
'status': 'skipped', - 'message': 'already exists', - 'output_path': str(output_path) - } - + return {"name": name, "status": "skipped", "message": "already exists", "output_path": str(output_path)} + try: # Parse the tool - result = parser_v2.parse_binary( - binary_name, - docker_image=docker_image, - max_depth=self.max_depth - ) - + result = parser.parse_binary(binary_name, docker_image=docker_image, max_depth=self.max_depth) + if not result: - return { - 'name': name, - 'status': 'failed', - 'message': 'parsing returned no results' - } - + return {"name": name, "status": "failed", "message": "parsing returned no results"} + # Validate result has meaningful data - num_commands = len(result.get('subcommands', [])) - num_options = len(result.get('options', [])) - + num_commands = len(result.get("subcommands", [])) + num_options = len(result.get("options", [])) + if num_commands == 0 and num_options < 3: return { - 'name': name, - 'status': 'failed', - 'message': f'insufficient data: {num_commands} commands, {num_options} options' + "name": name, + "status": "failed", + "message": f"insufficient data: {num_commands} commands, {num_options} options", } - + # Save result - with open(output_path, 'w') as f: + with open(output_path, "w") as f: json.dump(result, f, indent=2) - + file_size = output_path.stat().st_size / 1024 # KB - + return { - 'name': name, - 'status': 'success', - 'output_path': str(output_path), - 'size_kb': file_size, - 'commands': num_commands, - 'options': num_options, - 'version': result.get('version') + "name": name, + "status": "success", + "output_path": str(output_path), + "size_kb": file_size, + "commands": num_commands, + "options": num_options, + "version": result.get("version"), } - + except Exception as e: - return { - 'name': name, - 'status': 'failed', - 'message': str(e)[:200] - } - + return {"name": name, "status": "failed", "message": str(e)[:200]} + def process_tools_sequential(self, tools: List[Dict], category: str) -> List[Dict]: """Process tools sequentially (for debugging or when parallelization issues occur)""" results = [] - + for i, tool in enumerate(tools, 1): self.log(f"[{i}/{len(tools)}] Processing: {tool['name']}") - + result = self.process_tool(tool, category) results.append(result) - + # Update stats - self.stats['total'] += 1 - if result['status'] == 'success': - self.stats['success'] += 1 - self.log(f" ✓ Success: {result.get('size_kb', 0):.1f} KB, " - f"{result.get('commands', 0)} cmds, {result.get('options', 0)} opts") - elif result['status'] == 'skipped': - self.stats['skipped'] += 1 + self.stats["total"] += 1 + if result["status"] == "success": + self.stats["success"] += 1 + self.log( + f" ✓ Success: {result.get('size_kb', 0):.1f} KB, " + f"{result.get('commands', 0)} cmds, {result.get('options', 0)} opts" + ) + elif result["status"] == "skipped": + self.stats["skipped"] += 1 self.log(f" ⊙ Skipped: {result.get('message', '')}") else: - self.stats['failed'] += 1 + self.stats["failed"] += 1 self.log(f" ✗ Failed: {result.get('message', '')}", "ERROR") - + return results - + def process_tools_parallel(self, tools: List[Dict], category: str, max_workers: int = 4) -> List[Dict]: """Process tools in parallel for faster throughput""" results = [] - + with ProcessPoolExecutor(max_workers=max_workers) as executor: # Submit all jobs - future_to_tool = { - executor.submit(self.process_tool, tool, category): tool - for tool in tools - } - + future_to_tool = {executor.submit(self.process_tool, tool, category): tool for tool in tools} + # Process 
completed jobs for future in as_completed(future_to_tool): tool = future_to_tool[future] - + try: result = future.result() results.append(result) - + # Update stats - self.stats['total'] += 1 - if result['status'] == 'success': - self.stats['success'] += 1 - self.log(f"✓ {result['name']}: {result.get('size_kb', 0):.1f} KB, " - f"{result.get('commands', 0)} cmds") - elif result['status'] == 'skipped': - self.stats['skipped'] += 1 + self.stats["total"] += 1 + if result["status"] == "success": + self.stats["success"] += 1 + self.log( + f"✓ {result['name']}: {result.get('size_kb', 0):.1f} KB, " + f"{result.get('commands', 0)} cmds" + ) + elif result["status"] == "skipped": + self.stats["skipped"] += 1 self.log(f"⊙ {result['name']}: skipped") else: - self.stats['failed'] += 1 + self.stats["failed"] += 1 self.log(f"✗ {result['name']}: {result.get('message', '')}", "ERROR") - + except Exception as e: - self.stats['total'] += 1 - self.stats['failed'] += 1 + self.stats["total"] += 1 + self.stats["failed"] += 1 self.log(f"✗ {tool['name']}: {str(e)[:100]}", "ERROR") - results.append({ - 'name': tool['name'], - 'status': 'failed', - 'message': str(e)[:200] - }) - + results.append({"name": tool["name"], "status": "failed", "message": str(e)[:200]}) + return results - + def process(self, only_tools: Optional[List[str]] = None, parallel: bool = False, max_workers: int = 4): """ Main processing method - + Args: only_tools: If set, only process these tools parallel: Whether to process in parallel max_workers: Number of parallel workers """ - self.log("="*80) + self.log("=" * 80) self.log(f"CLI Tools Processor V2") - self.log("="*80) + self.log("=" * 80) self.log(f"Config: {self.config_file}") self.log(f"Output: {self.output_dir}") self.log(f"Max Depth: {self.max_depth}") self.log(f"Parallel: {parallel} (workers: {max_workers if parallel else 'N/A'})") - self.log("="*80) - + self.log("=" * 80) + # Load config config, category_key = self.load_config() tools = config[category_key] - + self.log(f"Loaded {len(tools)} tools from '{category_key}' category") - + # Filter tools filtered_tools = self.filter_tools(tools, only_tools) - + if not filtered_tools: self.log("No tools to process after filtering", "WARN") return - + self.log(f"Processing {len(filtered_tools)} tools") - self.log("="*80) - + self.log("=" * 80) + # Process tools if parallel: results = self.process_tools_parallel(filtered_tools, category_key, max_workers) else: results = self.process_tools_sequential(filtered_tools, category_key) - + # Print summary self.print_summary(results) - + def print_summary(self, results: List[Dict]): """Print processing summary""" - duration = (datetime.now() - self.stats['start_time']).total_seconds() - - self.log("="*80) + duration = (datetime.now() - self.stats["start_time"]).total_seconds() + + self.log("=" * 80) self.log("PROCESSING SUMMARY") - self.log("="*80) + self.log("=" * 80) self.log(f"Duration: {duration:.1f}s") self.log(f"Total: {self.stats['total']}") self.log(f"✓ Success: {self.stats['success']}") self.log(f"⊙ Skipped: {self.stats['skipped']}") self.log(f"✗ Failed: {self.stats['failed']}") - + # Show successful results - successful = [r for r in results if r['status'] == 'success'] + successful = [r for r in results if r["status"] == "success"] if successful: - self.log("="*80) + self.log("=" * 80) self.log(f"SUCCESSFUL EXTRACTIONS ({len(successful)})") - self.log("="*80) + self.log("=" * 80) for r in successful: - self.log(f" • {r['name']}: {r.get('size_kb', 0):.1f} KB, " - f"{r.get('commands', 0)} 
cmds, {r.get('options', 0)} opts") - + self.log( + f" • {r['name']}: {r.get('size_kb', 0):.1f} KB, " + f"{r.get('commands', 0)} cmds, {r.get('options', 0)} opts" + ) + # Show failures - failed = [r for r in results if r['status'] == 'failed'] + failed = [r for r in results if r["status"] == "failed"] if failed: - self.log("="*80) + self.log("=" * 80) self.log(f"FAILED ({len(failed)})") - self.log("="*80) + self.log("=" * 80) for r in failed: self.log(f" • {r['name']}: {r.get('message', 'unknown error')}") - - success_rate = (self.stats['success'] / self.stats['total'] * 100) if self.stats['total'] > 0 else 0 - self.log("="*80) + + success_rate = (self.stats["success"] / self.stats["total"] * 100) if self.stats["total"] > 0 else 0 + self.log("=" * 80) self.log(f"Success Rate: {success_rate:.1f}%") - self.log("="*80) + self.log("=" * 80) def main(): """CLI entry point""" parser = argparse.ArgumentParser( - description='Process multiple CLI tools from configuration', - formatter_class=argparse.RawDescriptionHelpFormatter + description="Process multiple CLI tools from configuration", + formatter_class=argparse.RawDescriptionHelpFormatter, ) - - parser.add_argument('config', help='Path to config JSON file') - parser.add_argument('--output-dir', default='data/results', help='Base output directory') - parser.add_argument('--max-depth', type=int, default=20, help='Maximum recursion depth (default: 20)') - parser.add_argument('--only', nargs='+', help='Only process these tools (by name)') - parser.add_argument('--parallel', action='store_true', help='Process tools in parallel') - parser.add_argument('--workers', type=int, default=4, help='Number of parallel workers (default: 4)') - parser.add_argument('--no-skip', action='store_true', help='Reprocess even if output exists') - + + parser.add_argument("config", help="Path to config JSON file") + parser.add_argument("--output-dir", default="data/results", help="Base output directory") + parser.add_argument("--max-depth", type=int, default=20, help="Maximum recursion depth (default: 20)") + parser.add_argument("--only", nargs="+", help="Only process these tools (by name)") + parser.add_argument("--parallel", action="store_true", help="Process tools in parallel") + parser.add_argument("--workers", type=int, default=4, help="Number of parallel workers (default: 4)") + parser.add_argument("--no-skip", action="store_true", help="Reprocess even if output exists") + args = parser.parse_args() - + # Create processor - processor = ToolsProcessor( - args.config, - args.output_dir, - max_depth=args.max_depth, - skip_existing=not args.no_skip - ) - + processor = ToolsProcessor(args.config, args.output_dir, max_depth=args.max_depth, skip_existing=not args.no_skip) + # Process tools - processor.process( - only_tools=args.only, - parallel=args.parallel, - max_workers=args.workers - ) + processor.process(only_tools=args.only, parallel=args.parallel, max_workers=args.workers) -if __name__ == '__main__': +if __name__ == "__main__": main() - diff --git a/tests/test_parser.py b/tests/test_parser.py index a1b6db2..d9243ec 100644 --- a/tests/test_parser.py +++ b/tests/test_parser.py @@ -1,17 +1,22 @@ -import pytest -from unittest.mock import patch +import json import subprocess -from src.parser import main +from unittest.mock import MagicMock, patch + +import pytest + +from src.parser import parse_binary @pytest.fixture def mock_subprocess_run(): - with patch('subprocess.run') as mock_run: + with patch("subprocess.run") as mock_run: yield mock_run + def 
 def compare_dicts(result, expected):
     """
-    Recursively compares two dictionaries and asserts that all raw values are equal.
+    Recursively compares two dictionaries and asserts that all expected keys match.
+    Ignores extra keys in result (like raw_help_text, version).
 
     Args:
         result (dict): The dictionary obtained from the function.
@@ -36,7 +41,9 @@ def compare_dicts(result, expected):
         else:
             assert result[key] == expected[key], f"Value mismatch for key '{key}': {result[key]} != {expected[key]}"
 
+
 def test_parse_aws(mock_subprocess_run):
+    """Test parsing AWS CLI with mocked subprocess and OpenAI API calls"""
     # Mock outputs for aws commands
     aws_help_output = """
     AWS CLI tool for managing Amazon Web Services
@@ -47,7 +54,7 @@ def test_parse_aws(mock_subprocess_run):
         ec2 EC2 service
         s3 S3 service
     """
-    
+
     aws_ec2_help_output = """
     EC2 service
 
@@ -56,7 +63,7 @@
     Available Commands:
         describe-instances Describe EC2 instances
     """
-    
+
     aws_ec2_describe_instances_help_output = """
     Describe EC2 instances
 
@@ -65,7 +72,7 @@
     Options:
         --filters Filters to apply to the request
     """
-    
+
     aws_s3_help_output = """
     S3 service
 
@@ -74,7 +81,7 @@
     Available Commands:
         ls List S3 buckets
     """
-    
+
     aws_s3_ls_help_output = """
     List S3 buckets
 
@@ -86,60 +93,141 @@
     # Define the side effects for subprocess.run
     mock_subprocess_run.side_effect = [
-        subprocess.CompletedProcess(args='aws --help', returncode=0, stdout=aws_help_output),
-        subprocess.CompletedProcess(args='aws ec2 --help', returncode=0, stdout=aws_ec2_help_output),
-        subprocess.CompletedProcess(args='aws ec2 describe-instances --help', returncode=0, stdout=aws_ec2_describe_instances_help_output),
-        subprocess.CompletedProcess(args='aws s3 --help', returncode=0, stdout=aws_s3_help_output),
-        subprocess.CompletedProcess(args='aws s3 ls --help', returncode=0, stdout=aws_s3_ls_help_output),
+        # which aws
+        subprocess.CompletedProcess(args=["which", "aws"], returncode=0, stdout="/usr/local/bin/aws\n", stderr=""),
+        # aws --help
+        subprocess.CompletedProcess(args=["aws", "--help"], returncode=0, stdout=aws_help_output, stderr=""),
+        # aws ec2 --help
+        subprocess.CompletedProcess(args=["aws", "ec2", "--help"], returncode=0, stdout=aws_ec2_help_output, stderr=""),
+        # aws ec2 describe-instances --help
+        subprocess.CompletedProcess(
+            args=["aws", "ec2", "describe-instances", "--help"],
+            returncode=0,
+            stdout=aws_ec2_describe_instances_help_output,
+            stderr="",
+        ),
+        # aws ec2 describe-instances --version attempts (all fail)
+        subprocess.CompletedProcess(
+            args=["aws", "ec2", "describe-instances", "--version"], returncode=1, stdout="", stderr=""
+        ),
+        subprocess.CompletedProcess(
+            args=["aws", "ec2", "describe-instances", "-v"], returncode=1, stdout="", stderr=""
+        ),
+        subprocess.CompletedProcess(
+            args=["aws", "ec2", "describe-instances", "version"], returncode=1, stdout="", stderr=""
+        ),
+        # aws s3 --help
+        subprocess.CompletedProcess(args=["aws", "s3", "--help"], returncode=0, stdout=aws_s3_help_output, stderr=""),
+        # aws s3 ls --help
+        subprocess.CompletedProcess(
+            args=["aws", "s3", "ls", "--help"], returncode=0, stdout=aws_s3_ls_help_output, stderr=""
+        ),
+        # aws s3 ls --version attempts (all fail)
+        subprocess.CompletedProcess(args=["aws", "s3", "ls", "--version"], returncode=1, stdout="", stderr=""),
+        subprocess.CompletedProcess(args=["aws", "s3", "ls", "-v"], returncode=1, stdout="", stderr=""),
+        subprocess.CompletedProcess(args=["aws", "s3", "ls", "version"], returncode=1, stdout="", stderr=""),
+        # aws --version (main version)
+        subprocess.CompletedProcess(args=["aws", "--version"], returncode=0, stdout="aws-cli/2.0.0", stderr=""),
     ]
-    
+
     expected_output = {
-        'name': 'aws',
-        'description': 'AWS CLI tool for managing Amazon Web Services',
-        'usage': 'aws [options] [command] [command options]',
-        'subcommands': [
+        "name": "aws",
+        "subcommands": [
             {
-                'name': 'aws ec2',
-                'description': 'EC2 service',
-                'usage': 'aws ec2 [options] [command] [command options]',
-                'subcommands': [
+                "name": "aws ec2",
+                "description": "EC2 service",
+                "subcommands": [
                     {
-                        'name': 'aws ec2 describe-instances',
-                        'description': 'Describe EC2 instances',
-                        'usage': 'aws ec2 describe-instances [options]',
-                        'options': [
-                            {
-                                'option': '--filters',
-                                'description': 'Filters to apply to the request'
-                            }
-                        ],
-                        'subcommands': []
+                        "name": "aws ec2 describe-instances",
+                        "description": "Describe EC2 instances",
+                        "options": [{"option": "--filters", "description": "Filters to apply to the request"}],
+                        "subcommands": [],
+                        "aliases": [],
                     }
                 ],
-                'options': []
+                "options": [],
+                "aliases": [],
             },
             {
-                'name': 'aws s3',
-                'description': 'S3 service',
-                'usage': 'aws s3 [options] [command] [command options]',
-                'subcommands': [
+                "name": "aws s3",
+                "description": "S3 service",
+                "subcommands": [
                     {
-                        'name': 'aws s3 ls',
-                        'description': 'List S3 buckets',
-                        'usage': 'aws s3 ls [options]',
-                        'options': [
-                            {
-                                'option': '--profile',
-                                'description': 'Specify the profile to use'
-                            }
-                        ],
-                        'subcommands': []
+                        "name": "aws s3 ls",
+                        "description": "List S3 buckets",
+                        "options": [{"option": "--profile", "description": "Specify the profile to use"}],
+                        "subcommands": [],
+                        "aliases": [],
                     }
                 ],
-                'options': []
-            }
+                "options": [],
+                "aliases": [],
+            },
         ],
+        "options": [],
+        "aliases": [],
     }
-
-    result = main("aws")
-    compare_dicts(result, expected_output)
+
+    # Mock OpenAI API responses
+    def mock_openai_response(*args, **kwargs):
+        """Mock OpenAI API calls based on the input"""
+        response = MagicMock()
+        response.raise_for_status = MagicMock()
+
+        # Extract the help text from the request
+        help_text = kwargs["json"]["messages"][-1]["content"]
+
+        # Determine which response to return based on the help text
+        if "AWS CLI tool" in help_text and "Available Commands:" in help_text and "ec2" in help_text:
+            # Main aws help
+            ai_response = {
+                "subcommands": [
+                    {"name": "ec2", "description": "EC2 service"},
+                    {"name": "s3", "description": "S3 service"},
+                ],
+                "options": [],
+                "aliases": [],
+            }
+        elif "EC2 service" in help_text and "describe-instances" in help_text:
+            # aws ec2 help
+            ai_response = {
+                "subcommands": [{"name": "describe-instances", "description": "Describe EC2 instances"}],
+                "options": [],
+                "aliases": [],
+            }
+        elif (
+            "Describe EC2 instances" in help_text and "--filters" in help_text and "Available Commands" not in help_text
+        ):
+            # aws ec2 describe-instances help
+            ai_response = {
+                "subcommands": [],
+                "options": [{"option": "--filters", "description": "Filters to apply to the request"}],
+                "aliases": [],
+            }
+        elif "S3 service" in help_text and "ls" in help_text:
+            # aws s3 help
+            ai_response = {
+                "subcommands": [{"name": "ls", "description": "List S3 buckets"}],
+                "options": [],
+                "aliases": [],
+            }
+        elif "List S3 buckets" in help_text and "--profile" in help_text:
+            # aws s3 ls help
+            ai_response = {
+                "subcommands": [],
+                "options": [{"option": "--profile", "description": "Specify the profile to use"}],
+                "aliases": [],
+            }
+        elif "aws-cli/2.0.0" in help_text:
+            # Version response
+            ai_response = {"version": "2.0.0"}
+        else:
+            # Default empty response
+            ai_response = {"subcommands": [], "options": [], "aliases": []}
+
+        response.json.return_value = {"choices": [{"message": {"content": json.dumps(ai_response)}}]}
+        return response
+
+    with patch("requests.post", side_effect=mock_openai_response):
+        result = parse_binary("aws")
+        compare_dicts(result, expected_output)
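With both subprocess.run and requests.post mocked, test_parse_aws exercises the full recursive help-text walk without shelling out or hitting the OpenAI endpoint, and the updated compare_dicts deliberately checks a subset relation: every expected key must be present and equal, while extra keys in the parser's result are ignored. A small sketch of that semantics, with invented values for illustration:

    # Illustration of compare_dicts' subset semantics: extra keys in result
    # (e.g. raw_help_text, version) are ignored; expected keys must match.
    result = {"name": "aws", "version": "2.0.0", "options": []}
    expected = {"name": "aws", "options": []}
    compare_dicts(result, expected)  # passes: the extra "version" key is ignored

The test can be run in isolation with: pytest tests/test_parser.py::test_parse_aws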