#!/usr/bin/env python3 """ TSHARPS CI — Webhook Receiver Lightweight HTTP server that listens for Gitea push webhooks and spawns the CI runner as a background process. Returns 200 immediately. If the runner subprocess crashes, sends a distinct "CI RUNNER ERROR" alert. """ import hashlib import hmac import json import os import subprocess import sys from http.server import HTTPServer, BaseHTTPRequestHandler from pathlib import Path SCRIPT_DIR = Path(__file__).resolve().parent CONFIG_PATH = SCRIPT_DIR / "ci-config.json" def load_config(): with open(CONFIG_PATH) as f: return json.load(f) CONFIG = load_config() PORT = CONFIG.get("webhook_port", 9500) SECRET = CONFIG.get("webhook_secret", "").encode() def verify_signature(payload: bytes, signature: str) -> bool: """Verify Gitea webhook HMAC signature.""" if not SECRET: return True # No secret configured — accept all expected = hmac.new(SECRET, payload, hashlib.sha256).hexdigest() return hmac.compare_digest(f"sha256={expected}", signature) class WebhookHandler(BaseHTTPRequestHandler): def do_POST(self): if self.path != "/ci": self.send_response(404) self.end_headers() return content_length = int(self.headers.get("Content-Length", 0)) payload = self.rfile.read(content_length) # Verify signature if configured signature = self.headers.get("X-Gitea-Signature", "") if SECRET and not verify_signature(payload, signature): self.send_response(403) self.end_headers() self.wfile.write(b"Invalid signature") return # Parse push event try: data = json.loads(payload) except json.JSONDecodeError: self.send_response(400) self.end_headers() self.wfile.write(b"Invalid JSON") return # Extract branch, commit, actor ref = data.get("ref", "") branch = ref.replace("refs/heads/", "") if ref.startswith("refs/heads/") else ref commits = data.get("commits", []) commit = commits[-1]["id"][:7] if commits else data.get("after", "")[:7] actor = data.get("pusher", {}).get("login", "unknown") # Check if branch is in our config if branch not in CONFIG.get("branches", {}): self.send_response(200) self.end_headers() self.wfile.write(f"Branch '{branch}' not configured — skipping".encode()) return print(f"[CI] Push to {branch} ({commit}) by {actor} — spawning runner") # Spawn ci-runner.sh as background process runner_path = SCRIPT_DIR / "ci-runner.sh" try: proc = subprocess.Popen( ["bash", str(runner_path), branch, commit, actor], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=str(SCRIPT_DIR), ) # Non-blocking — don't wait for completion # But set up a thread to catch crashes import threading def _monitor(p, branch, commit, actor): stdout, _ = p.communicate() if p.returncode != 0: print(f"[CI] RUNNER CRASHED for {branch} ({commit}): exit {p.returncode}") # Send runner error notification notify_path = SCRIPT_DIR / "ci-notify.sh" subprocess.run( ["bash", str(notify_path), branch, commit, "error", f"Runner crashed with exit code {p.returncode}", "", "0s", actor], cwd=str(SCRIPT_DIR), ) else: print(f"[CI] Runner completed for {branch} ({commit})") t = threading.Thread(target=_monitor, args=(proc, branch, commit, actor), daemon=True) t.start() except Exception as e: print(f"[CI] Failed to spawn runner: {e}") # Send error notification directly notify_path = SCRIPT_DIR / "ci-notify.sh" subprocess.run( ["bash", str(notify_path), branch, commit, "error", f"Failed to spawn runner: {e}", "", "0s", actor], cwd=str(SCRIPT_DIR), ) # Return 200 immediately self.send_response(200) self.end_headers() self.wfile.write(f"CI triggered for {branch} ({commit})".encode()) def do_GET(self): """Health check endpoint.""" if self.path == "/health": self.send_response(200) self.end_headers() self.wfile.write(json.dumps({ "status": "healthy", "branches": list(CONFIG.get("branches", {}).keys()), "port": PORT, }).encode()) return self.send_response(404) self.end_headers() def log_message(self, format, *args): print(f"[CI-Webhook] {args[0]}") def main(): print(f"[CI] TSHARPS CI Webhook Receiver starting on port {PORT}") print(f"[CI] Configured branches: {list(CONFIG.get('branches', {}).keys())}") print(f"[CI] Signature validation: {'enabled' if SECRET else 'disabled'}") server = HTTPServer(("127.0.0.1", PORT), WebhookHandler) try: server.serve_forever() except KeyboardInterrupt: print("[CI] Shutting down") server.shutdown() if __name__ == "__main__": main()