import asyncio
import csv
import json
import os
import pathlib
from typing import Dict, List

import aiofiles

import config
from tools.utils import utils
from tools.words import AsyncWordCloudGenerator


class AsyncFileWriter:
    """Asynchronously persist crawled items to per-platform CSV/JSON files.

    One instance is bound to a (platform, crawler_type) pair; all file
    access is serialized through a single asyncio.Lock so concurrent
    tasks never interleave writes to the same file.
    """

    def __init__(self, platform: str, crawler_type: str):
        # Serializes every read-modify-write cycle across coroutines.
        self.lock = asyncio.Lock()
        self.platform = platform
        self.crawler_type = crawler_type
        # Wordcloud support is optional; only constructed when enabled in config.
        self.wordcloud_generator = AsyncWordCloudGenerator() if config.ENABLE_GET_WORDCLOUD else None

    def _get_file_path(self, file_type: str, item_type: str) -> str:
        """Return data/<platform>/<file_type>/<crawler_type>_<item_type>_<date>.<file_type>,
        creating the directory if needed.

        Note: `file_type` doubles as both the sub-directory name and the
        file extension (e.g. 'csv', 'json').
        """
        base_path = f"data/{self.platform}/{file_type}"
        pathlib.Path(base_path).mkdir(parents=True, exist_ok=True)
        file_name = f"{self.crawler_type}_{item_type}_{utils.get_current_date()}.{file_type}"
        return f"{base_path}/{file_name}"

    async def write_to_csv(self, item: Dict, item_type: str):
        """Append one item as a CSV row, emitting a header row for a new or empty file.

        Uses utf-8-sig so Excel detects the encoding. Assumes every item
        for a given file shares the same keys (DictWriter raises otherwise).
        """
        file_path = self._get_file_path('csv', item_type)
        async with self.lock:
            file_exists = os.path.exists(file_path)
            async with aiofiles.open(file_path, 'a', newline='', encoding='utf-8-sig') as f:
                writer = csv.DictWriter(f, fieldnames=item.keys())
                # NOTE: csv's writerow()/writeheader() return whatever f.write()
                # returned; with an aiofiles handle that is a coroutine, which is
                # why these calls are awaited even though csv itself is sync.
                if not file_exists or await f.tell() == 0:
                    await writer.writeheader()
                await writer.writerow(item)

    async def write_single_item_to_json(self, item: Dict, item_type: str):
        """Append one item to a JSON array file via read-modify-write.

        The whole file is re-serialized on every call (O(n) per item), and
        corrupt/non-list contents are tolerated: invalid JSON resets the
        file, a non-list top-level value is wrapped into a list.
        """
        file_path = self._get_file_path('json', item_type)
        async with self.lock:
            existing_data = []
            if os.path.exists(file_path) and os.path.getsize(file_path) > 0:
                async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
                    try:
                        content = await f.read()
                        if content:
                            existing_data = json.loads(content)
                            # Normalize a bare object into a one-element list.
                            if not isinstance(existing_data, list):
                                existing_data = [existing_data]
                    except json.JSONDecodeError:
                        # Unreadable file: start over rather than crash the crawl.
                        existing_data = []
            existing_data.append(item)
            async with aiofiles.open(file_path, 'w', encoding='utf-8') as f:
                await f.write(json.dumps(existing_data, ensure_ascii=False, indent=4))

    async def generate_wordcloud_from_comments(self):
        """
        Generate wordcloud from comments data
        Only works when ENABLE_GET_WORDCLOUD and ENABLE_GET_COMMENTS are True
        """
        if not config.ENABLE_GET_WORDCLOUD or not config.ENABLE_GET_COMMENTS:
            return
        if not self.wordcloud_generator:
            return

        try:
            # Read comments from JSON file
            comments_file_path = self._get_file_path('json', 'comments')
            if not os.path.exists(comments_file_path) or os.path.getsize(comments_file_path) == 0:
                utils.logger.info(f"[AsyncFileWriter.generate_wordcloud_from_comments] No comments file found at {comments_file_path}")
                return

            async with aiofiles.open(comments_file_path, 'r', encoding='utf-8') as f:
                content = await f.read()
                if not content:
                    utils.logger.info(f"[AsyncFileWriter.generate_wordcloud_from_comments] Comments file is empty")
                    return

                comments_data = json.loads(content)
                if not isinstance(comments_data, list):
                    comments_data = [comments_data]

            # Filter comments data to only include 'content' field
            # Handle different comment data structures across platforms
            filtered_data = []
            for comment in comments_data:
                if isinstance(comment, dict):
                    # Try different possible content field names
                    content_text = comment.get('content') or comment.get('comment_text') or comment.get('text') or ''
                    if content_text:
                        filtered_data.append({'content': content_text})

            if not filtered_data:
                utils.logger.info(f"[AsyncFileWriter.generate_wordcloud_from_comments] No valid comment content found")
                return

            # Generate wordcloud
            words_base_path = f"data/{self.platform}/words"
            pathlib.Path(words_base_path).mkdir(parents=True, exist_ok=True)
            words_file_prefix = f"{words_base_path}/{self.crawler_type}_comments_{utils.get_current_date()}"

            utils.logger.info(f"[AsyncFileWriter.generate_wordcloud_from_comments] Generating wordcloud from {len(filtered_data)} comments")
            await self.wordcloud_generator.generate_word_frequency_and_cloud(filtered_data, words_file_prefix)
            utils.logger.info(f"[AsyncFileWriter.generate_wordcloud_from_comments] Wordcloud generated successfully at {words_file_prefix}")
        except Exception as e:
            utils.logger.error(f"[AsyncFileWriter.generate_wordcloud_from_comments] Error generating wordcloud: {e}")