Compare commits

...

8 Commits

Author SHA1 Message Date
Junyan Qin
f4fa0b42a6 chore: release v3.4.1.2 2024-12-17 01:22:31 +08:00
Junyan Qin
209e89712d Merge pull request #953 from RockChinQ/perf/dify-sv-api
perf: 完善 dify api runner
2024-12-17 01:21:01 +08:00
Junyan Qin
3314a7a9e9 fix: 在设置model为非视觉模型时,非local-agent的runner无法获得图片消息 (#948) 2024-12-17 01:17:57 +08:00
Junyan Qin
793d64303e perf: 完善dify api runner 2024-12-17 01:04:08 +08:00
Junyan Qin
6642498f00 feat: 添加对 agent 应用的支持 (#951) 2024-12-17 00:41:28 +08:00
Junyan Qin
32b400dcb1 fix: dify的timeout无法自定义 (#949) 2024-12-16 23:54:56 +08:00
Junyan Qin
0dcd2d8179 doc: 添加 zeabur 部署方式 2024-12-16 20:02:04 +08:00
Junyan Qin
736f8b613c feat: 为 ollama 支持视觉和函数调用 (#950) 2024-12-15 17:05:56 +08:00
12 changed files with 297 additions and 106 deletions

View File

@@ -48,14 +48,20 @@
>
> 在您开始任何方式部署之前,请务必阅读[新手指引](https://docs.langbot.app/insight/guide.html)。
#### 宝塔面板部署
LangBot 已上架宝塔面板,若您已安装宝塔面板,可以根据[文档](https://docs.langbot.app/deploy/langbot/one-click/bt.html)使用。
#### Docker 部署
#### Docker Compose 部署
适合熟悉 Docker 的用户,查看文档[Docker 部署](https://docs.langbot.app/deploy/langbot/docker.html)。
#### 宝塔面板部署
已上架宝塔面板,若您已安装宝塔面板,可以根据[文档](https://docs.langbot.app/deploy/langbot/one-click/bt.html)使用。
#### Zeabur 云部署
社区贡献的 Zeabur 模板。
[![Deploy on Zeabur](https://zeabur.com/button.svg)](https://zeabur.com/zh-CN/templates/ZKTBDH)
#### 手动部署
直接使用发行版运行,查看文档[手动部署](https://docs.langbot.app/deploy/langbot/manual.html)。

View File

@@ -10,8 +10,8 @@ class TestDifyClient:
async def test_chat_messages(self):
cln = client.AsyncDifyServiceClient(api_key=os.getenv("DIFY_API_KEY"), base_url=os.getenv("DIFY_BASE_URL"))
resp = await cln.chat_messages(inputs={}, query="Who are you?", user="test")
print(json.dumps(resp, ensure_ascii=False, indent=4))
async for chunk in cln.chat_messages(inputs={}, query="调用工具查看现在几点?", user="test"):
print(json.dumps(chunk, ensure_ascii=False, indent=4))
async def test_upload_file(self):
cln = client.AsyncDifyServiceClient(api_key=os.getenv("DIFY_API_KEY"), base_url=os.getenv("DIFY_BASE_URL"))
@@ -41,4 +41,4 @@ class TestDifyClient:
print(json.dumps(chunks, ensure_ascii=False, indent=4))
if __name__ == "__main__":
asyncio.run(TestDifyClient().test_workflow_run())
asyncio.run(TestDifyClient().test_chat_messages())

View File

@@ -26,21 +26,22 @@ class AsyncDifyServiceClient:
inputs: dict[str, typing.Any],
query: str,
user: str,
response_mode: str = "blocking", # 当前不支持 streaming
response_mode: str = "streaming", # 当前不支持 blocking
conversation_id: str = "",
files: list[dict[str, typing.Any]] = [],
timeout: float = 30.0,
) -> dict[str, typing.Any]:
) -> typing.AsyncGenerator[dict[str, typing.Any], None]:
"""发送消息"""
if response_mode != "blocking":
raise DifyAPIError("当前仅支持 blocking 模式")
if response_mode != "streaming":
raise DifyAPIError("当前仅支持 streaming 模式")
async with httpx.AsyncClient(
base_url=self.base_url,
trust_env=True,
timeout=timeout,
) as client:
response = await client.post(
async with client.stream(
"POST",
"/chat-messages",
headers={"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"},
json={
@@ -51,12 +52,14 @@ class AsyncDifyServiceClient:
"conversation_id": conversation_id,
"files": files,
},
)
if response.status_code != 200:
raise DifyAPIError(f"{response.status_code} {response.text}")
return response.json()
) as r:
async for chunk in r.aiter_lines():
if r.status_code != 200:
raise DifyAPIError(f"{r.status_code} {chunk}")
if chunk.strip() == "":
continue
if chunk.startswith("data:"):
yield json.loads(chunk[5:])
async def workflow_run(
self,
@@ -88,6 +91,8 @@ class AsyncDifyServiceClient:
},
) as r:
async for chunk in r.aiter_lines():
if r.status_code != 200:
raise DifyAPIError(f"{r.status_code} {chunk}")
if chunk.strip() == "":
continue
if chunk.startswith("data:"):
@@ -100,10 +105,6 @@ class AsyncDifyServiceClient:
timeout: float = 30.0,
) -> str:
"""上传文件"""
# curl -X POST 'http://dify.rockchin.top/v1/files/upload' \
# --header 'Authorization: Bearer {api_key}' \
# --form 'file=@localfile;type=image/[png|jpeg|jpg|webp|gif] \
# --form 'user=abc-123'
async with httpx.AsyncClient(
base_url=self.base_url,
trust_env=True,

View File

@@ -0,0 +1,24 @@
from __future__ import annotations
from .. import migration
@migration.migration_class("dify-api-timeout-params", 17)
class DifyAPITimeoutParamsMigration(migration.Migration):
"""迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'timeout' not in self.ap.provider_cfg.data['dify-service-api']['chat'] or 'timeout' not in self.ap.provider_cfg.data['dify-service-api']['workflow'] \
or 'agent' not in self.ap.provider_cfg.data['dify-service-api']
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['dify-service-api']['chat']['timeout'] = 120
self.ap.provider_cfg.data['dify-service-api']['workflow']['timeout'] = 120
self.ap.provider_cfg.data['dify-service-api']['agent'] = {
"api-key": "app-1234567890",
"timeout": 120
}
await self.ap.provider_cfg.dump_config()

View File

@@ -7,7 +7,7 @@ from .. import migration
from ..migrations import m001_sensitive_word_migration, m002_openai_config_migration, m003_anthropic_requester_cfg_completion, m004_moonshot_cfg_completion
from ..migrations import m005_deepseek_cfg_completion, m006_vision_config, m007_qcg_center_url, m008_ad_fixwin_config_migrate, m009_msg_truncator_cfg
from ..migrations import m010_ollama_requester_config, m011_command_prefix_config, m012_runner_config, m013_http_api_config, m014_force_delay_config
from ..migrations import m015_gitee_ai_config, m016_dify_service_api
from ..migrations import m015_gitee_ai_config, m016_dify_service_api, m017_dify_api_timeout_params
@stage.stage_class("MigrationStage")

View File

@@ -45,7 +45,7 @@ class PreProcessor(stage.PipelineStage):
# 检查vision是否启用没启用就删除所有图片
if not self.ap.provider_cfg.data['enable-vision'] or not query.use_model.vision_supported:
if not self.ap.provider_cfg.data['enable-vision'] or (self.ap.provider_cfg.data['runner'] == 'local-agent' and not query.use_model.vision_supported):
for msg in query.messages:
if isinstance(msg.content, list):
for me in msg.content:
@@ -60,13 +60,13 @@ class PreProcessor(stage.PipelineStage):
llm_entities.ContentElement.from_text(me.text)
)
elif isinstance(me, platform_message.Image):
if self.ap.provider_cfg.data['enable-vision'] and query.use_model.vision_supported:
if self.ap.provider_cfg.data['enable-vision'] and (self.ap.provider_cfg.data['runner'] != 'local-agent' or query.use_model.vision_supported):
if me.url is not None:
content_list.append(
llm_entities.ContentElement.from_image_url(str(me.url))
)
query.user_message = llm_entities.Message( # TODO 适配多模态输入
query.user_message = llm_entities.Message(
role='user',
content=content_list
)

View File

@@ -91,7 +91,7 @@ class ChatMessageHandler(handler.MessageHandler):
query.session.using_conversation.messages.extend(query.resp_messages)
except Exception as e:
self.ap.logger.error(f'对话({query.query_id})请求失败: {str(e)}')
self.ap.logger.error(f'对话({query.query_id})请求失败: {type(e).__name__} {str(e)}')
yield entities.StageProcessResult(
result_type=entities.ResultType.INTERRUPT,

View File

@@ -4,6 +4,8 @@ import asyncio
import os
import typing
from typing import Union, Mapping, Any, AsyncIterator
import uuid
import json
import async_lru
import ollama
@@ -60,21 +62,49 @@ class OllamaChatCompletions(requester.LLMAPIRequester):
image_urls.append(image_url)
msg["content"] = "\n".join(text_content)
msg["images"] = [url.split(',')[1] for url in image_urls]
if 'tool_calls' in msg: # LangBot 内部以 str 存储 tool_calls 的参数,这里需要转换为 dict
for tool_call in msg['tool_calls']:
tool_call['function']['arguments'] = json.loads(tool_call['function']['arguments'])
args["messages"] = messages
resp: Mapping[str, Any] | AsyncIterator[Mapping[str, Any]] = await self._req(args)
args["tools"] = []
if user_funcs:
tools = await self.ap.tool_mgr.generate_tools_for_openai(user_funcs)
if tools:
args["tools"] = tools
resp = await self._req(args)
message: llm_entities.Message = await self._make_msg(resp)
return message
async def _make_msg(
self,
chat_completions: Union[Mapping[str, Any], AsyncIterator[Mapping[str, Any]]]) -> llm_entities.Message:
message: Any = chat_completions.pop('message', None)
chat_completions: ollama.ChatResponse) -> llm_entities.Message:
message: ollama.Message = chat_completions.message
if message is None:
raise ValueError("chat_completions must contain a 'message' field")
message.update(chat_completions)
ret_msg: llm_entities.Message = llm_entities.Message(**message)
ret_msg: llm_entities.Message = None
if message.content is not None:
ret_msg = llm_entities.Message(
role="assistant",
content=message.content
)
if message.tool_calls is not None and len(message.tool_calls) > 0:
tool_calls: list[llm_entities.ToolCall] = []
for tool_call in message.tool_calls:
tool_calls.append(llm_entities.ToolCall(
id=uuid.uuid4().hex,
type="function",
function=llm_entities.FunctionCall(
name=tool_call.function.name,
arguments=json.dumps(tool_call.function.arguments)
)
))
ret_msg.tool_calls = tool_calls
return ret_msg
async def call(
@@ -92,7 +122,7 @@ class OllamaChatCompletions(requester.LLMAPIRequester):
msg_dict["content"] = "\n".join(part["text"] for part in content)
req_messages.append(msg_dict)
try:
return await self._closure(req_messages, model)
return await self._closure(req_messages, model, funcs)
except asyncio.TimeoutError:
raise errors.RequesterError('请求超时')

View File

@@ -20,125 +20,259 @@ class DifyServiceAPIRunner(runner.RequestRunner):
async def initialize(self):
"""初始化"""
valid_app_types = ['chat', 'workflow']
if self.ap.provider_cfg.data['dify-service-api']['app-type'] not in valid_app_types:
raise errors.DifyAPIError(f"不支持的 Dify 应用类型: {self.ap.provider_cfg.data['dify-service-api']['app-type']}")
valid_app_types = ["chat", "agent", "workflow"]
if (
self.ap.provider_cfg.data["dify-service-api"]["app-type"]
not in valid_app_types
):
raise errors.DifyAPIError(
f"不支持的 Dify 应用类型: {self.ap.provider_cfg.data['dify-service-api']['app-type']}"
)
api_key = self.ap.provider_cfg.data['dify-service-api'][self.ap.provider_cfg.data['dify-service-api']['app-type']]['api-key']
api_key = self.ap.provider_cfg.data["dify-service-api"][
self.ap.provider_cfg.data["dify-service-api"]["app-type"]
]["api-key"]
self.dify_client = client.AsyncDifyServiceClient(
api_key=api_key,
base_url=self.ap.provider_cfg.data['dify-service-api']['base-url']
base_url=self.ap.provider_cfg.data["dify-service-api"]["base-url"],
)
async def _preprocess_user_message(self, query: core_entities.Query) -> tuple[str, list[str]]:
async def _preprocess_user_message(
self, query: core_entities.Query
) -> tuple[str, list[str]]:
"""预处理用户消息,提取纯文本,并将图片上传到 Dify 服务
Returns:
tuple[str, list[str]]: 纯文本和图片的 Dify 服务图片 ID
"""
plain_text = ''
plain_text = ""
image_ids = []
if isinstance(query.user_message.content, list):
for ce in query.user_message.content:
if ce.type == 'text':
if ce.type == "text":
plain_text += ce.text
elif ce.type == 'image_url':
file_bytes, image_format = await image.get_qq_image_bytes(ce.image_url.url)
elif ce.type == "image_url":
file_bytes, image_format = await image.get_qq_image_bytes(
ce.image_url.url
)
file = ("img.png", file_bytes, f"image/{image_format}")
file_upload_resp = await self.dify_client.upload_file(file, f"{query.session.launcher_type.value}_{query.session.launcher_id}")
image_id = file_upload_resp['id']
file_upload_resp = await self.dify_client.upload_file(
file,
f"{query.session.launcher_type.value}_{query.session.launcher_id}",
)
image_id = file_upload_resp["id"]
image_ids.append(image_id)
elif isinstance(query.user_message.content, str):
plain_text = query.user_message.content
return plain_text, image_ids
async def _chat_messages(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message, None]:
async def _chat_messages(
self, query: core_entities.Query
) -> typing.AsyncGenerator[llm_entities.Message, None]:
"""调用聊天助手"""
cov_id = query.session.using_conversation.uuid or ""
plain_text, image_ids = await self._preprocess_user_message(query)
files = [{
'type': 'image',
'transfer_method': 'local_file',
'upload_file_id': image_id,
} for image_id in image_ids]
files = [
{
"type": "image",
"transfer_method": "local_file",
"upload_file_id": image_id,
}
for image_id in image_ids
]
resp = await self.dify_client.chat_messages(inputs={}, query=plain_text, user=f"{query.session.launcher_type.value}_{query.session.launcher_id}", conversation_id=cov_id, files=files)
mode = "basic" # 标记是基础编排还是工作流编排
msg = llm_entities.Message(
role='assistant',
content=resp['answer'],
)
basic_mode_pending_chunk = ''
yield msg
async for chunk in self.dify_client.chat_messages(
inputs={},
query=plain_text,
user=f"{query.session.launcher_type.value}_{query.session.launcher_id}",
conversation_id=cov_id,
files=files,
timeout=self.ap.provider_cfg.data["dify-service-api"]["chat"]["timeout"],
):
self.ap.logger.debug("dify-chat-chunk: ", chunk)
query.session.using_conversation.uuid = resp['conversation_id']
if chunk['event'] == 'workflow_started':
mode = "workflow"
async def _workflow_messages(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message, None]:
if mode == "workflow":
if chunk['event'] == 'node_finished':
if chunk['data']['node_type'] == 'answer':
yield llm_entities.Message(
role="assistant",
content=chunk['data']['outputs']['answer'],
)
elif mode == "basic":
if chunk['event'] == 'message':
basic_mode_pending_chunk += chunk['answer']
elif chunk['event'] == 'message_end':
yield llm_entities.Message(
role="assistant",
content=basic_mode_pending_chunk,
)
basic_mode_pending_chunk = ''
query.session.using_conversation.uuid = chunk["conversation_id"]
async def _agent_chat_messages(
self, query: core_entities.Query
) -> typing.AsyncGenerator[llm_entities.Message, None]:
"""调用聊天助手"""
cov_id = query.session.using_conversation.uuid or ""
plain_text, image_ids = await self._preprocess_user_message(query)
files = [
{
"type": "image",
"transfer_method": "local_file",
"upload_file_id": image_id,
}
for image_id in image_ids
]
ignored_events = ["agent_message"]
async for chunk in self.dify_client.chat_messages(
inputs={},
query=plain_text,
user=f"{query.session.launcher_type.value}_{query.session.launcher_id}",
response_mode="streaming",
conversation_id=cov_id,
files=files,
timeout=self.ap.provider_cfg.data["dify-service-api"]["chat"]["timeout"],
):
self.ap.logger.debug("dify-agent-chunk: ", chunk)
if chunk["event"] in ignored_events:
continue
if chunk["event"] == "agent_thought":
if chunk['tool'] != '' and chunk['observation'] != '': # 工具调用结果,跳过
continue
if chunk['thought'].strip() != '': # 文字回复内容
msg = llm_entities.Message(
role="assistant",
content=chunk["thought"],
)
yield msg
if chunk['tool']:
msg = llm_entities.Message(
role="assistant",
tool_calls=[
llm_entities.ToolCall(
id=chunk['id'],
type="function",
function=llm_entities.FunctionCall(
name=chunk["tool"],
arguments=json.dumps({}),
),
)
],
)
yield msg
query.session.using_conversation.uuid = chunk["conversation_id"]
async def _workflow_messages(
self, query: core_entities.Query
) -> typing.AsyncGenerator[llm_entities.Message, None]:
"""调用工作流"""
if not query.session.using_conversation.uuid:
query.session.using_conversation.uuid = str(uuid.uuid4())
cov_id = query.session.using_conversation.uuid
plain_text, image_ids = await self._preprocess_user_message(query)
files = [{
'type': 'image',
'transfer_method': 'local_file',
'upload_file_id': image_id,
} for image_id in image_ids]
files = [
{
"type": "image",
"transfer_method": "local_file",
"upload_file_id": image_id,
}
for image_id in image_ids
]
ignored_events = ['text_chunk', 'workflow_started']
ignored_events = ["text_chunk", "workflow_started"]
async for chunk in self.dify_client.workflow_run(inputs={
"langbot_user_message_text": plain_text,
"langbot_session_id": f"{query.session.launcher_type.value}_{query.session.launcher_id}",
"langbot_conversation_id": cov_id,
}, user=f"{query.session.launcher_type.value}_{query.session.launcher_id}", files=files):
if chunk['event'] in ignored_events:
async for chunk in self.dify_client.workflow_run(
inputs={
"langbot_user_message_text": plain_text,
"langbot_session_id": f"{query.session.launcher_type.value}_{query.session.launcher_id}",
"langbot_conversation_id": cov_id,
},
user=f"{query.session.launcher_type.value}_{query.session.launcher_id}",
files=files,
timeout=self.ap.provider_cfg.data["dify-service-api"]["workflow"]["timeout"],
):
self.ap.logger.debug("dify-workflow-chunk: ", chunk)
if chunk["event"] in ignored_events:
continue
if chunk['event'] == 'node_started':
if chunk['data']['node_type'] == 'start' or chunk['data']['node_type'] == 'end':
if chunk["event"] == "node_started":
if (
chunk["data"]["node_type"] == "start"
or chunk["data"]["node_type"] == "end"
):
continue
msg = llm_entities.Message(
role='assistant',
role="assistant",
content=None,
tool_calls=[llm_entities.ToolCall(
id=chunk['data']['node_id'],
type='function',
function=llm_entities.FunctionCall(
name=chunk['data']['title'],
arguments=json.dumps({}),
),
)],
tool_calls=[
llm_entities.ToolCall(
id=chunk["data"]["node_id"],
type="function",
function=llm_entities.FunctionCall(
name=chunk["data"]["title"],
arguments=json.dumps({}),
),
)
],
)
yield msg
elif chunk['event'] == 'workflow_finished':
elif chunk["event"] == "workflow_finished":
if chunk['data']['error']:
raise errors.DifyAPIError(chunk['data']['error'])
msg = llm_entities.Message(
role='assistant',
content=chunk['data']['outputs'][self.ap.provider_cfg.data['dify-service-api']['workflow']['output-key']],
role="assistant",
content=chunk["data"]["outputs"][
self.ap.provider_cfg.data["dify-service-api"]["workflow"][
"output-key"
]
],
)
yield msg
async def run(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message, None]:
async def run(
self, query: core_entities.Query
) -> typing.AsyncGenerator[llm_entities.Message, None]:
"""运行请求"""
if self.ap.provider_cfg.data['dify-service-api']['app-type'] == 'chat':
if self.ap.provider_cfg.data["dify-service-api"]["app-type"] == "chat":
async for msg in self._chat_messages(query):
yield msg
elif self.ap.provider_cfg.data['dify-service-api']['app-type'] == 'workflow':
elif self.ap.provider_cfg.data["dify-service-api"]["app-type"] == "agent":
async for msg in self._agent_chat_messages(query):
yield msg
elif self.ap.provider_cfg.data["dify-service-api"]["app-type"] == "workflow":
async for msg in self._workflow_messages(query):
yield msg
else:
raise errors.DifyAPIError(f"不支持的 Dify 应用类型: {self.ap.provider_cfg.data['dify-service-api']['app-type']}")
raise errors.DifyAPIError(
f"不支持的 Dify 应用类型: {self.ap.provider_cfg.data['dify-service-api']['app-type']}"
)

View File

@@ -1,4 +1,4 @@
semantic_version = "v3.4.1.1"
semantic_version = "v3.4.1.2"
debug_mode = False

View File

@@ -22,13 +22,3 @@ def install_requirements(file):
pipmain(['install', '-r', file, "-i", "https://pypi.tuna.tsinghua.edu.cn/simple",
"--trusted-host", "pypi.tuna.tsinghua.edu.cn"])
# log.reset_logging()
if __name__ == "__main__":
try:
install("openai11")
except Exception as e:
print(111)
print(e)
print(222)

View File

@@ -62,11 +62,17 @@
"base-url": "https://api.dify.ai/v1",
"app-type": "chat",
"chat": {
"api-key": "app-1234567890"
"api-key": "app-1234567890",
"timeout": 120
},
"agent": {
"api-key": "app-1234567890",
"timeout": 120
},
"workflow": {
"api-key": "app-1234567890",
"output-key": "summary"
"output-key": "summary",
"timeout": 120
}
}
}