diff --git a/pkg/storage/feed/feed.go b/pkg/storage/feed/feed.go index 5c41b72..2fa3877 100644 --- a/pkg/storage/feed/feed.go +++ b/pkg/storage/feed/feed.go @@ -22,6 +22,7 @@ import ( "reflect" "strconv" "sync" + "sync/atomic" "time" "github.com/benbjohnson/clock" @@ -578,10 +579,14 @@ func (s *storage) blockDependencies() block.Dependencies { } func (s *storage) rewrite(ctx context.Context, feeds []*model.Feed) ([]*model.Feed, error) { - rewritten := make([]*model.Feed, 0, len(feeds)) - var wg sync.WaitGroup - var errs []error - var mu sync.Mutex + var ( + rewritten = make([]*model.Feed, 0, len(feeds)) + wg sync.WaitGroup + mu sync.Mutex + errs []error + dropped atomic.Int32 + ) + for _, item := range feeds { // TODO: Limit the concurrency & goroutine number. wg.Add(1) go func(item *model.Feed) { @@ -596,6 +601,7 @@ func (s *storage) rewrite(ctx context.Context, feeds []*model.Feed) ([]*model.Fe } if len(labels) == 0 { log.Debug(ctx, "drop feed", "id", item.ID) + dropped.Add(1) return // Drop empty labels. } @@ -607,7 +613,7 @@ func (s *storage) rewrite(ctx context.Context, feeds []*model.Feed) ([]*model.Fe }(item) } wg.Wait() - if allFailed := len(errs) == len(feeds); allFailed { + if allFailed := len(errs) == len(feeds)-int(dropped.Load()); allFailed { return nil, errs[0] } if len(errs) > 0 {