import argparse
import hashlib
import json
import os
import re
import sys
import urllib.error
import urllib.parse
import urllib.request

# Matches key="value" pairs inside a Www-Authenticate challenge.
_CHALLENGE_PARAM_RE = re.compile(r'(\w+)="([^"]*)"')


def _build_token_url(auth_header, repository):
    """Return the token endpoint URL for a Bearer challenge, or None.

    Args:
        auth_header: Raw value of the ``Www-Authenticate`` response header.
        repository: Repository path used to build the ``pull`` scope.

    Returns:
        The URL to request a token from, or ``None`` when the challenge is
        not a Bearer challenge or carries no realm.
    """
    scheme, _, params_text = auth_header.strip().partition(" ")
    if scheme.lower() != "bearer":
        # A Basic (or other) realm is a label, not a URL we could fetch.
        print(
            f"Unsupported auth scheme {scheme!r}; continuing without a token",
            file=sys.stderr,
        )
        return None

    params = {
        key.lower(): value
        for key, value in _CHALLENGE_PARAM_RE.findall(params_text)
    }
    realm = params.get("realm")
    if not realm:
        print("Could not find realm in Www-Authenticate", file=sys.stderr)
        return None

    query = [("scope", f"repository:{repository}:pull")]
    service = params.get("service")
    if service:
        query.append(("service", service))

    # The realm may already carry its own query string; extend it rather than
    # appending a second "?". Values are percent-encoded so a service name
    # containing spaces or "&" cannot corrupt the URL.
    separator = "&" if "?" in realm else "?"
    return realm + separator + urllib.parse.urlencode(query, safe=":/")


def get_token(registry, repository, timeout=30):
    """Obtain a pull token for *repository* on *registry*, if one is needed.

    The function probes ``https://{registry}/v2/``. If that request succeeds,
    or the error response carries no usable Bearer challenge, no token is
    requested. Otherwise the realm named in the ``Www-Authenticate`` header is
    asked for a token with scope ``repository:{repository}:pull``.

    Args:
        registry: Registry host (optionally with port).
        repository: Repository path inside the registry.
        timeout: Socket timeout in seconds for each HTTP request.

    Returns:
        The token string (``token`` or ``access_token`` field of the JSON
        response), or ``None`` when no token could or needed to be obtained.

    Raises:
        urllib.error.URLError: On connection failures, or when the token
            endpoint itself returns an HTTP error.
    """
    url = f"https://{registry}/v2/"
    print(f"Checking {url}...", file=sys.stderr)
    try:
        # Only the status matters; close the probe response immediately.
        with urllib.request.urlopen(url, timeout=timeout):
            return None  # Request succeeded: no auth needed.
    except urllib.error.HTTPError as e:
        if e.code != 401:
            # 401 is the expected status for auth discovery; anything else is
            # reported, but a challenge header is still honoured if present.
            print(f"Initial check to {url} returned {e.code}: {e}", file=sys.stderr)
        auth_header = e.headers.get("Www-Authenticate")

    if not auth_header:
        # No challenge at all: treat the registry as public.
        return None

    print(f"Auth header: {auth_header}", file=sys.stderr)
    token_url = _build_token_url(auth_header, repository)
    if token_url is None:
        return None

    print(f"Fetching token from {token_url}...", file=sys.stderr)
    with urllib.request.urlopen(token_url, timeout=timeout) as r:
        data = json.loads(r.read())
    return data.get("token") or data.get("access_token")


def get_manifest(registry, repository, reference, token, timeout=30):
    """Fetch and decode the image manifest for *reference*.

    Args:
        registry: Registry host (optionally with port).
        repository: Repository path inside the registry.
        reference: Tag or digest identifying the manifest.
        token: Bearer token to send, or a falsy value for anonymous access.
        timeout: Socket timeout in seconds for the HTTP request.

    Returns:
        The manifest parsed from JSON.

    Raises:
        urllib.error.URLError: On connection failures or HTTP error statuses.
    """
    url = f"https://{registry}/v2/{repository}/manifests/{reference}"
    req = urllib.request.Request(url)
    if token:
        req.add_header("Authorization", f"Bearer {token}")
    # Accept OCI and Docker manifests
    req.add_header(
        "Accept",
        "application/vnd.oci.image.manifest.v1+json, application/vnd.docker.distribution.manifest.v2+json",
    )
    with urllib.request.urlopen(req, timeout=timeout) as r:
        return json.loads(r.read())
# Layer media types accepted as the chart archive, in no particular priority.
_CHART_MEDIA_TYPES = (
    "application/vnd.cncf.helm.chart.content.v1.tar+gzip",
    "application/x-tar",  # Sometimes used incorrectly?
)


def _remove_quietly(path):
    """Delete *path* if possible; used to discard partial or bad downloads."""
    try:
        os.remove(path)
    except OSError:
        # Best-effort cleanup: the original failure is what gets reported.
        pass


def _parse_oci_url(url):
    """Split ``oci://registry/repo/path`` into ``(registry, repository)``.

    Raises:
        ValueError: If the scheme is not ``oci://`` or either part is empty.
    """
    prefix = "oci://"
    if not url.startswith(prefix):
        raise ValueError("URL must start with oci://")
    registry, _, repository = url[len(prefix):].partition("/")
    repository = repository.strip("/")
    if not registry or not repository:
        raise ValueError("Invalid OCI URL format")
    return registry, repository


def _find_chart_layer(manifest):
    """Return the first manifest layer with a chart media type, or None."""
    for layer in manifest.get("layers") or []:
        if layer.get("mediaType") in _CHART_MEDIA_TYPES:
            return layer
    return None


def _sha256_file(path):
    """Return the hexadecimal SHA-256 of the file at *path*."""
    hasher = hashlib.sha256()
    with open(path, "rb") as f:
        for block in iter(lambda: f.read(65536), b""):
            hasher.update(block)
    return hasher.hexdigest()


def _normalize_digest(value):
    """Lower-case *value* and add the ``sha256:`` prefix when it is missing."""
    value = value.strip().lower()
    return value if ":" in value else "sha256:" + value


def download_blob(registry, repository, digest, token, output_path, timeout=30):
    """Stream the blob identified by *digest* into *output_path*.

    Args:
        registry: Registry host (optionally with port).
        repository: Repository path inside the registry.
        digest: Blob digest as listed in the manifest.
        token: Bearer token to send, or a falsy value for anonymous access.
        output_path: Destination file; it is overwritten.
        timeout: Socket timeout in seconds for the HTTP request.

    Raises:
        urllib.error.URLError: On connection failures or HTTP error statuses.
        OSError: If the destination cannot be written.
    """
    url = f"https://{registry}/v2/{repository}/blobs/{digest}"
    req = urllib.request.Request(url)
    if token:
        req.add_header("Authorization", f"Bearer {token}")
    # Open the destination only after the request succeeded, so an HTTP error
    # never truncates or removes a file that already exists.
    with urllib.request.urlopen(req, timeout=timeout) as r, open(output_path, "wb") as f:
        try:
            for chunk in iter(lambda: r.read(65536), b""):
                f.write(chunk)
        except BaseException:
            # Do not leave a truncated archive behind. Close first so the
            # removal also works on platforms that lock open files.
            f.close()
            _remove_quietly(output_path)
            raise


def main(argv=None):
    """Command-line entry point: pull a Helm chart from an OCI registry.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Exits with status 1 on invalid input, a missing chart layer, a digest
    mismatch, or any error raised while talking to the registry.
    """
    parser = argparse.ArgumentParser(description="Pull Helm chart from OCI registry")
    parser.add_argument(
        "--url",
        required=True,
        help="OCI URL (e.g., oci://ghcr.io/stefanprodan/charts/podinfo)",
    )
    parser.add_argument("--version", required=True, help="Chart version/tag")
    parser.add_argument("--output", required=True, help="Output file path (.tgz)")
    parser.add_argument(
        "--digest", help="Expected SHA256 digest of the content (optional validation)"
    )
    args = parser.parse_args(argv)

    # oci://ghcr.io/stefanprodan/charts/podinfo
    #   -> registry=ghcr.io, repo=stefanprodan/charts/podinfo
    try:
        registry, repository = _parse_oci_url(args.url)
    except ValueError as exc:
        print(f"Error: {exc}", file=sys.stderr)
        sys.exit(1)

    # OCI tags cannot contain "+", so Helm stores SemVer build metadata
    # with "_" in its place.
    reference = args.version.replace("+", "_")

    try:
        token = get_token(registry, repository)
        manifest = get_manifest(registry, repository, reference, token)

        chart_layer = _find_chart_layer(manifest)
        if not chart_layer:
            # Deliberately strict: no guessing among unrelated layers.
            print("Error: No Helm chart layer found in manifest", file=sys.stderr)
            media_types = [
                layer.get("mediaType") for layer in manifest.get("layers") or []
            ]
            print(f"Layers: {media_types}", file=sys.stderr)
            sys.exit(1)

        digest = chart_layer["digest"]
        print(f"Found layer digest: {digest}", file=sys.stderr)

        download_blob(registry, repository, digest, token, args.output)

        calculated_digest = "sha256:" + _sha256_file(args.output)

        # Always check the blob against the digest the manifest advertises;
        # this catches truncated or corrupted downloads.
        if digest.lower().startswith("sha256:") and calculated_digest != digest.lower():
            _remove_quietly(args.output)
            print(
                f"Error: Downloaded blob does not match manifest digest. "
                f"Expected {digest}, got {calculated_digest}",
                file=sys.stderr,
            )
            sys.exit(1)

        # Verify the caller-supplied digest if provided.
        if args.digest:
            if calculated_digest != _normalize_digest(args.digest):
                _remove_quietly(args.output)
                print(
                    f"Error: Digest mismatch. Expected {args.digest}, got {calculated_digest}",
                    file=sys.stderr,
                )
                sys.exit(1)
            print("Digest verified.", file=sys.stderr)

    except Exception as e:
        # Top-level boundary: report the failure with a traceback and exit.
        print(f"Error: {e}", file=sys.stderr)
        import traceback

        traceback.print_exc(file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()