From 02ed4e10080253ed60db1d6add7ab06b957003ec Mon Sep 17 00:00:00 2001 From: Bhargav Dodla Date: Thu, 14 Nov 2024 12:19:14 -0800 Subject: [PATCH 1/3] fix: Fixed issue with context cancelled error leading to connection spikes on Master --- error.go | 9 +++++++++ osscluster.go | 4 +++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/error.go b/error.go index 9b348193a..a7bf159c2 100644 --- a/error.go +++ b/error.go @@ -38,6 +38,15 @@ type Error interface { var _ Error = proto.RedisError("") +func isContextError(err error) bool { + switch err { + case context.Canceled, context.DeadlineExceeded: + return true + default: + return false + } +} + func shouldRetry(err error, retryTimeout bool) bool { switch err { case io.EOF, io.ErrUnexpectedEOF: diff --git a/osscluster.go b/osscluster.go index 517fbd450..1e9ee7de4 100644 --- a/osscluster.go +++ b/osscluster.go @@ -1350,7 +1350,9 @@ func (c *ClusterClient) processPipelineNode( _ = node.Client.withProcessPipelineHook(ctx, cmds, func(ctx context.Context, cmds []Cmder) error { cn, err := node.Client.getConn(ctx) if err != nil { - node.MarkAsFailing() + if !isContextError(err) { + node.MarkAsFailing() + } _ = c.mapCmdsByNode(ctx, failedCmds, cmds) setCmdsErr(cmds, err) return err From 6559130eea0dbcb4a47e1f8b5943fc3acd12e1eb Mon Sep 17 00:00:00 2001 From: Bhargav Dodla Date: Fri, 21 Feb 2025 16:30:41 -0800 Subject: [PATCH 2/3] fix: Added tests --- osscluster_test.go | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/osscluster_test.go b/osscluster_test.go index 93ee464f3..4d96af73a 100644 --- a/osscluster_test.go +++ b/osscluster_test.go @@ -556,6 +556,38 @@ var _ = Describe("ClusterClient", func() { AfterEach(func() {}) assertPipeline() + + It("doesn't fail node with context.Canceled error", func() { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + pipe.Set(ctx, "A", "A_value", 0) + cmds, err := pipe.Exec(ctx) + Expect(cmds).To(HaveLen(1)) + Expect(err).To(Equal(context.Canceled)) + + clientNodes, _ := client.Nodes(ctx, "A") + + for _, node := range clientNodes { + Expect(node.Failing()).To(BeFalse()) + } + }) + + It("doesn't fail node with context.DeadlineExceeded error", func() { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond) + defer cancel() + + pipe.Set(ctx, "A", "A_value", 0) + _, err := pipe.Exec(ctx) + + Expect(err).To(HaveOccurred()) + Expect(errors.Is(err, context.DeadlineExceeded)).To(BeTrue()) + + clientNodes, _ := client.Nodes(ctx, "A") + + for _, node := range clientNodes { + Expect(node.Failing()).To(BeFalse()) + } + }) }) Describe("with TxPipeline", func() { From 99cea39aec3cc65e9174f78c4124e6aae0c357a5 Mon Sep 17 00:00:00 2001 From: Bhargav Dodla Date: Fri, 21 Feb 2025 16:35:50 -0800 Subject: [PATCH 3/3] fix: Updated tests --- osscluster_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/osscluster_test.go b/osscluster_test.go index 4d96af73a..cb6470337 100644 --- a/osscluster_test.go +++ b/osscluster_test.go @@ -561,9 +561,10 @@ var _ = Describe("ClusterClient", func() { ctx, cancel := context.WithCancel(context.Background()) cancel() pipe.Set(ctx, "A", "A_value", 0) - cmds, err := pipe.Exec(ctx) - Expect(cmds).To(HaveLen(1)) - Expect(err).To(Equal(context.Canceled)) + _, err := pipe.Exec(ctx) + + Expect(err).To(HaveOccurred()) + Expect(errors.Is(err, context.Canceled)).To(BeTrue()) clientNodes, _ := client.Nodes(ctx, "A")