Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/instana/instrumentation/aio_pika.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,12 @@ async def callback_wrapper(
_extract_span_attributes(
span, connection, "consume", message.routing_key, message.exchange
)
try:
response = await wrapped(*args, **kwargs)
except Exception as exc:
span.record_exception(exc)
else:
return response
try:
response = await wrapped(*args, **kwargs)
except Exception as exc:
span.record_exception(exc)
else:
return response

wrapped_callback = callback_wrapper(callback)
if kwargs.get("callback"):
Expand Down
58 changes: 58 additions & 0 deletions tests/clients/test_aio_pika.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,22 @@ async def consume_message(self, connect_method) -> None:
if queue.name in message.body.decode():
break

async def consume_with_exception(self, connect_method) -> None:
connection = await connect_method()

async def on_message(msg):
raise RuntimeError("Simulated Exception")

async with connection:
# Creating channel
channel = await connection.channel()

# Declaring queue
queue = await channel.declare_queue(self.queue_name)

await queue.consume(on_message)
await asyncio.sleep(1) # Wait to ensure the message is processed

@pytest.mark.parametrize(
"params_combination",
["both_args", "both_kwargs", "arg_kwarg"],
Expand Down Expand Up @@ -184,3 +200,45 @@ def assert_span_info(rabbitmq_span: "ReadableSpan", sort: str) -> None:

assert_span_info(rabbitmq_publisher_span, "publish")
assert_span_info(rabbitmq_consumer_span, "consume")

@pytest.mark.parametrize(
"connect_method",
[connect, connect_robust],
)
def test_consume_with_exception(self, connect_method) -> None:
with tracer.start_as_current_span("test"):
self.loop.run_until_complete(self.publish_message())
self.loop.run_until_complete(self.consume_with_exception(connect_method))

spans = self.recorder.queued_spans()
assert len(spans) == 3

rabbitmq_publisher_span = spans[0]
rabbitmq_consumer_span = spans[1]
test_span = spans[2]

# Same traceId
assert test_span.t == rabbitmq_publisher_span.t
assert rabbitmq_publisher_span.t == rabbitmq_consumer_span.t

# Parent relationships
assert rabbitmq_publisher_span.p == test_span.s
assert rabbitmq_consumer_span.p == rabbitmq_publisher_span.s

# Error logging
assert not rabbitmq_publisher_span.ec
assert rabbitmq_consumer_span.ec == 1
assert not test_span.ec

# Span attributes
def assert_span_info(rabbitmq_span: "ReadableSpan", sort: str) -> None:
assert rabbitmq_span.data["rabbitmq"]["exchange"] == "test.exchange"
assert rabbitmq_span.data["rabbitmq"]["sort"] == sort
assert rabbitmq_span.data["rabbitmq"]["address"]
assert rabbitmq_span.data["rabbitmq"]["key"] == "test.queue"
assert rabbitmq_span.stack
assert isinstance(rabbitmq_span.stack, list)
assert len(rabbitmq_span.stack) > 0

assert_span_info(rabbitmq_publisher_span, "publish")
assert_span_info(rabbitmq_consumer_span, "consume")
Loading