fix: Makefile docker build not working

wizardchen
2025-09-10 15:06:24 +08:00
committed by lyingbug
parent 0e1d7edca3
commit 19d2493afc
2 changed files with 261 additions and 178 deletions

View File: Makefile

@@ -1,4 +1,4 @@
.PHONY: help build run test clean docker-build docker-run migrate-up migrate-down docker-restart docker-stop start-all stop-all start-ollama stop-ollama build-images build-images-app build-images-docreader build-images-frontend clean-images
.PHONY: help build run test clean docker-build docker-build-docreader docker-build-frontend docker-build-all docker-run migrate-up migrate-down docker-restart docker-stop start-all stop-all start-ollama stop-ollama build-images build-images-app build-images-docreader build-images-frontend clean-images check-env list-containers pull-images
# Show help
help:
@@ -11,10 +11,13 @@ help:
@echo " clean Clean build artifacts"
@echo ""
@echo "Docker commands:"
@echo " docker-build Build Docker image"
@echo " docker-run Run Docker container"
@echo " docker-stop Stop Docker container"
@echo " docker-restart Restart Docker container"
@echo " docker-build-app Build the application Docker image (wechatopenai/weknora-app)"
@echo " docker-build-docreader Build the docreader image (wechatopenai/weknora-docreader)"
@echo " docker-build-frontend Build the frontend image (wechatopenai/weknora-ui)"
@echo " docker-build-all Build all Docker images"
@echo " docker-run Run Docker container"
@echo " docker-stop Stop Docker container"
@echo " docker-restart Restart Docker container"
@echo ""
@echo "Service management:"
@echo " start-all Start all services"
@@ -37,13 +40,18 @@ help:
@echo " lint Run lint checks"
@echo " deps Install dependencies"
@echo " docs Generate API documentation"
@echo ""
@echo "Environment checks:"
@echo " check-env Check environment configuration"
@echo " list-containers List running containers"
@echo " pull-images Pull the latest images"
# Go related variables
BINARY_NAME=WeKnora
MAIN_PATH=./cmd/server
# Docker related variables
DOCKER_IMAGE=WeKnora
DOCKER_IMAGE=wechatopenai/weknora-app
DOCKER_TAG=latest
# Build the application
@@ -64,8 +72,19 @@ clean:
rm -f $(BINARY_NAME)
# Build Docker image
docker-build:
docker build -t $(DOCKER_IMAGE):$(DOCKER_TAG) .
docker-build-app:
docker build -f docker/Dockerfile.app -t $(DOCKER_IMAGE):$(DOCKER_TAG) .
# Build docreader Docker image
docker-build-docreader:
docker build -f docker/Dockerfile.docreader -t wechatopenai/weknora-docreader:latest .
# Build frontend Docker image
docker-build-frontend:
docker build -f frontend/Dockerfile -t wechatopenai/weknora-ui:latest frontend/
# Build all Docker images
docker-build-all: docker-build-app docker-build-docreader docker-build-frontend
# Run Docker container (traditional way)
docker-run:
@@ -107,10 +126,10 @@ build-images-frontend:
clean-images:
./scripts/build_images.sh --clean
# Restart Docker container (stop, rebuild, start)
# Restart Docker container (stop, start)
docker-restart:
docker-compose stop -t 60
docker-compose up --build
docker-compose up
# Database migrations
migrate-up:
@@ -151,4 +170,16 @@ clean-db:
docker volume rm weknora_redis_data; \
fi
# Environment check
check-env:
./scripts/start_all.sh --check
# List containers
list-containers:
./scripts/start_all.sh --list
# Pull latest images
pull-images:
./scripts/start_all.sh --pull
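For reference, a minimal usage sketch of the reworked targets (the target names come from the Makefile above; having Docker and the compose file configured is assumed):

# Build all three images, then start the stack
make docker-build-all   # runs docker-build-app, docker-build-docreader, docker-build-frontend
make check-env          # delegates to ./scripts/start_all.sh --check
make docker-run         # run the container the traditional way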

View File

@@ -33,7 +33,9 @@ except ImportError:
except ImportError:
# If both imports fail, set to None
Caption = None
logging.warning("Failed to import Caption, image captioning will be unavailable")
logging.warning(
"Failed to import Caption, image captioning will be unavailable"
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
@@ -78,7 +80,9 @@ class BaseParser(ABC):
"""
if cls._ocr_engine is None and not cls._ocr_engine_failed:
try:
cls._ocr_engine = OCREngine.get_instance(backend_type=backend_type, **kwargs)
cls._ocr_engine = OCREngine.get_instance(
backend_type=backend_type, **kwargs
)
if cls._ocr_engine is None:
cls._ocr_engine_failed = True
logger.error(f"Failed to initialize OCR engine ({backend_type})")
@@ -89,7 +93,6 @@ class BaseParser(ABC):
logger.error(f"Failed to initialize OCR engine: {str(e)}")
return None
return cls._ocr_engine
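As an aside, the _ocr_engine_failed flag above makes initialization fail fast instead of retrying an expensive setup on every call. A standalone sketch of the same cache-with-failure-flag pattern, where create_engine() is a hypothetical stand-in for OCREngine.get_instance():

def create_engine():
    raise RuntimeError("backend unavailable")  # simulated init failure

class EngineHolder:
    _engine = None
    _failed = False

    @classmethod
    def get(cls):
        # Initialize once; if it ever fails, remember that and skip future retries.
        if cls._engine is None and not cls._failed:
            try:
                cls._engine = create_engine()
            except Exception:
                cls._failed = True
                return None
        return cls._engine

assert EngineHolder.get() is None  # first call fails and sets the flag
assert EngineHolder.get() is None  # later calls return immediately, no retry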
def __init__(
self,
@@ -135,7 +138,7 @@ class BaseParser(ABC):
self.max_concurrent_tasks = max_concurrent_tasks
self.max_chunks = max_chunks
self.chunking_config = chunking_config
logger.info(
f"Initializing {self.__class__.__name__} for file: {file_name}, type: {self.file_type}"
)
@@ -174,10 +177,14 @@ class BaseParser(ABC):
resized_image = self._resize_image_if_needed(image)
# Get OCR engine
ocr_engine = self.get_ocr_engine(backend_type=self.ocr_backend, **self.ocr_config)
ocr_engine = self.get_ocr_engine(
backend_type=self.ocr_backend, **self.ocr_config
)
if ocr_engine is None:
logger.error(f"OCR engine ({self.ocr_backend}) initialization failed or unavailable, "
"skipping OCR recognition")
logger.error(
f"OCR engine ({self.ocr_backend}) initialization failed or unavailable, "
"skipping OCR recognition"
)
return ""
# Execute OCR prediction
@@ -199,11 +206,13 @@ class BaseParser(ABC):
return ocr_result
except Exception as e:
process_time = time.time() - start_time
logger.error(f"OCR recognition error: {str(e)}, time: {process_time:.2f} seconds")
logger.error(
f"OCR recognition error: {str(e)}, time: {process_time:.2f} seconds"
)
return ""
finally:
# Release image resources
if resized_image is not image and hasattr(resized_image, 'close'):
if resized_image is not image and hasattr(resized_image, "close"):
# Only close the new image we created, not the original image
resized_image.close()
@@ -218,25 +227,33 @@ class BaseParser(ABC):
"""
try:
# If it's a PIL Image
if hasattr(image, 'size'):
if hasattr(image, "size"):
width, height = image.size
if width > self.max_image_size or height > self.max_image_size:
logger.info(f"Resizing PIL image, original size: {width}x{height}")
scale = min(self.max_image_size / width, self.max_image_size / height)
scale = min(
self.max_image_size / width, self.max_image_size / height
)
new_width = int(width * scale)
new_height = int(height * scale)
resized_image = image.resize((new_width, new_height))
logger.info(f"Resized to: {new_width}x{new_height}")
return resized_image
else:
logger.info(f"PIL image size {width}x{height} is within limits, no resizing needed")
logger.info(
f"PIL image size {width}x{height} is within limits, no resizing needed"
)
return image
# If it's a numpy array
elif hasattr(image, 'shape'):
elif hasattr(image, "shape"):
height, width = image.shape[:2]
if width > self.max_image_size or height > self.max_image_size:
logger.info(f"Resizing numpy image, original size: {width}x{height}")
scale = min(self.max_image_size / width, self.max_image_size / height)
logger.info(
f"Resizing numpy image, original size: {width}x{height}"
)
scale = min(
self.max_image_size / width, self.max_image_size / height
)
new_width = int(width * scale)
new_height = int(height * scale)
# Use PIL for resizing numpy arrays
@@ -246,7 +263,9 @@ class BaseParser(ABC):
logger.info(f"Resized to: {new_width}x{new_height}")
return resized_image
else:
logger.info(f"Numpy image size {width}x{height} is within limits, no resizing needed")
logger.info(
f"Numpy image size {width}x{height} is within limits, no resizing needed"
)
return image
else:
logger.warning(f"Unknown image type: {type(image)}, cannot resize")
@@ -278,7 +297,9 @@ class BaseParser(ABC):
caption = ""
if self.caption_parser:
logger.info(f"OCR successfully extracted {len(ocr_text)} characters, continuing to get caption")
logger.info(
f"OCR successfully extracted {len(ocr_text)} characters, continuing to get caption"
)
# Convert image to base64 for caption generation
img_base64 = image_to_base64(image)
if img_base64:
@@ -295,7 +316,7 @@ class BaseParser(ABC):
# Release image resources
del image
return ocr_text, caption, image_url
async def process_image_async(self, image, image_url=None):
@@ -325,13 +346,17 @@ class BaseParser(ABC):
ocr_task = loop.run_in_executor(None, self.perform_ocr, resized_image)
ocr_text = await asyncio.wait_for(ocr_task, timeout=30.0)
except asyncio.TimeoutError:
logger.error("OCR processing timed out (30 seconds), skipping this image")
logger.error(
"OCR processing timed out (30 seconds), skipping this image"
)
ocr_text = ""
except Exception as e:
logger.error(f"OCR processing error: {str(e)}")
ocr_text = ""
logger.info(f"OCR successfully extracted {len(ocr_text)} characters, continuing to get caption")
logger.info(
f"OCR successfully extracted {len(ocr_text)} characters, continuing to get caption"
)
caption = ""
if self.caption_parser:
try:
@@ -340,9 +365,13 @@ class BaseParser(ABC):
if img_base64:
# Add timeout to avoid blocking caption retrieval (30 seconds timeout)
caption_task = self.get_image_caption_async(img_base64)
image_data, caption = await asyncio.wait_for(caption_task, timeout=30.0)
image_data, caption = await asyncio.wait_for(
caption_task, timeout=30.0
)
if caption:
logger.info(f"Successfully obtained image caption: {caption}")
logger.info(
f"Successfully obtained image caption: {caption}"
)
else:
logger.warning("Failed to get caption")
else:
@@ -353,27 +382,20 @@ class BaseParser(ABC):
except Exception as e:
logger.error(f"Failed to get caption: {str(e)}")
else:
logger.info("Caption service not initialized, skipping caption retrieval")
logger.info(
"Caption service not initialized, skipping caption retrieval"
)
return ocr_text, caption, image_url
finally:
# Release image resources
if resized_image is not image and hasattr(resized_image, 'close'):
if resized_image is not image and hasattr(resized_image, "close"):
# Only close the new image we created, not the original image
resized_image.close()
async def process_with_limit(self, idx, image, url, semaphore, current_request_id=None):
async def process_with_limit(self, idx, image, url, semaphore):
"""Function to process a single image using a semaphore"""
try:
# Set request ID in the asynchronous task
if current_request_id:
try:
from utils.request import set_request_id
set_request_id(current_request_id)
logger.info(f"Asynchronous task {idx+1} setting request ID: {current_request_id}")
except Exception as e:
logger.warning(f"Failed to set request ID in asynchronous task: {str(e)}")
logger.info(f"Waiting to process image {idx+1}")
async with semaphore: # Use semaphore to control concurrency
logger.info(f"Starting to process image {idx+1}")
@@ -385,7 +407,7 @@ class BaseParser(ABC):
return ("", "", url) # Return empty result to avoid overall failure
finally:
# Manually release image resources
if hasattr(image, 'close'):
if hasattr(image, "close"):
image.close()
async def process_multiple_images(self, images_data):
@@ -404,26 +426,19 @@ class BaseParser(ABC):
return []
# Set max concurrency, reduce concurrency to avoid resource contention
max_concurrency = min(self.max_concurrent_tasks, 5) # Reduce concurrency to prevent excessive memory usage
max_concurrency = min(
self.max_concurrent_tasks, 1
) # Reduce concurrency to prevent excessive memory usage
# Use semaphore to limit concurrency
semaphore = asyncio.Semaphore(max_concurrency)
# Store results to avoid overall failure due to task failure
results = []
# Get current request ID to set in each asynchronous task
current_request_id = None
try:
from utils.request import get_request_id
current_request_id = get_request_id()
logger.info(f"Capturing current request ID before async processing: {current_request_id}")
except Exception as e:
logger.warning(f"Failed to get current request ID: {str(e)}")
# Create all tasks, but use semaphore to limit actual concurrency
tasks = [
self.process_with_limit(i, img, url, semaphore, current_request_id)
self.process_with_limit(i, img, url, semaphore)
for i, (img, url) in enumerate(images_data)
]
@@ -434,7 +449,9 @@ class BaseParser(ABC):
# Handle possible exception results
for i, result in enumerate(completed_results):
if isinstance(result, Exception):
logger.error(f"Image {i+1} processing returned an exception: {str(result)}")
logger.error(
f"Image {i+1} processing returned an exception: {str(result)}"
)
# For exceptions, add empty results
if i < len(images_data):
results.append(("", "", images_data[i][1]))
@@ -449,7 +466,9 @@ class BaseParser(ABC):
images_data.clear()
logger.info("Image processing resource cleanup complete")
logger.info(f"Completed concurrent processing of {len(results)}/{len(images_data)} images")
logger.info(
f"Completed concurrent processing of {len(results)}/{len(images_data)} images"
)
return results
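The semaphore-bounded gather used above is the standard asyncio idiom for capping concurrency. A minimal self-contained sketch (the worker body and counts are illustrative), where Semaphore(1) mirrors the max_concurrency choice in process_multiple_images:

import asyncio

async def worker(i: int, sem: asyncio.Semaphore):
    async with sem:  # only one task may run this body at a time
        await asyncio.sleep(0.01)
        return i

async def main():
    sem = asyncio.Semaphore(1)
    return await asyncio.gather(*(worker(i, sem) for i in range(5)))

print(asyncio.run(main()))  # [0, 1, 2, 3, 4]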
def decode_bytes(self, content: bytes) -> str:
@@ -529,9 +548,13 @@ class BaseParser(ABC):
def __init_storage(self):
"""Initialize storage client based on configuration"""
if self._storage is None:
storage_config = self.chunking_config.storage_config if self.chunking_config else None
storage_config = (
self.chunking_config.storage_config if self.chunking_config else None
)
self._storage = create_storage(storage_config)
logger.info(f"Initialized storage client: {self._storage.__class__.__name__}")
logger.info(
f"Initialized storage client: {self._storage.__class__.__name__}"
)
return self._storage
def upload_file(self, file_path: str) -> str:
@@ -605,40 +628,50 @@ class BaseParser(ABC):
logger.info(f"Beginning chunking process for text")
chunks = self.chunk_text(text)
logger.info(f"Created {len(chunks)} chunks from document")
# Limit the number of returned chunks
if len(chunks) > self.max_chunks:
logger.warning(f"Limiting chunks from {len(chunks)} to maximum {self.max_chunks}")
chunks = chunks[:self.max_chunks]
logger.warning(
f"Limiting chunks from {len(chunks)} to maximum {self.max_chunks}"
)
chunks = chunks[: self.max_chunks]
# If multimodal is enabled and file type is supported, process images in each chunk
if self.enable_multimodal:
# Get file extension and convert to lowercase
file_ext = (
os.path.splitext(self.file_name)[1].lower()
if self.file_name
else (
self.file_type.lower()
if self.file_type
else ""
)
else (self.file_type.lower() if self.file_type else "")
)
# Define allowed file types for image processing
allowed_types = [
'.pdf', # PDF files
'.md', '.markdown', # Markdown files
'.doc', '.docx', # Word documents
".pdf", # PDF files
".md",
".markdown", # Markdown files
".doc",
".docx", # Word documents
# Image files
'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'
".jpg",
".jpeg",
".png",
".gif",
".bmp",
".tiff",
".webp",
]
if file_ext in allowed_types:
logger.info(f"Processing images in each chunk for file type: {file_ext}")
logger.info(
f"Processing images in each chunk for file type: {file_ext}"
)
chunks = self.process_chunks_images(chunks, image_map)
else:
logger.info(f"Skipping image processing for unsupported file type: {file_ext}")
logger.info(
f"Skipping image processing for unsupported file type: {file_ext}"
)
return ParseResult(text=text, chunks=chunks)
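A tiny sketch of the extension gate above (the filename is hypothetical; the set mirrors allowed_types):

import os

allowed_types = {".pdf", ".md", ".markdown", ".doc", ".docx",
                 ".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"}
file_ext = os.path.splitext("report.PDF")[1].lower()  # ".pdf"
print(file_ext in allowed_types)  # True, so chunk images would be processed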
def _split_into_units(self, text: str) -> List[str]:
@@ -649,11 +682,13 @@ class BaseParser(ABC):
Returns:
List of basic units
"""
logger.info(f"Splitting text into basic units with robust structure protection, text length: {len(text)}")
logger.info(
f"Splitting text into basic units with robust structure protection, text length: {len(text)}"
)
# Define all structural patterns that must be protected as whole units ---
table_pattern = r"(?m)(^\|.*\|[ \t]*\r?\n(?:[ \t]*\r?\n)?^\|\s*:?--+.*\r?\n(?:^\|.*\|\r?\n?)*)"
# Other structures that need protection (code blocks, math blocks, inline elements)
code_block_pattern = r"```[\s\S]*?```"
math_block_pattern = r"\$\$[\s\S]*?\$\$"
@@ -661,7 +696,12 @@ class BaseParser(ABC):
# Locate all protected structures ---
protected_ranges = []
for pattern in [table_pattern, code_block_pattern, math_block_pattern, inline_pattern]:
for pattern in [
table_pattern,
code_block_pattern,
math_block_pattern,
inline_pattern,
]:
for match in re.finditer(pattern, text):
# Make sure the match is not an empty string, to avoid invalid ranges
if match.group(0).strip():
@@ -669,8 +709,10 @@ class BaseParser(ABC):
# Sort by start position
protected_ranges.sort(key=lambda x: x[0])
logger.info(f"Found {len(protected_ranges)} protected structures (tables, code, formulas, images, links).")
logger.info(
f"Found {len(protected_ranges)} protected structures (tables, code, formulas, images, links)."
)
# Merge potentially overlapping protected ranges ---
# Ensure we end up with a set of disjoint text blocks to protect
if protected_ranges:
@@ -685,15 +727,17 @@ class BaseParser(ABC):
# If they do not overlap, finalize the current range and start a new one
merged_ranges.append((current_start, current_end))
current_start, current_end = next_start, next_end
merged_ranges.append((current_start, current_end))
protected_ranges = merged_ranges
logger.info(f"After merging overlaps, {len(protected_ranges)} protected ranges remain.")
logger.info(
f"After merging overlaps, {len(protected_ranges)} protected ranges remain."
)
# Split the text according to the protected ranges and separators ---
units = []
last_end = 0
# Build the separator regex; the capture group preserves the separators themselves
separator_pattern = f"({'|'.join(re.escape(s) for s in self.separators)})"
@@ -703,7 +747,7 @@ class BaseParser(ABC):
pre_text = text[last_end:start]
# Split this unprotected text, keeping the separators
segments = re.split(separator_pattern, pre_text)
units.extend([s for s in segments if s]) # add all non-empty segments
units.extend([s for s in segments if s])  # add all non-empty segments
# b. Add the entire protected block (e.g., a complete table) as a single, indivisible unit
protected_text = text[start:end]
@@ -715,10 +759,11 @@ class BaseParser(ABC):
if last_end < len(text):
post_text = text[last_end:]
segments = re.split(separator_pattern, post_text)
units.extend([s for s in segments if s]) # add all non-empty segments
units.extend([s for s in segments if s])  # add all non-empty segments
logger.info(f"Text splitting complete, created {len(units)} final basic units.")
return units
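To make the two-phase split concrete, here is a self-contained sketch under simplified assumptions: a single fenced-code pattern and a blank-line separator, whereas the real method also protects tables, math blocks, and inline elements:

import re

text = "Intro.\n\n```\ncode line 1\n\ncode line 2\n```\n\nOutro."
protected = [(m.start(), m.end()) for m in re.finditer(r"```[\s\S]*?```", text)]

# Merge overlapping ranges (same logic as the loop above).
protected.sort(key=lambda r: r[0])
merged = []
for start, end in protected:
    if merged and start <= merged[-1][1]:
        merged[-1] = (merged[-1][0], max(merged[-1][1], end))
    else:
        merged.append((start, end))

units, last_end = [], 0
for start, end in merged:
    units.extend(s for s in re.split(r"(\n\n)", text[last_end:start]) if s)
    units.append(text[start:end])  # the fenced block stays one indivisible unit
    last_end = end
units.extend(s for s in re.split(r"(\n\n)", text[last_end:]) if s)
print(units)  # the blank line inside the code block never becomes a split point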
def _find_complete_units(self, units: List[str], target_size: int) -> List[str]:
"""Find a list of complete units that do not exceed the target size
@@ -884,71 +929,58 @@ class BaseParser(ABC):
"""
logger.info(f"Extracting image information from Chunk #{chunk.seq}")
text = chunk.content
# Regex to extract image information from text, supporting Markdown images and HTML images
img_pattern = r'!\[([^\]]*)\]\(([^)]+)\)|<img [^>]*src="([^"]+)" [^>]*>'
# Extract image information
img_matches = list(re.finditer(img_pattern, text))
logger.info(f"Chunk #{chunk.seq} found {len(img_matches)} images")
images_info = []
for match_idx, match in enumerate(img_matches):
# Process image URL
img_url = match.group(2) if match.group(2) else match.group(3)
alt_text = match.group(1) if match.group(1) else ""
# Record image information
image_info = {
"original_url": img_url,
"start": match.start(),
"end": match.end(),
"alt_text": alt_text,
"match_text": text[match.start():match.end()]
"match_text": text[match.start() : match.end()],
}
images_info.append(image_info)
logger.info(
f"Image in Chunk #{chunk.seq} {match_idx+1}: "
f"URL={img_url[:50]}..."
f"Image in Chunk #{chunk.seq} {match_idx+1}: " f"URL={img_url[:50]}..."
if len(img_url) > 50
else f"Image in Chunk #{chunk.seq} {match_idx+1}: URL={img_url}"
)
return images_info
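A quick demonstration of the image regex above on hypothetical URLs; group(2) carries the Markdown URL and group(3) the HTML src, exactly as the extraction loop reads them:

import re

img_pattern = r'!\[([^\]]*)\]\(([^)]+)\)|<img [^>]*src="([^"]+)" [^>]*>'
sample = (
    '![diagram](https://example.com/a.png) and '
    '<img alt="x" src="https://example.com/b.png" width="10">'
)
for m in re.finditer(img_pattern, sample):
    print(m.group(1) or "", m.group(2) or m.group(3))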
async def download_and_upload_image(self, img_url: str, current_request_id=None, image_map=None):
async def download_and_upload_image(self, img_url: str):
"""Download image and upload to object storage, if it's already an object storage path or local path, use directly
Args:
img_url: Image URL or local path
current_request_id: Current request ID
image_map: Optional dictionary mapping image URLs to Image objects
Returns:
tuple: (original URL, storage URL, image object), if failed returns (original URL, None, None)
"""
# Set request ID context in the asynchronous task
try:
if current_request_id:
from utils.request import set_request_id
set_request_id(current_request_id)
logger.info(f"Asynchronous task setting request ID: {current_request_id}")
except Exception as e:
logger.warning(f"Failed to set request ID in asynchronous task: {str(e)}")
try:
import requests
from PIL import Image
import io
# Check if image is already in the image_map
if image_map and img_url in image_map:
logger.info(f"Image already in image_map: {img_url}, using cached object")
return img_url, img_url, image_map[img_url]
# Check if it's already a storage URL (COS or MinIO)
is_storage_url = any(pattern in img_url for pattern in ["cos", "myqcloud.com", "minio", ".s3."])
is_storage_url = any(
pattern in img_url
for pattern in ["cos", "myqcloud.com", "minio", ".s3."]
)
if is_storage_url:
logger.info(f"Image already on COS: {img_url}, no need to re-upload")
try:
@@ -961,7 +993,7 @@ class BaseParser(ABC):
proxies["http"] = http_proxy
if https_proxy:
proxies["https"] = https_proxy
response = requests.get(img_url, timeout=5, proxies=proxies)
if response.status_code == 200:
image = Image.open(io.BytesIO(response.content))
@@ -972,12 +1004,14 @@ class BaseParser(ABC):
# Image will be closed by the caller
pass
else:
logger.warning(f"Failed to get storage image: {response.status_code}")
logger.warning(
f"Failed to get storage image: {response.status_code}"
)
return img_url, img_url, None
except Exception as e:
logger.error(f"Error getting storage image: {str(e)}")
return img_url, img_url, None
# Check if it's a local file path
elif os.path.exists(img_url) and os.path.isfile(img_url):
logger.info(f"Using local image file: {img_url}")
@@ -986,17 +1020,19 @@ class BaseParser(ABC):
# Read local image
image = Image.open(img_url)
# Upload to storage
with open(img_url, 'rb') as f:
with open(img_url, "rb") as f:
content = f.read()
storage_url = self.upload_bytes(content)
logger.info(f"Successfully uploaded local image to storage: {storage_url}")
logger.info(
f"Successfully uploaded local image to storage: {storage_url}"
)
return img_url, storage_url, image
except Exception as e:
logger.error(f"Error processing local image: {str(e)}")
if image and hasattr(image, 'close'):
if image and hasattr(image, "close"):
image.close()
return img_url, None, None
# Normal remote URL download handling
else:
# Get proxy settings from environment variables
@@ -1007,17 +1043,21 @@ class BaseParser(ABC):
proxies["http"] = http_proxy
if https_proxy:
proxies["https"] = https_proxy
logger.info(f"Downloading image {img_url}, using proxy: {proxies if proxies else 'None'}")
logger.info(
f"Downloading image {img_url}, using proxy: {proxies if proxies else 'None'}"
)
response = requests.get(img_url, timeout=5, proxies=proxies)
if response.status_code == 200:
# Download successful, create image object
image = Image.open(io.BytesIO(response.content))
try:
# Upload to storage using the method in BaseParser
storage_url = self.upload_bytes(response.content)
logger.info(f"Successfully uploaded image to storage: {storage_url}")
logger.info(
f"Successfully uploaded image to storage: {storage_url}"
)
return img_url, storage_url, image
finally:
# Image will be closed by the caller
@@ -1025,66 +1065,79 @@ class BaseParser(ABC):
else:
logger.warning(f"Failed to download image: {response.status_code}")
return img_url, None, None
except Exception as e:
logger.error(f"Error downloading or processing image: {str(e)}")
return img_url, None, None
async def process_chunk_images_async(self, chunk, chunk_idx, total_chunks, current_request_id=None, image_map=None):
async def process_chunk_images_async(
self, chunk, chunk_idx, total_chunks, image_map=None
):
"""Asynchronously process images in a single Chunk
Args:
chunk: Chunk object to process
chunk_idx: Chunk index
total_chunks: Total number of chunks
current_request_id: Current request ID
image_map: Optional dictionary mapping image URLs to Image objects
Returns:
Processed Chunk object
"""
logger.info(f"Starting to process images in Chunk #{chunk_idx+1}/{total_chunks}")
logger.info(
f"Starting to process images in Chunk #{chunk_idx+1}/{total_chunks}"
)
# Extract image information from the Chunk
images_info = self.extract_images_from_chunk(chunk)
if not images_info:
logger.info(f"Chunk #{chunk_idx+1} found no images")
return chunk
# Prepare images that need to be downloaded and processed
images_to_process = []
url_to_info_map = {} # Map URL to image information
# Record all image URLs that need to be processed
for img_info in images_info:
url = img_info["original_url"]
url_to_info_map[url] = img_info
# Create an asynchronous event loop (current loop)
loop = asyncio.get_event_loop()
# Concurrent download and upload of images
tasks = [self.download_and_upload_image(url, current_request_id, image_map) for url in url_to_info_map.keys()]
results = await asyncio.gather(*tasks)
results = []
download_tasks = []
for img_url in url_to_info_map.keys(): # Check if image is already in the image_map
if image_map and img_url in image_map:
logger.info(f"Image already in image_map: {img_url}, using cached object")
results.append((img_url, img_url, image_map[img_url]))
else:
download_task = self.download_and_upload_image(img_url)
download_tasks.append(download_task)
# Concurrent download and upload of images, ignore images that are already in the image_map
results.extend(await asyncio.gather(*download_tasks))
# Process download results, prepare for OCR processing
for orig_url, cos_url, image in results:
if cos_url and image:
img_info = url_to_info_map[orig_url]
img_info["cos_url"] = cos_url
images_to_process.append((image, cos_url))
# If no images were successfully downloaded and uploaded, return the original Chunk
if not images_to_process:
logger.info(f"Chunk #{chunk_idx+1} found no successfully downloaded and uploaded images")
logger.info(
f"Chunk #{chunk_idx+1} found no successfully downloaded and uploaded images"
)
return chunk
# Concurrent processing of all images (OCR + caption)
logger.info(f"Processing {len(images_to_process)} images in Chunk #{chunk_idx+1}")
logger.info(
f"Processing {len(images_to_process)} images in Chunk #{chunk_idx+1}"
)
# Concurrent processing of all images
processed_results = await self.process_multiple_images(images_to_process)
# Process OCR and Caption results
for ocr_text, caption, img_url in processed_results:
# Find the corresponding original URL
@@ -1092,22 +1145,24 @@ class BaseParser(ABC):
if info.get("cos_url") == img_url:
info["ocr_text"] = ocr_text if ocr_text else ""
info["caption"] = caption if caption else ""
if ocr_text:
logger.info(f"Image OCR extracted {len(ocr_text)} characters: {img_url}")
logger.info(
f"Image OCR extracted {len(ocr_text)} characters: {img_url}"
)
if caption:
logger.info(f"Obtained image description: '{caption}'")
break
# Add processed image information to the Chunk
processed_images = []
for img_info in images_info:
if "cos_url" in img_info:
processed_images.append(img_info)
# Update image information in the Chunk
chunk.images = processed_images
logger.info(f"Completed image processing in Chunk #{chunk_idx+1}")
return chunk
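The image_map short-circuit introduced above generalizes to a small helper; a sketch, with fetch as a hypothetical stand-in for download_and_upload_image:

import asyncio

async def resolve_images(urls, image_map, fetch):
    results, misses = [], []
    for url in urls:
        if image_map and url in image_map:
            # Cache hit: reuse the stored object, skip the network round-trip.
            results.append((url, url, image_map[url]))
        else:
            misses.append(fetch(url))
    results.extend(await asyncio.gather(*misses))
    return results

async def fetch(url):  # hypothetical downloader
    return (url, url + "#stored", None)

print(asyncio.run(resolve_images(["a", "b"], {"a": "cached"}, fetch)))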
@@ -1120,42 +1175,37 @@ class BaseParser(ABC):
Returns:
List of processed document chunks
"""
logger.info(f"Starting concurrent processing of images in all {len(chunks)} chunks")
logger.info(
f"Starting concurrent processing of images in all {len(chunks)} chunks"
)
if not chunks:
logger.warning("No chunks to process")
return chunks
# Get current request ID to pass to asynchronous tasks
current_request_id = None
try:
from utils.request import get_request_id
current_request_id = get_request_id()
logger.info(f"Capturing current request ID before async processing: {current_request_id}")
except Exception as e:
logger.warning(f"Failed to get current request ID: {str(e)}")
# Create and run all Chunk concurrent processing tasks
async def process_all_chunks():
# Set max concurrency, reduce concurrency to avoid resource contention
max_concurrency = min(self.max_concurrent_tasks, 1) # Reduce concurrency
# Use semaphore to limit concurrency
semaphore = asyncio.Semaphore(max_concurrency)
async def process_with_limit(chunk, idx, total):
"""Use semaphore to control concurrent processing of Chunks"""
async with semaphore:
return await self.process_chunk_images_async(chunk, idx, total, current_request_id, image_map)
return await self.process_chunk_images_async(
chunk, idx, total, image_map
)
# Create tasks for all Chunks
tasks = [
process_with_limit(chunk, idx, len(chunks))
for idx, chunk in enumerate(chunks)
]
# Execute all tasks concurrently
results = await asyncio.gather(*tasks, return_exceptions=True)
# Handle possible exceptions
processed_chunks = []
for i, result in enumerate(results):
@@ -1166,9 +1216,9 @@ class BaseParser(ABC):
processed_chunks.append(chunks[i])
else:
processed_chunks.append(result)
return processed_chunks
# Create event loop and run all tasks
try:
# Check if event loop already exists
@@ -1181,11 +1231,13 @@ class BaseParser(ABC):
# If no event loop, create a new one
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Execute processing for all Chunks
processed_chunks = loop.run_until_complete(process_all_chunks())
logger.info(f"Successfully completed concurrent processing of {len(processed_chunks)}/{len(chunks)} chunks")
logger.info(
f"Successfully completed concurrent processing of {len(processed_chunks)}/{len(chunks)} chunks"
)
return processed_chunks
except Exception as e:
logger.error(f"Error during concurrent chunk processing: {str(e)}")