diff --git a/src/instana/instrumentation/aio_pika.py b/src/instana/instrumentation/aio_pika.py index ef16dfa9..a47e09f7 100644 --- a/src/instana/instrumentation/aio_pika.py +++ b/src/instana/instrumentation/aio_pika.py @@ -47,14 +47,22 @@ async def publish_with_instana( tracer, parent_span, _ = get_tracer_tuple() parent_context = parent_span.get_span_context() if parent_span else None + def _bind_args( + message: Type["AbstractMessage"], + routing_key: str, + *args: object, + **kwargs: object, + ) -> Tuple[object, ...]: + return (message, routing_key, args, kwargs) + + (message, routing_key, args, kwargs) = _bind_args( + *args, **kwargs + ) + with tracer.start_as_current_span( "rabbitmq", span_context=parent_context ) as span: connection = instance.channel._connection - message = kwargs["message"] if kwargs.get("message") else args[0] - routing_key = ( - kwargs["routing_key"] if kwargs.get("routing_key") else args[1] - ) _extract_span_attributes( span, connection, "publish", routing_key, instance.name @@ -66,6 +74,9 @@ async def publish_with_instana( message.properties.headers, disable_w3c_trace_context=True, ) + + args = (message, routing_key) + args + try: response = await wrapped(*args, **kwargs) except Exception as exc: diff --git a/tests/clients/test_aio_pika.py b/tests/clients/test_aio_pika.py index 4071e568..20e97618 100644 --- a/tests/clients/test_aio_pika.py +++ b/tests/clients/test_aio_pika.py @@ -56,6 +56,9 @@ async def publish_message(self, params_combination: str = "both_args") -> None: elif params_combination == "arg_kwarg": args = (message,) kwargs = {"routing_key": queue_name} + elif params_combination == "arg_kwarg_empty_key": + args = (message,) + kwargs = {"routing_key": ""} else: # params_combination == "both_args" args = (message, queue_name) @@ -102,6 +105,15 @@ async def on_message(msg): await queue.consume(on_message) await asyncio.sleep(1) # Wait to ensure the message is processed + def assert_span_info(self, rabbitmq_span: "ReadableSpan", sort: str, key: str = "test.queue") -> None: + assert rabbitmq_span.data["rabbitmq"]["exchange"] == "test.exchange" + assert rabbitmq_span.data["rabbitmq"]["sort"] == sort + assert rabbitmq_span.data["rabbitmq"]["address"] + assert rabbitmq_span.data["rabbitmq"]["key"] == key + assert rabbitmq_span.stack + assert isinstance(rabbitmq_span.stack, list) + assert len(rabbitmq_span.stack) > 0 + @pytest.mark.parametrize( "params_combination", ["both_args", "both_kwargs", "arg_kwarg"], @@ -127,13 +139,8 @@ def test_basic_publish(self, params_combination) -> None: assert not rabbitmq_span.ec # Span attributes - assert rabbitmq_span.data["rabbitmq"]["exchange"] == "test.exchange" - assert rabbitmq_span.data["rabbitmq"]["sort"] == "publish" - assert rabbitmq_span.data["rabbitmq"]["address"] - assert rabbitmq_span.data["rabbitmq"]["key"] == "test.queue" - assert rabbitmq_span.stack - assert isinstance(rabbitmq_span.stack, list) - assert len(rabbitmq_span.stack) > 0 + key = "" if params_combination == "arg_kwarg_empty_key" else self.queue_name + self.assert_span_info(rabbitmq_span, "publish", key) def test_basic_publish_as_root_exit_span(self) -> None: agent.options.allow_exit_as_root = True @@ -151,13 +158,7 @@ def test_basic_publish_as_root_exit_span(self) -> None: assert not rabbitmq_span.ec # Span attributes - assert rabbitmq_span.data["rabbitmq"]["exchange"] == "test.exchange" - assert rabbitmq_span.data["rabbitmq"]["sort"] == "publish" - assert rabbitmq_span.data["rabbitmq"]["address"] - assert rabbitmq_span.data["rabbitmq"]["key"] == "test.queue" - assert rabbitmq_span.stack - assert isinstance(rabbitmq_span.stack, list) - assert len(rabbitmq_span.stack) > 0 + self.assert_span_info(rabbitmq_span, "publish") @pytest.mark.parametrize( "connect_method", @@ -189,17 +190,8 @@ def test_basic_consume(self, connect_method) -> None: assert not test_span.ec # Span attributes - def assert_span_info(rabbitmq_span: "ReadableSpan", sort: str) -> None: - assert rabbitmq_span.data["rabbitmq"]["exchange"] == "test.exchange" - assert rabbitmq_span.data["rabbitmq"]["sort"] == sort - assert rabbitmq_span.data["rabbitmq"]["address"] - assert rabbitmq_span.data["rabbitmq"]["key"] == "test.queue" - assert rabbitmq_span.stack - assert isinstance(rabbitmq_span.stack, list) - assert len(rabbitmq_span.stack) > 0 - - assert_span_info(rabbitmq_publisher_span, "publish") - assert_span_info(rabbitmq_consumer_span, "consume") + self.assert_span_info(rabbitmq_publisher_span, "publish") + self.assert_span_info(rabbitmq_consumer_span, "consume") @pytest.mark.parametrize( "connect_method", @@ -231,14 +223,5 @@ def test_consume_with_exception(self, connect_method) -> None: assert not test_span.ec # Span attributes - def assert_span_info(rabbitmq_span: "ReadableSpan", sort: str) -> None: - assert rabbitmq_span.data["rabbitmq"]["exchange"] == "test.exchange" - assert rabbitmq_span.data["rabbitmq"]["sort"] == sort - assert rabbitmq_span.data["rabbitmq"]["address"] - assert rabbitmq_span.data["rabbitmq"]["key"] == "test.queue" - assert rabbitmq_span.stack - assert isinstance(rabbitmq_span.stack, list) - assert len(rabbitmq_span.stack) > 0 - - assert_span_info(rabbitmq_publisher_span, "publish") - assert_span_info(rabbitmq_consumer_span, "consume") + self.assert_span_info(rabbitmq_publisher_span, "publish") + self.assert_span_info(rabbitmq_consumer_span, "consume")