add summary for notification & misc fix
This commit is contained in:
@@ -142,13 +142,16 @@
|
||||
|
||||
此结构可以使用 `sub_routes` 进行嵌套。Feed 会首先尝试匹配子路由;如果没有子路由匹配,则应用父路由的配置。
|
||||
|
||||
| 字段 | 类型 | 描述 | 默认值 | 是否必需 |
|
||||
| :--------------------------------- | :----------- | :-------------------------------------------------------------------------------------------------------- | :----- | :------------ |
|
||||
| `...matchers` (仅子路由) | `字符串列表` | 标签匹配器,用于确定 Feed 是否属于此子路由。例如 `["category=tech", "source!=github"]`。 | `[]` | 是 (仅子路由) |
|
||||
| `...receivers` | `字符串列表` | 接收者的名称列表 (在 `notify.receivers` 中定义),用于发送匹配此路由的 Feed 的通知。 | `[]` | 是 (至少一个) |
|
||||
| `...group_by` | `字符串列表` | 在发送通知前用于对 Feed 进行分组的标签列表。每个分组会产生一个单独的通知。例如 `["source", "category"]`。 | `[]` | 是 (至少一个) |
|
||||
| `...compress_by_related_threshold` | `*float32` | 如果设置,则根据语义相关性压缩分组内高度相似的 Feed,仅发送一个代表。阈值 (0-1),越高表示越相似。 | `0.85` | 否 |
|
||||
| `...sub_routes` | `对象列表` | 嵌套路由列表。允许定义更具体的路由规则。每个对象遵循 **通知路由配置**。 | `[]` | 否 |
|
||||
| 字段 | 类型 | 描述 | 默认值 | 是否必需 |
|
||||
| :--------------------------------- | :----------- | :-------------------------------------------------------------------------------------------------------- | :---------------------- | :------------ |
|
||||
| `...matchers` (仅子路由) | `字符串列表` | 标签匹配器,用于确定 Feed 是否属于此子路由。例如 `["category=tech", "source!=github"]`。 | `[]` | 是 (仅子路由) |
|
||||
| `...receivers` | `字符串列表` | 接收者的名称列表 (在 `notify.receivers` 中定义),用于发送匹配此路由的 Feed 的通知。 | `[]` | 是 (至少一个) |
|
||||
| `...group_by` | `字符串列表` | 在发送通知前用于对 Feed 进行分组的标签列表。每个分组会产生一个单独的通知。例如 `["source", "category"]`。 | `[]` | 是 (至少一个) |
|
||||
| `...source_label` | `string` | 从每个 Feed 中提取内容并进行总结的源标签。默认为所有标签。强烈建议设置为 'summary' 以减少上下文长度。 | 所有标签 | 否 |
|
||||
| `...summary_prompt` | `string` | 用于总结每个分组的 Feed 的 Prompt。 | | 否 |
|
||||
| `...llm` | `string` | 使用的 LLM 的名称。默认为 `llms` 部分中的默认 LLM。建议使用上下文长度较大的 LLM。 | `llms` 部分中的默认 LLM | 否 |
|
||||
| `...compress_by_related_threshold` | `*float32` | 如果设置,则根据语义相关性压缩分组内高度相似的 Feed,仅发送一个代表。阈值 (0-1),越高表示越相似。 | `0.85` | 否 |
|
||||
| `...sub_routes` | `对象列表` | 嵌套路由列表。允许定义更具体的路由规则。每个对象遵循 **通知路由配置**。 | `[]` | 否 |
|
||||
|
||||
### 通知接收者配置 (`notify.receivers[]`)
|
||||
|
||||
|
||||
@@ -142,13 +142,16 @@ Defines rules for querying and monitoring feeds.
|
||||
|
||||
This structure can be nested using `sub_routes`. A feed is matched against sub-routes first; if no sub-route matches, the parent route's configuration applies.
|
||||
|
||||
| Field | Type | Description | Default | Required |
|
||||
| :--------------------------------- | :-------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------------- | :------ | :------------------- |
|
||||
| `...matchers` (only in sub-routes) | list of strings | Label matchers to determine if a feed belongs to this sub-route. e.g. `["category=tech", "source!=github"]`. | `[]` | Yes (for sub-routes) |
|
||||
| `...receivers` | list of strings | Names of the receivers (defined in `notify.receivers`) to send notifications for feeds matching this route. | `[]` | Yes (at least one) |
|
||||
| `...group_by` | list of strings | Labels to group feeds by before sending notifications. Each group results in a separate notification. e.g., `["source", "category"]`. | `[]` | Yes (at least one) |
|
||||
| `...compress_by_related_threshold` | *float32 | If set, compresses highly similar feeds (based on semantic relatedness) within a group, sending only one representative. Threshold (0-1). Higher means more similar. | `0.85` | No |
|
||||
| `...sub_routes` | list of objects | Nested routes. Allows defining more specific routing rules. Each object follows the **Notify Route Configuration**. | `[]` | No |
|
||||
| Field | Type | Description | Default | Required |
|
||||
| :--------------------------------- | :-------------- | :-------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | :---------------------------- | :------------------- |
|
||||
| `...matchers` (only in sub-routes) | list of strings | Label matchers to determine if a feed belongs to this sub-route. e.g. `["category=tech", "source!=github"]`. | `[]` | Yes (for sub-routes) |
|
||||
| `...receivers` | list of strings | Names of the receivers (defined in `notify.receivers`) to send notifications for feeds matching this route. | `[]` | Yes (at least one) |
|
||||
| `...group_by` | list of strings | Labels to group feeds by before sending notifications. Each group results in a separate notification. e.g., `["source", "category"]`. | `[]` | Yes (at least one) |
|
||||
| `...source_label` | string | The source label to extract the content from each feed, and summarize them. Default are all labels. It is very recommended to set it to 'summary' to reduce context length. | all labels | No |
|
||||
| `...summary_prompt` | string | The prompt to summarize the feeds of each group. | | No |
|
||||
| `...llm` | string | The LLM name to use. Default is the default LLM in `llms` section. A large context length LLM is recommended. | default LLM in `llms` section | No |
|
||||
| `...compress_by_related_threshold` | *float32 | If set, compresses highly similar feeds (based on semantic relatedness) within a group, sending only one representative. Threshold (0-1). Higher means more similar. | `0.85` | No |
|
||||
| `...sub_routes` | list of objects | Nested routes. Allows defining more specific routing rules. Each object follows the **Notify Route Configuration**. | `[]` | No |
|
||||
|
||||
### Notify Receiver Configuration (`notify.receivers[]`)
|
||||
|
||||
|
||||
1
main.go
1
main.go
@@ -355,6 +355,7 @@ func (a *App) setupNotifier() (err error) {
|
||||
RouterFactory: route.NewFactory(),
|
||||
ChannelFactory: channel.NewFactory(),
|
||||
KVStorage: a.kvStorage,
|
||||
LLMFactory: a.llmFactory,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -145,6 +145,9 @@ type SchedulsRule struct {
|
||||
type NotifyRoute struct {
|
||||
Receivers []string `yaml:"receivers,omitempty" json:"receivers,omitempty" desc:"The notify receivers. It is required, at least one receiver is needed."`
|
||||
GroupBy []string `yaml:"group_by,omitempty" json:"group_by,omitempty" desc:"The group by config to group the feeds, each group will be notified individually. It is required, at least one group by is needed."`
|
||||
SourceLabel string `yaml:"source_label,omitempty" json:"source_label,omitempty" desc:"The source label to extract the content from each feed, and summarize them. Default are all labels. It is very recommended to set it to 'summary' to reduce context length."`
|
||||
SummaryPrompt string `yaml:"summary_prompt,omitempty" json:"summary_prompt,omitempty" desc:"The prompt to summarize the feeds of each group."`
|
||||
LLM string `yaml:"llm,omitempty" json:"llm,omitempty" desc:"The LLM name to use. Default is the default LLM in llms section. A large context length LLM is recommended."`
|
||||
CompressByRelatedThreshold *float32 `yaml:"compress_by_related_threshold,omitempty" json:"compress_by_related_threshold,omitempty" desc:"The threshold to compress the feeds by relatedness, that is, if the feeds are too similar, only one will be notified. Default is 0.85."`
|
||||
SubRoutes []NotifySubRoute `yaml:"sub_routes,omitempty" json:"sub_routes,omitempty" desc:"The sub routes to notify the feeds. A feed prefers to be matched by the sub routes, if not matched, it will be matched by the parent route."`
|
||||
}
|
||||
@@ -154,6 +157,9 @@ type NotifySubRoute struct {
|
||||
|
||||
Receivers []string `yaml:"receivers,omitempty" json:"receivers,omitempty" desc:"The notify receivers. It is required, at least one receiver is needed."`
|
||||
GroupBy []string `yaml:"group_by,omitempty" json:"group_by,omitempty" desc:"The group by config to group the feeds, each group will be notified individually. It is required, at least one group by is needed."`
|
||||
SourceLabel string `yaml:"source_label,omitempty" json:"source_label,omitempty" desc:"The source label to extract the content from each feed, and summarize them. Default are all labels. It is very recommended to set it to 'summary' to reduce context length."`
|
||||
SummaryPrompt string `yaml:"summary_prompt,omitempty" json:"summary_prompt,omitempty" desc:"The prompt to summarize the feeds of each group."`
|
||||
LLM string `yaml:"llm,omitempty" json:"llm,omitempty" desc:"The LLM name to use. Default is the default LLM in llms section. A large context length LLM is recommended."`
|
||||
CompressByRelatedThreshold *float32 `yaml:"compress_by_related_threshold,omitempty" json:"compress_by_related_threshold,omitempty" desc:"The threshold to compress the feeds by relatedness, that is, if the feeds are too similar, only one will be notified. Default is 0.85."`
|
||||
SubRoutes []NotifySubRoute `yaml:"sub_routes,omitempty" json:"sub_routes,omitempty" desc:"The sub routes to notify the feeds. A feed prefers to be matched by the sub routes, if not matched, it will be matched by the parent route."`
|
||||
}
|
||||
|
||||
@@ -130,7 +130,7 @@ func (e *email) buildEmail(receiver Receiver, group *route.FeedGroup) (*gomail.M
|
||||
m.SetHeader("To", receiver.Email)
|
||||
m.SetHeader("Subject", group.Name)
|
||||
|
||||
body, err := e.buildBodyHTML(group.Feeds)
|
||||
body, err := e.buildBodyHTML(group)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "build email body HTML")
|
||||
}
|
||||
@@ -139,7 +139,7 @@ func (e *email) buildEmail(receiver Receiver, group *route.FeedGroup) (*gomail.M
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func (e *email) buildBodyHTML(feeds []*route.Feed) ([]byte, error) {
|
||||
func (e *email) buildBodyHTML(group *route.FeedGroup) ([]byte, error) {
|
||||
bodyBuf := buffer.Get()
|
||||
defer buffer.Put(bodyBuf)
|
||||
|
||||
@@ -148,14 +148,24 @@ func (e *email) buildBodyHTML(feeds []*route.Feed) ([]byte, error) {
|
||||
return nil, errors.Wrap(err, "write HTML header")
|
||||
}
|
||||
|
||||
// Write summary.
|
||||
if err := e.writeSummary(bodyBuf, group.Summary); err != nil {
|
||||
return nil, errors.Wrap(err, "write summary")
|
||||
}
|
||||
|
||||
// Write each feed content.
|
||||
for i, feed := range feeds {
|
||||
if _, err := bodyBuf.WriteString(`
|
||||
<div style="margin-top:20px; padding-top:15px; border-top:1px solid #f1f3f4;">
|
||||
<p style="font-size:32px; font-weight:500; margin:0 0 10px 0;">Feeds</p>`); err != nil {
|
||||
return nil, errors.Wrap(err, "write feeds header")
|
||||
}
|
||||
for i, feed := range group.Feeds {
|
||||
if err := e.writeFeedContent(bodyBuf, feed); err != nil {
|
||||
return nil, errors.Wrap(err, "write feed content")
|
||||
}
|
||||
|
||||
// Add separator (except the last feed).
|
||||
if i < len(feeds)-1 {
|
||||
if i < len(group.Feeds)-1 {
|
||||
if err := e.writeSeparator(bodyBuf); err != nil {
|
||||
return nil, errors.Wrap(err, "write separator")
|
||||
}
|
||||
@@ -188,6 +198,29 @@ func (e *email) writeHTMLHeader(buf *buffer.Bytes) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (e *email) writeSummary(buf *buffer.Bytes, summary string) error {
|
||||
if summary == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
if _, err := buf.WriteString(`
|
||||
<p style="font-size:32px; font-weight:500; margin:0 0 10px 0;">Summary</p>`); err != nil {
|
||||
return errors.Wrap(err, "write summary header")
|
||||
}
|
||||
|
||||
contentHTML, err := textconvert.MarkdownToHTML([]byte(summary))
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "markdown to HTML")
|
||||
}
|
||||
|
||||
contentHTMLWithStyle := fmt.Sprintf(`<div style="font-size:16px; line-height:1.8;">%s</div>`, contentHTML)
|
||||
if _, err := buf.WriteString(contentHTMLWithStyle); err != nil {
|
||||
return errors.Wrap(err, "write summary")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
const timeLayout = "01-02 15:04"
|
||||
|
||||
func (e *email) writeFeedContent(buf *buffer.Bytes, feed *route.Feed) error {
|
||||
@@ -311,7 +344,8 @@ func (e *email) renderMarkdownContent(buf *buffer.Bytes, feed *route.Feed) (n in
|
||||
return 0, errors.Wrap(err, "markdown to HTML")
|
||||
}
|
||||
|
||||
if _, err := buf.Write(contentHTML); err != nil {
|
||||
contentHTMLWithStyle := fmt.Sprintf(`<div style="font-size:16px; line-height:1.8;">%s</div>`, contentHTML)
|
||||
if _, err := buf.WriteString(contentHTMLWithStyle); err != nil {
|
||||
return 0, errors.Wrap(err, "write content HTML")
|
||||
}
|
||||
|
||||
|
||||
@@ -27,6 +27,7 @@ import (
|
||||
|
||||
"github.com/glidea/zenfeed/pkg/component"
|
||||
"github.com/glidea/zenfeed/pkg/config"
|
||||
"github.com/glidea/zenfeed/pkg/llm"
|
||||
"github.com/glidea/zenfeed/pkg/notify/channel"
|
||||
"github.com/glidea/zenfeed/pkg/notify/route"
|
||||
"github.com/glidea/zenfeed/pkg/schedule/rule"
|
||||
@@ -67,6 +68,9 @@ func (c *Config) From(app *config.App) *Config {
|
||||
c.Route = route.Config{
|
||||
Route: route.Route{
|
||||
GroupBy: app.Notify.Route.GroupBy,
|
||||
SourceLabel: app.Notify.Route.SourceLabel,
|
||||
SummaryPrompt: app.Notify.Route.SummaryPrompt,
|
||||
LLM: app.Notify.Route.LLM,
|
||||
CompressByRelatedThreshold: app.Notify.Route.CompressByRelatedThreshold,
|
||||
Receivers: app.Notify.Route.Receivers,
|
||||
},
|
||||
@@ -105,6 +109,9 @@ func convertSubRoute(from *config.NotifySubRoute) *route.SubRoute {
|
||||
to := &route.SubRoute{
|
||||
Route: route.Route{
|
||||
GroupBy: from.GroupBy,
|
||||
SourceLabel: from.SourceLabel,
|
||||
SummaryPrompt: from.SummaryPrompt,
|
||||
LLM: from.LLM,
|
||||
CompressByRelatedThreshold: from.CompressByRelatedThreshold,
|
||||
Receivers: from.Receivers,
|
||||
},
|
||||
@@ -169,6 +176,7 @@ type Dependencies struct {
|
||||
RouterFactory route.Factory
|
||||
ChannelFactory channel.Factory
|
||||
KVStorage kv.Storage
|
||||
LLMFactory llm.Factory
|
||||
}
|
||||
|
||||
// --- Factory code block ---
|
||||
@@ -322,7 +330,10 @@ func (n *notifier) newRouter(config *route.Config) (route.Router, error) {
|
||||
return n.Dependencies().RouterFactory.New(
|
||||
n.Instance(),
|
||||
config,
|
||||
route.Dependencies{RelatedScore: n.Dependencies().RelatedScore},
|
||||
route.Dependencies{
|
||||
RelatedScore: n.Dependencies().RelatedScore,
|
||||
LLMFactory: n.Dependencies().LLMFactory,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
@@ -339,7 +350,7 @@ func (n *notifier) handle(ctx context.Context, result *rule.Result) {
|
||||
router := n.router
|
||||
n.mu.RUnlock()
|
||||
|
||||
groups, err := router.Route(result)
|
||||
groups, err := router.Route(ctx, result)
|
||||
if err != nil {
|
||||
// We don't retry in notifier, retry should be upstream.
|
||||
log.Error(ctx, errors.Wrap(err, "route"))
|
||||
@@ -428,7 +439,7 @@ func (n *notifier) send(ctx context.Context, work sendWork) error {
|
||||
}
|
||||
|
||||
var nlogKey = func(group *route.FeedGroup, receiver Receiver) string {
|
||||
return fmt.Sprintf("notifier.group.%s.receiver.%s", group.Name, receiver.Name)
|
||||
return fmt.Sprintf("notifier.group.%s.receiver.%s.%d", group.Name, receiver.Name, group.Time.Unix())
|
||||
}
|
||||
|
||||
func (n *notifier) isSent(ctx context.Context, group *route.FeedGroup, receiver Receiver) bool {
|
||||
|
||||
@@ -16,6 +16,8 @@
|
||||
package route
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
@@ -25,16 +27,20 @@ import (
|
||||
"k8s.io/utils/ptr"
|
||||
|
||||
"github.com/glidea/zenfeed/pkg/component"
|
||||
"github.com/glidea/zenfeed/pkg/llm"
|
||||
"github.com/glidea/zenfeed/pkg/model"
|
||||
"github.com/glidea/zenfeed/pkg/schedule/rule"
|
||||
"github.com/glidea/zenfeed/pkg/storage/feed/block"
|
||||
"github.com/glidea/zenfeed/pkg/telemetry"
|
||||
telemetrymodel "github.com/glidea/zenfeed/pkg/telemetry/model"
|
||||
runtimeutil "github.com/glidea/zenfeed/pkg/util/runtime"
|
||||
timeutil "github.com/glidea/zenfeed/pkg/util/time"
|
||||
)
|
||||
|
||||
// --- Interface code block ---
|
||||
type Router interface {
|
||||
component.Component
|
||||
Route(result *rule.Result) (groups []*Group, err error)
|
||||
Route(ctx context.Context, result *rule.Result) (groups []*Group, err error)
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
@@ -43,6 +49,9 @@ type Config struct {
|
||||
|
||||
type Route struct {
|
||||
GroupBy []string
|
||||
SourceLabel string
|
||||
SummaryPrompt string
|
||||
LLM string
|
||||
CompressByRelatedThreshold *float32
|
||||
Receivers []string
|
||||
SubRoutes SubRoutes
|
||||
@@ -158,6 +167,7 @@ func (c *Config) Validate() error {
|
||||
|
||||
type Dependencies struct {
|
||||
RelatedScore func(a, b [][]float32) (float32, error) // MUST same with vector index.
|
||||
LLMFactory llm.Factory
|
||||
}
|
||||
|
||||
type Group struct {
|
||||
@@ -166,10 +176,11 @@ type Group struct {
|
||||
}
|
||||
|
||||
type FeedGroup struct {
|
||||
Name string
|
||||
Time time.Time
|
||||
Labels model.Labels
|
||||
Feeds []*Feed
|
||||
Name string
|
||||
Time time.Time
|
||||
Labels model.Labels
|
||||
Feeds []*Feed
|
||||
Summary string
|
||||
}
|
||||
|
||||
func (g *FeedGroup) ID() string {
|
||||
@@ -216,7 +227,10 @@ type router struct {
|
||||
*component.Base[Config, Dependencies]
|
||||
}
|
||||
|
||||
func (r *router) Route(result *rule.Result) (groups []*Group, err error) {
|
||||
func (r *router) Route(ctx context.Context, result *rule.Result) (groups []*Group, err error) {
|
||||
ctx = telemetry.StartWith(ctx, append(r.TelemetryLabels(), telemetrymodel.KeyOperation, "Route")...)
|
||||
defer func() { telemetry.End(ctx, err) }()
|
||||
|
||||
// Find route for each feed.
|
||||
feedsByRoute := r.routeFeeds(result.Feeds)
|
||||
|
||||
@@ -233,12 +247,21 @@ func (r *router) Route(result *rule.Result) (groups []*Group, err error) {
|
||||
|
||||
// Build final groups.
|
||||
for ls, feeds := range relatedGroups {
|
||||
var summary string
|
||||
if prompt := route.SummaryPrompt; prompt != "" && len(feeds) > 1 {
|
||||
// TODO: Avoid potential for duplicate generation.
|
||||
summary, err = r.generateSummary(ctx, prompt, feeds, route.SourceLabel)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "generate summary")
|
||||
}
|
||||
}
|
||||
groups = append(groups, &Group{
|
||||
FeedGroup: FeedGroup{
|
||||
Name: fmt.Sprintf("%s %s", result.Rule, ls.String()),
|
||||
Time: result.Time,
|
||||
Labels: *ls,
|
||||
Feeds: feeds,
|
||||
Name: fmt.Sprintf("%s %s", result.Rule, ls.String()),
|
||||
Time: result.Time,
|
||||
Labels: *ls,
|
||||
Feeds: feeds,
|
||||
Summary: summary,
|
||||
},
|
||||
Receivers: route.Receivers,
|
||||
})
|
||||
@@ -252,6 +275,38 @@ func (r *router) Route(result *rule.Result) (groups []*Group, err error) {
|
||||
return groups, nil
|
||||
}
|
||||
|
||||
func (r *router) generateSummary(ctx context.Context, prompt string, feeds []*Feed, sourceLabel string) (string, error) {
|
||||
content := r.parseContentToSummary(feeds, sourceLabel)
|
||||
if content == "" {
|
||||
return "", nil
|
||||
}
|
||||
|
||||
llm := r.Dependencies().LLMFactory.Get(r.Config().LLM)
|
||||
summary, err := llm.String(ctx, []string{
|
||||
content,
|
||||
prompt,
|
||||
})
|
||||
if err != nil {
|
||||
return "", errors.Wrap(err, "llm string")
|
||||
}
|
||||
|
||||
return summary, nil
|
||||
}
|
||||
|
||||
func (r *router) parseContentToSummary(feeds []*Feed, sourceLabel string) string {
|
||||
if sourceLabel == "" {
|
||||
b := runtimeutil.Must1(json.Marshal(feeds))
|
||||
return string(b)
|
||||
}
|
||||
|
||||
var sb strings.Builder
|
||||
for _, feed := range feeds {
|
||||
sb.WriteString(feed.Labels.Get(sourceLabel))
|
||||
}
|
||||
|
||||
return sb.String()
|
||||
}
|
||||
|
||||
func (r *router) routeFeeds(feeds []*block.FeedVO) map[*Route][]*block.FeedVO {
|
||||
config := r.Config()
|
||||
feedsByRoute := make(map[*Route][]*block.FeedVO)
|
||||
@@ -290,6 +345,12 @@ func (r *router) groupFeedsByLabels(route *Route, feeds []*block.FeedVO) map[*mo
|
||||
groupedFeeds[labelGroup] = append(groupedFeeds[labelGroup], feed)
|
||||
}
|
||||
|
||||
for _, feeds := range groupedFeeds {
|
||||
sort.Slice(feeds, func(i, j int) bool {
|
||||
return feeds[i].ID < feeds[j].ID
|
||||
})
|
||||
}
|
||||
|
||||
return groupedFeeds
|
||||
}
|
||||
|
||||
@@ -344,6 +405,16 @@ func (r *router) compressRelatedFeedsForGroup(
|
||||
}
|
||||
}
|
||||
|
||||
// Sort.
|
||||
sort.Slice(feedsWithRelated, func(i, j int) bool {
|
||||
return feedsWithRelated[i].ID < feedsWithRelated[j].ID
|
||||
})
|
||||
for _, feed := range feedsWithRelated {
|
||||
sort.Slice(feed.Related, func(i, j int) bool {
|
||||
return feed.Related[i].ID < feed.Related[j].ID
|
||||
})
|
||||
}
|
||||
|
||||
return feedsWithRelated, nil
|
||||
}
|
||||
|
||||
@@ -351,8 +422,8 @@ type mockRouter struct {
|
||||
component.Mock
|
||||
}
|
||||
|
||||
func (m *mockRouter) Route(result *rule.Result) (groups []*Group, err error) {
|
||||
m.Called(result)
|
||||
func (m *mockRouter) Route(ctx context.Context, result *rule.Result) (groups []*Group, err error) {
|
||||
m.Called(ctx, result)
|
||||
|
||||
return groups, err
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package route
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -11,6 +12,7 @@ import (
|
||||
"k8s.io/utils/ptr"
|
||||
|
||||
"github.com/glidea/zenfeed/pkg/component"
|
||||
"github.com/glidea/zenfeed/pkg/llm"
|
||||
"github.com/glidea/zenfeed/pkg/model"
|
||||
"github.com/glidea/zenfeed/pkg/schedule/rule"
|
||||
"github.com/glidea/zenfeed/pkg/storage/feed/block"
|
||||
@@ -382,6 +384,11 @@ func TestRoute(t *testing.T) {
|
||||
tt.GivenDetail.relatedScore(&mockDep.Mock)
|
||||
}
|
||||
|
||||
llmFactory, err := llm.NewFactory("", nil, llm.FactoryDependencies{}, component.MockOption(func(m *mock.Mock) {
|
||||
m.On("String", mock.Anything, mock.Anything).Return("test", nil)
|
||||
}))
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
routerInstance := &router{
|
||||
Base: component.New(&component.BaseConfig[Config, Dependencies]{
|
||||
Name: "TestRouter",
|
||||
@@ -389,11 +396,12 @@ func TestRoute(t *testing.T) {
|
||||
Config: tt.GivenDetail.config,
|
||||
Dependencies: Dependencies{
|
||||
RelatedScore: mockDep.RelatedScore,
|
||||
LLMFactory: llmFactory,
|
||||
},
|
||||
}),
|
||||
}
|
||||
|
||||
groups, err := routerInstance.Route(tt.WhenDetail.ruleResult)
|
||||
groups, err := routerInstance.Route(context.Background(), tt.WhenDetail.ruleResult)
|
||||
|
||||
if tt.ThenExpected.isErr {
|
||||
Expect(err).To(HaveOccurred())
|
||||
|
||||
@@ -465,9 +465,9 @@ func (r *rewriter) Reload(app *config.App) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *rewriter) Labels(ctx context.Context, labels model.Labels) (model.Labels, error) {
|
||||
func (r *rewriter) Labels(ctx context.Context, labels model.Labels) (rewritten model.Labels, err error) {
|
||||
ctx = telemetry.StartWith(ctx, append(r.TelemetryLabels(), telemetrymodel.KeyOperation, "Labels")...)
|
||||
defer func() { telemetry.End(ctx, nil) }()
|
||||
defer func() { telemetry.End(ctx, err) }()
|
||||
|
||||
rules := *r.Config()
|
||||
for _, rule := range rules {
|
||||
|
||||
@@ -77,7 +77,7 @@ func (r *periodic) Run() (err error) {
|
||||
return nil
|
||||
case now := <-tick.C:
|
||||
iter(now)
|
||||
tick.Reset(3 * time.Minute)
|
||||
tick.Reset(5 * time.Minute)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -523,9 +523,9 @@ type block struct {
|
||||
coldLoaded bool
|
||||
}
|
||||
|
||||
func (b *block) Run() error {
|
||||
func (b *block) Run() (err error) {
|
||||
ctx := telemetry.StartWith(b.Context(), append(b.TelemetryLabels(), telemetrymodel.KeyOperation, "Run")...)
|
||||
defer func() { telemetry.End(ctx, nil) }()
|
||||
defer func() { telemetry.End(ctx, err) }()
|
||||
|
||||
// Maintain metrics.
|
||||
go b.maintainMetrics(ctx)
|
||||
@@ -715,9 +715,9 @@ func (b *block) Query(ctx context.Context, query QueryOptions) (feeds []*FeedVO,
|
||||
return result.Slice(), nil
|
||||
}
|
||||
|
||||
func (b *block) Exists(ctx context.Context, id uint64) (bool, error) {
|
||||
func (b *block) Exists(ctx context.Context, id uint64) (exists bool, err error) {
|
||||
ctx = telemetry.StartWith(ctx, append(b.TelemetryLabels(), telemetrymodel.KeyOperation, "Exists")...)
|
||||
defer func() { telemetry.End(ctx, nil) }()
|
||||
defer func() { telemetry.End(ctx, err) }()
|
||||
|
||||
// Ensure the block is loaded.
|
||||
if err := b.ensureLoaded(ctx); err != nil {
|
||||
|
||||
@@ -607,9 +607,12 @@ func (s *storage) rewrite(ctx context.Context, feeds []*model.Feed) ([]*model.Fe
|
||||
}(item)
|
||||
}
|
||||
wg.Wait()
|
||||
if len(errs) > 0 {
|
||||
if allFailed := len(errs) == len(feeds); allFailed {
|
||||
return nil, errs[0]
|
||||
}
|
||||
if len(errs) > 0 {
|
||||
log.Error(ctx, errors.Wrap(errs[0], "rewrite feeds"), "error_count", len(errs))
|
||||
}
|
||||
|
||||
return rewritten, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user