Files
zenfeed/pkg/notify/notify.go
2025-06-05 23:29:37 +08:00

475 lines
12 KiB
Go

// Copyright (C) 2025 wangyusong
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package notify
import (
"context"
"fmt"
"reflect"
"runtime"
"sync"
"time"
"github.com/pkg/errors"
"github.com/glidea/zenfeed/pkg/component"
"github.com/glidea/zenfeed/pkg/config"
"github.com/glidea/zenfeed/pkg/llm"
"github.com/glidea/zenfeed/pkg/notify/channel"
"github.com/glidea/zenfeed/pkg/notify/route"
"github.com/glidea/zenfeed/pkg/schedule/rule"
"github.com/glidea/zenfeed/pkg/storage/kv"
"github.com/glidea/zenfeed/pkg/telemetry"
"github.com/glidea/zenfeed/pkg/telemetry/log"
telemetrymodel "github.com/glidea/zenfeed/pkg/telemetry/model"
timeutil "github.com/glidea/zenfeed/pkg/util/time"
)
// --- Interface code block ---
type Notifier interface {
component.Component
config.Watcher
}
type Config struct {
Route route.Config
Receivers Receivers
Channels channel.Config
}
func (c *Config) Validate() error {
if err := (&c.Route).Validate(); err != nil {
return errors.Wrap(err, "invalid route")
}
if err := (&c.Receivers).Validate(); err != nil {
return errors.Wrap(err, "invalid receivers")
}
if err := (&c.Channels).Validate(); err != nil {
return errors.Wrap(err, "invalid channels")
}
return nil
}
func (c *Config) From(app *config.App) *Config {
c.Route = route.Config{
Route: route.Route{
GroupBy: app.Notify.Route.GroupBy,
SourceLabel: app.Notify.Route.SourceLabel,
SummaryPrompt: app.Notify.Route.SummaryPrompt,
LLM: app.Notify.Route.LLM,
CompressByRelatedThreshold: app.Notify.Route.CompressByRelatedThreshold,
Receivers: app.Notify.Route.Receivers,
},
}
for i := range app.Notify.Route.SubRoutes {
c.Route.SubRoutes = append(c.Route.SubRoutes, convertSubRoute(&app.Notify.Route.SubRoutes[i]))
}
c.Receivers = make(Receivers, len(app.Notify.Receivers))
for i := range app.Notify.Receivers {
c.Receivers[i] = Receiver{
Name: app.Notify.Receivers[i].Name,
}
if app.Notify.Receivers[i].Email != "" {
c.Receivers[i].Email = app.Notify.Receivers[i].Email
}
if app.Notify.Receivers[i].Webhook != nil {
c.Receivers[i].Webhook = &channel.WebhookReceiver{URL: app.Notify.Receivers[i].Webhook.URL}
}
}
c.Channels = channel.Config{}
if app.Notify.Channels.Email != nil {
c.Channels.Email = &channel.Email{
SmtpEndpoint: app.Notify.Channels.Email.SmtpEndpoint,
From: app.Notify.Channels.Email.From,
Password: app.Notify.Channels.Email.Password,
FeedMarkdownTemplate: app.Notify.Channels.Email.FeedMarkdownTemplate,
FeedHTMLSnippetTemplate: app.Notify.Channels.Email.FeedHTMLSnippetTemplate,
}
}
return c
}
func convertSubRoute(from *config.NotifySubRoute) *route.SubRoute {
to := &route.SubRoute{
Route: route.Route{
GroupBy: from.GroupBy,
SourceLabel: from.SourceLabel,
SummaryPrompt: from.SummaryPrompt,
LLM: from.LLM,
CompressByRelatedThreshold: from.CompressByRelatedThreshold,
Receivers: from.Receivers,
},
}
to.Matchers = from.Matchers
to.Receivers = from.Receivers
for i := range from.SubRoutes {
to.SubRoutes = append(to.SubRoutes, convertSubRoute(&from.SubRoutes[i]))
}
return to
}
type Receivers []Receiver
func (rs Receivers) Validate() error {
names := make(map[string]bool)
for i := range rs {
r := &rs[i]
if err := r.Validate(); err != nil {
return errors.Wrap(err, "invalid receiver")
}
if _, ok := names[r.Name]; ok {
return errors.New("receiver name must be unique")
}
names[r.Name] = true
}
return nil
}
func (r Receivers) get(name string) *Receiver {
for _, receiver := range r {
if receiver.Name == name {
return &receiver
}
}
return nil
}
type Receiver struct {
channel.Receiver
Name string
}
func (r *Receiver) Validate() error {
if r.Name == "" {
return errors.New("name is required")
}
if err := (&r.Receiver).Validate(); err != nil {
return errors.Wrap(err, "invalid receiver")
}
return nil
}
type Dependencies struct {
In <-chan *rule.Result
RelatedScore func(a, b [][]float32) (float32, error)
RouterFactory route.Factory
ChannelFactory channel.Factory
KVStorage kv.Storage
LLMFactory llm.Factory
}
// --- Factory code block ---
type Factory component.Factory[Notifier, config.App, Dependencies]
func NewFactory(mockOn ...component.MockOption) Factory {
if len(mockOn) > 0 {
return component.FactoryFunc[Notifier, config.App, Dependencies](
func(instance string, app *config.App, dependencies Dependencies) (Notifier, error) {
m := &mockNotifier{}
component.MockOptions(mockOn).Apply(&m.Mock)
return m, nil
},
)
}
return component.FactoryFunc[Notifier, config.App, Dependencies](new)
}
func new(instance string, app *config.App, dependencies Dependencies) (Notifier, error) {
config := &Config{}
config.From(app)
if err := config.Validate(); err != nil {
return nil, errors.Wrap(err, "invalid config")
}
n := &notifier{
Base: component.New(&component.BaseConfig[Config, Dependencies]{
Name: "Notifier",
Instance: instance,
Config: config,
Dependencies: dependencies,
}),
channelSendWork: make(chan sendWork, 100),
}
router, err := n.newRouter(&config.Route)
if err != nil {
return nil, errors.Wrap(err, "create router")
}
n.router = router
channel, err := n.newChannel(&config.Channels)
if err != nil {
return nil, errors.Wrap(err, "create channel")
}
n.channel = channel
return n, nil
}
// --- Implementation code block ---
type notifier struct {
*component.Base[Config, Dependencies]
router route.Router
channel channel.Channel
channelSendWork chan sendWork
mu sync.RWMutex
}
var sendConcurrency = runtime.NumCPU() * 2
func (n *notifier) Run() (err error) {
ctx := telemetry.StartWith(n.Context(), append(n.TelemetryLabels(), telemetrymodel.KeyOperation, "Run")...)
defer func() { telemetry.End(ctx, err) }()
if err := component.RunUntilReady(n.Context(), n.router, 10*time.Second); err != nil {
return errors.Wrap(err, "router not ready")
}
if err := component.RunUntilReady(n.Context(), n.channel, 10*time.Second); err != nil {
return errors.Wrap(err, "channel not ready")
}
for i := range sendConcurrency {
go n.sendWorker(i)
}
n.MarkReady()
for {
select {
case <-ctx.Done():
return nil
case result := <-n.Dependencies().In:
n.handle(ctx, result)
}
}
}
func (n *notifier) Close() error {
if err := n.Base.Close(); err != nil {
return errors.Wrap(err, "close base")
}
if err := n.router.Close(); err != nil {
return errors.Wrap(err, "close router")
}
if err := n.channel.Close(); err != nil {
return errors.Wrap(err, "close channel")
}
close(n.channelSendWork)
return nil
}
func (n *notifier) Reload(app *config.App) error {
newConfig := &Config{}
newConfig.From(app)
if err := newConfig.Validate(); err != nil {
return errors.Wrap(err, "invalid config")
}
if reflect.DeepEqual(n.Config(), newConfig) {
log.Debug(n.Context(), "no changes in notify config")
return nil
}
router, err := n.newRouter(&route.Config{Route: newConfig.Route.Route})
if err != nil {
return errors.Wrap(err, "create router")
}
if component.RunUntilReady(n.Context(), router, 10*time.Second) != nil {
return errors.New("router not ready")
}
channel, err := n.newChannel(&channel.Config{Email: newConfig.Channels.Email})
if err != nil {
return errors.Wrap(err, "create email")
}
if component.RunUntilReady(n.Context(), channel, 10*time.Second) != nil {
return errors.New("channel not ready")
}
if err := n.router.Close(); err != nil {
log.Error(n.Context(), errors.Wrap(err, "close router"))
}
if err := n.channel.Close(); err != nil {
log.Error(n.Context(), errors.Wrap(err, "close channel"))
}
n.mu.Lock()
defer n.mu.Unlock()
n.SetConfig(newConfig)
n.router = router
n.channel = channel
return nil
}
func (n *notifier) newRouter(config *route.Config) (route.Router, error) {
return n.Dependencies().RouterFactory.New(
n.Instance(),
config,
route.Dependencies{
RelatedScore: n.Dependencies().RelatedScore,
LLMFactory: n.Dependencies().LLMFactory,
},
)
}
func (n *notifier) newChannel(config *channel.Config) (channel.Channel, error) {
return n.Dependencies().ChannelFactory.New(
n.Instance(),
config,
channel.Dependencies{},
)
}
func (n *notifier) handle(ctx context.Context, result *rule.Result) {
n.mu.RLock()
router := n.router
n.mu.RUnlock()
groups, err := router.Route(ctx, result)
if err != nil {
// We don't retry in notifier, retry should be upstream.
log.Error(ctx, errors.Wrap(err, "route"))
return
}
for _, group := range groups {
for i := range group.Receivers {
n.trySummitSendWork(ctx, group, group.Receivers[i])
}
}
}
func (n *notifier) trySummitSendWork(ctx context.Context, group *route.Group, receiverName string) {
config := n.Config()
receiver := config.Receivers.get(receiverName)
if receiver == nil {
log.Error(ctx, errors.New("receiver not found"), "receiver", receiverName)
return
}
if n.isSent(ctx, &group.FeedGroup, *receiver) {
log.Debug(ctx, "already sent")
return
}
n.channelSendWork <- sendWork{
group: &group.FeedGroup,
receiver: *receiver,
}
}
func (n *notifier) sendWorker(i int) {
for {
select {
case <-n.Context().Done():
return
case work := <-n.channelSendWork:
workCtx := telemetry.StartWith(n.Context(),
append(n.TelemetryLabels(),
telemetrymodel.KeyOperation, "Run",
"worker", i,
"group", work.group.Name,
"time", timeutil.Format(work.group.Time),
"receiver", work.receiver.Name,
)...,
)
defer func() { telemetry.End(workCtx, nil) }()
workCtx, cancel := context.WithTimeout(workCtx, 30*time.Second)
defer cancel()
if err := n.duplicateSend(workCtx, work); err != nil {
log.Error(workCtx, err, "duplicate send")
continue
}
log.Info(workCtx, "send success")
}
}
}
func (n *notifier) duplicateSend(ctx context.Context, work sendWork) error {
if n.isSent(ctx, work.group, work.receiver) { // Double check.
return nil
}
if err := n.send(ctx, work); err != nil {
return errors.Wrap(err, "send")
}
if err := n.markSent(ctx, work.group, work.receiver); err != nil {
log.Error(ctx, errors.Wrap(err, "set nlog, may duplicate sending in next time"))
}
return nil
}
func (n *notifier) send(ctx context.Context, work sendWork) error {
n.mu.RLock()
channel := n.channel
n.mu.RUnlock()
return channel.Send(ctx, work.receiver.Receiver, work.group)
}
var nlogKey = func(group *route.FeedGroup, receiver Receiver) []byte {
return fmt.Appendf(nil, "notifier.group.%s.receiver.%s.%d", group.Name, receiver.Name, group.Time.Unix())
}
func (n *notifier) isSent(ctx context.Context, group *route.FeedGroup, receiver Receiver) bool {
_, err := n.Dependencies().KVStorage.Get(ctx, nlogKey(group, receiver))
switch {
case err == nil:
return true // Already sent.
case errors.Is(err, kv.ErrNotFound):
return false
default:
log.Warn(ctx, errors.Wrap(err, "get nlog, continue sending"))
return false
}
}
func (n *notifier) markSent(ctx context.Context, group *route.FeedGroup, receiver Receiver) error {
return n.Dependencies().KVStorage.Set(ctx, nlogKey(group, receiver), []byte(timeutil.Format(time.Now())), timeutil.Day)
}
type sendWork struct {
group *route.FeedGroup
receiver Receiver
}
type mockNotifier struct {
component.Mock
}
func (m *mockNotifier) Reload(app *config.App) error {
return m.Called(app).Error(0)
}