Merge branch 'main' of https://git.nixc.us/colin/ploughshares
ci/woodpecker/push/woodpecker: Pipeline was successful

coleWesterveld 2025-09-03 19:31:48 -04:00
commit 431d235e3b
8 changed files with 262 additions and 205 deletions

View File

@@ -1,61 +0,0 @@
import asyncio
from playwright.async_api import async_playwright
from bs4 import BeautifulSoup


async def fetch_site(url: str) -> str | None:
    """
    Fetches the main article text of a URL using Playwright and BeautifulSoup.

    Args:
        url: The URL of the website to fetch.

    Returns:
        A string containing the main text content of the page, or None on error.
    """
    print(f"fetching {url}")
    async with async_playwright() as p:
        browser = await p.chromium.launch()
        page = await browser.new_page()
        try:
            await page.goto(url, wait_until='domcontentloaded', timeout=60000)
            content = await page.content()
            soup = BeautifulSoup(content, 'html.parser')

            # Strategy: Find the main content container
            # First, try to find a <main> tag. If not, look for an <article> tag.
            # You can add more fallbacks based on common website structures,
            # e.g., soup.find('div', id='content')
            main_content = soup.find('main')
            if not main_content:
                main_content = soup.find('article')

            # If a main content area is found, extract text from it.
            if main_content:
                # (Optional) Remove unwanted elements like scripts or ads from within the main content
                for element in main_content(['script', 'style', 'aside']):  # type: ignore
                    element.decompose()
                print(f"SUCCESSFUL FETCH: {url}")
                # .get_text() with separator and strip for cleaner output
                return main_content.get_text(separator='\n', strip=True)
            else:
                # Fallback if no specific container is found (less reliable)
                print("WARNING: No main content container found. Falling back to body.")
                if soup.body:
                    body_text = soup.body.get_text(separator='\n', strip=True)
                    print(f"SUCCESSFUL FETCH: {url}")
                    return body_text
        except Exception as e:
            print(f"FAILED FETCH: {url}")
            print(f"An error occurred: {e}")
            return None
        finally:
            await browser.close()


# Example usage:
# asyncio.run(fetch_site("https://www.example.com"))

View File

@@ -1,11 +1,160 @@
import asyncio
import csv
from dataclasses import dataclass
import os
from typing import Dict, List, Tuple
from clean_string import clean_string
from fetch_site import fetch_site
from get_feeds import get_feeds
from get_links_from_feed import Alert, get_links_from_feed
import xml.etree.ElementTree as ET
import feedparser
import urllib.parse
import requests
from playwright.async_api import async_playwright
from bs4 import BeautifulSoup
from seed_with_csv import seed_with_csv


@dataclass
class Alert:
    """A simple data class to hold information about a single alert."""
    title: str
    url: str
    summary: str


def get_links_from_feed(rss_url: str) -> list[Alert]:
    """
    Parses a Google Alerts RSS feed URL and extracts the data for each alert.

    Args:
        rss_url: The URL of the Google Alerts RSS feed.

    Returns:
        A list of Alert objects. Returns an empty list if the feed
        cannot be parsed or is empty.
    """
    alerts: list[Alert] = []

    # Parse the RSS feed from the provided URL
    feed = feedparser.parse(rss_url)

    # Check if the feed was parsed successfully and has entries
    if feed.bozo:
        print(f"Error parsing feed: {feed.bozo_exception}")
        return alerts

    # Iterate over each entry in the feed
    for entry in feed.entries:
        # The title is directly available
        title_soup = BeautifulSoup(entry.title, "html.parser")  # type: ignore
        title = title_soup.get_text()

        # The summary often contains HTML, so we parse it to get clean text.
        summary_soup = BeautifulSoup(entry.summary, 'html.parser')  # type: ignore
        summary = summary_soup.get_text()

        # The link is a Google redirect URL; we extract the 'url' parameter.
        link = entry.link
        try:
            # Parse the URL to easily access its components
            parsed_url = urllib.parse.urlparse(link)  # type: ignore
            # Get the query parameters as a dictionary
            query_params = urllib.parse.parse_qs(parsed_url.query)
            # The actual destination URL is in the 'url' parameter
            actual_url = query_params.get('url', [None])[0]
            if actual_url:
                # Append an Alert object instead of a tuple
                alert_obj = Alert(title=title, url=actual_url, summary=summary)
                alerts.append(alert_obj)
        except Exception as e:
            print(f"Could not parse URL for entry '{title}': {e}")

    return alerts
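
# Illustrative sketch (not part of the commit): a Google Alerts entry link is a
# redirect of roughly this shape, and parse_qs pulls out the real destination.
# The example URL below is made up.
#
#   link = "https://www.google.com/url?rct=j&sa=t&url=https://example.com/some-article&ct=ga"
#   urllib.parse.parse_qs(urllib.parse.urlparse(link).query)["url"][0]
#   -> "https://example.com/some-article"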


def get_feeds() -> list[tuple[str, str]]:
    """Reads feed names and URLs from a local CSV file.

    This function opens 'feeds.csv', which is expected to be in the
    same directory as this script. The CSV must have two columns:
    the first for the feed name and the second for the URL.

    Returns:
        list[tuple[str, str]]: A list of tuples, where each tuple
        contains a feed's name and its URL.
    """
    feeds = []
    file_path = os.path.join(os.path.dirname(__file__), "feeds.csv")

    with open(file_path, mode="r", newline="", encoding="utf-8") as f:
        reader = csv.reader(f)
        # If your CSV has a header row, uncomment the next line to skip it
        # next(reader, None)
        for row in reader:
            # Ensure the row has exactly two columns to avoid errors
            if len(row) == 2:
                feeds.append((row[0], row[1]))

    return feeds
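
# Illustrative feeds.csv layout assumed by get_feeds() (names and URLs are
# placeholders, not from the commit): one feed per row, no header,
# column 1 = feed name, column 2 = feed URL.
#
#   Canada defence contracts,https://www.google.com/alerts/feeds/0123456789/111
#   Arms export permits,https://www.google.com/alerts/feeds/0123456789/222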


async def fetch_site(url: str) -> str | None:
    """
    Fetches the main article text of a URL using Playwright and BeautifulSoup.

    Args:
        url: The URL of the website to fetch.

    Returns:
        A string containing the main text content of the page, or None on error.
    """
    print(f"fetching {url}")
    async with async_playwright() as p:
        browser = await p.chromium.launch()
        page = await browser.new_page()
        try:
            await page.goto(url, wait_until='domcontentloaded', timeout=60000)
            content = await page.content()
            soup = BeautifulSoup(content, 'html.parser')

            # Strategy: Find the main content container
            # First, try to find a <main> tag. If not, look for an <article> tag.
            # You can add more fallbacks based on common website structures,
            # e.g., soup.find('div', id='content')
            main_content = soup.find('main')
            if not main_content:
                main_content = soup.find('article')

            # If a main content area is found, extract text from it.
            if main_content:
                # (Optional) Remove unwanted elements like scripts or ads from within the main content
                for element in main_content(['script', 'style', 'aside']):  # type: ignore
                    element.decompose()
                # .get_text() with separator and strip for cleaner output
                main_text = main_content.get_text(separator='\n', strip=True)
                main_text = clean_string(main_text)
                print(f"SUCCESSFUL FETCH: {url}")
                print(f"FETCH CONTENT: {main_text[:140]}...")
                return main_text
            else:
                # Fallback if no specific container is found (less reliable)
                print("WARNING: No main content container found. Falling back to body.")
                if soup.body:
                    body_text = soup.body.get_text(separator='\n', strip=True)
                    body_text = clean_string(body_text)
                    print(f"SUCCESSFUL FETCH: {url}")
                    print(f"FETCH CONTENT: {body_text[:140]}...")
                    return body_text
        except Exception as e:
            print(f"FAILED FETCH: {url}")
            print(f"An error occurred: {e}")
            return None
        finally:
            await browser.close()
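
# Example usage (illustrative; mirrors the comment in the old fetch_site.py):
# text = asyncio.run(fetch_site("https://www.example.com"))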


async def get_all_feed_contents() -> List[Dict[str, str]]:
    """
@@ -39,54 +188,8 @@ async def get_all_feed_contents() -> List[Dict[str, str]]:
        if content:
            pages.append({
                "url": url,
-               "content": clean_string(content)
+               "content": content
            })
    print(f"\nSuccessfully fetched {len(pages)} webpages.")
    return pages
-def write_to_xml(pages: List[Dict[str, str]], filename: str) -> None:
-    """
-    Writes a list of page data to an XML file.
-    The XML structure will be:
-    <pages>
-        <page>
-            <url>http://...</url>
-            <content>...</content>
-        </page>
-        ...
-    </pages>
-    Args:
-        pages (List[Dict[str, str]]): The list of page data to write.
-        filename (str): The name of the output XML file.
-    """
-    root = ET.Element("pages")
-    for page_data in pages:
-        page_element = ET.SubElement(root, "page")
-        url_element = ET.SubElement(page_element, "url")
-        url_element.text = page_data.get("url")
-        content_element = ET.SubElement(page_element, "content")
-        content_element.text = page_data.get("content")
-    tree = ET.ElementTree(root)
-    # The 'xml_declaration' and 'encoding' arguments ensure it's a well-formed XML file.
-    tree.write(filename, encoding='utf-8', xml_declaration=True)
-    print(f"Data successfully written to {filename}")
-async def main() -> None:
-    """
-    Main entry point for the script.
-    """
-    all_pages = await get_all_feed_contents()
-    if all_pages:
-        write_to_xml(all_pages, "feed_contents.xml")
-    else:
-        print("No pages were fetched. XML file not created.")
-if __name__ == "__main__":
-    asyncio.run(main())

View File

@@ -1,27 +0,0 @@
import csv
import os


def get_feeds() -> list[tuple[str, str]]:
    """Reads feed names and URLs from a local CSV file.

    This function opens 'feeds.csv', which is expected to be in the
    same directory as this script. The CSV must have two columns:
    the first for the feed name and the second for the URL.

    Returns:
        list[tuple[str, str]]: A list of tuples, where each tuple
        contains a feed's name and its URL.
    """
    feeds = []
    file_path = os.path.join(os.path.dirname(__file__), "feeds.csv")

    with open(file_path, mode="r", newline="", encoding="utf-8") as f:
        reader = csv.reader(f)
        # If your CSV has a header row, uncomment the next line to skip it
        # next(reader, None)
        for row in reader:
            # Ensure the row has exactly two columns to avoid errors
            if len(row) == 2:
                feeds.append((row[0], row[1]))

    return feeds

View File

@@ -1,61 +0,0 @@
from dataclasses import dataclass
from bs4 import BeautifulSoup
import feedparser
import urllib.parse


@dataclass
class Alert:
    """A simple data class to hold information about a single alert."""
    title: str
    url: str
    summary: str


def get_links_from_feed(rss_url: str) -> list[Alert]:
    """
    Parses a Google Alerts RSS feed URL and extracts the data for each alert.

    Args:
        rss_url: The URL of the Google Alerts RSS feed.

    Returns:
        A list of Alert objects. Returns an empty list if the feed
        cannot be parsed or is empty.
    """
    alerts: list[Alert] = []

    # Parse the RSS feed from the provided URL
    feed = feedparser.parse(rss_url)

    # Check if the feed was parsed successfully and has entries
    if feed.bozo:
        print(f"Error parsing feed: {feed.bozo_exception}")
        return alerts

    # Iterate over each entry in the feed
    for entry in feed.entries:
        # The title is directly available
        title_soup = BeautifulSoup(entry.title, "html.parser")  # type: ignore
        title = title_soup.get_text()

        # The summary often contains HTML, so we parse it to get clean text.
        summary_soup = BeautifulSoup(entry.summary, 'html.parser')  # type: ignore
        summary = summary_soup.get_text()

        # The link is a Google redirect URL; we extract the 'url' parameter.
        link = entry.link
        try:
            # Parse the URL to easily access its components
            parsed_url = urllib.parse.urlparse(link)  # type: ignore
            # Get the query parameters as a dictionary
            query_params = urllib.parse.parse_qs(parsed_url.query)
            # The actual destination URL is in the 'url' parameter
            actual_url = query_params.get('url', [None])[0]
            if actual_url:
                # Append an Alert object instead of a tuple
                alert_obj = Alert(title=title, url=actual_url, summary=summary)
                alerts.append(alert_obj)
        except Exception as e:
            print(f"Could not parse URL for entry '{title}': {e}")

    return alerts

View File

@@ -239,7 +239,7 @@ async def main():
        # If model says 'none', skip by default (these are the irrelevant ones like US missile contracts)
        if relevance == "none":
-           print(" ⚪ Skipping — model marked this as non-Canadian. Explanation:", explanation[:200])
+           print(" ⚪ Skipping — model marked this as non-Canadian. Explanation:", tx)
            continue

        # basic required-field check (we want the API-required fields present)
@@ -250,7 +250,7 @@ async def main():
        # Optionally normalize some fields (convert "amount" to a canonical string) - keep simple for now
        # Save the item
        all_extracted_deals.append(tx)
-       print(f" ✔️ Kept transaction: {tx.get('company_division')}{tx.get('recipient')} ({relevance})")  # type: ignore
+       print(f" ✔️ Kept transaction: {tx}")  # type: ignore

        # Respect rate limit
        time.sleep(1)

View File

@@ -0,0 +1,38 @@
import csv
import os

import requests


def seed_with_csv():
    print("reading csv...")
    file_path = os.path.join(os.path.dirname(__file__), "feeds.csv")
    has_written = False

    with open(file_path, mode="r", newline="", encoding="utf-8") as f:
        reader = csv.reader(f)
        # If your CSV has a header row, uncomment the next line to skip it
        # next(reader, None)
        for row in reader:
            print("reading row...")
            # Ensure the row has exactly two columns to avoid errors
            if len(row) == 2:
                try:
                    res = requests.post(
                        "http://ploughshares.nixc.us/api/source",
                        headers={"Content-Type": "application/json"},
                        json={
                            "title": row[0],
                            "link": row[1],
                            "type": "Google Alert",
                        },
                    )
                    if "error" not in res.json():
                        has_written = True
                except Exception as e:
                    print(e)
            else:
                print("row has incorrect length.")

    return has_written


if __name__ == "__main__":
    seed_with_csv()

View File

@@ -990,8 +990,73 @@ def view_sources():
    return render_template('view_sources.html', sources=sources, version=VERSION)


@app.route('/api/sources', methods=['GET'])
def get_all_sources():
    """API endpoint to get all sources"""
    conn = get_db_connection()
    if conn is None:
        return jsonify({"error": "Database connection error"}), 500

    sources = []
    try:
        with conn.cursor() as cur:
            cur.execute('SELECT * FROM sources ORDER BY src_id DESC')
            sources = cur.fetchall()

            # Convert sources to a list of dictionaries
            result = list(map(lambda src: dict(src), sources))
    except Exception as e:
        logger.error(f"Database error in API: {e}")
        return jsonify({"error": f"Database error: {str(e)}"}), 500
    finally:
        conn.close()

    return jsonify(result)


@app.route('/api/source', methods=['POST'])
def api_create_source():
    """API endpoint to create a source"""
    data = request.json

    # Validate required fields
    required_fields = ['title', 'link', 'type']
    for field in required_fields:
        if field not in data or not data[field]:
            return jsonify({"error": f"Missing required field: {field}"}), 400

    conn = get_db_connection()
    if conn is None:
        return jsonify({"error": "Database connection error"}), 500

    try:
        with conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO sources (
                    title, link, type
                ) VALUES (
                    %(title)s, %(link)s, %(type)s
                ) RETURNING src_id
                """,
                {
                    'title': data['title'],
                    'link': data['link'],
                    'type': data['type']
                }
            )
            result = cur.fetchone()
            if result and 'src_id' in result:
                conn.commit()
                return jsonify({"message": "POST success!"}), 200
    except Exception as e:
        logger.error(f"Error creating source via API: {e}")
        return jsonify({"error": str(e)}), 400
    finally:
        conn.close()
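
# Illustrative request/response for the /api/source endpoint above
# (placeholder values, not part of the commit):
#   POST /api/source  {"title": "Canada defence contracts", "link": "https://...", "type": "Google Alert"}
#   -> 200 {"message": "POST success!"}
#   missing "link" -> 400 {"error": "Missing required field: link"}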


@app.route('/source/add', methods=['POST'])
def create_source():
    """Form endpoint to create a source"""
    data = request.form.to_dict()

View File

@@ -9,7 +9,7 @@
        <h2>Sources</h2>
    </div>
    <div class="card-body">
-       <form action="{{ url_for('api_create_source') }}" method="post" class="needs-validation" novalidate>
+       <form action="{{ url_for('create_source') }}" method="post" class="needs-validation" novalidate>
            <h1>Add Source</h1>
            <label>
                title