From 346ea28100b58d87a4dff59962f3b177f5488243 Mon Sep 17 00:00:00 2001 From: zie619 Date: Mon, 3 Nov 2025 21:30:51 +0200 Subject: [PATCH] fix: restore workflow_db.py module required by application MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The workflow_db.py module is required by both run.py and api_server.py for database operations. It was mistakenly moved to archive during cleanup. This fixes the CI/CD test failure: ❌ Database setup error: No module named 'workflow_db' 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- workflow_db.py | 763 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 763 insertions(+) create mode 100644 workflow_db.py diff --git a/workflow_db.py b/workflow_db.py new file mode 100644 index 00000000..2bfa22b2 --- /dev/null +++ b/workflow_db.py @@ -0,0 +1,763 @@ +#!/usr/bin/env python3 +""" +Fast N8N Workflow Database +SQLite-based workflow indexer and search engine for instant performance. +""" + +import sqlite3 +import json +import os +import glob +import datetime +import hashlib +from typing import Dict, List, Any, Optional, Tuple +from pathlib import Path + +class WorkflowDatabase: + """High-performance SQLite database for workflow metadata and search.""" + + def __init__(self, db_path: str = None): + # Use environment variable if no path provided + if db_path is None: + db_path = os.environ.get('WORKFLOW_DB_PATH', 'workflows.db') + self.db_path = db_path + self.workflows_dir = "workflows" + self.init_database() + + def init_database(self): + """Initialize SQLite database with optimized schema and indexes.""" + conn = sqlite3.connect(self.db_path) + conn.execute("PRAGMA journal_mode=WAL") # Write-ahead logging for performance + conn.execute("PRAGMA synchronous=NORMAL") + conn.execute("PRAGMA cache_size=10000") + conn.execute("PRAGMA temp_store=MEMORY") + + # Create main workflows table + conn.execute(""" + CREATE TABLE IF NOT EXISTS workflows ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + filename TEXT UNIQUE NOT NULL, + name TEXT NOT NULL, + workflow_id TEXT, + active BOOLEAN DEFAULT 0, + description TEXT, + trigger_type TEXT, + complexity TEXT, + node_count INTEGER DEFAULT 0, + integrations TEXT, -- JSON array + tags TEXT, -- JSON array + created_at TEXT, + updated_at TEXT, + file_hash TEXT, + file_size INTEGER, + analyzed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + """) + + # Create FTS5 table for full-text search + conn.execute(""" + CREATE VIRTUAL TABLE IF NOT EXISTS workflows_fts USING fts5( + filename, + name, + description, + integrations, + tags, + content=workflows, + content_rowid=id + ) + """) + + # Create indexes for fast filtering + conn.execute("CREATE INDEX IF NOT EXISTS idx_trigger_type ON workflows(trigger_type)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_complexity ON workflows(complexity)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_active ON workflows(active)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_node_count ON workflows(node_count)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_filename ON workflows(filename)") + + # Create triggers to keep FTS table in sync + conn.execute(""" + CREATE TRIGGER IF NOT EXISTS workflows_ai AFTER INSERT ON workflows BEGIN + INSERT INTO workflows_fts(rowid, filename, name, description, integrations, tags) + VALUES (new.id, new.filename, new.name, new.description, new.integrations, new.tags); + END + """) + + conn.execute(""" + CREATE TRIGGER IF NOT EXISTS workflows_ad AFTER DELETE ON workflows BEGIN + INSERT INTO workflows_fts(workflows_fts, rowid, filename, name, description, integrations, tags) + VALUES ('delete', old.id, old.filename, old.name, old.description, old.integrations, old.tags); + END + """) + + conn.execute(""" + CREATE TRIGGER IF NOT EXISTS workflows_au AFTER UPDATE ON workflows BEGIN + INSERT INTO workflows_fts(workflows_fts, rowid, filename, name, description, integrations, tags) + VALUES ('delete', old.id, old.filename, old.name, old.description, old.integrations, old.tags); + INSERT INTO workflows_fts(rowid, filename, name, description, integrations, tags) + VALUES (new.id, new.filename, new.name, new.description, new.integrations, new.tags); + END + """) + + conn.commit() + conn.close() + + def get_file_hash(self, file_path: str) -> str: + """Get MD5 hash of file for change detection.""" + hash_md5 = hashlib.md5() + with open(file_path, "rb") as f: + for chunk in iter(lambda: f.read(4096), b""): + hash_md5.update(chunk) + return hash_md5.hexdigest() + + def format_workflow_name(self, filename: str) -> str: + """Convert filename to readable workflow name.""" + # Remove .json extension + name = filename.replace('.json', '') + + # Split by underscores + parts = name.split('_') + + # Skip the first part if it's just a number + if len(parts) > 1 and parts[0].isdigit(): + parts = parts[1:] + + # Convert parts to title case and join with spaces + readable_parts = [] + for part in parts: + # Special handling for common terms + if part.lower() == 'http': + readable_parts.append('HTTP') + elif part.lower() == 'api': + readable_parts.append('API') + elif part.lower() == 'webhook': + readable_parts.append('Webhook') + elif part.lower() == 'automation': + readable_parts.append('Automation') + elif part.lower() == 'automate': + readable_parts.append('Automate') + elif part.lower() == 'scheduled': + readable_parts.append('Scheduled') + elif part.lower() == 'triggered': + readable_parts.append('Triggered') + elif part.lower() == 'manual': + readable_parts.append('Manual') + else: + # Capitalize first letter + readable_parts.append(part.capitalize()) + + return ' '.join(readable_parts) + + def analyze_workflow_file(self, file_path: str) -> Optional[Dict[str, Any]]: + """Analyze a single workflow file and extract metadata.""" + try: + with open(file_path, 'r', encoding='utf-8') as f: + data = json.load(f) + except (json.JSONDecodeError, UnicodeDecodeError) as e: + print(f"Error reading {file_path}: {str(e)}") + return None + + filename = os.path.basename(file_path) + file_size = os.path.getsize(file_path) + file_hash = self.get_file_hash(file_path) + + # Extract basic metadata + workflow = { + 'filename': filename, + 'name': self.format_workflow_name(filename), + 'workflow_id': data.get('id', ''), + 'active': data.get('active', False), + 'nodes': data.get('nodes', []), + 'connections': data.get('connections', {}), + 'tags': data.get('tags', []), + 'created_at': data.get('createdAt', ''), + 'updated_at': data.get('updatedAt', ''), + 'file_hash': file_hash, + 'file_size': file_size + } + + # Use JSON name if available and meaningful, otherwise use formatted filename + json_name = data.get('name', '').strip() + if json_name and json_name != filename.replace('.json', '') and not json_name.startswith('My workflow'): + workflow['name'] = json_name + # If no meaningful JSON name, use formatted filename (already set above) + + # Analyze nodes + node_count = len(workflow['nodes']) + workflow['node_count'] = node_count + + # Determine complexity + if node_count <= 5: + complexity = 'low' + elif node_count <= 15: + complexity = 'medium' + else: + complexity = 'high' + workflow['complexity'] = complexity + + # Find trigger type and integrations + trigger_type, integrations = self.analyze_nodes(workflow['nodes']) + workflow['trigger_type'] = trigger_type + workflow['integrations'] = list(integrations) + + # Use JSON description if available, otherwise generate one + json_description = data.get('description', '').strip() + if json_description: + workflow['description'] = json_description + else: + workflow['description'] = self.generate_description(workflow, trigger_type, integrations) + + return workflow + + def analyze_nodes(self, nodes: List[Dict]) -> Tuple[str, set]: + """Analyze nodes to determine trigger type and integrations.""" + trigger_type = 'Manual' + integrations = set() + + # Enhanced service mapping for better recognition + service_mappings = { + # Messaging & Communication + 'telegram': 'Telegram', + 'telegramTrigger': 'Telegram', + 'discord': 'Discord', + 'slack': 'Slack', + 'whatsapp': 'WhatsApp', + 'mattermost': 'Mattermost', + 'teams': 'Microsoft Teams', + 'rocketchat': 'Rocket.Chat', + + # Email + 'gmail': 'Gmail', + 'mailjet': 'Mailjet', + 'emailreadimap': 'Email (IMAP)', + 'emailsendsmt': 'Email (SMTP)', + 'outlook': 'Outlook', + + # Cloud Storage + 'googledrive': 'Google Drive', + 'googledocs': 'Google Docs', + 'googlesheets': 'Google Sheets', + 'dropbox': 'Dropbox', + 'onedrive': 'OneDrive', + 'box': 'Box', + + # Databases + 'postgres': 'PostgreSQL', + 'mysql': 'MySQL', + 'mongodb': 'MongoDB', + 'redis': 'Redis', + 'airtable': 'Airtable', + 'notion': 'Notion', + + # Project Management + 'jira': 'Jira', + 'github': 'GitHub', + 'gitlab': 'GitLab', + 'trello': 'Trello', + 'asana': 'Asana', + 'mondaycom': 'Monday.com', + + # AI/ML Services + 'openai': 'OpenAI', + 'anthropic': 'Anthropic', + 'huggingface': 'Hugging Face', + + # Social Media + 'linkedin': 'LinkedIn', + 'twitter': 'Twitter/X', + 'facebook': 'Facebook', + 'instagram': 'Instagram', + + # E-commerce + 'shopify': 'Shopify', + 'stripe': 'Stripe', + 'paypal': 'PayPal', + + # Analytics + 'googleanalytics': 'Google Analytics', + 'mixpanel': 'Mixpanel', + + # Calendar & Tasks + 'googlecalendar': 'Google Calendar', + 'googletasks': 'Google Tasks', + 'cal': 'Cal.com', + 'calendly': 'Calendly', + + # Forms & Surveys + 'typeform': 'Typeform', + 'googleforms': 'Google Forms', + 'form': 'Form Trigger', + + # Development Tools + 'webhook': 'Webhook', + 'httpRequest': 'HTTP Request', + 'graphql': 'GraphQL', + 'sse': 'Server-Sent Events', + + # Utility nodes (exclude from integrations) + 'set': None, + 'function': None, + 'code': None, + 'if': None, + 'switch': None, + 'merge': None, + 'split': None, + 'stickynote': None, + 'stickyNote': None, + 'wait': None, + 'schedule': None, + 'cron': None, + 'manual': None, + 'stopanderror': None, + 'noop': None, + 'noOp': None, + 'error': None, + 'limit': None, + 'aggregate': None, + 'summarize': None, + 'filter': None, + 'sort': None, + 'removeDuplicates': None, + 'dateTime': None, + 'extractFromFile': None, + 'convertToFile': None, + 'readBinaryFile': None, + 'readBinaryFiles': None, + 'executionData': None, + 'executeWorkflow': None, + 'executeCommand': None, + 'respondToWebhook': None, + } + + for node in nodes: + node_type = node.get('type', '') + node_name = node.get('name', '').lower() + + # Determine trigger type + if 'webhook' in node_type.lower() or 'webhook' in node_name: + trigger_type = 'Webhook' + elif 'cron' in node_type.lower() or 'schedule' in node_type.lower(): + trigger_type = 'Scheduled' + elif 'trigger' in node_type.lower() and trigger_type == 'Manual': + if 'manual' not in node_type.lower(): + trigger_type = 'Webhook' + + # Extract integrations with enhanced mapping + service_name = None + + # Handle n8n-nodes-base nodes + if node_type.startswith('n8n-nodes-base.'): + raw_service = node_type.replace('n8n-nodes-base.', '').lower() + raw_service = raw_service.replace('trigger', '') + service_name = service_mappings.get(raw_service, raw_service.title() if raw_service else None) + + # Handle @n8n/ namespaced nodes + elif node_type.startswith('@n8n/'): + raw_service = node_type.split('.')[-1].lower() if '.' in node_type else node_type.lower() + raw_service = raw_service.replace('trigger', '') + service_name = service_mappings.get(raw_service, raw_service.title() if raw_service else None) + + # Handle custom nodes + elif '-' in node_type or '@' in node_type: + # Try to extract service name from custom node names like "n8n-nodes-youtube-transcription-kasha.youtubeTranscripter" + parts = node_type.lower().split('.') + for part in parts: + if 'youtube' in part: + service_name = 'YouTube' + break + elif 'telegram' in part: + service_name = 'Telegram' + break + elif 'discord' in part: + service_name = 'Discord' + break + elif 'calcslive' in part: + service_name = 'CalcsLive' + break + + # Also check node names for service hints (but avoid false positives) + for service_key, service_value in service_mappings.items(): + if service_key in node_name and service_value: + # Avoid false positive: "cal" in calcslive-related terms should not match "Cal.com" + if service_key == 'cal' and any(term in node_name.lower() for term in ['calcslive', 'calc', 'calculation']): + continue + service_name = service_value + break + + # Add to integrations if valid service found + if service_name and service_name not in ['None', None]: + integrations.add(service_name) + + # Determine if complex based on node variety and count + if len(nodes) > 10 and len(integrations) > 3: + trigger_type = 'Complex' + + return trigger_type, integrations + + def generate_description(self, workflow: Dict, trigger_type: str, integrations: set) -> str: + """Generate a descriptive summary of the workflow.""" + name = workflow['name'] + node_count = workflow['node_count'] + + # Start with trigger description + trigger_descriptions = { + 'Webhook': "Webhook-triggered automation that", + 'Scheduled': "Scheduled automation that", + 'Complex': "Complex multi-step automation that", + } + desc = trigger_descriptions.get(trigger_type, "Manual workflow that") + + # Add functionality based on name and integrations + if integrations: + main_services = list(integrations)[:3] + if len(main_services) == 1: + desc += f" integrates with {main_services[0]}" + elif len(main_services) == 2: + desc += f" connects {main_services[0]} and {main_services[1]}" + else: + desc += f" orchestrates {', '.join(main_services[:-1])}, and {main_services[-1]}" + + # Add workflow purpose hints from name + name_lower = name.lower() + if 'create' in name_lower: + desc += " to create new records" + elif 'update' in name_lower: + desc += " to update existing data" + elif 'sync' in name_lower: + desc += " to synchronize data" + elif 'notification' in name_lower or 'alert' in name_lower: + desc += " for notifications and alerts" + elif 'backup' in name_lower: + desc += " for data backup operations" + elif 'monitor' in name_lower: + desc += " for monitoring and reporting" + else: + desc += " for data processing" + + desc += f". Uses {node_count} nodes" + if len(integrations) > 3: + desc += f" and integrates with {len(integrations)} services" + + return desc + "." + + def index_all_workflows(self, force_reindex: bool = False) -> Dict[str, int]: + """Index all workflow files. Only reprocesses changed files unless force_reindex=True.""" + if not os.path.exists(self.workflows_dir): + print(f"Warning: Workflows directory '{self.workflows_dir}' not found.") + return {'processed': 0, 'skipped': 0, 'errors': 0} + + workflows_path = Path(self.workflows_dir) + json_files = [str(p) for p in workflows_path.rglob("*.json")] + + if not json_files: + print(f"Warning: No JSON files found in '{self.workflows_dir}' directory.") + return {'processed': 0, 'skipped': 0, 'errors': 0} + + print(f"Indexing {len(json_files)} workflow files...") + + conn = sqlite3.connect(self.db_path) + conn.row_factory = sqlite3.Row + + stats = {'processed': 0, 'skipped': 0, 'errors': 0} + + for file_path in json_files: + filename = os.path.basename(file_path) + + try: + # Check if file needs to be reprocessed + if not force_reindex: + current_hash = self.get_file_hash(file_path) + cursor = conn.execute( + "SELECT file_hash FROM workflows WHERE filename = ?", + (filename,) + ) + row = cursor.fetchone() + if row and row['file_hash'] == current_hash: + stats['skipped'] += 1 + continue + + # Analyze workflow + workflow_data = self.analyze_workflow_file(file_path) + if not workflow_data: + stats['errors'] += 1 + continue + + # Insert or update in database + conn.execute(""" + INSERT OR REPLACE INTO workflows ( + filename, name, workflow_id, active, description, trigger_type, + complexity, node_count, integrations, tags, created_at, updated_at, + file_hash, file_size, analyzed_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP) + """, ( + workflow_data['filename'], + workflow_data['name'], + workflow_data['workflow_id'], + workflow_data['active'], + workflow_data['description'], + workflow_data['trigger_type'], + workflow_data['complexity'], + workflow_data['node_count'], + json.dumps(workflow_data['integrations']), + json.dumps(workflow_data['tags']), + workflow_data['created_at'], + workflow_data['updated_at'], + workflow_data['file_hash'], + workflow_data['file_size'] + )) + + stats['processed'] += 1 + + except Exception as e: + print(f"Error processing {file_path}: {str(e)}") + stats['errors'] += 1 + continue + + conn.commit() + conn.close() + + print(f"✅ Indexing complete: {stats['processed']} processed, {stats['skipped']} skipped, {stats['errors']} errors") + return stats + + def search_workflows(self, query: str = "", trigger_filter: str = "all", + complexity_filter: str = "all", active_only: bool = False, + limit: int = 50, offset: int = 0) -> Tuple[List[Dict], int]: + """Fast search with filters and pagination.""" + conn = sqlite3.connect(self.db_path) + conn.row_factory = sqlite3.Row + + # Build WHERE clause + where_conditions = [] + params = [] + + if active_only: + where_conditions.append("w.active = 1") + + if trigger_filter != "all": + where_conditions.append("w.trigger_type = ?") + params.append(trigger_filter) + + if complexity_filter != "all": + where_conditions.append("w.complexity = ?") + params.append(complexity_filter) + + # Use FTS search if query provided + if query.strip(): + # FTS search with ranking + base_query = """ + SELECT w.*, rank + FROM workflows_fts fts + JOIN workflows w ON w.id = fts.rowid + WHERE workflows_fts MATCH ? + """ + params.insert(0, query) + else: + # Regular query without FTS + base_query = """ + SELECT w.*, 0 as rank + FROM workflows w + WHERE 1=1 + """ + + if where_conditions: + base_query += " AND " + " AND ".join(where_conditions) + + # Count total results + count_query = f"SELECT COUNT(*) as total FROM ({base_query}) t" + cursor = conn.execute(count_query, params) + total = cursor.fetchone()['total'] + + # Get paginated results + if query.strip(): + base_query += " ORDER BY rank" + else: + base_query += " ORDER BY w.analyzed_at DESC" + + base_query += f" LIMIT {limit} OFFSET {offset}" + + cursor = conn.execute(base_query, params) + rows = cursor.fetchall() + + # Convert to dictionaries and parse JSON fields + results = [] + for row in rows: + workflow = dict(row) + workflow['integrations'] = json.loads(workflow['integrations'] or '[]') + + # Parse tags and convert dict tags to strings + raw_tags = json.loads(workflow['tags'] or '[]') + clean_tags = [] + for tag in raw_tags: + if isinstance(tag, dict): + # Extract name from tag dict if available + clean_tags.append(tag.get('name', str(tag.get('id', 'tag')))) + else: + clean_tags.append(str(tag)) + workflow['tags'] = clean_tags + + results.append(workflow) + + conn.close() + return results, total + + def get_stats(self) -> Dict[str, Any]: + """Get database statistics.""" + conn = sqlite3.connect(self.db_path) + conn.row_factory = sqlite3.Row + + # Basic counts + cursor = conn.execute("SELECT COUNT(*) as total FROM workflows") + total = cursor.fetchone()['total'] + + cursor = conn.execute("SELECT COUNT(*) as active FROM workflows WHERE active = 1") + active = cursor.fetchone()['active'] + + # Trigger type breakdown + cursor = conn.execute(""" + SELECT trigger_type, COUNT(*) as count + FROM workflows + GROUP BY trigger_type + """) + triggers = {row['trigger_type']: row['count'] for row in cursor.fetchall()} + + # Complexity breakdown + cursor = conn.execute(""" + SELECT complexity, COUNT(*) as count + FROM workflows + GROUP BY complexity + """) + complexity = {row['complexity']: row['count'] for row in cursor.fetchall()} + + # Node stats + cursor = conn.execute("SELECT SUM(node_count) as total_nodes FROM workflows") + total_nodes = cursor.fetchone()['total_nodes'] or 0 + + # Unique integrations count + cursor = conn.execute("SELECT integrations FROM workflows WHERE integrations != '[]'") + all_integrations = set() + for row in cursor.fetchall(): + integrations = json.loads(row['integrations']) + all_integrations.update(integrations) + + conn.close() + + return { + 'total': total, + 'active': active, + 'inactive': total - active, + 'triggers': triggers, + 'complexity': complexity, + 'total_nodes': total_nodes, + 'unique_integrations': len(all_integrations), + 'last_indexed': datetime.datetime.now().isoformat() + } + + def get_service_categories(self) -> Dict[str, List[str]]: + """Get service categories for enhanced filtering.""" + return { + 'messaging': ['Telegram', 'Discord', 'Slack', 'WhatsApp', 'Mattermost', 'Microsoft Teams', 'Rocket.Chat'], + 'email': ['Gmail', 'Mailjet', 'Email (IMAP)', 'Email (SMTP)', 'Outlook'], + 'cloud_storage': ['Google Drive', 'Google Docs', 'Google Sheets', 'Dropbox', 'OneDrive', 'Box'], + 'database': ['PostgreSQL', 'MySQL', 'MongoDB', 'Redis', 'Airtable', 'Notion'], + 'project_management': ['Jira', 'GitHub', 'GitLab', 'Trello', 'Asana', 'Monday.com'], + 'ai_ml': ['OpenAI', 'Anthropic', 'Hugging Face', 'CalcsLive'], + 'social_media': ['LinkedIn', 'Twitter/X', 'Facebook', 'Instagram'], + 'ecommerce': ['Shopify', 'Stripe', 'PayPal'], + 'analytics': ['Google Analytics', 'Mixpanel'], + 'calendar_tasks': ['Google Calendar', 'Google Tasks', 'Cal.com', 'Calendly'], + 'forms': ['Typeform', 'Google Forms', 'Form Trigger'], + 'development': ['Webhook', 'HTTP Request', 'GraphQL', 'Server-Sent Events', 'YouTube'] + } + + def search_by_category(self, category: str, limit: int = 50, offset: int = 0) -> Tuple[List[Dict], int]: + """Search workflows by service category.""" + categories = self.get_service_categories() + if category not in categories: + return [], 0 + + services = categories[category] + conn = sqlite3.connect(self.db_path) + conn.row_factory = sqlite3.Row + + # Build OR conditions for all services in category + service_conditions = [] + params = [] + for service in services: + service_conditions.append("integrations LIKE ?") + params.append(f'%"{service}"%') + + where_clause = " OR ".join(service_conditions) + + # Count total results + count_query = f"SELECT COUNT(*) as total FROM workflows WHERE {where_clause}" + cursor = conn.execute(count_query, params) + total = cursor.fetchone()['total'] + + # Get paginated results + query = f""" + SELECT * FROM workflows + WHERE {where_clause} + ORDER BY analyzed_at DESC + LIMIT {limit} OFFSET {offset} + """ + + cursor = conn.execute(query, params) + rows = cursor.fetchall() + + # Convert to dictionaries and parse JSON fields + results = [] + for row in rows: + workflow = dict(row) + workflow['integrations'] = json.loads(workflow['integrations'] or '[]') + raw_tags = json.loads(workflow['tags'] or '[]') + clean_tags = [] + for tag in raw_tags: + if isinstance(tag, dict): + clean_tags.append(tag.get('name', str(tag.get('id', 'tag')))) + else: + clean_tags.append(str(tag)) + workflow['tags'] = clean_tags + results.append(workflow) + + conn.close() + return results, total + + +def main(): + """Command-line interface for workflow database.""" + import argparse + + parser = argparse.ArgumentParser(description='N8N Workflow Database') + parser.add_argument('--index', action='store_true', help='Index all workflows') + parser.add_argument('--force', action='store_true', help='Force reindex all files') + parser.add_argument('--search', help='Search workflows') + parser.add_argument('--stats', action='store_true', help='Show database statistics') + + args = parser.parse_args() + + db = WorkflowDatabase() + + if args.index: + stats = db.index_all_workflows(force_reindex=args.force) + print(f"Indexed {stats['processed']} workflows") + + elif args.search: + results, total = db.search_workflows(args.search, limit=10) + print(f"Found {total} workflows:") + for workflow in results: + print(f" - {workflow['name']} ({workflow['trigger_type']}, {workflow['node_count']} nodes)") + + elif args.stats: + stats = db.get_stats() + print(f"Database Statistics:") + print(f" Total workflows: {stats['total']}") + print(f" Active: {stats['active']}") + print(f" Total nodes: {stats['total_nodes']}") + print(f" Unique integrations: {stats['unique_integrations']}") + print(f" Trigger types: {stats['triggers']}") + + else: + parser.print_help() + + +if __name__ == "__main__": + main() \ No newline at end of file