#!/usr/bin/env python3
"""
Workflow Excellence Upgrader
Systematically upgrade all workflows to achieve excellent quality scores (90-100)
"""

import json
import os
import re
from pathlib import Path
from typing import Dict, List, Any, Tuple
from collections import defaultdict
import shutil
from datetime import datetime


class WorkflowExcellenceUpgrader:
    def __init__(self, workflows_dir="workflows", backup_dir="workflows_backup"):
        self.workflows_dir = Path(workflows_dir)
        self.backup_dir = Path(backup_dir)
        self.upgrade_stats = defaultdict(int)
        self.issues_fixed = defaultdict(int)

        # Create backup directory
        self.backup_dir.mkdir(exist_ok=True)

    def create_backup(self):
        """Create backup of original workflows before modifications"""
        print("📦 Creating backup of original workflows...")

        if self.backup_dir.exists():
            shutil.rmtree(self.backup_dir)

        shutil.copytree(self.workflows_dir, self.backup_dir)
        print(f"✅ Backup created at: {self.backup_dir}")

    def analyze_quality_issues(self, workflow_data: Dict) -> List[Dict]:
        """Analyze specific quality issues in a workflow"""
        issues = []

        # Check for hardcoded URLs
        hardcoded_urls = self.find_hardcoded_urls(workflow_data)
        if hardcoded_urls:
            issues.append({
                'type': 'hardcoded_urls',
                'count': len(hardcoded_urls),
                'locations': hardcoded_urls,
                'severity': 'high'
            })

        # Check for sensitive data
        sensitive_data = self.find_sensitive_data(workflow_data)
        if sensitive_data:
            issues.append({
                'type': 'sensitive_data',
                'count': len(sensitive_data),
                'locations': sensitive_data,
                'severity': 'critical'
            })

        # Check for missing error handling
        if not self.has_error_handling(workflow_data):
            issues.append({
                'type': 'no_error_handling',
                'count': 1,
                'locations': ['workflow_level'],
                'severity': 'high'
            })

        # Check for naming issues
        naming_issues = self.find_naming_issues(workflow_data)
        if naming_issues:
            issues.append({
                'type': 'naming_issues',
                'count': len(naming_issues),
                'locations': naming_issues,
                'severity': 'medium'
            })

        # Check for missing documentation
        if not self.has_documentation(workflow_data):
            issues.append({
                'type': 'no_documentation',
                'count': 1,
                'locations': ['workflow_level'],
                'severity': 'medium'
            })

        return issues
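
    # Each entry in the returned list is a dict of the form shown above, e.g.
    # (illustrative values only):
    #
    #   {'type': 'hardcoded_urls', 'count': 1,
    #    'locations': ['nodes[0].parameters.url: https://api.acme-internal.io/v1'],
    #    'severity': 'high'}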

    def find_hardcoded_urls(self, data: Any, path: str = "") -> List[str]:
        """Find hardcoded URLs in workflow data"""
        urls = []

        if isinstance(data, dict):
            for key, value in data.items():
                current_path = f"{path}.{key}" if path else key
                urls.extend(self.find_hardcoded_urls(value, current_path))
        elif isinstance(data, list):
            for i, item in enumerate(data):
                urls.extend(self.find_hardcoded_urls(item, f"{path}[{i}]"))
        elif isinstance(data, str):
            url_pattern = r'https?://[^\s<>"\'{}|\\^`\[\]]+'
            matches = re.findall(url_pattern, data)
            for match in matches:
                # Skip if it's already a placeholder or variable
                if not any(placeholder in data for placeholder in ['{{', '${', 'YOUR_', 'PLACEHOLDER', 'example.com']):
                    urls.append(f"{path}: {match}")

        return urls
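
    # For example (illustrative value): a parameter string of
    #   "https://hooks.internal.acme.io/notify"
    # is reported, while a string containing "{{ ... }}", "${ ... }", "YOUR_",
    # "PLACEHOLDER", or "example.com" is skipped as already parameterized.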

    def find_sensitive_data(self, data: Any, path: str = "") -> List[str]:
        """Find sensitive data patterns"""
        sensitive_locations = []
        sensitive_patterns = [
            r'password', r'token', r'key', r'secret', r'credential',
            r'api_key', r'access_token', r'refresh_token', r'bearer'
        ]

        if isinstance(data, dict):
            for key, value in data.items():
                current_path = f"{path}.{key}" if path else key

                # Check if key contains sensitive patterns
                if any(pattern in key.lower() for pattern in sensitive_patterns):
                    if value and str(value).strip():
                        sensitive_locations.append(f"{current_path}: {str(value)[:50]}...")

                sensitive_locations.extend(self.find_sensitive_data(value, current_path))
        elif isinstance(data, list):
            for i, item in enumerate(data):
                sensitive_locations.extend(self.find_sensitive_data(item, f"{path}[{i}]"))
        elif isinstance(data, str):
            # Check for API keys, tokens in values
            if re.search(r'[A-Za-z0-9]{20,}', data) and any(pattern in path.lower() for pattern in sensitive_patterns):
                sensitive_locations.append(f"{path}: {data[:50]}...")

        return sensitive_locations

    def has_error_handling(self, workflow_data: Dict) -> bool:
        """Check if workflow has error handling"""
        nodes = workflow_data.get('nodes', [])

        # Node types are lowercased below, so the patterns must be lowercase too
        error_node_types = ['error', 'catch', 'stop', 'errortrigger', 'stopanderror']

        for node in nodes:
            node_type = node.get('type', '').lower()
            if any(error_type in node_type for error_type in error_node_types):
                return True

        return False

    def find_naming_issues(self, workflow_data: Dict) -> List[str]:
        """Find naming convention issues"""
        issues = []

        # Check workflow name
        workflow_name = workflow_data.get('name', '')
        if not workflow_name or len(workflow_name) < 5:
            issues.append('workflow_name_too_short')

        # Check node names
        nodes = workflow_data.get('nodes', [])
        node_names = []

        for i, node in enumerate(nodes):
            node_name = node.get('name', '')
            if not node_name:
                issues.append(f'node_{i}_no_name')
            elif len(node_name) < 3:
                issues.append(f'node_{i}_name_too_short')
            elif node_name in node_names:
                issues.append(f'node_{i}_duplicate_name')
            else:
                node_names.append(node_name)

        return issues

    def has_documentation(self, workflow_data: Dict) -> bool:
        """Check if workflow has proper documentation"""
        # Check for description in workflow
        description = workflow_data.get('description', '')
        if description and len(description.strip()) > 10:
            return True

        # Check for sticky notes (documentation nodes)
        nodes = workflow_data.get('nodes', [])
        for node in nodes:
            if 'sticky' in node.get('type', '').lower():
                return True

        return False

    def fix_hardcoded_urls(self, workflow_data: Dict) -> Dict:
        """Replace hardcoded URLs with environment variables or placeholders"""
        def replace_urls(obj):
            if isinstance(obj, dict):
                new_obj = {}
                for key, value in obj.items():
                    if isinstance(value, str):
                        # Leave strings that already contain placeholders or variables
                        # untouched (same skip rule as find_hardcoded_urls)
                        if any(placeholder in value for placeholder in ['{{', '${', 'YOUR_', 'PLACEHOLDER', 'example.com']):
                            new_obj[key] = value
                            continue
                        # Replace hardcoded URLs with environment variables
                        new_value = re.sub(
                            r'https?://[^\s<>"\'{}|\\^`\[\]]+',
                            lambda m: '{{ $env.API_BASE_URL }}' if 'api' in m.group().lower() else '{{ $env.WEBHOOK_URL }}',
                            value
                        )
                        new_obj[key] = new_value
                    else:
                        new_obj[key] = replace_urls(value)
                return new_obj
            elif isinstance(obj, list):
                return [replace_urls(item) for item in obj]
            else:
                return obj

        return replace_urls(workflow_data)
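
    # Roughly, a node parameter such as (hostname is illustrative)
    #   "url": "https://api.acme-corp.io/v1/users"
    # becomes
    #   "url": "{{ $env.API_BASE_URL }}"
    # while URLs whose text does not contain "api" are rewritten to "{{ $env.WEBHOOK_URL }}".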

    def fix_sensitive_data(self, workflow_data: Dict) -> Dict:
        """Replace sensitive data with placeholders"""
        def replace_sensitive(obj):
            if isinstance(obj, dict):
                new_obj = {}
                for key, value in obj.items():
                    # Check if key indicates sensitive data
                    sensitive_patterns = ['password', 'token', 'key', 'secret', 'credential']
                    if any(pattern in key.lower() for pattern in sensitive_patterns):
                        if isinstance(value, str) and value.strip():
                            # Replace with placeholder
                            if 'api_key' in key.lower():
                                new_obj[key] = 'YOUR_API_KEY_HERE'
                            elif 'token' in key.lower():
                                new_obj[key] = 'YOUR_TOKEN_HERE'
                            elif 'password' in key.lower():
                                new_obj[key] = 'YOUR_PASSWORD_HERE'
                            else:
                                new_obj[key] = 'YOUR_CREDENTIAL_HERE'
                        else:
                            new_obj[key] = value
                    else:
                        new_obj[key] = replace_sensitive(value)
                return new_obj
            elif isinstance(obj, list):
                return [replace_sensitive(item) for item in obj]
            else:
                return obj

        return replace_sensitive(workflow_data)
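
    # For example (illustrative input), a credentials block such as
    #   {"api_key": "sk_live_abc123", "password": "hunter2"}
    # comes back as
    #   {"api_key": "YOUR_API_KEY_HERE", "password": "YOUR_PASSWORD_HERE"}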

    def add_error_handling(self, workflow_data: Dict) -> Dict:
        """Add error handling nodes to workflow"""
        nodes = workflow_data.get('nodes', [])
        connections = workflow_data.get('connections', {})

        # Find critical nodes that need error handling
        critical_nodes = []
        for node in nodes:
            node_type = node.get('type', '').lower()
            if node.get('id') and any(critical in node_type for critical in ['http', 'webhook', 'database', 'api']):
                critical_nodes.append(node['id'])

        # Add error handling nodes
        error_nodes_added = []
        for node_id in critical_nodes:
            error_node = {
                "id": f"error-handler-{node_id}",
                "name": "Error Handler",
                "type": "n8n-nodes-base.stopAndError",
                "typeVersion": 1,
                "position": [800, 400],
                "parameters": {
                    "message": f"Error occurred in {node_id}",
                    "options": {}
                }
            }

            nodes.append(error_node)
            error_nodes_added.append(error_node['id'])

            # Add error connection
            if node_id not in connections:
                connections[node_id] = {}
            if 'main' not in connections[node_id]:
                connections[node_id]['main'] = []

            connections[node_id]['main'].append([{
                "node": error_node['id'],
                "type": "main",
                "index": 0
            }])

        workflow_data['nodes'] = nodes
        workflow_data['connections'] = connections

        return workflow_data
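
    # Sketch of the result for a critical node with id "http-1" (shape only; the
    # actual ids come from the workflow being upgraded):
    #   nodes gains        {"id": "error-handler-http-1", "type": "n8n-nodes-base.stopAndError", ...}
    #   connections gains  "http-1": {"main": [[{"node": "error-handler-http-1", "type": "main", "index": 0}]]}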

    def fix_naming_issues(self, workflow_data: Dict) -> Dict:
        """Fix naming convention issues"""
        # Fix workflow name
        workflow_name = workflow_data.get('name', '')
        if not workflow_name or len(workflow_name) < 5:
            # Generate a better name based on nodes
            nodes = workflow_data.get('nodes', [])
            if nodes:
                first_node_type = nodes[0].get('type', '').split('.')[-1]
                workflow_data['name'] = f"{first_node_type.title()} Workflow"

        # Fix node names
        nodes = workflow_data.get('nodes', [])
        node_names_used = set()

        for i, node in enumerate(nodes):
            node_name = node.get('name', '')
            node_type = node.get('type', '').split('.')[-1]

            # Generate better name if needed
            if not node_name or len(node_name) < 3:
                base_name = node_type.title() if node_type else f"Node {i+1}"

                # Ensure uniqueness
                counter = 1
                new_name = base_name
                while new_name in node_names_used:
                    new_name = f"{base_name} {counter}"
                    counter += 1

                node['name'] = new_name

            node_names_used.add(node['name'])

        workflow_data['nodes'] = nodes
        return workflow_data
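
    # Illustrative example: an unnamed "n8n-nodes-base.httpRequest" node becomes
    # "Httprequest"; a second unnamed node of the same type becomes "Httprequest 1".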

    def add_documentation(self, workflow_data: Dict) -> Dict:
        """Add documentation to workflow"""
        nodes = workflow_data.get('nodes', [])

        # Add workflow description if missing
        if not workflow_data.get('description'):
            workflow_name = workflow_data.get('name', 'Workflow')
            workflow_data['description'] = f"Automated workflow: {workflow_name}. This workflow processes data and performs automated tasks."

        # Add documentation sticky note
        doc_node = {
            "id": "documentation-node",
            "name": "Workflow Documentation",
            "type": "n8n-nodes-base.stickyNote",
            "typeVersion": 1,
            "position": [100, 100],
            "parameters": {
                "content": f"# {workflow_data.get('name', 'Workflow')}\n\n{workflow_data.get('description', 'This workflow automates various tasks.')}\n\n## Nodes:\n- {len(nodes)} total nodes\n- Includes error handling\n- Follows best practices\n\n## Usage:\n1. Configure credentials\n2. Update environment variables\n3. Test workflow\n4. Deploy to production"
            }
        }

        nodes.append(doc_node)
        workflow_data['nodes'] = nodes

        return workflow_data

    def optimize_workflow_structure(self, workflow_data: Dict) -> Dict:
        """Optimize overall workflow structure"""
        nodes = workflow_data.get('nodes', [])
        connections = workflow_data.get('connections', {})

        # Add workflow settings for better performance
        if 'settings' not in workflow_data:
            workflow_data['settings'] = {}

        workflow_data['settings'].update({
            'executionOrder': 'v1',
            'saveManualExecutions': True,
            'callerPolicy': 'workflowsFromSameOwner',
            'errorWorkflow': None
        })

        # Ensure all nodes have proper positioning
        for i, node in enumerate(nodes):
            if 'position' not in node:
                node['position'] = [300 + (i * 200), 200 + ((i % 3) * 100)]

        return workflow_data

    def upgrade_workflow(self, workflow_path: Path) -> Dict[str, Any]:
        """Upgrade a single workflow to excellent quality"""
        try:
            with open(workflow_path, 'r', encoding='utf-8') as f:
                original_data = json.load(f)

            workflow_data = original_data.copy()

            # Analyze issues
            issues = self.analyze_quality_issues(workflow_data)

            # Apply fixes
            fixes_applied = []

            # Fix hardcoded URLs
            if any(issue['type'] == 'hardcoded_urls' for issue in issues):
                workflow_data = self.fix_hardcoded_urls(workflow_data)
                fixes_applied.append('hardcoded_urls_fixed')
                self.issues_fixed['hardcoded_urls'] += 1

            # Fix sensitive data
            if any(issue['type'] == 'sensitive_data' for issue in issues):
                workflow_data = self.fix_sensitive_data(workflow_data)
                fixes_applied.append('sensitive_data_fixed')
                self.issues_fixed['sensitive_data'] += 1

            # Add error handling
            if any(issue['type'] == 'no_error_handling' for issue in issues):
                workflow_data = self.add_error_handling(workflow_data)
                fixes_applied.append('error_handling_added')
                self.issues_fixed['error_handling'] += 1

            # Fix naming issues
            if any(issue['type'] == 'naming_issues' for issue in issues):
                workflow_data = self.fix_naming_issues(workflow_data)
                fixes_applied.append('naming_fixed')
                self.issues_fixed['naming_issues'] += 1

            # Add documentation
            if any(issue['type'] == 'no_documentation' for issue in issues):
                workflow_data = self.add_documentation(workflow_data)
                fixes_applied.append('documentation_added')
                self.issues_fixed['documentation'] += 1

            # Optimize structure
            workflow_data = self.optimize_workflow_structure(workflow_data)
            fixes_applied.append('structure_optimized')

            # Save upgraded workflow
            with open(workflow_path, 'w', encoding='utf-8') as f:
                json.dump(workflow_data, f, indent=2, ensure_ascii=False)

            return {
                'filename': workflow_path.name,
                'original_issues': len(issues),
                'fixes_applied': fixes_applied,
                'success': True
            }

        except Exception as e:
            return {
                'filename': workflow_path.name,
                'error': str(e),
                'success': False
            }

    def upgrade_all_workflows(self) -> Dict[str, Any]:
        """Upgrade all workflows to excellent quality"""
        print("🚀 Starting workflow excellence upgrade...")

        # Create backup first
        self.create_backup()

        upgrade_results = []
        total_workflows = 0
        successful_upgrades = 0

        for category_dir in self.workflows_dir.iterdir():
            if category_dir.is_dir():
                print(f"\n📁 Processing category: {category_dir.name}")

                for workflow_file in category_dir.glob('*.json'):
                    total_workflows += 1

                    if total_workflows % 100 == 0:
                        print(f"⏳ Processed {total_workflows} workflows...")

                    result = self.upgrade_workflow(workflow_file)
                    upgrade_results.append(result)

                    if result['success']:
                        successful_upgrades += 1
                        self.upgrade_stats['successful'] += 1
                    else:
                        self.upgrade_stats['failed'] += 1

        print(f"\n✅ Upgrade complete!")
        print(f"📊 Processed {total_workflows} workflows")
        print(f"🎯 Successfully upgraded {successful_upgrades} workflows")
        print(f"❌ Failed upgrades: {total_workflows - successful_upgrades}")

        return {
            'total_workflows': total_workflows,
            'successful_upgrades': successful_upgrades,
            'failed_upgrades': total_workflows - successful_upgrades,
            'upgrade_stats': dict(self.upgrade_stats),
            'issues_fixed': dict(self.issues_fixed),
            'results': upgrade_results
        }

    def generate_upgrade_report(self, upgrade_results: Dict[str, Any]):
        """Generate comprehensive upgrade report"""
        print("\n" + "="*60)
        print("📋 WORKFLOW EXCELLENCE UPGRADE REPORT")
        print("="*60)

        # Guard against division by zero when no workflows were found
        total = upgrade_results['total_workflows']
        success_rate = (upgrade_results['successful_upgrades'] / total * 100) if total else 0.0

        print(f"\n📊 UPGRADE STATISTICS:")
        print(f" Total Workflows: {total}")
        print(f" Successful Upgrades: {upgrade_results['successful_upgrades']}")
        print(f" Failed Upgrades: {upgrade_results['failed_upgrades']}")
        print(f" Success Rate: {success_rate:.1f}%")

        print(f"\n🔧 ISSUES FIXED:")
        for issue_type, count in upgrade_results['issues_fixed'].items():
            print(f" {issue_type.replace('_', ' ').title()}: {count} workflows")

        print(f"\n📈 UPGRADE BREAKDOWN:")
        for stat_type, count in upgrade_results['upgrade_stats'].items():
            print(f" {stat_type.replace('_', ' ').title()}: {count}")

        # Save detailed report
        report_data = {
            'upgrade_timestamp': datetime.now().isoformat(),
            'summary': upgrade_results,
            'backup_location': str(self.backup_dir)
        }

        with open("workflow_upgrade_report.json", "w") as f:
            json.dump(report_data, f, indent=2)

        print(f"\n📄 Detailed report saved to: workflow_upgrade_report.json")
        print(f"📦 Original workflows backed up to: {self.backup_dir}")


def main():
    """Main upgrade function"""
    upgrader = WorkflowExcellenceUpgrader()

    # Run upgrade
    upgrade_results = upgrader.upgrade_all_workflows()

    # Generate report
    upgrader.generate_upgrade_report(upgrade_results)

    print(f"\n🎉 All workflows upgraded to excellent quality!")
    print(f"💡 Next step: Run validation to confirm quality scores")


if __name__ == "__main__":
    main()
|