From the Chinese version, translated using Google Translate
We need the following utility code:
Logging: I reuse a logging component I use often; just see the code in HareMQ/mqcommon/logger.hpp.
SQLite helper: create helper.hpp in mqcommon and copy into it the content of db.hpp implemented in the demo.
For detailed explanation, please refer to the demo documentation: docs/sqlite.md
String splitting: this is just string cutting, which I have written many times before, so here I simply use the one in Boost.
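#include <boost/algorithm/string.hpp>
#include <string>
#include <vector>
class string_helper {
public: // must be public, otherwise callers such as string_helper::split() cannot use it
    // Split str on any character in sep and store the pieces in *out.
    // With if_compress == true, adjacent separators are merged, so "a..b" yields {"a", "b"}.
    static size_t split(const std::string& str, const std::string& sep, std::vector<std::string>* out, bool if_compress = true) {
        boost::split(*out, str, boost::is_any_of(sep),
                     if_compress ? boost::token_compress_on : boost::token_compress_off);
        return out->size();
    }
};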
[!TIP] A UUID (Universally Unique Identifier) is usually written as 32 hexadecimal characters. In its standard form, the 32 characters are divided by hyphens into five groups of 8-4-4-4-12, for example:
550e8400-e29b-41d4-a716-446655440000
Here we generate the UUID from 8 random bytes plus an 8-byte sequence number: 16 bytes in total, written as 32 hexadecimal characters. The random bytes ensure global uniqueness, and the sequence number distinguishes IDs generated one after another.
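#include <atomic>
#include <cstdint>
#include <iomanip>
#include <random>
#include <sstream>
#include <string>
class uuid_helper {
public:
    static std::string uuid() {
        std::random_device rd;
        std::mt19937_64 generator(rd());
        std::uniform_int_distribution<int> distribution(0, 255);
        std::stringstream ss;
        // 8 random bytes -> 16 hex chars, grouped as 8-4-4 (each group followed by "-")
        for (int i = 0; i < 8; ++i) {
            ss << std::setw(2) << std::setfill('0') << std::hex << distribution(generator);
            if (i == 3 || i == 5 || i == 7)
                ss << "-";
        }
        // 8-byte sequence number -> 16 hex chars, grouped as 4-12
        static std::atomic<uint64_t> seq(1); // must be static so the counter keeps increasing across calls
        uint64_t num = seq.fetch_add(1);
        for (int i = 7; i >= 0; i--) {
            ss << std::setw(2) << std::setfill('0') << std::hex << ((num >> (i * 8)) & 0xff);
            if (i == 6)
                ss << "-";
        }
        return ss.str();
    }
};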
File helper, basic framework:
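class file_helper {
private:
    std::string __file_name;
public:
    file_helper(const std::string& file_name)
        : __file_name(file_name) { }
    ~file_helper() = default;
public:
    bool exists();
    size_t size();
    std::string path() { return __file_name; }
    bool read(std::string& body);                              // read the whole file
    bool read(char* body, size_t offset, size_t len);          // read len bytes starting at offset
    bool write(const std::string& body);                       // overwrite the whole file
    bool write(const char* body, size_t offset, size_t len);   // write len bytes starting at offset
    bool rename(const std::string& name);
    static std::string parent_dir(const std::string& file_name);
    static bool create(const std::string& file_name);
    static bool remove(const std::string& file_name);
    static bool create_dir(const std::string& path);
    static bool remove_dir(const std::string& path);
};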
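The offset-based read and write are the least obvious members of file_helper. A minimal sketch of how they could be done with std::fstream, written as hypothetical free functions read_at/write_at rather than the project's actual member functions: reading checks the requested range against the file size first, and writing opens the file in in|out mode so the rest of the file is not truncated.

```cpp
#include <cassert>
#include <cstddef>
#include <fstream>
#include <string>

// Hypothetical stand-in for file_helper::read(char*, offset, len):
// read len bytes starting at offset into buf; fail if the file is too short.
bool read_at(const std::string& path, char* buf, std::size_t offset, std::size_t len) {
    std::ifstream ifs(path, std::ios::binary | std::ios::in);
    if (!ifs.is_open()) return false;
    ifs.seekg(0, std::ios::end);
    std::size_t fsize = static_cast<std::size_t>(ifs.tellg());
    if (offset + len > fsize) return false;  // requested range exceeds the file
    ifs.seekg(static_cast<std::streamoff>(offset), std::ios::beg);
    ifs.read(buf, static_cast<std::streamsize>(len));
    return static_cast<std::size_t>(ifs.gcount()) == len;
}

// Hypothetical stand-in for file_helper::write(const char*, offset, len):
// overwrite len bytes at offset. in|out keeps the existing content
// (out alone would truncate), so the file must already exist.
bool write_at(const std::string& path, const char* buf, std::size_t offset, std::size_t len) {
    std::fstream fs(path, std::ios::binary | std::ios::in | std::ios::out);
    if (!fs.is_open()) return false;
    fs.seekp(static_cast<std::streamoff>(offset), std::ios::beg);
    fs.write(buf, static_cast<std::streamsize>(len));
    return fs.good();
}
```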
Test file: mqtest/file_test.cc
First test two simple functions:
void test1() {
hare_mq::file_helper helper("../mqcommon/logger.hpp");
hare_mq::LOG(DEBUG) << "file if exists: " << helper.exists() << std::endl;
hare_mq::LOG(DEBUG) << "file size: " << helper.size() << std::endl;
}
Test directory creation and file creation:
void test2() {
hare_mq::file_helper helper("./aaa/bbb/ccc/tmp.hpp");
if (helper.exists() == false) {
std::string p = hare_mq::file_helper::parent_dir(helper.path()); // Get the parent directory first
hare_mq::LOG(DEBUG) << p << std::endl;
if (hare_mq::file_helper(p).exists() == false) {
// Create a Directory
hare_mq::file_helper::create_dir(p);
}
hare_mq::file_helper::create(helper.path());
}
}
Test global read and write:
void test3() {
hare_mq::file_helper helper1("../mqcommon/logger.hpp");
hare_mq::file_helper helper2("./aaa/bbb/ccc/tmp.hpp");
std::string body;
helper1.read(body);
// write to tmp.hpp
helper2.write(body);
}
This reads the contents of logger.hpp and writes them to tmp.hpp.
Test reading and writing at a specific position:
void test4() {
hare_mq::file_helper helper("./aaa/bbb/ccc/tmp.hpp");
// Read 13 bytes starting at offset 6, i.e. bytes [6, 19)
char str[16] = {0}; // 13 bytes plus the terminating '\0' fit
helper.read(str, 6, 13); // arguments: buffer, offset, length
hare_mq::LOG(DEBUG) << std::string(str) << std::endl;
helper.write("123456\n", 19, 7); // overwrite 7 bytes starting at offset 19
}
The test passes.
Test renaming:
void test5() {
hare_mq::file_helper helper("./aaa/bbb/ccc/tmp.hpp");
helper.rename(hare_mq::file_helper::parent_dir(helper.path()) + "/test.hpp");
}
As expected.
Test deletion:
void test6() {
hare_mq::LOG(DEBUG) << "before run" << std::endl;
system("tree .");
hare_mq::file_helper::create("./aaa/bbb/ccc/tmp.hpp");
hare_mq::LOG(DEBUG) << "run: create(\"./aaa/bbb/ccc/tmp.hpp\");" << std::endl;
system("tree .");
hare_mq::file_helper::remove("./aaa/bbb/ccc/tmp.hpp");
hare_mq::LOG(DEBUG) << "run: remove(\"./aaa/bbb/ccc/tmp.hpp\");" << std::endl;
system("tree .");
hare_mq::file_helper::remove_dir("./aaa/bbb/ccc/");
hare_mq::LOG(DEBUG) << "run: remove_dir(\"./aaa/bbb/ccc/\");" << std::endl;
system("tree .");
hare_mq::file_helper::remove_dir("./aaa");
hare_mq::LOG(DEBUG) << "run: remove_dir(\"./aaa\");" << std::endl;
system("tree .");
}
Defining a message type therefore amounts to writing a proto file for that message type and generating the related code from it.
Message structure:
Elements of the message itself:
Message attributes: message ID, delivery mode (non-persistent/persistent), and routing_key
Message payload (body)
Additional elements required for storing the message:
Storage offset of the message in the file
Length of the message
Whether the message is valid: note that this is not a bool but the character 0/1. In proto3 a bool set to false is the default value and is not serialized at all, while true takes up space, so flipping the valid bit of a message already written to the file would change its serialized length. A one-character string always serializes to the same length, so the bool type is not used.
Define the proto file.
syntax = "proto3";
package hare_mq;
enum ExchangeType {
UNKNOWTYPE = 0;
DIRECT = 1;
FANOUT = 2;
TOPIC = 3;
};
enum DeliveryMode {
UNKNOWMODE = 0; // enum values share the package scope, so this cannot also be named UNKNOWTYPE
UNDURABLE = 1;
DURABLE = 2;
};
message BasicProperties {
string id = 1;
DeliveryMode delivery_mode = 2;
string routing_key = 3;
};
message Message {
message Payload {
BasicProperties properties = 1;
string body = 2;
};
Payload payload = 1;
uint32 offset = 2;
uint32 length = 3;
string valid = 4;
};
Next, we start writing exchange.hpp in mqserver.
The basic structure of the code is as follows:
namespace hare_mq {
/**
* 1. exchange class
* 2. exchange data persistence management class
* 3. exchange data memory management class
*/
struct exchange {
/* exchange class */
public:
using ptr = std::shared_ptr<exchange>;
std::string name;
ExchangeType type;
bool durable;
bool auto_delete;
std::unordered_map<std::string, std::string> args;
public:
exchange(const std::string& ename,
ExchangeType etype,
bool edurable,
bool eauto_delete,
const std::unordered_map<std::string, std::string>& eargs)
: name(ename)
, type(etype)
, durable(edurable) // was missing: durable must be initialized too
, auto_delete(eauto_delete)
, args(eargs) { }
// args holds key-value pairs; when stored in the database they are serialized into one string: key=value&key=value
void set_args(const std::string& str_args) {
/**
* Parse str_args string: key=value&key=value... and store it in args member variable
*/
}
std::string get_args() {
/**
* The reverse operation of set_args(), serializing the data in args into the format of key=value&key=value...
*/
}
};
class exchange_mapper {
/* exchange_mapper */
private:
sqlite_helper __sql_helper;
public:
exchange_mapper(const std::string& dbfile);
public:
void create_table();
void remove_table();
void insert(exchange::ptr& e);
void remove(const std::string& name);
exchange::ptr one(const std::string& name);
std::unordered_map<std::string, exchange::ptr> all();
};
class exchange_manager {
/* exchange_manager */
private:
exchange_mapper __mapper;
std::unordered_map<std::string, exchange::ptr> __exchanges;
std::mutex __mtx;
public:
using ptr = std::shared_ptr<exchange_manager>;
exchange_manager(const std::string& dbfile);
void declare_exchange(const std::string& name,
ExchangeType type,
bool durable,
bool auto_delete,
std::unordered_map<std::string, std::string>& args);
void delete_exchange(const std::string& name);
exchange::ptr select_exchange(const std::string& name);
bool exists(const std::string& name);
void clear_exchange();
};
} // namespace hare_mq
See the source code for the full implementation.
Unit testing turned up quite a few bugs, which I fixed in several places; see HareMQ/mqtest/exchange_test.cc for details.
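The framework leaves the bodies of set_args() and get_args() empty. A minimal sketch of that key=value&key=value round trip, written as hypothetical free functions parse_args/serialize_args over std::unordered_map; it assumes keys and values never contain '=' or '&' (there is no escaping) and simply skips a piece that has no '='.

```cpp
#include <cassert>
#include <sstream>
#include <string>
#include <unordered_map>

using args_map = std::unordered_map<std::string, std::string>;

// What set_args() is meant to do: parse "key=value&key=value..." into a map.
args_map parse_args(const std::string& str_args) {
    args_map args;
    std::stringstream ss(str_args);
    std::string pair;
    while (std::getline(ss, pair, '&')) {
        auto pos = pair.find('=');
        if (pos == std::string::npos) continue;  // not a key=value pair: skip it
        args[pair.substr(0, pos)] = pair.substr(pos + 1);
    }
    return args;
}

// What get_args() is meant to do: the reverse, map -> "key=value&key=value..."
std::string serialize_args(const args_map& args) {
    std::string result;
    for (const auto& kv : args) {
        if (!result.empty()) result += '&';
        result += kv.first + "=" + kv.second;
    }
    return result;
}
```

Because unordered_map has no fixed order, the serialized string may list the pairs in any order; only the map parsed back from it is guaranteed to be equal.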
The basic structure of the queue module is as follows.
Define the queue description data class:
Queue name
Persistence flag
Define the queue data persistence class (data is persisted in an sqlite3 database):
Create/delete the queue data table
Add queue data
Remove queue data
Query all queue data
Define the queue data management class:
Create a queue and add it to management (do nothing if it already exists)
Delete a queue
Get a specified queue
Get all queues
Determine whether a specified queue exists
Get the number of queues
Destroy all queue data
The code is basically the same as exchange.hpp, so it is not repeated here; see the source code for details.
Testing is also basically the same as for the exchange part; see HareMQ/mqtest/queue_test.cc for details.
A binding essentially describes which queues are associated with an exchange.
Define the binding information class:
exchange name, queue name
binding_key (the distribution matching rule: it determines which messages the exchange may put into the queue)
Define the binding information persistence class:
Define the binding information management class:
As before, the third class is the one used externally, and it follows the same pattern as the previous modules.
Some implementation tips:
using msg_queue_binding_map = std::unordered_map<std::string, binding::ptr>;
using binding_map = std::unordered_map<std::string, msg_queue_binding_map>;
Why is it designed this way?
**Because an exchange can have multiple bindings, but each binding corresponds to exactly one queue.**
**So we first build a one-to-one map from queue name to binding, and then map each exchange name to such a map.**
The rest is basically the same as what was written before. One thing needs attention here:
static int select_callback(void* arg, int numcol, char** row, char** fields) {
    binding_map* result = static_cast<binding_map*>(arg);
    binding::ptr bp = std::make_shared<binding>(row[0], row[1], row[2]);
    // The exchange may already have bindings in the result, so we must not create a new
    // queue map and insert it directly: that would overwrite the existing data.
    // Instead, get the map belonging to this exchange and add to it. operator[] returns
    // a reference and creates an empty map if the exchange has none yet.
    msg_queue_binding_map& qmap = (*result)[bp->exchange_name];
    qmap.insert({ bp->msg_queue_name, bp });
    return 0; // 0 tells sqlite3_exec to continue with the next row
}
Pay attention to the types: you cannot insert into the outer map directly. These are the two types to keep in mind:
using msg_queue_binding_map = std::unordered_map<std::string, binding::ptr>;
using binding_map = std::unordered_map<std::string, msg_queue_binding_map>;
Whether a binding needs to be persisted depends on the exchange and the queue: only when both the exchange and the queue are durable does the binding need to be persisted.
bool bind(const std::string& ename, const std::string& qname, const std::string& key, bool durable)
However, to keep these headers decoupled, the durable flag is passed in directly: the caller, which coordinates the modules, tells bind() whether persistence is needed. The alternative, fetching the exchange and queue data inside binding.hpp, would couple the files together, which is not good design.
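A minimal in-memory sketch of the two-level map and of where the durable flag comes in. binding_store is a hypothetical simplification of the binding manager: persistence is replaced by a counter instead of an sqlite write, and the caller is assumed to compute durable as "exchange durable and queue durable".

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <unordered_map>

struct binding {
    using ptr = std::shared_ptr<binding>;
    std::string exchange_name, msg_queue_name, binding_key;
};
using msg_queue_binding_map = std::unordered_map<std::string, binding::ptr>;  // queue name -> binding
using binding_map = std::unordered_map<std::string, msg_queue_binding_map>;  // exchange name -> queue map

// Hypothetical, memory-only binding manager; persistence is stubbed out.
class binding_store {
public:
    int persisted = 0;  // stand-in for rows written to the database

    bool bind(const std::string& ename, const std::string& qname, const std::string& key, bool durable) {
        // operator[] creates the exchange's inner map if missing and keeps the
        // existing one (with its bindings) otherwise, so nothing is overwritten
        msg_queue_binding_map& qmap = bindings_[ename];
        if (qmap.count(qname)) return true;  // already bound
        qmap[qname] = std::make_shared<binding>(binding{ename, qname, key});
        if (durable) ++persisted;  // decided by the caller, not looked up here
        return true;
    }

    void unbind(const std::string& ename, const std::string& qname) {
        auto eit = bindings_.find(ename);
        if (eit != bindings_.end()) eit->second.erase(qname);
    }

    msg_queue_binding_map exchange_bindings(const std::string& ename) const {
        auto eit = bindings_.find(ename);
        return eit == bindings_.end() ? msg_queue_binding_map() : eit->second;
    }

private:
    binding_map bindings_;
};
```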
Testing is basically the same as before, so it is not repeated here; see HareMQ/mqtest/binding_test.cc for details.
The previous chapters are mostly preparation; message management, starting here, is the core.
What are the elements of a message? The protobuf file already defines them all; let me go over them again.
message BasicProperties {
string id = 1;
DeliveryMode delivery_mode = 2;
string routing_key = 3;
};
message Message {
message Payload {
BasicProperties properties = 1;
string body = 2;
string valid = 3;
};
Payload payload = 1;
uint32 offset = 2;
uint32 length = 3;
};
A message can be divided into two parts. The first is the message itself, the core data transmitted over the network: the Payload above. The second is the additional elements the server needs to manage messages, the most important of which concern persistence.
What additional elements does the server need for message management?
The valid field has been moved into Payload, because the contents of Payload are what get stored persistently, and the validity flag must be stored on disk along with them.
Idea: manage message persistence with the queue as the unit.
During garbage collection, all valid messages in the message file are loaded and a new data file is generated. After the new file is generated, the storage location of each message has changed, so the data in memory must be updated as well.
If all queues shared one file, all queue data would have to be locked during this update, causing frequent lock conflicts and low efficiency.
If each queue has its own independent data file, only the data of the queue being operated on needs to be locked.
Note:
[!NOTE] Message persistence does not use the database. We use files directly because the data is kept for backup and recovery rather than for querying, and message lengths vary widely, so files are the better choice.
To store messages in a file, the format must be well defined. Our format is: [4-byte length|data][4-byte length|data][4-byte length|data][4-byte length|data]...
Because every record starts with its length, a reader always knows where one message ends and the next begins, which solves the "sticky packet" problem.
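A minimal sketch of writing and reading the [4-byte length|data] layout, using a std::string as a stand-in for the file. append_record/read_records are hypothetical names, and the length is copied in the machine's native byte order, which assumes the file is read back on the same kind of machine that wrote it.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Append one record: a 4-byte length prefix followed by the payload bytes.
void append_record(std::string& file, const std::string& data) {
    std::uint32_t len = static_cast<std::uint32_t>(data.size());
    char prefix[4];
    std::memcpy(prefix, &len, sizeof(len));  // native byte order (assumption)
    file.append(prefix, 4);
    file.append(data);
}

// Walk the buffer record by record; the prefix says exactly where each
// payload ends, so consecutive records never "stick" together.
std::vector<std::string> read_records(const std::string& file) {
    std::vector<std::string> out;
    std::size_t offset = 0;
    while (offset + 4 <= file.size()) {
        std::uint32_t len = 0;
        std::memcpy(&len, file.data() + offset, 4);
        if (offset + 4 + len > file.size()) break;  // truncated last record: stop
        out.emplace_back(file.substr(offset + 4, len));
        offset += 4 + len;
    }
    return out;
}
```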
The persistence class provides:
Creation and deletion of message files
Persisting added messages / persisting deletions (a deleted message is not really removed; its flag is just set to invalid)
Historical data recovery / garbage collection
When is garbage collection needed? A deletion only marks the data invalid rather than really removing it, so the file keeps growing. But not every deletion needs a collection: we collect only when the file contains more than 2,000 messages and fewer than 50% of them are valid.
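The trigger condition fits in one function. A sketch with a hypothetical need_gc() helper, using the integer form valid * 2 < total for "valid ratio below 50%" to avoid floating point:

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper for the rule above: collect only when the file holds
// more than 2000 messages and fewer than half of them are still valid.
bool need_gc(std::size_t total_count, std::size_t valid_count) {
    const std::size_t kGcThreshold = 2000;
    // valid / total < 0.5  <=>  valid * 2 < total
    return total_count > kGcThreshold && valid_count * 2 < total_count;
}
```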
Recycling ideas:
Approach 1: load all valid messages in the file, delete the original file, then write the data into a newly generated file. (This is risky: what if the write fails?)
Approach 2: load all valid messages in the file, write them to a temporary file first, then delete the original file, and finally rename the temporary file to the original name. The original data is only deleted after the new file has been written.
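A minimal sketch of the second approach with std::filesystem (C++17). rewrite_with_valid is a hypothetical name, the ".tmp" suffix only mirrors the idea of TMPFILE_SUBFIX, and records are written back to back as raw strings instead of length-prefixed serialized messages; the point is the order of operations.

```cpp
#include <cassert>
#include <filesystem>
#include <fstream>
#include <iterator>
#include <string>
#include <system_error>
#include <vector>

namespace fs = std::filesystem;

// 1) write the valid records to a temporary file, 2) delete the original,
// 3) rename the temporary file. If step 1 fails, the original is untouched.
bool rewrite_with_valid(const std::string& data_file, const std::vector<std::string>& valid) {
    const std::string tmp_file = data_file + ".tmp";
    std::ofstream ofs(tmp_file, std::ios::binary | std::ios::trunc);
    if (!ofs) return false;
    for (const auto& rec : valid)
        ofs.write(rec.data(), static_cast<std::streamsize>(rec.size()));
    ofs.close();              // flush; write errors surface here
    if (ofs.fail()) return false;
    std::error_code ec;
    fs::remove(data_file, ec);            // a missing file is not an error
    fs::rename(tmp_file, data_file, ec);  // ec reflects the rename result
    return !ec;
}
```

std::filesystem::rename already replaces an existing regular file at the destination, so the explicit remove() only mirrors the steps described above.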
The framework is as follows.
namespace hare_mq {
#define DATAFILE_SUBFIX ".mqd"
#define TMPFILE_SUBFIX ".mqd.tmp" // Define the file name suffix for persistent files and temporary files
using message_ptr = std::shared_ptr<Message>;
class message_mapper {
private:
std::string __queue_name;
std::string __data_file;
std::string __tmp_file;
public:
message_mapper(const std::string& base_dir, const std::string& qname);
void create_msg_file();
void remove_msg_file();
void insert(message_ptr& msg);
void remove(const message_ptr& msg);
std::list<message_ptr> gc(); // Garbage Collection
};
} // namespace hare_mq
For specific implementation, see the code.
Mirroring the persistence management (messages on disk), the messages in memory also need to be managed per queue.
So two layers of encapsulation are needed: each queue needs a class to manage its messages, and another layer on top manages all queues for external use.
Inner layer: queue message management
Operation:
When constructing the object: create/open the queue's data file and recover the queue's historical messages
Add a message / acknowledge (delete) a message: after each ack (remove), check for garbage collection; when the total amount of persisted data exceeds 2000 and the valid ratio is below 50%, perform garbage collection
Get the message at the head of the queue
Delete all messages in the queue
Get the number of messages waiting to be pushed
Get the number of messages waiting to be acknowledged
Get the number of persisted messages
Fields:
Queue name
Persistent management handle
List of messages waiting to be pushed: insert at the tail, take from the head (FIFO)
Hash table of persisted messages: after garbage collection, each message's data (its actual storage location) must be updated through it
Hash table of messages waiting to be acknowledged
The basic structure is as follows:
/* queue_message */
class queue_message {
public:
    using ptr = std::shared_ptr<queue_message>;
private:
    std::mutex __mtx;                     // protects this queue; the manager releases its own lock before calling in
    std::string __queue_name;             // Queue name
    size_t __valid_count;                 // Number of valid persisted messages
    size_t __total_count;                 // Total number of persisted messages
    message_mapper __mapper;              // Persistence handle
    std::list<message_ptr> __msgs;        // Messages waiting to be pushed
    std::unordered_map<std::string, message_ptr> __durable_msgs;  // Persisted messages (id -> message), updated after gc
    std::unordered_map<std::string, message_ptr> __wait_ack_msgs; // Messages waiting for acknowledgement
public:
    queue_message(const std::string& base_dir, const std::string& qname)
        : __queue_name(qname), __valid_count(0), __total_count(0), __mapper(base_dir, qname) { }
    void recovery();                      // Restore historical messages from the data file
    bool insert(const BasicProperties* bp, const std::string& body, DeliveryMode mode);
    bool remove(const std::string& msg_id); // ack; check whether gc is needed after each remove
    message_ptr front();                  // Get the message at the head of the queue
    size_t getable_count();               // Number of messages waiting to be pushed
    size_t total_count();
    size_t durable_count();
    size_t wait_ack_count();
    void clear();
};
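Outer layer: manages the messages of every queue.
Managed members: a mutex, the base directory of the data files, and a map from queue name to its queue_message.
Operations provided: initialize/destroy a queue's message data, insert a message, get the head message, acknowledge a message, and query the various message counts.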
class message_manager {
private:
std::mutex __mtx;
std::string __base_dir;
std::unordered_map<std::string, queue_message::ptr> __queue_msgs; // map
public:
using ptr = std::shared_ptr<message_manager>;
message_manager(const std::string& base_dir)
: __base_dir(base_dir) { }
void init_queue_msg(const std::string& qname) {
queue_message::ptr qmp;
{ // lock
std::unique_lock<std::mutex> lock(__mtx);
auto it = __queue_msgs.find(qname);
if (it != __queue_msgs.end())
return;
qmp = std::make_shared<queue_message>(__base_dir, qname);
__queue_msgs.insert(std::make_pair(qname, qmp));
}
qmp->recovery(); // no lock
}
void destroy_queue_msg(const std::string& qname) {
queue_message::ptr qmp;
{
std::unique_lock<std::mutex> lock(__mtx);
auto it = __queue_msgs.find(qname);
if (it == __queue_msgs.end())
return;
qmp = it->second;
__queue_msgs.erase(it);
}
qmp->clear();
}
bool insert(const std::string& qname, BasicProperties* bp, const std::string& body, DeliveryMode mode) {
queue_message::ptr qmp;
{
std::unique_lock<std::mutex> lock(__mtx);
auto it = __queue_msgs.find(qname);
if (it == __queue_msgs.end()) {
LOG(ERROR) << "insert msg failed, no this queue: " << qname << std::endl;
return false;
}
qmp = it->second;
}
return qmp->insert(bp, body, mode);
}
message_ptr front(const std::string& qname) {
queue_message::ptr qmp;
{
std::unique_lock<std::mutex> lock(__mtx);
auto it = __queue_msgs.find(qname);
if (it == __queue_msgs.end()) {
LOG(ERROR) << "get queue front failed, no this queue: " << qname << std::endl;
return message_ptr();
}
qmp = it->second;
}
return qmp->front();
}
void ack(const std::string& qname, const std::string& msg_id) {
queue_message::ptr qmp;
{
std::unique_lock<std::mutex> lock(__mtx);
auto it = __queue_msgs.find(qname);
if (it == __queue_msgs.end()) {
LOG(ERROR) << "ack mesg failed, no this queue: " << qname << std::endl;
return;
}
qmp = it->second;
}
qmp->remove(msg_id);
}
size_t getable_count(const std::string& qname) {
queue_message::ptr qmp;
{
std::unique_lock<std::mutex> lock(__mtx);
auto it = __queue_msgs.find(qname);
if (it == __queue_msgs.end()) {
LOG(ERROR) << "error in getable_count(), no this queue: " << qname << std::endl;
return 0;
}
qmp = it->second;
}
return qmp->getable_count();
}
size_t total_count(const std::string& qname) {
queue_message::ptr qmp;
{
std::unique_lock<std::mutex> lock(__mtx);
auto it = __queue_msgs.find(qname);
if (it == __queue_msgs.end()) {
LOG(ERROR) << "error in total_count(), no this queue: " << qname << std::endl;
return 0;
}
qmp = it->second;
}
return qmp->total_count();
}
size_t durable_count(const std::string& qname) {
queue_message::ptr qmp;
{
std::unique_lock<std::mutex> lock(__mtx);
auto it = __queue_msgs.find(qname);
if (it == __queue_msgs.end()) {
LOG(ERROR) << "error in durable_count(), no this queue: " << qname << std::endl;
return 0;
}
qmp = it->second;
}
return qmp->durable_count();
}
size_t wait_ack_count(const std::string& qname) {
queue_message::ptr qmp;
{
std::unique_lock<std::mutex> lock(__mtx);
auto it = __queue_msgs.find(qname);
if (it == __queue_msgs.end()) {
LOG(ERROR) << "error in wait_ack_count(), no this queue: " << qname << std::endl;
return 0;
}
qmp = it->second;
}
return qmp->wait_ack_count();
}
void clear() {
std::unique_lock<std::mutex> lock(__mtx);
for (auto& q : __queue_msgs)
q.second->clear();
}
};
In fact, this module integrates all the previous modules: the virtual host class exposes their functionality as an interface to the upper layer.
In essence, the virtual host module is an encapsulation of the previous modules.
The virtual host class contains the following members: handles to the exchange, queue, binding, and message managers.
Virtual host operations: declare/delete exchanges and queues, bind/unbind, and publish, consume, and acknowledge messages.
Virtual host management operations:
The structure of the virtual host class is as follows:
namespace hare_mq {
class virtual_host {
private:
exchange_manager::ptr __emp;
msg_queue_manager::ptr __mqmp;
binding_manager::ptr __bmp;
message_manager::ptr __mmp; // Four handles
public:
using ptr = std::shared_ptr<virtual_host>;
virtual_host(const std::string& basedir, const std::string& dbfile);
bool declare_exchange(const std::string& name,
ExchangeType type,
bool durable,
bool auto_delete,
std::unordered_map<std::string, std::string>& args); // Declare an exchange
void delete_exchange(const std::string& name); // Delete an exchange
bool declare_queue(const std::string& qname,
bool qdurable,
bool qexclusive,
bool qauto_delete,
const std::unordered_map<std::string, std::string>& qargs); // declare queue
void delete_queue(const std::string& name); // Delete a Queue
bool bind(const std::string& ename, const std::string& qname, const std::string& key);
void unbind(const std::string& ename, const std::string& qname);
msg_queue_binding_map exchange_bindings(const std::string& ename);
bool basic_publish(const std::string& qname, BasicProperties* bp, const std::string& body, DeliveryMode mode);
message_ptr basic_consume(const std::string& qname);
bool basic_ack(const std::string& qname, const std::string& msgid);
};
} // namespace hare_mq
See the source code for the implementation.
[!TIP] The essence of this module: determine whether the routing_key in a message matches the binding_key of a queue, that is: to which queues should a message arriving at the exchange be sent?
The client sends a message to a specified exchange; the exchange must then decide which of the queues bound to it should receive the message, and this decision is determined by the exchange type and the matching rules.
For a direct exchange: if the binding_key in the queue's binding information is identical to the routing_key in the message, the match succeeds; otherwise it fails.
The first point to summarize: this module only processes the data passed in and does not manage any data itself, so all of its interfaces should be static.
Functions to be provided:
Check whether a routing_key is legal
Check whether a binding_key is legal
Match a binding_key against a routing_key
A routing_key may contain only: a~z, A~Z, 0~9, ., _
A binding_key may contain only: a~z, A~Z, 0~9, ., _, #, *. Note that # and * are wildcards: * matches exactly one word (any word), and # matches zero or more words (note: words, not letters). A wildcard must stand alone as a whole word: for example, a.#.b is allowed, but a.a#.b is not.
There cannot be another wildcard on either side of #: since # already stands for any number of words, an adjacent wildcard is meaningless.
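A minimal sketch of the two legality checks under the rules just listed. The function names are hypothetical. Besides the character sets, the binding_key check rejects a word that mixes a wildcard with other characters (a.a#.b) and a # whose neighbouring word is * or #.

```cpp
#include <cassert>
#include <cctype>
#include <cstddef>
#include <string>
#include <vector>

// Split on '.', keeping empty words (they are irrelevant to these checks).
static std::vector<std::string> split_words(const std::string& key) {
    std::vector<std::string> words;
    std::string cur;
    for (char c : key) {
        if (c == '.') { words.push_back(cur); cur.clear(); }
        else cur += c;
    }
    words.push_back(cur);
    return words;
}

// routing_key: only a~z, A~Z, 0~9, '.', '_'
bool is_legal_routing_key(const std::string& key) {
    for (unsigned char c : key)
        if (!(std::isalnum(c) || c == '.' || c == '_')) return false;
    return true;
}

// binding_key: additionally '*' and '#', each forming a whole word,
// and no wildcard word directly next to '#'.
bool is_legal_binding_key(const std::string& key) {
    for (unsigned char c : key)
        if (!(std::isalnum(c) || c == '.' || c == '_' || c == '*' || c == '#')) return false;
    std::vector<std::string> words = split_words(key);
    for (std::size_t i = 0; i < words.size(); ++i) {
        const std::string& w = words[i];
        bool has_wild = w.find_first_of("*#") != std::string::npos;
        if (has_wild && w.size() > 1) return false;  // e.g. "a#" or "**"
        if (w == "#") {
            bool prev_wild = i > 0 && (words[i - 1] == "*" || words[i - 1] == "#");
            bool next_wild = i + 1 < words.size() && (words[i + 1] == "*" || words[i + 1] == "#");
            if (prev_wild || next_wild) return false;
        }
    }
    return true;
}
```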
Matching method: dynamic programming.
The logic here is fairly involved.
The overall idea is very similar to this LeetCode problem: https://leetcode.cn/problems/wildcard-matching/description/
So I will not repeat the principle here; see the code directly.
static bool __dp_matching(const std::string& routing_key, const std::string& binding_key) {
    // 1. Split binding_key and routing_key into arrays of words
    std::vector<std::string> bkeys, rkeys;
    int n_bkey = string_helper::split(binding_key, ".", &bkeys);
    int n_rkey = string_helper::split(routing_key, ".", &rkeys); // must split routing_key here, not binding_key
    // 2. dp[i][j] is true if the first i binding words match the first j routing words;
    //    dp[0][0] is true, everything else starts as false
    std::vector<std::vector<bool>> dp(n_bkey + 1, std::vector<bool>(n_rkey + 1, false));
    dp[0][0] = true;
    // 3. Leading '#' words can match zero routing words, so dp[i][0] is true for them
    for (int i = 1; i <= n_bkey; ++i) {
        if (bkeys[i - 1] == "#") {
            dp[i][0] = true;
            continue;
        }
        break;
    }
    // 4. Match each binding word against each routing word and fill the table
    for (int i = 1; i <= n_bkey; i++) {
        for (int j = 1; j <= n_rkey; j++) {
            // Equal words, or '*' (exactly one word): inherit from the top-left
            if (bkeys[i - 1] == rkeys[j - 1] || bkeys[i - 1] == "*")
                dp[i][j] = dp[i - 1][j - 1];
            // '#': one word (top-left), one more word (left), or zero words (above)
            else if (bkeys[i - 1] == "#")
                dp[i][j] = dp[i - 1][j - 1] || dp[i][j - 1] || dp[i - 1][j];
        }
    }
    return dp[n_bkey][n_rkey];
}
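To try the matcher without Boost, here is a self-contained sketch with the same recurrence. split_key is a hypothetical stand-in for string_helper::split that drops empty words (close to, but not identical to, token_compress_on), and route() is a hypothetical dispatcher assuming the usual AMQP semantics: fanout matches every binding, direct needs an exact key match, and topic uses the DP.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

enum class ExchangeType { DIRECT, FANOUT, TOPIC };  // mirrors the proto enum (UNKNOWTYPE omitted)

// Split on '.', dropping empty words.
std::vector<std::string> split_key(const std::string& key) {
    std::vector<std::string> words;
    std::string cur;
    for (char c : key) {
        if (c == '.') {
            if (!cur.empty()) words.push_back(cur);
            cur.clear();
        } else {
            cur += c;
        }
    }
    if (!cur.empty()) words.push_back(cur);
    return words;
}

// dp[i][j]: do the first i binding words match the first j routing words?
bool topic_match(const std::string& routing_key, const std::string& binding_key) {
    std::vector<std::string> b = split_key(binding_key), r = split_key(routing_key);
    std::size_t n = b.size(), m = r.size();
    std::vector<std::vector<bool>> dp(n + 1, std::vector<bool>(m + 1, false));
    dp[0][0] = true;
    for (std::size_t i = 1; i <= n && b[i - 1] == "#"; ++i)
        dp[i][0] = true;  // leading '#'s may match zero words
    for (std::size_t i = 1; i <= n; ++i) {
        for (std::size_t j = 1; j <= m; ++j) {
            if (b[i - 1] == r[j - 1] || b[i - 1] == "*")
                dp[i][j] = dp[i - 1][j - 1];  // exactly one word consumed
            else if (b[i - 1] == "#")
                dp[i][j] = dp[i - 1][j - 1] || dp[i][j - 1] || dp[i - 1][j];
        }
    }
    return dp[n][m];
}

bool route(ExchangeType type, const std::string& routing_key, const std::string& binding_key) {
    switch (type) {
    case ExchangeType::FANOUT: return true;  // every bound queue receives the message
    case ExchangeType::DIRECT: return routing_key == binding_key;
    case ExchangeType::TOPIC: return topic_match(routing_key, binding_key);
    }
    return false;
}
```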
Whenever a client initiates a subscription request, the server gains one more subscriber (the server-side description of a client that processes messages). A consumer, or subscriber, is directly associated with a queue, because the subscription request specifies which queue's messages it wants.
When a channel is closed or a queue is deleted, the consumers associated with that channel or queue no longer exist, so their information must be deleted too.
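Operations: create a consumer, remove a consumer, choose a consumer for message delivery (round-robin), check whether there are no consumers, check whether a consumer exists, and clear all consumers.
Elements: queue name, mutex, round-robin sequence number, and the consumers of the queue, managed in a vector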
The framework looks like this:
namespace hare_mq {
// Type of the consumer callback that handles a pushed message: (consumer tag, properties, body)
using consumer_callback = std::function<void(const std::string& tag, const BasicProperties*, const std::string& body)>;
// A message queue consumer
struct consumer {
    using ptr = std::shared_ptr<consumer>; // Shared pointer to a consumer
    std::string tag;            // Consumer identifier
    std::string qname;          // Name of the subscribed queue
    bool auto_ack = false;      // Auto-acknowledgement flag
    consumer_callback callback; // Callback that handles messages
    consumer() { }
    // const reference, so a lambda or other temporary can be passed as the callback
    consumer(const std::string& ctag, const std::string& queue_name, bool ack_flag, const consumer_callback& cb)
        : tag(ctag), qname(queue_name), auto_ack(ack_flag), callback(cb) { }
};
/* Consumers of one queue */
class queue_consumer {
private:
    std::string __qname;                    // Queue name
    std::mutex __mtx;                       // Mutex for thread safety
    uint64_t __rr_seq;                      // Round-robin sequence number
    std::vector<consumer::ptr> __consumers; // Managed consumers
public:
    queue_consumer(const std::string& qname);
    consumer::ptr create(const std::string& ctag, const std::string& queue_name, bool ack_flag, const consumer_callback& cb); // Create a consumer
    void remove(const std::string& ctag);   // Remove a consumer
    consumer::ptr choose();                 // Choose a consumer for message delivery
    bool empty();                           // Check whether there are no consumers
    bool exists(const std::string& ctag);   // Check whether a consumer exists
    void clear();                           // Clear all consumers
};
} // namespace hare_mq
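A minimal sketch of how choose() can use the round-robin sequence number. rr_consumers is a hypothetical, trimmed-down stand-in for queue_consumer (no callbacks, only create/remove/choose); the idea is an ever-increasing counter taken modulo the current number of consumers.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

struct consumer {
    using ptr = std::shared_ptr<consumer>;
    std::string tag;
};

class rr_consumers {
public:
    consumer::ptr create(const std::string& tag) {
        std::lock_guard<std::mutex> lock(mtx_);
        auto c = std::make_shared<consumer>(consumer{tag});
        consumers_.push_back(c);
        return c;
    }
    void remove(const std::string& tag) {
        std::lock_guard<std::mutex> lock(mtx_);
        for (auto it = consumers_.begin(); it != consumers_.end(); ++it) {
            if ((*it)->tag == tag) { consumers_.erase(it); return; }
        }
    }
    // Pick consumers in turn: counter modulo the current consumer count.
    consumer::ptr choose() {
        std::lock_guard<std::mutex> lock(mtx_);
        if (consumers_.empty()) return nullptr;
        std::size_t idx = static_cast<std::size_t>(rr_seq_++ % consumers_.size());
        return consumers_[idx];
    }
private:
    std::mutex mtx_;
    std::uint64_t rr_seq_ = 0;
    std::vector<consumer::ptr> consumers_;
};
```

Because the counter keeps growing, removing a consumer shifts which index comes next, but every remaining consumer is still picked in turn.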
In the AMQP model, besides the Connection concept for the communication connection, there is also the concept of a Channel. A Channel is a finer-grained communication channel within a Connection. Multiple Channels can communicate over the same Connection, but the Channels of one Connection are independent of each other.
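Managed information:
Channel ID, the associated consumer, the TCP connection handle, the consumer manager handle, and the virtual host handle
The protobuf protocol processing handle: performs protocol processing before network communication
Management operations: the operations a channel provides, listed in the framework below
Channel management: add, delete, and query channels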
**The framework is shown below.**
namespace hare_mq {
class channel {
public:
using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>; // Shared pointer for the protobuf codec
private:
std::string __cid; // Channel identifier
consumer::ptr __consumer; // Each channel corresponds to a consumer in haremq, though not necessarily active as the channel may not be linked to a consumer
muduo::net::TcpConnectionPtr __conn; // TCP connection handle
ProtobufCodecPtr __codec; // Protocol codec handler
consumer_manager::ptr __cmp; // Consumer manager handle
virtual_host::ptr __host; // Virtual host object handle
public:
channel(const std::string& cid,
const virtual_host::ptr& host,
const consumer_manager::ptr& cmp,
const ProtobufCodecPtr& codec,
const muduo::net::TcpConnectionPtr& conn);
~channel();
// Declaration and deletion of exchanges
void declare_exchange();
void delete_exchange();
// Declaration and deletion of queues
void declare_queue();
void delete_queue();
// Binding and unbinding of queues
void bind();
void unbind();
// Publishing and acknowledging messages
void basic_publish();
void basic_ack();
// Subscribing and unsubscribing from queue messages
void basic_consume();
void basic_cancel();
};
} // namespace hare_mq
In short: every function implemented above needs a network interface so that the client can call it (the client can be a producer or a consumer).
The communication itself is implemented with the Muduo library, using TCP as the underlying transport and defining a custom application-layer protocol on top of it, so that the client can remotely call the server's functions.
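Network interfaces to be provided:
Opening/closing a channel
Declaring/deleting an exchange
Declaring/deleting a queue
Binding/unbinding a queue
Publishing a message
Subscribing to/unsubscribing from queue messages
Acknowledging (ack) a message
Pushing a message (server->client)
For the protocol we directly use what the Muduo library provides: Chen Shuo, Muduo's author, has already written a codec that combines Muduo with Protobuf. The message format is shown in the figure.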
len: 4 bytes, the length of the rest of the message (everything after the len field itself)
nameLen: 4 bytes, the length of the typeName array
typeName: a byte array of nameLen bytes identifying the type name of the request/response message; it is used to dispatch each message to the corresponding remote interface
protobufData: a byte array of len-nameLen-8 bytes, the binary form of the request/response parameters after protobuf serialization
checkSum: 4 bytes, a checksum (Adler-32 in Muduo's codec) over nameLen, typeName and protobufData, used to verify that the request/response message is intact
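A minimal sketch of packing one frame in this layout, to make the len-nameLen-8 arithmetic concrete. encode_frame and adler32 are hypothetical helpers, not Muduo's API; the sketch assumes big-endian (network byte order) integers, and the project itself should keep using Muduo's ProtobufCodec.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Append a 32-bit integer in big-endian (network) byte order.
static void put_u32(std::string& out, std::uint32_t v) {
    out.push_back(static_cast<char>((v >> 24) & 0xff));
    out.push_back(static_cast<char>((v >> 16) & 0xff));
    out.push_back(static_cast<char>((v >> 8) & 0xff));
    out.push_back(static_cast<char>(v & 0xff));
}

// Plain Adler-32: two running sums modulo 65521.
std::uint32_t adler32(const std::string& data) {
    std::uint32_t a = 1, b = 0;
    for (unsigned char c : data) {
        a = (a + c) % 65521;
        b = (b + a) % 65521;
    }
    return (b << 16) | a;
}

// Layout: [len][nameLen][typeName][protobufData][checkSum]
// len counts everything after itself, so protobufData has len - nameLen - 8 bytes.
std::string encode_frame(const std::string& type_name, const std::string& pb_data) {
    std::string body;
    put_u32(body, static_cast<std::uint32_t>(type_name.size()));  // nameLen
    body += type_name;                                             // typeName
    body += pb_data;                                               // protobufData
    put_u32(body, adler32(body));  // checkSum over nameLen + typeName + protobufData
    std::string frame;
    put_u32(frame, static_cast<std::uint32_t>(body.size()));       // len
    return frame + body;
}
```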
For the interfaces above, passing the payload data alone is not enough: every parameter they need (request ID, channel ID, names, flags, and so on) must also be transmitted, so all of them have to be defined in the pb file!
```protobuf
/*
 * Written by Yufc
 * See https://github.com/ffengc/HareMQ
 * Please cite my project link: https://github.com/ffengc/HareMQ when you use this code
 */
syntax = "proto3";
package hare_mq;
import "msg.proto";

/* Opening and closing of a channel */
message openChannelRequest {
    string rid = 1; // Request ID
    string cid = 2; // Channel ID
};
message closeChannelRequest {
    string rid = 1;
    string cid = 2;
};

/* Declaration and deletion of exchanges */
message declareExchangeRequest {
    string rid = 1;
    string cid = 2;
    string exchange_name = 3;
    ExchangeType exchange_type = 4;
    bool durable = 5;
    bool auto_delete = 6;
    map<string, string> args = 7;
};
message deleteExchangeRequest {
    string rid = 1;
    string cid = 2;
    string exchange_name = 3;
};

/* Declaration and deletion of queues */
message declareQueueRequest {
    string rid = 1;
    string cid = 2;
    string queue_name = 3;
    bool exclusive = 4;
    bool durable = 5;
    bool auto_delete = 6;
    map<string, string> args = 7;
};
message deleteQueueRequest {
    string rid = 1;
    string cid = 2;
    string queue_name = 3;
};

/* Binding and unbinding of queues */
message bindRequest {
    string rid = 1;
    string cid = 2;
    string exchange_name = 3;
    string queue_name = 4;
    string binding_key = 5;
};
message unbindRequest {
    string rid = 1;
    string cid = 2;
    string exchange_name = 3;
    string queue_name = 4;
};

/* Publishing of messages */
message basicPublishRequest {
    string rid = 1;
    string cid = 2;
    string exchange_name = 3;
    string body = 4;
    BasicProperties properties = 5;
};

/* Acknowledgement of messages */
message basicAckRequest {
    string rid = 1;
    string cid = 2;
    string queue_name = 3;
    string message_id = 4;
};

/* Subscription to queue messages */
message basicConsumeRequest {
    string rid = 1;
    string cid = 2;
    string consumer_tag = 3;
    string queue_name = 4;
    bool auto_ack = 5;
};

/* Cancellation of subscription */
message basicCancelRequest {
    string rid = 1;
    string cid = 2;
    string consumer_tag = 3;
    string queue_name = 4;
}

/* Pushing of messages */
message basicConsumeResponse {
    string cid = 1; // No need for rid, this is a response
    string consumer_tag = 2;
    string body = 3;
    BasicProperties properties = 4;
}

/* Generic response */
message basicCommonResponse {
    string rid = 1; // Response to a request identified by rid
    string cid = 2;
    bool ok = 3;
};
```
[!CAUTION] Once you understand this, you will understand what each of these files is for!

- `exchange.hpp`: exchange-related code.
- `queue.hpp`: queue-related code.
- `binding.hpp`: binding-related code. It establishes the relationship between an exchange and a queue, that is, it binds the exchange to the queue.
- `message.hpp`: message management. Why do we need message management? Exchange management, queue management, and binding management are really the carriers of messages: exchanges and queues are the places messages pass through.
- `route.hpp`: solves how a message should travel from an exchange to a queue. `binding.hpp` only handles the act of binding an exchange to a queue; it is just a binding tool. `route.hpp` tells you how to bind and to whom.
- `virtual_host.hpp`: the methods in this file are the methods the entire MQ server provides.
- `channel.hpp`: calling `virtual_host` directly is inefficient, so `channel.hpp` is introduced as a helper to improve efficiency. The essence is the same, which is why the methods in `channel.hpp` match those in `virtual_host.hpp`. `channel.hpp` is the host's assistant. Who calls `channel.hpp`? The callbacks of the network interface!

After defining the proto, our `channel` framework looks like this. Give me a request and I will handle it for you, so every parameter is a request.
```cpp
namespace hare_mq {
class channel {
public:
    using ptr = std::shared_ptr<channel>;
    using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;
    using openChannelRequestPtr = std::shared_ptr<openChannelRequest>;
    using closeChannelRequestPtr = std::shared_ptr<closeChannelRequest>;
    using declareExchangeRequestPtr = std::shared_ptr<declareExchangeRequest>;
    using deleteExchangeRequestPtr = std::shared_ptr<deleteExchangeRequest>;
    using declareQueueRequestPtr = std::shared_ptr<declareQueueRequest>;
    using deleteQueueRequestPtr = std::shared_ptr<deleteQueueRequest>;
    using bindRequestPtr = std::shared_ptr<bindRequest>;
    using unbindRequestPtr = std::shared_ptr<unbindRequest>;
    using basicPublishRequestPtr = std::shared_ptr<basicPublishRequest>;
    using basicAckRequestPtr = std::shared_ptr<basicAckRequest>;
    using basicConsumeRequestPtr = std::shared_ptr<basicConsumeRequest>;
    using basicCancelRequestPtr = std::shared_ptr<basicCancelRequest>;
    using basicCommonResponsePtr = std::shared_ptr<basicCommonResponse>;

private:
    std::string __cid;                   // Channel identifier
    consumer::ptr __consumer;            // In HareMQ a channel has at most one consumer; may be empty if the channel is not associated with a consumer
    muduo::net::TcpConnectionPtr __conn; // Connection handle
    ProtobufCodecPtr __codec;            // Protocol handling
    consumer_manager::ptr __cmp;         // Consumer manager handle
    virtual_host::ptr __host;            // Virtual host object handle

public:
    channel(const std::string& cid,
            const virtual_host::ptr& host,
            const consumer_manager::ptr& cmp,
            const ProtobufCodecPtr& codec,
            const muduo::net::TcpConnectionPtr& conn)
        : __cid(cid)
        , __conn(conn)
        , __codec(codec)
        , __cmp(cmp)
        , __host(host) { }
    ~channel();
    // Declaration and deletion of exchanges
    void declare_exchange(const declareExchangeRequestPtr& req);
    void delete_exchange(const deleteExchangeRequestPtr& req);
    // Declaration and deletion of queues
    void declare_queue(const declareQueueRequestPtr& req);
    void delete_queue(const deleteQueueRequestPtr& req);
    // Binding and unbinding of queues
    void bind(const bindRequestPtr& req);
    void unbind(const unbindRequestPtr& req);
    // Publishing and acknowledging of messages
    void basic_publish(const basicPublishRequestPtr& req);
    void basic_ack(const basicAckRequestPtr& req);
    // Subscribing/unsubscribing from queue messages
    void basic_consume(const basicConsumeRequestPtr& req);
    void basic_cancel(const basicCancelRequestPtr& req);
};
} // namespace hare_mq
```
While writing these functions I ran into a problem: the last argument, `req->args()`, is a `google::protobuf::Map`, which does not match the `std::unordered_map` type used by the earlier interfaces. The project guide's solution is to change every `std::unordered_map` to `google::protobuf::Map`. Because the earlier code has already been tested, I didn't want to change it, so I wrote two functions that convert between the two map types, as shown below.
```cpp
#include <google/protobuf/map.h>
#include <string>
#include <unordered_map>

class map_helper {
public:
    // google::protobuf::Map -> std::unordered_map
    static std::unordered_map<std::string, std::string> ConvertProtoMapToStdMap(const google::protobuf::Map<std::string, std::string>& proto_map) {
        std::unordered_map<std::string, std::string> std_map;
        for (const auto& kv : proto_map) {
            std_map[kv.first] = kv.second;
        }
        return std_map;
    }
    // std::unordered_map -> google::protobuf::Map
    static google::protobuf::Map<std::string, std::string> ConvertStdMapToProtoMap(const std::unordered_map<std::string, std::string>& std_map) {
        google::protobuf::Map<std::string, std::string> proto_map;
        for (const auto& kv : std_map) {
            proto_map[kv.first] = kv.second;
        }
        return proto_map;
    }
};
```
A quick round-trip test:

```cpp
TEST(map_helper_test, test) {
    google::protobuf::Map<std::string, std::string> proto_map;
    proto_map["one"] = "1";
    proto_map["two"] = "2";
    // proto -> std
    std::unordered_map<std::string, std::string> std_map = map_helper::ConvertProtoMapToStdMap(proto_map);
    ASSERT_EQ(std_map.size(), 2u);
    ASSERT_EQ(std_map["one"], "1");
    ASSERT_EQ(std_map["two"], "2");
    // std -> proto
    google::protobuf::Map<std::string, std::string> converted_proto_map = map_helper::ConvertStdMapToProtoMap(std_map);
    ASSERT_EQ(converted_proto_map.size(), 2u);
    ASSERT_EQ(converted_proto_map["one"], "1");
    ASSERT_EQ(converted_proto_map["two"], "2");
}
```
**For the other `channel` operations, see the code directly.**
With `channel` complete, we only need a `channel_manager` class to manage the channels.
```cpp
class channel_manager {
public:
    using ptr = std::shared_ptr<channel_manager>;

private:
    std::unordered_map<std::string, channel::ptr> __channels; // cid -> channel
    std::mutex __mtx;                                         // Protects __channels

public:
    channel_manager() = default;
    ~channel_manager() = default;
    // Returns false if a channel with this cid already exists
    bool open_channel(const std::string& cid,
                      const virtual_host::ptr& host,
                      const consumer_manager::ptr& cmp,
                      const ProtobufCodecPtr& codec,
                      const muduo::net::TcpConnectionPtr& conn,
                      const thread_pool::ptr& pool);
    void close_channel(const std::string& cid);
    channel::ptr select_channel(const std::string& cid);
};
```
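The sketch below shows the semantics `channel_manager` is meant to have:

- opening fails on a duplicate ID;
- closing an unknown ID does nothing;
- every access to the map goes through the mutex.

`stub_channel` and `stub_channel_manager` are hypothetical stand-ins. The real `channel` also holds the host, consumer manager, codec, connection, and thread pool.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for hare_mq::channel.
struct stub_channel {
    using ptr = std::shared_ptr<stub_channel>;
    explicit stub_channel(std::string id) : cid(std::move(id)) {}
    std::string cid;
};

class stub_channel_manager {
public:
    // Returns false if a channel with this ID already exists.
    bool open_channel(const std::string& cid) {
        std::lock_guard<std::mutex> lock(mtx_);
        if (channels_.count(cid) > 0)
            return false;
        channels_.emplace(cid, std::make_shared<stub_channel>(cid));
        return true;
    }
    // Closing an unknown ID is a no-op.
    void close_channel(const std::string& cid) {
        std::lock_guard<std::mutex> lock(mtx_);
        channels_.erase(cid);
    }
    // Returns nullptr if the channel does not exist.
    stub_channel::ptr select_channel(const std::string& cid) {
        std::lock_guard<std::mutex> lock(mtx_);
        auto it = channels_.find(cid);
        return it == channels_.end() ? nullptr : it->second;
    }

private:
    std::unordered_map<std::string, stub_channel::ptr> channels_;
    std::mutex mtx_;
};
```

Returning `bool` from `open_channel` lets the caller reply `ok=false` to the client when a channel ID is reused.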
What is `connection` for, and why encapsulate it? In practice a connection and a channel look much the same, so why do we need both? A look at the picture makes it clear: one connection (socket) may carry multiple channels, so connection management means managing multiple channels.
In other words, each connection owns one `channel_manager`.
The encapsulation itself is simple.
```cpp
namespace hare_mq {
class connection {
private:
    muduo::net::TcpConnectionPtr __conn;
    ProtobufCodecPtr __codec;
    consumer_manager::ptr __cmp;
    virtual_host::ptr __host;
    thread_pool::ptr __pool;
    channel_manager::ptr __channels; // One connection owns one channel manager

public:
    connection(const virtual_host::ptr& host,
               const consumer_manager::ptr& cmp,
               const ProtobufCodecPtr& codec,
               const muduo::net::TcpConnectionPtr& conn,
               const thread_pool::ptr& pool)
        : __conn(conn)
        , __codec(codec)
        , __cmp(cmp)
        , __host(host)
        , __pool(pool)
        , __channels(std::make_shared<channel_manager>()) { }
    ~connection() = default;
    void open_channel(const openChannelRequestPtr& req) {
        // 1. Check whether the channel ID is already in use  2. Create the channel
        // The channel is keyed by the channel ID (cid), not the request ID (rid)
        bool ret = __channels->open_channel(req->cid(), __host, __cmp, __codec, __conn, __pool);
        if (ret == false)
            return basic_response(false, req->rid(), req->cid());
        // 3. Reply to the client
        return basic_response(true, req->rid(), req->cid());
    }
    void close_channel(const closeChannelRequestPtr& req) {
        __channels->close_channel(req->cid());
        return basic_response(true, req->rid(), req->cid());
    }

private:
    void basic_response(bool ok, const std::string& rid, const std::string& cid) {
        basicCommonResponse resp;
        resp.set_rid(rid);
        resp.set_cid(cid);
        resp.set_ok(ok);
        __codec->send(__conn, resp); // Send response to client
    }
};
} // namespace hare_mq
```
**This step integrates everything built so far, writes the network-facing interface, and provides services to the outside world!**
It can be adapted from the demo code implemented earlier; we only need to implement the various business interfaces through which the server provides its services.
**Each business handler is also fairly simple. Once a channel has been created, every incoming request is handled the same way: find the corresponding channel handle, call the previously encapsulated processing interface through that handle, and finally return the result.**
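The per-request flow described above can be sketched in three steps: look up the channel by `cid`, delegate the request to it, and reply. Everything in this sketch is hypothetical:

- `bind_request` and `common_response` stand in for the protobuf messages;
- `demo_channel` stands in for `channel`;
- the `reply` callback stands in for `ProtobufCodec::send`.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical request/response shapes (the real ones are protobuf messages).
struct bind_request { std::string rid, cid, exchange, queue, key; };
struct common_response { std::string rid, cid; bool ok; };

// Hypothetical channel: records bindings and reports success.
struct demo_channel {
    std::unordered_map<std::string, std::string> bindings;  // queue -> binding key
    bool bind(const bind_request& req) {
        bindings[req.queue] = req.key;
        return true;
    }
};

class demo_server {
public:
    using reply_fn = std::function<void(const common_response&)>;
    explicit demo_server(reply_fn reply) : reply_(std::move(reply)) {}

    void add_channel(const std::string& cid) {
        channels_[cid] = std::make_shared<demo_channel>();
    }

    // 1. find the channel handle  2. let it process the request  3. reply
    void on_bind(const bind_request& req) {
        auto it = channels_.find(req.cid);
        if (it == channels_.end()) {
            reply_(common_response{req.rid, req.cid, false});  // unknown channel
            return;
        }
        bool ok = it->second->bind(req);
        reply_(common_response{req.rid, req.cid, ok});
    }

private:
    std::unordered_map<std::string, std::shared_ptr<demo_channel>> channels_;
    reply_fn reply_;
};
```

Echoing the request's `rid` in every reply lets the client match each response to the request that caused it.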
In `RabbitMQ`, services are provided through channels. The client implementation therefore de-emphasizes the concept of a `Client`: `RabbitMQ` does not expose network communication to the user but presents it as a set of services. The idea resembles ordinary functional interface encapsulation. Each interface implements one function and sends the request to the server internally, so the communication between client and server never has to be visible to the outside world. The user simply calls whichever interface provides the service they need. Based on this idea, the client implementation is divided into four modules:
- Subscriber module
- Channel module: the module that faces the user directly. It exposes multiple service interfaces, and users call whichever interface provides the service they need. These include exchange declaration/deletion, queue declaration/deletion, message publishing/acknowledgement, subscription/unsubscription, and so on.
- Connection module
- Asynchronous thread module: mainly handles two tasks asynchronously:
  - the IO event-monitoring thread for the client connection
  - the asynchronous processing thread for pushed messages

To simplify the implementation, a client may subscribe to only one queue.
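The sketch below shows the interface style described above: one call per service, with the network round trip hidden inside the call. It is one possible way to do it, not the project's client code. It assumes each request carries a request ID (`rid`) and that the call blocks until a response with the same `rid` arrives. `client_channel`, the `transport` callback, and `common_response` are all hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical response shape (the real one is a protobuf message).
struct common_response { std::string rid; bool ok; };

// Each public method is one service: it sends a request through the injected
// transport, then blocks until the response carrying the same rid arrives.
class client_channel {
public:
    using transport = std::function<void(const std::string& rid, const std::string& payload)>;
    explicit client_channel(transport send) : send_(std::move(send)) {}

    bool declare_exchange(const std::string& name) {
        std::string rid = std::to_string(++seq_);
        send_(rid, "declareExchange " + name);
        return wait_response(rid).ok;
    }

    // Called from the IO thread when a response arrives.
    void on_response(const common_response& resp) {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            responses_[resp.rid] = resp;
        }
        cv_.notify_all();
    }

private:
    common_response wait_response(const std::string& rid) {
        std::unique_lock<std::mutex> lock(mtx_);
        cv_.wait(lock, [&] { return responses_.count(rid) > 0; });
        common_response resp = responses_[rid];
        responses_.erase(rid);
        return resp;
    }

    transport send_;
    std::atomic<int> seq_{0};
    std::mutex mtx_;
    std::condition_variable cv_;
    std::unordered_map<std::string, common_response> responses_;
};
```

Because the blocking wait lives inside `declare_exchange`, callers get an ordinary synchronous function even though the response arrives on another thread.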
This code largely duplicates the server side! See the code for details.
The logic of this part is basically the same as the server's: the server mainly receives requests, while the client mainly sends them.
See the code for details.
It is essentially a wrapper around a thread pool and an `EventLoop`.
It has to process two tasks asynchronously: IO event monitoring for the connection and the handling of pushed messages.
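```cpp
namespace hare_mq {
class async_worker {
public:
    using ptr = std::shared_ptr<async_worker>;
    muduo::net::EventLoopThread loop_thread; // IO event monitoring for the client connection
    thread_pool pool;                        // Asynchronous processing of pushed messages
};
} // namespace hare_mq
```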
The code is very simple. The connection module can then use this `async_worker` directly, since it provides both a thread pool and event monitoring.
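The project's `thread_pool` class is not shown in this section. To give a rough idea of what the "pushed message" half of `async_worker` does, here is a minimal fixed-size thread pool built only on the standard library. The class name and its `push` interface are assumptions, not the project's API.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

class simple_thread_pool {
public:
    explicit simple_thread_pool(std::size_t n = 2) {
        for (std::size_t i = 0; i < n; ++i)
            workers_.emplace_back([this] { run(); });
    }
    ~simple_thread_pool() {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            stop_ = true;
        }
        cv_.notify_all();
        for (auto& t : workers_) t.join();  // workers drain remaining tasks first
    }
    void push(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            tasks_.push(std::move(task));
        }
        cv_.notify_one();
    }

private:
    void run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mtx_);
                cv_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
                if (stop_ && tasks_.empty()) return;
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();  // e.g. invoke a consumer callback for a pushed message
        }
    }

    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex mtx_;
    std::condition_variable cv_;
    bool stop_ = false;
};
```

The destructor lets the workers drain the queue before it joins them, so no task accepted by `push` is silently dropped at shutdown.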
`client.hpp`: in fact, it mirrors the server. The connection module creates the channels.
Event monitoring detects when the connection is established, and the channel is created at that point. It's very simple.
Two kinds of clients are needed:

- a producer client that publishes messages
- consumer clients that subscribe to messages

Building steps:

1. Create a producer client.
2. Declare an exchange `exchange1`.
3. Declare a queue `queue1` (`binding_key=queue1`).
4. Declare a queue `queue2` (`binding_key=news.music.#`).
5. Bind both queues to the exchange.
6. Build two consumer clients, each subscribing to one of the queues.

Tests:

1. First, set the exchange type to broadcast (fanout) mode. Expected result: both consumer clients receive the message.
2. Second, set the exchange to direct mode. Expected result: only one client receives the message, namely the one whose queue's `binding_key` equals the `routing_key`.
3. Third, set the exchange to topic mode with `routing_key='news.music.pop'`. Expected result: only `queue2` receives the message.
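The three expected results can be checked against a small routing function. This is a sketch of AMQP-style matching, not the project's `route.hpp`. It makes these assumptions:

- keys are split on `.`;
- `*` matches exactly one word and `#` matches zero or more words;
- fanout always matches;
- direct requires the routing key and binding key to be equal.

```cpp
#include <cassert>
#include <sstream>
#include <string>
#include <vector>

enum class exchange_type { direct, fanout, topic };

static std::vector<std::string> split_words(const std::string& key) {
    std::vector<std::string> words;
    std::stringstream ss(key);
    std::string w;
    while (std::getline(ss, w, '.'))
        words.push_back(w);
    return words;
}

// Dynamic programming over binding-key words (b) and routing-key words (r).
static bool topic_match(const std::string& binding_key, const std::string& routing_key) {
    std::vector<std::string> b = split_words(binding_key);
    std::vector<std::string> r = split_words(routing_key);
    // dp[i][j]: the first i binding words match the first j routing words
    std::vector<std::vector<bool>> dp(b.size() + 1, std::vector<bool>(r.size() + 1, false));
    dp[0][0] = true;
    for (size_t i = 1; i <= b.size(); ++i) {
        if (b[i - 1] == "#")
            dp[i][0] = dp[i - 1][0];  // '#' may match zero words
        for (size_t j = 1; j <= r.size(); ++j) {
            if (b[i - 1] == "#")
                dp[i][j] = dp[i - 1][j] || dp[i][j - 1];  // zero words, or one more word
            else if (b[i - 1] == "*" || b[i - 1] == r[j - 1])
                dp[i][j] = dp[i - 1][j - 1];
        }
    }
    return dp[b.size()][r.size()];
}

bool route(exchange_type type, const std::string& routing_key, const std::string& binding_key) {
    switch (type) {
    case exchange_type::fanout: return true;
    case exchange_type::direct: return routing_key == binding_key;
    case exchange_type::topic: return topic_match(binding_key, routing_key);
    }
    return false;
}
```

Under these rules, `news.music.pop` matches `news.music.#` but not `queue1`, so in the topic test only `queue2` receives the message.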
First, to be clear about what we implemented: a simplified message queue component modeled on `RabbitMQ`. It includes both the message queue server and the client, and it supports publishing and subscribing to messages between different hosts as well as pushing messages to consumers.
Second, the technologies used in the project:

- The underlying network communication server and client are built on the `muduo` library.
- The application-layer protocol interfaces are designed with `protobuf`.
- Data persistence is managed with the lightweight database `sqlite`.
- The message queue technologies are integrated into one project based on an understanding of the `AMQP` model.
- The `gtest` framework is used for unit testing throughout the implementation.