mirror of https://github.com/tgbot-collection/YYeTsBot.git
synced 2025-11-25 19:37:34 +08:00
class style, rename
4  .gitignore  vendored
@@ -110,6 +110,6 @@ venv.bak/
-/.idea/modules.xml
-/.idea/vcs.xml
+.idea/
-/data/cookies.dump
+/yyetsbot/data/cookies.dump
-/.idea/inspectionProfiles/profiles_settings.xml
-data/
+yyetsbot/data/
@@ -1 +0,0 @@
-dir to storage request id and data.
127  html_request.py
@@ -1,127 +0,0 @@
-# coding: utf-8
-# YYeTsBot - html_request.py
-# 2019/8/15 18:30
-
-__author__ = 'Benny <benny.think@gmail.com>'
-
-import os
-import logging
-import requests
-import feedparser
-from bs4 import BeautifulSoup
-
-from config import SEARCH_URL, GET_USER, RSS_URL, BASE_URL, SHARE_WEB, SHARE_URL, WORKERS, SHARE_API
-from utils import load_cookies, cookie_file, login
-
-logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(filename)s [%(levelname)s]: %(message)s')
-
-s = requests.Session()
-ua = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36"
-s.headers.update({"User-Agent": ua})
-
-
-def get_search_html(kw: str) -> str:
-    if not os.path.exists(cookie_file):
-        logging.warning("Cookie file not found")
-        login()
-    if not is_cookie_valid():
-        login()
-    cookie = load_cookies()
-    logging.info("Searching for %s", kw)
-    r = s.get(SEARCH_URL.format(kw=kw), cookies=cookie)
-
-    r.close()
-    return r.text
-
-
-def get_detail_page(url: str) -> dict:
-    logging.info("Loading detail page %s", url)
-    share_link, api_res = analysis_share_page(url)
-    cnname = api_res["data"]["info"]["cnname"]
-
-    logging.info("Loading rss...")
-    rss_url = RSS_URL.format(id=url.split("/")[-1])
-    rss_result = analyse_rss(rss_url)
-
-    # get search_content from here...
-    if not rss_result:
-        rss_result = api_res
-
-    return {"all": rss_result, "share": share_link, "cnname": cnname}
-
-
-def analyse_search_html(html: str) -> dict:
-    logging.info('Parsing html...')
-    soup = BeautifulSoup(html, 'lxml')
-    link_list = soup.find_all("div", class_="clearfix search-item")
-    list_result = {}
-    for block in link_list:
-        name = block.find_all('a')[-1].text
-        url = BASE_URL + block.find_all('a')[-1].attrs['href']
-        list_result[url] = name
-
-    return list_result
-
-
-def analyse_rss(feed_url: str) -> dict:
-    # feed parser is meaningless now
-    return {}
-    # d = feedparser.parse(feed_url)
-    # # data['feed']['title']
-    # result = {}
-    # for item in d['entries']:
-    #     download = {
-    #         "title": getattr(item, "title", ""),
-    #         "ed2k": getattr(item, "ed2k", ""),
-    #         "magnet": getattr(item, "magnet", ""),
-    #         "pan": getattr(item, "pan", "")}
-    #     result[item.guid] = download
-    # return result
-
-
-def analysis_share_page(detail_url: str) -> (str, dict):
-    rid = detail_url.split('/')[-1]
-
-    res = s.post(SHARE_URL, data={"rid": rid}, cookies=load_cookies()).json()
-    share_code = res['data'].split('/')[-1]
-    share_url = SHARE_WEB.format(code=share_code)
-    logging.info("Share url is %s", share_url)
-
-    # get api response
-    api_response = s.get(SHARE_API.format(code=share_code)).json()
-    return share_url, api_response
-
-
-def is_cookie_valid() -> bool:
-    cookie = load_cookies()
-    r = s.get(GET_USER, cookies=cookie)
-    return r.json()['status'] == 1
-
-
-def offline_search(search_content):
-    # from cloudflare workers
-    # no redis cache for now
-    logging.info("Loading data from cfkv...")
-    index = WORKERS.format(id="index")
-    data: dict = requests.get(index).json()
-    logging.info("Loading complete, searching now...")
-
-    results = {}
-    for name, rid in data.items():
-        if search_content in name:
-            fake_url = f"http://www.rrys2020.com/resource/{rid}"
-            results[fake_url] = name.replace("\n", " ")
-    logging.info("Search complete")
-    return results
-
-
-def offline_link(resource_url) -> str:
-    rid = resource_url.split("/")[-1]
-    query_url = WORKERS.format(id=rid)
-    # TODO: too lazy to optimize cloudflare worker page.
-    return query_url
-
-
-if __name__ == '__main__':
-    a = offline_search("越狱")
-    print(a)
10  tests/test_fansub.py  Normal file
@@ -0,0 +1,10 @@
+import unittest
+
+
+class MyTestCase(unittest.TestCase):
+    def test_something(self):
+        self.assertEqual(True, False)
+
+
+if __name__ == '__main__':
+    unittest.main()
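The test body above is the unittest template and fails as written. A passing sketch against the new module might look like this (not part of the commit; it only touches the class attributes fansub.py defines below, so no network or Redis server is needed):

import unittest

from fansub import YYeTs


class YYeTsBasicTest(unittest.TestCase):
    def test_class_attributes(self):
        # label and cookie_file are class attributes set in this commit's fansub.py
        self.assertEqual(YYeTs.label, "yyets")
        self.assertTrue(YYeTs.cookie_file.endswith("cookies.dump"))


if __name__ == '__main__':
    unittest.main()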
@@ -19,12 +19,11 @@ from telebot import types, apihelper
 from tgbot_ping import get_runtime
 from apscheduler.schedulers.background import BackgroundScheduler
 
-from html_request import get_search_html, analyse_search_html, get_detail_page, offline_search, offline_link
-from utils import (save_error_dump, save_to_cache, yyets_get_from_cache, get_error_dump,
-                   reset_request, today_request, show_usage,
-                   redis_announcement
+from fansub import YYeTs
+from utils import (save_error_dump, get_error_dump, reset_request, today_request,
+                   show_usage, redis_announcement
                    )
-from config import PROXY, TOKEN, SEARCH_URL, MAINTAINER, REPORT, WORKERS, OFFLINE
+from config import PROXY, TOKEN, SEARCH_URL, MAINTAINER, REPORT, OFFLINE
 
 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(filename)s [%(levelname)s]: %(message)s')
 if PROXY:
@@ -157,6 +156,7 @@ def send_my_response(message):
 
 @bot.message_handler(content_types=["photo", "text"])
 def send_search(message):
+    yyets = YYeTs()
     bot.send_chat_action(message.chat.id, 'typing')
 
     today_request("total")
@@ -170,7 +170,7 @@ def send_search(message):
     logging.info('Receiving message: %s from user %s(%s)', name, message.chat.username, message.chat.id)
     if name is None:
         today_request("invalid")
-        with open('assets/warning.webp', 'rb') as sti:
+        with open('warning.webp', 'rb') as sti:
             bot.send_message(message.chat.id, "不要调戏我!我会报警的")
             bot.send_sticker(message.chat.id, sti)
             return
@@ -178,12 +178,10 @@ def send_search(message):
     if OFFLINE:
         logging.warning("☢️ Going offline mode!!!")
         bot.send_message(message.chat.id, "人人影视官网不可用,目前在使用离线模式,可能没有最新的剧集。")
-        html = ""
         bot.send_chat_action(message.chat.id, 'upload_document')
-        result = offline_search(name)
+        result = yyets.offline_search_preview(name)
     else:
-        html = get_search_html(name)
-        result = analyse_search_html(html)
+        result = yyets.online_search_preview(name)
 
     markup = types.InlineKeyboardMarkup()
     for url, detail in result.items():
@@ -216,30 +214,26 @@ def send_search(message):
 问题发生时间:{time.strftime("%Y-%m-%data %H:%M:%S", time.localtime(message.date))}
 请求内容:{name}
 请求URL:{SEARCH_URL.format(kw=encoded)}\n\n
 返回内容:{html}
     """
     save_error_dump(message.chat.id, content)
 
 
 @bot.callback_query_handler(func=lambda call: re.findall(r"choose(\S*)", call.data))
 def choose_link(call):
+    yyets = YYeTs()
     bot.send_chat_action(call.message.chat.id, 'typing')
     # call.data is url, http://www.rrys2020.com/resource/36588
     resource_url = re.findall(r"choose(\S*)", call.data)[0]
     markup = types.InlineKeyboardMarkup()
 
     if OFFLINE:
-        worker_page = offline_link(resource_url)
+        worker_page = yyets.offline_search_result(resource_url)
         btn1 = types.InlineKeyboardButton("打开网页", url=worker_page)
         markup.add(btn1)
         bot.send_message(call.message.chat.id, "离线模式,点击按钮打开网页获取结果", reply_markup=markup)
         return
 
-    link = yyets_get_from_cache(resource_url)
-    if not link:
-        link = get_detail_page(resource_url)
-        save_to_cache(resource_url, link)
-
     btn1 = types.InlineKeyboardButton("分享页面", callback_data="share%s" % resource_url)
     btn2 = types.InlineKeyboardButton("我全都要", callback_data="all%s" % resource_url)
     markup.add(btn1, btn2)
@@ -251,18 +245,20 @@ def choose_link(call):
 
 @bot.callback_query_handler(func=lambda call: re.findall(r"share(\S*)", call.data))
 def share_page(call):
+    yyets = YYeTs()
     bot.send_chat_action(call.message.chat.id, 'typing')
     resource_url = re.findall(r"share(\S*)", call.data)[0]
-    result = yyets_get_from_cache(resource_url)
+    result = yyets.online_search_result(resource_url)
     bot.send_message(call.message.chat.id, result['share'])
 
 
 @bot.callback_query_handler(func=lambda call: re.findall(r"all(\S*)", call.data))
 def all_episode(call):
     # just send a file
+    yyets = YYeTs()
     bot.send_chat_action(call.message.chat.id, 'typing')
     resource_url = re.findall(r"all(\S*)", call.data)[0]
-    result = yyets_get_from_cache(resource_url)
+    result = yyets.online_search_result(resource_url)
 
     with tempfile.NamedTemporaryFile(mode='wb+', prefix=result["cnname"], suffix=".txt") as tmp:
         bytes_data = json.dumps(result["all"], ensure_ascii=False, indent=4).encode('u8')
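Every handler above now funnels through the same object: a preview call that returns {resource_url: name} pairs for the inline keyboard, then a result call that returns the detail dict. A minimal sketch of that flow outside the bot (not part of the commit; it assumes the rrys2020.com endpoints, credentials, and a reachable Redis, since online_search_result caches through Redis):

from fansub import YYeTs

yyets = YYeTs()

# preview: {resource_url: name}, the mapping the bot turns into buttons
previews = yyets.online_search_preview("越狱")
for url, name in previews.items():
    # result: {"all": ..., "share": ..., "cnname": ...}, cached for 12 hours
    detail = yyets.online_search_result(url)
    print(name, detail["share"])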
@@ -9,8 +9,6 @@ import os
 BASE_URL = "http://www.rrys2020.com"
 LOGIN_URL = "http://www.rrys2020.com/user/login"
 GET_USER = "http://www.rrys2020.com/user/login/getCurUserTopInfo"
-# rss is unavailable as of 2021.01.10
-RSS_URL = "http://rss.rrys.tv/rss/feed/{id}"
 SEARCH_URL = "http://www.rrys2020.com/search?keyword={kw}&type=resource"
 AJAX_LOGIN = "http://www.rrys2020.com/User/Login/ajaxLogin"
 SHARE_URL = "http://www.rrys2020.com/resource/ushare"

@@ -28,3 +26,6 @@ MAINTAINER = os.environ.get("MAINTAINER")
 REDIS = os.environ.get("REDIS") or "redis"
 REPORT = os.environ.get("REPORT") or False
 OFFLINE = os.environ.get("OFFLINE") or False
+
+FIX_RESOURCE = "https://www.zimuxia.cn/portfolio/{name}"
+FIX_SEARCH = "https://www.zimuxia.cn/?s={name}"
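A note on the `or` defaults in the config hunk above: any non-empty environment value wins, so even REPORT=false leaves the flag truthy. A quick check (not part of the commit):

import os

os.environ["REPORT"] = "false"
REPORT = os.environ.get("REPORT") or False
print(bool(REPORT))  # True: a non-empty string, even "false", is truthy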
233  yyetsbot/fansub.py  Normal file
@@ -0,0 +1,233 @@
+# coding: utf-8
+# YYeTsBot - fansub.py
+# 2019/8/15 18:30
+
+__author__ = 'Benny <benny.think@gmail.com>'
+
+import os
+import logging
+import requests
+import pickle
+import sys
+import json
+
+from bs4 import BeautifulSoup
+
+from config import (SEARCH_URL, GET_USER, BASE_URL, SHARE_WEB,
+                    SHARE_URL, WORKERS, SHARE_API, USERNAME, PASSWORD,
+                    AJAX_LOGIN, REDIS)
+import redis
+
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(filename)s [%(levelname)s]: %(message)s')
+
+session = requests.Session()
+ua = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36"
+session.headers.update({"User-Agent": ua})
+
+
+class BaseFansub:
+    """
+    Every subclass should implement four kinds of methods:
+    1. online search, containing a preview for the bot and the complete result
+    2. offline search (set pass if not applicable)
+    3. login and check (set pass if not applicable)
+    4. search_result, which is critical for the bot to draw the markup
+
+    """
+    label = None
+    cookie_file = None
+
+    def __init__(self):
+        self.data = None
+        self.url = None
+        self.redis = redis.StrictRedis(host=REDIS, decode_responses=True)
+
+    @property
+    def id(self):
+        # implement how to get the unique id for this resource
+        return None
+
+    def __get_search_html__(self, kw: str) -> str:
+        # return html text of search page
+        pass
+
+    def online_search_preview(self, search_text: str) -> dict:
+        # try to retrieve critical information from html
+        # this result must return to bot for manual selection
+        # {"url1": "name1", "url2": "name2"}
+        pass
+
+    def online_search_result(self, resource_url: str) -> dict:
+        """
+        This happens when the user clicks one of the buttons; only then do we know the resource link.
+        From the information above, try to get a detailed dict structure.
+        This method should check the cache first if applicable.
+        This method should set self.link and self.data.
+        This method should call __execute_online_search_result.
+        :param resource_url:
+        :return: {"all": rss_result, "share": share_link, "cnname": cnname}
+
+        """
+        pass
+
+    def __execute_online_search_result(self) -> dict:
+        """
+        Do the real search job, without any cache mechanism
+        :return: {"all": rss_result, "share": share_link, "cnname": cnname}
+        """
+        pass
+
+    def offline_search_preview(self, search_text: str) -> dict:
+        # this result must return to bot for manual selection
+        # the same as online
+        pass
+
+    def offline_search_result(self, resource_url) -> dict:
+        """
+        Same as online_search_result
+        :param resource_url:
+        :return:
+        """
+        pass
+
+    def __execute_offline_search_result(self) -> dict:
+        """
+        Do the search job, without any cache mechanism
+        :return: {"all": rss_result, "share": share_link, "cnname": cnname}
+        """
+        pass
+
+    def __login_check(self):
+        pass
+
+    def __manual_login(self):
+        pass
+
+    def __save_cookies(self, requests_cookiejar):
+        with open(self.cookie_file, 'wb') as f:
+            pickle.dump(requests_cookiejar, f)
+
+    def __load_cookies(self):
+        with open(self.cookie_file, 'rb') as f:
+            return pickle.load(f)
+
+    def __get_from_cache(self, url: str, method_name: str) -> dict:
+        logging.info("Reading %s data from cache %s", self.label, url)
+        data = self.redis.get(url)
+        if data:
+            logging.info("Cache hit")
+            return json.loads(data)
+        else:
+            logging.info("Cache miss")
+            result_method = getattr(self, method_name)
+            self.__save_to_cache(url, result_method(url))
+            return self.__get_from_cache(url, method_name)
+
+    def __save_to_cache(self, url: str, value: dict, ex=3600 * 12) -> None:
+        data = json.dumps(value, ensure_ascii=False)
+        self.redis.set(url, data, ex=ex)
+
+
+class YYeTs(BaseFansub):
+    label = "yyets"
+    cookie_file = os.path.join("data", "cookies.dump")
+
+    @property
+    def id(self):
+        # implement how to get the unique id for this resource
+        rid = self.url.split('/')[-1]
+        return rid
+
+    def __get_search_html__(self, kw: str) -> str:
+        self.__login_check()
+        cookie = self.__load_cookies()
+        logging.info("Searching for %s", kw)
+        r = session.get(SEARCH_URL.format(kw=kw), cookies=cookie)
+        r.close()
+        return r.text
+
+    def online_search_preview(self, search_text: str) -> dict:
+        html_text = self.__get_search_html__(search_text)
+        logging.info('Parsing html...')
+        soup = BeautifulSoup(html_text, 'lxml')
+        link_list = soup.find_all("div", class_="clearfix search-item")
+        dict_result = {}
+        for block in link_list:
+            name = block.find_all('a')[-1].text
+            url = BASE_URL + block.find_all('a')[-1].attrs['href']
+            dict_result[url] = name
+
+        return dict_result
+
+    def online_search_result(self, resource_url: str) -> dict:
+        self.url = resource_url
+        self.data = self.__get_from_cache(self.url, self.__execute_online_search_result.__name__)
+        return self.data
+
+    def __execute_online_search_result(self) -> dict:
+        logging.info("Loading detail page %s", self.url)
+        share_link, api_res = self.__get_share_page()
+        cnname = api_res["data"]["info"]["cnname"]
+        self.data = {"all": api_res, "share": share_link, "cnname": cnname}
+        return self.data
+
+    def offline_search_preview(self, search_text: str) -> dict:
+        # from cloudflare workers
+        # no redis cache for now
+        logging.info("Loading data from cfkv...")
+        index = WORKERS.format(id="index")
+        data: dict = requests.get(index).json()
+        logging.info("Loading complete, searching now...")
+
+        results = {}
+        for name, rid in data.items():
+            if search_text in name:
+                fake_url = f"http://www.rrys2020.com/resource/{rid}"
+                results[fake_url] = name.replace("\n", " ")
+        logging.info("Search complete")
+        return results
+
+    def offline_search_result(self, resource_url) -> dict:
+        self.url = resource_url
+        query_url = WORKERS.format(id=self.id)
+        self.data = {"all": None, "share": query_url, "cnname": None}
+        return self.data
+
+    def __login_check(self):
+        if not os.path.exists(self.cookie_file):
+            logging.warning("Cookie file not found")
+            self.__manual_login()
+
+        cookie = self.__load_cookies()
+        r = session.get(GET_USER, cookies=cookie)
+        if not r.json()['status'] == 1:
+            self.__manual_login()
+
+    def __manual_login(self):
+        data = {"account": USERNAME, "password": PASSWORD, "remember": 1}
+        logging.info("Login in as %s", data)
+        r = requests.post(AJAX_LOGIN, data=data)
+        resp = r.json()
+        if resp.get('status') == 1:
+            logging.info("Login success! %s", r.cookies)
+            self.__save_cookies(r.cookies)
+        else:
+            logging.error("Login failed! %s", resp)
+            sys.exit(1)
+        r.close()
+
+    def __get_share_page(self):
+        rid = self.id
+
+        res = session.post(SHARE_URL, data={"rid": rid}, cookies=self.__load_cookies()).json()
+        share_code = res['data'].split('/')[-1]
+        share_url = SHARE_WEB.format(code=share_code)
+        logging.info("Share url is %s", share_url)
+
+        # get api response
+        api_response = session.get(SHARE_API.format(code=share_code)).json()
+        return share_url, api_response
+
+
+if __name__ == '__main__':
+    y = YYeTs()
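The BaseFansub docstring spells out the contract for adding another fansub source; a minimal conforming subclass might look like the following sketch (not part of the commit; the site, label, and lookup logic are placeholders, and the private cache and login helpers are skipped):

import os

from fansub import BaseFansub


class ExampleFansub(BaseFansub):
    # hypothetical source: label and cookie_file are required class attributes
    label = "example"
    cookie_file = os.path.join("data", "example_cookies.dump")

    @property
    def id(self):
        # unique id taken from the tail of the resource url, as YYeTs does
        return self.url.split("/")[-1]

    def online_search_preview(self, search_text: str) -> dict:
        # placeholder: a real source would fetch and parse its search page
        return {"http://example.com/resource/1": search_text}

    def online_search_result(self, resource_url: str) -> dict:
        self.url = resource_url
        # placeholder detail dict in the shape the bot expects
        self.data = {"all": {}, "share": resource_url, "cnname": "placeholder"}
        return self.data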
@@ -19,23 +19,6 @@ r = redis.StrictRedis(host=REDIS, decode_responses=True)
 cookie_file = os.path.join(os.path.dirname(__file__), 'data', 'cookies.dump')
 
 
-def save_to_cache(url: str, value: dict) -> None:
-    data = json.dumps(value, ensure_ascii=False)
-    r.set(url, data, ex=3600 * 12)
-
-
-def yyets_get_from_cache(url: str) -> dict:
-    logging.info("Reading data from cache %s", url)
-    from html_request import get_detail_page
-
-    data = r.get(url)
-    if data:
-        logging.info("Cache hit")
-        return json.loads(data)
-    else:
-        logging.info("Cache miss")
-        save_to_cache(url, get_detail_page(url))
-        return yyets_get_from_cache(url)
-
-
 def save_error_dump(uid, err: str):

@@ -59,30 +42,6 @@ def redis_announcement(content="", op="get"):
         r.delete("announcement")
 
 
-def save_cookies(requests_cookiejar):
-    with open(cookie_file, 'wb') as f:
-        pickle.dump(requests_cookiejar, f)
-
-
-def load_cookies():
-    with open(cookie_file, 'rb') as f:
-        return pickle.load(f)
-
-
-def login():
-    data = {"account": USERNAME, "password": PASSWORD, "remember": 1}
-    logging.info("Login in as %s", data)
-    r = requests.post(AJAX_LOGIN, data=data)
-    resp = r.json()
-    if resp.get('status') == 1:
-        logging.info("Login success! %s", r.cookies)
-        save_cookies(r.cookies)
-    else:
-        logging.error("Login failed! %s", resp)
-        sys.exit(1)
-    r.close()
-
-
 def today_request(request_type: str):
     if r.exists("usage"):
         data: str = r.get("usage")
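The cache helpers removed here are the ones BaseFansub.__get_from_cache / __save_to_cache in fansub.py take over: both versions are the same read-through pattern, returning the cached JSON if present and otherwise computing, storing with a 12-hour expiry, and reading again. A standalone sketch of the pattern (not part of the commit; `fetch` is a hypothetical callable doing the real lookup):

import json

import redis

r = redis.StrictRedis(host="redis", decode_responses=True)  # "redis" mirrors the REDIS default


def get_with_cache(url: str, fetch, ex=3600 * 12) -> dict:
    # read-through cache: a hit returns the stored JSON, a miss computes and stores it
    cached = r.get(url)
    if cached:
        return json.loads(cached)
    value = fetch(url)
    r.set(url, json.dumps(value, ensure_ascii=False), ex=ex)
    return value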
(binary image changed: 19 KiB before, 19 KiB after)