From f3920b02f78ea344eafbf53428f0a1c0c039a7ac Mon Sep 17 00:00:00 2001 From: j2rong4cn Date: Fri, 4 Jul 2025 12:50:04 +0800 Subject: [PATCH] fix(net): goroutine deadlock --- internal/net/request.go | 56 +++++++++++++++++++++-------------------- internal/net/serve.go | 6 ++--- 2 files changed, 32 insertions(+), 30 deletions(-) diff --git a/internal/net/request.go b/internal/net/request.go index 41640972..59db7643 100644 --- a/internal/net/request.go +++ b/internal/net/request.go @@ -3,6 +3,7 @@ package net import ( "bytes" "context" + "errors" "fmt" "io" "net/http" @@ -119,7 +120,7 @@ type ConcurrencyLimit struct { Limit int // 需要大于0 } -var ErrExceedMaxConcurrency = fmt.Errorf("ExceedMaxConcurrency") +var ErrExceedMaxConcurrency = errors.New("ExceedMaxConcurrency") func (l *ConcurrencyLimit) sub() error { l._m.Lock() @@ -279,10 +280,9 @@ func (d *downloader) interrupt() error { err := fmt.Errorf("interrupted") d.err = err } - if d.chunkChannel != nil { + close(d.chunkChannel) + if d.bufs != nil { d.cancel(err) - close(d.chunkChannel) - d.chunkChannel = nil for _, buf := range d.bufs { buf.Close() } @@ -291,8 +291,6 @@ func (d *downloader) interrupt() error { d.concurrency = -d.concurrency } log.Debugf("maxConcurrency:%d", d.cfg.Concurrency+d.concurrency) - } else { - log.Debug("close of closed channel") } return err } @@ -314,31 +312,35 @@ func (d *downloader) finishBuf(id int) (isLast bool, nextBuf *Buf) { // downloadPart is an individual goroutine worker reading from the ch channel // and performing Http request on the data with a given byte range. func (d *downloader) downloadPart() { - //defer d.wg.Done() + defer d.concurrencyFinish() for { - c, ok := <-d.chunkChannel - if !ok { - break - } - if d.getErr() != nil { - // Drain the channel if there is an error, to prevent deadlocking - // of download producer. - break - } - if err := d.downloadChunk(&c); err != nil { - if err == errCancelConcurrency { - break + select { + case <-d.ctx.Done(): + return + case c, ok := <-d.chunkChannel: + if !ok { + return } - if err == context.Canceled { - if e := context.Cause(d.ctx); e != nil { - err = e + if d.getErr() != nil { + // Drain the channel if there is an error, to prevent deadlocking + // of download producer. + return + } + if err := d.downloadChunk(&c); err != nil { + if err == errCancelConcurrency { + return } + if err == context.Canceled { + if e := context.Cause(d.ctx); e != nil { + err = e + } + } + d.setErr(err) + d.cancel(err) + return } - d.setErr(err) - d.cancel(err) } } - d.concurrencyFinish() } // downloadChunk downloads the chunk @@ -390,8 +392,8 @@ func (d *downloader) downloadChunk(ch *chunk) error { return err } -var errCancelConcurrency = fmt.Errorf("cancel concurrency") -var errInfiniteRetry = fmt.Errorf("infinite retry") +var errCancelConcurrency = errors.New("cancel concurrency") +var errInfiniteRetry = errors.New("infinite retry") func (d *downloader) tryDownloadChunk(params *HttpRequestParams, ch *chunk) (int64, error) { resp, err := d.cfg.HttpClient(d.ctx, params) diff --git a/internal/net/serve.go b/internal/net/serve.go index 44258539..0dac0d3d 100644 --- a/internal/net/serve.go +++ b/internal/net/serve.go @@ -120,7 +120,7 @@ func ServeHTTP(w http.ResponseWriter, r *http.Request, name string, modTime time reader, err := RangeReadCloser.RangeRead(ctx, http_range.Range{Length: -1}) if err != nil { code = http.StatusRequestedRangeNotSatisfiable - if err == ErrExceedMaxConcurrency { + if errors.Is(err, ErrExceedMaxConcurrency) { code = http.StatusTooManyRequests } http.Error(w, err.Error(), code) @@ -143,7 +143,7 @@ func ServeHTTP(w http.ResponseWriter, r *http.Request, name string, modTime time sendContent, err = RangeReadCloser.RangeRead(ctx, ra) if err != nil { code = http.StatusRequestedRangeNotSatisfiable - if err == ErrExceedMaxConcurrency { + if errors.Is(err, ErrExceedMaxConcurrency) { code = http.StatusTooManyRequests } http.Error(w, err.Error(), code) @@ -205,7 +205,7 @@ func ServeHTTP(w http.ResponseWriter, r *http.Request, name string, modTime time log.Warnf("Maybe size incorrect or reader not giving correct/full data, or connection closed before finish. written bytes: %d ,sendSize:%d, ", written, sendSize) } code = http.StatusInternalServerError - if err == ErrExceedMaxConcurrency { + if errors.Is(err, ErrExceedMaxConcurrency) { code = http.StatusTooManyRequests } w.WriteHeader(code)