import asyncio
import urllib.parse
from dataclasses import dataclass
from typing import Dict, List, Tuple

import feedparser
import requests
from bs4 import BeautifulSoup
from playwright.async_api import async_playwright

from clean_string import clean_string
from seed_with_csv import seed_with_csv


@dataclass
class Alert:
    """A simple data class to hold information about a single alert."""

    title: str
    url: str
    summary: str


def get_links_from_feed(rss_url: str) -> list[Alert]:
    """
    Parses a Google Alerts RSS feed URL and extracts the data for each alert.

    Args:
        rss_url: The URL of the Google Alerts RSS feed.

    Returns:
        A list of Alert objects. Returns an empty list if the feed
        cannot be parsed or is empty.
    """
    alerts: list[Alert] = []

    # Parse the RSS feed from the provided URL.
    feed = feedparser.parse(rss_url)

    # Check whether the feed was parsed successfully.
    if feed.bozo:
        print(f"Error parsing feed: {feed.bozo_exception}")
        return alerts

    # Iterate over each entry in the feed.
    for entry in feed.entries:
        # The title may contain HTML entities, so parse it to get clean text.
        title_soup = BeautifulSoup(entry.title, "html.parser")  # type: ignore
        title = title_soup.get_text()

        # The summary often contains HTML, so we parse it to get clean text.
        summary_soup = BeautifulSoup(entry.summary, "html.parser")  # type: ignore
        summary = summary_soup.get_text()

        # The link is a Google redirect URL; we extract the 'url' parameter.
        link = entry.link

        try:
            # Parse the URL to easily access its components.
            parsed_url = urllib.parse.urlparse(link)  # type: ignore
            # Get the query parameters as a dictionary.
            query_params = urllib.parse.parse_qs(parsed_url.query)
            # The actual destination URL is in the 'url' parameter.
            actual_url = query_params.get("url", [None])[0]

            if actual_url:
                # Append an Alert object instead of a tuple.
                alert_obj = Alert(title=title, url=actual_url, summary=summary)
                alerts.append(alert_obj)
        except Exception as e:
            print(f"Could not parse URL for entry '{title}': {e}")

    return alerts


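# Example usage (illustrative sketch; the feed URL below is hypothetical, not a
# real Google Alerts feed):
#
#   feed_url = "https://www.google.com/alerts/feeds/01234567890123456789/98765432109876543210"
#   for alert in get_links_from_feed(feed_url):
#       print(alert.title, alert.url)

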
def get_feeds() -> list[tuple[str, str]]:
    """Fetches Google Alert feed names and URLs from the Ploughshares API.

    This function queries the /api/sources endpoint, keeps only the sources
    whose type is "Google Alert", and returns their titles and links. If no
    Google Alert sources are found, it seeds the API from the local CSV
    (via seed_with_csv) and retries.

    Returns:
        list[tuple[str, str]]: A list of tuples, where each tuple
        contains a feed's name and its URL.
    """
    response = requests.get("http://ploughshares.nixc.us/api/sources")
    sources = response.json()

    google_alert_sources = filter(lambda src: src["type"] == "Google Alert", sources)
    formatted_feeds = list(map(lambda feed: (feed["title"], feed["link"]), google_alert_sources))

    if len(formatted_feeds) == 0:
        # No Google Alert sources yet: seed the API from the CSV and try again.
        if seed_with_csv():
            return get_feeds()

    return formatted_feeds


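# Example (illustrative): listing the configured Google Alert feeds.
#
#   for name, url in get_feeds():
#       print(f"{name}: {url}")
#
# The /api/sources endpoint is assumed to return a JSON array of objects with at
# least "type", "title", and "link" keys; only "Google Alert" sources are kept.

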
async def fetch_site(url: str) -> str | None:
    """
    Fetches the main article text of a URL using Playwright and BeautifulSoup.

    Args:
        url: The URL of the website to fetch.

    Returns:
        A string containing the main text content of the page, or None on error.
    """
    print(f"fetching {url}")
    async with async_playwright() as p:
        browser = await p.chromium.launch()
        page = await browser.new_page()

        try:
            await page.goto(url, wait_until='domcontentloaded', timeout=60000)

            content = await page.content()
            soup = BeautifulSoup(content, 'html.parser')

            # Strategy: find the main content container.
            # First, try to find a <main> tag. If not, look for an <article> tag.
            # More fallbacks can be added based on common website structures,
            # e.g. soup.find('div', id='content').
            main_content = soup.find('main')
            if not main_content:
                main_content = soup.find('article')

            # If a main content area is found, extract text from it.
            if main_content:
                # Remove unwanted elements like scripts or ads from within the main content.
                for element in main_content(['script', 'style', 'aside']):  # type: ignore
                    element.decompose()

                # .get_text() with separator and strip for cleaner output.
                main_text = main_content.get_text(separator='\n', strip=True)
                main_text = clean_string(main_text)

                print(f"SUCCESSFUL FETCH: {url}")
                print(f"FETCH CONTENT: {main_text[:140]}...")
                return main_text
            else:
                # Fallback if no specific container is found (less reliable).
                print("WARNING: No main content container found. Falling back to body.")
                if soup.body:
                    body_text = soup.body.get_text(separator='\n', strip=True)
                    body_text = clean_string(body_text)
                    print(f"SUCCESSFUL FETCH: {url}")
                    print(f"FETCH CONTENT: {body_text[:140]}...")
                    return body_text

        except Exception as e:
            print(f"FAILED FETCH: {url}")
            print(f"An error occurred: {e}")
            return None

        finally:
            await browser.close()


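# Note: each fetch_site() call launches and closes its own Chromium instance, and
# get_all_feed_contents() below runs one call per URL concurrently via asyncio.gather.
# The wrapper below is a minimal sketch, not part of the original pipeline, showing
# one possible way to cap how many browsers run at once with a shared semaphore.
async def fetch_site_limited(url: str, semaphore: asyncio.Semaphore) -> str | None:
    """Like fetch_site, but waits on a shared semaphore to bound concurrency."""
    async with semaphore:
        return await fetch_site(url)

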
async def get_all_feed_contents() -> List[Dict[str, str]]:
    """
    Asynchronously fetches and processes content from multiple RSS feeds.

    This function first gets a list of RSS feeds, extracts all article URLs from them,
    and then asynchronously fetches the content of each URL. The content is cleaned
    and returned as a list of dictionaries.

    Returns:
        List[Dict[str, str]]: A list of dictionaries, where each dictionary
        contains the 'url' and its cleaned 'content'.
    """
    feeds: List[Tuple[str, str]] = get_feeds()
    urls: List[str] = []

    for keyword, feed in feeds:
        alerts: List[Alert] = get_links_from_feed(feed)
        for alert in alerts:
            urls.append(alert.url)
        print(f"{len(alerts)} links found for '{keyword}'")

    print(f"\n{len(urls)} total links found. Starting fetch process.")
    pages: List[Dict[str, str]] = []

    # Create a list of tasks to run concurrently.
    tasks = [fetch_site(url) for url in urls]
    results = await asyncio.gather(*tasks)

    for url, content in zip(urls, results):
        if content:
            pages.append({
                "url": url,
                "content": content
            })

    print(f"\nSuccessfully fetched {len(pages)} webpages.")
    return pages


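# Illustrative entry point (not part of the original module): run the full pipeline
# from the command line with `python <this file>`. Playwright's browser binaries must
# be installed first (`playwright install chromium`).
if __name__ == "__main__":
    asyncio.run(get_all_feed_contents())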