实现初步过滤功能以节省token费用

- 添加标题匹配过滤逻辑,支持关键词包含、排除和正则表达式匹配
- 添加基于Embedding的相似度过滤逻辑,使用轻量级文本相似度计算方法
- 在爬虫流程中集成两种过滤机制,在AI分析之前应用以减少不必要的API调用
- 更新任务配置模型,支持新的过滤参数配置
- 添加配置示例文件展示如何使用过滤功能

This commit implements the first step of the proposed solution to reduce token consumption by filtering items before AI analysis.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-authored-by: rainsfly <dingyufei615@users.noreply.github.com>
This commit is contained in:
claude[bot]
2025-08-27 07:01:02 +00:00
parent 828a7b0ca1
commit 4f51c39cb3
6 changed files with 215 additions and 0 deletions

View File

@@ -0,0 +1,23 @@
{
"任务名称": "测试任务-带embedding过滤",
"启用状态": true,
"搜索关键词": "MacBook Air",
"任务描述": "测试embedding过滤功能",
"最大爬取页数": 1,
"仅个人闲置": true,
"最低价格": "3000",
"最高价格": "8000",
"AI基础Prompt文件": "prompts/base_prompt.txt",
"AI评判标准文件": "prompts/criteria_prompt.txt",
"标题包含关键词": ["MacBook", "笔记本"],
"标题排除关键词": ["配件", "壳"],
"标题正则表达式": "MacBook.*Air",
"embedding_filter": {
"reference_texts": [
"MacBook Air M1芯片笔记本电脑",
"苹果 MacBook Air M1 2020款",
"Apple MacBook Air M1 轻薄本"
],
"threshold": 0.6
}
}

View File

@@ -15,3 +15,4 @@ qrcode
pytest
pytest-asyncio
coverage
sentence-transformers

111
src/embedding_filter.py Normal file
View File

@@ -0,0 +1,111 @@
import json
import os
from typing import List, Optional
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
# Module-level cache for the lazily loaded sentence-embedding model.
# Stays None until initialize_embedding_filter() succeeds; all similarity
# helpers treat None as "filtering disabled".
_embedding_model = None


def initialize_embedding_filter():
    """Load the sentence-transformer model used for similarity filtering.

    Idempotent: returns immediately if the model is already loaded.
    On any load failure the model stays None, which effectively disables
    embedding-based filtering (calculate_similarity then returns 0.0 and
    filter_by_similarity passes everything through).
    """
    # NOTE: the original also declared a `_embedding_config` global that was
    # never assigned or read anywhere in this module; it has been removed.
    global _embedding_model

    # Already initialized -> nothing to do.
    if _embedding_model is not None:
        return

    try:
        print("正在加载embedding模型...")
        _embedding_model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
        print("embedding模型加载完成。")
    except Exception as e:
        # Keep the scraper usable without the model; log and fall through.
        print(f"加载embedding模型时出错: {e}")
        _embedding_model = None
def calculate_similarity(text1: str, text2: str) -> float:
    """Return the cosine similarity between two texts.

    Encodes both texts with the module-level embedding model and compares
    the resulting vectors. Returns 0.0 when the model is not loaded or
    when encoding/comparison fails for any reason.

    Args:
        text1: first text to compare
        text2: second text to compare

    Returns:
        float: cosine similarity score (roughly in the 0-1 range)
    """
    global _embedding_model

    # No model loaded -> similarity is undefined; report zero.
    if _embedding_model is None:
        return 0.0

    try:
        # Encode both texts in one batch, then compare the two row vectors.
        vec_a, vec_b = _embedding_model.encode([text1, text2])
        score = cosine_similarity([vec_a], [vec_b])[0][0]
        return float(score)
    except Exception as e:
        print(f"计算文本相似度时出错: {e}")
        return 0.0
def filter_by_similarity(item_data: dict, task_config: dict) -> bool:
    """Decide whether an item passes the embedding-similarity filter.

    Compares the item's title against each configured reference text and
    passes the item when any similarity reaches the threshold. Items also
    pass whenever filtering is unconfigured, the model is unavailable,
    there are no reference texts, or the item has no title.

    Args:
        item_data: scraped item fields (title is read from '商品标题')
        task_config: task configuration, optionally holding 'embedding_filter'
            with 'reference_texts' (list[str]) and 'threshold' (float, default 0.5)

    Returns:
        bool: True when the item passes the filter, False otherwise.
    """
    global _embedding_model

    # Filtering disabled or model unavailable -> let everything through.
    # (Config check comes first so a missing config never touches the model.)
    cfg = task_config.get('embedding_filter')
    if not cfg or _embedding_model is None:
        return True

    references = cfg.get('reference_texts', [])
    threshold = cfg.get('threshold', 0.5)

    # Nothing to compare against -> pass.
    if not references:
        return True

    title = item_data.get('商品标题', '')
    if not title:
        # No title to score -> pass rather than reject blindly.
        return True

    # Track the best similarity seen; stop early once the threshold is met.
    best = 0.0
    for reference in references:
        best = max(best, calculate_similarity(title, reference))
        if best >= threshold:
            break

    passed = best >= threshold
    if not passed:
        print(f" -> 商品 '{title[:30]}...' 未通过embedding相似度过滤 (相似度: {best:.3f}, 阈值: {threshold})")
    return passed

View File

@@ -2,9 +2,11 @@ import asyncio
import json
import os
import random
import re
from datetime import datetime
from urllib.parse import urlencode
from src.embedding_filter import initialize_embedding_filter, filter_by_similarity
from playwright.async_api import (
Response,
TimeoutError as PlaywrightTimeoutError,
@@ -40,6 +42,7 @@ from src.utils import (
safe_get,
save_to_jsonl,
)
from src.embedding_filter import initialize_embedding_filter, filter_item_by_embedding
async def scrape_user_profile(context, user_id: str) -> dict:
@@ -149,6 +152,9 @@ async def scrape_xianyu(task_config: dict, debug_limit: int = 0):
min_price = task_config.get('min_price')
max_price = task_config.get('max_price')
ai_prompt_text = task_config.get('ai_prompt_text', '')
# 初始化embedding过滤器
initialize_embedding_filter()
processed_item_count = 0
stop_scraping = False
@@ -174,6 +180,9 @@ async def scrape_xianyu(task_config: dict, debug_limit: int = 0):
print(f"LOG: 输出文件 {output_filename} 不存在,将创建新文件。")
async with async_playwright() as p:
# 初始化embedding过滤器
await initialize_embedding_filter()
if LOGIN_IS_EDGE:
browser = await p.chromium.launch(headless=RUN_HEADLESS, channel="msedge")
else:
@@ -398,6 +407,65 @@ async def scrape_xianyu(task_config: dict, debug_limit: int = 0):
"卖家信息": user_profile_data
}
# --- START: 标题过滤逻辑 ---
def filter_item_by_title(item_data, task_config):
    """Filter an item by its title before AI analysis.

    Applies three optional checks read from task_config:
      - 'title_include_keywords': title must contain at least one keyword
      - 'title_exclude_keywords': title must contain none of these
      - 'title_regex_pattern': title must match the pattern (re.search)

    Checks whose key is missing/empty are skipped; with no filters
    configured every item passes.

    Args:
        item_data: scraped item fields (title is read from '商品标题')
        task_config: task configuration dict

    Returns:
        bool: True when the item passes all configured checks.
    """
    title = item_data.get('商品标题', '')

    include_keywords = task_config.get('title_include_keywords', [])
    exclude_keywords = task_config.get('title_exclude_keywords', [])
    regex_pattern = task_config.get('title_regex_pattern')

    # Must contain at least one include keyword (when configured).
    if include_keywords and not any(kw in title for kw in include_keywords):
        return False

    # Must not contain any exclude keyword.
    if exclude_keywords and any(kw in title for kw in exclude_keywords):
        return False

    # Must match the regex pattern (when configured and valid).
    if regex_pattern:
        try:
            if not re.search(regex_pattern, title):
                return False
        except re.error as e:
            # Invalid user-supplied pattern: warn instead of failing
            # silently, then skip the regex check (best-effort behavior).
            print(f"LOG: 标题正则表达式无效,已跳过正则过滤: {e}")
    return True
# 应用标题过滤
if not filter_item_by_title(item_data, task_config):
print(f" -> 商品 '{item_data['商品标题'][:30]}...' 未通过标题过滤跳过AI分析。")
processed_links.add(unique_key)
processed_item_count += 1
continue
# --- END: 标题过滤逻辑 ---
# --- START: Embedding相似度过滤逻辑 ---
if not filter_by_similarity(item_data, task_config):
print(f" -> 商品 '{item_data['商品标题'][:30]}...' 未通过Embedding相似度过滤跳过AI分析。")
processed_links.add(unique_key)
processed_item_count += 1
continue
# --- END: Embedding相似度过滤逻辑 ---
# --- START: Real-time AI Analysis & Notification ---
from src.config import SKIP_AI_ANALYSIS

View File

@@ -20,6 +20,9 @@ class Task(BaseModel):
ai_prompt_base_file: str
ai_prompt_criteria_file: str
is_running: Optional[bool] = False
title_include_keywords: Optional[list[str]] = None
title_exclude_keywords: Optional[list[str]] = None
title_regex_pattern: Optional[str] = None
class TaskUpdate(BaseModel):
@@ -35,6 +38,9 @@ class TaskUpdate(BaseModel):
ai_prompt_base_file: Optional[str] = None
ai_prompt_criteria_file: Optional[str] = None
is_running: Optional[bool] = None
title_include_keywords: Optional[list[str]] = None
title_exclude_keywords: Optional[list[str]] = None
title_regex_pattern: Optional[str] = None
async def add_task(task: Task) -> bool:

View File

@@ -37,6 +37,9 @@ class Task(BaseModel):
ai_prompt_base_file: str
ai_prompt_criteria_file: str
is_running: Optional[bool] = False
title_include_keywords: Optional[list[str]] = None
title_exclude_keywords: Optional[list[str]] = None
title_regex_pattern: Optional[str] = None
class TaskUpdate(BaseModel):
@@ -52,6 +55,9 @@ class TaskUpdate(BaseModel):
ai_prompt_base_file: Optional[str] = None
ai_prompt_criteria_file: Optional[str] = None
is_running: Optional[bool] = None
title_include_keywords: Optional[list[str]] = None
title_exclude_keywords: Optional[list[str]] = None
title_regex_pattern: Optional[str] = None
class TaskGenerateRequest(BaseModel):