我们需要以下这些工具代码:
这个日志组件我经常使用,直接见代码即可。HareMQ/mqcommon/logger.hpp
。
在 mqcommon
里面创建一个 helper.hpp
即可,把 demo 里面实现的 db.hpp
里面的内容复制过去即可。
具体解释可以见 demo 的文档: docs/sqlite.md
其实就是字符串切割,之前也写过很多次了,用boost
里面的就行了。
class string_helper {
static size_t split(const std::string& str, const std::string& sep, std::vector<std::string>* out, bool if_compress = true) {
// boost split
if (if_compress) {
boost::split(*out, str, boost::is_any_of(sep), boost::token_compress_on);
return out->size();
} else {
boost::split(*out, str, boost::is_any_of(sep), boost::token_compress_off);
return out->size();
}
}
};
[!TIP] UUID(Universally Unique ldentifier), 也叫通用唯一识别码,通常由32位16进制数字字符组成。UUID的标准型式包含32个16进制数字字符,以连字号分为五段,形式为8-4-4-4-12的32个字符,如:
550e8400-e29b-41d4-a716-446655440000
。在这里,uuid生成,我们采用生成8个随机数字,加上8字节序号,共16字节数组生成32位16进制字符的组合形式来确保全局唯一的同时能够根据序号来分辨数据。
class uuid_helper {
public:
std::string uuid() {
std::random_device rd;
std::mt19937_64 generator(rd());
std::uniform_int_distribution<int> distribution(0, 255);
std::stringstream ss;
for (int i = 0; i < 8; ++i) {
ss << std::setw(2) << std::setfill('0') << std::hex << distribution(generator);
if (i == 3 || i == 5 || i == 7)
ss << "-";
static std::atomic<size_t> seq(1); // 这里一定要静态,保证多次调用都是自增的
size_t num = seq.fetch_add(1);
for (int i = 7; i >= 0; i--) {
ss << std::setw(2) << std::setfill('0') << std::hex << ((num >> (i * 8)) & 0xff);
if (i == 6)
ss << "-";
}
}
return ss.str();
}
};
基本框架:
class file_helper {
private:
std::string __file_name;
public:
file_helper(const std::string& file_name)
: __file_name(file_name) { }
~file_helper() = default;
public:
bool exists();
size_t size();
bool read(std::string& body);
bool read(std::string& body, size_t offset, size_t len);
bool write(const std::string& body);
bool write(const std::string& body, size_t offset);
bool create();
bool remove();
bool create_dir();
bool remove_dir();
static std::string parent_dir(const std::string& file_name);
bool rename(const std::string& name);
};
mptest/file_test.cc
先测试两个简单的功能:
void test1() {
hare_mq::file_helper helper("../mqcommon/logger.hpp");
hare_mq::LOG(DEBUG) << "file if exists: " << helper.exists() << std::endl;
hare_mq::LOG(DEBUG) << "file size: " << helper.size() << std::endl;
}
测试目录创建和文件创建:
void test2() {
hare_mq::file_helper helper("./aaa/bbb/ccc/tmp.hpp");
if (helper.exists() == false) {
std::string p = hare_mq::file_helper::parent_dir(helper.path()); // 先获取父级目录
hare_mq::LOG(DEBUG) << p << std::endl;
if (hare_mq::file_helper(p).exists() == false) {
// 创建目录
hare_mq::file_helper::create_dir(p);
}
hare_mq::file_helper::create(helper.path());
}
}
测试全局的读写:
void test3() {
hare_mq::file_helper helper1("../mqcommon/logger.hpp");
hare_mq::file_helper helper2("./aaa/bbb/ccc/tmp.hpp");
std::string body;
helper1.read(body);
// write to tmp.hpp
helper2.write(body);
}
把logger.hpp
里面的数据读出来,然后写到tmp.hpp
里面去。
测试特定位置的读写:
void test4() {
hare_mq::file_helper helper("./aaa/bbb/ccc/tmp.hpp");
// 把6-19个字节读取出来
char str[16] = {0};
helper.read(str, 6, 13); // 这里要读 (6, 19] 这些字符,应该传入 6, 13
hare_mq::LOG(DEBUG) << std::string(str) << std::endl;
helper.write("123456\n", 19, 7);
}
通过测试。
测试一下rename
:
void test5() {
hare_mq::file_helper helper("./aaa/bbb/ccc/tmp.hpp");
helper.rename(hare_mq::file_helper::parent_dir(helper.path()) + "/test.hpp");
}
符合预期。
测试删除:
void test6() {
hare_mq::LOG(DEBUG) << "before run" << std::endl;
system("tree .");
hare_mq::file_helper::create("./aaa/bbb/ccc/tmp.hpp");
hare_mq::LOG(DEBUG) << "run: create(\"./aaa/bbb/ccc/tmp.hpp\");" << std::endl;
system("tree .");
hare_mq::file_helper::remove("./aaa/bbb/ccc/tmp.hpp");
hare_mq::LOG(DEBUG) << "run: remove(\"./aaa/bbb/ccc/tmp.hpp\");" << std::endl;
system("tree .");
hare_mq::file_helper::remove_dir("./aaa/bbb/ccc/");
hare_mq::LOG(DEBUG) << "run: remove_dir(\"./aaa/bbb/ccc/\");" << std::endl;
system("tree .");
hare_mq::file_helper::remove_dir("./aaa");
hare_mq::LOG(DEBUG) << "run: remove_dir(\"./aaa\");" << std::endl;
system("tree .");
}
符合预期。
因此定义消息类型,其实就是定义一个消息类型的proto文件,并生成相关代码。
消息的结构:
routing_key
定义proto文件。
syntax = "proto3";
package hare_mq;
enum ExchangeType {
UNKNOWTYPE = 0;
DIRECT = 1;
FANOUT = 2;
TOPIC = 3;
};
enum DeliveryMode {
UNKNOWTYPE = 0;
UNDURABLE = 1;
DURABLE = 2;
};
message BasicProperties {
string id = 1;
DeliveryMode delivery_mode = 2;
string routing_key = 3;
};
message Message {
message Payload {
BasicProperties properties = 1;
string body = 2;
};
Payload payload = 1;
uint32 offset = 2;
uint32 length = 3;
string valid = 4;
};
现在要开始编写mqserver
里面的exchagne,.hpp
了。
代码基本结构如下所示:
namespace hare_mq {
/**
* 1. 交换机类
* 2. 交换机数据持久化管理类
* 3. 交换机数据内存管理类
*/
struct exchange {
/* 交换机类 */
public:
using ptr = std::shared_ptr<exchange>;
std::string name; // 交换机名称
ExchangeType type; // 交换机类型
bool durable; // 持久化标志
bool auto_delete; // 自动删除标志
std::unordered_map<std::string, std::string> args; // 其他参数
public:
exchange(const std::string ename,
ExchangeType etype,
bool edurable,
bool eauto_delete,
const std::unordered_map<std::string, std::string>& eargs)
: name(ename)
, type(etype)
, auto_delete(eauto_delete)
, args(eargs) { }
// args存储的格式是键值对,在存储数据库的时候,会组织一个字符串进行存储 key=value&key=value
void set_args(const std::string& str_args) {
/**
* 解析 str_args 字符串: key=value&key=value... 存到 args 成员变量中去
*/
}
std::string get_args() {
/**
* set_args()的反操作,把args里面的数据序列化成 key=value&key=value... 的格式
*/
}
};
class exchange_mapper {
/* 交换机数据持久化管理类 */
private:
sqlite_helper __sql_helper; // sqlite操作句柄
public:
exchange_mapper(const std::string& dbfile); // 构造,需要传递数据库文件名称
public:
void create_table(); // 创建表
void remove_table(); // 删除表
void insert(exchange::ptr& e); // 插入交换机
void remove(const std::string& name); // 移除交换机
exchange::ptr one(const std::string& name); // 获取单个交换机
std::unordered_map<std::string, exchange::ptr> all(); // 获取全部交换机
};
class exchange_manager {
/* 交换机数据内存管理类 */
private:
exchange_mapper __mapper; // 持久化管理
std::unordered_map<std::string, exchange::ptr> __exchanges; // 管理所有的交换机
std::mutex __mtx; // exchange_manager 会被多线程调用,管理一个互斥锁
public:
exchange_manager(const std::string& dbfile);
void declare_exchange(const std::string& name,
ExchangeType type,
bool durable,
bool auto_delete,
std::unordered_map<std::string, std::string>& args); // 声明交换机
void delete_exchange(const std::string& name); // 删除交换机
exchange::ptr select_exchange(const std::string& name); // 选择一台交换机
bool exists(const std::string& name); // 判断交换机是否存在
void clear_exchange(); // 清理所有交换机
};
} // namespace hare_mq
具体代码可以见代码所示。
单元测试部分我找到了很多bug,改了一些地方。具体见代码。HareMQ/mqtest/exchange_test.cc
这一部分的基本结构如下所示。
定义队列描述数据类:
定义队列数据持久化类(数据持久化的sqlite3数据库中)
定义队列数据管理类:
具体代码和exchange.hpp
的基本是完全一样的,这里不重复,具体可见代码。
这一部分和交换机部分基本完全相同。
具体见代码。HareMQ/mqtest/queue_test.cc
本质上就是一个交换机关联了哪些队列的描述。
定义绑定信息类:
binding_key
(分发匹配规则-决定了哪些数据能被交换机放入队列)定义绑定信息数据持久化类:
定义绑定信息数据管理类:
同样,也是第三个类才是对外的,和前面其实都是一样的。
一些实现的tips:
using msg_queue_binding_map = std::unordered_map<std::string, binding::ptr>;
using binding_map = std::unordered_map<std::string, msg_queue_binding_map>;
为什么这样设计?
因为一个交换机可以有多个绑定信息,但是一个绑定信息一定只对应一个队列。
所以让队列和绑定信息先构造一个一一对应的map。
其余基本东西和前面写过的基本都相同。要注意这里:
static int select_callback(void* arg, int numcol, char** row, char** fields) {
binding_map* result = (binding_map*)arg;
binding::ptr bp = std::make_shared<binding>(row[0], row[1], row[2]);
// 为了防止绑定信息已经存在,不能直接创建队列映射,直接添加,这样会覆盖历史数据
// 因此要先获得交换机对应的映射对象,往里面添加数据
// 但是若这个时候没有交换机对应的映射信息,因此这里的获取要使用引用(会保证不存在则自动创建)
msg_queue_binding_map& qmap = (*result)[bp->exchange_name]; // 这里比较巧妙
qmap.insert({ bp->msg_queue_name, bp });
return 0;
}
注意类型,不是直接插入。注意这两种类型即可。
using msg_queue_binding_map = std::unordered_map<std::string, binding::ptr>;
using binding_map = std::unordered_map<std::string, msg_queue_binding_map>;
绑定信息是否需要持久化取决于:交换机持久化+队列持久化,绑定信息才需要持久化。
bool bind(const std::string& ename, const std::string& qname, const std::string& key, bool durable)
但是为了这些hpp之间是解耦合的,因此这里直接传递bool durable
,让外部,让联合调用的时候直接告诉我是否需要持久化即可,而不是在这个文件里面去获取交换机和队列的数据,这样就耦合起来了,不是特别好。
测试和前面基本相同,不再重复,具体可见代码。HareMQ/mqtest/binding_test.cc
。
前面的章节基本上都是在做准备,从消息管理开始就是核心了。
消息有什么要素,这个protobuf文件里面都有定义好了,现在重新解释一下。
message BasicProperties {
string id = 1;
DeliveryMode delivery_mode = 2;
string routing_key = 3;
};
message Message {
message Payload {
BasicProperties properties = 1;
string body = 2;
string valid = 3;
};
Payload payload = 1;
uint32 offset = 2;
uint32 length = 3;
};
消息是可以分为两部分的,第一部分是数据消息本身,也就是网络传输的核心数据,就是上面的Payload
,第二部分就是服务器上的消息管理所需要的额外要素,最主要的就是持久化管理。
在服务器上消息管理需要的额外要素有哪些呢:
关于valid
字段,修改一下要放到Payload
里面才行,因为Payload
的内容都是需要持久化存储的。
思想:以队列为单元进行消息的持久化管理。
当消息文件垃圾回收时,需要加载所有有效消息,重新生成新的数据文件。但是生成新的数据文件后,消息的存储位置就发生了变化,这时候需要更新内存中的数据。 这时候就需要将所有的队列数据进行加锁,然后进行更新,这样锁的冲突太频繁,效率低。 因此,如果每个队列都有自己独立的数据文件,则每次只需要对操作的队列数据进行加锁即可。
这里要注意:
[!NOTE] 消息的持久化管理是不使用数据库的,我们直接使用文件,因为我们不是用来查询的,而是用来备份的,而且消息的长度经常不定,所以选择使用文件更好。
如果要存储到文件中,就要把格式规范好。
我们的格式是: [4字节长度|数据][4字节长度|数据][4字节长度|数据][4字节长度|数据]...
这种格式进行存储。
通过这种方法就能解决粘包问题了。
框架如下所示。
namespace hare_mq {
#define DATAFILE_SUBFIX ".mqd"
#define TMPFILE_SUBFIX ".mqd.tmp" // 定义持久化文件和临时文件的文件名后缀
using message_ptr = std::shared_ptr<Message>;
class message_mapper {
private:
std::string __queue_name; // 队列名
std::string __data_file; // 持久化文件
std::string __tmp_file; // 临时文件
public:
message_mapper(const std::string& base_dir, const std::string& qname);
void create_msg_file();
void remove_msg_file();
void insert(message_ptr& msg);
void remove(const message_ptr& msg);
std::list<message_ptr> gc(); // 垃圾回收
};
} // namespace hare_mq
具体实现可以见代码。
对应持久化管理(消息在磁盘中),现在在内存中的管理,也是要以队列为单位进行管理。
所以需要两层封装的,每一个队列都需要一个类去对他进行管理,然后最终对外的,要管理所有队列,因此还需要一层封装。
所以内层:队列消息管理
操作:
字段:
基本结构如下所示:
/* 队列管理(上面是持久化,这里是内存的)*/
class queue_message {
private:
std::string __queue_name; // 队列名称
size_t __valid_count; // 有效消息数量
size_t __total_count; // 总共消息数量
message_mapper __mapper; // 持久化的句柄
std::list<message_ptr> __msgs; // 待推送的消息
std::unordered_map<std::string, message_ptr> __durable_msgs; // 待持久化的消息
std::unordered_map<std::string, message_ptr> __wait_ack_msgs; // 待确认的消息
public:
queue_message(const std::string& base_dir, const std::string& qname)
: __mapper(base_dir, qname) { }
bool insert(const BasicProperties* bp, const std::string& body);
bool remove(const std::string& msg_id); // ack, 每次remove后要去检查是否需要gc
message_ptr front(); // 获取队首消息
size_t push_count();
size_t total_count();
size_t durable_count();
size_t wait_ack_count();
void clear();
};
管理的是每一个队列的消息。
管理的成员:
提供的操作:
class message_manager {
private:
std::mutex __mtx;
std::string __base_dir;
std::unordered_map<std::string, queue_message::ptr> __queue_msgs; // map
public:
using ptr = std::shared_ptr<message_manager>;
message_manager(const std::string& base_dir)
: __base_dir(base_dir) { }
void init_queue_msg(const std::string& qname) {
queue_message::ptr qmp;
{ // lock
std::unique_lock<std::mutex> lock(__mtx);
auto it = __queue_msgs.find(qname);
if (it != __queue_msgs.end())
return;
qmp = std::make_shared<queue_message>(__base_dir, qname);
__queue_msgs.insert(std::make_pair(qname, qmp));
}
qmp->recovery(); // no lock
} // 创建队列
void destroy_queue_msg(const std::string& qname) {
queue_message::ptr qmp;
{
std::unique_lock<std::mutex> lock(__mtx);
auto it = __queue_msgs.find(qname);
if (it == __queue_msgs.end()) // 没找到这个队列,直接返回
return;
qmp = it->second;
__queue_msgs.erase(it);
}
qmp->clear();
} // 销毁队列
bool insert(const std::string& qname, BasicProperties* bp, const std::string& body, DeliveryMode mode) {
queue_message::ptr qmp;
{
std::unique_lock<std::mutex> lock(__mtx);
auto it = __queue_msgs.find(qname);
if (it == __queue_msgs.end()) {
LOG(ERROR) << "insert msg failed, no this queue: " << qname << std::endl;
return false;
}
qmp = it->second;
}
return qmp->insert(bp, body, mode);
} // 向 qname 插入一个消息
message_ptr front(const std::string& qname) {
queue_message::ptr qmp;
{
std::unique_lock<std::mutex> lock(__mtx);
auto it = __queue_msgs.find(qname);
if (it == __queue_msgs.end()) {
LOG(ERROR) << "get queue front failed, no this queue: " << qname << std::endl;
return message_ptr();
}
qmp = it->second;
}
return qmp->front();
} // 获取 qname 这个队列的队首消息
void ack(const std::string& qname, const std::string msg_id) {
queue_message::ptr qmp;
{
std::unique_lock<std::mutex> lock(__mtx);
auto it = __queue_msgs.find(qname);
if (it == __queue_msgs.end()) {
LOG(ERROR) << "ack mesg failed, no this queue: " << qname << std::endl;
return;
}
qmp = it->second;
}
qmp->remove(msg_id); // 确认就是删除
} // 对 qname 中的 msg_id 进行确认
size_t getable_count(const std::string& qname) {
queue_message::ptr qmp;
{
std::unique_lock<std::mutex> lock(__mtx);
auto it = __queue_msgs.find(qname);
if (it == __queue_msgs.end()) {
LOG(ERROR) << "error in getable_count(), no this queue: " << qname << std::endl;
return 0;
}
qmp = it->second;
}
return qmp->getable_count();
}
size_t total_count(const std::string& qname) {
queue_message::ptr qmp;
{
std::unique_lock<std::mutex> lock(__mtx);
auto it = __queue_msgs.find(qname);
if (it == __queue_msgs.end()) {
LOG(ERROR) << "error in total_count(), no this queue: " << qname << std::endl;
return 0;
}
qmp = it->second;
}
return qmp->total_count();
}
size_t durable_count(const std::string& qname) {
queue_message::ptr qmp;
{
std::unique_lock<std::mutex> lock(__mtx);
auto it = __queue_msgs.find(qname);
if (it == __queue_msgs.end()) {
LOG(ERROR) << "error in durable_count(), no this queue: " << qname << std::endl;
return 0;
}
qmp = it->second;
}
return qmp->durable_count();
}
size_t wait_ack_count(const std::string& qname) {
queue_message::ptr qmp;
{
std::unique_lock<std::mutex> lock(__mtx);
auto it = __queue_msgs.find(qname);
if (it == __queue_msgs.end()) {
LOG(ERROR) << "error in wait_ack_count(), no this queue: " << qname << std::endl;
return 0;
}
qmp = it->second;
}
return qmp->wait_ack_count();
}
void clear() {
std::unique_lock<std::mutex> lock(__mtx);
for (auto& q : __queue_msgs)
q.second->clear();
}
};
其实就是对前面的所有模块进行整合。让虚拟机这个操作类向更上一层提供接口。
本质:虚拟机操作模块就是对前面的模块的封装
定义虚拟机类包含以下成员:
虚拟机的操作:
虚拟机管理操作:
虚拟机类的结构如下所示
namespace hare_mq {
class virtual_host {
private:
exchange_manager::ptr __emp;
msg_queue_manager::ptr __mqmp;
binding_manager::ptr __bmp;
message_manager::ptr __mmp; // 四个句柄
public:
virtual_host(const std::string& basedir, const std::string& dbfile);
bool declare_exchange(const std::string& name,
ExchangeType type,
bool durable,
bool auto_delete,
std::unordered_map<std::string, std::string>& args); // 声明交换机
void delete_exchange(const std::string& name); // 删除交换机
bool declare_queue(const std::string& qname,
bool qdurable,
bool qexclusive,
bool qauto_delete,
const std::unordered_map<std::string, std::string>& qargs); // 声明队列
void delete_queue(const std::string& name); // 删除队列
bool bind(const std::string& ename, const std::string& qname, const std::string& key); // 绑定交换机和队列
void unbind(const std::string& ename, const std::string& qname); // 解除绑定交换机和队列
msg_queue_binding_map exchange_bindings(const std::string& ename); // 获取一台交换机的所有绑定信息
bool basic_publish(const std::string& qname, BasicProperties* bp, const std::string& body, DeliveryMode mode); // 发布一条消息
message_ptr basic_consume(const std::string& qname); // 消费一条消息
bool basic_ack(const std::string& qname, const std::string& msgid); // 确认一条消息
};
} // namespace hare_mq
实现后的代码如代码所示。
[!TIP] 这个模块的本质:判断一个消息中的
routing_key
与队列的binding_key
是否匹配成功,即:这条到达交换机的消息,要发送到哪一个队列里面去?
客户端将消息发不到指定的交换机,交换机这时候要考虑这条数据该放入到哪些与自己绑定的队列中,而这个考量是通过交换机类型以及匹配规则来决定的。
binding_key
与消息中的 routing_key
一致则匹配成功,否则失败。首先需要总结的一点:只需要对传入的数据进行处理即可,不需要自己对数据进行管理!因此这个模块的接口,都应该是 static
的。
需要提供的功能:
routing_key
是否合法binding_key
是否合法key
的匹配binding_key
和 routing_key
的规则routing_key
必须是: a~z
, A~Z
, 0~9
, .
, _
组成
binding_key
必须是: a~z
, A~Z
, 0~9
, .
, _
, #
, *
组成,注意,#
, *
是通配符。*
可以匹配任意一个单词,#
可以匹配零个或者任意多个单词(注意是单词,不是字母),通配符要独立存在,比如可以 a.#.b
, 不能 a.a#.b
。
#
两边不能有其他通配符,因为 #
本身就表示多个单词,所以如果两边还有是没有意义的。
方法: 动态规划
这里的逻辑比较复杂。
整体思路和 LeetCode 里面的这一题非常相像: https://leetcode.cn/problems/wildcard-matching/description/
因此这里不重复解释原理了,具体直接见代码。
static bool __dp_matching(const std::string& routing_key, const std::string& binding_key) {
// 1. 将binding_key与routing_key进行字符串分割,得到各个的单词数组
std::vector<std::string> bkeys, rkeys;
int n_bkey = string_helper::split(binding_key, ".", &bkeys);
int n_rkey = string_helper::split(binding_key, ".", &rkeys);
// 2. 定义标记数组,并初始化[0][0]位置为true,其他位置为false
std::vector<std::vector<bool>> dp(n_bkey + 1, std::vector<bool>(n_rkey + 1, false));
dp[0][0] = true;
// 3. 如果binding_key以#起始,则将#对应行的第0位置置为1
for (int i = 1; i <= bkeys.size(); ++i) {
if (bkeys[i - 1] == "#") {
dp[i][0] = true;
continue;
}
break;
}
// 4. 使用routing_key中的每个单词与binding_key中的每个单词进行匹配并标记数组
for (int i = 1; i <= n_bkey; i++) {
for (int j = 1; j <= n_rkey; j++) {
// 如果当前 bkey 是单词或者 * ,或者两个单词相同,表示单词匹配成功,从左上方继承结果
if (bkeys[i - 1] == rkeys[j - 1] || bkeys[i - 1] == "*")
dp[i][j] = dp[i - 1][j - 1];
// 如果当前 bkye 是 # , 则需要从左上,左边,上边继承结果
else if (bkeys[i - 1] == "#")
dp[i][j] = dp[i - 1][j - 1] || dp[i][j - 1] || dp[i - 1][j];
}
}
return dp[n_bkey][n_rkey];
}
客户端这边每当发起一个订阅请求,意味着服务器这边就多了一个订阅者(处理消息的客户端描述),而户想要订阅哪一个队这个消费者或者说订阅者它是和队列直接关联的,因为订阅请求中会描述当前列的消息。
而一个信道关闭的时候,或者队列被删除的时候,那么这个信道或队列关联的消费者也就没有存在的息给删除掉。意义了,因此也需要将相关的消费者信息给删除掉。
操作:
元素:
vector
架子如下所示:
namespace hare_mq {
using consumer_callback = std::function<void(const std::string& tag, const BasicProperties*, const std::string)>;
struct consumer {
using ptr = std::shared_ptr<consumer>;
std::string tag; // 消费者标识
std::string qname; // 订阅的队列名称
bool auto_ack; // 自动确认标志
consumer_callback callback; // 回调
consumer() { }
consumer(const std::string& ctag, const std::string& queue_name, bool ack_flag, consumer_callback& cb)
: tag(ctag)
, qname(queue_name)
, auto_ack(ack_flag)
, callback(cb) { }
};
/* 以队列为单元的消费者结构 */
class queue_consumer {
private:
std::string __qname;
std::mutex __mtx;
uint64_t __rr_seq; // 轮转序号
std::vector<consumer::ptr> __consumers; // 管理的所有消费者对象
public:
queue_consumer(const std::string& qname);
consumer::ptr create(const std::string& ctag, const std::string& queue_name, bool ack_flag, consumer_callback& cb); // 创建消费者
void remove(const std::string& ctag);
consumer::ptr choose();
bool empty();
bool exists(const std::string& ctag);
void clear();
};
} // namespace hare_mq
在 AMQP模型中,除了通信连接的 Connection
概念外,还有一个 Channel
的概念,Channel
是针对于 Connection
连接的一个更细粒度的通信通道,多个 Channel
可以使用同一个通信连接 Connection
进行通信,但是同一个 Connection
的 Channel
之间相互独立。
管理信息:
protobuf
协议处理句柄:网络通信前的协议处理管理的操作:
信道管理: 信道的增删查
搭建好的框架如下所示。
namespace hare_mq {
class channel {
public:
using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>; //
private:
std::string __cid; // 信道标识
consumer::ptr __consumer; // 在haremq中一个信道对应一个消费者,不一定有效,因为信道不一定是消费者关联的
muduo::net::TcpConnectionPtr __conn; // 连接句柄
ProtobufCodecPtr __codec; // 协议处理
consumer_manager::ptr __cmp; // 消费者管理句柄
virtual_host::ptr __host; // 虚拟机对象管理句柄
public:
channel(const std::string& cid,
const virtual_host::ptr& host,
const consumer_manager::ptr& cmp,
const ProtobufCodecPtr& codec,
const muduo::net::TcpConnectionPtr conn) { }
~channel();
// 交换机的声明和删除
void declare_exchange();
void delete_exchange();
// 队列的声明和删除
void declare_queue();
void delete_queue();
// 队列的绑定与解除绑定
void bind();
void unbind();
// 消息的发布和确认
void basic_publish();
void basic_ack();
// 订阅/取消订阅队列消息
void basic_consume();
void basic_cancel();
};
} // namespace hare_mq
简单来说:就是上面实现的所有功能,需要提供网络接口,给客户端去调用(客户端可以是生产者也可以是消费者)
具体通信过程采用 Muduo
库来实现,使用 TCP 作为通信的底层协议,同时在这个基础上自定义应用层协议,完成客户端对服务器功能的远程调用。
需要提供的网络接口:
channel
exchange
queue
binding
message
message
ack
message
(服务器->客户端)这个就直接用 Muduo
库里,陈硕大佬已经写好的结合 Protobuf
方法了,报文格式如图所示。
len
: 4个字节,表示整个报文的长度nameLen
: 4个字节,标识 typeName
数组的长度typeName
: 是一个字节数组,占 nameLen
个字节,标识请求/响应报文的类型名称,作用是分发不同的消息到对应的远程接口调用中protobufData
: 是个字节数组,占 len-nameLen-8
个字节,表示请求/响应参数数据通过 protobuf
序列化之后 的二进制checkSum
: 4个字节,表示整个消息的校验和,作用是为了校验请求/响应报文的完成性上面的参数,不只有 ProtobufData
需要传递,所有提到的参数都需要传递,所以需要在pb文件中定义好!
/*
* Write by Yufc
* See https://github.com/ffengc/HareMQ
* please cite my project link: https://github.com/ffengc/HareMQ when you use this code
*/
syntax = "proto3";
package hare_mq;
import "msg.proto";
/* 信道的打开与关闭 */
message openChannelRequest {
string rid = 1; // 请求id
string cid = 2; // 信道id
};
message closeChannelRequest {
string rid = 1;
string cid = 2;
};
/* 交换机的声明与删除 */
message declareExchangeRequest {
string rid = 1;
string cid = 2;
string exchange_name = 3;
ExchangeType exchange_type = 4;
bool durable = 5;
bool auto_delete = 6;
map<string, string> args = 7;
};
message deleteExchangeRequest {
string rid = 1;
string cid = 2;
string exchange_name = 3;
};
/* 队列的声明与删除 */
message declareQueueRequest {
string rid = 1;
string cid = 2;
string queue_name = 3;
bool exclusive = 4;
bool durable = 5;
bool auto_delete = 6;
map<string, string> args = 7;
};
message deleteQueueRequest {
string rid = 1;
string cid = 2;
string queue_name = 3;
};
/* 队列的绑定与解除绑定 */
message bindRequest {
string rid = 1;
string cid = 2;
string exchange_name = 3;
string queue_name = 4;
string binding_key = 5;
};
message unbindRequest {
string rid = 1;
string cid = 2;
string exchange_name = 3;
string queue_name = 4;
};
/* 消息的发布 */
message basicPublishRequest {
string rid = 1;
string cid = 2;
string exchange_name = 3;
string body = 4;
BasicProperties properties = 5;
};
/* 消息的确认 */
message basicAckRequest {
string rid = 1;
string cid = 2;
string queue_name = 3;
string message_id = 4;
};
/* 队列的订阅 */
message basicConsumeRequest {
string rid = 1;
string cid = 2;
string consumer_tag = 3;
string queue_name = 4;
bool auto_ack = 5;
};
/* 订阅的取消 */
message basicCancelRequest {
string rid = 1;
string cid = 2;
string consumer_tag = 3;
string queue_name = 4;
}
/* 消息的推送 */
message basicConsumeResponse {
string cid = 1; // 不需要rid,这个是一个响应
string consumer_tag = 2;
string body = 3;
BasicProperties properties = 4;
}
/* 通用响应 */
message basicCommonResponse {
string rid = 1; // 针对rid请求的响应
string cid = 2;
bool ok = 3;
};
[!CAUTION] 看懂这个,就能理解这些文件是干嘛的了!
exchange.hpp
交换机相关的代码queue.hpp
队列相关的代码binding.hpp
绑定相关代码,把交换机和队列建立联系,交换机和队列的绑定message.hpp
这个是消息的管理。为什么还需要消息的管理呢?前面的交换机管理,队列管理和绑定管理,其实都是消息的载体,也就是说交换机啊,队列啊,都是消息经过的地方。route.hpp
解决了消息应该如何从交换机到队列的问题。也就是说,binding.hpp
只是解决了,交换机绑定队列的过程,也就是说,binding.hpp
只是绑定的工具,如何绑定,绑定谁,route.hpp
来告诉你virtual_host.hpp
, 这个文件里面提供的方法,就是整个 MQ 项目服务端提供的方法。virtual_host
则效率非常低,因此引入 channel.hpp
作为一个帮手,来提高效率,本质是一样的,因此可以看到 channel.hpp
里面提供的方法,和 virtual_host.hpp
提供的方法是一样的,channel.hpp
就是 host
的助理。channel.hpp
里面这些接口谁来调用呢?让网络接口的回调来调用!定义好proto之后,我们的 channel
架子如下:给我请求,我帮你处理,所以参数都是请求。
namespace hare_mq {
class channel {
public:
using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;
using openChannelRequestPtr = std::shared_ptr<openChannelRequest>;
using closeChannelRequestPtr = std::shared_ptr<closeChannelRequest>;
using declareExchangeRequestPtr = std::shared_ptr<declareExchangeRequest>;
using deleteExchangeRequestPtr = std::shared_ptr<deleteExchangeRequest>;
using declareQueueRequestPtr = std::shared_ptr<declareQueueRequest>;
using deleteQueueRequestPtr = std::shared_ptr<deleteQueueRequest>;
using bindRequestPtr = std::shared_ptr<bindRequest>;
using unbindRequestPtr = std::shared_ptr<unbindRequest>;
using basicPublishRequestPtr = std::shared_ptr<basicPublishRequest>;
using basicAckRequestPtr = std::shared_ptr<basicAckRequest>;
using basicConsumeRequestPtr = std::shared_ptr<basicConsumeRequest>;
using basicCancelRequestPtr = std::shared_ptr<basicCancelRequest>;
using basicCommonResponsePtr = std::shared_ptr<basicCommonResponse>; //
private:
std::string __cid; // 信道标识
consumer::ptr __consumer; // 在haremq中一个信道对应一个消费者,不一定有效,因为信道不一定是消费者关联的
muduo::net::TcpConnectionPtr __conn; // 连接句柄
ProtobufCodecPtr __codec; // 协议处理
consumer_manager::ptr __cmp; // 消费者管理句柄
virtual_host::ptr __host; // 虚拟机对象管理句柄
public:
channel(const std::string& cid,
const virtual_host::ptr& host,
const consumer_manager::ptr& cmp,
const ProtobufCodecPtr& codec,
const muduo::net::TcpConnectionPtr conn) { }
~channel();
// 交换机的声明和删除
void declare_exchange(const declareExchangeRequestPtr& req);
void delete_exchange(const deleteExchangeRequestPtr& req);
// 队列的声明和删除
void declare_queue(const declareQueueRequestPtr& req);
void delete_queue(const deleteQueueRequestPtr& req);
// 队列的绑定与解除绑定
void bind(const bindRequestPtr& req);
void unbind(const unbindRequestPtr& req);
// 消息的发布和确认
void basic_publish(const basicPublishRequestPtr& req);
void basic_ack(const basicAckRequestPtr& req);
// 订阅/取消订阅队列消息
void basic_consume(const basicConsumeRequestPtr& req);
void basic_cancel(const basicCancelRequestPtr& req);
};
} // namespace hare_mq
map
类型的冲突问题void declare_exchange(const declareExchangeRequestPtr& req) {
__host->declare_exchange(req->exchange_name(),
req->exchange_type(),
req->durable(),
req->auto_delete(),
req->args());
}
写函数的时候遇到了问题,最后一个参数 req->args()
和 unordered_map
类型对应不上,项目指引上的解决方法是把所有的 std::unordered_map
, 全部修改成 google::protobuf::Map
类型。但是因为之前的代码都经过测试了,我不想动,因此写了两种 Map
的相互转换函数。
如下所示。
class map_helper {
public:
static std::unordered_map<std::string, std::string> ConvertProtoMapToStdMap(const google::protobuf::Map<std::string, std::string>& proto_map) {
std::unordered_map<std::string, std::string> std_map;
for (const auto& kv : proto_map) {
std_map[kv.first] = kv.second;
}
return std_map;
}
static google::protobuf::Map<std::string, std::string> ConvertStdMapToProtoMap(const std::unordered_map<std::string, std::string>& std_map) {
google::protobuf::Map<std::string, std::string> proto_map;
for (const auto& kv : std_map) {
proto_map[kv.first] = kv.second;
}
return proto_map;
}
};
可以进行一些测试:
TEST(map_helper_test, test) {
google::protobuf::Map<std::string, std::string> proto_map;
proto_map["one"] = "1";
proto_map["two"] = "2";
std::unordered_map<std::string, std::string> std_map = map_helper::ConvertProtoMapToStdMap(proto_map);
ASSERT_EQ(std_map.size(), 2);
ASSERT_EQ(std_map["one"], "1");
ASSERT_EQ(std_map["two"], "2");
google::protobuf::Map<std::string, std::string> converted_proto_map = map_helper::ConvertStdMapToProtoMap(std_map);
ASSERT_EQ(converted_proto_map.size(), 2);
ASSERT_EQ(converted_proto_map["one"], "1");
ASSERT_EQ(converted_proto_map["two"], "2");
}
对于 channel
的其他操作,大家可以直接看代码了。
上面 channel
已经完善好了,现在只需要提供一个对 channel
进行管理的 channel_manager
类即可。
class channel_manager {
private:
std::unordered_map<std::string, channel::ptr> __channels;
std::mutex __mtx; //
public:
channel_manager() = default;
~channel_manager() = default;
void open_channel(const std::string& cid,
const virtual_host::ptr& host,
const consumer_manager::ptr& cmp,
const ProtobufCodecPtr& codec,
const muduo::net::TcpConnectionPtr conn,
const thread_pool::ptr& pool) { }
void close_channel(const std::string& cid);
channel::ptr select_channel(const std::string& cid);
};
连接是干嘛的,为什么还要封装一层,实现起来连接和信道基本都一样啊,为什么还需要?看看这图就知道了。
一个连接(socket)可能有多个信道,所以连接管理就是把多个信道管理起来。
也就是说,一个连接有一个channel_manager
。
简简单单,封装一下即可。
namespace hare_mq {
class connection {
private:
muduo::net::TcpConnectionPtr __conn;
ProtobufCodecPtr __codec;
consumer_manager::ptr __cmp;
virtual_host::ptr __host;
thread_pool::ptr __pool;
channel_manager::ptr __channels; //
public:
connection(const virtual_host::ptr& host,
const consumer_manager::ptr& cmp,
const ProtobufCodecPtr& codec,
const muduo::net::TcpConnectionPtr& conn,
const thread_pool::ptr& pool)
: __conn(conn)
, __codec(codec)
, __cmp(cmp)
, __host(host)
, __pool(pool)
, __channels(std::make_shared<channel_manager>()) { }
~connection() = default;
void open_channel(const openChannelRequestPtr& req) {
// 1. 判断信道ID是否重复 2. 创建信道
bool ret = __channels->open_channel(req->rid(), __host, __cmp, __codec, __conn, __pool);
if (ret == false)
return basic_response(false, req->rid(), req->cid());
// 3. 给客户端回复
return basic_response(true, req->rid(), req->cid());
}
void close_channel(const closeChannelRequestPtr& req) {
__channels->close_channel(req->cid());
return basic_response(true, req->rid(), req->cid());
} //
private:
void basic_response(bool ok, const std::string& rid, const std::string& cid) {
basicCommonResponse resp;
resp.set_rid(rid);
resp.set_cid(cid);
resp.set_ok(ok);
__codec->send(__conn, resp); // 发送响应给客户端
} //
};
} // namespace hare_mq
就是把之前的东西整合起来,把网络表层接口写好,对外提供服务!
基于前面实现过的demo代码进行改造就可以了,只需要实现服务器内部提供服务的各个业务接口即可。
在各个业务处理函数中,也比较简单,创建信道之后,每次请求过来之后,找到对应的信道句柄,通过调用句柄调用前面封装好的处理接口进行请求处理,最终返回处理结果即可。
在 RabbitMQ
中,提供服务的是信道,因此在客户端的实现中,弱化了 Client
客户端的概念,也就是说在 RabbitMQ
中并不会向用户展示网络通信的概念出来,而是以一种提供服务的形式来体现。其实现思想类似于普通的功能接口封装,一个接口实现一个功能,接口内部完成向客户端请求的过程,但是对外并不需要体现出客户端与服务端通信的概念,用户需要什么服务就调用什么接口就行。基于以上的思想,客户端的实现共分为四大模块:
订阅者模块:
信道模块:
连接模块:
异步线程模块:
主要需要异步处理两个任务:
在这里为了简化实现,规定客户端只能订阅一个队列。
代码和服务端的是重复的!具体可见代码。
这一部分逻辑和服务端基本是一样的,服务端主要是接收请求,客户端主要就是发送请求。
具体见代码。
其实就是对线程池和 EventLoop
进行一个封装。
主要需要异步处理两个任务:
namespace hare_mq {
class async_worker {
public:
using ptr = std::shared_ptr<async_worker>;
muduo::net::EventLoopThread loop_thread;
thread_pool pool;
};
} // namespace hare_mq
代码非常简单,后续连接模块可以直接使用这个 async_worker
, 里面有线程池也有事件监控。
client.hpp
其实和服务端的是一样的,由连接模块来创建信道。
事件监控检测连接啥时候来,来了之后,就创建信道,很简单。
搭建步骤:
exchange1
queue1, binding_key=queue1
queue2, binding_key=news.music.#
测试:
routing_key
和 binding_key
相同的队列)routing_key='news.music.pop'
,理论结果:只有 queue2
能拿到消息首先明确我们所实现的项目:仿 RabbitMQ
实现一个简化版的消息队列组件,其内部实现了消息队列服务器以及客户端的搭建,并支持不同主机间消息的发布与订阅及消息推送功能。
其次项目中所用到的技术:基于 muduo
库实现底层网络通信服务器和客户端的搭建,在应用层基于 protobuf
协议设计应用层协议接口,在数据管理上使用了轻量数据库 sqlite
来进行数据的持久化管理以及基于 AMQP
模型的理解,实现整个消息队列项目技术的整合,并在项目的实现过程中使用 gtest
框架进行单元测试,完成项目的最终实现。