338 lines
8.9 KiB
Go
338 lines
8.9 KiB
Go
// Copyright (C) 2025 wangyusong
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
package component
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/stretchr/testify/mock"
|
|
|
|
"github.com/glidea/zenfeed/pkg/telemetry"
|
|
"github.com/glidea/zenfeed/pkg/telemetry/log"
|
|
telemetrymodel "github.com/glidea/zenfeed/pkg/telemetry/model"
|
|
)
|
|
|
|
// Global is the instance name for the global component.
|
|
const Global = "Global"
|
|
|
|
// Component is the interface for a component.
|
|
// It is used to start, stop and monitor a component.
|
|
// A component means it is runnable, has some async work to do.
|
|
// ALL exported biz structs MUST implement this interface.
|
|
type Component interface {
|
|
// Name returns the name of the component. e.g. "KVStorage".
|
|
// It SHOULD be unique between different components.
|
|
// It will used as the telemetry info. e.g. log, metrics, etc.
|
|
Name() string
|
|
// Instance returns the instance name of the component. e.g. "kvstorage-1".
|
|
// It SHOULD be unique between different instances of the same component.
|
|
// It will used as the telemetry info. e.g. log, metrics, etc.
|
|
Instance() string
|
|
// Run starts the component.
|
|
// It blocks until the component is closed.
|
|
// It MUST be called only once.
|
|
Run() (err error)
|
|
// Ready returns a channel that is closed when the component is ready.
|
|
// Returns a chan to notify the component is ready when Run is called.
|
|
Ready() (notify <-chan struct{})
|
|
// Close closes the component.
|
|
Close() (err error)
|
|
}
|
|
|
|
// Base is the base implementation of a component.
|
|
// It provides partial, default implementations of the Component interface.
|
|
// It SHOULD BE used as an embedded field in the actual component implementation.
|
|
type Base[Config any, Dependencies any] struct {
|
|
baseConfig *BaseConfig[Config, Dependencies]
|
|
telemetryLabels telemetry.Labels
|
|
mu sync.RWMutex
|
|
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
ch chan struct{}
|
|
}
|
|
|
|
type BaseConfig[Config any, Dependencies any] struct {
|
|
Name string
|
|
Instance string
|
|
AdditionalTelemetryLabels telemetry.Labels
|
|
Config *Config
|
|
Dependencies Dependencies
|
|
}
|
|
|
|
func New[Config any, Dependencies any](config *BaseConfig[Config, Dependencies]) *Base[Config, Dependencies] {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
ch := make(chan struct{})
|
|
telemetryLabels := telemetry.Labels{
|
|
telemetrymodel.KeyComponent, config.Name,
|
|
telemetrymodel.KeyComponentInstance, config.Instance,
|
|
}
|
|
telemetryLabels = append(telemetryLabels, config.AdditionalTelemetryLabels...)
|
|
|
|
return &Base[Config, Dependencies]{
|
|
telemetryLabels: telemetryLabels,
|
|
baseConfig: config,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
ch: ch,
|
|
}
|
|
}
|
|
|
|
func (c *Base[Config, Dependencies]) Name() string {
|
|
return c.baseConfig.Name
|
|
}
|
|
|
|
func (c *Base[Config, Dependencies]) Instance() string {
|
|
return c.baseConfig.Instance
|
|
}
|
|
|
|
func (c *Base[Config, Dependencies]) TelemetryLabels() telemetry.Labels {
|
|
return c.telemetryLabels
|
|
}
|
|
|
|
func (c *Base[Config, Dependencies]) TelemetryLabelsID() prometheus.Labels {
|
|
return prometheus.Labels{
|
|
telemetrymodel.KeyComponent: c.telemetryLabels.Get(telemetrymodel.KeyComponent).(string),
|
|
telemetrymodel.KeyComponentInstance: c.telemetryLabels.Get(telemetrymodel.KeyComponentInstance).(string),
|
|
}
|
|
}
|
|
|
|
func (c *Base[Config, Dependencies]) TelemetryLabelsIDFields() []string {
|
|
return []string{
|
|
c.telemetryLabels.Get(telemetrymodel.KeyComponent).(string),
|
|
c.telemetryLabels.Get(telemetrymodel.KeyComponentInstance).(string),
|
|
}
|
|
}
|
|
|
|
func (c *Base[Config, Dependencies]) Config() *Config {
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
return c.baseConfig.Config
|
|
}
|
|
|
|
func (c *Base[Config, Dependencies]) SetConfig(config *Config) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
c.baseConfig.Config = config
|
|
}
|
|
|
|
func (c *Base[Config, Dependencies]) Dependencies() Dependencies {
|
|
return c.baseConfig.Dependencies
|
|
}
|
|
|
|
func (c *Base[Config, Dependencies]) Context() context.Context {
|
|
return c.ctx
|
|
}
|
|
|
|
func (c *Base[Config, Dependencies]) Run() error {
|
|
c.MarkReady()
|
|
<-c.ctx.Done()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Base[Config, Dependencies]) MarkReady() {
|
|
close(c.ch)
|
|
}
|
|
|
|
func (c *Base[Config, Dependencies]) Ready() <-chan struct{} {
|
|
return c.ch
|
|
}
|
|
|
|
func (c *Base[Config, Dependencies]) Close() error {
|
|
c.cancel()
|
|
telemetry.CloseMetrics(c.TelemetryLabelsID())
|
|
log.Info(c.Context(), "component closed", c.TelemetryLabels()...)
|
|
|
|
return nil
|
|
}
|
|
|
|
type Factory[ComponentImpl Component, Config any, Dependencies any] interface {
|
|
New(instance string, config *Config, dependencies Dependencies) (ComponentImpl, error)
|
|
}
|
|
|
|
type FactoryFunc[ComponentImpl Component, Config any, Dependencies any] func(
|
|
instance string,
|
|
config *Config,
|
|
dependencies Dependencies,
|
|
) (ComponentImpl, error)
|
|
|
|
func (f FactoryFunc[ComponentImpl, Config, Dependencies]) New(
|
|
instance string,
|
|
config *Config,
|
|
dependencies Dependencies,
|
|
) (ComponentImpl, error) {
|
|
return f(instance, config, dependencies)
|
|
}
|
|
|
|
type Mock struct {
|
|
mock.Mock
|
|
}
|
|
|
|
func (m *Mock) Name() string {
|
|
return m.Called().String(0)
|
|
}
|
|
func (m *Mock) Instance() string {
|
|
return m.Called().String(0)
|
|
}
|
|
func (m *Mock) Run() error {
|
|
return m.Called().Error(0)
|
|
}
|
|
func (m *Mock) Ready() <-chan struct{} {
|
|
return m.Called().Get(0).(<-chan struct{})
|
|
}
|
|
func (m *Mock) Close() error {
|
|
return m.Called().Error(0)
|
|
}
|
|
|
|
type MockOption func(m *mock.Mock)
|
|
|
|
type MockOptions []MockOption
|
|
|
|
func (m MockOptions) Apply(mock *Mock) {
|
|
for _, opt := range m {
|
|
opt(&mock.Mock)
|
|
}
|
|
}
|
|
|
|
func RunUntilReady(waitCtx context.Context, component Component, timeout time.Duration) error {
|
|
errCh := make(chan error, 1)
|
|
go func() {
|
|
errCh <- component.Run()
|
|
}()
|
|
|
|
select {
|
|
case <-component.Ready():
|
|
log.Info(waitCtx, "component run and ready",
|
|
telemetrymodel.KeyComponent, component.Name(),
|
|
telemetrymodel.KeyComponentInstance, component.Instance(),
|
|
)
|
|
|
|
return nil
|
|
case err := <-errCh:
|
|
return err
|
|
case <-time.After(timeout):
|
|
return errors.New("component not ready after timeout")
|
|
case <-waitCtx.Done():
|
|
return waitCtx.Err()
|
|
}
|
|
}
|
|
|
|
type Group []Component
|
|
|
|
func Run(ctx context.Context, groups ...Group) error {
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
// Start groups in order.
|
|
runningErrCh := make(chan error, 1)
|
|
for i, group := range groups {
|
|
if err := startGroup(ctx, group, runningErrCh); err != nil {
|
|
stopGroups(groups, i)
|
|
|
|
return err
|
|
}
|
|
}
|
|
|
|
// All groups started successfully, wait for any component to fail or context to be canceled.
|
|
select {
|
|
case err := <-runningErrCh:
|
|
stopGroups(groups, len(groups)-1)
|
|
|
|
return err
|
|
|
|
case <-ctx.Done():
|
|
stopGroups(groups, len(groups)-1)
|
|
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func startGroup(ctx context.Context, group Group, runningErrCh chan error) error {
|
|
gCtx := log.With(ctx, telemetrymodel.KeyComponent, "group")
|
|
log.Info(gCtx, "starting group", "components", len(group))
|
|
|
|
// Start all components in current group concurrently.
|
|
startComponents(gCtx, group, runningErrCh)
|
|
|
|
// Wait for all components to be ready or error.
|
|
return waitForGroupReady(ctx, group, runningErrCh)
|
|
}
|
|
|
|
func startComponents(ctx context.Context, group Group, runningErrCh chan error) {
|
|
for _, comp := range group {
|
|
go func(c Component) {
|
|
log.Info(ctx, "starting component",
|
|
telemetrymodel.KeyComponent, c.Name(),
|
|
telemetrymodel.KeyComponentInstance, c.Instance(),
|
|
)
|
|
if err := c.Run(); err != nil {
|
|
select {
|
|
case runningErrCh <- err:
|
|
default:
|
|
}
|
|
}
|
|
log.Info(ctx, "component exited",
|
|
telemetrymodel.KeyComponent, c.Name(),
|
|
telemetrymodel.KeyComponentInstance, c.Instance(),
|
|
)
|
|
}(comp)
|
|
}
|
|
}
|
|
|
|
func waitForGroupReady(ctx context.Context, group Group, runningErrCh chan error) error {
|
|
for _, comp := range group {
|
|
select {
|
|
case <-comp.Ready():
|
|
log.Info(ctx, "component run and ready",
|
|
telemetrymodel.KeyComponent, comp.Name(),
|
|
telemetrymodel.KeyComponentInstance, comp.Instance(),
|
|
)
|
|
case err := <-runningErrCh:
|
|
return err
|
|
case <-time.After(30 * time.Second):
|
|
return errors.New("not ready after 30 seconds")
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func stopGroups(groups []Group, runAt int) {
|
|
for i := runAt; i >= 0; i-- {
|
|
stopGroup(groups[i])
|
|
}
|
|
}
|
|
|
|
func stopGroup(group Group) {
|
|
var wg sync.WaitGroup
|
|
for _, comp := range group {
|
|
wg.Add(1)
|
|
go func(c Component) {
|
|
defer wg.Done()
|
|
_ = c.Close() // Ignore close error.
|
|
}(comp)
|
|
}
|
|
wg.Wait()
|
|
}
|