import asyncio
import json
import os

import google.generativeai as genai
import requests
from dotenv import load_dotenv

from get_all_feed_contents import get_all_feed_contents

load_dotenv()

GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")

INPUT_FILE = "./page_content.json"

MODEL_NAME = "gemini-2.0-flash-lite"

# TODO: refine
EXTRACTION_PROMPT = """
You are an information extraction system.
Your task is to extract specific fields from the provided article text (the 'source').
The topic is Canadian military exports/transactions.

Follow these rules strictly:
1. Output ONLY valid JSON — no explanations or commentary.
2. Only include a field if you find a clear and unambiguous match. If the information is not explicitly present, omit that field entirely (do not use null, "", or placeholders).
3. Do not copy entire paragraphs into a field. Summarize or extract only the relevant fragment directly answering the field's requirement.
4. Do not guess or infer — if the text is ambiguous, leave the field out.
5. If a number is expected, provide only the numeric value (without units unless the unit is part of the field definition).
6. Do not mix unrelated information into a field.

Fields to extract (omit if not found):
* "transaction_type": Type of transaction being made (e.g., "Purchase Order", "Subcontract")
* "company_division": Canadian company/division involved in the transaction
* "address_1": Address line 1 of the Company
* "address_2": Address line 2 of the Company
* "city": City where the Company is located
* "province": Province where the Company is located
* "region": Region where the Company is located
* "postal_code": Postal code of the Company
* "recipient": Recipient of the transaction, be it a country, organization, or individual
* "amount": Numeric transaction amount (number only, without currency symbols)
* "description": Transaction description
* "source_date": Date the source/article was posted, in YYYY-MM-DD format
* "source_description": Description of the platform the source/article came from, as well as of its content
* "grant_type": Type of grant
* "commodity_class": Commodity classification, or the product being exported in the transaction
* "contract_number": Contract number
* "comments": Additional comments
* "is_primary": Boolean flag

---
DOCUMENT TEXT:
{text_content}
"""


SCHEMA = {
    "type": "object",
    "required": ["source_description"],
    "properties": {
        "transaction_type": {"type": "string"},
        "company_division": {"type": "string"},
        "recipient": {"type": "string"},
        "amount": {"type": "number"},
        "description": {"type": "string"},
        "address_1": {"type": "string"},
        "address_2": {"type": "string"},
        "city": {"type": "string"},
        "province": {"type": "string"},
        "region": {"type": "string"},
        "postal_code": {"type": "string"},
        "source_date": {"type": "string"},
        "source_description": {"type": "string"},
        "grant_type": {"type": "string"},
        "commodity_class": {"type": "string"},
        "contract_number": {"type": "string"},
        "comments": {"type": "string"},
        "is_primary": {"type": "boolean"}
    }
}
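
# The response_schema above is enforced by the API, but a local re-check can
# catch truncated or otherwise malformed responses. A minimal sketch, assuming
# the third-party `jsonschema` package were added as a dependency (it is not
# one currently):
#
# from jsonschema import ValidationError, validate
#
# def conforms_to_schema(candidate: dict) -> bool:
#     try:
#         validate(instance=candidate, schema=SCHEMA)
#         return True
#     except ValidationError:
#         return False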


def validate_info(extracted_info):
    """Return True only when every field required downstream is present and non-empty."""
    required_fields = ("transaction_type", "company_division", "recipient")
    return all(extracted_info.get(field) for field in required_fields)
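# Example behaviour (inputs are hypothetical):
#   validate_info({"transaction_type": "Purchase Order"}) -> False
#   validate_info({"transaction_type": "Subcontract",
#                  "company_division": "Example Aerospace Ltd.",
#                  "recipient": "Example Ministry"}) -> True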


def process_content_with_gemini(text_content):
    """
    Sends the text to the Gemini API with the extraction prompt and
    parses the JSON response.
    """
    model = genai.GenerativeModel(MODEL_NAME)  # type: ignore
    prompt = EXTRACTION_PROMPT.format(text_content=text_content)

    try:
        response = model.generate_content(
            prompt,
            generation_config={
                "response_schema": SCHEMA,
                "response_mime_type": 'application/json',
            }
        )
        return json.loads(response.text)
    except Exception as e:
        print(f" ❌ An error occurred while calling Gemini or parsing its response: {e}")
        return {"error": str(e)}


async def main():
    """Main function to run the data extraction process."""
    if not GOOGLE_API_KEY:
        print("❌ Error: GOOGLE_API_KEY environment variable not set.")
        return

    genai.configure(api_key=GOOGLE_API_KEY)  # type: ignore

    print("Retrieving all feed contents...")
    scraped_pages = await get_all_feed_contents()
    if not scraped_pages:
        print("❌ Error: No scraper results found.")
        return
    print("✅ Successfully retrieved all feed contents.")

    all_extracted_deals = []
    total_pages = len(scraped_pages)

    print(f"🤖 Starting information extraction with Gemini for {total_pages} pages...")

    for i, page in enumerate(scraped_pages):
        print(f"\nProcessing page {i+1}/{total_pages}: {page['url']}")

        # Avoid processing pages with very little text
        if len(page.get('content', '')) < 150:
            print(" ⏩ Skipping page due to insufficient content.")
            continue

        extracted_info = process_content_with_gemini(page['content'])

        # Check if the extraction was successful and contains actual data
        if extracted_info and "error" not in extracted_info:
            if validate_info(extracted_info):
                print(" ✔️ Found relevant info")
                desc = extracted_info.get("source_description", "")
                extracted_info["source_description"] = f"Sourced from Google Alerts. Url: {page['url']}. {desc}"
                all_extracted_deals.append(extracted_info)
            else:
                print(" ❌ Insufficient info")
                print(f" Extracted info: {extracted_info}")

        # Small delay to respect API rate limits (1 second is safe); asyncio.sleep
        # rather than time.sleep, so the event loop is not blocked
        await asyncio.sleep(1)

    if all_extracted_deals:
        for transaction in all_extracted_deals:
            response = requests.post("https://ploughshares.nixc.us/api/transaction", json=transaction)
            if not response.ok:
                print(f"❌ Failed to post transaction ({response.status_code}): {response.text}")
    else:
        print("\nNo relevant deals were extracted from any of the pages.")


if __name__ == "__main__":
    asyncio.run(main())