Compare commits
14 Commits
v0.5.0
...
chore-insp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
158e07ac7e | ||
|
|
7cb8069d60 | ||
|
|
87b84d94ff | ||
|
|
4d29bae67f | ||
|
|
d640e975bd | ||
|
|
e4bd0ca43b | ||
|
|
8b001c4cdf | ||
|
|
6cacb47d3d | ||
|
|
a65d597032 | ||
|
|
151bd5f66f | ||
|
|
69a9545869 | ||
|
|
b01e07e348 | ||
|
|
e92d7e322e | ||
|
|
7b4396067b |
1
.github/FUNDING.yml
vendored
Normal file
1
.github/FUNDING.yml
vendored
Normal file
@@ -0,0 +1 @@
|
||||
custom: https://afdian.com/a/glidea
|
||||
6
.github/workflows/ci.yml
vendored
6
.github/workflows/ci.yml
vendored
@@ -5,6 +5,8 @@ on:
|
||||
branches: [ main, dev ]
|
||||
pull_request:
|
||||
branches: [ main ]
|
||||
release:
|
||||
types: [ published ]
|
||||
|
||||
jobs:
|
||||
test:
|
||||
@@ -25,7 +27,7 @@ jobs:
|
||||
build-and-push:
|
||||
runs-on: ubuntu-latest
|
||||
needs: test
|
||||
if: github.event_name == 'push'
|
||||
if: github.event_name == 'release' || (github.event_name == 'push' && github.ref_name == 'dev')
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- name: Set up Docker Buildx
|
||||
@@ -36,7 +38,7 @@ jobs:
|
||||
username: ${{ secrets.DOCKERHUB_USERNAME }}
|
||||
password: ${{ secrets.DOCKERHUB_TOKEN }}
|
||||
- name: Build and push Docker image (main)
|
||||
if: github.ref_name == 'main'
|
||||
if: github.event_name == 'release'
|
||||
run: make push
|
||||
- name: Build and push Docker image (dev)
|
||||
if: github.ref_name == 'dev'
|
||||
|
||||
204
BUG_FIXES.md
Normal file
204
BUG_FIXES.md
Normal file
@@ -0,0 +1,204 @@
|
||||
# Bug Fixes Report
|
||||
|
||||
This document summarizes the potential bugs found during the comprehensive code review and the fixes applied.
|
||||
|
||||
## Critical Bugs Fixed
|
||||
|
||||
### 1. Resource Leak in Notification Worker (CRITICAL - FIXED)
|
||||
|
||||
**File:** `pkg/notify/notify.go`
|
||||
**Lines:** 387-415
|
||||
**Severity:** Critical
|
||||
|
||||
**Problem:**
|
||||
The `sendWorker` function had `defer` statements inside an infinite loop:
|
||||
|
||||
```go
|
||||
func (n *notifier) sendWorker(i int) {
|
||||
for {
|
||||
select {
|
||||
case work := <-n.channelSendWork:
|
||||
defer func() { telemetry.End(workCtx, nil) }() // BUG!
|
||||
workCtx, cancel := context.WithTimeout(...)
|
||||
defer cancel() // BUG!
|
||||
// ...
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**Issue Details:**
|
||||
- `defer` statements are executed when the function returns, NOT when the loop iteration ends
|
||||
- Each iteration creates a new context with timeout but never cancels it until the worker goroutine exits
|
||||
- This causes accumulation of:
|
||||
- Uncancelled contexts (memory leak)
|
||||
- Pending telemetry cleanup functions (memory leak)
|
||||
- Goroutines waiting on context timeouts (goroutine leak)
|
||||
|
||||
**Impact:**
|
||||
- Memory usage grows continuously over time
|
||||
- Goroutine count increases with each notification sent
|
||||
- Eventually leads to OOM or performance degradation
|
||||
- Production systems would crash under moderate load
|
||||
|
||||
**Fix:**
|
||||
Extracted the loop body into a separate function `processSendWork()`:
|
||||
|
||||
```go
|
||||
func (n *notifier) sendWorker(i int) {
|
||||
for {
|
||||
select {
|
||||
case <-n.Context().Done():
|
||||
return
|
||||
case work := <-n.channelSendWork:
|
||||
n.processSendWork(i, work)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (n *notifier) processSendWork(i int, work sendWork) {
|
||||
workCtx := telemetry.StartWith(...)
|
||||
defer func() { telemetry.End(workCtx, nil) }()
|
||||
|
||||
workCtx, cancel := context.WithTimeout(workCtx, 30*time.Second)
|
||||
defer cancel()
|
||||
// ...
|
||||
}
|
||||
```
|
||||
|
||||
Now `defer` statements are properly executed at the end of each iteration.
|
||||
|
||||
---
|
||||
|
||||
### 2. Weak Random Number Generation (MEDIUM - FIXED)
|
||||
|
||||
**Files:**
|
||||
- `pkg/api/api.go` (line 22)
|
||||
- `pkg/util/time/time.go` (line 21)
|
||||
|
||||
**Severity:** Medium
|
||||
|
||||
**Problem:**
|
||||
Using `math/rand` instead of `math/rand/v2`:
|
||||
|
||||
```go
|
||||
import "math/rand"
|
||||
// ...
|
||||
feed.ID = rand.Uint64() // Not properly seeded in Go 1.20+
|
||||
```
|
||||
|
||||
**Issue Details:**
|
||||
- In Go 1.20+, `math/rand` requires manual seeding for randomness
|
||||
- Without seeding, the random number generator uses a fixed seed (1)
|
||||
- This results in predictable "random" IDs across different runs
|
||||
- Could lead to ID collisions when multiple instances start at the same time
|
||||
|
||||
**Impact:**
|
||||
- Feed IDs might collide between different instances
|
||||
- Security implications if IDs are used for any access control
|
||||
- Reproducible IDs could be exploited
|
||||
|
||||
**Fix:**
|
||||
Changed to use `math/rand/v2` which is automatically seeded:
|
||||
|
||||
```go
|
||||
import "math/rand/v2"
|
||||
```
|
||||
|
||||
The v2 package automatically seeds itself with a cryptographically random seed, eliminating the need for manual seeding.
|
||||
|
||||
---
|
||||
|
||||
## Potential Issues Identified (Not Fixed)
|
||||
|
||||
### 3. Potential Race Condition in Reload (LOW)
|
||||
|
||||
**File:** `pkg/notify/notify.go`
|
||||
**Lines:** 285-327
|
||||
**Severity:** Low
|
||||
|
||||
**Problem:**
|
||||
In the `Reload` method, the old router and channel are closed before acquiring the write lock, but concurrent workers might still be using them with read locks:
|
||||
|
||||
```go
|
||||
// Old router/channel still in use by workers with RLock
|
||||
if err := n.router.Close(); err != nil { // Close without lock
|
||||
log.Error(...)
|
||||
}
|
||||
|
||||
n.mu.Lock() // Lock acquired AFTER closing
|
||||
n.router = router
|
||||
n.mu.Unlock()
|
||||
```
|
||||
|
||||
**Potential Impact:**
|
||||
- Workers might call methods on closed router/channel
|
||||
- Could result in "use of closed network connection" or similar errors
|
||||
- Errors are logged, not fatal, so low severity
|
||||
|
||||
**Why Not Fixed:**
|
||||
- Low severity - errors are logged and handled gracefully
|
||||
- Requires architectural changes to the reload mechanism
|
||||
- Current design minimizes critical section duration
|
||||
- Reloads are infrequent operational events
|
||||
|
||||
**Recommendation:**
|
||||
Consider implementing a "drain and replace" pattern where:
|
||||
1. Mark old router/channel as "draining"
|
||||
2. Wait for in-flight operations to complete
|
||||
3. Then close and replace
|
||||
|
||||
---
|
||||
|
||||
### 4. Type Assertion Without Check (INFO)
|
||||
|
||||
**File:** `pkg/storage/feed/block/block.go`
|
||||
**Line:** 926
|
||||
**Severity:** Informational
|
||||
|
||||
**Code:**
|
||||
```go
|
||||
b.lastDataAccess.Load().(time.Time)
|
||||
```
|
||||
|
||||
**Analysis:**
|
||||
- Uses type assertion on atomic.Value without checking the type
|
||||
- However, `lastDataAccess` is always initialized with `time.Time` at line 414
|
||||
- The value is only stored as `time.Time` throughout the codebase
|
||||
- Therefore, the assertion is safe in practice
|
||||
|
||||
**Recommendation:**
|
||||
For better safety, consider using Go 1.19+'s typed atomic values:
|
||||
```go
|
||||
type atomicTime struct{ v atomic.Pointer[time.Time] }
|
||||
```
|
||||
|
||||
However, this is not critical as the current code is functionally correct.
|
||||
|
||||
---
|
||||
|
||||
## Code Quality Observations
|
||||
|
||||
### Good Practices Found:
|
||||
1. ✅ Consistent use of `defer` for resource cleanup
|
||||
2. ✅ Proper HTTP response body closing with `defer resp.Body.Close()`
|
||||
3. ✅ Context propagation throughout the codebase
|
||||
4. ✅ Comprehensive error wrapping with context
|
||||
5. ✅ Proper use of sync primitives (RWMutex, atomic, etc.)
|
||||
6. ✅ Channel closing handled in single location (main.go)
|
||||
7. ✅ Graceful shutdown with context cancellation
|
||||
|
||||
### Test Coverage:
|
||||
- Mock implementations follow standard testify/mock patterns
|
||||
- Intentional panics in mocks for early failure detection (correct design)
|
||||
|
||||
---
|
||||
|
||||
## Summary
|
||||
|
||||
**Bugs Fixed:** 2
|
||||
**Critical Issues:** 1 (Resource leak in notification worker)
|
||||
**Medium Issues:** 1 (Weak random number generation)
|
||||
**Potential Issues Noted:** 2 (Low severity, informational)
|
||||
|
||||
The most critical fix is the resource leak in the notification worker, which would cause production systems to crash under load. The random number generation fix improves security and prevents potential ID collisions.
|
||||
@@ -113,7 +113,7 @@ Give zenfeed a try! **AI + RSS** might be a better way to consume information in
|
||||
|
||||
> [!IMPORTANT]
|
||||
> zenfeed uses model services from [SiliconFlow](https://cloud.siliconflow.cn/en) by default.
|
||||
> * Models: `Qwen/Qwen3-8B` (Free) and `Pro/BAAI/bge-m3`.
|
||||
> * Models: `Qwen/Qwen3-8B` (Free) and `Qwen/Qwen3-Embedding-4B`.
|
||||
> * If you don't have a SiliconFlow account yet, use this [**invitation link**](https://cloud.siliconflow.cn/i/U2VS0Q5A) to get a **¥14** credit.
|
||||
> * If you need to use other providers or models, or for more detailed custom deployments, please refer to the [Configuration Documentation](https://github.com/glidea/zenfeed/blob/main/docs/config.md) to edit `docker-compose.yml`.
|
||||
|
||||
|
||||
33
README.md
33
README.md
@@ -1,3 +1,5 @@
|
||||
[Nano Banana🍌 公益站](https://image-generation.zenfeed.xyz/):集成 Twitter 热门 Prompt,轻松玩转各种姿势
|
||||
---
|
||||
[English](README-en.md)
|
||||
|
||||
---
|
||||
@@ -9,13 +11,7 @@
|
||||
* 面向开发者:一站式接入几乎所有AI应用开发需要用到的模型和API,一站式付费,统一接入。
|
||||
* 面向企业:管理与使用界面分离,一人管理,多人使用,降低中小企业使用AI的门槛和成本。
|
||||
|
||||
> 以下是我的个人感受
|
||||
> * 播客生成效果不错,产品设计也不错,能基于 RSS 内容生成,修改播客脚本... 比市面其它播客产品更全面
|
||||
> * 中转价格大多与官方一致,比如硅基 qwen3 8b 同样免费
|
||||
> * 支持 MCP,Agent 托管
|
||||
> * 拥抱开源,平台上各种 AI 应用都是开源的。可以看他们的 [开源账户](https://github.com/302ai)
|
||||
|
||||
看都看了,GitHub 一键登录 [注册一个](https://share.302.ai/mFS9MS) 试试吧!立即获得 1 美元额度
|
||||
GitHub 一键登录 [注册一个](https://share.302.ai/mFS9MS) 试试吧!立即获得 1 美元额度
|
||||
|
||||
---
|
||||
|
||||
@@ -41,6 +37,8 @@ zenfeed 是你的 <strong>AI 信息中枢</strong>。它既是<strong>智能 RSS
|
||||
<p align="center">
|
||||
<a href="https://zenfeed.xyz"><b>在线体验 (仅 RSS 阅读)</b></a>
|
||||
|
|
||||
<a href="https://github.com/xusonfan/zenfeedApp"><b>安卓版体验 (仅 RSS 阅读)</b></a>
|
||||
|
|
||||
<a href="docs/tech/hld-zh.md"><b>技术文档</b></a>
|
||||
|
|
||||
<a href="#-安装与使用"><b>快速开始</b></a>
|
||||
@@ -52,10 +50,19 @@ zenfeed 是你的 <strong>AI 信息中枢</strong>。它既是<strong>智能 RSS
|
||||
---
|
||||
|
||||
**epub2rss**: 把 epub 电子书转成每日更新一个章节的 RSS Feed,[join waitlist](https://epub2rss.pages.dev/)
|
||||
|
||||
**one-coffee**: 一款类似 syft,万物追踪的日报产品(差异点:支持播客等多模态;高质量信源,主攻 AI 领域)。下方加我微信加入 waitlist
|
||||
|
||||
---
|
||||
|
||||
**赞助项目可以领取 Gemini Key**
|
||||
|
||||
<a href="https://afdian.com/a/glidea"><img src="docs/images/sponsor.png" width="500"></a>
|
||||
<br/>
|
||||
<a href="https://afdian.com/a/glidea">赞助项目,支持发展</a>
|
||||
|
||||
---
|
||||
|
||||
## 💡 前言
|
||||
|
||||
RSS(简易信息聚合)诞生于 Web 1.0 时代,旨在解决信息分散的问题,让用户能在一个地方聚合、追踪多个网站的更新,无需频繁访问。它将网站更新以摘要形式推送给订阅者,便于快速获取信息。
|
||||
@@ -135,7 +142,8 @@ RSS(简易信息聚合)诞生于 Web 1.0 时代,旨在解决信息分散
|
||||
|
||||
> [!IMPORTANT]
|
||||
> zenfeed 默认使用 [硅基流动](https://cloud.siliconflow.cn/) 提供的模型服务。
|
||||
> * 模型: `Qwen/Qwen3-8B` (免费) 和 `Pro/BAAI/bge-m3`。
|
||||
> * 模型: `Qwen/Qwen3-8B` (免费) 和 `Qwen/Qwen3-Embedding-4B`。
|
||||
> * **!!!如果你愿意赞助本项目,将获赠一定额度的 Gemini 2.5 Pro/Flash!!! (见下方)**
|
||||
> * 如果你还没有硅基账号,使用 [**邀请链接**](https://cloud.siliconflow.cn/i/U2VS0Q5A) 可获得 **14 元** 赠送额度。
|
||||
> * 如果需要使用其他厂商或模型,或进行更详细的自定义部署,请参考 [配置文档](https://github.com/glidea/zenfeed/blob/main/docs/config-zh.md) 来编辑 `docker-compose.yml`。
|
||||
|
||||
@@ -171,6 +179,7 @@ $env:API_KEY = "sk-..."; docker-compose -p zenfeed up -d
|
||||
> * **安全提示:** zenfeed 尚无认证机制,将服务暴露到公网可能会泄露您的 `API_KEY`。请务必配置严格的安全组规则,仅对信任的 IP 开放访问。
|
||||
|
||||
### 3. 开始使用
|
||||
> 安卓版:https://github.com/xusonfan/zenfeedApp
|
||||
|
||||
#### 添加 RSS 订阅源
|
||||
|
||||
@@ -206,14 +215,14 @@ $env:API_KEY = "sk-..."; docker-compose -p zenfeed up -d
|
||||
<table>
|
||||
<tr>
|
||||
<td align="center">
|
||||
<img src="docs/images/wechat.png" alt="Wechat QR Code" width="150">
|
||||
<img src="docs/images/wechat.png" alt="Wechat QR Code" width="300">
|
||||
<br>
|
||||
<strong>加群讨论</strong>
|
||||
<strong>AI 学习交流社群</strong>
|
||||
</td>
|
||||
<td align="center">
|
||||
<img src="docs/images/sponsor.png" alt="Sponsor QR Code" width="150">
|
||||
<img src="docs/images/sponsor.png" width="500">
|
||||
<br>
|
||||
<strong>请杯咖啡 🧋</strong>
|
||||
<strong><a href="https://afdian.com/a/glidea">请杯奶茶 🧋</a></strong>
|
||||
</td>
|
||||
</tr>
|
||||
</table>
|
||||
|
||||
@@ -60,7 +60,7 @@ configs:
|
||||
api_key: ${API_KEY:-your-api-key}
|
||||
- name: embed
|
||||
provider: siliconflow
|
||||
embedding_model: Pro/BAAI/bge-m3
|
||||
embedding_model: Qwen/Qwen3-Embedding-4B
|
||||
api_key: ${API_KEY:-your-api-key}
|
||||
scrape:
|
||||
rsshub_endpoint: http://rsshub:1200
|
||||
|
||||
Binary file not shown.
|
Before Width: | Height: | Size: 201 KiB After Width: | Height: | Size: 897 KiB |
@@ -77,7 +77,7 @@ storage:
|
||||
- `speakers`: 定义播客的演讲者。
|
||||
- `name`: 演讲者的名字。
|
||||
- `role`: 演讲者的角色和人设,将影响脚本内容。
|
||||
- `voice`: 演讲者的声音。请参考 [Google Cloud TTS 文档](https://cloud.google.com/text-to-speech/docs/voices) 获取可用的声音名称(例如 `en-US-Standard-C`,`en-US-News-N`)。
|
||||
- `voice`: 演讲者的声音。请参考 [Gemini TTS 文档](https://ai.google.dev/gemini-api/docs/speech-generation#voices)。
|
||||
|
||||
**示例 `config.yaml`:**
|
||||
|
||||
@@ -85,20 +85,23 @@ storage:
|
||||
storage:
|
||||
feed:
|
||||
rewrites:
|
||||
- source_label: "content"
|
||||
label: "podcast_url"
|
||||
- source_label: content # 基于原文
|
||||
transform:
|
||||
to_podcast:
|
||||
llm: "openai-chat"
|
||||
tts_llm: "gemini-tts"
|
||||
transcript_additional_prompt: "使用中文回复"
|
||||
estimate_maximum_duration: 3m0s # 接近 3 分钟
|
||||
transcript_additional_prompt: 对话引人入胜,流畅自然,拒绝 AI 味,使用中文回复 # 脚本内容要求
|
||||
llm: xxxx # 负责生成脚本的 llm
|
||||
tts_llm: gemini-tts # 仅支持 gemini tts,推荐使用 https://github.com/glidea/one-balance 轮询
|
||||
speakers:
|
||||
- name: "主持人小雅"
|
||||
role: "一位经验丰富、声音甜美、风格活泼的科技播客主持人。擅长联系实际生活场景。"
|
||||
voice: "zh-CN-Standard-A" # 女声
|
||||
- name: "技术评论员老王"
|
||||
role: "一位对技术有深入见解、观点犀利的评论员,说话直接,偶尔有些愤世嫉俗。"
|
||||
voice: "zh-CN-Standard-B" # 男声
|
||||
- name: 小雅
|
||||
role: >-
|
||||
一位经验丰富、声音甜美、风格活泼的科技播客主持人。前财经记者、媒体人出身,因为工作原因长期关注科技行业,后来凭着热爱和出色的口才转行做了全职内容创作者。擅长从普通用户视角出发,把复杂的技术概念讲得生动有趣,是她发掘了老王,并把他‘骗’来一起做播客的‘始作俑者’。
|
||||
voice: Autonoe
|
||||
- name: 老王
|
||||
role: >-
|
||||
一位资深科技评论员,互联网老兵。亲身经历过中国互联网从草莽到巨头的全过程,当过程序员,做过产品经理,也创过业。因此他对行业的各种‘风口’和‘概念’有自己独到的、甚至有些刻薄的见解。观点犀利,一针见血,说话直接,热衷于给身边的一切产品挑刺。被‘忽悠’上了‘贼船’,表面上经常吐槽,但内心很享受这种分享观点的感觉。
|
||||
voice: Puck
|
||||
label: podcast_url
|
||||
```
|
||||
|
||||
配置完成后,Zenfeed 将在每次抓取到新文章时,自动执行上述流程。可以在通知模版中使用 podcast_url label,或 Web 中直接收听(Web 固定读取 podcast_url label,若使用别的名称则无法读取)
|
||||
@@ -19,7 +19,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"math/rand"
|
||||
"math/rand/v2"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
@@ -21,6 +21,7 @@ import (
|
||||
"io"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -407,6 +408,9 @@ func (c *cached) String(ctx context.Context, messages []string) (string, error)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if strings.Trim(value, " \n\r\t") == "" {
|
||||
return "", errors.New("empty response") // Gemini may occur this.
|
||||
}
|
||||
|
||||
// TODO: reduce copies.
|
||||
if err = c.kvStorage.Set(ctx, []byte(keyStr), []byte(value), 65*time.Minute); err != nil {
|
||||
|
||||
@@ -390,30 +390,35 @@ func (n *notifier) sendWorker(i int) {
|
||||
case <-n.Context().Done():
|
||||
return
|
||||
case work := <-n.channelSendWork:
|
||||
workCtx := telemetry.StartWith(n.Context(),
|
||||
append(n.TelemetryLabels(),
|
||||
telemetrymodel.KeyOperation, "Run",
|
||||
"worker", i,
|
||||
"group", work.group.Name,
|
||||
"time", timeutil.Format(work.group.Time),
|
||||
"receiver", work.receiver.Name,
|
||||
)...,
|
||||
)
|
||||
defer func() { telemetry.End(workCtx, nil) }()
|
||||
|
||||
workCtx, cancel := context.WithTimeout(workCtx, 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
if err := n.duplicateSend(workCtx, work); err != nil {
|
||||
log.Error(workCtx, err, "duplicate send")
|
||||
|
||||
continue
|
||||
}
|
||||
log.Info(workCtx, "send success")
|
||||
n.processSendWork(i, work)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (n *notifier) processSendWork(i int, work sendWork) {
|
||||
workCtx := telemetry.StartWith(n.Context(),
|
||||
append(n.TelemetryLabels(),
|
||||
telemetrymodel.KeyOperation, "Run",
|
||||
"worker", i,
|
||||
"group", work.group.Name,
|
||||
"time", timeutil.Format(work.group.Time),
|
||||
"receiver", work.receiver.Name,
|
||||
)...,
|
||||
)
|
||||
defer func() { telemetry.End(workCtx, nil) }()
|
||||
|
||||
workCtx, cancel := context.WithTimeout(workCtx, 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
if err := n.duplicateSend(workCtx, work); err != nil {
|
||||
log.Error(workCtx, err, "duplicate send")
|
||||
|
||||
return
|
||||
}
|
||||
log.Info(workCtx, "send success")
|
||||
}
|
||||
|
||||
|
||||
func (n *notifier) duplicateSend(ctx context.Context, work sendWork) error {
|
||||
if n.isSent(ctx, work.group, work.receiver) { // Double check.
|
||||
return nil
|
||||
|
||||
@@ -19,7 +19,6 @@ import (
|
||||
"context"
|
||||
"io"
|
||||
"net/url"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
"github.com/minio/minio-go/v7"
|
||||
@@ -47,11 +46,18 @@ type Config struct {
|
||||
Endpoint string
|
||||
AccessKeyID string
|
||||
SecretAccessKey string
|
||||
Bucket string
|
||||
BucketURL string
|
||||
client *minio.Client
|
||||
|
||||
Bucket string
|
||||
BucketURL string
|
||||
bucketURL *url.URL
|
||||
}
|
||||
|
||||
func (c *Config) Validate() error {
|
||||
if c.Empty() {
|
||||
return nil
|
||||
}
|
||||
|
||||
if c.Endpoint == "" {
|
||||
return errors.New("endpoint is required")
|
||||
}
|
||||
@@ -64,12 +70,26 @@ func (c *Config) Validate() error {
|
||||
if c.SecretAccessKey == "" {
|
||||
return errors.New("secret access key is required")
|
||||
}
|
||||
client, err := minio.New(c.Endpoint, &minio.Options{
|
||||
Creds: credentials.NewStaticV4(c.AccessKeyID, c.SecretAccessKey, ""),
|
||||
Secure: true,
|
||||
})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "new minio client")
|
||||
}
|
||||
c.client = client
|
||||
|
||||
if c.Bucket == "" {
|
||||
return errors.New("bucket is required")
|
||||
}
|
||||
if c.BucketURL == "" {
|
||||
return errors.New("bucket url is required")
|
||||
}
|
||||
u, err := url.Parse(c.BucketURL)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "parse public url")
|
||||
}
|
||||
c.bucketURL = u
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -86,6 +106,10 @@ func (c *Config) From(app *config.App) *Config {
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *Config) Empty() bool {
|
||||
return c.Endpoint == "" && c.AccessKeyID == "" && c.SecretAccessKey == "" && c.Bucket == "" && c.BucketURL == ""
|
||||
}
|
||||
|
||||
type Dependencies struct{}
|
||||
|
||||
// --- Factory code block ---
|
||||
@@ -113,19 +137,6 @@ func new(instance string, app *config.App, dependencies Dependencies) (Storage,
|
||||
return nil, errors.Wrap(err, "validate config")
|
||||
}
|
||||
|
||||
client, err := minio.New(config.Endpoint, &minio.Options{
|
||||
Creds: credentials.NewStaticV4(config.AccessKeyID, config.SecretAccessKey, ""),
|
||||
Secure: true,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "new minio client")
|
||||
}
|
||||
|
||||
u, err := url.Parse(config.BucketURL)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "parse public url")
|
||||
}
|
||||
|
||||
return &s3{
|
||||
Base: component.New(&component.BaseConfig[Config, Dependencies]{
|
||||
Name: "ObjectStorage",
|
||||
@@ -133,39 +144,40 @@ func new(instance string, app *config.App, dependencies Dependencies) (Storage,
|
||||
Config: config,
|
||||
Dependencies: dependencies,
|
||||
}),
|
||||
client: client,
|
||||
bucketURL: u,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// --- Implementation code block ---
|
||||
type s3 struct {
|
||||
*component.Base[Config, Dependencies]
|
||||
|
||||
client *minio.Client
|
||||
bucketURL *url.URL
|
||||
}
|
||||
|
||||
func (s *s3) Put(ctx context.Context, key string, body io.Reader, contentType string) (publicURL string, err error) {
|
||||
ctx = telemetry.StartWith(ctx, append(s.TelemetryLabels(), telemetrymodel.KeyOperation, "Put")...)
|
||||
defer func() { telemetry.End(ctx, err) }()
|
||||
bucket := s.Config().Bucket
|
||||
config := s.Config()
|
||||
if config.Empty() {
|
||||
return "", errors.New("not configured")
|
||||
}
|
||||
|
||||
if _, err := s.client.PutObject(ctx, bucket, key, body, -1, minio.PutObjectOptions{
|
||||
if _, err := config.client.PutObject(ctx, config.Bucket, key, body, -1, minio.PutObjectOptions{
|
||||
ContentType: contentType,
|
||||
}); err != nil {
|
||||
return "", errors.Wrap(err, "put object")
|
||||
}
|
||||
|
||||
return s.bucketURL.JoinPath(key).String(), nil
|
||||
return config.bucketURL.JoinPath(key).String(), nil
|
||||
}
|
||||
|
||||
func (s *s3) Get(ctx context.Context, key string) (publicURL string, err error) {
|
||||
ctx = telemetry.StartWith(ctx, append(s.TelemetryLabels(), telemetrymodel.KeyOperation, "Get")...)
|
||||
defer func() { telemetry.End(ctx, err) }()
|
||||
bucket := s.Config().Bucket
|
||||
config := s.Config()
|
||||
if config.Empty() {
|
||||
return "", errors.New("not configured")
|
||||
}
|
||||
|
||||
if _, err := s.client.StatObject(ctx, bucket, key, minio.StatObjectOptions{}); err != nil {
|
||||
if _, err := config.client.StatObject(ctx, config.Bucket, key, minio.StatObjectOptions{}); err != nil {
|
||||
errResponse := minio.ToErrorResponse(err)
|
||||
if errResponse.Code == minio.NoSuchKey {
|
||||
return "", ErrNotFound
|
||||
@@ -174,7 +186,7 @@ func (s *s3) Get(ctx context.Context, key string) (publicURL string, err error)
|
||||
return "", errors.Wrap(err, "stat object")
|
||||
}
|
||||
|
||||
return s.bucketURL.JoinPath(key).String(), nil
|
||||
return config.bucketURL.JoinPath(key).String(), nil
|
||||
}
|
||||
|
||||
func (s *s3) Reload(app *config.App) (err error) {
|
||||
@@ -187,29 +199,7 @@ func (s *s3) Reload(app *config.App) (err error) {
|
||||
return errors.Wrap(err, "validate config")
|
||||
}
|
||||
|
||||
if reflect.DeepEqual(s.Config(), newConfig) {
|
||||
log.Debug(ctx, "object storage config not changed")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
client, err := minio.New(newConfig.Endpoint, &minio.Options{
|
||||
Creds: credentials.NewStaticV4(newConfig.AccessKeyID, newConfig.SecretAccessKey, ""),
|
||||
Secure: true,
|
||||
})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "new minio client")
|
||||
}
|
||||
|
||||
u, err := url.Parse(newConfig.BucketURL)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "parse public url")
|
||||
}
|
||||
|
||||
s.client = client
|
||||
s.bucketURL = u
|
||||
s.SetConfig(newConfig)
|
||||
|
||||
log.Info(ctx, "object storage reloaded")
|
||||
|
||||
return nil
|
||||
|
||||
@@ -18,7 +18,7 @@ package time
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"math/rand"
|
||||
"math/rand/v2"
|
||||
"time"
|
||||
_ "time/tzdata"
|
||||
|
||||
|
||||
Reference in New Issue
Block a user