Understanding Inter-Process Communications in C++ using ZeroMQ
Do you want to start a new project with several threads and processes that communicate with each other, but are unsure how to implement it?
Or maybe you want to refactor old, buggy software that involves inter-thread or inter-process communication (IPC)?
This article should give you a fast and deep understanding of IPC. But first, let us refresh our knowledge.
Refreshing the basic knowledge
Inter-process communication between C++ applications can be implemented on Linux in a number of ways:
- Shared memory: Shared memory enables several processes to access a single memory region. We can use the shm_open function to create a shared memory object and mmap to map it into the process's address space.
- Signals: We can use signals to send simple notifications between processes. With the kill function, a signal can be sent to another process, for example to notify or terminate it. The receiving process can install a signal handler function to handle the signal.
- Pipes: Pipes let processes communicate by writing to one end and reading from the other. A pipe is created with the pipe function, and processes use the write and read functions to send and receive data through it.
- Unix domain sockets: These sockets enable communication between processes on the same machine. Their API is similar to TCP/IP sockets, but Unix domain sockets communicate through the file system as opposed to a network. We use the socket function to create a Unix domain socket, and the bind, listen, accept, send, and recv functions to communicate over it.
- Message queues: These can be used to create a queue for processes to exchange messages in. The msgget function is used to create message queues, and the msgsnd and msgrcv functions are used to send and receive messages.
- Message passing: This type of communication involves processes exchanging messages with one another over a system-defined message queue. Being able to transmit and receive data asynchronously and at various rates makes message passing a versatile and effective kind of communication.
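As a minimal sketch of the pipe mechanism from the list above, the following function forks a child process that writes a short text into a pipe, while the parent reads it back. The function name send_through_pipe is hypothetical and only serves this illustration; the sketch assumes a Linux/POSIX system and messages no larger than PIPE_BUF.

```cpp
#include <cassert>
#include <climits>
#include <cstddef>
#include <string>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// Hypothetical helper: sends `text` from a child process to its parent
// through an anonymous pipe and returns what the parent read.
// Returns an empty string if pipe() or fork() fails.
std::string send_through_pipe(const std::string &text)
{
    // Writes of at most PIPE_BUF bytes are atomic, which keeps the sketch simple.
    assert(text.size() <= PIPE_BUF);

    int fds[2]; // fds[0] is the read end, fds[1] is the write end
    if (pipe(fds) == -1)
        return {};

    const pid_t pid = fork();
    if (pid == -1)
    {
        close(fds[0]);
        close(fds[1]);
        return {};
    }

    if (pid == 0)
    {
        // Child: write the text, then exit without running the parent's cleanup.
        close(fds[0]);
        const ssize_t written = write(fds[1], text.data(), text.size());
        close(fds[1]);
        _exit(written == static_cast<ssize_t>(text.size()) ? 0 : 1);
    }

    // Parent: close its copy of the write end so read() can reach end-of-file.
    close(fds[1]);
    std::string received;
    char buffer[256];
    ssize_t n;
    while ((n = read(fds[0], buffer, sizeof(buffer))) > 0)
        received.append(buffer, static_cast<std::size_t>(n));
    close(fds[0]);

    waitpid(pid, nullptr, 0); // reap the child
    return received;
}
```

Calling send_through_pipe("hello") from a main function should hand the same text back to the parent. The other mechanisms follow a similar create, use, and clean-up rhythm with their own system calls.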
A developer with little practical experience with IPC may find the aforementioned options intimidating. So let me list the scenarios and requirements that best fit each communication channel:
- Shared memory: Shared memory works well for transferring large amounts of data between processes while allowing them concurrent access to the data. Once the region is mapped, reading and writing it involves no system calls or context switches. Shared memory requires careful synchronization to avoid race conditions and other issues, and it is not appropriate for communication between processes that are running on different computers. In my opinion, it serves more as a means of sharing information than as direct communication.
- Signals: Signals are a quick and easy way for processes to communicate with one another. They enable one process to notify another process, interrupting the receiving process's normal flow of execution. Instead of transferring data, signals are typically used to deliver brief notifications or to stop a process. They are not appropriate for large data transfers or for communication between processes running on different machines.
- Pipes: Pipes are a simple and easy-to-use method of communication between processes that are related to each other. Pipes allow you to send data from one process to another by writing to one end of the pipe and reading from the other end. Pipes are typically used for communication between parent and child processes, or between processes that are connected in a pipeline. Pipes have a limited buffer size, so they are not suitable for transferring large amounts of data.
- Unix domain sockets: Unix domain sockets are a powerful and flexible method of communication between processes that are running on the same machine. They provide a network-like interface for communication, allowing you to use the same socket functions as you would for TCP/IP sockets. They cannot reach processes on other machines; for that, the same code can be switched to TCP/IP sockets with few changes.
- Message queues: Message queues are a method of communication that allows processes to exchange messages in a queue. Message queues are suitable for communication between processes that need to send and receive small amounts of data asynchronously. Message queues provide a buffer for storing messages, allowing the sender and receiver to operate at different speeds.
- Message passing: Message passing is more complicated than the other methods. It involves system calls and often additional synchronization primitives, such as semaphores or mutexes, to coordinate access to the message queue. It is usually used in systems that require a high level of concurrency and parallelism, for instance real-time systems and multi-core systems.
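To make the "same socket functions" point about Unix domain sockets concrete, here is a minimal sketch of a request and reply between a parent and a child process. It deliberately swaps in socketpair instead of the bind, listen, and accept sequence so that no socket file is needed, and it uses SOCK_SEQPACKET (available on Linux) to keep message boundaries. The function name request_over_unix_socket and the "ack: " prefix are assumptions made for this illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// Hypothetical helper: sends `request` to a child process over a connected
// pair of Unix domain sockets and returns the child's reply, which is the
// request prefixed with "ack: ". Returns an empty string on failure.
std::string request_over_unix_socket(const std::string &request)
{
    assert(!request.empty() && request.size() < 200); // must fit the buffers below

    int sv[2];
    // SOCK_SEQPACKET keeps message boundaries, so one recv() returns one message.
    if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, sv) == -1)
        return {};

    const pid_t pid = fork();
    if (pid == -1)
    {
        close(sv[0]);
        close(sv[1]);
        return {};
    }

    if (pid == 0)
    {
        // Child: receive the request and answer with an acknowledgement.
        close(sv[0]);
        char buffer[256];
        const ssize_t n = recv(sv[1], buffer, sizeof(buffer), 0);
        if (n <= 0)
            _exit(1);
        const std::string reply = "ack: " + std::string(buffer, static_cast<std::size_t>(n));
        send(sv[1], reply.data(), reply.size(), MSG_NOSIGNAL);
        close(sv[1]);
        _exit(0);
    }

    // Parent: send the request, then wait for the reply.
    close(sv[1]);
    std::string reply;
    if (send(sv[0], request.data(), request.size(), MSG_NOSIGNAL) != -1)
    {
        char buffer[256];
        const ssize_t n = recv(sv[0], buffer, sizeof(buffer), 0);
        if (n > 0)
            reply.assign(buffer, static_cast<std::size_t>(n));
    }
    close(sv[0]);
    waitpid(pid, nullptr, 0); // reap the child
    return reply;
}
```

For two unrelated processes, the server side would instead create the socket with socket, bind it to a path, and call listen and accept, while the client calls connect. The send and recv calls stay the same.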
Normally, communication between two processes or threads (App A and App B) happens through these steps:
- App A sends a message.
- App B receives the message or gets notified.
- App B processes the incoming message and executes an action or the appropriate logic.
- App B can send an acknowledgement message.
To guarantee such a communication protocol, we can use a handshake protocol. In most cases, however, we will use a message queue with a publish-subscribe model, which allows scalability in a distributed system. For a mature product, I wouldn't recommend using shared memory because of the many potential bugs caused by undetected race conditions.
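The publish-subscribe model itself can be sketched in a few lines without any IPC at all. The Broker class below is a hypothetical, single-threaded, in-process illustration of the idea: subscribers register a handler for a topic, and a publisher sends a message to a topic without knowing who receives it. A real system would replace the direct function calls with a transport such as sockets.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical in-process broker, used only to illustrate the model.
class Broker
{
public:
    using Handler = std::function<void(const std::string &)>;

    // Registers a handler that is called for every message on `topic`.
    void subscribe(const std::string &topic, Handler handler)
    {
        assert(handler); // an empty handler could not be called later
        handlers_[topic].push_back(std::move(handler));
    }

    // Delivers `message` to every subscriber of `topic` and returns how
    // many subscribers received it. The publisher never names a receiver.
    std::size_t publish(const std::string &topic, const std::string &message) const
    {
        const auto it = handlers_.find(topic);
        if (it == handlers_.end())
            return 0;
        for (const auto &handler : it->second)
            handler(message);
        return it->second.size();
    }

private:
    std::map<std::string, std::vector<Handler>> handlers_;
};
```

Because the publisher only knows the topic name, subscribers can be added or removed without touching the publishing code, which is where the scalability of the model comes from.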
Libraries and Frameworks in C++
Message passing is mainly attractive when the C++ program targets QNX, where it is a native mechanism; it can be hard to replicate on other operating systems. Even where it is available, it might not be suitable because of its complexity and the overhead of system calls and IPC procedures.
For the publish-subscribe model, there are a variety of communication libraries available in C++. Deciding which library to use depends on the project's specific needs and limits. Here are some options to think about:
- Boost.Asio [1]: Boost.Asio is a cross-platform C++ library centered around network and low-level I/O programming. It has no built-in publish-subscribe pattern, but one can be built on top of its TCP/IP sockets. It is widely used for all kinds of networking tasks and thoroughly documented, which makes it a solid choice for general networking needs.
- ZeroMQ [2]: ZeroMQ is a user-friendly and capable messaging library that is optimized for speed. It offers a collection of messaging abstractions, including publish-subscribe communication. Its efficiency makes it a good choice for many applications, from distributed systems to high-frequency trading.
- Google Protocol Buffers [3]: "Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data – think XML, but smaller, faster, and simpler. You define how you want your data to be structured once, then you can use special generated source code to easily write and read your structured data to and from a variety of data streams." Note that it is a serialization library rather than a transport, so it complements the two libraries above.
There is also another IPC implementation that is very stable and widespread: the one in the ROS (Robot Operating System) framework, where the IPC mechanism is called topics. Unfortunately, we can't simply import ROS topics into a standalone C++ application, since they are part of the ROS framework. ROS topics [4] are designed to be reliable and flexible, but they can be slower than other communication mechanisms due to the overhead of the ROS framework.
ROS topics vs ZeroMQ
When determining which technology to use for publish-subscribe communication between processes, ROS topics and ZeroMQ are both potential options. But they were created with different goals in mind, and each has its own advantages and disadvantages. Here are a few factors to take into consideration:
Complexity: ROS topics are part of a large framework for building robotics applications. Thus, they come with a lot of features and functionality, but they also have a learning curve and are more complex to use. ZeroMQ, on the other hand, is a relatively simple and lightweight messaging library.
Performance: ROS topics are designed to be reliable and flexible, but they can be slower than other communication mechanisms due to the overhead of the ROS framework. ZeroMQ is designed to be fast and efficient, and it is often used in high-performance applications such as distributed systems and high-frequency trading.
Data serialization: ROS provides its own serialization format called ROS message serialization (ROSmsg). ZeroMQ does not provide a built-in serialization format, but it can be used in conjunction with a serialization library such as Google Protocol Buffers (Protobuf) to provide data serialization.
Platform support: ROS is primarily designed for Linux, but also supports other operating systems such as Windows and macOS. ZeroMQ is cross-platform and supports a wide range of operating systems.
Overall, the ZeroMQ library provides broad support for real-world cases, including multithreading. We don't need to implement locks, mutexes, or any other form of inter-thread communication except messages sent across ZeroMQ sockets. That can save us from a lot of bugs caused by race conditions. Its documentation describes many communication patterns that we can implement based on our requirements.
Example Code
Enough theory! Let's get our hands on the keyboard. Based on the above considerations, ZeroMQ is the best choice for me. Furthermore, I can combine the library with Google Protocol Buffers to deal with more complex data serialization. I would like to share how to implement a simple IPC using both cross-platform libraries.
Requirements
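sudo apt install libzmq3-dev libprotobuf-dev protobuf-compiler
The protobuf-compiler package provides the protoc compiler used below. The examples also include zmq.hpp from the header-only cppzmq binding; on recent Debian and Ubuntu releases it is packaged separately as cppzmq-dev.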
Defining Protocol Format
We can define our messages in a *.proto file. All the messages can be defined in one *.proto file. Protocol Buffers also supports enumerations. For this example, I save the format definition in message.proto.
/* Info represents a request message, with date, id, message text and message type */
syntax = "proto3";

message Info {
  string date = 1;
  int32 id = 2;
  repeated Message messages = 3;

  enum MessageType {
    NONE = 0;
    DEBUG = 1;
    WARNING = 2;
    ERROR = 3;
  }

  message Message {
    string text = 1;
    MessageType type = 2;
  }
}
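Run the protoc compiler from the folder that contains message.proto. The output folder, here named generated, must already exist:
mkdir -p generated
protoc --cpp_out=generated message.proto
The compiler then generates two files, message.pb.h and message.pb.cc, in the output folder named generated. The header file will be included in our application code.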
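To get a feeling for why the serialized data is so compact, it helps to look at how a single field ends up on the wire. The following sketch hand-encodes a non-negative integer field such as `int32 id = 2` using the Protocol Buffers varint rules: a tag byte built from the field number and the wire type, followed by the value in groups of 7 bits. The function names are hypothetical and the code does not use the Protobuf library; in the real application the generated classes do this for us.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Appends `value` as a base-128 varint: 7 payload bits per byte, least
// significant group first, high bit set on every byte except the last.
void append_varint(std::vector<std::uint8_t> &out, std::uint64_t value)
{
    while (value >= 0x80)
    {
        out.push_back(static_cast<std::uint8_t>((value & 0x7F) | 0x80));
        value >>= 7;
    }
    out.push_back(static_cast<std::uint8_t>(value));
}

// Hypothetical helper: encodes one non-negative varint field (wire type 0).
// The tag is (field_number << 3) | wire_type.
std::vector<std::uint8_t> encode_varint_field(std::uint32_t field_number, std::uint64_t value)
{
    assert(field_number >= 1); // field numbers start at 1
    const std::uint64_t wire_type_varint = 0;
    std::vector<std::uint8_t> out;
    append_varint(out, (static_cast<std::uint64_t>(field_number) << 3) | wire_type_varint);
    append_varint(out, value);
    return out;
}
```

With these rules, field number 2 with the value 150 takes three bytes: 0x10 for the tag, then 0x96 0x01 for the value. Note that proto3 does not write a field at all when it holds its default value, such as an id of 0.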
Publisher and Subscriber
First, App A sends messages as the publisher.
#include "zmq.hpp"
#include "generated/message.pb.h"
#include <iostream>
#include <string>

// Number of subscribers we wait for before publishing
#define SUBSCRIBERS_EXPECTED 1

int main()
{
    zmq::context_t context(1);

    // Socket to publish updates to the subscribers
    zmq::socket_t publisher(context, zmq::socket_type::pub);
    // A send high-water mark of 0 means no limit on queued outgoing messages
    int sndhwm = 0;
    publisher.set(zmq::sockopt::sndhwm, sndhwm);
    publisher.bind("tcp://*:5561");

    // Socket to receive synchronization requests
    zmq::socket_t syncservice(context, zmq::socket_type::rep);
    syncservice.bind("tcp://*:5562");

    // Wait until every expected subscriber has reported in
    int subscribers = 0;
    zmq::message_t sync_zmsg;
    while (subscribers < SUBSCRIBERS_EXPECTED)
    {
        std::cout << "Wait for synchronization request\n";
        (void)syncservice.recv(sync_zmsg);
        std::cout << "Send synchronization reply\n";
        (void)syncservice.send(sync_zmsg, zmq::send_flags::none);
        subscribers++;
    }

    // Now broadcast 10 serialized Protobuf messages, followed by END
    std::string serialized;
    for (int i = 0; i < 10; i++)
    {
        Info info;
        info.set_date("10.10.2023");
        info.set_id(i);
        Info::Message *info_msg = info.add_messages();
        info_msg->set_text("Hello, ZMQ & Protobuf!");
        info_msg->set_type(Info_MessageType::Info_MessageType_DEBUG);
        info.SerializeToString(&serialized);

        // This constructor copies the serialized bytes into the ZeroMQ message
        zmq::message_t zmsg(serialized.data(), serialized.size());
        (void)publisher.send(zmsg, zmq::send_flags::none);
    }
    (void)publisher.send(zmq::str_buffer("END"), zmq::send_flags::none);
    return 0;
}
This code creates a ZeroMQ context and two sockets, a publisher socket and a reply socket, and binds each socket to its own endpoint. In ZeroMQ, a "socket" (PUB, SUB, PUSH, PULL, etc.) is a software object that is abstracted from the transports. For example, a single ZeroMQ socket may both bind and connect, and do so over multiple transports. So we must think of a ZeroMQ socket as something conceptually above the low-level transport mechanism.
After the synchronization step, the code builds a Protobuf message in each loop iteration, serializes it using the SerializeToString method, and sends it over the publisher socket using the send method.
Next, we write App B, which receives those messages as the subscriber.
#include "zmq.hpp"
#include "generated/message.pb.h"
#include <iostream>
#include <string>

int main()
{
    zmq::context_t context(1);

    // First, connect our subscriber socket and subscribe to all messages
    zmq::socket_t subscriber(context, zmq::socket_type::sub);
    subscriber.connect("tcp://localhost:5561");
    subscriber.set(zmq::sockopt::subscribe, "");

    // Second, synchronize with the publisher
    zmq::socket_t syncclient(context, zmq::socket_type::req);
    syncclient.connect("tcp://localhost:5562");
    // - send a synchronization request
    (void)syncclient.send(zmq::str_buffer(""), zmq::send_flags::none);
    // - wait for the synchronization reply
    zmq::message_t sync_zmsg;
    (void)syncclient.recv(sync_zmsg);

    // Third, get our updates and report how many we got
    int update_nbr = 0;
    zmq::message_t zmsg;
    while (true)
    {
        (void)subscriber.recv(zmsg, zmq::recv_flags::none);
        std::string serialized(static_cast<char *>(zmsg.data()), zmsg.size());

        // Check for the end marker before trying to parse it as Protobuf
        if (serialized == "END")
        {
            break;
        }

        Info info;
        if (!info.ParseFromString(serialized))
        {
            std::cerr << "Failed to parse message\n";
            continue;
        }

        std::cout << "Receiving id: " << info.id() << " date: " << info.date();
        for (const auto &info_msg : info.messages())
        {
            std::cout << " Type: " << info_msg.type() << " Message: " << info_msg.text() << "\n";
        }
        update_nbr++;
    }
    std::cout << "Received " << update_nbr << " updates" << std::endl;
    return 0;
}
This code creates a ZeroMQ context, a SUB socket, and a REQ socket, and connects them to the publisher's two endpoints. It subscribes with an empty filter, which matches every message, synchronizes with the publisher, and then waits for messages. When a message is received, it is deserialized using the ParseFromString() method and printed to the console. Since messages is a repeated field of the nested Message type, we can iterate over its entries with a for loop. We can build both programs with these compiler commands:
$ g++ pub.cpp generated/message.pb.cc -o pub -lzmq -lprotobuf -Wall
$ g++ sub.cpp generated/message.pb.cc -o sub -lzmq -lprotobuf -Wall
Finally, we can run the executables in two different terminals. First, we run the publisher ./pub, then we run the subscriber ./sub on another console. Below are the results:
$ ./pub
Wait for synchronization request
Send synchronization reply
$ ./sub
Receiving id: 0 date: 10.10.2023 Type: 1 Message: Hello, ZMQ & Protobuf!
Receiving id: 1 date: 10.10.2023 Type: 1 Message: Hello, ZMQ & Protobuf!
Receiving id: 2 date: 10.10.2023 Type: 1 Message: Hello, ZMQ & Protobuf!
Receiving id: 3 date: 10.10.2023 Type: 1 Message: Hello, ZMQ & Protobuf!
Receiving id: 4 date: 10.10.2023 Type: 1 Message: Hello, ZMQ & Protobuf!
Receiving id: 5 date: 10.10.2023 Type: 1 Message: Hello, ZMQ & Protobuf!
Receiving id: 6 date: 10.10.2023 Type: 1 Message: Hello, ZMQ & Protobuf!
Receiving id: 7 date: 10.10.2023 Type: 1 Message: Hello, ZMQ & Protobuf!
Receiving id: 8 date: 10.10.2023 Type: 1 Message: Hello, ZMQ & Protobuf!
Receiving id: 9 date: 10.10.2023 Type: 1 Message: Hello, ZMQ & Protobuf!
Received 10 updates
I hope this article helps you to understand IPC more deeply for your project. Please check my repository for more details of the above example. If you have a question or a better approach, just send me a message on LinkedIn or open a GitLab issue.
[1] Boost.Asio: https://theboostcpplibraries.com/boost.asio
[2] ZeroMQ: https://zeromq.org/
[3] Google Protocol Buffers: https://developers.google.com/protocol-buffers/docs/cpptutorial
[4] ROS Topics: https://wiki.ros.org/ROS/Tutorials/UnderstandingTopics