#!/usr/bin/env python3
"""
AOC Sync - Polls git repositories containing Advent of Code implementations
and generates performance comparison reports.
"""

import os
import sys
import json
import sqlite3
import subprocess
import shutil
import re
import time
import logging
from contextlib import closing
from decimal import Decimal, InvalidOperation
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, asdict
from collections import defaultdict

try:
    import yaml
except ImportError:
    # PyYAML is only needed to read the config file.  Deferring the failure
    # keeps the database / parsing helpers importable without it and reports
    # the missing dependency through the same log-and-exit path as a missing
    # config file (see Config._load_config).
    yaml = None

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Remote branch names tried, in order, when syncing a repository.
_REMOTE_BRANCHES = ('master', 'main')

# Four-digit numbers outside this inclusive range are not treated as years.
_MIN_YEAR = 2015
_MAX_YEAR = 2030

_YEAR_RE = re.compile(r'(\d{4})')
_DAY_RE = re.compile(r'day(\d+)')
# The `edition` key of Cargo.toml holds the Rust edition, not a puzzle year.
_CARGO_EDITION_RE = re.compile(r'\s*edition\s*=')

# One benchmark line: "<day> <part> <value> <unit>".
# Groups 1/2 come from the "day ... part ..." spelling, groups 3/4 from the
# bare "1 - 2" spelling; group 5 is the number, group 6 the unit.
_BENCH_RESULT_RE = re.compile(
    r'(?:day\s*(\d+)\s*-?\s*part\s*(\d+)'  # "Day 1 - Part 2", "Day 1 Part 2", "day01-part2"
    r'|(\d+)\s*-\s*(\d+))'                 # "1 - 2"
    r'[:\s]+([\d.]+)\s*(ns|[\u03bc\u00b5u]s|ms)',
    re.IGNORECASE,
)

# Multiplier that converts a value in the given unit to nanoseconds.
_UNIT_TO_NS = {'ns': 1, 'us': 1_000, 'ms': 1_000_000}


@dataclass
class PerformanceResult:
    """Stores performance data for a single part of a day"""
    user: str
    year: int
    day: int
    part: int
    time_ns: int  # Time in nanoseconds
    timestamp: str


class Config:
    """Configuration manager backed by a YAML file."""

    def __init__(self, config_path: str = "config.yaml"):
        self.config_path = config_path
        self.config = self._load_config()

    def _load_config(self) -> dict:
        """Load configuration from the YAML file.

        Returns:
            The parsed mapping; an empty dict when the file is empty.

        Exits the process with status 1 when the file is missing, PyYAML is
        not installed, or the file does not contain a mapping.
        """
        if not os.path.exists(self.config_path):
            logger.error(f"Config file not found: {self.config_path}")
            sys.exit(1)
        if yaml is None:
            logger.error("PyYAML is required to read the config file")
            sys.exit(1)

        with open(self.config_path, 'r', encoding='utf-8') as f:
            loaded = yaml.safe_load(f)

        # safe_load returns None for an empty document; the properties below
        # all call .get(), so normalise that to an empty mapping.
        if loaded is None:
            return {}
        if not isinstance(loaded, dict):
            logger.error(f"Config file must contain a mapping: {self.config_path}")
            sys.exit(1)
        return loaded

    @property
    def poll_interval(self) -> int:
        return self.config.get('poll_interval', 300)

    @property
    def output_dir(self) -> str:
        return self.config.get('output_dir', 'output')

    @property
    def data_dir(self) -> str:
        return self.config.get('data_dir', 'data')

    @property
    def repositories(self) -> List[dict]:
        return self.config.get('repositories', [])

    @property
    def compare_years(self) -> Optional[List[int]]:
        return self.config.get('compare_years')

    @property
    def compare_days(self) -> Optional[List[int]]:
        return self.config.get('compare_days')


class Database:
    """SQLite database for storing performance results"""

    def __init__(self, db_path: str):
        self.db_path = db_path
        # A bare filename has no directory component; fall back to the cwd.
        os.makedirs(os.path.dirname(db_path) or '.', exist_ok=True)
        self._init_db()

    def _connect(self):
        """Return a context manager yielding a connection that is always closed."""
        return closing(sqlite3.connect(self.db_path))

    def _init_db(self):
        """Initialize database schema"""
        with self._connect() as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS results (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user TEXT NOT NULL,
                    year INTEGER NOT NULL,
                    day INTEGER NOT NULL,
                    part INTEGER NOT NULL,
                    time_ns INTEGER NOT NULL,
                    timestamp TEXT NOT NULL,
                    UNIQUE(user, year, day, part, timestamp)
                )
            ''')
            conn.execute('''
                CREATE INDEX IF NOT EXISTS idx_user_year_day_part
                ON results(user, year, day, part)
            ''')
            conn.commit()

    def insert_result(self, result: PerformanceResult):
        """Insert a performance result.

        A row with the same (user, year, day, part, timestamp) is replaced.
        """
        with self._connect() as conn:
            try:
                conn.execute('''
                    INSERT OR REPLACE INTO results
                    (user, year, day, part, time_ns, timestamp)
                    VALUES (?, ?, ?, ?, ?, ?)
                ''', (result.user, result.year, result.day, result.part,
                      result.time_ns, result.timestamp))
                conn.commit()
            except sqlite3.IntegrityError as e:
                # OR REPLACE already resolves UNIQUE collisions, so reaching
                # this point means another constraint failed (e.g. a NULL in
                # a NOT NULL column).  Skip the row, but leave a trace.
                logger.warning(f"Skipping result that violates a constraint: {e}")

    def get_latest_results(self, years: Optional[List[int]] = None,
                           days: Optional[List[int]] = None) -> List[Dict]:
        """Get latest performance results for each user/day/part.

        Args:
            years: Restrict to these years; None or empty means all years.
            days: Restrict to these days; None or empty means all days.

        Returns:
            One dict per row with keys user, year, day, part, time_ns and
            timestamp, ordered by year, day, part, user.
        """
        query = '''
            SELECT user, year, day, part, time_ns, timestamp
            FROM results r1
            WHERE timestamp = (
                SELECT MAX(timestamp)
                FROM results r2
                WHERE r2.user = r1.user
                AND r2.year = r1.year
                AND r2.day = r1.day
                AND r2.part = r1.part
            )
        '''
        conditions = []
        params = []
        for column, wanted in (('year', years), ('day', days)):
            if wanted:
                placeholders = ','.join('?' * len(wanted))
                conditions.append(f'{column} IN ({placeholders})')
                params.extend(wanted)
        if conditions:
            query += ' AND ' + ' AND '.join(conditions)
        query += ' ORDER BY year, day, part, user'

        with self._connect() as conn:
            rows = conn.execute(query, params).fetchall()

        columns = ('user', 'year', 'day', 'part', 'time_ns', 'timestamp')
        return [dict(zip(columns, row)) for row in rows]

    def get_all_users(self) -> List[str]:
        """Get list of all users"""
        with self._connect() as conn:
            rows = conn.execute('SELECT DISTINCT user FROM results').fetchall()
        return [row[0] for row in rows]

    def get_all_years(self) -> List[int]:
        """Get list of all years"""
        with self._connect() as conn:
            rows = conn.execute(
                'SELECT DISTINCT year FROM results ORDER BY year'
            ).fetchall()
        return [row[0] for row in rows]


class GitManager:
    """Manages git repository operations"""

    @staticmethod
    def clone_or_update_repo(url: str, local_path: str) -> bool:
        """Clone repository if it doesn't exist, or update if it does.

        An existing checkout is fetched and hard-reset to the first remote
        branch in ``_REMOTE_BRANCHES`` that the reset succeeds for.

        Returns:
            True when the working copy was cloned or updated, else False.
        """
        local_path = Path(local_path)

        if not (local_path.exists() and (local_path / '.git').exists()):
            # Clone new repository
            logger.info(f"Cloning repository: {url} to {local_path}")
            local_path.parent.mkdir(parents=True, exist_ok=True)
            try:
                subprocess.run(
                    ['git', 'clone', url, str(local_path)],
                    check=True,
                    capture_output=True
                )
            except subprocess.CalledProcessError as e:
                logger.error(f"Failed to clone {url}: {e}")
                return False
            return True

        # Update existing repository
        logger.info(f"Updating repository: {local_path}")
        try:
            subprocess.run(
                ['git', 'fetch', 'origin'],
                cwd=local_path,
                check=True,
                capture_output=True
            )
        except subprocess.CalledProcessError as e:
            # Without a successful fetch the remote refs are stale; resetting
            # to them would report success without actually updating.
            logger.error(f"Failed to update {local_path}: {e}")
            return False

        last_error = None
        for branch in _REMOTE_BRANCHES:
            try:
                subprocess.run(
                    ['git', 'reset', '--hard', f'origin/{branch}'],
                    cwd=local_path,
                    check=True,
                    capture_output=True
                )
            except subprocess.CalledProcessError as e:
                last_error = e
            else:
                return True

        logger.error(f"Failed to update {local_path}: {last_error}")
        return False

    @staticmethod
    def has_changes(url: str, local_path: str) -> bool:
        """Check if remote repository has changes.

        Returns:
            True when the checkout is missing, when HEAD is behind the remote
            branch, or when the check itself fails; False otherwise.
        """
        local_path = Path(local_path)

        if not local_path.exists() or not (local_path / '.git').exists():
            return True  # Needs to be cloned

        try:
            # Fetch latest changes
            subprocess.run(
                ['git', 'fetch', 'origin'],
                cwd=local_path,
                check=True,
                capture_output=True
            )

            # Count commits HEAD is missing, on the first branch that resolves
            for branch in _REMOTE_BRANCHES:
                result = subprocess.run(
                    ['git', 'rev-list', '--count', f'HEAD..origin/{branch}'],
                    cwd=local_path,
                    capture_output=True,
                    text=True
                )
                if result.returncode == 0:
                    return int(result.stdout.strip()) > 0
            return False
        except Exception as e:
            logger.error(f"Error checking for changes: {e}")
            return True  # Assume changes to be safe


class CargoAOCRunner:
    """Runs cargo-aoc benchmarks and parses results"""

    @staticmethod
    def _years_in(text: str) -> List[int]:
        """Return every in-range four-digit number in *text*, in order."""
        found = (int(m.group(1)) for m in _YEAR_RE.finditer(text))
        return [y for y in found if _MIN_YEAR <= y <= _MAX_YEAR]

    @staticmethod
    def find_implemented_days(repo_path: Path) -> List[int]:
        """Find which days are implemented in the repository.

        Looks at ``day*.rs`` files anywhere under ``src/`` and at ``dayN``
        references inside Cargo.toml.

        Returns:
            Sorted list of distinct day numbers.
        """
        repo_path = Path(repo_path)
        days = set()

        # '**' also matches zero directories, so this single pattern covers
        # src/day01.rs and src/bin/day01.rs as well as deeper layouts.
        for day_file in repo_path.glob('src/**/day*.rs'):
            match = _DAY_RE.search(day_file.name)
            if match:
                days.add(int(match.group(1)))

        # Also check for Cargo.toml with day references
        cargo_toml = repo_path / 'Cargo.toml'
        if cargo_toml.exists():
            content = cargo_toml.read_text(encoding='utf-8')
            days.update(int(m.group(1)) for m in _DAY_RE.finditer(content))

        return sorted(days)

    @staticmethod
    def extract_years_from_repo(repo_path: Path) -> List[int]:
        """Try to extract year(s) from repository path, name, or structure.

        Sources, all filtered to the ``_MIN_YEAR``..``_MAX_YEAR`` range: the
        path string, the names of the top-level directories, and Cargo.toml.

        Returns:
            Sorted list of distinct candidate years; empty when none found.
        """
        repo_path = Path(repo_path)
        years = set(CargoAOCRunner._years_in(str(repo_path)))

        # Check for year directories (common pattern: src/2023/, year2023/, etc.)
        for item in repo_path.iterdir():
            if item.is_dir():
                years.update(CargoAOCRunner._years_in(item.name))

        # Check Cargo.toml
        cargo_toml = repo_path / 'Cargo.toml'
        if cargo_toml.exists():
            for line in cargo_toml.read_text(encoding='utf-8').splitlines():
                # `edition = "2021"` is the Rust edition, not a puzzle year.
                if _CARGO_EDITION_RE.match(line):
                    continue
                years.update(CargoAOCRunner._years_in(line))

        return sorted(years)

    @staticmethod
    def run_benchmarks(repo_path: Path, year: Optional[int] = None,
                       user: str = "unknown") -> List[PerformanceResult]:
        """Run cargo aoc benchmarks and parse results.

        Args:
            repo_path: Root of the checkout; must contain Cargo.toml.
            year: Year passed to the benchmark command; when falsy, the year
                is looked for in the command output instead.
            user: Name recorded on every result.

        Returns:
            Parsed results for every day that ran successfully.  Days that
            fail, time out, or have no determinable year are logged and
            skipped.
        """
        results = []
        repo_path = Path(repo_path)

        if not (repo_path / 'Cargo.toml').exists():
            logger.warning(f"No Cargo.toml found in {repo_path}")
            return results

        days = CargoAOCRunner.find_implemented_days(repo_path)
        logger.info(f"Found {len(days)} implemented days in {repo_path}")

        for day in days:
            try:
                # Run cargo aoc bench for this day
                cmd = ['cargo', 'aoc', 'bench', '--day', str(day)]
                if year:
                    cmd.extend(['--year', str(year)])

                result = subprocess.run(
                    cmd,
                    cwd=repo_path,
                    capture_output=True,
                    text=True,
                    timeout=300  # 5 minute timeout per day
                )

                if result.returncode != 0:
                    logger.warning(f"cargo aoc bench failed for day {day}: {result.stderr}")
                    continue

                # Fall back to the first plausible year printed by the tool
                actual_year = year
                if not actual_year:
                    candidates = CargoAOCRunner._years_in(result.stdout)
                    actual_year = candidates[0] if candidates else None

                if not actual_year:
                    logger.warning(f"Could not determine year for day {day}, skipping")
                    continue

                results.extend(CargoAOCRunner._parse_benchmark_output(
                    result.stdout, day, actual_year, user
                ))
            except subprocess.TimeoutExpired:
                logger.error(f"Timeout running benchmarks for day {day}")
            except Exception as e:
                # Best effort: one broken day must not abort the whole run.
                logger.error(f"Error running benchmarks for day {day}: {e}")

        return results

    @staticmethod
    def _parse_benchmark_output(output: str, day: int, year: int,
                                user: str) -> List[PerformanceResult]:
        """Parse cargo-aoc benchmark output.

        Recognises, case-insensitively, "Day X - Part Y: T unit",
        "Day X Part Y: T unit", "dayXX-partY: T unit" and "X - Y: T unit",
        where unit is ns, us (also written with a Greek mu or the micro
        sign) or ms.

        Args:
            output: Captured stdout of the benchmark command.
            day: Day used when the matched day number is not positive.
            year: Year recorded on every result.
            user: Name recorded on every result.

        Returns:
            One result per matching line, in order of appearance, with the
            time converted to whole nanoseconds (fractions truncated).
        """
        results = []
        timestamp = datetime.now().isoformat()

        for match in _BENCH_RESULT_RE.finditer(output):
            part_day = int(match.group(1) or match.group(3))
            part_num = int(match.group(2) or match.group(4))
            time_str = match.group(5)

            # Fold both spellings of "micro" into plain 'u' before lookup.
            unit = (match.group(6).lower()
                    .replace('\u03bc', 'u')
                    .replace('\u00b5', 'u'))
            factor = _UNIT_TO_NS.get(unit)
            if factor is None:
                logger.warning(f"Could not parse time: {time_str}")
                continue

            try:
                # Scale before truncating so "1.5 ms" keeps its fraction;
                # Decimal avoids float artefacts such as 2.3 * 1000 == 2299.99...
                time_ns = int(Decimal(time_str) * factor)
            except InvalidOperation:
                logger.warning(f"Could not parse time: {time_str}")
                continue

            # Use the day from the match if available, otherwise use provided day
            actual_day = part_day if part_day > 0 else day

            results.append(PerformanceResult(
                user=user,
                year=year,
                day=actual_day,
                part=part_num,
                time_ns=time_ns,
                timestamp=timestamp
            ))

        return results


class HTMLGenerator:
    """Generates HTML comparison pages"""

    def __init__(self, output_dir: str):
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def generate(self, db: Database, config: Config):
        """Generate HTML comparison page.

        Writes ``index.html`` into the output directory, using the years and
        days selected in *config* (all stored years when none are configured).
        """
        years = config.compare_years or db.get_all_years()
        days = config.compare_days

        results = db.get_latest_results(years=years, days=days)
        users = db.get_all_users()

        # Organize data by year -> day -> part -> user
        data = defaultdict(lambda: defaultdict(lambda: defaultdict(dict)))
        for result in results:
            time_ns = result['time_ns']
            # Only store if time_ns > 0 (valid result)
            if time_ns > 0:
                data[result['year']][result['day']][result['part']][result['user']] = time_ns

        html = self._generate_html(data, years, users)

        output_file = self.output_dir / 'index.html'
        # Explicit encoding: the report must not depend on the locale.
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(html)

        logger.info(f"Generated HTML report: {output_file}")

    # NOTE(review): `_generate_html(self, data, years, users) -> str` belongs
    # here.  Its HTML template is truncated in the reviewed source and has
    # deliberately not been reconstructed.
Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
Years: {', '.join(map(str, sorted(years)))}
Users: {', '.join(sorted(users))}
| User | Time | Relative Speed |
|---|---|---|
| {user} | No data | - |
| {user} | {time_str} | {relative_str} |