On one of my project I needed to create a simple API allowing clients to publish some messages into an Azure EventHub. Because authentication is ensured by a JWT we could not rely on our clients to directly publish messages to the Eventhub (and by the way I prefer abstracting this away through a HTTP API).
TL;DR
To create automated tests for your service and ensure it properly publish messages to your eventhub with expected content you can use
asyncio.ensure_future()
function. It allows to run an event consumer in the background right before sending your test message. Next to that, you can leverageasyncio.Queue
to collect received messages and compare them with what you sent at the end of your test. In the meanwhile you can use a Docker container to emulate an Azure Eventhub Go to the solution
A simple notification service
The first implementation was pretty straighforward.
Note that I'll skip the HTTP API part as its not relevant for this article
from typing import Any, Protocol
from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient
from pydantic import TypeAdapter
class Notifier(Protocol):
async def send(self, data: dict[str, Any]) -> None: ...
class AzureEventHubNotifier(Notifier):
"""Implementation of the Notifier port which uses an Azure Event Hub for sending notifications as events."""
def __init__(self, eventhub_producer: EventHubProducerClient) -> None:
self.hub = eventhub_producer
self.type_adapter = TypeAdapter(dict[str, Any])
async def send(self, data: dict[str, Any]) -> None:
evt = EventData(self.type_adapter.dump_json(data))
await self.hub.send_event(evt)
Note the use of Pydantic here solely for serializing
dict
into a JSON string. In particular for types which are not converted by default by the standardjson
module likedate
.
Basically, we get a simple service with one function send
for sending some data as a message in the event hub. The service expect an Azure EventHubProducerClient
which is a client coming from the Azure SDK for Python in the azure-eventhub
package.
So far so good.
Then, I wanted to test this service. My first issue was that the eventhub I wanted to test against on Azure was actually private so I was not able to reach it directly from my machine.
Azure Eventhub emulator
To be able to test and automate my tests against a running and available Eventhub I found out that Microsoft actually provide a Docker container for this purpose.
You can very quickly spin up a local Eventhub emulator for your developments using the following docker-compose.yml
and azure-eventhubs-emulator-config.json
files:
# docker-compose.yml
name: healthblocks-validator-fap
services:
eventhubs:
image: mcr.microsoft.com/azure-messaging/eventhubs-emulator:latest
volumes:
- ./azure-eventhubs-emulator-config.json:/Eventhubs_Emulator/ConfigFiles/Config.json
ports:
- 5672:5672
environment:
BLOB_SERVER: azurite
METADATA_SERVER: azurite
ACCEPT_EULA: Y
depends_on:
- azurite
azurite:
image: mcr.microsoft.com/azure-storage/azurite:latest
ports:
- 10000:10000
- 10001:10001
- 10002:10002
Note that this docker compose yaml file is coming from the Microsoft official documentation that I've simplified a bit (usage of docker network is not mandatory))
// azure-eventhubs-emulator-config.json
{
"UserConfig": {
"NamespaceConfig": [
{
"Type": "EventHub",
"Name": "emulatorNs1",
"Entities": [
{
"Name": "eh1",
"PartitionCount": "2",
"ConsumerGroups": [
{
"Name": "cg1"
}
]
}
]
}
],
"LoggingConfig": {
"Type": "File"
}
}
}
This file is actually for configuring the eventhub namespace, consumer groups, entities, ...
From that point a simple docker compose up
got you a running event hub locally after a couple of seconds. To connect to it you can simply use the connection string provided in the logs of the event hub.
import asyncio
from azure.eventhub.aio import EventHubProducerClient
con_string = "Endpoint=sb://localhost:5672;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;"
async def test_send_message() -> None:
async with EventHubProducerClient.from_connection_string(
con_string, eventhub_name="eh1"
) as hub:
notifier = AzureEventHubNotifier(hub)
await notifier.send({"Hello": "World!"})
print("Message sent!") # noqa: T201
asyncio.run(test_send_message())
Don't forget to specify the
eventhub_name
which is not provided in the connection string directly. This value correspond to the name of an entity in your event hub json config file.
So now we have an event hub and snippet of code for testing our beautiful service. This is great but, how do we actually ensure our service properly sent the mesage to the hub? Also, what if we want to automatize this test using pytest
for example? The best way could be to consume messages sent to the hub and ensure we actually received the one we sent so far.
Obviously, for the sake of this example, our service is so simple that there is no doubt that if the
send
function does not raise an error, there is lot of chances that the message actually landed into the hub.
Asyncio and Future to the rescue
To consume messages from our eventhub using the Azure SDK for Python we have to do something like this
import asyncio
from azure.eventhub import EventData
from azure.eventhub.aio import EventHubConsumerClient
con_string = "Endpoint=sb://localhost:5672;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;"
async def on_event(_: object, event: EventData | None) -> None:
if event is None:
return
print(f"Received: {event.body_as_json()}")
async def consume_messages() -> None:
async with EventHubConsumerClient.from_connection_string(
con_string, consumer_group="cg1"
) as consumer:
await consumer.receive(on_event)
asyncio.run(consume_messages())
There is two main issues here:
- The
receive
function is actually blocking. But because its a coroutine you have to put someawait
in front of it to actually dispatch it. Any code after this statement will never be executed (except if the consumer is closed for some reasons) - The received events are processed in a callback function so you can not put any
assert
statements here becausepytest
will not be able to catch it.
Consuming events in the background
The first thing we have to do is to receive events in the background so we can keep running some code once started. I found out how to do this on stackoverflow (apologizes to the author, I lost the original thread) using some gem from asyncio
import asyncio
from contextlib import suppress
from azure.eventhub import EventData
from azure.eventhub.aio import EventHubConsumerClient, EventHubProducerClient
con_string = "Endpoint=sb://localhost:5672;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;"
async def on_event(_: object, event: EventData | None) -> None:
if event is None:
return
print(f"Received: {event.body_as_json()}")
async def consume_and_stop() -> None:
# Init our consumer and producers
async with (
EventHubConsumerClient.from_connection_string(
con_string, eventhub_name="eh1", consumer_group="cg1"
) as consumer,
EventHubProducerClient.from_connection_string(
con_string, eventhub_name="eh1", consumer_group="cg1"
) as producer,
):
# Start receiving messages on the background
task = asyncio.ensure_future(consumer.receive(on_event))
# Wait enough time for the consumer to be settle (otherwise we may miss some events)
await asyncio.sleep(2)
# Send some JSON formatted events
await producer.send_event(EventData('"Hello World!"'))
await producer.send_event(EventData('"Foo Bar!"'))
# Leave enough time for our consumer to receive events
await asyncio.sleep(2)
# Stops receiving events
task.cancel()
with suppress(asyncio.CancelledError):
# Don't forget to await the task in the end to avoid your Python program to hang forever
await task
asyncio.run(consume_and_stop())
Received: Hello World!
Received: Foo Bar!
The idea is to use asyncio.ensure_future
function to actually dispatch our coroutine and get a Future
object wrapping the task. This way we are no longer blocked.
We add a small sleep statement because receiving events is not instant so we could miss messages sent right after.
At the end, we cancel the task, which stops receiving events and we finally await the canceled task without raising the cancelation error.
Accessing received events in the main thread
As we mentionned, events received are passed to a given callback function. This mechanism makes it harded to use those events in our main thread (for comparing what we sent to what we a received in a test for example).
Thanksfully there is a dedicated data structure for this use case right in the standard library: asyncio.Queue
. We can leverage this async queue for collecting the events we received. Let's update our previous script.
import asyncio
from contextlib import suppress
from azure.eventhub import EventData
from azure.eventhub.aio import EventHubConsumerClient, EventHubProducerClient
con_string = "Endpoint=sb://localhost:5672;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;"
# Use a queue for storing events received from the callback into the main thread
events: asyncio.Queue[EventData] = asyncio.Queue()
async def on_event(_: object, event: EventData | None) -> None:
if event is None:
return
# Push events in the queue
await events.put(event)
async def consume_and_stop() -> None:
async with (
EventHubConsumerClient.from_connection_string(
con_string, eventhub_name="eh1", consumer_group="cg1"
) as consumer,
EventHubProducerClient.from_connection_string(
con_string, eventhub_name="eh1", consumer_group="cg1"
) as producer,
):
task = asyncio.ensure_future(consumer.receive(on_event))
await asyncio.sleep(2)
await producer.send_event(EventData('"Hello World!"'))
await producer.send_event(EventData('"Foo Bar!"'))
await asyncio.sleep(2)
# Stops receiving events
task.cancel()
with suppress(asyncio.CancelledError):
# Don't forget to await the task in the end to avoid your Python program to hang forever
await task
asyncio.run(consume_and_stop())
# Collect events bodies received as strings
# NOTE: queue are not iterable so we can not simply list(events)
events_bodies = [events.get_nowait().body_as_str() for _ in range(events.qsize())]
# NOTE: The order of the events might change each run
assert '"Hello World!"' in events_bodies
assert '"Foo Bar!"' in events_bodies
Note that we could also have used
pytest-docker
ortestcontainers
libraries to easily spin up our docker containers on the fly during our tests. Maybe I'll add this later on and update this article
TADA! Now we can both produce and consume events on a same thread and then we can access received events for doing any kind of tests or verifications.
Wrapping up as pytest
tests
Now that we got our main pieces we can wrap it up into nice automated tests using pytest
import asyncio
from collections.abc import AsyncIterator, Iterator
from contextlib import suppress
from typing import Any
import pytest
import pytest_asyncio
from azure.eventhub import EventData
from azure.eventhub.aio import EventHubConsumerClient, EventHubProducerClient
from validator_fap.adapters.notifier import AzureEventHubNotifier
con_string = "Endpoint=sb://localhost:5672;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;"
@pytest_asyncio.fixture
async def eventhub_producer() -> AsyncIterator[EventHubProducerClient]:
async with EventHubProducerClient.from_connection_string(
con_string, eventhub_name="eh1"
) as producer:
yield producer
@pytest_asyncio.fixture
async def eventhub_consumer() -> AsyncIterator[EventHubConsumerClient]:
async with EventHubConsumerClient.from_connection_string(
con_string,
eventhub_name="eh1",
consumer_group="cg1",
) as consumer:
yield consumer
@pytest_asyncio.fixture
async def events(
eventhub_consumer: EventHubConsumerClient,
) -> AsyncIterator[Iterator[dict[str, Any]]]:
"""Start listening for event hubs events and returns a generator which yield received events."""
events: asyncio.Queue[EventData] = asyncio.Queue()
async def on_event(_: object, event: EventData | None) -> None:
"""Push events into our queue."""
if event is not None:
await events.put(event)
def collect_events() -> Iterator[dict[str, Any]]:
"""Collect events in the queue as a generator.
Using a generator here defer events loading during tests.
"""
while not events.empty():
yield events.get_nowait().body_as_json()
# Start receiving messages on the background
task = asyncio.ensure_future(eventhub_consumer.receive(on_event))
# Wait enough time for the consumer to be settle (otherwise we may miss some events)
await asyncio.sleep(2)
yield collect_events()
# Properly cancel receiver
task.cancel()
with suppress(asyncio.CancelledError):
await task
@pytest.fixture
def sut(eventhub_producer: EventHubProducerClient) -> AzureEventHubNotifier:
return AzureEventHubNotifier(eventhub_producer)
@pytest.mark.asyncio
async def test_send_health_data_publish_a_message_on_azure_event_hub(
sut: AzureEventHubNotifier,
events: Iterator[dict[str, Any]],
) -> None:
# Given
data = {"foo": "bar"}
# When
await sut.send(data)
# Then
await asyncio.sleep(1) # slight delay before actually receiving messages
assert list(events) == [data]
Note that this code snippet omitted the
AzureEventHubNotifier
class described at the beginning of the article
Lot of stuff out there but basically:
- We create a fixture for setting up an eventhub producer to be used be our SUT (system under test,
AzureEventHubNotifier
) to send messages to the hub. - We create a fixture for setting up an eventhub consumer to actually receive messages sent (if any)
- We create another
events
fixture which takes advantage of the fact that code inside functions usingyield
statements (generators and async generators) are not executed until the generator is being consumed. Here, theevents
fixture immediately start receiving events in the background and push them into a queue. The fixture yield the result of thecollect_events()
function which itself returns a generator. - Our test use the
events
fixture which starts receiving events in the background and gives us a generator that will yield received events - After setting up our test data, we send an event, wait a couple of second for the event to be received and finally consume our events generator to compare with what we sent