diff --git a/docker/crawler/analyze.py b/docker/crawler/analyze.py
index e488e14..476c79b 100644
--- a/docker/crawler/analyze.py
+++ b/docker/crawler/analyze.py
@@ -1,42 +1,99 @@
+"""
+analyze.py
+
+Reads scraped pages (crawl_results/successful_pages.json),
+sends each page to Gemini for structured extraction, and writes
+API-ready transactions to crawl_results/extracted_arms_deals.json.
+
+- The Gemini prompt requests output that *matches the API's expected fields*.
+- Each output object includes `canadian_relevance` and `relation_explanation`
+  so we can filter out non-Canadian items while still capturing indirect cases.
+"""
+
 import google.generativeai as genai
 import json
 import os
+import re
 import time
 from dotenv import load_dotenv
+
 load_dotenv()
 GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
 
-# json generated by the scraper (markeltine_crawler.py)
+# JSON generated by the scraper (marketline_crawler.py)
 INPUT_FILE = os.path.join("crawl_results", "successful_pages.json")
-# output JSON any extracted deals from the scraped data
+# output JSON of extracted deals from the scraped data (API-ready schema)
 OUTPUT_FILE = os.path.join("crawl_results", "extracted_arms_deals.json")
 
 MODEL_NAME = "gemini-2.0-flash-lite"
 
-# TODO: refine
+# Prompt: instruct the model to return API schema fields and to explicitly indicate
+# if and how the result is related to Canada (direct, indirect, none).
 EXTRACTION_PROMPT = """
-From the document text provided below, extract key details about any military or arms exports.
+You are a precise data-extraction system.
 
-Your task is to identify the following:
-- "company_name": The name of the company involved in manufacturing or selling.
-- "weapon_system": The specific type of weapon, vehicle, or military equipment.
-- "destination_country": The country receiving the goods.
-- "sale_value": The monetary value of the deal, including currency (e.g., "$15 Billion CAD").
-- "summary": A concise, one-sentence summary of the export deal or report.
+Given the DOCUMENT TEXT below, extract ALL transactions or arms-export-relevant
+entries and output a JSON array (possibly empty) of objects that match the
+Project Ploughshares API schema. Output ONLY the JSON array — no markdown,
+no commentary, no code fences.
 
-If a specific piece of information cannot be found in the text, you MUST use the value "Not Found".
+Each object must use the following fields (required fields must be provided
+and set to "Not Found" if absent):
 
-Provide your response as a single, clean JSON object. Do not add any explanatory text before or after the JSON.
 
+Required fields:
+- transaction_type (string)      # e.g., "Export", "Purchase Order", "Component Supply"
+- company_division (string)      # company or division name (use "Not Found" if unknown)
+- recipient (string)             # receiving country or recipient (use "Not Found" if unknown)
+
+Optional fields (include if present):
+- amount (string or number)      # monetary value if present (e.g., "15,000,000 CAD")
+- description (string)
+- address_1, address_2, city, province, region, postal_code
+- source_date (string YYYY-MM-DD)
+- source_description (string)
+- grant_type (string)
+- commodity_class (string)       # e.g., missile components, avionics, engines
+- contract_number (string)
+- comments (string)
+- is_primary (boolean)
+
+Additionally, include these two new fields to help filter relevance:
+- canadian_relevance (string)    # one of: "direct", "indirect", "none"
+   - "direct" = Canadian company or Canada-origin export of military goods/components
+   - "indirect" = Canadian-made parts/components appear in a larger export (final assembly elsewhere)
+   - "none" = no meaningful Canadian connection
+- relation_explanation (string)  # short explanation why this is direct/indirect/none (1-2 sentences)
+
+Rules:
+1. If a piece of info cannot be found, set it to the string "Not Found" (not null).
+2. If multiple transactions are described in the text, output them as separate objects.
+3. If the text contains the same transaction repeated, ensure you only output one object per distinct transaction.
+4. Output must be valid JSON (an array). Example:
+   [
+     {{
+       "transaction_type": "Export",
+       "company_division": "Example Corp Canada",
+       "recipient": "Country X",
+       "amount": "3,000,000 CAD",
+       "commodity_class": "avionics modules",
+       "description": "Example summary ...",
+       "source_url": "https://example.com/article",
+       "canadian_relevance": "direct",
+       "relation_explanation": "Company is based in Canada and shipped avionics modules."
+     }}
+   ]
 ----
 DOCUMENT TEXT:
 {text_content}
 """
 
+# -------------------------
+# Helper functions
+# -------------------------
 def load_scraped_data(filepath):
-    """Loads the scraped data from the JSON file."""
+    """Loads the scraped data from the JSON file created by the crawler."""
     try:
         with open(filepath, "r", encoding="utf-8") as f:
             return json.load(f)
@@ -45,39 +102,95 @@ def load_scraped_data(filepath):
         print("Ensure you have run the scraper first.")
         return None
 
+
 def save_extracted_data(filepath, data):
     """Saves the final extracted data to a new JSON file."""
     with open(filepath, "w", encoding="utf-8") as f:
-        json.dump(data, f, indent=4, ensure_ascii=False)
+        json.dump(data, f, indent=2, ensure_ascii=False)
     print(f"\nāœ… Success! Saved extracted info to '{filepath}'.")
 
 
+def extract_json_from_text(text):
+    """
+    Attempts to find and return the first JSON array or object in a text blob.
+    This removes markdown fences and extracts from the first '[' ... ']' or '{' ... '}' pair.
+    """
+    if not text or not isinstance(text, str):
+        return None
+    # remove common fences
+    cleaned = text.strip()
+    cleaned = cleaned.replace("```json", "").replace("```", "").strip()
+
+    # Try to locate a JSON array first
+    arr_match = re.search(r"(\[.*\])", cleaned, flags=re.DOTALL)
+    if arr_match:
+        return arr_match.group(1)
+
+    # Otherwise try a single JSON object
+    obj_match = re.search(r"(\{.*\})", cleaned, flags=re.DOTALL)
+    if obj_match:
+        return obj_match.group(1)
+
+    return None
+
+
 def process_content_with_gemini(text_content):
     """
-    Sends the text to the Gemini API with the extraction prompt and
-    parses the JSON response.
+    Sends the text to Gemini with the extraction prompt and parses the JSON response.
+    Uses the existing genai.GenerativeModel pattern.
     """
+    # Initialize the model (same pattern as before)
     model = genai.GenerativeModel(MODEL_NAME)
+    prompt = EXTRACTION_PROMPT.format(text_content=text_content)
 
     try:
+        # Call Gemini with the extraction prompt
        response = model.generate_content(prompt)
 
-        # Clean the response to ensure it's valid JSON. Gemini sometimes
-        # wraps its JSON response in markdown backticks.
-        clean_json = response.text.strip().replace("```json", "").replace("```", "")
-        # print("GOT: ", clean_json)
-        return json.loads(clean_json)
+        # The response object exposes .text; fall back to str() if it does not
+        raw = getattr(response, "text", str(response))
+
+        # Try to extract JSON from the possibly noisy response
+        json_fragment = extract_json_from_text(raw) or raw
+
+        # Parse JSON
+        parsed = json.loads(json_fragment)
+        # Ensure it's an array
+        if isinstance(parsed, dict):
+            parsed = [parsed]
+        return parsed
+
     except Exception as e:
         print(f"   āŒ An error occurred while calling Gemini or parsing its response: {e}")
+        # print the raw text to help debugging, if available
+        try:
+            print("   Raw response (truncated):", raw[:1000])
+        except Exception:
+            pass
         return {"error": str(e)}
 
 
+def is_valid_transaction(tx):
+    """
+    Basic validation to ensure the required API fields exist.
+    Required fields (per API): transaction_type, company_division, recipient.
+    A value of "Not Found" is treated as missing when deciding whether to keep the record.
+    """
+    for field in ["transaction_type", "company_division", "recipient"]:
+        if field not in tx or not tx[field] or tx[field] == "Not Found":
+            return False
+    return True
+
+
+# -------------------------
+# Main orchestration
+# -------------------------
 def main():
-    """Main function to run the data extraction process."""
     if not GOOGLE_API_KEY:
         print("āŒ Error: GOOGLE_API_KEY environment variable not set.")
         return
 
+    # Configure the SDK with the API key
     genai.configure(api_key=GOOGLE_API_KEY)
 
     scraped_pages = load_scraped_data(INPUT_FILE)
@@ -91,32 +204,59 @@ def main():
     print(f"šŸ¤– Starting information extraction with Gemini for {total_pages} pages...")
 
     for i, page in enumerate(scraped_pages):
-        print(f"\nProcessing page {i+1}/{total_pages}: {page['url']}")
+        url = page.get("url", "unknown_url")
+        print(f"\nProcessing page {i+1}/{total_pages}: {url}")
 
-        # Avoid processing pages with very little text
-        if len(page.get('content', '')) < 150:
+        text = page.get("content", "")
+        if len(text) < 150:
             print("   ā© Skipping page due to insufficient content.")
             continue
 
-        extracted_info = process_content_with_gemini(page['content'])
-
-        # Check if the extraction was successful and contains actual data
-        if extracted_info and "error" not in extracted_info:
-            if extracted_info.get("company_name") != "Not Found" or extracted_info.get("weapon_system") != "Not Found":
-                print(f"   āœ”ļø Found relevant info: {extracted_info.get('company_name', 'N/A')} | {extracted_info.get('weapon_system', 'N/A')}")
-                # Add the source URL for reference
-                extracted_info['source_url'] = page['url']
-                all_extracted_deals.append(extracted_info)
-            else:
-                print("   ⚪ No relevant deals found on this page.")
-
-        # Add a small delay to respect API rate limits (1 second is safe)
+        extracted_items = process_content_with_gemini(text)
+
+        # Handle an empty result or an error dict gracefully
+        if not extracted_items:
+            print("   ⚪ Gemini returned no items.")
+            time.sleep(1)
+            continue
+        if isinstance(extracted_items, dict) and "error" in extracted_items:
+            print("   āš ļø Gemini error:", extracted_items.get("error"))
+            time.sleep(1)
+            continue
+
+        # iterate through items (should be an array of objects)
+        for tx in extracted_items:
+            # attach source_url for traceability
+            tx.setdefault("source_url", url)
+
+            # if the model gives canadian_relevance, use it to decide whether to keep the item
+            relevance = (tx.get("canadian_relevance") or "none").lower()
+            explanation = tx.get("relation_explanation", "")
+
+            # If the model says 'none', skip by default (these are the irrelevant ones like US missile contracts)
+            if relevance == "none":
+                print("   ⚪ Skipping — model marked this as non-Canadian. Explanation:", explanation[:200])
+                continue
+
+            # basic required-field check (we want the API-required fields present)
+            if not is_valid_transaction(tx):
+                print("   āš ļø Skipping — missing required API fields in extracted transaction:", tx)
+                continue
+
+            # Optionally normalize some fields (convert "amount" to a canonical string) - keep simple for now
+            # Save the item
+            all_extracted_deals.append(tx)
+            print(f"   āœ”ļø Kept transaction: {tx.get('company_division')} → {tx.get('recipient')} ({relevance})")
+
+        # Respect rate limit
         time.sleep(1)
 
+    # Save results
     if all_extracted_deals:
         save_extracted_data(OUTPUT_FILE, all_extracted_deals)
     else:
-        print("\nNo relevant deals were extracted from any of the pages.")
+        print("\nNo relevant Canadian deals were extracted from any of the pages.")
+
 
 if __name__ == "__main__":
-    main()
\ No newline at end of file
+    main()
diff --git a/docker/crawler/marketline_crawler.py b/docker/crawler/marketline_crawler.py
index 17744ea..3dcf5e1 100644
--- a/docker/crawler/marketline_crawler.py
+++ b/docker/crawler/marketline_crawler.py
@@ -22,13 +22,16 @@ HOMEPAGE_URL = "https://advantage.marketline.com/HomePage/Home"
 
 # the root page to seed crawling
 CRAWLPAGE_URL = "https://advantage.marketline.com/Search?industry=2800001"
 
+# trying out another page
+# CRAWLPAGE_URL = "https://www.defensenews.com/"
+
 # name of file where cookies are saved
 COOKIES_FILE = "marketline_cookies.json"
 
 # --- CRAWLER SETTINGS ---
-DEPTH = 2
-COUNT = 10  # Increased for better testing
+DEPTH = 3
+COUNT = 100
 
 # TODO: maybe make this list more comprehensive?
 SCRAPER_KEYWORDS = [
@@ -177,7 +180,7 @@ async def crawl_with_saved_cookies():
 if __name__ == "__main__":
     # Choose which function to run
     # 1. First, run the login function once to get your cookies
-    asyncio.run(login_and_save_cookies())
+    # asyncio.run(login_and_save_cookies())
 
     # 2. Then, comment out the login line and run the crawl
-    # asyncio.run(crawl_with_saved_cookies())
\ No newline at end of file
+    asyncio.run(crawl_with_saved_cookies())
diff --git a/docker/crawler/run_all.sh b/docker/crawler/run_all.sh
new file mode 100644
index 0000000..42f8888
--- /dev/null
+++ b/docker/crawler/run_all.sh
@@ -0,0 +1,16 @@
+#!/bin/bash
+
+# Runs the full pipeline: crawl, analyze with Gemini, then write the results to the API.
+
+set -e  # exit immediately if any step fails
+
+echo "šŸ“” Crawling data..."
+python marketline_crawler.py
+
+echo "🧠 Analyzing with Gemini..."
+python analyze.py
+
+echo "šŸ“¤ Sending to API..."
+python write_to_api.py crawl_results/extracted_arms_deals.json
+
+echo "āœ… All done!"
diff --git a/docker/crawler/write_to_api.py b/docker/crawler/write_to_api.py
new file mode 100644
index 0000000..e7d98c7
--- /dev/null
+++ b/docker/crawler/write_to_api.py
@@ -0,0 +1,64 @@
+import json
+import requests
+import sys
+
+API_BASE_URL = "http://ploughshares.nixc.us/api/transaction"
+HEADERS = {"Content-Type": "application/json"}
+
+allowed_fields = {
+    "transaction_type", "company_division", "recipient", "amount",
+    "description", "address_1", "address_2", "city", "province", "region",
+    "postal_code", "source_date", "source_description", "grant_type",
+    "commodity_class", "contract_number", "comments", "is_primary"
+}
+
+def clean_for_api(tx):
+    cleaned = {k: v for k, v in tx.items() if k in allowed_fields}
+
+    # Remove invalid source_date
+    if "source_date" in cleaned:
+        if not isinstance(cleaned["source_date"], str) or cleaned["source_date"].lower() == "not found":
+            cleaned.pop("source_date")
+
+    # Remove invalid amount (API expects numeric)
+    if "amount" in cleaned:
+        # If "Not Found" or not parseable as a float, drop it
+        try:
+            float(str(cleaned["amount"]).replace(",", "").replace("$", ""))
+        except ValueError:
+            cleaned.pop("amount")
+
+    # Use source_url for source_description
+    if "source_url" in tx:
+        cleaned["source_description"] = tx["source_url"]
+
+    return cleaned
+
+
+def post_transaction(transaction):
+    payload = clean_for_api(transaction)
+    response = requests.post(API_BASE_URL, headers=HEADERS, json=payload)
+    if response.status_code in (200, 201):
+        print(f"āœ… Created transaction for {payload.get('company_division', 'unknown')} → ID: {response.json().get('transaction_id')}")
+    else:
+        print(f"āŒ Failed to create transaction: {response.status_code} - {response.text}")
+
+def main(json_file_path):
+    with open(json_file_path, "r", encoding="utf-8") as f:
+        transactions = json.load(f)
+
+    if not isinstance(transactions, list):
+        transactions = [transactions]
+
+    for tx in transactions:
+        try:
+            post_transaction(tx)
+        except Exception as e:
+            print(f"Error posting transaction: {e}")
+
+if __name__ == "__main__":
+    if len(sys.argv) != 2:
+        print("Usage: python write_to_api.py <transactions.json>")
+        sys.exit(1)
+
+    main(sys.argv[1])
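
Note: as a quick sanity check of the new clean_for_api() filtering, a small dry-run script along the following lines can be kept next to write_to_api.py (a sketch only; the sample transaction is hypothetical and nothing is sent to the live API):

# dry_run_example.py (illustrative sketch, not part of the patch above)
import json

from write_to_api import clean_for_api  # assumes this file sits beside write_to_api.py

# Hypothetical transaction shaped like analyze.py's output.
sample_tx = {
    "transaction_type": "Export",
    "company_division": "Example Corp Canada",
    "recipient": "Country X",
    "amount": "Not Found",           # not numeric, so clean_for_api drops it
    "source_date": "Not Found",      # dropped as an invalid date
    "canadian_relevance": "direct",  # not in allowed_fields, so filtered out
    "source_url": "https://example.com/article",  # copied into source_description
}

# Print exactly what post_transaction() would POST, without calling the API.
print(json.dumps(clean_for_api(sample_tx), indent=2))

The expected output contains only transaction_type, company_division, recipient, and source_description, which mirrors what the API should receive once the invalid amount and source_date are stripped.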