import asyncio
import csv
import os
import urllib.parse
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from typing import Dict, List, Tuple

import feedparser
import requests
from bs4 import BeautifulSoup
from playwright.async_api import async_playwright

from clean_string import clean_string
from seed_with_csv import seed_with_csv


@dataclass
class Alert:
    """A simple data class to hold information about a single alert."""
    title: str
    url: str
    summary: str


def get_links_from_feed(rss_url: str) -> list[Alert]:
    """
    Parses a Google Alerts RSS feed URL and extracts the data for each alert.

    Args:
        rss_url: The URL of the Google Alerts RSS feed.

    Returns:
        A list of Alert objects. Returns an empty list if the feed cannot be
        parsed or is empty.
    """
    alerts: list[Alert] = []

    # Parse the RSS feed from the provided URL
    feed = feedparser.parse(rss_url)

    # Check whether the feed was parsed successfully
    if feed.bozo:
        print(f"Error parsing feed: {feed.bozo_exception}")
        return alerts

    # Iterate over each entry in the feed
    for entry in feed.entries:
        # The title can contain HTML entities, so parse it to get clean text.
        title_soup = BeautifulSoup(entry.title, "html.parser")  # type: ignore
        title = title_soup.get_text()

        # The summary often contains HTML, so we parse it to get clean text.
        summary_soup = BeautifulSoup(entry.summary, 'html.parser')  # type: ignore
        summary = summary_soup.get_text()

        # The link is a Google redirect URL; we extract the 'url' parameter.
        link = entry.link
        try:
            # Parse the URL to easily access its components
            parsed_url = urllib.parse.urlparse(link)  # type: ignore
            # Get the query parameters as a dictionary
            query_params = urllib.parse.parse_qs(parsed_url.query)
            # The actual destination URL is in the 'url' parameter
            actual_url = query_params.get('url', [None])[0]

            if actual_url:
                # Append an Alert object instead of a tuple
                alert_obj = Alert(title=title, url=actual_url, summary=summary)
                alerts.append(alert_obj)
        except Exception as e:
            print(f"Could not parse URL for entry '{title}': {e}")

    return alerts


def get_feeds() -> list[tuple[str, str]]:
    """Reads feed names and URLs from a local CSV file.

    This function opens 'feeds.csv', which is expected to be in the same
    directory as this script. The CSV must have two columns: the first for
    the feed name and the second for the URL.

    Returns:
        list[tuple[str, str]]: A list of tuples, where each tuple contains
        a feed's name and its URL.
    """
    feeds = []
    file_path = os.path.join(os.path.dirname(__file__), "feeds.csv")

    with open(file_path, mode="r", newline="", encoding="utf-8") as f:
        reader = csv.reader(f)
        # If your CSV has a header row, uncomment the next line to skip it
        # next(reader, None)
        for row in reader:
            # Ensure the row has exactly two columns to avoid errors
            if len(row) == 2:
                feeds.append((row[0], row[1]))

    return feeds


async def fetch_site(url: str) -> str | None:
    """
    Fetches the main article text of a URL using Playwright and BeautifulSoup.

    Args:
        url: The URL of the website to fetch.

    Returns:
        A string containing the main text content of the page, or None on error.
    """
    print(f"fetching {url}")
    async with async_playwright() as p:
        browser = await p.chromium.launch()
        page = await browser.new_page()
        try:
            await page.goto(url, wait_until='domcontentloaded', timeout=60000)
            content = await page.content()
            soup = BeautifulSoup(content, 'html.parser')

            # Strategy: find the main content container.
            # First, try to find a <main> tag; if there is none, look for an
            # <article> tag. You can add more fallbacks based on common
            # website structures, e.g., soup.find('div', id='content').
            main_content = soup.find('main')
            if not main_content:
                main_content = soup.find('article')

            # If a main content area is found, extract text from it.
            if main_content:
                # (Optional) Remove unwanted elements like scripts or ads
                # from within the main content
                for element in main_content(['script', 'style', 'aside']):  # type: ignore
                    element.decompose()

                # .get_text() with separator and strip gives cleaner output
                main_text = main_content.get_text(separator='\n', strip=True)
                main_text = clean_string(main_text)
                print(f"SUCCESSFUL FETCH: {url}")
                print(f"FETCH CONTENT: {main_text[:140]}...")
                return main_text
            else:
                # Fallback if no specific container is found (less reliable)
                print("WARNING: No main content container found. Falling back to body.")
                if soup.body:
                    body_text = soup.body.get_text(separator='\n', strip=True)
                    body_text = clean_string(body_text)
                    print(f"SUCCESSFUL FETCH: {url}")
                    print(f"FETCH CONTENT: {body_text[:140]}...")
                    return body_text
        except Exception as e:
            print(f"FAILED FETCH: {url}")
            print(f"An error occurred: {e}")
            return None
        finally:
            await browser.close()


async def get_all_feed_contents() -> List[Dict[str, str]]:
    """
    Asynchronously fetches and processes content from multiple RSS feeds.

    This function first gets a list of RSS feeds, extracts all article URLs
    from them, and then asynchronously fetches the content of each URL.
    The content is cleaned and returned as a list of dictionaries.

    Returns:
        List[Dict[str, str]]: A list of dictionaries, where each dictionary
        contains the 'url' and its cleaned 'content'.
    """
    feeds: List[Tuple[str, str]] = get_feeds()

    urls: List[str] = []
    for keyword, feed in feeds:
        alerts: List[Alert] = get_links_from_feed(feed)
        for alert in alerts:
            urls.append(alert.url)
        print(f"{len(alerts)} links found for '{keyword}'")

    print(f"\n{len(urls)} total links found. Starting fetch process.")

    pages: List[Dict[str, str]] = []

    # Create a list of tasks to run concurrently
    tasks = [fetch_site(url) for url in urls]
    results = await asyncio.gather(*tasks)

    for url, content in zip(urls, results):
        if content:
            pages.append({
                "url": url,
                "content": content
            })

    print(f"\nSuccessfully fetched {len(pages)} webpages.")
    return pages
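

# --- Usage sketch (illustrative, not part of the original pipeline) ---
# A minimal way to run the whole flow end to end, assuming this module is
# executed directly and that feeds.csv sits next to it. The __main__ guard
# and the summary print below are assumptions added for demonstration;
# callers such as seed_with_csv may drive get_all_feed_contents() differently.
if __name__ == "__main__":
    # asyncio.run() starts an event loop, awaits the coroutine, and shuts
    # the loop down again.
    fetched_pages = asyncio.run(get_all_feed_contents())
    for page in fetched_pages:
        print(f"{page['url']}: {len(page['content'])} chars of cleaned text")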