ploughshares/docker/crawler/analyze.py

290 lines
12 KiB
Python

"""
analyze.py
Reads scraped pages (crawl_results/successful_pages.json),
sends each page to Gemini for structured extraction, and writes
API-ready transactions to crawl_results/extracted_arms_deals.json.
- The Gemini prompt requests output that *matches the API's expected fields*.
- Each output object includes `canadian_relevance` and `relation_explanation`
so we can filter out non-Canadian items while still capturing indirect cases.
"""
import google.generativeai as genai
import json
import os
import re
import time
from dotenv import load_dotenv, find_dotenv
# Load variables from a .env file (if one is found) before reading the API key.
load_dotenv(find_dotenv())
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
# JSON generated by the scraper (marketline_crawler.py).
INPUT_FILE = os.path.join("crawl_results", "successful_pages.json")
# Output JSON holding any deals extracted from the scraped data (API-ready schema).
OUTPUT_FILE = os.path.join("crawl_results", "extracted_arms_deals.json")
# TODO: Gemini 2.0 Flash-Lite is an alternative; it has a lower requests-per-day
# quota and seems to perform slightly worse.
# We should switch if we run into significant requests-per-minute issues.
# The notes above may be outdated by the time anyone reads this; see the
# current rate-limit overview: https://ai.google.dev/gemini-api/docs/rate-limits
MODEL_NAME = "gemini-2.5-flash-lite"
# Prompt: instructs the model to return objects using the API schema fields and
# to state explicitly if and how each result relates to Canada
# (direct, indirect, none).
# The text is used as a str.format template: {text_content} is the only
# placeholder, so any literal braces added to the prompt must be doubled.
EXTRACTION_PROMPT = """
You are an expert intelligence analyst specializing in the global defense supply chain. Your task is to act as a precise data-extraction system.
Given the DOCUMENT TEXT below, your mission is to identify and extract ALL potential transactions, contracts, supply chain mentions, or other arms-export relevant events with a potential connection to Canada. Your primary objective is high recall; you should err on the side of including an entry if it has any plausible link to Canada.
Output a JSON array of objects that match the Project Ploughshares API schema. Output ONLY the JSON array — no markdown, no commentary, no code fences.
---
### Guiding Principles & Heuristics
To determine Canadian relevance, use the following rules:
1. **Canadian Company Identification:** A company or division is considered Canadian if:
a. Its name explicitly includes "Canada" (e.g., "L3Harris Canada").
b. Its address is located within Canada.
c. It is one of the following known major players in the Canadian defense industry:
- General Dynamics Land Systems-Canada (GDLS-C)
- CAE Inc.
- Bombardier
- L3Harris Technologies Canada
- Thales Canada
- MDA
- IMP Group
- Magellan Aerospace
- Heroux-Devtek
- PAL Aerospace
- Irving Shipbuilding
- Seaspan Shipyards
- Babcock Canada
d. The text describes it as a "Canadian company" or "based in Canada".
2. **Indirect Link Identification:** An 'indirect' link exists when Canadian-made parts, materials, or sub-systems are part of a larger product assembled or sold by a non-Canadian entity. Look for phrases like:
- "powered by engines from..."
- "utilizing components supplied by..."
- "avionics provided by..."
- "built with steel from..."
- "the supply chain includes..."
3. **Transaction Definition:** A "transaction" is defined broadly. It can be a direct sale, a purchase order, a maintenance contract, a government grant for development, a component supply agreement, or even a confirmed report of a transfer.
---
### JSON Output Schema
Each object in the output array must use the following fields.
**Required fields (must be provided; use "Not Found" if absent):**
- transaction_type (string) # e.g., "Export", "Purchase Order", "Component Supply", "Maintenance Contract", "Grant"
- company_division (string) # The primary company or division involved.
- recipient (string) # The receiving country, company, or entity.
**Optional fields (include if present, otherwise omit the key):**
- amount (string or number) # e.g., "15,000,000 CAD"
- description (string) # A summary of the transaction.
- address_1, address_2, city, province, region, postal_code
- source_date (string YYYY-MM-DD)
- source_description (string)
- grant_type (string)
- commodity_class (string) # e.g., "Armoured Vehicles", "Avionics", "Engine Components", "Naval Systems"
- contract_number (string)
- comments (string)
- is_primary (boolean)
**Mandatory Relevance Analysis Fields:**
- canadian_relevance (string) # Must be one of: "direct", "indirect", "none"
- "direct": A Canadian company or the Canadian government is directly exporting/selling military goods or services.
- "indirect": Canadian-made parts, materials, or sub-systems are identified as being part of a larger system exported by another country.
- "none": No meaningful Canadian connection can be established.
- relation_explanation (string) # A brief (1-2 sentence) explanation for the 'canadian_relevance' classification, citing the evidence from the text.
---
### Final Output Rules
1. If a required field's value cannot be found in the text, you MUST set its value to the string "Not Found". Do not use null.
2. If multiple distinct transactions are described, output them as separate objects in the array.
3. Do not duplicate the same transaction. If mentioned multiple times, consolidate into one object.
4. Your final output must be ONLY the raw, valid JSON array.
---
DOCUMENT TEXT:
{text_content}
"""
def load_scraped_data(filepath):
    """Load the scraped pages from the JSON file created by the crawler.

    Args:
        filepath: Path to the crawler's JSON output.

    Returns:
        The decoded JSON content, or ``None`` when the file is missing or
        does not contain valid JSON. A message explaining the problem is
        printed in both failure cases.
    """
    try:
        with open(filepath, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        print(f"❌ Error: Input file not found at '{filepath}'.")
        print("Ensure you have run the scraper first.")
        return None
    except json.JSONDecodeError as e:
        # A truncated or corrupt file is reported the same way as a missing
        # one instead of aborting the run with a traceback.
        print(f"❌ Error: Input file '{filepath}' is not valid JSON: {e}")
        print("Re-run the scraper to regenerate it.")
        return None
def save_extracted_data(filepath, data):
    """Write the extracted records to ``filepath`` as indented UTF-8 JSON.

    Non-ASCII characters are written as-is rather than escaped. A
    confirmation line is printed once the file has been written.
    """
    dump_options = {"indent": 2, "ensure_ascii": False}
    with open(filepath, "w", encoding="utf-8") as out_file:
        json.dump(data, out_file, **dump_options)
    print(f"\n✅ Success! Saved extracted info to '{filepath}'.")
def extract_json_from_text(text):
    """Return the first JSON array or object embedded in ``text``.

    Markdown code fences are removed, then every ``[`` or ``{`` is tried in
    order of appearance as the start of a JSON value; the first one that
    decodes cleanly is returned. An object that contains an array is
    therefore returned whole, and bracketed prose before or after the payload
    does not corrupt the result.

    Args:
        text: Raw model output, possibly wrapped in fences or commentary.

    Returns:
        The JSON fragment as a string. If no fragment decodes, the widest
        ``[...]`` (or, failing that, ``{...}``) span is returned so the
        caller's ``json.loads`` reports the parse error. ``None`` is returned
        when ``text`` is empty, not a string, or has no bracketed span.
    """
    if not text or not isinstance(text, str):
        return None
    # remove common fences
    cleaned = text.strip().replace("```json", "").replace("```", "").strip()

    # Try each opening bracket as the start of a JSON document; raw_decode
    # stops at the end of the value, so trailing text is ignored.
    decoder = json.JSONDecoder()
    for opener in re.finditer(r"[\[{]", cleaned):
        start = opener.start()
        try:
            _, end = decoder.raw_decode(cleaned, start)
        except json.JSONDecodeError:
            continue
        return cleaned[start:end]

    # Nothing decoded: fall back to the widest bracketed span (array first).
    for pattern in (r"\[.*\]", r"\{.*\}"):
        span = re.search(pattern, cleaned, flags=re.DOTALL)
        if span:
            return span.group(0)
    return None
def process_content_with_gemini(text_content):
    """Send page text to Gemini and parse the structured response.

    Args:
        text_content: Page text substituted into ``EXTRACTION_PROMPT``.

    Returns:
        A list of dicts (one per extracted transaction) on success. A single
        JSON object is wrapped in a list, and array entries that are not
        objects are dropped. On any failure (API error, unparsable or
        unexpectedly shaped response) a dict ``{"error": <message>}`` is
        returned instead.
    """
    model = genai.GenerativeModel(MODEL_NAME)
    prompt = EXTRACTION_PROMPT.format(text_content=text_content)
    # Bound before the try so the error handler can always inspect it.
    raw = None
    try:
        response = model.generate_content(prompt)
        raw = getattr(response, "text", str(response))
        # Try to extract JSON from the possibly noisy response
        json_fragment = extract_json_from_text(raw) or raw
        parsed = json.loads(json_fragment)
        # Ensure it's an array
        if isinstance(parsed, dict):
            parsed = [parsed]
        if not isinstance(parsed, list):
            # A bare string/number/bool is valid JSON but not a usable result.
            raise ValueError(
                f"expected a JSON array of objects, got {type(parsed).__name__}"
            )
        # Callers treat every entry as a dict, so keep only JSON objects.
        items = [entry for entry in parsed if isinstance(entry, dict)]
        if len(items) != len(parsed):
            print(f" ⚠️ Dropped {len(parsed) - len(items)} non-object item(s) from the model response.")
        return items
    except Exception as e:
        print(f" ❌ An error occurred while calling Gemini or parsing its response: {e}")
        # print raw text to help debugging if available
        if isinstance(raw, str):
            print(" Raw response (truncated):", raw[:1000])
        return {"error": str(e)}
def is_valid_transaction(tx):
    """Report whether ``tx`` carries every field the API requires.

    The required fields are ``transaction_type``, ``company_division`` and
    ``recipient``. A field counts as missing when the key is absent, the
    value is falsy, or the value is the placeholder string "Not Found".

    Returns:
        ``True`` only when all three fields hold a usable value.
    """
    required_keys = ("transaction_type", "company_division", "recipient")

    def has_usable_value(key):
        if key not in tx:
            return False
        value = tx[key]
        if not value:
            return False
        return not (value == "Not Found")

    return all(has_usable_value(key) for key in required_keys)
# -------------------------
# Main orchestration
# -------------------------
def main():
    """Run the extraction pipeline over every scraped page.

    Loads the crawler output from ``INPUT_FILE``, sends each page with at
    least 150 characters of content to Gemini, keeps every returned
    transaction whose ``canadian_relevance`` is not "none", and writes the
    kept transactions to ``OUTPUT_FILE``. Returns early, after printing an
    error, when the API key is not set or no scraped pages are available.
    """
    if not GOOGLE_API_KEY:
        print("❌ Error: GOOGLE_API_KEY environment variable not set.")
        return
    genai.configure(api_key=GOOGLE_API_KEY)
    scraped_pages = load_scraped_data(INPUT_FILE)
    if not scraped_pages:
        print("❌ Error: No scraper results found. Run marketline_crawler.py to generate crawl_results/successful_pages.json")
        return
    all_extracted_deals = []
    total_pages = len(scraped_pages)
    print(f"🤖 Starting information extraction with Gemini for {total_pages} pages...")
    for page_number, page in enumerate(scraped_pages, start=1):
        url = page.get("url", "unknown_url")
        print(f"\nProcessing page {page_number}/{total_pages}: {url}")
        # "content" may be present but null; treat that as an empty page.
        text = page.get("content") or ""
        if len(text) < 150:
            print(" ⏩ Skipping page due to insufficient content.")
            continue
        extracted_items = process_content_with_gemini(text)
        # If model returned nothing or an error, handle gracefully
        if not extracted_items:
            print(" ⚪ Gemini returned no items.")
            time.sleep(1)
            continue
        if isinstance(extracted_items, dict) and "error" in extracted_items:
            print(" ⚠️ Gemini error:", extracted_items.get("error"))
            time.sleep(1)
            continue
        # iterate through items (should be array of objects)
        for tx in extracted_items:
            # The model can emit strings or numbers inside the array; those
            # cannot carry transaction fields, so skip them.
            if not isinstance(tx, dict):
                print(" ⚠️ Skipping non-object item returned by the model:", tx)
                continue
            # attach source_url for traceability
            tx.setdefault("source_url", url)
            # A missing or null relevance counts as "none"; str() guards
            # against non-string values such as booleans.
            relevance = str(tx.get("canadian_relevance") or "none").strip().lower()
            explanation = str(tx.get("relation_explanation") or "")
            # If model says 'none', skip by default (these are the irrelevant ones like US missile contracts)
            if relevance == "none":
                print(" ⚪ Skipping — model marked this as non-Canadian. Explanation:", explanation[:200])
                continue
            # Records missing required API fields are reported but still kept.
            if not is_valid_transaction(tx):
                print(" ⚠️ missing required API fields in extracted transaction:", tx)
            all_extracted_deals.append(tx)
            print(f" ✔️ Kept transaction: {tx.get('company_division')} → {tx.get('recipient')} ({relevance})")
        # Respect rate limit
        time.sleep(1)
    # Save results
    if all_extracted_deals:
        save_extracted_data(OUTPUT_FILE, all_extracted_deals)
    else:
        print("\nNo relevant Canadian deals were extracted from any of the pages.")


if __name__ == "__main__":
    main()