Revert "Reapply "Switch config to csv db""
ci/woodpecker/push/woodpecker Pipeline was successful Details

This reverts commit a599181b5f.
This commit is contained in:
jChenvan 2025-09-03 19:31:01 -04:00
parent a599181b5f
commit 7b4e0d92f6
4 changed files with 100 additions and 148 deletions

View File

@ -81,15 +81,19 @@ def get_feeds() -> list[tuple[str, str]]:
list[tuple[str, str]]: A list of tuples, where each tuple
contains a feed's name and its URL.
"""
res = requests.get("http://ploughshares.nixc.us/api/sources")
json = res.json()
feeds = filter(lambda src: src["type"] == "Google Alert", json)
formatted_feeds = map(lambda feed: (feed["title"], feed["link"]), feeds)
res = list(formatted_feeds)
if len(res) == 0:
if seed_with_csv():
return get_feeds()
return list(formatted_feeds)
feeds = []
file_path = os.path.join(os.path.dirname(__file__), "feeds.csv")
with open(file_path, mode="r", newline="", encoding="utf-8") as f:
reader = csv.reader(f)
# If your CSV has a header row, uncomment the next line to skip it
# next(reader, None)
for row in reader:
# Ensure the row has exactly two columns to avoid errors
if len(row) == 2:
feeds.append((row[0], row[1]))
return feeds
async def fetch_site(url: str) -> str | None:
"""

View File

@ -1 +0,0 @@
db.csv

View File

@ -1,7 +1,6 @@
import os
import psycopg2
from psycopg2.extras import RealDictCursor
from db.db_methods import add_row, delete_row, get_rows
from flask import Flask, render_template, request, redirect, url_for, flash, jsonify, send_from_directory, abort
from werkzeug.utils import secure_filename
from datetime import datetime
@ -973,25 +972,45 @@ def api_delete_transaction(id):
@app.route('/sources')
def view_sources():
conn = get_db_connection()
if conn is None:
flash("Database connection error", "error")
return render_template('view_sources.html', sources=[], version=VERSION)
try:
sources = get_rows()
with conn.cursor() as cur:
cur.execute('SELECT * FROM sources ORDER BY src_id DESC')
sources = cur.fetchall()
except Exception as e:
logger.error(f"Database error: {e}")
flash(f"Database error: {e}", "error")
sources = []
finally:
conn.close()
return render_template('view_sources.html', sources=sources, version=VERSION)
@app.route('/api/sources', methods=['GET'])
def get_all_sources():
"""API endpoint to get all sources"""
result = []
conn = get_db_connection()
if conn is None:
return jsonify({"error": "Database connection error"}), 500
sources = []
try:
result = get_rows()
with conn.cursor() as cur:
cur.execute('SELECT * FROM sources ORDER BY src_id DESC')
sources = cur.fetchall()
# Convert transactions to a list of dictionaries
result = list(map(lambda src: dict(src), sources))
except Exception as e:
logger.error(f"Database error in API: {e}")
return jsonify({"error": f"Database error: {str(e)}"}), 500
finally:
conn.close()
return jsonify(result)
@ -1006,12 +1025,35 @@ def api_create_source():
if field not in data or not data[field]:
return jsonify({"error": f"Missing required field: {field}"}), 400
conn = get_db_connection()
if conn is None:
return jsonify({"error": "Database connection error"}), 500
try:
add_row(data["title"], data["link"], data["type"])
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO sources (
title, link, type
) VALUES (
%(title)s, %(link)s, %(type)s
) RETURNING src_id
""",
{
'title': data['title'],
'link': data['link'],
'type': data['type']
}
)
result = cur.fetchone()
if result and 'src_id' in result:
conn.commit()
return jsonify({"message": "POST success!"}), 200
except Exception as e:
logger.error(f"Error creating source via API: {e}")
return jsonify({"error": e}), 400
finally:
conn.close()
@app.route('/source/add', methods=['POST'])
def create_source():
@ -1024,24 +1066,58 @@ def create_source():
if field not in data or not data[field]:
return jsonify({"error": f"Missing required field: {field}"}), 400
conn = get_db_connection()
if conn is None:
return jsonify({"error": "Database connection error"}), 500
try:
add_row(data["title"], data["link"], data["type"])
return jsonify({"message": "POST success!"}), 200
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO sources (
title, link, type
) VALUES (
%(title)s, %(link)s, %(type)s
) RETURNING src_id
""",
{
'title': data['title'],
'link': data['link'],
'type': data['type']
}
)
result = cur.fetchone()
if result and 'src_id' in result:
conn.commit()
except Exception as e:
logger.error(f"Error creating source via API: {e}")
return jsonify({"error": e}), 400
finally:
conn.close()
return redirect(url_for("view_sources"))
@app.route('/api/source/<int:id>', methods=['DELETE'])
def api_delete_source(id):
"""API endpoint to delete a source"""
conn = get_db_connection()
if conn is None:
return jsonify({"error": "Database connection error"}), 500
try:
delete_row(id)
with conn.cursor() as cur:
# Check if transaction exists
cur.execute('SELECT src_id FROM sources WHERE src_id = %s', (id,))
if cur.fetchone() is None:
return jsonify({"error": "Source not found"}), 404
# Delete the transaction
cur.execute('DELETE FROM sources WHERE src_id = %s', (id,))
conn.commit()
return jsonify({"message": "Source deleted successfully"}), 200
except Exception as e:
logger.error(f"Error deleting transaction via API: {e}")
return jsonify({"error": f"Error deleting source: {str(e)}"}), 500
finally:
conn.close()
if __name__ == '__main__':
logger.info(f"Starting Ploughshares v{VERSION}")

View File

@ -1,127 +0,0 @@
import csv
import os
DB_PATH = './db/db.csv'
def get_rows():
"""
Returns all rows from DB.
"""
file_exists = os.path.isfile(DB_PATH)
# If the file exists, find the highest current ID to determine the next ID
if file_exists:
try:
with open(DB_PATH, 'r', newline='', encoding='utf-8') as file:
reader = csv.reader(file)
rows = [{
"src_id": int(row[0]),
"title": row[1],
"link": row[2],
"type": row[3]
} for row in reader if row]
return rows
except (IOError, ValueError, IndexError) as e:
print(f"Error reading the CSV file: {e}")
return []
def add_row(title: str, link: str, type: str):
"""
Adds new row to DB.
Args:
title: The title for the new row.
link: The link for the new row.
type: The type for the new row.
"""
next_id = 1
file_exists = os.path.isfile(DB_PATH)
# If the file exists, find the highest current ID to determine the next ID
if file_exists:
try:
with open(DB_PATH, 'r', newline='', encoding='utf-8') as file:
reader = csv.reader(file)
ids = [int(row[0]) for row in reader if row and row[0].isdigit()]
if ids:
next_id = max(ids) + 1
except (IOError, ValueError, IndexError) as e:
print(f"Error reading the CSV file: {e}")
# Decide how to handle error: maybe return or start fresh
# For this example, we'll proceed as if creating a new file
file_exists = False
next_id = 1
# Append the new row to the file
try:
with open(DB_PATH, 'a', newline='', encoding='utf-8') as file:
writer = csv.writer(file)
# Write the new data row
writer.writerow([next_id, title, link, type])
print(f"Successfully added row with ID: {next_id}")
except IOError as e:
print(f"Error writing to the CSV file: {e}")
def delete_row(row_id: int):
"""
Deletes a row from the CSV file based on its ID.
Args:
row_id: The integer ID of the row to delete.
"""
if not os.path.isfile(DB_PATH):
print("Error: db.csv not found.")
return
rows = []
row_found = False
try:
with open(DB_PATH, 'r', newline='', encoding='utf-8') as file:
reader = csv.reader(file)
for row in reader:
# Keep rows that do not match the ID
if row and row[0].isdigit() and int(row[0]) == row_id:
row_found = True # Mark that we found the row to delete
else:
rows.append(row)
if not row_found:
print(f"Row with ID {row_id} not found.")
return
# Write the filtered rows back to the file
with open(DB_PATH, 'w', newline='', encoding='utf-8') as file:
writer = csv.writer(file)
writer.writerows(rows)
print(f"Successfully deleted row with ID: {row_id}")
except (IOError, ValueError) as e:
print(f"An error occurred: {e}")
if __name__ == '__main__':
# Example usage:
# This block will run when the script is executed directly.
# You can remove the existing db.csv to test the file creation.
if os.path.exists('./db/db.csv'):
# pass
os.remove('./db/db.csv')
print("Adding first entry...")
add_row("Google", "https://www.google.com", "Search Engine")
print("\nAdding second entry...")
add_row("GitHub", "https://www.github.com", "Code Hosting")
print("\nAdding third entry...")
add_row("Stack Overflow", "https://stackoverflow.com", "Q&A")
print("\nContents of ./db/db.csv:")
with open('./db/db.csv', 'r', newline='', encoding='utf-8') as f:
print(f.read())