Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions redis/asyncio/multidb/command_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,9 @@ def pubsub(self, **kwargs):

async def execute_command(self, *args, **options):
async def callback():
return await self._active_database.client.execute_command(*args, **options)
response = await self._active_database.client.execute_command(*args, **options)
await self._register_command_execution(args)
return response

return await self._execute_with_failure_detection(callback, args)

Expand All @@ -209,7 +211,9 @@ async def callback():
for command, options in command_stack:
pipe.execute_command(*command, **options)

return await pipe.execute()
response = await pipe.execute()
await self._register_command_execution(command_stack)
return response

return await self._execute_with_failure_detection(callback, command_stack)

Expand All @@ -222,23 +226,28 @@ async def execute_transaction(
watch_delay: Optional[float] = None,
):
async def callback():
return await self._active_database.client.transaction(
response = await self._active_database.client.transaction(
func,
*watches,
shard_hint=shard_hint,
value_from_callable=value_from_callable,
watch_delay=watch_delay
)
await self._register_command_execution(())
return response

return await self._execute_with_failure_detection(callback)

async def execute_pubsub_method(self, method_name: str, *args, **kwargs):
async def callback():
method = getattr(self.active_pubsub, method_name)
if iscoroutinefunction(method):
return await method(*args, **kwargs)
response = await method(*args, **kwargs)
else:
return method(*args, **kwargs)
response = method(*args, **kwargs)

await self._register_command_execution(args)
return response

return await self._execute_with_failure_detection(callback, *args)

Expand Down Expand Up @@ -280,6 +289,10 @@ async def _check_active_database(self):
async def _on_command_fail(self, error, *args):
await self._event_dispatcher.dispatch_async(AsyncOnCommandsFailEvent(args, error))

async def _register_command_execution(self, cmd: tuple):
for detector in self._failure_detectors:
await detector.register_command_execution(cmd)

def _setup_event_dispatcher(self):
"""
Registers necessary listeners.
Expand Down
22 changes: 14 additions & 8 deletions redis/asyncio/multidb/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@
from redis.asyncio.multidb.database import Databases, Database
from redis.asyncio.multidb.failover import AsyncFailoverStrategy, WeightBasedFailoverStrategy, DEFAULT_FAILOVER_DELAY, \
DEFAULT_FAILOVER_ATTEMPTS
from redis.asyncio.multidb.failure_detector import AsyncFailureDetector, FailureDetectorAsyncWrapper, \
DEFAULT_FAILURES_THRESHOLD, DEFAULT_FAILURES_DURATION
from redis.asyncio.multidb.failure_detector import AsyncFailureDetector, FailureDetectorAsyncWrapper
from redis.asyncio.multidb.healthcheck import HealthCheck, EchoHealthCheck, DEFAULT_HEALTH_CHECK_INTERVAL, \
DEFAULT_HEALTH_CHECK_PROBES, DEFAULT_HEALTH_CHECK_DELAY, HealthCheckPolicies, DEFAULT_HEALTH_CHECK_POLICY
from redis.asyncio.retry import Retry
from redis.backoff import ExponentialWithJitterBackoff, NoBackoff
from redis.data_structure import WeightedList
from redis.event import EventDispatcherInterface, EventDispatcher
from redis.multidb.circuit import CircuitBreaker, PBCircuitBreakerAdapter, DEFAULT_GRACE_PERIOD
from redis.multidb.failure_detector import CommandFailureDetector
from redis.multidb.failure_detector import CommandFailureDetector, DEFAULT_MIN_NUM_FAILURES, \
DEFAULT_FAILURE_RATE_THRESHOLD, DEFAULT_FAILURES_DETECTION_WINDOW

DEFAULT_AUTO_FALLBACK_INTERVAL = 120

Expand Down Expand Up @@ -70,8 +70,9 @@ class MultiDbConfig:
client_class: The client class used to manage database connections.
command_retry: Retry strategy for executing database commands.
failure_detectors: Optional list of additional failure detectors for monitoring database failures.
failure_threshold: Threshold for determining database failure.
failures_interval: Time interval for tracking database failures.
min_num_failures: Minimal count of failures required for failover
failure_rate_threshold: Percentage of failures required for failover
failures_detection_window: Time interval for tracking database failures.
health_checks: Optional list of additional health checks performed on databases.
health_check_interval: Time interval for executing health checks.
health_check_probes: Number of attempts to evaluate the health of a database.
Expand Down Expand Up @@ -105,8 +106,9 @@ class MultiDbConfig:
backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3
)
failure_detectors: Optional[List[AsyncFailureDetector]] = None
failure_threshold: int = DEFAULT_FAILURES_THRESHOLD
failures_interval: float = DEFAULT_FAILURES_DURATION
min_num_failures: int = DEFAULT_MIN_NUM_FAILURES
failure_rate_threshold: float = DEFAULT_FAILURE_RATE_THRESHOLD
failures_detection_window: float = DEFAULT_FAILURES_DETECTION_WINDOW
health_checks: Optional[List[HealthCheck]] = None
health_check_interval: float = DEFAULT_HEALTH_CHECK_INTERVAL
health_check_probes: int = DEFAULT_HEALTH_CHECK_PROBES
Expand Down Expand Up @@ -151,7 +153,11 @@ def databases(self) -> Databases:
def default_failure_detectors(self) -> List[AsyncFailureDetector]:
return [
FailureDetectorAsyncWrapper(
CommandFailureDetector(threshold=self.failure_threshold, duration=self.failures_interval)
CommandFailureDetector(
min_num_failures=self.min_num_failures,
failure_rate_threshold=self.failure_rate_threshold,
failure_detection_window=self.failures_detection_window
)
),
]

Expand Down
11 changes: 8 additions & 3 deletions redis/asyncio/multidb/failure_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@

from redis.multidb.failure_detector import FailureDetector

DEFAULT_FAILURES_THRESHOLD = 1000
DEFAULT_FAILURES_DURATION = 2

class AsyncFailureDetector(ABC):

@abstractmethod
async def register_failure(self, exception: Exception, cmd: tuple) -> None:
"""Register a failure that occurred during command execution."""
pass

@abstractmethod
async def register_command_execution(self, cmd: tuple) -> None:
"""Register a command execution."""
pass

@abstractmethod
def set_command_executor(self, command_executor) -> None:
"""Set the command executor for this failure."""
Expand All @@ -27,5 +29,8 @@ def __init__(self, failure_detector: FailureDetector) -> None:
async def register_failure(self, exception: Exception, cmd: tuple) -> None:
self._failure_detector.register_failure(exception, cmd)

async def register_command_execution(self, cmd: tuple) -> None:
self._failure_detector.register_command_execution(cmd)

def set_command_executor(self, command_executor) -> None:
self._failure_detector.set_command_executor(command_executor)
20 changes: 16 additions & 4 deletions redis/multidb/command_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,9 @@ def failover_strategy_executor(self) -> FailoverStrategyExecutor:

def execute_command(self, *args, **options):
def callback():
return self._active_database.client.execute_command(*args, **options)
response = self._active_database.client.execute_command(*args, **options)
self._register_command_execution(args)
return response

return self._execute_with_failure_detection(callback, args)

Expand All @@ -234,13 +236,17 @@ def callback():
for command, options in command_stack:
pipe.execute_command(*command, **options)

return pipe.execute()
response = pipe.execute()
self._register_command_execution(command_stack)
return response

return self._execute_with_failure_detection(callback, command_stack)

def execute_transaction(self, transaction: Callable[[Pipeline], None], *watches, **options):
def callback():
return self._active_database.client.transaction(transaction, *watches, **options)
response = self._active_database.client.transaction(transaction, *watches, **options)
self._register_command_execution(())
return response

return self._execute_with_failure_detection(callback)

Expand All @@ -256,7 +262,9 @@ def callback():
def execute_pubsub_method(self, method_name: str, *args, **kwargs):
def callback():
method = getattr(self.active_pubsub, method_name)
return method(*args, **kwargs)
response = method(*args, **kwargs)
self._register_command_execution(args)
return response

return self._execute_with_failure_detection(callback, *args)

Expand Down Expand Up @@ -298,6 +306,10 @@ def _check_active_database(self):
self.active_database = self._failover_strategy_executor.execute()
self._schedule_next_fallback()

def _register_command_execution(self, cmd: tuple):
for detector in self._failure_detectors:
detector.register_command_execution(cmd)

def _setup_event_dispatcher(self):
"""
Registers necessary listeners.
Expand Down
20 changes: 13 additions & 7 deletions redis/multidb/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
from redis.event import EventDispatcher, EventDispatcherInterface
from redis.multidb.circuit import PBCircuitBreakerAdapter, CircuitBreaker, DEFAULT_GRACE_PERIOD
from redis.multidb.database import Database, Databases
from redis.multidb.failure_detector import FailureDetector, CommandFailureDetector, DEFAULT_FAILURES_THRESHOLD, \
DEFAULT_FAILURES_DURATION
from redis.multidb.failure_detector import FailureDetector, CommandFailureDetector, DEFAULT_MIN_NUM_FAILURES, \
DEFAULT_FAILURES_DETECTION_WINDOW, DEFAULT_FAILURE_RATE_THRESHOLD
from redis.multidb.healthcheck import HealthCheck, EchoHealthCheck, DEFAULT_HEALTH_CHECK_PROBES, \
DEFAULT_HEALTH_CHECK_INTERVAL, DEFAULT_HEALTH_CHECK_DELAY, HealthCheckPolicies, DEFAULT_HEALTH_CHECK_POLICY
from redis.multidb.failover import FailoverStrategy, WeightBasedFailoverStrategy, DEFAULT_FAILOVER_ATTEMPTS, \
Expand Down Expand Up @@ -71,8 +71,9 @@ class MultiDbConfig:
client_class: The client class used to manage database connections.
command_retry: Retry strategy for executing database commands.
failure_detectors: Optional list of additional failure detectors for monitoring database failures.
failure_threshold: Threshold for determining database failure.
failures_interval: Time interval for tracking database failures.
min_num_failures: Minimal count of failures required for failover
failure_rate_threshold: Percentage of failures required for failover
failures_detection_window: Time interval for tracking database failures.
health_checks: Optional list of additional health checks performed on databases.
health_check_interval: Time interval for executing health checks.
health_check_probes: Number of attempts to evaluate the health of a database.
Expand Down Expand Up @@ -107,8 +108,9 @@ class MultiDbConfig:
backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3
)
failure_detectors: Optional[List[FailureDetector]] = None
failure_threshold: int = DEFAULT_FAILURES_THRESHOLD
failures_interval: float = DEFAULT_FAILURES_DURATION
min_num_failures: int = DEFAULT_MIN_NUM_FAILURES
failure_rate_threshold: float = DEFAULT_FAILURE_RATE_THRESHOLD
failures_detection_window: float = DEFAULT_FAILURES_DETECTION_WINDOW
health_checks: Optional[List[HealthCheck]] = None
health_check_interval: float = DEFAULT_HEALTH_CHECK_INTERVAL
health_check_probes: int = DEFAULT_HEALTH_CHECK_PROBES
Expand Down Expand Up @@ -152,7 +154,11 @@ def databases(self) -> Databases:

def default_failure_detectors(self) -> List[FailureDetector]:
return [
CommandFailureDetector(threshold=self.failure_threshold, duration=self.failures_interval),
CommandFailureDetector(
min_num_failures=self.min_num_failures,
failure_rate_threshold=self.failure_rate_threshold,
failure_detection_window=self.failures_detection_window
),
]

def default_health_checks(self) -> List[HealthCheck]:
Expand Down
62 changes: 39 additions & 23 deletions redis/multidb/failure_detector.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import math
import threading
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
Expand All @@ -7,8 +8,9 @@

from redis.multidb.circuit import State as CBState

DEFAULT_FAILURES_THRESHOLD = 1000
DEFAULT_FAILURES_DURATION = 2
DEFAULT_MIN_NUM_FAILURES = 1000
DEFAULT_FAILURE_RATE_THRESHOLD = 0.1
DEFAULT_FAILURES_DETECTION_WINDOW = 2

class FailureDetector(ABC):

Expand All @@ -17,6 +19,11 @@ def register_failure(self, exception: Exception, cmd: tuple) -> None:
"""Register a failure that occurred during command execution."""
pass

@abstractmethod
def register_command_execution(self, cmd: tuple) -> None:
"""Register a command execution."""
pass

@abstractmethod
def set_command_executor(self, command_executor) -> None:
"""Set the command executor for this failure."""
Expand All @@ -28,56 +35,65 @@ class CommandFailureDetector(FailureDetector):
"""
def __init__(
self,
threshold: int = DEFAULT_FAILURES_THRESHOLD,
duration: float = DEFAULT_FAILURES_DURATION,
min_num_failures: int = DEFAULT_MIN_NUM_FAILURES,
failure_rate_threshold: float = DEFAULT_FAILURE_RATE_THRESHOLD,
failure_detection_window: float = DEFAULT_FAILURES_DETECTION_WINDOW,
error_types: Optional[List[Type[Exception]]] = None,
) -> None:
"""
Initialize a new CommandFailureDetector instance.

Args:
threshold: The number of failures that must occur within the duration to trigger failure detection.
duration: The time window in seconds during which failures are counted.
min_num_failures: Minimal count of failures required for failover
failure_rate_threshold: Percentage of failures required for failover
failure_detection_window: Time interval for executing health checks.
error_types: Optional list of exception types to trigger failover. If None, all exceptions are counted.

The detector tracks command failures within a sliding time window. When the number of failures
exceeds the threshold within the specified duration, it triggers failure detection.
"""
self._command_executor = None
self._threshold = threshold
self._duration = duration
self._min_num_failures = min_num_failures
self._failure_rate_threshold = failure_rate_threshold
self._failure_detection_window = failure_detection_window
self._error_types = error_types
self._commands_executed: int = 0
self._start_time: datetime = datetime.now()
self._end_time: datetime = self._start_time + timedelta(seconds=self._duration)
self._failures_within_duration: List[tuple[datetime, tuple]] = []
self._end_time: datetime = self._start_time + timedelta(seconds=self._failure_detection_window)
self._failures_count: int = 0
self._lock = threading.RLock()

def register_failure(self, exception: Exception, cmd: tuple) -> None:
failure_time = datetime.now()

if not self._start_time < failure_time < self._end_time:
self._reset()

with self._lock:
if self._error_types:
if type(exception) in self._error_types:
self._failures_within_duration.append((datetime.now(), cmd))
self._failures_count += 1
else:
self._failures_within_duration.append((datetime.now(), cmd))
self._failures_count += 1

self._check_threshold()
self._check_threshold()

def set_command_executor(self, command_executor) -> None:
self._command_executor = command_executor

def _check_threshold(self):
def register_command_execution(self, cmd: tuple) -> None:
with self._lock:
if len(self._failures_within_duration) >= self._threshold:
self._command_executor.active_database.circuit.state = CBState.OPEN
if not self._start_time < datetime.now() < self._end_time:
self._reset()

self._commands_executed += 1

def _check_threshold(self):
if (
self._failures_count >= self._min_num_failures
and self._failures_count >= (math.ceil(self._commands_executed * self._failure_rate_threshold))
):
self._command_executor.active_database.circuit.state = CBState.OPEN
self._reset()

def _reset(self) -> None:
with self._lock:
self._start_time = datetime.now()
self._end_time = self._start_time + timedelta(seconds=self._duration)
self._failures_within_duration = []
self._end_time = self._start_time + timedelta(seconds=self._failure_detection_window)
self._failures_count = 0
self._commands_executed = 0
Loading