diff --git a/docker/crawler-google-alerts/.gitignore b/docker/crawler-google-alerts/.gitignore new file mode 100644 index 0000000..9b97969 --- /dev/null +++ b/docker/crawler-google-alerts/.gitignore @@ -0,0 +1,5 @@ +.env +feeds.csv +feed_contents.xml +page_content.json +__pycache__/ \ No newline at end of file diff --git a/docker/crawler_dorks/clean_string.py b/docker/crawler-google-alerts/clean_string.py similarity index 100% rename from docker/crawler_dorks/clean_string.py rename to docker/crawler-google-alerts/clean_string.py diff --git a/docker/crawler-google-alerts/fetch_site.py b/docker/crawler-google-alerts/fetch_site.py new file mode 100644 index 0000000..a902041 --- /dev/null +++ b/docker/crawler-google-alerts/fetch_site.py @@ -0,0 +1,61 @@ +import asyncio +from playwright.async_api import async_playwright +from bs4 import BeautifulSoup + +async def fetch_site(url: str) -> str | None: + """ + Fetches the main article text of a URL using Playwright and BeautifulSoup. + + Args: + url: The URL of the website to fetch. + + Returns: + A string containing the main text content of the page, or None on error. + """ + print(f"fetching {url}") + async with async_playwright() as p: + browser = await p.chromium.launch() + page = await browser.new_page() + + try: + await page.goto(url, wait_until='domcontentloaded', timeout=60000) + + content = await page.content() + soup = BeautifulSoup(content, 'html.parser') + + # Strategy: Find the main content container + # First, try to find a
<main> tag. If not, look for an <article>
tag. + # You can add more fallbacks based on common website structures, + # e.g., soup.find('div', id='content') + main_content = soup.find('main') + if not main_content: + main_content = soup.find('article') + + # If a main content area is found, extract text from it. + if main_content: + + # (Optional) Remove unwanted elements like scripts or ads from within the main content + for element in main_content(['script', 'style', 'aside']): # type: ignore + element.decompose() + + print(f"SUCCESSFUL FETCH: {url}") + # .get_text() with separator and strip for cleaner output + return main_content.get_text(separator='\n', strip=True) + else: + # Fallback if no specific container is found (less reliable) + print("WARNING: No main content container found. Falling back to body.") + if soup.body: + body_text = soup.body.get_text(separator='\n', strip=True) + print(f"SUCCESSFUL FETCH: {url}") + return body_text + + except Exception as e: + print(f"FAILED FETCH: {url}") + print(f"An error occurred: {e}") + return None + + finally: + await browser.close() + +# Example usage: +# asyncio.run(fetch_site("https://www.example.com")) \ No newline at end of file diff --git a/docker/crawler-google-alerts/get_all_feed_contents.py b/docker/crawler-google-alerts/get_all_feed_contents.py new file mode 100644 index 0000000..2d32fed --- /dev/null +++ b/docker/crawler-google-alerts/get_all_feed_contents.py @@ -0,0 +1,92 @@ +import asyncio +from typing import Dict, List, Tuple +from clean_string import clean_string +from fetch_site import fetch_site +from get_feeds import get_feeds +from get_links_from_feed import Alert, get_links_from_feed +import xml.etree.ElementTree as ET + + +async def get_all_feed_contents() -> List[Dict[str, str]]: + """ + Asynchronously fetches and processes content from multiple RSS feeds. + + This function first gets a list of RSS feeds, extracts all article URLs from them, + and then asynchronously fetches the content of each URL. The content is cleaned + and returned as a list of dictionaries. + + Returns: + List[Dict[str, str]]: A list of dictionaries, where each dictionary + contains the 'url' and its cleaned 'content'. + """ + feeds: List[Tuple[str, str]] = get_feeds() + urls: List[str] = [] + + for keyword, feed in feeds: + alerts: List[Alert] = get_links_from_feed(feed) + for alert in alerts: + urls.append(alert.url) + print(f"{len(alerts)} links found for '{keyword}'") + + print(f"\n{len(urls)} total links found. Starting fetch process.") + pages: List[Dict[str, str]] = [] + + # Create a list of tasks to run concurrently + tasks = [fetch_site(url) for url in urls] + results = await asyncio.gather(*tasks) + + for url, content in zip(urls, results): + if content: + pages.append({ + "url": url, + "content": clean_string(content) + }) + + print(f"\nSuccessfully fetched {len(pages)} webpages.") + return pages + +def write_to_xml(pages: List[Dict[str, str]], filename: str) -> None: + """ + Writes a list of page data to an XML file. + + The XML structure will be: + <pages> + <page> + <url>http://...</url> + <content>...</content> + </page> + ... + </pages> + + Args: + pages (List[Dict[str, str]]): The list of page data to write. + filename (str): The name of the output XML file.
+ """ + root = ET.Element("pages") + + for page_data in pages: + page_element = ET.SubElement(root, "page") + url_element = ET.SubElement(page_element, "url") + url_element.text = page_data.get("url") + content_element = ET.SubElement(page_element, "content") + content_element.text = page_data.get("content") + + tree = ET.ElementTree(root) + # The 'xml_declaration' and 'encoding' arguments ensure it's a well-formed XML file. + tree.write(filename, encoding='utf-8', xml_declaration=True) + print(f"Data successfully written to {filename}") + + +async def main() -> None: + """ + Main entry point for the script. + """ + all_pages = await get_all_feed_contents() + if all_pages: + write_to_xml(all_pages, "feed_contents.xml") + else: + print("No pages were fetched. XML file not created.") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/docker/crawler-google-alerts/get_feeds.py b/docker/crawler-google-alerts/get_feeds.py new file mode 100644 index 0000000..bb26186 --- /dev/null +++ b/docker/crawler-google-alerts/get_feeds.py @@ -0,0 +1,27 @@ +import csv +import os + +def get_feeds() -> list[tuple[str, str]]: + """Reads feed names and URLs from a local CSV file. + + This function opens 'feeds.csv', which is expected to be in the + same directory as this script. The CSV must have two columns: + the first for the feed name and the second for the URL. + + Returns: + list[tuple[str, str]]: A list of tuples, where each tuple + contains a feed's name and its URL. + """ + feeds = [] + file_path = os.path.join(os.path.dirname(__file__), "feeds.csv") + + with open(file_path, mode="r", newline="", encoding="utf-8") as f: + reader = csv.reader(f) + # If your CSV has a header row, uncomment the next line to skip it + # next(reader, None) + for row in reader: + # Ensure the row has exactly two columns to avoid errors + if len(row) == 2: + feeds.append((row[0], row[1])) + + return feeds \ No newline at end of file diff --git a/docker/crawler_dorks/get_links_from_feed.py b/docker/crawler-google-alerts/get_links_from_feed.py similarity index 100% rename from docker/crawler_dorks/get_links_from_feed.py rename to docker/crawler-google-alerts/get_links_from_feed.py diff --git a/docker/crawler-google-alerts/main.py b/docker/crawler-google-alerts/main.py new file mode 100644 index 0000000..79d30a6 --- /dev/null +++ b/docker/crawler-google-alerts/main.py @@ -0,0 +1,173 @@ +import asyncio +from typing import Optional +import google.generativeai as genai +import json +import os +import time +from dotenv import load_dotenv +import requests + +from get_all_feed_contents import get_all_feed_contents +load_dotenv() + +GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY") + +INPUT_FILE = "./page_content.json" + +MODEL_NAME = "gemini-2.0-flash-lite" + +# TODO: refine +EXTRACTION_PROMPT = """ +You are an information extraction system. +Your task is to extract specific fields from the provided article text (the 'source'). +The topic is Canadian military exports/transactions. + +Follow these rules strictly: +1. Output ONLY valid JSON — no explanations or commentary. +2. Only include a field if you find a clear and unambiguous match. If the information is not explicitly present, omit that field entirely (do not use null, "", or placeholders). +3. Do not copy entire paragraphs into a field. Summarize or extract only the relevant fragment directly answering the field’s requirement. +4. Do not guess or infer — if the text is ambiguous, leave the field out. +5. 
If a number is expected, provide only the numeric value (without units unless the unit is part of the field definition). +6. Do not mix unrelated information into a field. + +Fields to extract (omit if not found): +* "transaction_type": Type of transaction being made (e.g., "Purchase Order", "Subcontract") +* "company_division": Canadian company/division involved in the transaction +* "address_1": Address line 1 of the Company +* "address_2": Address line 2 of the Company +* "city": City where the Company is located +* "province": Province where the Company is located +* "region": Region where the Company is located +* "postal_code": Postal code of the Company +* "recipient": Recipient of the transaction, be it a country, organization, or individual +* "amount": Transaction amount including the currency +* "description": Transaction description +* "source_date": Date the source/article was posted, in YYYY-MM-DD format. +* "source_description": Description of the platform the source/article came from, as well as the content of the source/article. +* "grant_type": Type of grant +* "commodity_class": Commodity classification or the product being exported in the transaction +* "contract_number": Contract number +* "comments": Additional comments +* "is_primary": Boolean flag + +--- +DOCUMENT TEXT: +{text_content} +""" + +SCHEMA = { + "type": "object", + "required": ["source_description"], + "properties": { + "transaction_type": {"type": "string"}, + "company_division": {"type": "string"}, + "recipient": {"type": "string"}, + "amount": {"type": "number"}, + "description": {"type": "string"}, + "address_1": {"type": "string"}, + "address_2": {"type": "string"}, + "city": {"type": "string"}, + "province": {"type": "string"}, + "region": {"type": "string"}, + "postal_code": {"type": "string"}, + "source_date": {"type": "string"}, + "source_description": {"type": "string"}, + "grant_type": {"type": "string"}, + "commodity_class": {"type": "string"}, + "contract_number": {"type": "string"}, + "comments": {"type": "string"}, + "is_primary": {"type": "boolean"} + } +} + +def validate_info(extracted_info): + if ("transaction_type" not in extracted_info): + return False + if (len(extracted_info["transaction_type"]) == 0): + return False + if ("company_division" not in extracted_info): + return False + if (len(extracted_info["company_division"]) == 0): + return False + if ("recipient" not in extracted_info): + return False + if (len(extracted_info["recipient"]) == 0): + return False + return True + +def process_content_with_gemini(text_content): + """ + Sends the text to the Gemini API with the extraction prompt and + parses the JSON response.
+ """ + model = genai.GenerativeModel(MODEL_NAME) # type: ignore + prompt = EXTRACTION_PROMPT.format(text_content=text_content) + + try: + response = model.generate_content( + prompt, + generation_config={ + "response_schema": SCHEMA, + "response_mime_type": 'application/json', + } + ) + return json.loads(response.text) + except Exception as e: + print(f" ❌ An error occurred while calling Gemini or parsing its response: {e}") + return {"error": str(e)} + + +async def main(): + """Main function to run the data extraction process.""" + if not GOOGLE_API_KEY: + print("❌ Error: GOOGLE_API_KEY environment variable not set.") + return + + genai.configure(api_key=GOOGLE_API_KEY) # type: ignore + + print("Retrieving all feed contents...") + scraped_pages = await get_all_feed_contents() + if not scraped_pages: + print("❌ Error: No scraper results found.") + return + print("✅ Successfully retrieved all feed contents.") + + all_extracted_deals = [] + total_pages = len(scraped_pages) + + print(f"🤖 Starting information extraction with Gemini for {total_pages} pages...") + + for i, page in enumerate(scraped_pages): + print(f"\nProcessing page {i+1}/{total_pages}: {page['url']}") + + # Avoid processing pages with very little text + if len(page.get('content', '')) < 150: + print(" ⏩ Skipping page due to insufficient content.") + continue + + extracted_info = process_content_with_gemini(page['content']) + + # Check if the extraction was successful and contains actual data + if extracted_info and "error" not in extracted_info: + if validate_info(extracted_info): + print(" ✔️ Found relevant info") + desc = "" + if "source_description" in extracted_info: + desc = extracted_info["source_description"] + extracted_info["source_description"] = f"Sourced from Google Alerts. Url: {page['url']}. 
{desc}" + all_extracted_deals.append(extracted_info) + else: + print(" ❌ insufficient info") + print(f" Extracted info: {extracted_info}") + + # Add a small delay to respect API rate limits (1 second is safe) + time.sleep(1) + + if all_extracted_deals: + for transaction in all_extracted_deals: + requests.post("https://ploughshares.nixc.us/api/transaction", json=transaction) + else: + print("\nNo relevant deals were extracted from any of the pages.") + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/docker/crawler_dorks/requirements.txt b/docker/crawler-google-alerts/requirements.txt similarity index 100% rename from docker/crawler_dorks/requirements.txt rename to docker/crawler-google-alerts/requirements.txt diff --git a/docker/crawler_dorks/.gitignore b/docker/crawler_dorks/.gitignore deleted file mode 100644 index 53d4418..0000000 --- a/docker/crawler_dorks/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -.env -page_content.json -__pycache__/ \ No newline at end of file diff --git a/docker/crawler_dorks/feeds.json b/docker/crawler_dorks/feeds.json deleted file mode 100644 index e4a3446..0000000 --- a/docker/crawler_dorks/feeds.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "Canadian Military Exports": "https://www.google.ca/alerts/feeds/02962857334213646081/4156920188674433267" -} \ No newline at end of file diff --git a/docker/crawler_dorks/fetch_site.py b/docker/crawler_dorks/fetch_site.py deleted file mode 100644 index 8528368..0000000 --- a/docker/crawler_dorks/fetch_site.py +++ /dev/null @@ -1,34 +0,0 @@ -import asyncio -from playwright.async_api import async_playwright -from bs4 import BeautifulSoup - -async def fetch_site(url: str) -> str | None: - """ - Fetches the text content of a URL using Playwright. - - Args: - url: The URL of the website to fetch. - - Returns: - A string containing the text content of the page, or None on error. 
- """ - async with async_playwright() as p: - browser = await p.chromium.launch() - page = await browser.new_page() - - try: - # Change 'networkidle' to 'domcontentloaded' and increase timeout as a fallback - await page.goto(url, wait_until='domcontentloaded', timeout=60000) - - content = await page.content() - soup = BeautifulSoup(content, 'html.parser') - - # .get_text() is the standard method in modern BeautifulSoup - return soup.get_text() - - except Exception as e: - print(f"An error occurred: {e}") - return None - - finally: - await browser.close() \ No newline at end of file diff --git a/docker/crawler_dorks/get_all_feed_contents.py b/docker/crawler_dorks/get_all_feed_contents.py deleted file mode 100644 index c1e2caf..0000000 --- a/docker/crawler_dorks/get_all_feed_contents.py +++ /dev/null @@ -1,31 +0,0 @@ -import asyncio -import json -from clean_string import clean_string -from fetch_site import fetch_site -from get_feeds import get_feeds -from get_links_from_feed import get_links_from_feed - - -async def get_all_feed_contents(): - feeds = get_feeds() - urls = [] - for keyword, feed in feeds: - alerts = get_links_from_feed(feed) - for alert in alerts: - urls.append(alert.url) - pages = [] - for url in urls: - content = await fetch_site(url) - if not content: - continue - pages.append({ - "url": url, - "content": clean_string(content) - }) - return pages - -async def main(): - print(await get_all_feed_contents()) - -if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file diff --git a/docker/crawler_dorks/get_feeds.py b/docker/crawler_dorks/get_feeds.py deleted file mode 100644 index 6bd756e..0000000 --- a/docker/crawler_dorks/get_feeds.py +++ /dev/null @@ -1,18 +0,0 @@ -import json -import os - -def get_feeds() -> list[tuple[str, str]]: - """Reads feed names and URLs from a local JSON file. - - This function opens 'feeds.json', which is expected to be in the - same directory as this script. It parses the JSON object, which - should contain string keys (feed names) and string values (URLs). - - Returns: - list[tuple[str, str]]: A list of tuples, where each tuple - contains a feed's name and its URL. - """ - file_path = os.path.join(os.path.dirname(__file__), "./feeds.json") - with open(file_path, "r") as f: - data: dict[str, str] = json.load(f) - return list(data.items()) \ No newline at end of file diff --git a/docker/crawler_dorks/main.py b/docker/crawler_dorks/main.py deleted file mode 100644 index eb5afd5..0000000 --- a/docker/crawler_dorks/main.py +++ /dev/null @@ -1,196 +0,0 @@ -import asyncio -from typing import Optional -import google.generativeai as genai -import json -import os -import time -from dotenv import load_dotenv -from pydantic import BaseModel, Field -import requests - -from get_all_feed_contents import get_all_feed_contents -load_dotenv() - -GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY") - -INPUT_FILE = "./page_content.json" - -MODEL_NAME = "gemini-2.0-flash-lite" - -# TODO: refine -EXTRACTION_PROMPT = """ -From the document text provided below, extract key details about any military or arms exports. 
More specifically, look for the following fields: - - transaction_type - Type of transaction (e.g., "Purchase Order", "Subcontract") - company_division - Company or division name - recipient - Recipient of the transaction - amount - Transaction amount (defaults to 0) - description - Transaction description - address_1, address_2, city, province, region, postal_code - Address fields - source_date - Date in YYYY-MM-DD format - source_description - Source description - grant_type - Type of grant - commodity_class - Commodity classification - contract_number - Contract number - comments - Additional comments - is_primary - Boolean flag (defaults to false) - - -Do not hallucinate. If a field cannot be detemined from the text, leave it empty. - ---- -DOCUMENT TEXT: -{text_content} -""" - -SCHEMA = { - "type": "object", - "properties": { - "transaction_type": { - "type": "string", - "description": "Type of transaction (e.g., 'Purchase Order', 'Subcontract')" - }, - "company_division": { - "type": "string", - "description": "Company or division name" - }, - "recipient": { - "type": "string", - "description": "Recipient of the transaction" - }, - "amount": { - "type": "number", - "description": "Transaction amount", - }, - "description": { - "type": "string", - "description": "Transaction description" - }, - "address_1": { - "type": "string", - "description": "Address line 1" - }, - "address_2": { - "type": "string", - "description": "Address line 2" - }, - "city": { - "type": "string", - "description": "City" - }, - "province": { - "type": "string", - "description": "Province/State" - }, - "region": { - "type": "string", - "description": "Region" - }, - "postal_code": { - "type": "string", - "description": "Postal code" - }, - "source_date": { - "type": "string", - "format": "date-time", - "description": "Date in YYYY-MM-DD format" - }, - "source_description": { - "type": "string", - "description": "Source description" - }, - "grant_type": { - "type": "string", - "description": "Type of grant" - }, - "commodity_class": { - "type": "string", - "description": "Commodity classification" - }, - "contract_number": { - "type": "string", - "description": "Contract number" - }, - "comments": { - "type": "string", - "description": "Additional comments" - }, - "is_primary": { - "type": "boolean", - "description": "Boolean flag indicating if it's primary", - } - } -} - -def process_content_with_gemini(text_content): - """ - Sends the text to the Gemini API with the extraction prompt and - parses the JSON response. 
- """ - model = genai.GenerativeModel(MODEL_NAME) # type: ignore - prompt = EXTRACTION_PROMPT.format(text_content=text_content) - - try: - response = model.generate_content( - prompt, - generation_config={ - "response_schema": SCHEMA, - "response_mime_type": 'application/json', - } - ) - return json.loads(response.text) - except Exception as e: - print(f" ❌ An error occurred while calling Gemini or parsing its response: {e}") - return {"error": str(e)} - - -async def main(): - """Main function to run the data extraction process.""" - if not GOOGLE_API_KEY: - print("❌ Error: GOOGLE_API_KEY environment variable not set.") - return - - genai.configure(api_key=GOOGLE_API_KEY) # type: ignore - - print("Retrieving all feed contents...") - scraped_pages = await get_all_feed_contents() - if not scraped_pages: - print("❌ Error: No scraper results found.") - return - print("✅ Successfully retrieved all feed contents.") - - all_extracted_deals = [] - total_pages = len(scraped_pages) - - print(f"🤖 Starting information extraction with Gemini for {total_pages} pages...") - - for i, page in enumerate(scraped_pages): - print(f"\nProcessing page {i+1}/{total_pages}: {page['url']}") - - # Avoid processing pages with very little text - if len(page.get('content', '')) < 150: - print(" ⏩ Skipping page due to insufficient content.") - continue - - extracted_info = process_content_with_gemini(page['content']) - - # Check if the extraction was successful and contains actual data - if extracted_info and "error" not in extracted_info: - if ("transaction_type" in extracted_info) and ("company_division" in extracted_info) and ("recipient" in extracted_info): - print(" ✔️ Found relevant info") - all_extracted_deals.append(extracted_info) - else: - print(" ❌ insufficient info") - print(f" Extracted info: {extracted_info}") - - # Add a small delay to respect API rate limits (1 second is safe) - time.sleep(1) - - if all_extracted_deals: - for transaction in all_extracted_deals: - requests.post("https://ploughshares.nixc.us/api/transaction", json=transaction) - else: - print("\nNo relevant deals were extracted from any of the pages.") - -if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file