Files
WeKnora/mcp-server/weknora_mcp_server.py
2025-09-08 19:43:45 +08:00

654 lines
25 KiB
Python

#!/usr/bin/env python3
"""
WeKnora MCP Server
A Model Context Protocol server that provides access to the WeKnora knowledge management API.
"""
import os
import json
import logging
from typing import Dict, List, Any, Optional
from datetime import datetime
import requests
from requests.exceptions import RequestException
import mcp.server.stdio
import mcp.types as types
from mcp.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Configuration
WEKNORA_BASE_URL = os.getenv("WEKNORA_BASE_URL", "http://localhost:8080/api/v1")
WEKNORA_API_KEY = os.getenv("WEKNORA_API_KEY", "")
class WeKnoraClient:
"""Client for interacting with WeKnora API"""
def __init__(self, base_url: str, api_key: str):
self.base_url = base_url
self.api_key = api_key
self.session = requests.Session()
self.session.headers.update({
"X-API-Key": api_key,
"Content-Type": "application/json"
})
def _request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
"""Make a request to the WeKnora API"""
url = f"{self.base_url}{endpoint}"
try:
response = self.session.request(method, url, **kwargs)
response.raise_for_status()
return response.json()
except RequestException as e:
logger.error(f"API request failed: {e}")
raise
# Tenant Management
def create_tenant(self, name: str, description: str, business: str, retriever_engines: Dict) -> Dict:
"""Create a new tenant"""
data = {
"name": name,
"description": description,
"business": business,
"retriever_engines": retriever_engines
}
return self._request("POST", "/tenants", json=data)
def get_tenant(self, tenant_id: str) -> Dict:
"""Get tenant information"""
return self._request("GET", f"/tenants/{tenant_id}")
def list_tenants(self) -> Dict:
"""List all tenants"""
return self._request("GET", "/tenants")
# Knowledge Base Management
def create_knowledge_base(self, name: str, description: str, config: Dict) -> Dict:
"""Create a new knowledge base"""
data = {
"name": name,
"description": description,
**config
}
return self._request("POST", "/knowledge-bases", json=data)
def list_knowledge_bases(self) -> Dict:
"""List all knowledge bases"""
return self._request("GET", "/knowledge-bases")
def get_knowledge_base(self, kb_id: str) -> Dict:
"""Get knowledge base details"""
return self._request("GET", f"/knowledge-bases/{kb_id}")
def update_knowledge_base(self, kb_id: str, updates: Dict) -> Dict:
"""Update knowledge base"""
return self._request("PUT", f"/knowledge-bases/{kb_id}", json=updates)
def delete_knowledge_base(self, kb_id: str) -> Dict:
"""Delete knowledge base"""
return self._request("DELETE", f"/knowledge-bases/{kb_id}")
def hybrid_search(self, kb_id: str, query: str, config: Dict) -> Dict:
"""Perform hybrid search in knowledge base"""
data = {
"query_text": query,
**config
}
return self._request("GET", f"/knowledge-bases/{kb_id}/hybrid-search", json=data)
# Knowledge Management
def create_knowledge_from_file(self, kb_id: str, file_path: str, enable_multimodel: bool = True) -> Dict:
"""Create knowledge from file"""
with open(file_path, 'rb') as f:
files = {'file': f}
data = {'enable_multimodel': str(enable_multimodel).lower()}
# Temporarily remove Content-Type for multipart request
headers = self.session.headers.copy()
del headers['Content-Type']
response = requests.post(
f"{self.base_url}/knowledge-bases/{kb_id}/knowledge/file",
headers=headers,
files=files,
data=data
)
response.raise_for_status()
return response.json()
def create_knowledge_from_url(self, kb_id: str, url: str, enable_multimodel: bool = True) -> Dict:
"""Create knowledge from URL"""
data = {
"url": url,
"enable_multimodel": enable_multimodel
}
return self._request("POST", f"/knowledge-bases/{kb_id}/knowledge/url", json=data)
def list_knowledge(self, kb_id: str, page: int = 1, page_size: int = 20) -> Dict:
"""List knowledge in a knowledge base"""
params = {"page": page, "page_size": page_size}
return self._request("GET", f"/knowledge-bases/{kb_id}/knowledge", params=params)
def get_knowledge(self, knowledge_id: str) -> Dict:
"""Get knowledge details"""
return self._request("GET", f"/knowledge/{knowledge_id}")
def delete_knowledge(self, knowledge_id: str) -> Dict:
"""Delete knowledge"""
return self._request("DELETE", f"/knowledge/{knowledge_id}")
# Model Management
def create_model(self, name: str, model_type: str, source: str, description: str, parameters: Dict, is_default: bool = False) -> Dict:
"""Create a new model"""
data = {
"name": name,
"type": model_type,
"source": source,
"description": description,
"parameters": parameters,
"is_default": is_default
}
return self._request("POST", "/models", json=data)
def list_models(self) -> Dict:
"""List all models"""
return self._request("GET", "/models")
def get_model(self, model_id: str) -> Dict:
"""Get model details"""
return self._request("GET", f"/models/{model_id}")
# Session Management
def create_session(self, kb_id: str, strategy: Dict) -> Dict:
"""Create a new chat session"""
data = {
"knowledge_base_id": kb_id,
"session_strategy": strategy
}
return self._request("POST", "/sessions", json=data)
def get_session(self, session_id: str) -> Dict:
"""Get session details"""
return self._request("GET", f"/sessions/{session_id}")
def list_sessions(self, page: int = 1, page_size: int = 20) -> Dict:
"""List sessions"""
params = {"page": page, "page_size": page_size}
return self._request("GET", "/sessions", params=params)
def delete_session(self, session_id: str) -> Dict:
"""Delete session"""
return self._request("DELETE", f"/sessions/{session_id}")
# Chat Functionality
def chat(self, session_id: str, query: str) -> Dict:
"""Send a chat message"""
data = {"query": query}
# Note: This returns SSE stream, simplified here
return self._request("POST", f"/knowledge-chat/{session_id}", json=data)
# Chunk Management
def list_chunks(self, knowledge_id: str, page: int = 1, page_size: int = 20) -> Dict:
"""List chunks of knowledge"""
params = {"page": page, "page_size": page_size}
return self._request("GET", f"/chunks/{knowledge_id}", params=params)
def delete_chunk(self, knowledge_id: str, chunk_id: str) -> Dict:
"""Delete a chunk"""
return self._request("DELETE", f"/chunks/{knowledge_id}/{chunk_id}")
# Initialize MCP server
app = Server("weknora-server")
client = WeKnoraClient(WEKNORA_BASE_URL, WEKNORA_API_KEY)
# Tool definitions
@app.list_tools()
async def handle_list_tools() -> list[types.Tool]:
"""List all available WeKnora tools"""
return [
# Tenant Management
types.Tool(
name="create_tenant",
description="Create a new tenant in WeKnora",
inputSchema={
"type": "object",
"properties": {
"name": {"type": "string", "description": "Tenant name"},
"description": {"type": "string", "description": "Tenant description"},
"business": {"type": "string", "description": "Business type"},
"retriever_engines": {
"type": "object",
"description": "Retriever engine configuration",
"properties": {
"engines": {
"type": "array",
"items": {
"type": "object",
"properties": {
"retriever_type": {"type": "string"},
"retriever_engine_type": {"type": "string"}
}
}
}
}
}
},
"required": ["name", "description", "business"]
}
),
types.Tool(
name="list_tenants",
description="List all tenants",
inputSchema={"type": "object", "properties": {}}
),
# Knowledge Base Management
types.Tool(
name="create_knowledge_base",
description="Create a new knowledge base",
inputSchema={
"type": "object",
"properties": {
"name": {"type": "string", "description": "Knowledge base name"},
"description": {"type": "string", "description": "Knowledge base description"},
"embedding_model_id": {"type": "string", "description": "Embedding model ID"},
"summary_model_id": {"type": "string", "description": "Summary model ID"}
},
"required": ["name", "description"]
}
),
types.Tool(
name="list_knowledge_bases",
description="List all knowledge bases",
inputSchema={"type": "object", "properties": {}}
),
types.Tool(
name="get_knowledge_base",
description="Get knowledge base details",
inputSchema={
"type": "object",
"properties": {
"kb_id": {"type": "string", "description": "Knowledge base ID"}
},
"required": ["kb_id"]
}
),
types.Tool(
name="delete_knowledge_base",
description="Delete a knowledge base",
inputSchema={
"type": "object",
"properties": {
"kb_id": {"type": "string", "description": "Knowledge base ID"}
},
"required": ["kb_id"]
}
),
types.Tool(
name="hybrid_search",
description="Perform hybrid search in knowledge base",
inputSchema={
"type": "object",
"properties": {
"kb_id": {"type": "string", "description": "Knowledge base ID"},
"query": {"type": "string", "description": "Search query"},
"vector_threshold": {"type": "number", "description": "Vector similarity threshold", "default": 0.5},
"keyword_threshold": {"type": "number", "description": "Keyword match threshold", "default": 0.3},
"match_count": {"type": "integer", "description": "Number of results to return", "default": 5}
},
"required": ["kb_id", "query"]
}
),
# Knowledge Management
types.Tool(
name="create_knowledge_from_url",
description="Create knowledge from URL",
inputSchema={
"type": "object",
"properties": {
"kb_id": {"type": "string", "description": "Knowledge base ID"},
"url": {"type": "string", "description": "URL to create knowledge from"},
"enable_multimodel": {"type": "boolean", "description": "Enable multimodal processing", "default": True}
},
"required": ["kb_id", "url"]
}
),
types.Tool(
name="list_knowledge",
description="List knowledge in a knowledge base",
inputSchema={
"type": "object",
"properties": {
"kb_id": {"type": "string", "description": "Knowledge base ID"},
"page": {"type": "integer", "description": "Page number", "default": 1},
"page_size": {"type": "integer", "description": "Page size", "default": 20}
},
"required": ["kb_id"]
}
),
types.Tool(
name="get_knowledge",
description="Get knowledge details",
inputSchema={
"type": "object",
"properties": {
"knowledge_id": {"type": "string", "description": "Knowledge ID"}
},
"required": ["knowledge_id"]
}
),
types.Tool(
name="delete_knowledge",
description="Delete knowledge",
inputSchema={
"type": "object",
"properties": {
"knowledge_id": {"type": "string", "description": "Knowledge ID"}
},
"required": ["knowledge_id"]
}
),
# Model Management
types.Tool(
name="create_model",
description="Create a new model",
inputSchema={
"type": "object",
"properties": {
"name": {"type": "string", "description": "Model name"},
"type": {"type": "string", "description": "Model type (KnowledgeQA, Embedding, Rerank)"},
"source": {"type": "string", "description": "Model source", "default": "local"},
"description": {"type": "string", "description": "Model description"},
"base_url": {"type": "string", "description": "Model API base URL", "default": ""},
"api_key": {"type": "string", "description": "Model API key", "default": ""},
"is_default": {"type": "boolean", "description": "Set as default model", "default": False}
},
"required": ["name", "type", "description"]
}
),
types.Tool(
name="list_models",
description="List all models",
inputSchema={"type": "object", "properties": {}}
),
types.Tool(
name="get_model",
description="Get model details",
inputSchema={
"type": "object",
"properties": {
"model_id": {"type": "string", "description": "Model ID"}
},
"required": ["model_id"]
}
),
# Session Management
types.Tool(
name="create_session",
description="Create a new chat session",
inputSchema={
"type": "object",
"properties": {
"kb_id": {"type": "string", "description": "Knowledge base ID"},
"max_rounds": {"type": "integer", "description": "Maximum conversation rounds", "default": 5},
"enable_rewrite": {"type": "boolean", "description": "Enable query rewriting", "default": True},
"fallback_response": {"type": "string", "description": "Fallback response", "default": "Sorry, I cannot answer this question."},
"summary_model_id": {"type": "string", "description": "Summary model ID"}
},
"required": ["kb_id"]
}
),
types.Tool(
name="get_session",
description="Get session details",
inputSchema={
"type": "object",
"properties": {
"session_id": {"type": "string", "description": "Session ID"}
},
"required": ["session_id"]
}
),
types.Tool(
name="list_sessions",
description="List chat sessions",
inputSchema={
"type": "object",
"properties": {
"page": {"type": "integer", "description": "Page number", "default": 1},
"page_size": {"type": "integer", "description": "Page size", "default": 20}
}
}
),
types.Tool(
name="delete_session",
description="Delete a session",
inputSchema={
"type": "object",
"properties": {
"session_id": {"type": "string", "description": "Session ID"}
},
"required": ["session_id"]
}
),
# Chat Functionality
types.Tool(
name="chat",
description="Send a chat message to a session",
inputSchema={
"type": "object",
"properties": {
"session_id": {"type": "string", "description": "Session ID"},
"query": {"type": "string", "description": "User query"}
},
"required": ["session_id", "query"]
}
),
# Chunk Management
types.Tool(
name="list_chunks",
description="List chunks of knowledge",
inputSchema={
"type": "object",
"properties": {
"knowledge_id": {"type": "string", "description": "Knowledge ID"},
"page": {"type": "integer", "description": "Page number", "default": 1},
"page_size": {"type": "integer", "description": "Page size", "default": 20}
},
"required": ["knowledge_id"]
}
),
types.Tool(
name="delete_chunk",
description="Delete a chunk",
inputSchema={
"type": "object",
"properties": {
"knowledge_id": {"type": "string", "description": "Knowledge ID"},
"chunk_id": {"type": "string", "description": "Chunk ID"}
},
"required": ["knowledge_id", "chunk_id"]
}
)
]
@app.call_tool()
async def handle_call_tool(
name: str, arguments: dict | None
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
"""Handle tool execution"""
try:
args = arguments or {}
# Tenant Management
if name == "create_tenant":
result = client.create_tenant(
args["name"],
args["description"],
args["business"],
args.get("retriever_engines", {
"engines": [
{"retriever_type": "keywords", "retriever_engine_type": "postgres"},
{"retriever_type": "vector", "retriever_engine_type": "postgres"}
]
})
)
elif name == "list_tenants":
result = client.list_tenants()
# Knowledge Base Management
elif name == "create_knowledge_base":
config = {
"chunking_config": args.get("chunking_config", {
"chunk_size": 1000,
"chunk_overlap": 200,
"separators": ["."],
"enable_multimodal": True
}),
"embedding_model_id": args.get("embedding_model_id", ""),
"summary_model_id": args.get("summary_model_id", "")
}
result = client.create_knowledge_base(
args["name"],
args["description"],
config
)
elif name == "list_knowledge_bases":
result = client.list_knowledge_bases()
elif name == "get_knowledge_base":
result = client.get_knowledge_base(args["kb_id"])
elif name == "delete_knowledge_base":
result = client.delete_knowledge_base(args["kb_id"])
elif name == "hybrid_search":
config = {
"vector_threshold": args.get("vector_threshold", 0.5),
"keyword_threshold": args.get("keyword_threshold", 0.3),
"match_count": args.get("match_count", 5)
}
result = client.hybrid_search(args["kb_id"], args["query"], config)
# Knowledge Management
elif name == "create_knowledge_from_url":
result = client.create_knowledge_from_url(
args["kb_id"],
args["url"],
args.get("enable_multimodel", True)
)
elif name == "list_knowledge":
result = client.list_knowledge(
args["kb_id"],
args.get("page", 1),
args.get("page_size", 20)
)
elif name == "get_knowledge":
result = client.get_knowledge(args["knowledge_id"])
elif name == "delete_knowledge":
result = client.delete_knowledge(args["knowledge_id"])
# Model Management
elif name == "create_model":
parameters = {
"base_url": args.get("base_url", ""),
"api_key": args.get("api_key", "")
}
result = client.create_model(
args["name"],
args["type"],
args.get("source", "local"),
args["description"],
parameters,
args.get("is_default", False)
)
elif name == "list_models":
result = client.list_models()
elif name == "get_model":
result = client.get_model(args["model_id"])
# Session Management
elif name == "create_session":
strategy = {
"max_rounds": args.get("max_rounds", 5),
"enable_rewrite": args.get("enable_rewrite", True),
"fallback_strategy": "FIXED_RESPONSE",
"fallback_response": args.get("fallback_response", "Sorry, I cannot answer this question."),
"embedding_top_k": 10,
"keyword_threshold": 0.5,
"vector_threshold": 0.7,
"summary_model_id": args.get("summary_model_id", "")
}
result = client.create_session(args["kb_id"], strategy)
elif name == "get_session":
result = client.get_session(args["session_id"])
elif name == "list_sessions":
result = client.list_sessions(
args.get("page", 1),
args.get("page_size", 20)
)
elif name == "delete_session":
result = client.delete_session(args["session_id"])
# Chat Functionality
elif name == "chat":
result = client.chat(args["session_id"], args["query"])
# Chunk Management
elif name == "list_chunks":
result = client.list_chunks(
args["knowledge_id"],
args.get("page", 1),
args.get("page_size", 20)
)
elif name == "delete_chunk":
result = client.delete_chunk(args["knowledge_id"], args["chunk_id"])
else:
return [types.TextContent(
type="text",
text=f"Unknown tool: {name}"
)]
return [types.TextContent(
type="text",
text=json.dumps(result, indent=2, ensure_ascii=False)
)]
except Exception as e:
logger.error(f"Tool execution failed: {e}")
return [types.TextContent(
type="text",
text=f"Error executing {name}: {str(e)}"
)]
async def run():
"""Run the MCP server"""
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
InitializationOptions(
server_name="weknora-server",
server_version="1.0.0",
capabilities=app.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={},
),
),
)
def main():
"""主函数入口点,用于 console_scripts"""
import asyncio
asyncio.run(run())
if __name__ == "__main__":
main()