mirror of
https://github.com/tgbot-collection/YYeTsBot.git
synced 2025-11-25 03:15:05 +08:00
146 lines
5.7 KiB
Python
146 lines
5.7 KiB
Python
#!/usr/bin/env python3
|
|
# coding: utf-8
|
|
import contextlib
|
|
import logging
|
|
import re
|
|
from http import HTTPStatus
|
|
from urllib.parse import unquote
|
|
|
|
import requests
|
|
from bs4 import BeautifulSoup
|
|
from retry import retry
|
|
|
|
from databases import DOUBAN_DETAIL, DOUBAN_SEARCH
|
|
from databases.base import Mongo
|
|
from databases.other import Captcha
|
|
|
|
|
|
class Douban(Mongo):
    """Look up Douban metadata for a resource, crawling Douban on a cache miss.

    Crawled records are written to the ``douban`` collection, so later
    lookups for the same resource id are answered from MongoDB.
    """

    # Seconds to wait on any single HTTP request. Without a timeout,
    # ``requests`` can block forever on a stalled connection.
    REQUEST_TIMEOUT = 10

    def get_douban_data(self, rid: int) -> dict:
        """Return the Douban record for resource ``rid`` on a best-effort basis.

        Any exception raised by :meth:`find_douban` is logged and swallowed;
        in that case the placeholder ``{"posterData": None}`` is returned.
        """
        try:
            return self.find_douban(rid)
        except Exception:
            # Deliberately best effort: callers always get a dict back.
            logging.exception("Failed to get douban data for %s", rid)
            return {"posterData": None}

    def get_douban_image(self, rid: int) -> bytes:
        """Return the poster image bytes for ``rid``, or ``None`` if unavailable.

        ``find_douban`` returns an empty dict for an unknown resource, so the
        key is read with ``.get`` instead of indexing (which raised KeyError).
        """
        return self.get_douban_data(rid).get("posterData")

    @retry(IndexError, tries=3, delay=5)
    def find_douban(self, resource_id: int):
        """Return Douban data for ``resource_id``, crawling it if not cached.

        Returns:
            The cached document (without ``_id`` and ``raw``) when one
            exists, ``{}`` when the resource id is not in the ``yyets``
            collection, otherwise the freshly crawled record without its
            ``raw`` part.

        Raises:
            IndexError: when the search page has no result or the first
                result is not a ``movie.douban.com/subject`` link. The
                ``retry`` decorator re-runs the method (3 tries, 5 s apart)
                before letting it propagate.
            requests.RequestException: on network failures or timeouts.
        """
        douban_col = self.db["douban"]
        yyets_col = self.db["yyets"]
        cached = douban_col.find_one({"resourceId": resource_id}, {"_id": False, "raw": False})
        if cached:
            logging.info("Existing data for %s", resource_id)
            return cached

        # data not found, crawl from douban
        projection = {
            "data.info.cnname": True,
            "data.info.enname": True,
            "data.info.aliasname": True,
        }
        names = yyets_col.find_one({"data.info.id": resource_id}, projection=projection)
        if names is None:
            return {}
        cname = names["data"]["info"]["cnname"]
        logging.info("cnname for douban is %s", cname)

        ua = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36"
        # The session is only needed on a cache miss; the context manager
        # closes its pooled connections once the crawl is done (or fails).
        with requests.Session() as session:
            session.headers.update({"User-Agent": ua})

            search_html = session.get(DOUBAN_SEARCH.format(cname), timeout=self.REQUEST_TIMEOUT).text
            logging.info("Analysis search html...length %s", len(search_html))
            soup = BeautifulSoup(search_html, "html.parser")
            douban_item = soup.find_all("div", class_="content")

            # The first search hit is a redirect link that embeds the
            # url-encoded subject URL; decode it and extract the numeric id.
            fwd_link = unquote(douban_item[0].a["href"])
            douban_id = re.findall(r"https://movie\.douban\.com/subject/(\d*)/.*", fwd_link)[0]
            final_data = self.get_craw_data(cname, douban_id, resource_id, search_html, session)

        # Insert a copy so that whatever the driver adds to the stored
        # document (e.g. ``_id``) does not leak into the returned dict.
        douban_col.insert_one(final_data.copy())
        final_data.pop("raw")
        return final_data

    @staticmethod
    def get_craw_data(cname, douban_id, resource_id, search_html, session):
        """Fetch and parse the Douban detail page of ``douban_id``.

        Args:
            cname: Chinese name that was used for the search.
            douban_id: Douban subject id (string of digits).
            resource_id: Id of the resource this record belongs to.
            search_html: Raw HTML of the search page, stored under ``raw``.
            session: ``requests`` session used for the detail page and the
                poster download.

        Returns:
            A dict with the parsed fields, the poster bytes in
            ``posterData`` and the raw pages/URLs under ``raw``. Fields that
            cannot be found on the page stay as empty strings.
        """
        detail_link = DOUBAN_DETAIL.format(douban_id)
        detail_html = session.get(detail_link, timeout=Douban.REQUEST_TIMEOUT).text
        logging.info("Analysis detail html...%s", detail_link)
        soup = BeautifulSoup(detail_html, "html.parser")

        directors = [i.text for i in soup.find_all("a", rel="v:directedBy")]
        release_date = poster_image_link = rating = year_text = intro = writers = episode_count = episode_duration = ""
        with contextlib.suppress(IndexError):
            episode_duration = soup.find_all("span", property="v:runtime")[0].text
        # Fields without semantic markup are found through their label span.
        for label in soup.find_all("span", class_="pl"):
            if label.text == "编剧":
                # Sibling 0 is the separator text, sibling 1 holds the names.
                writers = re.sub(r"\s", "", list(label.next_siblings)[1].text).split("/")
            elif label.text == "集数:":
                episode_count = str(label.next_sibling)
            elif label.text == "单集片长:" and not episode_duration:
                # Only used as a fallback when no v:runtime span was present.
                episode_duration = str(label.next_sibling)
        actors = [i.text for i in soup.find_all("a", rel="v:starring")]
        genre = [i.text for i in soup.find_all("span", property="v:genre")]

        with contextlib.suppress(IndexError):
            release_date = soup.find_all("span", property="v:initialReleaseDate")[0].text
        with contextlib.suppress(IndexError):
            poster_image_link = soup.find_all("div", id="mainpic")[0].a.img["src"]
        with contextlib.suppress(IndexError):
            rating = soup.find_all("strong", class_="ll rating_num")[0].text
        with contextlib.suppress(IndexError):
            year_text = re.sub(r"[()]", "", soup.find_all("span", class_="year")[0].text)
        with contextlib.suppress(IndexError):
            intro = re.sub(r"\s", "", soup.find_all("span", property="v:summary")[0].text)

        # NOTE(review): with an empty poster link this request raises (no URL
        # scheme), which aborts the crawl so that a page without a poster is
        # never cached. Presumably intentional; behaviour kept as is.
        poster_data = session.get(poster_image_link, timeout=Douban.REQUEST_TIMEOUT).content

        final_data = {
            "name": cname,
            "raw": {
                "search_url": DOUBAN_SEARCH.format(cname),
                "detail_url": detail_link,
                "search_html": search_html,
                "detail_html": detail_html,
            },
            "doubanId": int(douban_id),
            "doubanLink": detail_link,
            "posterLink": poster_image_link,
            "posterData": poster_data,
            "resourceId": resource_id,
            "rating": rating,
            "actors": actors,
            "directors": directors,
            "genre": genre,
            "releaseDate": release_date,
            "episodeCount": episode_count,
            "episodeDuration": episode_duration,
            "writers": writers,
            "year": year_text,
            "introduction": intro,
        }
        return final_data
|
|
|
|
|
|
class DoubanReport(Mongo):
    """Store and list user-submitted error reports about Douban data."""

    def get_error(self) -> dict:
        """Return all ``douban_error`` documents (without ``_id``) under ``data``."""
        cursor = self.db["douban_error"].find(projection={"_id": False})
        return {"data": list(cursor)}

    def report_error(self, captcha: str, captcha_id: int, content: str, resource_id: int) -> dict:
        """Append ``content`` to the error report of ``resource_id``.

        The captcha is verified first. On failure a dict with
        ``status_code`` (``HTTPStatus.BAD_REQUEST``) and the verifier's
        ``message`` is returned and nothing is written. On success the
        report is upserted and ``{"count": matched_count}`` is returned,
        where ``matched_count`` comes from the update result.
        """
        verification = Captcha().verify_code(captcha, captcha_id)
        if not verification["status"]:
            return {
                "status_code": HTTPStatus.BAD_REQUEST,
                "message": verification["message"],
            }

        # One document per resource; every report is pushed onto its
        # ``content`` array, creating the document on first report.
        update_result = self.db["douban_error"].update_one(
            {"resource_id": resource_id},
            {"$push": {"content": content}},
            upsert=True,
        )
        return {"count": update_result.matched_count}
|