25 Commits

Author SHA1 Message Date
程序员阿江-Relakkes
05a1782746 Merge pull request #764 from yangtao210/main
Add MongoDB storage support
2025-11-06 06:10:49 -05:00
yt210
ef6948b305 Add MongoDB storage support 2025-11-06 10:40:30 +08:00
程序员阿江(Relakkes)
45ec4b433a docs: update 2025-11-06 00:08:03 +08:00
程序员阿江(Relakkes)
0074e975dd fix: dy search 2025-11-04 00:14:16 +08:00
程序员阿江(Relakkes)
889fa01466 fix: repair bili wordcloud generation 2025-11-02 13:25:31 +08:00
程序员阿江(Relakkes)
3f5925e326 feat: update xhs sign 2025-10-27 19:06:07 +08:00
程序员阿江(Relakkes)
ed6e0bfb5f refactor: fetch tieba data via the browser 2025-10-19 17:09:55 +08:00
程序员阿江(Relakkes)
26a261bc09 Merge branch 'feature/config-refactor-20251018' 2025-10-19 15:32:42 +08:00
程序员阿江(Relakkes)
03e384bbe2 refactor: remove stealth injection in CDP mode 2025-10-19 15:32:03 +08:00
程序员阿江-Relakkes
56bf5d226f The configuration file supports URL crawling
Feature/config refactor 20251018
2025-10-18 07:42:14 +08:00
程序员阿江(Relakkes)
ae7955787c feat: kuaishou support url link 2025-10-18 07:40:10 +08:00
程序员阿江(Relakkes)
a9dd08680f feat: xhs support creator url link 2025-10-18 07:20:09 +08:00
程序员阿江(Relakkes)
cae707cb2a feat: douyin support url link 2025-10-18 07:00:21 +08:00
程序员阿江(Relakkes)
906c259cc7 feat: bilibili support url link 2025-10-18 06:30:20 +08:00
程序员阿江(Relakkes)
3b6fae8a62 docs: update README.md 2025-10-17 15:30:44 +08:00
程序员阿江-Relakkes
a72504a33d Merge pull request #739 from callmeiks/add-tikhub-sponsor
docs: resize TikHub banner to smaller size
2025-10-16 16:54:18 +08:00
Callmeiks
e177f799df docs: resize TikHub banner to smaller size 2025-10-16 01:51:55 -07:00
程序员阿江-Relakkes
1a5dcb6db7 Merge pull request #738 from callmeiks/add-tikhub-sponsor
docs: add TikHub as sponsor
2025-10-16 16:41:19 +08:00
Callmeiks
2c9eec544d docs: add TikHub as sponsor 2025-10-16 01:22:40 -07:00
程序员阿江(Relakkes)
d1f73e811c docs: update README.md 2025-10-12 21:19:11 +08:00
程序员阿江(Relakkes)
2d3e7555c6 docs: update README.md 2025-10-11 16:16:11 +08:00
程序员阿江(Relakkes)
3c5b9e8035 docs: update wechat qrcode 2025-10-02 14:27:10 +08:00
程序员阿江(Relakkes)
e6f3182ed7 Merge branch 'codex/replace-argparse-with-typer-for-cli' 2025-09-26 18:11:02 +08:00
程序员阿江(Relakkes)
2cf143cc7c fix: #730 2025-09-26 18:10:30 +08:00
程序员阿江-Relakkes
eb625b0b48 Merge pull request #729 from NanmiCoder/codex/replace-argparse-with-typer-for-cli
feat(cli): migrate CLI argument parsing to Typer
2025-09-26 18:08:21 +08:00
62 changed files with 2823 additions and 985 deletions

.DS_Store: binary file added (vendored), contents not shown

@@ -1,3 +1,5 @@
# 🔥 MediaCrawler - 自媒体平台爬虫 🕷️
<div align="center" markdown="1">
<sup>Special thanks to:</sup>
<br>
@@ -12,8 +14,6 @@
</div>
<hr>
# 🔥 MediaCrawler - 自媒体平台爬虫 🕷️
<div align="center">
<a href="https://trendshift.io/repositories/8291" target="_blank">
@@ -239,26 +239,59 @@ uv run main.py --platform xhs --lt qrcode --type search --save_data_option db
[🚀 MediaCrawlerPro 重磅发布 🚀!更多的功能,更好的架构设计!](https://github.com/MediaCrawlerPro)
### 💬 交流群组
- **微信交流群**:[点击加入](https://nanmicoder.github.io/MediaCrawler/%E5%BE%AE%E4%BF%A1%E4%BA%A4%E6%B5%81%E7%BE%A4.html)
### 📚 其他
- **常见问题**:[MediaCrawler 完整文档](https://nanmicoder.github.io/MediaCrawler/)
- **爬虫入门教程**:[CrawlerTutorial 免费教程](https://github.com/NanmiCoder/CrawlerTutorial)
- **新闻爬虫开源项目**:[NewsCrawlerCollection](https://github.com/NanmiCoder/NewsCrawlerCollection)
---
### 💰 赞助商展示
<a href="https://www.swiftproxy.net/?ref=nanmi">
<img src="docs/static/images/img_5.png">
<br>
Swiftproxy - 90M+ 全球高质量纯净住宅IP,注册可领免费 500MB 测试流量,动态流量不过期!
> 专属折扣码:**GHB5** 立享九折优惠!
</a>
<br>
<br>
<a href="https://h.wandouip.com">
<img src="docs/static/images/img_8.jpg">
<br>
豌豆HTTP自营千万级IP资源池IP纯净度≥99.8%每日保持IP高频更新快速响应稳定连接满足多种业务场景支持按需定制注册免费提取10000ip。
豌豆HTTP自营千万级IP资源池IP纯净度≥99.8%每日保持IP高频更新快速响应稳定连接,满足多种业务场景支持按需定制注册免费提取10000ip。
</a>
---
<p align="center">
<a href="https://tikhub.io/?utm_source=github.com/NanmiCoder/MediaCrawler&utm_medium=marketing_social&utm_campaign=retargeting&utm_content=carousel_ad">
<img style="border-radius:20px" width="500" alt="TikHub IO_Banner zh" src="docs/static/images/tikhub_banner_zh.png">
</a>
</p>
[TikHub](https://tikhub.io/?utm_source=github.com/NanmiCoder/MediaCrawler&utm_medium=marketing_social&utm_campaign=retargeting&utm_content=carousel_ad) 提供超过 **700 个端点**,可用于从 **14+ 个社交媒体平台** 获取与分析数据 —— 包括视频、用户、评论、商店、商品与趋势等,一站式完成所有数据访问与分析。
通过每日签到,可以获取免费额度。可以使用我的注册链接:[https://user.tikhub.io/users/signup?referral_code=cfzyejV9](https://user.tikhub.io/users/signup?referral_code=cfzyejV9&utm_source=github.com/NanmiCoder/MediaCrawler&utm_medium=marketing_social&utm_campaign=retargeting&utm_content=carousel_ad) 或使用邀请码:`cfzyejV9`,注册并充值即可获得 **$2 免费额度**。
[TikHub](https://tikhub.io/?utm_source=github.com/NanmiCoder/MediaCrawler&utm_medium=marketing_social&utm_campaign=retargeting&utm_content=carousel_ad) 提供以下服务:
- 🚀 丰富的社交媒体数据接口(TikTok、Douyin、XHS、YouTube、Instagram等)
- 💎 每日签到免费领取额度
- ⚡ 高成功率与高并发支持
- 🌐 官网:[https://tikhub.io/](https://tikhub.io/?utm_source=github.com/NanmiCoder/MediaCrawler&utm_medium=marketing_social&utm_campaign=retargeting&utm_content=carousel_ad)
- 💻 GitHub地址:[https://github.com/TikHubIO/](https://github.com/TikHubIO/)
---
<p align="center">
<a href="https://app.nstbrowser.io/account/register?utm_source=official&utm_term=mediacrawler">
<img style="border-radius:20px" alt="NstBrowser Banner " src="docs/static/images/nstbrowser.jpg">
</a>
</p>
Nstbrowser 指纹浏览器 — 多账号运营&自动化管理的最佳解决方案
<br>
多账号安全管理与会话隔离,指纹定制结合反检测浏览器环境,兼顾真实度与稳定性,覆盖店铺管理、电商监控、社媒营销、广告验证、Web3、投放监控与联盟营销等业务线,提供生产级并发与定制化企业服务,提供可一键部署的云端浏览器方案,配套全球高质量 IP 池,为您构建长期行业竞争力
<br>
[点击此处即刻开始免费使用](https://app.nstbrowser.io/account/register?utm_source=official&utm_term=mediacrawler)
<br>
使用 NSTBROWSER 可获得 10% 充值赠礼
### 🤝 成为赞助者
@@ -266,32 +299,9 @@ Swiftproxy - 90M+ 全球高质量纯净住宅IP注册可领免费 500MB 测
成为赞助者,可以将您的产品展示在这里,每天获得大量曝光!
**联系方式**:
- 微信:`yzglan`
- 微信:`relakkes`
- 邮箱:`relakkes@gmail.com`
## 🤝 社区与支持
### 💬 交流群组
- **微信交流群**:[点击加入](https://nanmicoder.github.io/MediaCrawler/%E5%BE%AE%E4%BF%A1%E4%BA%A4%E6%B5%81%E7%BE%A4.html)
### 📚 文档与教程
- **在线文档**:[MediaCrawler 完整文档](https://nanmicoder.github.io/MediaCrawler/)
- **爬虫教程**:[CrawlerTutorial 免费教程](https://github.com/NanmiCoder/CrawlerTutorial)
# 其他常见问题可以查看在线文档
>
> 在线文档包含使用方法、常见问题、加入项目交流群等。
> [MediaCrawler在线文档](https://nanmicoder.github.io/MediaCrawler/)
>
# 作者提供的知识服务
> 如果想快速入门和学习该项目的使用、源码架构设计等、学习编程技术、亦或者想了解MediaCrawlerPro的源代码设计可以看下我的知识付费栏目。
[作者的知识付费栏目介绍](https://nanmicoder.github.io/MediaCrawler/%E7%9F%A5%E8%AF%86%E4%BB%98%E8%B4%B9%E4%BB%8B%E7%BB%8D.html)
---
## ⭐ Star 趋势图


@@ -282,7 +282,7 @@ If this project helps you, please give a ⭐ Star to support and let more people
Become a sponsor and showcase your product here, getting massive exposure daily!
**Contact Information**:
- WeChat: `yzglan`
- WeChat: `relakkes`
- Email: `relakkes@gmail.com`


@@ -282,7 +282,7 @@ uv run main.py --platform xhs --lt qrcode --type search --save_data_option db
¡Conviértase en patrocinador y muestre su producto aquí, obteniendo exposición masiva diariamente!
**Información de Contacto**:
- WeChat: `yzglan`
- WeChat: `relakkes`
- Email: `relakkes@gmail.com`


@@ -38,7 +38,7 @@ SAVE_LOGIN_STATE = True
# 是否启用CDP模式 - 使用用户现有的Chrome/Edge浏览器进行爬取,提供更好的反检测能力
# 启用后将自动检测并启动用户的Chrome/Edge浏览器,通过CDP协议进行控制
# 这种方式使用真实的浏览器环境(包括用户的扩展、Cookie和设置),大大降低被检测的风险
ENABLE_CDP_MODE = False
ENABLE_CDP_MODE = True
# CDP调试端口,用于与浏览器通信
# 如果端口被占用,系统会自动尝试下一个可用端口
@@ -55,7 +55,7 @@ CUSTOM_BROWSER_PATH = ""
CDP_HEADLESS = False
# 浏览器启动超时时间(秒)
BROWSER_LAUNCH_TIMEOUT = 30
BROWSER_LAUNCH_TIMEOUT = 60
# 是否在程序结束时自动关闭浏览器
# 设置为False可以保持浏览器运行便于调试
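The CDP settings above are consumed by the project's CDPBrowserManager (tools/cdp_browser.py, imported by the crawlers further down in this diff); that implementation is not part of this excerpt. As a rough sketch of the idea only, and assuming Chrome was started with --remote-debugging-port (9222 assumed below), attaching Playwright to an existing browser over CDP looks roughly like this:

```python
# Illustrative sketch only; the real logic lives in tools/cdp_browser.py (CDPBrowserManager).
# Assumes Chrome/Edge is already running with --remote-debugging-port=<CDP_DEBUG_PORT>.
import asyncio
from playwright.async_api import async_playwright

ENABLE_CDP_MODE = True
CDP_DEBUG_PORT = 9222  # assumed default; the actual value comes from the config above

async def get_browser_context(playwright):
    if ENABLE_CDP_MODE:
        # Attach to the user's running Chrome/Edge over the DevTools protocol,
        # reusing its real profile (extensions, cookies, settings).
        browser = await playwright.chromium.connect_over_cdp(
            f"http://127.0.0.1:{CDP_DEBUG_PORT}"
        )
        return browser.contexts[0] if browser.contexts else await browser.new_context()
    # Fallback: a plain Playwright-launched browser
    browser = await playwright.chromium.launch(headless=False)
    return await browser.new_context()

async def main():
    async with async_playwright() as p:
        context = await get_browser_context(p)
        page = await context.new_page()
        await page.goto("https://www.bilibili.com")

asyncio.run(main())
```

Because the attached browser already carries the user's real profile, the PR can also drop the stealth.min.js injection in CDP mode, as the core.py changes below show.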


@@ -13,16 +13,23 @@
# 每天爬取视频/帖子的数量控制
MAX_NOTES_PER_DAY = 1
# 指定B站视频ID列表
# 指定B站视频URL列表 (支持完整URL或BV号)
# 示例:
# - 完整URL: "https://www.bilibili.com/video/BV1dwuKzmE26/?spm_id_from=333.1387.homepage.video_card.click"
# - BV号: "BV1d54y1g7db"
BILI_SPECIFIED_ID_LIST = [
"BV1d54y1g7db",
"https://www.bilibili.com/video/BV1dwuKzmE26/?spm_id_from=333.1387.homepage.video_card.click",
"BV1Sz4y1U77N",
"BV14Q4y1n7jz",
# ........................
]
# 指定B站用户ID列表
# 指定B站创作者URL列表 (支持完整URL或UID)
# 示例:
# - 完整URL: "https://space.bilibili.com/434377496?spm_id_from=333.1007.0.0"
# - UID: "20813884"
BILI_CREATOR_ID_LIST = [
"https://space.bilibili.com/434377496?spm_id_from=333.1007.0.0",
"20813884",
# ........................
]


@@ -42,4 +42,19 @@ SQLITE_DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "datab
sqlite_db_config = {
"db_path": SQLITE_DB_PATH
}
# mongodb config
MONGODB_HOST = os.getenv("MONGODB_HOST", "localhost")
MONGODB_PORT = os.getenv("MONGODB_PORT", 27017)
MONGODB_USER = os.getenv("MONGODB_USER", "")
MONGODB_PWD = os.getenv("MONGODB_PWD", "")
MONGODB_DB_NAME = os.getenv("MONGODB_DB_NAME", "media_crawler")
mongodb_config = {
"host": MONGODB_HOST,
"port": int(MONGODB_PORT),
"user": MONGODB_USER,
"password": MONGODB_PWD,
"db_name": MONGODB_DB_NAME,
}
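The store implementation that reads mongodb_config is added elsewhere in this PR and does not appear in this excerpt. Below is a minimal sketch of how such a config could be turned into an async client, assuming motor as the driver; the "notes" collection name and the note_id key are illustrative assumptions, not taken from the diff:

```python
# Sketch only: driver choice (motor), collection name and note_id key are assumptions.
from motor.motor_asyncio import AsyncIOMotorClient

def build_mongo_client(cfg: dict) -> AsyncIOMotorClient:
    # Credentials are optional, mirroring the empty-string defaults above
    if cfg["user"]:
        uri = f"mongodb://{cfg['user']}:{cfg['password']}@{cfg['host']}:{cfg['port']}"
    else:
        uri = f"mongodb://{cfg['host']}:{cfg['port']}"
    return AsyncIOMotorClient(uri)

async def save_note(cfg: dict, note: dict) -> None:
    db = build_mongo_client(cfg)[cfg["db_name"]]
    # Upsert by note_id so re-crawling the same note does not create duplicates
    await db["notes"].update_one(
        {"note_id": note["note_id"]}, {"$set": note}, upsert=True
    )
```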


@@ -11,15 +11,27 @@
# 抖音平台配置
PUBLISH_TIME_TYPE = 0
# 指定DY视频ID列表
# 指定DY视频URL列表 (支持多种格式)
# 支持格式:
# 1. 完整视频URL: "https://www.douyin.com/video/7525538910311632128"
# 2. 带modal_id的URL: "https://www.douyin.com/user/xxx?modal_id=7525538910311632128"
# 3. 搜索页带modal_id: "https://www.douyin.com/root/search/python?modal_id=7525538910311632128"
# 4. 短链接: "https://v.douyin.com/drIPtQ_WPWY/"
# 5. 纯视频ID: "7280854932641664319"
DY_SPECIFIED_ID_LIST = [
"7280854932641664319",
"7202432992642387233",
"https://www.douyin.com/video/7525538910311632128",
"https://v.douyin.com/drIPtQ_WPWY/",
"https://www.douyin.com/user/MS4wLjABAAAATJPY7LAlaa5X-c8uNdWkvz0jUGgpw4eeXIwu_8BhvqE?from_tab_name=main&modal_id=7525538910311632128",
"7202432992642387233",
# ........................
]
# 指定DY用户ID列表
# 指定DY创作者URL列表 (支持完整URL或sec_user_id)
# 支持格式:
# 1. 完整创作者主页URL: "https://www.douyin.com/user/MS4wLjABAAAATJPY7LAlaa5X-c8uNdWkvz0jUGgpw4eeXIwu_8BhvqE?from_tab_name=main"
# 2. sec_user_id: "MS4wLjABAAAATJPY7LAlaa5X-c8uNdWkvz0jUGgpw4eeXIwu_8BhvqE"
DY_CREATOR_ID_LIST = [
"MS4wLjABAAAATJPY7LAlaa5X-c8uNdWkvz0jUGgpw4eeXIwu_8BhvqE",
"https://www.douyin.com/user/MS4wLjABAAAATJPY7LAlaa5X-c8uNdWkvz0jUGgpw4eeXIwu_8BhvqE?from_tab_name=main",
"MS4wLjABAAAATJPY7LAlaa5X-c8uNdWkvz0jUGgpw4eeXIwu_8BhvqE"
# ........................
]


@@ -10,11 +10,22 @@
# 快手平台配置
# 指定快手视频ID列表
KS_SPECIFIED_ID_LIST = ["3xf8enb8dbj6uig", "3x6zz972bchmvqe"]
# 指定快手视频URL列表 (支持完整URL或纯ID)
# 支持格式:
# 1. 完整视频URL: "https://www.kuaishou.com/short-video/3x3zxz4mjrsc8ke?authorId=3x84qugg4ch9zhs&streamSource=search"
# 2. 纯视频ID: "3xf8enb8dbj6uig"
KS_SPECIFIED_ID_LIST = [
"https://www.kuaishou.com/short-video/3x3zxz4mjrsc8ke?authorId=3x84qugg4ch9zhs&streamSource=search&area=searchxxnull&searchKey=python",
"3xf8enb8dbj6uig",
# ........................
]
# 指定快手用户ID列表
# 指定快手创作者URL列表 (支持完整URL或纯ID)
# 支持格式:
# 1. 创作者主页URL: "https://www.kuaishou.com/profile/3x84qugg4ch9zhs"
# 2. 纯user_id: "3x4sm73aye7jq7i"
KS_CREATOR_ID_LIST = [
"https://www.kuaishou.com/profile/3x84qugg4ch9zhs",
"3x4sm73aye7jq7i",
# ........................
]


@@ -17,12 +17,16 @@ SORT_TYPE = "popularity_descending"
# 指定笔记URL列表, 必须要携带xsec_token参数
XHS_SPECIFIED_NOTE_URL_LIST = [
"https://www.xiaohongshu.com/explore/66fad51c000000001b0224b8?xsec_token=AB3rO-QopW5sgrJ41GwN01WCXh6yWPxjSoFI9D5JIMgKw=&xsec_source=pc_search"
"https://www.xiaohongshu.com/explore/68f99f6d0000000007033fcf?xsec_token=ABZEzjuN2fPjKF9EcMsCCxfbt3IBRsFZldGFoCJbdDmXI=&xsec_source=pc_feed"
# ........................
]
# 指定用户ID列表
# 指定创作者URL列表 (支持完整URL或纯ID)
# 支持格式:
# 1. 完整创作者主页URL (带xsec_token和xsec_source参数): "https://www.xiaohongshu.com/user/profile/5eb8e1d400000000010075ae?xsec_token=AB1nWBKCo1vE2HEkfoJUOi5B6BE5n7wVrbdpHoWIj5xHw=&xsec_source=pc_feed"
# 2. 纯user_id: "63e36c9a000000002703502b"
XHS_CREATOR_ID_LIST = [
"63e36c9a000000002703502b",
"https://www.xiaohongshu.com/user/profile/5eb8e1d400000000010075ae?xsec_token=AB1nWBKCo1vE2HEkfoJUOi5B6BE5n7wVrbdpHoWIj5xHw=&xsec_source=pc_feed",
"63e36c9a000000002703502b",
# ........................
]
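Unlike the Bilibili, Douyin and Kuaishou changes below, this excerpt does not show the XHS-side helper that maps these entries back to IDs. The sketch below is a hypothetical parser in the same spirit; the function name, regex and return shape are assumptions rather than the project's code:

```python
# Hypothetical helper, not part of this diff: reduce an XHS creator entry
# (full profile URL or bare user_id) to its parts, mirroring the
# parse_*_info_from_url helpers this PR adds for Bilibili/Douyin/Kuaishou.
import re
from urllib.parse import urlparse, parse_qs

def parse_xhs_creator(entry: str) -> dict:
    if not entry.startswith("http"):
        # Bare user_id such as "63e36c9a000000002703502b"
        return {"user_id": entry, "xsec_token": None}
    match = re.search(r"/user/profile/([0-9a-fA-F]+)", entry)
    if not match:
        raise ValueError(f"Cannot parse XHS creator entry: {entry}")
    params = parse_qs(urlparse(entry).query)
    return {
        "user_id": match.group(1),
        "xsec_token": params.get("xsec_token", [None])[0],
    }
```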


@@ -59,7 +59,6 @@ export default defineConfig({
text: 'MediaCrawler源码剖析课',
link: 'https://relakkes.feishu.cn/wiki/JUgBwdhIeiSbAwkFCLkciHdAnhh'
},
{text: '知识星球文章专栏', link: '/知识星球介绍'},
{text: '开发者咨询服务', link: '/开发者咨询'},
]
},


@@ -17,7 +17,7 @@
扫描下方我的个人微信,备注:pro版本,如果图片展示不出来,可以直接添加我的微信号:yzglan
扫描下方我的个人微信,备注:pro版本,如果图片展示不出来,可以直接添加我的微信号:relakkes
![relakkes_weichat.JPG](static/images/relakkes_weichat.jpg)

Binary image changes (contents not shown):
- docs/static/images/nstbrowser.jpg: new file, 580 KiB
- one existing image updated (name not captured): 223 KiB -> 230 KiB
- docs/static/images/tikhub_banner.png: new file, 750 KiB
- docs/static/images/tikhub_banner_zh.png: new file, 758 KiB


@@ -1,12 +1,12 @@
# 关于作者
> 大家都叫我阿江,网名:程序员阿江-Relakkes,目前裸辞,正探索自由职业,希望能靠自己的技术能力和努力,实现自己理想的生活方式
>
> 我身边有大量的技术人脉资源,如果大家有一些爬虫咨询或者编程单子,可以向我丢过来
> 大家都叫我阿江,网名:程序员阿江-Relakkes,目前是一名独立开发者,专注于 AI Agent 和爬虫相关的开发工作,All in AI
- [Github万星开源自媒体爬虫仓库MediaCrawler作者](https://github.com/NanmiCoder/MediaCrawler)
- 全栈程序员,熟悉Python、Golang、JavaScript,工作中主要用Golang。
- 曾经主导并参与过百万级爬虫采集系统架构设计与编码
- 爬虫是一种技术兴趣爱好,参与爬虫有一种对抗的感觉,越难越兴奋。
- 目前专注于 AI Agent 领域,积极探索 AI 技术的应用与创新
- 如果你有 AI Agent 相关的项目需要合作,欢迎联系我,我有很多时间可以投入
## 微信联系方式
![relakkes_weichat.JPG](static/images/relakkes_weichat.jpg)


@@ -7,6 +7,6 @@
## 加群方式
> 备注:github,会有拉群小助手自动拉你进群。
>
> 如果图片展示不出来或过期,可以直接添加我的微信号:yzglan,并备注:github,会有拉群小助手自动拉你进群
> 如果图片展示不出来或过期,可以直接添加我的微信号:relakkes,并备注:github,会有拉群小助手自动拉你进群
![relakkes_wechat](static/images/relakkes_weichat.jpg)


@@ -15,5 +15,3 @@
## MediaCrawler源码剖析视频课程
[mediacrawler源码课程介绍](https://relakkes.feishu.cn/wiki/JUgBwdhIeiSbAwkFCLkciHdAnhh)
## 知识星球爬虫逆向、编程专栏
[知识星球专栏介绍](知识星球介绍.md)


@@ -1,31 +0,0 @@
# 知识星球专栏
## 基本介绍
文章:
- 1.爬虫JS逆向案例分享
- 2.MediaCrawler技术实现分享。
- 3.沉淀python开发经验和技巧
- ......................
提问:
- 4.在星球内向我提问关于MediaCrawler、爬虫、编程任何问题
## 章节内容
- [逆向案例 - 某16x8平台商品列表接口逆向参数分析](https://articles.zsxq.com/id_x1qmtg8pzld9.html)
- [逆向案例 - Product Hunt月度最佳产品榜单接口加密参数分析](https://articles.zsxq.com/id_au4eich3x2sg.html)
- [逆向案例 - 某zhi乎x-zse-96参数分析过程](https://articles.zsxq.com/id_dui2vil0ag1l.html)
- [逆向案例 - 某x识星球X-Signature加密参数分析过程](https://articles.zsxq.com/id_pp4madwcwcg8.html)
- [【独创】使用Playwright获取某音a_bogus参数流程包含加密参数分析](https://articles.zsxq.com/id_u89al50jk9x0.html)
- [【独创】使用Playwright低成本获取某书X-s参数流程分析当年的回忆录](https://articles.zsxq.com/id_u4lcrvqakuc7.html)
- [ MediaCrawler-基于抽象类设计重构项目缓存](https://articles.zsxq.com/id_4ju73oxewt9j.html)
- [ 手把手带你撸一个自己的IP代理池](https://articles.zsxq.com/id_38fza371ladm.html)
- [一次Mysql数据库中混用collation排序规则带来的bug](https://articles.zsxq.com/id_pibwr1wnst2p.html)
- [错误使用 Python 可变类型带来的隐藏 Bug](https://articles.zsxq.com/id_f7vn89l1d303.html)
- [【MediaCrawler】微博帖子评论爬虫教程](https://articles.zsxq.com/id_vrmuhw0ovj3t.html)
- [Python协程在并发场景下的幂等性问题](https://articles.zsxq.com/id_wocdwsfmfcmp.html)
- ........................................
## 加入星球
![星球qrcode.JPG](static/images/星球qrcode.jpg)

main.py (14 lines changed)

@@ -24,6 +24,8 @@ from media_platform.tieba import TieBaCrawler
from media_platform.weibo import WeiboCrawler
from media_platform.xhs import XiaoHongShuCrawler
from media_platform.zhihu import ZhihuCrawler
from tools.async_file_writer import AsyncFileWriter
from var import crawler_type_var
class CrawlerFactory:
@@ -72,6 +74,18 @@ async def main():
crawler = CrawlerFactory.create_crawler(platform=config.PLATFORM)
await crawler.start()
# Generate wordcloud after crawling is complete
# Only for JSON save mode
if config.SAVE_DATA_OPTION == "json" and config.ENABLE_GET_WORDCLOUD:
try:
file_writer = AsyncFileWriter(
platform=config.PLATFORM,
crawler_type=crawler_type_var.get()
)
await file_writer.generate_wordcloud_from_comments()
except Exception as e:
print(f"Error generating wordcloud: {e}")
def cleanup():
if crawler:


@@ -41,6 +41,7 @@ from var import crawler_type_var, source_keyword_var
from .client import BilibiliClient
from .exception import DataFetchError
from .field import SearchOrderType
from .help import parse_video_info_from_url, parse_creator_info_from_url
from .login import BilibiliLogin
@@ -77,8 +78,9 @@ class BilibiliCrawler(AbstractCrawler):
# Launch a browser context.
chromium = playwright.chromium
self.browser_context = await self.launch_browser(chromium, None, self.user_agent, headless=config.HEADLESS)
# stealth.min.js is a js script to prevent the website from detecting the crawler.
await self.browser_context.add_init_script(path="libs/stealth.min.js")
# stealth.min.js is a js script to prevent the website from detecting the crawler.
await self.browser_context.add_init_script(path="libs/stealth.min.js")
self.context_page = await self.browser_context.new_page()
await self.context_page.goto(self.index_url)
@@ -103,8 +105,14 @@ class BilibiliCrawler(AbstractCrawler):
await self.get_specified_videos(config.BILI_SPECIFIED_ID_LIST)
elif config.CRAWLER_TYPE == "creator":
if config.CREATOR_MODE:
for creator_id in config.BILI_CREATOR_ID_LIST:
await self.get_creator_videos(int(creator_id))
for creator_url in config.BILI_CREATOR_ID_LIST:
try:
creator_info = parse_creator_info_from_url(creator_url)
utils.logger.info(f"[BilibiliCrawler.start] Parsed creator ID: {creator_info.creator_id} from {creator_url}")
await self.get_creator_videos(int(creator_info.creator_id))
except ValueError as e:
utils.logger.error(f"[BilibiliCrawler.start] Failed to parse creator URL: {e}")
continue
else:
await self.get_all_creator_details(config.BILI_CREATOR_ID_LIST)
else:
@@ -362,11 +370,23 @@ class BilibiliCrawler(AbstractCrawler):
utils.logger.info(f"[BilibiliCrawler.get_creator_videos] Sleeping for {config.CRAWLER_MAX_SLEEP_SEC} seconds after page {pn}")
pn += 1
async def get_specified_videos(self, bvids_list: List[str]):
async def get_specified_videos(self, video_url_list: List[str]):
"""
get specified videos info
get specified videos info from URLs or BV IDs
:param video_url_list: List of video URLs or BV IDs
:return:
"""
utils.logger.info("[BilibiliCrawler.get_specified_videos] Parsing video URLs...")
bvids_list = []
for video_url in video_url_list:
try:
video_info = parse_video_info_from_url(video_url)
bvids_list.append(video_info.video_id)
utils.logger.info(f"[BilibiliCrawler.get_specified_videos] Parsed video ID: {video_info.video_id} from {video_url}")
except ValueError as e:
utils.logger.error(f"[BilibiliCrawler.get_specified_videos] Failed to parse video URL: {e}")
continue
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list = [self.get_video_info_task(aid=0, bvid=video_id, semaphore=semaphore) for video_id in bvids_list]
video_details = await asyncio.gather(*task_list)
@@ -477,11 +497,12 @@ class BilibiliCrawler(AbstractCrawler):
"height": 1080
},
user_agent=user_agent,
channel="chrome", # 使用系统的Chrome稳定版
)
return browser_context
else:
# type: ignore
browser = await chromium.launch(headless=headless, proxy=playwright_proxy)
browser = await chromium.launch(headless=headless, proxy=playwright_proxy, channel="chrome")
browser_context = await browser.new_context(viewport={"width": 1920, "height": 1080}, user_agent=user_agent)
return browser_context
@@ -568,18 +589,30 @@ class BilibiliCrawler(AbstractCrawler):
extension_file_name = f"video.mp4"
await bilibili_store.store_video(aid, content, extension_file_name)
async def get_all_creator_details(self, creator_id_list: List[int]):
async def get_all_creator_details(self, creator_url_list: List[str]):
"""
creator_id_list: get details for creator from creator_id_list
creator_url_list: get details for creator from creator URL list
"""
utils.logger.info(f"[BilibiliCrawler.get_creator_details] Crawling the detalis of creator")
utils.logger.info(f"[BilibiliCrawler.get_creator_details] creator ids:{creator_id_list}")
utils.logger.info(f"[BilibiliCrawler.get_all_creator_details] Crawling the details of creators")
utils.logger.info(f"[BilibiliCrawler.get_all_creator_details] Parsing creator URLs...")
creator_id_list = []
for creator_url in creator_url_list:
try:
creator_info = parse_creator_info_from_url(creator_url)
creator_id_list.append(int(creator_info.creator_id))
utils.logger.info(f"[BilibiliCrawler.get_all_creator_details] Parsed creator ID: {creator_info.creator_id} from {creator_url}")
except ValueError as e:
utils.logger.error(f"[BilibiliCrawler.get_all_creator_details] Failed to parse creator URL: {e}")
continue
utils.logger.info(f"[BilibiliCrawler.get_all_creator_details] creator ids:{creator_id_list}")
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list: List[Task] = []
try:
for creator_id in creator_id_list:
task = asyncio.create_task(self.get_creator_details(creator_id, semaphore), name=creator_id)
task = asyncio.create_task(self.get_creator_details(creator_id, semaphore), name=str(creator_id))
task_list.append(task)
except Exception as e:
utils.logger.warning(f"[BilibiliCrawler.get_all_creator_details] error in the task list. The creator will not be included. {e}")


@@ -9,15 +9,17 @@
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
# @Author : relakkes@gmail.com
# @Time : 2023/12/2 23:26
# @Desc : bilibili 请求参数签名
# 逆向实现参考https://socialsisteryi.github.io/bilibili-API-collect/docs/misc/sign/wbi.html#wbi%E7%AD%BE%E5%90%8D%E7%AE%97%E6%B3%95
import re
import urllib.parse
from hashlib import md5
from typing import Dict
from model.m_bilibili import VideoUrlInfo, CreatorUrlInfo
from tools import utils
@@ -66,16 +68,71 @@ class BilibiliSign:
return req_data
def parse_video_info_from_url(url: str) -> VideoUrlInfo:
"""
从B站视频URL中解析出视频ID
Args:
url: B站视频链接
- https://www.bilibili.com/video/BV1dwuKzmE26/?spm_id_from=333.1387.homepage.video_card.click
- https://www.bilibili.com/video/BV1d54y1g7db
- BV1d54y1g7db (直接传入BV号)
Returns:
VideoUrlInfo: 包含视频ID的对象
"""
# 如果传入的已经是BV号,直接返回
if url.startswith("BV"):
return VideoUrlInfo(video_id=url)
# 使用正则表达式提取BV号
# 匹配 /video/BV... 或 /video/av... 格式
bv_pattern = r'/video/(BV[a-zA-Z0-9]+)'
match = re.search(bv_pattern, url)
if match:
video_id = match.group(1)
return VideoUrlInfo(video_id=video_id)
raise ValueError(f"无法从URL中解析出视频ID: {url}")
def parse_creator_info_from_url(url: str) -> CreatorUrlInfo:
"""
从B站创作者空间URL中解析出创作者ID
Args:
url: B站创作者空间链接
- https://space.bilibili.com/434377496?spm_id_from=333.1007.0.0
- https://space.bilibili.com/20813884
- 434377496 (直接传入UID)
Returns:
CreatorUrlInfo: 包含创作者ID的对象
"""
# 如果传入的已经是纯数字ID,直接返回
if url.isdigit():
return CreatorUrlInfo(creator_id=url)
# 使用正则表达式提取UID
# 匹配 /space.bilibili.com/数字 格式
uid_pattern = r'space\.bilibili\.com/(\d+)'
match = re.search(uid_pattern, url)
if match:
creator_id = match.group(1)
return CreatorUrlInfo(creator_id=creator_id)
raise ValueError(f"无法从URL中解析出创作者ID: {url}")
if __name__ == '__main__':
_img_key = "7cd084941338484aae1ad9425b84077c"
_sub_key = "4932caff0ff746eab6f01bf08b70ac45"
_search_url = "__refresh__=true&_extra=&ad_resource=5654&category_id=&context=&dynamic_offset=0&from_source=&from_spmid=333.337&gaia_vtoken=&highlight=1&keyword=python&order=click&page=1&page_size=20&platform=pc&qv_id=OQ8f2qtgYdBV1UoEnqXUNUl8LEDAdzsD&search_type=video&single_column=0&source_tag=3&web_location=1430654"
_req_data = dict()
for params in _search_url.split("&"):
kvalues = params.split("=")
key = kvalues[0]
value = kvalues[1]
_req_data[key] = value
print("pre req_data", _req_data)
_req_data = BilibiliSign(img_key=_img_key, sub_key=_sub_key).sign(req_data={"aid":170001})
print(_req_data)
# 测试视频URL解析
video_url1 = "https://www.bilibili.com/video/BV1dwuKzmE26/?spm_id_from=333.1387.homepage.video_card.click"
video_url2 = "BV1d54y1g7db"
print("视频URL解析测试:")
print(f"URL1: {video_url1} -> {parse_video_info_from_url(video_url1)}")
print(f"URL2: {video_url2} -> {parse_video_info_from_url(video_url2)}")
# 测试创作者URL解析
creator_url1 = "https://space.bilibili.com/434377496?spm_id_from=333.1007.0.0"
creator_url2 = "20813884"
print("\n创作者URL解析测试:")
print(f"URL1: {creator_url1} -> {parse_creator_info_from_url(creator_url1)}")
print(f"URL2: {creator_url2} -> {parse_creator_info_from_url(creator_url2)}")


@@ -91,8 +91,10 @@ class DouYinClient(AbstractApiClient):
post_data = {}
if request_method == "POST":
post_data = params
a_bogus = await get_a_bogus(uri, query_string, post_data, headers["User-Agent"], self.playwright_page)
params["a_bogus"] = a_bogus
if "/v1/web/general/search" not in uri:
a_bogus = await get_a_bogus(uri, query_string, post_data, headers["User-Agent"], self.playwright_page)
params["a_bogus"] = a_bogus
async def request(self, method, url, **kwargs):
async with httpx.AsyncClient(proxy=self.proxy) as client:
@@ -324,3 +326,28 @@ class DouYinClient(AbstractApiClient):
except httpx.HTTPError as exc: # some wrong when call httpx.request method, such as connection error, client error, server error or response status code is not 2xx
utils.logger.error(f"[DouYinClient.get_aweme_media] {exc.__class__.__name__} for {exc.request.url} - {exc}") # 保留原始异常类型名称,以便开发者调试
return None
async def resolve_short_url(self, short_url: str) -> str:
"""
解析抖音短链接,获取重定向后的真实URL
Args:
short_url: 短链接,如 https://v.douyin.com/iF12345ABC/
Returns:
重定向后的完整URL
"""
async with httpx.AsyncClient(proxy=self.proxy, follow_redirects=False) as client:
try:
utils.logger.info(f"[DouYinClient.resolve_short_url] Resolving short URL: {short_url}")
response = await client.get(short_url, timeout=10)
# 短链接通常返回302重定向
if response.status_code in [301, 302, 303, 307, 308]:
redirect_url = response.headers.get("Location", "")
utils.logger.info(f"[DouYinClient.resolve_short_url] Resolved to: {redirect_url}")
return redirect_url
else:
utils.logger.warning(f"[DouYinClient.resolve_short_url] Unexpected status code: {response.status_code}")
return ""
except Exception as e:
utils.logger.error(f"[DouYinClient.resolve_short_url] Failed to resolve short URL: {e}")
return ""


@@ -33,6 +33,7 @@ from var import crawler_type_var, source_keyword_var
from .client import DouYinClient
from .exception import DataFetchError
from .field import PublishTimeType
from .help import parse_video_info_from_url, parse_creator_info_from_url
from .login import DouYinLogin
@@ -73,8 +74,9 @@ class DouYinCrawler(AbstractCrawler):
user_agent=None,
headless=config.HEADLESS,
)
# stealth.min.js is a js script to prevent the website from detecting the crawler.
await self.browser_context.add_init_script(path="libs/stealth.min.js")
# stealth.min.js is a js script to prevent the website from detecting the crawler.
await self.browser_context.add_init_script(path="libs/stealth.min.js")
self.context_page = await self.browser_context.new_page()
await self.context_page.goto(self.index_url)
@@ -154,15 +156,39 @@ class DouYinCrawler(AbstractCrawler):
await self.batch_get_note_comments(aweme_list)
async def get_specified_awemes(self):
"""Get the information and comments of the specified post"""
"""Get the information and comments of the specified post from URLs or IDs"""
utils.logger.info("[DouYinCrawler.get_specified_awemes] Parsing video URLs...")
aweme_id_list = []
for video_url in config.DY_SPECIFIED_ID_LIST:
try:
video_info = parse_video_info_from_url(video_url)
# 处理短链接
if video_info.url_type == "short":
utils.logger.info(f"[DouYinCrawler.get_specified_awemes] Resolving short link: {video_url}")
resolved_url = await self.dy_client.resolve_short_url(video_url)
if resolved_url:
# 从解析后的URL中提取视频ID
video_info = parse_video_info_from_url(resolved_url)
utils.logger.info(f"[DouYinCrawler.get_specified_awemes] Short link resolved to aweme ID: {video_info.aweme_id}")
else:
utils.logger.error(f"[DouYinCrawler.get_specified_awemes] Failed to resolve short link: {video_url}")
continue
aweme_id_list.append(video_info.aweme_id)
utils.logger.info(f"[DouYinCrawler.get_specified_awemes] Parsed aweme ID: {video_info.aweme_id} from {video_url}")
except ValueError as e:
utils.logger.error(f"[DouYinCrawler.get_specified_awemes] Failed to parse video URL: {e}")
continue
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list = [self.get_aweme_detail(aweme_id=aweme_id, semaphore=semaphore) for aweme_id in config.DY_SPECIFIED_ID_LIST]
task_list = [self.get_aweme_detail(aweme_id=aweme_id, semaphore=semaphore) for aweme_id in aweme_id_list]
aweme_details = await asyncio.gather(*task_list)
for aweme_detail in aweme_details:
if aweme_detail is not None:
await douyin_store.update_douyin_aweme(aweme_item=aweme_detail)
await self.get_aweme_media(aweme_item=aweme_detail)
await self.batch_get_note_comments(config.DY_SPECIFIED_ID_LIST)
await self.batch_get_note_comments(aweme_id_list)
async def get_aweme_detail(self, aweme_id: str, semaphore: asyncio.Semaphore) -> Any:
"""Get note detail"""
@@ -218,10 +244,20 @@ class DouYinCrawler(AbstractCrawler):
async def get_creators_and_videos(self) -> None:
"""
Get the information and videos of the specified creator
Get the information and videos of the specified creator from URLs or IDs
"""
utils.logger.info("[DouYinCrawler.get_creators_and_videos] Begin get douyin creators")
for user_id in config.DY_CREATOR_ID_LIST:
utils.logger.info("[DouYinCrawler.get_creators_and_videos] Parsing creator URLs...")
for creator_url in config.DY_CREATOR_ID_LIST:
try:
creator_info_parsed = parse_creator_info_from_url(creator_url)
user_id = creator_info_parsed.sec_user_id
utils.logger.info(f"[DouYinCrawler.get_creators_and_videos] Parsed sec_user_id: {user_id} from {creator_url}")
except ValueError as e:
utils.logger.error(f"[DouYinCrawler.get_creators_and_videos] Failed to parse creator URL: {e}")
continue
creator_info: Dict = await self.dy_client.get_user_info(user_id)
if creator_info:
await douyin_store.save_creator(user_id, creator=creator_info)


@@ -16,10 +16,15 @@
# @Desc : 获取 a_bogus 参数, 学习交流使用,请勿用作商业用途,侵权联系作者删除
import random
import re
from typing import Optional
import execjs
from playwright.async_api import Page
from model.m_douyin import VideoUrlInfo, CreatorUrlInfo
from tools.crawler_util import extract_url_params_to_dict
douyin_sign_obj = execjs.compile(open('libs/douyin.js', encoding='utf-8-sig').read())
def get_web_id():
@@ -83,3 +88,103 @@ async def get_a_bogus_from_playright(params: str, post_data: dict, user_agent: s
return a_bogus
def parse_video_info_from_url(url: str) -> VideoUrlInfo:
"""
从抖音视频URL中解析出视频ID
支持以下格式:
1. 普通视频链接: https://www.douyin.com/video/7525082444551310602
2. 带modal_id参数的链接:
- https://www.douyin.com/user/MS4wLjABAAAATJPY7LAlaa5X-c8uNdWkvz0jUGgpw4eeXIwu_8BhvqE?modal_id=7525082444551310602
- https://www.douyin.com/root/search/python?modal_id=7471165520058862848
3. 短链接: https://v.douyin.com/iF12345ABC/ (需要client解析)
4. 纯ID: 7525082444551310602
Args:
url: 抖音视频链接或ID
Returns:
VideoUrlInfo: 包含视频ID的对象
"""
# 如果是纯数字ID,直接返回
if url.isdigit():
return VideoUrlInfo(aweme_id=url, url_type="normal")
# 检查是否是短链接 (v.douyin.com)
if "v.douyin.com" in url or url.startswith("http") and len(url) < 50 and "video" not in url:
return VideoUrlInfo(aweme_id="", url_type="short") # 需要通过client解析
# 尝试从URL参数中提取modal_id
params = extract_url_params_to_dict(url)
modal_id = params.get("modal_id")
if modal_id:
return VideoUrlInfo(aweme_id=modal_id, url_type="modal")
# 从标准视频URL中提取ID: /video/数字
video_pattern = r'/video/(\d+)'
match = re.search(video_pattern, url)
if match:
aweme_id = match.group(1)
return VideoUrlInfo(aweme_id=aweme_id, url_type="normal")
raise ValueError(f"无法从URL中解析出视频ID: {url}")
def parse_creator_info_from_url(url: str) -> CreatorUrlInfo:
"""
从抖音创作者主页URL中解析出创作者ID (sec_user_id)
支持以下格式:
1. 创作者主页: https://www.douyin.com/user/MS4wLjABAAAATJPY7LAlaa5X-c8uNdWkvz0jUGgpw4eeXIwu_8BhvqE?from_tab_name=main
2. 纯ID: MS4wLjABAAAATJPY7LAlaa5X-c8uNdWkvz0jUGgpw4eeXIwu_8BhvqE
Args:
url: 抖音创作者主页链接或sec_user_id
Returns:
CreatorUrlInfo: 包含创作者ID的对象
"""
# 如果是纯ID格式(通常以MS4wLjABAAAA开头),直接返回
if url.startswith("MS4wLjABAAAA") or (not url.startswith("http") and "douyin.com" not in url):
return CreatorUrlInfo(sec_user_id=url)
# 从创作者主页URL中提取sec_user_id: /user/xxx
user_pattern = r'/user/([^/?]+)'
match = re.search(user_pattern, url)
if match:
sec_user_id = match.group(1)
return CreatorUrlInfo(sec_user_id=sec_user_id)
raise ValueError(f"无法从URL中解析出创作者ID: {url}")
if __name__ == '__main__':
# 测试视频URL解析
print("=== 视频URL解析测试 ===")
test_urls = [
"https://www.douyin.com/video/7525082444551310602",
"https://www.douyin.com/user/MS4wLjABAAAATJPY7LAlaa5X-c8uNdWkvz0jUGgpw4eeXIwu_8BhvqE?from_tab_name=main&modal_id=7525082444551310602",
"https://www.douyin.com/root/search/python?aid=b733a3b0-4662-4639-9a72-c2318fba9f3f&modal_id=7471165520058862848&type=general",
"7525082444551310602",
]
for url in test_urls:
try:
result = parse_video_info_from_url(url)
print(f"✓ URL: {url[:80]}...")
print(f" 结果: {result}\n")
except Exception as e:
print(f"✗ URL: {url}")
print(f" 错误: {e}\n")
# 测试创作者URL解析
print("=== 创作者URL解析测试 ===")
test_creator_urls = [
"https://www.douyin.com/user/MS4wLjABAAAATJPY7LAlaa5X-c8uNdWkvz0jUGgpw4eeXIwu_8BhvqE?from_tab_name=main",
"MS4wLjABAAAATJPY7LAlaa5X-c8uNdWkvz0jUGgpw4eeXIwu_8BhvqE",
]
for url in test_creator_urls:
try:
result = parse_creator_info_from_url(url)
print(f"✓ URL: {url[:80]}...")
print(f" 结果: {result}\n")
except Exception as e:
print(f"✗ URL: {url}")
print(f" 错误: {e}\n")


@@ -26,6 +26,7 @@ from playwright.async_api import (
import config
from base.base_crawler import AbstractCrawler
from model.m_kuaishou import VideoUrlInfo, CreatorUrlInfo
from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool
from store import kuaishou as kuaishou_store
from tools import utils
@@ -34,6 +35,7 @@ from var import comment_tasks_var, crawler_type_var, source_keyword_var
from .client import KuaiShouClient
from .exception import DataFetchError
from .help import parse_video_info_from_url, parse_creator_info_from_url
from .login import KuaishouLogin
@@ -76,8 +78,10 @@ class KuaishouCrawler(AbstractCrawler):
self.browser_context = await self.launch_browser(
chromium, None, self.user_agent, headless=config.HEADLESS
)
# stealth.min.js is a js script to prevent the website from detecting the crawler.
await self.browser_context.add_init_script(path="libs/stealth.min.js")
# stealth.min.js is a js script to prevent the website from detecting the crawler.
await self.browser_context.add_init_script(path="libs/stealth.min.js")
self.context_page = await self.browser_context.new_page()
await self.context_page.goto(f"{self.index_url}?isHome=1")
@@ -168,16 +172,27 @@ class KuaishouCrawler(AbstractCrawler):
async def get_specified_videos(self):
"""Get the information and comments of the specified post"""
utils.logger.info("[KuaishouCrawler.get_specified_videos] Parsing video URLs...")
video_ids = []
for video_url in config.KS_SPECIFIED_ID_LIST:
try:
video_info = parse_video_info_from_url(video_url)
video_ids.append(video_info.video_id)
utils.logger.info(f"Parsed video ID: {video_info.video_id} from {video_url}")
except ValueError as e:
utils.logger.error(f"Failed to parse video URL: {e}")
continue
semaphore = asyncio.Semaphore(config.MAX_CONCURRENCY_NUM)
task_list = [
self.get_video_info_task(video_id=video_id, semaphore=semaphore)
for video_id in config.KS_SPECIFIED_ID_LIST
for video_id in video_ids
]
video_details = await asyncio.gather(*task_list)
for video_detail in video_details:
if video_detail is not None:
await kuaishou_store.update_kuaishou_video(video_detail)
await self.batch_get_video_comments(config.KS_SPECIFIED_ID_LIST)
await self.batch_get_video_comments(video_ids)
async def get_video_info_task(
self, video_id: str, semaphore: asyncio.Semaphore
@@ -318,10 +333,11 @@ class KuaishouCrawler(AbstractCrawler):
proxy=playwright_proxy, # type: ignore
viewport={"width": 1920, "height": 1080},
user_agent=user_agent,
channel="chrome", # 使用系统的Chrome稳定版
)
return browser_context
else:
browser = await chromium.launch(headless=headless, proxy=playwright_proxy) # type: ignore
browser = await chromium.launch(headless=headless, proxy=playwright_proxy, channel="chrome") # type: ignore
browser_context = await browser.new_context(
viewport={"width": 1920, "height": 1080}, user_agent=user_agent
)
@@ -367,16 +383,25 @@ class KuaishouCrawler(AbstractCrawler):
utils.logger.info(
"[KuaiShouCrawler.get_creators_and_videos] Begin get kuaishou creators"
)
for user_id in config.KS_CREATOR_ID_LIST:
# get creator detail info from web html content
createor_info: Dict = await self.ks_client.get_creator_info(user_id=user_id)
if createor_info:
await kuaishou_store.save_creator(user_id, creator=createor_info)
for creator_url in config.KS_CREATOR_ID_LIST:
try:
# Parse creator URL to get user_id
creator_info: CreatorUrlInfo = parse_creator_info_from_url(creator_url)
utils.logger.info(f"[KuaiShouCrawler.get_creators_and_videos] Parse creator URL info: {creator_info}")
user_id = creator_info.user_id
# get creator detail info from web html content
createor_info: Dict = await self.ks_client.get_creator_info(user_id=user_id)
if createor_info:
await kuaishou_store.save_creator(user_id, creator=createor_info)
except ValueError as e:
utils.logger.error(f"[KuaiShouCrawler.get_creators_and_videos] Failed to parse creator URL: {e}")
continue
# Get all video information of the creator
all_video_list = await self.ks_client.get_all_videos_by_creator(
user_id=user_id,
crawl_interval=random.random(),
crawl_interval=config.CRAWLER_MAX_SLEEP_SEC,
callback=self.fetch_creator_video_detail,
)


@@ -0,0 +1,99 @@
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# -*- coding: utf-8 -*-
import re
from model.m_kuaishou import VideoUrlInfo, CreatorUrlInfo
def parse_video_info_from_url(url: str) -> VideoUrlInfo:
"""
从快手视频URL中解析出视频ID
支持以下格式:
1. 完整视频URL: "https://www.kuaishou.com/short-video/3x3zxz4mjrsc8ke?authorId=3x84qugg4ch9zhs&streamSource=search"
2. 纯视频ID: "3x3zxz4mjrsc8ke"
Args:
url: 快手视频链接或视频ID
Returns:
VideoUrlInfo: 包含视频ID的对象
"""
# 如果不包含http且不包含kuaishou.com认为是纯ID
if not url.startswith("http") and "kuaishou.com" not in url:
return VideoUrlInfo(video_id=url, url_type="normal")
# 从标准视频URL中提取ID: /short-video/视频ID
video_pattern = r'/short-video/([a-zA-Z0-9_-]+)'
match = re.search(video_pattern, url)
if match:
video_id = match.group(1)
return VideoUrlInfo(video_id=video_id, url_type="normal")
raise ValueError(f"无法从URL中解析出视频ID: {url}")
def parse_creator_info_from_url(url: str) -> CreatorUrlInfo:
"""
从快手创作者主页URL中解析出创作者ID
支持以下格式:
1. 创作者主页: "https://www.kuaishou.com/profile/3x84qugg4ch9zhs"
2. 纯ID: "3x4sm73aye7jq7i"
Args:
url: 快手创作者主页链接或user_id
Returns:
CreatorUrlInfo: 包含创作者ID的对象
"""
# 如果不包含http且不包含kuaishou.com认为是纯ID
if not url.startswith("http") and "kuaishou.com" not in url:
return CreatorUrlInfo(user_id=url)
# 从创作者主页URL中提取user_id: /profile/xxx
user_pattern = r'/profile/([a-zA-Z0-9_-]+)'
match = re.search(user_pattern, url)
if match:
user_id = match.group(1)
return CreatorUrlInfo(user_id=user_id)
raise ValueError(f"无法从URL中解析出创作者ID: {url}")
if __name__ == '__main__':
# 测试视频URL解析
print("=== 视频URL解析测试 ===")
test_video_urls = [
"https://www.kuaishou.com/short-video/3x3zxz4mjrsc8ke?authorId=3x84qugg4ch9zhs&streamSource=search&area=searchxxnull&searchKey=python",
"3xf8enb8dbj6uig",
]
for url in test_video_urls:
try:
result = parse_video_info_from_url(url)
print(f"✓ URL: {url[:80]}...")
print(f" 结果: {result}\n")
except Exception as e:
print(f"✗ URL: {url}")
print(f" 错误: {e}\n")
# 测试创作者URL解析
print("=== 创作者URL解析测试 ===")
test_creator_urls = [
"https://www.kuaishou.com/profile/3x84qugg4ch9zhs",
"3x4sm73aye7jq7i",
]
for url in test_creator_urls:
try:
result = parse_creator_info_from_url(url)
print(f"✓ URL: {url[:80]}...")
print(f" 结果: {result}\n")
except Exception as e:
print(f"✗ URL: {url}")
print(f" 错误: {e}\n")


@@ -11,10 +11,10 @@
import asyncio
import json
from typing import Any, Callable, Dict, List, Optional, Union
from urllib.parse import urlencode
from urllib.parse import urlencode, quote
import httpx
from playwright.async_api import BrowserContext
import requests
from playwright.async_api import BrowserContext, Page
from tenacity import RetryError, retry, stop_after_attempt, wait_fixed
import config
@@ -34,34 +34,76 @@ class BaiduTieBaClient(AbstractApiClient):
timeout=10,
ip_pool=None,
default_ip_proxy=None,
headers: Dict[str, str] = None,
playwright_page: Optional[Page] = None,
):
self.ip_pool: Optional[ProxyIpPool] = ip_pool
self.timeout = timeout
self.headers = {
# 使用传入的headers(包含真实浏览器UA)或默认headers
self.headers = headers or {
"User-Agent": utils.get_user_agent(),
"Cookies": "",
"Cookie": "",
}
self._host = "https://tieba.baidu.com"
self._page_extractor = TieBaExtractor()
self.default_ip_proxy = default_ip_proxy
self.playwright_page = playwright_page # Playwright页面对象
def _sync_request(self, method, url, proxy=None, **kwargs):
"""
同步的requests请求方法
Args:
method: 请求方法
url: 请求的URL
proxy: 代理IP
**kwargs: 其他请求参数
Returns:
response对象
"""
# 构造代理字典
proxies = None
if proxy:
proxies = {
"http": proxy,
"https": proxy,
}
# 发送请求
response = requests.request(
method=method,
url=url,
headers=self.headers,
proxies=proxies,
timeout=self.timeout,
**kwargs
)
return response
@retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
async def request(self, method, url, return_ori_content=False, proxy=None, **kwargs) -> Union[str, Any]:
"""
封装httpx的公共请求方法,对请求响应做一些处理
封装requests的公共请求方法,对请求响应做一些处理
Args:
method: 请求方法
url: 请求的URL
return_ori_content: 是否返回原始内容
proxies: 代理IP
proxy: 代理IP
**kwargs: 其他请求参数,例如请求头、请求体等
Returns:
"""
actual_proxy = proxy if proxy else self.default_ip_proxy
async with httpx.AsyncClient(proxy=actual_proxy) as client:
response = await client.request(method, url, timeout=self.timeout, headers=self.headers, **kwargs)
# 在线程池中执行同步的requests请求
response = await asyncio.to_thread(
self._sync_request,
method,
url,
actual_proxy,
**kwargs
)
if response.status_code != 200:
utils.logger.error(f"Request failed, method: {method}, url: {url}, status code: {response.status_code}")
@@ -69,7 +111,7 @@ class BaiduTieBaClient(AbstractApiClient):
raise Exception(f"Request failed, method: {method}, url: {url}, status code: {response.status_code}")
if response.text == "" or response.text == "blocked":
utils.logger.error(f"request params incrr, response.text: {response.text}")
utils.logger.error(f"request params incorrect, response.text: {response.text}")
raise Exception("account blocked")
if return_ori_content:
@@ -119,26 +161,41 @@ class BaiduTieBaClient(AbstractApiClient):
json_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False)
return await self.request(method="POST", url=f"{self._host}{uri}", data=json_str, **kwargs)
async def pong(self) -> bool:
async def pong(self, browser_context: BrowserContext = None) -> bool:
"""
用于检查登录态是否失效了
Returns:
使用Cookie检测而非API调用,避免被检测
Args:
browser_context: 浏览器上下文对象
Returns:
bool: True表示已登录,False表示未登录
"""
utils.logger.info("[BaiduTieBaClient.pong] Begin to pong tieba...")
utils.logger.info("[BaiduTieBaClient.pong] Begin to check tieba login state by cookies...")
if not browser_context:
utils.logger.warning("[BaiduTieBaClient.pong] browser_context is None, assume not logged in")
return False
try:
uri = "/mo/q/sync"
res: Dict = await self.get(uri)
utils.logger.info(f"[BaiduTieBaClient.pong] res: {res}")
if res and res.get("no") == 0:
ping_flag = True
# 从浏览器获取cookies并检查关键登录cookie
_, cookie_dict = utils.convert_cookies(await browser_context.cookies())
# 百度贴吧的登录标识: STOKEN 或 PTOKEN
stoken = cookie_dict.get("STOKEN")
ptoken = cookie_dict.get("PTOKEN")
bduss = cookie_dict.get("BDUSS") # 百度通用登录cookie
if stoken or ptoken or bduss:
utils.logger.info(f"[BaiduTieBaClient.pong] Login state verified by cookies (STOKEN: {bool(stoken)}, PTOKEN: {bool(ptoken)}, BDUSS: {bool(bduss)})")
return True
else:
utils.logger.info(f"[BaiduTieBaClient.pong] user not login, will try to login again...")
ping_flag = False
utils.logger.info("[BaiduTieBaClient.pong] No valid login cookies found, need to login")
return False
except Exception as e:
utils.logger.error(f"[BaiduTieBaClient.pong] Ping tieba failed: {e}, and try to login again...")
ping_flag = False
return ping_flag
utils.logger.error(f"[BaiduTieBaClient.pong] Check login state failed: {e}, assume not logged in")
return False
async def update_cookies(self, browser_context: BrowserContext):
"""
@@ -149,7 +206,9 @@ class BaiduTieBaClient(AbstractApiClient):
Returns:
"""
pass
cookie_str, cookie_dict = utils.convert_cookies(await browser_context.cookies())
self.headers["Cookie"] = cookie_str
utils.logger.info("[BaiduTieBaClient.update_cookies] Cookie has been updated")
async def get_notes_by_keyword(
self,
@@ -160,7 +219,7 @@ class BaiduTieBaClient(AbstractApiClient):
note_type: SearchNoteType = SearchNoteType.FIXED_THREAD,
) -> List[TiebaNote]:
"""
根据关键词搜索贴吧帖子
根据关键词搜索贴吧帖子 (使用Playwright访问页面,避免API检测)
Args:
keyword: 关键词
page: 分页第几页
@@ -170,30 +229,81 @@ class BaiduTieBaClient(AbstractApiClient):
Returns:
"""
uri = "/f/search/res"
if not self.playwright_page:
utils.logger.error("[BaiduTieBaClient.get_notes_by_keyword] playwright_page is None, cannot use browser mode")
raise Exception("playwright_page is required for browser-based search")
# 构造搜索URL
# 示例: https://tieba.baidu.com/f/search/res?ie=utf-8&qw=编程
search_url = f"{self._host}/f/search/res"
params = {
"isnew": 1,
"ie": "utf-8",
"qw": keyword,
"rn": page_size,
"pn": page,
"sm": sort.value,
"only_thread": note_type.value,
}
page_content = await self.get(uri, params=params, return_ori_content=True)
return self._page_extractor.extract_search_note_list(page_content)
# 拼接完整URL
full_url = f"{search_url}?{urlencode(params)}"
utils.logger.info(f"[BaiduTieBaClient.get_notes_by_keyword] 访问搜索页面: {full_url}")
try:
# 使用Playwright访问搜索页面
await self.playwright_page.goto(full_url, wait_until="domcontentloaded")
# 等待页面加载,使用配置文件中的延时设置
await asyncio.sleep(config.CRAWLER_MAX_SLEEP_SEC)
# 获取页面HTML内容
page_content = await self.playwright_page.content()
utils.logger.info(f"[BaiduTieBaClient.get_notes_by_keyword] 成功获取搜索页面HTML,长度: {len(page_content)}")
# 提取搜索结果
notes = self._page_extractor.extract_search_note_list(page_content)
utils.logger.info(f"[BaiduTieBaClient.get_notes_by_keyword] 提取到 {len(notes)} 条帖子")
return notes
except Exception as e:
utils.logger.error(f"[BaiduTieBaClient.get_notes_by_keyword] 搜索失败: {e}")
raise
async def get_note_by_id(self, note_id: str) -> TiebaNote:
"""
根据帖子ID获取帖子详情
根据帖子ID获取帖子详情 (使用Playwright访问页面,避免API检测)
Args:
note_id:
note_id: 帖子ID
Returns:
TiebaNote: 帖子详情对象
"""
uri = f"/p/{note_id}"
page_content = await self.get(uri, return_ori_content=True)
return self._page_extractor.extract_note_detail(page_content)
if not self.playwright_page:
utils.logger.error("[BaiduTieBaClient.get_note_by_id] playwright_page is None, cannot use browser mode")
raise Exception("playwright_page is required for browser-based note detail fetching")
# 构造帖子详情URL
note_url = f"{self._host}/p/{note_id}"
utils.logger.info(f"[BaiduTieBaClient.get_note_by_id] 访问帖子详情页面: {note_url}")
try:
# 使用Playwright访问帖子详情页面
await self.playwright_page.goto(note_url, wait_until="domcontentloaded")
# 等待页面加载,使用配置文件中的延时设置
await asyncio.sleep(config.CRAWLER_MAX_SLEEP_SEC)
# 获取页面HTML内容
page_content = await self.playwright_page.content()
utils.logger.info(f"[BaiduTieBaClient.get_note_by_id] 成功获取帖子详情HTML,长度: {len(page_content)}")
# 提取帖子详情
note_detail = self._page_extractor.extract_note_detail(page_content)
return note_detail
except Exception as e:
utils.logger.error(f"[BaiduTieBaClient.get_note_by_id] 获取帖子详情失败: {e}")
raise
async def get_note_all_comments(
self,
@@ -203,35 +313,68 @@ class BaiduTieBaClient(AbstractApiClient):
max_count: int = 10,
) -> List[TiebaComment]:
"""
获取指定帖子下的所有一级评论,该方法会一直查找一个帖子下的所有评论信息
获取指定帖子下的所有一级评论 (使用Playwright访问页面,避免API检测)
Args:
note_detail: 帖子详情对象
crawl_interval: 爬取一次笔记的延迟单位(秒)
callback: 一次笔记爬取结束后
callback: 一次笔记爬取结束后的回调函数
max_count: 一次帖子爬取的最大评论数量
Returns:
List[TiebaComment]: 评论列表
"""
uri = f"/p/{note_detail.note_id}"
if not self.playwright_page:
utils.logger.error("[BaiduTieBaClient.get_note_all_comments] playwright_page is None, cannot use browser mode")
raise Exception("playwright_page is required for browser-based comment fetching")
result: List[TiebaComment] = []
current_page = 1
while note_detail.total_replay_page >= current_page and len(result) < max_count:
params = {
"pn": current_page,
}
page_content = await self.get(uri, params=params, return_ori_content=True)
comments = self._page_extractor.extract_tieba_note_parment_comments(page_content, note_id=note_detail.note_id)
if not comments:
# 构造评论页URL
comment_url = f"{self._host}/p/{note_detail.note_id}?pn={current_page}"
utils.logger.info(f"[BaiduTieBaClient.get_note_all_comments] 访问评论页面: {comment_url}")
try:
# 使用Playwright访问评论页面
await self.playwright_page.goto(comment_url, wait_until="domcontentloaded")
# 等待页面加载,使用配置文件中的延时设置
await asyncio.sleep(config.CRAWLER_MAX_SLEEP_SEC)
# 获取页面HTML内容
page_content = await self.playwright_page.content()
# 提取评论
comments = self._page_extractor.extract_tieba_note_parment_comments(
page_content, note_id=note_detail.note_id
)
if not comments:
utils.logger.info(f"[BaiduTieBaClient.get_note_all_comments] 第{current_page}页没有评论,停止爬取")
break
# 限制评论数量
if len(result) + len(comments) > max_count:
comments = comments[:max_count - len(result)]
if callback:
await callback(note_detail.note_id, comments)
result.extend(comments)
# 获取所有子评论
await self.get_comments_all_sub_comments(
comments, crawl_interval=crawl_interval, callback=callback
)
await asyncio.sleep(crawl_interval)
current_page += 1
except Exception as e:
utils.logger.error(f"[BaiduTieBaClient.get_note_all_comments] 获取第{current_page}页评论失败: {e}")
break
if len(result) + len(comments) > max_count:
comments = comments[:max_count - len(result)]
if callback:
await callback(note_detail.note_id, comments)
result.extend(comments)
# 获取所有子评论
await self.get_comments_all_sub_comments(comments, crawl_interval=crawl_interval, callback=callback)
await asyncio.sleep(crawl_interval)
current_page += 1
utils.logger.info(f"[BaiduTieBaClient.get_note_all_comments] 共获取 {len(result)} 条一级评论")
return result
async def get_comments_all_sub_comments(
@@ -241,93 +384,194 @@ class BaiduTieBaClient(AbstractApiClient):
callback: Optional[Callable] = None,
) -> List[TiebaComment]:
"""
获取指定评论下的所有子评论
获取指定评论下的所有子评论 (使用Playwright访问页面,避免API检测)
Args:
comments: 评论列表
crawl_interval: 爬取一次笔记的延迟单位(秒)
callback: 一次笔记爬取结束后
callback: 一次笔记爬取结束后的回调函数
Returns:
List[TiebaComment]: 子评论列表
"""
uri = "/p/comment"
if not config.ENABLE_GET_SUB_COMMENTS:
return []
# # 贴吧获取所有子评论需要登录态
# if self.headers.get("Cookies") == "" or not self.pong():
# raise Exception(f"[BaiduTieBaClient.pong] Cookies is empty, please login first...")
if not self.playwright_page:
utils.logger.error("[BaiduTieBaClient.get_comments_all_sub_comments] playwright_page is None, cannot use browser mode")
raise Exception("playwright_page is required for browser-based sub-comment fetching")
all_sub_comments: List[TiebaComment] = []
for parment_comment in comments:
if parment_comment.sub_comment_count == 0:
continue
current_page = 1
max_sub_page_num = parment_comment.sub_comment_count // 10 + 1
while max_sub_page_num >= current_page:
params = {
"tid": parment_comment.note_id, # 帖子ID
"pid": parment_comment.comment_id, # 父级评论ID
"fid": parment_comment.tieba_id, # 贴吧ID
"pn": current_page # 页码
}
page_content = await self.get(uri, params=params, return_ori_content=True)
sub_comments = self._page_extractor.extract_tieba_note_sub_comments(page_content, parent_comment=parment_comment)
if not sub_comments:
while max_sub_page_num >= current_page:
# 构造子评论URL
sub_comment_url = (
f"{self._host}/p/comment?"
f"tid={parment_comment.note_id}&"
f"pid={parment_comment.comment_id}&"
f"fid={parment_comment.tieba_id}&"
f"pn={current_page}"
)
utils.logger.info(f"[BaiduTieBaClient.get_comments_all_sub_comments] 访问子评论页面: {sub_comment_url}")
try:
# 使用Playwright访问子评论页面
await self.playwright_page.goto(sub_comment_url, wait_until="domcontentloaded")
# 等待页面加载,使用配置文件中的延时设置
await asyncio.sleep(config.CRAWLER_MAX_SLEEP_SEC)
# 获取页面HTML内容
page_content = await self.playwright_page.content()
# 提取子评论
sub_comments = self._page_extractor.extract_tieba_note_sub_comments(
page_content, parent_comment=parment_comment
)
if not sub_comments:
utils.logger.info(
f"[BaiduTieBaClient.get_comments_all_sub_comments] "
f"评论{parment_comment.comment_id}{current_page}页没有子评论,停止爬取"
)
break
if callback:
await callback(parment_comment.note_id, sub_comments)
all_sub_comments.extend(sub_comments)
await asyncio.sleep(crawl_interval)
current_page += 1
except Exception as e:
utils.logger.error(
f"[BaiduTieBaClient.get_comments_all_sub_comments] "
f"获取评论{parment_comment.comment_id}{current_page}页子评论失败: {e}"
)
break
if callback:
await callback(parment_comment.note_id, sub_comments)
all_sub_comments.extend(sub_comments)
await asyncio.sleep(crawl_interval)
current_page += 1
utils.logger.info(f"[BaiduTieBaClient.get_comments_all_sub_comments] 共获取 {len(all_sub_comments)} 条子评论")
return all_sub_comments
async def get_notes_by_tieba_name(self, tieba_name: str, page_num: int) -> List[TiebaNote]:
"""
根据贴吧名称获取帖子列表
根据贴吧名称获取帖子列表 (使用Playwright访问页面,避免API检测)
Args:
tieba_name: 贴吧名称
page_num: 分页数量
page_num: 分页页码
Returns:
List[TiebaNote]: 帖子列表
"""
uri = f"/f?kw={tieba_name}&pn={page_num}"
page_content = await self.get(uri, return_ori_content=True)
return self._page_extractor.extract_tieba_note_list(page_content)
if not self.playwright_page:
utils.logger.error("[BaiduTieBaClient.get_notes_by_tieba_name] playwright_page is None, cannot use browser mode")
raise Exception("playwright_page is required for browser-based tieba note fetching")
# 构造贴吧帖子列表URL
tieba_url = f"{self._host}/f?kw={quote(tieba_name)}&pn={page_num}"
utils.logger.info(f"[BaiduTieBaClient.get_notes_by_tieba_name] 访问贴吧页面: {tieba_url}")
try:
# 使用Playwright访问贴吧页面
await self.playwright_page.goto(tieba_url, wait_until="domcontentloaded")
# 等待页面加载,使用配置文件中的延时设置
await asyncio.sleep(config.CRAWLER_MAX_SLEEP_SEC)
# 获取页面HTML内容
page_content = await self.playwright_page.content()
utils.logger.info(f"[BaiduTieBaClient.get_notes_by_tieba_name] 成功获取贴吧页面HTML,长度: {len(page_content)}")
# 提取帖子列表
notes = self._page_extractor.extract_tieba_note_list(page_content)
utils.logger.info(f"[BaiduTieBaClient.get_notes_by_tieba_name] 提取到 {len(notes)} 条帖子")
return notes
except Exception as e:
utils.logger.error(f"[BaiduTieBaClient.get_notes_by_tieba_name] 获取贴吧帖子列表失败: {e}")
raise
async def get_creator_info_by_url(self, creator_url: str) -> str:
"""
根据创作者ID获取创作者信息
根据创作者URL获取创作者信息 (使用Playwright访问页面,避免API检测)
Args:
creator_url: 创作者主页URL
Returns:
str: 页面HTML内容
"""
page_content = await self.request(method="GET", url=creator_url, return_ori_content=True)
return page_content
if not self.playwright_page:
utils.logger.error("[BaiduTieBaClient.get_creator_info_by_url] playwright_page is None, cannot use browser mode")
raise Exception("playwright_page is required for browser-based creator info fetching")
utils.logger.info(f"[BaiduTieBaClient.get_creator_info_by_url] 访问创作者主页: {creator_url}")
try:
# 使用Playwright访问创作者主页
await self.playwright_page.goto(creator_url, wait_until="domcontentloaded")
# 等待页面加载,使用配置文件中的延时设置
await asyncio.sleep(config.CRAWLER_MAX_SLEEP_SEC)
# 获取页面HTML内容
page_content = await self.playwright_page.content()
utils.logger.info(f"[BaiduTieBaClient.get_creator_info_by_url] 成功获取创作者主页HTML,长度: {len(page_content)}")
return page_content
except Exception as e:
utils.logger.error(f"[BaiduTieBaClient.get_creator_info_by_url] 获取创作者主页失败: {e}")
raise
async def get_notes_by_creator(self, user_name: str, page_number: int) -> Dict:
"""
根据创作者获取创作者的所有帖子
根据创作者获取创作者的帖子 (使用Playwright访问页面,避免API检测)
Args:
user_name:
page_number:
user_name: 创作者用户名
page_number: 页码
Returns:
Dict: 包含帖子数据的字典
"""
uri = f"/home/get/getthread"
params = {
"un": user_name,
"pn": page_number,
"id": "utf-8",
"_": utils.get_current_timestamp(),
}
return await self.get(uri, params=params)
if not self.playwright_page:
utils.logger.error("[BaiduTieBaClient.get_notes_by_creator] playwright_page is None, cannot use browser mode")
raise Exception("playwright_page is required for browser-based creator notes fetching")
# 构造创作者帖子列表URL
creator_url = f"{self._host}/home/get/getthread?un={quote(user_name)}&pn={page_number}&id=utf-8&_={utils.get_current_timestamp()}"
utils.logger.info(f"[BaiduTieBaClient.get_notes_by_creator] 访问创作者帖子列表: {creator_url}")
try:
# 使用Playwright访问创作者帖子列表页面
await self.playwright_page.goto(creator_url, wait_until="domcontentloaded")
# 等待页面加载,使用配置文件中的延时设置
await asyncio.sleep(config.CRAWLER_MAX_SLEEP_SEC)
# 获取页面内容(这个接口返回JSON)
page_content = await self.playwright_page.content()
# 提取JSON数据(页面会包含<pre>标签或直接是JSON)
try:
# 尝试从页面中提取JSON
json_text = await self.playwright_page.evaluate("() => document.body.innerText")
result = json.loads(json_text)
utils.logger.info(f"[BaiduTieBaClient.get_notes_by_creator] 成功获取创作者帖子数据")
return result
except json.JSONDecodeError as e:
utils.logger.error(f"[BaiduTieBaClient.get_notes_by_creator] JSON解析失败: {e}")
utils.logger.error(f"[BaiduTieBaClient.get_notes_by_creator] 页面内容: {page_content[:500]}")
raise Exception(f"Failed to parse JSON from creator notes page: {e}")
except Exception as e:
utils.logger.error(f"[BaiduTieBaClient.get_notes_by_creator] 获取创作者帖子列表失败: {e}")
raise
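All three browser-backed fetch methods above depend on the client having been constructed with a live playwright_page. A minimal, hypothetical driver sketch follows; the board name, user name, page numbers and profile URL are placeholders, not values from this diff:

# Hypothetical usage sketch, not part of the diff; assumes `client` is a
# BaiduTieBaClient whose playwright_page was set at construction time.
async def fetch_tieba_samples(client: "BaiduTieBaClient"):
    # List posts of one tieba board (page_num feeds the pn query parameter)
    notes = await client.get_notes_by_tieba_name(tieba_name="python", page_num=0)
    # Fetch a creator's homepage HTML for later extraction (URL is a placeholder)
    html = await client.get_creator_info_by_url("https://tieba.baidu.com/home/main?un=example_user")
    # Fetch one page of a creator's threads as parsed JSON
    threads = await client.get_notes_by_creator(user_name="example_user", page_number=1)
    return notes, html, threads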
async def get_all_notes_by_creator_user_name(
self,

View File

@@ -11,7 +11,6 @@
import asyncio
import os
# import random # Removed as we now use fixed config.CRAWLER_MAX_SLEEP_SEC intervals
from asyncio import Task
from typing import Dict, List, Optional, Tuple
@@ -26,7 +25,7 @@ from playwright.async_api import (
import config
from base.base_crawler import AbstractCrawler
from model.m_baidu_tieba import TiebaCreator, TiebaNote
from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool
from proxy.proxy_ip_pool import IpInfoModel, ProxyIpPool, create_ip_pool
from store import tieba as tieba_store
from tools import utils
from tools.cdp_browser import CDPBrowserManager
@@ -56,7 +55,7 @@ class TieBaCrawler(AbstractCrawler):
Returns:
"""
ip_proxy_pool, httpx_proxy_format = None, None
playwright_proxy_format, httpx_proxy_format = None, None
if config.ENABLE_IP_PROXY:
utils.logger.info(
"[BaiduTieBaCrawler.start] Begin create ip proxy pool ..."
@@ -65,31 +64,73 @@ class TieBaCrawler(AbstractCrawler):
config.IP_PROXY_POOL_COUNT, enable_validate_ip=True
)
ip_proxy_info: IpInfoModel = await ip_proxy_pool.get_proxy()
_, httpx_proxy_format = utils.format_proxy_info(ip_proxy_info)
playwright_proxy_format, httpx_proxy_format = utils.format_proxy_info(ip_proxy_info)
utils.logger.info(
f"[BaiduTieBaCrawler.start] Init default ip proxy, value: {httpx_proxy_format}"
)
# Create a client to interact with the baidutieba website.
self.tieba_client = BaiduTieBaClient(
ip_pool=ip_proxy_pool,
default_ip_proxy=httpx_proxy_format,
)
crawler_type_var.set(config.CRAWLER_TYPE)
if config.CRAWLER_TYPE == "search":
# Search for notes and retrieve their comment information.
await self.search()
await self.get_specified_tieba_notes()
elif config.CRAWLER_TYPE == "detail":
# Get the information and comments of the specified post
await self.get_specified_notes()
elif config.CRAWLER_TYPE == "creator":
# Get creator's information and their notes and comments
await self.get_creators_and_notes()
else:
pass
async with async_playwright() as playwright:
# 根据配置选择启动模式
if config.ENABLE_CDP_MODE:
utils.logger.info("[BaiduTieBaCrawler] 使用CDP模式启动浏览器")
self.browser_context = await self.launch_browser_with_cdp(
playwright,
playwright_proxy_format,
self.user_agent,
headless=config.CDP_HEADLESS,
)
else:
utils.logger.info("[BaiduTieBaCrawler] 使用标准模式启动浏览器")
# Launch a browser context.
chromium = playwright.chromium
self.browser_context = await self.launch_browser(
chromium,
playwright_proxy_format,
self.user_agent,
headless=config.HEADLESS,
)
utils.logger.info("[BaiduTieBaCrawler.start] Tieba Crawler finished ...")
# 注入反检测脚本 - 针对百度的特殊检测
await self._inject_anti_detection_scripts()
self.context_page = await self.browser_context.new_page()
# 先访问百度首页,再点击贴吧链接,避免触发安全验证
await self._navigate_to_tieba_via_baidu()
# Create a client to interact with the baidutieba website.
self.tieba_client = await self.create_tieba_client(
httpx_proxy_format,
ip_proxy_pool if config.ENABLE_IP_PROXY else None
)
# Check login status and perform login if necessary
if not await self.tieba_client.pong(browser_context=self.browser_context):
login_obj = BaiduTieBaLogin(
login_type=config.LOGIN_TYPE,
login_phone="", # your phone number
browser_context=self.browser_context,
context_page=self.context_page,
cookie_str=config.COOKIES,
)
await login_obj.begin()
await self.tieba_client.update_cookies(browser_context=self.browser_context)
crawler_type_var.set(config.CRAWLER_TYPE)
if config.CRAWLER_TYPE == "search":
# Search for notes and retrieve their comment information.
await self.search()
await self.get_specified_tieba_notes()
elif config.CRAWLER_TYPE == "detail":
# Get the information and comments of the specified post
await self.get_specified_notes()
elif config.CRAWLER_TYPE == "creator":
# Get creator's information and their notes and comments
await self.get_creators_and_notes()
else:
pass
utils.logger.info("[BaiduTieBaCrawler.start] Tieba Crawler finished ...")
async def search(self) -> None:
"""
@@ -347,6 +388,198 @@ class TieBaCrawler(AbstractCrawler):
f"[WeiboCrawler.get_creators_and_notes] get creator info error, creator_url:{creator_url}"
)
async def _navigate_to_tieba_via_baidu(self):
"""
模拟真实用户访问路径:
1. 先访问百度首页 (https://www.baidu.com/)
2. 等待页面加载
3. 点击顶部导航栏的"贴吧"链接
4. 跳转到贴吧首页
这样做可以避免触发百度的安全验证
"""
utils.logger.info("[TieBaCrawler] 模拟真实用户访问路径...")
try:
# Step 1: 访问百度首页
utils.logger.info("[TieBaCrawler] Step 1: 访问百度首页 https://www.baidu.com/")
await self.context_page.goto("https://www.baidu.com/", wait_until="domcontentloaded")
# Step 2: 等待页面加载,使用配置文件中的延时设置
utils.logger.info(f"[TieBaCrawler] Step 2: 等待 {config.CRAWLER_MAX_SLEEP_SEC}秒 模拟用户浏览...")
await asyncio.sleep(config.CRAWLER_MAX_SLEEP_SEC)
# Step 3: 查找并点击"贴吧"链接
utils.logger.info("[TieBaCrawler] Step 3: 查找并点击'贴吧'链接...")
# 尝试多种选择器,确保能找到贴吧链接
tieba_selectors = [
'a[href="http://tieba.baidu.com/"]',
'a[href="https://tieba.baidu.com/"]',
'a.mnav:has-text("贴吧")',
'text=贴吧',
]
tieba_link = None
for selector in tieba_selectors:
try:
tieba_link = await self.context_page.wait_for_selector(selector, timeout=5000)
if tieba_link:
utils.logger.info(f"[TieBaCrawler] 找到贴吧链接 (selector: {selector})")
break
except Exception:
continue
if not tieba_link:
utils.logger.warning("[TieBaCrawler] 未找到贴吧链接,直接访问贴吧首页")
await self.context_page.goto(self.index_url, wait_until="domcontentloaded")
return
# Step 4: 点击贴吧链接 (检查是否会打开新标签页)
utils.logger.info("[TieBaCrawler] Step 4: 点击贴吧链接...")
# 检查链接的target属性
target_attr = await tieba_link.get_attribute("target")
utils.logger.info(f"[TieBaCrawler] 链接target属性: {target_attr}")
if target_attr == "_blank":
# 如果是新标签页,需要等待新页面并切换
utils.logger.info("[TieBaCrawler] 链接会在新标签页打开,等待新页面...")
async with self.browser_context.expect_page() as new_page_info:
await tieba_link.click()
# 获取新打开的页面
new_page = await new_page_info.value
await new_page.wait_for_load_state("domcontentloaded")
# 关闭旧的百度首页
await self.context_page.close()
# 切换到新的贴吧页面
self.context_page = new_page
utils.logger.info("[TieBaCrawler] ✅ 已切换到新标签页 (贴吧页面)")
else:
# 如果是同一标签页跳转,正常等待导航
utils.logger.info("[TieBaCrawler] 链接在当前标签页跳转...")
async with self.context_page.expect_navigation(wait_until="domcontentloaded"):
await tieba_link.click()
# Step 5: 等待页面稳定,使用配置文件中的延时设置
utils.logger.info(f"[TieBaCrawler] Step 5: 页面加载完成,等待 {config.CRAWLER_MAX_SLEEP_SEC}秒...")
await asyncio.sleep(config.CRAWLER_MAX_SLEEP_SEC)
current_url = self.context_page.url
utils.logger.info(f"[TieBaCrawler] ✅ 成功通过百度首页进入贴吧! 当前URL: {current_url}")
except Exception as e:
utils.logger.error(f"[TieBaCrawler] 通过百度首页访问贴吧失败: {e}")
utils.logger.info("[TieBaCrawler] 回退:直接访问贴吧首页")
await self.context_page.goto(self.index_url, wait_until="domcontentloaded")
async def _inject_anti_detection_scripts(self):
"""
注入反检测JavaScript脚本
针对百度贴吧的特殊检测机制
"""
utils.logger.info("[TieBaCrawler] Injecting anti-detection scripts...")
# 轻量级反检测脚本,只覆盖关键检测点
anti_detection_js = """
// 覆盖 navigator.webdriver
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined,
configurable: true
});
// 覆盖 window.navigator.chrome
if (!window.navigator.chrome) {
window.navigator.chrome = {
runtime: {},
loadTimes: function() {},
csi: function() {},
app: {}
};
}
// 覆盖 Permissions API
const originalQuery = window.navigator.permissions.query;
window.navigator.permissions.query = (parameters) => (
parameters.name === 'notifications' ?
Promise.resolve({ state: Notification.permission }) :
originalQuery(parameters)
);
// 覆盖 plugins 长度(让它看起来有插件)
Object.defineProperty(navigator, 'plugins', {
get: () => [1, 2, 3, 4, 5],
configurable: true
});
// 覆盖 languages
Object.defineProperty(navigator, 'languages', {
get: () => ['zh-CN', 'zh', 'en'],
configurable: true
});
// 移除 window.cdc_ 等 ChromeDriver 残留
delete window.cdc_adoQpoasnfa76pfcZLmcfl_Array;
delete window.cdc_adoQpoasnfa76pfcZLmcfl_Promise;
delete window.cdc_adoQpoasnfa76pfcZLmcfl_Symbol;
console.log('[Anti-Detection] Scripts injected successfully');
"""
await self.browser_context.add_init_script(anti_detection_js)
utils.logger.info("[TieBaCrawler] Anti-detection scripts injected")
async def create_tieba_client(
self, httpx_proxy: Optional[str], ip_pool: Optional[ProxyIpPool] = None
) -> BaiduTieBaClient:
"""
Create tieba client with real browser User-Agent and complete headers
Args:
httpx_proxy: HTTP代理
ip_pool: IP代理池
Returns:
BaiduTieBaClient实例
"""
utils.logger.info("[TieBaCrawler.create_tieba_client] Begin create tieba API client...")
# 从真实浏览器提取User-Agent,避免被检测
user_agent = await self.context_page.evaluate("() => navigator.userAgent")
utils.logger.info(f"[TieBaCrawler.create_tieba_client] Extracted User-Agent from browser: {user_agent}")
cookie_str, cookie_dict = utils.convert_cookies(await self.browser_context.cookies())
# 构建完整的浏览器请求头,模拟真实浏览器行为
tieba_client = BaiduTieBaClient(
timeout=10,
ip_pool=ip_pool,
default_ip_proxy=httpx_proxy,
headers={
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"Accept-Language": "zh-CN,zh;q=0.9",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
"User-Agent": user_agent, # 使用真实浏览器的UA
"Cookie": cookie_str,
"Host": "tieba.baidu.com",
"Referer": "https://tieba.baidu.com/",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "same-origin",
"Sec-Fetch-User": "?1",
"Upgrade-Insecure-Requests": "1",
"sec-ch-ua": '"Google Chrome";v="141", "Not?A_Brand";v="8", "Chromium";v="141"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"macOS"',
},
playwright_page=self.context_page, # 传入playwright页面对象
)
return tieba_client
async def launch_browser(
self,
chromium: BrowserType,
@@ -381,10 +614,11 @@ class TieBaCrawler(AbstractCrawler):
proxy=playwright_proxy, # type: ignore
viewport={"width": 1920, "height": 1080},
user_agent=user_agent,
channel="chrome", # 使用系统的Chrome稳定版
)
return browser_context
else:
browser = await chromium.launch(headless=headless, proxy=playwright_proxy) # type: ignore
browser = await chromium.launch(headless=headless, proxy=playwright_proxy, channel="chrome") # type: ignore
browser_context = await browser.new_context(
viewport={"width": 1920, "height": 1080}, user_agent=user_agent
)

View File

@@ -77,8 +77,11 @@ class WeiboCrawler(AbstractCrawler):
# Launch a browser context.
chromium = playwright.chromium
self.browser_context = await self.launch_browser(chromium, None, self.mobile_user_agent, headless=config.HEADLESS)
# stealth.min.js is a js script to prevent the website from detecting the crawler.
await self.browser_context.add_init_script(path="libs/stealth.min.js")
# stealth.min.js is a js script to prevent the website from detecting the crawler.
await self.browser_context.add_init_script(path="libs/stealth.min.js")
self.context_page = await self.browser_context.new_page()
await self.context_page.goto(self.mobile_index_url)
@@ -340,10 +343,11 @@ class WeiboCrawler(AbstractCrawler):
"height": 1080
},
user_agent=user_agent,
channel="chrome", # 使用系统的Chrome稳定版
)
return browser_context
else:
browser = await chromium.launch(headless=headless, proxy=playwright_proxy) # type: ignore
browser = await chromium.launch(headless=headless, proxy=playwright_proxy, channel="chrome") # type: ignore
browser_context = await browser.new_context(viewport={"width": 1920, "height": 1080}, user_agent=user_agent)
return browser_context

View File

@@ -10,23 +10,24 @@
import asyncio
import json
import re
import time
from typing import Any, Callable, Dict, List, Optional, Union
from urllib.parse import urlencode
import httpx
from playwright.async_api import BrowserContext, Page
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_result
from tenacity import retry, stop_after_attempt, wait_fixed
import config
from base.base_crawler import AbstractApiClient
from tools import utils
from html import unescape
from .exception import DataFetchError, IPBlockError
from .field import SearchNoteType, SearchSortType
from .help import get_search_id, sign
from .extractor import XiaoHongShuExtractor
from .secsign import seccore_signv2_playwright
class XiaoHongShuClient(AbstractApiClient):
@@ -63,15 +64,13 @@ class XiaoHongShuClient(AbstractApiClient):
Returns:
"""
encrypt_params = await self.playwright_page.evaluate(
"([url, data]) => window._webmsxyw(url,data)", [url, data]
)
x_s = await seccore_signv2_playwright(self.playwright_page, url, data)
local_storage = await self.playwright_page.evaluate("() => window.localStorage")
signs = sign(
a1=self.cookie_dict.get("a1", ""),
b1=local_storage.get("b1", ""),
x_s=encrypt_params.get("X-s", ""),
x_t=str(encrypt_params.get("X-t", "")),
x_s=x_s,
x_t=str(int(time.time())),
)
headers = {
@@ -451,13 +450,26 @@ class XiaoHongShuClient(AbstractApiClient):
result.extend(comments)
return result
async def get_creator_info(self, user_id: str) -> Dict:
async def get_creator_info(
self, user_id: str, xsec_token: str = "", xsec_source: str = ""
) -> Dict:
"""
通过解析网页版的用户主页HTML获取用户个人简要信息
PC端用户主页的网页存在window.__INITIAL_STATE__这个变量上的解析它即可
eg: https://www.xiaohongshu.com/user/profile/59d8cb33de5fb4696bf17217
Args:
user_id: 用户ID
xsec_token: 验证token (可选,如果URL中包含此参数则传入)
xsec_source: 渠道来源 (可选,如果URL中包含此参数则传入)
Returns:
Dict: 创作者信息
"""
# 构建URI,如果有xsec参数则添加到URL中
uri = f"/user/profile/{user_id}"
if xsec_token and xsec_source:
uri = f"{uri}?xsec_token={xsec_token}&xsec_source={xsec_source}"
html_content = await self.request(
"GET", self._domain + uri, return_response=True, headers=self.headers
)

View File

@@ -26,7 +26,7 @@ from tenacity import RetryError
import config
from base.base_crawler import AbstractCrawler
from config import CRAWLER_MAX_COMMENTS_COUNT_SINGLENOTES
from model.m_xiaohongshu import NoteUrlInfo
from model.m_xiaohongshu import NoteUrlInfo, CreatorUrlInfo
from proxy.proxy_ip_pool import IpInfoModel, create_ip_pool
from store import xhs as xhs_store
from tools import utils
@@ -36,7 +36,7 @@ from var import crawler_type_var, source_keyword_var
from .client import XiaoHongShuClient
from .exception import DataFetchError
from .field import SearchSortType
from .help import parse_note_info_from_note_url, get_search_id
from .help import parse_note_info_from_note_url, parse_creator_info_from_url, get_search_id
from .login import XiaoHongShuLogin
@@ -79,8 +79,9 @@ class XiaoHongShuCrawler(AbstractCrawler):
self.user_agent,
headless=config.HEADLESS,
)
# stealth.min.js is a js script to prevent the website from detecting the crawler.
await self.browser_context.add_init_script(path="libs/stealth.min.js")
# stealth.min.js is a js script to prevent the website from detecting the crawler.
await self.browser_context.add_init_script(path="libs/stealth.min.js")
self.context_page = await self.browser_context.new_page()
await self.context_page.goto(self.index_url)
@@ -174,11 +175,24 @@ class XiaoHongShuCrawler(AbstractCrawler):
async def get_creators_and_notes(self) -> None:
"""Get creator's notes and retrieve their comment information."""
utils.logger.info("[XiaoHongShuCrawler.get_creators_and_notes] Begin get xiaohongshu creators")
for user_id in config.XHS_CREATOR_ID_LIST:
# get creator detail info from web html content
createor_info: Dict = await self.xhs_client.get_creator_info(user_id=user_id)
if createor_info:
await xhs_store.save_creator(user_id, creator=createor_info)
for creator_url in config.XHS_CREATOR_ID_LIST:
try:
# Parse creator URL to get user_id and security tokens
creator_info: CreatorUrlInfo = parse_creator_info_from_url(creator_url)
utils.logger.info(f"[XiaoHongShuCrawler.get_creators_and_notes] Parse creator URL info: {creator_info}")
user_id = creator_info.user_id
# get creator detail info from web html content
createor_info: Dict = await self.xhs_client.get_creator_info(
user_id=user_id,
xsec_token=creator_info.xsec_token,
xsec_source=creator_info.xsec_source
)
if createor_info:
await xhs_store.save_creator(user_id, creator=createor_info)
except ValueError as e:
utils.logger.error(f"[XiaoHongShuCrawler.get_creators_and_notes] Failed to parse creator URL: {e}")
continue
# Use fixed crawling interval
crawl_interval = config.CRAWLER_MAX_SLEEP_SEC
@@ -268,16 +282,9 @@ class XiaoHongShuCrawler(AbstractCrawler):
async with semaphore:
try:
utils.logger.info(f"[get_note_detail_async_task] Begin get note detail, note_id: {note_id}")
try:
note_detail = await self.xhs_client.get_note_by_id(note_id, xsec_source, xsec_token)
except RetryError as e:
pass
note_detail = await self.xhs_client.get_note_by_id_from_html(note_id, xsec_source, xsec_token, enable_cookie=True)
if not note_detail:
note_detail = await self.xhs_client.get_note_by_id_from_html(note_id, xsec_source, xsec_token, enable_cookie=True)
if not note_detail:
raise Exception(f"[get_note_detail_async_task] Failed to get note detail, Id: {note_id}")
raise Exception(f"[get_note_detail_async_task] Failed to get note detail, Id: {note_id}")
note_detail.update({"xsec_token": xsec_token, "xsec_source": xsec_source})

View File

@@ -15,7 +15,7 @@ import random
import time
import urllib.parse
from model.m_xiaohongshu import NoteUrlInfo
from model.m_xiaohongshu import NoteUrlInfo, CreatorUrlInfo
from tools.crawler_util import extract_url_params_to_dict
@@ -27,16 +27,17 @@ def sign(a1="", b1="", x_s="", x_t=""):
"s0": 3, # getPlatformCode
"s1": "",
"x0": "1", # localStorage.getItem("b1b1")
"x1": "3.7.8-2", # version
"x1": "4.2.2", # version
"x2": "Mac OS",
"x3": "xhs-pc-web",
"x4": "4.27.2",
"x4": "4.74.0",
"x5": a1, # cookie of a1
"x6": x_t,
"x7": x_s,
"x8": b1, # localStorage.getItem("b1")
"x9": mrc(x_t + x_s + b1),
"x10": 154, # getSigCount
"x11": "normal"
}
encode_str = encodeUtf8(json.dumps(common, separators=(',', ':')))
x_s_common = b64Encode(encode_str)
@@ -306,6 +307,37 @@ def parse_note_info_from_note_url(url: str) -> NoteUrlInfo:
return NoteUrlInfo(note_id=note_id, xsec_token=xsec_token, xsec_source=xsec_source)
def parse_creator_info_from_url(url: str) -> CreatorUrlInfo:
"""
从小红书创作者主页URL中解析出创作者信息
支持以下格式:
1. 完整URL: "https://www.xiaohongshu.com/user/profile/5eb8e1d400000000010075ae?xsec_token=AB1nWBKCo1vE2HEkfoJUOi5B6BE5n7wVrbdpHoWIj5xHw=&xsec_source=pc_feed"
2. 纯ID: "5eb8e1d400000000010075ae"
Args:
url: 创作者主页URL或user_id
Returns:
CreatorUrlInfo: 包含user_id, xsec_token, xsec_source的对象
"""
# 如果是纯ID格式(24位十六进制字符),直接返回
if len(url) == 24 and all(c in "0123456789abcdef" for c in url):
return CreatorUrlInfo(user_id=url, xsec_token="", xsec_source="")
# 从URL中提取user_id: /user/profile/xxx
import re
user_pattern = r'/user/profile/([^/?]+)'
match = re.search(user_pattern, url)
if match:
user_id = match.group(1)
# 提取xsec_token和xsec_source参数
params = extract_url_params_to_dict(url)
xsec_token = params.get("xsec_token", "")
xsec_source = params.get("xsec_source", "")
return CreatorUrlInfo(user_id=user_id, xsec_token=xsec_token, xsec_source=xsec_source)
raise ValueError(f"无法从URL中解析出创作者信息: {url}")
if __name__ == '__main__':
_img_url = "https://sns-img-bd.xhscdn.com/7a3abfaf-90c1-a828-5de7-022c80b92aa3"
# 获取一个图片地址在多个cdn下的url地址
@@ -313,4 +345,19 @@ if __name__ == '__main__':
final_img_url = get_img_url_by_trace_id(get_trace_id(_img_url))
print(final_img_url)
# 测试创作者URL解析
print("\n=== 创作者URL解析测试 ===")
test_creator_urls = [
"https://www.xiaohongshu.com/user/profile/5eb8e1d400000000010075ae?xsec_token=AB1nWBKCo1vE2HEkfoJUOi5B6BE5n7wVrbdpHoWIj5xHw=&xsec_source=pc_feed",
"5eb8e1d400000000010075ae",
]
for url in test_creator_urls:
try:
result = parse_creator_info_from_url(url)
print(f"✓ URL: {url[:80]}...")
print(f" 结果: {result}\n")
except Exception as e:
print(f"✗ URL: {url}")
print(f" 错误: {e}\n")

View File

@@ -0,0 +1,66 @@
# -*- coding: utf-8 -*-
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
import hashlib
import base64
import json
from typing import Any
def _build_c(e: Any, a: Any) -> str:
c = str(e)
if isinstance(a, (dict, list)):
c += json.dumps(a, separators=(",", ":"), ensure_ascii=False)
elif isinstance(a, str):
c += a
# 其它类型不拼
return c
# ---------------------------
# p.Pu = MD5(c) => hex 小写
# ---------------------------
def _md5_hex(s: str) -> str:
return hashlib.md5(s.encode("utf-8")).hexdigest()
# ============================================================
# Playwright 版本(异步):传入 page(Page 对象)
# 内部用 page.evaluate('window.mnsv2(...)')
# ============================================================
async def seccore_signv2_playwright(
page, # Playwright Page
e: Any,
a: Any,
) -> str:
"""
使用 Playwright 的 page.evaluate 调用 window.mnsv2(c, d) 来生成签名。
需确保 page 上下文中已存在 window.mnsv2(比如已注入目标站点脚本)
用法:
s = await page.evaluate("([c, d]) => window.mnsv2(c, d)", [c, d])
"""
c = _build_c(e, a)
d = _md5_hex(c)
# 调用浏览器上下文里的 window.mnsv2
s = await page.evaluate("([c, d]) => window.mnsv2(c, d)", [c, d])
f = {
"x0": "4.2.6",
"x1": "xhs-pc-web",
"x2": "Mac OS",
"x3": s,
"x4": a,
}
payload = json.dumps(f, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
token = "XYS_" + base64.b64encode(payload).decode("ascii")
print(token)
return token
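Because the token is just "XYS_" followed by a Base64-encoded JSON payload (built a few lines above), a small hypothetical helper can unpack it for debugging; it reuses the base64/json imports already at the top of this file:

# Hypothetical debugging helper, not part of the diff: reverse the "XYS_" + base64(JSON) packing
def inspect_xys_token(token: str) -> dict:
    assert token.startswith("XYS_"), "unexpected token prefix"
    return json.loads(base64.b64decode(token[len("XYS_"):]))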

View File

@@ -86,8 +86,8 @@ class ZhihuCrawler(AbstractCrawler):
self.browser_context = await self.launch_browser(
chromium, None, self.user_agent, headless=config.HEADLESS
)
# stealth.min.js is a js script to prevent the website from detecting the crawler.
await self.browser_context.add_init_script(path="libs/stealth.min.js")
# stealth.min.js is a js script to prevent the website from detecting the crawler.
await self.browser_context.add_init_script(path="libs/stealth.min.js")
self.context_page = await self.browser_context.new_page()
await self.context_page.goto(self.index_url, wait_until="domcontentloaded")
@@ -429,10 +429,11 @@ class ZhihuCrawler(AbstractCrawler):
proxy=playwright_proxy, # type: ignore
viewport={"width": 1920, "height": 1080},
user_agent=user_agent,
channel="chrome", # 使用系统的Chrome稳定版
)
return browser_context
else:
browser = await chromium.launch(headless=headless, proxy=playwright_proxy) # type: ignore
browser = await chromium.launch(headless=headless, proxy=playwright_proxy, channel="chrome") # type: ignore
browser_context = await browser.new_context(
viewport={"width": 1920, "height": 1080}, user_agent=user_agent
)

model/m_bilibili.py Normal file (25 lines)
View File

@@ -0,0 +1,25 @@
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# -*- coding: utf-8 -*-
from pydantic import BaseModel, Field
class VideoUrlInfo(BaseModel):
"""B站视频URL信息"""
video_id: str = Field(title="video id (BV id)")
video_type: str = Field(default="video", title="video type")
class CreatorUrlInfo(BaseModel):
"""B站创作者URL信息"""
creator_id: str = Field(title="creator id (UID)")

View File

@@ -1,12 +1,25 @@
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# -*- coding: utf-8 -*-
from pydantic import BaseModel, Field
class VideoUrlInfo(BaseModel):
"""抖音视频URL信息"""
aweme_id: str = Field(title="aweme id (video id)")
url_type: str = Field(default="normal", title="url type: normal, short, modal")
class CreatorUrlInfo(BaseModel):
"""抖音创作者URL信息"""
sec_user_id: str = Field(title="sec_user_id (creator id)")

View File

@@ -1,12 +1,25 @@
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
# 3. 不得进行大规模爬取或对平台造成运营干扰。
# 4. 应合理控制请求频率,避免给目标平台带来不必要的负担。
# 5. 不得用于任何非法或不当的用途。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
#
# 详细许可条款请参阅项目根目录下的LICENSE文件。
# 使用本代码即表示您同意遵守上述原则和LICENSE中的所有条款。
# -*- coding: utf-8 -*-
from pydantic import BaseModel, Field
class VideoUrlInfo(BaseModel):
"""快手视频URL信息"""
video_id: str = Field(title="video id (photo id)")
url_type: str = Field(default="normal", title="url type: normal")
class CreatorUrlInfo(BaseModel):
"""快手创作者URL信息"""
user_id: str = Field(title="user id (creator id)")

View File

@@ -18,4 +18,11 @@ from pydantic import BaseModel, Field
class NoteUrlInfo(BaseModel):
note_id: str = Field(title="note id")
xsec_token: str = Field(title="xsec token")
xsec_source: str = Field(title="xsec source")
xsec_source: str = Field(title="xsec source")
class CreatorUrlInfo(BaseModel):
"""小红书创作者URL信息"""
user_id: str = Field(title="user id (creator id)")
xsec_token: str = Field(default="", title="xsec token")
xsec_source: str = Field(default="", title="xsec source")

View File

@@ -16,6 +16,7 @@ dependencies = [
"httpx==0.28.1",
"jieba==0.42.1",
"matplotlib==3.9.0",
"motor>=3.3.0",
"opencv-python>=4.11.0.86",
"pandas==2.2.3",
"parsel==1.9.1",

View File

@@ -24,3 +24,4 @@ cryptography>=45.0.7
alembic>=1.16.5
asyncmy>=0.2.10
sqlalchemy>=2.0.43
motor>=3.3.0

View File

@@ -28,13 +28,14 @@ class BiliStoreFactory:
"db": BiliDbStoreImplement,
"json": BiliJsonStoreImplement,
"sqlite": BiliSqliteStoreImplement,
"mongodb": BiliMongoStoreImplement,
}
@staticmethod
def create_store() -> AbstractStore:
store_class = BiliStoreFactory.STORES.get(config.SAVE_DATA_OPTION)
if not store_class:
raise ValueError("[BiliStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite ...")
raise ValueError("[BiliStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite or mongodb ...")
return store_class()
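The factory resolves its backend through config.SAVE_DATA_OPTION, so switching to the new MongoDB store is a one-line config change; a hedged sketch (the attribute name comes from the factory code above, the exact config file location is an assumption):

# Hypothetical config snippet: route all platform stores to MongoDB
SAVE_DATA_OPTION = "mongodb"  # previously one of: csv, db, json, sqlite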

View File

@@ -31,13 +31,14 @@ from database.models import BilibiliVideoComment, BilibiliVideo, BilibiliUpInfo,
from tools.async_file_writer import AsyncFileWriter
from tools import utils, words
from var import crawler_type_var
from store.mongodb_store_base import MongoDBStoreBase
class BiliCsvStoreImplement(AbstractStore):
def __init__(self):
self.file_writer = AsyncFileWriter(
crawler_type=crawler_type_var.get(),
platform="bilibili"
platform="bili"
)
async def store_content(self, content_item: Dict):
@@ -220,7 +221,7 @@ class BiliJsonStoreImplement(AbstractStore):
def __init__(self):
self.file_writer = AsyncFileWriter(
crawler_type=crawler_type_var.get(),
platform="bilibili"
platform="bili"
)
async def store_content(self, content_item: Dict):
@@ -297,3 +298,61 @@ class BiliJsonStoreImplement(AbstractStore):
class BiliSqliteStoreImplement(BiliDbStoreImplement):
pass
class BiliMongoStoreImplement(AbstractStore):
"""B站MongoDB存储实现"""
def __init__(self):
self.mongo_store = MongoDBStoreBase(collection_prefix="bilibili")
async def store_content(self, content_item: Dict):
"""
存储视频内容到MongoDB
Args:
content_item: 视频内容数据
"""
video_id = content_item.get("video_id")
if not video_id:
return
await self.mongo_store.save_or_update(
collection_suffix="contents",
query={"video_id": video_id},
data=content_item
)
utils.logger.info(f"[BiliMongoStoreImplement.store_content] Saved video {video_id} to MongoDB")
async def store_comment(self, comment_item: Dict):
"""
存储评论到MongoDB
Args:
comment_item: 评论数据
"""
comment_id = comment_item.get("comment_id")
if not comment_id:
return
await self.mongo_store.save_or_update(
collection_suffix="comments",
query={"comment_id": comment_id},
data=comment_item
)
utils.logger.info(f"[BiliMongoStoreImplement.store_comment] Saved comment {comment_id} to MongoDB")
async def store_creator(self, creator_item: Dict):
"""
存储UP主信息到MongoDB
Args:
creator_item: UP主数据
"""
user_id = creator_item.get("user_id")
if not user_id:
return
await self.mongo_store.save_or_update(
collection_suffix="creators",
query={"user_id": user_id},
data=creator_item
)
utils.logger.info(f"[BiliMongoStoreImplement.store_creator] Saved creator {user_id} to MongoDB")

View File

@@ -22,7 +22,7 @@ from tools import utils
class BilibiliVideo(AbstractStoreVideo):
video_store_path: str = "data/bilibili/videos"
video_store_path: str = "data/bili/videos"
async def store_video(self, video_content_item: Dict):
"""

View File

@@ -27,13 +27,14 @@ class DouyinStoreFactory:
"db": DouyinDbStoreImplement,
"json": DouyinJsonStoreImplement,
"sqlite": DouyinSqliteStoreImplement,
"mongodb": DouyinMongoStoreImplement,
}
@staticmethod
def create_store() -> AbstractStore:
store_class = DouyinStoreFactory.STORES.get(config.SAVE_DATA_OPTION)
if not store_class:
raise ValueError("[DouyinStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite ...")
raise ValueError("[DouyinStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite or mongodb ...")
return store_class()

View File

@@ -28,6 +28,7 @@ from database.models import DouyinAweme, DouyinAwemeComment, DyCreator
from tools import utils, words
from tools.async_file_writer import AsyncFileWriter
from var import crawler_type_var
from store.mongodb_store_base import MongoDBStoreBase
class DouyinCsvStoreImplement(AbstractStore):
@@ -195,4 +196,62 @@ class DouyinJsonStoreImplement(AbstractStore):
class DouyinSqliteStoreImplement(DouyinDbStoreImplement):
pass
pass
class DouyinMongoStoreImplement(AbstractStore):
"""抖音MongoDB存储实现"""
def __init__(self):
self.mongo_store = MongoDBStoreBase(collection_prefix="douyin")
async def store_content(self, content_item: Dict):
"""
存储视频内容到MongoDB
Args:
content_item: 视频内容数据
"""
aweme_id = content_item.get("aweme_id")
if not aweme_id:
return
await self.mongo_store.save_or_update(
collection_suffix="contents",
query={"aweme_id": aweme_id},
data=content_item
)
utils.logger.info(f"[DouyinMongoStoreImplement.store_content] Saved aweme {aweme_id} to MongoDB")
async def store_comment(self, comment_item: Dict):
"""
存储评论到MongoDB
Args:
comment_item: 评论数据
"""
comment_id = comment_item.get("comment_id")
if not comment_id:
return
await self.mongo_store.save_or_update(
collection_suffix="comments",
query={"comment_id": comment_id},
data=comment_item
)
utils.logger.info(f"[DouyinMongoStoreImplement.store_comment] Saved comment {comment_id} to MongoDB")
async def store_creator(self, creator_item: Dict):
"""
存储创作者信息到MongoDB
Args:
creator_item: 创作者数据
"""
user_id = creator_item.get("user_id")
if not user_id:
return
await self.mongo_store.save_or_update(
collection_suffix="creators",
query={"user_id": user_id},
data=creator_item
)
utils.logger.info(f"[DouyinMongoStoreImplement.store_creator] Saved creator {user_id} to MongoDB")

View File

@@ -26,7 +26,8 @@ class KuaishouStoreFactory:
"csv": KuaishouCsvStoreImplement,
"db": KuaishouDbStoreImplement,
"json": KuaishouJsonStoreImplement,
"sqlite": KuaishouSqliteStoreImplement
"sqlite": KuaishouSqliteStoreImplement,
"mongodb": KuaishouMongoStoreImplement,
}
@staticmethod
@@ -34,7 +35,7 @@ class KuaishouStoreFactory:
store_class = KuaishouStoreFactory.STORES.get(config.SAVE_DATA_OPTION)
if not store_class:
raise ValueError(
"[KuaishouStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite ...")
"[KuaishouStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite or mongodb ...")
return store_class()

View File

@@ -30,6 +30,7 @@ from database.db_session import get_session
from database.models import KuaishouVideo, KuaishouVideoComment
from tools import utils, words
from var import crawler_type_var
from store.mongodb_store_base import MongoDBStoreBase
def calculate_number_of_files(file_store_path: str) -> int:
@@ -157,4 +158,62 @@ class KuaishouJsonStoreImplement(AbstractStore):
class KuaishouSqliteStoreImplement(KuaishouDbStoreImplement):
async def store_creator(self, creator: Dict):
pass
pass
class KuaishouMongoStoreImplement(AbstractStore):
"""快手MongoDB存储实现"""
def __init__(self):
self.mongo_store = MongoDBStoreBase(collection_prefix="kuaishou")
async def store_content(self, content_item: Dict):
"""
存储视频内容到MongoDB
Args:
content_item: 视频内容数据
"""
video_id = content_item.get("video_id")
if not video_id:
return
await self.mongo_store.save_or_update(
collection_suffix="contents",
query={"video_id": video_id},
data=content_item
)
utils.logger.info(f"[KuaishouMongoStoreImplement.store_content] Saved video {video_id} to MongoDB")
async def store_comment(self, comment_item: Dict):
"""
存储评论到MongoDB
Args:
comment_item: 评论数据
"""
comment_id = comment_item.get("comment_id")
if not comment_id:
return
await self.mongo_store.save_or_update(
collection_suffix="comments",
query={"comment_id": comment_id},
data=comment_item
)
utils.logger.info(f"[KuaishouMongoStoreImplement.store_comment] Saved comment {comment_id} to MongoDB")
async def store_creator(self, creator_item: Dict):
"""
存储创作者信息到MongoDB
Args:
creator_item: 创作者数据
"""
user_id = creator_item.get("user_id")
if not user_id:
return
await self.mongo_store.save_or_update(
collection_suffix="creators",
query={"user_id": user_id},
data=creator_item
)
utils.logger.info(f"[KuaishouMongoStoreImplement.store_creator] Saved creator {user_id} to MongoDB")

store/mongodb_store_base.py Normal file (173 lines)
View File

@@ -0,0 +1,173 @@
# -*- coding: utf-8 -*-
"""
MongoDB存储基类
提供MongoDB连接管理和通用存储方法
"""
import asyncio
from typing import Dict, List, Optional
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase, AsyncIOMotorCollection
from config import db_config
from tools import utils
class MongoDBConnection:
"""MongoDB连接管理单例类"""
_instance = None
_client: Optional[AsyncIOMotorClient] = None
_db: Optional[AsyncIOMotorDatabase] = None
_lock = asyncio.Lock()
def __new__(cls):
if cls._instance is None:
cls._instance = super(MongoDBConnection, cls).__new__(cls)
return cls._instance
async def get_client(self) -> AsyncIOMotorClient:
"""获取MongoDB客户端"""
if self._client is None:
async with self._lock:
if self._client is None:
await self._connect()
return self._client
async def get_db(self) -> AsyncIOMotorDatabase:
"""获取MongoDB数据库"""
if self._db is None:
async with self._lock:
if self._db is None:
await self._connect()
return self._db
async def _connect(self):
"""建立MongoDB连接"""
try:
mongo_config = db_config.mongodb_config
host = mongo_config.get("host", "localhost")
port = mongo_config.get("port", 27017)
user = mongo_config.get("user", "")
password = mongo_config.get("password", "")
db_name = mongo_config.get("db_name", "media_crawler")
# 构建连接URL
if user and password:
connection_url = f"mongodb://{user}:{password}@{host}:{port}/"
else:
connection_url = f"mongodb://{host}:{port}/"
self._client = AsyncIOMotorClient(connection_url, serverSelectionTimeoutMS=5000)
# 测试连接
await self._client.server_info()
self._db = self._client[db_name]
utils.logger.info(f"[MongoDBConnection] Successfully connected to MongoDB at {host}:{port}, database: {db_name}")
except Exception as e:
utils.logger.error(f"[MongoDBConnection] Failed to connect to MongoDB: {e}")
raise
async def close(self):
"""关闭MongoDB连接"""
if self._client is not None:
self._client.close()
self._client = None
self._db = None
utils.logger.info("[MongoDBConnection] MongoDB connection closed")
class MongoDBStoreBase:
"""MongoDB存储基类"""
def __init__(self, collection_prefix: str):
"""
初始化MongoDB存储基类
Args:
collection_prefix: 集合名称前缀(如 xhs, douyin, bilibili 等)
"""
self.collection_prefix = collection_prefix
self._connection = MongoDBConnection()
async def get_collection(self, collection_suffix: str) -> AsyncIOMotorCollection:
"""
获取MongoDB集合
Args:
collection_suffix: 集合名称后缀(contents, comments, creators)
Returns:
MongoDB集合对象
"""
db = await self._connection.get_db()
collection_name = f"{self.collection_prefix}_{collection_suffix}"
return db[collection_name]
async def save_or_update(self, collection_suffix: str, query: Dict, data: Dict) -> bool:
"""
保存或更新数据(upsert 操作)
Args:
collection_suffix: 集合名称后缀
query: 查询条件
data: 要保存的数据
Returns:
是否成功
"""
try:
collection = await self.get_collection(collection_suffix)
result = await collection.update_one(
query,
{"$set": data},
upsert=True
)
return True
except Exception as e:
utils.logger.error(f"[MongoDBStoreBase.save_or_update] Failed to save data to {self.collection_prefix}_{collection_suffix}: {e}")
return False
async def find_one(self, collection_suffix: str, query: Dict) -> Optional[Dict]:
"""
查询单条数据
Args:
collection_suffix: 集合名称后缀
query: 查询条件
Returns:
查询结果
"""
try:
collection = await self.get_collection(collection_suffix)
result = await collection.find_one(query)
return result
except Exception as e:
utils.logger.error(f"[MongoDBStoreBase.find_one] Failed to query from {self.collection_prefix}_{collection_suffix}: {e}")
return None
async def find_many(self, collection_suffix: str, query: Dict, limit: int = 0) -> List[Dict]:
"""
查询多条数据
Args:
collection_suffix: 集合名称后缀
query: 查询条件
limit: 限制返回数量,0 表示不限制
Returns:
查询结果列表
"""
try:
collection = await self.get_collection(collection_suffix)
cursor = collection.find(query)
if limit > 0:
cursor = cursor.limit(limit)
results = await cursor.to_list(length=None)
return results
except Exception as e:
utils.logger.error(f"[MongoDBStoreBase.find_many] Failed to query from {self.collection_prefix}_{collection_suffix}: {e}")
return []
async def create_index(self, collection_suffix: str, keys: List[tuple], unique: bool = False):
"""
创建索引
Args:
collection_suffix: 集合名称后缀
keys: 索引键列表,例如:[("note_id", 1)]
unique: 是否创建唯一索引
"""
try:
collection = await self.get_collection(collection_suffix)
await collection.create_index(keys, unique=unique)
utils.logger.info(f"[MongoDBStoreBase.create_index] Created index on {self.collection_prefix}_{collection_suffix}")
except Exception as e:
utils.logger.error(f"[MongoDBStoreBase.create_index] Failed to create index: {e}")

View File

@@ -23,7 +23,8 @@ class TieBaStoreFactory:
"csv": TieBaCsvStoreImplement,
"db": TieBaDbStoreImplement,
"json": TieBaJsonStoreImplement,
"sqlite": TieBaSqliteStoreImplement
"sqlite": TieBaSqliteStoreImplement,
"mongodb": TieBaMongoStoreImplement,
}
@staticmethod
@@ -31,7 +32,7 @@ class TieBaStoreFactory:
store_class = TieBaStoreFactory.STORES.get(config.SAVE_DATA_OPTION)
if not store_class:
raise ValueError(
"[TieBaStoreFactory.create_store] Invalid save option only supported csv or db or json ...")
"[TieBaStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite or mongodb ...")
return store_class()

View File

@@ -31,6 +31,7 @@ from tools import utils, words
from database.db_session import get_session
from var import crawler_type_var
from tools.async_file_writer import AsyncFileWriter
from store.mongodb_store_base import MongoDBStoreBase
def calculate_number_of_files(file_store_path: str) -> int:
@@ -190,3 +191,61 @@ class TieBaSqliteStoreImplement(TieBaDbStoreImplement):
Tieba sqlite store implement
"""
pass
class TieBaMongoStoreImplement(AbstractStore):
"""贴吧MongoDB存储实现"""
def __init__(self):
self.mongo_store = MongoDBStoreBase(collection_prefix="tieba")
async def store_content(self, content_item: Dict):
"""
存储帖子内容到MongoDB
Args:
content_item: 帖子内容数据
"""
note_id = content_item.get("note_id")
if not note_id:
return
await self.mongo_store.save_or_update(
collection_suffix="contents",
query={"note_id": note_id},
data=content_item
)
utils.logger.info(f"[TieBaMongoStoreImplement.store_content] Saved note {note_id} to MongoDB")
async def store_comment(self, comment_item: Dict):
"""
存储评论到MongoDB
Args:
comment_item: 评论数据
"""
comment_id = comment_item.get("comment_id")
if not comment_id:
return
await self.mongo_store.save_or_update(
collection_suffix="comments",
query={"comment_id": comment_id},
data=comment_item
)
utils.logger.info(f"[TieBaMongoStoreImplement.store_comment] Saved comment {comment_id} to MongoDB")
async def store_creator(self, creator_item: Dict):
"""
存储创作者信息到MongoDB
Args:
creator_item: 创作者数据
"""
user_id = creator_item.get("user_id")
if not user_id:
return
await self.mongo_store.save_or_update(
collection_suffix="creators",
query={"user_id": user_id},
data=creator_item
)
utils.logger.info(f"[TieBaMongoStoreImplement.store_creator] Saved creator {user_id} to MongoDB")

View File

@@ -28,13 +28,14 @@ class WeibostoreFactory:
"db": WeiboDbStoreImplement,
"json": WeiboJsonStoreImplement,
"sqlite": WeiboSqliteStoreImplement,
"mongodb": WeiboMongoStoreImplement,
}
@staticmethod
def create_store() -> AbstractStore:
store_class = WeibostoreFactory.STORES.get(config.SAVE_DATA_OPTION)
if not store_class:
raise ValueError("[WeibotoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite ...")
raise ValueError("[WeibotoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite or mongodb ...")
return store_class()

View File

@@ -31,6 +31,7 @@ from tools import utils, words
from tools.async_file_writer import AsyncFileWriter
from database.db_session import get_session
from var import crawler_type_var
from store.mongodb_store_base import MongoDBStoreBase
def calculate_number_of_files(file_store_path: str) -> int:
@@ -212,3 +213,61 @@ class WeiboSqliteStoreImplement(WeiboDbStoreImplement):
Weibo content SQLite storage implementation
"""
pass
class WeiboMongoStoreImplement(AbstractStore):
"""微博MongoDB存储实现"""
def __init__(self):
self.mongo_store = MongoDBStoreBase(collection_prefix="weibo")
async def store_content(self, content_item: Dict):
"""
存储微博内容到MongoDB
Args:
content_item: 微博内容数据
"""
note_id = content_item.get("note_id")
if not note_id:
return
await self.mongo_store.save_or_update(
collection_suffix="contents",
query={"note_id": note_id},
data=content_item
)
utils.logger.info(f"[WeiboMongoStoreImplement.store_content] Saved note {note_id} to MongoDB")
async def store_comment(self, comment_item: Dict):
"""
存储评论到MongoDB
Args:
comment_item: 评论数据
"""
comment_id = comment_item.get("comment_id")
if not comment_id:
return
await self.mongo_store.save_or_update(
collection_suffix="comments",
query={"comment_id": comment_id},
data=comment_item
)
utils.logger.info(f"[WeiboMongoStoreImplement.store_comment] Saved comment {comment_id} to MongoDB")
async def store_creator(self, creator_item: Dict):
"""
存储创作者信息到MongoDB
Args:
creator_item: 创作者数据
"""
user_id = creator_item.get("user_id")
if not user_id:
return
await self.mongo_store.save_or_update(
collection_suffix="creators",
query={"user_id": user_id},
data=creator_item
)
utils.logger.info(f"[WeiboMongoStoreImplement.store_creator] Saved creator {user_id} to MongoDB")

View File

@@ -27,13 +27,14 @@ class XhsStoreFactory:
"db": XhsDbStoreImplement,
"json": XhsJsonStoreImplement,
"sqlite": XhsSqliteStoreImplement,
"mongodb": XhsMongoStoreImplement,
}
@staticmethod
def create_store() -> AbstractStore:
store_class = XhsStoreFactory.STORES.get(config.SAVE_DATA_OPTION)
if not store_class:
raise ValueError("[XhsStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite ...")
raise ValueError("[XhsStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite or mongodb ...")
return store_class()

View File

@@ -18,6 +18,8 @@ from database.models import XhsNote, XhsNoteComment, XhsCreator
from tools.async_file_writer import AsyncFileWriter
from tools.time_util import get_current_timestamp
from var import crawler_type_var
from store.mongodb_store_base import MongoDBStoreBase
from tools import utils
class XhsCsvStoreImplement(AbstractStore):
def __init__(self, **kwargs):
@@ -258,3 +260,62 @@ class XhsDbStoreImplement(AbstractStore):
class XhsSqliteStoreImplement(XhsDbStoreImplement):
def __init__(self, **kwargs):
super().__init__(**kwargs)
class XhsMongoStoreImplement(AbstractStore):
"""小红书MongoDB存储实现"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.mongo_store = MongoDBStoreBase(collection_prefix="xhs")
async def store_content(self, content_item: Dict):
"""
存储笔记内容到MongoDB
Args:
content_item: 笔记内容数据
"""
note_id = content_item.get("note_id")
if not note_id:
return
await self.mongo_store.save_or_update(
collection_suffix="contents",
query={"note_id": note_id},
data=content_item
)
utils.logger.info(f"[XhsMongoStoreImplement.store_content] Saved note {note_id} to MongoDB")
async def store_comment(self, comment_item: Dict):
"""
存储评论到MongoDB
Args:
comment_item: 评论数据
"""
comment_id = comment_item.get("comment_id")
if not comment_id:
return
await self.mongo_store.save_or_update(
collection_suffix="comments",
query={"comment_id": comment_id},
data=comment_item
)
utils.logger.info(f"[XhsMongoStoreImplement.store_comment] Saved comment {comment_id} to MongoDB")
async def store_creator(self, creator_item: Dict):
"""
存储创作者信息到MongoDB
Args:
creator_item: 创作者数据
"""
user_id = creator_item.get("user_id")
if not user_id:
return
await self.mongo_store.save_or_update(
collection_suffix="creators",
query={"user_id": user_id},
data=creator_item
)
utils.logger.info(f"[XhsMongoStoreImplement.store_creator] Saved creator {user_id} to MongoDB")

View File

@@ -18,7 +18,8 @@ from model.m_zhihu import ZhihuComment, ZhihuContent, ZhihuCreator
from ._store_impl import (ZhihuCsvStoreImplement,
ZhihuDbStoreImplement,
ZhihuJsonStoreImplement,
ZhihuSqliteStoreImplement)
ZhihuSqliteStoreImplement,
ZhihuMongoStoreImplement)
from tools import utils
from var import source_keyword_var
@@ -28,14 +29,15 @@ class ZhihuStoreFactory:
"csv": ZhihuCsvStoreImplement,
"db": ZhihuDbStoreImplement,
"json": ZhihuJsonStoreImplement,
"sqlite": ZhihuSqliteStoreImplement
"sqlite": ZhihuSqliteStoreImplement,
"mongodb": ZhihuMongoStoreImplement,
}
@staticmethod
def create_store() -> AbstractStore:
store_class = ZhihuStoreFactory.STORES.get(config.SAVE_DATA_OPTION)
if not store_class:
raise ValueError("[ZhihuStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite ...")
raise ValueError("[ZhihuStoreFactory.create_store] Invalid save option only supported csv or db or json or sqlite or mongodb ...")
return store_class()
async def batch_update_zhihu_contents(contents: List[ZhihuContent]):

View File

@@ -31,6 +31,7 @@ from database.models import ZhihuContent, ZhihuComment, ZhihuCreator
from tools import utils, words
from var import crawler_type_var
from tools.async_file_writer import AsyncFileWriter
from store.mongodb_store_base import MongoDBStoreBase
def calculate_number_of_files(file_store_path: str) -> int:
"""计算数据保存文件的前部分排序数字,支持每次运行代码不写到同一个文件中
@@ -189,3 +190,61 @@ class ZhihuSqliteStoreImplement(ZhihuDbStoreImplement):
Zhihu content SQLite storage implementation
"""
pass
class ZhihuMongoStoreImplement(AbstractStore):
"""知乎MongoDB存储实现"""
def __init__(self):
self.mongo_store = MongoDBStoreBase(collection_prefix="zhihu")
async def store_content(self, content_item: Dict):
"""
存储内容到MongoDB
Args:
content_item: 内容数据
"""
note_id = content_item.get("note_id")
if not note_id:
return
await self.mongo_store.save_or_update(
collection_suffix="contents",
query={"note_id": note_id},
data=content_item
)
utils.logger.info(f"[ZhihuMongoStoreImplement.store_content] Saved note {note_id} to MongoDB")
async def store_comment(self, comment_item: Dict):
"""
存储评论到MongoDB
Args:
comment_item: 评论数据
"""
comment_id = comment_item.get("comment_id")
if not comment_id:
return
await self.mongo_store.save_or_update(
collection_suffix="comments",
query={"comment_id": comment_id},
data=comment_item
)
utils.logger.info(f"[ZhihuMongoStoreImplement.store_comment] Saved comment {comment_id} to MongoDB")
async def store_creator(self, creator_item: Dict):
"""
存储创作者信息到MongoDB
Args:
creator_item: 创作者数据
"""
user_id = creator_item.get("user_id")
if not user_id:
return
await self.mongo_store.save_or_update(
collection_suffix="creators",
query={"user_id": user_id},
data=creator_item
)
utils.logger.info(f"[ZhihuMongoStoreImplement.store_creator] Saved creator {user_id} to MongoDB")

View File

@@ -5,13 +5,16 @@ import os
import pathlib
from typing import Dict, List
import aiofiles
import config
from tools.utils import utils
from tools.words import AsyncWordCloudGenerator
class AsyncFileWriter:
def __init__(self, platform: str, crawler_type: str):
self.lock = asyncio.Lock()
self.platform = platform
self.crawler_type = crawler_type
self.wordcloud_generator = AsyncWordCloudGenerator() if config.ENABLE_GET_WORDCLOUD else None
def _get_file_path(self, file_type: str, item_type: str) -> str:
base_path = f"data/{self.platform}/{file_type}"
@@ -47,4 +50,58 @@ class AsyncFileWriter:
existing_data.append(item)
async with aiofiles.open(file_path, 'w', encoding='utf-8') as f:
await f.write(json.dumps(existing_data, ensure_ascii=False, indent=4))
await f.write(json.dumps(existing_data, ensure_ascii=False, indent=4))
async def generate_wordcloud_from_comments(self):
"""
Generate wordcloud from comments data
Only works when ENABLE_GET_WORDCLOUD and ENABLE_GET_COMMENTS are True
"""
if not config.ENABLE_GET_WORDCLOUD or not config.ENABLE_GET_COMMENTS:
return
if not self.wordcloud_generator:
return
try:
# Read comments from JSON file
comments_file_path = self._get_file_path('json', 'comments')
if not os.path.exists(comments_file_path) or os.path.getsize(comments_file_path) == 0:
utils.logger.info(f"[AsyncFileWriter.generate_wordcloud_from_comments] No comments file found at {comments_file_path}")
return
async with aiofiles.open(comments_file_path, 'r', encoding='utf-8') as f:
content = await f.read()
if not content:
utils.logger.info(f"[AsyncFileWriter.generate_wordcloud_from_comments] Comments file is empty")
return
comments_data = json.loads(content)
if not isinstance(comments_data, list):
comments_data = [comments_data]
# Filter comments data to only include 'content' field
# Handle different comment data structures across platforms
filtered_data = []
for comment in comments_data:
if isinstance(comment, dict):
# Try different possible content field names
content_text = comment.get('content') or comment.get('comment_text') or comment.get('text') or ''
if content_text:
filtered_data.append({'content': content_text})
if not filtered_data:
utils.logger.info(f"[AsyncFileWriter.generate_wordcloud_from_comments] No valid comment content found")
return
# Generate wordcloud
words_base_path = f"data/{self.platform}/words"
pathlib.Path(words_base_path).mkdir(parents=True, exist_ok=True)
words_file_prefix = f"{words_base_path}/{self.crawler_type}_comments_{utils.get_current_date()}"
utils.logger.info(f"[AsyncFileWriter.generate_wordcloud_from_comments] Generating wordcloud from {len(filtered_data)} comments")
await self.wordcloud_generator.generate_word_frequency_and_cloud(filtered_data, words_file_prefix)
utils.logger.info(f"[AsyncFileWriter.generate_wordcloud_from_comments] Wordcloud generated successfully at {words_file_prefix}")
except Exception as e:
utils.logger.error(f"[AsyncFileWriter.generate_wordcloud_from_comments] Error generating wordcloud: {e}")

View File

@@ -127,23 +127,24 @@ class BrowserLauncher:
"--disable-hang-monitor",
"--disable-prompt-on-repost",
"--disable-sync",
"--disable-web-security", # 可能有助于某些网站的访问
"--disable-features=VizDisplayCompositor",
"--disable-dev-shm-usage", # 避免共享内存问题
"--no-sandbox", # 在CDP模式下关闭沙箱
# 🔥 关键反检测参数
"--disable-blink-features=AutomationControlled", # 禁用自动化控制标记
"--exclude-switches=enable-automation", # 排除自动化开关
"--disable-infobars", # 禁用信息栏
]
# 无头模式
if headless:
args.extend([
"--headless",
"--headless=new", # 使用新的headless模式
"--disable-gpu",
])
else:
# 非无头模式下也保持一些稳定性参数
# 非无头模式的额外参数
args.extend([
"--disable-blink-features=AutomationControlled",
"--disable-infobars",
"--start-maximized", # 最大化窗口,更像真实用户
])
# 用户数据目录

View File

@@ -26,6 +26,10 @@ def init_loging_config():
)
_logger = logging.getLogger("MediaCrawler")
_logger.setLevel(level)
# 关闭 httpx 的 INFO 日志
logging.getLogger("httpx").setLevel(logging.WARNING)
return _logger

uv.lock generated (1376 lines): file diff suppressed because it is too large.