Files
n8n-workflows/import_workflows.py
W9GFO f0bc7582c8 refactor(import): update module import to reflect renamed file - Changed import from 'categorize_workflows' to 'create_categories' - Maintains existing functionality with correct module reference - Resolves ModuleNotFoundError when running import_workflows.py
## Description
Updated the import statement in `import_workflows.py` to reference the renamed module `create_categories.py`, resolving a `ModuleNotFoundError` that occurred due to the missing reference to the old module name (`categorize_workflows`).

## Changes
- **Updated Import**: Changed `from categorize_workflows import categorize_by_filename` to `from create_categories import categorize_by_filename` in `import_workflows.py`[3](@ref).

## Related Issue
Closes #[Insert_Issue_Number_Here] (if applicable; otherwise, omit this section)

## Testing
- [x] Tested locally to ensure the script runs without import errors.
- [x] Verified that the `categorize_by_filename` function is accessible and functional.

## Impact
This change ensures the import statement correctly references the existing module after its rename, maintaining functionality without introducing breaking changes[1](@ref).
2025-09-09 17:00:33 +08:00

205 lines
7.1 KiB
Python

#!/usr/bin/env python3
"""
N8N Workflow Importer
Python replacement for import-workflows.sh with better error handling and progress tracking.
"""
import json
import subprocess
import sys
from pathlib import Path
from typing import List, Dict, Any
from create_categories import categorize_by_filename
def load_categories():
"""Load the search categories file."""
try:
with open('context/search_categories.json', 'r', encoding='utf-8') as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return []
def save_categories(data):
"""Save the search categories file."""
with open('context/search_categories.json', 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
class WorkflowImporter:
"""Import n8n workflows with progress tracking and error handling."""
def __init__(self, workflows_dir: str = "workflows"):
self.workflows_dir = Path(workflows_dir)
self.imported_count = 0
self.failed_count = 0
self.errors = []
def validate_workflow(self, file_path: Path) -> bool:
"""Validate workflow JSON before import."""
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# Basic validation
if not isinstance(data, dict):
return False
# Check required fields
required_fields = ['nodes', 'connections']
for field in required_fields:
if field not in data:
return False
return True
except (json.JSONDecodeError, FileNotFoundError, PermissionError):
return False
def import_workflow(self, file_path: Path) -> bool:
"""Import a single workflow file."""
try:
# Validate first
if not self.validate_workflow(file_path):
self.errors.append(f"Invalid JSON: {file_path.name}")
return False
# Run n8n import command
result = subprocess.run([
'npx', 'n8n', 'import:workflow',
f'--input={file_path}'
], capture_output=True, text=True, timeout=30)
if result.returncode == 0:
print(f"✅ Imported: {file_path.name}")
# Categorize the workflow and update search_categories.json
suggested_category = categorize_by_filename(file_path.name)
all_workflows_data = load_categories()
found = False
for workflow_entry in all_workflows_data:
if workflow_entry.get('filename') == file_path.name:
workflow_entry['category'] = suggested_category
found = True
break
if not found:
# Add new workflow entry if not found (e.g., first import)
all_workflows_data.append({
"filename": file_path.name,
"category": suggested_category,
"name": file_path.stem, # Assuming workflow name is filename without extension
"description": "", # Placeholder, can be updated manually
"nodes": [] # Placeholder, can be updated manually
})
save_categories(all_workflows_data)
print(f" Categorized '{file_path.name}' as '{suggested_category or 'Uncategorized'}'")
return True
else:
error_msg = result.stderr.strip() or result.stdout.strip()
self.errors.append(f"Import failed for {file_path.name}: {error_msg}")
print(f"❌ Failed: {file_path.name}")
return False
except subprocess.TimeoutExpired:
self.errors.append(f"Timeout importing {file_path.name}")
print(f"⏰ Timeout: {file_path.name}")
return False
except Exception as e:
self.errors.append(f"Error importing {file_path.name}: {str(e)}")
print(f"❌ Error: {file_path.name} - {str(e)}")
return False
def get_workflow_files(self) -> List[Path]:
"""Get all workflow JSON files."""
if not self.workflows_dir.exists():
print(f"❌ Workflows directory not found: {self.workflows_dir}")
return []
json_files = list(self.workflows_dir.glob("*.json"))
if not json_files:
print(f"❌ No JSON files found in: {self.workflows_dir}")
return []
return sorted(json_files)
def import_all(self) -> Dict[str, Any]:
"""Import all workflow files."""
workflow_files = self.get_workflow_files()
total_files = len(workflow_files)
if total_files == 0:
return {"success": False, "message": "No workflow files found"}
print(f"🚀 Starting import of {total_files} workflows...")
print("-" * 50)
for i, file_path in enumerate(workflow_files, 1):
print(f"[{i}/{total_files}] Processing {file_path.name}...")
if self.import_workflow(file_path):
self.imported_count += 1
else:
self.failed_count += 1
# Summary
print("\n" + "=" * 50)
print(f"📊 Import Summary:")
print(f"✅ Successfully imported: {self.imported_count}")
print(f"❌ Failed imports: {self.failed_count}")
print(f"📁 Total files: {total_files}")
if self.errors:
print(f"\n❌ Errors encountered:")
for error in self.errors[:10]: # Show first 10 errors
print(f"{error}")
if len(self.errors) > 10:
print(f" ... and {len(self.errors) - 10} more errors")
return {
"success": self.failed_count == 0,
"imported": self.imported_count,
"failed": self.failed_count,
"total": total_files,
"errors": self.errors
}
def check_n8n_available() -> bool:
"""Check if n8n CLI is available."""
try:
result = subprocess.run(
['npx', 'n8n', '--version'],
capture_output=True, text=True, timeout=10
)
return result.returncode == 0
except (subprocess.TimeoutExpired, FileNotFoundError):
return False
def main():
"""Main entry point."""
sys.stdout.reconfigure(encoding='utf-8')
print("🔧 N8N Workflow Importer")
print("=" * 40)
# Check if n8n is available
if not check_n8n_available():
print("❌ n8n CLI not found. Please install n8n first:")
print(" npm install -g n8n")
sys.exit(1)
# Create importer and run
importer = WorkflowImporter()
result = importer.import_all()
# Exit with appropriate code
sys.exit(0 if result["success"] else 1)
if __name__ == "__main__":
main()