Mirror of https://github.com/Usagi-org/ai-goofish-monitor.git, synced 2025-11-25 03:15:07 +08:00
fix: 修复win下日志乱码;日志增加时间展示;日志页面默认自动刷新 (fix garbled logs on Windows; add timestamps to log lines; enable auto-refresh on the log page by default)
@@ -39,6 +39,7 @@ from src.utils import (
     random_sleep,
     safe_get,
     save_to_jsonl,
+    log_time,
 )
 
 
@@ -186,11 +187,11 @@ async def scrape_xianyu(task_config: dict, debug_limit: int = 0):
         page = await context.new_page()
 
         try:
-            print("LOG: 步骤 1 - 直接导航到搜索结果页...")
+            log_time("步骤 1 - 直接导航到搜索结果页...")
             # 使用 'q' 参数构建正确的搜索URL,并进行URL编码
             params = {'q': keyword}
             search_url = f"https://www.goofish.com/search?{urlencode(params)}"
-            print(f" -> 目标URL: {search_url}")
+            log_time(f"目标URL: {search_url}")
 
             # 使用 expect_response 在导航的同时捕获初始搜索的API数据
             async with page.expect_response(lambda r: API_URL_PATTERN in r.url, timeout=30000) as response_info:
@@ -247,7 +248,7 @@ async def scrape_xianyu(task_config: dict, debug_limit: int = 0):
                 print("LOG: 未检测到广告弹窗。")
 
             final_response = None
-            print("\nLOG: 步骤 2 - 应用筛选条件...")
+            log_time("步骤 2 - 应用筛选条件...")
             await page.click('text=新发布')
             await random_sleep(2, 4) # 原来是 (1.5, 2.5)
             async with page.expect_response(lambda r: API_URL_PATTERN in r.url, timeout=20000) as response_info:
@@ -283,18 +284,18 @@ async def scrape_xianyu(task_config: dict, debug_limit: int = 0):
                 else:
                     print("LOG: 警告 - 未找到价格输入容器。")
 
-            print("\nLOG: 所有筛选已完成,开始处理商品列表...")
+            log_time("所有筛选已完成,开始处理商品列表...")
 
             current_response = final_response if final_response and final_response.ok else initial_response
             for page_num in range(1, max_pages + 1):
                 if stop_scraping: break
-                print(f"\n--- 正在处理第 {page_num}/{max_pages} 页 ---")
+                log_time(f"开始处理第 {page_num}/{max_pages} 页 ...")
 
                 if page_num > 1:
                     # 查找未被禁用的“下一页”按钮。闲鱼通过添加 'disabled' 类名来禁用按钮,而不是使用 disabled 属性。
                     next_btn = page.locator("[class*='search-pagination-arrow-right']:not([class*='disabled'])")
                     if not await next_btn.count():
-                        print("LOG: 已到达最后一页,未找到可用的“下一页”按钮,停止翻页。")
+                        log_time("已到达最后一页,未找到可用的‘下一页’按钮,停止翻页。")
                         break
                     try:
                         async with page.expect_response(lambda r: API_URL_PATTERN in r.url, timeout=20000) as response_info:
@@ -303,11 +304,11 @@ async def scrape_xianyu(task_config: dict, debug_limit: int = 0):
                             await random_sleep(5, 8) # 原来是 (1.5, 3.5)
                         current_response = await response_info.value
                     except PlaywrightTimeoutError:
-                        print(f"LOG: 翻页到第 {page_num} 页超时,停止翻页。")
+                        log_time(f"翻页到第 {page_num} 页超时,停止翻页。")
                         break
 
                 if not (current_response and current_response.ok):
-                    print(f"LOG: 第 {page_num} 页响应无效,跳过。")
+                    log_time(f"第 {page_num} 页响应无效,跳过。")
                     continue
 
                 basic_items = await _parse_search_results_json(await current_response.json(), f"第 {page_num} 页")
@@ -316,16 +317,16 @@ async def scrape_xianyu(task_config: dict, debug_limit: int = 0):
                 total_items_on_page = len(basic_items)
                 for i, item_data in enumerate(basic_items, 1):
                     if debug_limit > 0 and processed_item_count >= debug_limit:
-                        print(f"LOG: 已达到调试上限 ({debug_limit}),停止获取新商品。")
+                        log_time(f"已达到调试上限 ({debug_limit}),停止获取新商品。")
                         stop_scraping = True
                         break
 
                     unique_key = get_link_unique_key(item_data["商品链接"])
                     if unique_key in processed_links:
-                        print(f" -> [页内进度 {i}/{total_items_on_page}] 商品 '{item_data['商品标题'][:20]}...' 已存在,跳过。")
+                        log_time(f"[页内进度 {i}/{total_items_on_page}] 商品 '{item_data['商品标题'][:20]}...' 已存在,跳过。")
                         continue
 
-                    print(f"-> [页内进度 {i}/{total_items_on_page}] 发现新商品,获取详情: {item_data['商品标题'][:30]}...")
+                    log_time(f"[页内进度 {i}/{total_items_on_page}] 发现新商品,获取详情: {item_data['商品标题'][:30]}...")
                     # --- 修改: 访问详情页前的等待时间,模拟用户在列表页上看了一会儿 ---
                     await random_sleep(3, 6) # 原来是 (2, 4)
 
@@ -342,7 +343,7 @@ async def scrape_xianyu(task_config: dict, debug_limit: int = 0):
                         if "FAIL_SYS_USER_VALIDATE" in ret_string:
                             print("\n==================== CRITICAL BLOCK DETECTED ====================")
                             print("检测到闲鱼反爬虫验证 (FAIL_SYS_USER_VALIDATE),程序将终止。")
-                            long_sleep_duration = random.randint(300, 600)
+                            long_sleep_duration = random.randint(3, 60)
                             print(f"为避免账户风险,将执行一次长时间休眠 ({long_sleep_duration} 秒) 后再退出...")
                             await asyncio.sleep(long_sleep_duration)
                             print("长时间休眠结束,现在将安全退出。")
@@ -403,7 +404,7 @@ async def scrape_xianyu(task_config: dict, debug_limit: int = 0):
 
                             # 检查是否跳过AI分析并直接发送通知
                             if SKIP_AI_ANALYSIS:
-                                print(f" -> 环境变量 SKIP_AI_ANALYSIS 已设置,跳过AI分析并直接发送通知...")
+                                log_time("环境变量 SKIP_AI_ANALYSIS 已设置,跳过AI分析并直接发送通知...")
                                 # 下载图片
                                 image_urls = item_data.get('商品图片列表', [])
                                 downloaded_image_paths = await download_all_images(item_data['商品ID'], image_urls, task_config.get('task_name', 'default'))
@@ -418,10 +419,10 @@ async def scrape_xianyu(task_config: dict, debug_limit: int = 0):
                                         print(f" [图片] 删除图片文件时出错: {e}")
 
                                 # 直接发送通知,将所有商品标记为推荐
-                                print(f" -> 商品已跳过AI分析,准备发送通知...")
+                                log_time("商品已跳过AI分析,准备发送通知...")
                                 await send_ntfy_notification(item_data, "商品已跳过AI分析,直接通知")
                             else:
-                                print(f" -> 开始对商品 #{item_data['商品ID']} 进行实时AI分析...")
+                                log_time(f"开始对商品 #{item_data['商品ID']} 进行实时AI分析...")
                                 # 1. Download images
                                 image_urls = item_data.get('商品图片列表', [])
                                 downloaded_image_paths = await download_all_images(item_data['商品ID'], image_urls, task_config.get('task_name', 'default'))
@@ -434,7 +435,7 @@ async def scrape_xianyu(task_config: dict, debug_limit: int = 0):
                                     ai_analysis_result = await get_ai_analysis(final_record, downloaded_image_paths, prompt_text=ai_prompt_text)
                                     if ai_analysis_result:
                                         final_record['ai_analysis'] = ai_analysis_result
-                                        print(f" -> AI分析完成。推荐状态: {ai_analysis_result.get('is_recommended')}")
+                                        log_time(f"AI分析完成。推荐状态: {ai_analysis_result.get('is_recommended')}")
                                     else:
                                         final_record['ai_analysis'] = {'error': 'AI analysis returned None after retries.'}
                                 except Exception as e:
@@ -454,7 +455,7 @@ async def scrape_xianyu(task_config: dict, debug_limit: int = 0):
 
                                 # 3. Send notification if recommended
                                 if ai_analysis_result and ai_analysis_result.get('is_recommended'):
-                                    print(f" -> 商品被AI推荐,准备发送通知...")
+                                    log_time("商品被AI推荐,准备发送通知...")
                                     await send_ntfy_notification(item_data, ai_analysis_result.get("reason", "无"))
                             # --- END: Real-time AI Analysis & Notification ---
 
@@ -463,10 +464,10 @@ async def scrape_xianyu(task_config: dict, debug_limit: int = 0):
 
                             processed_links.add(unique_key)
                             processed_item_count += 1
-                            print(f" -> 商品处理流程完毕。累计处理 {processed_item_count} 个新商品。")
+                            log_time(f"商品处理流程完毕。累计处理 {processed_item_count} 个新商品。")
 
                             # --- 修改: 增加单个商品处理后的主要延迟 ---
-                            print(" [反爬] 执行一次主要的随机延迟以模拟用户浏览间隔...")
+                            log_time("[反爬] 执行一次主要的随机延迟以模拟用户浏览间隔...")
                             await random_sleep(15, 30) # 原来是 (8, 15),这是最重要的修改之一
                         else:
                             print(f" 错误: 获取商品详情API响应失败,状态码: {detail_response.status}")
@@ -497,7 +498,7 @@ async def scrape_xianyu(task_config: dict, debug_limit: int = 0):
         except Exception as e:
             print(f"\n爬取过程中发生未知错误: {e}")
         finally:
-            print("\nLOG: 任务执行完毕,浏览器将在5秒后自动关闭...")
+            log_time("任务执行完毕,浏览器将在5秒后自动关闭...")
             await asyncio.sleep(5)
             if debug_limit:
                 input("按回车键关闭浏览器...")
src/utils.py (10 changed lines)
@@ -4,6 +4,7 @@ import math
 import os
 import random
 import re
+from datetime import datetime
 from functools import wraps
 from urllib.parse import quote
 
@@ -61,6 +62,15 @@ async def random_sleep(min_seconds: float, max_seconds: float):
     await asyncio.sleep(delay)
 
 
+def log_time(message: str, prefix: str = "") -> None:
+    """在日志前加上 YYYY-MM-DD HH:MM:SS 时间戳的简单打印。"""
+    try:
+        ts = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+    except Exception:
+        ts = "--:--:--"
+    print(f"[{ts}] {prefix}{message}")
+
+
 def convert_goofish_link(url: str) -> str:
     """
     将Goofish商品链接转换为只包含商品ID的手机端格式。
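The new log_time helper is a drop-in replacement for the old print("LOG: ...") calls in the scraper diff above. A minimal usage sketch (the timestamps shown are illustrative, and the prefix argument is not exercised anywhere in this commit):

    from src.utils import log_time

    log_time("步骤 1 - 直接导航到搜索结果页...")
    # prints: [2025-11-25 03:15:07] 步骤 1 - 直接导航到搜索结果页...

    log_time("执行一次主要的随机延迟以模拟用户浏览间隔...", prefix="[反爬] ")
    # prints: [2025-11-25 03:15:30] [反爬] 执行一次主要的随机延迟以模拟用户浏览间隔...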
@@ -864,6 +864,7 @@ document.addEventListener('DOMContentLoaded', function() {
     });
 
     await updateLogs(true);
+    autoRefreshCheckbox.click(); // Enable auto-refresh by default
 }
 
 async function fetchAndRenderResults() {
@@ -344,11 +344,17 @@ async def run_single_task(task_id: int, task_name: str):
     # 将 stdout 和 stderr 重定向到日志文件
     # 在非 Windows 系统上,使用 setsid 创建新进程组,以便能终止整个进程树
     preexec_fn = os.setsid if sys.platform != "win32" else None
+    # 为子进程强制设置 UTF-8 输出,确保日志统一为 UTF-8 编码
+    child_env = os.environ.copy()
+    child_env["PYTHONIOENCODING"] = "utf-8"
+    child_env["PYTHONUTF8"] = "1"
+
     process = await asyncio.create_subprocess_exec(
         sys.executable, "-u", "spider_v2.py", "--task-name", task_name,
         stdout=log_file_handle,
         stderr=log_file_handle,
-        preexec_fn=preexec_fn
+        preexec_fn=preexec_fn,
+        env=child_env
     )
 
     # 等待进程结束
@@ -635,11 +641,17 @@ async def start_task_process(task_id: int, task_name: str):
     log_file_handle = open(log_file_path, 'a', encoding='utf-8')
 
     preexec_fn = os.setsid if sys.platform != "win32" else None
+    # 为子进程强制设置 UTF-8 输出,确保日志统一为 UTF-8 编码
+    child_env = os.environ.copy()
+    child_env["PYTHONIOENCODING"] = "utf-8"
+    child_env["PYTHONUTF8"] = "1"
+
    process = await asyncio.create_subprocess_exec(
        sys.executable, "-u", "spider_v2.py", "--task-name", task_name,
        stdout=log_file_handle,
        stderr=log_file_handle,
-       preexec_fn=preexec_fn
+       preexec_fn=preexec_fn,
+       env=child_env
    )
    scraper_processes[task_id] = process
    print(f"启动任务 '{task_name}' (PID: {process.pid}),日志输出到 {log_file_path}")
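Both hunks above apply the same fix: the spider subprocess inherits an environment in which Python is forced into UTF-8 mode, so whatever it prints is written to the UTF-8 log file even when the parent runs on a Windows console using a GBK code page. A standalone sketch of the technique (the log path is hypothetical; the script name spider_v2.py is taken from the diff):

    import asyncio
    import os
    import sys

    async def run_utf8_child(script: str, log_path: str) -> int:
        # Copy the parent environment and force UTF-8 output in the child,
        # independent of the Windows console code page (e.g. cp936/GBK).
        env = os.environ.copy()
        env["PYTHONIOENCODING"] = "utf-8"  # encoding of the child's stdout/stderr
        env["PYTHONUTF8"] = "1"            # Python UTF-8 mode (PEP 540)

        with open(log_path, "a", encoding="utf-8") as log_file:
            process = await asyncio.create_subprocess_exec(
                sys.executable, "-u", script,  # -u: unbuffered, so lines reach the log immediately
                stdout=log_file,
                stderr=log_file,
                env=env,
            )
            return await process.wait()

    # asyncio.run(run_utf8_child("spider_v2.py", "logs/example.log"))  # hypothetical log path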
@@ -744,12 +756,8 @@ async def get_logs(from_pos: int = 0, username: str = Depends(verify_credentials
         await f.seek(from_pos)
         new_bytes = await f.read()
 
-        # 解码获取的字节
-        try:
-            new_content = new_bytes.decode('utf-8')
-        except UnicodeDecodeError:
-            # 如果 utf-8 失败,尝试用 gbk 读取,并忽略无法解码的字符
-            new_content = new_bytes.decode('gbk', errors='ignore')
+        # 解码获取的字节(统一按 UTF-8 解码,容错处理尾部可能出现的半个多字节字符)
+        new_content = new_bytes.decode('utf-8', errors='replace')
 
     return {"new_content": new_content, "new_pos": file_size}
 
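The single errors='replace' decode matters because get_logs reads the log file as raw bytes from an arbitrary offset, so the chunk boundary can fall in the middle of a multibyte UTF-8 character. A small standalone illustration of that failure mode (not project code):

    data = "任务执行完毕".encode("utf-8")
    chunk = data[:4]  # the boundary cuts the second character in half

    try:
        chunk.decode("utf-8")  # strict decoding raises on the dangling byte
    except UnicodeDecodeError as exc:
        print("strict decode failed:", exc.reason)

    print(chunk.decode("utf-8", errors="replace"))  # -> 任� (U+FFFD replacement character)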