Skip to content
144 changes: 97 additions & 47 deletions redis/_parsers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
from typing import Awaitable, Callable, List, Optional, Protocol, Union

from redis.maintenance_events import (
MaintenanceEvent,
NodeFailedOverEvent,
NodeFailingOverEvent,
NodeMigratedEvent,
NodeMigratingEvent,
NodeMovingEvent,
Expand Down Expand Up @@ -167,20 +170,76 @@ async def read_response(
raise NotImplementedError()


_INVALIDATION_MESSAGE = (b"invalidate", "invalidate")
_MOVING_MESSAGE = (b"MOVING", "MOVING")
_MIGRATING_MESSAGE = (b"MIGRATING", "MIGRATING")
_MIGRATED_MESSAGE = (b"MIGRATED", "MIGRATED")
_FAILING_OVER_MESSAGE = (b"FAILING_OVER", "FAILING_OVER")
_FAILED_OVER_MESSAGE = (b"FAILED_OVER", "FAILED_OVER")
class MaintenanceNotificationsParser:
"""Protocol defining maintenance push notification parsing functionality"""

@staticmethod
def parse_maintenance_start_msg(response, notification_type):
# Expected message format is: <event_type> <seq_number> <time>
id = response[1]
ttl = response[2]
return notification_type(id, ttl)

@staticmethod
def parse_maintenance_completed_msg(response, notification_type):
# Expected message format is: <event_type> <seq_number>
id = response[1]
return notification_type(id)

@staticmethod
def parse_moving_msg(response):
# Expected message format is: MOVING <seq_number> <time> <endpoint>
id = response[1]
ttl = response[2]
if response[3] in [b"null", "null"]:
host, port = None, None
else:
value = response[3]
if isinstance(value, bytes):
value = value.decode()
host, port = value.split(":")
port = int(port) if port is not None else None

return NodeMovingEvent(id, host, port, ttl)


_INVALIDATION_MESSAGE = "invalidate"
_MOVING_MESSAGE = "MOVING"
_MIGRATING_MESSAGE = "MIGRATING"
_MIGRATED_MESSAGE = "MIGRATED"
_FAILING_OVER_MESSAGE = "FAILING_OVER"
_FAILED_OVER_MESSAGE = "FAILED_OVER"

_MAINTENANCE_MESSAGES = (
*_MIGRATING_MESSAGE,
*_MIGRATED_MESSAGE,
*_FAILING_OVER_MESSAGE,
*_FAILED_OVER_MESSAGE,
_MIGRATING_MESSAGE,
_MIGRATED_MESSAGE,
_FAILING_OVER_MESSAGE,
_FAILED_OVER_MESSAGE,
)

MSG_TYPE_TO_EVENT_PARSER_MAPPING: dict[str, tuple[type[MaintenanceEvent], Callable]] = {
_MIGRATING_MESSAGE: (
NodeMigratingEvent,
MaintenanceNotificationsParser.parse_maintenance_start_msg,
),
_MIGRATED_MESSAGE: (
NodeMigratedEvent,
MaintenanceNotificationsParser.parse_maintenance_completed_msg,
),
_FAILING_OVER_MESSAGE: (
NodeFailingOverEvent,
MaintenanceNotificationsParser.parse_maintenance_start_msg,
),
_FAILED_OVER_MESSAGE: (
NodeFailedOverEvent,
MaintenanceNotificationsParser.parse_maintenance_completed_msg,
),
_MOVING_MESSAGE: (
NodeMovingEvent,
MaintenanceNotificationsParser.parse_moving_msg,
),
}


class PushNotificationsParser(Protocol):
"""Protocol defining RESP3-specific parsing functionality"""
Expand All @@ -196,39 +255,33 @@ def handle_pubsub_push_response(self, response):

def handle_push_response(self, response, **kwargs):
msg_type = response[0]
if isinstance(msg_type, bytes):
msg_type = msg_type.decode()

if msg_type not in (
*_INVALIDATION_MESSAGE,
_INVALIDATION_MESSAGE,
*_MAINTENANCE_MESSAGES,
*_MOVING_MESSAGE,
_MOVING_MESSAGE,
):
return self.pubsub_push_handler_func(response)

try:
if (
msg_type in _INVALIDATION_MESSAGE
msg_type == _INVALIDATION_MESSAGE
and self.invalidation_push_handler_func
):
return self.invalidation_push_handler_func(response)

if msg_type in _MOVING_MESSAGE and self.node_moving_push_handler_func:
# Expected message format is: MOVING <seq_number> <time> <endpoint>
id = response[1]
ttl = response[2]
host, port = response[3].decode().split(":")
notification = NodeMovingEvent(id, host, port, ttl)
if msg_type == _MOVING_MESSAGE and self.node_moving_push_handler_func:
parser_function = MSG_TYPE_TO_EVENT_PARSER_MAPPING[msg_type][1]

notification = parser_function(response)
return self.node_moving_push_handler_func(notification)

if msg_type in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func:
notification = None

if msg_type in _MIGRATING_MESSAGE:
# Expected message format is: MIGRATING <seq_number> <time> <shard_id-s>
id = response[1]
ttl = response[2]
notification = NodeMigratingEvent(id, ttl)
elif msg_type in _MIGRATED_MESSAGE:
id = response[1]
notification = NodeMigratedEvent(id)
parser_function = MSG_TYPE_TO_EVENT_PARSER_MAPPING[msg_type][1]
notification_type = MSG_TYPE_TO_EVENT_PARSER_MAPPING[msg_type][0]
notification = parser_function(response, notification_type)

if notification is not None:
return self.maintenance_push_handler_func(notification)
Expand Down Expand Up @@ -268,38 +321,35 @@ async def handle_push_response(self, response, **kwargs):
"""Handle push responses asynchronously"""

msg_type = response[0]
if isinstance(msg_type, bytes):
msg_type = msg_type.decode()

if msg_type not in (
*_INVALIDATION_MESSAGE,
_INVALIDATION_MESSAGE,
*_MAINTENANCE_MESSAGES,
*_MOVING_MESSAGE,
_MOVING_MESSAGE,
):
return await self.pubsub_push_handler_func(response)

try:
if (
msg_type in _INVALIDATION_MESSAGE
msg_type == _INVALIDATION_MESSAGE
and self.invalidation_push_handler_func
):
return await self.invalidation_push_handler_func(response)

if msg_type in _MOVING_MESSAGE and self.node_moving_push_handler_func:
# push notification from enterprise cluster for node moving
id = response[1]
ttl = response[2]
host, port = response[3].split(":")
notification = NodeMovingEvent(id, host, port, ttl)
if isinstance(msg_type, bytes):
msg_type = msg_type.decode()

if msg_type == _MOVING_MESSAGE and self.node_moving_push_handler_func:
parser_function = MSG_TYPE_TO_EVENT_PARSER_MAPPING[msg_type][1]
notification = parser_function(response)
return await self.node_moving_push_handler_func(notification)

if msg_type in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func:
notification = None

if msg_type in _MIGRATING_MESSAGE:
id = response[1]
ttl = response[2]
notification = NodeMigratingEvent(id, ttl)
elif msg_type in _MIGRATED_MESSAGE:
id = response[1]
notification = NodeMigratedEvent(id)
parser_function = MSG_TYPE_TO_EVENT_PARSER_MAPPING[msg_type][1]
notification_type = MSG_TYPE_TO_EVENT_PARSER_MAPPING[msg_type][0]
notification = parser_function(response, notification_type)

if notification is not None:
return await self.maintenance_push_handler_func(notification)
Expand Down
Loading