fix rewrite error handing
This commit is contained in:
@@ -22,6 +22,7 @@ import (
|
||||
"reflect"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/benbjohnson/clock"
|
||||
@@ -578,10 +579,14 @@ func (s *storage) blockDependencies() block.Dependencies {
|
||||
}
|
||||
|
||||
func (s *storage) rewrite(ctx context.Context, feeds []*model.Feed) ([]*model.Feed, error) {
|
||||
rewritten := make([]*model.Feed, 0, len(feeds))
|
||||
var wg sync.WaitGroup
|
||||
var errs []error
|
||||
var mu sync.Mutex
|
||||
var (
|
||||
rewritten = make([]*model.Feed, 0, len(feeds))
|
||||
wg sync.WaitGroup
|
||||
mu sync.Mutex
|
||||
errs []error
|
||||
dropped atomic.Int32
|
||||
)
|
||||
|
||||
for _, item := range feeds { // TODO: Limit the concurrency & goroutine number.
|
||||
wg.Add(1)
|
||||
go func(item *model.Feed) {
|
||||
@@ -596,6 +601,7 @@ func (s *storage) rewrite(ctx context.Context, feeds []*model.Feed) ([]*model.Fe
|
||||
}
|
||||
if len(labels) == 0 {
|
||||
log.Debug(ctx, "drop feed", "id", item.ID)
|
||||
dropped.Add(1)
|
||||
|
||||
return // Drop empty labels.
|
||||
}
|
||||
@@ -607,7 +613,7 @@ func (s *storage) rewrite(ctx context.Context, feeds []*model.Feed) ([]*model.Fe
|
||||
}(item)
|
||||
}
|
||||
wg.Wait()
|
||||
if allFailed := len(errs) == len(feeds); allFailed {
|
||||
if allFailed := len(errs) == len(feeds)-int(dropped.Load()); allFailed {
|
||||
return nil, errs[0]
|
||||
}
|
||||
if len(errs) > 0 {
|
||||
|
||||
Reference in New Issue
Block a user