Skip to content

Commit 2f9d902

Browse files
committed
fix(aggregate, search): ft.aggregate bugfixes (#3263)
* fix: rearange args for ft.aggregate apply should be before any groupby or sortby * improve test * wip: add scorer and addscores * enable all tests * fix ftsearch with count test * make linter happy * Addscores is available in later redisearch releases. For safety state it is available in redis ce 8 * load an apply seem to break scorer and addscores
1 parent 212ecde commit 2f9d902

File tree

2 files changed

+158
-25
lines changed

2 files changed

+158
-25
lines changed

search_commands.go

Lines changed: 58 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -240,13 +240,20 @@ type FTAggregateWithCursor struct {
240240
}
241241

242242
type FTAggregateOptions struct {
243-
Verbatim bool
244-
LoadAll bool
245-
Load []FTAggregateLoad
246-
Timeout int
247-
GroupBy []FTAggregateGroupBy
248-
SortBy []FTAggregateSortBy
249-
SortByMax int
243+
Verbatim bool
244+
LoadAll bool
245+
Load []FTAggregateLoad
246+
Timeout int
247+
GroupBy []FTAggregateGroupBy
248+
SortBy []FTAggregateSortBy
249+
SortByMax int
250+
// Scorer is used to set scoring function, if not set passed, a default will be used.
251+
// The default scorer depends on the Redis version:
252+
// - `BM25` for Redis >= 8
253+
// - `TFIDF` for Redis < 8
254+
Scorer string
255+
// AddScores is available in Redis CE 8
256+
AddScores bool
250257
Apply []FTAggregateApply
251258
LimitOffset int
252259
Limit int
@@ -483,6 +490,15 @@ func FTAggregateQuery(query string, options *FTAggregateOptions) AggregateQuery
483490
if options.Verbatim {
484491
queryArgs = append(queryArgs, "VERBATIM")
485492
}
493+
494+
if options.Scorer != "" {
495+
queryArgs = append(queryArgs, "SCORER", options.Scorer)
496+
}
497+
498+
if options.AddScores {
499+
queryArgs = append(queryArgs, "ADDSCORES")
500+
}
501+
486502
if options.LoadAll && options.Load != nil {
487503
panic("FT.AGGREGATE: LOADALL and LOAD are mutually exclusive")
488504
}
@@ -498,9 +514,18 @@ func FTAggregateQuery(query string, options *FTAggregateOptions) AggregateQuery
498514
}
499515
}
500516
}
517+
501518
if options.Timeout > 0 {
502519
queryArgs = append(queryArgs, "TIMEOUT", options.Timeout)
503520
}
521+
522+
for _, apply := range options.Apply {
523+
queryArgs = append(queryArgs, "APPLY", apply.Field)
524+
if apply.As != "" {
525+
queryArgs = append(queryArgs, "AS", apply.As)
526+
}
527+
}
528+
504529
if options.GroupBy != nil {
505530
for _, groupBy := range options.GroupBy {
506531
queryArgs = append(queryArgs, "GROUPBY", len(groupBy.Fields))
@@ -542,12 +567,6 @@ func FTAggregateQuery(query string, options *FTAggregateOptions) AggregateQuery
542567
if options.SortByMax > 0 {
543568
queryArgs = append(queryArgs, "MAX", options.SortByMax)
544569
}
545-
for _, apply := range options.Apply {
546-
queryArgs = append(queryArgs, "APPLY", apply.Field)
547-
if apply.As != "" {
548-
queryArgs = append(queryArgs, "AS", apply.As)
549-
}
550-
}
551570
if options.LimitOffset > 0 {
552571
queryArgs = append(queryArgs, "LIMIT", options.LimitOffset)
553572
}
@@ -574,6 +593,7 @@ func FTAggregateQuery(query string, options *FTAggregateOptions) AggregateQuery
574593
queryArgs = append(queryArgs, key, value)
575594
}
576595
}
596+
577597
if options.DialectVersion > 0 {
578598
queryArgs = append(queryArgs, "DIALECT", options.DialectVersion)
579599
}
@@ -654,11 +674,12 @@ func (cmd *AggregateCmd) readReply(rd *proto.Reader) (err error) {
654674
data, err := rd.ReadSlice()
655675
if err != nil {
656676
cmd.err = err
657-
return nil
677+
return err
658678
}
659679
cmd.val, err = ProcessAggregateResult(data)
660680
if err != nil {
661681
cmd.err = err
682+
return err
662683
}
663684
return nil
664685
}
@@ -674,6 +695,12 @@ func (c cmdable) FTAggregateWithArgs(ctx context.Context, index string, query st
674695
if options.Verbatim {
675696
args = append(args, "VERBATIM")
676697
}
698+
if options.Scorer != "" {
699+
args = append(args, "SCORER", options.Scorer)
700+
}
701+
if options.AddScores {
702+
args = append(args, "ADDSCORES")
703+
}
677704
if options.LoadAll && options.Load != nil {
678705
panic("FT.AGGREGATE: LOADALL and LOAD are mutually exclusive")
679706
}
@@ -692,6 +719,12 @@ func (c cmdable) FTAggregateWithArgs(ctx context.Context, index string, query st
692719
if options.Timeout > 0 {
693720
args = append(args, "TIMEOUT", options.Timeout)
694721
}
722+
for _, apply := range options.Apply {
723+
args = append(args, "APPLY", apply.Field)
724+
if apply.As != "" {
725+
args = append(args, "AS", apply.As)
726+
}
727+
}
695728
if options.GroupBy != nil {
696729
for _, groupBy := range options.GroupBy {
697730
args = append(args, "GROUPBY", len(groupBy.Fields))
@@ -733,12 +766,6 @@ func (c cmdable) FTAggregateWithArgs(ctx context.Context, index string, query st
733766
if options.SortByMax > 0 {
734767
args = append(args, "MAX", options.SortByMax)
735768
}
736-
for _, apply := range options.Apply {
737-
args = append(args, "APPLY", apply.Field)
738-
if apply.As != "" {
739-
args = append(args, "AS", apply.As)
740-
}
741-
}
742769
if options.LimitOffset > 0 {
743770
args = append(args, "LIMIT", options.LimitOffset)
744771
}
@@ -1674,7 +1701,8 @@ func (cmd *FTSearchCmd) readReply(rd *proto.Reader) (err error) {
16741701

16751702
// FTSearch - Executes a search query on an index.
16761703
// The 'index' parameter specifies the index to search, and the 'query' parameter specifies the search query.
1677-
// For more information, please refer to the Redis documentation:
1704+
// For more information, please refer to the Redis documentation about [FT.SEARCH].
1705+
//
16781706
// [FT.SEARCH]: (https://redis.io/commands/ft.search/)
16791707
func (c cmdable) FTSearch(ctx context.Context, index string, query string) *FTSearchCmd {
16801708
args := []interface{}{"FT.SEARCH", index, query}
@@ -1685,6 +1713,12 @@ func (c cmdable) FTSearch(ctx context.Context, index string, query string) *FTSe
16851713

16861714
type SearchQuery []interface{}
16871715

1716+
// FTSearchQuery - Executes a search query on an index with additional options.
1717+
// The 'index' parameter specifies the index to search, the 'query' parameter specifies the search query,
1718+
// and the 'options' parameter specifies additional options for the search.
1719+
// For more information, please refer to the Redis documentation about [FT.SEARCH].
1720+
//
1721+
// [FT.SEARCH]: (https://redis.io/commands/ft.search/)
16881722
func FTSearchQuery(query string, options *FTSearchOptions) SearchQuery {
16891723
queryArgs := []interface{}{query}
16901724
if options != nil {
@@ -1797,7 +1831,8 @@ func FTSearchQuery(query string, options *FTSearchOptions) SearchQuery {
17971831
// FTSearchWithArgs - Executes a search query on an index with additional options.
17981832
// The 'index' parameter specifies the index to search, the 'query' parameter specifies the search query,
17991833
// and the 'options' parameter specifies additional options for the search.
1800-
// For more information, please refer to the Redis documentation:
1834+
// For more information, please refer to the Redis documentation about [FT.SEARCH].
1835+
//
18011836
// [FT.SEARCH]: (https://redis.io/commands/ft.search/)
18021837
func (c cmdable) FTSearchWithArgs(ctx context.Context, index string, query string, options *FTSearchOptions) *FTSearchCmd {
18031838
args := []interface{}{"FT.SEARCH", index, query}
@@ -1889,7 +1924,7 @@ func (c cmdable) FTSearchWithArgs(ctx context.Context, index string, query strin
18891924
}
18901925
}
18911926
if options.SortByWithCount {
1892-
args = append(args, "WITHCOUT")
1927+
args = append(args, "WITHCOUNT")
18931928
}
18941929
}
18951930
if options.LimitOffset >= 0 && options.Limit > 0 {

search_test.go

Lines changed: 100 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package redis_test
22

33
import (
44
"context"
5+
"fmt"
6+
"strconv"
57
"time"
68

79
. "github.com/bsm/ginkgo/v2"
@@ -127,8 +129,11 @@ var _ = Describe("RediSearch commands Resp 2", Label("search"), func() {
127129

128130
res3, err := client.FTSearchWithArgs(ctx, "num", "foo", &redis.FTSearchOptions{NoContent: true, SortBy: []redis.FTSearchSortBy{sortBy2}, SortByWithCount: true}).Result()
129131
Expect(err).NotTo(HaveOccurred())
130-
Expect(res3.Total).To(BeEquivalentTo(int64(0)))
132+
Expect(res3.Total).To(BeEquivalentTo(int64(3)))
131133

134+
res4, err := client.FTSearchWithArgs(ctx, "num", "notpresentf00", &redis.FTSearchOptions{NoContent: true, SortBy: []redis.FTSearchSortBy{sortBy2}, SortByWithCount: true}).Result()
135+
Expect(err).NotTo(HaveOccurred())
136+
Expect(res4.Total).To(BeEquivalentTo(int64(0)))
132137
})
133138

134139
It("should FTCreate and FTSearch example", Label("search", "ftcreate", "ftsearch"), func() {
@@ -594,6 +599,100 @@ var _ = Describe("RediSearch commands Resp 2", Label("search"), func() {
594599
Expect(res.Rows[0].Fields["t2"]).To(BeEquivalentTo("world"))
595600
})
596601

602+
It("should FTAggregate with scorer and addscores", Label("search", "ftaggregate", "NonRedisEnterprise"), func() {
603+
SkipBeforeRedisMajor(8, "ADDSCORES is available in Redis CE 8")
604+
title := &redis.FieldSchema{FieldName: "title", FieldType: redis.SearchFieldTypeText, Sortable: false}
605+
description := &redis.FieldSchema{FieldName: "description", FieldType: redis.SearchFieldTypeText, Sortable: false}
606+
val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{OnHash: true, Prefix: []interface{}{"product:"}}, title, description).Result()
607+
Expect(err).NotTo(HaveOccurred())
608+
Expect(val).To(BeEquivalentTo("OK"))
609+
WaitForIndexing(client, "idx1")
610+
611+
client.HSet(ctx, "product:1", "title", "New Gaming Laptop", "description", "this is not a desktop")
612+
client.HSet(ctx, "product:2", "title", "Super Old Not Gaming Laptop", "description", "this laptop is not a new laptop but it is a laptop")
613+
client.HSet(ctx, "product:3", "title", "Office PC", "description", "office desktop pc")
614+
615+
options := &redis.FTAggregateOptions{
616+
AddScores: true,
617+
Scorer: "BM25",
618+
SortBy: []redis.FTAggregateSortBy{{
619+
FieldName: "@__score",
620+
Desc: true,
621+
}},
622+
}
623+
624+
res, err := client.FTAggregateWithArgs(ctx, "idx1", "laptop", options).Result()
625+
Expect(err).NotTo(HaveOccurred())
626+
Expect(res).ToNot(BeNil())
627+
Expect(len(res.Rows)).To(BeEquivalentTo(2))
628+
score1, err := strconv.ParseFloat(fmt.Sprintf("%s", res.Rows[0].Fields["__score"]), 64)
629+
Expect(err).NotTo(HaveOccurred())
630+
score2, err := strconv.ParseFloat(fmt.Sprintf("%s", res.Rows[1].Fields["__score"]), 64)
631+
Expect(err).NotTo(HaveOccurred())
632+
Expect(score1).To(BeNumerically(">", score2))
633+
634+
optionsDM := &redis.FTAggregateOptions{
635+
AddScores: true,
636+
Scorer: "DISMAX",
637+
SortBy: []redis.FTAggregateSortBy{{
638+
FieldName: "@__score",
639+
Desc: true,
640+
}},
641+
}
642+
643+
resDM, err := client.FTAggregateWithArgs(ctx, "idx1", "laptop", optionsDM).Result()
644+
Expect(err).NotTo(HaveOccurred())
645+
Expect(resDM).ToNot(BeNil())
646+
Expect(len(resDM.Rows)).To(BeEquivalentTo(2))
647+
score1DM, err := strconv.ParseFloat(fmt.Sprintf("%s", resDM.Rows[0].Fields["__score"]), 64)
648+
Expect(err).NotTo(HaveOccurred())
649+
score2DM, err := strconv.ParseFloat(fmt.Sprintf("%s", resDM.Rows[1].Fields["__score"]), 64)
650+
Expect(err).NotTo(HaveOccurred())
651+
Expect(score1DM).To(BeNumerically(">", score2DM))
652+
653+
Expect(score1DM).To(BeEquivalentTo(float64(4)))
654+
Expect(score2DM).To(BeEquivalentTo(float64(1)))
655+
Expect(score1).NotTo(BeEquivalentTo(score1DM))
656+
Expect(score2).NotTo(BeEquivalentTo(score2DM))
657+
})
658+
659+
It("should FTAggregate apply and groupby", Label("search", "ftaggregate"), func() {
660+
text1 := &redis.FieldSchema{FieldName: "PrimaryKey", FieldType: redis.SearchFieldTypeText, Sortable: true}
661+
num1 := &redis.FieldSchema{FieldName: "CreatedDateTimeUTC", FieldType: redis.SearchFieldTypeNumeric, Sortable: true}
662+
val, err := client.FTCreate(ctx, "idx1", &redis.FTCreateOptions{}, text1, num1).Result()
663+
Expect(err).NotTo(HaveOccurred())
664+
Expect(val).To(BeEquivalentTo("OK"))
665+
WaitForIndexing(client, "idx1")
666+
667+
// 6 feb
668+
client.HSet(ctx, "doc1", "PrimaryKey", "9::362330", "CreatedDateTimeUTC", "1738823999")
669+
670+
// 12 feb
671+
client.HSet(ctx, "doc2", "PrimaryKey", "9::362329", "CreatedDateTimeUTC", "1739342399")
672+
client.HSet(ctx, "doc3", "PrimaryKey", "9::362329", "CreatedDateTimeUTC", "1739353199")
673+
674+
reducer := redis.FTAggregateReducer{Reducer: redis.SearchCount, As: "perDay"}
675+
676+
options := &redis.FTAggregateOptions{
677+
Apply: []redis.FTAggregateApply{{Field: "floor(@CreatedDateTimeUTC /(60*60*24))", As: "TimestampAsDay"}},
678+
GroupBy: []redis.FTAggregateGroupBy{{
679+
Fields: []interface{}{"@TimestampAsDay"},
680+
Reduce: []redis.FTAggregateReducer{reducer},
681+
}},
682+
SortBy: []redis.FTAggregateSortBy{{
683+
FieldName: "@perDay",
684+
Desc: true,
685+
}},
686+
}
687+
688+
res, err := client.FTAggregateWithArgs(ctx, "idx1", "*", options).Result()
689+
Expect(err).NotTo(HaveOccurred())
690+
Expect(res).ToNot(BeNil())
691+
Expect(len(res.Rows)).To(BeEquivalentTo(2))
692+
Expect(res.Rows[0].Fields["perDay"]).To(BeEquivalentTo("2"))
693+
Expect(res.Rows[1].Fields["perDay"]).To(BeEquivalentTo("1"))
694+
})
695+
597696
It("should FTAggregate apply", Label("search", "ftaggregate"), func() {
598697
text1 := &redis.FieldSchema{FieldName: "PrimaryKey", FieldType: redis.SearchFieldTypeText, Sortable: true}
599698
num1 := &redis.FieldSchema{FieldName: "CreatedDateTimeUTC", FieldType: redis.SearchFieldTypeNumeric, Sortable: true}
@@ -638,7 +737,6 @@ var _ = Describe("RediSearch commands Resp 2", Label("search"), func() {
638737
Expect(res.Rows[0].Fields["age"]).To(BeEquivalentTo("19"))
639738
Expect(res.Rows[1].Fields["age"]).To(BeEquivalentTo("25"))
640739
}
641-
642740
})
643741

644742
It("should FTSearch SkipInitialScan", Label("search", "ftsearch"), func() {

0 commit comments

Comments
 (0)