chore(clone): Optimize concurrent processing, improve error handling and annotations

This commit is contained in:
begoniezhao
2025-08-29 14:49:48 +08:00
parent a1473fe731
commit 7be2e99aa2
3 changed files with 67 additions and 51 deletions

View File

@@ -14,7 +14,6 @@ import (
"slices"
"sort"
"strings"
"sync"
"time"
"github.com/Tencent/WeKnora/internal/application/service/retriever"
@@ -271,9 +270,11 @@ func (s *knowledgeService) CreateKnowledgeFromURL(ctx context.Context,
// Check if URL already exists in the knowledge base
tenantID := ctx.Value(types.TenantIDContextKey).(uint)
logger.Infof(ctx, "Checking if URL exists, tenant ID: %d", tenantID)
fileHash := calculateStr(url)
exists, existingKnowledge, err := s.repo.CheckKnowledgeExists(ctx, tenantID, kbID, &types.KnowledgeCheckParams{
Type: "url",
URL: url,
Type: "url",
URL: url,
FileHash: fileHash,
})
if err != nil {
logger.Errorf(ctx, "Failed to check knowledge existence: %v", err)
@@ -306,6 +307,7 @@ func (s *knowledgeService) CreateKnowledgeFromURL(ctx context.Context,
KnowledgeBaseID: kbID,
Type: "url",
Source: url,
FileHash: fileHash,
ParseStatus: "pending",
EnableStatus: "disabled",
CreatedAt: time.Now(),
@@ -1375,49 +1377,47 @@ func (s *knowledgeService) CloneKnowledgeBase(ctx context.Context, srcID, dstID
logger.Infof(ctx, "Knowledge after update to add: %d, delete: %d", len(addKnowledge), len(delKnowledge))
batch := 10
wg := sync.WaitGroup{}
errCh := make(chan error, len(delKnowledge))
g, gctx := errgroup.WithContext(ctx)
for ids := range slices.Chunk(delKnowledge, batch) {
wg.Add(1)
go func(ids []string) {
defer wg.Done()
if err := s.DeleteKnowledgeList(ctx, ids); err != nil {
errCh <- fmt.Errorf("delete knowledge %v: %w", ids, err)
ids = ids
g.Go(func() error {
err := s.DeleteKnowledgeList(gctx, ids)
if err != nil {
logger.Errorf(gctx, "delete partial knowledge %v: %w", ids, err)
return err
}
}(ids)
return nil
})
}
wg.Wait()
close(errCh)
for err := range errCh {
if err != nil {
return err
}
err = g.Wait()
if err != nil {
logger.Errorf(ctx, "delete total knowledge %d: %v", len(delKnowledge), err)
return err
}
wg = sync.WaitGroup{}
errCh = make(chan error, len(addKnowledge)+len(delKnowledge))
for ids := range slices.Chunk(addKnowledge, batch) {
wg.Add(1)
go func(ids []string) {
defer wg.Done()
for _, kID := range ids {
srcKn, err := s.repo.GetKnowledgeByID(ctx, srcKB.TenantID, kID)
if err != nil {
errCh <- fmt.Errorf("get knowledge %s: %w", kID, err)
continue
}
if err := s.cloneKnowledge(ctx, srcKn, dstKB); err != nil {
errCh <- fmt.Errorf("move knowledge %s: %w", kID, err)
}
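// The previous group's derived context is canceled once its Wait returns,
// so build a fresh errgroup (and context) from the parent ctx for the add phase.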
g, gctx = errgroup.WithContext(ctx)
g.SetLimit(batch)
for _, knowledge := range addKnowledge {
knowledge = knowledge
g.Go(func() error {
srcKn, err := s.repo.GetKnowledgeByID(gctx, srcKB.TenantID, knowledge)
if err != nil {
logger.Errorf(gctx, "get knowledge %s: %w", knowledge, err)
return err
}
}(ids)
err = s.cloneKnowledge(gctx, srcKn, dstKB)
if err != nil {
logger.Errorf(gctx, "clone knowledge %s: %w", knowledge, err)
return err
}
return nil
})
}
wg.Wait()
close(errCh)
for err := range errCh {
if err != nil {
return err
}
err = g.Wait()
if err != nil {
logger.Errorf(ctx, "add total knowledge %d: %v", len(addKnowledge), err)
return err
}
return nil
}