From ded6d4922ce24193f910c327942db3ec33f7dd71 Mon Sep 17 00:00:00 2001 From: vladvildanov Date: Tue, 23 Sep 2025 16:34:21 +0300 Subject: [PATCH] Refactored Failure Detector --- redis/asyncio/multidb/command_executor.py | 23 ++- redis/asyncio/multidb/config.py | 22 ++- redis/asyncio/multidb/failure_detector.py | 11 +- redis/multidb/command_executor.py | 20 ++- redis/multidb/config.py | 20 ++- redis/multidb/failure_detector.py | 62 ++++--- tests/test_asyncio/test_multidb/conftest.py | 5 +- .../test_multidb/test_command_executor.py | 3 + .../test_multidb/test_failure_detector.py | 153 +++++++----------- tests/test_asyncio/test_scenario/conftest.py | 9 +- .../test_scenario/test_active_active.py | 22 +-- tests/test_multidb/conftest.py | 5 +- tests/test_multidb/test_command_executor.py | 5 +- tests/test_multidb/test_failure_detector.py | 149 +++++++---------- tests/test_scenario/conftest.py | 6 +- tests/test_scenario/test_active_active.py | 30 ++-- 16 files changed, 277 insertions(+), 268 deletions(-) diff --git a/redis/asyncio/multidb/command_executor.py b/redis/asyncio/multidb/command_executor.py index 95209298f4..2526c4ed9e 100644 --- a/redis/asyncio/multidb/command_executor.py +++ b/redis/asyncio/multidb/command_executor.py @@ -199,7 +199,9 @@ def pubsub(self, **kwargs): async def execute_command(self, *args, **options): async def callback(): - return await self._active_database.client.execute_command(*args, **options) + response = await self._active_database.client.execute_command(*args, **options) + await self._register_command_execution(args) + return response return await self._execute_with_failure_detection(callback, args) @@ -209,7 +211,9 @@ async def callback(): for command, options in command_stack: pipe.execute_command(*command, **options) - return await pipe.execute() + response = await pipe.execute() + await self._register_command_execution(command_stack) + return response return await self._execute_with_failure_detection(callback, command_stack) @@ -222,13 +226,15 @@ async def execute_transaction( watch_delay: Optional[float] = None, ): async def callback(): - return await self._active_database.client.transaction( + response = await self._active_database.client.transaction( func, *watches, shard_hint=shard_hint, value_from_callable=value_from_callable, watch_delay=watch_delay ) + await self._register_command_execution(()) + return response return await self._execute_with_failure_detection(callback) @@ -236,9 +242,12 @@ async def execute_pubsub_method(self, method_name: str, *args, **kwargs): async def callback(): method = getattr(self.active_pubsub, method_name) if iscoroutinefunction(method): - return await method(*args, **kwargs) + response = await method(*args, **kwargs) else: - return method(*args, **kwargs) + response = method(*args, **kwargs) + + await self._register_command_execution(args) + return response return await self._execute_with_failure_detection(callback, *args) @@ -280,6 +289,10 @@ async def _check_active_database(self): async def _on_command_fail(self, error, *args): await self._event_dispatcher.dispatch_async(AsyncOnCommandsFailEvent(args, error)) + async def _register_command_execution(self, cmd: tuple): + for detector in self._failure_detectors: + await detector.register_command_execution(cmd) + def _setup_event_dispatcher(self): """ Registers necessary listeners. 
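A minimal, self-contained sketch of the rate-based detection this patch introduces: the executor now reports every command execution to each failure detector (the register_command_execution calls above), so a detector can require both a minimum number of failures and a minimum failure rate inside a sliding window before opening the circuit. The class name RateBasedDetectorSketch and the print call are illustrative only; the real implementation is CommandFailureDetector in redis/multidb/failure_detector.py further down.

import math
import time

class RateBasedDetectorSketch:
    # Illustrative stand-in for the counting logic; not the library class.
    def __init__(self, min_num_failures=1000, failure_rate_threshold=0.1, detection_window=2.0):
        self.min_num_failures = min_num_failures
        self.failure_rate_threshold = failure_rate_threshold
        self.detection_window = detection_window
        self._reset()

    def _reset(self):
        self.window_start = time.monotonic()
        self.commands_executed = 0
        self.failures_count = 0

    def register_command_execution(self, cmd):
        # An execution outside the current window starts a fresh window.
        if time.monotonic() - self.window_start > self.detection_window:
            self._reset()
        self.commands_executed += 1

    def register_failure(self, exception, cmd):
        self.failures_count += 1
        # Trip only when BOTH the absolute count and the failure rate are reached.
        if (self.failures_count >= self.min_num_failures
                and self.failures_count >= math.ceil(self.commands_executed * self.failure_rate_threshold)):
            print("circuit would be opened here")
            self._reset()

detector = RateBasedDetectorSketch(min_num_failures=2, failure_rate_threshold=0.4)
for _ in range(5):
    detector.register_command_execution(("GET", "key"))
detector.register_failure(Exception(), ("GET", "key"))
detector.register_failure(Exception(), ("GET", "key"))  # 2 of 5 commands failed (40%) -> trips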
diff --git a/redis/asyncio/multidb/config.py b/redis/asyncio/multidb/config.py index 354bbcf5c7..af2029c110 100644 --- a/redis/asyncio/multidb/config.py +++ b/redis/asyncio/multidb/config.py @@ -7,8 +7,7 @@ from redis.asyncio.multidb.database import Databases, Database from redis.asyncio.multidb.failover import AsyncFailoverStrategy, WeightBasedFailoverStrategy, DEFAULT_FAILOVER_DELAY, \ DEFAULT_FAILOVER_ATTEMPTS -from redis.asyncio.multidb.failure_detector import AsyncFailureDetector, FailureDetectorAsyncWrapper, \ - DEFAULT_FAILURES_THRESHOLD, DEFAULT_FAILURES_DURATION +from redis.asyncio.multidb.failure_detector import AsyncFailureDetector, FailureDetectorAsyncWrapper from redis.asyncio.multidb.healthcheck import HealthCheck, EchoHealthCheck, DEFAULT_HEALTH_CHECK_INTERVAL, \ DEFAULT_HEALTH_CHECK_PROBES, DEFAULT_HEALTH_CHECK_DELAY, HealthCheckPolicies, DEFAULT_HEALTH_CHECK_POLICY from redis.asyncio.retry import Retry @@ -16,7 +15,8 @@ from redis.data_structure import WeightedList from redis.event import EventDispatcherInterface, EventDispatcher from redis.multidb.circuit import CircuitBreaker, PBCircuitBreakerAdapter, DEFAULT_GRACE_PERIOD -from redis.multidb.failure_detector import CommandFailureDetector +from redis.multidb.failure_detector import CommandFailureDetector, DEFAULT_MIN_NUM_FAILURES, \ + DEFAULT_FAILURE_RATE_THRESHOLD, DEFAULT_FAILURES_DETECTION_WINDOW DEFAULT_AUTO_FALLBACK_INTERVAL = 120 @@ -70,8 +70,9 @@ class MultiDbConfig: client_class: The client class used to manage database connections. command_retry: Retry strategy for executing database commands. failure_detectors: Optional list of additional failure detectors for monitoring database failures. - failure_threshold: Threshold for determining database failure. - failures_interval: Time interval for tracking database failures. + min_num_failures: Minimum number of failures within the detection window required to trigger failover. + failure_rate_threshold: Minimum rate of failed commands (as a fraction of executed commands) required to trigger failover. + failures_detection_window: Time interval for tracking database failures. health_checks: Optional list of additional health checks performed on databases. health_check_interval: Time interval for executing health checks. health_check_probes: Number of attempts to evaluate the health of a database.
@@ -105,8 +106,9 @@ class MultiDbConfig: backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3 ) failure_detectors: Optional[List[AsyncFailureDetector]] = None - failure_threshold: int = DEFAULT_FAILURES_THRESHOLD - failures_interval: float = DEFAULT_FAILURES_DURATION + min_num_failures: int = DEFAULT_MIN_NUM_FAILURES + failure_rate_threshold: float = DEFAULT_FAILURE_RATE_THRESHOLD + failures_detection_window: float = DEFAULT_FAILURES_DETECTION_WINDOW health_checks: Optional[List[HealthCheck]] = None health_check_interval: float = DEFAULT_HEALTH_CHECK_INTERVAL health_check_probes: int = DEFAULT_HEALTH_CHECK_PROBES @@ -151,7 +153,11 @@ def databases(self) -> Databases: def default_failure_detectors(self) -> List[AsyncFailureDetector]: return [ FailureDetectorAsyncWrapper( - CommandFailureDetector(threshold=self.failure_threshold, duration=self.failures_interval) + CommandFailureDetector( + min_num_failures=self.min_num_failures, + failure_rate_threshold=self.failure_rate_threshold, + failure_detection_window=self.failures_detection_window + ) ), ] diff --git a/redis/asyncio/multidb/failure_detector.py b/redis/asyncio/multidb/failure_detector.py index cdfcc6ff1e..e6d257e941 100644 --- a/redis/asyncio/multidb/failure_detector.py +++ b/redis/asyncio/multidb/failure_detector.py @@ -2,9 +2,6 @@ from redis.multidb.failure_detector import FailureDetector -DEFAULT_FAILURES_THRESHOLD = 1000 -DEFAULT_FAILURES_DURATION = 2 - class AsyncFailureDetector(ABC): @abstractmethod @@ -12,6 +9,11 @@ async def register_failure(self, exception: Exception, cmd: tuple) -> None: """Register a failure that occurred during command execution.""" pass + @abstractmethod + async def register_command_execution(self, cmd: tuple) -> None: + """Register a command execution.""" + pass + @abstractmethod def set_command_executor(self, command_executor) -> None: """Set the command executor for this failure.""" @@ -27,5 +29,8 @@ def __init__(self, failure_detector: FailureDetector) -> None: async def register_failure(self, exception: Exception, cmd: tuple) -> None: self._failure_detector.register_failure(exception, cmd) + async def register_command_execution(self, cmd: tuple) -> None: + self._failure_detector.register_command_execution(cmd) + def set_command_executor(self, command_executor) -> None: self._failure_detector.set_command_executor(command_executor) \ No newline at end of file diff --git a/redis/multidb/command_executor.py b/redis/multidb/command_executor.py index 562dcfd6fe..481364de9a 100644 --- a/redis/multidb/command_executor.py +++ b/redis/multidb/command_executor.py @@ -224,7 +224,9 @@ def failover_strategy_executor(self) -> FailoverStrategyExecutor: def execute_command(self, *args, **options): def callback(): - return self._active_database.client.execute_command(*args, **options) + response = self._active_database.client.execute_command(*args, **options) + self._register_command_execution(args) + return response return self._execute_with_failure_detection(callback, args) @@ -234,13 +236,17 @@ def callback(): for command, options in command_stack: pipe.execute_command(*command, **options) - return pipe.execute() + response = pipe.execute() + self._register_command_execution(command_stack) + return response return self._execute_with_failure_detection(callback, command_stack) def execute_transaction(self, transaction: Callable[[Pipeline], None], *watches, **options): def callback(): - return self._active_database.client.transaction(transaction, *watches, **options) + response = 
self._active_database.client.transaction(transaction, *watches, **options) + self._register_command_execution(()) + return response return self._execute_with_failure_detection(callback) @@ -256,7 +262,9 @@ def callback(): def execute_pubsub_method(self, method_name: str, *args, **kwargs): def callback(): method = getattr(self.active_pubsub, method_name) - return method(*args, **kwargs) + response = method(*args, **kwargs) + self._register_command_execution(args) + return response return self._execute_with_failure_detection(callback, *args) @@ -298,6 +306,10 @@ def _check_active_database(self): self.active_database = self._failover_strategy_executor.execute() self._schedule_next_fallback() + def _register_command_execution(self, cmd: tuple): + for detector in self._failure_detectors: + detector.register_command_execution(cmd) + def _setup_event_dispatcher(self): """ Registers necessary listeners. diff --git a/redis/multidb/config.py b/redis/multidb/config.py index ff9872ffd4..f78114f014 100644 --- a/redis/multidb/config.py +++ b/redis/multidb/config.py @@ -11,8 +11,8 @@ from redis.event import EventDispatcher, EventDispatcherInterface from redis.multidb.circuit import PBCircuitBreakerAdapter, CircuitBreaker, DEFAULT_GRACE_PERIOD from redis.multidb.database import Database, Databases -from redis.multidb.failure_detector import FailureDetector, CommandFailureDetector, DEFAULT_FAILURES_THRESHOLD, \ - DEFAULT_FAILURES_DURATION +from redis.multidb.failure_detector import FailureDetector, CommandFailureDetector, DEFAULT_MIN_NUM_FAILURES, \ + DEFAULT_FAILURES_DETECTION_WINDOW, DEFAULT_FAILURE_RATE_THRESHOLD from redis.multidb.healthcheck import HealthCheck, EchoHealthCheck, DEFAULT_HEALTH_CHECK_PROBES, \ DEFAULT_HEALTH_CHECK_INTERVAL, DEFAULT_HEALTH_CHECK_DELAY, HealthCheckPolicies, DEFAULT_HEALTH_CHECK_POLICY from redis.multidb.failover import FailoverStrategy, WeightBasedFailoverStrategy, DEFAULT_FAILOVER_ATTEMPTS, \ DEFAULT_FAILOVER_DELAY @@ -71,8 +71,9 @@ class MultiDbConfig: client_class: The client class used to manage database connections. command_retry: Retry strategy for executing database commands. failure_detectors: Optional list of additional failure detectors for monitoring database failures. - failure_threshold: Threshold for determining database failure. - failures_interval: Time interval for tracking database failures. + min_num_failures: Minimum number of failures within the detection window required to trigger failover. + failure_rate_threshold: Minimum rate of failed commands (as a fraction of executed commands) required to trigger failover. + failures_detection_window: Time interval for tracking database failures. health_checks: Optional list of additional health checks performed on databases. health_check_interval: Time interval for executing health checks. health_check_probes: Number of attempts to evaluate the health of a database.
@@ -107,8 +108,9 @@ class MultiDbConfig: backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3 ) failure_detectors: Optional[List[FailureDetector]] = None - failure_threshold: int = DEFAULT_FAILURES_THRESHOLD - failures_interval: float = DEFAULT_FAILURES_DURATION + min_num_failures: int = DEFAULT_MIN_NUM_FAILURES + failure_rate_threshold: float = DEFAULT_FAILURE_RATE_THRESHOLD + failures_detection_window: float = DEFAULT_FAILURES_DETECTION_WINDOW health_checks: Optional[List[HealthCheck]] = None health_check_interval: float = DEFAULT_HEALTH_CHECK_INTERVAL health_check_probes: int = DEFAULT_HEALTH_CHECK_PROBES @@ -152,7 +154,11 @@ def databases(self) -> Databases: def default_failure_detectors(self) -> List[FailureDetector]: return [ - CommandFailureDetector(threshold=self.failure_threshold, duration=self.failures_interval), + CommandFailureDetector( + min_num_failures=self.min_num_failures, + failure_rate_threshold=self.failure_rate_threshold, + failure_detection_window=self.failures_detection_window + ), ] def default_health_checks(self) -> List[HealthCheck]: diff --git a/redis/multidb/failure_detector.py b/redis/multidb/failure_detector.py index 6b918b152a..ca657c4e52 100644 --- a/redis/multidb/failure_detector.py +++ b/redis/multidb/failure_detector.py @@ -1,3 +1,4 @@ +import math import threading from abc import ABC, abstractmethod from datetime import datetime, timedelta @@ -7,8 +8,9 @@ from redis.multidb.circuit import State as CBState -DEFAULT_FAILURES_THRESHOLD = 1000 -DEFAULT_FAILURES_DURATION = 2 +DEFAULT_MIN_NUM_FAILURES = 1000 +DEFAULT_FAILURE_RATE_THRESHOLD = 0.1 +DEFAULT_FAILURES_DETECTION_WINDOW = 2 class FailureDetector(ABC): @@ -17,6 +19,11 @@ def register_failure(self, exception: Exception, cmd: tuple) -> None: """Register a failure that occurred during command execution.""" pass + @abstractmethod + def register_command_execution(self, cmd: tuple) -> None: + """Register a command execution.""" + pass + @abstractmethod def set_command_executor(self, command_executor) -> None: """Set the command executor for this failure.""" @@ -28,56 +35,65 @@ class CommandFailureDetector(FailureDetector): """ def __init__( self, - threshold: int = DEFAULT_FAILURES_THRESHOLD, - duration: float = DEFAULT_FAILURES_DURATION, + min_num_failures: int = DEFAULT_MIN_NUM_FAILURES, + failure_rate_threshold: float = DEFAULT_FAILURE_RATE_THRESHOLD, + failure_detection_window: float = DEFAULT_FAILURES_DETECTION_WINDOW, error_types: Optional[List[Type[Exception]]] = None, ) -> None: """ Initialize a new CommandFailureDetector instance. Args: - threshold: The number of failures that must occur within the duration to trigger failure detection. - duration: The time window in seconds during which failures are counted. + min_num_failures: Minimum number of failures within the detection window required to trigger failure detection. + failure_rate_threshold: Minimum rate of failed commands (as a fraction of executed commands) required to trigger failure detection. + failure_detection_window: Time window in seconds during which failures and command executions are counted. error_types: Optional list of exception types to trigger failover. If None, all exceptions are counted. The detector tracks command failures within a sliding time window. When the number of failures exceeds the threshold within the specified duration, it triggers failure detection.
""" self._command_executor = None - self._threshold = threshold - self._duration = duration + self._min_num_failures = min_num_failures + self._failure_rate_threshold = failure_rate_threshold + self._failure_detection_window = failure_detection_window self._error_types = error_types + self._commands_executed: int = 0 self._start_time: datetime = datetime.now() - self._end_time: datetime = self._start_time + timedelta(seconds=self._duration) - self._failures_within_duration: List[tuple[datetime, tuple]] = [] + self._end_time: datetime = self._start_time + timedelta(seconds=self._failure_detection_window) + self._failures_count: int = 0 self._lock = threading.RLock() def register_failure(self, exception: Exception, cmd: tuple) -> None: - failure_time = datetime.now() - - if not self._start_time < failure_time < self._end_time: - self._reset() - with self._lock: if self._error_types: if type(exception) in self._error_types: - self._failures_within_duration.append((datetime.now(), cmd)) + self._failures_count += 1 else: - self._failures_within_duration.append((datetime.now(), cmd)) + self._failures_count += 1 - self._check_threshold() + self._check_threshold() def set_command_executor(self, command_executor) -> None: self._command_executor = command_executor - def _check_threshold(self): + def register_command_execution(self, cmd: tuple) -> None: with self._lock: - if len(self._failures_within_duration) >= self._threshold: - self._command_executor.active_database.circuit.state = CBState.OPEN + if not self._start_time < datetime.now() < self._end_time: self._reset() + self._commands_executed += 1 + + def _check_threshold(self): + if ( + self._failures_count >= self._min_num_failures + and self._failures_count >= (math.ceil(self._commands_executed * self._failure_rate_threshold)) + ): + self._command_executor.active_database.circuit.state = CBState.OPEN + self._reset() + def _reset(self) -> None: with self._lock: self._start_time = datetime.now() - self._end_time = self._start_time + timedelta(seconds=self._duration) - self._failures_within_duration = [] \ No newline at end of file + self._end_time = self._start_time + timedelta(seconds=self._failure_detection_window) + self._failures_count = 0 + self._commands_executed = 0 \ No newline at end of file diff --git a/tests/test_asyncio/test_multidb/conftest.py b/tests/test_asyncio/test_multidb/conftest.py index f5ea12d9b0..7695332754 100644 --- a/tests/test_asyncio/test_multidb/conftest.py +++ b/tests/test_asyncio/test_multidb/conftest.py @@ -9,7 +9,7 @@ DEFAULT_HEALTH_CHECK_POLICY from redis.data_structure import WeightedList from redis.multidb.circuit import State as CBState, CircuitBreaker -from redis.asyncio import Redis +from redis.asyncio import Redis, ConnectionPool from redis.asyncio.multidb.database import Database, Databases @pytest.fixture() @@ -37,6 +37,7 @@ def mock_db(request) -> Database: db = Mock(spec=Database) db.weight = request.param.get("weight", 1.0) db.client = Mock(spec=Redis) + db.client.connection_pool = Mock(spec=ConnectionPool) cb = request.param.get("circuit", {}) mock_cb = Mock(spec=CircuitBreaker) @@ -51,6 +52,7 @@ def mock_db1(request) -> Database: db = Mock(spec=Database) db.weight = request.param.get("weight", 1.0) db.client = Mock(spec=Redis) + db.client.connection_pool = Mock(spec=ConnectionPool) cb = request.param.get("circuit", {}) mock_cb = Mock(spec=CircuitBreaker) @@ -65,6 +67,7 @@ def mock_db2(request) -> Database: db = Mock(spec=Database) db.weight = request.param.get("weight", 1.0) db.client = 
Mock(spec=Redis) + db.client.connection_pool = Mock(spec=ConnectionPool) cb = request.param.get("circuit", {}) mock_cb = Mock(spec=CircuitBreaker) diff --git a/tests/test_asyncio/test_multidb/test_command_executor.py b/tests/test_asyncio/test_multidb/test_command_executor.py index 3f64e6aa0b..01a8326e5a 100644 --- a/tests/test_asyncio/test_multidb/test_command_executor.py +++ b/tests/test_asyncio/test_multidb/test_command_executor.py @@ -46,6 +46,7 @@ async def test_execute_command_on_active_database(self, mock_db, mock_db1, mock_ await executor.set_active_database(mock_db2) assert await executor.execute_command('SET', 'key', 'value') == 'OK2' assert mock_ed.register_listeners.call_count == 1 + assert mock_fd.register_command_execution.call_count == 2 @pytest.mark.asyncio @pytest.mark.parametrize( @@ -82,6 +83,7 @@ async def test_execute_command_automatically_select_active_database( assert await executor.execute_command('SET', 'key', 'value') == 'OK2' assert mock_ed.register_listeners.call_count == 1 assert mock_selector.call_count == 2 + assert mock_fd.register_command_execution.call_count == 2 @pytest.mark.asyncio @pytest.mark.parametrize( @@ -124,6 +126,7 @@ async def test_execute_command_fallback_to_another_db_after_fallback_interval( assert await executor.execute_command('SET', 'key', 'value') == 'OK1' assert mock_ed.register_listeners.call_count == 1 assert mock_selector.call_count == 3 + assert mock_fd.register_command_execution.call_count == 3 @pytest.mark.asyncio @pytest.mark.parametrize( diff --git a/tests/test_asyncio/test_multidb/test_failure_detector.py b/tests/test_asyncio/test_multidb/test_failure_detector.py index 3c1eb4fabd..a4d7407609 100644 --- a/tests/test_asyncio/test_multidb/test_failure_detector.py +++ b/tests/test_asyncio/test_multidb/test_failure_detector.py @@ -4,6 +4,7 @@ import pytest from redis.asyncio.multidb.command_executor import AsyncCommandExecutor +from redis.asyncio.multidb.database import Database from redis.asyncio.multidb.failure_detector import FailureDetectorAsyncWrapper from redis.multidb.circuit import State as CBState from redis.multidb.failure_detector import CommandFailureDetector @@ -12,127 +13,95 @@ class TestFailureDetectorAsyncWrapper: @pytest.mark.asyncio @pytest.mark.parametrize( - 'mock_db', + 'min_num_failures,failure_rate_threshold,circuit_state', [ - {'weight': 0.7, 'circuit': {'state': CBState.CLOSED}}, + (2, 0.4, CBState.OPEN), + (2, 0, CBState.OPEN), + (0, 0.4, CBState.OPEN), + (3, 0.4, CBState.CLOSED), + (2, 0.41, CBState.CLOSED), ], - indirect=True, - ) - async def test_failure_detector_open_circuit_on_threshold_exceed_and_interval_not_exceed(self, mock_db): - fd = FailureDetectorAsyncWrapper(CommandFailureDetector(5, 1)) - mock_ce = Mock(spec=AsyncCommandExecutor) - mock_ce.active_database = mock_db - fd.set_command_executor(mock_ce) - assert mock_db.circuit.state == CBState.CLOSED - - await fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - await fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - await fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - await fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - await fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - - assert mock_db.circuit.state == CBState.OPEN - - @pytest.mark.asyncio - @pytest.mark.parametrize( - 'mock_db', - [ - {'weight': 0.7, 'circuit': {'state': CBState.CLOSED}}, + ids=[ + "exceeds min num failures AND failures rate", + "exceeds min num failures AND failures rate == 0", + "min num failures == 0 AND 
exceeds failures rate", + "do not exceeds min num failures", + "do not exceeds failures rate", ], - indirect=True, ) - async def test_failure_detector_do_not_open_circuit_if_threshold_not_exceed_and_interval_not_exceed(self, mock_db): - fd = FailureDetectorAsyncWrapper(CommandFailureDetector(5, 1)) + async def test_failure_detector_correctly_reacts_to_failures( + self, + min_num_failures, + failure_rate_threshold, + circuit_state + ): + fd = FailureDetectorAsyncWrapper(CommandFailureDetector(min_num_failures, failure_rate_threshold)) + mock_db = Mock(spec=Database) + mock_db.circuit.state = CBState.CLOSED mock_ce = Mock(spec=AsyncCommandExecutor) mock_ce.active_database = mock_db fd.set_command_executor(mock_ce) - assert mock_db.circuit.state == CBState.CLOSED - await fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - await fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - await fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - await fd.register_failure(Exception(), ('SET', 'key1', 'value1')) + await fd.register_command_execution(('GET', 'key')) + await fd.register_command_execution(('GET','key')) + await fd.register_failure(Exception(), ('GET', 'key')) - assert mock_db.circuit.state == CBState.CLOSED + await fd.register_command_execution(('GET', 'key')) + await fd.register_command_execution(('GET','key')) + await fd.register_command_execution(('GET','key')) + await fd.register_failure(Exception(), ('GET', 'key')) - @pytest.mark.asyncio - @pytest.mark.parametrize( - 'mock_db', - [ - {'weight': 0.7, 'circuit': {'state': CBState.CLOSED}}, - ], - indirect=True, - ) - async def test_failure_detector_do_not_open_circuit_on_threshold_exceed_and_interval_exceed(self, mock_db): - fd = FailureDetectorAsyncWrapper(CommandFailureDetector(5, 0.3)) - mock_ce = Mock(spec=AsyncCommandExecutor) - mock_ce.active_database = mock_db - fd.set_command_executor(mock_ce) - assert mock_db.circuit.state == CBState.CLOSED - - await fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - await asyncio.sleep(0.1) - await fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - await asyncio.sleep(0.1) - await fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - await asyncio.sleep(0.1) - await fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - await asyncio.sleep(0.1) - await fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - - assert mock_db.circuit.state == CBState.CLOSED - - # 4 more failures as the last one already refreshed timer - await fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - await fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - await fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - await fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - - assert mock_db.circuit.state == CBState.OPEN + assert mock_db.circuit.state == circuit_state @pytest.mark.asyncio @pytest.mark.parametrize( - 'mock_db', + 'min_num_failures,failure_rate_threshold', [ - {'weight': 0.7, 'circuit': {'state': CBState.CLOSED}}, + (3, 0.0), + (3, 0.6), + ], + ids=[ + "do not exceeds min num failures, during interval", + "do not exceeds min num failures AND failure rate, during interval", ], - indirect=True, ) - async def test_failure_detector_refresh_timer_on_expired_duration(self, mock_db): - fd = FailureDetectorAsyncWrapper(CommandFailureDetector(5, 0.3)) + async def test_failure_detector_do_not_open_circuit_on_interval_exceed(self, min_num_failures, failure_rate_threshold): + fd = FailureDetectorAsyncWrapper( 
+ CommandFailureDetector(min_num_failures, failure_rate_threshold, 0.3) + ) + mock_db = Mock(spec=Database) + mock_db.circuit.state = CBState.CLOSED mock_ce = Mock(spec=AsyncCommandExecutor) mock_ce.active_database = mock_db fd.set_command_executor(mock_ce) assert mock_db.circuit.state == CBState.CLOSED - await fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - await fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - await fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - await fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - await asyncio.sleep(0.4) + await fd.register_command_execution(('GET', 'key')) + await fd.register_failure(Exception(), ('GET', 'key')) + await asyncio.sleep(0.16) + await fd.register_command_execution(('GET', 'key')) + await fd.register_command_execution(('GET', 'key')) + await fd.register_command_execution(('GET', 'key')) + await fd.register_failure(Exception(), ('GET', 'key')) + await asyncio.sleep(0.16) + await fd.register_command_execution(('GET', 'key')) + await fd.register_failure(Exception(), ('GET', 'key')) assert mock_db.circuit.state == CBState.CLOSED - await fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - await fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - await fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - await fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - - assert mock_db.circuit.state == CBState.CLOSED - await fd.register_failure(Exception(), ('SET', 'key1', 'value1')) + # 2 more failure as last one already refreshed timer + await fd.register_command_execution(('GET', 'key')) + await fd.register_failure(Exception(), ('GET', 'key')) + await fd.register_command_execution(('GET', 'key')) + await fd.register_failure(Exception(), ('GET', 'key')) assert mock_db.circuit.state == CBState.OPEN @pytest.mark.asyncio - @pytest.mark.parametrize( - 'mock_db', - [ - {'weight': 0.7, 'circuit': {'state': CBState.CLOSED}}, - ], - indirect=True, - ) - async def test_failure_detector_open_circuit_on_specific_exception_threshold_exceed(self, mock_db): + async def test_failure_detector_open_circuit_on_specific_exception_threshold_exceed(self): fd = FailureDetectorAsyncWrapper(CommandFailureDetector(5, 1, error_types=[ConnectionError])) + mock_db = Mock(spec=Database) + mock_db.circuit.state = CBState.CLOSED mock_ce = Mock(spec=AsyncCommandExecutor) mock_ce.active_database = mock_db fd.set_command_executor(mock_ce) diff --git a/tests/test_asyncio/test_scenario/conftest.py b/tests/test_asyncio/test_scenario/conftest.py index 00ddd36a76..c152e51fb5 100644 --- a/tests/test_asyncio/test_scenario/conftest.py +++ b/tests/test_asyncio/test_scenario/conftest.py @@ -7,12 +7,13 @@ from redis.asyncio import Redis, RedisCluster from redis.asyncio.multidb.client import MultiDBClient -from redis.asyncio.multidb.config import DEFAULT_FAILURES_THRESHOLD, DEFAULT_HEALTH_CHECK_INTERVAL, DatabaseConfig, \ +from redis.asyncio.multidb.config import DEFAULT_HEALTH_CHECK_INTERVAL, DatabaseConfig, \ MultiDbConfig from redis.asyncio.multidb.event import AsyncActiveDatabaseChanged from redis.asyncio.retry import Retry from redis.backoff import ExponentialBackoff from redis.event import AsyncEventListenerInterface, EventDispatcher +from redis.multidb.failure_detector import DEFAULT_MIN_NUM_FAILURES from tests.test_scenario.conftest import get_endpoints_config, extract_cluster_fqdn from tests.test_scenario.fault_injector_client import FaultInjectorClient @@ -40,7 +41,7 @@ async def 
r_multi_db(request) -> AsyncGenerator[tuple[MultiDBClient, CheckActive username = endpoint_config.get('username', None) password = endpoint_config.get('password', None) - failure_threshold = request.param.get('failure_threshold', DEFAULT_FAILURES_THRESHOLD) + min_num_failures = request.param.get('min_num_failures', DEFAULT_MIN_NUM_FAILURES) command_retry = request.param.get('command_retry', Retry(ExponentialBackoff(cap=0.1, base=0.01), retries=10)) # Retry configuration different for health checks as initial health check require more time in case @@ -82,7 +83,7 @@ async def r_multi_db(request) -> AsyncGenerator[tuple[MultiDBClient, CheckActive client_class=client_class, databases_config=db_configs, command_retry=command_retry, - failure_threshold=failure_threshold, + min_num_failures=min_num_failures, health_checks=health_checks, health_check_probes=3, health_check_interval=health_check_interval, @@ -97,7 +98,7 @@ async def teardown(): if isinstance(client.command_executor.active_database.client, Redis): await client.command_executor.active_database.client.connection_pool.disconnect() - await asyncio.sleep(15) + await asyncio.sleep(20) yield client, listener, endpoint_config await teardown() \ No newline at end of file diff --git a/tests/test_asyncio/test_scenario/test_active_active.py b/tests/test_asyncio/test_scenario/test_active_active.py index 693dd33bbe..55b604528b 100644 --- a/tests/test_asyncio/test_scenario/test_active_active.py +++ b/tests/test_asyncio/test_scenario/test_active_active.py @@ -43,8 +43,8 @@ class TestActiveActive: @pytest.mark.parametrize( "r_multi_db", [ - {"client_class": Redis, "failure_threshold": 2}, - {"client_class": RedisCluster, "failure_threshold": 2}, + {"client_class": Redis, "min_num_failures": 2}, + {"client_class": RedisCluster, "min_num_failures": 2}, ], ids=["standalone", "cluster"], indirect=True @@ -89,7 +89,7 @@ async def test_multi_db_client_failover_to_another_db(self, r_multi_db, fault_in @pytest.mark.parametrize( "r_multi_db", [ - {"client_class": Redis, "failure_threshold": 2, "health_checks": + {"client_class": Redis, "min_num_failures": 2, "health_checks": [ LagAwareHealthCheck( verify_tls=False, @@ -98,7 +98,7 @@ async def test_multi_db_client_failover_to_another_db(self, r_multi_db, fault_in ], "health_check_interval": 20, }, - {"client_class": RedisCluster, "failure_threshold": 2, "health_checks": + {"client_class": RedisCluster, "min_num_failures": 2, "health_checks": [ LagAwareHealthCheck( verify_tls=False, @@ -149,8 +149,8 @@ async def test_multi_db_client_uses_lag_aware_health_check(self, r_multi_db, fau @pytest.mark.parametrize( "r_multi_db", [ - {"client_class": Redis, "failure_threshold": 2}, - {"client_class": RedisCluster, "failure_threshold": 2}, + {"client_class": Redis, "min_num_failures": 2}, + {"client_class": RedisCluster, "min_num_failures": 2}, ], ids=["standalone", "cluster"], indirect=True @@ -198,8 +198,8 @@ async def callback(): @pytest.mark.parametrize( "r_multi_db", [ - {"client_class": Redis, "failure_threshold": 2}, - {"client_class": RedisCluster, "failure_threshold": 2}, + {"client_class": Redis, "min_num_failures": 2}, + {"client_class": RedisCluster, "min_num_failures": 2}, ], ids=["standalone", "cluster"], indirect=True @@ -247,8 +247,8 @@ async def callback(): @pytest.mark.parametrize( "r_multi_db", [ - {"client_class": Redis, "failure_threshold": 2}, - {"client_class": RedisCluster, "failure_threshold": 2}, + {"client_class": Redis, "min_num_failures": 2}, + {"client_class": RedisCluster, 
"min_num_failures": 2}, ], ids=["standalone", "cluster"], indirect=True @@ -294,7 +294,7 @@ async def callback(pipe: Pipeline): @pytest.mark.asyncio @pytest.mark.parametrize( "r_multi_db", - [{"failure_threshold": 2}], + [{"min_num_failures": 2}], indirect=True ) @pytest.mark.timeout(200) diff --git a/tests/test_multidb/conftest.py b/tests/test_multidb/conftest.py index 3b1f7f369b..ce4658868f 100644 --- a/tests/test_multidb/conftest.py +++ b/tests/test_multidb/conftest.py @@ -2,7 +2,7 @@ import pytest -from redis import Redis +from redis import Redis, ConnectionPool from redis.data_structure import WeightedList from redis.multidb.circuit import State as CBState, CircuitBreaker from redis.multidb.config import MultiDbConfig, DatabaseConfig, DEFAULT_HEALTH_CHECK_INTERVAL, \ @@ -39,6 +39,7 @@ def mock_db(request) -> Database: db = Mock(spec=Database) db.weight = request.param.get("weight", 1.0) db.client = Mock(spec=Redis) + db.client.connection_pool = Mock(spec=ConnectionPool) cb = request.param.get("circuit", {}) mock_cb = Mock(spec=CircuitBreaker) @@ -53,6 +54,7 @@ def mock_db1(request) -> Database: db = Mock(spec=Database) db.weight = request.param.get("weight", 1.0) db.client = Mock(spec=Redis) + db.client.connection_pool = Mock(spec=ConnectionPool) cb = request.param.get("circuit", {}) mock_cb = Mock(spec=CircuitBreaker) @@ -67,6 +69,7 @@ def mock_db2(request) -> Database: db = Mock(spec=Database) db.weight = request.param.get("weight", 1.0) db.client = Mock(spec=Redis) + db.client.connection_pool = Mock(spec=ConnectionPool) cb = request.param.get("circuit", {}) mock_cb = Mock(spec=CircuitBreaker) diff --git a/tests/test_multidb/test_command_executor.py b/tests/test_multidb/test_command_executor.py index 044fef0f8c..2001d64f04 100644 --- a/tests/test_multidb/test_command_executor.py +++ b/tests/test_multidb/test_command_executor.py @@ -44,6 +44,7 @@ def test_execute_command_on_active_database(self, mock_db, mock_db1, mock_db2, m executor.active_database = mock_db2 assert executor.execute_command('SET', 'key', 'value') == 'OK2' assert mock_ed.register_listeners.call_count == 1 + assert mock_fd.register_command_execution.call_count == 2 @pytest.mark.parametrize( 'mock_db,mock_db1,mock_db2', @@ -78,6 +79,7 @@ def test_execute_command_automatically_select_active_database( assert executor.execute_command('SET', 'key', 'value') == 'OK2' assert mock_ed.register_listeners.call_count == 1 assert mock_fs.database.call_count == 2 + assert mock_fd.register_command_execution.call_count == 2 @pytest.mark.parametrize( 'mock_db,mock_db1,mock_db2', @@ -118,6 +120,7 @@ def test_execute_command_fallback_to_another_db_after_fallback_interval( assert executor.execute_command('SET', 'key', 'value') == 'OK1' assert mock_ed.register_listeners.call_count == 1 assert mock_fs.database.call_count == 3 + assert mock_fd.register_command_execution.call_count == 3 @pytest.mark.parametrize( 'mock_db,mock_db1,mock_db2', @@ -137,7 +140,7 @@ def test_execute_command_fallback_to_another_db_after_failure_detection( mock_db2.client.execute_command.side_effect = ['OK2', ConnectionError, ConnectionError, ConnectionError] mock_fs.database.side_effect = [mock_db1, mock_db2, mock_db1] threshold = 3 - fd = CommandFailureDetector(threshold, 1) + fd = CommandFailureDetector(threshold, 0.0, 1) ed = EventDispatcher() databases = create_weighted_list(mock_db, mock_db1, mock_db2) diff --git a/tests/test_multidb/test_failure_detector.py b/tests/test_multidb/test_failure_detector.py index 28687f2a11..3e71ab6aa5 100644 --- 
a/tests/test_multidb/test_failure_detector.py +++ b/tests/test_multidb/test_failure_detector.py @@ -4,6 +4,7 @@ import pytest from redis.multidb.command_executor import SyncCommandExecutor +from redis.multidb.database import Database from redis.multidb.failure_detector import CommandFailureDetector from redis.multidb.circuit import State as CBState from redis.exceptions import ConnectionError @@ -11,123 +12,91 @@ class TestCommandFailureDetector: @pytest.mark.parametrize( - 'mock_db', + 'min_num_failures,failure_rate_threshold,circuit_state', [ - {'weight': 0.7, 'circuit': {'state': CBState.CLOSED}}, + (2, 0.4, CBState.OPEN), + (2, 0, CBState.OPEN), + (0, 0.4, CBState.OPEN), + (3, 0.4, CBState.CLOSED), + (2, 0.41, CBState.CLOSED), ], - indirect=True, - ) - def test_failure_detector_open_circuit_on_threshold_exceed_and_interval_not_exceed(self, mock_db): - fd = CommandFailureDetector(5, 1) - mock_ce = Mock(spec=SyncCommandExecutor) - mock_ce.active_database = mock_db - fd.set_command_executor(mock_ce) - assert mock_db.circuit.state == CBState.CLOSED - - fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - - assert mock_db.circuit.state == CBState.OPEN - - @pytest.mark.parametrize( - 'mock_db', - [ - {'weight': 0.7, 'circuit': {'state': CBState.CLOSED}}, + ids=[ + "exceeds min num failures AND failures rate", + "exceeds min num failures AND failures rate == 0", + "min num failures == 0 AND exceeds failures rate", + "do not exceeds min num failures", + "do not exceeds failures rate", ], - indirect=True, ) - def test_failure_detector_do_not_open_circuit_if_threshold_not_exceed_and_interval_not_exceed(self, mock_db): - fd = CommandFailureDetector(5, 1) + def test_failure_detector_correctly_reacts_to_failures( + self, + min_num_failures, + failure_rate_threshold, + circuit_state + ): + fd = CommandFailureDetector(min_num_failures, failure_rate_threshold) + mock_db = Mock(spec=Database) + mock_db.circuit.state = CBState.CLOSED mock_ce = Mock(spec=SyncCommandExecutor) mock_ce.active_database = mock_db fd.set_command_executor(mock_ce) - assert mock_db.circuit.state == CBState.CLOSED - fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - fd.register_failure(Exception(), ('SET', 'key1', 'value1')) + fd.register_command_execution(('GET', 'key')) + fd.register_command_execution(('GET','key')) + fd.register_failure(Exception(), ('GET', 'key')) - assert mock_db.circuit.state == CBState.CLOSED + fd.register_command_execution(('GET', 'key')) + fd.register_command_execution(('GET','key')) + fd.register_command_execution(('GET','key')) + fd.register_failure(Exception(), ('GET', 'key')) - @pytest.mark.parametrize( - 'mock_db', - [ - {'weight': 0.7, 'circuit': {'state': CBState.CLOSED}}, - ], - indirect=True, - ) - def test_failure_detector_do_not_open_circuit_on_threshold_exceed_and_interval_exceed(self, mock_db): - fd = CommandFailureDetector(5, 0.3) - mock_ce = Mock(spec=SyncCommandExecutor) - mock_ce.active_database = mock_db - fd.set_command_executor(mock_ce) - assert mock_db.circuit.state == CBState.CLOSED - - fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - sleep(0.1) - fd.register_failure(Exception(), 
('SET', 'key1', 'value1')) - sleep(0.1) - fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - sleep(0.1) - fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - sleep(0.1) - fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - - assert mock_db.circuit.state == CBState.CLOSED - - # 4 more failure as last one already refreshed timer - fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - - assert mock_db.circuit.state == CBState.OPEN + assert mock_db.circuit.state == circuit_state @pytest.mark.parametrize( - 'mock_db', + 'min_num_failures,failure_rate_threshold', [ - {'weight': 0.7, 'circuit': {'state': CBState.CLOSED}}, + (3, 0.0), + (3, 0.6), + ], + ids=[ + "do not exceeds min num failures, during interval", + "do not exceeds min num failures AND failure rate, during interval", ], - indirect=True, ) - def test_failure_detector_refresh_timer_on_expired_duration(self, mock_db): - fd = CommandFailureDetector(5, 0.3) + def test_failure_detector_do_not_open_circuit_on_interval_exceed(self, min_num_failures, failure_rate_threshold): + fd = CommandFailureDetector(min_num_failures, failure_rate_threshold, 0.3) + mock_db = Mock(spec=Database) + mock_db.circuit.state = CBState.CLOSED mock_ce = Mock(spec=SyncCommandExecutor) mock_ce.active_database = mock_db fd.set_command_executor(mock_ce) assert mock_db.circuit.state == CBState.CLOSED - fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - sleep(0.4) + fd.register_command_execution(('GET', 'key')) + fd.register_failure(Exception(), ('GET', 'key')) + sleep(0.16) + fd.register_command_execution(('GET', 'key')) + fd.register_command_execution(('GET', 'key')) + fd.register_command_execution(('GET', 'key')) + fd.register_failure(Exception(), ('GET', 'key')) + sleep(0.16) + fd.register_command_execution(('GET', 'key')) + fd.register_failure(Exception(), ('GET', 'key')) assert mock_db.circuit.state == CBState.CLOSED - fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - fd.register_failure(Exception(), ('SET', 'key1', 'value1')) - - assert mock_db.circuit.state == CBState.CLOSED - fd.register_failure(Exception(), ('SET', 'key1', 'value1')) + # 2 more failure as last one already refreshed timer + fd.register_command_execution(('GET', 'key')) + fd.register_failure(Exception(), ('GET', 'key')) + fd.register_command_execution(('GET', 'key')) + fd.register_failure(Exception(), ('GET', 'key')) assert mock_db.circuit.state == CBState.OPEN - @pytest.mark.parametrize( - 'mock_db', - [ - {'weight': 0.7, 'circuit': {'state': CBState.CLOSED}}, - ], - indirect=True, - ) - def test_failure_detector_open_circuit_on_specific_exception_threshold_exceed(self, mock_db): + def test_failure_detector_open_circuit_on_specific_exception_threshold_exceed(self): fd = CommandFailureDetector(5, 1, error_types=[ConnectionError]) + mock_db = Mock(spec=Database) + mock_db.circuit.state = CBState.CLOSED mock_ce = Mock(spec=SyncCommandExecutor) mock_ce.active_database = mock_db fd.set_command_executor(mock_ce) diff --git 
a/tests/test_scenario/conftest.py b/tests/test_scenario/conftest.py index 6c32cf0699..d49aca5605 100644 --- a/tests/test_scenario/conftest.py +++ b/tests/test_scenario/conftest.py @@ -13,7 +13,7 @@ from redis.multidb.client import MultiDBClient from redis.multidb.config import DatabaseConfig, MultiDbConfig, DEFAULT_HEALTH_CHECK_INTERVAL from redis.multidb.event import ActiveDatabaseChanged -from redis.multidb.failure_detector import DEFAULT_FAILURES_THRESHOLD +from redis.multidb.failure_detector import DEFAULT_MIN_NUM_FAILURES from redis.multidb.healthcheck import EchoHealthCheck, DEFAULT_HEALTH_CHECK_DELAY from redis.backoff import ExponentialWithJitterBackoff, NoBackoff from redis.client import Redis @@ -74,7 +74,7 @@ def r_multi_db(request) -> tuple[MultiDBClient, CheckActiveDatabaseChangedListen username = endpoint_config.get('username', None) password = endpoint_config.get('password', None) - failure_threshold = request.param.get('failure_threshold', DEFAULT_FAILURES_THRESHOLD) + min_num_failures = request.param.get('min_num_failures', DEFAULT_MIN_NUM_FAILURES) command_retry = request.param.get('command_retry', Retry(ExponentialBackoff(cap=0.1, base=0.01), retries=10)) # Retry configuration different for health checks as initial health check require more time in case @@ -116,7 +116,7 @@ def r_multi_db(request) -> tuple[MultiDBClient, CheckActiveDatabaseChangedListen client_class=client_class, databases_config=db_configs, command_retry=command_retry, - failure_threshold=failure_threshold, + min_num_failures=min_num_failures, health_check_probes=3, health_check_interval=health_check_interval, event_dispatcher=event_dispatcher, diff --git a/tests/test_scenario/test_active_active.py b/tests/test_scenario/test_active_active.py index d641fb65bb..327676a78d 100644 --- a/tests/test_scenario/test_active_active.py +++ b/tests/test_scenario/test_active_active.py @@ -41,13 +41,13 @@ class TestActiveActive: def teardown_method(self, method): # Timeout so the cluster could recover from network failure. 
- sleep(15) + sleep(20) @pytest.mark.parametrize( "r_multi_db", [ - {"client_class": Redis, "failure_threshold": 2}, - {"client_class": RedisCluster, "failure_threshold": 2}, + {"client_class": Redis, "min_num_failures": 2}, + {"client_class": RedisCluster, "min_num_failures": 2}, ], ids=["standalone", "cluster"], indirect=True @@ -96,8 +96,8 @@ def test_multi_db_client_failover_to_another_db(self, r_multi_db, fault_injector @pytest.mark.parametrize( "r_multi_db", [ - {"client_class": Redis, "failure_threshold": 2, "health_check_interval": 20}, - {"client_class": RedisCluster, "failure_threshold": 2, "health_check_interval": 20}, + {"client_class": Redis, "min_num_failures": 2, "health_check_interval": 20}, + {"client_class": RedisCluster, "min_num_failures": 2, "health_check_interval": 20}, ], ids=["standalone", "cluster"], indirect=True @@ -156,8 +156,8 @@ def test_multi_db_client_uses_lag_aware_health_check(self, r_multi_db, fault_inj @pytest.mark.parametrize( "r_multi_db", [ - {"client_class": Redis, "failure_threshold": 2}, - {"client_class": RedisCluster, "failure_threshold": 2}, + {"client_class": Redis, "min_num_failures": 2}, + {"client_class": RedisCluster, "min_num_failures": 2}, ], ids=["standalone", "cluster"], indirect=True @@ -214,8 +214,8 @@ def callback(): @pytest.mark.parametrize( "r_multi_db", [ - {"client_class": Redis, "failure_threshold": 2}, - {"client_class": RedisCluster, "failure_threshold": 2}, + {"client_class": Redis, "min_num_failures": 2}, + {"client_class": RedisCluster, "min_num_failures": 2}, ], ids=["standalone", "cluster"], indirect=True @@ -273,8 +273,8 @@ def callback(): @pytest.mark.parametrize( "r_multi_db", [ - {"client_class": Redis, "failure_threshold": 2}, - {"client_class": RedisCluster, "failure_threshold": 2}, + {"client_class": Redis, "min_num_failures": 2}, + {"client_class": RedisCluster, "min_num_failures": 2}, ], ids=["standalone", "cluster"], indirect=True @@ -329,8 +329,8 @@ def callback(pipe: Pipeline): @pytest.mark.parametrize( "r_multi_db", [ - {"client_class": Redis, "failure_threshold": 2}, - {"client_class": RedisCluster, "failure_threshold": 2}, + {"client_class": Redis, "min_num_failures": 2}, + {"client_class": RedisCluster, "min_num_failures": 2}, ], ids=["standalone", "cluster"], indirect=True @@ -389,8 +389,8 @@ def handler(message): @pytest.mark.parametrize( "r_multi_db", [ - {"client_class": Redis, "failure_threshold": 2}, - {"client_class": RedisCluster, "failure_threshold": 2}, + {"client_class": Redis, "min_num_failures": 2}, + {"client_class": RedisCluster, "min_num_failures": 2}, ], ids=["standalone", "cluster"], indirect=True
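For callers updating their configuration, a rough before/after sketch of the renamed knobs; the keyword arguments are the ones introduced in the hunks above, the values are illustrative (2 mirrors what the scenario tests pass), and db_configs stands in for whatever DatabaseConfig list the application already builds.

from redis.multidb.config import MultiDbConfig
from redis.multidb.failure_detector import CommandFailureDetector

# Previously: CommandFailureDetector(threshold=1000, duration=2)
detector = CommandFailureDetector(
    min_num_failures=1000,        # was `threshold`
    failure_rate_threshold=0.1,   # new: fraction of executed commands that failed
    failure_detection_window=2,   # was `duration`
)

db_configs = []  # list of DatabaseConfig entries, omitted here

# Previously: MultiDbConfig(databases_config=db_configs, failure_threshold=2, failures_interval=2)
config = MultiDbConfig(
    databases_config=db_configs,
    failure_detectors=[detector],    # optional additional detectors, unchanged
    min_num_failures=2,              # was `failure_threshold`
    failure_rate_threshold=0.1,      # new knob
    failures_detection_window=2,     # was `failures_interval`
)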