diff --git a/src/instana/instrumentation/aio_pika.py b/src/instana/instrumentation/aio_pika.py index 5e3f58d0..ef16dfa9 100644 --- a/src/instana/instrumentation/aio_pika.py +++ b/src/instana/instrumentation/aio_pika.py @@ -100,12 +100,12 @@ async def callback_wrapper( _extract_span_attributes( span, connection, "consume", message.routing_key, message.exchange ) - try: - response = await wrapped(*args, **kwargs) - except Exception as exc: - span.record_exception(exc) - else: - return response + try: + response = await wrapped(*args, **kwargs) + except Exception as exc: + span.record_exception(exc) + else: + return response wrapped_callback = callback_wrapper(callback) if kwargs.get("callback"): diff --git a/tests/clients/test_aio_pika.py b/tests/clients/test_aio_pika.py index 75c1afff..4071e568 100644 --- a/tests/clients/test_aio_pika.py +++ b/tests/clients/test_aio_pika.py @@ -86,6 +86,22 @@ async def consume_message(self, connect_method) -> None: if queue.name in message.body.decode(): break + async def consume_with_exception(self, connect_method) -> None: + connection = await connect_method() + + async def on_message(msg): + raise RuntimeError("Simulated Exception") + + async with connection: + # Creating channel + channel = await connection.channel() + + # Declaring queue + queue = await channel.declare_queue(self.queue_name) + + await queue.consume(on_message) + await asyncio.sleep(1) # Wait to ensure the message is processed + @pytest.mark.parametrize( "params_combination", ["both_args", "both_kwargs", "arg_kwarg"], @@ -184,3 +200,45 @@ def assert_span_info(rabbitmq_span: "ReadableSpan", sort: str) -> None: assert_span_info(rabbitmq_publisher_span, "publish") assert_span_info(rabbitmq_consumer_span, "consume") + + @pytest.mark.parametrize( + "connect_method", + [connect, connect_robust], + ) + def test_consume_with_exception(self, connect_method) -> None: + with tracer.start_as_current_span("test"): + self.loop.run_until_complete(self.publish_message()) + self.loop.run_until_complete(self.consume_with_exception(connect_method)) + + spans = self.recorder.queued_spans() + assert len(spans) == 3 + + rabbitmq_publisher_span = spans[0] + rabbitmq_consumer_span = spans[1] + test_span = spans[2] + + # Same traceId + assert test_span.t == rabbitmq_publisher_span.t + assert rabbitmq_publisher_span.t == rabbitmq_consumer_span.t + + # Parent relationships + assert rabbitmq_publisher_span.p == test_span.s + assert rabbitmq_consumer_span.p == rabbitmq_publisher_span.s + + # Error logging + assert not rabbitmq_publisher_span.ec + assert rabbitmq_consumer_span.ec == 1 + assert not test_span.ec + + # Span attributes + def assert_span_info(rabbitmq_span: "ReadableSpan", sort: str) -> None: + assert rabbitmq_span.data["rabbitmq"]["exchange"] == "test.exchange" + assert rabbitmq_span.data["rabbitmq"]["sort"] == sort + assert rabbitmq_span.data["rabbitmq"]["address"] + assert rabbitmq_span.data["rabbitmq"]["key"] == "test.queue" + assert rabbitmq_span.stack + assert isinstance(rabbitmq_span.stack, list) + assert len(rabbitmq_span.stack) > 0 + + assert_span_info(rabbitmq_publisher_span, "publish") + assert_span_info(rabbitmq_consumer_span, "consume")