Mirror of https://github.com/Zie619/n8n-workflows.git · synced 2025-11-24 19:12:59 +08:00
fix: restore workflow_db.py module required by application
The workflow_db.py module is required by both run.py and api_server.py for database operations. It was mistakenly moved to archive during cleanup.

This fixes the CI/CD test failure:
❌ Database setup error: No module named 'workflow_db'

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
workflow_db.py · 763 lines · new file
#!/usr/bin/env python3
"""
Fast N8N Workflow Database
SQLite-based workflow indexer and search engine for instant performance.
"""

import sqlite3
import json
import os
import glob
import datetime
import hashlib
from typing import Dict, List, Any, Optional, Tuple
from pathlib import Path


class WorkflowDatabase:
    """High-performance SQLite database for workflow metadata and search."""

    def __init__(self, db_path: str = None):
        # Use environment variable if no path provided
        if db_path is None:
            db_path = os.environ.get('WORKFLOW_DB_PATH', 'workflows.db')
        self.db_path = db_path
        self.workflows_dir = "workflows"
        self.init_database()

    def init_database(self):
        """Initialize SQLite database with optimized schema and indexes."""
        conn = sqlite3.connect(self.db_path)
        conn.execute("PRAGMA journal_mode=WAL")  # Write-ahead logging for performance
        conn.execute("PRAGMA synchronous=NORMAL")
        conn.execute("PRAGMA cache_size=10000")
        conn.execute("PRAGMA temp_store=MEMORY")

        # Create main workflows table
        conn.execute("""
            CREATE TABLE IF NOT EXISTS workflows (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                filename TEXT UNIQUE NOT NULL,
                name TEXT NOT NULL,
                workflow_id TEXT,
                active BOOLEAN DEFAULT 0,
                description TEXT,
                trigger_type TEXT,
                complexity TEXT,
                node_count INTEGER DEFAULT 0,
                integrations TEXT,  -- JSON array
                tags TEXT,  -- JSON array
                created_at TEXT,
                updated_at TEXT,
                file_hash TEXT,
                file_size INTEGER,
                analyzed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # Create FTS5 table for full-text search
        conn.execute("""
            CREATE VIRTUAL TABLE IF NOT EXISTS workflows_fts USING fts5(
                filename,
                name,
                description,
                integrations,
                tags,
                content=workflows,
                content_rowid=id
            )
        """)

        # Create indexes for fast filtering
        conn.execute("CREATE INDEX IF NOT EXISTS idx_trigger_type ON workflows(trigger_type)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_complexity ON workflows(complexity)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_active ON workflows(active)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_node_count ON workflows(node_count)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_filename ON workflows(filename)")

        # Create triggers to keep FTS table in sync
        conn.execute("""
            CREATE TRIGGER IF NOT EXISTS workflows_ai AFTER INSERT ON workflows BEGIN
                INSERT INTO workflows_fts(rowid, filename, name, description, integrations, tags)
                VALUES (new.id, new.filename, new.name, new.description, new.integrations, new.tags);
            END
        """)

        conn.execute("""
            CREATE TRIGGER IF NOT EXISTS workflows_ad AFTER DELETE ON workflows BEGIN
                INSERT INTO workflows_fts(workflows_fts, rowid, filename, name, description, integrations, tags)
                VALUES ('delete', old.id, old.filename, old.name, old.description, old.integrations, old.tags);
            END
        """)

        conn.execute("""
            CREATE TRIGGER IF NOT EXISTS workflows_au AFTER UPDATE ON workflows BEGIN
                INSERT INTO workflows_fts(workflows_fts, rowid, filename, name, description, integrations, tags)
                VALUES ('delete', old.id, old.filename, old.name, old.description, old.integrations, old.tags);
                INSERT INTO workflows_fts(rowid, filename, name, description, integrations, tags)
                VALUES (new.id, new.filename, new.name, new.description, new.integrations, new.tags);
            END
        """)

        conn.commit()
        conn.close()

    def get_file_hash(self, file_path: str) -> str:
        """Get MD5 hash of file for change detection."""
        hash_md5 = hashlib.md5()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()

    def format_workflow_name(self, filename: str) -> str:
        """Convert filename to readable workflow name."""
        # Remove .json extension
        name = filename.replace('.json', '')

        # Split by underscores
        parts = name.split('_')

        # Skip the first part if it's just a number
        if len(parts) > 1 and parts[0].isdigit():
            parts = parts[1:]

        # Convert parts to title case and join with spaces
        readable_parts = []
        for part in parts:
            # Special handling for common terms
            if part.lower() == 'http':
                readable_parts.append('HTTP')
            elif part.lower() == 'api':
                readable_parts.append('API')
            elif part.lower() == 'webhook':
                readable_parts.append('Webhook')
            elif part.lower() == 'automation':
                readable_parts.append('Automation')
            elif part.lower() == 'automate':
                readable_parts.append('Automate')
            elif part.lower() == 'scheduled':
                readable_parts.append('Scheduled')
            elif part.lower() == 'triggered':
                readable_parts.append('Triggered')
            elif part.lower() == 'manual':
                readable_parts.append('Manual')
            else:
                # Capitalize first letter
                readable_parts.append(part.capitalize())

        return ' '.join(readable_parts)

    def analyze_workflow_file(self, file_path: str) -> Optional[Dict[str, Any]]:
        """Analyze a single workflow file and extract metadata."""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            print(f"Error reading {file_path}: {str(e)}")
            return None

        filename = os.path.basename(file_path)
        file_size = os.path.getsize(file_path)
        file_hash = self.get_file_hash(file_path)

        # Extract basic metadata
        workflow = {
            'filename': filename,
            'name': self.format_workflow_name(filename),
            'workflow_id': data.get('id', ''),
            'active': data.get('active', False),
            'nodes': data.get('nodes', []),
            'connections': data.get('connections', {}),
            'tags': data.get('tags', []),
            'created_at': data.get('createdAt', ''),
            'updated_at': data.get('updatedAt', ''),
            'file_hash': file_hash,
            'file_size': file_size
        }

        # Use JSON name if available and meaningful, otherwise use formatted filename
        json_name = data.get('name', '').strip()
        if json_name and json_name != filename.replace('.json', '') and not json_name.startswith('My workflow'):
            workflow['name'] = json_name
        # If no meaningful JSON name, use formatted filename (already set above)

        # Analyze nodes
        node_count = len(workflow['nodes'])
        workflow['node_count'] = node_count

        # Determine complexity
        if node_count <= 5:
            complexity = 'low'
        elif node_count <= 15:
            complexity = 'medium'
        else:
            complexity = 'high'
        workflow['complexity'] = complexity

        # Find trigger type and integrations
        trigger_type, integrations = self.analyze_nodes(workflow['nodes'])
        workflow['trigger_type'] = trigger_type
        workflow['integrations'] = list(integrations)

        # Use JSON description if available, otherwise generate one
        json_description = data.get('description', '').strip()
        if json_description:
            workflow['description'] = json_description
        else:
            workflow['description'] = self.generate_description(workflow, trigger_type, integrations)

        return workflow

    def analyze_nodes(self, nodes: List[Dict]) -> Tuple[str, set]:
        """Analyze nodes to determine trigger type and integrations."""
        trigger_type = 'Manual'
        integrations = set()

        # Enhanced service mapping for better recognition
        service_mappings = {
            # Messaging & Communication
            'telegram': 'Telegram',
            'telegramTrigger': 'Telegram',
            'discord': 'Discord',
            'slack': 'Slack',
            'whatsapp': 'WhatsApp',
            'mattermost': 'Mattermost',
            'teams': 'Microsoft Teams',
            'rocketchat': 'Rocket.Chat',

            # Email
            'gmail': 'Gmail',
            'mailjet': 'Mailjet',
            'emailreadimap': 'Email (IMAP)',
            'emailsendsmt': 'Email (SMTP)',
            'outlook': 'Outlook',

            # Cloud Storage
            'googledrive': 'Google Drive',
            'googledocs': 'Google Docs',
            'googlesheets': 'Google Sheets',
            'dropbox': 'Dropbox',
            'onedrive': 'OneDrive',
            'box': 'Box',

            # Databases
            'postgres': 'PostgreSQL',
            'mysql': 'MySQL',
            'mongodb': 'MongoDB',
            'redis': 'Redis',
            'airtable': 'Airtable',
            'notion': 'Notion',

            # Project Management
            'jira': 'Jira',
            'github': 'GitHub',
            'gitlab': 'GitLab',
            'trello': 'Trello',
            'asana': 'Asana',
            'mondaycom': 'Monday.com',

            # AI/ML Services
            'openai': 'OpenAI',
            'anthropic': 'Anthropic',
            'huggingface': 'Hugging Face',

            # Social Media
            'linkedin': 'LinkedIn',
            'twitter': 'Twitter/X',
            'facebook': 'Facebook',
            'instagram': 'Instagram',

            # E-commerce
            'shopify': 'Shopify',
            'stripe': 'Stripe',
            'paypal': 'PayPal',

            # Analytics
            'googleanalytics': 'Google Analytics',
            'mixpanel': 'Mixpanel',

            # Calendar & Tasks
            'googlecalendar': 'Google Calendar',
            'googletasks': 'Google Tasks',
            'cal': 'Cal.com',
            'calendly': 'Calendly',

            # Forms & Surveys
            'typeform': 'Typeform',
            'googleforms': 'Google Forms',
            'form': 'Form Trigger',

            # Development Tools
            'webhook': 'Webhook',
            'httpRequest': 'HTTP Request',
            'graphql': 'GraphQL',
            'sse': 'Server-Sent Events',

            # Utility nodes (exclude from integrations)
            'set': None,
            'function': None,
            'code': None,
            'if': None,
            'switch': None,
            'merge': None,
            'split': None,
            'stickynote': None,
            'stickyNote': None,
            'wait': None,
            'schedule': None,
            'cron': None,
            'manual': None,
            'stopanderror': None,
            'noop': None,
            'noOp': None,
            'error': None,
            'limit': None,
            'aggregate': None,
            'summarize': None,
            'filter': None,
            'sort': None,
            'removeDuplicates': None,
            'dateTime': None,
            'extractFromFile': None,
            'convertToFile': None,
            'readBinaryFile': None,
            'readBinaryFiles': None,
            'executionData': None,
            'executeWorkflow': None,
            'executeCommand': None,
            'respondToWebhook': None,
        }

        for node in nodes:
            node_type = node.get('type', '')
            node_name = node.get('name', '').lower()

            # Determine trigger type
            if 'webhook' in node_type.lower() or 'webhook' in node_name:
                trigger_type = 'Webhook'
            elif 'cron' in node_type.lower() or 'schedule' in node_type.lower():
                trigger_type = 'Scheduled'
            elif 'trigger' in node_type.lower() and trigger_type == 'Manual':
                if 'manual' not in node_type.lower():
                    trigger_type = 'Webhook'

            # Extract integrations with enhanced mapping
            service_name = None

            # Handle n8n-nodes-base nodes
            if node_type.startswith('n8n-nodes-base.'):
                raw_service = node_type.replace('n8n-nodes-base.', '').lower()
                raw_service = raw_service.replace('trigger', '')
                service_name = service_mappings.get(raw_service, raw_service.title() if raw_service else None)

            # Handle @n8n/ namespaced nodes
            elif node_type.startswith('@n8n/'):
                raw_service = node_type.split('.')[-1].lower() if '.' in node_type else node_type.lower()
                raw_service = raw_service.replace('trigger', '')
                service_name = service_mappings.get(raw_service, raw_service.title() if raw_service else None)

            # Handle custom nodes
            elif '-' in node_type or '@' in node_type:
                # Try to extract service name from custom node names like "n8n-nodes-youtube-transcription-kasha.youtubeTranscripter"
                parts = node_type.lower().split('.')
                for part in parts:
                    if 'youtube' in part:
                        service_name = 'YouTube'
                        break
                    elif 'telegram' in part:
                        service_name = 'Telegram'
                        break
                    elif 'discord' in part:
                        service_name = 'Discord'
                        break
                    elif 'calcslive' in part:
                        service_name = 'CalcsLive'
                        break

            # Also check node names for service hints (but avoid false positives)
            for service_key, service_value in service_mappings.items():
                if service_key in node_name and service_value:
                    # Avoid false positive: "cal" in calcslive-related terms should not match "Cal.com"
                    if service_key == 'cal' and any(term in node_name.lower() for term in ['calcslive', 'calc', 'calculation']):
                        continue
                    service_name = service_value
                    break

            # Add to integrations if valid service found
            if service_name and service_name not in ['None', None]:
                integrations.add(service_name)

        # Determine if complex based on node variety and count
        if len(nodes) > 10 and len(integrations) > 3:
            trigger_type = 'Complex'

        return trigger_type, integrations

    def generate_description(self, workflow: Dict, trigger_type: str, integrations: set) -> str:
        """Generate a descriptive summary of the workflow."""
        name = workflow['name']
        node_count = workflow['node_count']

        # Start with trigger description
        trigger_descriptions = {
            'Webhook': "Webhook-triggered automation that",
            'Scheduled': "Scheduled automation that",
            'Complex': "Complex multi-step automation that",
        }
        desc = trigger_descriptions.get(trigger_type, "Manual workflow that")

        # Add functionality based on name and integrations
        if integrations:
            main_services = list(integrations)[:3]
            if len(main_services) == 1:
                desc += f" integrates with {main_services[0]}"
            elif len(main_services) == 2:
                desc += f" connects {main_services[0]} and {main_services[1]}"
            else:
                desc += f" orchestrates {', '.join(main_services[:-1])}, and {main_services[-1]}"

        # Add workflow purpose hints from name
        name_lower = name.lower()
        if 'create' in name_lower:
            desc += " to create new records"
        elif 'update' in name_lower:
            desc += " to update existing data"
        elif 'sync' in name_lower:
            desc += " to synchronize data"
        elif 'notification' in name_lower or 'alert' in name_lower:
            desc += " for notifications and alerts"
        elif 'backup' in name_lower:
            desc += " for data backup operations"
        elif 'monitor' in name_lower:
            desc += " for monitoring and reporting"
        else:
            desc += " for data processing"

        desc += f". Uses {node_count} nodes"
        if len(integrations) > 3:
            desc += f" and integrates with {len(integrations)} services"

        return desc + "."

    def index_all_workflows(self, force_reindex: bool = False) -> Dict[str, int]:
        """Index all workflow files. Only reprocesses changed files unless force_reindex=True."""
        if not os.path.exists(self.workflows_dir):
            print(f"Warning: Workflows directory '{self.workflows_dir}' not found.")
            return {'processed': 0, 'skipped': 0, 'errors': 0}

        workflows_path = Path(self.workflows_dir)
        json_files = [str(p) for p in workflows_path.rglob("*.json")]

        if not json_files:
            print(f"Warning: No JSON files found in '{self.workflows_dir}' directory.")
            return {'processed': 0, 'skipped': 0, 'errors': 0}

        print(f"Indexing {len(json_files)} workflow files...")

        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row

        stats = {'processed': 0, 'skipped': 0, 'errors': 0}

        for file_path in json_files:
            filename = os.path.basename(file_path)

            try:
                # Check if file needs to be reprocessed
                if not force_reindex:
                    current_hash = self.get_file_hash(file_path)
                    cursor = conn.execute(
                        "SELECT file_hash FROM workflows WHERE filename = ?",
                        (filename,)
                    )
                    row = cursor.fetchone()
                    if row and row['file_hash'] == current_hash:
                        stats['skipped'] += 1
                        continue

                # Analyze workflow
                workflow_data = self.analyze_workflow_file(file_path)
                if not workflow_data:
                    stats['errors'] += 1
                    continue

                # Insert or update in database
                conn.execute("""
                    INSERT OR REPLACE INTO workflows (
                        filename, name, workflow_id, active, description, trigger_type,
                        complexity, node_count, integrations, tags, created_at, updated_at,
                        file_hash, file_size, analyzed_at
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
                """, (
                    workflow_data['filename'],
                    workflow_data['name'],
                    workflow_data['workflow_id'],
                    workflow_data['active'],
                    workflow_data['description'],
                    workflow_data['trigger_type'],
                    workflow_data['complexity'],
                    workflow_data['node_count'],
                    json.dumps(workflow_data['integrations']),
                    json.dumps(workflow_data['tags']),
                    workflow_data['created_at'],
                    workflow_data['updated_at'],
                    workflow_data['file_hash'],
                    workflow_data['file_size']
                ))

                stats['processed'] += 1

            except Exception as e:
                print(f"Error processing {file_path}: {str(e)}")
                stats['errors'] += 1
                continue

        conn.commit()
        conn.close()

        print(f"✅ Indexing complete: {stats['processed']} processed, {stats['skipped']} skipped, {stats['errors']} errors")
        return stats

    def search_workflows(self, query: str = "", trigger_filter: str = "all",
                         complexity_filter: str = "all", active_only: bool = False,
                         limit: int = 50, offset: int = 0) -> Tuple[List[Dict], int]:
        """Fast search with filters and pagination."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row

        # Build WHERE clause
        where_conditions = []
        params = []

        if active_only:
            where_conditions.append("w.active = 1")

        if trigger_filter != "all":
            where_conditions.append("w.trigger_type = ?")
            params.append(trigger_filter)

        if complexity_filter != "all":
            where_conditions.append("w.complexity = ?")
            params.append(complexity_filter)

        # Use FTS search if query provided
        if query.strip():
            # FTS search with ranking
            base_query = """
                SELECT w.*, rank
                FROM workflows_fts fts
                JOIN workflows w ON w.id = fts.rowid
                WHERE workflows_fts MATCH ?
            """
            params.insert(0, query)
        else:
            # Regular query without FTS
            base_query = """
                SELECT w.*, 0 as rank
                FROM workflows w
                WHERE 1=1
            """

        if where_conditions:
            base_query += " AND " + " AND ".join(where_conditions)

        # Count total results
        count_query = f"SELECT COUNT(*) as total FROM ({base_query}) t"
        cursor = conn.execute(count_query, params)
        total = cursor.fetchone()['total']

        # Get paginated results
        if query.strip():
            base_query += " ORDER BY rank"
        else:
            base_query += " ORDER BY w.analyzed_at DESC"

        base_query += f" LIMIT {limit} OFFSET {offset}"

        cursor = conn.execute(base_query, params)
        rows = cursor.fetchall()

        # Convert to dictionaries and parse JSON fields
        results = []
        for row in rows:
            workflow = dict(row)
            workflow['integrations'] = json.loads(workflow['integrations'] or '[]')

            # Parse tags and convert dict tags to strings
            raw_tags = json.loads(workflow['tags'] or '[]')
            clean_tags = []
            for tag in raw_tags:
                if isinstance(tag, dict):
                    # Extract name from tag dict if available
                    clean_tags.append(tag.get('name', str(tag.get('id', 'tag'))))
                else:
                    clean_tags.append(str(tag))
            workflow['tags'] = clean_tags

            results.append(workflow)

        conn.close()
        return results, total

    def get_stats(self) -> Dict[str, Any]:
        """Get database statistics."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row

        # Basic counts
        cursor = conn.execute("SELECT COUNT(*) as total FROM workflows")
        total = cursor.fetchone()['total']

        cursor = conn.execute("SELECT COUNT(*) as active FROM workflows WHERE active = 1")
        active = cursor.fetchone()['active']

        # Trigger type breakdown
        cursor = conn.execute("""
            SELECT trigger_type, COUNT(*) as count
            FROM workflows
            GROUP BY trigger_type
        """)
        triggers = {row['trigger_type']: row['count'] for row in cursor.fetchall()}

        # Complexity breakdown
        cursor = conn.execute("""
            SELECT complexity, COUNT(*) as count
            FROM workflows
            GROUP BY complexity
        """)
        complexity = {row['complexity']: row['count'] for row in cursor.fetchall()}

        # Node stats
        cursor = conn.execute("SELECT SUM(node_count) as total_nodes FROM workflows")
        total_nodes = cursor.fetchone()['total_nodes'] or 0

        # Unique integrations count
        cursor = conn.execute("SELECT integrations FROM workflows WHERE integrations != '[]'")
        all_integrations = set()
        for row in cursor.fetchall():
            integrations = json.loads(row['integrations'])
            all_integrations.update(integrations)

        conn.close()

        return {
            'total': total,
            'active': active,
            'inactive': total - active,
            'triggers': triggers,
            'complexity': complexity,
            'total_nodes': total_nodes,
            'unique_integrations': len(all_integrations),
            'last_indexed': datetime.datetime.now().isoformat()
        }

    def get_service_categories(self) -> Dict[str, List[str]]:
        """Get service categories for enhanced filtering."""
        return {
            'messaging': ['Telegram', 'Discord', 'Slack', 'WhatsApp', 'Mattermost', 'Microsoft Teams', 'Rocket.Chat'],
            'email': ['Gmail', 'Mailjet', 'Email (IMAP)', 'Email (SMTP)', 'Outlook'],
            'cloud_storage': ['Google Drive', 'Google Docs', 'Google Sheets', 'Dropbox', 'OneDrive', 'Box'],
            'database': ['PostgreSQL', 'MySQL', 'MongoDB', 'Redis', 'Airtable', 'Notion'],
            'project_management': ['Jira', 'GitHub', 'GitLab', 'Trello', 'Asana', 'Monday.com'],
            'ai_ml': ['OpenAI', 'Anthropic', 'Hugging Face', 'CalcsLive'],
            'social_media': ['LinkedIn', 'Twitter/X', 'Facebook', 'Instagram'],
            'ecommerce': ['Shopify', 'Stripe', 'PayPal'],
            'analytics': ['Google Analytics', 'Mixpanel'],
            'calendar_tasks': ['Google Calendar', 'Google Tasks', 'Cal.com', 'Calendly'],
            'forms': ['Typeform', 'Google Forms', 'Form Trigger'],
            'development': ['Webhook', 'HTTP Request', 'GraphQL', 'Server-Sent Events', 'YouTube']
        }

    def search_by_category(self, category: str, limit: int = 50, offset: int = 0) -> Tuple[List[Dict], int]:
        """Search workflows by service category."""
        categories = self.get_service_categories()
        if category not in categories:
            return [], 0

        services = categories[category]
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row

        # Build OR conditions for all services in category
        service_conditions = []
        params = []
        for service in services:
            service_conditions.append("integrations LIKE ?")
            params.append(f'%"{service}"%')

        where_clause = " OR ".join(service_conditions)

        # Count total results
        count_query = f"SELECT COUNT(*) as total FROM workflows WHERE {where_clause}"
        cursor = conn.execute(count_query, params)
        total = cursor.fetchone()['total']

        # Get paginated results
        query = f"""
            SELECT * FROM workflows
            WHERE {where_clause}
            ORDER BY analyzed_at DESC
            LIMIT {limit} OFFSET {offset}
        """

        cursor = conn.execute(query, params)
        rows = cursor.fetchall()

        # Convert to dictionaries and parse JSON fields
        results = []
        for row in rows:
            workflow = dict(row)
            workflow['integrations'] = json.loads(workflow['integrations'] or '[]')
            raw_tags = json.loads(workflow['tags'] or '[]')
            clean_tags = []
            for tag in raw_tags:
                if isinstance(tag, dict):
                    clean_tags.append(tag.get('name', str(tag.get('id', 'tag'))))
                else:
                    clean_tags.append(str(tag))
            workflow['tags'] = clean_tags
            results.append(workflow)

        conn.close()
        return results, total


def main():
    """Command-line interface for workflow database."""
    import argparse

    parser = argparse.ArgumentParser(description='N8N Workflow Database')
    parser.add_argument('--index', action='store_true', help='Index all workflows')
    parser.add_argument('--force', action='store_true', help='Force reindex all files')
    parser.add_argument('--search', help='Search workflows')
    parser.add_argument('--stats', action='store_true', help='Show database statistics')

    args = parser.parse_args()

    db = WorkflowDatabase()

    if args.index:
        stats = db.index_all_workflows(force_reindex=args.force)
        print(f"Indexed {stats['processed']} workflows")

    elif args.search:
        results, total = db.search_workflows(args.search, limit=10)
        print(f"Found {total} workflows:")
        for workflow in results:
            print(f"  - {workflow['name']} ({workflow['trigger_type']}, {workflow['node_count']} nodes)")

    elif args.stats:
        stats = db.get_stats()
        print(f"Database Statistics:")
        print(f"  Total workflows: {stats['total']}")
        print(f"  Active: {stats['active']}")
        print(f"  Total nodes: {stats['total_nodes']}")
        print(f"  Unique integrations: {stats['unique_integrations']}")
        print(f"  Trigger types: {stats['triggers']}")

    else:
        parser.print_help()


if __name__ == "__main__":
    main()
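
For reference, a minimal usage sketch of the restored module. This is illustrative only: the call sites in run.py and api_server.py are not part of this commit, so the snippet simply mirrors what the CLI in main() already does, relying on the class defaults (the workflows/ directory, and workflows.db or WORKFLOW_DB_PATH for the database path).

# usage_sketch.py -- illustrative only; not the actual run.py / api_server.py call sites
from workflow_db import WorkflowDatabase

db = WorkflowDatabase()           # honors WORKFLOW_DB_PATH, defaults to workflows.db
stats = db.index_all_workflows()  # incremental: unchanged files are skipped via MD5 hash
print(f"{stats['processed']} processed, {stats['skipped']} skipped, {stats['errors']} errors")

# Full-text search with filters and pagination
results, total = db.search_workflows("telegram", trigger_filter="Webhook", limit=5)
for wf in results:
    print(wf['name'], wf['trigger_type'], wf['integrations'])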