diff --git a/pkg/persistence/migrations/dbm006_langflow_api_config.py b/pkg/persistence/migrations/dbm006_langflow_api_config.py
new file mode 100644
index 00000000..07c3dbbf
--- /dev/null
+++ b/pkg/persistence/migrations/dbm006_langflow_api_config.py
@@ -0,0 +1,45 @@
+from .. import migration
+
+import sqlalchemy
+
+from ...entity.persistence import pipeline as persistence_pipeline
+
+
+@migration.migration_class(6)
+class DBMigrateLangflowApiConfig(migration.DBMigration):
+    """Langflow API config"""
+
+    async def upgrade(self):
+        """Upgrade"""
+        # read all pipelines
+        pipelines = await self.ap.persistence_mgr.execute_async(sqlalchemy.select(persistence_pipeline.LegacyPipeline))
+
+        for pipeline in pipelines:
+            serialized_pipeline = self.ap.persistence_mgr.serialize_model(persistence_pipeline.LegacyPipeline, pipeline)
+
+            config = serialized_pipeline['config']
+
+            if 'langflow-api' not in config['ai']:
+                config['ai']['langflow-api'] = {
+                    'base-url': 'http://localhost:7860',
+                    'api-key': 'your-api-key',
+                    'flow-id': 'your-flow-id',
+                    'input-type': 'chat',
+                    'output-type': 'chat',
+                    'tweaks': '{}',
+                }
+
+            await self.ap.persistence_mgr.execute_async(
+                sqlalchemy.update(persistence_pipeline.LegacyPipeline)
+                .where(persistence_pipeline.LegacyPipeline.uuid == serialized_pipeline['uuid'])
+                .values(
+                    {
+                        'config': config,
+                        'for_version': self.ap.ver_mgr.get_current_version(),
+                    }
+                )
+            )
+
+    async def downgrade(self):
+        """Downgrade"""
+        pass
diff --git a/pkg/provider/runner.py b/pkg/provider/runner.py
index a74a2dc5..4af191ac 100644
--- a/pkg/provider/runner.py
+++ b/pkg/provider/runner.py
@@ -35,6 +35,6 @@ class RequestRunner(abc.ABC):
         self.pipeline_config = pipeline_config
 
     @abc.abstractmethod
-    async def run(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message, None]:
+    async def run(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message | llm_entities.MessageChunk, None]:
         """运行请求"""
         pass
diff --git a/pkg/provider/runners/langflowapi.py b/pkg/provider/runners/langflowapi.py
new file mode 100644
index 00000000..467072b5
--- /dev/null
+++ b/pkg/provider/runners/langflowapi.py
@@ -0,0 +1,164 @@
+from __future__ import annotations
+
+import typing
+import json
+import httpx
+import uuid
+
+from .. import runner
+from ...core import app, entities as core_entities
+from .. import entities as llm_entities
+
+
+@runner.runner_class('langflow-api')
+class LangflowAPIRunner(runner.RequestRunner):
+    """Langflow API conversation runner"""
+
+    def __init__(self, ap: app.Application, pipeline_config: dict):
+        self.ap = ap
+        self.pipeline_config = pipeline_config
+
+    async def _build_request_payload(self, query: core_entities.Query) -> dict:
+        """Build the request payload
+
+        Args:
+            query: user query object
+
+        Returns:
+            dict: request payload
+        """
+        # Collect the plain text of the user message
+        user_message_text = ''
+        if isinstance(query.user_message.content, str):
+            user_message_text = query.user_message.content
+        elif isinstance(query.user_message.content, list):
+            for item in query.user_message.content:
+                if item.type == 'text':
+                    user_message_text += item.text
+
+        # Hyphenated keys ('input-type') match the migration/template defaults
+        input_type = self.pipeline_config['ai']['langflow-api'].get('input-type', 'chat')
+        output_type = self.pipeline_config['ai']['langflow-api'].get('output-type', 'chat')
+
+        payload = {
+            'output_type': output_type,
+            'input_type': input_type,
+            'input_value': user_message_text,
+            'session_id': str(uuid.uuid4()),
+        }
+
+        # 'tweaks' is stored as a JSON string; default '{}' keeps a missing key safe
+        tweaks = json.loads(self.pipeline_config['ai']['langflow-api'].get('tweaks', '{}'))
+        if tweaks:
+            payload['tweaks'] = tweaks
+
+        return payload
+
+    async def run(
+        self, query: core_entities.Query
+    ) -> typing.AsyncGenerator[llm_entities.Message | llm_entities.MessageChunk, None]:
+        """Run the request
+
+        Args:
+            query: user query object
+
+        Yields:
+            Message: reply message
+        """
+        # Check whether the adapter supports streaming output
+        is_stream = False
+        try:
+            is_stream = await query.adapter.is_stream_output_supported()
+        except AttributeError:
+            is_stream = False
+
+        # API parameters from the pipeline config
+        base_url = self.pipeline_config['ai']['langflow-api']['base-url']
+        api_key = self.pipeline_config['ai']['langflow-api']['api-key']
+        flow_id = self.pipeline_config['ai']['langflow-api']['flow-id']
+
+        # Build the API URL
+        url = f'{base_url.rstrip("/")}/api/v1/run/{flow_id}'
+
+        payload = await self._build_request_payload(query)
+
+        headers = {'Content-Type': 'application/json', 'x-api-key': api_key}
+
+        async with httpx.AsyncClient() as client:
+            if is_stream:
+                # Streaming request
+                async with client.stream('POST', url, json=payload, headers=headers, timeout=120.0) as response:
+                    response.raise_for_status()
+
+                    accumulated_content = ''
+
+                    async for line in response.aiter_lines():
+                        data_str = line
+
+                        if data_str.startswith('data: '):
+                            data_str = data_str[6:]  # strip the "data: " prefix
+
+                        try:
+                            data = json.loads(data_str)
+
+                            # Extract the message content
+                            message_text = ''
+                            if 'outputs' in data and len(data['outputs']) > 0:
+                                output = data['outputs'][0]
+                                if 'outputs' in output and len(output['outputs']) > 0:
+                                    inner_output = output['outputs'][0]
+                                    if 'outputs' in inner_output and 'message' in inner_output['outputs']:
+                                        message_data = inner_output['outputs']['message']
+                                        if 'message' in message_data:
+                                            message_text = message_data['message']
+
+                            # Fallback: some responses carry a top-level 'messages' list
+                            if not message_text and 'messages' in data:
+                                messages = data['messages']
+                                if messages and len(messages) > 0:
+                                    message_text = messages[0].get('message', '')
+
+                            if message_text:
+                                # Each update replaces the accumulated content
+                                accumulated_content = message_text
+                                yield llm_entities.MessageChunk(
+                                    role='assistant', content=accumulated_content, is_final=False
+                                )
+                        except json.JSONDecodeError:
+                            # Not JSON (e.g. SSE keep-alive line); skip it
+                            continue
+
+                    # Send the final message
+                    yield llm_entities.MessageChunk(role='assistant', content=accumulated_content, is_final=True)
+            else:
+                # Non-streaming request
+                response = await client.post(url, json=payload, headers=headers, timeout=120.0)
+                response.raise_for_status()
+
+                response_data = response.json()
+
+                # Per the Langflow API, the message usually lives at
+                # outputs[0].outputs[0].outputs.message.message
+                message_text = ''
+                if 'outputs' in response_data and len(response_data['outputs']) > 0:
+                    output = response_data['outputs'][0]
+                    if 'outputs' in output and len(output['outputs']) > 0:
+                        inner_output = output['outputs'][0]
+                        if 'outputs' in inner_output and 'message' in inner_output['outputs']:
+                            message_data = inner_output['outputs']['message']
+                            if 'message' in message_data:
+                                message_text = message_data['message']
+
+                # Fallback: some responses carry a top-level 'messages' list
+                if not message_text and 'messages' in response_data:
+                    messages = response_data['messages']
+                    if messages and len(messages) > 0:
+                        message_text = messages[0].get('message', '')
+
+                # Last resort: return the whole response as a string
+                if not message_text:
+                    message_text = json.dumps(response_data, ensure_ascii=False, indent=2)
+
+                # is_stream is False on this branch, so yield a plain Message
+                reply_message = llm_entities.Message(role='assistant', content=message_text)
+                yield reply_message
diff --git a/pkg/utils/constants.py b/pkg/utils/constants.py
index 74fe232b..eb9609a2 100644
--- a/pkg/utils/constants.py
+++ b/pkg/utils/constants.py
@@ -1,6 +1,6 @@
 semantic_version = 'v4.2.2'
 
-required_database_version = 5
+required_database_version = 6
 """Tag the version of the database schema, used to check if the database needs to be migrated"""
 
 debug_mode = False
diff --git a/templates/default-pipeline-config.json b/templates/default-pipeline-config.json
index 2184f982..f7801a47 100644
--- a/templates/default-pipeline-config.json
+++ b/templates/default-pipeline-config.json
@@ -70,6 +70,14 @@
             "header-value": "",
             "timeout": 120,
             "output-key": "response"
+        },
+        "langflow-api": {
+            "base-url": "http://localhost:7860",
+            "api-key": "your-api-key",
+            "flow-id": "your-flow-id",
+            "input-type": "chat",
+            "output-type": "chat",
+            "tweaks": "{}"
         }
     },
     "output": {
diff --git a/templates/metadata/pipeline/ai.yaml b/templates/metadata/pipeline/ai.yaml
index 4564f097..b37c753b 100644
--- a/templates/metadata/pipeline/ai.yaml
+++ b/templates/metadata/pipeline/ai.yaml
@@ -35,6 +35,10 @@ stages:
         label:
           en_US: n8n Workflow API
          zh_Hans: n8n 工作流 API
+      - name: langflow-api
+        label:
+          en_US: Langflow API
+          zh_Hans: Langflow API
       - name: local-agent
         label:
           en_US: Local Agent
@@ -288,4 +292,68 @@ stages:
         type: string
         required: false
         default: 'response'
-
+  - name: langflow-api
+    label:
+      en_US: Langflow API
+      zh_Hans: Langflow API
+    description:
+      en_US: Configure the Langflow API of the pipeline, call the Langflow flow through the `Simplified Run Flow` interface
+      zh_Hans: 配置 Langflow API,通过 `Simplified Run Flow` 接口调用 Langflow 的流程
+    config:
+      - name: base-url
+        label:
+          en_US: Base URL
+          zh_Hans: 基础 URL
+        description:
+          en_US: The base URL of the Langflow server
+          zh_Hans: Langflow 服务器的基础 URL
+        type: string
+        required: true
+      - name: api-key
+        label:
+          en_US: API Key
+          zh_Hans: API 密钥
+        description:
+          en_US: The API key for the Langflow server
+          zh_Hans: Langflow 服务器的 API 密钥
+        type: string
+        required: true
+      - name: flow-id
+        label:
+          en_US: Flow ID
+          zh_Hans: 流程 ID
+        description:
+          en_US: The ID of the flow to run
+          zh_Hans: 要运行的流程 ID
+        type: string
+        required: true
+      - name: input-type
+        label:
+          en_US: Input Type
+          zh_Hans: 输入类型
+        description:
+          en_US: The input type for the flow
+          zh_Hans: 流程的输入类型
+        type: string
+        required: false
+        default: 'chat'
+      - name: output-type
+        label:
+          en_US: Output Type
+          zh_Hans: 输出类型
+        description:
+          en_US: The output type for the flow
+          zh_Hans: 流程的输出类型
+        type: string
+        required: false
+        default: 'chat'
+      - name: tweaks
+        label:
+          en_US: Tweaks
+          zh_Hans: 调整参数
+        description:
+          en_US: Optional tweaks to apply to the flow
+          zh_Hans: 可选的流程调整参数
+        type: json
+        required: false
+        default: '{}'