#!/usr/bin/env python3
"""Validate Google Alert query blocks and generate working replacements."""
from __future__ import annotations

import argparse
import dataclasses
import json
import re
from pathlib import Path
from typing import List, Optional, Tuple

ALERT_NAME_RE = re.compile(r"`([^`]+)`")
HEADING_RE = re.compile(r"^(#{3,})\s+(.*)")
SITE_RE = re.compile(r"site:[^\s)]+", re.IGNORECASE)
OR_RE = re.compile(r"\bOR\b", re.IGNORECASE)
QUOTE_RE = re.compile(r'"([^"]+)"')
NEGATIVE_TOKEN_RE = re.compile(r"(?:^|\s)-(?!\s)([^\s]+)")

# Regional groupings for Canadian subreddits
REDDIT_REGIONS = {
    "Ontario-GTA": ["r/kitchener", "r/waterloo", "r/CambridgeON", "r/guelph", "r/toronto", "r/mississauga", "r/brampton"],
    "Ontario-Other": ["r/ontario", "r/londonontario", "r/HamiltonOntario", "r/niagara", "r/ottawa"],
    "Western": ["r/vancouver", "r/VictoriaBC", "r/Calgary", "r/Edmonton"],
    "Prairies": ["r/saskatoon", "r/regina", "r/winnipeg"],
    "Eastern": ["r/montreal", "r/quebeccity", "r/halifax", "r/newfoundland"],
}


@dataclasses.dataclass
class AlertBlock:
    heading: str
    alert_name: str
    purpose: Optional[str]
    target: Optional[str]
    query: str
    start_line: int


@dataclasses.dataclass
class Finding:
    rule: str
    severity: str
    message: str
    suggestion: str


@dataclasses.dataclass
class Analysis:
    alert: AlertBlock
    metrics: dict
    findings: List[Finding]
    fixed_queries: List[Tuple[str, str]]  # [(alert_name, query)]


def parse_alerts(markdown_path: Path) -> List[AlertBlock]:
    text = markdown_path.read_text(encoding="utf-8")
    lines = text.splitlines()
    alerts: List[AlertBlock] = []
    current_heading = ""
    pending: Optional[dict] = None
    code_lines: List[str] = []
    collecting_code = False
    for idx, raw_line in enumerate(lines, start=1):
        line = raw_line.rstrip("\n")
        heading_match = HEADING_RE.match(line)
        if heading_match:
            hashes, heading_text = heading_match.groups()
            if len(hashes) >= 3:  # only capture tertiary sections
                current_heading = heading_text.strip()
        if line.startswith("**Alert Name:**"):
            match = ALERT_NAME_RE.search(line)
            alert_name = match.group(1).strip() if match else line.split("**Alert Name:**", 1)[1].strip()
            pending = {
                "heading": current_heading,
                "alert_name": alert_name,
                "purpose": None,
                "target": None,
                "query": None,
                "start_line": idx,
            }
            continue
        if pending:
            if line.startswith("**Purpose:**"):
                pending["purpose"] = line.split("**Purpose:**", 1)[1].strip()
                continue
            if line.startswith("**Target:**"):
                pending["target"] = line.split("**Target:**", 1)[1].strip()
                continue
        if line.strip() == "```":
            if not pending:
                # ignore code blocks unrelated to alerts
                collecting_code = False
                code_lines = []
                continue
            if not collecting_code:
                collecting_code = True
                code_lines = []
            else:
                collecting_code = False
                query_text = "\n".join(code_lines).strip()
                alert_block = AlertBlock(
                    heading=pending["heading"],
                    alert_name=pending["alert_name"],
                    purpose=pending["purpose"],
                    target=pending["target"],
                    query=query_text,
                    start_line=pending["start_line"],
                )
                alerts.append(alert_block)
                pending = None
                code_lines = []
            continue
        if collecting_code:
            code_lines.append(line)
    return alerts


def extract_query_parts(query: str) -> Tuple[List[str], List[str], List[str]]:
    """Extract site filters, keywords, and exclusions from query."""
    sites = SITE_RE.findall(query)
    # Extract all quoted phrases first (these are the keywords)
    all_keywords = QUOTE_RE.findall(query)
    # Filter out ALERT_NAME markers
    keywords = [kw for kw in all_keywords if not kw.startswith("ALERT_NAME:")]
    # Find exclusions (negative terms)
    exclusions = []
    for match in NEGATIVE_TOKEN_RE.finditer(query):
        term = match.group(1)
        # Skip if it's part of quoted text
        if '"' not in match.group(0):
            exclusions.append(term)
    return sites, keywords, exclusions
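
# Illustrative sketch of what extract_query_parts returns (the query below is
# an assumed example, not taken from a real alerts file):
#
#   sites, keywords, exclusions = extract_query_parts(
#       '(site:reddit.com/r/toronto OR site:reddit.com/r/ottawa)\n'
#       '("need a website" OR "looking for a web developer")\n'
#       '-job -hiring'
#   )
#   # sites      -> ["site:reddit.com/r/toronto", "site:reddit.com/r/ottawa"]
#   # keywords   -> ["need a website", "looking for a web developer"]
#   # exclusions -> ["job", "hiring"]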


def generate_fixed_queries(alert: AlertBlock, findings: List[Finding]) -> List[Tuple[str, str]]:
    """Generate working replacement queries when issues are found."""
    if not findings or not any(f.severity == "high" for f in findings):
        return []
    sites, keywords, exclusions = extract_query_parts(alert.query)
    fixed = []
    # Check if this is a Reddit alert with too many sites
    is_reddit = any("reddit.com" in s for s in sites)
    has_site_issue = any(f.rule == "site-filter-limit" for f in findings)
    has_term_issue = any(f.rule == "term-limit" for f in findings)
    if is_reddit and has_site_issue:
        # Split by region
        for region_name, subreddits in REDDIT_REGIONS.items():
            # Trim the keyword list: cut to 12 when the term limit is also exceeded, else 18
            top_keywords = keywords[:12] if has_term_issue else keywords[:18]
            site_part = " OR ".join([f"site:reddit.com/{sub}" for sub in subreddits])
            keyword_part = " OR ".join([f'"{kw}"' for kw in top_keywords])
            exclusion_part = " ".join([f"-{ex}" for ex in exclusions[:4]])  # Limit exclusions
            fixed_query = f"({site_part})\n({keyword_part})\n{exclusion_part}".strip()
            # Verify it meets limits
            test_metrics = {
                "site_filters": len(subreddits),
                "approx_terms": len(top_keywords),
                "char_length": len(fixed_query),
            }
            if test_metrics["site_filters"] <= 8 and test_metrics["approx_terms"] <= 18 and test_metrics["char_length"] <= 500:
                new_name = f"{alert.alert_name.replace(' - Reddit CA', '')} - {region_name}"
                fixed.append((new_name, fixed_query))
    elif has_term_issue and not is_reddit:
        # For non-Reddit, just trim keywords
        top_keywords = keywords[:15]
        site_part = " OR ".join(sites)
        keyword_part = " OR ".join([f'"{kw}"' for kw in top_keywords])
        exclusion_part = " ".join([f"-{ex}" for ex in exclusions[:4]])
        if site_part:
            fixed_query = f"({site_part})\n({keyword_part})\n{exclusion_part}".strip()
        else:
            fixed_query = f"({keyword_part})\n{exclusion_part}".strip()
        if len(fixed_query) <= 500:
            fixed.append((alert.alert_name + " (Fixed)", fixed_query))
    return fixed
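
# Shape of one regional replacement produced above, assuming a hypothetical
# alert named "Leads - Reddit CA" that exceeded the site-filter limit
# (all phrases illustrative):
#
#   ("Leads - Prairies",
#    '(site:reddit.com/r/saskatoon OR site:reddit.com/r/regina OR site:reddit.com/r/winnipeg)\n'
#    '("need a website" OR "looking for a web developer")\n'
#    '-job -hiring')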


def evaluate(alert: AlertBlock) -> Analysis:
    query = alert.query
    normalized = " ".join(query.split())
    site_filters = SITE_RE.findall(query)
    or_count = len(OR_RE.findall(query))
    approx_terms = or_count + 1
    quoted_phrases = len(QUOTE_RE.findall(query))
    negative_tokens = len(NEGATIVE_TOKEN_RE.findall(query))
    char_length = len(normalized)
    lines = query.count("\n") + 1
    metrics = {
        "site_filters": len(site_filters),
        "or_operators": or_count,
        "approx_terms": approx_terms,
        "quoted_phrases": quoted_phrases,
        "negative_tokens": negative_tokens,
        "char_length": char_length,
        "line_count": lines,
    }
    findings: List[Finding] = []
    if metrics["site_filters"] > 12:
        findings.append(Finding(
            rule="site-filter-limit",
            severity="high",
            message=f"Contains {metrics['site_filters']} site filters, more than Google Alerts usually handles reliably.",
            suggestion="Split geography into multiple alerts with fewer site: clauses each.",
        ))
    if metrics["approx_terms"] > 28:
        findings.append(Finding(
            rule="term-limit",
            severity="high",
            message=f"Approximately {metrics['approx_terms']} OR terms detected (>28).",
            suggestion="Break the keyword block into two alerts or remove low-value phrases.",
        ))
    if metrics["quoted_phrases"] > 12:
        findings.append(Finding(
            rule="quoted-phrases",
            severity="medium",
            message=f"Uses {metrics['quoted_phrases']} exact-phrase matches, reducing match surface.",
            suggestion="Convert some exact phrases into (keyword AND variant) pairs to widen matches.",
        ))
    if metrics["char_length"] > 600:
        findings.append(Finding(
            rule="length",
            severity="medium",
            message=f"Query is {metrics['char_length']} characters long (Google truncates beyond ~512).",
            suggestion="Remove redundant OR terms or shorten site filter lists.",
        ))
    if metrics["negative_tokens"] > 8:
        findings.append(Finding(
            rule="exclusion-limit",
            severity="low",
            message=f"Contains {metrics['negative_tokens']} negative filters; excess exclusions may hide valid leads.",
            suggestion="Keep only the highest-noise sources (e.g., -job -jobs).",
        ))
    if metrics["line_count"] > 3:
        findings.append(Finding(
            rule="multiline",
            severity="low",
            message="Query spans more than three lines, which often indicates chained filters beyond alert limits.",
            suggestion="Condense by running separate alerts per platform or intent.",
        ))
    fixed_queries = generate_fixed_queries(alert, findings)
    return Analysis(alert=alert, metrics=metrics, findings=findings, fixed_queries=fixed_queries)


def format_markdown(analyses: List[Analysis]) -> str:
    lines: List[str] = []
    for analysis in analyses:
        alert = analysis.alert
        lines.append(f"### {alert.alert_name}")
        heading = alert.heading or "(No heading)"
        lines.append(f"Section: {heading}")
        lines.append(f"Start line: {alert.start_line}")
        metric_parts = [
            f"site:{analysis.metrics['site_filters']}",
            f"ORs:{analysis.metrics['or_operators']}",
            f"phrases:{analysis.metrics['quoted_phrases']}",
            f"len:{analysis.metrics['char_length']}",
        ]
        lines.append("Metrics: " + ", ".join(metric_parts))
        if analysis.findings:
            lines.append("Findings:")
            for finding in analysis.findings:
                lines.append(f"- ({finding.severity}) {finding.message} Suggestion: {finding.suggestion}")
        else:
            lines.append("Findings: None detected by heuristics.")
        lines.append("")
    return "\n".join(lines).strip() + "\n"


def generate_fixed_markdown(analyses: List[Analysis]) -> str:
    """Generate new markdown with working queries."""
    lines = [
        "# Google Alert Queries - Working Versions",
        "",
        "These queries have been validated to work within Google Alerts limits.",
        "Each query stays under 500 chars, uses ≤8 site filters, and ≤18 OR terms.",
        "",
    ]
    for analysis in analyses:
        alert = analysis.alert
        if analysis.fixed_queries:
            # Use fixed versions
            for new_name, new_query in analysis.fixed_queries:
                lines.append(f"## {new_name}")
                if alert.purpose:
                    lines.append(f"**Purpose:** {alert.purpose}")
                if alert.target:
                    lines.append(f"**Target:** {alert.target}")
                lines.append("")
                lines.append("```")
                lines.append(new_query)
                lines.append("```")
                lines.append("")
        elif not any(f.severity == "high" for f in analysis.findings):
            # Query is already OK, keep it
            lines.append(f"## {alert.alert_name}")
            if alert.purpose:
                lines.append(f"**Purpose:** {alert.purpose}")
            if alert.target:
                lines.append(f"**Target:** {alert.target}")
            lines.append("")
            lines.append("```")
            lines.append(alert.query)
            lines.append("```")
            lines.append("")
    return "\n".join(lines)


def run(markdown_path: Path, output_format: str, fix_mode: bool) -> None:
    alerts = parse_alerts(markdown_path)
    analyses = [evaluate(alert) for alert in alerts]
    if fix_mode:
        print(generate_fixed_markdown(analyses))
    elif output_format == "json":
        payload = [
            {
                "alert_name": analysis.alert.alert_name,
                "heading": analysis.alert.heading,
                "start_line": analysis.alert.start_line,
                "metrics": analysis.metrics,
                "findings": [dataclasses.asdict(f) for f in analysis.findings],
                "fixed_count": len(analysis.fixed_queries),
            }
            for analysis in analyses
        ]
        print(json.dumps(payload, indent=2))
    else:
        print(format_markdown(analyses))
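
# One element of the --format json payload looks roughly like this (values
# illustrative; fixed_count is 5 when all five regional replacements pass
# the limits):
#
#   {
#     "alert_name": "Leads - Reddit CA",
#     "heading": "Reddit Monitoring",
#     "start_line": 42,
#     "metrics": {"site_filters": 23, "or_operators": 40, ...},
#     "findings": [{"rule": "site-filter-limit", "severity": "high", ...}],
#     "fixed_count": 5
#   }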


def main() -> None:
    parser = argparse.ArgumentParser(description="Validate Google Alert queries and generate working replacements.")
    parser.add_argument("markdown", nargs="?", default="docs/google-alerts.md", help="Path to the markdown file containing alerts.")
    parser.add_argument("--format", choices=["markdown", "json"], default="markdown")
    parser.add_argument("--fix", action="store_true", help="Generate fixed/working queries")
    args = parser.parse_args()
    markdown_path = Path(args.markdown)
    if not markdown_path.exists():
        raise SystemExit(f"File not found: {markdown_path}")
    run(markdown_path, args.format, args.fix)


if __name__ == "__main__":
    main()
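
# Usage (assuming this file is saved as validate_alerts.py; the default input
# path docs/google-alerts.md comes from the argument default above):
#
#   python validate_alerts.py                          # report on the default file
#   python validate_alerts.py alerts.md --format json  # machine-readable report
#   python validate_alerts.py alerts.md --fix          # emit working replacement queries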