#!/usr/bin/env python3

print("🚀 APP.PY STARTING - IMMEDIATE FEEDBACK", flush=True)

import sys
import os
import re
import time
from pathlib import Path

print("📦 BASIC IMPORTS COMPLETE", flush=True)
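
# Pipeline overview: extract slides from the PDF, run the multi-agent
# analysis, build a markdown report (metadata, executive summary, per-slide
# agent sections), prepend a table of contents, save it under processed/,
# and upload the result for sharing.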


def generate_toc(markdown_content):
    """Generate a Table of Contents from markdown headers."""
    print("  📋 Generating Table of Contents...", flush=True)
    lines = markdown_content.split('\n')
    toc_lines = ["## Table of Contents", ""]

    header_count = 0
    for line in lines:
        # Match headers at level 2 or deeper (##, ###, etc.)
        header_match = re.match(r'^(#{2,})\s+(.+)$', line)
        if header_match:
            header_count += 1
            level = len(header_match.group(1)) - 2  # ## -> 0, ### -> 1, etc.
            title = header_match.group(2)

            # Create a GitHub-style anchor link: lowercase, strip punctuation,
            # collapse whitespace runs into hyphens
            anchor = re.sub(r'[^a-zA-Z0-9\s-]', '', title.lower())
            anchor = re.sub(r'\s+', '-', anchor.strip())

            # Indent nested headers two spaces per level
            indent = "  " * level
            toc_lines.append(f"{indent}- [{title}](#{anchor})")

    toc_lines.extend(["", "---", ""])

    print(f"  ✅ Generated TOC with {header_count} headers", flush=True)
    return '\n'.join(toc_lines)
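
# Illustrative example (not executed): for input containing the headers
#   ## Agentic Analysis
#   ### Problem Analyzer
# generate_toc() returns
#   ## Table of Contents
#
#   - [Agentic Analysis](#agentic-analysis)
#     - [Problem Analyzer](#problem-analyzer)
# followed by a blank line, a "---" rule, and a trailing blank line.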


def main():
    """Simple pitch deck analyzer with comprehensive debugging."""
    print("🚀 PITCH DECK ANALYZER MAIN FUNCTION STARTING", flush=True)
    print("=" * 50, flush=True)

    if len(sys.argv) < 2:
        print("❌ Usage: python app.py <pdf_file>", flush=True)
        return

    pdf_path = sys.argv[1]
    if not os.path.exists(pdf_path):
        print(f"❌ Error: File '{pdf_path}' not found", flush=True)
        return

    print(f"📁 Processing file: {pdf_path}", flush=True)
    print(f"📁 File exists: {os.path.exists(pdf_path)}", flush=True)
    print(f"📁 File size: {os.path.getsize(pdf_path)} bytes", flush=True)

    # Import what we need directly (avoid __init__.py issues)
    print("\n📦 IMPORTING MODULES", flush=True)
    print("-" * 30, flush=True)

    sys.path.append('modules')

    print("  🔄 Importing client module...", flush=True)
    from client import get_openrouter_client
    print("  ✅ client module imported successfully", flush=True)

    print("  🔄 Importing pdf_processor module...", flush=True)
    from pdf_processor import extract_slides_from_pdf
    print("  ✅ pdf_processor module imported successfully", flush=True)

    print("  🔄 Importing analysis module...", flush=True)
    from analysis import analyze_slides_batch
    print("  ✅ analysis module imported successfully", flush=True)

    print("  🔄 Importing markdown_utils module...", flush=True)
    from markdown_utils import send_to_api_and_get_haste_link
    print("  ✅ markdown_utils module imported successfully", flush=True)

    print("✅ ALL MODULES IMPORTED SUCCESSFULLY", flush=True)
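
    # Interfaces assumed here, inferred from how the results are used below:
    #   extract_slides_from_pdf(pdf_path, out_dir, stem) -> list of slide data
    #   analyze_slides_batch(client, slides) -> {slide_num: {agent_key:
    #       {'agent': display_name, 'analysis': markdown_text}}}
    #   send_to_api_and_get_haste_link(md, name) -> URL string, or a
    #       (raw_url, html_url) tuple/list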

    # Extract slides
    print("\n📄 EXTRACTING SLIDES", flush=True)
    print("-" * 30, flush=True)
    print("  🔄 Calling extract_slides_from_pdf...", flush=True)
    start_time = time.time()

    slides = extract_slides_from_pdf(pdf_path, "processed", Path(pdf_path).stem)
    extraction_time = time.time() - start_time
    print(f"  ✅ extract_slides_from_pdf completed in {extraction_time:.2f}s", flush=True)
    print(f"  📊 Extracted {len(slides)} slides", flush=True)

    # LIMIT TO FIRST 3 SLIDES FOR TESTING
    print("  🔄 Limiting to first 3 slides for testing...", flush=True)
    slides = slides[:3]
    print(f"  📊 Processing {len(slides)} slides", flush=True)

    # Analyze slides
    print("\n🧠 ANALYZING SLIDES", flush=True)
    print("-" * 30, flush=True)
    print("  🔄 Initializing API client...", flush=True)

    client = get_openrouter_client()
    print("  ✅ API client initialized successfully", flush=True)

    print("  🔄 Calling analyze_slides_batch...", flush=True)
    analysis_start_time = time.time()

    analysis_results = analyze_slides_batch(client, slides)
    analysis_time = time.time() - analysis_start_time
    print(f"  ✅ analyze_slides_batch completed in {analysis_time:.2f}s", flush=True)
    print(f"  📊 Analysis results: {len(analysis_results)} slides analyzed", flush=True)

    # Create report
    print("\n📝 CREATING REPORT", flush=True)
    print("-" * 30, flush=True)
    print("  🔄 Building markdown content...", flush=True)

    markdown_content = f"# Pitch Deck Analysis: {Path(pdf_path).stem}\n\n"

    # Add analysis metadata
    markdown_content += "This analysis was generated using multiple AI agents, each specialized in a different aspect of slide evaluation.\n\n"
    markdown_content += f"**Source File:** `{Path(pdf_path).name}` (PDF)\n"
    markdown_content += f"**Analysis Generated:** {len(slides)} slides processed (limited for testing)\n"
    markdown_content += "**Processing Method:** Individual processing with specialized AI agents\n"
    markdown_content += "**Text Extraction:** Docling-powered text transcription\n\n"

    # Add executive summary at the top (model-assisted, with a heuristic fallback)
    print("  🔄 Generating executive summary...", flush=True)

    def _build_heuristic_summary(analysis_results_local):
        categories = [
            ('problem_analyzer', 'Problem Analysis'),
            ('solution_evaluator', 'Solution Evaluation'),
            ('market_opportunity_assessor', 'Market Opportunity'),
            ('traction_evaluator', 'Traction'),
            ('funding_analyzer', 'Funding & Ask')
        ]
        lines = []
        lines.append("## Executive Summary\n")

        # Overall one-liner assembled from the first sentences of up to
        # three problem analyses
        overall_bits = []
        for slide_num in sorted(analysis_results_local.keys()):
            slide_agents = analysis_results_local.get(slide_num, {})
            pa = slide_agents.get('problem_analyzer', {}).get('analysis', '')
            if pa:
                first_sentence = pa.split('. ')[0].strip()
                if first_sentence:
                    overall_bits.append(first_sentence)
            if len(overall_bits) >= 3:
                break
        if overall_bits:
            lines.append(" ".join(overall_bits) + "\n")

        # Coverage bullets: mark a category "Covered" only if at least one
        # slide has a non-empty analysis from that agent
        lines.append("### Coverage of Points of Interest\n")
        for key, title in categories:
            coverage_note = "Not covered"
            for slide_agents in analysis_results_local.values():
                if key in slide_agents and slide_agents[key].get('analysis'):
                    coverage_note = "Covered"
                    break
            lines.append(f"- {title}: {coverage_note}")

        lines.append("\n### Slide Snapshots\n")
        for slide_num in sorted(analysis_results_local.keys()):
            slide_agents = analysis_results_local.get(slide_num, {})
            pa = slide_agents.get('problem_analyzer', {}).get('analysis', '')
            one_liner = (pa.split('\n')[0].split('. ')[0]).strip() if pa else "No clear problem statement identified."
            lines.append(f"- Slide {slide_num}: {one_liner}")
        lines.append("\n")
        return "\n".join(lines)
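
    # Illustrative heuristic output (assuming one analyzed slide):
    #   ## Executive Summary
    #   <first sentence of the slide's problem analysis>
    #   ### Coverage of Points of Interest
    #   - Problem Analysis: Covered
    #   - Solution Evaluation: Not covered
    #   ...
    #   ### Slide Snapshots
    #   - Slide 1: <one-line snapshot>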

    def _build_model_summary(client_local, analysis_results_local):
        try:
            # Aggregate every agent's analysis into one text block for the model
            blocks = []
            for slide_num in sorted(analysis_results_local.keys()):
                slide_agents = analysis_results_local[slide_num]
                parts = []
                for k, v in slide_agents.items():
                    agent_name = v.get('agent', k)
                    analysis_text = v.get('analysis', '')
                    parts.append(f"{agent_name}: {analysis_text}")
                blocks.append(f"Slide {slide_num}:\n" + "\n".join(parts))
            aggregate_text = "\n\n".join(blocks)

            messages = [
                {"role": "system", "content": "You are a senior pitch deck analyst. Create a concise executive summary."},
                {"role": "user", "content": [
                    {"type": "text", "text": "Summarize this deck. Provide: 1) a 2-3 sentence overall summary of what the deck accomplishes; 2) a bullet list rating coverage of these points of interest: Problem, Solution, Market Opportunity, Traction, Funding & Ask (ratings: Strong/Covered/Weak/Not covered) with one short note each; 3) a one-line snapshot per slide. Return Markdown only."},
                    {"type": "text", "text": aggregate_text}
                ]}
            ]
            response = client_local.chat.completions.create(
                model="gpt-4o-mini",
                messages=messages,
                max_tokens=700
            )
            # content can be None on an empty completion; guard before strip()
            content = (response.choices[0].message.content or "").strip()
            if content:
                return content + "\n\n"
        except Exception as e:
            print(f"  ⚠️ Model summary generation failed: {e}", flush=True)
        return None

    summary_md = _build_model_summary(client, analysis_results) or _build_heuristic_summary(analysis_results)
    markdown_content += summary_md
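
    # _build_model_summary returns None on any failure, so the `or` above
    # falls back to the deterministic heuristic summary: the report gets an
    # executive summary either way.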
print(f" 📊 Building markdown for {len(slides)} slides...", flush=True)
|
|
|
|
for i, slide_data in enumerate(slides):
|
|
slide_num = i + 1
|
|
print(f" 🔄 Processing slide {slide_num}/{len(slides)}...", flush=True)
|
|
|
|
analysis = analysis_results.get(slide_num, {})
|
|
|
|
markdown_content += f"# Slide {slide_num}\n\n"
|
|
markdown_content += f"\n\n"
|
|
|
|
if analysis:
|
|
markdown_content += "## Agentic Analysis\n\n"
|
|
|
|
# Format each agent's analysis
|
|
agent_count = 0
|
|
for agent_key, agent_data in analysis.items():
|
|
if isinstance(agent_data, dict) and 'agent' in agent_data and 'analysis' in agent_data:
|
|
agent_count += 1
|
|
agent_name = agent_data['agent']
|
|
agent_analysis = agent_data['analysis']
|
|
|
|
markdown_content += f"### {agent_name}\n\n"
|
|
markdown_content += f"{agent_analysis}\n\n"
|
|
|
|
print(f" ✅ Added {agent_count} agent analyses for slide {slide_num}", flush=True)
|
|
else:
|
|
markdown_content += "## Agentic Analysis\n\n"
|
|
markdown_content += "No analysis available\n\n"
|
|
print(f" ⚠️ No analysis available for slide {slide_num}", flush=True)
|
|
|
|
markdown_content += "---\n\n"
|
|
|
|
print(" ✅ Markdown content built successfully", flush=True)
|
|
|
|
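
    # Each slide section in the report looks like:
    #   # Slide N
    #   ## Agentic Analysis
    #   ### <Agent Name>
    #   <agent's analysis text>
    #   ---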

    # Generate Table of Contents
    print("  🔄 Generating Table of Contents...", flush=True)
    toc = generate_toc(markdown_content)

    # Insert TOC right after the main title (line 0 is the title, line 1 is blank)
    print("  🔄 Inserting TOC into document...", flush=True)
    lines = markdown_content.split('\n')
    final_content = []
    final_content.append(lines[0])   # Main title
    final_content.append("")         # Empty line
    final_content.append(toc)        # TOC
    final_content.extend(lines[2:])  # Rest of content

    final_markdown = '\n'.join(final_content)
    print(f"  ✅ Final markdown created: {len(final_markdown)} characters", flush=True)
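
    # Note: generate_toc() runs before the TOC is spliced in, so the TOC
    # never lists itself. Its `#{2,}` pattern also skips level-1 headers,
    # so the "# Slide N" titles are omitted; only "##" and deeper headers
    # (Agentic Analysis, agent names, summary sections) appear in it.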

    # Save report
    print("\n💾 SAVING REPORT", flush=True)
    print("-" * 30, flush=True)
    output_file = f"processed/{Path(pdf_path).stem}_analysis.md"
    print(f"  🔄 Saving to: {output_file}", flush=True)

    os.makedirs("processed", exist_ok=True)
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(final_markdown)
    print(f"  ✅ Report saved successfully ({len(final_markdown)} characters)", flush=True)

    # Always upload the report
    print("\n🌐 UPLOADING REPORT", flush=True)
    print("-" * 30, flush=True)
    print("  🔄 Calling send_to_api_and_get_haste_link...", flush=True)

    upload_result = send_to_api_and_get_haste_link(final_markdown, Path(pdf_path).stem)
    if isinstance(upload_result, (tuple, list)):
        # Expect (raw_url, html_url); tolerate shorter or longer sequences
        raw_url = upload_result[0] if len(upload_result) >= 1 else None
        html_url = upload_result[1] if len(upload_result) >= 2 else None
        if raw_url:
            print(f"  ✅ Raw markdown URL: {raw_url}", flush=True)
        if html_url:
            print(f"  ✅ HTML URL: {html_url}", flush=True)
        if not raw_url and not html_url:
            print("  ❌ Upload failed - no URLs returned", flush=True)
    elif upload_result:
        print(f"  ✅ Report uploaded successfully: {upload_result}", flush=True)
    else:
        print("  ❌ Upload failed - no URL returned", flush=True)

    print("\n🎉 PROCESSING COMPLETE!", flush=True)
    print("=" * 50, flush=True)


if __name__ == "__main__":
    print("🎯 __main__ BLOCK ENTERED", flush=True)
    main()