ploughshares/docker/crawler-google-alerts/main.py

168 lines
6.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
from typing import Optional
import google.generativeai as genai
import json
import os
import time
from dotenv import load_dotenv
import requests
from get_all_feed_contents import get_all_feed_contents
load_dotenv()
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
INPUT_FILE = "./page_content.json"
MODEL_NAME = "gemini-2.0-flash-lite"
# TODO: refine
# Prompt template sent to Gemini for every scraped page; {text_content} is
# filled with the article body in process_content_with_gemini().
# FIX: corrected typos in the prompt text ("Decription" -> "Description",
# "the fields requirement" -> "the field's requirement") — misspellings in an
# LLM instruction degrade extraction quality.
EXTRACTION_PROMPT = """
You are an information extraction system.
Your task is to extract specific fields from the provided article text (the 'source').
The topic is Canadian military exports/transactions.
Follow these rules strictly:
1. Output ONLY valid JSON — no explanations or commentary.
2. Only include a field if you find a clear and unambiguous match. If the information is not explicitly present, omit that field entirely (do not use null, "", or placeholders).
3. Do not copy entire paragraphs into a field. Summarize or extract only the relevant fragment directly answering the field's requirement.
4. Do not guess or infer — if the text is ambiguous, leave the field out.
5. If a number is expected, provide only the numeric value (without units unless the unit is part of the field definition).
6. Do not mix unrelated information into a field.
Fields to extract (omit if not found):
* "transaction_type": Type of transaction being made (e.g., "Purchase Order", "Subcontract")
* "company_division": Canadian company/division involved in the transaction
* "address_1", "address_2", "city", "province", "region", "postal_code": Address of the company
* "recipient": Recipient of the transaction, be it a country, organization, or individual
* "amount": Transaction amount, including the currency
* "description": Transaction description
* "source_date": Date in YYYY-MM-DD format the source/article was posted at.
* "source_description": Description of the platform the source/article came from, as well as the content of the source/article.
* "grant_type": Type of grant
* "commodity_class": Commodity classification or the product being exported in the transaction, e.g. missile components, avionics, engines
* "contract_number": Contract number
* "comments": Additional comments
* "is_primary": Boolean flag
---
DOCUMENT TEXT:
{text_content}
"""
# JSON schema handed to Gemini as `response_schema` (structured output) in
# process_content_with_gemini(); it constrains the model's reply to this shape.
# Only "source_description" is mandatory — all other fields are optional,
# matching rule 2 of EXTRACTION_PROMPT (omit fields that are not found).
SCHEMA = {
    "type": "object",
    "required": ["source_description"],
    "properties": {
        "transaction_type": {"type": "string"},
        "company_division": {"type": "string"},
        "recipient": {"type": "string"},
        # NOTE(review): EXTRACTION_PROMPT asks for the amount *including the
        # currency*, but "number" forces a bare numeric value — the model
        # cannot satisfy both. Confirm which representation the downstream
        # transaction API expects before changing either side.
        "amount": {"type": "number"},
        "description": {"type": "string"},
        "address_1": {"type": "string"},
        "address_2": {"type": "string"},
        "city": {"type": "string"},
        "province": {"type": "string"},
        "region": {"type": "string"},
        "postal_code": {"type": "string"},
        # Expected format per the prompt: YYYY-MM-DD (not enforced here).
        "source_date": {"type": "string"},
        "source_description": {"type": "string"},
        "grant_type": {"type": "string"},
        "commodity_class": {"type": "string"},
        "contract_number": {"type": "string"},
        "comments": {"type": "string"},
        "is_primary": {"type": "boolean"}
    }
}
def validate_info(extracted_info):
    """Check that a Gemini extraction carries the minimum usable fields.

    A result is accepted only when "transaction_type", "company_division",
    and "recipient" are all present and non-empty.

    Args:
        extracted_info: dict parsed from the model's JSON response.

    Returns:
        bool: True when every required field is present and truthy,
        False otherwise.
    """
    # FIX: the original repeated the membership + len()==0 check per field and
    # would raise TypeError on a None value (len(None)); dict.get + truthiness
    # handles missing keys, empty strings, and None uniformly.
    required_fields = ("transaction_type", "company_division", "recipient")
    return all(extracted_info.get(field) for field in required_fields)
def process_content_with_gemini(text_content):
    """Run the extraction prompt against Gemini and decode the JSON reply.

    The article text is substituted into EXTRACTION_PROMPT and the model is
    constrained to emit JSON matching SCHEMA via structured output.

    Args:
        text_content: Raw article/page text to extract fields from.

    Returns:
        dict: Parsed fields on success, or ``{"error": <message>}`` when the
        API call or the JSON decoding fails.
    """
    extractor = genai.GenerativeModel(MODEL_NAME)  # type: ignore
    filled_prompt = EXTRACTION_PROMPT.format(text_content=text_content)
    structured_config = {
        "response_schema": SCHEMA,
        "response_mime_type": 'application/json',
    }
    try:
        reply = extractor.generate_content(
            filled_prompt,
            generation_config=structured_config,
        )
        parsed = json.loads(reply.text)
    except Exception as e:
        # Best-effort boundary: report and hand back an error marker so the
        # caller can skip this page instead of crashing the whole crawl.
        print(f" ❌ An error occurred while calling Gemini or parsing its response: {e}")
        return {"error": str(e)}
    return parsed
async def main():
    """Run the end-to-end extraction pipeline.

    Fetches Google Alerts feed pages, extracts transaction fields from each
    page with Gemini, validates the results, and POSTs every accepted
    transaction to the Ploughshares API.
    """
    if not GOOGLE_API_KEY:
        print("❌ Error: GOOGLE_API_KEY environment variable not set.")
        return
    genai.configure(api_key=GOOGLE_API_KEY)  # type: ignore
    print("Retrieving all feed contents...")
    scraped_pages = await get_all_feed_contents()
    if not scraped_pages:
        print("❌ Error: No scraper results found.")
        return
    print("✅ Successfully retrieved all feed contents.")
    all_extracted_deals = []
    total_pages = len(scraped_pages)
    print(f"🤖 Starting information extraction with Gemini for {total_pages} pages...")
    for i, page in enumerate(scraped_pages):
        print(f"\nProcessing page {i+1}/{total_pages}: {page['url']}")
        # Avoid processing pages with very little text
        if len(page.get('content', '')) < 150:
            print(" ⏩ Skipping page due to insufficient content.")
            continue
        extracted_info = process_content_with_gemini(page['content'])
        # Check if the extraction was successful and contains actual data
        if extracted_info and "error" not in extracted_info:
            if validate_info(extracted_info):
                print(" ✔️ Found relevant info")
                # Prefix provenance onto whatever description the model gave.
                desc = extracted_info.get("source_description", "")
                extracted_info["source_description"] = f"Sourced from Google Alerts. Url: {page['url']}. {desc}"
                all_extracted_deals.append(extracted_info)
            else:
                print(" ❌ insufficient info")
                print(f" Extracted info: {extracted_info}")
        # FIX: time.sleep() blocks the event loop inside a coroutine;
        # asyncio.sleep() yields control while still pacing API calls (1 s).
        await asyncio.sleep(1)
    if all_extracted_deals:
        for transaction in all_extracted_deals:
            # FIX: the original discarded the response and had no timeout, so
            # a hung server would stall the crawler forever and HTTP errors
            # went unnoticed. Report failures but keep posting the rest.
            try:
                resp = requests.post(
                    "https://ploughshares.nixc.us/api/transaction",
                    json=transaction,
                    timeout=30,
                )
                resp.raise_for_status()
            except requests.RequestException as e:
                print(f" ❌ Failed to POST transaction: {e}")
    else:
        print("\nNo relevant deals were extracted from any of the pages.")
# Script entry point: drive the async pipeline with asyncio.run().
if __name__ == "__main__":
    asyncio.run(main())