
Bug in retry handling in the PubSub client #3203

@sondrfos

Version:
redis-py: 5.0.3
redis: 7.2.4

Platform:
Python 3.12.2 running on Debian bookworm

Description:
First off I want to say thanks for maintaining this library :)

So, to the issue at hand:
It seems the retry functionality in the PubSub client (both async and sync) is not working correctly.
To simulate a Redis crash, I stop and restart Redis while our system is running. Even though our connection is set up with an infinite number of retries, the PubSub client always crashes and never recovers.
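
For context, our setup looks roughly like the following. This is a minimal sketch rather than our actual code: the host, channel name and polling loop are placeholders, and it assumes the async client with an infinite Retry (retries=-1) and ConnectionError/TimeoutError listed in retry_on_error.

import asyncio

from redis.asyncio import Redis
from redis.asyncio.retry import Retry
from redis.backoff import ExponentialBackoff
from redis.exceptions import ConnectionError, TimeoutError


async def main():
    # Infinite retries (-1) with exponential backoff; retry on
    # connection and timeout errors.
    client = Redis(
        host="localhost",
        port=6379,
        retry=Retry(ExponentialBackoff(), retries=-1),
        retry_on_error=[ConnectionError, TimeoutError],
    )
    pubsub = client.pubsub()
    await pubsub.subscribe("my-channel")  # placeholder channel name
    while True:
        # get_message() is where the crash surfaces once Redis has been
        # restarted.
        message = await pubsub.get_message(
            ignore_subscribe_messages=True, timeout=1.0
        )
        if message is not None:
            print(message)


asyncio.run(main())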

Having dug through the source code, I have found the code path causing the crash.

async def _execute(self, conn, command, *args, **kwargs):
    """
    Connect manually upon disconnection. If the Redis server is down,
    this will fail and raise a ConnectionError as desired.
    After reconnection, the ``on_connect`` callback should have been
    called by the # connection to resubscribe us to any channels and
    patterns we were previously listening to
    """
    return await conn.retry.call_with_retry(
        lambda: command(*args, **kwargs),
        lambda error: self._disconnect_raise_connect(conn, error),
    )

The _execute function runs periodically, and whenever Redis is down the command fails.
This causes _disconnect_raise_connect to be called:
async def _disconnect_raise_connect(self, conn, error):
    """
    Close the connection and raise an exception
    if retry_on_error is not set or the error is not one
    of the specified error types. Otherwise, try to
    reconnect
    """
    await conn.disconnect()
    if (
        conn.retry_on_error is None
        or isinstance(error, tuple(conn.retry_on_error)) is False
    ):
        raise error
    await conn.connect()

_disconnect_raise_connect closes the connection. Since conn.retry_on_error is set and the error is one of the listed error types, the function skips raising the error and instead attempts to reconnect:
async def connect(self):
    """Connects to the Redis server if not already connected"""
    if self.is_connected:
        return
    try:
        await self.retry.call_with_retry(
            lambda: self._connect(), lambda error: self.disconnect()
        )
    except asyncio.CancelledError:
        raise  # in 3.7 and earlier, this is an Exception, not BaseException
    except (socket.timeout, asyncio.TimeoutError):
        raise TimeoutError("Timeout connecting to server")
    except OSError as e:
        raise ConnectionError(self._error_message(e))
    except Exception as exc:
        raise ConnectionError(exc) from exc

Since we are not currently connected, the early-return if statement is skipped, and call_with_retry is invoked with the _connect function:
async def _connect(self):
    """Create a TCP socket connection"""
    async with async_timeout(self.socket_connect_timeout):
        reader, writer = await asyncio.open_connection(
            **self._connection_arguments()
        )
    self._reader = reader
    self._writer = writer
    sock = writer.transport.get_extra_info("socket")
    if sock:
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        try:
            # TCP_KEEPALIVE
            if self.socket_keepalive:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
                for k, v in self.socket_keepalive_options.items():
                    sock.setsockopt(socket.SOL_TCP, k, v)
        except (OSError, TypeError):
            # `socket_keepalive_options` might contain invalid options
            # causing an error. Do not leave the connection open.
            writer.close()
            raise

Inside _connect an attempt is made to connect to Redis. Since Redis is unavailable, an OSError is raised. OSError is not in the Retry object's list of supported_errors, so it bypasses the error handling in call_with_retry and propagates up to connect. There it is converted to a ConnectionError, which then propagates all the way to the top unhandled.
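
To illustrate this concretely, here is a small self-contained sketch. The do/fail callbacks below are stand-ins for _connect and disconnect, not the real ones; the point is that an OSError escapes call_with_retry immediately, even with retries=-1.

import asyncio

from redis.asyncio.retry import Retry
from redis.backoff import NoBackoff


async def demo():
    retry = Retry(NoBackoff(), retries=-1)  # "infinite" retries

    async def do():
        # Stand-in for asyncio.open_connection() failing while Redis is down.
        raise OSError("connection refused")

    async def fail(error):
        # Stand-in for Connection.disconnect(); never reached for OSError.
        print(f"retry hook saw: {error!r}")

    # OSError is not in Retry's default supported_errors, so instead of
    # looping forever this raises on the first attempt.
    await retry.call_with_retry(do, fail)


asyncio.run(demo())  # raises OSError: connection refused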

Is this intended behaviour?
Is this something that could be handled in a different way?

We have temporarily fixed this by adding OSError to the list of supported_errors, but this feels like quite a hacky solution.
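
For reference, the workaround looks roughly like this (a sketch, not our production code; the host is a placeholder):

import socket

from redis.asyncio import Redis
from redis.asyncio.retry import Retry
from redis.backoff import ExponentialBackoff
from redis.exceptions import ConnectionError, TimeoutError

# Workaround: extend the Retry object's supported_errors with OSError so
# that connection attempts made while Redis is down are retried instead of
# escaping the retry loop.
retry = Retry(
    ExponentialBackoff(),
    retries=-1,
    supported_errors=(ConnectionError, TimeoutError, socket.timeout, OSError),
)

client = Redis(
    host="localhost",
    port=6379,
    retry=retry,
    retry_on_error=[ConnectionError, TimeoutError],
)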
