From 18cc247532c212a89ed448c551739a2b50e77c1e Mon Sep 17 00:00:00 2001 From: glidea <740696441@qq.com> Date: Mon, 28 Apr 2025 23:29:34 +0800 Subject: [PATCH] add summary for notification & misc fix --- docs/config-zh.md | 17 +++--- docs/config.md | 17 +++--- main.go | 1 + pkg/config/config.go | 6 +++ pkg/notify/channel/email.go | 44 +++++++++++++-- pkg/notify/notify.go | 17 ++++-- pkg/notify/route/route.go | 95 ++++++++++++++++++++++++++++----- pkg/notify/route/route_test.go | 10 +++- pkg/rewrite/rewrite.go | 4 +- pkg/schedule/rule/periodic.go | 2 +- pkg/storage/feed/block/block.go | 8 +-- pkg/storage/feed/feed.go | 5 +- 12 files changed, 183 insertions(+), 43 deletions(-) diff --git a/docs/config-zh.md b/docs/config-zh.md index f5057fc..6100e4f 100644 --- a/docs/config-zh.md +++ b/docs/config-zh.md @@ -142,13 +142,16 @@ 此结构可以使用 `sub_routes` 进行嵌套。Feed 会首先尝试匹配子路由;如果没有子路由匹配,则应用父路由的配置。 -| 字段 | 类型 | 描述 | 默认值 | 是否必需 | -| :--------------------------------- | :----------- | :-------------------------------------------------------------------------------------------------------- | :----- | :------------ | -| `...matchers` (仅子路由) | `字符串列表` | 标签匹配器,用于确定 Feed 是否属于此子路由。例如 `["category=tech", "source!=github"]`。 | `[]` | 是 (仅子路由) | -| `...receivers` | `字符串列表` | 接收者的名称列表 (在 `notify.receivers` 中定义),用于发送匹配此路由的 Feed 的通知。 | `[]` | 是 (至少一个) | -| `...group_by` | `字符串列表` | 在发送通知前用于对 Feed 进行分组的标签列表。每个分组会产生一个单独的通知。例如 `["source", "category"]`。 | `[]` | 是 (至少一个) | -| `...compress_by_related_threshold` | `*float32` | 如果设置,则根据语义相关性压缩分组内高度相似的 Feed,仅发送一个代表。阈值 (0-1),越高表示越相似。 | `0.85` | 否 | -| `...sub_routes` | `对象列表` | 嵌套路由列表。允许定义更具体的路由规则。每个对象遵循 **通知路由配置**。 | `[]` | 否 | +| 字段 | 类型 | 描述 | 默认值 | 是否必需 | +| :--------------------------------- | :----------- | :-------------------------------------------------------------------------------------------------------- | :---------------------- | :------------ | +| `...matchers` (仅子路由) | `字符串列表` | 标签匹配器,用于确定 Feed 是否属于此子路由。例如 `["category=tech", "source!=github"]`。 | `[]` | 是 (仅子路由) | +| `...receivers` | `字符串列表` | 接收者的名称列表 (在 `notify.receivers` 中定义),用于发送匹配此路由的 Feed 的通知。 | `[]` | 是 (至少一个) | +| `...group_by` | `字符串列表` | 在发送通知前用于对 Feed 进行分组的标签列表。每个分组会产生一个单独的通知。例如 `["source", "category"]`。 | `[]` | 是 (至少一个) | +| `...source_label` | `string` | 从每个 Feed 中提取内容并进行总结的源标签。默认为所有标签。强烈建议设置为 'summary' 以减少上下文长度。 | 所有标签 | 否 | +| `...summary_prompt` | `string` | 用于总结每个分组的 Feed 的 Prompt。 | | 否 | +| `...llm` | `string` | 使用的 LLM 的名称。默认为 `llms` 部分中的默认 LLM。建议使用上下文长度较大的 LLM。 | `llms` 部分中的默认 LLM | 否 | +| `...compress_by_related_threshold` | `*float32` | 如果设置,则根据语义相关性压缩分组内高度相似的 Feed,仅发送一个代表。阈值 (0-1),越高表示越相似。 | `0.85` | 否 | +| `...sub_routes` | `对象列表` | 嵌套路由列表。允许定义更具体的路由规则。每个对象遵循 **通知路由配置**。 | `[]` | 否 | ### 通知接收者配置 (`notify.receivers[]`) diff --git a/docs/config.md b/docs/config.md index f53c098..af5d9fd 100644 --- a/docs/config.md +++ b/docs/config.md @@ -142,13 +142,16 @@ Defines rules for querying and monitoring feeds. This structure can be nested using `sub_routes`. A feed is matched against sub-routes first; if no sub-route matches, the parent route's configuration applies. -| Field | Type | Description | Default | Required | -| :--------------------------------- | :-------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------------- | :------ | :------------------- | -| `...matchers` (only in sub-routes) | list of strings | Label matchers to determine if a feed belongs to this sub-route. e.g. `["category=tech", "source!=github"]`. | `[]` | Yes (for sub-routes) | -| `...receivers` | list of strings | Names of the receivers (defined in `notify.receivers`) to send notifications for feeds matching this route. | `[]` | Yes (at least one) | -| `...group_by` | list of strings | Labels to group feeds by before sending notifications. Each group results in a separate notification. e.g., `["source", "category"]`. | `[]` | Yes (at least one) | -| `...compress_by_related_threshold` | *float32 | If set, compresses highly similar feeds (based on semantic relatedness) within a group, sending only one representative. Threshold (0-1). Higher means more similar. | `0.85` | No | -| `...sub_routes` | list of objects | Nested routes. Allows defining more specific routing rules. Each object follows the **Notify Route Configuration**. | `[]` | No | +| Field | Type | Description | Default | Required | +| :--------------------------------- | :-------------- | :-------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | :---------------------------- | :------------------- | +| `...matchers` (only in sub-routes) | list of strings | Label matchers to determine if a feed belongs to this sub-route. e.g. `["category=tech", "source!=github"]`. | `[]` | Yes (for sub-routes) | +| `...receivers` | list of strings | Names of the receivers (defined in `notify.receivers`) to send notifications for feeds matching this route. | `[]` | Yes (at least one) | +| `...group_by` | list of strings | Labels to group feeds by before sending notifications. Each group results in a separate notification. e.g., `["source", "category"]`. | `[]` | Yes (at least one) | +| `...source_label` | string | The source label to extract the content from each feed, and summarize them. Default are all labels. It is very recommended to set it to 'summary' to reduce context length. | all labels | No | +| `...summary_prompt` | string | The prompt to summarize the feeds of each group. | | No | +| `...llm` | string | The LLM name to use. Default is the default LLM in `llms` section. A large context length LLM is recommended. | default LLM in `llms` section | No | +| `...compress_by_related_threshold` | *float32 | If set, compresses highly similar feeds (based on semantic relatedness) within a group, sending only one representative. Threshold (0-1). Higher means more similar. | `0.85` | No | +| `...sub_routes` | list of objects | Nested routes. Allows defining more specific routing rules. Each object follows the **Notify Route Configuration**. | `[]` | No | ### Notify Receiver Configuration (`notify.receivers[]`) diff --git a/main.go b/main.go index a5564fe..d7f0814 100644 --- a/main.go +++ b/main.go @@ -355,6 +355,7 @@ func (a *App) setupNotifier() (err error) { RouterFactory: route.NewFactory(), ChannelFactory: channel.NewFactory(), KVStorage: a.kvStorage, + LLMFactory: a.llmFactory, }) if err != nil { return err diff --git a/pkg/config/config.go b/pkg/config/config.go index 3429a88..8ea7089 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -145,6 +145,9 @@ type SchedulsRule struct { type NotifyRoute struct { Receivers []string `yaml:"receivers,omitempty" json:"receivers,omitempty" desc:"The notify receivers. It is required, at least one receiver is needed."` GroupBy []string `yaml:"group_by,omitempty" json:"group_by,omitempty" desc:"The group by config to group the feeds, each group will be notified individually. It is required, at least one group by is needed."` + SourceLabel string `yaml:"source_label,omitempty" json:"source_label,omitempty" desc:"The source label to extract the content from each feed, and summarize them. Default are all labels. It is very recommended to set it to 'summary' to reduce context length."` + SummaryPrompt string `yaml:"summary_prompt,omitempty" json:"summary_prompt,omitempty" desc:"The prompt to summarize the feeds of each group."` + LLM string `yaml:"llm,omitempty" json:"llm,omitempty" desc:"The LLM name to use. Default is the default LLM in llms section. A large context length LLM is recommended."` CompressByRelatedThreshold *float32 `yaml:"compress_by_related_threshold,omitempty" json:"compress_by_related_threshold,omitempty" desc:"The threshold to compress the feeds by relatedness, that is, if the feeds are too similar, only one will be notified. Default is 0.85."` SubRoutes []NotifySubRoute `yaml:"sub_routes,omitempty" json:"sub_routes,omitempty" desc:"The sub routes to notify the feeds. A feed prefers to be matched by the sub routes, if not matched, it will be matched by the parent route."` } @@ -154,6 +157,9 @@ type NotifySubRoute struct { Receivers []string `yaml:"receivers,omitempty" json:"receivers,omitempty" desc:"The notify receivers. It is required, at least one receiver is needed."` GroupBy []string `yaml:"group_by,omitempty" json:"group_by,omitempty" desc:"The group by config to group the feeds, each group will be notified individually. It is required, at least one group by is needed."` + SourceLabel string `yaml:"source_label,omitempty" json:"source_label,omitempty" desc:"The source label to extract the content from each feed, and summarize them. Default are all labels. It is very recommended to set it to 'summary' to reduce context length."` + SummaryPrompt string `yaml:"summary_prompt,omitempty" json:"summary_prompt,omitempty" desc:"The prompt to summarize the feeds of each group."` + LLM string `yaml:"llm,omitempty" json:"llm,omitempty" desc:"The LLM name to use. Default is the default LLM in llms section. A large context length LLM is recommended."` CompressByRelatedThreshold *float32 `yaml:"compress_by_related_threshold,omitempty" json:"compress_by_related_threshold,omitempty" desc:"The threshold to compress the feeds by relatedness, that is, if the feeds are too similar, only one will be notified. Default is 0.85."` SubRoutes []NotifySubRoute `yaml:"sub_routes,omitempty" json:"sub_routes,omitempty" desc:"The sub routes to notify the feeds. A feed prefers to be matched by the sub routes, if not matched, it will be matched by the parent route."` } diff --git a/pkg/notify/channel/email.go b/pkg/notify/channel/email.go index 274d94a..2424622 100644 --- a/pkg/notify/channel/email.go +++ b/pkg/notify/channel/email.go @@ -130,7 +130,7 @@ func (e *email) buildEmail(receiver Receiver, group *route.FeedGroup) (*gomail.M m.SetHeader("To", receiver.Email) m.SetHeader("Subject", group.Name) - body, err := e.buildBodyHTML(group.Feeds) + body, err := e.buildBodyHTML(group) if err != nil { return nil, errors.Wrap(err, "build email body HTML") } @@ -139,7 +139,7 @@ func (e *email) buildEmail(receiver Receiver, group *route.FeedGroup) (*gomail.M return m, nil } -func (e *email) buildBodyHTML(feeds []*route.Feed) ([]byte, error) { +func (e *email) buildBodyHTML(group *route.FeedGroup) ([]byte, error) { bodyBuf := buffer.Get() defer buffer.Put(bodyBuf) @@ -148,14 +148,24 @@ func (e *email) buildBodyHTML(feeds []*route.Feed) ([]byte, error) { return nil, errors.Wrap(err, "write HTML header") } + // Write summary. + if err := e.writeSummary(bodyBuf, group.Summary); err != nil { + return nil, errors.Wrap(err, "write summary") + } + // Write each feed content. - for i, feed := range feeds { + if _, err := bodyBuf.WriteString(` +
+

Feeds

`); err != nil { + return nil, errors.Wrap(err, "write feeds header") + } + for i, feed := range group.Feeds { if err := e.writeFeedContent(bodyBuf, feed); err != nil { return nil, errors.Wrap(err, "write feed content") } // Add separator (except the last feed). - if i < len(feeds)-1 { + if i < len(group.Feeds)-1 { if err := e.writeSeparator(bodyBuf); err != nil { return nil, errors.Wrap(err, "write separator") } @@ -188,6 +198,29 @@ func (e *email) writeHTMLHeader(buf *buffer.Bytes) error { return err } +func (e *email) writeSummary(buf *buffer.Bytes, summary string) error { + if summary == "" { + return nil + } + + if _, err := buf.WriteString(` +

Summary

`); err != nil { + return errors.Wrap(err, "write summary header") + } + + contentHTML, err := textconvert.MarkdownToHTML([]byte(summary)) + if err != nil { + return errors.Wrap(err, "markdown to HTML") + } + + contentHTMLWithStyle := fmt.Sprintf(`
%s
`, contentHTML) + if _, err := buf.WriteString(contentHTMLWithStyle); err != nil { + return errors.Wrap(err, "write summary") + } + + return nil +} + const timeLayout = "01-02 15:04" func (e *email) writeFeedContent(buf *buffer.Bytes, feed *route.Feed) error { @@ -311,7 +344,8 @@ func (e *email) renderMarkdownContent(buf *buffer.Bytes, feed *route.Feed) (n in return 0, errors.Wrap(err, "markdown to HTML") } - if _, err := buf.Write(contentHTML); err != nil { + contentHTMLWithStyle := fmt.Sprintf(`
%s
`, contentHTML) + if _, err := buf.WriteString(contentHTMLWithStyle); err != nil { return 0, errors.Wrap(err, "write content HTML") } diff --git a/pkg/notify/notify.go b/pkg/notify/notify.go index 68b03c4..5f31ba0 100644 --- a/pkg/notify/notify.go +++ b/pkg/notify/notify.go @@ -27,6 +27,7 @@ import ( "github.com/glidea/zenfeed/pkg/component" "github.com/glidea/zenfeed/pkg/config" + "github.com/glidea/zenfeed/pkg/llm" "github.com/glidea/zenfeed/pkg/notify/channel" "github.com/glidea/zenfeed/pkg/notify/route" "github.com/glidea/zenfeed/pkg/schedule/rule" @@ -67,6 +68,9 @@ func (c *Config) From(app *config.App) *Config { c.Route = route.Config{ Route: route.Route{ GroupBy: app.Notify.Route.GroupBy, + SourceLabel: app.Notify.Route.SourceLabel, + SummaryPrompt: app.Notify.Route.SummaryPrompt, + LLM: app.Notify.Route.LLM, CompressByRelatedThreshold: app.Notify.Route.CompressByRelatedThreshold, Receivers: app.Notify.Route.Receivers, }, @@ -105,6 +109,9 @@ func convertSubRoute(from *config.NotifySubRoute) *route.SubRoute { to := &route.SubRoute{ Route: route.Route{ GroupBy: from.GroupBy, + SourceLabel: from.SourceLabel, + SummaryPrompt: from.SummaryPrompt, + LLM: from.LLM, CompressByRelatedThreshold: from.CompressByRelatedThreshold, Receivers: from.Receivers, }, @@ -169,6 +176,7 @@ type Dependencies struct { RouterFactory route.Factory ChannelFactory channel.Factory KVStorage kv.Storage + LLMFactory llm.Factory } // --- Factory code block --- @@ -322,7 +330,10 @@ func (n *notifier) newRouter(config *route.Config) (route.Router, error) { return n.Dependencies().RouterFactory.New( n.Instance(), config, - route.Dependencies{RelatedScore: n.Dependencies().RelatedScore}, + route.Dependencies{ + RelatedScore: n.Dependencies().RelatedScore, + LLMFactory: n.Dependencies().LLMFactory, + }, ) } @@ -339,7 +350,7 @@ func (n *notifier) handle(ctx context.Context, result *rule.Result) { router := n.router n.mu.RUnlock() - groups, err := router.Route(result) + groups, err := router.Route(ctx, result) if err != nil { // We don't retry in notifier, retry should be upstream. log.Error(ctx, errors.Wrap(err, "route")) @@ -428,7 +439,7 @@ func (n *notifier) send(ctx context.Context, work sendWork) error { } var nlogKey = func(group *route.FeedGroup, receiver Receiver) string { - return fmt.Sprintf("notifier.group.%s.receiver.%s", group.Name, receiver.Name) + return fmt.Sprintf("notifier.group.%s.receiver.%s.%d", group.Name, receiver.Name, group.Time.Unix()) } func (n *notifier) isSent(ctx context.Context, group *route.FeedGroup, receiver Receiver) bool { diff --git a/pkg/notify/route/route.go b/pkg/notify/route/route.go index 79a1ca1..b287f77 100644 --- a/pkg/notify/route/route.go +++ b/pkg/notify/route/route.go @@ -16,6 +16,8 @@ package route import ( + "context" + "encoding/json" "fmt" "sort" "strings" @@ -25,16 +27,20 @@ import ( "k8s.io/utils/ptr" "github.com/glidea/zenfeed/pkg/component" + "github.com/glidea/zenfeed/pkg/llm" "github.com/glidea/zenfeed/pkg/model" "github.com/glidea/zenfeed/pkg/schedule/rule" "github.com/glidea/zenfeed/pkg/storage/feed/block" + "github.com/glidea/zenfeed/pkg/telemetry" + telemetrymodel "github.com/glidea/zenfeed/pkg/telemetry/model" + runtimeutil "github.com/glidea/zenfeed/pkg/util/runtime" timeutil "github.com/glidea/zenfeed/pkg/util/time" ) // --- Interface code block --- type Router interface { component.Component - Route(result *rule.Result) (groups []*Group, err error) + Route(ctx context.Context, result *rule.Result) (groups []*Group, err error) } type Config struct { @@ -43,6 +49,9 @@ type Config struct { type Route struct { GroupBy []string + SourceLabel string + SummaryPrompt string + LLM string CompressByRelatedThreshold *float32 Receivers []string SubRoutes SubRoutes @@ -158,6 +167,7 @@ func (c *Config) Validate() error { type Dependencies struct { RelatedScore func(a, b [][]float32) (float32, error) // MUST same with vector index. + LLMFactory llm.Factory } type Group struct { @@ -166,10 +176,11 @@ type Group struct { } type FeedGroup struct { - Name string - Time time.Time - Labels model.Labels - Feeds []*Feed + Name string + Time time.Time + Labels model.Labels + Feeds []*Feed + Summary string } func (g *FeedGroup) ID() string { @@ -216,7 +227,10 @@ type router struct { *component.Base[Config, Dependencies] } -func (r *router) Route(result *rule.Result) (groups []*Group, err error) { +func (r *router) Route(ctx context.Context, result *rule.Result) (groups []*Group, err error) { + ctx = telemetry.StartWith(ctx, append(r.TelemetryLabels(), telemetrymodel.KeyOperation, "Route")...) + defer func() { telemetry.End(ctx, err) }() + // Find route for each feed. feedsByRoute := r.routeFeeds(result.Feeds) @@ -233,12 +247,21 @@ func (r *router) Route(result *rule.Result) (groups []*Group, err error) { // Build final groups. for ls, feeds := range relatedGroups { + var summary string + if prompt := route.SummaryPrompt; prompt != "" && len(feeds) > 1 { + // TODO: Avoid potential for duplicate generation. + summary, err = r.generateSummary(ctx, prompt, feeds, route.SourceLabel) + if err != nil { + return nil, errors.Wrap(err, "generate summary") + } + } groups = append(groups, &Group{ FeedGroup: FeedGroup{ - Name: fmt.Sprintf("%s %s", result.Rule, ls.String()), - Time: result.Time, - Labels: *ls, - Feeds: feeds, + Name: fmt.Sprintf("%s %s", result.Rule, ls.String()), + Time: result.Time, + Labels: *ls, + Feeds: feeds, + Summary: summary, }, Receivers: route.Receivers, }) @@ -252,6 +275,38 @@ func (r *router) Route(result *rule.Result) (groups []*Group, err error) { return groups, nil } +func (r *router) generateSummary(ctx context.Context, prompt string, feeds []*Feed, sourceLabel string) (string, error) { + content := r.parseContentToSummary(feeds, sourceLabel) + if content == "" { + return "", nil + } + + llm := r.Dependencies().LLMFactory.Get(r.Config().LLM) + summary, err := llm.String(ctx, []string{ + content, + prompt, + }) + if err != nil { + return "", errors.Wrap(err, "llm string") + } + + return summary, nil +} + +func (r *router) parseContentToSummary(feeds []*Feed, sourceLabel string) string { + if sourceLabel == "" { + b := runtimeutil.Must1(json.Marshal(feeds)) + return string(b) + } + + var sb strings.Builder + for _, feed := range feeds { + sb.WriteString(feed.Labels.Get(sourceLabel)) + } + + return sb.String() +} + func (r *router) routeFeeds(feeds []*block.FeedVO) map[*Route][]*block.FeedVO { config := r.Config() feedsByRoute := make(map[*Route][]*block.FeedVO) @@ -290,6 +345,12 @@ func (r *router) groupFeedsByLabels(route *Route, feeds []*block.FeedVO) map[*mo groupedFeeds[labelGroup] = append(groupedFeeds[labelGroup], feed) } + for _, feeds := range groupedFeeds { + sort.Slice(feeds, func(i, j int) bool { + return feeds[i].ID < feeds[j].ID + }) + } + return groupedFeeds } @@ -344,6 +405,16 @@ func (r *router) compressRelatedFeedsForGroup( } } + // Sort. + sort.Slice(feedsWithRelated, func(i, j int) bool { + return feedsWithRelated[i].ID < feedsWithRelated[j].ID + }) + for _, feed := range feedsWithRelated { + sort.Slice(feed.Related, func(i, j int) bool { + return feed.Related[i].ID < feed.Related[j].ID + }) + } + return feedsWithRelated, nil } @@ -351,8 +422,8 @@ type mockRouter struct { component.Mock } -func (m *mockRouter) Route(result *rule.Result) (groups []*Group, err error) { - m.Called(result) +func (m *mockRouter) Route(ctx context.Context, result *rule.Result) (groups []*Group, err error) { + m.Called(ctx, result) return groups, err } diff --git a/pkg/notify/route/route_test.go b/pkg/notify/route/route_test.go index 08b2838..9884815 100644 --- a/pkg/notify/route/route_test.go +++ b/pkg/notify/route/route_test.go @@ -1,6 +1,7 @@ package route import ( + "context" "fmt" "testing" "time" @@ -11,6 +12,7 @@ import ( "k8s.io/utils/ptr" "github.com/glidea/zenfeed/pkg/component" + "github.com/glidea/zenfeed/pkg/llm" "github.com/glidea/zenfeed/pkg/model" "github.com/glidea/zenfeed/pkg/schedule/rule" "github.com/glidea/zenfeed/pkg/storage/feed/block" @@ -382,6 +384,11 @@ func TestRoute(t *testing.T) { tt.GivenDetail.relatedScore(&mockDep.Mock) } + llmFactory, err := llm.NewFactory("", nil, llm.FactoryDependencies{}, component.MockOption(func(m *mock.Mock) { + m.On("String", mock.Anything, mock.Anything).Return("test", nil) + })) + Expect(err).NotTo(HaveOccurred()) + routerInstance := &router{ Base: component.New(&component.BaseConfig[Config, Dependencies]{ Name: "TestRouter", @@ -389,11 +396,12 @@ func TestRoute(t *testing.T) { Config: tt.GivenDetail.config, Dependencies: Dependencies{ RelatedScore: mockDep.RelatedScore, + LLMFactory: llmFactory, }, }), } - groups, err := routerInstance.Route(tt.WhenDetail.ruleResult) + groups, err := routerInstance.Route(context.Background(), tt.WhenDetail.ruleResult) if tt.ThenExpected.isErr { Expect(err).To(HaveOccurred()) diff --git a/pkg/rewrite/rewrite.go b/pkg/rewrite/rewrite.go index 4330784..6fc5d24 100644 --- a/pkg/rewrite/rewrite.go +++ b/pkg/rewrite/rewrite.go @@ -465,9 +465,9 @@ func (r *rewriter) Reload(app *config.App) error { return nil } -func (r *rewriter) Labels(ctx context.Context, labels model.Labels) (model.Labels, error) { +func (r *rewriter) Labels(ctx context.Context, labels model.Labels) (rewritten model.Labels, err error) { ctx = telemetry.StartWith(ctx, append(r.TelemetryLabels(), telemetrymodel.KeyOperation, "Labels")...) - defer func() { telemetry.End(ctx, nil) }() + defer func() { telemetry.End(ctx, err) }() rules := *r.Config() for _, rule := range rules { diff --git a/pkg/schedule/rule/periodic.go b/pkg/schedule/rule/periodic.go index ca1b4de..4625a77 100644 --- a/pkg/schedule/rule/periodic.go +++ b/pkg/schedule/rule/periodic.go @@ -77,7 +77,7 @@ func (r *periodic) Run() (err error) { return nil case now := <-tick.C: iter(now) - tick.Reset(3 * time.Minute) + tick.Reset(5 * time.Minute) } } } diff --git a/pkg/storage/feed/block/block.go b/pkg/storage/feed/block/block.go index 67fe785..8b025a0 100644 --- a/pkg/storage/feed/block/block.go +++ b/pkg/storage/feed/block/block.go @@ -523,9 +523,9 @@ type block struct { coldLoaded bool } -func (b *block) Run() error { +func (b *block) Run() (err error) { ctx := telemetry.StartWith(b.Context(), append(b.TelemetryLabels(), telemetrymodel.KeyOperation, "Run")...) - defer func() { telemetry.End(ctx, nil) }() + defer func() { telemetry.End(ctx, err) }() // Maintain metrics. go b.maintainMetrics(ctx) @@ -715,9 +715,9 @@ func (b *block) Query(ctx context.Context, query QueryOptions) (feeds []*FeedVO, return result.Slice(), nil } -func (b *block) Exists(ctx context.Context, id uint64) (bool, error) { +func (b *block) Exists(ctx context.Context, id uint64) (exists bool, err error) { ctx = telemetry.StartWith(ctx, append(b.TelemetryLabels(), telemetrymodel.KeyOperation, "Exists")...) - defer func() { telemetry.End(ctx, nil) }() + defer func() { telemetry.End(ctx, err) }() // Ensure the block is loaded. if err := b.ensureLoaded(ctx); err != nil { diff --git a/pkg/storage/feed/feed.go b/pkg/storage/feed/feed.go index 6d78afb..5c41b72 100644 --- a/pkg/storage/feed/feed.go +++ b/pkg/storage/feed/feed.go @@ -607,9 +607,12 @@ func (s *storage) rewrite(ctx context.Context, feeds []*model.Feed) ([]*model.Fe }(item) } wg.Wait() - if len(errs) > 0 { + if allFailed := len(errs) == len(feeds); allFailed { return nil, errs[0] } + if len(errs) > 0 { + log.Error(ctx, errors.Wrap(errs[0], "rewrite feeds"), "error_count", len(errs)) + } return rewritten, nil }