Added logic for storing Douyin videos and images, renamed the ENABLE_GET_IMAGES parameter in config.py to ENABLE_GET_MEIDAS, and slightly adjusted the storage logic on top of that

This commit is contained in:
未来可欺
2025-07-30 18:24:08 +08:00
parent 417c39de69
commit 173bc08a9d
12 changed files with 631 additions and 716 deletions
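The Douyin change summarized above reduces to one dispatch rule: if an aweme exposes an image list it is stored as pictures, otherwise its video URL is downloaded, and both paths are gated on the renamed ENABLE_GET_MEIDAS flag. Below is a minimal standalone sketch of that pattern, not code from this commit; extract_image_urls, extract_video_url, and plan_media_downloads are illustrative stand-ins for the repo's douyin_store helpers.

from typing import Dict, List, Optional

ENABLE_GET_MEIDAS = True  # renamed from ENABLE_GET_IMAGES in this commit


def extract_image_urls(aweme_item: Dict) -> List[str]:
    # Illustrative stand-in: image posts carry an "images" list, pure video posts do not.
    return [img.get("url") for img in aweme_item.get("images") or [] if img.get("url")]


def extract_video_url(aweme_item: Dict) -> Optional[str]:
    # Illustrative stand-in: take the first play address if one exists.
    urls = aweme_item.get("video", {}).get("play_addr", {}).get("url_list") or []
    return urls[0] if urls else None


def plan_media_downloads(aweme_item: Dict) -> List[str]:
    # Mirrors the image-vs-video dispatch added in DouYinCrawler.get_aweme_media:
    # image URLs win when present, otherwise fall back to the single video URL.
    if not ENABLE_GET_MEIDAS:
        return []
    image_urls = extract_image_urls(aweme_item)
    if image_urls:
        return image_urls
    video_url = extract_video_url(aweme_item)
    return [video_url] if video_url else []


if __name__ == "__main__":
    image_post = {"images": [{"url": "https://example.com/0.jpeg"}]}
    video_post = {"video": {"play_addr": {"url_list": ["https://example.com/v.mp4"]}}}
    print(plan_media_downloads(image_post))  # ['https://example.com/0.jpeg']
    print(plan_media_downloads(video_post))  # ['https://example.com/v.mp4']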

View File

@@ -1,13 +1,12 @@
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则: # 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。 # 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。 # 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。 # 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。 # 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。 # 5. 不得用于任何非法或不当的用途。
# #
# 详细许可条款请参阅项目根目录下的LICENSE文件。 # 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 # 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Dict, Optional from typing import Dict, Optional
@@ -16,6 +15,7 @@ from playwright.async_api import BrowserContext, BrowserType, Playwright
class AbstractCrawler(ABC): class AbstractCrawler(ABC):
@abstractmethod @abstractmethod
async def start(self): async def start(self):
""" """
@@ -31,8 +31,7 @@ class AbstractCrawler(ABC):
pass pass
@abstractmethod @abstractmethod
async def launch_browser(self, chromium: BrowserType, playwright_proxy: Optional[Dict], user_agent: Optional[str], async def launch_browser(self, chromium: BrowserType, playwright_proxy: Optional[Dict], user_agent: Optional[str], headless: bool = True) -> BrowserContext:
headless: bool = True) -> BrowserContext:
""" """
launch browser launch browser
:param chromium: chromium browser :param chromium: chromium browser
@@ -43,8 +42,7 @@ class AbstractCrawler(ABC):
""" """
pass pass
async def launch_browser_with_cdp(self, playwright: Playwright, playwright_proxy: Optional[Dict], async def launch_browser_with_cdp(self, playwright: Playwright, playwright_proxy: Optional[Dict], user_agent: Optional[str], headless: bool = True) -> BrowserContext:
user_agent: Optional[str], headless: bool = True) -> BrowserContext:
""" """
使用CDP模式启动浏览器可选实现 使用CDP模式启动浏览器可选实现
:param playwright: playwright实例 :param playwright: playwright实例
@@ -58,6 +56,7 @@ class AbstractCrawler(ABC):
class AbstractLogin(ABC): class AbstractLogin(ABC):
@abstractmethod @abstractmethod
async def begin(self): async def begin(self):
pass pass
@@ -76,6 +75,7 @@ class AbstractLogin(ABC):
class AbstractStore(ABC): class AbstractStore(ABC):
@abstractmethod @abstractmethod
async def store_content(self, content_item: Dict): async def store_content(self, content_item: Dict):
pass pass
@@ -99,7 +99,16 @@ class AbstractStoreImage(ABC):
pass pass
class AbstractStoreVideo(ABC):
# TODO: support all platform
# only weibo is supported
# @abstractmethod
async def store_video(self, video_content_item: Dict):
pass
class AbstractApiClient(ABC): class AbstractApiClient(ABC):
@abstractmethod @abstractmethod
async def request(self, method, url, **kwargs): async def request(self, method, url, **kwargs):
pass pass

View File

@@ -8,14 +8,13 @@
# 详细许可条款请参阅项目根目录下的LICENSE文件。 # 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 # 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# 基础配置 # 基础配置
PLATFORM = "xhs" # 平台xhs | dy | ks | bili | wb | tieba | zhihu PLATFORM = "dy" # 平台xhs | dy | ks | bili | wb | tieba | zhihu
KEYWORDS = "编程副业,编程兼职" # 关键词搜索配置,以英文逗号分隔 KEYWORDS = "希露非叶特" # 关键词搜索配置,以英文逗号分隔
LOGIN_TYPE = "qrcode" # qrcode or phone or cookie LOGIN_TYPE = "qrcode" # qrcode or phone or cookie
COOKIES = "" COOKIES = ""
CRAWLER_TYPE = ( CRAWLER_TYPE = (
"search" # 爬取类型search(关键词搜索) | detail(帖子详情)| creator(创作者主页数据) "creator" # 爬取类型search(关键词搜索) | detail(帖子详情)| creator(创作者主页数据)
) )
# 是否开启 IP 代理 # 是否开启 IP 代理
ENABLE_IP_PROXY = False ENABLE_IP_PROXY = False
@@ -63,7 +62,7 @@ BROWSER_LAUNCH_TIMEOUT = 30
AUTO_CLOSE_BROWSER = True AUTO_CLOSE_BROWSER = True
# 数据保存类型选项配置,支持四种类型csv、db、json、sqlite, 最好保存到DB有排重的功能。 # 数据保存类型选项配置,支持四种类型csv、db、json、sqlite, 最好保存到DB有排重的功能。
SAVE_DATA_OPTION = "json" # csv or db or json or sqlite SAVE_DATA_OPTION = "csv" # csv or db or json or sqlite
# 用户浏览器缓存的浏览器文件配置 # 用户浏览器缓存的浏览器文件配置
USER_DATA_DIR = "%s_user_data_dir" # %s will be replaced by platform name USER_DATA_DIR = "%s_user_data_dir" # %s will be replaced by platform name
@@ -77,8 +76,8 @@ CRAWLER_MAX_NOTES_COUNT = 200
# 并发爬虫数量控制 # 并发爬虫数量控制
MAX_CONCURRENCY_NUM = 1 MAX_CONCURRENCY_NUM = 1
# 是否开启爬图片模式, 默认不开启爬图片 # 是否开启爬媒体模式(包含图片或视频资源),默认不开启爬媒体
ENABLE_GET_IMAGES = False ENABLE_GET_MEIDAS = True
# 是否开启爬评论模式, 默认开启爬评论 # 是否开启爬评论模式, 默认开启爬评论
ENABLE_GET_COMMENTS = True ENABLE_GET_COMMENTS = True

View File

@@ -8,7 +8,6 @@
# 详细许可条款请参阅项目根目录下的LICENSE文件。 # 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 # 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com # @Author : relakkes@gmail.com
# @Time : 2023/12/2 18:44 # @Time : 2023/12/2 18:44
@@ -59,13 +58,9 @@ class BilibiliCrawler(AbstractCrawler):
async def start(self): async def start(self):
playwright_proxy_format, httpx_proxy_format = None, None playwright_proxy_format, httpx_proxy_format = None, None
if config.ENABLE_IP_PROXY: if config.ENABLE_IP_PROXY:
ip_proxy_pool = await create_ip_pool( ip_proxy_pool = await create_ip_pool(config.IP_PROXY_POOL_COUNT, enable_validate_ip=True)
config.IP_PROXY_POOL_COUNT, enable_validate_ip=True
)
ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy() ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy()
playwright_proxy_format, httpx_proxy_format = utils.format_proxy_info( playwright_proxy_format, httpx_proxy_format = utils.format_proxy_info(ip_proxy_info)
ip_proxy_info
)
async with async_playwright() as playwright: async with async_playwright() as playwright:
# 根据配置选择启动模式 # 根据配置选择启动模式
@@ -81,9 +76,7 @@ class BilibiliCrawler(AbstractCrawler):
utils.logger.info("[BilibiliCrawler] 使用标准模式启动浏览器") utils.logger.info("[BilibiliCrawler] 使用标准模式启动浏览器")
# Launch a browser context. # Launch a browser context.
chromium = playwright.chromium chromium = playwright.chromium
self.browser_context = await self.launch_browser( self.browser_context = await self.launch_browser(chromium, None, self.user_agent, headless=config.HEADLESS)
chromium, None, self.user_agent, headless=config.HEADLESS
)
# stealth.min.js is a js script to prevent the website from detecting the crawler. # stealth.min.js is a js script to prevent the website from detecting the crawler.
await self.browser_context.add_init_script(path="libs/stealth.min.js") await self.browser_context.add_init_script(path="libs/stealth.min.js")
self.context_page = await self.browser_context.new_page() self.context_page = await self.browser_context.new_page()
@@ -100,9 +93,7 @@ class BilibiliCrawler(AbstractCrawler):
cookie_str=config.COOKIES, cookie_str=config.COOKIES,
) )
await login_obj.begin() await login_obj.begin()
await self.bili_client.update_cookies( await self.bili_client.update_cookies(browser_context=self.browser_context)
browser_context=self.browser_context
)
crawler_type_var.set(config.CRAWLER_TYPE) crawler_type_var.set(config.CRAWLER_TYPE)
if config.CRAWLER_TYPE == "search": if config.CRAWLER_TYPE == "search":
@@ -136,7 +127,8 @@ class BilibiliCrawler(AbstractCrawler):
@staticmethod @staticmethod
async def get_pubtime_datetime( async def get_pubtime_datetime(
start: str = config.START_DAY, end: str = config.END_DAY start: str = config.START_DAY,
end: str = config.END_DAY,
) -> Tuple[str, str]: ) -> Tuple[str, str]:
""" """
获取 bilibili 作品发布日期起始时间戳 pubtime_begin_s 与发布日期结束时间戳 pubtime_end_s 获取 bilibili 作品发布日期起始时间戳 pubtime_begin_s 与发布日期结束时间戳 pubtime_end_s
@@ -158,17 +150,11 @@ class BilibiliCrawler(AbstractCrawler):
start_day: datetime = datetime.strptime(start, "%Y-%m-%d") start_day: datetime = datetime.strptime(start, "%Y-%m-%d")
end_day: datetime = datetime.strptime(end, "%Y-%m-%d") end_day: datetime = datetime.strptime(end, "%Y-%m-%d")
if start_day > end_day: if start_day > end_day:
raise ValueError( raise ValueError("Wrong time range, please check your start and end argument, to ensure that the start cannot exceed end")
"Wrong time range, please check your start and end argument, to ensure that the start cannot exceed end"
)
elif start_day == end_day: # 搜索同一天的内容 elif start_day == end_day: # 搜索同一天的内容
end_day = ( end_day = (start_day + timedelta(days=1) - timedelta(seconds=1)) # 则将 end_day 设置为 start_day + 1 day - 1 second
start_day + timedelta(days=1) - timedelta(seconds=1)
) # 则将 end_day 设置为 start_day + 1 day - 1 second
else: # 搜索 start 至 end else: # 搜索 start 至 end
end_day = ( end_day = (end_day + timedelta(days=1) - timedelta(seconds=1)) # 则将 end_day 设置为 end_day + 1 day - 1 second
end_day + timedelta(days=1) - timedelta(seconds=1)
) # 则将 end_day 设置为 end_day + 1 day - 1 second
# 将其重新转换为时间戳 # 将其重新转换为时间戳
return str(int(start_day.timestamp())), str(int(end_day.timestamp())) return str(int(start_day.timestamp())), str(int(end_day.timestamp()))
@@ -177,32 +163,22 @@ class BilibiliCrawler(AbstractCrawler):
search bilibili video with keywords in normal mode search bilibili video with keywords in normal mode
:return: :return:
""" """
utils.logger.info( utils.logger.info("[BilibiliCrawler.search_by_keywords] Begin search bilibli keywords")
"[BilibiliCrawler.search_by_keywords] Begin search bilibli keywords"
)
bili_limit_count = 20 # bilibili limit page fixed value bili_limit_count = 20 # bilibili limit page fixed value
if config.CRAWLER_MAX_NOTES_COUNT < bili_limit_count: if config.CRAWLER_MAX_NOTES_COUNT < bili_limit_count:
config.CRAWLER_MAX_NOTES_COUNT = bili_limit_count config.CRAWLER_MAX_NOTES_COUNT = bili_limit_count
start_page = config.START_PAGE # start page number start_page = config.START_PAGE # start page number
for keyword in config.KEYWORDS.split(","): for keyword in config.KEYWORDS.split(","):
source_keyword_var.set(keyword) source_keyword_var.set(keyword)
utils.logger.info( utils.logger.info(f"[BilibiliCrawler.search_by_keywords] Current search keyword: {keyword}")
f"[BilibiliCrawler.search_by_keywords] Current search keyword: {keyword}"
)
page = 1 page = 1
while ( while (page - start_page + 1) * bili_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
page - start_page + 1
) * bili_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
if page < start_page: if page < start_page:
utils.logger.info( utils.logger.info(f"[BilibiliCrawler.search_by_keywords] Skip page: {page}")
f"[BilibiliCrawler.search_by_keywords] Skip page: {page}"
)
page += 1 page += 1
continue continue
utils.logger.info( utils.logger.info(f"[BilibiliCrawler.search_by_keywords] search bilibili keyword: {keyword}, page: {page}")
f"[BilibiliCrawler.search_by_keywords] search bilibili keyword: {keyword}, page: {page}"
)
video_id_list: List[str] = [] video_id_list: List[str] = []
videos_res = await self.bili_client.search_video_by_keyword( videos_res = await self.bili_client.search_video_by_keyword(
keyword=keyword, keyword=keyword,
@@ -215,24 +191,15 @@ class BilibiliCrawler(AbstractCrawler):
video_list: List[Dict] = videos_res.get("result") video_list: List[Dict] = videos_res.get("result")
if not video_list: if not video_list:
utils.logger.info( utils.logger.info(f"[BilibiliCrawler.search_by_keywords] No more videos for '{keyword}', moving to next keyword.")
f"[BilibiliCrawler.search_by_keywords] No more videos for '{keyword}', moving to next keyword."
)
break break
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list = [] task_list = []
try: try:
task_list = [ task_list = [self.get_video_info_task(aid=video_item.get("aid"), bvid="", semaphore=semaphore) for video_item in video_list]
self.get_video_info_task(
aid=video_item.get("aid"), bvid="", semaphore=semaphore
)
for video_item in video_list
]
except Exception as e: except Exception as e:
utils.logger.warning( utils.logger.warning(f"[BilibiliCrawler.search_by_keywords] error in the task list. The video for this page will not be included. {e}")
f"[BilibiliCrawler.search_by_keywords] error in the task list. The video for this page will not be included. {e}"
)
video_items = await asyncio.gather(*task_list) video_items = await asyncio.gather(*task_list)
for video_item in video_items: for video_item in video_items:
if video_item: if video_item:
@@ -248,74 +215,40 @@ class BilibiliCrawler(AbstractCrawler):
Search bilibili video with keywords in a given time range. Search bilibili video with keywords in a given time range.
:param daily_limit: if True, strictly limit the number of notes per day and total. :param daily_limit: if True, strictly limit the number of notes per day and total.
""" """
utils.logger.info( utils.logger.info(f"[BilibiliCrawler.search_by_keywords_in_time_range] Begin search with daily_limit={daily_limit}")
f"[BilibiliCrawler.search_by_keywords_in_time_range] Begin search with daily_limit={daily_limit}"
)
bili_limit_count = 20 bili_limit_count = 20
start_page = config.START_PAGE start_page = config.START_PAGE
for keyword in config.KEYWORDS.split(","): for keyword in config.KEYWORDS.split(","):
source_keyword_var.set(keyword) source_keyword_var.set(keyword)
utils.logger.info( utils.logger.info(f"[BilibiliCrawler.search_by_keywords_in_time_range] Current search keyword: {keyword}")
f"[BilibiliCrawler.search_by_keywords_in_time_range] Current search keyword: {keyword}"
)
total_notes_crawled_for_keyword = 0 total_notes_crawled_for_keyword = 0
for day in pd.date_range( for day in pd.date_range(start=config.START_DAY, end=config.END_DAY, freq="D"):
start=config.START_DAY, end=config.END_DAY, freq="D" if (daily_limit and total_notes_crawled_for_keyword >= config.CRAWLER_MAX_NOTES_COUNT):
): utils.logger.info(f"[BilibiliCrawler.search] Reached CRAWLER_MAX_NOTES_COUNT limit for keyword '{keyword}', skipping remaining days.")
if (
daily_limit
and total_notes_crawled_for_keyword
>= config.CRAWLER_MAX_NOTES_COUNT
):
utils.logger.info(
f"[BilibiliCrawler.search] Reached CRAWLER_MAX_NOTES_COUNT limit for keyword '{keyword}', skipping remaining days."
)
break break
if ( if (not daily_limit and total_notes_crawled_for_keyword >= config.CRAWLER_MAX_NOTES_COUNT):
not daily_limit utils.logger.info(f"[BilibiliCrawler.search] Reached CRAWLER_MAX_NOTES_COUNT limit for keyword '{keyword}', skipping remaining days.")
and total_notes_crawled_for_keyword
>= config.CRAWLER_MAX_NOTES_COUNT
):
utils.logger.info(
f"[BilibiliCrawler.search] Reached CRAWLER_MAX_NOTES_COUNT limit for keyword '{keyword}', skipping remaining days."
)
break break
pubtime_begin_s, pubtime_end_s = await self.get_pubtime_datetime( pubtime_begin_s, pubtime_end_s = await self.get_pubtime_datetime(start=day.strftime("%Y-%m-%d"), end=day.strftime("%Y-%m-%d"))
start=day.strftime("%Y-%m-%d"), end=day.strftime("%Y-%m-%d")
)
page = 1 page = 1
notes_count_this_day = 0 notes_count_this_day = 0
while True: while True:
if notes_count_this_day >= config.MAX_NOTES_PER_DAY: if notes_count_this_day >= config.MAX_NOTES_PER_DAY:
utils.logger.info( utils.logger.info(f"[BilibiliCrawler.search] Reached MAX_NOTES_PER_DAY limit for {day.ctime()}.")
f"[BilibiliCrawler.search] Reached MAX_NOTES_PER_DAY limit for {day.ctime()}."
)
break break
if ( if (daily_limit and total_notes_crawled_for_keyword >= config.CRAWLER_MAX_NOTES_COUNT):
daily_limit utils.logger.info(f"[BilibiliCrawler.search] Reached CRAWLER_MAX_NOTES_COUNT limit for keyword '{keyword}'.")
and total_notes_crawled_for_keyword
>= config.CRAWLER_MAX_NOTES_COUNT
):
utils.logger.info(
f"[BilibiliCrawler.search] Reached CRAWLER_MAX_NOTES_COUNT limit for keyword '{keyword}'."
)
break break
if ( if (not daily_limit and total_notes_crawled_for_keyword >= config.CRAWLER_MAX_NOTES_COUNT):
not daily_limit
and total_notes_crawled_for_keyword
>= config.CRAWLER_MAX_NOTES_COUNT
):
break break
try: try:
utils.logger.info( utils.logger.info(f"[BilibiliCrawler.search] search bilibili keyword: {keyword}, date: {day.ctime()}, page: {page}")
f"[BilibiliCrawler.search] search bilibili keyword: {keyword}, date: {day.ctime()}, page: {page}"
)
video_id_list: List[str] = [] video_id_list: List[str] = []
videos_res = await self.bili_client.search_video_by_keyword( videos_res = await self.bili_client.search_video_by_keyword(
keyword=keyword, keyword=keyword,
@@ -328,33 +261,18 @@ class BilibiliCrawler(AbstractCrawler):
video_list: List[Dict] = videos_res.get("result") video_list: List[Dict] = videos_res.get("result")
if not video_list: if not video_list:
utils.logger.info( utils.logger.info(f"[BilibiliCrawler.search] No more videos for '{keyword}' on {day.ctime()}, moving to next day.")
f"[BilibiliCrawler.search] No more videos for '{keyword}' on {day.ctime()}, moving to next day."
)
break break
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list = [ task_list = [self.get_video_info_task(aid=video_item.get("aid"), bvid="", semaphore=semaphore) for video_item in video_list]
self.get_video_info_task(
aid=video_item.get("aid"), bvid="", semaphore=semaphore
)
for video_item in video_list
]
video_items = await asyncio.gather(*task_list) video_items = await asyncio.gather(*task_list)
for video_item in video_items: for video_item in video_items:
if video_item: if video_item:
if ( if (daily_limit and total_notes_crawled_for_keyword >= config.CRAWLER_MAX_NOTES_COUNT):
daily_limit
and total_notes_crawled_for_keyword
>= config.CRAWLER_MAX_NOTES_COUNT
):
break break
if ( if (not daily_limit and total_notes_crawled_for_keyword >= config.CRAWLER_MAX_NOTES_COUNT):
not daily_limit
and total_notes_crawled_for_keyword
>= config.CRAWLER_MAX_NOTES_COUNT
):
break break
if notes_count_this_day >= config.MAX_NOTES_PER_DAY: if notes_count_this_day >= config.MAX_NOTES_PER_DAY:
break break
@@ -369,9 +287,7 @@ class BilibiliCrawler(AbstractCrawler):
await self.batch_get_video_comments(video_id_list) await self.batch_get_video_comments(video_id_list)
except Exception as e: except Exception as e:
utils.logger.error( utils.logger.error(f"[BilibiliCrawler.search] Error searching on {day.ctime()}: {e}")
f"[BilibiliCrawler.search] Error searching on {day.ctime()}: {e}"
)
break break
async def batch_get_video_comments(self, video_id_list: List[str]): async def batch_get_video_comments(self, video_id_list: List[str]):
@@ -381,20 +297,14 @@ class BilibiliCrawler(AbstractCrawler):
:return: :return:
""" """
if not config.ENABLE_GET_COMMENTS: if not config.ENABLE_GET_COMMENTS:
utils.logger.info( utils.logger.info(f"[BilibiliCrawler.batch_get_note_comments] Crawling comment mode is not enabled")
f"[BilibiliCrawler.batch_get_note_comments] Crawling comment mode is not enabled"
)
return return
utils.logger.info( utils.logger.info(f"[BilibiliCrawler.batch_get_video_comments] video ids:{video_id_list}")
f"[BilibiliCrawler.batch_get_video_comments] video ids:{video_id_list}"
)
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list: List[Task] = [] task_list: List[Task] = []
for video_id in video_id_list: for video_id in video_id_list:
task = asyncio.create_task( task = asyncio.create_task(self.get_comments(video_id, semaphore), name=video_id)
self.get_comments(video_id, semaphore), name=video_id
)
task_list.append(task) task_list.append(task)
await asyncio.gather(*task_list) await asyncio.gather(*task_list)
@@ -407,9 +317,7 @@ class BilibiliCrawler(AbstractCrawler):
""" """
async with semaphore: async with semaphore:
try: try:
utils.logger.info( utils.logger.info(f"[BilibiliCrawler.get_comments] begin get video_id: {video_id} comments ...")
f"[BilibiliCrawler.get_comments] begin get video_id: {video_id} comments ..."
)
await asyncio.sleep(random.uniform(0.5, 1.5)) await asyncio.sleep(random.uniform(0.5, 1.5))
await self.bili_client.get_video_all_comments( await self.bili_client.get_video_all_comments(
video_id=video_id, video_id=video_id,
@@ -420,13 +328,9 @@ class BilibiliCrawler(AbstractCrawler):
) )
except DataFetchError as ex: except DataFetchError as ex:
utils.logger.error( utils.logger.error(f"[BilibiliCrawler.get_comments] get video_id: {video_id} comment error: {ex}")
f"[BilibiliCrawler.get_comments] get video_id: {video_id} comment error: {ex}"
)
except Exception as e: except Exception as e:
utils.logger.error( utils.logger.error(f"[BilibiliCrawler.get_comments] may be been blocked, err:{e}")
f"[BilibiliCrawler.get_comments] may be been blocked, err:{e}"
)
# Propagate the exception to be caught by the main loop # Propagate the exception to be caught by the main loop
raise raise
@@ -452,10 +356,7 @@ class BilibiliCrawler(AbstractCrawler):
:return: :return:
""" """
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list = [ task_list = [self.get_video_info_task(aid=0, bvid=video_id, semaphore=semaphore) for video_id in bvids_list]
self.get_video_info_task(aid=0, bvid=video_id, semaphore=semaphore)
for video_id in bvids_list
]
video_details = await asyncio.gather(*task_list) video_details = await asyncio.gather(*task_list)
video_aids_list = [] video_aids_list = []
for video_detail in video_details: for video_detail in video_details:
@@ -469,9 +370,7 @@ class BilibiliCrawler(AbstractCrawler):
await self.get_bilibili_video(video_detail, semaphore) await self.get_bilibili_video(video_detail, semaphore)
await self.batch_get_video_comments(video_aids_list) await self.batch_get_video_comments(video_aids_list)
async def get_video_info_task( async def get_video_info_task(self, aid: int, bvid: str, semaphore: asyncio.Semaphore) -> Optional[Dict]:
self, aid: int, bvid: str, semaphore: asyncio.Semaphore
) -> Optional[Dict]:
""" """
Get video detail task Get video detail task
:param aid: :param aid:
@@ -484,19 +383,13 @@ class BilibiliCrawler(AbstractCrawler):
result = await self.bili_client.get_video_info(aid=aid, bvid=bvid) result = await self.bili_client.get_video_info(aid=aid, bvid=bvid)
return result return result
except DataFetchError as ex: except DataFetchError as ex:
utils.logger.error( utils.logger.error(f"[BilibiliCrawler.get_video_info_task] Get video detail error: {ex}")
f"[BilibiliCrawler.get_video_info_task] Get video detail error: {ex}"
)
return None return None
except KeyError as ex: except KeyError as ex:
utils.logger.error( utils.logger.error(f"[BilibiliCrawler.get_video_info_task] have not fund note detail video_id:{bvid}, err: {ex}")
f"[BilibiliCrawler.get_video_info_task] have not fund note detail video_id:{bvid}, err: {ex}"
)
return None return None
async def get_video_play_url_task( async def get_video_play_url_task(self, aid: int, cid: int, semaphore: asyncio.Semaphore) -> Union[Dict, None]:
self, aid: int, cid: int, semaphore: asyncio.Semaphore
) -> Union[Dict, None]:
""" """
Get video play url Get video play url
:param aid: :param aid:
@@ -509,30 +402,20 @@ class BilibiliCrawler(AbstractCrawler):
result = await self.bili_client.get_video_play_url(aid=aid, cid=cid) result = await self.bili_client.get_video_play_url(aid=aid, cid=cid)
return result return result
except DataFetchError as ex: except DataFetchError as ex:
utils.logger.error( utils.logger.error(f"[BilibiliCrawler.get_video_play_url_task] Get video play url error: {ex}")
f"[BilibiliCrawler.get_video_play_url_task] Get video play url error: {ex}"
)
return None return None
except KeyError as ex: except KeyError as ex:
utils.logger.error( utils.logger.error(f"[BilibiliCrawler.get_video_play_url_task] have not fund play url from :{aid}|{cid}, err: {ex}")
f"[BilibiliCrawler.get_video_play_url_task] have not fund play url from :{aid}|{cid}, err: {ex}"
)
return None return None
async def create_bilibili_client( async def create_bilibili_client(self, httpx_proxy: Optional[str]) -> BilibiliClient:
self, httpx_proxy: Optional[str]
) -> BilibiliClient:
""" """
create bilibili client create bilibili client
:param httpx_proxy: httpx proxy :param httpx_proxy: httpx proxy
:return: bilibili client :return: bilibili client
""" """
utils.logger.info( utils.logger.info("[BilibiliCrawler.create_bilibili_client] Begin create bilibili API client ...")
"[BilibiliCrawler.create_bilibili_client] Begin create bilibili API client ..." cookie_str, cookie_dict = utils.convert_cookies(await self.browser_context.cookies())
)
cookie_str, cookie_dict = utils.convert_cookies(
await self.browser_context.cookies()
)
bilibili_client_obj = BilibiliClient( bilibili_client_obj = BilibiliClient(
proxies=httpx_proxy, proxies=httpx_proxy,
headers={ headers={
@@ -562,30 +445,27 @@ class BilibiliCrawler(AbstractCrawler):
:param headless: headless mode :param headless: headless mode
:return: browser context :return: browser context
""" """
utils.logger.info( utils.logger.info("[BilibiliCrawler.launch_browser] Begin create browser context ...")
"[BilibiliCrawler.launch_browser] Begin create browser context ..."
)
if config.SAVE_LOGIN_STATE: if config.SAVE_LOGIN_STATE:
# feat issue #14 # feat issue #14
# we will save login state to avoid login every time # we will save login state to avoid login every time
user_data_dir = os.path.join( user_data_dir = os.path.join(os.getcwd(), "browser_data", config.USER_DATA_DIR % config.PLATFORM) # type: ignore
os.getcwd(), "browser_data", config.USER_DATA_DIR % config.PLATFORM
) # type: ignore
browser_context = await chromium.launch_persistent_context( browser_context = await chromium.launch_persistent_context(
user_data_dir=user_data_dir, user_data_dir=user_data_dir,
accept_downloads=True, accept_downloads=True,
headless=headless, headless=headless,
proxy=playwright_proxy, # type: ignore proxy=playwright_proxy, # type: ignore
viewport={"width": 1920, "height": 1080}, viewport={
"width": 1920,
"height": 1080
},
user_agent=user_agent, user_agent=user_agent,
) )
return browser_context return browser_context
else: else:
# type: ignore # type: ignore
browser = await chromium.launch(headless=headless, proxy=playwright_proxy) browser = await chromium.launch(headless=headless, proxy=playwright_proxy)
browser_context = await browser.new_context( browser_context = await browser.new_context(viewport={"width": 1920, "height": 1080}, user_agent=user_agent)
viewport={"width": 1920, "height": 1080}, user_agent=user_agent
)
return browser_context return browser_context
async def launch_browser_with_cdp( async def launch_browser_with_cdp(
@@ -614,14 +494,10 @@ class BilibiliCrawler(AbstractCrawler):
return browser_context return browser_context
except Exception as e: except Exception as e:
utils.logger.error( utils.logger.error(f"[BilibiliCrawler] CDP模式启动失败回退到标准模式: {e}")
f"[BilibiliCrawler] CDP模式启动失败回退到标准模式: {e}"
)
# 回退到标准模式 # 回退到标准模式
chromium = playwright.chromium chromium = playwright.chromium
return await self.launch_browser( return await self.launch_browser(chromium, playwright_proxy, user_agent, headless)
chromium, playwright_proxy, user_agent, headless
)
async def close(self): async def close(self):
"""Close browser context""" """Close browser context"""
@@ -634,13 +510,9 @@ class BilibiliCrawler(AbstractCrawler):
await self.browser_context.close() await self.browser_context.close()
utils.logger.info("[BilibiliCrawler.close] Browser context closed ...") utils.logger.info("[BilibiliCrawler.close] Browser context closed ...")
except TargetClosedError: except TargetClosedError:
utils.logger.warning( utils.logger.warning("[BilibiliCrawler.close] Browser context was already closed.")
"[BilibiliCrawler.close] Browser context was already closed."
)
except Exception as e: except Exception as e:
utils.logger.error( utils.logger.error(f"[BilibiliCrawler.close] An error occurred during close: {e}")
f"[BilibiliCrawler.close] An error occurred during close: {e}"
)
async def get_bilibili_video(self, video_item: Dict, semaphore: asyncio.Semaphore): async def get_bilibili_video(self, video_item: Dict, semaphore: asyncio.Semaphore):
""" """
@@ -649,19 +521,15 @@ class BilibiliCrawler(AbstractCrawler):
:param semaphore: :param semaphore:
:return: :return:
""" """
if not config.ENABLE_GET_IMAGES: if not config.ENABLE_GET_MEIDAS:
utils.logger.info( utils.logger.info(f"[BilibiliCrawler.get_bilibili_video] Crawling image mode is not enabled")
f"[BilibiliCrawler.get_bilibili_video] Crawling image mode is not enabled"
)
return return
video_item_view: Dict = video_item.get("View") video_item_view: Dict = video_item.get("View")
aid = video_item_view.get("aid") aid = video_item_view.get("aid")
cid = video_item_view.get("cid") cid = video_item_view.get("cid")
result = await self.get_video_play_url_task(aid, cid, semaphore) result = await self.get_video_play_url_task(aid, cid, semaphore)
if result is None: if result is None:
utils.logger.info( utils.logger.info("[BilibiliCrawler.get_bilibili_video] get video play url failed")
"[BilibiliCrawler.get_bilibili_video] get video play url failed"
)
return return
durl_list = result.get("durl") durl_list = result.get("durl")
max_size = -1 max_size = -1
@@ -672,9 +540,7 @@ class BilibiliCrawler(AbstractCrawler):
max_size = size max_size = size
video_url = durl.get("url") video_url = durl.get("url")
if video_url == "": if video_url == "":
utils.logger.info( utils.logger.info("[BilibiliCrawler.get_bilibili_video] get video url failed")
"[BilibiliCrawler.get_bilibili_video] get video url failed"
)
return return
content = await self.bili_client.get_video_media(video_url) content = await self.bili_client.get_video_media(video_url)
@@ -687,25 +553,17 @@ class BilibiliCrawler(AbstractCrawler):
""" """
creator_id_list: get details for creator from creator_id_list creator_id_list: get details for creator from creator_id_list
""" """
utils.logger.info( utils.logger.info(f"[BilibiliCrawler.get_creator_details] Crawling the detalis of creator")
f"[BilibiliCrawler.get_creator_details] Crawling the detalis of creator" utils.logger.info(f"[BilibiliCrawler.get_creator_details] creator ids:{creator_id_list}")
)
utils.logger.info(
f"[BilibiliCrawler.get_creator_details] creator ids:{creator_id_list}"
)
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list: List[Task] = [] task_list: List[Task] = []
try: try:
for creator_id in creator_id_list: for creator_id in creator_id_list:
task = asyncio.create_task( task = asyncio.create_task(self.get_creator_details(creator_id, semaphore), name=creator_id)
self.get_creator_details(creator_id, semaphore), name=creator_id
)
task_list.append(task) task_list.append(task)
except Exception as e: except Exception as e:
utils.logger.warning( utils.logger.warning(f"[BilibiliCrawler.get_all_creator_details] error in the task list. The creator will not be included. {e}")
f"[BilibiliCrawler.get_all_creator_details] error in the task list. The creator will not be included. {e}"
)
await asyncio.gather(*task_list) await asyncio.gather(*task_list)
@@ -717,9 +575,7 @@ class BilibiliCrawler(AbstractCrawler):
:return: :return:
""" """
async with semaphore: async with semaphore:
creator_unhandled_info: Dict = await self.bili_client.get_creator_info( creator_unhandled_info: Dict = await self.bili_client.get_creator_info(creator_id)
creator_id
)
creator_info: Dict = { creator_info: Dict = {
"id": creator_id, "id": creator_id,
"name": creator_unhandled_info.get("name"), "name": creator_unhandled_info.get("name"),
@@ -740,9 +596,7 @@ class BilibiliCrawler(AbstractCrawler):
creator_id = creator_info["id"] creator_id = creator_info["id"]
async with semaphore: async with semaphore:
try: try:
utils.logger.info( utils.logger.info(f"[BilibiliCrawler.get_fans] begin get creator_id: {creator_id} fans ...")
f"[BilibiliCrawler.get_fans] begin get creator_id: {creator_id} fans ..."
)
await self.bili_client.get_creator_all_fans( await self.bili_client.get_creator_all_fans(
creator_info=creator_info, creator_info=creator_info,
crawl_interval=random.random(), crawl_interval=random.random(),
@@ -751,13 +605,9 @@ class BilibiliCrawler(AbstractCrawler):
) )
except DataFetchError as ex: except DataFetchError as ex:
utils.logger.error( utils.logger.error(f"[BilibiliCrawler.get_fans] get creator_id: {creator_id} fans error: {ex}")
f"[BilibiliCrawler.get_fans] get creator_id: {creator_id} fans error: {ex}"
)
except Exception as e: except Exception as e:
utils.logger.error( utils.logger.error(f"[BilibiliCrawler.get_fans] may be been blocked, err:{e}")
f"[BilibiliCrawler.get_fans] may be been blocked, err:{e}"
)
async def get_followings(self, creator_info: Dict, semaphore: asyncio.Semaphore): async def get_followings(self, creator_info: Dict, semaphore: asyncio.Semaphore):
""" """
@@ -769,9 +619,7 @@ class BilibiliCrawler(AbstractCrawler):
creator_id = creator_info["id"] creator_id = creator_info["id"]
async with semaphore: async with semaphore:
try: try:
utils.logger.info( utils.logger.info(f"[BilibiliCrawler.get_followings] begin get creator_id: {creator_id} followings ...")
f"[BilibiliCrawler.get_followings] begin get creator_id: {creator_id} followings ..."
)
await self.bili_client.get_creator_all_followings( await self.bili_client.get_creator_all_followings(
creator_info=creator_info, creator_info=creator_info,
crawl_interval=random.random(), crawl_interval=random.random(),
@@ -780,13 +628,9 @@ class BilibiliCrawler(AbstractCrawler):
) )
except DataFetchError as ex: except DataFetchError as ex:
utils.logger.error( utils.logger.error(f"[BilibiliCrawler.get_followings] get creator_id: {creator_id} followings error: {ex}")
f"[BilibiliCrawler.get_followings] get creator_id: {creator_id} followings error: {ex}"
)
except Exception as e: except Exception as e:
utils.logger.error( utils.logger.error(f"[BilibiliCrawler.get_followings] may be been blocked, err:{e}")
f"[BilibiliCrawler.get_followings] may be been blocked, err:{e}"
)
async def get_dynamics(self, creator_info: Dict, semaphore: asyncio.Semaphore): async def get_dynamics(self, creator_info: Dict, semaphore: asyncio.Semaphore):
""" """
@@ -798,9 +642,7 @@ class BilibiliCrawler(AbstractCrawler):
creator_id = creator_info["id"] creator_id = creator_info["id"]
async with semaphore: async with semaphore:
try: try:
utils.logger.info( utils.logger.info(f"[BilibiliCrawler.get_dynamics] begin get creator_id: {creator_id} dynamics ...")
f"[BilibiliCrawler.get_dynamics] begin get creator_id: {creator_id} dynamics ..."
)
await self.bili_client.get_creator_all_dynamics( await self.bili_client.get_creator_all_dynamics(
creator_info=creator_info, creator_info=creator_info,
crawl_interval=random.random(), crawl_interval=random.random(),
@@ -809,10 +651,6 @@ class BilibiliCrawler(AbstractCrawler):
) )
except DataFetchError as ex: except DataFetchError as ex:
utils.logger.error( utils.logger.error(f"[BilibiliCrawler.get_dynamics] get creator_id: {creator_id} dynamics error: {ex}")
f"[BilibiliCrawler.get_dynamics] get creator_id: {creator_id} dynamics error: {ex}"
)
except Exception as e: except Exception as e:
utils.logger.error( utils.logger.error(f"[BilibiliCrawler.get_dynamics] may be been blocked, err:{e}")
f"[BilibiliCrawler.get_dynamics] may be been blocked, err:{e}"
)

View File

@@ -1,21 +1,20 @@
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则: # 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。 # 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。 # 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。 # 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。 # 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。 # 5. 不得用于任何非法或不当的用途。
# #
# 详细许可条款请参阅项目根目录下的LICENSE文件。 # 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 # 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
import asyncio import asyncio
import copy import copy
import json import json
import urllib.parse import urllib.parse
from typing import Any, Callable, Dict, Optional from typing import Any, Callable, Dict, Union, Optional
import requests import httpx
from playwright.async_api import BrowserContext from playwright.async_api import BrowserContext
from base.base_crawler import AbstractApiClient from base.base_crawler import AbstractApiClient
@@ -28,14 +27,15 @@ from .help import *
class DOUYINClient(AbstractApiClient): class DOUYINClient(AbstractApiClient):
def __init__( def __init__(
self, self,
timeout=30, timeout=30,
proxies=None, proxies=None,
*, *,
headers: Dict, headers: Dict,
playwright_page: Optional[Page], playwright_page: Optional[Page],
cookie_dict: Dict cookie_dict: Dict,
): ):
self.proxies = proxies self.proxies = proxies
self.timeout = timeout self.timeout = timeout
@@ -45,8 +45,11 @@ class DOUYINClient(AbstractApiClient):
self.cookie_dict = cookie_dict self.cookie_dict = cookie_dict
async def __process_req_params( async def __process_req_params(
self, uri: str, params: Optional[Dict] = None, headers: Optional[Dict] = None, self,
request_method="GET" uri: str,
params: Optional[Dict] = None,
headers: Optional[Dict] = None,
request_method="GET",
): ):
if not params: if not params:
@@ -93,10 +96,8 @@ class DOUYINClient(AbstractApiClient):
async def request(self, method, url, **kwargs): async def request(self, method, url, **kwargs):
response = None response = None
if method == "GET": async with httpx.AsyncClient(proxies=self.proxies) as client:
response = requests.request(method, url, **kwargs) response = await client.request(method, url, timeout=self.timeout, **kwargs)
elif method == "POST":
response = requests.request(method, url, **kwargs)
try: try:
if response.text == "" or response.text == "blocked": if response.text == "" or response.text == "blocked":
utils.logger.error(f"request params incrr, response.text: {response.text}") utils.logger.error(f"request params incrr, response.text: {response.text}")
@@ -132,13 +133,13 @@ class DOUYINClient(AbstractApiClient):
self.cookie_dict = cookie_dict self.cookie_dict = cookie_dict
async def search_info_by_keyword( async def search_info_by_keyword(
self, self,
keyword: str, keyword: str,
offset: int = 0, offset: int = 0,
search_channel: SearchChannelType = SearchChannelType.GENERAL, search_channel: SearchChannelType = SearchChannelType.GENERAL,
sort_type: SearchSortType = SearchSortType.GENERAL, sort_type: SearchSortType = SearchSortType.GENERAL,
publish_time: PublishTimeType = PublishTimeType.UNLIMITED, publish_time: PublishTimeType = PublishTimeType.UNLIMITED,
search_id: str = "" search_id: str = "",
): ):
""" """
DouYin Web Search API DouYin Web Search API
@@ -165,10 +166,7 @@ class DOUYINClient(AbstractApiClient):
'search_id': search_id, 'search_id': search_id,
} }
if sort_type.value != SearchSortType.GENERAL.value or publish_time.value != PublishTimeType.UNLIMITED.value: if sort_type.value != SearchSortType.GENERAL.value or publish_time.value != PublishTimeType.UNLIMITED.value:
query_params["filter_selected"] = json.dumps({ query_params["filter_selected"] = json.dumps({"sort_type": str(sort_type.value), "publish_time": str(publish_time.value)})
"sort_type": str(sort_type.value),
"publish_time": str(publish_time.value)
})
query_params["is_filter_search"] = 1 query_params["is_filter_search"] = 1
query_params["search_source"] = "tab_search" query_params["search_source"] = "tab_search"
referer_url = f"https://www.douyin.com/search/{keyword}?aid=f594bbd9-a0e2-4651-9319-ebe3cb6298c1&type=general" referer_url = f"https://www.douyin.com/search/{keyword}?aid=f594bbd9-a0e2-4651-9319-ebe3cb6298c1&type=general"
@@ -182,9 +180,7 @@ class DOUYINClient(AbstractApiClient):
:param aweme_id: :param aweme_id:
:return: :return:
""" """
params = { params = {"aweme_id": aweme_id}
"aweme_id": aweme_id
}
headers = copy.copy(self.headers) headers = copy.copy(self.headers)
del headers["Origin"] del headers["Origin"]
res = await self.get("/aweme/v1/web/aweme/detail/", params, headers) res = await self.get("/aweme/v1/web/aweme/detail/", params, headers)
@@ -195,12 +191,7 @@ class DOUYINClient(AbstractApiClient):
""" """
uri = "/aweme/v1/web/comment/list/" uri = "/aweme/v1/web/comment/list/"
params = { params = {"aweme_id": aweme_id, "cursor": cursor, "count": 20, "item_type": 0}
"aweme_id": aweme_id,
"cursor": cursor,
"count": 20,
"item_type": 0
}
keywords = request_keyword_var.get() keywords = request_keyword_var.get()
referer_url = "https://www.douyin.com/search/" + keywords + '?aid=3a3cec5a-9e27-4040-b6aa-ef548c2c1138&publish_time=0&sort_type=0&source=search_history&type=general' referer_url = "https://www.douyin.com/search/" + keywords + '?aid=3a3cec5a-9e27-4040-b6aa-ef548c2c1138&publish_time=0&sort_type=0&source=search_history&type=general'
headers = copy.copy(self.headers) headers = copy.copy(self.headers)
@@ -226,12 +217,12 @@ class DOUYINClient(AbstractApiClient):
return await self.get(uri, params) return await self.get(uri, params)
async def get_aweme_all_comments( async def get_aweme_all_comments(
self, self,
aweme_id: str, aweme_id: str,
crawl_interval: float = 1.0, crawl_interval: float = 1.0,
is_fetch_sub_comments=False, is_fetch_sub_comments=False,
callback: Optional[Callable] = None, callback: Optional[Callable] = None,
max_count: int = 10, max_count: int = 10,
): ):
""" """
获取帖子的所有评论,包括子评论 获取帖子的所有评论,包括子评论
@@ -315,9 +306,17 @@ class DOUYINClient(AbstractApiClient):
posts_has_more = aweme_post_res.get("has_more", 0) posts_has_more = aweme_post_res.get("has_more", 0)
max_cursor = aweme_post_res.get("max_cursor") max_cursor = aweme_post_res.get("max_cursor")
aweme_list = aweme_post_res.get("aweme_list") if aweme_post_res.get("aweme_list") else [] aweme_list = aweme_post_res.get("aweme_list") if aweme_post_res.get("aweme_list") else []
utils.logger.info( utils.logger.info(f"[DouYinCrawler.get_all_user_aweme_posts] get sec_user_id:{sec_user_id} video len : {len(aweme_list)}")
f"[DOUYINClient.get_all_user_aweme_posts] got sec_user_id:{sec_user_id} video len : {len(aweme_list)}")
if callback: if callback:
await callback(aweme_list) await callback(aweme_list)
result.extend(aweme_list) result.extend(aweme_list)
return result return result
async def get_aweme_media(self, url: str) -> Union[bytes, None]:
async with httpx.AsyncClient(proxies=self.proxies) as client:
response = await client.request("GET", url, timeout=self.timeout)
if response.reason_phrase != "OK":
utils.logger.error(f"[DOUYINClient.get_aweme_media] request {url} err, res:{response.text}")
return None
else:
return response.content

View File

@@ -8,7 +8,6 @@
# 详细许可条款请参阅项目根目录下的LICENSE文件。 # 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 # 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
import asyncio import asyncio
import os import os
import random import random
@@ -50,13 +49,9 @@ class DouYinCrawler(AbstractCrawler):
async def start(self) -> None: async def start(self) -> None:
playwright_proxy_format, httpx_proxy_format = None, None playwright_proxy_format, httpx_proxy_format = None, None
if config.ENABLE_IP_PROXY: if config.ENABLE_IP_PROXY:
ip_proxy_pool = await create_ip_pool( ip_proxy_pool = await create_ip_pool(config.IP_PROXY_POOL_COUNT, enable_validate_ip=True)
config.IP_PROXY_POOL_COUNT, enable_validate_ip=True
)
ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy() ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy()
playwright_proxy_format, httpx_proxy_format = utils.format_proxy_info( playwright_proxy_format, httpx_proxy_format = utils.format_proxy_info(ip_proxy_info)
ip_proxy_info
)
async with async_playwright() as playwright: async with async_playwright() as playwright:
# 根据配置选择启动模式 # 根据配置选择启动模式
@@ -93,9 +88,7 @@ class DouYinCrawler(AbstractCrawler):
cookie_str=config.COOKIES, cookie_str=config.COOKIES,
) )
await login_obj.begin() await login_obj.begin()
await self.dy_client.update_cookies( await self.dy_client.update_cookies(browser_context=self.browser_context)
browser_context=self.browser_context
)
crawler_type_var.set(config.CRAWLER_TYPE) crawler_type_var.set(config.CRAWLER_TYPE)
if config.CRAWLER_TYPE == "search": if config.CRAWLER_TYPE == "search":
# Search for notes and retrieve their comment information. # Search for notes and retrieve their comment information.
@@ -121,17 +114,13 @@ class DouYinCrawler(AbstractCrawler):
aweme_list: List[str] = [] aweme_list: List[str] = []
page = 0 page = 0
dy_search_id = "" dy_search_id = ""
while ( while (page - start_page + 1) * dy_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
page - start_page + 1
) * dy_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
if page < start_page: if page < start_page:
utils.logger.info(f"[DouYinCrawler.search] Skip {page}") utils.logger.info(f"[DouYinCrawler.search] Skip {page}")
page += 1 page += 1
continue continue
try: try:
utils.logger.info( utils.logger.info(f"[DouYinCrawler.search] search douyin keyword: {keyword}, page: {page}")
f"[DouYinCrawler.search] search douyin keyword: {keyword}, page: {page}"
)
posts_res = await self.dy_client.search_info_by_keyword( posts_res = await self.dy_client.search_info_by_keyword(
keyword=keyword, keyword=keyword,
offset=page * dy_limit_count - dy_limit_count, offset=page * dy_limit_count - dy_limit_count,
@@ -139,67 +128,49 @@ class DouYinCrawler(AbstractCrawler):
search_id=dy_search_id, search_id=dy_search_id,
) )
if posts_res.get("data") is None or posts_res.get("data") == []: if posts_res.get("data") is None or posts_res.get("data") == []:
utils.logger.info( utils.logger.info(f"[DouYinCrawler.search] search douyin keyword: {keyword}, page: {page} is empty,{posts_res.get('data')}`")
f"[DouYinCrawler.search] search douyin keyword: {keyword}, page: {page} is empty,{posts_res.get('data')}`"
)
break break
except DataFetchError: except DataFetchError:
utils.logger.error( utils.logger.error(f"[DouYinCrawler.search] search douyin keyword: {keyword} failed")
f"[DouYinCrawler.search] search douyin keyword: {keyword} failed"
)
break break
page += 1 page += 1
if "data" not in posts_res: if "data" not in posts_res:
utils.logger.error( utils.logger.error(f"[DouYinCrawler.search] search douyin keyword: {keyword} failed账号也许被风控了。")
f"[DouYinCrawler.search] search douyin keyword: {keyword} failed账号也许被风控了。"
)
break break
dy_search_id = posts_res.get("extra", {}).get("logid", "") dy_search_id = posts_res.get("extra", {}).get("logid", "")
for post_item in posts_res.get("data"): for post_item in posts_res.get("data"):
try: try:
aweme_info: Dict = ( aweme_info: Dict = (post_item.get("aweme_info") or post_item.get("aweme_mix_info", {}).get("mix_items")[0])
post_item.get("aweme_info")
or post_item.get("aweme_mix_info", {}).get("mix_items")[0]
)
except TypeError: except TypeError:
continue continue
aweme_list.append(aweme_info.get("aweme_id", "")) aweme_list.append(aweme_info.get("aweme_id", ""))
await douyin_store.update_douyin_aweme(aweme_item=aweme_info) await douyin_store.update_douyin_aweme(aweme_item=aweme_info)
utils.logger.info( await self.get_aweme_media(aweme_item=aweme_info)
f"[DouYinCrawler.search] keyword:{keyword}, aweme_list:{aweme_list}" utils.logger.info(f"[DouYinCrawler.search] keyword:{keyword}, aweme_list:{aweme_list}")
)
await self.batch_get_note_comments(aweme_list) await self.batch_get_note_comments(aweme_list)
async def get_specified_awemes(self): async def get_specified_awemes(self):
"""Get the information and comments of the specified post""" """Get the information and comments of the specified post"""
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list = [ task_list = [self.get_aweme_detail(aweme_id=aweme_id, semaphore=semaphore) for aweme_id in config.DY_SPECIFIED_ID_LIST]
self.get_aweme_detail(aweme_id=aweme_id, semaphore=semaphore)
for aweme_id in config.DY_SPECIFIED_ID_LIST
]
aweme_details = await asyncio.gather(*task_list) aweme_details = await asyncio.gather(*task_list)
for aweme_detail in aweme_details: for aweme_detail in aweme_details:
if aweme_detail is not None: if aweme_detail is not None:
await douyin_store.update_douyin_aweme(aweme_detail) await douyin_store.update_douyin_aweme(aweme_item=aweme_detail)
await self.get_aweme_media(aweme_item=aweme_detail)
await self.batch_get_note_comments(config.DY_SPECIFIED_ID_LIST) await self.batch_get_note_comments(config.DY_SPECIFIED_ID_LIST)
async def get_aweme_detail( async def get_aweme_detail(self, aweme_id: str, semaphore: asyncio.Semaphore) -> Any:
self, aweme_id: str, semaphore: asyncio.Semaphore
) -> Any:
"""Get note detail""" """Get note detail"""
async with semaphore: async with semaphore:
try: try:
return await self.dy_client.get_video_by_id(aweme_id) return await self.dy_client.get_video_by_id(aweme_id)
except DataFetchError as ex: except DataFetchError as ex:
utils.logger.error( utils.logger.error(f"[DouYinCrawler.get_aweme_detail] Get aweme detail error: {ex}")
f"[DouYinCrawler.get_aweme_detail] Get aweme detail error: {ex}"
)
return None return None
except KeyError as ex: except KeyError as ex:
utils.logger.error( utils.logger.error(f"[DouYinCrawler.get_aweme_detail] have not fund note detail aweme_id:{aweme_id}, err: {ex}")
f"[DouYinCrawler.get_aweme_detail] have not fund note detail aweme_id:{aweme_id}, err: {ex}"
)
return None return None
async def batch_get_note_comments(self, aweme_list: List[str]) -> None: async def batch_get_note_comments(self, aweme_list: List[str]) -> None:
@@ -207,17 +178,13 @@ class DouYinCrawler(AbstractCrawler):
Batch get note comments Batch get note comments
""" """
if not config.ENABLE_GET_COMMENTS: if not config.ENABLE_GET_COMMENTS:
utils.logger.info( utils.logger.info(f"[DouYinCrawler.batch_get_note_comments] Crawling comment mode is not enabled")
f"[DouYinCrawler.batch_get_note_comments] Crawling comment mode is not enabled"
)
return return
task_list: List[Task] = [] task_list: List[Task] = []
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
for aweme_id in aweme_list: for aweme_id in aweme_list:
task = asyncio.create_task( task = asyncio.create_task(self.get_comments(aweme_id, semaphore), name=aweme_id)
self.get_comments(aweme_id, semaphore), name=aweme_id
)
task_list.append(task) task_list.append(task)
if len(task_list) > 0: if len(task_list) > 0:
await asyncio.wait(task_list) await asyncio.wait(task_list)
@@ -233,30 +200,22 @@ class DouYinCrawler(AbstractCrawler):
callback=douyin_store.batch_update_dy_aweme_comments, callback=douyin_store.batch_update_dy_aweme_comments,
max_count=config.CRAWLER_MAX_COMMENTS_COUNT_SINGLENOTES, max_count=config.CRAWLER_MAX_COMMENTS_COUNT_SINGLENOTES,
) )
utils.logger.info( utils.logger.info(f"[DouYinCrawler.get_comments] aweme_id: {aweme_id} comments have all been obtained and filtered ...")
f"[DouYinCrawler.get_comments] aweme_id: {aweme_id} comments have all been obtained and filtered ..."
)
except DataFetchError as e: except DataFetchError as e:
utils.logger.error( utils.logger.error(f"[DouYinCrawler.get_comments] aweme_id: {aweme_id} get comments failed, error: {e}")
f"[DouYinCrawler.get_comments] aweme_id: {aweme_id} get comments failed, error: {e}"
)
async def get_creators_and_videos(self) -> None: async def get_creators_and_videos(self) -> None:
""" """
Get the information and videos of the specified creator Get the information and videos of the specified creator
""" """
utils.logger.info( utils.logger.info("[DouYinCrawler.get_creators_and_videos] Begin get douyin creators")
"[DouYinCrawler.get_creators_and_videos] Begin get douyin creators"
)
for user_id in config.DY_CREATOR_ID_LIST: for user_id in config.DY_CREATOR_ID_LIST:
creator_info: Dict = await self.dy_client.get_user_info(user_id) creator_info: Dict = await self.dy_client.get_user_info(user_id)
if creator_info: if creator_info:
await douyin_store.save_creator(user_id, creator=creator_info) await douyin_store.save_creator(user_id, creator=creator_info)
# Get all video information of the creator # Get all video information of the creator
all_video_list = await self.dy_client.get_all_user_aweme_posts( all_video_list = await self.dy_client.get_all_user_aweme_posts(sec_user_id=user_id, callback=self.fetch_creator_video_detail)
sec_user_id=user_id, callback=self.fetch_creator_video_detail
)
video_ids = [video_item.get("aweme_id") for video_item in all_video_list] video_ids = [video_item.get("aweme_id") for video_item in all_video_list]
await self.batch_get_note_comments(video_ids) await self.batch_get_note_comments(video_ids)
@@ -266,15 +225,13 @@ class DouYinCrawler(AbstractCrawler):
Concurrently obtain the specified post list and save the data Concurrently obtain the specified post list and save the data
""" """
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list = [ task_list = [self.get_aweme_detail(post_item.get("aweme_id"), semaphore) for post_item in video_list]
self.get_aweme_detail(post_item.get("aweme_id"), semaphore)
for post_item in video_list
]
note_details = await asyncio.gather(*task_list) note_details = await asyncio.gather(*task_list)
for aweme_item in note_details: for aweme_item in note_details:
if aweme_item is not None: if aweme_item is not None:
await douyin_store.update_douyin_aweme(aweme_item) await douyin_store.update_douyin_aweme(aweme_item=aweme_item)
await self.get_aweme_media(aweme_item=aweme_item)
async def create_douyin_client(self, httpx_proxy: Optional[str]) -> DOUYINClient: async def create_douyin_client(self, httpx_proxy: Optional[str]) -> DOUYINClient:
"""Create douyin client""" """Create douyin client"""
@@ -282,9 +239,7 @@ class DouYinCrawler(AbstractCrawler):
douyin_client = DOUYINClient( douyin_client = DOUYINClient(
proxies=httpx_proxy, proxies=httpx_proxy,
headers={ headers={
"User-Agent": await self.context_page.evaluate( "User-Agent": await self.context_page.evaluate("() => navigator.userAgent"),
"() => navigator.userAgent"
),
"Cookie": cookie_str, "Cookie": cookie_str,
"Host": "www.douyin.com", "Host": "www.douyin.com",
"Origin": "https://www.douyin.com/", "Origin": "https://www.douyin.com/",
@@ -305,23 +260,22 @@ class DouYinCrawler(AbstractCrawler):
) -> BrowserContext: ) -> BrowserContext:
"""Launch browser and create browser context""" """Launch browser and create browser context"""
if config.SAVE_LOGIN_STATE: if config.SAVE_LOGIN_STATE:
user_data_dir = os.path.join( user_data_dir = os.path.join(os.getcwd(), "browser_data", config.USER_DATA_DIR % config.PLATFORM) # type: ignore
os.getcwd(), "browser_data", config.USER_DATA_DIR % config.PLATFORM
) # type: ignore
browser_context = await chromium.launch_persistent_context( browser_context = await chromium.launch_persistent_context(
user_data_dir=user_data_dir, user_data_dir=user_data_dir,
accept_downloads=True, accept_downloads=True,
headless=headless, headless=headless,
proxy=playwright_proxy, # type: ignore proxy=playwright_proxy, # type: ignore
viewport={"width": 1920, "height": 1080}, viewport={
"width": 1920,
"height": 1080
},
user_agent=user_agent, user_agent=user_agent,
) # type: ignore ) # type: ignore
return browser_context return browser_context
else: else:
browser = await chromium.launch(headless=headless, proxy=playwright_proxy) # type: ignore browser = await chromium.launch(headless=headless, proxy=playwright_proxy) # type: ignore
browser_context = await browser.new_context( browser_context = await browser.new_context(viewport={"width": 1920, "height": 1080}, user_agent=user_agent)
viewport={"width": 1920, "height": 1080}, user_agent=user_agent
)
return browser_context return browser_context
async def launch_browser_with_cdp( async def launch_browser_with_cdp(
@@ -356,9 +310,7 @@ class DouYinCrawler(AbstractCrawler):
utils.logger.error(f"[DouYinCrawler] CDP模式启动失败回退到标准模式: {e}") utils.logger.error(f"[DouYinCrawler] CDP模式启动失败回退到标准模式: {e}")
# 回退到标准模式 # 回退到标准模式
chromium = playwright.chromium chromium = playwright.chromium
return await self.launch_browser( return await self.launch_browser(chromium, playwright_proxy, user_agent, headless)
chromium, playwright_proxy, user_agent, headless
)
async def close(self) -> None: async def close(self) -> None:
"""Close browser context""" """Close browser context"""
@@ -369,3 +321,74 @@ class DouYinCrawler(AbstractCrawler):
else: else:
await self.browser_context.close() await self.browser_context.close()
utils.logger.info("[DouYinCrawler.close] Browser context closed ...") utils.logger.info("[DouYinCrawler.close] Browser context closed ...")
async def get_aweme_media(self, aweme_item: Dict):
"""
获取抖音媒体,自动判断媒体类型是短视频还是帖子图片并下载
Args:
aweme_item (Dict): 抖音作品详情
"""
if not config.ENABLE_GET_MEIDAS:
utils.logger.info(f"[DouYinCrawler.get_aweme_media] Crawling image mode is not enabled")
return
# 笔记 urls 列表,若为短视频类型则返回为空列表
note_download_url: List[str] = douyin_store._extract_note_image_list(aweme_item)
# 视频 url永远存在但为短视频类型时的文件其实是音频文件
video_download_url: str = douyin_store._extract_video_download_url(aweme_item)
# TODO: 抖音并没采用音视频分离的策略,故音频可从原视频中分离,暂不提取
if note_download_url:
await self.get_aweme_images(aweme_item)
else:
await self.get_aweme_video(aweme_item)
async def get_aweme_images(self, aweme_item: Dict):
"""
get aweme images. please use get_aweme_media
Args:
aweme_item (Dict): 抖音作品详情
"""
if not config.ENABLE_GET_MEIDAS:
return
aweme_id = aweme_item.get("aweme_id")
# 笔记 urls 列表,若为短视频类型则返回为空列表
note_download_url: List[str] = douyin_store._extract_note_image_list(aweme_item)
if not note_download_url:
return
picNum = 0
for url in note_download_url:
if not url:
continue
content = await self.dy_client.get_aweme_media(url)
if content is None:
continue
extension_file_name = f"{picNum}.jpeg"
picNum += 1
await douyin_store.update_dy_aweme_image(aweme_id, content, extension_file_name)
async def get_aweme_video(self, aweme_item: Dict):
"""
get aweme videos. please use get_aweme_media
Args:
aweme_item (Dict): 抖音作品详情
"""
if not config.ENABLE_GET_MEIDAS:
return
aweme_id = aweme_item.get("aweme_id")
# 视频 url永远存在但为短视频类型时的文件其实是音频文件
video_download_url: str = douyin_store._extract_video_download_url(aweme_item)
if not video_download_url:
return
# video_download_url 为单个地址字符串,直接下载一次即可,无需按字符循环
content = await self.dy_client.get_aweme_media(video_download_url)
if content is None:
return
extension_file_name = "video.mp4"
await douyin_store.update_dy_aweme_video(aweme_id, content, extension_file_name)
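# 使用示意(假设性草图,非本次提交内容):详情抓取完成后调用 get_aweme_media,
# 由其依据作品类型自动选择 get_aweme_images 或 get_aweme_video。
# 其中 crawler、aweme_detail 的获取方式均为假设,以项目实际代码为准。
async def demo_download_douyin_media(crawler: "DouYinCrawler", aweme_detail: Dict):
    # 图文帖 -> 依次保存 0.jpeg、1.jpeg ...;短视频 -> 保存 video.mp4
    await crawler.get_aweme_media(aweme_detail)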

View File

@@ -8,13 +8,11 @@
# 详细许可条款请参阅项目根目录下的LICENSE文件。 # 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 # 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com # @Author : relakkes@gmail.com
# @Time : 2023/12/23 15:41 # @Time : 2023/12/23 15:41
# @Desc : 微博爬虫主流程代码 # @Desc : 微博爬虫主流程代码
import asyncio import asyncio
import os import os
import random import random
@@ -60,13 +58,9 @@ class WeiboCrawler(AbstractCrawler):
async def start(self): async def start(self):
playwright_proxy_format, httpx_proxy_format = None, None playwright_proxy_format, httpx_proxy_format = None, None
if config.ENABLE_IP_PROXY: if config.ENABLE_IP_PROXY:
ip_proxy_pool = await create_ip_pool( ip_proxy_pool = await create_ip_pool(config.IP_PROXY_POOL_COUNT, enable_validate_ip=True)
config.IP_PROXY_POOL_COUNT, enable_validate_ip=True
)
ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy() ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy()
playwright_proxy_format, httpx_proxy_format = utils.format_proxy_info( playwright_proxy_format, httpx_proxy_format = utils.format_proxy_info(ip_proxy_info)
ip_proxy_info
)
async with async_playwright() as playwright: async with async_playwright() as playwright:
# 根据配置选择启动模式 # 根据配置选择启动模式
@@ -82,9 +76,7 @@ class WeiboCrawler(AbstractCrawler):
utils.logger.info("[WeiboCrawler] 使用标准模式启动浏览器") utils.logger.info("[WeiboCrawler] 使用标准模式启动浏览器")
# Launch a browser context. # Launch a browser context.
chromium = playwright.chromium chromium = playwright.chromium
self.browser_context = await self.launch_browser( self.browser_context = await self.launch_browser(chromium, None, self.mobile_user_agent, headless=config.HEADLESS)
chromium, None, self.mobile_user_agent, headless=config.HEADLESS
)
# stealth.min.js is a js script to prevent the website from detecting the crawler. # stealth.min.js is a js script to prevent the website from detecting the crawler.
await self.browser_context.add_init_script(path="libs/stealth.min.js") await self.browser_context.add_init_script(path="libs/stealth.min.js")
self.context_page = await self.browser_context.new_page() self.context_page = await self.browser_context.new_page()
@@ -103,14 +95,10 @@ class WeiboCrawler(AbstractCrawler):
await login_obj.begin() await login_obj.begin()
# 登录成功后重定向到手机端的网站再更新手机端登录成功的cookie # 登录成功后重定向到手机端的网站再更新手机端登录成功的cookie
utils.logger.info( utils.logger.info("[WeiboCrawler.start] redirect weibo mobile homepage and update cookies on mobile platform")
"[WeiboCrawler.start] redirect weibo mobile homepage and update cookies on mobile platform"
)
await self.context_page.goto(self.mobile_index_url) await self.context_page.goto(self.mobile_index_url)
await asyncio.sleep(2) await asyncio.sleep(2)
await self.wb_client.update_cookies( await self.wb_client.update_cookies(browser_context=self.browser_context)
browser_context=self.browser_context
)
crawler_type_var.set(config.CRAWLER_TYPE) crawler_type_var.set(config.CRAWLER_TYPE)
if config.CRAWLER_TYPE == "search": if config.CRAWLER_TYPE == "search":
@@ -147,30 +135,20 @@ class WeiboCrawler(AbstractCrawler):
elif config.WEIBO_SEARCH_TYPE == "video": elif config.WEIBO_SEARCH_TYPE == "video":
search_type = SearchType.VIDEO search_type = SearchType.VIDEO
else: else:
utils.logger.error( utils.logger.error(f"[WeiboCrawler.search] Invalid WEIBO_SEARCH_TYPE: {config.WEIBO_SEARCH_TYPE}")
f"[WeiboCrawler.search] Invalid WEIBO_SEARCH_TYPE: {config.WEIBO_SEARCH_TYPE}"
)
return return
for keyword in config.KEYWORDS.split(","): for keyword in config.KEYWORDS.split(","):
source_keyword_var.set(keyword) source_keyword_var.set(keyword)
utils.logger.info( utils.logger.info(f"[WeiboCrawler.search] Current search keyword: {keyword}")
f"[WeiboCrawler.search] Current search keyword: {keyword}"
)
page = 1 page = 1
while ( while (page - start_page + 1) * weibo_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
page - start_page + 1
) * weibo_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
if page < start_page: if page < start_page:
utils.logger.info(f"[WeiboCrawler.search] Skip page: {page}") utils.logger.info(f"[WeiboCrawler.search] Skip page: {page}")
page += 1 page += 1
continue continue
utils.logger.info( utils.logger.info(f"[WeiboCrawler.search] search weibo keyword: {keyword}, page: {page}")
f"[WeiboCrawler.search] search weibo keyword: {keyword}, page: {page}" search_res = await self.wb_client.get_note_by_keyword(keyword=keyword, page=page, search_type=search_type)
)
search_res = await self.wb_client.get_note_by_keyword(
keyword=keyword, page=page, search_type=search_type
)
note_id_list: List[str] = [] note_id_list: List[str] = []
note_list = filter_search_result_card(search_res.get("cards")) note_list = filter_search_result_card(search_res.get("cards"))
for note_item in note_list: for note_item in note_list:
@@ -190,19 +168,14 @@ class WeiboCrawler(AbstractCrawler):
:return: :return:
""" """
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list = [ task_list = [self.get_note_info_task(note_id=note_id, semaphore=semaphore) for note_id in config.WEIBO_SPECIFIED_ID_LIST]
self.get_note_info_task(note_id=note_id, semaphore=semaphore)
for note_id in config.WEIBO_SPECIFIED_ID_LIST
]
video_details = await asyncio.gather(*task_list) video_details = await asyncio.gather(*task_list)
for note_item in video_details: for note_item in video_details:
if note_item: if note_item:
await weibo_store.update_weibo_note(note_item) await weibo_store.update_weibo_note(note_item)
await self.batch_get_notes_comments(config.WEIBO_SPECIFIED_ID_LIST) await self.batch_get_notes_comments(config.WEIBO_SPECIFIED_ID_LIST)
async def get_note_info_task( async def get_note_info_task(self, note_id: str, semaphore: asyncio.Semaphore) -> Optional[Dict]:
self, note_id: str, semaphore: asyncio.Semaphore
) -> Optional[Dict]:
""" """
Get note detail task Get note detail task
:param note_id: :param note_id:
@@ -214,14 +187,10 @@ class WeiboCrawler(AbstractCrawler):
result = await self.wb_client.get_note_info_by_id(note_id) result = await self.wb_client.get_note_info_by_id(note_id)
return result return result
except DataFetchError as ex: except DataFetchError as ex:
utils.logger.error( utils.logger.error(f"[WeiboCrawler.get_note_info_task] Get note detail error: {ex}")
f"[WeiboCrawler.get_note_info_task] Get note detail error: {ex}"
)
return None return None
except KeyError as ex: except KeyError as ex:
utils.logger.error( utils.logger.error(f"[WeiboCrawler.get_note_info_task] have not fund note detail note_id:{note_id}, err: {ex}")
f"[WeiboCrawler.get_note_info_task] have not fund note detail note_id:{note_id}, err: {ex}"
)
return None return None
async def batch_get_notes_comments(self, note_id_list: List[str]): async def batch_get_notes_comments(self, note_id_list: List[str]):
@@ -231,20 +200,14 @@ class WeiboCrawler(AbstractCrawler):
:return: :return:
""" """
if not config.ENABLE_GET_COMMENTS: if not config.ENABLE_GET_COMMENTS:
utils.logger.info( utils.logger.info(f"[WeiboCrawler.batch_get_note_comments] Crawling comment mode is not enabled")
f"[WeiboCrawler.batch_get_note_comments] Crawling comment mode is not enabled"
)
return return
utils.logger.info( utils.logger.info(f"[WeiboCrawler.batch_get_notes_comments] note ids:{note_id_list}")
f"[WeiboCrawler.batch_get_notes_comments] note ids:{note_id_list}"
)
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list: List[Task] = [] task_list: List[Task] = []
for note_id in note_id_list: for note_id in note_id_list:
task = asyncio.create_task( task = asyncio.create_task(self.get_note_comments(note_id, semaphore), name=note_id)
self.get_note_comments(note_id, semaphore), name=note_id
)
task_list.append(task) task_list.append(task)
await asyncio.gather(*task_list) await asyncio.gather(*task_list)
@@ -257,25 +220,17 @@ class WeiboCrawler(AbstractCrawler):
""" """
async with semaphore: async with semaphore:
try: try:
utils.logger.info( utils.logger.info(f"[WeiboCrawler.get_note_comments] begin get note_id: {note_id} comments ...")
f"[WeiboCrawler.get_note_comments] begin get note_id: {note_id} comments ..."
)
await self.wb_client.get_note_all_comments( await self.wb_client.get_note_all_comments(
note_id=note_id, note_id=note_id,
crawl_interval=random.randint( crawl_interval=random.randint(1, 3), # 微博对API的限流比较严重所以延时提高一些
1, 3
), # 微博对API的限流比较严重所以延时提高一些
callback=weibo_store.batch_update_weibo_note_comments, callback=weibo_store.batch_update_weibo_note_comments,
max_count=config.CRAWLER_MAX_COMMENTS_COUNT_SINGLENOTES, max_count=config.CRAWLER_MAX_COMMENTS_COUNT_SINGLENOTES,
) )
except DataFetchError as ex: except DataFetchError as ex:
utils.logger.error( utils.logger.error(f"[WeiboCrawler.get_note_comments] get note_id: {note_id} comment error: {ex}")
f"[WeiboCrawler.get_note_comments] get note_id: {note_id} comment error: {ex}"
)
except Exception as e: except Exception as e:
utils.logger.error( utils.logger.error(f"[WeiboCrawler.get_note_comments] may be been blocked, err:{e}")
f"[WeiboCrawler.get_note_comments] may be been blocked, err:{e}"
)
async def get_note_images(self, mblog: Dict): async def get_note_images(self, mblog: Dict):
""" """
@@ -283,10 +238,8 @@ class WeiboCrawler(AbstractCrawler):
:param mblog: :param mblog:
:return: :return:
""" """
if not config.ENABLE_GET_IMAGES: if not config.ENABLE_GET_MEIDAS:
utils.logger.info( utils.logger.info(f"[WeiboCrawler.get_note_images] Crawling image mode is not enabled")
f"[WeiboCrawler.get_note_images] Crawling image mode is not enabled"
)
return return
pics: Dict = mblog.get("pics") pics: Dict = mblog.get("pics")
@@ -299,9 +252,7 @@ class WeiboCrawler(AbstractCrawler):
content = await self.wb_client.get_note_image(url) content = await self.wb_client.get_note_image(url)
if content != None: if content != None:
extension_file_name = url.split(".")[-1] extension_file_name = url.split(".")[-1]
await weibo_store.update_weibo_note_image( await weibo_store.update_weibo_note_image(pic["pid"], content, extension_file_name)
pic["pid"], content, extension_file_name
)
async def get_creators_and_notes(self) -> None: async def get_creators_and_notes(self) -> None:
""" """
@@ -309,18 +260,12 @@ class WeiboCrawler(AbstractCrawler):
Returns: Returns:
""" """
utils.logger.info( utils.logger.info("[WeiboCrawler.get_creators_and_notes] Begin get weibo creators")
"[WeiboCrawler.get_creators_and_notes] Begin get weibo creators"
)
for user_id in config.WEIBO_CREATOR_ID_LIST: for user_id in config.WEIBO_CREATOR_ID_LIST:
createor_info_res: Dict = await self.wb_client.get_creator_info_by_id( createor_info_res: Dict = await self.wb_client.get_creator_info_by_id(creator_id=user_id)
creator_id=user_id
)
if createor_info_res: if createor_info_res:
createor_info: Dict = createor_info_res.get("userInfo", {}) createor_info: Dict = createor_info_res.get("userInfo", {})
utils.logger.info( utils.logger.info(f"[WeiboCrawler.get_creators_and_notes] creator info: {createor_info}")
f"[WeiboCrawler.get_creators_and_notes] creator info: {createor_info}"
)
if not createor_info: if not createor_info:
raise DataFetchError("Get creator info error") raise DataFetchError("Get creator info error")
await weibo_store.save_creator(user_id, user_info=createor_info) await weibo_store.save_creator(user_id, user_info=createor_info)
@@ -333,26 +278,16 @@ class WeiboCrawler(AbstractCrawler):
callback=weibo_store.batch_update_weibo_notes, callback=weibo_store.batch_update_weibo_notes,
) )
note_ids = [ note_ids = [note_item.get("mblog", {}).get("id") for note_item in all_notes_list if note_item.get("mblog", {}).get("id")]
note_item.get("mblog", {}).get("id")
for note_item in all_notes_list
if note_item.get("mblog", {}).get("id")
]
await self.batch_get_notes_comments(note_ids) await self.batch_get_notes_comments(note_ids)
else: else:
utils.logger.error( utils.logger.error(f"[WeiboCrawler.get_creators_and_notes] get creator info error, creator_id:{user_id}")
f"[WeiboCrawler.get_creators_and_notes] get creator info error, creator_id:{user_id}"
)
async def create_weibo_client(self, httpx_proxy: Optional[str]) -> WeiboClient: async def create_weibo_client(self, httpx_proxy: Optional[str]) -> WeiboClient:
"""Create xhs client""" """Create xhs client"""
utils.logger.info( utils.logger.info("[WeiboCrawler.create_weibo_client] Begin create weibo API client ...")
"[WeiboCrawler.create_weibo_client] Begin create weibo API client ..." cookie_str, cookie_dict = utils.convert_cookies(await self.browser_context.cookies())
)
cookie_str, cookie_dict = utils.convert_cookies(
await self.browser_context.cookies()
)
weibo_client_obj = WeiboClient( weibo_client_obj = WeiboClient(
proxies=httpx_proxy, proxies=httpx_proxy,
headers={ headers={
@@ -375,27 +310,24 @@ class WeiboCrawler(AbstractCrawler):
headless: bool = True, headless: bool = True,
) -> BrowserContext: ) -> BrowserContext:
"""Launch browser and create browser context""" """Launch browser and create browser context"""
utils.logger.info( utils.logger.info("[WeiboCrawler.launch_browser] Begin create browser context ...")
"[WeiboCrawler.launch_browser] Begin create browser context ..."
)
if config.SAVE_LOGIN_STATE: if config.SAVE_LOGIN_STATE:
user_data_dir = os.path.join( user_data_dir = os.path.join(os.getcwd(), "browser_data", config.USER_DATA_DIR % config.PLATFORM) # type: ignore
os.getcwd(), "browser_data", config.USER_DATA_DIR % config.PLATFORM
) # type: ignore
browser_context = await chromium.launch_persistent_context( browser_context = await chromium.launch_persistent_context(
user_data_dir=user_data_dir, user_data_dir=user_data_dir,
accept_downloads=True, accept_downloads=True,
headless=headless, headless=headless,
proxy=playwright_proxy, # type: ignore proxy=playwright_proxy, # type: ignore
viewport={"width": 1920, "height": 1080}, viewport={
"width": 1920,
"height": 1080
},
user_agent=user_agent, user_agent=user_agent,
) )
return browser_context return browser_context
else: else:
browser = await chromium.launch(headless=headless, proxy=playwright_proxy) # type: ignore browser = await chromium.launch(headless=headless, proxy=playwright_proxy) # type: ignore
browser_context = await browser.new_context( browser_context = await browser.new_context(viewport={"width": 1920, "height": 1080}, user_agent=user_agent)
viewport={"width": 1920, "height": 1080}, user_agent=user_agent
)
return browser_context return browser_context
async def launch_browser_with_cdp( async def launch_browser_with_cdp(
@@ -427,9 +359,7 @@ class WeiboCrawler(AbstractCrawler):
utils.logger.error(f"[WeiboCrawler] CDP模式启动失败回退到标准模式: {e}") utils.logger.error(f"[WeiboCrawler] CDP模式启动失败回退到标准模式: {e}")
# 回退到标准模式 # 回退到标准模式
chromium = playwright.chromium chromium = playwright.chromium
return await self.launch_browser( return await self.launch_browser(chromium, playwright_proxy, user_agent, headless)
chromium, playwright_proxy, user_agent, headless
)
async def close(self): async def close(self):
"""Close browser context""" """Close browser context"""

View File

@@ -8,7 +8,6 @@
# 详细许可条款请参阅项目根目录下的LICENSE文件。 # 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 # 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
import asyncio import asyncio
import os import os
import random import random
@@ -57,13 +56,9 @@ class XiaoHongShuCrawler(AbstractCrawler):
async def start(self) -> None: async def start(self) -> None:
playwright_proxy_format, httpx_proxy_format = None, None playwright_proxy_format, httpx_proxy_format = None, None
if config.ENABLE_IP_PROXY: if config.ENABLE_IP_PROXY:
ip_proxy_pool = await create_ip_pool( ip_proxy_pool = await create_ip_pool(config.IP_PROXY_POOL_COUNT, enable_validate_ip=True)
config.IP_PROXY_POOL_COUNT, enable_validate_ip=True
)
ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy() ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy()
playwright_proxy_format, httpx_proxy_format = utils.format_proxy_info( playwright_proxy_format, httpx_proxy_format = utils.format_proxy_info(ip_proxy_info)
ip_proxy_info
)
async with async_playwright() as playwright: async with async_playwright() as playwright:
# 根据配置选择启动模式 # 根据配置选择启动模式
@@ -101,9 +96,7 @@ class XiaoHongShuCrawler(AbstractCrawler):
cookie_str=config.COOKIES, cookie_str=config.COOKIES,
) )
await login_obj.begin() await login_obj.begin()
await self.xhs_client.update_cookies( await self.xhs_client.update_cookies(browser_context=self.browser_context)
browser_context=self.browser_context
)
crawler_type_var.set(config.CRAWLER_TYPE) crawler_type_var.set(config.CRAWLER_TYPE)
if config.CRAWLER_TYPE == "search": if config.CRAWLER_TYPE == "search":
@@ -122,47 +115,33 @@ class XiaoHongShuCrawler(AbstractCrawler):
async def search(self) -> None: async def search(self) -> None:
"""Search for notes and retrieve their comment information.""" """Search for notes and retrieve their comment information."""
utils.logger.info( utils.logger.info("[XiaoHongShuCrawler.search] Begin search xiaohongshu keywords")
"[XiaoHongShuCrawler.search] Begin search xiaohongshu keywords"
)
xhs_limit_count = 20 # xhs limit page fixed value xhs_limit_count = 20 # xhs limit page fixed value
if config.CRAWLER_MAX_NOTES_COUNT < xhs_limit_count: if config.CRAWLER_MAX_NOTES_COUNT < xhs_limit_count:
config.CRAWLER_MAX_NOTES_COUNT = xhs_limit_count config.CRAWLER_MAX_NOTES_COUNT = xhs_limit_count
start_page = config.START_PAGE start_page = config.START_PAGE
for keyword in config.KEYWORDS.split(","): for keyword in config.KEYWORDS.split(","):
source_keyword_var.set(keyword) source_keyword_var.set(keyword)
utils.logger.info( utils.logger.info(f"[XiaoHongShuCrawler.search] Current search keyword: {keyword}")
f"[XiaoHongShuCrawler.search] Current search keyword: {keyword}"
)
page = 1 page = 1
search_id = get_search_id() search_id = get_search_id()
while ( while (page - start_page + 1) * xhs_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
page - start_page + 1
) * xhs_limit_count <= config.CRAWLER_MAX_NOTES_COUNT:
if page < start_page: if page < start_page:
utils.logger.info(f"[XiaoHongShuCrawler.search] Skip page {page}") utils.logger.info(f"[XiaoHongShuCrawler.search] Skip page {page}")
page += 1 page += 1
continue continue
try: try:
utils.logger.info( utils.logger.info(f"[XiaoHongShuCrawler.search] search xhs keyword: {keyword}, page: {page}")
f"[XiaoHongShuCrawler.search] search xhs keyword: {keyword}, page: {page}"
)
note_ids: List[str] = [] note_ids: List[str] = []
xsec_tokens: List[str] = [] xsec_tokens: List[str] = []
notes_res = await self.xhs_client.get_note_by_keyword( notes_res = await self.xhs_client.get_note_by_keyword(
keyword=keyword, keyword=keyword,
search_id=search_id, search_id=search_id,
page=page, page=page,
sort=( sort=(SearchSortType(config.SORT_TYPE) if config.SORT_TYPE != "" else SearchSortType.GENERAL),
SearchSortType(config.SORT_TYPE)
if config.SORT_TYPE != ""
else SearchSortType.GENERAL
),
)
utils.logger.info(
f"[XiaoHongShuCrawler.search] Search notes res:{notes_res}"
) )
utils.logger.info(f"[XiaoHongShuCrawler.search] Search notes res:{notes_res}")
if not notes_res or not notes_res.get("has_more", False): if not notes_res or not notes_res.get("has_more", False):
utils.logger.info("No more content!") utils.logger.info("No more content!")
break break
@@ -173,9 +152,7 @@ class XiaoHongShuCrawler(AbstractCrawler):
xsec_source=post_item.get("xsec_source"), xsec_source=post_item.get("xsec_source"),
xsec_token=post_item.get("xsec_token"), xsec_token=post_item.get("xsec_token"),
semaphore=semaphore, semaphore=semaphore,
) ) for post_item in notes_res.get("items", {}) if post_item.get("model_type") not in ("rec_query", "hot_query")
for post_item in notes_res.get("items", {})
if post_item.get("model_type") not in ("rec_query", "hot_query")
] ]
note_details = await asyncio.gather(*task_list) note_details = await asyncio.gather(*task_list)
for note_detail in note_details: for note_detail in note_details:
@@ -185,26 +162,18 @@ class XiaoHongShuCrawler(AbstractCrawler):
note_ids.append(note_detail.get("note_id")) note_ids.append(note_detail.get("note_id"))
xsec_tokens.append(note_detail.get("xsec_token")) xsec_tokens.append(note_detail.get("xsec_token"))
page += 1 page += 1
utils.logger.info( utils.logger.info(f"[XiaoHongShuCrawler.search] Note details: {note_details}")
f"[XiaoHongShuCrawler.search] Note details: {note_details}"
)
await self.batch_get_note_comments(note_ids, xsec_tokens) await self.batch_get_note_comments(note_ids, xsec_tokens)
except DataFetchError: except DataFetchError:
utils.logger.error( utils.logger.error("[XiaoHongShuCrawler.search] Get note detail error")
"[XiaoHongShuCrawler.search] Get note detail error"
)
break break
async def get_creators_and_notes(self) -> None: async def get_creators_and_notes(self) -> None:
"""Get creator's notes and retrieve their comment information.""" """Get creator's notes and retrieve their comment information."""
utils.logger.info( utils.logger.info("[XiaoHongShuCrawler.get_creators_and_notes] Begin get xiaohongshu creators")
"[XiaoHongShuCrawler.get_creators_and_notes] Begin get xiaohongshu creators"
)
for user_id in config.XHS_CREATOR_ID_LIST: for user_id in config.XHS_CREATOR_ID_LIST:
# get creator detail info from web html content # get creator detail info from web html content
createor_info: Dict = await self.xhs_client.get_creator_info( createor_info: Dict = await self.xhs_client.get_creator_info(user_id=user_id)
user_id=user_id
)
if createor_info: if createor_info:
await xhs_store.save_creator(user_id, creator=createor_info) await xhs_store.save_creator(user_id, creator=createor_info)
@@ -238,14 +207,14 @@ class XiaoHongShuCrawler(AbstractCrawler):
xsec_source=post_item.get("xsec_source"), xsec_source=post_item.get("xsec_source"),
xsec_token=post_item.get("xsec_token"), xsec_token=post_item.get("xsec_token"),
semaphore=semaphore, semaphore=semaphore,
) ) for post_item in note_list
for post_item in note_list
] ]
note_details = await asyncio.gather(*task_list) note_details = await asyncio.gather(*task_list)
for note_detail in note_details: for note_detail in note_details:
if note_detail: if note_detail:
await xhs_store.update_xhs_note(note_detail) await xhs_store.update_xhs_note(note_detail)
await self.get_notice_media(note_detail)
async def get_specified_notes(self): async def get_specified_notes(self):
""" """
@@ -257,9 +226,7 @@ class XiaoHongShuCrawler(AbstractCrawler):
get_note_detail_task_list = [] get_note_detail_task_list = []
for full_note_url in config.XHS_SPECIFIED_NOTE_URL_LIST: for full_note_url in config.XHS_SPECIFIED_NOTE_URL_LIST:
note_url_info: NoteUrlInfo = parse_note_info_from_note_url(full_note_url) note_url_info: NoteUrlInfo = parse_note_info_from_note_url(full_note_url)
utils.logger.info( utils.logger.info(f"[XiaoHongShuCrawler.get_specified_notes] Parse note url info: {note_url_info}")
f"[XiaoHongShuCrawler.get_specified_notes] Parse note url info: {note_url_info}"
)
crawler_task = self.get_note_detail_async_task( crawler_task = self.get_note_detail_async_task(
note_id=note_url_info.note_id, note_id=note_url_info.note_id,
xsec_source=note_url_info.xsec_source, xsec_source=note_url_info.xsec_source,
@@ -276,14 +243,15 @@ class XiaoHongShuCrawler(AbstractCrawler):
need_get_comment_note_ids.append(note_detail.get("note_id", "")) need_get_comment_note_ids.append(note_detail.get("note_id", ""))
xsec_tokens.append(note_detail.get("xsec_token", "")) xsec_tokens.append(note_detail.get("xsec_token", ""))
await xhs_store.update_xhs_note(note_detail) await xhs_store.update_xhs_note(note_detail)
await self.get_notice_media(note_detail)
await self.batch_get_note_comments(need_get_comment_note_ids, xsec_tokens) await self.batch_get_note_comments(need_get_comment_note_ids, xsec_tokens)
async def get_note_detail_async_task( async def get_note_detail_async_task(
self, self,
note_id: str, note_id: str,
xsec_source: str, xsec_source: str,
xsec_token: str, xsec_token: str,
semaphore: asyncio.Semaphore, semaphore: asyncio.Semaphore,
) -> Optional[Dict]: ) -> Optional[Dict]:
"""Get note detail """Get note detail
@@ -299,72 +267,49 @@ class XiaoHongShuCrawler(AbstractCrawler):
note_detail = None note_detail = None
async with semaphore: async with semaphore:
try: try:
utils.logger.info( utils.logger.info(f"[get_note_detail_async_task] Begin get note detail, note_id: {note_id}")
f"[get_note_detail_async_task] Begin get note detail, note_id: {note_id}"
)
try: try:
note_detail = await self.xhs_client.get_note_by_id( note_detail = await self.xhs_client.get_note_by_id(note_id, xsec_source, xsec_token)
note_id, xsec_source, xsec_token
)
except RetryError as e: except RetryError as e:
pass pass
if not note_detail: if not note_detail:
note_detail = await self.xhs_client.get_note_by_id_from_html(note_id, xsec_source, xsec_token, note_detail = await self.xhs_client.get_note_by_id_from_html(note_id, xsec_source, xsec_token, enable_cookie=False)
enable_cookie=False)
if not note_detail: if not note_detail:
raise Exception(f"[get_note_detail_async_task] Failed to get note detail, Id: {note_id}") raise Exception(f"[get_note_detail_async_task] Failed to get note detail, Id: {note_id}")
note_detail.update( note_detail.update({"xsec_token": xsec_token, "xsec_source": xsec_source})
{"xsec_token": xsec_token, "xsec_source": xsec_source}
)
return note_detail return note_detail
except DataFetchError as ex: except DataFetchError as ex:
utils.logger.error( utils.logger.error(f"[XiaoHongShuCrawler.get_note_detail_async_task] Get note detail error: {ex}")
f"[XiaoHongShuCrawler.get_note_detail_async_task] Get note detail error: {ex}"
)
return None return None
except KeyError as ex: except KeyError as ex:
utils.logger.error( utils.logger.error(f"[XiaoHongShuCrawler.get_note_detail_async_task] have not fund note detail note_id:{note_id}, err: {ex}")
f"[XiaoHongShuCrawler.get_note_detail_async_task] have not fund note detail note_id:{note_id}, err: {ex}"
)
return None return None
async def batch_get_note_comments( async def batch_get_note_comments(self, note_list: List[str], xsec_tokens: List[str]):
self, note_list: List[str], xsec_tokens: List[str]
):
"""Batch get note comments""" """Batch get note comments"""
if not config.ENABLE_GET_COMMENTS: if not config.ENABLE_GET_COMMENTS:
utils.logger.info( utils.logger.info(f"[XiaoHongShuCrawler.batch_get_note_comments] Crawling comment mode is not enabled")
f"[XiaoHongShuCrawler.batch_get_note_comments] Crawling comment mode is not enabled"
)
return return
utils.logger.info( utils.logger.info(f"[XiaoHongShuCrawler.batch_get_note_comments] Begin batch get note comments, note list: {note_list}")
f"[XiaoHongShuCrawler.batch_get_note_comments] Begin batch get note comments, note list: {note_list}"
)
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM) semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list: List[Task] = [] task_list: List[Task] = []
for index, note_id in enumerate(note_list): for index, note_id in enumerate(note_list):
task = asyncio.create_task( task = asyncio.create_task(
self.get_comments( self.get_comments(note_id=note_id, xsec_token=xsec_tokens[index], semaphore=semaphore),
note_id=note_id, xsec_token=xsec_tokens[index], semaphore=semaphore
),
name=note_id, name=note_id,
) )
task_list.append(task) task_list.append(task)
await asyncio.gather(*task_list) await asyncio.gather(*task_list)
async def get_comments( async def get_comments(self, note_id: str, xsec_token: str, semaphore: asyncio.Semaphore):
self, note_id: str, xsec_token: str, semaphore: asyncio.Semaphore
):
"""Get note comments with keyword filtering and quantity limitation""" """Get note comments with keyword filtering and quantity limitation"""
async with semaphore: async with semaphore:
utils.logger.info( utils.logger.info(f"[XiaoHongShuCrawler.get_comments] Begin get note id comments {note_id}")
f"[XiaoHongShuCrawler.get_comments] Begin get note id comments {note_id}"
)
# When proxy is not enabled, increase the crawling interval # When proxy is not enabled, increase the crawling interval
if config.ENABLE_IP_PROXY: if config.ENABLE_IP_PROXY:
crawl_interval = random.random() crawl_interval = random.random()
@@ -380,12 +325,8 @@ class XiaoHongShuCrawler(AbstractCrawler):
async def create_xhs_client(self, httpx_proxy: Optional[str]) -> XiaoHongShuClient: async def create_xhs_client(self, httpx_proxy: Optional[str]) -> XiaoHongShuClient:
"""Create xhs client""" """Create xhs client"""
utils.logger.info( utils.logger.info("[XiaoHongShuCrawler.create_xhs_client] Begin create xiaohongshu API client ...")
"[XiaoHongShuCrawler.create_xhs_client] Begin create xiaohongshu API client ..." cookie_str, cookie_dict = utils.convert_cookies(await self.browser_context.cookies())
)
cookie_str, cookie_dict = utils.convert_cookies(
await self.browser_context.cookies()
)
xhs_client_obj = XiaoHongShuClient( xhs_client_obj = XiaoHongShuClient(
proxies=httpx_proxy, proxies=httpx_proxy,
headers={ headers={
@@ -412,44 +353,41 @@ class XiaoHongShuCrawler(AbstractCrawler):
return xhs_client_obj return xhs_client_obj
async def launch_browser( async def launch_browser(
self, self,
chromium: BrowserType, chromium: BrowserType,
playwright_proxy: Optional[Dict], playwright_proxy: Optional[Dict],
user_agent: Optional[str], user_agent: Optional[str],
headless: bool = True, headless: bool = True,
) -> BrowserContext: ) -> BrowserContext:
"""Launch browser and create browser context""" """Launch browser and create browser context"""
utils.logger.info( utils.logger.info("[XiaoHongShuCrawler.launch_browser] Begin create browser context ...")
"[XiaoHongShuCrawler.launch_browser] Begin create browser context ..."
)
if config.SAVE_LOGIN_STATE: if config.SAVE_LOGIN_STATE:
# feat issue #14 # feat issue #14
# we will save login state to avoid login every time # we will save login state to avoid login every time
user_data_dir = os.path.join( user_data_dir = os.path.join(os.getcwd(), "browser_data", config.USER_DATA_DIR % config.PLATFORM) # type: ignore
os.getcwd(), "browser_data", config.USER_DATA_DIR % config.PLATFORM
) # type: ignore
browser_context = await chromium.launch_persistent_context( browser_context = await chromium.launch_persistent_context(
user_data_dir=user_data_dir, user_data_dir=user_data_dir,
accept_downloads=True, accept_downloads=True,
headless=headless, headless=headless,
proxy=playwright_proxy, # type: ignore proxy=playwright_proxy, # type: ignore
viewport={"width": 1920, "height": 1080}, viewport={
"width": 1920,
"height": 1080
},
user_agent=user_agent, user_agent=user_agent,
) )
return browser_context return browser_context
else: else:
browser = await chromium.launch(headless=headless, proxy=playwright_proxy) # type: ignore browser = await chromium.launch(headless=headless, proxy=playwright_proxy) # type: ignore
browser_context = await browser.new_context( browser_context = await browser.new_context(viewport={"width": 1920, "height": 1080}, user_agent=user_agent)
viewport={"width": 1920, "height": 1080}, user_agent=user_agent
)
return browser_context return browser_context
async def launch_browser_with_cdp( async def launch_browser_with_cdp(
self, self,
playwright: Playwright, playwright: Playwright,
playwright_proxy: Optional[Dict], playwright_proxy: Optional[Dict],
user_agent: Optional[str], user_agent: Optional[str],
headless: bool = True, headless: bool = True,
) -> BrowserContext: ) -> BrowserContext:
""" """
使用CDP模式启动浏览器 使用CDP模式启动浏览器
@@ -470,14 +408,10 @@ class XiaoHongShuCrawler(AbstractCrawler):
return browser_context return browser_context
except Exception as e: except Exception as e:
utils.logger.error( utils.logger.error(f"[XiaoHongShuCrawler] CDP模式启动失败回退到标准模式: {e}")
f"[XiaoHongShuCrawler] CDP模式启动失败回退到标准模式: {e}"
)
# 回退到标准模式 # 回退到标准模式
chromium = playwright.chromium chromium = playwright.chromium
return await self.launch_browser( return await self.launch_browser(chromium, playwright_proxy, user_agent, headless)
chromium, playwright_proxy, user_agent, headless
)
async def close(self): async def close(self):
"""Close browser context""" """Close browser context"""
@@ -490,10 +424,8 @@ class XiaoHongShuCrawler(AbstractCrawler):
utils.logger.info("[XiaoHongShuCrawler.close] Browser context closed ...") utils.logger.info("[XiaoHongShuCrawler.close] Browser context closed ...")
async def get_notice_media(self, note_detail: Dict): async def get_notice_media(self, note_detail: Dict):
if not config.ENABLE_GET_IMAGES: if not config.ENABLE_GET_MEIDAS:
utils.logger.info( utils.logger.info(f"[XiaoHongShuCrawler.get_notice_media] Crawling image mode is not enabled")
f"[XiaoHongShuCrawler.get_notice_media] Crawling image mode is not enabled"
)
return return
await self.get_note_images(note_detail) await self.get_note_images(note_detail)
await self.get_notice_video(note_detail) await self.get_notice_video(note_detail)
@@ -504,7 +436,7 @@ class XiaoHongShuCrawler(AbstractCrawler):
:param note_item: :param note_item:
:return: :return:
""" """
if not config.ENABLE_GET_IMAGES: if not config.ENABLE_GET_MEIDAS:
return return
note_id = note_item.get("note_id") note_id = note_item.get("note_id")
image_list: List[Dict] = note_item.get("image_list", []) image_list: List[Dict] = note_item.get("image_list", [])
@@ -529,11 +461,11 @@ class XiaoHongShuCrawler(AbstractCrawler):
async def get_notice_video(self, note_item: Dict): async def get_notice_video(self, note_item: Dict):
""" """
get note images. please use get_notice_media get note videos. please use get_notice_media
:param note_item: :param note_item:
:return: :return:
""" """
if not config.ENABLE_GET_IMAGES: if not config.ENABLE_GET_MEIDAS:
return return
note_id = note_item.get("note_id") note_id = note_item.get("note_id")
@@ -548,4 +480,4 @@ class XiaoHongShuCrawler(AbstractCrawler):
continue continue
extension_file_name = f"{videoNum}.mp4" extension_file_name = f"{videoNum}.mp4"
videoNum += 1 videoNum += 1
await xhs_store.update_xhs_note_image(note_id, content, extension_file_name) await xhs_store.update_xhs_note_video(note_id, content, extension_file_name)
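# 配置示意(假设性草图,非本次提交内容):以上各方法的开关已由 ENABLE_GET_IMAGES 更名为 ENABLE_GET_MEIDAS,
# 在 config 中的典型写法如下(字段名以本次提交为准,所在文件位置为假设):
ENABLE_GET_MEIDAS = True  # True 时,抓取帖子详情后同步下载图片与视频到本地 data 目录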

View File

@@ -1,13 +1,12 @@
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则: # 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。 # 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。 # 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。 # 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。 # 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。 # 5. 不得用于任何非法或不当的用途。
# #
# 详细许可条款请参阅项目根目录下的LICENSE文件。 # 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 # 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# @Author : helloteemo # @Author : helloteemo
@@ -18,11 +17,11 @@ from typing import Dict
import aiofiles import aiofiles
from base.base_crawler import AbstractStoreImage from base.base_crawler import AbstractStoreImage, AbstractStoreVideo
from tools import utils from tools import utils
class BilibiliVideo(AbstractStoreImage): class BilibiliVideo(AbstractStoreVideo):
video_store_path: str = "data/bilibili/videos" video_store_path: str = "data/bilibili/videos"
async def store_video(self, video_content_item: Dict): async def store_video(self, video_content_item: Dict):
@@ -34,8 +33,7 @@ class BilibiliVideo(AbstractStoreImage):
Returns: Returns:
""" """
await self.save_video(video_content_item.get("aid"), video_content_item.get("video_content"), await self.save_video(video_content_item.get("aid"), video_content_item.get("video_content"), video_content_item.get("extension_file_name"))
video_content_item.get("extension_file_name"))
def make_save_file_name(self, aid: str, extension_file_name: str) -> str: def make_save_file_name(self, aid: str, extension_file_name: str) -> str:
""" """

View File

@@ -18,6 +18,7 @@ import config
from var import source_keyword_var from var import source_keyword_var
from .douyin_store_impl import * from .douyin_store_impl import *
from .douyin_store_media import *
class DouyinStoreFactory: class DouyinStoreFactory:
@@ -233,3 +234,33 @@ async def save_creator(user_id: str, creator: Dict):
} }
utils.logger.info(f"[store.douyin.save_creator] creator:{local_db_item}") utils.logger.info(f"[store.douyin.save_creator] creator:{local_db_item}")
await DouyinStoreFactory.create_store().store_creator(local_db_item) await DouyinStoreFactory.create_store().store_creator(local_db_item)
async def update_dy_aweme_image(aweme_id, pic_content, extension_file_name):
"""
更新抖音笔记图片
Args:
aweme_id:
pic_content:
extension_file_name:
Returns:
"""
await DouYinImage().store_image({"aweme_id": aweme_id, "pic_content": pic_content, "extension_file_name": extension_file_name})
async def update_dy_aweme_video(aweme_id, video_content, extension_file_name):
"""
更新抖音短视频
Args:
aweme_id:
video_content:
extension_file_name:
Returns:
"""
await DouYinVideo().store_video({"aweme_id": aweme_id, "video_content": video_content, "extension_file_name": extension_file_name})
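# 使用示意(假设性草图,非本次提交内容):爬虫侧下载到二进制内容后,通过上面两个函数落盘。
# 导入路径与示例数据均为假设,以项目实际结构为准。
import asyncio
from store import douyin as douyin_store  # 假设的导入路径

async def demo_save_douyin_media():
    aweme_id = "7123456789012345678"  # 假设的作品 id
    image_bytes = b"..."  # 假设:已由 dy_client.get_aweme_media 下载的图片内容
    video_bytes = b"..."  # 假设:已下载的视频内容
    await douyin_store.update_dy_aweme_image(aweme_id, image_bytes, "0.jpeg")
    await douyin_store.update_dy_aweme_video(aweme_id, video_bytes, "video.mp4")

# asyncio.run(demo_save_douyin_media())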

View File

@@ -0,0 +1,103 @@
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
import pathlib
from typing import Dict
import aiofiles
from base.base_crawler import AbstractStoreImage, AbstractStoreVideo
from tools import utils
class DouYinImage(AbstractStoreImage):
image_store_path: str = "data/douyin/images"
async def store_image(self, image_content_item: Dict):
"""
store content
Args:
content_item:
Returns:
"""
await self.save_image(image_content_item.get("aweme_id"), image_content_item.get("pic_content"), image_content_item.get("extension_file_name"))
def make_save_file_name(self, aweme_id: str, extension_file_name: str) -> str:
"""
make save file name by store type
Args:
aweme_id: aweme id
extension_file_name: file name with extension, e.g. 0.jpeg
Returns:
"""
return f"{self.image_store_path}/{aweme_id}/{extension_file_name}"
async def save_image(self, aweme_id: str, pic_content: str, extension_file_name):
"""
save image to local
Args:
aweme_id: aweme id
pic_content: image content
Returns:
"""
pathlib.Path(self.image_store_path + "/" + aweme_id).mkdir(parents=True, exist_ok=True)
save_file_name = self.make_save_file_name(aweme_id, extension_file_name)
async with aiofiles.open(save_file_name, 'wb') as f:
await f.write(pic_content)
utils.logger.info(f"[DouYinImageStoreImplement.save_image] save image {save_file_name} success ...")
class DouYinVideo(AbstractStoreVideo):
video_store_path: str = "data/douyin/videos"
async def store_video(self, video_content_item: Dict):
"""
store content
Args:
content_item:
Returns:
"""
await self.save_video(video_content_item.get("aweme_id"), video_content_item.get("video_content"), video_content_item.get("extension_file_name"))
def make_save_file_name(self, aweme_id: str, extension_file_name: str) -> str:
"""
make save file name by store type
Args:
aweme_id: aweme id
extension_file_name: file name with extension, e.g. video.mp4
Returns:
"""
return f"{self.video_store_path}/{aweme_id}/{extension_file_name}"
async def save_video(self, aweme_id: str, video_content: str, extension_file_name):
"""
save video to local
Args:
aweme_id: aweme id
video_content: video content
Returns:
"""
pathlib.Path(self.video_store_path + "/" + aweme_id).mkdir(parents=True, exist_ok=True)
save_file_name = self.make_save_file_name(aweme_id, extension_file_name)
async with aiofiles.open(save_file_name, 'wb') as f:
await f.write(video_content)
utils.logger.info(f"[DouYinVideoStoreImplement.save_video] save video {save_file_name} success ...")

View File

@@ -1,13 +1,12 @@
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则: # 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。 # 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。 # 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。 # 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。 # 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。 # 5. 不得用于任何非法或不当的用途。
# #
# 详细许可条款请参阅项目根目录下的LICENSE文件。 # 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 # 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com # @Author : relakkes@gmail.com
@@ -28,7 +27,7 @@ class XhsStoreFactory:
"csv": XhsCsvStoreImplement, "csv": XhsCsvStoreImplement,
"db": XhsDbStoreImplement, "db": XhsDbStoreImplement,
"json": XhsJsonStoreImplement, "json": XhsJsonStoreImplement,
"sqlite": XhsSqliteStoreImplement "sqlite": XhsSqliteStoreImplement,
} }
@staticmethod @staticmethod
@@ -88,27 +87,27 @@ async def update_xhs_note(note_item: Dict):
video_url = ','.join(get_video_url_arr(note_item)) video_url = ','.join(get_video_url_arr(note_item))
local_db_item = { local_db_item = {
"note_id": note_item.get("note_id"), # 帖子id "note_id": note_item.get("note_id"), # 帖子id
"type": note_item.get("type"), # 帖子类型 "type": note_item.get("type"), # 帖子类型
"title": note_item.get("title") or note_item.get("desc", "")[:255], # 帖子标题 "title": note_item.get("title") or note_item.get("desc", "")[:255], # 帖子标题
"desc": note_item.get("desc", ""), # 帖子描述 "desc": note_item.get("desc", ""), # 帖子描述
"video_url": video_url, # 帖子视频url "video_url": video_url, # 帖子视频url
"time": note_item.get("time"), # 帖子发布时间 "time": note_item.get("time"), # 帖子发布时间
"last_update_time": note_item.get("last_update_time", 0), # 帖子最后更新时间 "last_update_time": note_item.get("last_update_time", 0), # 帖子最后更新时间
"user_id": user_info.get("user_id"), # 用户id "user_id": user_info.get("user_id"), # 用户id
"nickname": user_info.get("nickname"), # 用户昵称 "nickname": user_info.get("nickname"), # 用户昵称
"avatar": user_info.get("avatar"), # 用户头像 "avatar": user_info.get("avatar"), # 用户头像
"liked_count": interact_info.get("liked_count"), # 点赞数 "liked_count": interact_info.get("liked_count"), # 点赞数
"collected_count": interact_info.get("collected_count"), # 收藏数 "collected_count": interact_info.get("collected_count"), # 收藏数
"comment_count": interact_info.get("comment_count"), # 评论数 "comment_count": interact_info.get("comment_count"), # 评论数
"share_count": interact_info.get("share_count"), # 分享数 "share_count": interact_info.get("share_count"), # 分享数
"ip_location": note_item.get("ip_location", ""), # ip地址 "ip_location": note_item.get("ip_location", ""), # ip地址
"image_list": ','.join([img.get('url', '') for img in image_list]), # 图片url "image_list": ','.join([img.get('url', '') for img in image_list]), # 图片url
"tag_list": ','.join([tag.get('name', '') for tag in tag_list if tag.get('type') == 'topic']), # 标签 "tag_list": ','.join([tag.get('name', '') for tag in tag_list if tag.get('type') == 'topic']), # 标签
"last_modify_ts": utils.get_current_timestamp(), # 最后更新时间戳MediaCrawler程序生成的主要用途在db存储的时候记录一条记录最新更新时间 "last_modify_ts": utils.get_current_timestamp(), # 最后更新时间戳MediaCrawler程序生成的主要用途在db存储的时候记录一条记录最新更新时间
"note_url": f"https://www.xiaohongshu.com/explore/{note_id}?xsec_token={note_item.get('xsec_token')}&xsec_source=pc_search", # 帖子url "note_url": f"https://www.xiaohongshu.com/explore/{note_id}?xsec_token={note_item.get('xsec_token')}&xsec_source=pc_search", # 帖子url
"source_keyword": source_keyword_var.get(), # 搜索关键词 "source_keyword": source_keyword_var.get(), # 搜索关键词
"xsec_token": note_item.get("xsec_token"), # xsec_token "xsec_token": note_item.get("xsec_token"), # xsec_token
} }
utils.logger.info(f"[store.xhs.update_xhs_note] xhs note: {local_db_item}") utils.logger.info(f"[store.xhs.update_xhs_note] xhs note: {local_db_item}")
await XhsStoreFactory.create_store().store_content(local_db_item) await XhsStoreFactory.create_store().store_content(local_db_item)
@@ -145,18 +144,18 @@ async def update_xhs_note_comment(note_id: str, comment_item: Dict):
comment_pictures = [item.get("url_default", "") for item in comment_item.get("pictures", [])] comment_pictures = [item.get("url_default", "") for item in comment_item.get("pictures", [])]
target_comment = comment_item.get("target_comment", {}) target_comment = comment_item.get("target_comment", {})
local_db_item = { local_db_item = {
"comment_id": comment_id, # 评论id "comment_id": comment_id, # 评论id
"create_time": comment_item.get("create_time"), # 评论时间 "create_time": comment_item.get("create_time"), # 评论时间
"ip_location": comment_item.get("ip_location"), # ip地址 "ip_location": comment_item.get("ip_location"), # ip地址
"note_id": note_id, # 帖子id "note_id": note_id, # 帖子id
"content": comment_item.get("content"), # 评论内容 "content": comment_item.get("content"), # 评论内容
"user_id": user_info.get("user_id"), # 用户id "user_id": user_info.get("user_id"), # 用户id
"nickname": user_info.get("nickname"), # 用户昵称 "nickname": user_info.get("nickname"), # 用户昵称
"avatar": user_info.get("image"), # 用户头像 "avatar": user_info.get("image"), # 用户头像
"sub_comment_count": comment_item.get("sub_comment_count", 0), # 子评论数 "sub_comment_count": comment_item.get("sub_comment_count", 0), # 子评论数
"pictures": ",".join(comment_pictures), # 评论图片 "pictures": ",".join(comment_pictures), # 评论图片
"parent_comment_id": target_comment.get("id", 0), # 父评论id "parent_comment_id": target_comment.get("id", 0), # 父评论id
"last_modify_ts": utils.get_current_timestamp(), # 最后更新时间戳MediaCrawler程序生成的主要用途在db存储的时候记录一条记录最新更新时间 "last_modify_ts": utils.get_current_timestamp(), # 最后更新时间戳MediaCrawler程序生成的主要用途在db存储的时候记录一条记录最新更新时间
"like_count": comment_item.get("like_count", 0), "like_count": comment_item.get("like_count", 0),
} }
utils.logger.info(f"[store.xhs.update_xhs_note_comment] xhs note comment:{local_db_item}") utils.logger.info(f"[store.xhs.update_xhs_note_comment] xhs note comment:{local_db_item}")
@@ -197,16 +196,16 @@ async def save_creator(user_id: str, creator: Dict):
local_db_item = { local_db_item = {
'user_id': user_id, # 用户id 'user_id': user_id, # 用户id
'nickname': user_info.get('nickname'), # 昵称 'nickname': user_info.get('nickname'), # 昵称
'gender': get_gender(user_info.get('gender')), # 性别 'gender': get_gender(user_info.get('gender')), # 性别
'avatar': user_info.get('images'), # 头像 'avatar': user_info.get('images'), # 头像
'desc': user_info.get('desc'), # 个人描述 'desc': user_info.get('desc'), # 个人描述
'ip_location': user_info.get('ipLocation'), # ip地址 'ip_location': user_info.get('ipLocation'), # ip地址
'follows': follows, # 关注数 'follows': follows, # 关注数
'fans': fans, # 粉丝数 'fans': fans, # 粉丝数
'interaction': interaction, # 互动数 'interaction': interaction, # 互动数
'tag_list': json.dumps({tag.get('tagType'): tag.get('name') for tag in creator.get('tags')}, 'tag_list': json.dumps({tag.get('tagType'): tag.get('name')
ensure_ascii=False), # 标签 for tag in creator.get('tags')}, ensure_ascii=False), # 标签
"last_modify_ts": utils.get_current_timestamp(), # 最后更新时间戳MediaCrawler程序生成的主要用途在db存储的时候记录一条记录最新更新时间 "last_modify_ts": utils.get_current_timestamp(), # 最后更新时间戳MediaCrawler程序生成的主要用途在db存储的时候记录一条记录最新更新时间
} }
utils.logger.info(f"[store.xhs.save_creator] creator:{local_db_item}") utils.logger.info(f"[store.xhs.save_creator] creator:{local_db_item}")
await XhsStoreFactory.create_store().store_creator(local_db_item) await XhsStoreFactory.create_store().store_creator(local_db_item)
@@ -214,7 +213,7 @@ async def save_creator(user_id: str, creator: Dict):
async def update_xhs_note_image(note_id, pic_content, extension_file_name): async def update_xhs_note_image(note_id, pic_content, extension_file_name):
""" """
更新小红书笔 更新小红书笔记图片
Args: Args:
note_id: note_id:
pic_content: pic_content:
@@ -224,5 +223,19 @@ async def update_xhs_note_image(note_id, pic_content, extension_file_name):
""" """
await XiaoHongShuImage().store_image( await XiaoHongShuImage().store_image({"notice_id": note_id, "pic_content": pic_content, "extension_file_name": extension_file_name})
{"notice_id": note_id, "pic_content": pic_content, "extension_file_name": extension_file_name})
async def update_xhs_note_video(note_id, video_content, extension_file_name):
"""
更新小红书笔记视频
Args:
note_id:
video_content:
extension_file_name:
Returns:
"""
await XiaoHongShuVideo().store_video({"notice_id": note_id, "video_content": video_content, "extension_file_name": extension_file_name})
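# 使用示意(假设性草图,非本次提交内容):小红书视频笔记的存储调用,
# 参数约定与 update_xhs_note_image 保持一致;导入路径为假设。
from store import xhs as xhs_store

async def demo_save_xhs_video(note_id: str, video_bytes: bytes):
    await xhs_store.update_xhs_note_video(note_id, video_bytes, "0.mp4")
    # 文件最终位于 data/xhs/videos/<note_id>/0.mp4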

View File

@@ -1,13 +1,12 @@
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则: # 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。 # 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。 # 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。 # 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。 # 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。 # 5. 不得用于任何非法或不当的用途。
# #
# 详细许可条款请参阅项目根目录下的LICENSE文件。 # 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。 # 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# @Author : helloteemo # @Author : helloteemo
@@ -18,7 +17,7 @@ from typing import Dict
import aiofiles import aiofiles
from base.base_crawler import AbstractStoreImage from base.base_crawler import AbstractStoreImage, AbstractStoreVideo
from tools import utils from tools import utils
@@ -34,8 +33,7 @@ class XiaoHongShuImage(AbstractStoreImage):
Returns: Returns:
""" """
await self.save_image(image_content_item.get("notice_id"), image_content_item.get("pic_content"), await self.save_image(image_content_item.get("notice_id"), image_content_item.get("pic_content"), image_content_item.get("extension_file_name"))
image_content_item.get("extension_file_name"))
def make_save_file_name(self, notice_id: str, extension_file_name: str) -> str: def make_save_file_name(self, notice_id: str, extension_file_name: str) -> str:
""" """
@@ -49,7 +47,7 @@ class XiaoHongShuImage(AbstractStoreImage):
""" """
return f"{self.image_store_path}/{notice_id}/{extension_file_name}" return f"{self.image_store_path}/{notice_id}/{extension_file_name}"
async def save_image(self, notice_id: str, pic_content: str, extension_file_name="jpg"): async def save_image(self, notice_id: str, pic_content: str, extension_file_name):
""" """
save image to local save image to local
Args: Args:
@@ -64,3 +62,45 @@ class XiaoHongShuImage(AbstractStoreImage):
async with aiofiles.open(save_file_name, 'wb') as f: async with aiofiles.open(save_file_name, 'wb') as f:
await f.write(pic_content) await f.write(pic_content)
utils.logger.info(f"[XiaoHongShuImageStoreImplement.save_image] save image {save_file_name} success ...") utils.logger.info(f"[XiaoHongShuImageStoreImplement.save_image] save image {save_file_name} success ...")
class XiaoHongShuVideo(AbstractStoreVideo):
video_store_path: str = "data/xhs/videos"
async def store_video(self, video_content_item: Dict):
"""
store content
Args:
content_item:
Returns:
"""
await self.save_video(video_content_item.get("notice_id"), video_content_item.get("video_content"), video_content_item.get("extension_file_name"))
def make_save_file_name(self, notice_id: str, extension_file_name: str) -> str:
"""
make save file name by store type
Args:
notice_id: notice id
Returns:
"""
return f"{self.video_store_path}/{notice_id}/{extension_file_name}"
async def save_video(self, notice_id: str, video_content: str, extension_file_name):
"""
save video to local
Args:
notice_id: notice id
video_content: video content
Returns:
"""
pathlib.Path(self.video_store_path + "/" + notice_id).mkdir(parents=True, exist_ok=True)
save_file_name = self.make_save_file_name(notice_id, extension_file_name)
async with aiofiles.open(save_file_name, 'wb') as f:
await f.write(video_content)
utils.logger.info(f"[XiaoHongShuVideoStoreImplement.save_video] save video {save_file_name} success ...")