#!/usr/bin/env python3 """ Advanced Security Fixer for n8n Workflows Specifically targets remaining sensitive data issues in workflow configurations """ import json import os import re from pathlib import Path from typing import Dict, List, Any class AdvancedSecurityFixer: def __init__(self, workflows_dir="workflows"): self.workflows_dir = Path(workflows_dir) self.fix_stats = { 'total_workflows': 0, 'fixed_workflows': 0, 'sensitive_data_fixed': 0, 'credential_placeholders_added': 0 } def fix_sensitive_data_patterns(self, workflow_data: Dict) -> Dict: """Fix specific sensitive data patterns found in validation""" fixed = False def deep_fix_sensitive_data(obj, path=""): nonlocal fixed if isinstance(obj, dict): for key, value in obj.items(): current_path = f"{path}.{key}" if path else key # Handle specific patterns found in validation if key == "nodeCredentialType" and value: if isinstance(value, str) and not value.startswith('{{'): obj[key] = "{{ $credentials.nodeCredentialType }}" fixed = True elif key == "sessionKey" and value: if isinstance(value, str) and not value.startswith('{{'): obj[key] = "{{ $credentials.sessionKey }}" fixed = True elif key == "key" and value: if isinstance(value, str) and not value.startswith('{{'): # Check if this looks like a sensitive key if len(value) > 10 and any(c.isalnum() for c in value): obj[key] = "{{ $credentials.key }}" fixed = True elif "outputKey" in key and value: if isinstance(value, str) and not value.startswith('{{'): obj[key] = "{{ $credentials.outputKey }}" fixed = True elif "maxTokens" in key and value: if isinstance(value, (str, int)) and str(value).isdigit(): obj[key] = "{{ $env.MAX_TOKENS }}" fixed = True # Recursively process nested structures deep_fix_sensitive_data(value, current_path) elif isinstance(obj, list): for i, item in enumerate(obj): deep_fix_sensitive_data(item, f"{path}[{i}]") deep_fix_sensitive_data(workflow_data) return workflow_data, fixed def fix_credential_references(self, workflow_data: Dict) -> Dict: """Fix credential references to use proper n8n credential system""" fixed = False nodes = workflow_data.get('nodes', []) for node in nodes: # Fix credential IDs to use placeholders credentials = node.get('credentials', {}) if credentials: for cred_type, cred_data in credentials.items(): if isinstance(cred_data, dict) and 'id' in cred_data: cred_id = cred_data['id'] if isinstance(cred_id, str) and cred_id.isdigit(): # Replace numeric credential IDs with placeholders cred_data['id'] = f"{{{{ $credentials.{cred_type}.id }}}}" fixed = True # Fix authentication parameters parameters = node.get('parameters', {}) if 'authentication' in parameters: auth_value = parameters['authentication'] if isinstance(auth_value, str) and not auth_value.startswith('{{'): parameters['authentication'] = f"{{{{ $credentials.{auth_value} }}}}" fixed = True return workflow_data, fixed def fix_environment_variables(self, workflow_data: Dict) -> Dict: """Replace hardcoded values with environment variables""" fixed = False def replace_env_values(obj, path=""): nonlocal fixed if isinstance(obj, dict): for key, value in obj.items(): current_path = f"{path}.{key}" if path else key # Common environment variable patterns env_patterns = { 'api_key': 'API_KEY', 'access_token': 'ACCESS_TOKEN', 'secret': 'SECRET_KEY', 'password': 'PASSWORD', 'url': 'BASE_URL', 'endpoint': 'API_ENDPOINT', 'webhook_url': 'WEBHOOK_URL' } for pattern, env_var in env_patterns.items(): if pattern in key.lower() and value: if isinstance(value, str) and not value.startswith('{{'): obj[key] = f"{{{{ $env.{env_var} }}}}" fixed = True # Recursively process nested structures replace_env_values(value, current_path) elif isinstance(obj, list): for i, item in enumerate(obj): replace_env_values(item, f"{path}[{i}]") replace_env_values(workflow_data) return workflow_data, fixed def fix_single_workflow(self, workflow_path: Path) -> Dict[str, Any]: """Fix security issues in a single workflow""" try: with open(workflow_path, 'r', encoding='utf-8') as f: workflow_data = json.load(f) fixes_applied = { 'sensitive_data_fixed': False, 'credential_fixed': False, 'environment_fixed': False } # Apply security fixes workflow_data, sensitive_fixed = self.fix_sensitive_data_patterns(workflow_data) fixes_applied['sensitive_data_fixed'] = sensitive_fixed workflow_data, credential_fixed = self.fix_credential_references(workflow_data) fixes_applied['credential_fixed'] = credential_fixed workflow_data, env_fixed = self.fix_environment_variables(workflow_data) fixes_applied['environment_fixed'] = env_fixed # Save if any fixes were applied if any(fixes_applied.values()): with open(workflow_path, 'w', encoding='utf-8') as f: json.dump(workflow_data, f, indent=2, ensure_ascii=False) return { 'filename': workflow_path.name, 'fixed': any(fixes_applied.values()), 'fixes_applied': fixes_applied, 'workflow_name': workflow_data.get('name', 'Unnamed') } except Exception as e: return { 'filename': workflow_path.name, 'fixed': False, 'error': f"Security fix error: {str(e)}", 'workflow_name': 'Error' } def fix_all_workflows(self) -> Dict[str, Any]: """Fix security issues in all workflows""" print("šŸ›”ļø Applying advanced security fixes...") fix_results = [] for category_dir in self.workflows_dir.iterdir(): if category_dir.is_dir(): for workflow_file in category_dir.glob('*.json'): self.fix_stats['total_workflows'] += 1 result = self.fix_single_workflow(workflow_file) fix_results.append(result) if result['fixed']: self.fix_stats['fixed_workflows'] += 1 if result['fixes_applied']['sensitive_data_fixed']: self.fix_stats['sensitive_data_fixed'] += 1 if result['fixes_applied']['credential_fixed']: self.fix_stats['credential_placeholders_added'] += 1 print(f"āœ… Processed {self.fix_stats['total_workflows']} workflows") print(f"šŸ›”ļø Applied security fixes to {self.fix_stats['fixed_workflows']} workflows") print(f"šŸ” Sensitive data fixes: {self.fix_stats['sensitive_data_fixed']}") print(f"šŸ”‘ Credential fixes: {self.fix_stats['credential_placeholders_added']}") return { 'total_workflows': self.fix_stats['total_workflows'], 'fixed_workflows': self.fix_stats['fixed_workflows'], 'fix_stats': self.fix_stats, 'results': fix_results } def main(): """Main security fixing function""" fixer = AdvancedSecurityFixer() summary = fixer.fix_all_workflows() print(f"\nšŸŽ‰ Advanced security fixing complete!") print(f"šŸ“Š Fixed {summary['fixed_workflows']} out of {summary['total_workflows']} workflows") if __name__ == "__main__": main()