diff --git a/CHANGELOG.md b/CHANGELOG.md index 56e58e6..8742f40 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,9 @@ ## [Unreleased] +### Sonar +- Added `--filter` flag which accepts the [Package Search Syntax](https://docs.cloudsmith.com/artifact-management/search-filter-sort-packages) string. e.g. `--filter "downloads:>0"`. + ## [Sonar] [v1.1] [2026-01-07] ### Update diff --git a/Docker/Sonar/README.md b/Docker/Sonar/README.md index 7b826cb..31c5ed7 100644 --- a/Docker/Sonar/README.md +++ b/Docker/Sonar/README.md @@ -90,7 +90,8 @@ Here is a summary of its capabilities: | `--delete-tag` | Deletes a specific tag from the repository. | | `--delete-all` | Wipes all images and manifest lists detected by the scan. | | `--force` | Force deletion without interactive prompt. | - | `--output` | Use `json` value to output results to JSON format . | + | `--output` | Use `json` value to output results to JSON format. | + | `--filter` | Query using Package Syntax Filtering. | 3. **Examples** - Get a summary of all tags for my-image: @@ -108,6 +109,11 @@ Here is a summary of its capabilities: python3 sonar.py my-org my-repo my-image --untagged ``` + - Query using Package Syntax Filtering: + ```bash + python3 sonar.py my-org my-repo --filter "uploaded:<'30 days ago' AND downloads:>0" + ``` + - Delete untagged manifest lists: ```bash python3 sonar.py my-org my-repo my-image --untagged-delete ``` diff --git a/Docker/Sonar/sonar.py b/Docker/Sonar/sonar.py index 24289fe..baf9c0d 100755 --- a/Docker/Sonar/sonar.py +++ b/Docker/Sonar/sonar.py @@ -267,7 +267,7 @@ def get_digest_data(workspace, repo, img, digest, ntag_display, platform="unknow "is_child": True } -def fetch_tag_data(workspace, repo, img, ntag, detailed=False, include_all=False): +def fetch_tag_data(workspace, repo, img, ntag, detailed=False, include_all=False, filtering_digests=None): """Fetches the manifest list for a tag and returns a list of data dicts.""" manifest_url = 
f"{CLOUDSMITH_URL}/v2/{workspace}/{repo}/{img}/manifests/{ntag}" @@ -302,39 +302,19 @@ def fetch_tag_data(workspace, repo, img, ntag, detailed=False, include_all=False for d in digests: children.append({'digest': d, 'platform': 'unknown'}) - if is_list and not children: + if is_list and not children and filtering_digests is None: return [] - # Process children - children_data = [] - total_downloads = 0 - derived_status = None - - if is_list: - for child in children: - data = get_digest_data(workspace, repo, img, child['digest'], ntag, platform=child['platform']) - children_data.append(data) - total_downloads += data['downloads'] - - # Check quarantine status of children - if children_data: - quarantined_count = sum(1 for c in children_data if "Quarantined" in c.get('status', '')) - count = len(children_data) - - if quarantined_count == count: - derived_status = "Quarantined" - elif quarantined_count > 0: - derived_status = "Partial Quarantine" - # Fetch parent package info - api_url = f"https://api.cloudsmith.io/v1/packages/{workspace}/{repo}/?query=version:{ntag}" + api_url = f"https://api.cloudsmith.io/v1/packages/{workspace}/{repo}/?query=tag:{ntag}" pkg_details = make_request(api_url, {"Cache-Control": "no-cache"}) parent_status = "Unknown" index_digest = "" slug = "" parent_platform = "multi" if is_list else "unknown" - + total_downloads = 0 + if pkg_details and len(pkg_details) > 0: pkg = pkg_details[0] parent_status = pkg.get('status_str', 'Unknown') @@ -351,6 +331,53 @@ def fetch_tag_data(workspace, repo, img, ntag, detailed=False, include_all=False if arch and arch.lower() != 'unknown': parent_platform = arch + # Apply Filtering if active + parent_matched = True + if filtering_digests is not None: + # Check if parent digest is in filter + # Handle potential prefix differences (sha256: vs raw) + raw_index = index_digest.replace("sha256:", "") + norm_index = index_digest if index_digest.startswith("sha256:") else f"sha256:{index_digest}" + + parent_matched = 
(index_digest in filtering_digests) or (raw_index in filtering_digests) or (norm_index in filtering_digests) + + if not parent_matched: + # Parent didn't match, so filter children to find matches + filtered_children = [] + for c in children: + d = c['digest'] + raw_d = d.replace("sha256:", "") + norm_d = d if d.startswith("sha256:") else f"sha256:{d}" + if (d in filtering_digests) or (raw_d in filtering_digests) or (norm_d in filtering_digests): + filtered_children.append(c) + + if not filtered_children: + # No match in parent or ANY children -> exclude this tag + return [] + + # Update children list to only matched children + children = filtered_children + + # Process children + children_data = [] + derived_status = None + + if is_list: + for child in children: + data = get_digest_data(workspace, repo, img, child['digest'], ntag, platform=child['platform']) + children_data.append(data) + total_downloads += data['downloads'] + + # Check quarantine status of children + if children_data: + quarantined_count = sum(1 for c in children_data if "Quarantined" in c.get('status', '')) + count = len(children_data) + + if quarantined_count == count: + derived_status = "Quarantined" + elif quarantined_count > 0: + derived_status = "Partial Quarantine" + # Override parent status if derived from children if derived_status: parent_status = derived_status @@ -373,8 +400,6 @@ def fetch_tag_data(workspace, repo, img, ntag, detailed=False, include_all=False b_arch = blob_data.get('architecture') if b_arch: parent_platform = f"{b_os}/{b_arch}" - else: - logger.debug(f"Failed to fetch config blob for {img}:{ntag} ({cfg_digest})") results = [] # Parent Data @@ -390,7 +415,12 @@ def fetch_tag_data(workspace, repo, img, ntag, detailed=False, include_all=False }) # Children Data - if detailed and is_list: + # Force show children if we are in filtering mode and matched children (context is important) + show_children = detailed + if filtering_digests is not None and not parent_matched and 
children_data: + show_children = True + + if show_children and is_list: results.extend(children_data) return results @@ -646,44 +676,83 @@ def get_untagged_images(workspace, repo, img, delete=False, detailed=False, prog # filepath: /Users/cmoynes/dev/support-engineering/Docker/Sonar/sonar.py # --- Core Logic --- -def get_image_analysis(workspace, repo, img_name, delete_all=False, delete_tag=None, detailed=False, progress=None, include_all=False): +def get_image_analysis(workspace, repo, img_name, delete_all=False, delete_tag=None, detailed=False, progress=None, include_all=False, query_filter=None): # Fetch all tags (including untagged if requested, but logic handled separately) - api_url = f"https://api.cloudsmith.io/v1/packages/{workspace}/{repo}/?query=name:{img_name}" + + query_parts = [f"name:{img_name}"] + if query_filter: + query_parts.append(query_filter) + + full_query = " AND ".join(query_parts) + qs = urlencode({'query': full_query}) + + api_url = f"https://api.cloudsmith.io/v1/packages/{workspace}/{repo}/?{qs}" packages = make_request(api_url, {"Cache-Control": "no-cache"}) if not packages: logger.info(f"No packages found for image: {img_name}") return None - # Extract tags from package data + # Extract tags and matched digests (targets) tags = set() + matched_digests = set() + for pkg in packages: + # Collect tags pkg_tags = pkg.get('tags', {}).get('version', []) for t in pkg_tags: tags.add(t) + + # Collect digests (often the 'version' field in API for images) + v = pkg.get('version') + if v: + # Normalize to sha256: for consistent strings + if not v.startswith('sha256:') and len(v) == 64: + matched_digests.add(f"sha256:{v}") + else: + matched_digests.add(v) + + # If filter matched packages (e.g. children) but no direct tags were found, we need to find their parents. + if packages and not tags: + logger.info(f"Filter matched {len(packages)} objects but no direct tags. 
Performing parent lookup...") + + # Search for ALL manifest lists for this image to identify parents of the matched digests + all_ml_query = urlencode({'query': f"name:{img_name} AND NOT architecture:**"}) + api_ml_url = f"https://api.cloudsmith.io/v1/packages/{workspace}/{repo}/?{all_ml_query}" + all_manifests = make_request(api_ml_url, {"Cache-Control": "no-cache"}) + + if all_manifests: + for ml in all_manifests: + t_list = ml.get('tags', {}).get('version', []) + for t in t_list: + tags.add(t) sorted_tags = sorted(list(tags)) if not sorted_tags: - logger.info(f"No tags found for image: {img_name}") + logger.info(f"No tags found for image: {img_name} after filter processing.") return None groups = [] + # Create filtering set (if valid filter) + filtering_set = matched_digests if query_filter else None + task_id = None if progress: task_id = progress.add_task(f"[cyan]Analyzing {img_name}[/cyan] ({len(sorted_tags)} tags)", total=len(sorted_tags)) with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: - future_to_tag = {executor.submit(fetch_tag_data, workspace, repo, img_name, t, detailed, include_all): t for t in sorted_tags} + # Pass filtering_set to fetch_tag_data + future_to_tag = {executor.submit(fetch_tag_data, workspace, repo, img_name, t, detailed, include_all, filtering_set): t for t in sorted_tags} results = {} for future in concurrent.futures.as_completed(future_to_tag): tag = future_to_tag[future] try: - results[tag] = future.result() + data = future.result() + results[tag] = data except Exception as e: - # Log the error so we know why it failed instead of failing silently logger.error(f"Failed to fetch tag data for {tag}: {e}") if progress and task_id is not None: @@ -691,7 +760,9 @@ def get_image_analysis(workspace, repo, img_name, delete_all=False, delete_tag=N for t in sorted_tags: if t in results: - groups.append(results[t]) + # only append if we actually got results back (filtering might have returned empty) + if results[t]: + 
groups.append(results[t]) if progress and task_id is not None: progress.remove_task(task_id) @@ -756,7 +827,7 @@ def process_image(org, repo, img_name, args, progress=None): should_run_standard = (not (args.untagged or args.untagged_delete)) if should_run_standard: - tagged_groups = get_image_analysis(org, repo, img_name, delete_all=args.delete_all, delete_tag=args.delete_tag, detailed=args.detailed, progress=progress) + tagged_groups = get_image_analysis(org, repo, img_name, delete_all=args.delete_all, delete_tag=args.delete_tag, detailed=args.detailed, progress=progress, query_filter=args.filter) if tagged_groups: results.extend(tagged_groups) @@ -838,6 +909,7 @@ def main(): parser.add_argument("--untagged-delete", action="store_true", help="Delete untagged manifest lists") parser.add_argument("--delete-all", action="store_true", help="Delete ALL detected manifest lists") parser.add_argument("--delete-tag", help="Delete manifest lists matching this specific tag") + parser.add_argument("--filter", help="Filter packages using Cloudsmith search syntax (e.g. 
'version:^1.0' or 'tag:latest')") parser.add_argument("--detailed", action="store_true", help="Show detailed breakdown of digests") parser.add_argument("--output", choices=['table', 'json'], default='table', help="Output format (default: table)") parser.add_argument("--debug-log", action="store_true", help="Enable debug logging to file") @@ -874,6 +946,33 @@ def main(): if args.img: images_to_scan.append(args.img) + elif args.filter: + if args.output == 'table': + console.print(f"[bold]Searching for packages matching filter: '{args.filter}'...[/bold]") + + logger.info(f"Searching for packages matching filter: {args.filter}") + # Search for packages to identify which images to scan + # We append format:docker to ensure we only get docker images + search_query = f"{args.filter} AND format:docker" + full_query = urlencode({'query': search_query}) + search_url = f"https://api.cloudsmith.io/v1/packages/{args.org}/{args.repo}/?{full_query}" + + found_packages = make_request(search_url, {"Cache-Control": "no-cache"}) + + if found_packages: + names = set() + for pkg in found_packages: + if 'name' in pkg: + names.add(pkg['name']) + images_to_scan = sorted(list(names)) + logger.info(f"Found {len(images_to_scan)} images matching filter.") + else: + msg = "No packages found matching the provided filter." + if args.output == 'table': + console.print(f"[yellow]{msg}[/yellow]") + logger.info(msg) + # Exit cleanly if nothing found + sys.exit(0) else: if args.output == 'table': console.print(f"[bold]Fetching catalog for {args.org}/{args.repo}...[/bold]")