Implement Request and Response Policy Based Routing in Cluster Mode #3422
base: load-balance-search-commands-to-shards
Conversation
feat(routing): add internal request/response policy enums
* feat: load the policy table in cluster client
* Remove comments
…or osscluster.go (#6)
* centralize cluster command routing in osscluster_router.go and refactor osscluster.go
* enable CI on all branches
* Add debug prints
* Add debug prints
* FIX: deal with nil policy
* FIX: fixing clusterClient process
* chore(osscluster): simplify switch case
* wip(command): ai generated clone method for commands
* feat: implement response aggregator for Redis cluster commands
* feat: implement response aggregator for Redis cluster commands
* fix: solve concurrency errors
* fix: solve concurrency errors
* return MaxRedirects settings
* remove locks from getCommandPolicy
* Handle MOVED errors more robustly, remove cluster reloading at executions, ensure better routing
* Fix: supports Process hook test
* Fix: remove response aggregation for single shard commands
* Add more performant type conversion for Cmd type
* Add router logic into processPipeline

Co-authored-by: Nedyalko Dyakov <[email protected]>
…ce-search-commands-to-shards
}
if result.cmd != nil && result.err == nil {
	// For MGET, extract individual values from the array result
	if strings.ToLower(cmd.Name()) == "mget" {
Do we actually need this special case?
}

// getCommandPolicy retrieves the routing policy for a command
func (c *ClusterClient) getCommandPolicy(ctx context.Context, cmd Cmder) *routing.CommandPolicy {
It seems like this will introduce significant overhead for each command execution. We should fetch all policies during the connection handshake.
Note: for the first stage we should use a hard-coded policy manager that can be extended in the future to take the COMMAND command output into account.
@htemelski-redis 💡 Consider implementing a PolicyResolverConfig type that users can override via the client options. This config should map module__command_name to metadata (policies, key requirements, etc.). Set hardcoded defaults in the client options, but allow users to override policies per command as needed.
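A minimal sketch of what such a config could look like (PolicyResolverConfig, its fields, and the import path are hypothetical; routing.CommandPolicy is the type introduced by this PR):

package redis

// Hypothetical sketch; the type name, field names and import path are illustrative.
import "github.com/redis/go-redis/v9/internal/routing"

// PolicyResolverConfig maps "module__command_name" (empty module for core
// commands) to the policy metadata that should replace the hardcoded default.
type PolicyResolverConfig struct {
	Overrides map[string]routing.CommandPolicy
}

// Resolve prefers a user override, then the hardcoded defaults, then nil.
func (c *PolicyResolverConfig) Resolve(name string, defaults map[string]routing.CommandPolicy) *routing.CommandPolicy {
	if c != nil {
		if p, ok := c.Overrides[name]; ok {
			return &p
		}
	}
	if p, ok := defaults[name]; ok {
		return &p
	}
	return nil
}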
6e3b627 to 1b2eaa6
Submitting partial review for the aggregators.
// For MGET without policy, use keyed aggregator
if cmdName == "mget" {
	return routing.NewDefaultAggregator(true)
}
Since we are passing cmd.Name() in routing.NewResponseAggregator, this can be handled by it. If policy is nil for mget, maybe NewResponseAggregator can accept a policy and check the nil as well.
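One possible shape for that (a sketch only; the enum names RespAllSucceeded/RespOneSucceeded and the exact constructor signature are assumptions, not necessarily what the PR defines):

// Sketch: let the constructor own the nil-policy fallback so callers don't
// need command-specific special cases like the mget one quoted above.
func NewResponseAggregator(policy *CommandPolicy, cmdName string) ResponseAggregator {
	if policy == nil {
		// Keyed aggregation for multi-key reads such as MGET,
		// plain default aggregation for everything else.
		return NewDefaultAggregator(cmdName == "mget")
	}
	switch policy.Response {
	case RespAllSucceeded:
		return &AllSucceededAggregator{}
	case RespOneSucceeded:
		return &OneSucceededAggregator{}
	default:
		return NewDefaultAggregator(false)
	}
}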
a.mu.Lock()
defer a.mu.Unlock()

if err != nil && a.firstErr == nil {
	a.firstErr = err
	return nil
}
if err == nil && !a.hasResult {
	a.result = result
	a.hasResult = true
}
return nil
Couple of questions here:
- Should we return only the first observed error?
- Why are we overwriting the result?
- Can't we just have an atomic boolean hasError?
- Same, if we can have an atomic hasResult we can drop the mutex.

The idea behind my questions: if we are going to return on the first error, we can do this with atomics and skip the CPU cycles for the mutex.
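A rough sketch of the atomics-based variant being suggested (standard sync/atomic only, requires Go 1.19+ for atomic.Pointer; the type and field names are illustrative):

package routing

import "sync/atomic"

// firstWins keeps the first error and the first successful result without a mutex.
type firstWins struct {
	firstErr atomic.Pointer[error]
	result   atomic.Pointer[any]
}

func (a *firstWins) Add(result any, err error) {
	if err != nil {
		a.firstErr.CompareAndSwap(nil, &err) // only the first error sticks
		return
	}
	a.result.CompareAndSwap(nil, &result) // only the first result sticks
}

func (a *firstWins) Finish() (any, error) {
	if e := a.firstErr.Load(); e != nil {
		return nil, *e // error takes priority, matching all_succeeded semantics
	}
	if r := a.result.Load(); r != nil {
		return *r, nil
	}
	return nil, nil
}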
For the all_succeeded policy we either return one of the replies if there is no error, or one of the errors if there is at least one. So:
- Yes, returning only the first error is sufficient.
- We are setting it only once.
- 3/4: I feel that using atomics will overcomplicate the aggregators, plus there are some caveats to using them. I think we should try to maximize the compatibility of the library.
func (a *OneSucceededAggregator) Add(result interface{}, err error) error {
	a.mu.Lock()
	defer a.mu.Unlock()

	if err != nil && a.firstErr == nil {
		a.firstErr = err
		return nil
	}
	if err == nil && !a.hasResult {
		a.result = result
		a.hasResult = true
	}
	return nil
}
Same as with AllSucceededAggregator. Maybe we can use atomics here.
Same as above
	return nil
}
if err == nil {
	a.sum += val
Again, maybe we can use atomic.Int64
-||-
}

// AggMinAggregator returns the minimum numeric value from all shards.
type AggMinAggregator struct {
Looking at https://github.com/haraldrudell/parl, there are atomic min and atomic max implementations that we can also use.
P.S. I do suggest copying only the needed implementation, or using it as a reference to reimplement, rather than including the whole dependency. Of course, mentioning the creator in the code.
-||-
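For reference, an atomic min can be built with a small compare-and-swap loop over the standard library instead of importing parl wholesale (illustrative sketch; callers would need to seed the value with math.MaxInt64 before the first Add):

package routing

import "sync/atomic"

// atomicMinInt64 lowers v to val if val is smaller, using a CAS loop.
// Pattern borrowed from haraldrudell/parl's atomic min/max helpers.
func atomicMinInt64(v *atomic.Int64, val int64) {
	for {
		cur := v.Load()
		if val >= cur {
			return // current minimum is already smaller or equal
		}
		if v.CompareAndSwap(cur, val) {
			return // we won the race and installed the new minimum
		}
		// another goroutine changed the value, retry
	}
}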
	return nil, a.firstErr
}
if !a.hasResult {
	return nil, fmt.Errorf("redis: no valid results to aggregate for min operation")
Can we extract such errors in a separate file?
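For example, a small errors.go in the routing package could hold these as package-level sentinels (sketch; names are illustrative):

package routing

import "errors"

// Shared aggregation errors, so Finish implementations and tests can compare
// with errors.Is instead of matching formatted strings.
var (
	ErrNoMinResults = errors.New("redis: no valid results to aggregate for min operation")
	ErrNoMaxResults = errors.New("redis: no valid results to aggregate for max operation")
	ErrNoSumResults = errors.New("redis: no valid results to aggregate for sum operation")
)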
func (a *SpecialAggregator) Finish() (interface{}, error) {
	a.mu.Lock()
	defer a.mu.Unlock()

	if a.aggregatorFunc != nil {
		return a.aggregatorFunc(a.results, a.errors)
	}
	// Default behavior: return first non-error result or first error
	for i, err := range a.errors {
		if err == nil {
			return a.results[i], nil
		}
	}
	if len(a.errors) > 0 {
		return nil, a.errors[0]
	}
	return nil, nil
}
I do think we should be able to control the priority here. I assume for some commands the errors will have higher priority, for others, the results.
Wouldn't this be achieved using the aggregatorFunc?
// SetAggregatorFunc allows setting custom aggregation logic for special commands.
func (a *SpecialAggregator) SetAggregatorFunc(fn func([]interface{}, []error) (interface{}, error)) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.aggregatorFunc = fn
}

// SpecialAggregatorRegistry holds custom aggregation functions for specific commands.
var SpecialAggregatorRegistry = make(map[string]func([]interface{}, []error) (interface{}, error))

// RegisterSpecialAggregator registers a custom aggregation function for a command.
func RegisterSpecialAggregator(cmdName string, fn func([]interface{}, []error) (interface{}, error)) {
	SpecialAggregatorRegistry[cmdName] = fn
}

// NewSpecialAggregator creates a special aggregator with command-specific logic if available.
func NewSpecialAggregator(cmdName string) *SpecialAggregator {
	agg := &SpecialAggregator{}
	if fn, exists := SpecialAggregatorRegistry[cmdName]; exists {
		agg.SetAggregatorFunc(fn)
	}
	return agg
SetAggregatorFunc is only used internally in this package; I assume it can be private, if needed at all. See the next comment.
func NewSpecialAggregator(cmdName string) *SpecialAggregator {
	agg := &SpecialAggregator{}
	if fn, exists := SpecialAggregatorRegistry[cmdName]; exists {
		agg.SetAggregatorFunc(fn)
	}
	return agg
Suggested change:

func NewSpecialAggregator(cmdName string) *SpecialAggregator {
	fn := SpecialAggregatorRegistry[cmdName]
	return &SpecialAggregator{aggregatorFunc: fn}
I do think this should be doable, and we would remove the need for SetAggregatorFunc and, therefore, the locking and unlocking of the mutex.
Submitting another partial review.
}

func (p *CommandPolicy) CanBeUsedInPipeline() bool {
	return p.Request != ReqAllNodes && p.Request != ReqAllShards && p.Request != ReqMultiShard
What about special? Can it be used in a pipeline?
// ShardPicker chooses “one arbitrary shard” when the request_policy is
// ReqDefault and the command has no keys.
type ShardPicker interface {
	Next(total int) int // returns an index in [0,total)
}
Those are great. Can we implement a StaticShardPicker or StickyShardPicker that will always return the same shard? I do think this can be helpful for testing. This is not a blocker by any means.
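Something as small as this would satisfy the ShardPicker interface quoted above (a sketch for test use only):

// StaticShardPicker always returns the same index, which makes
// "one arbitrary shard" routing deterministic in tests.
type StaticShardPicker struct {
	Index int
}

func (p StaticShardPicker) Next(total int) int {
	if total <= 0 {
		return 0
	}
	return p.Index % total // clamp into [0,total)
}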
-	return strconv.ParseBool(cmd.val)
+	return strconv.ParseBool(cmd.Val())
why was this change needed?
if commandInfoTips != nil {
	if v, ok := commandInfoTips[requestPolicy]; ok {
		if p, err := routing.ParseRequestPolicy(v); err == nil {
			req = p
		}
	}
	if v, ok := commandInfoTips[responsePolicy]; ok {
		if p, err := routing.ParseResponsePolicy(v); err == nil {
			resp = p
		}
	}
}
tips := make(map[string]string, len(commandInfoTips))
for k, v := range commandInfoTips {
	if k == requestPolicy || k == responsePolicy {
		continue
	}
	tips[k] = v
}
Can't we do both of those in a single range over commandInfoTips?
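For example, something like this keeps the same behavior in a single pass (a sketch using the variables from the snippet above; ranging over a nil map is a no-op, so the outer nil check isn't needed):

tips := make(map[string]string, len(commandInfoTips))
for k, v := range commandInfoTips {
	switch k {
	case requestPolicy:
		if p, err := routing.ParseRequestPolicy(v); err == nil {
			req = p
		}
	case responsePolicy:
		if p, err := routing.ParseResponsePolicy(v); err == nil {
			resp = p
		}
	default:
		tips[k] = v // policy tips are still excluded from the generic tips map
	}
}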
// ExtractCommandValue extracts the value from a command result using the fast enum-based approach
func ExtractCommandValue(cmd interface{}) interface{} {
I assume all cases (types) for which interface{ Val() interface{} } is used for extracting the value can be combined together?
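For instance, a single anonymous-interface assertion near the top of ExtractCommandValue could replace all of those cases (a sketch; the remaining types that need conversion would keep their own branches):

// Sketch: collapse every command type whose Val() already returns
// interface{} into one branch.
if v, ok := cmd.(interface{ Val() interface{} }); ok {
	return v.Val()
}
// ...type-specific cases that need conversion continue below.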
	return nil
}

func (cmd *MonitorCmd) Clone() Cmder {
Let's move this above the ExtractCommandValue function.
	return nil
}

func (cmd *IntPointerSliceCmd) Clone() Cmder {
It's tricky here: do we need to return the same pointer, or do we only want the value when cloning?
// cmdInfo will fetch and cache the command policies after the first execution
func (c *ClusterClient) cmdInfo(ctx context.Context, name string) *CommandInfo {
	cmdsInfo, err := c.cmdsInfoCache.Get(ctx)
	// Use a separate context that won't be canceled to ensure command info lookup
	// doesn't fail due to original context cancellation
	cmdInfoCtx := context.Background()
Most of the time the cmdInfo should be cached already. Why don't we just use c.context(ctx) to decide whether the original context (with its timeout) is used, or a Background context when c.opt.ContextTimeoutEnabled is false?
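That is, something along these lines (a sketch; it assumes the cluster client's existing c.context helper, which returns the original ctx when ContextTimeoutEnabled is set and context.Background() otherwise):

// Sketch: the cache-hit path never touches the network, and c.context(ctx)
// already encodes the ContextTimeoutEnabled decision.
cmdsInfo, err := c.cmdsInfoCache.Get(c.context(ctx))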
Final part of initial review
Overview:
- Let's use atomics when possible.
- Left questions related to the node selection and setting of values.
Overall the design of the solution looks good; I would have to do an additional pass over the test files once this review is addressed.
Thank you both @ofekshenawa and @htemelski-redis!
func (c *ClusterClient) routeAndRun(ctx context.Context, cmd Cmder, node *clusterNode) error {
	policy := c.getCommandPolicy(ctx, cmd)

	switch {
	case policy != nil && policy.Request == routing.ReqAllNodes:
		return c.executeOnAllNodes(ctx, cmd, policy)
	case policy != nil && policy.Request == routing.ReqAllShards:
		return c.executeOnAllShards(ctx, cmd, policy)
	case policy != nil && policy.Request == routing.ReqMultiShard:
		return c.executeMultiShard(ctx, cmd, policy)
	case policy != nil && policy.Request == routing.ReqSpecial:
		return c.executeSpecialCommand(ctx, cmd, policy, node)
	default:
		return c.executeDefault(ctx, cmd, node)
	}
}
Suggested change:

func (c *ClusterClient) routeAndRun(ctx context.Context, cmd Cmder, node *clusterNode) error {
	policy := c.getCommandPolicy(ctx, cmd)
	if policy == nil {
		return c.executeDefault(ctx, cmd, node)
	}
	switch policy.Request {
	case routing.ReqAllNodes:
		return c.executeOnAllNodes(ctx, cmd, policy)
	case routing.ReqAllShards:
		return c.executeOnAllShards(ctx, cmd, policy)
	case routing.ReqMultiShard:
		return c.executeMultiShard(ctx, cmd, policy)
	case routing.ReqSpecial:
		return c.executeSpecialCommand(ctx, cmd, policy, node)
	default:
		return c.executeDefault(ctx, cmd, node)
	}
}
if c.hasKeys(cmd) {
	// execute on key based shard
	return node.Client.Process(ctx, cmd)
}
Do we know that this node serves the slot for the key?
Yes, the node should've been selected based on the slot; see osscluster.go:L1906, func (c *ClusterClient) cmdNode.
	// execute on key based shard
	return node.Client.Process(ctx, cmd)
}
return c.executeOnArbitraryShard(ctx, cmd)
since it doesn't matter and there is already some node selected, why not use it?
We have two different ways of picking an arbitrary shard: either round-robin or a random one.
case CmdTypeKeyFlags:
	return NewKeyFlagsCmd(ctx, args...)
case CmdTypeDuration:
	return NewDurationCmd(ctx, time.Second, args...)
Some CmdTypeDuration commands do use time.Millisecond as precision, see PExpireTime for example. Shouldn't we use it here so we don't lose precision?
// Command executed successfully but value extraction failed
// This is common for complex commands like CLUSTER SLOTS
// The command already has its result set correctly, so just return
I do not understand that comment here. Why did the value extraction return nil? Can we make sure the cmd has a value set at least? If it doesn't, we may return a cmd with a nil value and a nil error, which doesn't make sense.
	if c, ok := cmd.(*KeyValuesCmd); ok {
		// KeyValuesCmd needs a key string and values slice
		if key, ok := value.(string); ok {
			c.SetVal(key, []string{}) // Default empty values
		}
	}
case CmdTypeZSliceWithKey:
	if c, ok := cmd.(*ZSliceWithKeyCmd); ok {
		// ZSliceWithKeyCmd needs a key string and Z slice
		if key, ok := value.(string); ok {
			c.SetVal(key, []Z{}) // Default empty Z slice
		}
why are we setting empty values here?
This PR introduces support for Redis COMMAND-based request_policy and response_policy routing for Redis commands when used with the OSS Cluster client.
Key Additions:
Command Policy Loader: Parses and caches COMMAND metadata with routing/aggregation tips on first use.
Routing Engine Enhancements:
Implements support for all request policies: default(keyless), default(hashslot), all_shards, all_nodes, multi_shard, and special.
Response Aggregator: Combines multi-shard replies based on response_policy:
all_succeeded, one_succeeded, agg_sum, special, etc.
Includes custom handling for special commands like FT.CURSOR.
Raw Command Support: Policies are enforced on Client.Do(ctx, args...).
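As a rough usage illustration (a sketch against a locally running cluster with assumed ports; DBSIZE is documented by Redis with request_policy:all_shards and response_policy:agg_sum, so with this PR the reply should be the summed key count across shards):

package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs: []string{":7000", ":7001", ":7002"}, // assumed local cluster ports
	})

	// Raw command path: the policy loader resolves DBSIZE's routing tips,
	// fans the command out to all shards and aggregates with agg_sum.
	total, err := rdb.Do(ctx, "DBSIZE").Result()
	fmt.Println(total, err)
}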