add rss & crawl & webhook
This commit is contained in:
@@ -26,7 +26,6 @@ import (
|
||||
"runtime"
|
||||
"slices"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@@ -277,47 +276,20 @@ type QueryOptions struct {
|
||||
Query string
|
||||
Threshold float32
|
||||
LabelFilters []string
|
||||
labelFilters []LabelFilter
|
||||
labelFilters model.LabelFilters
|
||||
Limit int
|
||||
Start, End time.Time
|
||||
}
|
||||
|
||||
var (
|
||||
LabelFilterEqual = "="
|
||||
LabelFilterNotEqual = "!="
|
||||
|
||||
NewLabelFilter = func(key, value string, eq bool) string {
|
||||
if eq {
|
||||
return fmt.Sprintf("%s%s%s", key, LabelFilterEqual, value)
|
||||
}
|
||||
|
||||
return fmt.Sprintf("%s%s%s", key, LabelFilterNotEqual, value)
|
||||
}
|
||||
|
||||
ParseLabelFilter = func(filter string) (LabelFilter, error) {
|
||||
eq := false
|
||||
parts := strings.Split(filter, LabelFilterNotEqual)
|
||||
if len(parts) != 2 {
|
||||
parts = strings.Split(filter, LabelFilterEqual)
|
||||
eq = true
|
||||
}
|
||||
if len(parts) != 2 {
|
||||
return LabelFilter{}, errors.New("invalid label filter")
|
||||
}
|
||||
|
||||
return LabelFilter{Label: parts[0], Value: parts[1], Equal: eq}, nil
|
||||
}
|
||||
)
|
||||
|
||||
func (q *QueryOptions) Validate() error { //nolint:cyclop
|
||||
if q.Threshold < 0 || q.Threshold > 1 {
|
||||
return errors.New("threshold must be between 0 and 1")
|
||||
}
|
||||
for _, labelFilter := range q.LabelFilters {
|
||||
if labelFilter == "" {
|
||||
for _, s := range q.LabelFilters {
|
||||
if s == "" {
|
||||
return errors.New("label filter is required")
|
||||
}
|
||||
filter, err := ParseLabelFilter(labelFilter)
|
||||
filter, err := model.NewLabelFilter(s)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "parse label filter")
|
||||
}
|
||||
@@ -368,13 +340,6 @@ func (q *QueryOptions) HitTimeRangeCondition(b Block) bool {
|
||||
return queryAsBase || blockAsBase
|
||||
}
|
||||
|
||||
// LabelFilter defines the matcher for an item.
|
||||
type LabelFilter struct {
|
||||
Label string
|
||||
Equal bool
|
||||
Value string
|
||||
}
|
||||
|
||||
// --- Factory code block ---
|
||||
type Factory component.Factory[Block, Config, Dependencies]
|
||||
|
||||
@@ -1228,14 +1193,14 @@ func (b *block) applyFilters(ctx context.Context, query *QueryOptions) (res filt
|
||||
return b.mergeFilterResults(labelsResult, vectorsResult), nil
|
||||
}
|
||||
|
||||
func (b *block) applyLabelFilters(ctx context.Context, filters []LabelFilter) filterResult {
|
||||
func (b *block) applyLabelFilters(ctx context.Context, filters model.LabelFilters) filterResult {
|
||||
if len(filters) == 0 {
|
||||
return matchedAllFilterResult
|
||||
}
|
||||
|
||||
var allIDs map[uint64]struct{}
|
||||
for _, filter := range filters {
|
||||
ids := b.invertedIndex.Search(ctx, filter.Label, filter.Equal, filter.Value)
|
||||
ids := b.invertedIndex.Search(ctx, filter)
|
||||
if len(ids) == 0 {
|
||||
return matchedNothingFilterResult
|
||||
}
|
||||
@@ -1317,7 +1282,7 @@ func (b *block) mergeFilterResults(x, y filterResult) filterResult {
|
||||
}
|
||||
|
||||
func (b *block) fillEmbedding(ctx context.Context, feeds []*model.Feed) ([]*chunk.Feed, error) {
|
||||
embedded := make([]*chunk.Feed, len(feeds))
|
||||
embedded := make([]*chunk.Feed, 0, len(feeds))
|
||||
llm := b.Dependencies().LLMFactory.Get(b.Config().embeddingLLM)
|
||||
var wg sync.WaitGroup
|
||||
var mu sync.Mutex
|
||||
@@ -1336,16 +1301,21 @@ func (b *block) fillEmbedding(ctx context.Context, feeds []*model.Feed) ([]*chun
|
||||
}
|
||||
|
||||
mu.Lock()
|
||||
embedded[i] = &chunk.Feed{
|
||||
embedded = append(embedded, &chunk.Feed{
|
||||
Feed: feed,
|
||||
Vectors: vectors,
|
||||
}
|
||||
})
|
||||
mu.Unlock()
|
||||
}(i, feed)
|
||||
}
|
||||
wg.Wait()
|
||||
if len(errs) > 0 {
|
||||
return nil, errs[0]
|
||||
|
||||
switch len(errs) {
|
||||
case 0:
|
||||
case len(feeds):
|
||||
return nil, errs[0] // All failed.
|
||||
default:
|
||||
log.Error(ctx, errors.Wrap(errs[0], "fill embedding"), "error_count", len(errs))
|
||||
}
|
||||
|
||||
return embedded, nil
|
||||
|
||||
@@ -24,7 +24,7 @@ type Index interface {
|
||||
index.Codec
|
||||
|
||||
// Search returns item IDs matching the given label and value.
|
||||
Search(ctx context.Context, label string, eq bool, value string) (ids map[uint64]struct{})
|
||||
Search(ctx context.Context, matcher model.LabelFilter) (ids map[uint64]struct{})
|
||||
// Add adds item to the index.
|
||||
// If label or value in labels is empty, it will be ignored.
|
||||
// If value is too long, it will be ignored,
|
||||
@@ -88,17 +88,17 @@ type idx struct {
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
func (idx *idx) Search(ctx context.Context, label string, eq bool, value string) (ids map[uint64]struct{}) {
|
||||
func (idx *idx) Search(ctx context.Context, matcher model.LabelFilter) (ids map[uint64]struct{}) {
|
||||
ctx = telemetry.StartWith(ctx, append(idx.TelemetryLabels(), telemetrymodel.KeyOperation, "Search")...)
|
||||
defer func() { telemetry.End(ctx, nil) }()
|
||||
idx.mu.RLock()
|
||||
defer idx.mu.RUnlock()
|
||||
|
||||
if value == "" {
|
||||
return idx.searchEmptyValue(label, eq)
|
||||
if matcher.Value == "" {
|
||||
return idx.searchEmptyValue(matcher.Label, matcher.Equal)
|
||||
}
|
||||
|
||||
return idx.searchNonEmptyValue(label, eq, value)
|
||||
return idx.searchNonEmptyValue(matcher)
|
||||
}
|
||||
|
||||
func (idx *idx) Add(ctx context.Context, id uint64, labels model.Labels) {
|
||||
@@ -198,16 +198,16 @@ func (idx *idx) searchEmptyValue(label string, eq bool) map[uint64]struct{} {
|
||||
// searchNonEmptyValue handles the search logic when the target value is not empty.
|
||||
// If eq is true, it returns IDs that have the exact label-value pair.
|
||||
// If eq is false, it returns IDs that *do not* have the exact label-value pair.
|
||||
func (idx *idx) searchNonEmptyValue(label string, eq bool, value string) map[uint64]struct{} {
|
||||
func (idx *idx) searchNonEmptyValue(matcher model.LabelFilter) map[uint64]struct{} {
|
||||
// Get the map of values for the given label.
|
||||
values, labelExists := idx.m[label]
|
||||
values, labelExists := idx.m[matcher.Label]
|
||||
|
||||
// If equal (eq), find the exact match.
|
||||
if eq {
|
||||
if matcher.Equal {
|
||||
if !labelExists {
|
||||
return make(map[uint64]struct{}) // Label doesn't exist.
|
||||
}
|
||||
ids, valueExists := values[value]
|
||||
ids, valueExists := values[matcher.Value]
|
||||
if !valueExists {
|
||||
return make(map[uint64]struct{}) // Value doesn't exist for this label.
|
||||
}
|
||||
@@ -221,7 +221,7 @@ func (idx *idx) searchNonEmptyValue(label string, eq bool, value string) map[uin
|
||||
resultIDs := maps.Clone(idx.ids)
|
||||
if labelExists {
|
||||
// If the specific label-value pair exists, remove its associated IDs.
|
||||
if matchingIDs, valueExists := values[value]; valueExists {
|
||||
if matchingIDs, valueExists := values[matcher.Value]; valueExists {
|
||||
for id := range matchingIDs {
|
||||
delete(resultIDs, id)
|
||||
}
|
||||
@@ -413,8 +413,8 @@ type mockIndex struct {
|
||||
component.Mock
|
||||
}
|
||||
|
||||
func (m *mockIndex) Search(ctx context.Context, label string, eq bool, value string) (ids map[uint64]struct{}) {
|
||||
args := m.Called(ctx, label, eq, value)
|
||||
func (m *mockIndex) Search(ctx context.Context, matcher model.LabelFilter) (ids map[uint64]struct{}) {
|
||||
args := m.Called(ctx, matcher)
|
||||
|
||||
return args.Get(0).(map[uint64]struct{})
|
||||
}
|
||||
|
||||
@@ -118,9 +118,7 @@ func TestSearch(t *testing.T) {
|
||||
setupLabels map[uint64]model.Labels
|
||||
}
|
||||
type whenDetail struct {
|
||||
searchLabel string
|
||||
eq bool
|
||||
searchValue string
|
||||
matcher model.LabelFilter
|
||||
}
|
||||
type thenExpected struct {
|
||||
want []uint64
|
||||
@@ -140,9 +138,11 @@ func TestSearch(t *testing.T) {
|
||||
},
|
||||
},
|
||||
WhenDetail: whenDetail{
|
||||
searchLabel: "category",
|
||||
searchValue: "tech",
|
||||
eq: true,
|
||||
matcher: model.LabelFilter{
|
||||
Label: "category",
|
||||
Value: "tech",
|
||||
Equal: true,
|
||||
},
|
||||
},
|
||||
ThenExpected: thenExpected{
|
||||
want: []uint64{1, 2},
|
||||
@@ -159,9 +159,11 @@ func TestSearch(t *testing.T) {
|
||||
},
|
||||
},
|
||||
WhenDetail: whenDetail{
|
||||
searchLabel: "invalid",
|
||||
searchValue: "value",
|
||||
eq: true,
|
||||
matcher: model.LabelFilter{
|
||||
Label: "invalid",
|
||||
Value: "value",
|
||||
Equal: true,
|
||||
},
|
||||
},
|
||||
ThenExpected: thenExpected{
|
||||
want: nil,
|
||||
@@ -178,9 +180,11 @@ func TestSearch(t *testing.T) {
|
||||
},
|
||||
},
|
||||
WhenDetail: whenDetail{
|
||||
searchLabel: "category",
|
||||
searchValue: "invalid",
|
||||
eq: true,
|
||||
matcher: model.LabelFilter{
|
||||
Label: "category",
|
||||
Value: "invalid",
|
||||
Equal: true,
|
||||
},
|
||||
},
|
||||
ThenExpected: thenExpected{
|
||||
want: nil,
|
||||
@@ -200,9 +204,11 @@ func TestSearch(t *testing.T) {
|
||||
},
|
||||
},
|
||||
WhenDetail: whenDetail{
|
||||
searchLabel: "category",
|
||||
searchValue: "tech",
|
||||
eq: false,
|
||||
matcher: model.LabelFilter{
|
||||
Label: "category",
|
||||
Value: "tech",
|
||||
Equal: false,
|
||||
},
|
||||
},
|
||||
ThenExpected: thenExpected{
|
||||
want: []uint64{2},
|
||||
@@ -220,9 +226,11 @@ func TestSearch(t *testing.T) {
|
||||
},
|
||||
},
|
||||
WhenDetail: whenDetail{
|
||||
searchLabel: "invalid",
|
||||
searchValue: "value",
|
||||
eq: false,
|
||||
matcher: model.LabelFilter{
|
||||
Label: "invalid",
|
||||
Value: "value",
|
||||
Equal: false,
|
||||
},
|
||||
},
|
||||
ThenExpected: thenExpected{
|
||||
want: []uint64{1, 2},
|
||||
@@ -240,7 +248,7 @@ func TestSearch(t *testing.T) {
|
||||
}
|
||||
|
||||
// When.
|
||||
result := idx.Search(context.Background(), tt.WhenDetail.searchLabel, tt.WhenDetail.eq, tt.WhenDetail.searchValue)
|
||||
result := idx.Search(context.Background(), tt.WhenDetail.matcher)
|
||||
|
||||
// Then.
|
||||
if tt.ThenExpected.want == nil {
|
||||
|
||||
@@ -32,8 +32,8 @@ import (
|
||||
// --- Interface code block ---
|
||||
type Storage interface {
|
||||
component.Component
|
||||
Get(ctx context.Context, key string) (string, error)
|
||||
Set(ctx context.Context, key string, value string, ttl time.Duration) error
|
||||
Get(ctx context.Context, key []byte) ([]byte, error)
|
||||
Set(ctx context.Context, key []byte, value []byte, ttl time.Duration) error
|
||||
}
|
||||
|
||||
var ErrNotFound = errors.New("not found")
|
||||
@@ -137,7 +137,7 @@ func (k *kv) Close() error {
|
||||
|
||||
const bucket = "0"
|
||||
|
||||
func (k *kv) Get(ctx context.Context, key string) (value string, err error) {
|
||||
func (k *kv) Get(ctx context.Context, key []byte) (value []byte, err error) {
|
||||
ctx = telemetry.StartWith(ctx, append(k.TelemetryLabels(), telemetrymodel.KeyOperation, "Get")...)
|
||||
defer func() {
|
||||
telemetry.End(ctx, func() error {
|
||||
@@ -157,22 +157,22 @@ func (k *kv) Get(ctx context.Context, key string) (value string, err error) {
|
||||
})
|
||||
switch {
|
||||
case err == nil:
|
||||
return string(b), nil
|
||||
return b, nil
|
||||
case errors.Is(err, nutsdb.ErrNotFoundKey):
|
||||
return "", ErrNotFound
|
||||
return nil, ErrNotFound
|
||||
case strings.Contains(err.Error(), "key not found"):
|
||||
return "", ErrNotFound
|
||||
return nil, ErrNotFound
|
||||
default:
|
||||
return "", err
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
func (k *kv) Set(ctx context.Context, key string, value string, ttl time.Duration) (err error) {
|
||||
func (k *kv) Set(ctx context.Context, key []byte, value []byte, ttl time.Duration) (err error) {
|
||||
ctx = telemetry.StartWith(ctx, append(k.TelemetryLabels(), telemetrymodel.KeyOperation, "Set")...)
|
||||
defer func() { telemetry.End(ctx, err) }()
|
||||
|
||||
return k.db.Update(func(tx *nutsdb.Tx) error {
|
||||
return tx.Put(bucket, []byte(key), []byte(value), uint32(ttl.Seconds()))
|
||||
return tx.Put(bucket, key, value, uint32(ttl.Seconds()))
|
||||
})
|
||||
}
|
||||
|
||||
@@ -180,13 +180,13 @@ type mockKV struct {
|
||||
component.Mock
|
||||
}
|
||||
|
||||
func (m *mockKV) Get(ctx context.Context, key string) (string, error) {
|
||||
func (m *mockKV) Get(ctx context.Context, key []byte) ([]byte, error) {
|
||||
args := m.Called(ctx, key)
|
||||
|
||||
return args.String(0), args.Error(1)
|
||||
return args.Get(0).([]byte), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *mockKV) Set(ctx context.Context, key string, value string, ttl time.Duration) error {
|
||||
func (m *mockKV) Set(ctx context.Context, key []byte, value []byte, ttl time.Duration) error {
|
||||
args := m.Called(ctx, key, value, ttl)
|
||||
|
||||
return args.Error(0)
|
||||
|
||||
Reference in New Issue
Block a user