meilisearch: import and synchronize data into the search index

close #206 #207
This commit is contained in:
Benny
2023-03-10 23:59:40 +01:00
parent 94572b5f45
commit 3602519253
3 changed files with 162 additions and 107 deletions

View File

@@ -1,5 +1,6 @@
#!/usr/bin/env python3
# coding: utf-8
import contextlib
import logging
import os
import time
@@ -9,18 +10,46 @@ import meilisearch
from Mongo import Mongo
from utils import setup_logger
# YYeTsBot - fulltext.py
# 2023-03-08 19:35
setup_logger()
class SearchEngine(Mongo):
yyets_projection = {
"data.info.cnname": 1,
"data.info.enname": 1,
"data.info.aliasname": 1,
"data.info.area": 1,
"data.info.id": 1,
}
douban_projection = {
"_id": 0,
"doubanLink": 0,
"posterLink": 0,
"posterData": 0,
}
comment_projection = {
"username": 1,
"date": 1,
"comment": "$content",
"_id": 0,
"commentID": {"$toString": "$_id"},
"origin": "comment",
"hasAvatar": {"$toBool": "$avatar"},
"resourceID": "$resource_id",
"resourceName": {"$first": "$resource.data.info.cnname"},
"id": {"$toString": "$_id"},
}
comment_lookup = {
"from": "yyets",
"localField": "resource_id",
"foreignField": "data.info.id",
"as": "resource",
}
def __init__(self):
self.search_client = meilisearch.Client(
os.getenv("MEILISEARCH", "http://127.0.0.1:7700"), "masterKey"
)
self.search_client = meilisearch.Client(os.getenv("MEILISEARCH", "http://127.0.0.1:7700"), "masterKey")
self.yyets_index = self.search_client.index("yyets")
self.comment_index = self.search_client.index("comment")
self.douban_index = self.search_client.index("douban")
@@ -29,15 +58,7 @@ class SearchEngine(Mongo):
def __get_yyets(self):
return self.db["yyets"].aggregate(
[
{
"$project": {
"data.info.cnname": 1,
"data.info.enname": 1,
"data.info.aliasname": 1,
"data.info.area": 1,
"data.info.id": 1,
}
},
{"$project": self.yyets_projection},
{"$replaceRoot": {"newRoot": "$data.info"}},
]
)
@@ -45,44 +66,13 @@ class SearchEngine(Mongo):
def __get_comment(self):
return self.db["comment"].aggregate(
[
{
"$lookup": {
"from": "yyets",
"localField": "resource_id",
"foreignField": "data.info.id",
"as": "resource",
}
},
{
"$project": {
"username": 1,
"date": 1,
"comment": "$content",
"_id": 0,
"commentID": {"$toString": "$_id"},
"origin": "comment",
"hasAvatar": {"$toBool": "$avatar"},
"resourceID": "$resource_id",
"resourceName": {"$first": "$resource.data.info.cnname"},
"id": {"$toString": "$_id"},
}
},
{"$lookup": self.comment_lookup},
{"$project": self.comment_projection},
]
)
def __get_douban(self):
return self.db["douban"].aggregate(
[
{
"$project": {
"_id": 0,
"doubanLink": 0,
"posterLink": 0,
"posterData": 0,
}
}
]
)
return self.db["douban"].aggregate([{"$project": self.douban_projection}])
def add_yyets(self):
logging.info("Adding yyets data to search engine")
@@ -115,6 +105,41 @@ class SearchEngine(Mongo):
self.add_douban()
logging.info(f"Imported data to search engine in {time.time() - t0:.2f}s")
def monitor_yyets(self):
cursor = self.db.yyets.watch()
for change in cursor:
with contextlib.suppress(Exception):
key = change["documentKey"]["_id"]
data = self.db.yyets.find_one({"_id": key}, projection=self.yyets_projection)
index = data["data"]["info"]
logging.info("Updating yyets index: %s", index["cnname"])
self.yyets_index.add_documents([index])
def monitor_douban(self):
cursor = self.db.douban.watch()
for change in cursor:
with contextlib.suppress(Exception):
key = change["documentKey"]["_id"]
data = self.db.douban.find_one({"_id": key}, projection=self.douban_projection)
logging.info("Updating douban index: %s", data["name"])
self.douban_index.add_documents([data], primary_key="resourceId")
def monitor_comment(self):
cursor = self.db.comment.watch()
for change in cursor:
with contextlib.suppress(Exception):
key = change["documentKey"]["_id"]
data = self.db.comment.aggregate(
[
{"$match": {"_id": key}},
{"$lookup": self.comment_lookup},
{"$project": self.comment_projection},
]
)
data = list(data)
logging.info("Updating comment index: %s", data[0]["commentID"])
self.comment_index.add_documents(data, primary_key="commentID")
if __name__ == "__main__":
# docker run -it --rm -p 7700:7700 -e MEILI_HTTP_PAYLOAD_SIZE_LIMIT=1073741824 getmeili/meilisearch:v1.0

View File

@@ -10,7 +10,6 @@ __author__ = "Benny <benny.think@gmail.com>"
import logging
import os
import pathlib
import platform
import threading
import pytz
@@ -23,18 +22,41 @@ from tornado.log import enable_pretty_logging
from Mongo import OtherMongoResource, ResourceLatestMongoResource
from commands.douban_sync import sync_douban
from dump_db import entry_dump
from handler import (AnnouncementHandler, BlacklistHandler, CaptchaHandler,
CategoryHandler, CommentChildHandler, CommentHandler,
CommentNewestHandler, CommentReactionHandler,
DBDumpHandler, DoubanHandler, DoubanReportHandler,
FacebookAuth2LoginHandler, GitHubOAuth2LoginHandler,
GoogleOAuth2LoginHandler, GrafanaIndexHandler,
GrafanaQueryHandler, GrafanaSearchHandler, IndexHandler,
LikeHandler, MetricsHandler, MSOAuth2LoginHandler,
NameHandler, NotFoundHandler, NotificationHandler,
ResourceHandler, ResourceLatestHandler,
SpamProcessHandler, TopHandler, TwitterOAuth2LoginHandler,
UserAvatarHandler, UserEmailHandler, UserHandler)
from fulltext import SearchEngine
from handler import (
AnnouncementHandler,
BlacklistHandler,
CaptchaHandler,
CategoryHandler,
CommentChildHandler,
CommentHandler,
CommentNewestHandler,
CommentReactionHandler,
DBDumpHandler,
DoubanHandler,
DoubanReportHandler,
FacebookAuth2LoginHandler,
GitHubOAuth2LoginHandler,
GoogleOAuth2LoginHandler,
GrafanaIndexHandler,
GrafanaQueryHandler,
GrafanaSearchHandler,
IndexHandler,
LikeHandler,
MetricsHandler,
MSOAuth2LoginHandler,
NameHandler,
NotFoundHandler,
NotificationHandler,
ResourceHandler,
ResourceLatestHandler,
SpamProcessHandler,
TopHandler,
TwitterOAuth2LoginHandler,
UserAvatarHandler,
UserEmailHandler,
UserHandler,
)
from sync import YYSub
from utils import Cloudflare, setup_logger
@@ -46,44 +68,45 @@ if os.getenv("debug"):
class RunServer:
static_path = pathlib.Path(__file__).parent.joinpath('templates')
static_path = pathlib.Path(__file__).parent.joinpath("templates")
handlers = [
(r'/', IndexHandler),
(r'/api/resource', ResourceHandler),
(r'/api/resource/latest', ResourceLatestHandler),
(r'/api/top', TopHandler),
(r'/api/like', LikeHandler),
(r'/api/user', UserHandler),
(r'/api/user/avatar/?(.*)', UserAvatarHandler),
(r'/api/user/email', UserEmailHandler),
(r'/api/name', NameHandler),
(r'/api/comment', CommentHandler),
(r'/api/comment/reaction', CommentReactionHandler),
(r'/api/comment/child', CommentChildHandler),
(r'/api/comment/newest', CommentNewestHandler),
(r'/api/captcha', CaptchaHandler),
(r'/api/metrics', MetricsHandler),
(r'/api/grafana/', GrafanaIndexHandler),
(r'/api/grafana/search', GrafanaSearchHandler),
(r'/api/grafana/query', GrafanaQueryHandler),
(r'/api/blacklist', BlacklistHandler),
(r'/api/db_dump', DBDumpHandler),
(r'/api/announcement', AnnouncementHandler),
(r'/api/douban', DoubanHandler),
(r'/api/douban/report', DoubanReportHandler),
(r'/api/notification', NotificationHandler),
(r'/api/category', CategoryHandler),
(r'/api/admin/spam', SpamProcessHandler),
(r'/auth/github', GitHubOAuth2LoginHandler),
(r'/auth/google', GoogleOAuth2LoginHandler),
(r'/auth/twitter', TwitterOAuth2LoginHandler),
(r'/auth/microsoft', MSOAuth2LoginHandler),
(r'/auth/facebook', FacebookAuth2LoginHandler),
(r'/(.*\.html|.*\.js|.*\.css|.*\.png|.*\.jpg|.*\.ico|.*\.gif|.*\.woff2|.*\.gz|.*\.zip|'
r'.*\.svg|.*\.json|.*\.txt)',
web.StaticFileHandler,
{'path': static_path}),
(r"/", IndexHandler),
(r"/api/resource", ResourceHandler),
(r"/api/resource/latest", ResourceLatestHandler),
(r"/api/top", TopHandler),
(r"/api/like", LikeHandler),
(r"/api/user", UserHandler),
(r"/api/user/avatar/?(.*)", UserAvatarHandler),
(r"/api/user/email", UserEmailHandler),
(r"/api/name", NameHandler),
(r"/api/comment", CommentHandler),
(r"/api/comment/reaction", CommentReactionHandler),
(r"/api/comment/child", CommentChildHandler),
(r"/api/comment/newest", CommentNewestHandler),
(r"/api/captcha", CaptchaHandler),
(r"/api/metrics", MetricsHandler),
(r"/api/grafana/", GrafanaIndexHandler),
(r"/api/grafana/search", GrafanaSearchHandler),
(r"/api/grafana/query", GrafanaQueryHandler),
(r"/api/blacklist", BlacklistHandler),
(r"/api/db_dump", DBDumpHandler),
(r"/api/announcement", AnnouncementHandler),
(r"/api/douban", DoubanHandler),
(r"/api/douban/report", DoubanReportHandler),
(r"/api/notification", NotificationHandler),
(r"/api/category", CategoryHandler),
(r"/api/admin/spam", SpamProcessHandler),
(r"/auth/github", GitHubOAuth2LoginHandler),
(r"/auth/google", GoogleOAuth2LoginHandler),
(r"/auth/twitter", TwitterOAuth2LoginHandler),
(r"/auth/microsoft", MSOAuth2LoginHandler),
(r"/auth/facebook", FacebookAuth2LoginHandler),
(
r"/(.*\.html|.*\.js|.*\.css|.*\.png|.*\.jpg|.*\.ico|.*\.gif|.*\.woff2|.*\.gz|.*\.zip|"
r".*\.svg|.*\.json|.*\.txt)",
web.StaticFileHandler,
{"path": static_path},
),
]
settings = {
"cookie_secret": os.getenv("cookie_secret", "eo2kcgpKwXj8Q3PKYj6nIL1J4j3b58DX"),
@@ -102,14 +125,14 @@ class RunServer:
def run_server(port, host):
    """Bind the tornado HTTP server and run the IO loop until interrupted.

    With PYTHON_DEV set: single process plus autoreload (dev mode).
    Otherwise: start(0) forks one process per CPU core (production).
    Ctrl-C stops the IO loop cleanly.
    """
    # NOTE(review): the diff residue stacked the old platform.uname() check
    # directly above its PYTHON_DEV replacement (a bodyless `if` — syntax
    # error) and duplicated the startup print; both deduplicated here.
    tornado_server = httpserver.HTTPServer(RunServer.application, xheaders=True)
    tornado_server.bind(port, host)
    if os.getenv("PYTHON_DEV"):
        tornado_server.start(1)
        tornado.autoreload.start()
    else:
        tornado_server.start(0)
    try:
        print("Server is running on http://{}:{}".format(host, port))
        ioloop.IOLoop.instance().current().start()
    except KeyboardInterrupt:
        ioloop.IOLoop.instance().stop()
@@ -117,23 +140,29 @@ class RunServer:
if __name__ == "__main__":
timez = pytz.timezone('Asia/Shanghai')
timez = pytz.timezone("Asia/Shanghai")
engine = SearchEngine()
scheduler = BackgroundScheduler(timezone=timez)
scheduler.add_job(OtherMongoResource().reset_top, trigger=CronTrigger.from_crontab("0 0 1 * *"))
scheduler.add_job(sync_douban, trigger=CronTrigger.from_crontab("1 1 1 * *"))
scheduler.add_job(entry_dump, trigger=CronTrigger.from_crontab("2 2 1 * *"))
scheduler.add_job(ResourceLatestMongoResource().refresh_latest_resource, 'interval', hours=1)
scheduler.add_job(OtherMongoResource().import_ban_user, 'interval', seconds=300)
scheduler.add_job(ResourceLatestMongoResource().refresh_latest_resource, "interval", hours=1)
scheduler.add_job(OtherMongoResource().import_ban_user, "interval", seconds=300)
scheduler.add_job(cf.clear_fw, trigger=CronTrigger.from_crontab("0 0 */5 * *"))
scheduler.add_job(YYSub().run, trigger=CronTrigger.from_crontab("0 1 * * *"))
scheduler.start()
logging.info("Triggering dump database now...")
logging.info("Dumping database and ingesting data for meilisearh...")
if not os.getenv("PYTHON_DEV"):
threading.Thread(target=entry_dump).start()
# meilisearch tasks
threading.Thread(target=engine.run_import).start()
threading.Thread(target=engine.monitor_yyets).start()
threading.Thread(target=engine.monitor_douban).start()
threading.Thread(target=engine.monitor_comment).start()
options.define("p", default=8888, help="running port", type=int)
options.define("h", default='127.0.0.1', help="listen address", type=str)
options.define("h", default="127.0.0.1", help="listen address", type=str)
options.parse_command_line()
p = options.options.p
h = options.options.h