diff --git a/Makefile b/Makefile
index 8a8a492..ff676c4 100644
--- a/Makefile
+++ b/Makefile
@@ -4,7 +4,7 @@ REGISTRY ?= glidea
FULL_IMAGE_NAME = $(REGISTRY)/$(IMAGE_NAME)
-.PHONY: test push build-installer
+.PHONY: test push dev-push
test:
go test -race -v -coverprofile=coverage.out -coverpkg=./... ./...
@@ -16,3 +16,10 @@ push:
-t $(FULL_IMAGE_NAME):$(VERSION) \
-t $(FULL_IMAGE_NAME):latest \
--push .
+
+dev-push:
+ docker buildx create --use --name multi-platform-builder || true
+ docker buildx build --platform linux/amd64,linux/arm64 \
+ --build-arg VERSION=$(VERSION) \
+ -t $(FULL_IMAGE_NAME):$(VERSION) \
+ --push .
diff --git a/README-en.md b/README-en.md
index 66de421..684790a 100644
--- a/README-en.md
+++ b/README-en.md
@@ -73,7 +73,7 @@ Just for the exquisite email styles, install and use it now!
### 1. Installation
-By default, uses SiliconFlow's Qwen/Qwen2.5-7B-Instruct (free) and Pro/BAAI/bge-m3. If you don't have a SiliconFlow account yet, use this [invitation link](https://cloud.siliconflow.cn/i/U2VS0Q5A) to get a ¥14 credit.
+By default, zenfeed uses SiliconFlow's Qwen/Qwen3-8B (free) and Pro/BAAI/bge-m3. If you don't have a SiliconFlow account yet, use this [invitation link](https://cloud.siliconflow.cn/i/U2VS0Q5A) to get a ¥14 credit.
Support for other vendors or models is available; follow the instructions below.
@@ -84,7 +84,7 @@ curl -L -O https://raw.githubusercontent.com/glidea/zenfeed/main/docker-compose.
# If you need to customize more configuration parameters, directly edit docker-compose.yml#configs.zenfeed_config.content BEFORE running the command below.
# Configuration Docs: https://github.com/glidea/zenfeed/blob/main/docs/config.md
-API_KEY=your_apikey TZ=your_local_IANA LANG=English docker-compose -p zenfeed up -d
+API_KEY=your_apikey TZ=your_local_IANA LANGUAGE=English docker-compose -p zenfeed up -d
```
#### Windows
@@ -94,7 +94,7 @@ Invoke-WebRequest -Uri "https://raw.githubusercontent.com/glidea/zenfeed/main/do
# If you need to customize more configuration parameters, directly edit docker-compose.yml#configs.zenfeed_config.content BEFORE running the command below.
# Configuration Docs: https://github.com/glidea/zenfeed/blob/main/docs/config.md
-$env:API_KEY = "your_apikey"; $env:TZ = "your_local_IANA"; $env:LANG = "English"; docker-compose -p zenfeed up -d
+$env:API_KEY = "your_apikey"; $env:TZ = "your_local_IANA"; $env:LANGUAGE = "English"; docker-compose -p zenfeed up -d
```
### 2. Using the Web UI
diff --git a/README.md b/README.md
index 9661a80..2607d73 100644
--- a/README.md
+++ b/README.md
@@ -6,16 +6,19 @@
**1. AI 版 RSS 阅读器**
+* 在线服务
+ * https://zenfeed.xyz
+ * 或 Folo 搜索 zenfeed
+
**2. 实时 “新闻” 知识库**
**3. 帮你时刻关注 “指定事件” 的秘书(如 “关税政策变化”,“xx 股票波动”)**,并支持整理研究报告
-开箱即用的公共服务站:https://zenfeed.xyz (集成 Hacker News,Github Trending,V2EX 热榜等常见公开信源)
-
-每日研究报告(包含播客)(实验性质)
+每日研究报告(包含播客)(实验性质) -- 已暂停更新
* [V2EX](https://v2ex.analysis.zenfeed.xyz/)
* [LinuxDO](https://linuxdo.analysis.zenfeed.xyz/)
+---
技术说明文档见:[HLD](docs/tech/hld-zh.md)
## 前言
@@ -98,7 +101,7 @@ zenfeed 是你的智能信息助手。它自动收集、筛选并总结关注的
### 1. 安装
> 最快 1min 拉起
-默认使用硅基流动的 Qwen/Qwen2.5-7B-Instruct(免费) 和 Pro/BAAI/bge-m3。如果你还没有硅基账号,使用 [邀请链接](https://cloud.siliconflow.cn/i/U2VS0Q5A) 得 14 元额度
+默认使用硅基流动的 Qwen/Qwen3-8B (免费) 和 Pro/BAAI/bge-m3。如果你还没有硅基账号,使用 [邀请链接](https://cloud.siliconflow.cn/i/U2VS0Q5A) 得 14 元额度
如果需要使用其他厂商或模型,或自定义部署:请编辑下方 **docker-compose.yml**#configs.zenfeed_config.content.
参考 [配置文档](https://github.com/glidea/zenfeed/blob/main/docs/config-zh.md)
@@ -142,6 +145,14 @@ $env:API_KEY = "硅基流动apikey"; docker-compose -p zenfeed up -d
以 Cherry Studio 为例,配置 MCP 并连接到 Zenfeed,见 [Cherry Studio MCP](docs/cherry-studio-mcp.md)
> 默认地址 http://localhost:1301/sse
+### 后续
+
+zenfeed 提供了超多的自定义配置,还有很多玩法等待你挖掘。详情请查阅[文档](/docs/)
+
+### Roadmap
+
+[Roadmap](/docs/roadmap-zh.md)
+
## 欢迎加群讨论
> 使用问题请提 Issue,谢绝微信私聊。帮助有类似问题的朋友
diff --git a/docs/config-zh.md b/docs/config-zh.md
index 380c226..9240669 100644
--- a/docs/config-zh.md
+++ b/docs/config-zh.md
@@ -1,19 +1,22 @@
-| 字段 | 类型 | 描述 | 默认值 | 是否必需 |
-| :--------- | :------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------------- | :------------- | :------------- |
-| `timezone` | `string` | 应用的时区。例如 `Asia/Shanghai`。 | 服务器本地时区 | 否 |
-| `log` | `object` | 日志配置。详见下方的 **日志配置** 部分。 | (见具体字段) | 否 |
-| `api` | `object` | API 配置。详见下方的 **API 配置** 部分。 | (见具体字段) | 否 |
-| `llms` | `列表` | 大语言模型 (LLM) 配置。会被其他配置部分引用。详见下方的 **LLM 配置** 部分。 | `[]` | 是 (至少 1 个) |
-| `scrape` | `object` | 抓取配置。详见下方的 **抓取配置** 部分。 | (见具体字段) | 否 |
-| `storage` | `object` | 存储配置。详见下方的 **存储配置** 部分。 | (见具体字段) | 否 |
-| `scheduls` | `object` | 用于监控 Feed 的调度配置 (也称为监控规则)。详见下方的 **调度配置** 部分。 | (见具体字段) | 否 |
-| `notify` | `object` | 通知配置。它接收来自调度模块的结果,通过路由配置进行分组,并通过通知渠道发送给通知接收者。详见下方的 **通知配置**, **通知路由**, **通知接收者**, **通知渠道** 部分。 | (见具体字段) | 是 |
+| 字段 | 类型 | 描述 | 默认值 | 是否必需 |
+| :---------- | :------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------------- | :------------- | :------------- |
+| `timezone` | `string` | 应用的时区。例如 `Asia/Shanghai`。 | 服务器本地时区 | 否 |
+| `telemetry` | `object` | Telemetry 配置。详见下方的 **Telemetry 配置** 部分。 | (见具体字段) | 否 |
+| `api` | `object` | API 配置。详见下方的 **API 配置** 部分。 | (见具体字段) | 否 |
+| `llms` | `列表` | 大语言模型 (LLM) 配置。会被其他配置部分引用。详见下方的 **LLM 配置** 部分。 | `[]` | 是 (至少 1 个) |
+| `jina` | `object` | Jina AI 配置。详见下方的 **Jina AI 配置** 部分。 | (见具体字段) | 否 |
+| `scrape` | `object` | 抓取配置。详见下方的 **抓取配置** 部分。 | (见具体字段) | 否 |
+| `storage` | `object` | 存储配置。详见下方的 **存储配置** 部分。 | (见具体字段) | 否 |
+| `scheduls` | `object` | 用于监控 Feed 的调度配置 (也称为监控规则)。详见下方的 **调度配置** 部分。 | (见具体字段) | 否 |
+| `notify` | `object` | 通知配置。它接收来自调度模块的结果,通过路由配置进行分组,并通过通知渠道发送给通知接收者。详见下方的 **通知配置**, **通知路由**, **通知接收者**, **通知渠道** 部分。 | (见具体字段) | 是 |
-### 日志配置 (`log`)
+### Telemetry 配置 (`telemetry`)
-| 字段 | 类型 | 描述 | 默认值 | 是否必需 |
-| :---------- | :------- | :--------------------------------------------------------- | :----- | :------- |
-| `log.level` | `string` | 日志级别, 可选值为 `debug`, `info`, `warn`, `error` 之一。 | `info` | 否 |
+| 字段 | 类型 | 描述 | 默认值 | 是否必需 |
+| :-------------------- | :------- | :----------------------------------------------------------------------------- | :----------- | :------- |
+| `telemetry.address` | `string` | 暴露 Prometheus 指标 & pprof。 | | 否 |
+| `telemetry.log` | `object` | Telemetry 相关的日志配置。 | (见具体字段) | 否 |
+| `telemetry.log.level` | `string` | Telemetry 相关消息的日志级别, 可选值为 `debug`, `info`, `warn`, `error` 之一。 | `info` | 否 |
### API 配置 (`api`)
@@ -40,6 +43,14 @@
| `llms[].embedding_model` | `string` | LLM 的 Embedding 模型。例如 `text-embedding-3-small`。如果用于 Embedding,则不能为空。如果此 LLM 被使用,则不能与 `model` 同时为空。**注意:** 初次使用后请勿直接修改,应添加新的 LLM 配置。 | | 条件性必需 |
| `llms[].temperature` | `float32` | LLM 的温度 (0-2)。 | `0.0` | 否 |
+### Jina AI 配置 (`jina`)
+
+此部分用于配置 Jina AI Reader API 的相关参数,主要供重写规则中的 `crawl_by_jina` 类型使用。
+
+| 字段 | 类型 | 描述 | 默认值 | 是否必需 |
+| :----------- | :------- | :--------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | :----- | :------- |
+| `jina.token` | `string` | Jina AI 的 API Token。从 [Jina AI API Dashboard](https://jina.ai/api-dashboard/) 获取。提供 Token 可以获得更高的服务速率限制。如果留空,将以匿名用户身份请求,速率限制较低。 | | 否 |
+
### 抓取配置 (`scrape`)
| 字段 | 类型 | 描述 | 默认值 | 是否必需 |
@@ -88,15 +99,16 @@
定义在存储前处理 Feed 的规则。规则按顺序应用。
-| 字段 | 类型 | 描述 | 默认值 | 是否必需 |
-| :--------------------------------------- | :------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------------- | :----------------------- | :--------------------------------------------- |
-| `...rewrites[].source_label` | `string` | 用作转换源文本的 Feed 标签。默认标签包括: `type`, `source`, `title`, `link`, `pub_time`, `content`。 | `content` | 否 |
-| `...rewrites[].skip_too_short_threshold` | `*int` | 如果设置,`source_label` 文本长度低于此阈值的 Feed 将被此规则跳过 (处理将继续进行下一条规则,如果没有更多规则则进行 Feed 存储)。有助于过滤掉过短/信息量不足的 Feed。 | `300` | 否 |
-| `...rewrites[].transform` | `object` | 配置如何转换 `source_label` 文本。详见下方的 **重写规则转换配置**。如果未设置,则直接使用 `source_label` 文本进行匹配。 | `nil` | 否 |
-| `...rewrites[].match` | `string` | 用于匹配 (转换后) 文本的简单字符串。不能与 `match_re` 同时设置。 | | 否 (使用 `match` 或 `match_re`) |
-| `...rewrites[].match_re` | `string` | 用于匹配 (转换后) 文本的正则表达式。 | `.*` (匹配所有) | 否 (使用 `match` 或 `match_re`) |
-| `...rewrites[].action` | `string` | 匹配时执行的操作: `create_or_update_label` (使用匹配/转换后的文本添加/更新标签), `drop_feed` (完全丢弃该 Feed)。 | `create_or_update_label` | 否 |
-| `...rewrites[].label` | `string` | 要创建或更新的 Feed 标签名称。 | | 是 (如果 `action` 是 `create_or_update_label`) |
+| 字段 | 类型 | 描述 | 默认值 | 是否必需 |
+| :--------------------------------------- | :----------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------------- | :----------------------- | :--------------------------------------------- |
+| `...rewrites[].if` | `字符串列表` | 用于匹配 Feed 的条件配置。如果未设置,则表示匹配所有 Feed。类似于标签过滤器,例如 `["source=github", "title!=xxx"]`。如果条件不满足,则跳过此规则。 | `[]` (匹配所有) | 否 |
+| `...rewrites[].source_label` | `string` | 用作转换源文本的 Feed 标签。默认标签包括: `type`, `source`, `title`, `link`, `pub_time`, `content`。 | `content` | 否 |
+| `...rewrites[].skip_too_short_threshold` | `*int` | 如果设置,`source_label` 文本长度低于此阈值的 Feed 将被此规则跳过 (处理将继续进行下一条规则,如果没有更多规则则进行 Feed 存储)。有助于过滤掉过短/信息量不足的 Feed。 | `0` | 否 |
+| `...rewrites[].transform` | `object` | 配置如何转换 `source_label` 文本。详见下方的 **重写规则转换配置**。如果未设置,则直接使用 `source_label` 文本进行匹配。 | `nil` | 否 |
+| `...rewrites[].match` | `string` | 用于匹配 (转换后) 文本的简单字符串。不能与 `match_re` 同时设置。 | | 否 (使用 `match` 或 `match_re`) |
+| `...rewrites[].match_re` | `string` | 用于匹配 (转换后) 文本的正则表达式。 | `.*` (匹配所有) | 否 (使用 `match` 或 `match_re`) |
+| `...rewrites[].action` | `string` | 匹配时执行的操作: `create_or_update_label` (使用匹配/转换后的文本添加/更新标签), `drop_feed` (完全丢弃该 Feed)。 | `create_or_update_label` | 否 |
+| `...rewrites[].label` | `string` | 要创建或更新的 Feed 标签名称。 | | 是 (如果 `action` 是 `create_or_update_label`) |
### 重写规则转换配置 (`storage.feed.rewrites[].transform`)
@@ -106,10 +118,13 @@
### 重写规则转换为文本配置 (`storage.feed.rewrites[].transform.to_text`)
-| 字段 | 类型 | 描述 | 默认值 | 是否必需 |
-| :------------------ | :------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | :---------------------- | :------- |
-| `...to_text.llm` | `string` | 用于转换的 LLM 名称 (来自 `llms` 部分)。 | `llms` 部分中的默认 LLM | 否 |
-| `...to_text.prompt` | `string` | 用于转换的 Prompt。源文本将被注入。可以使用 Go 模板语法引用内置 Prompt: `{{ .summary }}`, `{{ .category }}`, `{{ .tags }}`, `{{ .score }}`, `{{ .comment_confucius }}`, `{{ .summary_html_snippet }}`。 | | 是 |
+此配置定义了如何将 `source_label` 的文本进行转换。
+
+| 字段 | 类型 | 描述 | 默认值 | 是否必需 |
+| :------------------ | :------- | :-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | :---------------------- | :--------------------------- |
+| `...to_text.type` | `string` | 转换的类型。可选值:
Feeds
`); err != nil {
- return nil, errors.Wrap(err, "write feeds header")
+ return "", errors.Wrap(err, "write feeds header")
}
for i, feed := range group.Feeds {
if err := e.writeFeedContent(bodyBuf, feed); err != nil {
- return nil, errors.Wrap(err, "write feed content")
+ return "", errors.Wrap(err, "write feed content")
}
// Add separator (except the last feed).
if i < len(group.Feeds)-1 {
if err := e.writeSeparator(bodyBuf); err != nil {
- return nil, errors.Wrap(err, "write separator")
+ return "", errors.Wrap(err, "write separator")
}
}
}
// Write disclaimer and HTML footer.
if err := e.writeDisclaimer(bodyBuf); err != nil {
- return nil, errors.Wrap(err, "write disclaimer")
+ return "", errors.Wrap(err, "write disclaimer")
}
if err := e.writeHTMLFooter(bodyBuf); err != nil {
- return nil, errors.Wrap(err, "write HTML footer")
+ return "", errors.Wrap(err, "write HTML footer")
}
- return bodyBuf.Bytes(), nil
+ return bodyBuf.String(), nil
}
func (e *email) writeHTMLHeader(buf *buffer.Bytes) error {
diff --git a/pkg/notify/channel/webhook.go b/pkg/notify/channel/webhook.go
index 401a997..2e755f0 100644
--- a/pkg/notify/channel/webhook.go
+++ b/pkg/notify/channel/webhook.go
@@ -41,9 +41,10 @@ func (r *WebhookReceiver) Validate() error {
}
type webhookBody struct {
- Group string `json:"group"`
- Labels model.Labels `json:"labels"`
- Feeds []*route.Feed `json:"feeds"`
+ Group string `json:"group"`
+ Labels model.Labels `json:"labels"`
+ Summary string `json:"summary"`
+ Feeds []*route.Feed `json:"feeds"`
}
func newWebhook() sender {
@@ -59,9 +60,10 @@ type webhook struct {
func (w *webhook) Send(ctx context.Context, receiver Receiver, group *route.FeedGroup) error {
// Prepare request.
body := &webhookBody{
- Group: group.Name,
- Labels: group.Labels,
- Feeds: group.Feeds,
+ Group: group.Name,
+ Labels: group.Labels,
+ Summary: group.Summary,
+ Feeds: group.Feeds,
}
b := runtimeutil.Must1(json.Marshal(body))
req, err := http.NewRequestWithContext(ctx, http.MethodPost, receiver.Webhook.URL, bytes.NewReader(b))
diff --git a/pkg/notify/notify.go b/pkg/notify/notify.go
index 5f31ba0..c91a596 100644
--- a/pkg/notify/notify.go
+++ b/pkg/notify/notify.go
@@ -86,9 +86,9 @@ func (c *Config) From(app *config.App) *Config {
if app.Notify.Receivers[i].Email != "" {
c.Receivers[i].Email = app.Notify.Receivers[i].Email
}
- // if app.Notify.Receivers[i].Webhook != nil {
- // c.Receivers[i].Webhook = &channel.WebhookReceiver{URL: app.Notify.Receivers[i].Webhook.URL}
- // }
+ if app.Notify.Receivers[i].Webhook != nil {
+ c.Receivers[i].Webhook = &channel.WebhookReceiver{URL: app.Notify.Receivers[i].Webhook.URL}
+ }
}
c.Channels = channel.Config{}
@@ -438,8 +438,8 @@ func (n *notifier) send(ctx context.Context, work sendWork) error {
return channel.Send(ctx, work.receiver.Receiver, work.group)
}
-var nlogKey = func(group *route.FeedGroup, receiver Receiver) string {
- return fmt.Sprintf("notifier.group.%s.receiver.%s.%d", group.Name, receiver.Name, group.Time.Unix())
+var nlogKey = func(group *route.FeedGroup, receiver Receiver) []byte {
+ return fmt.Appendf(nil, "notifier.group.%s.receiver.%s.%d", group.Name, receiver.Name, group.Time.Unix())
}
func (n *notifier) isSent(ctx context.Context, group *route.FeedGroup, receiver Receiver) bool {
@@ -457,7 +457,7 @@ func (n *notifier) isSent(ctx context.Context, group *route.FeedGroup, receiver
}
func (n *notifier) markSent(ctx context.Context, group *route.FeedGroup, receiver Receiver) error {
- return n.Dependencies().KVStorage.Set(ctx, nlogKey(group, receiver), timeutil.Format(time.Now()), timeutil.Day)
+ return n.Dependencies().KVStorage.Set(ctx, nlogKey(group, receiver), []byte(timeutil.Format(time.Now())), timeutil.Day)
}
type sendWork struct {
diff --git a/pkg/notify/route/route.go b/pkg/notify/route/route.go
index 988d4ad..58bad01 100644
--- a/pkg/notify/route/route.go
+++ b/pkg/notify/route/route.go
@@ -72,56 +72,25 @@ func (s SubRoutes) Match(feed *block.FeedVO) *SubRoute {
type SubRoute struct {
Route
Matchers []string
- matchers []matcher
+ matchers model.LabelFilters
}
func (r *SubRoute) Match(feed *block.FeedVO) *SubRoute {
+ // Match sub routes.
for _, subRoute := range r.SubRoutes {
if matched := subRoute.Match(feed); matched != nil {
return matched
}
}
- for _, m := range r.matchers {
- fv := feed.Labels.Get(m.key)
- switch m.equal {
- case true:
- if fv != m.value {
- return nil
- }
- default:
- if fv == m.value {
- return nil
- }
- }
+
+ // Match self.
+ if !r.matchers.Match(feed.Labels) {
+ return nil
}
return r
}
-type matcher struct {
- key string
- value string
- equal bool
-}
-
-var (
- matcherEqual = "="
- matcherNotEqual = "!="
- parseMatcher = func(filter string) (matcher, error) {
- eq := false
- parts := strings.Split(filter, matcherNotEqual)
- if len(parts) != 2 {
- parts = strings.Split(filter, matcherEqual)
- eq = true
- }
- if len(parts) != 2 {
- return matcher{}, errors.New("invalid matcher")
- }
-
- return matcher{key: parts[0], value: parts[1], equal: eq}, nil
- }
-)
-
func (r *SubRoute) Validate() error {
if len(r.GroupBy) == 0 {
r.GroupBy = []string{model.LabelSource}
@@ -129,17 +98,16 @@ func (r *SubRoute) Validate() error {
if r.CompressByRelatedThreshold == nil {
r.CompressByRelatedThreshold = ptr.To(float32(0.85))
}
+
if len(r.Matchers) == 0 {
return errors.New("matchers is required")
}
- r.matchers = make([]matcher, len(r.Matchers))
- for i, matcher := range r.Matchers {
- m, err := parseMatcher(matcher)
- if err != nil {
- return errors.Wrap(err, "invalid matcher")
- }
- r.matchers[i] = m
+ matchers, err := model.NewLabelFilters(r.Matchers)
+ if err != nil {
+ return errors.Wrap(err, "invalid matchers")
}
+ r.matchers = matchers
+
for _, subRoute := range r.SubRoutes {
if err := subRoute.Validate(); err != nil {
return errors.Wrap(err, "invalid sub_route")
@@ -151,7 +119,7 @@ func (r *SubRoute) Validate() error {
func (c *Config) Validate() error {
if len(c.GroupBy) == 0 {
- c.GroupBy = []string{model.LabelSource}
+ c.GroupBy = []string{model.LabelType}
}
if c.CompressByRelatedThreshold == nil {
c.CompressByRelatedThreshold = ptr.To(float32(0.85))
@@ -179,8 +147,8 @@ type FeedGroup struct {
Name string
Time time.Time
Labels model.Labels
- Feeds []*Feed
Summary string
+ Feeds []*Feed
}
func (g *FeedGroup) ID() string {
diff --git a/pkg/rewrite/rewrite.go b/pkg/rewrite/rewrite.go
index 6fc5d24..f8cc9a7 100644
--- a/pkg/rewrite/rewrite.go
+++ b/pkg/rewrite/rewrite.go
@@ -19,8 +19,8 @@ import (
"context"
"html/template"
"regexp"
+ "strings"
"unicode/utf8"
- "unsafe"
"github.com/pkg/errors"
"k8s.io/utils/ptr"
@@ -28,14 +28,15 @@ import (
"github.com/glidea/zenfeed/pkg/component"
"github.com/glidea/zenfeed/pkg/config"
"github.com/glidea/zenfeed/pkg/llm"
+ "github.com/glidea/zenfeed/pkg/llm/prompt"
"github.com/glidea/zenfeed/pkg/model"
"github.com/glidea/zenfeed/pkg/telemetry"
telemetrymodel "github.com/glidea/zenfeed/pkg/telemetry/model"
"github.com/glidea/zenfeed/pkg/util/buffer"
+ "github.com/glidea/zenfeed/pkg/util/crawl"
)
// --- Interface code block ---
-
type Rewriter interface {
component.Component
config.Watcher
@@ -71,6 +72,11 @@ type Dependencies struct {
}
type Rule struct {
+ // If lists the label filters a feed must match before this rule is applied.
+ // If empty, the rule is applied to every feed.
+ If []string
+ if_ model.LabelFilters
+
// SourceLabel specifies which label's value to use as source text.
// Default is model.LabelContent.
SourceLabel string
@@ -96,29 +102,51 @@ type Rule struct {
}
func (r *Rule) Validate() error { //nolint:cyclop
+ // If.
+ if len(r.If) > 0 {
+ if_, err := model.NewLabelFilters(r.If)
+ if err != nil {
+ return errors.Wrapf(err, "invalid if %q", r.If)
+ }
+ r.if_ = if_
+ }
+
// Source label.
if r.SourceLabel == "" {
r.SourceLabel = model.LabelContent
}
if r.SkipTooShortThreshold == nil {
- r.SkipTooShortThreshold = ptr.To(300)
+ r.SkipTooShortThreshold = ptr.To(0)
}
// Transform.
if r.Transform != nil {
- if r.Transform.ToText.Prompt == "" {
- return errors.New("to text prompt is required")
+ if r.Transform.ToText == nil {
+ return errors.New("to_text is required when transform is set")
}
- tmpl, err := template.New("").Parse(r.Transform.ToText.Prompt)
- if err != nil {
- return errors.Wrapf(err, "parse prompt template %s", r.Transform.ToText.Prompt)
+
+ switch r.Transform.ToText.Type {
+ case ToTextTypePrompt:
+ if r.Transform.ToText.Prompt == "" {
+ return errors.New("to text prompt is required for prompt type")
+ }
+ tmpl, err := template.New("").Parse(r.Transform.ToText.Prompt)
+ if err != nil {
+ return errors.Wrapf(err, "parse prompt template %s", r.Transform.ToText.Prompt)
+ }
+
+ buf := buffer.Get()
+ defer buffer.Put(buf)
+ if err := tmpl.Execute(buf, prompt.Builtin); err != nil {
+ return errors.Wrapf(err, "execute prompt template %s", r.Transform.ToText.Prompt)
+ }
+ r.Transform.ToText.promptRendered = buf.String()
+
+ case ToTextTypeCrawl, ToTextTypeCrawlByJina:
+ // No specific validation for crawl type here, as the source text itself is the URL.
+ default:
+ return errors.Errorf("unknown transform type: %s", r.Transform.ToText.Type)
}
- buf := buffer.Get()
- defer buffer.Put(buf)
- if err := tmpl.Execute(buf, promptTemplates); err != nil {
- return errors.Wrapf(err, "execute prompt template %s", r.Transform.ToText.Prompt)
- }
- r.Transform.ToText.promptRendered = buf.String()
}
// Match.
@@ -148,15 +176,21 @@ func (r *Rule) Validate() error { //nolint:cyclop
}
func (r *Rule) From(c *config.RewriteRule) {
+ r.If = c.If
r.SourceLabel = c.SourceLabel
r.SkipTooShortThreshold = c.SkipTooShortThreshold
if c.Transform != nil {
t := &Transform{}
if c.Transform.ToText != nil {
- t.ToText = &ToText{
+ toText := &ToText{
LLM: c.Transform.ToText.LLM,
Prompt: c.Transform.ToText.Prompt,
}
+ toText.Type = ToTextType(c.Transform.ToText.Type)
+ if toText.Type == "" {
+ toText.Type = ToTextTypePrompt // Default to prompt if not specified.
+ }
+ t.ToText = toText
}
r.Transform = t
}
@@ -173,15 +207,27 @@ type Transform struct {
}
type ToText struct {
+ Type ToTextType
+
// LLM is the name of the LLM to use.
+ // Only used when Type is ToTextTypePrompt.
LLM string
// Prompt is the prompt for LLM completion.
// The source text will automatically be injected into the prompt.
+ // Only used when Type is ToTextTypePrompt.
Prompt string
promptRendered string
}
+type ToTextType string
+
+const (
+ ToTextTypePrompt ToTextType = "prompt"
+ ToTextTypeCrawl ToTextType = "crawl"
+ ToTextTypeCrawlByJina ToTextType = "crawl_by_jina"
+)
+
type Action string
const (
@@ -189,233 +235,7 @@ const (
ActionCreateOrUpdateLabel Action = "create_or_update_label"
)
-var promptTemplates = map[string]string{
- "category": `
-Analyze the content and categorize it into exactly one of these categories:
-Technology, Development, Entertainment, Finance, Health, Politics, Other
-
-Classification requirements:
-- Choose the SINGLE most appropriate category based on:
- * Primary topic and main focus of the content
- * Key terminology and concepts used
- * Target audience and purpose
- * Technical depth and complexity level
-- For content that could fit multiple categories:
- * Identify the dominant theme
- * Consider the most specific applicable category
- * Use the primary intended purpose
-- If content appears ambiguous:
- * Focus on the most prominent aspects
- * Consider the practical application
- * Choose the category that best serves user needs
-
-Output format:
-Return ONLY the category name, no other text or explanation.
-Must be one of the provided categories exactly as written.
-`,
-
- "tags": `
-Analyze the content and add appropriate tags based on:
-- Main topics and themes
-- Key concepts and terminology
-- Target audience and purpose
-- Technical depth and domain
-- 2-4 tags are enough
-Output format:
-Return a list of tags, separated by commas, no other text or explanation.
-e.g. "AI, Technology, Innovation, Future"
-`,
-
- "score": `
-Please give a score between 0 and 10 based on the following content.
-Evaluate the content comprehensively considering clarity, accuracy, depth, logical structure, language expression, and completeness.
-Note: If the content is an article or a text intended to be detailed, the length is an important factor. Generally, content under 300 words may receive a lower score due to lack of substance, unless its type (such as poetry or summary) is inherently suitable for brevity.
-Output format:
-Return the score (0-10), no other text or explanation.
-E.g. "8", "5", "3", etc.
-`,
-
- "comment_confucius": `
-Please act as Confucius and write a 100-word comment on the article.
-Content needs to be in line with the Chinese mainland's regulations.
-Output format:
-Return the comment only, no other text or explanation.
-Reply short and concise, 100 words is enough.
-`,
-
- "summary": `
-Summarize the article in 100-200 words.
-`,
-
- "summary_html_snippet": `
-# Task: Create Visually Appealing Information Summary Emails
-
-You are a professional content designer. Please convert the provided articles into **visually modern HTML email segments**, focusing on display effects in modern clients like Gmail and QQ Mail.
-
-## Key Requirements:
-
-1. **Output Format**:
- - Only output HTML code snippets, **no need for complete HTML document structure**
- - Only generate HTML code for a single article, so users can combine multiple pieces into a complete email
- - No explanations, additional comments, or markups
- - **No need to add titles and sources**, users will inject them automatically
- - No use html backticks, output raw html code directly
- - Output directly, no explanation, no comments, no markups
-
-2. **Content Processing**:
- - **Don't directly copy the original text**, but extract key information and core insights from each article
- - **Each article summary should be 100-200 words**, don't force word count, adjust the word count based on the actual length of the article
- - Summarize points in relaxed, natural language, as if chatting with friends, while maintaining depth
- - Maintain the original language of the article (e.g., Chinese summary for Chinese articles)
-
-3. **Visual Design**:
- - Design should be aesthetically pleasing with coordinated colors
- - Use sufficient whitespace and contrast
- - Maintain a consistent visual style across all articles
- - **Must use multiple visual elements** (charts, cards, quote blocks, etc.), avoid pure text presentation
- - Each article should use at least 2-3 different visual elements to make content more intuitive and readable
-
-4. **Highlight Techniques**:
-
- A. **Beautiful Quote Blocks** (for highlighting important viewpoints):
-
-
- Here is the key viewpoint or finding that needs to be highlighted.
-
-
-
- B. **Information Cards** (for highlighting key data):
-
-
- C. **Key Points List** (for highlighting multiple points):
-
- -
- 1
- First point description
-
- -
- 2
- Second point description
-
-
-
- D. **Emphasis Text** (for highlighting key words or phrases):
-
Text to emphasize
-
-5. **Timeline Design** (suitable for event sequences or news developments):
-
-
Event Development Timeline
-
-
-
-
-
-
June 1, 2023
-
Event description content, concisely explaining the key points and impact of the event.
-
-
-
-
-
-
June 15, 2023
-
Event description content, concisely explaining the key points and impact of the event.
-
-
-
-
-6. **Comparison Table** (for comparing different options or viewpoints):
-
-
-
-
- | Feature |
- Option A |
- Option B |
-
-
-
-
- | Cost |
- Higher |
- Moderate |
-
-
- | Efficiency |
- Very High |
- Average |
-
-
-
-
-
-7. **Chart Data Processing**:
- - Bar Chart/Horizontal Bars:
-
-
Data Comparison
-
-
-
-
- Project A
- 65%
-
-
-
-
-
-
-
- Project B
- 42%
-
-
-
-
-
-8. **Highlight Box** (for displaying tips or reminders):
-
-
-
- !
-
-
-
Tip
-
- Here are some additional tips or suggestions to help readers better understand or apply the article content.
-
-
-
-
-
-9. **Summary Box**:
-
-
In Simple Terms
-
- This is a concise summary of the entire content, highlighting the most critical findings and conclusions.
-
-
-
-## Notes:
-1. **Only generate content for a single article**, not including title and source, and not including HTML head and tail structure
-2. Content should be **200-300 words**, don't force word count
-3. **Must use multiple visual elements** (at least 2-3 types), avoid monotonous pure text presentation
-4. Use relaxed, natural language, as if chatting with friends
-5. Create visual charts for important data, rather than just describing with text
-6. Use quote blocks to highlight important viewpoints, and lists to organize multiple points
-7. Appropriately use emojis and conversational expressions to increase friendliness
-8. Note that the article content has been provided in the previous message, please reply directly, no explanation, no comments, no markups
-`,
-}
-
// --- Factory code block ---
-
type Factory component.Factory[Rewriter, config.App, Dependencies]
func NewFactory(mockOn ...component.MockOption) Factory {
@@ -445,6 +265,8 @@ func new(instance string, app *config.App, dependencies Dependencies) (Rewriter,
Config: c,
Dependencies: dependencies,
}),
+ crawler: crawl.NewLocal(),
+ jinaCrawler: crawl.NewJina(app.Jina.Token),
}, nil
}
@@ -452,6 +274,9 @@ func new(instance string, app *config.App, dependencies Dependencies) (Rewriter,
type rewriter struct {
*component.Base[Config, Dependencies]
+
+ crawler crawl.Crawler
+ jinaCrawler crawl.Crawler
}
func (r *rewriter) Reload(app *config.App) error {
@@ -462,6 +287,8 @@ func (r *rewriter) Reload(app *config.App) error {
}
r.SetConfig(newConfig)
+ r.jinaCrawler = crawl.NewJina(app.Jina.Token)
+
return nil
}
@@ -471,6 +298,11 @@ func (r *rewriter) Labels(ctx context.Context, labels model.Labels) (rewritten m
rules := *r.Config()
for _, rule := range rules {
+ // If.
+ if !rule.if_.Match(labels) {
+ continue
+ }
+
// Get source text based on source label.
sourceText := labels.Get(rule.SourceLabel)
if utf8.RuneCountInString(sourceText) < *rule.SkipTooShortThreshold {
@@ -479,7 +311,7 @@ func (r *rewriter) Labels(ctx context.Context, labels model.Labels) (rewritten m
// Transform text if configured.
text := sourceText
- if rule.Transform != nil {
+ if rule.Transform != nil && rule.Transform.ToText != nil {
transformed, err := r.transformText(ctx, rule.Transform, sourceText)
if err != nil {
return nil, errors.Wrap(err, "transform text")
@@ -506,15 +338,37 @@ func (r *rewriter) Labels(ctx context.Context, labels model.Labels) (rewritten m
return labels, nil
}
-// transformText transforms text using configured LLM.
+// transformText transforms text using configured LLM or by crawling a URL.
func (r *rewriter) transformText(ctx context.Context, transform *Transform, text string) (string, error) {
+ switch transform.ToText.Type {
+ case ToTextTypeCrawl:
+ return r.transformTextCrawl(ctx, r.crawler, text)
+ case ToTextTypeCrawlByJina:
+ return r.transformTextCrawl(ctx, r.jinaCrawler, text)
+
+ case ToTextTypePrompt:
+ return r.transformTextPrompt(ctx, transform, text)
+ default:
+ return r.transformTextPrompt(ctx, transform, text)
+ }
+}
+
+func (r *rewriter) transformTextCrawl(ctx context.Context, crawler crawl.Crawler, url string) (string, error) {
+ mdBytes, err := crawler.Markdown(ctx, url)
+ if err != nil {
+ return "", errors.Wrapf(err, "crawl %s", url)
+ }
+ return string(mdBytes), nil
+}
+
+// transformTextPrompt transforms text using configured LLM.
+func (r *rewriter) transformTextPrompt(ctx context.Context, transform *Transform, text string) (string, error) {
// Get LLM instance.
llm := r.Dependencies().LLMFactory.Get(transform.ToText.LLM)
// Call completion.
result, err := llm.String(ctx, []string{
transform.ToText.promptRendered,
- "The content to be processed is below, and the processing requirements are as above",
text, // TODO: may place to first line to hit the model cache in different rewrite rules.
})
if err != nil {
@@ -525,32 +379,11 @@ func (r *rewriter) transformText(ctx context.Context, transform *Transform, text
}
func (r *rewriter) transformTextHack(text string) string {
- bytes := unsafe.Slice(unsafe.StringData(text), len(text))
- start := 0
- end := len(bytes)
-
- // Remove the last line if it's empty.
- // This is a hack to avoid the model output a empty line.
- // E.g. category: tech\n
- if end > 0 && bytes[end-1] == '\n' {
- end--
- }
-
- // Remove the html backticks.
- if end-start >= 7 && string(bytes[start:start+7]) == "```html" {
- start += 7
- }
- if end-start >= 3 && string(bytes[end-3:end]) == "```" {
- end -= 3
- }
-
- // If no changes, return the original string.
- if start == 0 && end == len(bytes) {
- return text
- }
-
- // Only copy one time.
- return string(bytes[start:end])
+ // TODO: optimize this.
+ text = strings.ReplaceAll(text, "```html", "")
+ text = strings.ReplaceAll(text, "```markdown", "")
+ text = strings.ReplaceAll(text, "```", "")
+ return text
}
type mockRewriter struct {
diff --git a/pkg/rewrite/rewrite_test.go b/pkg/rewrite/rewrite_test.go
index 98401ff..636592c 100644
--- a/pkg/rewrite/rewrite_test.go
+++ b/pkg/rewrite/rewrite_test.go
@@ -44,6 +44,7 @@ func TestLabels(t *testing.T) {
SkipTooShortThreshold: ptr.To(10),
Transform: &Transform{
ToText: &ToText{
+ Type: ToTextTypePrompt,
LLM: "mock-llm",
Prompt: "{{ .category }}", // Using a simple template for testing
},
@@ -79,6 +80,7 @@ func TestLabels(t *testing.T) {
SkipTooShortThreshold: ptr.To(10),
Transform: &Transform{
ToText: &ToText{
+ Type: ToTextTypePrompt,
LLM: "mock-llm",
Prompt: "{{ .category }}",
},
@@ -148,6 +150,7 @@ func TestLabels(t *testing.T) {
SkipTooShortThreshold: ptr.To(10),
Transform: &Transform{
ToText: &ToText{
+ Type: ToTextTypePrompt,
LLM: "mock-llm",
Prompt: "{{ .category }}",
promptRendered: "Analyze the content and categorize it...",
@@ -186,6 +189,7 @@ func TestLabels(t *testing.T) {
SkipTooShortThreshold: ptr.To(10),
Transform: &Transform{
ToText: &ToText{
+ Type: ToTextTypePrompt,
LLM: "mock-llm",
Prompt: "{{ .category }}",
promptRendered: "Analyze the content and categorize it...",
diff --git a/pkg/schedule/rule/periodic.go b/pkg/schedule/rule/periodic.go
index 4625a77..e596bf8 100644
--- a/pkg/schedule/rule/periodic.go
+++ b/pkg/schedule/rule/periodic.go
@@ -55,7 +55,7 @@ func (r *periodic) Run() (err error) {
end := time.Date(today.Year(), today.Month(), today.Day(),
config.end.Hour(), config.end.Minute(), 0, 0, today.Location())
- buffer := 20 * time.Minute
+ buffer := 30 * time.Minute
endPlusBuffer := end.Add(buffer)
if now.Before(end) || now.After(endPlusBuffer) {
return
diff --git a/pkg/schedule/rule/rule.go b/pkg/schedule/rule/rule.go
index a781283..d2f3b0a 100644
--- a/pkg/schedule/rule/rule.go
+++ b/pkg/schedule/rule/rule.go
@@ -18,7 +18,6 @@ package rule
import (
"strings"
"time"
- "unicode/utf8"
"github.com/pkg/errors"
@@ -58,11 +57,8 @@ func (c *Config) Validate() error { //nolint:cyclop,gocognit
if c.Name == "" {
return errors.New("name is required")
}
- if c.Query != "" && utf8.RuneCountInString(c.Query) < 5 {
- return errors.New("query must be at least 5 characters")
- }
if c.Threshold == 0 {
- c.Threshold = 0.6
+ c.Threshold = 0.5
}
if c.Threshold < 0 || c.Threshold > 1 {
return errors.New("threshold must be between 0 and 1")
diff --git a/pkg/scrape/scraper/rss.go b/pkg/scrape/scraper/rss.go
index 361b64e..851bc49 100644
--- a/pkg/scrape/scraper/rss.go
+++ b/pkg/scrape/scraper/rss.go
@@ -65,7 +65,6 @@ func newRSSReader(config *ScrapeSourceRSS) (reader, error) {
}
// --- Implementation code block ---
-
type rssReader struct {
config *ScrapeSourceRSS
client client
diff --git a/pkg/scrape/scraper/scraper.go b/pkg/scrape/scraper/scraper.go
index 495340b..cc37f80 100644
--- a/pkg/scrape/scraper/scraper.go
+++ b/pkg/scrape/scraper/scraper.go
@@ -227,7 +227,7 @@ func (s *scraper) filterExists(ctx context.Context, feeds []*model.Feed) (filter
appendToResult := func(feed *model.Feed) {
key := keyPrefix + strconv.FormatUint(feed.ID, 10)
value := timeutil.Format(feed.Time)
- if err := s.Dependencies().KVStorage.Set(ctx, key, value, ttl); err != nil {
+ if err := s.Dependencies().KVStorage.Set(ctx, []byte(key), []byte(value), ttl); err != nil {
log.Error(ctx, err, "set last try store time")
}
filtered = append(filtered, feed)
@@ -236,7 +236,7 @@ func (s *scraper) filterExists(ctx context.Context, feeds []*model.Feed) (filter
for _, feed := range feeds {
key := keyPrefix + strconv.FormatUint(feed.ID, 10)
- lastTryStored, err := s.Dependencies().KVStorage.Get(ctx, key)
+ lastTryStored, err := s.Dependencies().KVStorage.Get(ctx, []byte(key))
switch {
default:
log.Error(ctx, err, "get last stored time, fallback to continue writing")
@@ -246,7 +246,7 @@ func (s *scraper) filterExists(ctx context.Context, feeds []*model.Feed) (filter
appendToResult(feed)
case err == nil:
- t, err := timeutil.Parse(lastTryStored)
+ t, err := timeutil.Parse(string(lastTryStored))
if err != nil {
log.Error(ctx, err, "parse last try stored time, fallback to continue writing")
appendToResult(feed)
diff --git a/pkg/storage/feed/block/block.go b/pkg/storage/feed/block/block.go
index 8b025a0..4d2151a 100644
--- a/pkg/storage/feed/block/block.go
+++ b/pkg/storage/feed/block/block.go
@@ -26,7 +26,6 @@ import (
"runtime"
"slices"
"strconv"
- "strings"
"sync"
"sync/atomic"
"time"
@@ -277,47 +276,20 @@ type QueryOptions struct {
Query string
Threshold float32
LabelFilters []string
- labelFilters []LabelFilter
+ labelFilters model.LabelFilters
Limit int
Start, End time.Time
}
-var (
- LabelFilterEqual = "="
- LabelFilterNotEqual = "!="
-
- NewLabelFilter = func(key, value string, eq bool) string {
- if eq {
- return fmt.Sprintf("%s%s%s", key, LabelFilterEqual, value)
- }
-
- return fmt.Sprintf("%s%s%s", key, LabelFilterNotEqual, value)
- }
-
- ParseLabelFilter = func(filter string) (LabelFilter, error) {
- eq := false
- parts := strings.Split(filter, LabelFilterNotEqual)
- if len(parts) != 2 {
- parts = strings.Split(filter, LabelFilterEqual)
- eq = true
- }
- if len(parts) != 2 {
- return LabelFilter{}, errors.New("invalid label filter")
- }
-
- return LabelFilter{Label: parts[0], Value: parts[1], Equal: eq}, nil
- }
-)
-
func (q *QueryOptions) Validate() error { //nolint:cyclop
if q.Threshold < 0 || q.Threshold > 1 {
return errors.New("threshold must be between 0 and 1")
}
- for _, labelFilter := range q.LabelFilters {
- if labelFilter == "" {
+ for _, s := range q.LabelFilters {
+ if s == "" {
return errors.New("label filter is required")
}
- filter, err := ParseLabelFilter(labelFilter)
+ filter, err := model.NewLabelFilter(s)
if err != nil {
return errors.Wrap(err, "parse label filter")
}
@@ -368,13 +340,6 @@ func (q *QueryOptions) HitTimeRangeCondition(b Block) bool {
return queryAsBase || blockAsBase
}
-// LabelFilter defines the matcher for an item.
-type LabelFilter struct {
- Label string
- Equal bool
- Value string
-}
-
// --- Factory code block ---
type Factory component.Factory[Block, Config, Dependencies]
@@ -1228,14 +1193,14 @@ func (b *block) applyFilters(ctx context.Context, query *QueryOptions) (res filt
return b.mergeFilterResults(labelsResult, vectorsResult), nil
}
-func (b *block) applyLabelFilters(ctx context.Context, filters []LabelFilter) filterResult {
+func (b *block) applyLabelFilters(ctx context.Context, filters model.LabelFilters) filterResult {
if len(filters) == 0 {
return matchedAllFilterResult
}
var allIDs map[uint64]struct{}
for _, filter := range filters {
- ids := b.invertedIndex.Search(ctx, filter.Label, filter.Equal, filter.Value)
+ ids := b.invertedIndex.Search(ctx, filter)
if len(ids) == 0 {
return matchedNothingFilterResult
}
@@ -1317,7 +1282,7 @@ func (b *block) mergeFilterResults(x, y filterResult) filterResult {
}
func (b *block) fillEmbedding(ctx context.Context, feeds []*model.Feed) ([]*chunk.Feed, error) {
- embedded := make([]*chunk.Feed, len(feeds))
+ embedded := make([]*chunk.Feed, 0, len(feeds))
llm := b.Dependencies().LLMFactory.Get(b.Config().embeddingLLM)
var wg sync.WaitGroup
var mu sync.Mutex
@@ -1336,16 +1301,21 @@ func (b *block) fillEmbedding(ctx context.Context, feeds []*model.Feed) ([]*chun
}
mu.Lock()
- embedded[i] = &chunk.Feed{
+ embedded = append(embedded, &chunk.Feed{
Feed: feed,
Vectors: vectors,
- }
+ })
mu.Unlock()
}(i, feed)
}
wg.Wait()
- if len(errs) > 0 {
- return nil, errs[0]
+
+ switch len(errs) {
+ case 0:
+ case len(feeds):
+ return nil, errs[0] // All failed.
+ default:
+ log.Error(ctx, errors.Wrap(errs[0], "fill embedding"), "error_count", len(errs))
}
return embedded, nil
diff --git a/pkg/storage/feed/block/index/inverted/inverted.go b/pkg/storage/feed/block/index/inverted/inverted.go
index b633757..790cc99 100644
--- a/pkg/storage/feed/block/index/inverted/inverted.go
+++ b/pkg/storage/feed/block/index/inverted/inverted.go
@@ -24,7 +24,7 @@ type Index interface {
index.Codec
// Search returns item IDs matching the given label and value.
- Search(ctx context.Context, label string, eq bool, value string) (ids map[uint64]struct{})
+ Search(ctx context.Context, matcher model.LabelFilter) (ids map[uint64]struct{})
// Add adds item to the index.
// If label or value in labels is empty, it will be ignored.
// If value is too long, it will be ignored,
@@ -88,17 +88,17 @@ type idx struct {
mu sync.RWMutex
}
-func (idx *idx) Search(ctx context.Context, label string, eq bool, value string) (ids map[uint64]struct{}) {
+func (idx *idx) Search(ctx context.Context, matcher model.LabelFilter) (ids map[uint64]struct{}) {
ctx = telemetry.StartWith(ctx, append(idx.TelemetryLabels(), telemetrymodel.KeyOperation, "Search")...)
defer func() { telemetry.End(ctx, nil) }()
idx.mu.RLock()
defer idx.mu.RUnlock()
- if value == "" {
- return idx.searchEmptyValue(label, eq)
+ if matcher.Value == "" {
+ return idx.searchEmptyValue(matcher.Label, matcher.Equal)
}
- return idx.searchNonEmptyValue(label, eq, value)
+ return idx.searchNonEmptyValue(matcher)
}
func (idx *idx) Add(ctx context.Context, id uint64, labels model.Labels) {
@@ -198,16 +198,16 @@ func (idx *idx) searchEmptyValue(label string, eq bool) map[uint64]struct{} {
// searchNonEmptyValue handles the search logic when the target value is not empty.
// If eq is true, it returns IDs that have the exact label-value pair.
// If eq is false, it returns IDs that *do not* have the exact label-value pair.
-func (idx *idx) searchNonEmptyValue(label string, eq bool, value string) map[uint64]struct{} {
+func (idx *idx) searchNonEmptyValue(matcher model.LabelFilter) map[uint64]struct{} {
// Get the map of values for the given label.
- values, labelExists := idx.m[label]
+ values, labelExists := idx.m[matcher.Label]
// If equal (eq), find the exact match.
- if eq {
+ if matcher.Equal {
if !labelExists {
return make(map[uint64]struct{}) // Label doesn't exist.
}
- ids, valueExists := values[value]
+ ids, valueExists := values[matcher.Value]
if !valueExists {
return make(map[uint64]struct{}) // Value doesn't exist for this label.
}
@@ -221,7 +221,7 @@ func (idx *idx) searchNonEmptyValue(label string, eq bool, value string) map[uin
resultIDs := maps.Clone(idx.ids)
if labelExists {
// If the specific label-value pair exists, remove its associated IDs.
- if matchingIDs, valueExists := values[value]; valueExists {
+ if matchingIDs, valueExists := values[matcher.Value]; valueExists {
for id := range matchingIDs {
delete(resultIDs, id)
}
@@ -413,8 +413,8 @@ type mockIndex struct {
component.Mock
}
-func (m *mockIndex) Search(ctx context.Context, label string, eq bool, value string) (ids map[uint64]struct{}) {
- args := m.Called(ctx, label, eq, value)
+func (m *mockIndex) Search(ctx context.Context, matcher model.LabelFilter) (ids map[uint64]struct{}) {
+ args := m.Called(ctx, matcher)
return args.Get(0).(map[uint64]struct{})
}
diff --git a/pkg/storage/feed/block/index/inverted/inverted_test.go b/pkg/storage/feed/block/index/inverted/inverted_test.go
index afc656b..20f0336 100644
--- a/pkg/storage/feed/block/index/inverted/inverted_test.go
+++ b/pkg/storage/feed/block/index/inverted/inverted_test.go
@@ -118,9 +118,7 @@ func TestSearch(t *testing.T) {
setupLabels map[uint64]model.Labels
}
type whenDetail struct {
- searchLabel string
- eq bool
- searchValue string
+ matcher model.LabelFilter
}
type thenExpected struct {
want []uint64
@@ -140,9 +138,11 @@ func TestSearch(t *testing.T) {
},
},
WhenDetail: whenDetail{
- searchLabel: "category",
- searchValue: "tech",
- eq: true,
+ matcher: model.LabelFilter{
+ Label: "category",
+ Value: "tech",
+ Equal: true,
+ },
},
ThenExpected: thenExpected{
want: []uint64{1, 2},
@@ -159,9 +159,11 @@ func TestSearch(t *testing.T) {
},
},
WhenDetail: whenDetail{
- searchLabel: "invalid",
- searchValue: "value",
- eq: true,
+ matcher: model.LabelFilter{
+ Label: "invalid",
+ Value: "value",
+ Equal: true,
+ },
},
ThenExpected: thenExpected{
want: nil,
@@ -178,9 +180,11 @@ func TestSearch(t *testing.T) {
},
},
WhenDetail: whenDetail{
- searchLabel: "category",
- searchValue: "invalid",
- eq: true,
+ matcher: model.LabelFilter{
+ Label: "category",
+ Value: "invalid",
+ Equal: true,
+ },
},
ThenExpected: thenExpected{
want: nil,
@@ -200,9 +204,11 @@ func TestSearch(t *testing.T) {
},
},
WhenDetail: whenDetail{
- searchLabel: "category",
- searchValue: "tech",
- eq: false,
+ matcher: model.LabelFilter{
+ Label: "category",
+ Value: "tech",
+ Equal: false,
+ },
},
ThenExpected: thenExpected{
want: []uint64{2},
@@ -220,9 +226,11 @@ func TestSearch(t *testing.T) {
},
},
WhenDetail: whenDetail{
- searchLabel: "invalid",
- searchValue: "value",
- eq: false,
+ matcher: model.LabelFilter{
+ Label: "invalid",
+ Value: "value",
+ Equal: false,
+ },
},
ThenExpected: thenExpected{
want: []uint64{1, 2},
@@ -240,7 +248,7 @@ func TestSearch(t *testing.T) {
}
// When.
- result := idx.Search(context.Background(), tt.WhenDetail.searchLabel, tt.WhenDetail.eq, tt.WhenDetail.searchValue)
+ result := idx.Search(context.Background(), tt.WhenDetail.matcher)
// Then.
if tt.ThenExpected.want == nil {
diff --git a/pkg/storage/kv/kv.go b/pkg/storage/kv/kv.go
index 93ba024..caefce0 100644
--- a/pkg/storage/kv/kv.go
+++ b/pkg/storage/kv/kv.go
@@ -32,8 +32,8 @@ import (
// --- Interface code block ---
type Storage interface {
component.Component
- Get(ctx context.Context, key string) (string, error)
- Set(ctx context.Context, key string, value string, ttl time.Duration) error
+ Get(ctx context.Context, key []byte) ([]byte, error)
+ Set(ctx context.Context, key []byte, value []byte, ttl time.Duration) error
}
var ErrNotFound = errors.New("not found")
@@ -137,7 +137,7 @@ func (k *kv) Close() error {
const bucket = "0"
-func (k *kv) Get(ctx context.Context, key string) (value string, err error) {
+func (k *kv) Get(ctx context.Context, key []byte) (value []byte, err error) {
ctx = telemetry.StartWith(ctx, append(k.TelemetryLabels(), telemetrymodel.KeyOperation, "Get")...)
defer func() {
telemetry.End(ctx, func() error {
@@ -157,22 +157,22 @@ func (k *kv) Get(ctx context.Context, key string) (value string, err error) {
})
switch {
case err == nil:
- return string(b), nil
+ return b, nil
case errors.Is(err, nutsdb.ErrNotFoundKey):
- return "", ErrNotFound
+ return nil, ErrNotFound
case strings.Contains(err.Error(), "key not found"):
- return "", ErrNotFound
+ return nil, ErrNotFound
default:
- return "", err
+ return nil, err
}
}
-func (k *kv) Set(ctx context.Context, key string, value string, ttl time.Duration) (err error) {
+func (k *kv) Set(ctx context.Context, key []byte, value []byte, ttl time.Duration) (err error) {
ctx = telemetry.StartWith(ctx, append(k.TelemetryLabels(), telemetrymodel.KeyOperation, "Set")...)
defer func() { telemetry.End(ctx, err) }()
return k.db.Update(func(tx *nutsdb.Tx) error {
- return tx.Put(bucket, []byte(key), []byte(value), uint32(ttl.Seconds()))
+ return tx.Put(bucket, key, value, uint32(ttl.Seconds()))
})
}
@@ -180,13 +180,13 @@ type mockKV struct {
component.Mock
}
-func (m *mockKV) Get(ctx context.Context, key string) (string, error) {
+func (m *mockKV) Get(ctx context.Context, key []byte) ([]byte, error) {
args := m.Called(ctx, key)
- return args.String(0), args.Error(1)
+ return args.Get(0).([]byte), args.Error(1)
}
-func (m *mockKV) Set(ctx context.Context, key string, value string, ttl time.Duration) error {
+func (m *mockKV) Set(ctx context.Context, key []byte, value []byte, ttl time.Duration) error {
args := m.Called(ctx, key, value, ttl)
return args.Error(0)
diff --git a/pkg/telemetry/log/log.go b/pkg/telemetry/log/log.go
index 3536f98..dbe2e03 100644
--- a/pkg/telemetry/log/log.go
+++ b/pkg/telemetry/log/log.go
@@ -27,6 +27,8 @@ import (
"github.com/pkg/errors"
slogdedup "github.com/veqryn/slog-dedup"
+
+ "github.com/glidea/zenfeed/pkg/model"
)
type Level string
@@ -187,7 +189,8 @@ func getStack(skip, depth int) string {
}
first = false
- b.WriteString(frame.Function)
+ fn := strings.TrimPrefix(frame.Function, model.Module) // no module prefix for zenfeed self.
+ b.WriteString(fn)
b.WriteByte(':')
b.WriteString(strconv.Itoa(frame.Line))
}
diff --git a/pkg/telemetry/server/server.go b/pkg/telemetry/server/server.go
new file mode 100644
index 0000000..0722b55
--- /dev/null
+++ b/pkg/telemetry/server/server.go
@@ -0,0 +1,137 @@
+// Copyright (C) 2025 wangyusong
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+package http
+
+import (
+ "net"
+ "net/http"
+ "net/http/pprof"
+
+ "github.com/pkg/errors"
+
+ "github.com/glidea/zenfeed/pkg/component"
+ "github.com/glidea/zenfeed/pkg/config"
+ telemetry "github.com/glidea/zenfeed/pkg/telemetry"
+ "github.com/glidea/zenfeed/pkg/telemetry/log"
+ "github.com/glidea/zenfeed/pkg/telemetry/metric"
+ telemetrymodel "github.com/glidea/zenfeed/pkg/telemetry/model"
+)
+
+// --- Interface code block ---
+type Server interface {
+ component.Component
+}
+
+type Config struct {
+ Address string
+}
+
+func (c *Config) Validate() error {
+ if c.Address == "" {
+ c.Address = ":9090"
+ }
+ if _, _, err := net.SplitHostPort(c.Address); err != nil {
+ return errors.Wrap(err, "invalid address")
+ }
+
+ return nil
+}
+
+func (c *Config) From(app *config.App) *Config {
+ c.Address = app.Telemetry.Address
+
+ return c
+}
+
+type Dependencies struct {
+}
+
+// --- Factory code block ---
+type Factory component.Factory[Server, config.App, Dependencies]
+
+func NewFactory(mockOn ...component.MockOption) Factory {
+ if len(mockOn) > 0 {
+ return component.FactoryFunc[Server, config.App, Dependencies](
+ func(instance string, config *config.App, dependencies Dependencies) (Server, error) {
+ m := &mockServer{}
+ component.MockOptions(mockOn).Apply(&m.Mock)
+
+ return m, nil
+ },
+ )
+ }
+
+ return component.FactoryFunc[Server, config.App, Dependencies](new)
+}
+
+func new(instance string, app *config.App, dependencies Dependencies) (Server, error) {
+ config := &Config{}
+ config.From(app)
+ if err := config.Validate(); err != nil {
+ return nil, errors.Wrap(err, "validate config")
+ }
+
+ router := http.NewServeMux()
+ router.Handle("/health", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.WriteHeader(200)
+ }))
+ router.Handle("/metrics", metric.Handler())
+ router.HandleFunc("/pprof", pprof.Index)
+ router.HandleFunc("/pprof/cmdline", pprof.Cmdline)
+ router.HandleFunc("/pprof/profile", pprof.Profile)
+ router.HandleFunc("/pprof/symbol", pprof.Symbol)
+ router.HandleFunc("/pprof/trace", pprof.Trace)
+
+ return &server{
+ Base: component.New(&component.BaseConfig[Config, Dependencies]{
+ Name: "TelemetryServer",
+ Instance: instance,
+ Config: config,
+ Dependencies: dependencies,
+ }),
+ http: &http.Server{Addr: config.Address, Handler: router},
+ }, nil
+}
+
+// --- Implementation code block ---
+type server struct {
+ *component.Base[Config, Dependencies]
+ http *http.Server
+}
+
+func (s *server) Run() (err error) {
+ ctx := telemetry.StartWith(s.Context(), append(s.TelemetryLabels(), telemetrymodel.KeyOperation, "Run")...)
+ defer func() { telemetry.End(ctx, err) }()
+
+ serverErr := make(chan error, 1)
+ go func() {
+ serverErr <- s.http.ListenAndServe()
+ }()
+
+ s.MarkReady()
+ select {
+ case <-ctx.Done():
+ log.Info(ctx, "shutting down")
+
+ return s.http.Shutdown(ctx)
+ case err := <-serverErr:
+ return errors.Wrap(err, "listen and serve")
+ }
+}
+
+type mockServer struct {
+ component.Mock
+}
diff --git a/pkg/util/crawl/crawl.go b/pkg/util/crawl/crawl.go
new file mode 100644
index 0000000..700e73d
--- /dev/null
+++ b/pkg/util/crawl/crawl.go
@@ -0,0 +1,176 @@
+package crawl
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "net/http"
+ "net/url"
+ "sync"
+
+ "github.com/pkg/errors"
+ "github.com/temoto/robotstxt"
+
+ "github.com/glidea/zenfeed/pkg/util/text_convert"
+)
+
+type Crawler interface {
+ Markdown(ctx context.Context, u string) ([]byte, error)
+}
+
+type local struct {
+ hc *http.Client
+
+ robotsDataCache sync.Map
+}
+
+func NewLocal() Crawler {
+ return &local{
+ hc: &http.Client{},
+ }
+}
+
+func (c *local) Markdown(ctx context.Context, u string) ([]byte, error) {
+ // Check if the page is allowed.
+ if err := c.checkAllowed(ctx, u); err != nil {
+ return nil, errors.Wrapf(err, "check robots.txt for %s", u)
+ }
+
+ // Prepare the request.
+ req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
+ if err != nil {
+ return nil, errors.Wrapf(err, "create request for %s", u)
+ }
+ req.Header.Set("User-Agent", userAgent)
+
+ // Send the request.
+ resp, err := c.hc.Do(req)
+ if err != nil {
+ return nil, errors.Wrapf(err, "fetch %s", u)
+ }
+ defer resp.Body.Close()
+
+ // Parse the response.
+ if resp.StatusCode != http.StatusOK {
+ return nil, errors.Errorf("received non-200 status code %d from %s", resp.StatusCode, u)
+ }
+ bodyBytes, err := io.ReadAll(resp.Body)
+ if err != nil {
+ return nil, errors.Wrapf(err, "read body from %s", u)
+ }
+
+ // Convert the body to markdown.
+ mdBytes, err := textconvert.HTMLToMarkdown(bodyBytes)
+ if err != nil {
+ return nil, errors.Wrap(err, "convert html to markdown")
+ }
+
+ return mdBytes, nil
+}
+
+const userAgent = "ZenFeed"
+
+func (c *local) checkAllowed(ctx context.Context, u string) error {
+ parsedURL, err := url.Parse(u)
+ if err != nil {
+ return errors.Wrapf(err, "parse url %s", u)
+ }
+
+ d, err := c.getRobotsData(ctx, parsedURL.Host)
+ if err != nil {
+ return errors.Wrapf(err, "check robots.txt for %s", parsedURL.Host)
+ }
+ if !d.TestAgent(parsedURL.Path, userAgent) {
+ return errors.Errorf("disallowed by robots.txt for %s", u)
+ }
+
+ return nil
+}
+
+// getRobotsData fetches and parses robots.txt for a given host.
+func (c *local) getRobotsData(ctx context.Context, host string) (*robotstxt.RobotsData, error) {
+ // Check the cache.
+ if data, found := c.robotsDataCache.Load(host); found {
+ return data.(*robotstxt.RobotsData), nil
+ }
+
+ // Prepare the request.
+ robotsURL := fmt.Sprintf("https://%s/robots.txt", host)
+ req, err := http.NewRequestWithContext(ctx, http.MethodGet, robotsURL, nil)
+ if err != nil {
+ return nil, errors.Wrapf(err, "create request for %s", robotsURL)
+ }
+ req.Header.Set("User-Agent", userAgent)
+
+ // Send the request.
+ resp, err := c.hc.Do(req)
+ if err != nil {
+ return nil, errors.Wrapf(err, "fetch %s", robotsURL)
+ }
+ defer resp.Body.Close()
+
+ // Parse the response.
+ switch resp.StatusCode {
+ case http.StatusOK:
+ data, err := robotstxt.FromResponse(resp)
+ if err != nil {
+ return nil, errors.Wrapf(err, "parse robots.txt from %s", robotsURL)
+ }
+ c.robotsDataCache.Store(host, data)
+ return data, nil
+ case http.StatusNotFound:
+ data := &robotstxt.RobotsData{}
+ c.robotsDataCache.Store(host, data)
+ return data, nil
+ case http.StatusUnauthorized, http.StatusForbidden:
+ return nil, errors.Errorf("access to %s denied (status %d)", robotsURL, resp.StatusCode)
+ default:
+ return nil, errors.Errorf("unexpected status %d fetching %s", resp.StatusCode, robotsURL)
+ }
+}
+
+type jina struct {
+ hc *http.Client
+ token string
+}
+
+func NewJina(token string) Crawler {
+ return &jina{
+ hc: &http.Client{},
+
+ // An empty token still works, but the rate limit will be lower.
+ // See https://jina.ai/api-dashboard/rate-limit.
+ token: token,
+ }
+}
+
+func (c *jina) Markdown(ctx context.Context, u string) ([]byte, error) {
+ proxyURL := fmt.Sprintf("https://r.jina.ai/%s", u)
+ req, err := http.NewRequestWithContext(ctx, http.MethodGet, proxyURL, nil)
+ if err != nil {
+ return nil, errors.Wrapf(err, "create request for %s", u)
+ }
+
+ req.Header.Set("X-Engine", "browser")
+ req.Header.Set("X-Robots-Txt", userAgent)
+ if c.token != "" {
+ req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", c.token))
+ }
+
+ resp, err := c.hc.Do(req)
+ if err != nil {
+ return nil, errors.Wrapf(err, "fetch %s", proxyURL)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ return nil, errors.Errorf("received non-200 status code %d from %s", resp.StatusCode, proxyURL)
+ }
+
+ mdBytes, err := io.ReadAll(resp.Body)
+ if err != nil {
+ return nil, errors.Wrapf(err, "read body from %s", proxyURL)
+ }
+
+ return mdBytes, nil
+}
diff --git a/pkg/util/rpc/rpc.go b/pkg/util/jsonrpc/jsonrpc.go
similarity index 75%
rename from pkg/util/rpc/rpc.go
rename to pkg/util/jsonrpc/jsonrpc.go
index 98cac88..7e28496 100644
--- a/pkg/util/rpc/rpc.go
+++ b/pkg/util/jsonrpc/jsonrpc.go
@@ -13,39 +13,19 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
-package rpc
+package jsonrpc
import (
"context"
"encoding/json"
"errors"
"net/http"
+
+ "github.com/glidea/zenfeed/pkg/api"
)
type Handler[Request any, Response any] func(ctx context.Context, req *Request) (*Response, error)
-var (
- ErrBadRequest = func(err error) Error { return newError(http.StatusBadRequest, err) }
- ErrNotFound = func(err error) Error { return newError(http.StatusNotFound, err) }
- ErrInternal = func(err error) Error { return newError(http.StatusInternalServerError, err) }
-)
-
-type Error struct {
- Code int `json:"code"`
- Message string `json:"message"`
-}
-
-func (e Error) Error() string {
- return e.Message
-}
-
-func newError(code int, err error) Error {
- return Error{
- Code: code,
- Message: err.Error(),
- }
-}
-
func API[Request any, Response any](handler Handler[Request, Response]) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
allowCORS(w)
@@ -65,11 +45,11 @@ func API[Request any, Response any](handler Handler[Request, Response]) http.Han
resp, err := handler(r.Context(), &req)
if err != nil {
- var rpcErr Error
- if errors.As(err, &rpcErr) {
+ var apiErr api.Error
+ if errors.As(err, &apiErr) {
w.Header().Set("Content-Type", "application/json")
- w.WriteHeader(rpcErr.Code)
- _ = json.NewEncoder(w).Encode(rpcErr)
+ w.WriteHeader(apiErr.Code)
+ _ = json.NewEncoder(w).Encode(apiErr)
return
}
diff --git a/pkg/util/rpc/rpc_test.go b/pkg/util/jsonrpc/jsonrpc_test.go
similarity index 96%
rename from pkg/util/rpc/rpc_test.go
rename to pkg/util/jsonrpc/jsonrpc_test.go
index cc5e02a..e4c0672 100644
--- a/pkg/util/rpc/rpc_test.go
+++ b/pkg/util/jsonrpc/jsonrpc_test.go
@@ -13,7 +13,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
-package rpc
+package jsonrpc
import (
"bytes"
@@ -27,6 +27,7 @@ import (
. "github.com/onsi/gomega"
+ "github.com/glidea/zenfeed/pkg/api"
"github.com/glidea/zenfeed/pkg/test"
)
@@ -58,15 +59,15 @@ func TestAPI(t *testing.T) {
}
badRequestHandler := func(ctx context.Context, req *TestRequest) (*TestResponse, error) {
- return nil, ErrBadRequest(errors.New("invalid request"))
+ return nil, api.ErrBadRequest(errors.New("invalid request"))
}
notFoundHandler := func(ctx context.Context, req *TestRequest) (*TestResponse, error) {
- return nil, ErrNotFound(errors.New("resource not found"))
+ return nil, api.ErrNotFound(errors.New("resource not found"))
}
internalErrorHandler := func(ctx context.Context, req *TestRequest) (*TestResponse, error) {
- return nil, ErrInternal(errors.New("server error"))
+ return nil, api.ErrInternal(errors.New("server error"))
}
genericErrorHandler := func(ctx context.Context, req *TestRequest) (*TestResponse, error) {