Skip to content

Commit a1c0633

Browse files
authored
Refactored Failure Detector (#3775)
1 parent 3ef34b1 commit a1c0633

File tree

16 files changed

+277
-268
lines changed

16 files changed

+277
-268
lines changed

redis/asyncio/multidb/command_executor.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,9 @@ def pubsub(self, **kwargs):
199199

200200
async def execute_command(self, *args, **options):
201201
async def callback():
202-
return await self._active_database.client.execute_command(*args, **options)
202+
response = await self._active_database.client.execute_command(*args, **options)
203+
await self._register_command_execution(args)
204+
return response
203205

204206
return await self._execute_with_failure_detection(callback, args)
205207

@@ -209,7 +211,9 @@ async def callback():
209211
for command, options in command_stack:
210212
pipe.execute_command(*command, **options)
211213

212-
return await pipe.execute()
214+
response = await pipe.execute()
215+
await self._register_command_execution(command_stack)
216+
return response
213217

214218
return await self._execute_with_failure_detection(callback, command_stack)
215219

@@ -222,23 +226,28 @@ async def execute_transaction(
222226
watch_delay: Optional[float] = None,
223227
):
224228
async def callback():
225-
return await self._active_database.client.transaction(
229+
response = await self._active_database.client.transaction(
226230
func,
227231
*watches,
228232
shard_hint=shard_hint,
229233
value_from_callable=value_from_callable,
230234
watch_delay=watch_delay
231235
)
236+
await self._register_command_execution(())
237+
return response
232238

233239
return await self._execute_with_failure_detection(callback)
234240

235241
async def execute_pubsub_method(self, method_name: str, *args, **kwargs):
236242
async def callback():
237243
method = getattr(self.active_pubsub, method_name)
238244
if iscoroutinefunction(method):
239-
return await method(*args, **kwargs)
245+
response = await method(*args, **kwargs)
240246
else:
241-
return method(*args, **kwargs)
247+
response = method(*args, **kwargs)
248+
249+
await self._register_command_execution(args)
250+
return response
242251

243252
return await self._execute_with_failure_detection(callback, *args)
244253

@@ -280,6 +289,10 @@ async def _check_active_database(self):
280289
async def _on_command_fail(self, error, *args):
281290
await self._event_dispatcher.dispatch_async(AsyncOnCommandsFailEvent(args, error))
282291

292+
async def _register_command_execution(self, cmd: tuple):
293+
for detector in self._failure_detectors:
294+
await detector.register_command_execution(cmd)
295+
283296
def _setup_event_dispatcher(self):
284297
"""
285298
Registers necessary listeners.

redis/asyncio/multidb/config.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,16 @@
77
from redis.asyncio.multidb.database import Databases, Database
88
from redis.asyncio.multidb.failover import AsyncFailoverStrategy, WeightBasedFailoverStrategy, DEFAULT_FAILOVER_DELAY, \
99
DEFAULT_FAILOVER_ATTEMPTS
10-
from redis.asyncio.multidb.failure_detector import AsyncFailureDetector, FailureDetectorAsyncWrapper, \
11-
DEFAULT_FAILURES_THRESHOLD, DEFAULT_FAILURES_DURATION
10+
from redis.asyncio.multidb.failure_detector import AsyncFailureDetector, FailureDetectorAsyncWrapper
1211
from redis.asyncio.multidb.healthcheck import HealthCheck, EchoHealthCheck, DEFAULT_HEALTH_CHECK_INTERVAL, \
1312
DEFAULT_HEALTH_CHECK_PROBES, DEFAULT_HEALTH_CHECK_DELAY, HealthCheckPolicies, DEFAULT_HEALTH_CHECK_POLICY
1413
from redis.asyncio.retry import Retry
1514
from redis.backoff import ExponentialWithJitterBackoff, NoBackoff
1615
from redis.data_structure import WeightedList
1716
from redis.event import EventDispatcherInterface, EventDispatcher
1817
from redis.multidb.circuit import CircuitBreaker, PBCircuitBreakerAdapter, DEFAULT_GRACE_PERIOD
19-
from redis.multidb.failure_detector import CommandFailureDetector
18+
from redis.multidb.failure_detector import CommandFailureDetector, DEFAULT_MIN_NUM_FAILURES, \
19+
DEFAULT_FAILURE_RATE_THRESHOLD, DEFAULT_FAILURES_DETECTION_WINDOW
2020

2121
DEFAULT_AUTO_FALLBACK_INTERVAL = 120
2222

@@ -70,8 +70,9 @@ class MultiDbConfig:
7070
client_class: The client class used to manage database connections.
7171
command_retry: Retry strategy for executing database commands.
7272
failure_detectors: Optional list of additional failure detectors for monitoring database failures.
73-
failure_threshold: Threshold for determining database failure.
74-
failures_interval: Time interval for tracking database failures.
73+
min_num_failures: Minimal count of failures required for failover
74+
failure_rate_threshold: Ratio of failures to executed commands required for failover (e.g. 0.1 means 10%)
75+
failures_detection_window: Time interval for tracking database failures.
7576
health_checks: Optional list of additional health checks performed on databases.
7677
health_check_interval: Time interval for executing health checks.
7778
health_check_probes: Number of attempts to evaluate the health of a database.
@@ -105,8 +106,9 @@ class MultiDbConfig:
105106
backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3
106107
)
107108
failure_detectors: Optional[List[AsyncFailureDetector]] = None
108-
failure_threshold: int = DEFAULT_FAILURES_THRESHOLD
109-
failures_interval: float = DEFAULT_FAILURES_DURATION
109+
min_num_failures: int = DEFAULT_MIN_NUM_FAILURES
110+
failure_rate_threshold: float = DEFAULT_FAILURE_RATE_THRESHOLD
111+
failures_detection_window: float = DEFAULT_FAILURES_DETECTION_WINDOW
110112
health_checks: Optional[List[HealthCheck]] = None
111113
health_check_interval: float = DEFAULT_HEALTH_CHECK_INTERVAL
112114
health_check_probes: int = DEFAULT_HEALTH_CHECK_PROBES
@@ -151,7 +153,11 @@ def databases(self) -> Databases:
151153
def default_failure_detectors(self) -> List[AsyncFailureDetector]:
152154
return [
153155
FailureDetectorAsyncWrapper(
154-
CommandFailureDetector(threshold=self.failure_threshold, duration=self.failures_interval)
156+
CommandFailureDetector(
157+
min_num_failures=self.min_num_failures,
158+
failure_rate_threshold=self.failure_rate_threshold,
159+
failure_detection_window=self.failures_detection_window
160+
)
155161
),
156162
]
157163

redis/asyncio/multidb/failure_detector.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,18 @@
22

33
from redis.multidb.failure_detector import FailureDetector
44

5-
DEFAULT_FAILURES_THRESHOLD = 1000
6-
DEFAULT_FAILURES_DURATION = 2
7-
85
class AsyncFailureDetector(ABC):
96

107
@abstractmethod
118
async def register_failure(self, exception: Exception, cmd: tuple) -> None:
129
"""Register a failure that occurred during command execution."""
1310
pass
1411

12+
@abstractmethod
13+
async def register_command_execution(self, cmd: tuple) -> None:
14+
"""Register a command execution."""
15+
pass
16+
1517
@abstractmethod
1618
def set_command_executor(self, command_executor) -> None:
1719
"""Set the command executor for this failure."""
@@ -27,5 +29,8 @@ def __init__(self, failure_detector: FailureDetector) -> None:
2729
async def register_failure(self, exception: Exception, cmd: tuple) -> None:
2830
self._failure_detector.register_failure(exception, cmd)
2931

32+
async def register_command_execution(self, cmd: tuple) -> None:
33+
self._failure_detector.register_command_execution(cmd)
34+
3035
def set_command_executor(self, command_executor) -> None:
3136
self._failure_detector.set_command_executor(command_executor)

redis/multidb/command_executor.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,9 @@ def failover_strategy_executor(self) -> FailoverStrategyExecutor:
224224

225225
def execute_command(self, *args, **options):
226226
def callback():
227-
return self._active_database.client.execute_command(*args, **options)
227+
response = self._active_database.client.execute_command(*args, **options)
228+
self._register_command_execution(args)
229+
return response
228230

229231
return self._execute_with_failure_detection(callback, args)
230232

@@ -234,13 +236,17 @@ def callback():
234236
for command, options in command_stack:
235237
pipe.execute_command(*command, **options)
236238

237-
return pipe.execute()
239+
response = pipe.execute()
240+
self._register_command_execution(command_stack)
241+
return response
238242

239243
return self._execute_with_failure_detection(callback, command_stack)
240244

241245
def execute_transaction(self, transaction: Callable[[Pipeline], None], *watches, **options):
242246
def callback():
243-
return self._active_database.client.transaction(transaction, *watches, **options)
247+
response = self._active_database.client.transaction(transaction, *watches, **options)
248+
self._register_command_execution(())
249+
return response
244250

245251
return self._execute_with_failure_detection(callback)
246252

@@ -256,7 +262,9 @@ def callback():
256262
def execute_pubsub_method(self, method_name: str, *args, **kwargs):
257263
def callback():
258264
method = getattr(self.active_pubsub, method_name)
259-
return method(*args, **kwargs)
265+
response = method(*args, **kwargs)
266+
self._register_command_execution(args)
267+
return response
260268

261269
return self._execute_with_failure_detection(callback, *args)
262270

@@ -298,6 +306,10 @@ def _check_active_database(self):
298306
self.active_database = self._failover_strategy_executor.execute()
299307
self._schedule_next_fallback()
300308

309+
def _register_command_execution(self, cmd: tuple):
310+
for detector in self._failure_detectors:
311+
detector.register_command_execution(cmd)
312+
301313
def _setup_event_dispatcher(self):
302314
"""
303315
Registers necessary listeners.

redis/multidb/config.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
from redis.event import EventDispatcher, EventDispatcherInterface
1212
from redis.multidb.circuit import PBCircuitBreakerAdapter, CircuitBreaker, DEFAULT_GRACE_PERIOD
1313
from redis.multidb.database import Database, Databases
14-
from redis.multidb.failure_detector import FailureDetector, CommandFailureDetector, DEFAULT_FAILURES_THRESHOLD, \
15-
DEFAULT_FAILURES_DURATION
14+
from redis.multidb.failure_detector import FailureDetector, CommandFailureDetector, DEFAULT_MIN_NUM_FAILURES, \
15+
DEFAULT_FAILURES_DETECTION_WINDOW, DEFAULT_FAILURE_RATE_THRESHOLD
1616
from redis.multidb.healthcheck import HealthCheck, EchoHealthCheck, DEFAULT_HEALTH_CHECK_PROBES, \
1717
DEFAULT_HEALTH_CHECK_INTERVAL, DEFAULT_HEALTH_CHECK_DELAY, HealthCheckPolicies, DEFAULT_HEALTH_CHECK_POLICY
1818
from redis.multidb.failover import FailoverStrategy, WeightBasedFailoverStrategy, DEFAULT_FAILOVER_ATTEMPTS, \
@@ -71,8 +71,9 @@ class MultiDbConfig:
7171
client_class: The client class used to manage database connections.
7272
command_retry: Retry strategy for executing database commands.
7373
failure_detectors: Optional list of additional failure detectors for monitoring database failures.
74-
failure_threshold: Threshold for determining database failure.
75-
failures_interval: Time interval for tracking database failures.
74+
min_num_failures: Minimal count of failures required for failover
75+
failure_rate_threshold: Ratio of failures to executed commands required for failover (e.g. 0.1 means 10%)
76+
failures_detection_window: Time interval for tracking database failures.
7677
health_checks: Optional list of additional health checks performed on databases.
7778
health_check_interval: Time interval for executing health checks.
7879
health_check_probes: Number of attempts to evaluate the health of a database.
@@ -107,8 +108,9 @@ class MultiDbConfig:
107108
backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3
108109
)
109110
failure_detectors: Optional[List[FailureDetector]] = None
110-
failure_threshold: int = DEFAULT_FAILURES_THRESHOLD
111-
failures_interval: float = DEFAULT_FAILURES_DURATION
111+
min_num_failures: int = DEFAULT_MIN_NUM_FAILURES
112+
failure_rate_threshold: float = DEFAULT_FAILURE_RATE_THRESHOLD
113+
failures_detection_window: float = DEFAULT_FAILURES_DETECTION_WINDOW
112114
health_checks: Optional[List[HealthCheck]] = None
113115
health_check_interval: float = DEFAULT_HEALTH_CHECK_INTERVAL
114116
health_check_probes: int = DEFAULT_HEALTH_CHECK_PROBES
@@ -152,7 +154,11 @@ def databases(self) -> Databases:
152154

153155
def default_failure_detectors(self) -> List[FailureDetector]:
154156
return [
155-
CommandFailureDetector(threshold=self.failure_threshold, duration=self.failures_interval),
157+
CommandFailureDetector(
158+
min_num_failures=self.min_num_failures,
159+
failure_rate_threshold=self.failure_rate_threshold,
160+
failure_detection_window=self.failures_detection_window
161+
),
156162
]
157163

158164
def default_health_checks(self) -> List[HealthCheck]:

redis/multidb/failure_detector.py

Lines changed: 39 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import math
12
import threading
23
from abc import ABC, abstractmethod
34
from datetime import datetime, timedelta
@@ -7,8 +8,9 @@
78

89
from redis.multidb.circuit import State as CBState
910

10-
DEFAULT_FAILURES_THRESHOLD = 1000
11-
DEFAULT_FAILURES_DURATION = 2
11+
DEFAULT_MIN_NUM_FAILURES = 1000
12+
DEFAULT_FAILURE_RATE_THRESHOLD = 0.1
13+
DEFAULT_FAILURES_DETECTION_WINDOW = 2
1214

1315
class FailureDetector(ABC):
1416

@@ -17,6 +19,11 @@ def register_failure(self, exception: Exception, cmd: tuple) -> None:
1719
"""Register a failure that occurred during command execution."""
1820
pass
1921

22+
@abstractmethod
23+
def register_command_execution(self, cmd: tuple) -> None:
24+
"""Register a command execution."""
25+
pass
26+
2027
@abstractmethod
2128
def set_command_executor(self, command_executor) -> None:
2229
"""Set the command executor for this failure."""
@@ -28,56 +35,65 @@ class CommandFailureDetector(FailureDetector):
2835
"""
2936
def __init__(
3037
self,
31-
threshold: int = DEFAULT_FAILURES_THRESHOLD,
32-
duration: float = DEFAULT_FAILURES_DURATION,
38+
min_num_failures: int = DEFAULT_MIN_NUM_FAILURES,
39+
failure_rate_threshold: float = DEFAULT_FAILURE_RATE_THRESHOLD,
40+
failure_detection_window: float = DEFAULT_FAILURES_DETECTION_WINDOW,
3341
error_types: Optional[List[Type[Exception]]] = None,
3442
) -> None:
3543
"""
3644
Initialize a new CommandFailureDetector instance.
3745
3846
Args:
39-
threshold: The number of failures that must occur within the duration to trigger failure detection.
40-
duration: The time window in seconds during which failures are counted.
47+
min_num_failures: Minimal count of failures required for failover
48+
failure_rate_threshold: Ratio of failures to executed commands required for failover (e.g. 0.1 means 10%)
49+
failure_detection_window: Time window in seconds during which failures and command executions are counted.
4150
error_types: Optional list of exception types to trigger failover. If None, all exceptions are counted.
4251
4352
The detector counts command failures and registered command executions within the detection window. Failure
4453
detection triggers once failures reach min_num_failures and at least ceil(commands_executed * failure_rate_threshold).
4554
"""
4655
self._command_executor = None
47-
self._threshold = threshold
48-
self._duration = duration
56+
self._min_num_failures = min_num_failures
57+
self._failure_rate_threshold = failure_rate_threshold
58+
self._failure_detection_window = failure_detection_window
4959
self._error_types = error_types
60+
self._commands_executed: int = 0
5061
self._start_time: datetime = datetime.now()
51-
self._end_time: datetime = self._start_time + timedelta(seconds=self._duration)
52-
self._failures_within_duration: List[tuple[datetime, tuple]] = []
62+
self._end_time: datetime = self._start_time + timedelta(seconds=self._failure_detection_window)
63+
self._failures_count: int = 0
5364
self._lock = threading.RLock()
5465

5566
def register_failure(self, exception: Exception, cmd: tuple) -> None:
56-
failure_time = datetime.now()
57-
58-
if not self._start_time < failure_time < self._end_time:
59-
self._reset()
60-
6167
with self._lock:
6268
if self._error_types:
6369
if type(exception) in self._error_types:
64-
self._failures_within_duration.append((datetime.now(), cmd))
70+
self._failures_count += 1
6571
else:
66-
self._failures_within_duration.append((datetime.now(), cmd))
72+
self._failures_count += 1
6773

68-
self._check_threshold()
74+
self._check_threshold()
6975

7076
def set_command_executor(self, command_executor) -> None:
7177
self._command_executor = command_executor
7278

73-
def _check_threshold(self):
79+
def register_command_execution(self, cmd: tuple) -> None:
7480
with self._lock:
75-
if len(self._failures_within_duration) >= self._threshold:
76-
self._command_executor.active_database.circuit.state = CBState.OPEN
81+
if not self._start_time < datetime.now() < self._end_time:
7782
self._reset()
7883

84+
self._commands_executed += 1
85+
86+
def _check_threshold(self):
87+
if (
88+
self._failures_count >= self._min_num_failures
89+
and self._failures_count >= (math.ceil(self._commands_executed * self._failure_rate_threshold))
90+
):
91+
self._command_executor.active_database.circuit.state = CBState.OPEN
92+
self._reset()
93+
7994
def _reset(self) -> None:
8095
with self._lock:
8196
self._start_time = datetime.now()
82-
self._end_time = self._start_time + timedelta(seconds=self._duration)
83-
self._failures_within_duration = []
97+
self._end_time = self._start_time + timedelta(seconds=self._failure_detection_window)
98+
self._failures_count = 0
99+
self._commands_executed = 0

0 commit comments

Comments
 (0)