mirror of
https://github.com/tgbot-collection/YYeTsBot.git
synced 2025-11-25 19:37:34 +08:00
254 lines
7.8 KiB
Python
254 lines
7.8 KiB
Python
#!/usr/bin/env python3
|
|
# coding: utf-8
|
|
import json
|
|
import logging
|
|
import os
|
|
import time
|
|
|
|
import fakeredis
|
|
import meilisearch
|
|
import pymongo
|
|
import redis
|
|
|
|
faker_redis = fakeredis.FakeStrictRedis()
|
|
|
|
|
|
class Mongo:
|
|
def __init__(self):
|
|
self.client = pymongo.MongoClient(
|
|
host=os.getenv("MONGO", "localhost"),
|
|
connect=False,
|
|
connectTimeoutMS=5000,
|
|
serverSelectionTimeoutMS=5000,
|
|
)
|
|
self.db = self.client["zimuzu"]
|
|
super().__init__()
|
|
|
|
def __del__(self):
|
|
self.client.close()
|
|
|
|
def is_admin(self, username: str) -> bool:
|
|
data = self.db["users"].find_one({"username": username, "group": {"$in": ["admin"]}})
|
|
if data:
|
|
return True
|
|
|
|
def is_user_blocked(self, username: str) -> str:
|
|
r = self.db["users"].find_one({"username": username, "status.disable": True})
|
|
if r:
|
|
return r["status"]["reason"]
|
|
|
|
def is_old_user(self, username: str) -> bool:
|
|
return bool(self.db["users"].find_one({"username": username, "oldUser": True}))
|
|
|
|
|
|
class Redis:
|
|
def __init__(self):
|
|
try:
|
|
self.r = redis.StrictRedis(host=os.getenv("REDIS", "localhost"), decode_responses=True)
|
|
self.r.ping()
|
|
except redis.exceptions.ConnectionError:
|
|
logging.warning("%s Using fakeredis now... %s", "#" * 10, "#" * 10)
|
|
self.r = faker_redis
|
|
super().__init__()
|
|
|
|
def __del__(self):
|
|
self.r.close()
|
|
|
|
@classmethod
|
|
def cache(cls, timeout: int):
|
|
def func(fun):
|
|
def inner(*args, **kwargs):
|
|
func_name = fun.__name__
|
|
cache_value = cls().r.get(func_name)
|
|
if cache_value:
|
|
logging.info("Retrieving %s data from redis", func_name)
|
|
return json.loads(cache_value)
|
|
else:
|
|
logging.info("Cache expired. Executing %s", func_name)
|
|
res = fun(*args, **kwargs)
|
|
cls().r.set(func_name, json.dumps(res), ex=timeout)
|
|
return res
|
|
|
|
return inner
|
|
|
|
return func
|
|
|
|
|
|
class SearchEngine(Mongo):
|
|
yyets_projection = {
|
|
"data.info.cnname": 1,
|
|
"data.info.enname": 1,
|
|
"data.info.aliasname": 1,
|
|
"data.info.area": 1,
|
|
"data.info.id": 1,
|
|
"data.info.channel_cn": 1,
|
|
"data.info.channel": 1,
|
|
"_id": {"$toString": "$_id"},
|
|
"origin": "yyets",
|
|
}
|
|
|
|
douban_projection = {
|
|
"_id": {"$toString": "$_id"},
|
|
"id": "$resourceId",
|
|
"cnname": {"$first": "$resource.data.info.cnname"},
|
|
"enname": {"$first": "$resource.data.info.enname"},
|
|
"aliasname": {"$first": "$resource.data.info.aliasname"},
|
|
"area": {"$first": "$resource.data.info.area"},
|
|
"channel_cn": {"$first": "$resource.data.info.channel_cn"},
|
|
"channel": {"$first": "$resource.data.info.channel"},
|
|
"origin": "yyets",
|
|
"actors": 1,
|
|
"directors": 1,
|
|
"genres": 1,
|
|
"writers": 1,
|
|
"introduction": 1,
|
|
}
|
|
|
|
douban_lookup = {
|
|
"from": "yyets",
|
|
"localField": "resourceId",
|
|
"foreignField": "data.info.id",
|
|
"as": "resource",
|
|
}
|
|
comment_projection = {
|
|
"username": 1,
|
|
"date": 1,
|
|
"comment": "$content",
|
|
"commentID": {"$toString": "$_id"},
|
|
"origin": "comment",
|
|
"hasAvatar": "yes",
|
|
"resourceID": "$resource_id",
|
|
"resourceName": {"$first": "$resource.data.info.cnname"},
|
|
"_id": {"$toString": "$_id"},
|
|
}
|
|
comment_lookup = {
|
|
"from": "yyets",
|
|
"localField": "resource_id",
|
|
"foreignField": "data.info.id",
|
|
"as": "resource",
|
|
}
|
|
|
|
def __init__(self):
|
|
self.search_client = meilisearch.Client(os.getenv("MEILISEARCH"), "masterKey")
|
|
self.yyets_index = self.search_client.index("yyets")
|
|
self.comment_index = self.search_client.index("comment")
|
|
self.douban_index = self.search_client.index("douban")
|
|
super().__init__()
|
|
|
|
def __del__(self):
|
|
pass
|
|
|
|
def __get_yyets(self):
|
|
return self.db["yyets"].aggregate(
|
|
[
|
|
{"$project": self.yyets_projection},
|
|
{
|
|
"$replaceRoot": {
|
|
"newRoot": {
|
|
"$mergeObjects": [
|
|
{"origin": "yyets"},
|
|
"$data.info",
|
|
{"_id": "$_id"},
|
|
]
|
|
}
|
|
}
|
|
},
|
|
]
|
|
)
|
|
|
|
def __get_comment(self):
|
|
return self.db["comment"].aggregate(
|
|
[
|
|
{"$lookup": self.comment_lookup},
|
|
{"$project": self.comment_projection},
|
|
]
|
|
)
|
|
|
|
def __get_douban(self):
|
|
return self.db["douban"].aggregate(
|
|
[
|
|
{"$lookup": self.douban_lookup},
|
|
{"$project": self.douban_projection},
|
|
]
|
|
)
|
|
|
|
def add_yyets(self):
|
|
logging.info("Adding yyets data to search engine")
|
|
data = list(self.__get_yyets())
|
|
self.yyets_index.add_documents(data, primary_key="_id")
|
|
|
|
def add_comment(self):
|
|
logging.info("Adding comment data to search engine")
|
|
data = list(self.__get_comment())
|
|
self.comment_index.add_documents(data, primary_key="_id")
|
|
|
|
def add_douban(self):
|
|
logging.info("Adding douban data to search engine")
|
|
data = list(self.__get_douban())
|
|
self.douban_index.add_documents(data, primary_key="_id")
|
|
|
|
def search_yyets(self, keyword: "str"):
|
|
return self.yyets_index.search(keyword, {"matchingStrategy": "all"})["hits"]
|
|
|
|
def search_comment(self, keyword: "str"):
|
|
return self.comment_index.search(keyword, {"matchingStrategy": "all"})["hits"]
|
|
|
|
def search_douban(self, keyword: "str"):
|
|
return self.douban_index.search(keyword, {"matchingStrategy": "all"})["hits"]
|
|
|
|
def run_import(self):
|
|
t0 = time.time()
|
|
self.add_yyets()
|
|
self.add_comment()
|
|
self.add_douban()
|
|
logging.info(f"Import data to search engine in {time.time() - t0:.2f}s")
|
|
|
|
def __monitor(self, col, fun):
|
|
cursor = self.db[col].watch()
|
|
for change in cursor:
|
|
op_type = change["operationType"]
|
|
_id = change["documentKey"]["_id"]
|
|
search_index = getattr(self, f"{col}_index")
|
|
logging.info("%s %s change stream for %s", col, op_type, _id)
|
|
|
|
if op_type == "delete":
|
|
search_index.delete_document(_id)
|
|
else:
|
|
data = fun(_id)
|
|
search_index.add_documents(data, primary_key="_id")
|
|
|
|
def monitor_yyets(self):
|
|
def get_data(_id) -> list:
|
|
data = self.db.yyets.find_one({"_id": _id}, projection=self.yyets_projection)["data"]["info"]
|
|
data["_id"] = str(_id)
|
|
data["origin"] = "yyets"
|
|
return [data]
|
|
|
|
self.__monitor("yyets", get_data)
|
|
|
|
def monitor_douban(self):
|
|
def get_data(_id) -> list:
|
|
data = self.db.douban.aggregate(
|
|
[
|
|
{"$match": {"_id": _id}},
|
|
{"$lookup": self.douban_lookup},
|
|
{"$project": self.douban_projection},
|
|
]
|
|
)
|
|
return list(data)
|
|
|
|
self.__monitor("douban", get_data)
|
|
|
|
def monitor_comment(self):
|
|
def get_data(_id) -> list:
|
|
data = self.db.comment.aggregate(
|
|
[
|
|
{"$match": {"_id": _id}},
|
|
{"$lookup": self.comment_lookup},
|
|
{"$project": self.comment_projection},
|
|
]
|
|
)
|
|
return list(data)
|
|
|
|
self.__monitor("comment", get_data)
|