ploughshares/docker/ploughshares/app.py

452 lines
17 KiB
Python

import os
import psycopg2
from psycopg2.extras import RealDictCursor
from flask import Flask, render_template, request, redirect, url_for, flash, jsonify, send_from_directory, abort
from werkzeug.utils import secure_filename
from datetime import datetime
import locale
import logging
from flask_talisman import Talisman
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('ploughshares')
# Get version from environment variable or VERSION file
VERSION = os.environ.get('APP_VERSION')
if not VERSION:
try:
# Try to read from VERSION file in the current directory first
if os.path.exists('VERSION'):
with open('VERSION', 'r') as f:
VERSION = f.read().strip()
# Fall back to the project root VERSION file
else:
with open(os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'VERSION'), 'r') as f:
VERSION = f.read().strip()
except:
VERSION = "unknown"
# Initialize the Flask app
app = Flask(__name__)
app.secret_key = 'supersecretkey'
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['VERSION'] = VERSION
# Get domain configuration from environment
APP_DOMAIN = os.environ.get('APP_DOMAIN', '')
APP_ENV = os.environ.get('APP_ENV', 'development')
# Configure security headers with Talisman
# Base CSP settings - much more restrictive now that we use local resources
csp = {
'default-src': ["'self'"],
'script-src': ["'self'"],
'style-src': ["'self'"],
'img-src': ["'self'", "data:"],
'font-src': ["'self'"],
'connect-src': "'self'",
'object-src': "'none'",
'frame-ancestors': "'none'",
'base-uri': "'self'",
'form-action': "'self'"
}
# Add domain-specific CSP settings if domain is provided
if APP_DOMAIN:
logger.info(f"Configuring CSP for domain: {APP_DOMAIN}")
# Add domain to connect-src if needed
if APP_DOMAIN not in csp['connect-src']:
if isinstance(csp['connect-src'], list):
csp['connect-src'].append(APP_DOMAIN)
else:
csp['connect-src'] = [csp['connect-src'], APP_DOMAIN]
# Update form-action to include the domain
if isinstance(csp['form-action'], list):
if APP_DOMAIN not in csp['form-action']:
csp['form-action'].append(APP_DOMAIN)
else:
csp['form-action'] = [csp['form-action'], APP_DOMAIN]
# Configure Permissions-Policy (formerly Feature-Policy)
# Deny access to all browser features that we don't need
permissions_policy = {
'accelerometer': '()',
'ambient-light-sensor': '()',
'autoplay': '()',
'battery': '()',
'camera': '()',
'display-capture': '()',
'document-domain': '()',
'encrypted-media': '()',
'execution-while-not-rendered': '()',
'execution-while-out-of-viewport': '()',
'fullscreen': '()',
'geolocation': '()',
'gyroscope': '()',
'hid': '()',
'idle-detection': '()',
'magnetometer': '()',
'microphone': '()',
'midi': '()',
'navigation-override': '()',
'payment': '()',
'picture-in-picture': '()',
'publickey-credentials-get': '()',
'screen-wake-lock': '()',
'serial': '()',
'sync-xhr': '()',
'usb': '()',
'web-share': '()',
'xr-spatial-tracking': '()'
}
# Determine if HTTPS should be forced based on environment
force_https = APP_ENV != 'development'
logger.info(f"Environment: {APP_ENV}, Force HTTPS: {force_https}")
# Additional security headers
additional_headers = {
'Cross-Origin-Embedder-Policy': 'require-corp',
'Cross-Origin-Resource-Policy': 'same-origin',
'Cross-Origin-Opener-Policy': 'same-origin'
}
# Initialize Talisman
talisman = Talisman(
app,
content_security_policy=csp,
content_security_policy_nonce_in=['script-src'],
feature_policy=permissions_policy,
force_https=force_https,
force_https_permanent=force_https,
strict_transport_security=force_https,
strict_transport_security_preload=force_https,
strict_transport_security_max_age=31536000,
strict_transport_security_include_subdomains=force_https,
referrer_policy='strict-origin-when-cross-origin',
frame_options='DENY',
session_cookie_secure=force_https,
session_cookie_http_only=True
)
# Add additional security headers that Talisman doesn't support natively
@app.after_request
def add_security_headers(response):
for header, value in additional_headers.items():
response.headers[header] = value
return response
# Set locale for currency formatting
try:
locale.setlocale(locale.LC_ALL, 'en_US.UTF-8')
except locale.Error:
try:
locale.setlocale(locale.LC_ALL, '') # Use system default locale
except locale.Error:
pass # If all fails, we'll use the fallback in the filter
# Custom filter for currency formatting
@app.template_filter('currency')
def currency_filter(value):
if value is None:
return "$0.00"
try:
# Convert to float first, then format to exactly 2 decimal places
float_value = float(value)
return f"${float_value:,.2f}"
except (ValueError, TypeError):
# Fallback formatting
return "$0.00"
# --- Database Connection ---
def get_db_connection():
host = os.environ.get('POSTGRES_HOST', 'db')
port = os.environ.get('POSTGRES_PORT', '5432')
dbname = os.environ.get('POSTGRES_DB', 'ploughshares')
user = os.environ.get('POSTGRES_USER', 'ploughshares')
password = os.environ.get('POSTGRES_PASSWORD', 'ploughshares_password')
try:
conn = psycopg2.connect(
host=host,
port=port,
dbname=dbname,
user=user,
password=password,
cursor_factory=RealDictCursor
)
conn.autocommit = True
logger.info(f"Ploughshares v{VERSION} - Connected to PostgreSQL at {host}:{port}")
return conn
except psycopg2.OperationalError as e:
logger.error(f"Error connecting to PostgreSQL: {e}")
return None
# --- Routes ---
@app.route('/')
def index():
conn = get_db_connection()
if conn is None:
flash("Database connection error", "error")
return render_template('index.html', transactions=[], version=VERSION)
try:
with conn.cursor() as cur:
cur.execute('SELECT * FROM transactions ORDER BY id DESC')
transactions = cur.fetchall()
except Exception as e:
logger.error(f"Database error: {e}")
flash(f"Database error: {e}", "error")
transactions = []
finally:
conn.close()
return render_template('index.html', transactions=transactions, version=VERSION)
@app.route('/api-docs')
def api_docs():
server_name = request.host
return render_template('api_docs.html', server_name=server_name, version=VERSION)
@app.route('/transaction/<int:id>')
def view_transaction(id):
conn = get_db_connection()
if conn is None:
flash("Database connection error", "error")
abort(404)
transaction = None
try:
with conn.cursor() as cur:
cur.execute('SELECT * FROM transactions WHERE id = %s', (id,))
transaction = cur.fetchone()
except Exception as e:
logger.error(f"Database error: {e}")
flash(f"Database error: {e}", "error")
finally:
conn.close()
if transaction is None:
abort(404)
return render_template('view_transaction.html', transaction=transaction, version=VERSION)
@app.route('/transaction/add', methods=['GET', 'POST'])
def create_transaction():
if request.method == 'POST':
# Get form data and set defaults for missing fields
data = request.form.to_dict()
default_fields = {
'transaction_type': '', 'company_division': '', 'address_1': '', 'address_2': '',
'city': '', 'province': '', 'region': '', 'postal_code': '', 'source_date': None,
'source_description': '', 'grant_type': '', 'description': '', 'amount': 0,
'recipient': '', 'commodity_class': '', 'contract_number': '', 'comments': ''
}
# Fill in missing fields with defaults
for field, default in default_fields.items():
if field not in data:
data[field] = default
# Handle checkbox fields
data['is_primary'] = 'is_primary' in data
# Convert amount to float for database storage
if 'amount' in data and data['amount']:
try:
# Remove currency symbols and commas
clean_amount = data['amount'].replace('$', '').replace(',', '')
data['amount'] = float(clean_amount)
except ValueError:
data['amount'] = 0.0
conn = get_db_connection()
if conn is None:
flash("Database connection error", "error")
return render_template('transaction_form.html', version=VERSION)
try:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO transactions (
transaction_type, company_division, address_1, address_2,
city, province, region, postal_code, is_primary, source_date, source_description,
grant_type, description, amount, recipient, commodity_class, contract_number, comments
) VALUES (
%(transaction_type)s, %(company_division)s, %(address_1)s, %(address_2)s,
%(city)s, %(province)s, %(region)s, %(postal_code)s, %(is_primary)s, %(source_date)s,
%(source_description)s, %(grant_type)s, %(description)s, %(amount)s, %(recipient)s,
%(commodity_class)s, %(contract_number)s, %(comments)s
) RETURNING id
""",
data
)
result = cur.fetchone()
if result and 'id' in result:
new_transaction_id = result['id']
conn.commit()
else:
flash("Error creating transaction: Could not get transaction ID", "error")
return render_template('transaction_form.html', version=VERSION)
except Exception as e:
logger.error(f"Error creating transaction: {e}")
flash(f"Error creating transaction: {e}", "error")
return render_template('transaction_form.html', version=VERSION)
finally:
conn.close()
return redirect(url_for('view_transaction', id=new_transaction_id))
return render_template('transaction_form.html', version=VERSION)
@app.route('/transaction/<int:id>/edit', methods=['GET', 'POST'])
def update_transaction(id):
conn = get_db_connection()
if conn is None:
flash("Database connection error", "error")
abort(404)
if request.method == 'POST':
# Get form data and set defaults for missing fields
data = request.form.to_dict()
default_fields = {
'transaction_type': '', 'company_division': '', 'address_1': '', 'address_2': '',
'city': '', 'province': '', 'region': '', 'postal_code': '', 'source_date': None,
'source_description': '', 'grant_type': '', 'description': '', 'amount': 0,
'recipient': '', 'commodity_class': '', 'contract_number': '', 'comments': ''
}
# Fill in missing fields with defaults
for field, default in default_fields.items():
if field not in data:
data[field] = default
# Handle checkbox fields
data['is_primary'] = 'is_primary' in data
data['id'] = id
# Convert amount to float for database storage
if 'amount' in data and data['amount']:
try:
# Remove currency symbols and commas
clean_amount = data['amount'].replace('$', '').replace(',', '')
data['amount'] = float(clean_amount)
except ValueError:
data['amount'] = 0.0
try:
with conn.cursor() as cur:
cur.execute(
"""
UPDATE transactions
SET transaction_type = %(transaction_type)s,
company_division = %(company_division)s,
address_1 = %(address_1)s,
address_2 = %(address_2)s,
city = %(city)s,
province = %(province)s,
region = %(region)s,
postal_code = %(postal_code)s,
is_primary = %(is_primary)s,
source_date = %(source_date)s,
source_description = %(source_description)s,
grant_type = %(grant_type)s,
description = %(description)s,
amount = %(amount)s,
recipient = %(recipient)s,
commodity_class = %(commodity_class)s,
contract_number = %(contract_number)s,
comments = %(comments)s
WHERE id = %(id)s
""",
data
)
conn.commit()
except Exception as e:
logger.error(f"Error updating transaction: {e}")
flash(f"Error updating transaction: {e}", "error")
return render_template('transaction_form.html', transaction=data, version=VERSION)
finally:
conn.close()
return redirect(url_for('view_transaction', id=id))
transaction = None
try:
with conn.cursor() as cur:
cur.execute('SELECT * FROM transactions WHERE id = %s', (id,))
transaction = cur.fetchone()
except Exception as e:
logger.error(f"Error retrieving transaction: {e}")
flash(f"Error retrieving transaction: {e}", "error")
finally:
conn.close()
if transaction is None:
abort(404)
return render_template('transaction_form.html', transaction=transaction, version=VERSION)
def bootstrap_database():
"""
Checks if the database is empty and initializes the schema if needed.
Test data can be loaded separately using the tests/generate_test_data.py script.
"""
logger.info(f"Ploughshares v{VERSION} - Checking database...")
conn = get_db_connection()
if conn is None:
logger.error("Database connection failed. Exiting.")
exit(1)
try:
# Check if the transactions table exists
with conn.cursor() as cur:
cur.execute("SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'transactions')")
result = cur.fetchone()
# If table doesn't exist, initialize the schema
if not result or not result.get('exists', False):
logger.info(f"Ploughshares v{VERSION} - Transactions table not found. Initializing schema...")
# Read schema.sql file
try:
with open('schema.sql', 'r') as f:
schema_sql = f.read()
# Execute schema SQL
with conn.cursor() as schema_cur:
schema_cur.execute(schema_sql)
logger.info(f"Ploughshares v{VERSION} - Schema initialized successfully.")
except Exception as schema_error:
logger.error(f"Error initializing schema: {schema_error}")
else:
# Check if the table is empty
try:
with conn.cursor() as cur:
cur.execute("SELECT COUNT(*) FROM transactions")
result = cur.fetchone()
if result and 'count' in result:
count = result['count']
if count == 0:
logger.info(f"Ploughshares v{VERSION} - Database is empty. You can populate it with test data using the tests/generate_test_data.py script.")
else:
logger.error("Could not get count from database")
except Exception as count_error:
logger.error(f"Error counting transactions: {count_error}")
except Exception as e:
logger.error(f"Error checking database: {e}")
finally:
conn.close()
if __name__ == '__main__':
logger.info(f"Starting Ploughshares v{VERSION}")
bootstrap_database()
port = int(os.environ.get('FLASK_RUN_PORT', 5001))
app.run(host='0.0.0.0', port=port)