Compare commits

..

2 Commits

Author    SHA1        Message                         Date
jChenvan  adb266a1e9  Update prompt                   2025-08-20 16:32:50 -04:00  (ci/woodpecker/push/woodpecker: pipeline successful)
jChenvan  7e819a6601  Updated google alerts scraper   2025-08-20 16:25:11 -04:00
14 changed files with 353 additions and 285 deletions


@@ -0,0 +1,5 @@
.env
feeds.csv
feed_contents.xml
page_content.json
__pycache__/


@@ -0,0 +1,61 @@
import asyncio
from playwright.async_api import async_playwright
from bs4 import BeautifulSoup
async def fetch_site(url: str) -> str | None:
"""
Fetches the main article text of a URL using Playwright and BeautifulSoup.
Args:
url: The URL of the website to fetch.
Returns:
A string containing the main text content of the page, or None on error.
"""
print(f"fetching {url}")
async with async_playwright() as p:
browser = await p.chromium.launch()
page = await browser.new_page()
try:
await page.goto(url, wait_until='domcontentloaded', timeout=60000)
content = await page.content()
soup = BeautifulSoup(content, 'html.parser')
# Strategy: Find the main content container
# First, try to find a <main> tag. If not, look for an <article> tag.
# You can add more fallbacks based on common website structures,
# e.g., soup.find('div', id='content')
main_content = soup.find('main')
if not main_content:
main_content = soup.find('article')
# If a main content area is found, extract text from it.
if main_content:
# (Optional) Remove unwanted elements like scripts or ads from within the main content
for element in main_content(['script', 'style', 'aside']): # type: ignore
element.decompose()
print(f"SUCCESSFUL FETCH: {url}")
# .get_text() with separator and strip for cleaner output
return main_content.get_text(separator='\n', strip=True)
else:
# Fallback if no specific container is found (less reliable)
print("WARNING: No main content container found. Falling back to body.")
if soup.body:
body_text = soup.body.get_text(separator='\n', strip=True)
print(f"SUCCESSFUL FETCH: {url}")
return body_text
except Exception as e:
print(f"FAILED FETCH: {url}")
print(f"An error occurred: {e}")
return None
finally:
await browser.close()
# Example usage:
# asyncio.run(fetch_site("https://www.example.com"))
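
A note on the fallback comments above: a minimal sketch of how a longer fallback chain could be factored out, assuming the same BeautifulSoup object; the extra selectors are illustrative guesses, not part of this change.

from bs4 import BeautifulSoup

def find_main_container(soup: BeautifulSoup):
    """Try common content containers in order and return the first match."""
    candidates = (
        lambda: soup.find('main'),
        lambda: soup.find('article'),
        lambda: soup.find('div', id='content'),        # common CMS id (illustrative)
        lambda: soup.find('div', class_='post-body'),  # hypothetical class name
    )
    for candidate in candidates:
        element = candidate()
        if element:
            return element
    return soup.body  # last resort: the whole body, as in the fallback above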


@@ -0,0 +1,92 @@
import asyncio
from typing import Dict, List, Tuple
from clean_string import clean_string
from fetch_site import fetch_site
from get_feeds import get_feeds
from get_links_from_feed import Alert, get_links_from_feed
import xml.etree.ElementTree as ET
async def get_all_feed_contents() -> List[Dict[str, str]]:
"""
Asynchronously fetches and processes content from multiple RSS feeds.
This function first gets a list of RSS feeds, extracts all article URLs from them,
and then asynchronously fetches the content of each URL. The content is cleaned
and returned as a list of dictionaries.
Returns:
List[Dict[str, str]]: A list of dictionaries, where each dictionary
contains the 'url' and its cleaned 'content'.
"""
feeds: List[Tuple[str, str]] = get_feeds()
urls: List[str] = []
for keyword, feed in feeds:
alerts: List[Alert] = get_links_from_feed(feed)
for alert in alerts:
urls.append(alert.url)
print(f"{len(alerts)} links found for '{keyword}'")
print(f"\n{len(urls)} total links found. Starting fetch process.")
pages: List[Dict[str, str]] = []
# Create a list of tasks to run concurrently
tasks = [fetch_site(url) for url in urls]
results = await asyncio.gather(*tasks)
for url, content in zip(urls, results):
if content:
pages.append({
"url": url,
"content": clean_string(content)
})
print(f"\nSuccessfully fetched {len(pages)} webpages.")
return pages
def write_to_xml(pages: List[Dict[str, str]], filename: str) -> None:
"""
Writes a list of page data to an XML file.
The XML structure will be:
<pages>
<page>
<url>http://...</url>
<content>...</content>
</page>
...
</pages>
Args:
pages (List[Dict[str, str]]): The list of page data to write.
filename (str): The name of the output XML file.
"""
root = ET.Element("pages")
for page_data in pages:
page_element = ET.SubElement(root, "page")
url_element = ET.SubElement(page_element, "url")
url_element.text = page_data.get("url")
content_element = ET.SubElement(page_element, "content")
content_element.text = page_data.get("content")
tree = ET.ElementTree(root)
# The 'xml_declaration' and 'encoding' arguments ensure it's a well-formed XML file.
tree.write(filename, encoding='utf-8', xml_declaration=True)
print(f"Data successfully written to {filename}")
async def main() -> None:
"""
Main entry point for the script.
"""
all_pages = await get_all_feed_contents()
if all_pages:
write_to_xml(all_pages, "feed_contents.xml")
else:
print("No pages were fetched. XML file not created.")
if __name__ == "__main__":
asyncio.run(main())
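
One design note on the fetch step: asyncio.gather above launches every fetch at once, and each fetch_site call starts its own Chromium instance. If that proves too heavy, concurrency could be capped with an asyncio.Semaphore; a minimal sketch under that assumption (the limit of 5 is arbitrary).

import asyncio
from fetch_site import fetch_site

async def fetch_all(urls: list[str], limit: int = 5) -> list[str | None]:
    """Fetch URLs with at most `limit` pages in flight at any time."""
    semaphore = asyncio.Semaphore(limit)

    async def fetch_one(url: str) -> str | None:
        async with semaphore:  # wait for a free slot before launching a browser
            return await fetch_site(url)

    return await asyncio.gather(*(fetch_one(url) for url in urls))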


@@ -0,0 +1,27 @@
import csv
import os
def get_feeds() -> list[tuple[str, str]]:
"""Reads feed names and URLs from a local CSV file.
This function opens 'feeds.csv', which is expected to be in the
same directory as this script. The CSV must have two columns:
the first for the feed name and the second for the URL.
Returns:
list[tuple[str, str]]: A list of tuples, where each tuple
contains a feed's name and its URL.
"""
feeds = []
file_path = os.path.join(os.path.dirname(__file__), "feeds.csv")
with open(file_path, mode="r", newline="", encoding="utf-8") as f:
reader = csv.reader(f)
# If your CSV has a header row, uncomment the next line to skip it
# next(reader, None)
for row in reader:
# Ensure the row has exactly two columns to avoid errors
if len(row) == 2:
feeds.append((row[0], row[1]))
return feeds
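
For reference, get_feeds assumes a two-column feeds.csv with no header row; judging from the removed feeds.json further below, a row would look like the first comment line in this sketch (the file path is resolved relative to the script).

# feeds.csv:
# Canadian Military Exports,https://www.google.ca/alerts/feeds/02962857334213646081/4156920188674433267

from get_feeds import get_feeds

for keyword, url in get_feeds():
    print(f"{keyword}: {url}")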


@@ -0,0 +1,168 @@
import asyncio
from typing import Optional
import google.generativeai as genai
import json
import os
import time
from dotenv import load_dotenv
import requests
from get_all_feed_contents import get_all_feed_contents
load_dotenv()
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
INPUT_FILE = "./page_content.json"
MODEL_NAME = "gemini-2.0-flash-lite"
# TODO: refine
EXTRACTION_PROMPT = """
You are an information extraction system.
Your task is to extract specific fields from the provided article text (the 'source').
The topic is Canadian military exports/transactions.
Follow these rules strictly:
1. Output ONLY valid JSON, with no explanations or commentary.
2. Only include a field if you find a clear and unambiguous match. If the information is not explicitly present, omit that field entirely (do not use null, "", or placeholders).
3. Do not copy entire paragraphs into a field. Summarize or extract only the relevant fragment that directly answers the field's requirement.
4. Do not guess or infer; if the text is ambiguous, leave the field out.
5. If a number is expected, provide only the numeric value (without units unless the unit is part of the field definition).
6. Do not mix unrelated information into a field.
Fields to extract (omit if not found):
* "transaction_type": Type of transaction being made (e.g., "Purchase Order", "Subcontract")
* "company_division": Canadian company/division involved in the transaction
* "address_1", "address_2", "city", "province", "region", "postal_code": Address of the company
* "recipient": Recipient of the transaction, be it a country, organization, or individual
* "amount": Transaction amount, including the currency
* "description": Transaction description
* "source_date": Date in YYYY-MM-DD format the source/article was posted at.
* "source_description": Decription of the platform the source/article came from, as well as the content of the source/article.
* "grant_type": Type of grant
* "commodity_class": Commodity classification or the product being exported in the transaction, e.g. missile components, avionics, engines
* "contract_number": Contract number
* "comments": Additional comments
* "is_primary": Boolean flag
---
DOCUMENT TEXT:
{text_content}
"""
SCHEMA = {
"type": "object",
"required": ["source_description"],
"properties": {
"transaction_type": {"type": "string"},
"company_division": {"type": "string"},
"recipient": {"type": "string"},
"amount": {"type": "number"},
"description": {"type": "string"},
"address_1": {"type": "string"},
"address_2": {"type": "string"},
"city": {"type": "string"},
"province": {"type": "string"},
"region": {"type": "string"},
"postal_code": {"type": "string"},
"source_date": {"type": "string"},
"source_description": {"type": "string"},
"grant_type": {"type": "string"},
"commodity_class": {"type": "string"},
"contract_number": {"type": "string"},
"comments": {"type": "string"},
"is_primary": {"type": "boolean"}
}
}
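# For illustration only (hypothetical values): a response that satisfies this schema
# and passes validate_info below might look like
#
# {
#     "transaction_type": "Purchase Order",
#     "company_division": "Example Aerospace Ltd.",
#     "recipient": "Example Ministry of Defence",
#     "amount": 1000000,
#     "source_date": "2025-08-20",
#     "source_description": "Trade press article describing the order.",
#     "is_primary": false
# }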
def validate_info(extracted_info):
    """Return True only if the minimum required fields are present and non-empty."""
    for field in ("transaction_type", "company_division", "recipient"):
        if not extracted_info.get(field):
            return False
    return True
def process_content_with_gemini(text_content):
"""
Sends the text to the Gemini API with the extraction prompt and
parses the JSON response.
"""
model = genai.GenerativeModel(MODEL_NAME) # type: ignore
prompt = EXTRACTION_PROMPT.format(text_content=text_content)
try:
response = model.generate_content(
prompt,
generation_config={
"response_schema": SCHEMA,
"response_mime_type": 'application/json',
}
)
return json.loads(response.text)
except Exception as e:
print(f" ❌ An error occurred while calling Gemini or parsing its response: {e}")
return {"error": str(e)}
async def main():
"""Main function to run the data extraction process."""
if not GOOGLE_API_KEY:
print("❌ Error: GOOGLE_API_KEY environment variable not set.")
return
genai.configure(api_key=GOOGLE_API_KEY) # type: ignore
print("Retrieving all feed contents...")
scraped_pages = await get_all_feed_contents()
if not scraped_pages:
print("❌ Error: No scraper results found.")
return
print("✅ Successfully retrieved all feed contents.")
all_extracted_deals = []
total_pages = len(scraped_pages)
print(f"🤖 Starting information extraction with Gemini for {total_pages} pages...")
for i, page in enumerate(scraped_pages):
print(f"\nProcessing page {i+1}/{total_pages}: {page['url']}")
# Avoid processing pages with very little text
if len(page.get('content', '')) < 150:
print(" ⏩ Skipping page due to insufficient content.")
continue
extracted_info = process_content_with_gemini(page['content'])
# Check if the extraction was successful and contains actual data
if extracted_info and "error" not in extracted_info:
if validate_info(extracted_info):
print(" ✔️ Found relevant info")
desc = ""
if "source_description" in extracted_info:
desc = extracted_info["source_description"]
extracted_info["source_description"] = f"Sourced from Google Alerts. Url: {page['url']}. {desc}"
all_extracted_deals.append(extracted_info)
else:
print(" ❌ insufficient info")
print(f" Extracted info: {extracted_info}")
# Add a small delay to respect API rate limits (1 second is safe)
time.sleep(1)
if all_extracted_deals:
for transaction in all_extracted_deals:
requests.post("https://ploughshares.nixc.us/api/transaction", json=transaction)
else:
print("\nNo relevant deals were extracted from any of the pages.")
if __name__ == "__main__":
asyncio.run(main())
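
The upload loop above posts each transaction to the API without inspecting the HTTP response, so failures pass silently. A minimal sketch of surfacing them, assuming the same endpoint; the timeout and messages are illustrative.

import requests

def post_transactions(transactions: list[dict]) -> None:
    """POST each extracted transaction and report any failures."""
    for transaction in transactions:
        try:
            response = requests.post(
                "https://ploughshares.nixc.us/api/transaction",
                json=transaction,
                timeout=30,
            )
            response.raise_for_status()  # raise on 4xx/5xx responses
            print(f" ✔️ Posted transaction ({response.status_code})")
        except requests.RequestException as e:
            print(f" ❌ Failed to post transaction: {e}")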


@@ -1,3 +0,0 @@
.env
page_content.json
__pycache__/


@@ -1,3 +0,0 @@
{
"Canadian Military Exports": "https://www.google.ca/alerts/feeds/02962857334213646081/4156920188674433267"
}


@@ -1,34 +0,0 @@
import asyncio
from playwright.async_api import async_playwright
from bs4 import BeautifulSoup
async def fetch_site(url: str) -> str | None:
"""
Fetches the text content of a URL using Playwright.
Args:
url: The URL of the website to fetch.
Returns:
A string containing the text content of the page, or None on error.
"""
async with async_playwright() as p:
browser = await p.chromium.launch()
page = await browser.new_page()
try:
# Change 'networkidle' to 'domcontentloaded' and increase timeout as a fallback
await page.goto(url, wait_until='domcontentloaded', timeout=60000)
content = await page.content()
soup = BeautifulSoup(content, 'html.parser')
# .get_text() is the standard method in modern BeautifulSoup
return soup.get_text()
except Exception as e:
print(f"An error occurred: {e}")
return None
finally:
await browser.close()


@@ -1,31 +0,0 @@
import asyncio
import json
from clean_string import clean_string
from fetch_site import fetch_site
from get_feeds import get_feeds
from get_links_from_feed import get_links_from_feed
async def get_all_feed_contents():
feeds = get_feeds()
urls = []
for keyword, feed in feeds:
alerts = get_links_from_feed(feed)
for alert in alerts:
urls.append(alert.url)
pages = []
for url in urls:
content = await fetch_site(url)
if not content:
continue
pages.append({
"url": url,
"content": clean_string(content)
})
return pages
async def main():
print(await get_all_feed_contents())
if __name__ == "__main__":
asyncio.run(main())


@@ -1,18 +0,0 @@
import json
import os
def get_feeds() -> list[tuple[str, str]]:
"""Reads feed names and URLs from a local JSON file.
This function opens 'feeds.json', which is expected to be in the
same directory as this script. It parses the JSON object, which
should contain string keys (feed names) and string values (URLs).
Returns:
list[tuple[str, str]]: A list of tuples, where each tuple
contains a feed's name and its URL.
"""
file_path = os.path.join(os.path.dirname(__file__), "./feeds.json")
with open(file_path, "r") as f:
data: dict[str, str] = json.load(f)
return list(data.items())


@@ -1,196 +0,0 @@
import asyncio
from typing import Optional
import google.generativeai as genai
import json
import os
import time
from dotenv import load_dotenv
from pydantic import BaseModel, Field
import requests
from get_all_feed_contents import get_all_feed_contents
load_dotenv()
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
INPUT_FILE = "./page_content.json"
MODEL_NAME = "gemini-2.0-flash-lite"
# TODO: refine
EXTRACTION_PROMPT = """
From the document text provided below, extract key details about any military or arms exports. More specifically, look for the following fields:
transaction_type - Type of transaction (e.g., "Purchase Order", "Subcontract")
company_division - Company or division name
recipient - Recipient of the transaction
amount - Transaction amount (defaults to 0)
description - Transaction description
address_1, address_2, city, province, region, postal_code - Address fields
source_date - Date in YYYY-MM-DD format
source_description - Source description
grant_type - Type of grant
commodity_class - Commodity classification
contract_number - Contract number
comments - Additional comments
is_primary - Boolean flag (defaults to false)
Do not hallucinate. If a field cannot be determined from the text, leave it empty.
---
DOCUMENT TEXT:
{text_content}
"""
SCHEMA = {
"type": "object",
"properties": {
"transaction_type": {
"type": "string",
"description": "Type of transaction (e.g., 'Purchase Order', 'Subcontract')"
},
"company_division": {
"type": "string",
"description": "Company or division name"
},
"recipient": {
"type": "string",
"description": "Recipient of the transaction"
},
"amount": {
"type": "number",
"description": "Transaction amount",
},
"description": {
"type": "string",
"description": "Transaction description"
},
"address_1": {
"type": "string",
"description": "Address line 1"
},
"address_2": {
"type": "string",
"description": "Address line 2"
},
"city": {
"type": "string",
"description": "City"
},
"province": {
"type": "string",
"description": "Province/State"
},
"region": {
"type": "string",
"description": "Region"
},
"postal_code": {
"type": "string",
"description": "Postal code"
},
"source_date": {
"type": "string",
"format": "date-time",
"description": "Date in YYYY-MM-DD format"
},
"source_description": {
"type": "string",
"description": "Source description"
},
"grant_type": {
"type": "string",
"description": "Type of grant"
},
"commodity_class": {
"type": "string",
"description": "Commodity classification"
},
"contract_number": {
"type": "string",
"description": "Contract number"
},
"comments": {
"type": "string",
"description": "Additional comments"
},
"is_primary": {
"type": "boolean",
"description": "Boolean flag indicating if it's primary",
}
}
}
def process_content_with_gemini(text_content):
"""
Sends the text to the Gemini API with the extraction prompt and
parses the JSON response.
"""
model = genai.GenerativeModel(MODEL_NAME) # type: ignore
prompt = EXTRACTION_PROMPT.format(text_content=text_content)
try:
response = model.generate_content(
prompt,
generation_config={
"response_schema": SCHEMA,
"response_mime_type": 'application/json',
}
)
return json.loads(response.text)
except Exception as e:
print(f" ❌ An error occurred while calling Gemini or parsing its response: {e}")
return {"error": str(e)}
async def main():
"""Main function to run the data extraction process."""
if not GOOGLE_API_KEY:
print("❌ Error: GOOGLE_API_KEY environment variable not set.")
return
genai.configure(api_key=GOOGLE_API_KEY) # type: ignore
print("Retrieving all feed contents...")
scraped_pages = await get_all_feed_contents()
if not scraped_pages:
print("❌ Error: No scraper results found.")
return
print("✅ Successfully retrieved all feed contents.")
all_extracted_deals = []
total_pages = len(scraped_pages)
print(f"🤖 Starting information extraction with Gemini for {total_pages} pages...")
for i, page in enumerate(scraped_pages):
print(f"\nProcessing page {i+1}/{total_pages}: {page['url']}")
# Avoid processing pages with very little text
if len(page.get('content', '')) < 150:
print(" ⏩ Skipping page due to insufficient content.")
continue
extracted_info = process_content_with_gemini(page['content'])
# Check if the extraction was successful and contains actual data
if extracted_info and "error" not in extracted_info:
if ("transaction_type" in extracted_info) and ("company_division" in extracted_info) and ("recipient" in extracted_info):
print(" ✔️ Found relevant info")
all_extracted_deals.append(extracted_info)
else:
print(" ❌ insufficient info")
print(f" Extracted info: {extracted_info}")
# Add a small delay to respect API rate limits (1 second is safe)
time.sleep(1)
if all_extracted_deals:
for transaction in all_extracted_deals:
requests.post("https://ploughshares.nixc.us/api/transaction", json=transaction)
else:
print("\nNo relevant deals were extracted from any of the pages.")
if __name__ == "__main__":
asyncio.run(main())