tests/test_scenario/conftest.py (11 changes: 7 additions & 4 deletions)
@@ -55,12 +55,14 @@ def client_maint_events(endpoints_config):

def _get_client_maint_events(
endpoints_config,
protocol: int = 3,
enable_maintenance_events: bool = True,
endpoint_type: Optional[EndpointType] = None,
enable_relax_timeout: bool = True,
enable_proactive_reconnect: bool = True,
disable_retries: bool = False,
socket_timeout: Optional[float] = None,
host_config: Optional[str] = None,
):
"""Create Redis client with maintenance events enabled."""

@@ -74,11 +76,9 @@ def _get_client_maint_events(
raise ValueError("No endpoints found in configuration")

parsed = urlparse(endpoints[0])
host = parsed.hostname
host = parsed.hostname if host_config is None else host_config
port = parsed.port

tls_enabled = True if parsed.scheme == "rediss" else False

if not host:
raise ValueError(f"Could not parse host from endpoint URL: {endpoints[0]}")

@@ -99,6 +99,9 @@ def _get_client_maint_events(
else:
retry = Retry(backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3)

tls_enabled = True if parsed.scheme == "rediss" else False
logging.info(f"TLS enabled: {tls_enabled}")

client = Redis(
host=host,
port=port,
@@ -108,7 +111,7 @@
ssl=tls_enabled,
ssl_cert_reqs="none",
ssl_check_hostname=False,
protocol=3, # RESP3 required for push notifications
protocol=protocol, # RESP3 required for push notifications
maintenance_events_config=maintenance_config,
retry=retry,
)
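For illustration, a minimal sketch of how a test could call the updated helper with the new parameters; the helper and fixture names come from this diff, while the override value, timeout, and the RESP2 variant are assumptions rather than part of the PR.

# Hedged sketch (not part of the PR): _get_client_maint_events and the
# endpoints_config fixture come from the diff above; the override value,
# timeout, and RESP2 variant are illustrative assumptions.
def test_client_with_host_override(endpoints_config):
    client = _get_client_maint_events(
        endpoints_config,
        protocol=3,                        # RESP3 is required for push notifications
        host_config="proxy.example.test",  # hypothetical override of the parsed hostname
        socket_timeout=5.0,
    )
    assert client.ping()


def test_client_resp2(endpoints_config):
    # With protocol=2 the client cannot receive maintenance push notifications,
    # so the maintenance-events machinery is disabled here.
    client = _get_client_maint_events(
        endpoints_config,
        protocol=2,
        enable_maintenance_events=False,
    )
    assert client.ping()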
tests/test_scenario/fault_injector_client.py (5 changes: 3 additions & 2 deletions)
@@ -2,6 +2,7 @@
import logging
import time
import urllib.request
import urllib.error
from typing import Dict, Any, Optional, Union
from enum import Enum

@@ -94,7 +95,7 @@ def get_action_status(self, action_id: str) -> Dict[str, Any]:
return self._make_request("GET", f"/action/{action_id}")

def execute_rladmin_command(
self, command: str, bdb_id: str = None
self, command: str, bdb_id: Optional[str] = None
) -> Dict[str, Any]:
"""Execute rladmin command directly as string"""
url = f"{self.base_url}/rladmin"
@@ -146,4 +147,4 @@ def get_operation_result(
logging.warning(f"Error checking operation status: {e}")
time.sleep(check_interval)
else:
raise TimeoutError(f"Timeout waiting for operation {action_id}")
pytest.fail(f"Timeout waiting for operation {action_id}")
tests/test_scenario/hitless_upgrade_helpers.py (39 changes: 39 additions & 0 deletions)
@@ -17,6 +17,7 @@ class ClientValidations:
def wait_push_notification(
redis_client: Redis,
timeout: int = 120,
fail_on_timeout: bool = True,
connection: Optional[Connection] = None,
):
"""Wait for a push notification to be received."""
@@ -35,11 +36,15 @@ def wait_push_notification(
logging.debug(
f"Push notification has been received. Response: {push_response}"
)
if test_conn.should_reconnect():
logging.debug("Connection is marked for reconnect")
return
except Exception as e:
logging.error(f"Error reading push notification: {e}")
break
time.sleep(check_interval)
if fail_on_timeout:
pytest.fail("Timeout waiting for push notification")
finally:
# Release the connection back to the pool
try:
@@ -215,6 +220,40 @@ def find_endpoint_for_bind(

raise ValueError(f"No endpoint ID for {endpoint_name} found in cluster status")

@staticmethod
def execute_failover(
fault_injector: FaultInjectorClient,
endpoint_config: Dict[str, Any],
timeout: int = 60,
) -> Dict[str, Any]:
"""Execute failover command and wait for completion."""

try:
bdb_id = endpoint_config.get("bdb_id")
failover_action = ActionRequest(
action_type=ActionType.FAILOVER,
parameters={
"bdb_id": bdb_id,
},
)
trigger_action_result = fault_injector.trigger_action(failover_action)
action_id = trigger_action_result.get("action_id")
if not action_id:
raise ValueError(
f"Failed to trigger fail over action for bdb_id {bdb_id}: {trigger_action_result}"
)

action_status_check_response = fault_injector.get_operation_result(
action_id, timeout=timeout
)
logging.info(
f"Failover action completed: {action_status_check_response}"
)
return action_status_check_response

except Exception as e:
pytest.fail(f"Failed to get cluster nodes info: {e}")

@staticmethod
def execute_rladmin_migrate(
fault_injector: FaultInjectorClient,
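Taken together, a hedged end-to-end sketch of how these helpers might combine in a hitless-upgrade test: trigger a failover via the fault injector, then wait for the resulting push notification. The diff does not show which class owns execute_failover, so ClusterOperations is used as a placeholder, and the fixture names and timeouts are illustrative.

# Hedged sketch (not part of the PR): wait_push_notification and execute_failover
# come from this diff; ClusterOperations, the fixtures, and the timeouts are
# assumptions.
def test_failover_emits_push_notification(
    client_maint_events, fault_injector, endpoints_config
):
    # Trigger a failover on the configured database and wait for the fault
    # injector to report completion (execute_failover polls internally).
    ClusterOperations.execute_failover(
        fault_injector,
        endpoints_config,  # expected to carry the "bdb_id" the helper reads
        timeout=60,
    )

    # Block until a maintenance push notification arrives on the client.
    # fail_on_timeout=True (the default) fails the test if nothing is received;
    # pass False to poll without failing when a notification is optional.
    ClientValidations.wait_push_notification(
        client_maint_events,
        timeout=120,
        fail_on_timeout=True,
    )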