#!/usr/bin/env python3 """ AOC Sync - Polls git repositories containing Advent of Code implementations and generates performance comparison reports. """ import os import sys import yaml import json import sqlite3 import subprocess import shutil import re import time import logging from pathlib import Path from datetime import datetime from typing import Dict, List, Optional, Tuple from dataclasses import dataclass, asdict from collections import defaultdict # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) @dataclass class PerformanceResult: """Stores performance data for a single part of a day""" user: str year: int day: int part: int time_ns: int # Runner time in nanoseconds generator_time_ns: int = 0 # Generator time in nanoseconds (optional) git_rev: str = "" # Git revision (short hash) repo_url: str = "" # Repository URL timestamp: str = "" class Config: """Configuration manager""" def __init__(self, config_path: str = "config.yaml"): self.config_path = config_path self.config = self._load_config() def _load_config(self) -> dict: """Load configuration from YAML file""" if not os.path.exists(self.config_path): logger.error(f"Config file not found: {self.config_path}") sys.exit(1) with open(self.config_path, 'r') as f: return yaml.safe_load(f) @property def poll_interval(self) -> int: return self.config.get('poll_interval', 300) @property def output_dir(self) -> str: return self.config.get('output_dir', 'output') @property def data_dir(self) -> str: return self.config.get('data_dir', 'data') @property def repositories(self) -> List[dict]: return self.config.get('repositories', []) @property def compare_years(self) -> Optional[List[int]]: return self.config.get('compare_years') @property def compare_days(self) -> Optional[List[int]]: return self.config.get('compare_days') @property def rsync_config(self) -> Optional[dict]: return self.config.get('rsync') class Database: """SQLite database for storing performance results""" def __init__(self, db_path: str): self.db_path = db_path os.makedirs(os.path.dirname(db_path) if os.path.dirname(db_path) else '.', exist_ok=True) self._init_db() def _init_db(self): """Initialize database schema""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS results ( id INTEGER PRIMARY KEY AUTOINCREMENT, user TEXT NOT NULL, year INTEGER NOT NULL, day INTEGER NOT NULL, part INTEGER NOT NULL, time_ns INTEGER NOT NULL, generator_time_ns INTEGER NOT NULL DEFAULT 0, git_rev TEXT NOT NULL DEFAULT '', repo_url TEXT NOT NULL DEFAULT '', timestamp TEXT NOT NULL, UNIQUE(user, year, day, part, timestamp, git_rev) ) ''') # Add new columns if they don't exist (for existing databases) for column, col_type in [ ('generator_time_ns', 'INTEGER NOT NULL DEFAULT 0'), ('git_rev', 'TEXT NOT NULL DEFAULT \'\''), ('repo_url', 'TEXT NOT NULL DEFAULT \'\'') ]: try: cursor.execute(f'ALTER TABLE results ADD COLUMN {column} {col_type}') except sqlite3.OperationalError: # Column already exists pass cursor.execute(''' CREATE INDEX IF NOT EXISTS idx_user_year_day_part ON results(user, year, day, part) ''') conn.commit() conn.close() def insert_result(self, result: PerformanceResult): """Insert a performance result""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() try: cursor.execute(''' INSERT OR REPLACE INTO results (user, year, day, part, time_ns, generator_time_ns, git_rev, repo_url, timestamp) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ''', (result.user, result.year, result.day, result.part, result.time_ns, result.generator_time_ns, result.git_rev, result.repo_url, result.timestamp)) conn.commit() except sqlite3.IntegrityError: # Already exists, skip pass finally: conn.close() def get_latest_results(self, years: Optional[List[int]] = None, days: Optional[List[int]] = None) -> List[Dict]: """Get latest performance results for each user/day/part If years is None, returns all years. If days is None, returns all days. """ conn = sqlite3.connect(self.db_path) cursor = conn.cursor() query = ''' SELECT user, year, day, part, time_ns, generator_time_ns, git_rev, repo_url, timestamp FROM results r1 WHERE timestamp = ( SELECT MAX(timestamp) FROM results r2 WHERE r2.user = r1.user AND r2.year = r1.year AND r2.day = r1.day AND r2.part = r1.part ) ''' conditions = [] params = [] if years is not None: placeholders = ','.join('?' * len(years)) conditions.append(f'year IN ({placeholders})') params.extend(years) if days is not None: placeholders = ','.join('?' * len(days)) conditions.append(f'day IN ({placeholders})') params.extend(days) if conditions: query += ' AND ' + ' AND '.join(conditions) query += ' ORDER BY year, day, part, user' cursor.execute(query, params) rows = cursor.fetchall() conn.close() return [ { 'user': row[0], 'year': row[1], 'day': row[2], 'part': row[3], 'time_ns': row[4], 'generator_time_ns': row[5] if len(row) > 5 else 0, 'git_rev': row[6] if len(row) > 6 else '', 'repo_url': row[7] if len(row) > 7 else '', 'timestamp': row[8] if len(row) > 8 else (row[6] if len(row) > 6 else '') } for row in rows ] def get_historical_results(self, user: str, year: int, day: int, part: int) -> List[Dict]: """Get historical results for a specific user/day/part""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(''' SELECT time_ns, generator_time_ns, git_rev, repo_url, timestamp FROM results WHERE user = ? AND year = ? AND day = ? AND part = ? ORDER BY timestamp DESC ''', (user, year, day, part)) rows = cursor.fetchall() conn.close() return [ { 'time_ns': row[0], 'generator_time_ns': row[1], 'git_rev': row[2], 'repo_url': row[3], 'timestamp': row[4] } for row in rows ] def get_all_users(self) -> List[str]: """Get list of all users""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute('SELECT DISTINCT user FROM results') users = [row[0] for row in cursor.fetchall()] conn.close() return users def get_all_years(self) -> List[int]: """Get list of all years""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute('SELECT DISTINCT year FROM results ORDER BY year') years = [row[0] for row in cursor.fetchall()] conn.close() return years class GitManager: """Manages git repository operations""" @staticmethod def clone_or_update_repo(url: str, local_path: str) -> bool: """Clone repository if it doesn't exist, or update if it does""" local_path = Path(local_path) if local_path.exists() and (local_path / '.git').exists(): # Update existing repository logger.info(f"Updating repository: {local_path}") try: subprocess.run( ['git', 'fetch', 'origin'], cwd=local_path, check=True, capture_output=True ) subprocess.run( ['git', 'reset', '--hard', 'origin/master'], cwd=local_path, check=True, capture_output=True ) # Try main branch if master fails except subprocess.CalledProcessError: try: subprocess.run( ['git', 'reset', '--hard', 'origin/main'], cwd=local_path, check=True, capture_output=True ) except subprocess.CalledProcessError as e: logger.error(f"Failed to update {local_path}: {e}") return False return True else: # Clone new repository logger.info(f"Cloning repository: {url} to {local_path}") local_path.parent.mkdir(parents=True, exist_ok=True) try: subprocess.run( ['git', 'clone', url, str(local_path)], check=True, capture_output=True ) return True except subprocess.CalledProcessError as e: logger.error(f"Failed to clone {url}: {e}") return False @staticmethod def has_changes(url: str, local_path: str) -> bool: """Check if remote repository has changes""" local_path = Path(local_path) if not local_path.exists() or not (local_path / '.git').exists(): return True # Needs to be cloned try: # Fetch latest changes subprocess.run( ['git', 'fetch', 'origin'], cwd=local_path, check=True, capture_output=True ) # Check if local is behind remote result = subprocess.run( ['git', 'rev-list', '--count', 'HEAD..origin/master'], cwd=local_path, capture_output=True, text=True ) if result.returncode != 0: # Try main branch result = subprocess.run( ['git', 'rev-list', '--count', 'HEAD..origin/main'], cwd=local_path, capture_output=True, text=True ) if result.returncode == 0: behind_count = int(result.stdout.strip()) return behind_count > 0 return False except Exception as e: logger.error(f"Error checking for changes: {e}") return True # Assume changes to be safe class CargoAOCRunner: """Runs cargo-aoc benchmarks and parses results""" @staticmethod def find_implemented_days(work_dir: Path) -> List[int]: """Find which days are implemented in the directory Args: work_dir: Directory to search (should be a year directory for single repos) """ days = [] work_dir = Path(work_dir) # Look for common patterns: src/bin/day01.rs, src/day01.rs, etc. patterns = [ 'src/bin/day*.rs', 'src/day*.rs', '**/src/bin/day*.rs', '**/src/day*.rs', ] for pattern in patterns: for day_file in work_dir.glob(pattern): match = re.search(r'day(\d+)', day_file.name) if match: day_num = int(match.group(1)) if day_num not in days: days.append(day_num) # Also check for Cargo.toml with day references cargo_toml = work_dir / 'Cargo.toml' if cargo_toml.exists(): with open(cargo_toml, 'r') as f: content = f.read() for match in re.finditer(r'day(\d+)', content): day_num = int(match.group(1)) if day_num not in days: days.append(day_num) return sorted(days) @staticmethod def extract_years_from_repo(repo_path: Path) -> List[int]: """Try to extract year(s) from repository structure For single repos, looks for year directories in the root (e.g., 2023/, 2024/) """ years = [] repo_path = Path(repo_path) # Check for year directories in root (e.g., 2023/, 2024/) # These should be directories with 4-digit year names for item in repo_path.iterdir(): if item.is_dir() and not item.name.startswith('.'): # Check if directory name is exactly a 4-digit year if re.match(r'^\d{4}$', item.name): year = int(item.name) if 2015 <= year <= 2030: # Reasonable range years.append(year) # Also check path name as fallback if not years: path_str = str(repo_path) for year_match in re.finditer(r'(\d{4})', path_str): year = int(year_match.group(1)) if 2015 <= year <= 2030 and year not in years: years.append(year) return sorted(years) if years else [] @staticmethod def get_git_rev(repo_path: Path) -> str: """Get short git revision hash""" try: result = subprocess.run( ['git', 'rev-parse', '--short', 'HEAD'], cwd=repo_path, capture_output=True, text=True, timeout=5 ) if result.returncode == 0: return result.stdout.strip() except Exception as e: logger.warning(f"Could not get git rev for {repo_path}: {e}") return "" @staticmethod def run_benchmarks(repo_path: Path, year: int, user: str = "unknown", repo_url: str = "", is_multi_year: bool = False) -> List[PerformanceResult]: """Run cargo aoc benchmarks and parse results Args: repo_path: Path to the repository root (for single repos) or year directory (for multi-year repos) year: The year to benchmark user: User name for the results repo_url: Repository URL for linking is_multi_year: True if this is a multi-year repo (repo_path is already the year directory) """ results = [] repo_path = Path(repo_path) # Get git revision git_rev = CargoAOCRunner.get_git_rev(repo_path) # Determine the working directory if is_multi_year: # For multi-year repos, repo_path is already the year directory work_dir = repo_path else: # For single repos, check if we need to navigate to a year subdirectory work_dir = repo_path year_dir = repo_path / str(year) if year_dir.exists() and year_dir.is_dir(): work_dir = year_dir logger.info(f"Using year directory: {work_dir}") # Get git rev from repo root, not year directory git_rev = CargoAOCRunner.get_git_rev(repo_path) if not (work_dir / 'Cargo.toml').exists(): logger.warning(f"No Cargo.toml found in {work_dir}") return results days = CargoAOCRunner.find_implemented_days(work_dir) logger.info(f"Found {len(days)} implemented days in {work_dir}") for day in days: try: logger.info(f"Running cargo aoc for {user} year {year} day {day} in {work_dir}") # Run cargo aoc for this day (no year flag, must be in correct directory) cmd = ['cargo', 'aoc', '--day', str(day)] result = subprocess.run( cmd, cwd=work_dir, capture_output=True, text=True, timeout=300 # 5 minute timeout per day ) if result.returncode != 0: logger.warning(f"cargo aoc failed for day {day} in {work_dir}: {result.stderr}") continue # Log output for debugging if no results found if not result.stdout.strip() and not result.stderr.strip(): logger.warning(f"No output from cargo aoc for {user} year {year} day {day}") # Parse output for runtime information day_results = CargoAOCRunner._parse_runtime_output( result.stdout, result.stderr, day, year, user, git_rev, repo_url ) if day_results: logger.info(f"Parsed {len(day_results)} runtime result(s) for {user} year {year} day {day}") else: # Log a sample of the output to help debug parsing issues output_sample = (result.stdout + "\n" + result.stderr).strip()[:500] logger.warning(f"No runtime data parsed for {user} year {year} day {day}. Output sample: {output_sample}") results.extend(day_results) except subprocess.TimeoutExpired: logger.error(f"Timeout running cargo aoc for day {day}") except Exception as e: logger.error(f"Error running cargo aoc for day {day}: {e}") return results @staticmethod def _parse_runtime_output(stdout: str, stderr: str, day: int, year: int, user: str, git_rev: str = "", repo_url: str = "") -> List[PerformanceResult]: """Parse cargo-aoc runtime output cargo aoc typically outputs timing information like: - "Day X - Part Y: XXX.XXX ms" - "Day X - Part Y: XXX.XXX μs" - "Day X - Part Y: XXX.XXX ns" - "Part Y: XXX.XXX ms" - Or similar formats """ results = [] timestamp = datetime.now().isoformat() # Combine stdout and stderr (timing info might be in either) output = stdout + "\n" + stderr # Patterns to match various cargo-aoc output formats # Common formats: # "Day 1 - Part 1: 123.456 ms" # "Day 1 Part 1: 123.456 ms" # "day 1 - part 1: 123.456 ms" # "Part 1: 123.456 ms" (when day is already known) # Also handle formats like "Day 01 - Part 1" or "Day 1, Part 1" # And the format with generator/runner on separate lines: # "Day 2 - Part 1 : " # " generator: 5.651µs," # " runner: 3.07µs" patterns = [ # Format with generator/runner on separate lines (most common cargo-aoc format) # Match "Day X - Part Y" followed by lines with "runner:" or "generator:" r'Day\s+(\d+)\s*-\s*Part\s+(\d+)[:\s]+.*?(?:^|\n).*?runner\s*:\s*([\d.]+)\s*(ns|μs|µs|us|ms|s|sec)', r'day\s+(\d+)\s*-\s*part\s+(\d+)[:\s]+.*?(?:^|\n).*?runner\s*:\s*([\d.]+)\s*(ns|μs|µs|us|ms|s|sec)', # Standalone runner/generator lines (for when we're already in a Day X - Part Y block) r'runner\s*:\s*([\d.]+)\s*(ns|μs|µs|us|ms|s|sec)', r'generator\s*:\s*([\d.]+)\s*(ns|μs|µs|us|ms|s|sec)', # Full format with day and part - various separators r'Day\s+(\d+)\s*[-,\s]+\s*Part\s+(\d+)[:\s]+([\d.]+)\s*(ns|μs|µs|us|ms|s|sec)', r'day\s+(\d+)\s*[-,\s]+\s*part\s+(\d+)[:\s]+([\d.]+)\s*(ns|μs|µs|us|ms|s|sec)', r'Day\s+(\d+)\s+Part\s+(\d+)[:\s]+([\d.]+)\s*(ns|μs|µs|us|ms|s|sec)', r'day\s+(\d+)\s+part\s+(\d+)[:\s]+([\d.]+)\s*(ns|μs|µs|us|ms|s|sec)', r'day(\d+)\s*[-,\s]+\s*part(\d+)[:\s]+([\d.]+)\s*(ns|μs|µs|us|ms|s|sec)', # Part only (use provided day) - more flexible r'Part\s+(\d+)[:\s]+([\d.]+)\s*(ns|μs|µs|us|ms|s|sec)', r'part\s+(\d+)[:\s]+([\d.]+)\s*(ns|μs|µs|us|ms|s|sec)', # Handle formats without explicit "Part" label r'Day\s+(\d+)\s*[-,\s]+\s*(\d+)[:\s]+([\d.]+)\s*(ns|μs|µs|us|ms|s|sec)', # Handle formats with parentheses or brackets r'\(Part\s+(\d+)\)[:\s]+([\d.]+)\s*(ns|μs|µs|us|ms|s|sec)', r'\[Part\s+(\d+)\][:\s]+([\d.]+)\s*(ns|μs|µs|us|ms|s|sec)', ] # First, try to parse the generator/runner format which is most common # Look for "Day X - Part Y" lines and extract both generator and runner times lines = output.split('\n') current_day = None current_part = None actual_day = day # Default to provided day generator_time_ns = 0 runner_time_ns = 0 for i, line in enumerate(lines): # Check if this line starts a new Day/Part block day_part_match = re.match(r'Day\s+(\d+)\s*-\s*Part\s+(\d+)[:\s]', line, re.IGNORECASE) if day_part_match: # Save previous part's data if we have it if current_day is not None and current_part is not None and runner_time_ns > 0: results.append(PerformanceResult( user=user, year=year, day=actual_day, part=current_part, time_ns=runner_time_ns, generator_time_ns=generator_time_ns, git_rev=git_rev, repo_url=repo_url, timestamp=timestamp )) # Start new part current_day = int(day_part_match.group(1)) current_part = int(day_part_match.group(2)) actual_day = current_day if current_day > 0 and current_day <= 25 else day generator_time_ns = 0 runner_time_ns = 0 continue # If we're in a Day/Part block, look for generator and runner timing if current_day is not None and current_part is not None: generator_match = re.search(r'generator\s*:\s*([\d.]+)\s*(ns|μs|µs|us|ms|s|sec)', line, re.IGNORECASE) if generator_match: time_str = generator_match.group(1) unit = generator_match.group(2).lower() try: time_val = float(time_str) generator_time_ns = CargoAOCRunner._convert_to_nanoseconds(time_val, unit) except ValueError: logger.warning(f"Could not parse generator time: {time_str}") runner_match = re.search(r'runner\s*:\s*([\d.]+)\s*(ns|μs|µs|us|ms|s|sec)', line, re.IGNORECASE) if runner_match: time_str = runner_match.group(1) unit = runner_match.group(2).lower() try: time_val = float(time_str) runner_time_ns = CargoAOCRunner._convert_to_nanoseconds(time_val, unit) except ValueError: logger.warning(f"Could not parse runner time: {time_str}") # Save the last part's data if current_day is not None and current_part is not None and runner_time_ns > 0: results.append(PerformanceResult( user=user, year=year, day=actual_day, part=current_part, time_ns=runner_time_ns, generator_time_ns=generator_time_ns, git_rev=git_rev, repo_url=repo_url, timestamp=timestamp )) # If we found results with the line-by-line approach, return them if results: return results # Otherwise, try the original pattern-based approach for pattern in patterns: for match in re.finditer(pattern, output, re.IGNORECASE | re.MULTILINE): groups = match.groups() # Determine day and part based on pattern if len(groups) == 4: # Pattern with day and part part_day = int(groups[0]) part_num = int(groups[1]) time_str = groups[2] unit = groups[3].lower() actual_day = part_day if part_day > 0 and part_day <= 25 else day elif len(groups) == 3: # Pattern with only part (use provided day) part_num = int(groups[0]) time_str = groups[1] unit = groups[2].lower() actual_day = day elif len(groups) == 2: # Standalone runner/generator line (use provided day, assume part from context) # This is tricky - we'll skip these and rely on the block-based approach above continue else: continue try: time_val = float(time_str) # Convert to nanoseconds if unit == 's' or unit == 'sec' or unit == 'second': time_ns = int(time_val * 1_000_000_000) elif unit == 'ms' or unit == 'millisecond': time_ns = int(time_val * 1_000_000) elif unit == 'μs' or unit == 'us' or unit == 'microsecond': time_ns = int(time_val * 1_000) elif unit == 'ns' or unit == 'nanosecond': time_ns = int(time_val) else: # Default to nanoseconds if unit unclear logger.warning(f"Unknown time unit '{unit}', assuming nanoseconds") time_ns = int(time_val) # Avoid duplicates if not any(r.day == actual_day and r.part == part_num for r in results): results.append(PerformanceResult( user=user, year=year, day=actual_day, part=part_num, time_ns=time_ns, timestamp=timestamp )) except ValueError: logger.warning(f"Could not parse time: {time_str}") # If no results found, try a more lenient approach - look for any numbers with time units if not results: # Look for patterns like "123.456ms" or "123.456 ms" anywhere in output lenient_patterns = [ r'([\d.]+)\s*(ns|μs|us|ms|s|sec)', r'([\d.]+)(ns|μs|us|ms|s|sec)', ] # Try to extract parts sequentially if we find timing info for pattern in lenient_patterns: matches = list(re.finditer(pattern, output, re.IGNORECASE)) if matches: # If we find exactly 1 or 2 matches, assume they're Part 1 and Part 2 if len(matches) == 1: match = matches[0] time_val = float(match.group(1)) unit = match.group(2).lower() time_ns = CargoAOCRunner._convert_to_nanoseconds(time_val, unit) results.append(PerformanceResult( user=user, year=year, day=day, part=1, time_ns=time_ns, timestamp=timestamp )) elif len(matches) == 2: for idx, match in enumerate(matches, 1): time_val = float(match.group(1)) unit = match.group(2).lower() time_ns = CargoAOCRunner._convert_to_nanoseconds(time_val, unit) results.append(PerformanceResult( user=user, year=year, day=day, part=idx, time_ns=time_ns, timestamp=timestamp )) break return results @staticmethod def _convert_to_nanoseconds(time_val: float, unit: str) -> int: """Convert time value to nanoseconds based on unit""" unit = unit.lower() # Handle unicode micro symbol (µ) and regular u if unit == 's' or unit == 'sec' or unit == 'second': return int(time_val * 1_000_000_000) elif unit == 'ms' or unit == 'millisecond': return int(time_val * 1_000_000) elif unit == 'μs' or unit == 'µs' or unit == 'us' or unit == 'microsecond': return int(time_val * 1_000) elif unit == 'ns' or unit == 'nanosecond': return int(time_val) else: # Default to nanoseconds return int(time_val) class HTMLGenerator: """Generates HTML comparison pages""" def __init__(self, output_dir: str): self.output_dir = Path(output_dir) self.output_dir.mkdir(parents=True, exist_ok=True) def generate(self, db: Database, config: Config): """Generate HTML comparison page""" # Get all years from database, but filter by compare_years if specified all_years_in_db = db.get_all_years() if config.compare_years: # Only include years that are both in compare_years AND in the database years = [y for y in config.compare_years if y in all_years_in_db] if not years: logger.warning(f"compare_years {config.compare_years} specified but no matching data found. Using all years from database.") years = all_years_in_db else: # Use all years from database years = all_years_in_db days = config.compare_days results = db.get_latest_results(years=None, days=days) # Get all years, filter in Python # Filter results by years if needed if years: results = [r for r in results if r['year'] in years] users = db.get_all_users() # Organize data by year -> day -> part -> user data = defaultdict(lambda: defaultdict(lambda: defaultdict(dict))) for result in results: year = result['year'] day = result['day'] part = result['part'] user = result['user'] runner_time_ns = result['time_ns'] generator_time_ns = result.get('generator_time_ns', 0) git_rev = result.get('git_rev', '') repo_url = result.get('repo_url', '') # Only store if runner_time_ns > 0 (valid result) # Store total time (generator + runner) for comparison if runner_time_ns > 0: total_time_ns = runner_time_ns + generator_time_ns data[year][day][part][user] = { 'total': total_time_ns, 'runner': runner_time_ns, 'generator': generator_time_ns, 'git_rev': git_rev, 'repo_url': repo_url } html = self._generate_html(data, years, users, db) output_file = self.output_dir / 'index.html' with open(output_file, 'w') as f: f.write(html) logger.info(f"Generated HTML report: {output_file}") def _generate_html(self, data: dict, years: List[int], users: List[str], db: Database) -> str: """Generate HTML content""" # Sort years descending (most recent first) sorted_years = sorted(years, reverse=True) # Calculate summary statistics total_days = sum(len(data[year]) for year in data) total_parts = sum(len(parts) for year in data for day in data[year].values() for parts in day.values()) users_with_data = set() for year in data.values(): for day in year.values(): for part in day.values(): users_with_data.update(part.keys()) html = f""" Advent of Code Performance Comparison

🎄 Advent of Code Performance Comparison

Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

Users: """ + ', '.join(sorted(users)) + """

""" # Generate content for each year (sorted descending) for year in sorted_years: if year not in data: continue html += f"""

Year {year}

""" # Collect all day/part combinations for this year day_part_combos = [] for day in sorted(data[year].keys()): for part in sorted(data[year][day].keys()): day_part_combos.append((day, part)) if not day_part_combos: html += "

No data available for this year.

" html += "
" continue # Collect user info (git rev, repo_url) for header user_info = {} for day, part in day_part_combos: part_data = data[year][day][part] for user in users: if user not in user_info: user_info[user] = {'git_rev': '', 'repo_url': ''} if user in part_data: time_data = part_data[user] if isinstance(time_data, dict): if not user_info[user]['git_rev']: user_info[user]['git_rev'] = time_data.get('git_rev', '') if not user_info[user]['repo_url']: user_info[user]['repo_url'] = time_data.get('repo_url', '') # Find fastest and slowest times per day/part for highlighting and speed multiple fastest_times = {} slowest_times = {} speed_multiples = {} for day, part in day_part_combos: part_data = data[year][day][part] times = [] for user, time_data in part_data.items(): if isinstance(time_data, dict): total_time = time_data.get('total', 0) else: total_time = time_data if time_data > 0 else 0 if total_time > 0: times.append(total_time) if times: fastest_times[(day, part)] = min(times) slowest_times[(day, part)] = max(times) if fastest_times[(day, part)] > 0: speed_multiples[(day, part)] = slowest_times[(day, part)] / fastest_times[(day, part)] else: speed_multiples[(day, part)] = 0 # Create table with transposed structure html += """ """ # Add user columns with git rev in header for user in sorted(users): git_rev = user_info[user]['git_rev'] repo_url = user_info[user]['repo_url'] if git_rev and repo_url: commit_url = repo_url.rstrip('/') + '/commit/' + git_rev git_rev_html = f'
{git_rev[:7]}' elif git_rev: git_rev_html = f'
{git_rev[:7]}' else: git_rev_html = '' html += f' \n' html += """ """ # Add rows for each day/part combination for day, part in day_part_combos: part_data = data[year][day][part] fastest_time = fastest_times.get((day, part), 0) speed_multiple = speed_multiples.get((day, part), 0) row_history_modals = [] html += f""" """ # Add timing data for each user for user in sorted(users): time_data = part_data.get(user, 0) if time_data == 0 or (isinstance(time_data, dict) and time_data.get('total', 0) == 0): html += ' \n' else: # Extract times if isinstance(time_data, dict): total_time_ns = time_data.get('total', 0) runner_time_ns = time_data.get('runner', 0) generator_time_ns = time_data.get('generator', 0) git_rev = time_data.get('git_rev', '') repo_url = time_data.get('repo_url', '') else: # Backward compatibility total_time_ns = time_data runner_time_ns = total_time_ns generator_time_ns = 0 git_rev = '' repo_url = '' # Format total time total_ms = total_time_ns / 1_000_000 total_us = total_time_ns / 1_000 if total_ms >= 1: total_str = f"{total_ms:.2f} ms" elif total_us >= 1: total_str = f"{total_us:.2f} μs" else: total_str = f"{total_time_ns} ns" # Determine if fastest cell_class = "" if fastest_time > 0 and total_time_ns == fastest_time: cell_class = "fastest" # Get historical data for this user/day/part historical = db.get_historical_results(user, year, day, part) history_link = "" if len(historical) > 1: history_items = [] for hist in historical[:10]: # Show last 10 runs hist_total = hist['time_ns'] + hist.get('generator_time_ns', 0) hist_ms = hist_total / 1_000_000 hist_us = hist_total / 1_000 if hist_ms >= 1: hist_time_str = f"{hist_ms:.2f} ms" elif hist_us >= 1: hist_time_str = f"{hist_us:.2f} μs" else: hist_time_str = f"{hist_total} ns" hist_git = hist.get('git_rev', '')[:7] if hist.get('git_rev') else '-' hist_date = hist.get('timestamp', '')[:16] if hist.get('timestamp') else '' hist_repo_url = hist.get('repo_url', '') if hist_git != '-' and hist_repo_url: hist_git_link = f'{hist_git}' else: hist_git_link = hist_git history_items.append(f'
{hist_date}: {hist_time_str} ({hist_git_link})
') history_modal = f''' ''' row_history_modals.append(history_modal) history_link = f' 📊' html += f' \n' # Add speed multiple column if speed_multiple > 0: speed_multiple_str = f"{speed_multiple:.2f}x" else: speed_multiple_str = "-" html += f' \n' html += """ """ # Add history modals after the row for hist_modal in row_history_modals: html += hist_modal html += """
Day/Part{user}{git_rev_html}Speed Multiple
Day {day} Part {part}-{total_str}{history_link}{speed_multiple_str}
""" # Add summary statistics at the bottom html += f"""

Summary Statistics

Total Years
{len(data)}
Total Days
{total_days}
Total Parts
{total_parts}
Users with Data
{len(users_with_data)}
""" html += """ """ return html class AOCSync: """Main synchronization orchestrator""" def __init__(self, config_path: str = "config.yaml", force_rerun: bool = False): self.config = Config(config_path) self.db = Database(os.path.join(self.config.data_dir, 'results.db')) self.html_gen = HTMLGenerator(self.config.output_dir) self.git_manager = GitManager() self.force_rerun = force_rerun def process_repository(self, repo_config: dict, user_name: str): """Process a single repository configuration""" repo_type = repo_config.get('type', 'single') if repo_type == 'single': # Single repository with all years url = repo_config['url'] local_path = repo_config['local_path'] if self.force_rerun or self.git_manager.has_changes(url, local_path): if self.force_rerun: logger.info(f"Force rerun enabled, processing repository {user_name}...") else: logger.info(f"Repository {user_name} has changes, updating...") if self.git_manager.clone_or_update_repo(url, local_path): repo_path = Path(local_path) # Check if years are specified in config config_years = repo_config.get('years') url = repo_config['url'] if config_years: # Use years from config for year in config_years: self._run_and_store_benchmarks(repo_path, year, user_name, repo_url=url, is_multi_year=False) else: # Try to determine year(s) from the repository years = CargoAOCRunner.extract_years_from_repo(repo_path) if years: # Run benchmarks for each detected year for year in years: self._run_and_store_benchmarks(repo_path, year, user_name, repo_url=url, is_multi_year=False) else: # If no year detected, check for year directories logger.warning(f"No year detected for {user_name}, checking for year directories") # Try common years as fallback for try_year in [2025, 2024, 2023, 2022, 2021, 2020]: year_dir = repo_path / str(try_year) if year_dir.exists() and year_dir.is_dir(): logger.info(f"Found year directory {try_year} for {user_name}") self._run_and_store_benchmarks(repo_path, try_year, user_name, repo_url=url, is_multi_year=False) elif repo_type == 'multi-year': # Multiple repositories, one per year years_config = repo_config.get('years', []) for year_config in years_config: year = year_config['year'] url = year_config['url'] local_path = year_config['local_path'] if self.force_rerun or self.git_manager.has_changes(url, local_path): if self.force_rerun: logger.info(f"Force rerun enabled, processing repository {user_name} year {year}...") else: logger.info(f"Repository {user_name} year {year} has changes, updating...") if self.git_manager.clone_or_update_repo(url, local_path): repo_path = Path(local_path) self._run_and_store_benchmarks(repo_path, year, user_name, repo_url=url, is_multi_year=True) def _check_year_in_repo(self, repo_path: Path, year: int) -> bool: """Check if a repository contains solutions for a specific year""" # Simple heuristic: check if year appears in path or files path_str = str(repo_path) if str(year) in path_str: return True # Check Cargo.toml cargo_toml = repo_path / 'Cargo.toml' if cargo_toml.exists(): with open(cargo_toml, 'r') as f: if str(year) in f.read(): return True return False def _run_and_store_benchmarks(self, repo_path: Path, year: int, user: str, repo_url: str = "", is_multi_year: bool = False): """Run benchmarks and store results""" logger.info(f"Running benchmarks for {user} year {year} in {repo_path}") results = CargoAOCRunner.run_benchmarks(repo_path, year=year, user=user, repo_url=repo_url, is_multi_year=is_multi_year) # Store results for result in results: self.db.insert_result(result) logger.info(f"Stored {len(results)} benchmark results for {user} year {year}") def sync_all(self): """Sync all repositories""" logger.info("Starting sync of all repositories...") for repo_config in self.config.repositories: user_name = repo_config['name'] try: self.process_repository(repo_config, user_name) except Exception as e: logger.error(f"Error processing repository {user_name}: {e}") # Generate HTML report logger.info("Generating HTML report...") self.html_gen.generate(self.db, self.config) # Rsync output if configured self._rsync_output() def _rsync_output(self): """Rsync output directory to remote server if configured""" rsync_config = self.config.rsync_config if not rsync_config or not rsync_config.get('enabled', False): return destination = rsync_config.get('destination') if not destination: logger.warning("Rsync enabled but no destination specified") return output_dir = Path(self.config.output_dir) if not output_dir.exists(): logger.warning(f"Output directory {output_dir} does not exist, skipping rsync") return logger.info(f"Rsyncing {output_dir} to {destination}...") try: # Build rsync command # Use trailing slash on source to sync contents, not the directory itself source = str(output_dir) + "/" cmd = ['rsync', '-avz', '--delete', source, destination] result = subprocess.run( cmd, capture_output=True, text=True, timeout=60 # 1 minute timeout ) if result.returncode == 0: logger.info(f"Successfully rsynced output to {destination}") else: logger.error(f"Rsync failed: {result.stderr}") except subprocess.TimeoutExpired: logger.error("Rsync timed out") except Exception as e: logger.error(f"Error during rsync: {e}") def run_continuous(self): """Run continuous polling""" logger.info(f"Starting continuous polling (interval: {self.config.poll_interval}s)") try: while True: self.sync_all() logger.info(f"Sleeping for {self.config.poll_interval} seconds...") time.sleep(self.config.poll_interval) except KeyboardInterrupt: logger.info("Stopped by user") def main(): """Main entry point""" import argparse parser = argparse.ArgumentParser(description='AOC Sync - Poll and compare AOC implementations') parser.add_argument('--config', default='config.yaml', help='Path to config file') parser.add_argument('--once', action='store_true', help='Run once instead of continuously') parser.add_argument('--force', '--rerun-all', action='store_true', dest='force_rerun', help='Force rerun all days even if repository has not changed') args = parser.parse_args() sync = AOCSync(args.config, force_rerun=args.force_rerun) if args.once: sync.sync_all() else: sync.run_continuous() if __name__ == '__main__': main()