mirror of
https://github.com/langbot-app/LangBot.git
synced 2025-11-26 03:44:58 +08:00
64 lines
2.3 KiB
Python
64 lines
2.3 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from typing import List
|
|
from pkg.rag.knowledge.services import base_service
|
|
from pkg.core import app
|
|
|
|
|
|
class Chunker(base_service.BaseService):
|
|
"""
|
|
A class for splitting long texts into smaller, overlapping chunks.
|
|
"""
|
|
|
|
def __init__(self, ap: app.Application, chunk_size: int = 500, chunk_overlap: int = 50):
|
|
self.ap = ap
|
|
self.chunk_size = chunk_size
|
|
self.chunk_overlap = chunk_overlap
|
|
if self.chunk_overlap >= self.chunk_size:
|
|
self.ap.logger.warning(
|
|
'Chunk overlap is greater than or equal to chunk size. This may lead to empty or malformed chunks.'
|
|
)
|
|
|
|
def _split_text_sync(self, text: str) -> List[str]:
|
|
"""
|
|
Synchronously splits a long text into chunks with specified overlap.
|
|
This is a CPU-bound operation, intended to be run in a separate thread.
|
|
"""
|
|
if not text:
|
|
return []
|
|
# words = text.split()
|
|
# chunks = []
|
|
# current_chunk = []
|
|
|
|
# for word in words:
|
|
# current_chunk.append(word)
|
|
# if len(current_chunk) > self.chunk_size:
|
|
# chunks.append(" ".join(current_chunk[:self.chunk_size]))
|
|
# current_chunk = current_chunk[self.chunk_size - self.chunk_overlap:]
|
|
|
|
# if current_chunk:
|
|
# chunks.append(" ".join(current_chunk))
|
|
|
|
# A more robust chunking strategy (e.g., using recursive character text splitter)
|
|
from langchain.text_splitter import RecursiveCharacterTextSplitter
|
|
|
|
text_splitter = RecursiveCharacterTextSplitter(
|
|
chunk_size=self.chunk_size,
|
|
chunk_overlap=self.chunk_overlap,
|
|
length_function=len,
|
|
is_separator_regex=False,
|
|
)
|
|
return text_splitter.split_text(text)
|
|
|
|
async def chunk(self, text: str) -> List[str]:
|
|
"""
|
|
Asynchronously chunks a given text into smaller pieces.
|
|
"""
|
|
self.ap.logger.info(f'Chunking text (length: {len(text)})...')
|
|
# Run the synchronous splitting logic in a separate thread
|
|
chunks = await self._run_sync(self._split_text_sync, text)
|
|
self.ap.logger.info(f'Text chunked into {len(chunks)} pieces.')
|
|
self.ap.logger.debug(f'Chunks: {json.dumps(chunks, indent=4, ensure_ascii=False)}')
|
|
return chunks
|