"""Pull a Helm chart archive from an OCI registry."""
import argparse
import hashlib
import json
import os
import re
import sys
import urllib.error
import urllib.parse
import urllib.request
|
|
|
|
|
|
def get_token(registry, repository):
    """Fetch an anonymous pull token for ``repository`` on ``registry``.

    The registry's ``/v2/`` endpoint is probed first. When it answers with an
    HTTP error carrying a ``Www-Authenticate`` challenge, the ``realm`` (and
    optional ``service``) are extracted from that header and a token with
    ``repository:<repository>:pull`` scope is requested from the realm.

    Args:
        registry: Registry host used to build ``https://{registry}/v2/``.
        repository: Repository path placed into the requested scope.

    Returns:
        The value of ``token`` (or, failing that, ``access_token``) from the
        realm's JSON response. ``None`` when the probe succeeds without
        authentication, when there is no challenge header, or when the
        challenge has no quoted ``realm``.

    Raises:
        urllib.error.URLError: If the probe fails for a non-HTTP reason or
            the token request fails (``HTTPError`` is a subclass).
        ValueError: If the token response is not valid JSON.
    """
    probe_url = f"https://{registry}/v2/"
    print(f"Checking {probe_url}...", file=sys.stderr)
    try:
        # Only the status matters; the context manager closes the response
        # so the probe connection is not leaked.
        with urllib.request.urlopen(probe_url):
            return None  # Probe succeeded: no auth needed.
    except urllib.error.HTTPError as err:
        if err.code != 401:
            # 401 is the expected discovery status. Anything else is
            # reported, but a challenge header is still honoured if present.
            print(
                f"Initial check to {probe_url} returned {err.code}: {err}",
                file=sys.stderr,
            )
        # Read the header here: the name bound by ``except ... as`` is
        # unset as soon as this handler finishes.
        challenge = err.headers.get("Www-Authenticate")

    if not challenge:
        # No challenge to follow; proceed unauthenticated.
        return None

    print(f"Auth header: {challenge}", file=sys.stderr)

    realm_match = re.search(r'realm="([^"]+)"', challenge)
    if not realm_match:
        print("Could not find realm in Www-Authenticate", file=sys.stderr)
        return None
    service_match = re.search(r'service="([^"]+)"', challenge)

    params = [("scope", f"repository:{repository}:pull")]
    if service_match:
        params.append(("service", service_match.group(1)))
    # Percent-encode the values (a service name may contain spaces, which
    # urllib rejects in a URL) while keeping ':' and '/' literal so the scope
    # stays in its usual readable form.
    query = urllib.parse.urlencode(
        params, safe=":/", quote_via=urllib.parse.quote
    )
    realm = realm_match.group(1)
    # A realm may already carry its own query string; extend it in that case.
    separator = "&" if "?" in realm else "?"
    token_url = f"{realm}{separator}{query}"

    print(f"Fetching token from {token_url}...", file=sys.stderr)
    with urllib.request.urlopen(urllib.request.Request(token_url)) as response:
        payload = json.loads(response.read())
    return payload.get("token") or payload.get("access_token")
|
|
|
|
|
|
def get_manifest(registry, repository, reference, token):
    """Download and decode the manifest for ``repository`` at ``reference``.

    Args:
        registry: Registry host used to build the ``https://`` URL.
        repository: Repository path inside the registry.
        reference: Tag or digest placed after ``/manifests/``.
        token: Bearer token; when falsy no ``Authorization`` header is sent.

    Returns:
        The manifest response body parsed from JSON.
    """
    headers = {}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    # Ask for either an OCI or a Docker v2 image manifest.
    headers["Accept"] = ", ".join(
        (
            "application/vnd.oci.image.manifest.v1+json",
            "application/vnd.docker.distribution.manifest.v2+json",
        )
    )

    request = urllib.request.Request(
        f"https://{registry}/v2/{repository}/manifests/{reference}",
        headers=headers,
    )
    with urllib.request.urlopen(request) as response:
        body = response.read()
    return json.loads(body)
|
|
|
|
|
|
def download_blob(registry, repository, digest, token, output_path):
    """Stream the blob ``digest`` from the registry into ``output_path``.

    The body is first written to a sibling ``<output_path>.part`` file and
    only moved over ``output_path`` once the transfer has finished. A failed
    or interrupted download therefore never leaves a truncated file at
    ``output_path`` and never destroys a file that was already there.

    Args:
        registry: Registry host used to build the ``https://`` URL.
        repository: Repository path inside the registry.
        digest: Blob digest placed after ``/blobs/``.
        token: Bearer token; when falsy no ``Authorization`` header is sent.
        output_path: Destination file path.

    Raises:
        urllib.error.URLError: If the request fails (``HTTPError`` is a
            subclass).
        OSError: If the temporary or destination file cannot be written.
    """
    request = urllib.request.Request(
        f"https://{registry}/v2/{repository}/blobs/{digest}"
    )
    if token:
        request.add_header("Authorization", f"Bearer {token}")

    # Same directory as the target, so the final rename stays on one
    # filesystem.
    partial_path = f"{output_path}.part"
    try:
        with urllib.request.urlopen(request) as response, open(
            partial_path, "wb"
        ) as out:
            for chunk in iter(lambda: response.read(65536), b""):
                out.write(chunk)
        os.replace(partial_path, output_path)
    except BaseException:
        # Clean up the incomplete download, then let the original error
        # propagate unchanged.
        try:
            os.remove(partial_path)
        except OSError:
            pass  # Nothing was written, or it is already gone.
        raise
|
|
|
|
|
|
def _find_chart_layer(manifest):
    """Return the first layer of ``manifest`` with a chart media type, else None."""
    chart_media_types = (
        "application/vnd.cncf.helm.chart.content.v1.tar+gzip",
        "application/x-tar",  # Not the Helm type; tolerated as a lenient match.
    )
    for layer in manifest.get("layers", []):
        if layer.get("mediaType") in chart_media_types:
            return layer
    return None


def _sha256_digest(path):
    """Return ``sha256:<hex>`` for the file at ``path``, read in chunks."""
    hasher = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            hasher.update(chunk)
    return "sha256:" + hasher.hexdigest()


def main(argv=None):
    """Command-line entry point: pull a Helm chart from an OCI registry.

    Parses ``--url``, ``--version``, ``--output`` and optional ``--digest``,
    obtains a token, fetches the manifest, downloads the chart layer to
    ``--output`` and, when ``--digest`` is given, checks the file's SHA-256
    against it.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Exits with status 1 (via ``sys.exit``) on an invalid URL, a manifest
    without a chart layer, a digest mismatch, or any exception raised while
    talking to the registry. On a digest mismatch the downloaded file is
    removed so an unverified archive is not left behind.
    """
    parser = argparse.ArgumentParser(description="Pull Helm chart from OCI registry")
    parser.add_argument(
        "--url",
        required=True,
        help="OCI URL (e.g., oci://ghcr.io/stefanprodan/charts/podinfo)",
    )
    parser.add_argument("--version", required=True, help="Chart version/tag")
    parser.add_argument("--output", required=True, help="Output file path (.tgz)")
    parser.add_argument(
        "--digest", help="Expected SHA256 digest of the content (optional validation)"
    )
    args = parser.parse_args(argv)

    scheme = "oci://"
    if not args.url.startswith(scheme):
        print("Error: URL must start with oci://", file=sys.stderr)
        sys.exit(1)

    # oci://ghcr.io/stefanprodan/charts/podinfo
    #   -> registry=ghcr.io, repository=stefanprodan/charts/podinfo
    registry, _, repository = args.url[len(scheme):].partition("/")
    if not registry or not repository:
        # Covers a missing "/", an empty host ("oci:///x") and an empty
        # repository ("oci://host/").
        print("Error: Invalid OCI URL format", file=sys.stderr)
        sys.exit(1)

    try:
        token = get_token(registry, repository)
        manifest = get_manifest(registry, repository, args.version, token)

        chart_layer = _find_chart_layer(manifest)
        if not chart_layer:
            # Strict on purpose: no guessing at an arbitrary first layer.
            print("Error: No Helm chart layer found in manifest", file=sys.stderr)
            media_types = [
                layer.get("mediaType") for layer in manifest.get("layers", [])
            ]
            print(f"Layers: {media_types}", file=sys.stderr)
            sys.exit(1)

        digest = chart_layer["digest"]
        print(f"Found layer digest: {digest}", file=sys.stderr)

        download_blob(registry, repository, digest, token, args.output)

        if args.digest:
            # Accept the digest with or without the "sha256:" prefix and in
            # any letter case; hexdigest() is always lowercase.
            expected = args.digest.strip().lower()
            if ":" not in expected:
                expected = "sha256:" + expected
            calculated_digest = _sha256_digest(args.output)
            if calculated_digest != expected:
                # Do not leave an archive that failed verification on disk.
                os.remove(args.output)
                print(
                    f"Error: Digest mismatch. Expected {args.digest}, got {calculated_digest}",
                    file=sys.stderr,
                )
                sys.exit(1)
            print("Digest verified.", file=sys.stderr)

    except Exception as e:
        # Top-level boundary: report the failure with a traceback and exit
        # non-zero. SystemExit is not an Exception and passes through.
        print(f"Error: {e}", file=sys.stderr)
        import traceback

        traceback.print_exc(file=sys.stderr)
        sys.exit(1)
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|