ploughshares/docker/crawler_dorks/main.py

import asyncio
import json
import os

import google.generativeai as genai
import requests
from dotenv import load_dotenv

from get_all_feed_contents import get_all_feed_contents

load_dotenv()

GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
INPUT_FILE = "./page_content.json"
MODEL_NAME = "gemini-2.0-flash-lite"

# TODO: refine
EXTRACTION_PROMPT = """
From the document text provided below, extract key details about any military or arms exports. More specifically, look for the following fields:
transaction_type - Type of transaction (e.g., "Purchase Order", "Subcontract")
company_division - Company or division name
recipient - Recipient of the transaction
amount - Transaction amount (defaults to 0)
description - Transaction description
address_1, address_2, city, province, region, postal_code - Address fields
source_date - Date in YYYY-MM-DD format
source_description - Source description
grant_type - Type of grant
commodity_class - Commodity classification
contract_number - Contract number
comments - Additional comments
is_primary - Boolean flag (defaults to false)
Do not hallucinate. If a field cannot be determined from the text, leave it empty.
---
DOCUMENT TEXT:
{text_content}
"""

SCHEMA = {
    "type": "object",
    "properties": {
        "transaction_type": {
            "type": "string",
            "description": "Type of transaction (e.g., 'Purchase Order', 'Subcontract')",
        },
        "company_division": {
            "type": "string",
            "description": "Company or division name",
        },
        "recipient": {
            "type": "string",
            "description": "Recipient of the transaction",
        },
        "amount": {
            "type": "number",
            "description": "Transaction amount",
        },
        "description": {
            "type": "string",
            "description": "Transaction description",
        },
        "address_1": {
            "type": "string",
            "description": "Address line 1",
        },
        "address_2": {
            "type": "string",
            "description": "Address line 2",
        },
        "city": {
            "type": "string",
            "description": "City",
        },
        "province": {
            "type": "string",
            "description": "Province/State",
        },
        "region": {
            "type": "string",
            "description": "Region",
        },
        "postal_code": {
            "type": "string",
            "description": "Postal code",
        },
        "source_date": {
            "type": "string",
            "format": "date-time",
            "description": "Date in YYYY-MM-DD format",
        },
        "source_description": {
            "type": "string",
            "description": "Source description",
        },
        "grant_type": {
            "type": "string",
            "description": "Type of grant",
        },
        "commodity_class": {
            "type": "string",
            "description": "Commodity classification",
        },
        "contract_number": {
            "type": "string",
            "description": "Contract number",
        },
        "comments": {
            "type": "string",
            "description": "Additional comments",
        },
        "is_primary": {
            "type": "boolean",
            "description": "Boolean flag indicating if it's primary",
        },
    },
}
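
# Illustrative example of the JSON shape the schema asks the model to return.
# The field values below are invented for demonstration, not real data:
#
# {
#     "transaction_type": "Purchase Order",
#     "company_division": "Example Aerospace Ltd.",
#     "recipient": "Example Ministry of Defence",
#     "amount": 1500000,
#     "source_date": "2024-03-01",
#     "is_primary": false,
#     ...
# }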


def process_content_with_gemini(text_content):
    """
    Sends the text to the Gemini API with the extraction prompt and
    parses the JSON response.
    """
    model = genai.GenerativeModel(MODEL_NAME)  # type: ignore
    prompt = EXTRACTION_PROMPT.format(text_content=text_content)
    try:
        response = model.generate_content(
            prompt,
            generation_config={
                "response_schema": SCHEMA,
                "response_mime_type": "application/json",
            },
        )
        return json.loads(response.text)
    except Exception as e:
        print(f" ❌ An error occurred while calling Gemini or parsing its response: {e}")
        return {"error": str(e)}
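
# Illustrative usage, assuming genai.configure() has already been called
# (the document text below is invented for demonstration):
#     deal = process_content_with_gemini("Acme Corp received a $2M subcontract from ...")
#     print(deal.get("transaction_type"))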


async def main():
    """Main function to run the data extraction process."""
    if not GOOGLE_API_KEY:
        print("❌ Error: GOOGLE_API_KEY environment variable not set.")
        return
    genai.configure(api_key=GOOGLE_API_KEY)  # type: ignore

    print("Retrieving all feed contents...")
    scraped_pages = await get_all_feed_contents()
    if not scraped_pages:
        print("❌ Error: No scraper results found.")
        return
    print("✅ Successfully retrieved all feed contents.")

    all_extracted_deals = []
    total_pages = len(scraped_pages)
    print(f"🤖 Starting information extraction with Gemini for {total_pages} pages...")
    for i, page in enumerate(scraped_pages):
        print(f"\nProcessing page {i + 1}/{total_pages}: {page['url']}")
        # Avoid processing pages with very little text
        if len(page.get('content', '')) < 150:
            print(" ⏩ Skipping page due to insufficient content.")
            continue
        extracted_info = process_content_with_gemini(page['content'])
        # Check if the extraction was successful and contains actual data
        if extracted_info and "error" not in extracted_info:
            if ("transaction_type" in extracted_info
                    and "company_division" in extracted_info
                    and "recipient" in extracted_info):
                print(" ✔️ Found relevant info")
                all_extracted_deals.append(extracted_info)
            else:
                print(" ❌ Insufficient info")
                print(f" Extracted info: {extracted_info}")
        # Sleep asynchronously so the event loop is not blocked while
        # respecting API rate limits (1 second is safe)
        await asyncio.sleep(1)
    if all_extracted_deals:
        for transaction in all_extracted_deals:
            requests.post(
                "https://ploughshares.nixc.us/api/transaction",
                json=transaction,
                timeout=30,
            )
    else:
        print("\nNo relevant deals were extracted from any of the pages.")


if __name__ == "__main__":
    asyncio.run(main())