Compare commits

..

2 Commits

Author SHA1 Message Date
jChenvan 977e5b93ad Merge branch 'main' of https://git.nixc.us/colin/ploughshares
ci/woodpecker/push/woodpecker Pipeline was successful Details
2025-08-20 19:31:54 -04:00
jChenvan a3da858a16 Dork scraper use same prompt/logic as main crawler 2025-08-20 19:08:45 -04:00
1 changed files with 191 additions and 90 deletions

View File

@ -3,6 +3,7 @@ from typing import Optional
import google.generativeai as genai import google.generativeai as genai
import json import json
import os import os
import re
import time import time
from dotenv import load_dotenv from dotenv import load_dotenv
import requests import requests
@ -18,99 +19,172 @@ MODEL_NAME = "gemini-2.0-flash-lite"
# TODO: refine # TODO: refine
EXTRACTION_PROMPT = """ EXTRACTION_PROMPT = """
You are an information extraction system. You are a precise data-extraction system.
Your task is to extract specific fields from the provided article text (the 'source').
The topic is Canadian military exports/transactions.
Follow these rules strictly: Given the DOCUMENT TEXT below, extract ALL transactions or arms-export relevant
1. Output ONLY valid JSON no explanations or commentary. entries and output a JSON array (possibly empty) of objects that match the
2. Only include a field if you find a clear and unambiguous match. If the information is not explicitly present, omit that field entirely (do not use null, "", or placeholders). Project Ploughshares API schema. Output ONLY the JSON array no markdown,
3. Do not copy entire paragraphs into a field. Summarize or extract only the relevant fragment directly answering the fields requirement. no commentary, no code fences.
4. Do not guess or infer if the text is ambiguous, leave the field out.
5. If a number is expected, provide only the numeric value (without units unless the unit is part of the field definition).
6. Do not mix unrelated information into a field.
Fields to extract (omit if not found): Each object must use the following fields (required fields must be provided
* "transaction_type": Type of transaction being made (e.g., "Purchase Order", "Subcontract") and set to "Not Found" if absent):
* "company_division": Canadian company/division involved in the transaction
* "address_1", "address_2", "city", "province", "region", "postal_code": Address of the company Required fields:
* "recipient": Recipient of the transaction, be it a country, organization, or individual - transaction_type (string) # e.g., "Export", "Purchase Order", "Component Supply"
* "amount": Transaction amount, including the currency - company_division (string) # company or division name (use "Not Found" if unknown)
* "description": Transaction description - recipient (string) # receiving country or recipient (use "Not Found" if unknown)
* "source_date": Date in YYYY-MM-DD format the source/article was posted at.
* "source_description": Decription of the platform the source/article came from, as well as the content of the source/article. Optional fields (include if present):
* "grant_type": Type of grant - amount (string or number) # monetary value if present (e.g., "15,000,000 CAD")
* "commodity_class": Commodity classification or the product being exported in the transaction, e.g. missile components, avionics, engines - description (string)
* "contract_number": Contract number - address_1, address_2, city, province, region, postal_code
* "comments": Additional comments - source_date (string YYYY-MM-DD)
* "is_primary": Boolean flag - source_description (string)
- grant_type (string)
- commodity_class (string) # e.g., missile components, avionics, engines
- contract_number (string)
- comments (string)
- is_primary (boolean)
Additionally, include these two new fields to help filter relevance:
- canadian_relevance (string) # one of: "direct", "indirect", "none"
- "direct" = Canadian company or Canada-origin export of military goods/components
- "indirect" = Canadian-made parts/components appear in a larger export (final assembly elsewhere)
- "none" = no meaningful Canadian connection
- relation_explanation (string) # short explanation why this is direct/indirect/none (1-2 sentences)
Rules:
1. If a piece of info cannot be found, set it to the string "Not Found" (not null).
2. If multiple transactions are described in the text, output them as separate objects.
3. If the text contains the same transaction repeated, ensure you only output one object per distinct transaction.
4. Output must be valid JSON (an array). Example:
[
{{
"transaction_type": "Export",
"company_division": "Example Corp Canada",
"recipient": "Country X",
"amount": "3,000,000 CAD",
"commodity_class": "avionics modules",
"description": "Example summary ...",
"source_url": "https://example.com/article",
"canadian_relevance": "direct",
"relation_explanation": "Company is based in Canada and shipped avionics modules."
}}
]
---
DOCUMENT TEXT: DOCUMENT TEXT:
{text_content} {text_content}
""" """
SCHEMA = { def extract_json_from_text(text):
"type": "object", """
"required": ["source_description"], Attempts to find and return the first JSON array or object in a text blob.
"properties": { This removes markdown fences and extracts from the first '[' ... ']' or '{' ... '}' pair.
"transaction_type": {"type": "string"}, """
"company_division": {"type": "string"}, if not text or not isinstance(text, str):
"recipient": {"type": "string"}, return None
"amount": {"type": "number"}, # remove common fences
"description": {"type": "string"}, cleaned = text.strip()
"address_1": {"type": "string"}, cleaned = cleaned.replace("```json", "").replace("```", "").strip()
"address_2": {"type": "string"},
"city": {"type": "string"},
"province": {"type": "string"},
"region": {"type": "string"},
"postal_code": {"type": "string"},
"source_date": {"type": "string"},
"source_description": {"type": "string"},
"grant_type": {"type": "string"},
"commodity_class": {"type": "string"},
"contract_number": {"type": "string"},
"comments": {"type": "string"},
"is_primary": {"type": "boolean"}
}
}
def validate_info(extracted_info): # Try to locate a JSON array first
if ("transaction_type" not in extracted_info): arr_match = re.search(r"(\[.*\])", cleaned, flags=re.DOTALL)
return False if arr_match:
if (len(extracted_info["transaction_type"]) == 0): return arr_match.group(1)
return False
if ("company_division" not in extracted_info): # Otherwise try a single JSON object
return False obj_match = re.search(r"(\{.*\})", cleaned, flags=re.DOTALL)
if (len(extracted_info["company_division"]) == 0): if obj_match:
return False return obj_match.group(1)
if ("recipient" not in extracted_info):
return False return None
if (len(extracted_info["recipient"]) == 0):
return False
return True
def process_content_with_gemini(text_content): def process_content_with_gemini(text_content):
""" """
Sends the text to the Gemini API with the extraction prompt and Sends the text to Gemini with the extraction prompt and parses the JSON response.
parses the JSON response. Uses your existing SDK usage pattern (genai.GenerativeModel).
""" """
# Keep using your existing model init pattern
model = genai.GenerativeModel(MODEL_NAME) # type: ignore model = genai.GenerativeModel(MODEL_NAME) # type: ignore
prompt = EXTRACTION_PROMPT.format(text_content=text_content) prompt = EXTRACTION_PROMPT.format(text_content=text_content)
try: try:
response = model.generate_content( # Generate content. Your original code used model.generate_content(prompt)
prompt, response = model.generate_content(prompt)
generation_config={ # Response object in your environment exposes .text (as in your original script)
"response_schema": SCHEMA, raw = getattr(response, "text", str(response))
"response_mime_type": 'application/json', # Try to extract JSON from the possibly noisy response
} json_fragment = extract_json_from_text(raw) or raw
)
return json.loads(response.text) # Parse JSON
parsed = json.loads(json_fragment)
# Ensure it's an array
if isinstance(parsed, dict):
parsed = [parsed]
return parsed
except Exception as e: except Exception as e:
print(f" ❌ An error occurred while calling Gemini or parsing its response: {e}") print(f" ❌ An error occurred while calling Gemini or parsing its response: {e}")
# print raw text to help debugging if available
try:
print(" Raw response (truncated):", raw[:1000])
except Exception:
pass
return {"error": str(e)} return {"error": str(e)}
def is_valid_transaction(tx):
"""
Basic validation to ensure required API fields exist.
Required fields (per API): transaction_type, company_division, recipient
If a field is present but "Not Found", treat as missing for the
purposes of deciding whether to keep the record (we still surface it sometimes).
"""
for field in ["transaction_type", "company_division", "recipient"]:
if field not in tx or not tx[field] or tx[field] == "Not Found":
return False
return True
API_BASE_URL = "http://ploughshares.nixc.us/api/transaction"
HEADERS = {"Content-Type": "application/json"}
allowed_fields = {
"transaction_type", "company_division", "recipient", "amount",
"description", "address_1", "address_2", "city", "province", "region",
"postal_code", "source_date", "source_description", "grant_type",
"commodity_class", "contract_number", "comments", "is_primary"
}
def clean_for_api(tx):
cleaned = {k: v for k, v in tx.items() if k in allowed_fields}
# Remove invalid source_date
if "source_date" in cleaned:
if not isinstance(cleaned["source_date"], str) or cleaned["source_date"].lower() == "not found":
cleaned.pop("source_date")
# Remove invalid amount (API expects numeric)
if "amount" in cleaned:
# If "Not Found" or not parseable as a float, drop it
try:
float(str(cleaned["amount"]).replace(",", "").replace("$", ""))
except ValueError:
cleaned.pop("amount")
# Use source_url for source_description
if "source_url" in tx:
cleaned["source_description"] = tx["source_url"]
return cleaned
def post_transaction(transaction):
payload = clean_for_api(transaction)
response = requests.post(API_BASE_URL, headers=HEADERS, json=payload)
if response.status_code == 200 or response.status_code == 201:
print(f"✅ Created transaction for {payload['company_division']} → ID: {response.json().get('transaction_id')}")
else:
print(f"❌ Failed to create transaction: {response.status_code} - {response.text}")
async def main(): async def main():
"""Main function to run the data extraction process.""" """Main function to run the data extraction process."""
@ -133,34 +207,61 @@ async def main():
print(f"🤖 Starting information extraction with Gemini for {total_pages} pages...") print(f"🤖 Starting information extraction with Gemini for {total_pages} pages...")
for i, page in enumerate(scraped_pages): for i, page in enumerate(scraped_pages):
url = page.get("url", "unknown_url")
print(f"\nProcessing page {i+1}/{total_pages}: {page['url']}") print(f"\nProcessing page {i+1}/{total_pages}: {page['url']}")
# Avoid processing pages with very little text # Avoid processing pages with very little text
if len(page.get('content', '')) < 150: text = page.get("content", "")
if len(text) < 150:
print(" ⏩ Skipping page due to insufficient content.") print(" ⏩ Skipping page due to insufficient content.")
continue continue
extracted_info = process_content_with_gemini(page['content']) extracted_items = process_content_with_gemini(page['content'])
# Check if the extraction was successful and contains actual data # If model returned a single object or error, handle gracefully
if extracted_info and "error" not in extracted_info: if not extracted_items:
if validate_info(extracted_info): print(" ⚪ Gemini returned no items.")
print(" ✔️ Found relevant info") time.sleep(1)
desc = "" continue
if "source_description" in extracted_info: if isinstance(extracted_items, dict) and "error" in extracted_items:
desc = extracted_info["source_description"] print(" ⚠️ Gemini error:", extracted_items.get("error"))
extracted_info["source_description"] = f"Sourced from Google Alerts. Url: {page['url']}. {desc}" time.sleep(1)
all_extracted_deals.append(extracted_info) continue
else:
print(" ❌ insufficient info")
print(f" Extracted info: {extracted_info}")
# Add a small delay to respect API rate limits (1 second is safe) # iterate through items (should be array of objects)
for tx in extracted_items:
# attach source_url for traceability
tx.setdefault("source_url", url) # type: ignore
# if the model gives canadian_relevance, use it to decide whether to keep
relevance = (tx.get("canadian_relevance") or "none").lower() # type: ignore
explanation = tx.get("relation_explanation", "") # type: ignore
# If model says 'none', skip by default (these are the irrelevant ones like US missile contracts)
if relevance == "none":
print(" ⚪ Skipping — model marked this as non-Canadian. Explanation:", explanation[:200])
continue
# basic required-field check (we want the API-required fields present)
if not is_valid_transaction(tx):
print(" ⚠️ Skipping — missing required API fields in extracted transaction:", tx)
continue
# Optionally normalize some fields (convert "amount" to a canonical string) - keep simple for now
# Save the item
all_extracted_deals.append(tx)
print(f" ✔️ Kept transaction: {tx.get('company_division')}{tx.get('recipient')} ({relevance})") # type: ignore
# Respect rate limit
time.sleep(1) time.sleep(1)
if all_extracted_deals: if all_extracted_deals:
print("WRITING TO DB")
for transaction in all_extracted_deals: for transaction in all_extracted_deals:
requests.post("https://ploughshares.nixc.us/api/transaction", json=transaction) try:
post_transaction(transaction)
except Exception as e:
print(f"Error posting transaction: {e}")
else: else:
print("\nNo relevant deals were extracted from any of the pages.") print("\nNo relevant deals were extracted from any of the pages.")