Skip to content

Commit 10121e9

Browse files
authored
feat(osscluster): Support subscriptions against cluster slave nodes (#3480)
1 parent 6f41b60 commit 10121e9

File tree

2 files changed

+186
-4
lines changed

2 files changed

+186
-4
lines changed

osscluster.go

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1834,14 +1834,33 @@ func (c *ClusterClient) pubSub() *PubSub {
18341834
}
18351835

18361836
var err error
1837+
18371838
if len(channels) > 0 {
18381839
slot := hashtag.Slot(channels[0])
1839-
node, err = c.slotMasterNode(ctx, slot)
1840+
1841+
// newConn in PubSub is only used for subscription connections, so it is safe to
1842+
// assume that a slave node can always be used when client options specify ReadOnly.
1843+
if c.opt.ReadOnly {
1844+
state, err := c.state.Get(ctx)
1845+
if err != nil {
1846+
return nil, err
1847+
}
1848+
1849+
node, err = c.slotReadOnlyNode(state, slot)
1850+
if err != nil {
1851+
return nil, err
1852+
}
1853+
} else {
1854+
node, err = c.slotMasterNode(ctx, slot)
1855+
if err != nil {
1856+
return nil, err
1857+
}
1858+
}
18401859
} else {
18411860
node, err = c.nodes.Random()
1842-
}
1843-
if err != nil {
1844-
return nil, err
1861+
if err != nil {
1862+
return nil, err
1863+
}
18451864
}
18461865

18471866
cn, err := node.Client.newConn(context.TODO())

osscluster_test.go

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"strconv"
1111
"strings"
1212
"sync"
13+
"sync/atomic"
1314
"time"
1415

1516
. "github.com/bsm/ginkgo/v2"
@@ -644,6 +645,87 @@ var _ = Describe("ClusterClient", func() {
644645
}, 30*time.Second).ShouldNot(HaveOccurred())
645646
})
646647

648+
It("supports PubSub with ReadOnly option", func() {
649+
opt = redisClusterOptions()
650+
opt.ReadOnly = true
651+
client = cluster.newClusterClient(ctx, opt)
652+
653+
pubsub := client.Subscribe(ctx, "mychannel")
654+
defer pubsub.Close()
655+
656+
Eventually(func() error {
657+
var masterPubsubChannels atomic.Int64
658+
var slavePubsubChannels atomic.Int64
659+
660+
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
661+
info := master.InfoMap(ctx, "stats")
662+
if info.Err() != nil {
663+
return info.Err()
664+
}
665+
666+
pc, err := strconv.Atoi(info.Item("Stats", "pubsub_channels"))
667+
if err != nil {
668+
return err
669+
}
670+
671+
masterPubsubChannels.Add(int64(pc))
672+
673+
return nil
674+
})
675+
if err != nil {
676+
return err
677+
}
678+
679+
err = client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
680+
info := slave.InfoMap(ctx, "stats")
681+
if info.Err() != nil {
682+
return info.Err()
683+
}
684+
685+
pc, err := strconv.Atoi(info.Item("Stats", "pubsub_channels"))
686+
if err != nil {
687+
return err
688+
}
689+
690+
slavePubsubChannels.Add(int64(pc))
691+
692+
return nil
693+
})
694+
if err != nil {
695+
return err
696+
}
697+
698+
if c := masterPubsubChannels.Load(); c != int64(0) {
699+
return fmt.Errorf("total master pubsub_channels is %d; expected 0", c)
700+
}
701+
702+
if c := slavePubsubChannels.Load(); c != int64(1) {
703+
return fmt.Errorf("total slave pubsub_channels is %d; expected 1", c)
704+
}
705+
706+
return nil
707+
}, 30*time.Second).ShouldNot(HaveOccurred())
708+
709+
Eventually(func() error {
710+
_, err := client.Publish(ctx, "mychannel", "hello").Result()
711+
if err != nil {
712+
return err
713+
}
714+
715+
msg, err := pubsub.ReceiveTimeout(ctx, time.Second)
716+
if err != nil {
717+
return err
718+
}
719+
720+
_, ok := msg.(*redis.Message)
721+
if !ok {
722+
return fmt.Errorf("got %T, wanted *redis.Message", msg)
723+
}
724+
725+
return nil
726+
}, 30*time.Second).ShouldNot(HaveOccurred())
727+
})
728+
647729
It("supports sharded PubSub", func() {
648730
pubsub := client.SSubscribe(ctx, "mychannel")
649731
defer pubsub.Close()
@@ -668,6 +750,87 @@ var _ = Describe("ClusterClient", func() {
668750
}, 30*time.Second).ShouldNot(HaveOccurred())
669751
})
670752

753+
It("supports sharded PubSub with ReadOnly option", func() {
754+
opt = redisClusterOptions()
755+
opt.ReadOnly = true
756+
client = cluster.newClusterClient(ctx, opt)
757+
758+
pubsub := client.SSubscribe(ctx, "mychannel")
759+
defer pubsub.Close()
760+
761+
Eventually(func() error {
762+
var masterPubsubShardChannels atomic.Int64
763+
var slavePubsubShardChannels atomic.Int64
764+
765+
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
766+
info := master.InfoMap(ctx, "stats")
767+
if info.Err() != nil {
768+
return info.Err()
769+
}
770+
771+
pc, err := strconv.Atoi(info.Item("Stats", "pubsubshard_channels"))
772+
if err != nil {
773+
return err
774+
}
775+
776+
masterPubsubShardChannels.Add(int64(pc))
777+
778+
return nil
779+
})
780+
if err != nil {
781+
return err
782+
}
783+
784+
err = client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
785+
info := slave.InfoMap(ctx, "stats")
786+
if info.Err() != nil {
787+
return info.Err()
788+
}
789+
790+
pc, err := strconv.Atoi(info.Item("Stats", "pubsubshard_channels"))
791+
if err != nil {
792+
return err
793+
}
794+
795+
slavePubsubShardChannels.Add(int64(pc))
796+
797+
return nil
798+
})
799+
if err != nil {
800+
return err
801+
}
802+
803+
if c := masterPubsubShardChannels.Load(); c != int64(0) {
804+
return fmt.Errorf("total master pubsubshard_channels is %d; expected 0", c)
805+
}
806+
807+
if c := slavePubsubShardChannels.Load(); c != int64(1) {
808+
return fmt.Errorf("total slave pubsubshard_channels is %d; expected 1", c)
809+
}
810+
811+
return nil
812+
}, 30*time.Second).ShouldNot(HaveOccurred())
813+
814+
Eventually(func() error {
815+
_, err := client.SPublish(ctx, "mychannel", "hello").Result()
816+
if err != nil {
817+
return err
818+
}
819+
820+
msg, err := pubsub.ReceiveTimeout(ctx, time.Second)
821+
if err != nil {
822+
return err
823+
}
824+
825+
_, ok := msg.(*redis.Message)
826+
if !ok {
827+
return fmt.Errorf("got %T, wanted *redis.Message", msg)
828+
}
829+
830+
return nil
831+
}, 30*time.Second).ShouldNot(HaveOccurred())
832+
})
833+
671834
It("supports PubSub.Ping without channels", func() {
672835
pubsub := client.Subscribe(ctx)
673836
defer pubsub.Close()

0 commit comments

Comments
 (0)