SQS — Introduction to FIFO Queues


Simple Queue Service (SQS) offers an easy interface for message queueing — you can store messages to be processed later by your logic. It's commonly used with microservices and distributed systems (systems that are spread across multiple nodes/computers).

What’s a first-in-first-out queue?
A first-in-first-out queue is somewhat equivalent to a queue at a shop — the first message that makes it to the queue is the first message that is pushed to the consumer, as shown in the example below.


The most important attribute that we'll focus on in this article is the required MessageGroupId attribute — it is the backbone of how FIFO queues handle ordering in AWS.

It’s used to communicate to the queue on which ‘partition’ you’d like to enqueue the message, as shown below:
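As a rough sketch, sending a message to a FIFO queue with boto3 could look like the following (the queue URL and identifiers are made up):

import json

import boto3

sqs = boto3.client('sqs')
# Hypothetical queue URL; FIFO queue names must end in '.fifo'.
QUEUE_URL = 'https://sqs.eu-west-1.amazonaws.com/123456789012/events.fifo'

sqs.send_message(
    QueueUrl=QUEUE_URL,
    MessageBody=json.dumps({'action': 'update_profile'}),
    # Messages sharing a MessageGroupId are delivered in order.
    MessageGroupId='user_42',
    # Required unless content-based deduplication is enabled on the queue.
    MessageDeduplicationId='user-42-event-0001',
)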


The order of messages is maintained within every message group (partition), not across message groups. This means that if you have multiple users carrying out actions, you ideally want the message group to be something along the lines of user_<user_id>, so that actions from a particular user are grouped and processed in the order they happen.

How can I have multiple consumers reading from the same queue?
In the example below, we’ll go over how AWS handles maintaining the order of messages whilst having multiple consumers reading from the same queue.


Taking a look at the diagram above, we see:

  • Groups of customers — equivalent to messages grouped by a MessageGroupId (Group 1, Group 2, Group 3)
  • Shop — equivalent to a FIFO queue
  • Multiple employees — equivalent to multiple consumers reading from the same queue (commonly referred to as competing consumers)

Scenario 1
We only have messages in Group 1
The first message from Group 1 is picked up by one of the consumers and that message group is locked (no further messages from that group are delivered to any consumer) until that first message is acknowledged.

Scenario 2
We have messages in all message groups
The consumers will pick messages from any message group, but the order within every message group is maintained through the locking mechanism described in Scenario 1. This is where one needs to pay close attention to how messages are grouped in order to promote interleaving.

Scenario 3
We have an issue with processing a message from Group 1
The unacknowledged message blocks the entire message group until the message is handled — either the visibility timeout expires and the message is redelivered, or the maximum number of retries is reached and the message is sent to the dead-letter queue.

How can I promote interleaving?
It all depends on the data model. As a simple example, take a data structure where dealer groups contain dealers, and each dealer has vehicles whose events we need to process.


Assuming that we care about the order of events on the vehicles, we can take two routes:

Grouping by dealer group
This would mean that a dealer group can only have one consumer at a time — since a message group locks to maintain order as explained in Scenario 1.

This would result in a backlog of events and poor performance.

Grouping by dealer
This would mean that every dealer can have its own consumer, which would lead to better performance.
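A minimal sketch of the two grouping strategies (the event fields are hypothetical):

def group_by_dealer_group(event) -> str:
    # One message group per dealer group: at most one in-flight message
    # per group, so effectively one consumer per dealer group.
    return f'dealer_group_{event.dealer_group_id}'


def group_by_dealer(event) -> str:
    # One message group per dealer: many small groups, so multiple
    # consumers can interleave across dealers.
    return f'dealer_{event.dealer_id}'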

One can try to go lower in the data structure to gain better performance — but in a nutshell, the more granular your message groups are, the less contention you'll have and the better your processing performance will be.


Configuration Overview


What’s visibility timeout used for?
The amount of time you want SQS to wait before re-sending the same (unacknowledged) message.

I would recommend that you profile (measure) how long it takes for your logic to process a single message, and add reasonable padding — which ensures that SQS won't send out the same message whilst you're still processing it.

A more robust solution would be to have a ‘heartbeat’ — where you extend the visibility timeout of a message whilst processing. (Examples: Python / JavaScript)
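A minimal sketch of such a heartbeat, reusing the sqs client and QUEUE_URL from the earlier example:

def heartbeat(receipt_handle: str) -> None:
    # Call this periodically (e.g. from a background timer) while the
    # message is still being processed, to keep it hidden from other
    # consumers.
    sqs.change_message_visibility(
        QueueUrl=QUEUE_URL,
        ReceiptHandle=receipt_handle,
        VisibilityTimeout=60,  # hide the message for another 60 seconds
    )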

What’s the delivery delay setting used for?
The amount of time you want SQS to wait before making a new message available.

A delay of five seconds would mean that once you add a message to a queue, that particular message cannot be retrieved by any of your consumers until that delay has expired.

What’s the receive message wait time used for?

  • Receive message wait time is set to 0
    A request is sent to the servers and a query is executed; a response is returned to the client (with or without results) — referred to as short polling.
  • Receive message wait time is set to larger than 0
    A request is sent to the servers and the server looks for results for the specified amount of time; once the time expires, the results (if any) are returned — referred to as long polling.
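In boto3 terms, the difference boils down to the WaitTimeSeconds parameter (again reusing the client from the earlier sketch):

# Short polling: the server answers immediately, possibly with no messages.
response = sqs.receive_message(QueueUrl=QUEUE_URL, WaitTimeSeconds=0)

# Long polling: the server waits up to 20 seconds (the maximum) for messages.
response = sqs.receive_message(QueueUrl=QUEUE_URL, WaitTimeSeconds=20)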

What’s the message retention period used for?
The amount of time you want SQS to retain messages for — any messages older than the time specified will be deleted.

What’s a dead-letter queue?


A dead-letter queue refers to a queue that is used to store messages that are not acknowledged or processed successfully.
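As a sketch, a dead-letter queue is attached through the queue's redrive policy (the ARN below is made up):

sqs.set_queue_attributes(
    QueueUrl=QUEUE_URL,
    Attributes={
        'RedrivePolicy': json.dumps({
            # For a FIFO queue, the dead-letter queue must be FIFO as well.
            'deadLetterTargetArn': 'arn:aws:sqs:eu-west-1:123456789012:events-dlq.fifo',
            'maxReceiveCount': '5',  # move to the DLQ after 5 failed receives
        }),
    },
)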

How are messages acknowledged?
A received message is not automatically acknowledged in SQS — one has to explicitly delete the message (or move it to another queue — such as a dead-letter queue) for it to be acknowledged and not re-sent once the visibility timeout expires.
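Putting it together, a minimal consume-and-acknowledge loop could look like this (process is a hypothetical handler):

response = sqs.receive_message(QueueUrl=QUEUE_URL, MaxNumberOfMessages=1)

for message in response.get('Messages', []):
    process(message['Body'])
    # Deleting the message is the acknowledgement; without this call it
    # becomes visible again once the visibility timeout expires.
    sqs.delete_message(
        QueueUrl=QUEUE_URL,
        ReceiptHandle=message['ReceiptHandle'],
    )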

Celery: Advanced Routing Techniques

Nowadays the microservice architecture is considered the most scalable approach for a wide range of business problems, in particular because it promises fast and lean development cycles.

The best case scenario for microservices is when the data entities that define our applications are completely decoupled. Unfortunately that is rarely the case, and managing the communication between microservices is far from the easiest task a team may encounter.

In the simplest use case, we can use plain HTTPS requests to send messages to and receive messages from other services.

Unfortunately this methodology tends to couple the microservices and, depending on scale, could deteriorate the performance of the application.

Use case: A simple ecommerce

As a case study we'll draft the architecture of a simple ecommerce, starting with these three microservices:

Order – Manages the orders and their lines (e.g. in review, dispatched).
Logistic – Manages the moving about of the items.
Billing – Manages the company general ledger.

When a customer fills their basket with whichever items they want and completes the payment procedure, we generate an order.

The Order microservice may need to send this information to other microservices, for example the Billing and the Logistic microservices.

In the HTTPS scenario, the Order microservice needs to know of the existence of those services, namely Billing and Logistic, and of their API structure. This poses the following problems: if a third microservice needs to be added to the loop, the code of Order needs to be altered directly, and API changes may need to cascade to other microservices.

Additionally, we may have long chains of HTTP requests and an API gateway that needs to manage both internal and client-generated traffic. This could slow down our application significantly.

Another way to manage communication between microservices is asynchronous messaging; one of its benefits is that it allows extending the logic with new consumers without requiring any alterations to the producers' source code, thereby following the open-closed principle.

Unfortunately, using asynchronous messaging at scale may be quite a challenge on its own, and the Python asynchronous ecosystem is still pretty immature, leaving developers with little to no reference.

In this article I will present an example implemented using Celery, attrs, and cattrs, which tries to be as exhaustive as possible.

Asynchronous messaging using Celery

Although we could choose among various libraries, like pika, I will implement it using Celery and Kombu.

In particular we will create specific Topic exchanges that will be named after our microservices, and let each interested microservice subscribe to the various events using routing_keys.

We will also define our events using attrs; it has all the features of Python dataclasses plus some extra candy, like validation and wider compatibility (Python 2.7 and Python 3.4+).
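For instance, a validator can reject bad values at construction time; a minimal sketch with a made-up class:

import attr


@attr.s(auto_attribs=True)
class Discount:
    # instance_of raises a TypeError if percent is not a float.
    percent: float = attr.ib(validator=attr.validators.instance_of(float))


Discount(10.0)     # fine
# Discount('10')   # would raise TypeError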

The event_manager common package

Now we will create a library that will be shared among our microservices; we will call it event_manager. The scope of this package is to declare the exchanges, the dataclasses (and, where needed, their versions), and some utility classes.

The Order object

We will represent Order and OrderLine as dataclasses using attrs; this is not an ORM representation but a minimal representation of the message:

from typing import Sequence

import attr


@attr.s(auto_attribs=True)
class OrderLine:
    id: int
    quantity: int
    price: float


@attr.s
class Order:
    id: int = attr.ib()
    # factory=list gives each instance its own empty list as the default
    lines: Sequence[OrderLine] = attr.ib(factory=list)
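To get a feel for the cattrs side, which the Event class below will rely on, a round trip could look roughly like this:

import cattr

order = Order(1, [OrderLine(1, 2, 3.0)])
payload = cattr.unstructure(order)           # plain dicts/lists, ready for JSON
restored = cattr.structure(payload, Order)   # back to an Order instance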

The event class

Now we will declare a topic exchange; this will allow us to bind it to multiple queues.

from kombu import Exchange

ORDER_EXCHANGE = Exchange('order', type='topic')

We also create a class, let's call it Event, that will help us abstract away some of the complexity. The class will do a number of things:

  • Register a number of callbacks which will be called when a message is received.
  • Use cattrs to de/serialize our dataclass.
  • Create a shared task under the hood.

The class will implement the descriptor protocol so that we will be able to declare each event while building the class.

from ...


Message = TypeVar('Message')


class Event:

    def __init__(self, exchange: Exchange, routing_key: str):
        ...

    def __get__(self, instance: Message,
                owner: Type[Message]) -> Union['Event', Task]:
        ...

    def register_callback(self, callback: Callable[[Message], Any]):
        ...

For the full implementation, see the code on GitHub.

We can now add new lines to the Order class; as you can see, we are setting up versioning in the routing keys:


class Order:
    ...
    # represents the submission of an order
    submit = Event(ORDER_EXCHANGE, 'order.v1.submit')
    # represents the chargeback (cancellation) of an order
    chargeback = Event(ORDER_EXCHANGE, 'order.v1.chargeback')
    # other events
 

An Order is submitted

Now the Order microservice will be able to create an order message and send events through it:

from event_manager.types.order import Order, OrderLine
order = Order(1, [
    OrderLine(1, 2, 3.0),
    OrderLine(2, 1, 4.5),
])
order.submit()

The Billing Microservice

In the billing microservice we will need to bind a queue; we will make sure that the message is received regardless of its version:

from kombu import Queue

from event_manager.exchanges import ORDER_EXCHANGE

QUEUES = (
    Queue('billing_order', exchange=ORDER_EXCHANGE,
          routing_key='order.*.submit'),
)

And register at least one callback:

from event_manager.types.order import Order


@Order.submit.register_callback
def billing_received(order: Order):
    print(f'billing received a task for order {order}')
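The worker then needs to be told to consume from the bound queues; a minimal sketch of the Celery app configuration (module path and broker URL are made up):

from celery import Celery

from billing.queues import QUEUES  # the tuple declared above

app = Celery('billing', broker='amqp://guest@localhost//')
# Make the worker consume from our manually-bound queues.
app.conf.task_queues = QUEUES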

You can check my repository on GitHub for a complete example of how this works.

All in all, asynchronous messaging is likely the way to go when it comes to communication between microservices. Unfortunately the ecosystem is still a bit lacking when it comes to a framework able to painlessly help developers build and manage complex networks of microservices; on the other hand, this means that it is, once again, the time for pioneering new solutions.

Licensed under: Attribution-ShareAlike 4.0 International

Do you know the difference between list() and []? If not, the article below covers it in more detail.

What’s the difference between list() and []

What are the key differences between using list() and []?

The most obvious and visible key difference between list() and [] is the syntax. Putting the syntax aside for a minute, someone who's new to Python, or has had intermediate exposure to it, might argue that they're both lists, or derive from the same class; that is true, which further increases the importance of understanding the key differences between the two, most of which are outlined below.

list() is a function and [] is literal syntax.

Literal syntax (diagram – src: excess.org)
Function (diagram – src: excess.org)

Let’s take a look at what happens when we call list() and [] respectively through the disassembler.

>>> import dis
>>> print(dis.dis(lambda: list()))
  1           0 LOAD_GLOBAL              0 (list)
              3 CALL_FUNCTION            0 (0 positional, 0 keyword pair)
              6 RETURN_VALUE
None
>>> print(dis.dis(lambda: []))
  1           0 BUILD_LIST               0
              3 RETURN_VALUE
None
The output from the disassembler above shows that the literal syntax version doesn't require a global lookup (denoted by the opcode LOAD_GLOBAL) or a function call (denoted by the opcode CALL_FUNCTION).

As a result, literal syntax is faster than its counterpart. Let's take a second and look at the timings below.

>>> import timeit
>>> timeit.timeit('[]', number=10**4)
0.0014592369552701712
>>> timeit.timeit('list()', number=10**4)
0.0033833282068371773
On another note, it's equally important to point out that the literal syntax [] does not unpack values, whereas list() does. An example of unpacking is shown below.

>>> list('abc') # unpacks value
['a', 'b', 'c']
>>> ['abc'] # value remains packed
['abc']

What’s a literal in python?

Literals are notations, a way of writing constant or raw values, which Python recognises as built-in types.
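A few examples of literals:

>>> 42          # integer literal
42
>>> 'abc'       # string literal
'abc'
>>> [1, 2, 3]   # list literal
[1, 2, 3]
>>> {'a': 1}    # dictionary literal
{'a': 1}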


It has been fun and interesting to write the first of many PythonRight blog posts to come; in the next blog post we'll be going over the beauty of unpacking, so stay tuned. 😉 If you have any feedback or other topics you'd like to see explained in detail, feel free to comment.