mirror of
https://github.com/NanmiCoder/MediaCrawler.git
synced 2025-11-25 03:15:17 +08:00
Compare commits
2 Commits
45ec4b433a
...
05a1782746
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
05a1782746 | ||
|
|
ef6948b305 |
@@ -55,7 +55,7 @@ CUSTOM_BROWSER_PATH = ""
|
||||
CDP_HEADLESS = False
|
||||
|
||||
# 浏览器启动超时时间(秒)
|
||||
BROWSER_LAUNCH_TIMEOUT = 30
|
||||
BROWSER_LAUNCH_TIMEOUT = 60
|
||||
|
||||
# 是否在程序结束时自动关闭浏览器
|
||||
# 设置为False可以保持浏览器运行,便于调试
|
||||
|
||||
@@ -42,4 +42,19 @@ SQLITE_DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "datab
|
||||
|
||||
sqlite_db_config = {
|
||||
"db_path": SQLITE_DB_PATH
|
||||
}
|
||||
|
||||
# mongodb config
|
||||
MONGODB_HOST = os.getenv("MONGODB_HOST", "localhost")
|
||||
MONGODB_PORT = os.getenv("MONGODB_PORT", 27017)
|
||||
MONGODB_USER = os.getenv("MONGODB_USER", "")
|
||||
MONGODB_PWD = os.getenv("MONGODB_PWD", "")
|
||||
MONGODB_DB_NAME = os.getenv("MONGODB_DB_NAME", "media_crawler")
|
||||
|
||||
mongodb_config = {
|
||||
"host": MONGODB_HOST,
|
||||
"port": int(MONGODB_PORT),
|
||||
"user": MONGODB_USER,
|
||||
"password": MONGODB_PWD,
|
||||
"db_name": MONGODB_DB_NAME,
|
||||
}
|
||||
@@ -497,11 +497,12 @@ class BilibiliCrawler(AbstractCrawler):
|
||||
"height": 1080
|
||||
},
|
||||
user_agent=user_agent,
|
||||
channel="chrome", # 使用系统的Chrome稳定版
|
||||
)
|
||||
return browser_context
|
||||
else:
|
||||
# type: ignore
|
||||
browser = await chromium.launch(headless=headless, proxy=playwright_proxy)
|
||||
browser = await chromium.launch(headless=headless, proxy=playwright_proxy, channel="chrome")
|
||||
browser_context = await browser.new_context(viewport={"width": 1920, "height": 1080}, user_agent=user_agent)
|
||||
return browser_context
|
||||
|
||||
|
||||
@@ -333,10 +333,11 @@ class KuaishouCrawler(AbstractCrawler):
|
||||
proxy=playwright_proxy, # type: ignore
|
||||
viewport={"width": 1920, "height": 1080},
|
||||
user_agent=user_agent,
|
||||
channel="chrome", # 使用系统的Chrome稳定版
|
||||
)
|
||||
return browser_context
|
||||
else:
|
||||
browser = await chromium.launch(headless=headless, proxy=playwright_proxy) # type: ignore
|
||||
browser = await chromium.launch(headless=headless, proxy=playwright_proxy, channel="chrome") # type: ignore
|
||||
browser_context = await browser.new_context(
|
||||
viewport={"width": 1920, "height": 1080}, user_agent=user_agent
|
||||
)
|
||||
|
||||
@@ -614,10 +614,11 @@ class TieBaCrawler(AbstractCrawler):
|
||||
proxy=playwright_proxy, # type: ignore
|
||||
viewport={"width": 1920, "height": 1080},
|
||||
user_agent=user_agent,
|
||||
channel="chrome", # 使用系统的Chrome稳定版
|
||||
)
|
||||
return browser_context
|
||||
else:
|
||||
browser = await chromium.launch(headless=headless, proxy=playwright_proxy) # type: ignore
|
||||
browser = await chromium.launch(headless=headless, proxy=playwright_proxy, channel="chrome") # type: ignore
|
||||
browser_context = await browser.new_context(
|
||||
viewport={"width": 1920, "height": 1080}, user_agent=user_agent
|
||||
)
|
||||
|
||||
@@ -343,10 +343,11 @@ class WeiboCrawler(AbstractCrawler):
|
||||
"height": 1080
|
||||
},
|
||||
user_agent=user_agent,
|
||||
channel="chrome", # 使用系统的Chrome稳定版
|
||||
)
|
||||
return browser_context
|
||||
else:
|
||||
browser = await chromium.launch(headless=headless, proxy=playwright_proxy) # type: ignore
|
||||
browser = await chromium.launch(headless=headless, proxy=playwright_proxy, channel="chrome") # type: ignore
|
||||
browser_context = await browser.new_context(viewport={"width": 1920, "height": 1080}, user_agent=user_agent)
|
||||
return browser_context
|
||||
|
||||
|
||||
@@ -429,10 +429,11 @@ class ZhihuCrawler(AbstractCrawler):
|
||||
proxy=playwright_proxy, # type: ignore
|
||||
viewport={"width": 1920, "height": 1080},
|
||||
user_agent=user_agent,
|
||||
channel="chrome", # 使用系统的Chrome稳定版
|
||||
)
|
||||
return browser_context
|
||||
else:
|
||||
browser = await chromium.launch(headless=headless, proxy=playwright_proxy) # type: ignore
|
||||
browser = await chromium.launch(headless=headless, proxy=playwright_proxy, channel="chrome") # type: ignore
|
||||
browser_context = await browser.new_context(
|
||||
viewport={"width": 1920, "height": 1080}, user_agent=user_agent
|
||||
)
|
||||
|
||||
@@ -16,6 +16,7 @@ dependencies = [
|
||||
"httpx==0.28.1",
|
||||
"jieba==0.42.1",
|
||||
"matplotlib==3.9.0",
|
||||
"motor>=3.3.0",
|
||||
"opencv-python>=4.11.0.86",
|
||||
"pandas==2.2.3",
|
||||
"parsel==1.9.1",
|
||||
|
||||
@@ -24,3 +24,4 @@ cryptography>=45.0.7
|
||||
alembic>=1.16.5
|
||||
asyncmy>=0.2.10
|
||||
sqlalchemy>=2.0.43
|
||||
motor>=3.3.0
|
||||
|
||||
@@ -28,13 +28,14 @@ class BiliStoreFactory:
|
||||
"db": BiliDbStoreImplement,
|
||||
"json": BiliJsonStoreImplement,
|
||||
"sqlite": BiliSqliteStoreImplement,
|
||||
"mongodb": BiliMongoStoreImplement,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def create_store() -> AbstractStore:
|
||||
store_class = BiliStoreFactory.STORES.get(config.SAVE_DATA_OPTION)
|
||||
if not store_class:
|
||||
raise ValueError("[BiliStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite ...")
|
||||
raise ValueError("[BiliStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite or mongodb ...")
|
||||
return store_class()
|
||||
|
||||
|
||||
|
||||
@@ -31,6 +31,7 @@ from database.models import BilibiliVideoComment, BilibiliVideo, BilibiliUpInfo,
|
||||
from tools.async_file_writer import AsyncFileWriter
|
||||
from tools import utils, words
|
||||
from var import crawler_type_var
|
||||
from store.mongodb_store_base import MongoDBStoreBase
|
||||
|
||||
|
||||
class BiliCsvStoreImplement(AbstractStore):
|
||||
@@ -297,3 +298,61 @@ class BiliJsonStoreImplement(AbstractStore):
|
||||
|
||||
class BiliSqliteStoreImplement(BiliDbStoreImplement):
|
||||
pass
|
||||
|
||||
|
||||
class BiliMongoStoreImplement(AbstractStore):
|
||||
"""B站MongoDB存储实现"""
|
||||
|
||||
def __init__(self):
|
||||
self.mongo_store = MongoDBStoreBase(collection_prefix="bilibili")
|
||||
|
||||
async def store_content(self, content_item: Dict):
|
||||
"""
|
||||
存储视频内容到MongoDB
|
||||
Args:
|
||||
content_item: 视频内容数据
|
||||
"""
|
||||
video_id = content_item.get("video_id")
|
||||
if not video_id:
|
||||
return
|
||||
|
||||
await self.mongo_store.save_or_update(
|
||||
collection_suffix="contents",
|
||||
query={"video_id": video_id},
|
||||
data=content_item
|
||||
)
|
||||
utils.logger.info(f"[BiliMongoStoreImplement.store_content] Saved video {video_id} to MongoDB")
|
||||
|
||||
async def store_comment(self, comment_item: Dict):
|
||||
"""
|
||||
存储评论到MongoDB
|
||||
Args:
|
||||
comment_item: 评论数据
|
||||
"""
|
||||
comment_id = comment_item.get("comment_id")
|
||||
if not comment_id:
|
||||
return
|
||||
|
||||
await self.mongo_store.save_or_update(
|
||||
collection_suffix="comments",
|
||||
query={"comment_id": comment_id},
|
||||
data=comment_item
|
||||
)
|
||||
utils.logger.info(f"[BiliMongoStoreImplement.store_comment] Saved comment {comment_id} to MongoDB")
|
||||
|
||||
async def store_creator(self, creator_item: Dict):
|
||||
"""
|
||||
存储UP主信息到MongoDB
|
||||
Args:
|
||||
creator_item: UP主数据
|
||||
"""
|
||||
user_id = creator_item.get("user_id")
|
||||
if not user_id:
|
||||
return
|
||||
|
||||
await self.mongo_store.save_or_update(
|
||||
collection_suffix="creators",
|
||||
query={"user_id": user_id},
|
||||
data=creator_item
|
||||
)
|
||||
utils.logger.info(f"[BiliMongoStoreImplement.store_creator] Saved creator {user_id} to MongoDB")
|
||||
|
||||
@@ -27,13 +27,14 @@ class DouyinStoreFactory:
|
||||
"db": DouyinDbStoreImplement,
|
||||
"json": DouyinJsonStoreImplement,
|
||||
"sqlite": DouyinSqliteStoreImplement,
|
||||
"mongodb": DouyinMongoStoreImplement,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def create_store() -> AbstractStore:
|
||||
store_class = DouyinStoreFactory.STORES.get(config.SAVE_DATA_OPTION)
|
||||
if not store_class:
|
||||
raise ValueError("[DouyinStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite ...")
|
||||
raise ValueError("[DouyinStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite or mongodb ...")
|
||||
return store_class()
|
||||
|
||||
|
||||
|
||||
@@ -28,6 +28,7 @@ from database.models import DouyinAweme, DouyinAwemeComment, DyCreator
|
||||
from tools import utils, words
|
||||
from tools.async_file_writer import AsyncFileWriter
|
||||
from var import crawler_type_var
|
||||
from store.mongodb_store_base import MongoDBStoreBase
|
||||
|
||||
|
||||
class DouyinCsvStoreImplement(AbstractStore):
|
||||
@@ -195,4 +196,62 @@ class DouyinJsonStoreImplement(AbstractStore):
|
||||
|
||||
|
||||
class DouyinSqliteStoreImplement(DouyinDbStoreImplement):
|
||||
pass
|
||||
pass
|
||||
|
||||
|
||||
class DouyinMongoStoreImplement(AbstractStore):
|
||||
"""抖音MongoDB存储实现"""
|
||||
|
||||
def __init__(self):
|
||||
self.mongo_store = MongoDBStoreBase(collection_prefix="douyin")
|
||||
|
||||
async def store_content(self, content_item: Dict):
|
||||
"""
|
||||
存储视频内容到MongoDB
|
||||
Args:
|
||||
content_item: 视频内容数据
|
||||
"""
|
||||
aweme_id = content_item.get("aweme_id")
|
||||
if not aweme_id:
|
||||
return
|
||||
|
||||
await self.mongo_store.save_or_update(
|
||||
collection_suffix="contents",
|
||||
query={"aweme_id": aweme_id},
|
||||
data=content_item
|
||||
)
|
||||
utils.logger.info(f"[DouyinMongoStoreImplement.store_content] Saved aweme {aweme_id} to MongoDB")
|
||||
|
||||
async def store_comment(self, comment_item: Dict):
|
||||
"""
|
||||
存储评论到MongoDB
|
||||
Args:
|
||||
comment_item: 评论数据
|
||||
"""
|
||||
comment_id = comment_item.get("comment_id")
|
||||
if not comment_id:
|
||||
return
|
||||
|
||||
await self.mongo_store.save_or_update(
|
||||
collection_suffix="comments",
|
||||
query={"comment_id": comment_id},
|
||||
data=comment_item
|
||||
)
|
||||
utils.logger.info(f"[DouyinMongoStoreImplement.store_comment] Saved comment {comment_id} to MongoDB")
|
||||
|
||||
async def store_creator(self, creator_item: Dict):
|
||||
"""
|
||||
存储创作者信息到MongoDB
|
||||
Args:
|
||||
creator_item: 创作者数据
|
||||
"""
|
||||
user_id = creator_item.get("user_id")
|
||||
if not user_id:
|
||||
return
|
||||
|
||||
await self.mongo_store.save_or_update(
|
||||
collection_suffix="creators",
|
||||
query={"user_id": user_id},
|
||||
data=creator_item
|
||||
)
|
||||
utils.logger.info(f"[DouyinMongoStoreImplement.store_creator] Saved creator {user_id} to MongoDB")
|
||||
@@ -26,7 +26,8 @@ class KuaishouStoreFactory:
|
||||
"csv": KuaishouCsvStoreImplement,
|
||||
"db": KuaishouDbStoreImplement,
|
||||
"json": KuaishouJsonStoreImplement,
|
||||
"sqlite": KuaishouSqliteStoreImplement
|
||||
"sqlite": KuaishouSqliteStoreImplement,
|
||||
"mongodb": KuaishouMongoStoreImplement,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
@@ -34,7 +35,7 @@ class KuaishouStoreFactory:
|
||||
store_class = KuaishouStoreFactory.STORES.get(config.SAVE_DATA_OPTION)
|
||||
if not store_class:
|
||||
raise ValueError(
|
||||
"[KuaishouStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite ...")
|
||||
"[KuaishouStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite or mongodb ...")
|
||||
return store_class()
|
||||
|
||||
|
||||
|
||||
@@ -30,6 +30,7 @@ from database.db_session import get_session
|
||||
from database.models import KuaishouVideo, KuaishouVideoComment
|
||||
from tools import utils, words
|
||||
from var import crawler_type_var
|
||||
from store.mongodb_store_base import MongoDBStoreBase
|
||||
|
||||
|
||||
def calculate_number_of_files(file_store_path: str) -> int:
|
||||
@@ -157,4 +158,62 @@ class KuaishouJsonStoreImplement(AbstractStore):
|
||||
|
||||
class KuaishouSqliteStoreImplement(KuaishouDbStoreImplement):
|
||||
async def store_creator(self, creator: Dict):
|
||||
pass
|
||||
pass
|
||||
|
||||
|
||||
class KuaishouMongoStoreImplement(AbstractStore):
|
||||
"""快手MongoDB存储实现"""
|
||||
|
||||
def __init__(self):
|
||||
self.mongo_store = MongoDBStoreBase(collection_prefix="kuaishou")
|
||||
|
||||
async def store_content(self, content_item: Dict):
|
||||
"""
|
||||
存储视频内容到MongoDB
|
||||
Args:
|
||||
content_item: 视频内容数据
|
||||
"""
|
||||
video_id = content_item.get("video_id")
|
||||
if not video_id:
|
||||
return
|
||||
|
||||
await self.mongo_store.save_or_update(
|
||||
collection_suffix="contents",
|
||||
query={"video_id": video_id},
|
||||
data=content_item
|
||||
)
|
||||
utils.logger.info(f"[KuaishouMongoStoreImplement.store_content] Saved video {video_id} to MongoDB")
|
||||
|
||||
async def store_comment(self, comment_item: Dict):
|
||||
"""
|
||||
存储评论到MongoDB
|
||||
Args:
|
||||
comment_item: 评论数据
|
||||
"""
|
||||
comment_id = comment_item.get("comment_id")
|
||||
if not comment_id:
|
||||
return
|
||||
|
||||
await self.mongo_store.save_or_update(
|
||||
collection_suffix="comments",
|
||||
query={"comment_id": comment_id},
|
||||
data=comment_item
|
||||
)
|
||||
utils.logger.info(f"[KuaishouMongoStoreImplement.store_comment] Saved comment {comment_id} to MongoDB")
|
||||
|
||||
async def store_creator(self, creator_item: Dict):
|
||||
"""
|
||||
存储创作者信息到MongoDB
|
||||
Args:
|
||||
creator_item: 创作者数据
|
||||
"""
|
||||
user_id = creator_item.get("user_id")
|
||||
if not user_id:
|
||||
return
|
||||
|
||||
await self.mongo_store.save_or_update(
|
||||
collection_suffix="creators",
|
||||
query={"user_id": user_id},
|
||||
data=creator_item
|
||||
)
|
||||
utils.logger.info(f"[KuaishouMongoStoreImplement.store_creator] Saved creator {user_id} to MongoDB")
|
||||
173
store/mongodb_store_base.py
Normal file
173
store/mongodb_store_base.py
Normal file
@@ -0,0 +1,173 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
MongoDB存储基类
|
||||
提供MongoDB连接管理和通用存储方法
|
||||
"""
|
||||
import asyncio
|
||||
from typing import Dict, List, Optional
|
||||
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase, AsyncIOMotorCollection
|
||||
from config import db_config
|
||||
from tools import utils
|
||||
|
||||
|
||||
class MongoDBConnection:
|
||||
"""MongoDB连接管理单例类"""
|
||||
_instance = None
|
||||
_client: Optional[AsyncIOMotorClient] = None
|
||||
_db: Optional[AsyncIOMotorDatabase] = None
|
||||
_lock = asyncio.Lock()
|
||||
|
||||
def __new__(cls):
|
||||
if cls._instance is None:
|
||||
cls._instance = super(MongoDBConnection, cls).__new__(cls)
|
||||
return cls._instance
|
||||
|
||||
async def get_client(self) -> AsyncIOMotorClient:
|
||||
"""获取MongoDB客户端"""
|
||||
if self._client is None:
|
||||
async with self._lock:
|
||||
if self._client is None:
|
||||
await self._connect()
|
||||
return self._client
|
||||
|
||||
async def get_db(self) -> AsyncIOMotorDatabase:
|
||||
"""获取MongoDB数据库"""
|
||||
if self._db is None:
|
||||
async with self._lock:
|
||||
if self._db is None:
|
||||
await self._connect()
|
||||
return self._db
|
||||
|
||||
async def _connect(self):
|
||||
"""建立MongoDB连接"""
|
||||
try:
|
||||
mongo_config = db_config.mongodb_config
|
||||
host = mongo_config.get("host", "localhost")
|
||||
port = mongo_config.get("port", 27017)
|
||||
user = mongo_config.get("user", "")
|
||||
password = mongo_config.get("password", "")
|
||||
db_name = mongo_config.get("db_name", "media_crawler")
|
||||
|
||||
# 构建连接URL
|
||||
if user and password:
|
||||
connection_url = f"mongodb://{user}:{password}@{host}:{port}/"
|
||||
else:
|
||||
connection_url = f"mongodb://{host}:{port}/"
|
||||
|
||||
self._client = AsyncIOMotorClient(connection_url, serverSelectionTimeoutMS=5000)
|
||||
# 测试连接
|
||||
await self._client.server_info()
|
||||
self._db = self._client[db_name]
|
||||
utils.logger.info(f"[MongoDBConnection] Successfully connected to MongoDB at {host}:{port}, database: {db_name}")
|
||||
except Exception as e:
|
||||
utils.logger.error(f"[MongoDBConnection] Failed to connect to MongoDB: {e}")
|
||||
raise
|
||||
|
||||
async def close(self):
|
||||
"""关闭MongoDB连接"""
|
||||
if self._client is not None:
|
||||
self._client.close()
|
||||
self._client = None
|
||||
self._db = None
|
||||
utils.logger.info("[MongoDBConnection] MongoDB connection closed")
|
||||
|
||||
|
||||
class MongoDBStoreBase:
|
||||
"""MongoDB存储基类"""
|
||||
|
||||
def __init__(self, collection_prefix: str):
|
||||
"""
|
||||
初始化MongoDB存储基类
|
||||
Args:
|
||||
collection_prefix: 集合名称前缀(如:xhs, douyin, bilibili等)
|
||||
"""
|
||||
self.collection_prefix = collection_prefix
|
||||
self._connection = MongoDBConnection()
|
||||
|
||||
async def get_collection(self, collection_suffix: str) -> AsyncIOMotorCollection:
|
||||
"""
|
||||
获取MongoDB集合
|
||||
Args:
|
||||
collection_suffix: 集合名称后缀(如:contents, comments, creators)
|
||||
Returns:
|
||||
MongoDB集合对象
|
||||
"""
|
||||
db = await self._connection.get_db()
|
||||
collection_name = f"{self.collection_prefix}_{collection_suffix}"
|
||||
return db[collection_name]
|
||||
|
||||
async def save_or_update(self, collection_suffix: str, query: Dict, data: Dict) -> bool:
|
||||
"""
|
||||
保存或更新数据(upsert操作)
|
||||
Args:
|
||||
collection_suffix: 集合名称后缀
|
||||
query: 查询条件
|
||||
data: 要保存的数据
|
||||
Returns:
|
||||
是否成功
|
||||
"""
|
||||
try:
|
||||
collection = await self.get_collection(collection_suffix)
|
||||
result = await collection.update_one(
|
||||
query,
|
||||
{"$set": data},
|
||||
upsert=True
|
||||
)
|
||||
return True
|
||||
except Exception as e:
|
||||
utils.logger.error(f"[MongoDBStoreBase.save_or_update] Failed to save data to {self.collection_prefix}_{collection_suffix}: {e}")
|
||||
return False
|
||||
|
||||
async def find_one(self, collection_suffix: str, query: Dict) -> Optional[Dict]:
|
||||
"""
|
||||
查询单条数据
|
||||
Args:
|
||||
collection_suffix: 集合名称后缀
|
||||
query: 查询条件
|
||||
Returns:
|
||||
查询结果
|
||||
"""
|
||||
try:
|
||||
collection = await self.get_collection(collection_suffix)
|
||||
result = await collection.find_one(query)
|
||||
return result
|
||||
except Exception as e:
|
||||
utils.logger.error(f"[MongoDBStoreBase.find_one] Failed to query from {self.collection_prefix}_{collection_suffix}: {e}")
|
||||
return None
|
||||
|
||||
async def find_many(self, collection_suffix: str, query: Dict, limit: int = 0) -> List[Dict]:
|
||||
"""
|
||||
查询多条数据
|
||||
Args:
|
||||
collection_suffix: 集合名称后缀
|
||||
query: 查询条件
|
||||
limit: 限制返回数量,0表示不限制
|
||||
Returns:
|
||||
查询结果列表
|
||||
"""
|
||||
try:
|
||||
collection = await self.get_collection(collection_suffix)
|
||||
cursor = collection.find(query)
|
||||
if limit > 0:
|
||||
cursor = cursor.limit(limit)
|
||||
results = await cursor.to_list(length=None)
|
||||
return results
|
||||
except Exception as e:
|
||||
utils.logger.error(f"[MongoDBStoreBase.find_many] Failed to query from {self.collection_prefix}_{collection_suffix}: {e}")
|
||||
return []
|
||||
|
||||
async def create_index(self, collection_suffix: str, keys: List[tuple], unique: bool = False):
|
||||
"""
|
||||
创建索引
|
||||
Args:
|
||||
collection_suffix: 集合名称后缀
|
||||
keys: 索引键列表,例如:[("note_id", 1)]
|
||||
unique: 是否创建唯一索引
|
||||
"""
|
||||
try:
|
||||
collection = await self.get_collection(collection_suffix)
|
||||
await collection.create_index(keys, unique=unique)
|
||||
utils.logger.info(f"[MongoDBStoreBase.create_index] Created index on {self.collection_prefix}_{collection_suffix}")
|
||||
except Exception as e:
|
||||
utils.logger.error(f"[MongoDBStoreBase.create_index] Failed to create index: {e}")
|
||||
|
||||
@@ -23,7 +23,8 @@ class TieBaStoreFactory:
|
||||
"csv": TieBaCsvStoreImplement,
|
||||
"db": TieBaDbStoreImplement,
|
||||
"json": TieBaJsonStoreImplement,
|
||||
"sqlite": TieBaSqliteStoreImplement
|
||||
"sqlite": TieBaSqliteStoreImplement,
|
||||
"mongodb": TieBaMongoStoreImplement,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
@@ -31,7 +32,7 @@ class TieBaStoreFactory:
|
||||
store_class = TieBaStoreFactory.STORES.get(config.SAVE_DATA_OPTION)
|
||||
if not store_class:
|
||||
raise ValueError(
|
||||
"[TieBaStoreFactory.create_store] Invalid save option only supported csv or db or json ...")
|
||||
"[TieBaStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite or mongodb ...")
|
||||
return store_class()
|
||||
|
||||
|
||||
|
||||
@@ -31,6 +31,7 @@ from tools import utils, words
|
||||
from database.db_session import get_session
|
||||
from var import crawler_type_var
|
||||
from tools.async_file_writer import AsyncFileWriter
|
||||
from store.mongodb_store_base import MongoDBStoreBase
|
||||
|
||||
|
||||
def calculate_number_of_files(file_store_path: str) -> int:
|
||||
@@ -190,3 +191,61 @@ class TieBaSqliteStoreImplement(TieBaDbStoreImplement):
|
||||
Tieba sqlite store implement
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class TieBaMongoStoreImplement(AbstractStore):
|
||||
"""贴吧MongoDB存储实现"""
|
||||
|
||||
def __init__(self):
|
||||
self.mongo_store = MongoDBStoreBase(collection_prefix="tieba")
|
||||
|
||||
async def store_content(self, content_item: Dict):
|
||||
"""
|
||||
存储帖子内容到MongoDB
|
||||
Args:
|
||||
content_item: 帖子内容数据
|
||||
"""
|
||||
note_id = content_item.get("note_id")
|
||||
if not note_id:
|
||||
return
|
||||
|
||||
await self.mongo_store.save_or_update(
|
||||
collection_suffix="contents",
|
||||
query={"note_id": note_id},
|
||||
data=content_item
|
||||
)
|
||||
utils.logger.info(f"[TieBaMongoStoreImplement.store_content] Saved note {note_id} to MongoDB")
|
||||
|
||||
async def store_comment(self, comment_item: Dict):
|
||||
"""
|
||||
存储评论到MongoDB
|
||||
Args:
|
||||
comment_item: 评论数据
|
||||
"""
|
||||
comment_id = comment_item.get("comment_id")
|
||||
if not comment_id:
|
||||
return
|
||||
|
||||
await self.mongo_store.save_or_update(
|
||||
collection_suffix="comments",
|
||||
query={"comment_id": comment_id},
|
||||
data=comment_item
|
||||
)
|
||||
utils.logger.info(f"[TieBaMongoStoreImplement.store_comment] Saved comment {comment_id} to MongoDB")
|
||||
|
||||
async def store_creator(self, creator_item: Dict):
|
||||
"""
|
||||
存储创作者信息到MongoDB
|
||||
Args:
|
||||
creator_item: 创作者数据
|
||||
"""
|
||||
user_id = creator_item.get("user_id")
|
||||
if not user_id:
|
||||
return
|
||||
|
||||
await self.mongo_store.save_or_update(
|
||||
collection_suffix="creators",
|
||||
query={"user_id": user_id},
|
||||
data=creator_item
|
||||
)
|
||||
utils.logger.info(f"[TieBaMongoStoreImplement.store_creator] Saved creator {user_id} to MongoDB")
|
||||
|
||||
@@ -28,13 +28,14 @@ class WeibostoreFactory:
|
||||
"db": WeiboDbStoreImplement,
|
||||
"json": WeiboJsonStoreImplement,
|
||||
"sqlite": WeiboSqliteStoreImplement,
|
||||
"mongodb": WeiboMongoStoreImplement,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def create_store() -> AbstractStore:
|
||||
store_class = WeibostoreFactory.STORES.get(config.SAVE_DATA_OPTION)
|
||||
if not store_class:
|
||||
raise ValueError("[WeibotoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite ...")
|
||||
raise ValueError("[WeibotoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite or mongodb ...")
|
||||
return store_class()
|
||||
|
||||
|
||||
|
||||
@@ -31,6 +31,7 @@ from tools import utils, words
|
||||
from tools.async_file_writer import AsyncFileWriter
|
||||
from database.db_session import get_session
|
||||
from var import crawler_type_var
|
||||
from store.mongodb_store_base import MongoDBStoreBase
|
||||
|
||||
|
||||
def calculate_number_of_files(file_store_path: str) -> int:
|
||||
@@ -212,3 +213,61 @@ class WeiboSqliteStoreImplement(WeiboDbStoreImplement):
|
||||
Weibo content SQLite storage implementation
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class WeiboMongoStoreImplement(AbstractStore):
|
||||
"""微博MongoDB存储实现"""
|
||||
|
||||
def __init__(self):
|
||||
self.mongo_store = MongoDBStoreBase(collection_prefix="weibo")
|
||||
|
||||
async def store_content(self, content_item: Dict):
|
||||
"""
|
||||
存储微博内容到MongoDB
|
||||
Args:
|
||||
content_item: 微博内容数据
|
||||
"""
|
||||
note_id = content_item.get("note_id")
|
||||
if not note_id:
|
||||
return
|
||||
|
||||
await self.mongo_store.save_or_update(
|
||||
collection_suffix="contents",
|
||||
query={"note_id": note_id},
|
||||
data=content_item
|
||||
)
|
||||
utils.logger.info(f"[WeiboMongoStoreImplement.store_content] Saved note {note_id} to MongoDB")
|
||||
|
||||
async def store_comment(self, comment_item: Dict):
|
||||
"""
|
||||
存储评论到MongoDB
|
||||
Args:
|
||||
comment_item: 评论数据
|
||||
"""
|
||||
comment_id = comment_item.get("comment_id")
|
||||
if not comment_id:
|
||||
return
|
||||
|
||||
await self.mongo_store.save_or_update(
|
||||
collection_suffix="comments",
|
||||
query={"comment_id": comment_id},
|
||||
data=comment_item
|
||||
)
|
||||
utils.logger.info(f"[WeiboMongoStoreImplement.store_comment] Saved comment {comment_id} to MongoDB")
|
||||
|
||||
async def store_creator(self, creator_item: Dict):
|
||||
"""
|
||||
存储创作者信息到MongoDB
|
||||
Args:
|
||||
creator_item: 创作者数据
|
||||
"""
|
||||
user_id = creator_item.get("user_id")
|
||||
if not user_id:
|
||||
return
|
||||
|
||||
await self.mongo_store.save_or_update(
|
||||
collection_suffix="creators",
|
||||
query={"user_id": user_id},
|
||||
data=creator_item
|
||||
)
|
||||
utils.logger.info(f"[WeiboMongoStoreImplement.store_creator] Saved creator {user_id} to MongoDB")
|
||||
|
||||
@@ -27,13 +27,14 @@ class XhsStoreFactory:
|
||||
"db": XhsDbStoreImplement,
|
||||
"json": XhsJsonStoreImplement,
|
||||
"sqlite": XhsSqliteStoreImplement,
|
||||
"mongodb": XhsMongoStoreImplement,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def create_store() -> AbstractStore:
|
||||
store_class = XhsStoreFactory.STORES.get(config.SAVE_DATA_OPTION)
|
||||
if not store_class:
|
||||
raise ValueError("[XhsStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite ...")
|
||||
raise ValueError("[XhsStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite or mongodb ...")
|
||||
return store_class()
|
||||
|
||||
|
||||
|
||||
@@ -18,6 +18,8 @@ from database.models import XhsNote, XhsNoteComment, XhsCreator
|
||||
from tools.async_file_writer import AsyncFileWriter
|
||||
from tools.time_util import get_current_timestamp
|
||||
from var import crawler_type_var
|
||||
from store.mongodb_store_base import MongoDBStoreBase
|
||||
from tools import utils
|
||||
|
||||
class XhsCsvStoreImplement(AbstractStore):
|
||||
def __init__(self, **kwargs):
|
||||
@@ -258,3 +260,62 @@ class XhsDbStoreImplement(AbstractStore):
|
||||
class XhsSqliteStoreImplement(XhsDbStoreImplement):
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
|
||||
class XhsMongoStoreImplement(AbstractStore):
|
||||
"""小红书MongoDB存储实现"""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.mongo_store = MongoDBStoreBase(collection_prefix="xhs")
|
||||
|
||||
async def store_content(self, content_item: Dict):
|
||||
"""
|
||||
存储笔记内容到MongoDB
|
||||
Args:
|
||||
content_item: 笔记内容数据
|
||||
"""
|
||||
note_id = content_item.get("note_id")
|
||||
if not note_id:
|
||||
return
|
||||
|
||||
await self.mongo_store.save_or_update(
|
||||
collection_suffix="contents",
|
||||
query={"note_id": note_id},
|
||||
data=content_item
|
||||
)
|
||||
utils.logger.info(f"[XhsMongoStoreImplement.store_content] Saved note {note_id} to MongoDB")
|
||||
|
||||
async def store_comment(self, comment_item: Dict):
|
||||
"""
|
||||
存储评论到MongoDB
|
||||
Args:
|
||||
comment_item: 评论数据
|
||||
"""
|
||||
comment_id = comment_item.get("comment_id")
|
||||
if not comment_id:
|
||||
return
|
||||
|
||||
await self.mongo_store.save_or_update(
|
||||
collection_suffix="comments",
|
||||
query={"comment_id": comment_id},
|
||||
data=comment_item
|
||||
)
|
||||
utils.logger.info(f"[XhsMongoStoreImplement.store_comment] Saved comment {comment_id} to MongoDB")
|
||||
|
||||
async def store_creator(self, creator_item: Dict):
|
||||
"""
|
||||
存储创作者信息到MongoDB
|
||||
Args:
|
||||
creator_item: 创作者数据
|
||||
"""
|
||||
user_id = creator_item.get("user_id")
|
||||
if not user_id:
|
||||
return
|
||||
|
||||
await self.mongo_store.save_or_update(
|
||||
collection_suffix="creators",
|
||||
query={"user_id": user_id},
|
||||
data=creator_item
|
||||
)
|
||||
utils.logger.info(f"[XhsMongoStoreImplement.store_creator] Saved creator {user_id} to MongoDB")
|
||||
|
||||
@@ -18,7 +18,8 @@ from model.m_zhihu import ZhihuComment, ZhihuContent, ZhihuCreator
|
||||
from ._store_impl import (ZhihuCsvStoreImplement,
|
||||
ZhihuDbStoreImplement,
|
||||
ZhihuJsonStoreImplement,
|
||||
ZhihuSqliteStoreImplement)
|
||||
ZhihuSqliteStoreImplement,
|
||||
ZhihuMongoStoreImplement)
|
||||
from tools import utils
|
||||
from var import source_keyword_var
|
||||
|
||||
@@ -28,14 +29,15 @@ class ZhihuStoreFactory:
|
||||
"csv": ZhihuCsvStoreImplement,
|
||||
"db": ZhihuDbStoreImplement,
|
||||
"json": ZhihuJsonStoreImplement,
|
||||
"sqlite": ZhihuSqliteStoreImplement
|
||||
"sqlite": ZhihuSqliteStoreImplement,
|
||||
"mongodb": ZhihuMongoStoreImplement,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def create_store() -> AbstractStore:
|
||||
store_class = ZhihuStoreFactory.STORES.get(config.SAVE_DATA_OPTION)
|
||||
if not store_class:
|
||||
raise ValueError("[ZhihuStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite ...")
|
||||
raise ValueError("[ZhihuStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite or mongodb ...")
|
||||
return store_class()
|
||||
|
||||
async def batch_update_zhihu_contents(contents: List[ZhihuContent]):
|
||||
|
||||
@@ -31,6 +31,7 @@ from database.models import ZhihuContent, ZhihuComment, ZhihuCreator
|
||||
from tools import utils, words
|
||||
from var import crawler_type_var
|
||||
from tools.async_file_writer import AsyncFileWriter
|
||||
from store.mongodb_store_base import MongoDBStoreBase
|
||||
|
||||
def calculate_number_of_files(file_store_path: str) -> int:
|
||||
"""计算数据保存文件的前部分排序数字,支持每次运行代码不写到同一个文件中
|
||||
@@ -189,3 +190,61 @@ class ZhihuSqliteStoreImplement(ZhihuDbStoreImplement):
|
||||
Zhihu content SQLite storage implementation
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class ZhihuMongoStoreImplement(AbstractStore):
|
||||
"""知乎MongoDB存储实现"""
|
||||
|
||||
def __init__(self):
|
||||
self.mongo_store = MongoDBStoreBase(collection_prefix="zhihu")
|
||||
|
||||
async def store_content(self, content_item: Dict):
|
||||
"""
|
||||
存储内容到MongoDB
|
||||
Args:
|
||||
content_item: 内容数据
|
||||
"""
|
||||
note_id = content_item.get("note_id")
|
||||
if not note_id:
|
||||
return
|
||||
|
||||
await self.mongo_store.save_or_update(
|
||||
collection_suffix="contents",
|
||||
query={"note_id": note_id},
|
||||
data=content_item
|
||||
)
|
||||
utils.logger.info(f"[ZhihuMongoStoreImplement.store_content] Saved note {note_id} to MongoDB")
|
||||
|
||||
async def store_comment(self, comment_item: Dict):
|
||||
"""
|
||||
存储评论到MongoDB
|
||||
Args:
|
||||
comment_item: 评论数据
|
||||
"""
|
||||
comment_id = comment_item.get("comment_id")
|
||||
if not comment_id:
|
||||
return
|
||||
|
||||
await self.mongo_store.save_or_update(
|
||||
collection_suffix="comments",
|
||||
query={"comment_id": comment_id},
|
||||
data=comment_item
|
||||
)
|
||||
utils.logger.info(f"[ZhihuMongoStoreImplement.store_comment] Saved comment {comment_id} to MongoDB")
|
||||
|
||||
async def store_creator(self, creator_item: Dict):
|
||||
"""
|
||||
存储创作者信息到MongoDB
|
||||
Args:
|
||||
creator_item: 创作者数据
|
||||
"""
|
||||
user_id = creator_item.get("user_id")
|
||||
if not user_id:
|
||||
return
|
||||
|
||||
await self.mongo_store.save_or_update(
|
||||
collection_suffix="creators",
|
||||
query={"user_id": user_id},
|
||||
data=creator_item
|
||||
)
|
||||
utils.logger.info(f"[ZhihuMongoStoreImplement.store_creator] Saved creator {user_id} to MongoDB")
|
||||
|
||||
Reference in New Issue
Block a user