feat: supported langflow api provider (#1646)

* add langflow api provider

* chore: migration

* feat: okay for non-stream req

* fix: langflow sse data extracting

* doc: add comment on langflow api

---------

Co-authored-by: Junyan Qin <rockchinq@gmail.com>
This commit is contained in:
ashen
2025-09-11 21:37:45 +08:00
committed by GitHub
parent 1f9f330cef
commit 345c8b113f
6 changed files with 305 additions and 4 deletions

View File

@@ -0,0 +1,45 @@
from .. import migration
import sqlalchemy
from ...entity.persistence import pipeline as persistence_pipeline
@migration.migration_class(6)
class DBMigrateLangflowApiConfig(migration.DBMigration):
"""Langflow API config"""
async def upgrade(self):
"""Upgrade"""
# read all pipelines
pipelines = await self.ap.persistence_mgr.execute_async(sqlalchemy.select(persistence_pipeline.LegacyPipeline))
for pipeline in pipelines:
serialized_pipeline = self.ap.persistence_mgr.serialize_model(persistence_pipeline.LegacyPipeline, pipeline)
config = serialized_pipeline['config']
if 'langflow-api' not in config['ai']:
config['ai']['langflow-api'] = {
'base-url': 'http://localhost:7860',
'api-key': 'your-api-key',
'flow-id': 'your-flow-id',
'input-type': 'chat',
'output-type': 'chat',
'tweaks': '{}',
}
await self.ap.persistence_mgr.execute_async(
sqlalchemy.update(persistence_pipeline.LegacyPipeline)
.where(persistence_pipeline.LegacyPipeline.uuid == serialized_pipeline['uuid'])
.values(
{
'config': config,
'for_version': self.ap.ver_mgr.get_current_version(),
}
)
)
async def downgrade(self):
"""Downgrade"""
pass

View File

@@ -35,6 +35,6 @@ class RequestRunner(abc.ABC):
self.pipeline_config = pipeline_config
@abc.abstractmethod
async def run(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message, None]:
async def run(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message | llm_entities.MessageChunk, None]:
"""运行请求"""
pass
pass

View File

@@ -0,0 +1,180 @@
from __future__ import annotations
import typing
import json
import httpx
import uuid
import traceback
from .. import runner
from ...core import app, entities as core_entities
from .. import entities as llm_entities
@runner.runner_class('langflow-api')
class LangflowAPIRunner(runner.RequestRunner):
"""Langflow API 对话请求器"""
def __init__(self, ap: app.Application, pipeline_config: dict):
self.ap = ap
self.pipeline_config = pipeline_config
async def _build_request_payload(self, query: core_entities.Query) -> dict:
"""构建请求负载
Args:
query: 用户查询对象
Returns:
dict: 请求负载
"""
# 获取用户消息文本
user_message_text = ''
if isinstance(query.user_message.content, str):
user_message_text = query.user_message.content
elif isinstance(query.user_message.content, list):
for item in query.user_message.content:
if item.type == 'text':
user_message_text += item.text
# 从配置中获取 input_type 和 output_type如果未配置则使用默认值
input_type = self.pipeline_config['ai']['langflow-api'].get('input_type', 'chat')
output_type = self.pipeline_config['ai']['langflow-api'].get('output_type', 'chat')
# 构建基本负载
payload = {
'output_type': output_type,
'input_type': input_type,
'input_value': user_message_text,
'session_id': str(uuid.uuid4()),
}
# 如果配置中有tweaks则添加到负载中
tweaks = json.loads(self.pipeline_config['ai']['langflow-api'].get('tweaks'))
if tweaks:
payload['tweaks'] = tweaks
return payload
async def run(
self, query: core_entities.Query
) -> typing.AsyncGenerator[llm_entities.Message | llm_entities.MessageChunk, None]:
"""运行请求
Args:
query: 用户查询对象
Yields:
Message: 回复消息
"""
# 检查是否支持流式输出
is_stream = False
try:
is_stream = await query.adapter.is_stream_output_supported()
except AttributeError:
is_stream = False
# 从配置中获取API参数
base_url = self.pipeline_config['ai']['langflow-api']['base-url']
api_key = self.pipeline_config['ai']['langflow-api']['api-key']
flow_id = self.pipeline_config['ai']['langflow-api']['flow-id']
# 构建API URL
url = f'{base_url.rstrip("/")}/api/v1/run/{flow_id}'
# 构建请求负载
payload = await self._build_request_payload(query)
# 设置请求头
headers = {'Content-Type': 'application/json', 'x-api-key': api_key}
# 发送请求
async with httpx.AsyncClient() as client:
if is_stream:
# 流式请求
async with client.stream('POST', url, json=payload, headers=headers, timeout=120.0) as response:
print(response)
response.raise_for_status()
accumulated_content = ''
message_count = 0
async for line in response.aiter_lines():
data_str = line
if data_str.startswith('data: '):
data_str = data_str[6:] # 移除 "data: " 前缀
try:
data = json.loads(data_str)
# 提取消息内容
message_text = ''
if 'outputs' in data and len(data['outputs']) > 0:
output = data['outputs'][0]
if 'outputs' in output and len(output['outputs']) > 0:
inner_output = output['outputs'][0]
if 'outputs' in inner_output and 'message' in inner_output['outputs']:
message_data = inner_output['outputs']['message']
if 'message' in message_data:
message_text = message_data['message']
# 如果没有找到消息,尝试其他可能的路径
if not message_text and 'messages' in data:
messages = data['messages']
if messages and len(messages) > 0:
message_text = messages[0].get('message', '')
if message_text:
# 更新累积内容
accumulated_content = message_text
message_count += 1
# 每8条消息或有新内容时生成一个chunk
if message_count % 8 == 0 or len(message_text) > 0:
yield llm_entities.MessageChunk(
role='assistant', content=accumulated_content, is_final=False
)
except json.JSONDecodeError:
# 如果不是JSON跳过这一行
traceback.print_exc()
continue
# 发送最终消息
yield llm_entities.MessageChunk(role='assistant', content=accumulated_content, is_final=True)
else:
# 非流式请求
response = await client.post(url, json=payload, headers=headers, timeout=120.0)
response.raise_for_status()
# 解析响应
response_data = response.json()
# 提取消息内容
# 根据Langflow API文档响应结构可能在outputs[0].outputs[0].outputs.message.message中
message_text = ''
if 'outputs' in response_data and len(response_data['outputs']) > 0:
output = response_data['outputs'][0]
if 'outputs' in output and len(output['outputs']) > 0:
inner_output = output['outputs'][0]
if 'outputs' in inner_output and 'message' in inner_output['outputs']:
message_data = inner_output['outputs']['message']
if 'message' in message_data:
message_text = message_data['message']
# 如果没有找到消息,尝试其他可能的路径
if not message_text and 'messages' in response_data:
messages = response_data['messages']
if messages and len(messages) > 0:
message_text = messages[0].get('message', '')
# 如果仍然没有找到消息,返回完整响应的字符串表示
if not message_text:
message_text = json.dumps(response_data, ensure_ascii=False, indent=2)
# 生成回复消息
if is_stream:
yield llm_entities.MessageChunk(role='assistant', content=message_text, is_final=True)
else:
reply_message = llm_entities.Message(role='assistant', content=message_text)
yield reply_message

View File

@@ -1,6 +1,6 @@
semantic_version = 'v4.2.2'
required_database_version = 5
required_database_version = 6
"""Tag the version of the database schema, used to check if the database needs to be migrated"""
debug_mode = False

View File

@@ -70,6 +70,14 @@
"header-value": "",
"timeout": 120,
"output-key": "response"
},
"langflow-api": {
"base-url": "http://localhost:7860",
"api-key": "your-api-key",
"flow-id": "your-flow-id",
"input-type": "chat",
"output-type": "chat",
"tweaks": "{}"
}
},
"output": {

View File

@@ -35,6 +35,10 @@ stages:
label:
en_US: n8n Workflow API
zh_Hans: n8n 工作流 API
- name: langflow-api
label:
en_US: Langflow API
zh_Hans: Langflow API
- name: local-agent
label:
en_US: Local Agent
@@ -288,4 +292,68 @@ stages:
type: string
required: false
default: 'response'
- name: langflow-api
label:
en_US: Langflow API
zh_Hans: Langflow API
description:
en_US: Configure the Langflow API of the pipeline, call the Langflow flow through the `Simplified Run Flow` interface
zh_Hans: 配置 Langflow API通过 `Simplified Run Flow` 接口调用 Langflow 的流程
config:
- name: base-url
label:
en_US: Base URL
zh_Hans: 基础 URL
description:
en_US: The base URL of the Langflow server
zh_Hans: Langflow 服务器的基础 URL
type: string
required: true
- name: api-key
label:
en_US: API Key
zh_Hans: API 密钥
description:
en_US: The API key for the Langflow server
zh_Hans: Langflow 服务器的 API 密钥
type: string
required: true
- name: flow-id
label:
en_US: Flow ID
zh_Hans: 流程 ID
description:
en_US: The ID of the flow to run
zh_Hans: 要运行的流程 ID
type: string
required: true
- name: input-type
label:
en_US: Input Type
zh_Hans: 输入类型
description:
en_US: The input type for the flow
zh_Hans: 流程的输入类型
type: string
required: false
default: 'chat'
- name: output-type
label:
en_US: Output Type
zh_Hans: 输出类型
description:
en_US: The output type for the flow
zh_Hans: 流程的输出类型
type: string
required: false
default: 'chat'
- name: tweaks
label:
en_US: Tweaks
zh_Hans: 调整参数
description:
en_US: Optional tweaks to apply to the flow
zh_Hans: 可选的流程调整参数
type: json
required: false
default: '{}'