Compare commits
2 Commits
969e229ced
...
977e5b93ad
Author | SHA1 | Date |
---|---|---|
![]() |
977e5b93ad | |
![]() |
a3da858a16 |
|
@ -3,6 +3,7 @@ from typing import Optional
|
|||
import google.generativeai as genai
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
from dotenv import load_dotenv
|
||||
import requests
|
||||
|
@ -18,99 +19,172 @@ MODEL_NAME = "gemini-2.0-flash-lite"
|
|||
|
||||
# TODO: refine
|
||||
EXTRACTION_PROMPT = """
|
||||
You are an information extraction system.
|
||||
Your task is to extract specific fields from the provided article text (the 'source').
|
||||
The topic is Canadian military exports/transactions.
|
||||
You are a precise data-extraction system.
|
||||
|
||||
Follow these rules strictly:
|
||||
1. Output ONLY valid JSON — no explanations or commentary.
|
||||
2. Only include a field if you find a clear and unambiguous match. If the information is not explicitly present, omit that field entirely (do not use null, "", or placeholders).
|
||||
3. Do not copy entire paragraphs into a field. Summarize or extract only the relevant fragment directly answering the field’s requirement.
|
||||
4. Do not guess or infer — if the text is ambiguous, leave the field out.
|
||||
5. If a number is expected, provide only the numeric value (without units unless the unit is part of the field definition).
|
||||
6. Do not mix unrelated information into a field.
|
||||
Given the DOCUMENT TEXT below, extract ALL transactions or arms-export relevant
|
||||
entries and output a JSON array (possibly empty) of objects that match the
|
||||
Project Ploughshares API schema. Output ONLY the JSON array — no markdown,
|
||||
no commentary, no code fences.
|
||||
|
||||
Fields to extract (omit if not found):
|
||||
* "transaction_type": Type of transaction being made (e.g., "Purchase Order", "Subcontract")
|
||||
* "company_division": Canadian company/division involved in the transaction
|
||||
* "address_1", "address_2", "city", "province", "region", "postal_code": Address of the company
|
||||
* "recipient": Recipient of the transaction, be it a country, organization, or individual
|
||||
* "amount": Transaction amount, including the currency
|
||||
* "description": Transaction description
|
||||
* "source_date": Date in YYYY-MM-DD format the source/article was posted at.
|
||||
* "source_description": Decription of the platform the source/article came from, as well as the content of the source/article.
|
||||
* "grant_type": Type of grant
|
||||
* "commodity_class": Commodity classification or the product being exported in the transaction, e.g. missile components, avionics, engines
|
||||
* "contract_number": Contract number
|
||||
* "comments": Additional comments
|
||||
* "is_primary": Boolean flag
|
||||
Each object must use the following fields (required fields must be provided
|
||||
and set to "Not Found" if absent):
|
||||
|
||||
Required fields:
|
||||
- transaction_type (string) # e.g., "Export", "Purchase Order", "Component Supply"
|
||||
- company_division (string) # company or division name (use "Not Found" if unknown)
|
||||
- recipient (string) # receiving country or recipient (use "Not Found" if unknown)
|
||||
|
||||
Optional fields (include if present):
|
||||
- amount (string or number) # monetary value if present (e.g., "15,000,000 CAD")
|
||||
- description (string)
|
||||
- address_1, address_2, city, province, region, postal_code
|
||||
- source_date (string YYYY-MM-DD)
|
||||
- source_description (string)
|
||||
- grant_type (string)
|
||||
- commodity_class (string) # e.g., missile components, avionics, engines
|
||||
- contract_number (string)
|
||||
- comments (string)
|
||||
- is_primary (boolean)
|
||||
|
||||
Additionally, include these two new fields to help filter relevance:
|
||||
- canadian_relevance (string) # one of: "direct", "indirect", "none"
|
||||
- "direct" = Canadian company or Canada-origin export of military goods/components
|
||||
- "indirect" = Canadian-made parts/components appear in a larger export (final assembly elsewhere)
|
||||
- "none" = no meaningful Canadian connection
|
||||
- relation_explanation (string) # short explanation why this is direct/indirect/none (1-2 sentences)
|
||||
|
||||
Rules:
|
||||
1. If a piece of info cannot be found, set it to the string "Not Found" (not null).
|
||||
2. If multiple transactions are described in the text, output them as separate objects.
|
||||
3. If the text contains the same transaction repeated, ensure you only output one object per distinct transaction.
|
||||
4. Output must be valid JSON (an array). Example:
|
||||
[
|
||||
{{
|
||||
"transaction_type": "Export",
|
||||
"company_division": "Example Corp Canada",
|
||||
"recipient": "Country X",
|
||||
"amount": "3,000,000 CAD",
|
||||
"commodity_class": "avionics modules",
|
||||
"description": "Example summary ...",
|
||||
"source_url": "https://example.com/article",
|
||||
"canadian_relevance": "direct",
|
||||
"relation_explanation": "Company is based in Canada and shipped avionics modules."
|
||||
}}
|
||||
]
|
||||
|
||||
---
|
||||
DOCUMENT TEXT:
|
||||
{text_content}
|
||||
"""
|
||||
|
||||
SCHEMA = {
|
||||
"type": "object",
|
||||
"required": ["source_description"],
|
||||
"properties": {
|
||||
"transaction_type": {"type": "string"},
|
||||
"company_division": {"type": "string"},
|
||||
"recipient": {"type": "string"},
|
||||
"amount": {"type": "number"},
|
||||
"description": {"type": "string"},
|
||||
"address_1": {"type": "string"},
|
||||
"address_2": {"type": "string"},
|
||||
"city": {"type": "string"},
|
||||
"province": {"type": "string"},
|
||||
"region": {"type": "string"},
|
||||
"postal_code": {"type": "string"},
|
||||
"source_date": {"type": "string"},
|
||||
"source_description": {"type": "string"},
|
||||
"grant_type": {"type": "string"},
|
||||
"commodity_class": {"type": "string"},
|
||||
"contract_number": {"type": "string"},
|
||||
"comments": {"type": "string"},
|
||||
"is_primary": {"type": "boolean"}
|
||||
}
|
||||
}
|
||||
def extract_json_from_text(text):
|
||||
"""
|
||||
Attempts to find and return the first JSON array or object in a text blob.
|
||||
This removes markdown fences and extracts from the first '[' ... ']' or '{' ... '}' pair.
|
||||
"""
|
||||
if not text or not isinstance(text, str):
|
||||
return None
|
||||
# remove common fences
|
||||
cleaned = text.strip()
|
||||
cleaned = cleaned.replace("```json", "").replace("```", "").strip()
|
||||
|
||||
def validate_info(extracted_info):
|
||||
if ("transaction_type" not in extracted_info):
|
||||
return False
|
||||
if (len(extracted_info["transaction_type"]) == 0):
|
||||
return False
|
||||
if ("company_division" not in extracted_info):
|
||||
return False
|
||||
if (len(extracted_info["company_division"]) == 0):
|
||||
return False
|
||||
if ("recipient" not in extracted_info):
|
||||
return False
|
||||
if (len(extracted_info["recipient"]) == 0):
|
||||
return False
|
||||
return True
|
||||
# Try to locate a JSON array first
|
||||
arr_match = re.search(r"(\[.*\])", cleaned, flags=re.DOTALL)
|
||||
if arr_match:
|
||||
return arr_match.group(1)
|
||||
|
||||
# Otherwise try a single JSON object
|
||||
obj_match = re.search(r"(\{.*\})", cleaned, flags=re.DOTALL)
|
||||
if obj_match:
|
||||
return obj_match.group(1)
|
||||
|
||||
return None
|
||||
|
||||
def process_content_with_gemini(text_content):
|
||||
"""
|
||||
Sends the text to the Gemini API with the extraction prompt and
|
||||
parses the JSON response.
|
||||
Sends the text to Gemini with the extraction prompt and parses the JSON response.
|
||||
Uses your existing SDK usage pattern (genai.GenerativeModel).
|
||||
"""
|
||||
# Keep using your existing model init pattern
|
||||
model = genai.GenerativeModel(MODEL_NAME) # type: ignore
|
||||
|
||||
prompt = EXTRACTION_PROMPT.format(text_content=text_content)
|
||||
|
||||
try:
|
||||
response = model.generate_content(
|
||||
prompt,
|
||||
generation_config={
|
||||
"response_schema": SCHEMA,
|
||||
"response_mime_type": 'application/json',
|
||||
}
|
||||
)
|
||||
return json.loads(response.text)
|
||||
# Generate content. Your original code used model.generate_content(prompt)
|
||||
response = model.generate_content(prompt)
|
||||
# Response object in your environment exposes .text (as in your original script)
|
||||
raw = getattr(response, "text", str(response))
|
||||
# Try to extract JSON from the possibly noisy response
|
||||
json_fragment = extract_json_from_text(raw) or raw
|
||||
|
||||
# Parse JSON
|
||||
parsed = json.loads(json_fragment)
|
||||
# Ensure it's an array
|
||||
if isinstance(parsed, dict):
|
||||
parsed = [parsed]
|
||||
return parsed
|
||||
|
||||
except Exception as e:
|
||||
print(f" ❌ An error occurred while calling Gemini or parsing its response: {e}")
|
||||
# print raw text to help debugging if available
|
||||
try:
|
||||
print(" Raw response (truncated):", raw[:1000])
|
||||
except Exception:
|
||||
pass
|
||||
return {"error": str(e)}
|
||||
|
||||
def is_valid_transaction(tx):
|
||||
"""
|
||||
Basic validation to ensure required API fields exist.
|
||||
Required fields (per API): transaction_type, company_division, recipient
|
||||
If a field is present but "Not Found", treat as missing for the
|
||||
purposes of deciding whether to keep the record (we still surface it sometimes).
|
||||
"""
|
||||
for field in ["transaction_type", "company_division", "recipient"]:
|
||||
if field not in tx or not tx[field] or tx[field] == "Not Found":
|
||||
return False
|
||||
return True
|
||||
|
||||
API_BASE_URL = "http://ploughshares.nixc.us/api/transaction"
|
||||
HEADERS = {"Content-Type": "application/json"}
|
||||
|
||||
allowed_fields = {
|
||||
"transaction_type", "company_division", "recipient", "amount",
|
||||
"description", "address_1", "address_2", "city", "province", "region",
|
||||
"postal_code", "source_date", "source_description", "grant_type",
|
||||
"commodity_class", "contract_number", "comments", "is_primary"
|
||||
}
|
||||
|
||||
def clean_for_api(tx):
|
||||
cleaned = {k: v for k, v in tx.items() if k in allowed_fields}
|
||||
|
||||
# Remove invalid source_date
|
||||
if "source_date" in cleaned:
|
||||
if not isinstance(cleaned["source_date"], str) or cleaned["source_date"].lower() == "not found":
|
||||
cleaned.pop("source_date")
|
||||
|
||||
# Remove invalid amount (API expects numeric)
|
||||
if "amount" in cleaned:
|
||||
# If "Not Found" or not parseable as a float, drop it
|
||||
try:
|
||||
float(str(cleaned["amount"]).replace(",", "").replace("$", ""))
|
||||
except ValueError:
|
||||
cleaned.pop("amount")
|
||||
|
||||
# Use source_url for source_description
|
||||
if "source_url" in tx:
|
||||
cleaned["source_description"] = tx["source_url"]
|
||||
|
||||
return cleaned
|
||||
|
||||
|
||||
def post_transaction(transaction):
|
||||
payload = clean_for_api(transaction)
|
||||
response = requests.post(API_BASE_URL, headers=HEADERS, json=payload)
|
||||
if response.status_code == 200 or response.status_code == 201:
|
||||
print(f"✅ Created transaction for {payload['company_division']} → ID: {response.json().get('transaction_id')}")
|
||||
else:
|
||||
print(f"❌ Failed to create transaction: {response.status_code} - {response.text}")
|
||||
|
||||
async def main():
|
||||
"""Main function to run the data extraction process."""
|
||||
|
@ -133,34 +207,61 @@ async def main():
|
|||
print(f"🤖 Starting information extraction with Gemini for {total_pages} pages...")
|
||||
|
||||
for i, page in enumerate(scraped_pages):
|
||||
url = page.get("url", "unknown_url")
|
||||
print(f"\nProcessing page {i+1}/{total_pages}: {page['url']}")
|
||||
|
||||
# Avoid processing pages with very little text
|
||||
if len(page.get('content', '')) < 150:
|
||||
text = page.get("content", "")
|
||||
if len(text) < 150:
|
||||
print(" ⏩ Skipping page due to insufficient content.")
|
||||
continue
|
||||
|
||||
extracted_info = process_content_with_gemini(page['content'])
|
||||
extracted_items = process_content_with_gemini(page['content'])
|
||||
|
||||
# Check if the extraction was successful and contains actual data
|
||||
if extracted_info and "error" not in extracted_info:
|
||||
if validate_info(extracted_info):
|
||||
print(" ✔️ Found relevant info")
|
||||
desc = ""
|
||||
if "source_description" in extracted_info:
|
||||
desc = extracted_info["source_description"]
|
||||
extracted_info["source_description"] = f"Sourced from Google Alerts. Url: {page['url']}. {desc}"
|
||||
all_extracted_deals.append(extracted_info)
|
||||
else:
|
||||
print(" ❌ insufficient info")
|
||||
print(f" Extracted info: {extracted_info}")
|
||||
|
||||
# Add a small delay to respect API rate limits (1 second is safe)
|
||||
# If model returned a single object or error, handle gracefully
|
||||
if not extracted_items:
|
||||
print(" ⚪ Gemini returned no items.")
|
||||
time.sleep(1)
|
||||
continue
|
||||
if isinstance(extracted_items, dict) and "error" in extracted_items:
|
||||
print(" ⚠️ Gemini error:", extracted_items.get("error"))
|
||||
time.sleep(1)
|
||||
continue
|
||||
|
||||
# iterate through items (should be array of objects)
|
||||
for tx in extracted_items:
|
||||
# attach source_url for traceability
|
||||
tx.setdefault("source_url", url) # type: ignore
|
||||
|
||||
# if the model gives canadian_relevance, use it to decide whether to keep
|
||||
relevance = (tx.get("canadian_relevance") or "none").lower() # type: ignore
|
||||
explanation = tx.get("relation_explanation", "") # type: ignore
|
||||
|
||||
# If model says 'none', skip by default (these are the irrelevant ones like US missile contracts)
|
||||
if relevance == "none":
|
||||
print(" ⚪ Skipping — model marked this as non-Canadian. Explanation:", explanation[:200])
|
||||
continue
|
||||
|
||||
# basic required-field check (we want the API-required fields present)
|
||||
if not is_valid_transaction(tx):
|
||||
print(" ⚠️ Skipping — missing required API fields in extracted transaction:", tx)
|
||||
continue
|
||||
|
||||
# Optionally normalize some fields (convert "amount" to a canonical string) - keep simple for now
|
||||
# Save the item
|
||||
all_extracted_deals.append(tx)
|
||||
print(f" ✔️ Kept transaction: {tx.get('company_division')} → {tx.get('recipient')} ({relevance})") # type: ignore
|
||||
|
||||
# Respect rate limit
|
||||
time.sleep(1)
|
||||
|
||||
if all_extracted_deals:
|
||||
print("WRITING TO DB")
|
||||
for transaction in all_extracted_deals:
|
||||
requests.post("https://ploughshares.nixc.us/api/transaction", json=transaction)
|
||||
try:
|
||||
post_transaction(transaction)
|
||||
except Exception as e:
|
||||
print(f"Error posting transaction: {e}")
|
||||
else:
|
||||
print("\nNo relevant deals were extracted from any of the pages.")
|
||||
|
||||
|
|
Loading…
Reference in New Issue