Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions src/instana/instrumentation/aio_pika.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,22 @@ async def publish_with_instana(
tracer, parent_span, _ = get_tracer_tuple()
parent_context = parent_span.get_span_context() if parent_span else None

def _bind_args(
message: Type["AbstractMessage"],
routing_key: str,
*args: object,
**kwargs: object,
) -> Tuple[object, ...]:
return (message, routing_key, args, kwargs)

(message, routing_key, args, kwargs) = _bind_args(
*args, **kwargs
)

with tracer.start_as_current_span(
"rabbitmq", span_context=parent_context
) as span:
connection = instance.channel._connection
message = kwargs["message"] if kwargs.get("message") else args[0]
routing_key = (
kwargs["routing_key"] if kwargs.get("routing_key") else args[1]
)

_extract_span_attributes(
span, connection, "publish", routing_key, instance.name
Expand All @@ -66,6 +74,9 @@ async def publish_with_instana(
message.properties.headers,
disable_w3c_trace_context=True,
)

args = (message, routing_key) + args

try:
response = await wrapped(*args, **kwargs)
except Exception as exc:
Expand Down
55 changes: 19 additions & 36 deletions tests/clients/test_aio_pika.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ async def publish_message(self, params_combination: str = "both_args") -> None:
elif params_combination == "arg_kwarg":
args = (message,)
kwargs = {"routing_key": queue_name}
elif params_combination == "arg_kwarg_empty_key":
args = (message,)
kwargs = {"routing_key": ""}
else:
# params_combination == "both_args"
args = (message, queue_name)
Expand Down Expand Up @@ -102,6 +105,15 @@ async def on_message(msg):
await queue.consume(on_message)
await asyncio.sleep(1) # Wait to ensure the message is processed

def assert_span_info(self, rabbitmq_span: "ReadableSpan", sort: str, key: str = "test.queue") -> None:
assert rabbitmq_span.data["rabbitmq"]["exchange"] == "test.exchange"
assert rabbitmq_span.data["rabbitmq"]["sort"] == sort
assert rabbitmq_span.data["rabbitmq"]["address"]
assert rabbitmq_span.data["rabbitmq"]["key"] == key
assert rabbitmq_span.stack
assert isinstance(rabbitmq_span.stack, list)
assert len(rabbitmq_span.stack) > 0

@pytest.mark.parametrize(
"params_combination",
["both_args", "both_kwargs", "arg_kwarg"],
Expand All @@ -127,13 +139,8 @@ def test_basic_publish(self, params_combination) -> None:
assert not rabbitmq_span.ec

# Span attributes
assert rabbitmq_span.data["rabbitmq"]["exchange"] == "test.exchange"
assert rabbitmq_span.data["rabbitmq"]["sort"] == "publish"
assert rabbitmq_span.data["rabbitmq"]["address"]
assert rabbitmq_span.data["rabbitmq"]["key"] == "test.queue"
assert rabbitmq_span.stack
assert isinstance(rabbitmq_span.stack, list)
assert len(rabbitmq_span.stack) > 0
key = "" if params_combination == "arg_kwarg_empty_key" else self.queue_name
self.assert_span_info(rabbitmq_span, "publish", key)

def test_basic_publish_as_root_exit_span(self) -> None:
agent.options.allow_exit_as_root = True
Expand All @@ -151,13 +158,7 @@ def test_basic_publish_as_root_exit_span(self) -> None:
assert not rabbitmq_span.ec

# Span attributes
assert rabbitmq_span.data["rabbitmq"]["exchange"] == "test.exchange"
assert rabbitmq_span.data["rabbitmq"]["sort"] == "publish"
assert rabbitmq_span.data["rabbitmq"]["address"]
assert rabbitmq_span.data["rabbitmq"]["key"] == "test.queue"
assert rabbitmq_span.stack
assert isinstance(rabbitmq_span.stack, list)
assert len(rabbitmq_span.stack) > 0
self.assert_span_info(rabbitmq_span, "publish")

@pytest.mark.parametrize(
"connect_method",
Expand Down Expand Up @@ -189,17 +190,8 @@ def test_basic_consume(self, connect_method) -> None:
assert not test_span.ec

# Span attributes
def assert_span_info(rabbitmq_span: "ReadableSpan", sort: str) -> None:
assert rabbitmq_span.data["rabbitmq"]["exchange"] == "test.exchange"
assert rabbitmq_span.data["rabbitmq"]["sort"] == sort
assert rabbitmq_span.data["rabbitmq"]["address"]
assert rabbitmq_span.data["rabbitmq"]["key"] == "test.queue"
assert rabbitmq_span.stack
assert isinstance(rabbitmq_span.stack, list)
assert len(rabbitmq_span.stack) > 0

assert_span_info(rabbitmq_publisher_span, "publish")
assert_span_info(rabbitmq_consumer_span, "consume")
self.assert_span_info(rabbitmq_publisher_span, "publish")
self.assert_span_info(rabbitmq_consumer_span, "consume")

@pytest.mark.parametrize(
"connect_method",
Expand Down Expand Up @@ -231,14 +223,5 @@ def test_consume_with_exception(self, connect_method) -> None:
assert not test_span.ec

# Span attributes
def assert_span_info(rabbitmq_span: "ReadableSpan", sort: str) -> None:
assert rabbitmq_span.data["rabbitmq"]["exchange"] == "test.exchange"
assert rabbitmq_span.data["rabbitmq"]["sort"] == sort
assert rabbitmq_span.data["rabbitmq"]["address"]
assert rabbitmq_span.data["rabbitmq"]["key"] == "test.queue"
assert rabbitmq_span.stack
assert isinstance(rabbitmq_span.stack, list)
assert len(rabbitmq_span.stack) > 0

assert_span_info(rabbitmq_publisher_span, "publish")
assert_span_info(rabbitmq_consumer_span, "consume")
self.assert_span_info(rabbitmq_publisher_span, "publish")
self.assert_span_info(rabbitmq_consumer_span, "consume")
Loading