Compare commits

...

3 Commits

Author SHA1 Message Date
Junyan Qin
b7c4c21796 feat: add message_chain field to *NormalMessageReceived events 2025-11-22 14:59:12 +08:00
Copilot
66602da9cb feat: add model_config parameter support for Dify assistant type apps (#1796)
* Initial plan

* feat: add model_config parameter support for Dify assistant type

- Add model_config parameter to AsyncDifyServiceClient.chat_messages method
- Add _get_model_config helper method to DifyServiceAPIRunner
- Pass model_config from pipeline configuration to all chat_messages calls
- Add model-config configuration field to dify-service-api schema in ai.yaml
- Support optional model configuration for assistant type apps in open-source Dify

Co-authored-by: RockChinQ <45992437+RockChinQ@users.noreply.github.com>

* refactor: improve model_config implementation based on code review

- Simplify _get_model_config method logic
- Add more descriptive comment about model_config usage
- Clarify when model_config is used (assistant type apps)

Co-authored-by: RockChinQ <45992437+RockChinQ@users.noreply.github.com>

* feat: only modify client.py

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: RockChinQ <45992437+RockChinQ@users.noreply.github.com>
Co-authored-by: Junyan Qin <rockchinq@gmail.com>
2025-11-22 14:38:26 +08:00
wrj97
31b483509c fix: fix n8n streaming support issue (#1787)
* fix: fix n8n streaming support issue

Add streaming support detection and proper message type handling for
n8n service API runner. Previously, when streaming was enabled, n8n
integration would fail due to incorrect message type usage.

1. Added streaming capability detection by checking adapter's
is_stream_output_supported method
2. Implemented conditional message generation using MessageChunk for
streaming mode and Message for non-streaming mode
3. Added proper error handling for adapters that don't support streaming
detection

* fix: add n8n webhook streaming model; optimized the streaming output when calling n8n.

---------

Co-authored-by: Dong_master <2213070223@qq.com>
2025-11-22 14:17:46 +08:00
3 changed files with 95 additions and 30 deletions

View File

@@ -32,6 +32,7 @@ class AsyncDifyServiceClient:
conversation_id: str = '',
files: list[dict[str, typing.Any]] = [],
timeout: float = 30.0,
model_config: dict[str, typing.Any] | None = None,
) -> typing.AsyncGenerator[dict[str, typing.Any], None]:
"""发送消息"""
if response_mode != 'streaming':
@@ -42,6 +43,16 @@ class AsyncDifyServiceClient:
trust_env=True,
timeout=timeout,
) as client:
payload = {
'inputs': inputs,
'query': query,
'user': user,
'response_mode': response_mode,
'conversation_id': conversation_id,
'files': files,
'model_config': model_config or {},
}
async with client.stream(
'POST',
'/chat-messages',
@@ -49,14 +60,7 @@ class AsyncDifyServiceClient:
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json',
},
json={
'inputs': inputs,
'query': query,
'user': user,
'response_mode': response_mode,
'conversation_id': conversation_id,
'files': files,
},
json=payload,
) as r:
async for chunk in r.aiter_lines():
if r.status_code != 200:

View File

@@ -40,6 +40,7 @@ class ChatMessageHandler(handler.MessageHandler):
launcher_id=query.launcher_id,
sender_id=query.sender_id,
text_message=str(query.message_chain),
message_chain=query.message_chain,
query=query,
)

View File

@@ -68,6 +68,33 @@ class N8nServiceAPIRunner(runner.RequestRunner):
return plain_text
async def _process_stream_response(
    self, response: aiohttp.ClientResponse
) -> typing.AsyncGenerator[provider_message.Message, None]:
    """Process an n8n streaming response of chunked JSON events.

    Accumulates ``content`` from ``type == 'item'`` events and yields a
    cumulative ``MessageChunk`` every 8 items, plus a final chunk when the
    ``type == 'end'`` event (or the end of the stream) is reached.

    NOTE(review): assumes n8n emits each JSON event within a single
    transport chunk; a JSON object split across chunks would fail to
    parse — TODO confirm against the n8n webhook streaming format.

    Args:
        response: The aiohttp response whose body streams JSON events.

    Yields:
        provider_message.MessageChunk: cumulative assistant content;
        ``is_final`` is True exactly once, on the last yielded chunk.
    """
    full_content = ''
    message_idx = 0
    is_final = False
    async for chunk in response.content.iter_chunked(1024):
        if not chunk:
            continue
        try:
            data = json.loads(chunk)
        except json.JSONDecodeError:
            # Log the offending raw chunk. The previous code interpolated
            # `response.text()` without awaiting it, which logged a bare
            # coroutine object and triggered a "never awaited" warning.
            self.ap.logger.warning(f'Failed to parse streaming JSON chunk: {chunk!r}')
            continue
        if data.get('type') == 'item' and 'content' in data:
            message_idx += 1
            full_content += data['content']
        elif data.get('type') == 'end':
            is_final = True
        # Flush every 8 items and always on the final event. The
        # `message_idx > 0` guard prevents emitting an empty chunk when an
        # unrelated event arrives before any item (0 % 8 == 0 was true).
        if is_final or (message_idx > 0 and message_idx % 8 == 0):
            yield provider_message.MessageChunk(
                role='assistant',
                content=full_content,
                is_final=is_final,
            )
        if is_final:
            # Stop consuming once the producer signals completion.
            return
    # Stream closed without an explicit 'end' event: flush the accumulated
    # content marked final instead of silently dropping it.
    yield provider_message.MessageChunk(
        role='assistant',
        content=full_content,
        is_final=True,
    )
async def _call_webhook(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]:
"""调用n8n webhook"""
# 生成会话ID如果不存在
@@ -80,6 +107,7 @@ class N8nServiceAPIRunner(runner.RequestRunner):
# 准备请求数据
payload = {
# 基本消息内容
'chatInput' :plain_text, # 考虑到之前用户直接用的message model这里添加新键
'message': plain_text,
'user_message_text': plain_text,
'conversation_id': query.session.using_conversation.uuid,
@@ -91,6 +119,11 @@ class N8nServiceAPIRunner(runner.RequestRunner):
# 添加所有变量到payload
payload.update(query.variables)
try:
is_stream = await query.adapter.is_stream_output_supported()
except AttributeError:
is_stream = False
try:
# 准备请求头和认证信息
headers = {}
@@ -126,30 +159,57 @@ class N8nServiceAPIRunner(runner.RequestRunner):
# 调用webhook
async with aiohttp.ClientSession() as session:
async with session.post(
self.webhook_url, json=payload, headers=headers, auth=auth, timeout=self.timeout
) as response:
if response.status != 200:
error_text = await response.text()
self.ap.logger.error(f'n8n webhook call failed: {response.status}, {error_text}')
raise Exception(f'n8n webhook call failed: {response.status}, {error_text}')
if is_stream:
# 流式请求
async with session.post(
self.webhook_url,
json=payload,
headers=headers,
auth=auth,
timeout=self.timeout
) as response:
if response.status != 200:
error_text = await response.text()
self.ap.logger.error(f'n8n webhook call failed: {response.status}, {error_text}')
raise Exception(f'n8n webhook call failed: {response.status}, {error_text}')
# 解析响应
response_data = await response.json()
self.ap.logger.debug(f'n8n webhook response: {response_data}')
# 从响应中提取输出
if self.output_key in response_data:
output_content = response_data[self.output_key]
# 处理流式响应
async for chunk in self._process_stream_response(response):
yield chunk
else:
# 如果没有指定的输出键,则使用整个响应
output_content = json.dumps(response_data, ensure_ascii=False)
async with session.post(
self.webhook_url,
json=payload,
headers=headers,
auth=auth,
timeout=self.timeout
) as response:
try:
async for chunk in self._process_stream_response(response):
output_content = chunk.content if chunk.is_final else ''
except:
# 非流式请求(保持原有逻辑)
if response.status != 200:
error_text = await response.text()
self.ap.logger.error(f'n8n webhook call failed: {response.status}, {error_text}')
raise Exception(f'n8n webhook call failed: {response.status}, {error_text}')
# 返回消息
yield provider_message.Message(
role='assistant',
content=output_content,
)
# 解析响应
response_data = await response.json()
self.ap.logger.debug(f'n8n webhook response: {response_data}')
# 从响应中提取输出
if self.output_key in response_data:
output_content = response_data[self.output_key]
else:
# 如果没有指定的输出键,则使用整个响应
output_content = json.dumps(response_data, ensure_ascii=False)
# 返回消息
yield provider_message.Message(
role='assistant',
content=output_content,
)
except Exception as e:
self.ap.logger.error(f'n8n webhook call exception: {str(e)}')
raise N8nAPIError(f'n8n webhook call exception: {str(e)}')
@@ -157,4 +217,4 @@ class N8nServiceAPIRunner(runner.RequestRunner):
async def run(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]:
"""运行请求"""
async for msg in self._call_webhook(query):
yield msg
yield msg