ploughshares/docker/ploughshares/app.py

1052 lines
39 KiB
Python

import os
import psycopg2
from psycopg2.extras import RealDictCursor
from db.db_methods import add_row, delete_row, get_rows
from flask import Flask, render_template, request, redirect, url_for, flash, jsonify, send_from_directory, abort
from werkzeug.utils import secure_filename
from datetime import datetime
import locale
import logging
from flask_talisman import Talisman
import re
from migrate_approval import migrate_database
# Set up root logging once at import time and create this module's named logger.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger('ploughshares')
# Resolve the application version. Precedence:
#   1. the APP_VERSION environment variable,
#   2. a VERSION file in the current working directory,
#   3. the VERSION file three directory levels above this module,
#   4. the literal "unknown" when none of the above can be read.
VERSION = os.environ.get('APP_VERSION')
if not VERSION:
    try:
        # Try to read from VERSION file in the current directory first
        if os.path.exists('VERSION'):
            with open('VERSION', 'r') as f:
                VERSION = f.read().strip()
        # Fall back to the project root VERSION file
        else:
            with open(os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'VERSION'), 'r') as f:
                VERSION = f.read().strip()
    except (OSError, UnicodeDecodeError):
        # Only a missing/unreadable/undecodable file is an expected failure.
        # A bare except would also swallow KeyboardInterrupt and SystemExit.
        VERSION = "unknown"
# Create the Flask application object.
app = Flask(__name__)
# The session secret is read from the environment; the fallback value is only
# suitable for development.
app.secret_key = os.environ.get('SECRET_KEY', 'dev-insecure-secret-key')
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['VERSION'] = VERSION

# Deployment domain and environment name, both taken from the environment.
APP_DOMAIN = os.environ.get('APP_DOMAIN', '')
APP_ENV = os.environ.get('APP_ENV', 'development')

# Optional pre-calculated CSP hashes supplied through the environment.
CSP_JS_HASH = os.environ.get('CSP_JS_HASH', '')
CSP_CSS_HASH = os.environ.get('CSP_CSS_HASH', '')
CSP_CUSTOM_CSS_HASH = os.environ.get('CSP_CUSTOM_CSS_HASH', '')

# Report each hash that was actually provided.
for _hash_label, _hash_value in (
    ("JS", CSP_JS_HASH),
    ("CSS", CSP_CSS_HASH),
    ("custom CSS", CSP_CUSTOM_CSS_HASH),
):
    if _hash_value:
        logger.info(f"Using pre-calculated CSP hash for {_hash_label}: sha256-{_hash_value}")
# Content-Security-Policy shared by UI and API routes. It is deliberately
# permissive (inline/eval scripts, wildcard connect-src) to stay compatible
# with all clients, at the cost of weaker protection.
csp = {
    'default-src': ["'self'", "'unsafe-inline'", "'unsafe-eval'", "data:", "blob:"],
    'script-src': ["'self'", "'unsafe-inline'", "'unsafe-eval'"],
    'style-src': ["'self'", "'unsafe-inline'", "cdn.jsdelivr.net"],
    'img-src': ["'self'", "data:", "blob:"],
    'font-src': ["'self'", "data:", "cdn.jsdelivr.net"],
    'connect-src': ["'self'", "*"],
    'manifest-src': "'self'",
    # Objects stay disallowed and framing stays blocked.
    'object-src': "'none'",
    'frame-ancestors': "'none'",
    'base-uri': "'self'",
    'form-action': "'self'",
}

# When a deployment domain is configured, allow it as a connect and form target.
if APP_DOMAIN:
    logger.info(f"Configuring CSP for domain: {APP_DOMAIN}")

    _connect_sources = csp['connect-src']
    if APP_DOMAIN not in _connect_sources and isinstance(_connect_sources, list):
        _connect_sources.append(APP_DOMAIN)

    _form_targets = csp['form-action']
    if not isinstance(_form_targets, list):
        # A single string value is promoted to a list that includes the domain.
        csp['form-action'] = [_form_targets, APP_DOMAIN]
    elif APP_DOMAIN not in _form_targets:
        _form_targets.append(APP_DOMAIN)
# Permissions-Policy (formerly Feature-Policy): every listed browser feature is
# mapped to the empty allowlist '()', i.e. denied for all origins.
permissions_policy = dict.fromkeys(
    (
        'accelerometer',
        'ambient-light-sensor',
        'autoplay',
        'battery',
        'camera',
        'display-capture',
        'document-domain',
        'encrypted-media',
        'execution-while-not-rendered',
        'execution-while-out-of-viewport',
        'fullscreen',
        'geolocation',
        'gyroscope',
        'hid',
        'idle-detection',
        'magnetometer',
        'microphone',
        'midi',
        'navigation-override',
        'payment',
        'picture-in-picture',
        'publickey-credentials-get',
        'screen-wake-lock',
        'serial',
        'sync-xhr',
        'usb',
        'web-share',
        'xr-spatial-tracking',
    ),
    '()',
)
# HTTPS (and the HSTS settings derived from it) is enforced in every
# environment except 'development'.
force_https = not (APP_ENV == 'development')
logger.info(f"Environment: {APP_ENV}, Force HTTPS: {force_https}")

# Extra cross-origin headers; the after_request hook applies them to non-API responses.
additional_headers = {
    'Cross-Origin-Embedder-Policy': 'require-corp',
    'Cross-Origin-Resource-Policy': 'same-origin',
    'Cross-Origin-Opener-Policy': 'same-origin',
}

# Attach flask-talisman to the app using the permissive CSP defined above.
# NOTE(review): a nonce is requested for script-src while the CSP also lists
# 'unsafe-inline' — confirm the combination behaves as intended in browsers.
talisman = Talisman(
    app,
    # Content and feature policies
    content_security_policy=csp,
    content_security_policy_nonce_in=['script-src'],
    feature_policy=permissions_policy,
    referrer_policy='strict-origin-when-cross-origin',
    frame_options='DENY',
    # Transport security
    force_https=force_https,
    force_https_permanent=force_https,
    strict_transport_security=force_https,
    strict_transport_security_preload=force_https,
    strict_transport_security_include_subdomains=force_https,
    strict_transport_security_max_age=31536000,
    # Session cookie flags
    session_cookie_secure=force_https,
    session_cookie_http_only=True,
)
# Response post-processing: CORS headers for the API, isolation headers for the UI
@app.after_request
def add_api_headers(response):
    """Adjust cross-origin headers on every response and return it.

    Paths under ``/api/`` receive permissive CORS headers and lose any
    Cross-Origin-Embedder-Policy header. All other paths receive the entries
    of ``additional_headers``, after which Cross-Origin-Embedder-Policy is
    overridden to ``unsafe-none``.
    """
    headers = response.headers

    if not request.path.startswith('/api/'):
        # UI route: apply the extra security headers first...
        for name in additional_headers:
            headers[name] = additional_headers[name]
        # ...then relax COEP so embedded resources are not blocked.
        headers['Cross-Origin-Embedder-Policy'] = 'unsafe-none'
        return response

    # API route: CORS headers.
    api_headers = (
        ('Access-Control-Allow-Origin', '*'),
        ('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS'),
        ('Access-Control-Allow-Headers', 'Content-Type, Authorization, X-Requested-With, Accept, Origin, Cache-Control'),
        ('Access-Control-Allow-Credentials', 'true'),
        ('Access-Control-Max-Age', '3600'),
        ('Cross-Origin-Resource-Policy', 'cross-origin'),
    )
    for name, value in api_headers:
        headers[name] = value
    # COEP can block some API clients, so drop it when present.
    if 'Cross-Origin-Embedder-Policy' in headers:
        del headers['Cross-Origin-Embedder-Policy']
    return response
# CORS preflight: answers OPTIONS for any path under /api/
@app.route('/api/<path:path>', methods=['OPTIONS'])
def api_options(path):
    """Reply to a CORS preflight request with a JSON ``{"status": "ok"}`` body.

    ``path`` is captured by the route but not used.
    """
    response = jsonify(status='ok')
    preflight_headers = (
        ('Access-Control-Allow-Origin', '*'),
        ('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS'),
        ('Access-Control-Allow-Headers', 'Content-Type, Authorization, X-Requested-With, Accept, Origin, Cache-Control'),
        ('Access-Control-Allow-Credentials', 'true'),
        ('Access-Control-Max-Age', '3600'),
    )
    for name, value in preflight_headers:
        response.headers[name] = value
    return response
# Simple API endpoint for checking that API routes respond
@app.route('/api/test', methods=['GET'])
def api_test():
    """Return a fixed status payload that includes the application version."""
    payload = {
        'status': 'ok',
        'message': 'API endpoint is working correctly',
        'version': VERSION,
    }
    return jsonify(payload)
# Select a locale: prefer en_US.UTF-8, then the system default ('').
# If neither can be set the process locale is left unchanged.
for _locale_name in ('en_US.UTF-8', ''):
    try:
        locale.setlocale(locale.LC_ALL, _locale_name)
    except locale.Error:
        continue
    break
# Jinja filter: {{ value|currency }}
@app.template_filter('currency')
def currency_filter(value):
    """Format ``value`` as dollars with thousands separators and two decimals.

    ``None`` and values that cannot be converted to ``float`` render as
    ``"$0.00"``.
    """
    fallback = "$0.00"
    if value is None:
        return fallback
    try:
        amount = float(value)
    except (ValueError, TypeError):
        return fallback
    return f"${amount:,.2f}"
# --- Database Connection ---
def get_db_connection():
    """Open an autocommit PostgreSQL connection that yields dict-style rows.

    Connection settings are read from the POSTGRES_HOST, POSTGRES_PORT,
    POSTGRES_DB, POSTGRES_USER and POSTGRES_PASSWORD environment variables,
    each with a built-in default.

    Returns:
        The open connection, or ``None`` when psycopg2 raises
        ``OperationalError`` (the error is logged).
    """
    env = os.environ.get
    settings = {
        'host': env('POSTGRES_HOST', 'db'),
        'port': env('POSTGRES_PORT', '5432'),
        'dbname': env('POSTGRES_DB', 'ploughshares'),
        'user': env('POSTGRES_USER', 'ploughshares'),
        'password': env('POSTGRES_PASSWORD', 'ploughshares_password'),
    }
    try:
        connection = psycopg2.connect(cursor_factory=RealDictCursor, **settings)
        connection.autocommit = True
        logger.info(f"Ploughshares v{VERSION} - Connected to PostgreSQL at {settings['host']}:{settings['port']}")
        return connection
    except psycopg2.OperationalError as e:
        logger.error(f"Error connecting to PostgreSQL: {e}")
    return None
# --- Routes ---
@app.route('/')
def index():
    """Render the home page listing all transactions, newest id first.

    On a connection or query failure an error is flashed and the page is
    rendered with an empty list.
    """
    rows = []
    conn = get_db_connection()
    if conn is None:
        flash("Database connection error", "error")
    else:
        try:
            with conn.cursor() as cur:
                cur.execute('SELECT * FROM transactions ORDER BY id DESC')
                rows = cur.fetchall()
        except Exception as e:
            logger.error(f"Database error: {e}")
            flash(f"Database error: {e}", "error")
            rows = []
        finally:
            conn.close()
    return render_template('index.html', transactions=rows, version=VERSION)
@app.route('/api-docs')
def api_docs():
    """Render the API documentation page, passing the request's host to the template."""
    return render_template('api_docs.html', server_name=request.host, version=VERSION)
def get_transaction_documents(transaction_id):
    """Return the document rows attached to a transaction, newest upload first.

    Returns an empty list when the database is unreachable or the query
    fails (query failures are logged).
    """
    conn = get_db_connection()
    if conn is None:
        return []

    query = '''
        SELECT document_id, filename, file_path, document_type, description, note, upload_date
        FROM transaction_documents
        WHERE transaction_id = %s
        ORDER BY upload_date DESC
    '''
    rows = []
    try:
        with conn.cursor() as cur:
            cur.execute(query, (transaction_id,))
            rows = cur.fetchall()
    except Exception as e:
        logger.error(f"Error fetching documents: {e}")
    finally:
        conn.close()
    return rows
@app.route('/transaction/<int:id>')
def view_transaction(id):
    """Render the detail page for one transaction.

    The ``src`` query argument (the page the user came from) is passed through
    to the template. The template also receives the ids of the previous/next
    transaction and of the previous/next unapproved transaction, each ``None``
    when there is no such neighbour.

    Aborts with 404 when the database is unreachable, the lookup fails, or the
    transaction does not exist.
    """
    source = request.args.get("src")

    conn = get_db_connection()
    if conn is None:
        flash("Database connection error", "error")
        abort(404)

    def _neighbour_id(cur, query):
        """Run a single-id lookup relative to ``id``; return the id or None."""
        cur.execute(query, (id,))
        row = cur.fetchone()
        return row['id'] if row else None

    transaction = None
    prev_id = next_id = prev_pending_id = next_pending_id = None
    lookup_failed = False
    try:
        with conn.cursor() as cur:
            cur.execute('SELECT * FROM transactions WHERE id = %s', (id,))
            transaction = cur.fetchone()
            # Navigation ids are only needed when the transaction exists.
            if transaction is not None:
                prev_id = _neighbour_id(
                    cur, 'SELECT id FROM transactions WHERE id < %s ORDER BY id DESC LIMIT 1')
                next_id = _neighbour_id(
                    cur, 'SELECT id FROM transactions WHERE id > %s ORDER BY id ASC LIMIT 1')
                prev_pending_id = _neighbour_id(
                    cur, 'SELECT id FROM transactions WHERE id < %s AND approved = FALSE ORDER BY id DESC LIMIT 1')
                next_pending_id = _neighbour_id(
                    cur, 'SELECT id FROM transactions WHERE id > %s AND approved = FALSE ORDER BY id ASC LIMIT 1')
    except Exception as e:
        logger.error(f"Database error: {e}")
        lookup_failed = True
    finally:
        conn.close()

    # abort() is raised outside the try block: its HTTPException is an
    # Exception subclass, so raising it inside would be caught above and a
    # plain "not found" would be logged as a database error.
    if lookup_failed or transaction is None:
        abort(404)

    # Get documents for this transaction
    documents = get_transaction_documents(id)
    return render_template(
        'view_transaction.html',
        transaction=transaction,
        documents=documents,
        prev_id=prev_id,
        next_id=next_id,
        prev_pending_id=prev_pending_id,
        next_pending_id=next_pending_id,
        version=VERSION,
        source=source,
    )
@app.route('/document/<int:document_id>')
def view_document(document_id):
    """Serve the stored file for one document inline (not as an attachment).

    Aborts with 404 when the database is unreachable, the lookup fails, or no
    document row has the given id.
    """
    conn = get_db_connection()
    if conn is None:
        flash("Database connection error", "error")
        abort(404)

    document = None
    lookup_failed = False
    try:
        with conn.cursor() as cur:
            cur.execute('SELECT * FROM transaction_documents WHERE document_id = %s', (document_id,))
            document = cur.fetchone()
    except Exception as e:
        logger.error(f"Database error: {e}")
        lookup_failed = True
    finally:
        conn.close()

    # abort() is raised outside the try block: its HTTPException is an
    # Exception subclass, so raising it inside would be caught above and a
    # plain "not found" would be logged as a database error.
    if lookup_failed or document is None:
        abort(404)

    # Serve the file from the uploads directory.
    # NOTE(review): the directory handed to send_from_directory is derived from
    # the stored file_path, so an absolute or '..' path in the database would
    # resolve outside UPLOAD_FOLDER — confirm how file_path values are written.
    file_path = os.path.join(app.config['UPLOAD_FOLDER'], document['file_path'])
    return send_from_directory(os.path.dirname(file_path), os.path.basename(file_path), as_attachment=False)
@app.route('/pending-approval')
def pending_approval():
    """Render the list of transactions awaiting approval, newest id first.

    Each row carries a ``document_count`` key with the number of attached
    documents. On a connection or query failure an error is flashed and the
    page is rendered with an empty list.
    """
    conn = get_db_connection()
    if conn is None:
        flash("Database connection error", "error")
        return render_template('pending_approval.html', transactions=[], version=VERSION)

    transactions = []
    try:
        with conn.cursor() as cur:
            # The document count comes from a correlated subquery, so the whole
            # page needs one round trip instead of one extra query per row.
            cur.execute(
                '''
                SELECT t.*,
                       (SELECT COUNT(*)
                          FROM transaction_documents d
                         WHERE d.transaction_id = t.id) AS document_count
                  FROM transactions t
                 WHERE t.approved = FALSE
                 ORDER BY t.id DESC
                '''
            )
            transactions = cur.fetchall()
    except Exception as e:
        logger.error(f"Database error: {e}")
        flash(f"Database error: {e}", "error")
        transactions = []
    finally:
        conn.close()
    return render_template('pending_approval.html', transactions=transactions, version=VERSION)
@app.route('/transaction/<int:id>/approve', methods=['POST'])
def approve_transaction(id):
    """Mark a transaction as approved, then redirect to the pending list.

    The approver is recorded under a fixed name. A success message is flashed
    only when a row was actually updated; an unknown id or a database failure
    flashes an error instead.
    """
    conn = get_db_connection()
    if conn is None:
        flash("Database connection error", "error")
        return redirect(url_for('pending_approval'))

    try:
        with conn.cursor() as cur:
            # Use a default approver name instead of requiring user input
            approver = "System Administrator"
            cur.execute(
                """
                UPDATE transactions
                SET approved = TRUE,
                    approved_at = CURRENT_TIMESTAMP,
                    approved_by = %s
                WHERE id = %s
                """,
                (approver, id)
            )
            # rowcount is the number of rows the UPDATE touched; zero means
            # no transaction has this id.
            was_updated = cur.rowcount > 0
            conn.commit()
        if was_updated:
            flash(f"Transaction #{id} approved successfully", "success")
        else:
            flash(f"Transaction #{id} not found", "error")
    except Exception as e:
        logger.error(f"Error approving transaction: {e}")
        flash(f"Error approving transaction: {e}", "error")
    finally:
        conn.close()
    return redirect(url_for('pending_approval'))
@app.route('/api/transactions/pending', methods=['GET'])
def api_get_pending_transactions():
    """Return all unapproved transactions as a JSON array, newest id first.

    ``source_date`` and ``created_at`` are rendered as strings when set.
    Responds with a 500 JSON error on connection or query failure.
    """
    conn = get_db_connection()
    if conn is None:
        return jsonify({"error": "Database connection error"}), 500

    try:
        with conn.cursor() as cur:
            cur.execute('SELECT * FROM transactions WHERE approved = FALSE ORDER BY id DESC')
            rows = cur.fetchall()
        result = []
        for row in rows:
            # Dates are converted to strings only when a value is present.
            source_date = row.get('source_date')
            if source_date:
                row['source_date'] = source_date.strftime('%Y-%m-%d')
            created_at = row.get('created_at')
            if created_at:
                row['created_at'] = created_at.strftime('%Y-%m-%dT%H:%M:%S.%f')
            result.append(dict(row))
    except Exception as e:
        logger.error(f"Database error in API: {e}")
        return jsonify({"error": f"Database error: {str(e)}"}), 500
    finally:
        conn.close()
    return jsonify(result)
@app.route('/api/transaction/<int:id>/approve', methods=['POST'])
def api_approve_transaction(id):
    """Approve a transaction through the JSON API.

    Responds 404 when the id does not exist, 200 on success, and 500 on a
    connection or query failure.
    """
    conn = get_db_connection()
    if conn is None:
        return jsonify({"error": "Database connection error"}), 500

    # Same fixed approver name as the HTML approval route.
    approver = "System Administrator"
    approve_sql = """
        UPDATE transactions
        SET approved = TRUE,
            approved_at = CURRENT_TIMESTAMP,
            approved_by = %s
        WHERE id = %s
    """
    try:
        with conn.cursor() as cur:
            cur.execute('SELECT id FROM transactions WHERE id = %s', (id,))
            existing = cur.fetchone()
            if existing is None:
                return jsonify({"error": "Transaction not found"}), 404
            cur.execute(approve_sql, (approver, id))
            conn.commit()
            return jsonify({"message": "Transaction approved successfully"}), 200
    except Exception as e:
        logger.error(f"Error approving transaction via API: {e}")
        return jsonify({"error": f"Error approving transaction: {str(e)}"}), 500
    finally:
        conn.close()
@app.route('/transaction/add', methods=['GET', 'POST'])
def create_transaction():
    """Show the new-transaction form (GET) or insert a transaction (POST).

    New transactions are always stored unapproved. On success the user is
    redirected to the new transaction's page; on failure an error is flashed
    and the form is rendered again.
    """
    if request.method != 'POST':
        return render_template('transaction_form.html', version=VERSION)

    # Get form data and set defaults for missing fields
    data = request.form.to_dict()
    default_fields = {
        'transaction_type': '', 'company_division': '', 'address_1': '', 'address_2': '',
        'city': '', 'province': '', 'region': '', 'postal_code': '', 'source_date': None,
        'source_description': '', 'grant_type': '', 'description': '', 'amount': 0,
        'recipient': '', 'commodity_class': '', 'contract_number': '', 'comments': ''
    }
    for field, default in default_fields.items():
        data.setdefault(field, default)

    # A checkbox is only present in the form data when it is ticked.
    data['is_primary'] = 'is_primary' in data

    # A form field left blank arrives as '' rather than being absent, so the
    # None default above never applies to it; store a blank date as NULL.
    if not data['source_date']:
        data['source_date'] = None

    # Convert amount to float for database storage. Blank or unparseable
    # input becomes 0.0 instead of being passed through as a string.
    raw_amount = data['amount']
    if isinstance(raw_amount, str):
        # Remove currency symbols and commas
        cleaned_amount = raw_amount.replace('$', '').replace(',', '').strip()
        try:
            data['amount'] = float(cleaned_amount) if cleaned_amount else 0.0
        except ValueError:
            data['amount'] = 0.0

    conn = get_db_connection()
    if conn is None:
        flash("Database connection error", "error")
        return render_template('transaction_form.html', version=VERSION)

    try:
        with conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO transactions (
                    transaction_type, company_division, address_1, address_2,
                    city, province, region, postal_code, is_primary, source_date, source_description,
                    grant_type, description, amount, recipient, commodity_class, contract_number, comments,
                    approved
                ) VALUES (
                    %(transaction_type)s, %(company_division)s, %(address_1)s, %(address_2)s,
                    %(city)s, %(province)s, %(region)s, %(postal_code)s, %(is_primary)s, %(source_date)s,
                    %(source_description)s, %(grant_type)s, %(description)s, %(amount)s, %(recipient)s,
                    %(commodity_class)s, %(contract_number)s, %(comments)s, FALSE
                ) RETURNING id
                """,
                data
            )
            result = cur.fetchone()
            if result and 'id' in result:
                new_transaction_id = result['id']
                conn.commit()
            else:
                flash("Error creating transaction: Could not get transaction ID", "error")
                return render_template('transaction_form.html', version=VERSION)
    except Exception as e:
        logger.error(f"Error creating transaction: {e}")
        flash(f"Error creating transaction: {e}", "error")
        return render_template('transaction_form.html', version=VERSION)
    finally:
        conn.close()

    flash("Transaction created successfully and is pending approval", "success")
    return redirect(url_for('view_transaction', id=new_transaction_id))
@app.route('/transaction/<int:id>/edit', methods=['GET', 'POST'])
def update_transaction(id):
    """Show the edit form for a transaction (GET) or save its changes (POST).

    GET aborts with 404 when the transaction cannot be loaded. POST redirects
    to the transaction page on success; on failure it flashes the error and
    renders the form again with the submitted values.
    """
    conn = get_db_connection()
    if conn is None:
        flash("Database connection error", "error")
        abort(404)

    if request.method == 'POST':
        try:
            # Get form data and set defaults for missing fields
            data = request.form.to_dict()
            default_fields = {
                'transaction_type': '', 'company_division': '', 'address_1': '', 'address_2': '',
                'city': '', 'province': '', 'region': '', 'postal_code': '', 'source_date': None,
                'source_description': '', 'grant_type': '', 'description': '', 'amount': 0,
                'recipient': '', 'commodity_class': '', 'contract_number': '', 'comments': ''
            }
            for field, default in default_fields.items():
                data.setdefault(field, default)

            # A checkbox is only present in the form data when it is ticked.
            data['is_primary'] = 'is_primary' in data
            data['id'] = id

            # A form field left blank arrives as '' rather than being absent,
            # so the None default never applies to it; store it as NULL.
            if not data['source_date']:
                data['source_date'] = None

            # Convert amount to float for database storage. Blank or
            # unparseable input becomes 0.0 instead of staying a string.
            raw_amount = data['amount']
            if isinstance(raw_amount, str):
                # Remove currency symbols and commas
                cleaned_amount = raw_amount.replace('$', '').replace(',', '').strip()
                try:
                    data['amount'] = float(cleaned_amount) if cleaned_amount else 0.0
                except ValueError:
                    data['amount'] = 0.0

            try:
                with conn.cursor() as cur:
                    cur.execute(
                        """
                        UPDATE transactions
                        SET transaction_type = %(transaction_type)s,
                            company_division = %(company_division)s,
                            address_1 = %(address_1)s,
                            address_2 = %(address_2)s,
                            city = %(city)s,
                            province = %(province)s,
                            region = %(region)s,
                            postal_code = %(postal_code)s,
                            is_primary = %(is_primary)s,
                            source_date = %(source_date)s,
                            source_description = %(source_description)s,
                            grant_type = %(grant_type)s,
                            description = %(description)s,
                            amount = %(amount)s,
                            recipient = %(recipient)s,
                            commodity_class = %(commodity_class)s,
                            contract_number = %(contract_number)s,
                            comments = %(comments)s
                        WHERE id = %(id)s
                        """,
                        data
                    )
                    conn.commit()
            except Exception as e:
                logger.error(f"Error updating transaction: {e}")
                flash(f"Error updating transaction: {e}", "error")
                return render_template('transaction_form.html', transaction=data, version=VERSION)
        finally:
            # The connection was opened before form parsing, so it is closed
            # here whatever happens above.
            conn.close()
        return redirect(url_for('view_transaction', id=id))

    # GET: load the current values to pre-fill the form.
    transaction = None
    try:
        with conn.cursor() as cur:
            cur.execute('SELECT * FROM transactions WHERE id = %s', (id,))
            transaction = cur.fetchone()
    except Exception as e:
        logger.error(f"Error retrieving transaction: {e}")
        flash(f"Error retrieving transaction: {e}", "error")
    finally:
        conn.close()

    if transaction is None:
        abort(404)
    return render_template('transaction_form.html', transaction=transaction, version=VERSION)
@app.route('/accessibility')
def accessibility_test():
    """Render the page used for testing accessibility features."""
    current_version = app.config['VERSION']
    return render_template('accessibility.html', version=current_version)
def bootstrap_database():
    """Make sure the database schema exists, then run migrations.

    If no ``transactions`` table is found, ``schema.sql`` from the working
    directory is executed. Otherwise the table is counted and a hint is logged
    when it is empty. ``migrate_database()`` runs in both cases. Test data can
    be loaded separately using the tests/generate_test_data.py script.

    Raises:
        SystemExit: with status 1 when no database connection can be opened.
    """
    logger.info(f"Ploughshares v{VERSION} - Checking database...")
    conn = get_db_connection()
    if conn is None:
        logger.error("Database connection failed. Exiting.")
        # exit() is a helper injected by the site module for interactive use
        # and is not guaranteed to exist; raise SystemExit directly.
        raise SystemExit(1)

    try:
        # Check if the transactions table exists
        with conn.cursor() as cur:
            cur.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'transactions')")
            result = cur.fetchone()

            if not result or not result.get('exists', False):
                logger.info(f"Ploughshares v{VERSION} - Transactions table not found. Initializing schema...")
                # A schema failure is logged but does not stop the migrations below.
                try:
                    with open('schema.sql', 'r') as f:
                        schema_sql = f.read()
                    with conn.cursor() as schema_cur:
                        schema_cur.execute(schema_sql)
                    logger.info(f"Ploughshares v{VERSION} - Schema initialized successfully.")
                except Exception as schema_error:
                    logger.error(f"Error initializing schema: {schema_error}")
            else:
                # Check if the table is empty
                try:
                    with conn.cursor() as count_cur:
                        count_cur.execute("SELECT COUNT(*) FROM transactions")
                        result = count_cur.fetchone()
                        if result and 'count' in result:
                            if result['count'] == 0:
                                logger.info(f"Ploughshares v{VERSION} - Database is empty. You can populate it with test data using the tests/generate_test_data.py script.")
                        else:
                            logger.error("Could not get count from database")
                except Exception as count_error:
                    logger.error(f"Error counting transactions: {count_error}")

        # Run database migrations to ensure all columns exist
        logger.info(f"Ploughshares v{VERSION} - Running database migrations...")
        migrate_database()
        logger.info(f"Ploughshares v{VERSION} - Database migrations completed.")
    except Exception as e:
        logger.error(f"Error checking database: {e}")
    finally:
        conn.close()
@app.route('/api/transactions', methods=['GET'])
def api_get_transactions():
    """Return every transaction as a JSON array, newest id first.

    ``source_date`` and ``created_at`` are rendered as strings when set.
    Responds with a 500 JSON error on connection or query failure.
    """
    conn = get_db_connection()
    if conn is None:
        return jsonify({"error": "Database connection error"}), 500

    try:
        with conn.cursor() as cur:
            cur.execute('SELECT * FROM transactions ORDER BY id DESC')
            rows = cur.fetchall()
        result = []
        for row in rows:
            # Dates are converted to strings only when a value is present.
            source_date = row.get('source_date')
            if source_date:
                row['source_date'] = source_date.strftime('%Y-%m-%d')
            created_at = row.get('created_at')
            if created_at:
                row['created_at'] = created_at.strftime('%Y-%m-%dT%H:%M:%S.%f')
            result.append(dict(row))
    except Exception as e:
        logger.error(f"Database error in API: {e}")
        return jsonify({"error": f"Database error: {str(e)}"}), 500
    finally:
        conn.close()
    return jsonify(result)
@app.route('/api/transaction/<int:id>', methods=['GET'])
def api_get_transaction(id):
    """Return one transaction plus the ids of its neighbours as JSON.

    The body has a ``transaction`` object and a ``navigation`` object holding
    ``prev_id`` and ``next_id`` (``None`` when there is no neighbour).
    Responds 404 for an unknown id and 500 on connection or query failure.
    """
    conn = get_db_connection()
    if conn is None:
        return jsonify({"error": "Database connection error"}), 500

    try:
        with conn.cursor() as cur:
            cur.execute('SELECT * FROM transactions WHERE id = %s', (id,))
            record = cur.fetchone()
            if record is None:
                return jsonify({"error": "Transaction not found"}), 404

            # Dates are converted to strings only when a value is present.
            source_date = record.get('source_date')
            if source_date:
                record['source_date'] = source_date.strftime('%Y-%m-%d')
            created_at = record.get('created_at')
            if created_at:
                record['created_at'] = created_at.strftime('%Y-%m-%dT%H:%M:%S.%f')

            # Closest lower id, then closest higher id.
            cur.execute('SELECT id FROM transactions WHERE id < %s ORDER BY id DESC LIMIT 1', (id,))
            before = cur.fetchone()
            cur.execute('SELECT id FROM transactions WHERE id > %s ORDER BY id ASC LIMIT 1', (id,))
            after = cur.fetchone()

            result = {
                "transaction": dict(record),
                "navigation": {
                    "prev_id": before['id'] if before else None,
                    "next_id": after['id'] if after else None,
                },
            }
    except Exception as e:
        logger.error(f"Database error in API: {e}")
        return jsonify({"error": f"Database error: {str(e)}"}), 500
    finally:
        conn.close()
    return jsonify(result)
@app.route('/api/transaction', methods=['POST'])
def api_create_transaction():
    """Create a transaction from a JSON object; it is stored unapproved.

    ``transaction_type``, ``company_division`` and ``recipient`` are required;
    the remaining columns fall back to defaults. Responds 201 with the new id,
    400 for a non-JSON request, a body that is not a JSON object, or a missing
    required field, and 500 on database failure.
    """
    if not request.is_json:
        return jsonify({"error": "Request must be JSON"}), 400
    data = request.get_json()
    # A JSON array, string, number or null cannot hold named fields; reject it
    # here instead of failing with a TypeError further down.
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    # Validate required fields
    required_fields = ['transaction_type', 'company_division', 'recipient']
    for field in required_fields:
        if field not in data:
            return jsonify({"error": f"Missing required field: {field}"}), 400

    # Set defaults for missing fields
    default_fields = {
        'address_1': '', 'address_2': '', 'city': '', 'province': '',
        'region': '', 'postal_code': '', 'source_date': None,
        'source_description': '', 'grant_type': '', 'description': '',
        'amount': 0, 'commodity_class': '', 'contract_number': '', 'comments': ''
    }
    for field, default in default_fields.items():
        data.setdefault(field, default)
    # Set is_primary to False if not provided
    data['is_primary'] = data.get('is_primary', False)

    conn = get_db_connection()
    if conn is None:
        return jsonify({"error": "Database connection error"}), 500

    try:
        with conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO transactions (
                    transaction_type, company_division, address_1, address_2,
                    city, province, region, postal_code, is_primary, source_date, source_description,
                    grant_type, description, amount, recipient, commodity_class, contract_number, comments,
                    approved
                ) VALUES (
                    %(transaction_type)s, %(company_division)s, %(address_1)s, %(address_2)s,
                    %(city)s, %(province)s, %(region)s, %(postal_code)s, %(is_primary)s, %(source_date)s,
                    %(source_description)s, %(grant_type)s, %(description)s, %(amount)s, %(recipient)s,
                    %(commodity_class)s, %(contract_number)s, %(comments)s, FALSE
                ) RETURNING id
                """,
                data
            )
            result = cur.fetchone()
            if result and 'id' in result:
                new_transaction_id = result['id']
                conn.commit()
                return jsonify({"message": "Transaction created successfully and is pending approval", "transaction_id": new_transaction_id}), 201
            else:
                return jsonify({"error": "Could not create transaction"}), 500
    except Exception as e:
        logger.error(f"Error creating transaction via API: {e}")
        return jsonify({"error": f"Error creating transaction: {str(e)}"}), 500
    finally:
        conn.close()
@app.route('/api/transaction/<int:id>', methods=['PUT'])
def api_update_transaction(id):
    """Update the provided fields of an existing transaction.

    Only keys listed in ``allowed_fields`` are written; other keys in the body
    are ignored. Responds 200 on success, 400 for a non-JSON request, a body
    that is not a JSON object, or a body with no updatable field, 404 for an
    unknown id, and 500 on database failure.
    """
    if not request.is_json:
        return jsonify({"error": "Request must be JSON"}), 400
    data = request.get_json()
    # Anything other than a JSON object cannot carry field names.
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    # Fields that can be updated. Column names in the SET clause come only
    # from this fixed list, never from the request keys themselves.
    allowed_fields = [
        'transaction_type', 'company_division', 'address_1', 'address_2',
        'city', 'province', 'region', 'postal_code', 'is_primary', 'source_date',
        'source_description', 'grant_type', 'description', 'amount', 'recipient',
        'commodity_class', 'contract_number', 'comments'
    ]
    update_fields = []
    update_values = {}
    for field in allowed_fields:
        if field in data:
            update_fields.append(f"{field} = %({field})s")
            update_values[field] = data[field]
    update_values['id'] = id

    conn = get_db_connection()
    if conn is None:
        return jsonify({"error": "Database connection error"}), 500

    # One outer try/finally so the connection is closed on every return path,
    # including the 404 and "No fields to update" responses.
    try:
        # Check if transaction exists
        try:
            with conn.cursor() as cur:
                cur.execute('SELECT id FROM transactions WHERE id = %s', (id,))
                if cur.fetchone() is None:
                    return jsonify({"error": "Transaction not found"}), 404
        except Exception as e:
            logger.error(f"Error checking transaction existence via API: {e}")
            return jsonify({"error": f"Database error: {str(e)}"}), 500

        if not update_fields:
            return jsonify({"error": "No fields to update"}), 400

        try:
            with conn.cursor() as cur:
                query = f"UPDATE transactions SET {', '.join(update_fields)} WHERE id = %(id)s"
                cur.execute(query, update_values)
                conn.commit()
                return jsonify({"message": "Transaction updated successfully"}), 200
        except Exception as e:
            logger.error(f"Error updating transaction via API: {e}")
            return jsonify({"error": f"Error updating transaction: {str(e)}"}), 500
    finally:
        conn.close()
@app.route('/api/transaction/<int:id>', methods=['DELETE'])
def api_delete_transaction(id):
    """Delete a transaction through the JSON API.

    Responds 404 when the id does not exist, 200 on success, and 500 on a
    connection or query failure.
    """
    conn = get_db_connection()
    if conn is None:
        return jsonify({"error": "Database connection error"}), 500

    try:
        with conn.cursor() as cur:
            # Look the row up first so an unknown id yields a 404.
            cur.execute('SELECT id FROM transactions WHERE id = %s', (id,))
            existing = cur.fetchone()
            if existing is None:
                return jsonify({"error": "Transaction not found"}), 404
            cur.execute('DELETE FROM transactions WHERE id = %s', (id,))
            conn.commit()
            return jsonify({"message": "Transaction deleted successfully"}), 200
    except Exception as e:
        logger.error(f"Error deleting transaction via API: {e}")
        return jsonify({"error": f"Error deleting transaction: {str(e)}"}), 500
    finally:
        conn.close()
@app.route('/sources')
def view_sources():
    """Render the sources page; an error is flashed and the list left empty when loading fails."""
    sources = []
    try:
        sources = get_rows()
    except Exception as e:
        logger.error(f"Database error: {e}")
        flash(f"Database error: {e}", "error")
    return render_template('view_sources.html', sources=sources, version=VERSION)
@app.route('/api/sources', methods=['GET'])
def get_all_sources():
    """Return all sources as JSON, or a 500 JSON error when loading fails."""
    try:
        rows = get_rows()
    except Exception as e:
        logger.error(f"Database error in API: {e}")
        return jsonify({"error": f"Database error: {str(e)}"}), 500
    return jsonify(rows)
@app.route('/api/source', methods=['POST'])
def api_create_source():
    """Create a source from a JSON object with ``title``, ``link`` and ``type``.

    Responds 200 on success and 400 when the body is not a JSON object, a
    required field is missing or empty, or storing the source fails.
    """
    # silent=True yields None instead of raising when the body is missing or
    # is not valid JSON, so both cases are handled by the check below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    # Validate required fields
    required_fields = ['title', 'link', 'type']
    for field in required_fields:
        if not data.get(field):
            return jsonify({"error": f"Missing required field: {field}"}), 400

    try:
        add_row(data["title"], data["link"], data["type"])
        return jsonify({"message": "POST success!"}), 200
    except Exception as e:
        logger.error(f"Error creating source via API: {e}")
        # An exception object is not JSON-serialisable; send its message text.
        return jsonify({"error": str(e)}), 400
@app.route('/source/add', methods=['POST'])
def create_source():
    """Create a source from submitted form data, then redirect to the sources page.

    The response is always a redirect to ``view_sources``. A missing or empty
    required field, or a failure while storing the source, is reported with a
    flashed error message.
    """
    data = request.form.to_dict()

    # First required field that is absent or empty, if any.
    required_fields = ['title', 'link', 'type']
    missing_field = next((field for field in required_fields if not data.get(field)), None)

    if missing_field is not None:
        flash(f"Missing required field: {missing_field}", "error")
    else:
        try:
            add_row(data["title"], data["link"], data["type"])
        except Exception as e:
            logger.error(f"Error creating source via API: {e}")
            flash(f"Error creating source: {e}", "error")

    # A plain return replaces the former `return` inside `finally`, which
    # discarded every other return value and hid any exception.
    return redirect(url_for("view_sources"))
@app.route('/api/source/<int:id>', methods=['DELETE'])
def api_delete_source(id):
    """Delete a source through the JSON API.

    Responds 200 when ``delete_row`` returns without raising, and 500 with the
    error text otherwise.
    """
    try:
        delete_row(id)
        return jsonify({"message": "Source deleted successfully"}), 200
    except Exception as e:
        # The log line names the source, matching the response message.
        logger.error(f"Error deleting source via API: {e}")
        return jsonify({"error": f"Error deleting source: {str(e)}"}), 500
# Script entry point: prepare the database, then start the Flask server.
if __name__ == '__main__':
    logger.info(f"Starting Ploughshares v{VERSION}")
    bootstrap_database()
    # Port and bind address can be overridden with FLASK_RUN_PORT / FLASK_RUN_HOST.
    app.run(
        port=int(os.environ.get('FLASK_RUN_PORT', 5001)),
        host=os.environ.get('FLASK_RUN_HOST', '0.0.0.0'),
    )