The New Kid on the Block

Introduction

To start off, we need to understand what pub/sub messaging is. Publish/subscribe messaging is an asynchronous service in which publishers (producers) generate messages on a topic, and subscribers (consumers) read and process messages from that topic. This may sound complicated, but to put it simply: a producer pushes data through a topic or a queue for it to be processed by a consumer. Multiple consumers can be subscribed to a single topic, and at the same time a single consumer can be subscribed to multiple topics.

So why is this beneficial at all?

Well, first things first: the producer and consumer are completely decoupled and work independently of each other. The producer does not know what the consumer is up to; all it does is push messages to the topic. If the consumer is down, or something prevents it from processing messages, the producer carries on with its duties. The same goes for the consumer: as long as the topic it is subscribed to has messages, it processes them (except in the case of ephemeral subscriptions, where the subscription is deleted if no consumer is available). This prevents the whole system from going down when only one part of it is having issues, and it also makes scaling either side more manageable.
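That decoupling can be illustrated with a minimal, Pulsar-free sketch using Python's standard library. The queue here is only a toy stand-in for a topic, not how a real broker works:

```python
import queue
import threading

# A stand-in "topic": a thread-safe queue shared by producer and consumer.
topic = queue.Queue()

def producer():
    # The producer only pushes messages; it knows nothing about consumers.
    for i in range(5):
        topic.put(f"message {i}")

def consumer(results):
    # The consumer only pulls messages; it knows nothing about producers.
    for _ in range(5):
        results.append(topic.get())

results = []
p = threading.Thread(target=producer)
c = threading.Thread(target=consumer, args=(results,))
c.start()  # the consumer can even start first and simply wait for messages
p.start()
p.join()
c.join()
print(results)  # all five messages arrive, in order
```

Neither function calls the other; they only agree on the topic, which is exactly the contract pub/sub systems formalize.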

Multiple systems offer this, from Apache Kafka to RabbitMQ, but for this tutorial we'll use the 'new' kid on the block, Apache Pulsar.

What is Apache Pulsar?

Yahoo initially created Apache Pulsar to serve several of its in-house products, such as Flickr, Yahoo Finance and others. It was open-sourced in 2016 and was declared a top-level Apache Software Foundation project in 2018. Hence why I called it 'new': its competitors Kafka and RabbitMQ were released in 2011 and 2007 respectively. Pulsar offers geo-replication, popular language support, durable real-time storage through Apache BookKeeper, and security, among numerous other benefits.

Now that we know what it is, how about we learn how to use it?

Prerequisite

For this example, we will be running Pulsar as a standalone cluster, so we require:

  1. A machine running macOS or Linux (no Windows variant exists as of writing), with Java 8 (which can be downloaded from the Oracle website) and at least 2 GB of RAM.
  2. Python 2.7, or 3.4 and above.

In this example, I will be using a Linux machine, so all commands will adhere to that.

Step 1: Installing Apache Pulsar

Of course, before we learn how to use Apache Pulsar, we must first install it. We do that by downloading the tarball using wget.

$ wget https://archive.apache.org/dist/pulsar/pulsar-2.6.1/apache-pulsar-2.6.1-bin.tar.gz

Untar it and then enter the folder using cd.

$ tar xvfz apache-pulsar-2.6.1-bin.tar.gz
$ cd apache-pulsar-2.6.1

Because we are going to be using Pulsar in standalone mode, we can start it from the command line using:

$ bin/pulsar standalone

You should see a flurry of logs on the command line. Don't feel too threatened: as long as you do not get an error and an exit at any point, the standalone Apache Pulsar should be working! Let's proceed with making our first producer and consumer to test it and see it in action. Note that in standalone mode you have to open a new terminal to proceed, since the standalone process occupies the terminal it was started in. Do that and proceed to the next step.

Step 2: Setting up a Virtual Environment

For this test, I would rather use a virtual environment so as not to interfere with the rest of the system, and I prefer the Python library virtualenvwrapper for this.

For those who are not familiar with virtual environments, a virtual environment is a tool used to keep a project's dependencies away from the dependencies of other projects by isolating them in their own 'environments'.
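If you would rather not install anything extra, Python's built-in venv module (Python 3.3+) achieves the same isolation; the directory name pulsar-test below is just an example:

```shell
# Create an isolated environment in a directory named pulsar-test
python3 -m venv pulsar-test
# Activate it (bash/zsh); the prompt changes to show the environment name
source pulsar-test/bin/activate
# python now resolves to the interpreter inside the environment
which python
```

Once activated, any pip install stays inside pulsar-test/, and deactivate drops you back out. The virtualenvwrapper flow below works the same way but adds convenience commands on top.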

virtualenvwrapper is very simple to install, provided you have a Python installation on your machine. To install it, use pip, the Python package installer, and enter the following on the command line:

$ pip install virtualenvwrapper

And in your shell startup file (for example, ~/.bashrc on Linux), add the following three lines:

export WORKON_HOME=$HOME/.virtualenvs
export PROJECT_HOME=$HOME/Devel
source /usr/local/bin/virtualenvwrapper.sh

Change the directories to wherever you want the virtual environments to live, and the source line to wherever the script installed with the package lies. Then reload the startup file, and you should be able to access the virtualenvwrapper commands on the command line. Use the mkvirtualenv command to create a new virtual environment under whatever name you wish, like so:

$ mkvirtualenv pulsar-test

Where 'pulsar-test' is the name you wish to use for the virtual environment. It can be whatever you wish.

After running the command, the virtual environment will be created and upon completion, you will be within the virtual environment. Now we can start actually programming.

To start, we have to install the Pulsar Python client so we can interact with Pulsar from Python. For that, we use pip again:

$ pip install pulsar-client

Once that is done, we can write our first producer.

Writing the Producer Application

Producers, as described above, are what provides the messages to a topic, which are then consumed. The data can be whatever you want it to be, as long as the consumer can process it. Let us look at a simple producer that sends data of several different types to see how Pulsar handles it.

import pulsar
from pulsar.schema import *


class Data(Record):
    a = Integer()
    b = String()
    c = Float()
    d = Array(String())

client = pulsar.Client('pulsar://localhost:6650')

producer = client.create_producer(
                    topic='data_identify',
                    send_timeout_millis=10000,
                    schema=AvroSchema(Data))


for i in range(10):
    producer.send(Data(a=i, b=str(i), c=float(i), d=[str(i), str(i+1), str(i+2)]))

client.close()

Let's go through the piece of code line by line to understand how it works.

Schema definition

from pulsar.schema import *


class Data(Record):
    a = Integer()
    b = String()
    c = Float()
    d = Array(String())

The above defines the schema, that is, which fields and types the producer expects when sending data. If the data being sent does not adhere to this schema, an error is thrown.
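The enforcement works along these lines. This is a toy stand-in written in plain Python to show the idea, not the actual pulsar.schema implementation:

```python
# Toy illustration of schema enforcement, NOT the real pulsar.schema code:
# each field declares a type, and a record rejects values that don't match.
SCHEMA = {"a": int, "b": str, "c": float, "d": list}

def make_record(**fields):
    for name, value in fields.items():
        expected = SCHEMA[name]
        if not isinstance(value, expected):
            raise TypeError(f"field {name!r} expects {expected.__name__}, "
                            f"got {type(value).__name__}")
    return fields

record = make_record(a=1, b="1", c=1.0, d=["1", "2", "3"])  # conforms: accepted
try:
    make_record(a="not an int", b="1", c=1.0, d=[])  # wrong type for 'a'
except TypeError as e:
    print(e)
```

Pulsar performs the real check against the Avro schema when producer.send is called, so malformed data never reaches the topic.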

client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer(
                    topic='data_identify',
                    send_timeout_millis=10000,
                    schema=AvroSchema(Data))

The client connects to the Pulsar broker listening on port 6650 and is what we use to create producers and consumers. In this case, we create a producer using the create_producer function. We give the topic the name 'data_identify', which we will use on the consumer side to consume from it. send_timeout_millis determines how many milliseconds the producer waits for an acknowledgement from the broker before raising an error, in this case 10 seconds. Finally, schema is where we pass the previously defined schema as an Avro data type; this is where the data is double-checked.

for i in range(10):
    producer.send(Data(a=i, b=str(i), c=float(i), d=[str(i), str(i+1), str(i+2)]))

This is where the producer generates 10 different pieces of data and pushes them to the topic using the previously defined schema. And with that, after running the code, we have sent out 10 different messages. Now let's create a consumer to read the data.

Writing the Consumer Application

Here is how the consumer looks.

import pulsar
from pulsar.schema import *


class Data(Record):
    a = Integer()
    b = String()
    c = Float()
    d = Array(String())

client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe(
                  topic='data_identify',
                  subscription_name='read_data',
                  schema=AvroSchema(Data))

while True:
    msg = consumer.receive()
    ex = msg.value()
    try:
        print("Received message type a='{}' b='{}' c='{}' d='{}'".format(type(ex.a), type(ex.b), type(ex.c), type(ex.d)))
        print("Received message a='{}' b='{}' c='{}' d='{}'".format(ex.a, ex.b, ex.c, ex.d))
        consumer.acknowledge(msg)
    except Exception:
        consumer.negative_acknowledge(msg)
client.close()

Let's break it down again.

As you can see, the schema must be defined again on the consumer so it knows what to expect. The consumer connects to the same broker as the producer. Now let's look at the differences between them.

consumer = client.subscribe(
                  topic='data_identify',
                  subscription_name='read_data',
                  schema=AvroSchema(Data))

Here we define the topic that the consumer subscribes to, the same as the producer's. We then give the subscription the name 'read_data' and pass it the expected schema.

while True:
    msg = consumer.receive()
    ex = msg.value()

Here, the 'while True' keeps the consumer running indefinitely, like a daemon, a piece of code that constantly runs in the background. On the next line, the consumer receives a message and assigns it to the msg variable, and then the value of the message is assigned to the variable ex.

    try:
        print("Received message type a='{}' b='{}' c='{}' d='{}'".format(type(ex.a), type(ex.b), type(ex.c), type(ex.d)))
        print("Received message a='{}' b='{}' c='{}' d='{}'".format(ex.a, ex.b, ex.c, ex.d))
        consumer.acknowledge(msg)
    except Exception:
        consumer.negative_acknowledge(msg)

Now, the try-except here is very important because of the acknowledgement. If at any point something goes wrong, the negative_acknowledge function tells the broker that processing failed, and the message is returned to the queue for redelivery. If the message was successfully processed, the acknowledge function tells the broker that everything worked out as intended, and the message does not get reprocessed.
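The ack/nack cycle can be sketched without a broker using Python's standard library. This is a toy model of at-least-once redelivery; none of these names come from the Pulsar client:

```python
import queue

# Toy model of ack/nack redelivery, NOT the Pulsar client API:
# a message that fails processing goes back on the queue and is delivered again.
topic = queue.Queue()
topic.put("good")
topic.put("bad")

processed, attempts = [], 0
failed_once = False
while not topic.empty():
    msg = topic.get()
    attempts += 1
    try:
        if msg == "bad" and not failed_once:
            failed_once = True
            raise ValueError("processing failed")  # simulate a transient error
        processed.append(msg)   # "acknowledge": done, never redelivered
    except ValueError:
        topic.put(msg)          # "negative acknowledge": requeue for retry
print(processed)  # both messages eventually processed
print(attempts)   # one extra delivery for the message that failed
```

This is why acknowledgement matters: a crash before acknowledge means the message is delivered again, so nothing is silently lost.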

The output of the two prints should look as follows:

Received message type a='<class 'int'>' b='<class 'str'>' c='<class 'float'>' d='<class 'list'>'
Received message a='0' b='0' c='0.0' d='['0', '1', '2']'
...
Received message type a='<class 'int'>' b='<class 'str'>' c='<class 'float'>' d='<class 'list'>'
Received message a='9' b='9' c='9.0' d='['9', '10', '11']'

The results have been shortened, but all 10 values consumed from the topic will have been displayed. If you want, you can increase the range in the producer to whatever number you'd like and see how easily Pulsar handles it.

And with that, we have written our first producer and consumer.

Conclusion

This was just a brief introduction to pub/sub and how it works, and I hope it gave you a taste of what Apache Pulsar and Python are capable of, brief as it was. Able to process millions of messages per second across millions of topics, Apache Pulsar is a really powerful tool that can help any business achieve its true data-processing potential.

All this wonderful code can be found here
