Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
e6d90e4
MultiDbClient implementation (#3696)
vladvildanov Aug 11, 2025
8c09cbe
Added support for Pipeline and transactions (#3707)
vladvildanov Aug 11, 2025
4a40ee4
Added support for Pub/Sub mode in MultiDbClient (#3722)
vladvildanov Aug 13, 2025
fb500f6
Refactored docblocks (#3744)
vladvildanov Aug 18, 2025
d6cdaeb
Refactored healthcheck and failure detector to extend default one (#3…
vladvildanov Aug 20, 2025
68fe530
Added MultiDbClient support with OSS Cluster API (#3734)
vladvildanov Aug 20, 2025
8daa531
Added LagAwareHealthCheck for MultiDBClient (#3737)
vladvildanov Aug 22, 2025
f9fdc99
Added lag_aware_tolerance parameter to LagAwareHealthcheck (#3752)
vladvildanov Aug 26, 2025
866003b
Extract additional interfaces and abstract classes (#3754)
vladvildanov Aug 28, 2025
ec8113b
Added async implementation of MultiDBClient (#3762)
vladvildanov Sep 10, 2025
1dfffd2
Added pipeline and transaction support for MultiDBClient (#3763)
vladvildanov Sep 10, 2025
4817a26
Added pub/sub support for MultiDBClient (#3764)
vladvildanov Sep 11, 2025
481d89e
Added support for Lag-Aware Healthcheck and OSS Cluster API (#3768)
vladvildanov Sep 17, 2025
f81206b
Refactored Healthcheck and Failover strategy logic (#3771)
vladvildanov Sep 18, 2025
5dff45a
Merge branch 'master' of github.com:redis/redis-py into feat/active-a…
vladvildanov Sep 19, 2025
457a35c
Removed redundant dependency
vladvildanov Sep 19, 2025
c21da4b
Fixed async tests
vladvildanov Sep 19, 2025
821cc54
Increased lag-aware tolerance
vladvildanov Sep 19, 2025
a7540c1
Fixed typing issue, increase health_check_interval, added timeout han…
vladvildanov Sep 19, 2025
ca8166c
Decreased retry cap, increased failure delay
vladvildanov Sep 22, 2025
f50299e
Fixed async teardown
vladvildanov Sep 22, 2025
115d996
Fixed tests
vladvildanov Sep 22, 2025
6556e8a
Added graceful connection closing, added graceful hc tasks termination
vladvildanov Sep 22, 2025
063e795
Make sure active connection will be disconnected on failover
vladvildanov Sep 22, 2025
3ef34b1
Close cluster connection on failover
vladvildanov Sep 22, 2025
a1c0633
Refactored Failure Detector (#3775)
vladvildanov Sep 25, 2025
f5231ee
Decreased timeouts
vladvildanov Sep 25, 2025
413ea86
Added missing fixture
vladvildanov Sep 25, 2025
3ed14e4
Fixed None exception
vladvildanov Sep 30, 2025
6b0c4ad
Merge branch 'master' of github.com:redis/redis-py into feat/active-a…
vladvildanov Oct 2, 2025
71fc90f
Codestyle changes
vladvildanov Oct 2, 2025
57d328b
Codestyle changes
vladvildanov Oct 2, 2025
18b5b52
Skip async scenario tests
vladvildanov Oct 2, 2025
2ad005a
Codestyle change
vladvildanov Oct 2, 2025
2672fce
Fixed unused arguments
vladvildanov Oct 2, 2025
be5d2e8
Refactored bg scheduler
vladvildanov Oct 2, 2025
bca7a31
Fixed tests
vladvildanov Oct 2, 2025
d9ad720
Fixed tests
vladvildanov Oct 2, 2025
031b705
Codestyle fixes
vladvildanov Oct 2, 2025
e3bfa78
Reduce timeouts to avoid overlaping with healthcheck
vladvildanov Oct 2, 2025
5d5ff26
Marked tests non-clsuter only
vladvildanov Oct 2, 2025
cb51f8a
Update timeouts
vladvildanov Oct 2, 2025
d87ab3d
Merge branch 'master' into feat/active-active
vladvildanov Oct 2, 2025
7f7ea76
Skip scenario tests
vladvildanov Oct 2, 2025
852e835
Merge branch 'feat/active-active' of github.com:redis/redis-py into f…
vladvildanov Oct 2, 2025
933b3ad
Updated timeouts
vladvildanov Oct 2, 2025
c350d61
Merge branch 'master' into feat/active-active
petyaslavova Oct 2, 2025
cdef58b
Increased timeout
vladvildanov Oct 3, 2025
003853b
Merge branch 'feat/active-active' of github.com:redis/redis-py into f…
vladvildanov Oct 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/install_and_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ cd ${TESTDIR}
# install, run tests
pip install ${PKG}
# Redis tests
pytest -m 'not onlycluster' --ignore=tests/test_scenario
pytest -m 'not onlycluster' --ignore=tests/test_scenario --ignore=tests/test_asyncio/test_scenario
# RedisCluster tests
CLUSTER_URL="redis://localhost:16379/0"
CLUSTER_SSL_URL="rediss://localhost:27379/0"
Expand Down
1 change: 1 addition & 0 deletions dev_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ uvloop
vulture>=2.3.0
numpy>=1.24.0
redis-entraid==1.0.0
pybreaker>=1.4.0
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ ocsp = [
jwt = [
"PyJWT>=2.9.0",
]
circuit_breaker = [
"pybreaker>=1.4.0"
]

[project.urls]
Changes = "https://github.com/redis/redis-py/releases"
Expand Down
12 changes: 9 additions & 3 deletions redis/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1239,6 +1239,7 @@ async def run(
*,
exception_handler: Optional["PSWorkerThreadExcHandlerT"] = None,
poll_timeout: float = 1.0,
pubsub=None,
) -> None:
"""Process pub/sub messages using registered callbacks.

Expand All @@ -1263,9 +1264,14 @@ async def run(
await self.connect()
while True:
try:
await self.get_message(
ignore_subscribe_messages=True, timeout=poll_timeout
)
if pubsub is None:
await self.get_message(
ignore_subscribe_messages=True, timeout=poll_timeout
)
else:
await pubsub.get_message(
ignore_subscribe_messages=True, timeout=poll_timeout
)
except asyncio.CancelledError:
raise
except BaseException as e:
Expand Down
6 changes: 5 additions & 1 deletion redis/asyncio/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ def __init__(
else:
self._event_dispatcher = event_dispatcher

self.startup_nodes = startup_nodes
self.nodes_manager = NodesManager(
startup_nodes,
require_full_coverage,
Expand Down Expand Up @@ -2253,7 +2254,10 @@ async def _reinitialize_on_error(self, error):
await self._pipe.cluster_client.nodes_manager.initialize()
self.reinitialize_counter = 0
else:
self._pipe.cluster_client.nodes_manager.update_moved_exception(error)
if isinstance(error, AskError):
self._pipe.cluster_client.nodes_manager.update_moved_exception(
error
)

self._executing = False

Expand Down
18 changes: 18 additions & 0 deletions redis/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ def __init__(
self._connect_callbacks: List[weakref.WeakMethod[ConnectCallbackT]] = []
self._buffer_cutoff = 6000
self._re_auth_token: Optional[TokenInterface] = None
self._should_reconnect = False

try:
p = int(protocol)
Expand Down Expand Up @@ -343,6 +344,12 @@ async def connect_check_health(
if task and inspect.isawaitable(task):
await task

def mark_for_reconnect(self):
self._should_reconnect = True

def should_reconnect(self):
return self._should_reconnect

@abstractmethod
async def _connect(self):
pass
Expand Down Expand Up @@ -1240,6 +1247,9 @@ async def release(self, connection: AbstractConnection):
# Connections should always be returned to the correct pool,
# not doing so is an error that will cause an exception here.
self._in_use_connections.remove(connection)
if connection.should_reconnect():
await connection.disconnect()

self._available_connections.append(connection)
await self._event_dispatcher.dispatch_async(
AsyncAfterConnectionReleasedEvent(connection)
Expand Down Expand Up @@ -1267,6 +1277,14 @@ async def disconnect(self, inuse_connections: bool = True):
if exc:
raise exc

async def update_active_connections_for_reconnect(self):
"""
Mark all active connections for reconnect.
"""
async with self._lock:
for conn in self._in_use_connections:
conn.mark_for_reconnect()

async def aclose(self) -> None:
"""Close the pool, disconnecting all connections"""
await self.disconnect()
Expand Down
Empty file added redis/asyncio/http/__init__.py
Empty file.
265 changes: 265 additions & 0 deletions redis/asyncio/http/http_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
import asyncio
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Mapping, Optional, Union

from redis.http.http_client import HttpClient, HttpResponse

DEFAULT_USER_AGENT = "HttpClient/1.0 (+https://example.invalid)"
DEFAULT_TIMEOUT = 30.0
RETRY_STATUS_CODES = {429, 500, 502, 503, 504}


class AsyncHTTPClient(ABC):
@abstractmethod
async def get(
self,
path: str,
params: Optional[
Mapping[str, Union[None, str, int, float, bool, list, tuple]]
] = None,
headers: Optional[Mapping[str, str]] = None,
timeout: Optional[float] = None,
expect_json: bool = True,
) -> Union[HttpResponse, Any]:
"""
Invoke HTTP GET request."""
pass

@abstractmethod
async def delete(
self,
path: str,
params: Optional[
Mapping[str, Union[None, str, int, float, bool, list, tuple]]
] = None,
headers: Optional[Mapping[str, str]] = None,
timeout: Optional[float] = None,
expect_json: bool = True,
) -> Union[HttpResponse, Any]:
"""
Invoke HTTP DELETE request."""
pass

@abstractmethod
async def post(
self,
path: str,
json_body: Optional[Any] = None,
data: Optional[Union[bytes, str]] = None,
params: Optional[
Mapping[str, Union[None, str, int, float, bool, list, tuple]]
] = None,
headers: Optional[Mapping[str, str]] = None,
timeout: Optional[float] = None,
expect_json: bool = True,
) -> Union[HttpResponse, Any]:
"""
Invoke HTTP POST request."""
pass

@abstractmethod
async def put(
self,
path: str,
json_body: Optional[Any] = None,
data: Optional[Union[bytes, str]] = None,
params: Optional[
Mapping[str, Union[None, str, int, float, bool, list, tuple]]
] = None,
headers: Optional[Mapping[str, str]] = None,
timeout: Optional[float] = None,
expect_json: bool = True,
) -> Union[HttpResponse, Any]:
"""
Invoke HTTP PUT request."""
pass

@abstractmethod
async def patch(
self,
path: str,
json_body: Optional[Any] = None,
data: Optional[Union[bytes, str]] = None,
params: Optional[
Mapping[str, Union[None, str, int, float, bool, list, tuple]]
] = None,
headers: Optional[Mapping[str, str]] = None,
timeout: Optional[float] = None,
expect_json: bool = True,
) -> Union[HttpResponse, Any]:
"""
Invoke HTTP PATCH request."""
pass

@abstractmethod
async def request(
self,
method: str,
path: str,
params: Optional[
Mapping[str, Union[None, str, int, float, bool, list, tuple]]
] = None,
headers: Optional[Mapping[str, str]] = None,
body: Optional[Union[bytes, str]] = None,
timeout: Optional[float] = None,
) -> HttpResponse:
"""
Invoke HTTP request with given method."""
pass


class AsyncHTTPClientWrapper(AsyncHTTPClient):
"""
An async wrapper around sync HTTP client with thread pool execution.
"""

def __init__(self, client: HttpClient, max_workers: int = 10) -> None:
"""
Initialize a new HTTP client instance.

Args:
client: Sync HTTP client instance.
max_workers: Maximum number of concurrent requests.

The client supports both regular HTTPS with server verification and mutual TLS
authentication. For server verification, provide CA certificate information via
ca_file, ca_path or ca_data. For mutual TLS, additionally provide a client
certificate and key via client_cert_file and client_key_file.
"""
self.client = client
self._executor = ThreadPoolExecutor(max_workers=max_workers)

async def get(
self,
path: str,
params: Optional[
Mapping[str, Union[None, str, int, float, bool, list, tuple]]
] = None,
headers: Optional[Mapping[str, str]] = None,
timeout: Optional[float] = None,
expect_json: bool = True,
) -> Union[HttpResponse, Any]:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
self._executor, self.client.get, path, params, headers, timeout, expect_json
)

async def delete(
self,
path: str,
params: Optional[
Mapping[str, Union[None, str, int, float, bool, list, tuple]]
] = None,
headers: Optional[Mapping[str, str]] = None,
timeout: Optional[float] = None,
expect_json: bool = True,
) -> Union[HttpResponse, Any]:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
self._executor,
self.client.delete,
path,
params,
headers,
timeout,
expect_json,
)

async def post(
self,
path: str,
json_body: Optional[Any] = None,
data: Optional[Union[bytes, str]] = None,
params: Optional[
Mapping[str, Union[None, str, int, float, bool, list, tuple]]
] = None,
headers: Optional[Mapping[str, str]] = None,
timeout: Optional[float] = None,
expect_json: bool = True,
) -> Union[HttpResponse, Any]:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
self._executor,
self.client.post,
path,
json_body,
data,
params,
headers,
timeout,
expect_json,
)

async def put(
self,
path: str,
json_body: Optional[Any] = None,
data: Optional[Union[bytes, str]] = None,
params: Optional[
Mapping[str, Union[None, str, int, float, bool, list, tuple]]
] = None,
headers: Optional[Mapping[str, str]] = None,
timeout: Optional[float] = None,
expect_json: bool = True,
) -> Union[HttpResponse, Any]:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
self._executor,
self.client.put,
path,
json_body,
data,
params,
headers,
timeout,
expect_json,
)

async def patch(
self,
path: str,
json_body: Optional[Any] = None,
data: Optional[Union[bytes, str]] = None,
params: Optional[
Mapping[str, Union[None, str, int, float, bool, list, tuple]]
] = None,
headers: Optional[Mapping[str, str]] = None,
timeout: Optional[float] = None,
expect_json: bool = True,
) -> Union[HttpResponse, Any]:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
self._executor,
self.client.patch,
path,
json_body,
data,
params,
headers,
timeout,
expect_json,
)

async def request(
self,
method: str,
path: str,
params: Optional[
Mapping[str, Union[None, str, int, float, bool, list, tuple]]
] = None,
headers: Optional[Mapping[str, str]] = None,
body: Optional[Union[bytes, str]] = None,
timeout: Optional[float] = None,
) -> HttpResponse:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
self._executor,
self.client.request,
method,
path,
params,
headers,
body,
timeout,
)
Empty file.
Loading
Loading