From a3da858a169865d2e2e04c60a9fda0684a388cee Mon Sep 17 00:00:00 2001
From: jChenvan <188939308+jChenvan@users.noreply.github.com>
Date: Wed, 20 Aug 2025 19:08:45 -0400
Subject: [PATCH] Dork scraper uses same prompt/logic as main crawler

---
 docker/crawler-google-alerts/main.py | 281 ++++++++++++++++++---------
 1 file changed, 191 insertions(+), 90 deletions(-)

diff --git a/docker/crawler-google-alerts/main.py b/docker/crawler-google-alerts/main.py
index 6f1f0d7..a73f2d2 100644
--- a/docker/crawler-google-alerts/main.py
+++ b/docker/crawler-google-alerts/main.py
@@ -3,6 +3,7 @@ from typing import Optional
 import google.generativeai as genai
 import json
 import os
+import re
 import time
 from dotenv import load_dotenv
 import requests
@@ -18,99 +19,172 @@ MODEL_NAME = "gemini-2.0-flash-lite"
 # TODO: refine
 EXTRACTION_PROMPT = """
-You are an information extraction system.
-Your task is to extract specific fields from the provided article text (the 'source').
-The topic is Canadian military exports/transactions.
+You are a precise data-extraction system.
-Follow these rules strictly:
-1. Output ONLY valid JSON — no explanations or commentary.
-2. Only include a field if you find a clear and unambiguous match. If the information is not explicitly present, omit that field entirely (do not use null, "", or placeholders).
-3. Do not copy entire paragraphs into a field. Summarize or extract only the relevant fragment directly answering the field’s requirement.
-4. Do not guess or infer — if the text is ambiguous, leave the field out.
-5. If a number is expected, provide only the numeric value (without units unless the unit is part of the field definition).
-6. Do not mix unrelated information into a field.
+Given the DOCUMENT TEXT below, extract ALL transactions or arms-export relevant
+entries and output a JSON array (possibly empty) of objects that match the
+Project Ploughshares API schema. Output ONLY the JSON array — no markdown,
+no commentary, no code fences.
-Fields to extract (omit if not found):
-* "transaction_type": Type of transaction being made (e.g., "Purchase Order", "Subcontract")
-* "company_division": Canadian company/division involved in the transaction
-* "address_1", "address_2", "city", "province", "region", "postal_code": Address of the company
-* "recipient": Recipient of the transaction, be it a country, organization, or individual
-* "amount": Transaction amount, including the currency
-* "description": Transaction description
-* "source_date": Date in YYYY-MM-DD format the source/article was posted at.
-* "source_description": Decription of the platform the source/article came from, as well as the content of the source/article.
-* "grant_type": Type of grant
-* "commodity_class": Commodity classification or the product being exported in the transaction, e.g. missile components, avionics, engines
-* "contract_number": Contract number
-* "comments": Additional comments
-* "is_primary": Boolean flag
+Each object must use the following fields (required fields must be provided
+and set to "Not Found" if absent):
+
+Required fields:
+- transaction_type (string) # e.g., "Export", "Purchase Order", "Component Supply"
+- company_division (string) # company or division name (use "Not Found" if unknown)
+- recipient (string) # receiving country or recipient (use "Not Found" if unknown)
+
+Optional fields (include if present):
+- amount (string or number) # monetary value if present (e.g., "15,000,000 CAD")
+- description (string)
+- address_1, address_2, city, province, region, postal_code
+- source_date (string YYYY-MM-DD)
+- source_description (string)
+- grant_type (string)
+- commodity_class (string) # e.g., missile components, avionics, engines
+- contract_number (string)
+- comments (string)
+- is_primary (boolean)
+
+Additionally, include these two new fields to help filter relevance:
+- canadian_relevance (string) # one of: "direct", "indirect", "none"
+  - "direct" = Canadian company or Canada-origin export of military goods/components
+  - "indirect" = Canadian-made parts/components appear in a larger export (final assembly elsewhere)
+  - "none" = no meaningful Canadian connection
+- relation_explanation (string) # short explanation why this is direct/indirect/none (1-2 sentences)
+
+Rules:
+1. If a piece of info cannot be found, set it to the string "Not Found" (not null).
+2. If multiple transactions are described in the text, output them as separate objects.
+3. If the text contains the same transaction repeated, ensure you only output one object per distinct transaction.
+4. Output must be valid JSON (an array). Example:
+  [
+    {{
+      "transaction_type": "Export",
+      "company_division": "Example Corp Canada",
+      "recipient": "Country X",
+      "amount": "3,000,000 CAD",
+      "commodity_class": "avionics modules",
+      "description": "Example summary ...",
+      "source_url": "https://example.com/article",
+      "canadian_relevance": "direct",
+      "relation_explanation": "Company is based in Canada and shipped avionics modules."
+    }}
+  ]
 ----
 DOCUMENT TEXT:
 {text_content}
 """

-SCHEMA = {
-    "type": "object",
-    "required": ["source_description"],
-    "properties": {
-        "transaction_type": {"type": "string"},
-        "company_division": {"type": "string"},
-        "recipient": {"type": "string"},
-        "amount": {"type": "number"},
-        "description": {"type": "string"},
-        "address_1": {"type": "string"},
-        "address_2": {"type": "string"},
-        "city": {"type": "string"},
-        "province": {"type": "string"},
-        "region": {"type": "string"},
-        "postal_code": {"type": "string"},
-        "source_date": {"type": "string"},
-        "source_description": {"type": "string"},
-        "grant_type": {"type": "string"},
-        "commodity_class": {"type": "string"},
-        "contract_number": {"type": "string"},
-        "comments": {"type": "string"},
-        "is_primary": {"type": "boolean"}
-    }
-}
+def extract_json_from_text(text):
+    """
+    Attempts to find and return the first JSON array or object in a text blob.
+    This removes markdown fences and extracts from the first '[' ... ']' or '{' ... '}' pair.
+    """
+    if not text or not isinstance(text, str):
+        return None
+    # remove common fences
+    cleaned = text.strip()
+    cleaned = cleaned.replace("```json", "").replace("```", "").strip()
-def validate_info(extracted_info):
-    if ("transaction_type" not in extracted_info):
-        return False
-    if (len(extracted_info["transaction_type"]) == 0):
-        return False
-    if ("company_division" not in extracted_info):
-        return False
-    if (len(extracted_info["company_division"]) == 0):
-        return False
-    if ("recipient" not in extracted_info):
-        return False
-    if (len(extracted_info["recipient"]) == 0):
-        return False
-    return True
+    # Try to locate a JSON array first
+    arr_match = re.search(r"(\[.*\])", cleaned, flags=re.DOTALL)
+    if arr_match:
+        return arr_match.group(1)
+
+    # Otherwise try a single JSON object
+    obj_match = re.search(r"(\{.*\})", cleaned, flags=re.DOTALL)
+    if obj_match:
+        return obj_match.group(1)
+
+    return None

 def process_content_with_gemini(text_content):
     """
-    Sends the text to the Gemini API with the extraction prompt and
-    parses the JSON response.
+    Sends the text to Gemini with the extraction prompt and parses the JSON response.
+    Uses the same SDK pattern as the main crawler (genai.GenerativeModel).
     """
+    # Same model initialization pattern as the main crawler
     model = genai.GenerativeModel(MODEL_NAME)  # type: ignore
+
     prompt = EXTRACTION_PROMPT.format(text_content=text_content)
     try:
-        response = model.generate_content(
-            prompt,
-            generation_config={
-                "response_schema": SCHEMA,
-                "response_mime_type": 'application/json',
-            }
-        )
-        return json.loads(response.text)
+        # Generate content with a plain text prompt (no response_schema constraint)
+        response = model.generate_content(prompt)
+        # The response object exposes the generated text via .text
+        raw = getattr(response, "text", str(response))
+        # Try to extract JSON from the possibly noisy response
+        json_fragment = extract_json_from_text(raw) or raw
+
+        # Parse JSON
+        parsed = json.loads(json_fragment)
+        # Ensure it's an array
+        if isinstance(parsed, dict):
+            parsed = [parsed]
+        return parsed
+
     except Exception as e:
         print(f" ❌ An error occurred while calling Gemini or parsing its response: {e}")
+        # Print the raw response (truncated) to aid debugging, if available
+        try:
+            print(" Raw response (truncated):", raw[:1000])
+        except Exception:
+            pass
         return {"error": str(e)}

+def is_valid_transaction(tx):
+    """
+    Basic validation to ensure required API fields exist.
+    Required fields (per API): transaction_type, company_division, recipient
+    If a field is present but set to "Not Found", treat it as missing for the
+    purposes of deciding whether to keep the record.
+    """
+    for field in ["transaction_type", "company_division", "recipient"]:
+        if field not in tx or not tx[field] or tx[field] == "Not Found":
+            return False
+    return True
+
+API_BASE_URL = "https://ploughshares.nixc.us/api/transaction"
+HEADERS = {"Content-Type": "application/json"}
+
+allowed_fields = {
+    "transaction_type", "company_division", "recipient", "amount",
+    "description", "address_1", "address_2", "city", "province", "region",
+    "postal_code", "source_date", "source_description", "grant_type",
+    "commodity_class", "contract_number", "comments", "is_primary"
+}
+
+def clean_for_api(tx):
+    cleaned = {k: v for k, v in tx.items() if k in allowed_fields}
+
+    # Remove invalid source_date
+    if "source_date" in cleaned:
+        if not isinstance(cleaned["source_date"], str) or cleaned["source_date"].lower() == "not found":
+            cleaned.pop("source_date")
+
+    # Remove invalid amount (API expects numeric)
+    if "amount" in cleaned:
+        # If "Not Found" or not parseable as a float, drop it
+        try:
+            float(str(cleaned["amount"]).replace(",", "").replace("$", ""))
+        except ValueError:
+            cleaned.pop("amount")
+
+    # Use source_url for source_description
+    if "source_url" in tx:
+        cleaned["source_description"] = tx["source_url"]
+
+    return cleaned
+
+
+def post_transaction(transaction):
+    payload = clean_for_api(transaction)
+    response = requests.post(API_BASE_URL, headers=HEADERS, json=payload)
+    if response.status_code == 200 or response.status_code == 201:
+        print(f"✅ Created transaction for {payload['company_division']} → ID: {response.json().get('transaction_id')}")
+    else:
+        print(f"❌ Failed to create transaction: {response.status_code} - {response.text}")

 async def main():
     """Main function to run the data extraction process."""
@@ -133,34 +207,61 @@ async def main():
     print(f"🤖 Starting information extraction with Gemini for {total_pages} pages...")

     for i, page in enumerate(scraped_pages):
+        url = page.get("url", "unknown_url")
         print(f"\nProcessing page {i+1}/{total_pages}: {page['url']}")

         # Avoid processing pages with very little text
-        if len(page.get('content', '')) < 150:
+        text = page.get("content", "")
+        if len(text) < 150:
             print(" ⏩ Skipping page due to insufficient content.")
             continue

-        extracted_info = process_content_with_gemini(page['content'])
+        extracted_items = process_content_with_gemini(text)

-        # Check if the extraction was successful and contains actual data
-        if extracted_info and "error" not in extracted_info:
-            if validate_info(extracted_info):
-                print(" ✔️ Found relevant info")
-                desc = ""
-                if "source_description" in extracted_info:
-                    desc = extracted_info["source_description"]
-                extracted_info["source_description"] = f"Sourced from Google Alerts. Url: {page['url']}. {desc}"
-                all_extracted_deals.append(extracted_info)
-            else:
-                print(" ❌ insufficient info")
-                print(f" Extracted info: {extracted_info}")
-
-        # Add a small delay to respect API rate limits (1 second is safe)
+        # If the model returned nothing or an error, handle it gracefully
+        if not extracted_items:
+            print(" ⚪ Gemini returned no items.")
+            time.sleep(1)
+            continue
+        if isinstance(extracted_items, dict) and "error" in extracted_items:
+            print(" ⚠️ Gemini error:", extracted_items.get("error"))
+            time.sleep(1)
+            continue
+
+        # Iterate through items (should be an array of objects)
+        for tx in extracted_items:
+            # attach source_url for traceability
+            tx.setdefault("source_url", url)  # type: ignore
+
+            # If the model gives canadian_relevance, use it to decide whether to keep the item
+            relevance = (tx.get("canadian_relevance") or "none").lower()  # type: ignore
+            explanation = tx.get("relation_explanation", "")  # type: ignore
+
+            # If the model says 'none', skip by default (these are the irrelevant ones, e.g. US missile contracts)
+            if relevance == "none":
+                print(" ⚪ Skipping — model marked this as non-Canadian. Explanation:", explanation[:200])
+                continue
+
+            # Basic required-field check (the API-required fields must be present)
+            if not is_valid_transaction(tx):
+                print(" ⚠️ Skipping — missing required API fields in extracted transaction:", tx)
+                continue
+
+            # Optionally normalize some fields (convert "amount" to a canonical string) - keep simple for now
+            # Save the item
+            all_extracted_deals.append(tx)
+            print(f" ✔️ Kept transaction: {tx.get('company_division')} → {tx.get('recipient')} ({relevance})")  # type: ignore
+
+        # Respect rate limit
         time.sleep(1)

     if all_extracted_deals:
+        print("WRITING TO DB")
         for transaction in all_extracted_deals:
-            requests.post("https://ploughshares.nixc.us/api/transaction", json=transaction)
+            try:
+                post_transaction(transaction)
+            except Exception as e:
+                print(f"Error posting transaction: {e}")
     else:
         print("\nNo relevant deals were extracted from any of the pages.")