mirror of
https://github.com/langbot-app/LangBot.git
synced 2025-11-25 11:29:39 +08:00
feat: 更新操作
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import time
|
||||
|
||||
import requests
|
||||
|
||||
@@ -58,7 +59,101 @@ class VersionManager:
|
||||
return rls_list
|
||||
|
||||
async def update_all(self):
|
||||
pass
|
||||
"""检查更新并下载源码"""
|
||||
start_time = time.time()
|
||||
|
||||
current_tag = self.get_current_version()
|
||||
old_tag = current_tag
|
||||
|
||||
rls_list = await self.get_release_list()
|
||||
|
||||
latest_rls = {}
|
||||
rls_notes = []
|
||||
latest_tag_name = ""
|
||||
for rls in rls_list:
|
||||
rls_notes.append(rls['name']) # 使用发行名称作为note
|
||||
if latest_tag_name == "":
|
||||
latest_tag_name = rls['tag_name']
|
||||
|
||||
if rls['tag_name'] == current_tag:
|
||||
break
|
||||
|
||||
if latest_rls == {}:
|
||||
latest_rls = rls
|
||||
self.ap.logger.info("更新日志: {}".format(rls_notes))
|
||||
|
||||
if latest_rls == {} and not self.is_newer(latest_tag_name, current_tag): # 没有新版本
|
||||
return False
|
||||
|
||||
# 下载最新版本的zip到temp目录
|
||||
self.ap.logger.info("开始下载最新版本: {}".format(latest_rls['zipball_url']))
|
||||
|
||||
zip_url = latest_rls['zipball_url']
|
||||
zip_resp = requests.get(
|
||||
url=zip_url,
|
||||
proxies=self.ap.proxy_mgr.get_forward_proxies()
|
||||
)
|
||||
zip_data = zip_resp.content
|
||||
|
||||
# 检查temp/updater目录
|
||||
if not os.path.exists("temp"):
|
||||
os.mkdir("temp")
|
||||
if not os.path.exists("temp/updater"):
|
||||
os.mkdir("temp/updater")
|
||||
with open("temp/updater/{}.zip".format(latest_rls['tag_name']), "wb") as f:
|
||||
f.write(zip_data)
|
||||
|
||||
self.ap.logger.info("下载最新版本完成: {}".format("temp/updater/{}.zip".format(latest_rls['tag_name'])))
|
||||
|
||||
# 解压zip到temp/updater/<tag_name>/
|
||||
import zipfile
|
||||
# 检查目标文件夹
|
||||
if os.path.exists("temp/updater/{}".format(latest_rls['tag_name'])):
|
||||
import shutil
|
||||
shutil.rmtree("temp/updater/{}".format(latest_rls['tag_name']))
|
||||
os.mkdir("temp/updater/{}".format(latest_rls['tag_name']))
|
||||
with zipfile.ZipFile("temp/updater/{}.zip".format(latest_rls['tag_name']), 'r') as zip_ref:
|
||||
zip_ref.extractall("temp/updater/{}".format(latest_rls['tag_name']))
|
||||
|
||||
# 覆盖源码
|
||||
source_root = ""
|
||||
# 找到temp/updater/<tag_name>/中的第一个子目录路径
|
||||
for root, dirs, files in os.walk("temp/updater/{}".format(latest_rls['tag_name'])):
|
||||
if root != "temp/updater/{}".format(latest_rls['tag_name']):
|
||||
source_root = root
|
||||
break
|
||||
|
||||
# 覆盖源码
|
||||
import shutil
|
||||
for root, dirs, files in os.walk(source_root):
|
||||
# 覆盖所有子文件子目录
|
||||
for file in files:
|
||||
src = os.path.join(root, file)
|
||||
dst = src.replace(source_root, ".")
|
||||
if os.path.exists(dst):
|
||||
os.remove(dst)
|
||||
|
||||
# 检查目标文件夹是否存在
|
||||
if not os.path.exists(os.path.dirname(dst)):
|
||||
os.makedirs(os.path.dirname(dst))
|
||||
# 检查目标文件是否存在
|
||||
if not os.path.exists(dst):
|
||||
# 创建目标文件
|
||||
open(dst, "w").close()
|
||||
|
||||
shutil.copy(src, dst)
|
||||
|
||||
# 把current_tag写入文件
|
||||
current_tag = latest_rls['tag_name']
|
||||
with open("current_tag", "w") as f:
|
||||
f.write(current_tag)
|
||||
|
||||
self.ap.ctr_mgr.main.post_update_record(
|
||||
spent_seconds=int(time.time()-start_time),
|
||||
infer_reason="update",
|
||||
old_version=old_tag,
|
||||
new_version=current_tag,
|
||||
)
|
||||
|
||||
async def is_new_version_available(self) -> bool:
|
||||
"""检查是否有新版本"""
|
||||
|
||||
Reference in New Issue
Block a user