Docker/Sonar/sonar.py (101 changes: 48 additions & 53 deletions)
@@ -161,10 +161,11 @@ def find_key_recursive(obj, key):
 
 def format_status(status_str):
     """Returns a rich-formatted status string."""
-    if status_str == "Completed": return f"[green]{status_str}[/green] ✅"
-    if status_str == "In Progress": return f"[yellow]{status_str}[/yellow] ⏳"
-    if status_str == "Quarantined": return f"[red]{status_str}[/red] ☠️"
-    if status_str == "Failed": return f"[bold red]{status_str}[/bold red] ❌"
+    if status_str == "Completed": return f"[green]{status_str}[/green]"
+    if status_str == "In Progress": return f"[yellow]{status_str}[/yellow]"
+    if status_str == "Quarantined": return f"[red]{status_str}[/red]"
+    if status_str == "Partial Quarantine": return f"[bold yellow]{status_str}[/bold yellow]"
+    if status_str == "Failed": return f"[bold red]{status_str}[/bold red]"
     return status_str
 
 def batch_delete_packages(workspace, repo, slugs):
@@ -266,7 +267,7 @@ def get_digest_data(workspace, repo, img, digest, ntag_display, platform="unknow
         "is_child": True
     }
 
-def fetch_tag_data(workspace, repo, img, ntag, detailed=False, all=False):
+def fetch_tag_data(workspace, repo, img, ntag, detailed=False, include_all=False):
     """Fetches the manifest list for a tag and returns a list of data dicts."""
 
     manifest_url = f"{CLOUDSMITH_URL}/v2/{workspace}/{repo}/{img}/manifests/{ntag}"
@@ -279,7 +280,7 @@ def fetch_tag_data(workspace, repo, img, ntag, detailed=False, all=False):
     is_list = 'manifests' in manifest_json
 
     # Removed to allow single images to be processed by default
-    # if not is_list and not all:
+    # if not is_list and not include_all:
     #     return []
 
     children = []
@@ -292,7 +293,8 @@
             arch = p.get('architecture', 'unknown')
             plat = f"{os_name}/{arch}"
 
-            if d and arch.lower() != 'unknown':
+            # Removed strict arch check to prevent filtering valid images with missing metadata
+            if d:
                 children.append({'digest': d, 'platform': plat})
     else:
         # Fallback
@@ -306,12 +308,23 @@
     # Process children
     children_data = []
     total_downloads = 0
+    derived_status = None
 
     if is_list:
         for child in children:
             data = get_digest_data(workspace, repo, img, child['digest'], ntag, platform=child['platform'])
             children_data.append(data)
             total_downloads += data['downloads']
 
+        # Check quarantine status of children
+        if children_data:
+            quarantined_count = sum(1 for c in children_data if "Quarantined" in c.get('status', ''))

Copilot AI commented on Jan 7, 2026:

The string matching logic "Quarantined" in c.get('status', '') will incorrectly match the new "Partial Quarantine" status as quarantined. This causes inaccurate quarantine counting when aggregating child statuses. Use an exact string comparison instead: c.get('status', '') == "Quarantined".

Suggested change:
-            quarantined_count = sum(1 for c in children_data if "Quarantined" in c.get('status', ''))
+            quarantined_count = sum(1 for c in children_data if c.get('status', '') == "Quarantined")
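
A minimal standalone sketch of the derived-status aggregation this hunk introduces, with the reviewer's exact comparison applied; the child entries are hypothetical sample data, not output from the tool:

children_data = [
    {"digest": "sha256:aaa", "status": "Quarantined"},  # hypothetical child
    {"digest": "sha256:bbb", "status": "Completed"},    # hypothetical child
]

# Exact match counts only children whose status is literally "Quarantined"
quarantined_count = sum(1 for c in children_data if c.get("status", "") == "Quarantined")
count = len(children_data)

derived_status = None
if quarantined_count == count:
    derived_status = "Quarantined"
elif quarantined_count > 0:
    derived_status = "Partial Quarantine"

print(derived_status)  # "Partial Quarantine": some, but not all, children are quarantined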
+            count = len(children_data)
+
+            if quarantined_count == count:
+                derived_status = "Quarantined"
+            elif quarantined_count > 0:
+                derived_status = "Partial Quarantine"
 
     # Fetch parent package info
     api_url = f"https://api.cloudsmith.io/v1/packages/{workspace}/{repo}/?query=version:{ntag}"
@@ -338,6 +351,10 @@
         if arch and arch.lower() != 'unknown':
             parent_platform = arch
 
+    # Override parent status if derived from children
+    if derived_status:
+        parent_status = derived_status
+
     # Fallback: Fetch config blob for single images to determine platform
     if not is_list and (parent_platform == "unknown" or not parent_platform):
         # 1. Check for Schema 1 top-level architecture
@@ -626,47 +643,24 @@ def get_untagged_images(workspace, repo, img, delete=False, detailed=False, prog
 
     return groups
 
-# filepath: /Users/cmoynes/dev/support-engineering/Docker/Cloudsmith Sonar/sonar.py
+# filepath: /Users/cmoynes/dev/support-engineering/Docker/Sonar/sonar.py
 # --- Core Logic ---
 
-def get_image_analysis(workspace, repo, img_name, delete_all=False, delete_tag=None, detailed=False, progress=None, all=False):
-    # Switch to Cloudsmith API to avoid upstream tags and allow filtering
-    api_url = f"https://api.cloudsmith.io/v1/packages/{workspace}/{repo}/"
-
-    # Construct query: format:docker AND name:{img_name} (if provided)
-    query_parts = ["format:docker"]
-    if img_name:
-        query_parts.append(f"name:{img_name}")
-
-    query = urlencode({'query': " AND ".join(query_parts)})
-    next_url = f"{api_url}?{query}"
+def get_image_analysis(workspace, repo, img_name, delete_all=False, delete_tag=None, detailed=False, progress=None, include_all=False):
+    # Fetch all tags (including untagged if requested, but logic handled separately)
+    api_url = f"https://api.cloudsmith.io/v1/packages/{workspace}/{repo}/?query=name:{img_name}"

Copilot AI commented on Jan 7, 2026:

The query parameter is not URL-encoded, which can cause request failures if img_name contains special characters such as spaces, slashes, or ampersands. Use urlencode to properly encode the query parameter, as the original implementation did.

Suggested change:
-    api_url = f"https://api.cloudsmith.io/v1/packages/{workspace}/{repo}/?query=name:{img_name}"
+    query_params = urlencode({"query": f"name:{img_name}"})
+    api_url = f"https://api.cloudsmith.io/v1/packages/{workspace}/{repo}/?{query_params}"
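
A quick sketch of what the suggested urlencode call produces; the workspace, repo, and image name below are hypothetical:

from urllib.parse import urlencode

workspace, repo = "acme", "docker-prod"  # hypothetical values
img_name = "team/app name&v2"            # contains a slash, a space, and an ampersand

query_params = urlencode({"query": f"name:{img_name}"})
api_url = f"https://api.cloudsmith.io/v1/packages/{workspace}/{repo}/?{query_params}"

print(api_url)
# https://api.cloudsmith.io/v1/packages/acme/docker-prod/?query=name%3Ateam%2Fapp+name%26v2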
+    packages = make_request(api_url, {"Cache-Control": "no-cache"})

Copilot AI commented on Jan 7, 2026:

The pagination logic has been removed, limiting results to the first page only (the default page size varies by API). For images with many packages or tags, this will silently skip data. The original implementation correctly handled pagination via the Link header to retrieve all results across multiple pages.
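
For reference, a sketch of the Link-header pagination being described; it mirrors the removed loop visible further down in this diff and assumes, as in the original code, that make_request(url, headers, return_headers=True) returns a (data, headers) tuple:

def fetch_all_packages(first_url):
    packages = []
    next_url = first_url
    while next_url:
        result = make_request(next_url, {"Cache-Control": "no-cache"}, return_headers=True)
        if not result:
            break
        data, headers = result
        packages.extend(data)
        # Follow the RFC 5988 Link header: <url>; rel="next", <url>; rel="last"
        next_url = None
        for link in headers.get('Link', '').split(','):
            if 'rel="next"' in link:
                next_url = link.split(';')[0].strip('<> ')
                break
    return packages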

+    if not packages:
+        logger.info(f"No packages found for image: {img_name}")
+        return None
 
     # Extract tags from package data
     tags = set()
 
-    # Pagination Loop
-    while next_url:
-        result = make_request(next_url, {"Cache-Control": "no-cache"}, return_headers=True)
-        if not result:
-            break
-
-        data, headers = result
-
-        for pkg in data:
-            # pkg['tags'] is a dict like {'version': [...]}
-            version_tags = pkg.get('tags', {}).get('version', [])
-            for t in version_tags:
-                tags.add(t)
-
-        # Handle Pagination via Link header
-        next_url = None
-        link_header = headers.get('Link')
-        if link_header:
-            links = link_header.split(',')
-            for link in links:
-                if 'rel="next"' in link:
-                    # Format: <url>; rel="next"
-                    next_url = link.split(';')[0].strip('<> ')
-                    break
+    for pkg in packages:
+        pkg_tags = pkg.get('tags', {}).get('version', [])
+        for t in pkg_tags:
+            tags.add(t)
 
     sorted_tags = sorted(list(tags))

@@ -681,15 +675,16 @@ def get_image_analysis(workspace, repo, img_name, delete_all=False, delete_tag=N
         task_id = progress.add_task(f"[cyan]Analyzing {img_name}[/cyan] ({len(sorted_tags)} tags)", total=len(sorted_tags))
 
     with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
-        future_to_tag = {executor.submit(fetch_tag_data, workspace, repo, img_name, t, detailed, all): t for t in sorted_tags}
+        future_to_tag = {executor.submit(fetch_tag_data, workspace, repo, img_name, t, detailed, include_all): t for t in sorted_tags}
 
         results = {}
         for future in concurrent.futures.as_completed(future_to_tag):
            tag = future_to_tag[future]
            try:
                results[tag] = future.result()
-           except Exception:
-               pass
+           except Exception as e:
+               # Log the error so we know why it failed instead of failing silently
+               logger.error(f"Failed to fetch tag data for {tag}: {e}")
 
            if progress and task_id is not None:
                progress.advance(task_id)
@@ -794,7 +789,7 @@ def render_table(image_name, groups, is_untagged=False, has_action=False):
            parent.get("type", ""),
            parent.get("platform", ""),
            format_status(parent.get("status", "")),
-           f"[green]{parent.get('downloads', 0)}[/green]",
+           f"[bold cyan]{parent.get('downloads', 0)}[/bold cyan]",
            f"[dim]{parent.get('digest', '')}[/dim]",
            action_str if has_action else None
        )
@@ -804,8 +799,8 @@
            f"[magenta]{parent.get('type', 'manifest/list')}[/magenta]",
            parent.get('platform', 'multi'),
            format_status(parent.get("status", "")),
-           f"[green]{parent.get('downloads', 0)}[/green]",
-           f"[dim]{parent.get('digest', '')}[/dim]"
+           f"[bold cyan]{parent.get('downloads', 0)}[/bold cyan]",
+           f"[bold cyan]{parent.get('digest', '')}[/bold cyan]"
        ]
        if has_action:
            row_data.append(action_str)
@@ -819,11 +814,11 @@
            table.add_section()
        else:
            row_data = [
-               f" └─ {row.get('tag', '')}",
-               row.get("type", ""),
+               f" └─ [dim]{row.get('tag', '')}[/dim]",
+               f"[white]{row.get('type', '')}[/white]",
                row.get("platform", ""),
                format_status(row.get("status", "")),
-               f"[green]{row.get('downloads', 0)}[/green]",
+               f"[dim]{row.get('downloads', 0)}[/dim]",
                f"[dim]{row.get('digest', '')}[/dim]"
            ]
            if has_action: