Files
MediaCrawler/tools/async_file_writer.py
persist-1 be306c6f54 refactor(database): 重构数据库存储实现,使用SQLAlchemy ORM替代原始SQL操作
- 删除旧的async_db.py和async_sqlite_db.py实现
- 新增SQLAlchemy ORM模型和数据库会话管理
- 统一各平台存储实现到_store_impl.py文件
- 添加数据库初始化功能支持
- 更新.gitignore和pyproject.toml依赖配置
- 优化文件存储路径和命名规范
2025-09-06 04:10:20 +08:00

50 lines
2.1 KiB
Python

import asyncio
import csv
import json
import os
import pathlib
from typing import Dict, List
import aiofiles
from tools.utils import utils
class AsyncFileWriter:
    """Persist crawled items to CSV or JSON files under ``data/<platform>/``.

    Every write issued through one instance is serialized by an
    ``asyncio.Lock`` so that coroutines sharing the instance cannot interleave
    their file operations.
    """

    def __init__(self, platform: str, crawler_type: str):
        """Remember the naming components and create the per-instance lock.

        Args:
            platform: Used as the directory name below ``data/``.
            crawler_type: Used as the leading component of every file name.
        """
        self.lock = asyncio.Lock()
        self.platform = platform
        self.crawler_type = crawler_type

    def _get_file_path(self, file_type: str, item_type: str) -> str:
        """Build the output path for *item_type*, creating its directory.

        The path has the form
        ``data/<platform>/<file_type>/<crawler_type>_<item_type>_<date>.<file_type>``
        where ``<date>`` is whatever ``utils.get_current_date()`` returns.

        Args:
            file_type: File extension, also used as the sub-directory name.
            item_type: Middle component of the file name.

        Returns:
            The path of the file to write to.
        """
        base_path = f"data/{self.platform}/{file_type}"
        pathlib.Path(base_path).mkdir(parents=True, exist_ok=True)
        file_name = f"{self.crawler_type}_{item_type}_{utils.get_current_date()}.{file_type}"
        return os.path.join(base_path, file_name)

    async def write_to_csv(self, item: Dict, item_type: str) -> None:
        """Append *item* as one row to the CSV file for *item_type*.

        A header row built from ``item``'s keys is written first when the file
        is new or empty.

        NOTE(review): the header comes from the first item written, so later
        items are presumably expected to carry the same keys — verify against
        callers.

        Args:
            item: Mapping of column name to value.
            item_type: Selects the target file (see ``_get_file_path``).
        """
        file_path = self._get_file_path('csv', item_type)
        async with self.lock:
            file_exists = os.path.exists(file_path)
            async with aiofiles.open(file_path, 'a', newline='', encoding='utf-8-sig') as f:
                writer = csv.DictWriter(f, fieldnames=item.keys())
                # The csv writer returns whatever the underlying ``write``
                # returns (documented since Python 3.8); for an aiofiles handle
                # that is an awaitable, hence the ``await`` on the csv calls.
                if not file_exists or await f.tell() == 0:
                    await writer.writeheader()
                await writer.writerow(item)

    async def write_single_item_to_json(self, item: Dict, item_type: str) -> None:
        """Append *item* to the JSON array stored in the file for *item_type*.

        The current file content is loaded, *item* is appended, and the whole
        array is written back. A top-level value that is not a list is wrapped
        in a list; content that cannot be parsed as JSON is treated as empty
        (best effort), so the file is restarted with just *item*.

        The new content is serialized before any file is opened for writing
        and is written to a temporary sibling file that then replaces the
        target. An item that cannot be serialized, or an interruption in the
        middle of the write, therefore leaves the previously stored data
        intact instead of a truncated file.

        Args:
            item: JSON-serializable mapping to append.
            item_type: Selects the target file (see ``_get_file_path``).

        Raises:
            TypeError: If *item* is not JSON-serializable (raised by
                ``json.dumps``); the existing file is left unchanged.
        """
        file_path = self._get_file_path('json', item_type)
        async with self.lock:
            existing_data = []
            if os.path.exists(file_path) and os.path.getsize(file_path) > 0:
                async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
                    content = await f.read()
                if content:
                    try:
                        existing_data = json.loads(content)
                    except json.JSONDecodeError:
                        # Best effort: unreadable content is dropped.
                        existing_data = []
                    else:
                        if not isinstance(existing_data, list):
                            existing_data = [existing_data]

            existing_data.append(item)

            # Serialize BEFORE opening anything for writing: opening the
            # target with 'w' truncates it, so a failure in json.dumps after
            # that point would wipe every record already stored.
            payload = json.dumps(existing_data, ensure_ascii=False, indent=4)

            # Write next to the target and swap it in, so readers (and the
            # next call) never observe a half-written file.
            tmp_path = f"{file_path}.tmp"
            try:
                async with aiofiles.open(tmp_path, 'w', encoding='utf-8') as f:
                    await f.write(payload)
                os.replace(tmp_path, file_path)
            except BaseException:
                # The target is still untouched; just avoid leaving the
                # temporary file behind.
                if os.path.exists(tmp_path):
                    os.remove(tmp_path)
                raise