Celery: Advanced Routing Techniques

Nowadays, the microservice architecture is widely considered a highly scalable approach to a wide range of business problems, in particular because it promises fast and lean development cycles.

The best-case scenario for microservices is when the data entities that define our applications are completely decoupled. Unfortunately, that is rarely the case, and managing the communication between microservices is far from the easiest task a team may encounter.

In the simplest case, services can exchange messages through plain HTTPS requests.

Unfortunately, this approach tends to couple the microservices to one another and, depending on scale, can degrade the performance of the application.

Use case: A simple ecommerce application

As a case study, we'll draft the architecture of a simple ecommerce application. We start with these three microservices:

Order – Manages orders, their lines, and their status (e.g. in review, dispatched).
Logistic – Manages the movement of the items.
Billing – Manages the company's general ledger.

When a customer fills their basket with items and completes the payment procedure, an order is generated.

The Order microservice may then need to send that information to other microservices, for example to Billing and Logistic.

In the HTTPS scenario, the Order microservice needs to know that those services, namely Billing and Logistic, exist, and what their APIs look like. This poses the following problem: if a third microservice needs to be added to the loop, the code of Order has to be altered directly, and API changes may need to cascade to other microservices.

Additionally, we may end up with long chains of HTTP requests and an API gateway that has to manage both internal and client-generated traffic. This could slow down our application significantly.

Another way to manage the communication between microservices is asynchronous messaging. One of its benefits is that new consumers can extend the logic without any change to the producer's source code, thereby following the open-closed principle.
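
To make the open-closed point concrete, here is a minimal sketch that uses only the standard library. InMemoryBroker, submit_order and the event name are hypothetical names invented for this illustration, and delivery is synchronous and in-process, so this is not how Celery or a real broker behaves. It only shows that the producer never references its consumers.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, Dict, List

Handler = Callable[[Dict], None]


class InMemoryBroker:
    """Hypothetical stand-in for a message broker; delivers synchronously."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, event_name: str, handler: Handler) -> None:
        self._subscribers[event_name].append(handler)

    def publish(self, event_name: str, payload: Dict) -> None:
        # The producer only names the event; it never references a consumer.
        for handler in self._subscribers[event_name]:
            handler(payload)


broker = InMemoryBroker()
received: List[str] = []

# Consumers register themselves with the broker.
broker.subscribe('order.submit', lambda o: received.append(f"billing:{o['id']}"))
broker.subscribe('order.submit', lambda o: received.append(f"logistic:{o['id']}"))


def submit_order(order_id: int) -> None:
    """Producer code in the Order service; unchanged when consumers are added."""
    broker.publish('order.submit', {'id': order_id})


submit_order(1)

# A third consumer appears later; submit_order() is not modified.
broker.subscribe('order.submit', lambda o: received.append(f"analytics:{o['id']}"))
submit_order(2)
```

With plain HTTPS calls, adding the third consumer would have meant editing submit_order itself.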

Unfortunately, asynchronous messaging at scale can be quite a challenge of its own, and the Python asynchronous ecosystem is still fairly immature, leaving developers with little to no reference material.

In this article I will present an example implemented using Celery, attrs, and cattrs, which tries to be as exhaustive as possible.

Asynchronous messaging using Celery

Although we could choose among various libraries, such as pika, I will implement it using Celery and Kombu.

In particular, we will create topic exchanges named after our microservices and let each interested microservice subscribe to the events it cares about through routing keys.
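
For readers unfamiliar with topic exchanges, the sketch below mimics how an AMQP-style topic pattern is matched against a routing key: words are separated by dots, `*` stands for exactly one word and `#` for zero or more words. The function name topic_matches is my own. In practice the broker performs this matching, so the code is only an illustration of the rule.

```python
from typing import List


def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if routing_key matches an AMQP-style topic pattern."""
    return _match(pattern.split('.'), routing_key.split('.'))


def _match(pattern: List[str], words: List[str]) -> bool:
    if not pattern:
        # The pattern is exhausted: match only if the key is exhausted too.
        return not words
    head, rest = pattern[0], pattern[1:]
    if head == '#':
        # '#' may absorb zero or more words: try every possible split.
        return any(_match(rest, words[i:]) for i in range(len(words) + 1))
    if not words:
        return False
    if head == '*' or head == words[0]:
        # '*' consumes exactly one word; a literal must be equal.
        return _match(rest, words[1:])
    return False


print(topic_matches('order.*.submit', 'order.v1.submit'))
print(topic_matches('order.*.submit', 'order.v1.chargeback'))
```

The first call prints True and the second prints False, which is the behaviour we rely on when a consumer wants every version of one event.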

We will also define our events using attrs. It has all the features of Python dataclasses plus some extras, such as validation and wider compatibility, which, at the time of writing, includes Python 2.7 and Python 3.4+.

The event_manager common package

Now we will create a library shared by all our microservices and call it event_manager. The purpose of this package is to declare the exchanges, the dataclasses, possibly their versions, and some utility classes.

The Order object

We will represent Order and OrderLine as dataclasses using attrs. This is not an ORM representation but a minimal representation of the order as a message:

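from typing import Sequence

import attr


@attr.s(auto_attribs=True)
class OrderLine:
    id: int
    quantity: int
    price: float


@attr.s
class Order:
    id: int = attr.ib()
    # factory=list builds a fresh list per instance; default=list would
    # store the list type itself as the default value.
    lines: Sequence[OrderLine] = attr.ib(factory=list)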
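
To show what "a minimal representation as a message" could look like on the wire, here is a sketch of a JSON round trip. It deliberately swaps attrs and cattrs for the standard-library dataclasses and json modules so that it runs without extra dependencies. The helper names to_message and from_message are hypothetical, and the real package would rely on cattrs for this step.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List


@dataclass
class OrderLine:
    id: int
    quantity: int
    price: float


@dataclass
class Order:
    id: int
    # default_factory gives every Order its own empty list.
    lines: List[OrderLine] = field(default_factory=list)


def to_message(order: Order) -> str:
    """Flatten the order, nested lines included, into a JSON string."""
    return json.dumps(asdict(order))


def from_message(body: str) -> Order:
    """Rebuild the typed objects from the JSON body."""
    data: Dict[str, Any] = json.loads(body)
    lines = [OrderLine(**line) for line in data['lines']]
    return Order(id=data['id'], lines=lines)


order = Order(1, [OrderLine(1, 2, 3.0), OrderLine(2, 1, 4.5)])
body = to_message(order)
restored = from_message(body)
```

The consumer ends up with an object equal to the one the producer sent, without either side sharing anything but the message definition.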

The event class

Now we will declare a topic exchange, which allows us to bind it to multiple queues.

from kombu import Exchange

ORDER_EXCHANGE = Exchange('order', type='topic')

We also create a class, let's call it Event, that abstracts away some of the complexity. The class will do a number of things:

  • Register a number of callbacks, which will be called when the message is received.
  • Use cattrs to serialize and deserialize our dataclass.
  • Create a shared task under the hood.

The class will implement the descriptor protocol so that we will be able to declare each event while building the class.

from ...


Message = TypeVar('Message')


class Event:

    def __init__(self, exchange: Exchange, routing_key: str):
        ...

    def __get__(self, instance: Message, owner: Type[Message]
    ) -> Union['Event', Task]:
        ...

    def register_callback(self, callback: Callable[[Message], Any]):
        ...

For a full implementation, see the code on GitHub.
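
The skeleton above leaves the method bodies out. As a rough idea of how the descriptor part could work, here is a simplified sketch that is not the implementation from the repository. It assumes no Celery or Kombu at all: the exchange is a plain string, dataclasses stand in for attrs and cattrs, and "sending" simply calls the registered callbacks in the same process. The _dispatch method and the single-field Order are hypothetical simplifications.

```python
from dataclasses import asdict, dataclass
from typing import Any, Callable, Dict, List, Optional


class Event:
    """Hypothetical, in-process stand-in for the article's Event class."""

    def __init__(self, exchange: str, routing_key: str) -> None:
        self.exchange = exchange
        self.routing_key = routing_key
        self.callbacks: List[Callable[[Any], Any]] = []
        self.owner: Optional[type] = None

    def __set_name__(self, owner: type, name: str) -> None:
        # Remember the message class so payloads can be rebuilt later.
        self.owner = owner

    def __get__(self, instance: Any, owner: type) -> Any:
        if instance is None:
            # Class access (Order.submit) exposes the Event itself,
            # which is what makes register_callback reachable.
            return self
        # Instance access (order.submit) returns a callable that "sends".
        return lambda: self._dispatch(asdict(instance))

    def register_callback(self, callback: Callable[[Any], Any]):
        self.callbacks.append(callback)
        return callback  # returning it keeps the decorated name usable

    def _dispatch(self, payload: Dict[str, Any]) -> None:
        # A real implementation would publish to the broker here; this
        # sketch rebuilds the message and calls the callbacks directly.
        message = self.owner(**payload)
        for callback in self.callbacks:
            callback(message)


@dataclass
class Order:
    id: int
    submit = Event('order', 'order.v1.submit')


seen: List[Order] = []


@Order.submit.register_callback
def billing_received(order: Order) -> None:
    seen.append(order)


Order(1).submit()
```

Accessing the attribute on the class returns the Event, so callbacks can be registered, while accessing it on an instance returns something callable bound to that message. That split is the reason for using the descriptor protocol here.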

We can now add a few lines to the Order class. As you can see, each routing key includes a version:


class Order:
    ...
    # represents the submission of an order
    submit = Event(ORDER_EXCHANGE, 'order.v1.submit')
    # represents the cancellation of an order
    chargeback = Event(ORDER_EXCHANGE, 'order.v1.chargeback')
    # other events

An Order is submitted

Now the Order microservice will be able to create an order message and send events through it:

from event_manager.types.order import Order, OrderLine

order = Order(1, [
    OrderLine(1, 2, 3),
    OrderLine(2, 1, 4.5),
])
order.submit()

The Billing Microservice

In the Billing microservice we need to bind a queue to the exchange. We will make sure that it receives the message regardless of its version:

from kombu import Queue

from event_manager.exchanges import ORDER_EXCHANGE

QUEUES = (
    Queue('billing_order', exchange=ORDER_EXCHANGE,
          routing_key='order.*.submit'),
)

And register at least one callback:

from event_manager.types.order import Order


@Order.submit.register_callback
def billing_received(order: Order):
    print(f'billing received a task for order {order}')

You can check my repository on GitHub for a complete example of how this works.

All in all, asynchronous messaging is likely the way to go when it comes to communication between microservices. Unfortunately, the ecosystem still lacks a framework that painlessly helps developers build and manage complex networks of microservices. On the other hand, this means that it is, once again, time to pioneer new solutions.

Licensed under: Attribution-ShareAlike 4.0 International
