当前位置:网站首页>集群聊天服务器:如何解决跨服务器通信问题 | redis发布-订阅
集群聊天服务器:如何解决跨服务器通信问题 | redis发布-订阅
2022-07-23 21:17:00 【_索伦】
跨服务器通信问题
有两个客户在不同的服务器上登录,两个都在线,但服务器各自的_userConnMap上面没有另一个客户端的登录信息,这样两个客户端想要聊天,就会把消息存储成离线消息,必须等两个客户端在同一台服务器上才能正常通信。

集群服务器之间的通信设计
1. 直接建立TCP连接

上面的设计,让各个ChatServer服务器互相之间直接建立TCP连接进行通信,相当于在服务器网络之间进行广播。这样的设计使得各个服务器之间耦合度太高,不利于系统扩展,并且会占用系统大量的socket资源,各服务器之间的带宽压力很大,不能够节省资源给更多的客户端提供服务,因此绝对不是一个好的设计。
2. 引入中间件消息队列
集群部署的服务器之间进行通信,最好的方式就是引入中间件消息队列,解耦各个服务器,使整个系统松耦合,提高服务器的响应能力,节省服务器的带宽资源,如下图所示:

在集群分布式环境中,经常使用的中间件消息队列有ActiveMQ、RabbitMQ、Kafka等,都是应用场景广泛并且性能很好的消息队列,供集群服务器之间,分布式服务之间进行消息通信。限于我们的项目业务类型并不是非常复杂,对并发请求量也没有太高的要求,因此我们的中间件消息队列选型的是-基于发布-订阅模式的redis。
示例
示例:ChatServer1在redis上订阅和client1相关的事件,ChatServer2在redis上订阅和client2相关的事件,那么如果client1想给client2发消息,就会发布消息,消息队列接收后,就会给ChatServer2 notify,将消息转发给client2。

redis的安装

redis发布-订阅的使用
redis存储数据是以键值对形式存储,示例:


发布订阅
客户端登录不同服务器后,需要在消息队列以用户id订阅通道channel,这样就能利用通道来实现跨服务器通信。

命令:
subscribe + id
publish + id + message
其他命令都会响应,而subscribe会阻塞住,等待消息传过来。



redis编程流程
redis.hpp
#ifndef REDIS_H
#define REDIS_H
#include <hiredis/hiredis.h>
#include <thread>
#include <functional>
using namespace std;
class Redis
{
public:
Redis();
~Redis();
// 连接redis服务器
bool connect();
// 向redis指定的通道channel发布消息
bool publish(int channel, string message);
// 向redis指定的通道subscribe订阅消息
bool subscribe(int channel);
// 向redis指定的通道unsubscribe取消订阅消息
bool unsubscribe(int channel);
// 在独立线程中接收订阅通道中的消息
void observer_channel_message();
// 初始化向业务层上报通道消息的回调对象
void init_notify_handler(function<void(int, string)> fn);
private:
// hiredis同步上下文对象,负责publish消息
redisContext* _publish_context;
// hiredis同步上下文对象,负责subscribe消息
redisContext* _subscribe_context;
// 回调操作,收到订阅的消息,给service层上报
function<void(int, string)> _notify_message_handler;
};
#endif
redis.cpp
#include "redis.hpp"
#include <iostream>
using namespace std;
Redis::Redis()
: _publish_context(nullptr), _subscribe_context(nullptr)
{
}
Redis::~Redis()
{
if (_publish_context != nullptr)
{
redisFree(_publish_context);
}
if (_subscribe_context != nullptr)
{
redisFree(_subscribe_context);
}
}
bool Redis::connect()
{
// 负责publish发布消息的上下文连接
_publish_context = redisConnect("127.0.0.1", 6379);
if (nullptr == _publish_context)
{
cerr << "connect redis failed!" << endl;
return false;
}
// 负责subscribe订阅消息的上下文连接
_subscribe_context = redisConnect("127.0.0.1", 6379);
if (nullptr == _subscribe_context)
{
cerr << "connect redis failed!" << endl;
return false;
}
// 在单独的线程中监听通道上的事件,有消息给业务层进行上报
thread t([&]() {
observer_channel_message();
});
t.detach();
cout << "connect redis-server success!" << endl;
return true;
}
// 向redis指定的channel发布消息
bool Redis::publish(int channel, string message)
{
redisReply* reply = (redisReply*)redisCommand(_publish_context, "PUBLISH %d %s", channel, message.c_str());
if (nullptr == reply)
{
cerr << "publish command failed!" << endl;
return false;
}
freeReplyObject(reply);
return true;
}
// 向redis指定的通道subscribe订阅消息
bool Redis::subscribe(int channel)
{
// SUBSCRIBE命令本身会造成线程阻塞等待通道里面发生消息,这里只做订阅通道,不接收通道消息
// 通道消息的接收专门在observer_channel_message函数中的独立线程中进行
// 只负责发送命令,不阻塞收redis server响应消息,否则和notifyMsg线程抢占响应资源
if (REDIS_ERR == redisAppendCommand(this->_subscribe_context, "SUBSCRIBE %d", channel))
{
cerr << "subscribe command failed!" << endl;
return false;
}
// redisBufferWrite可以循环发送缓冲区,直到缓冲区数据发送完毕(done被置为1)
int done = 0;
while (!done)
{
if (REDIS_ERR == redisBufferWrite(this->_subscribe_context, &done))
{
cerr << "subscribe command failed!" << endl;
return false;
}
}
return true;
}
// 向redis指定的通道unsubscribe取消订阅消息
bool Redis::unsubscribe(int channel)
{
if (REDIS_ERR == redisAppendCommand(this->_subscribe_context, "SUBSCRIBE %d", channel))
{
cerr << "unsubscribe command failed!" << endl;
return false;
}
// redisBufferWrite可以循环发送缓冲区,直到缓冲区数据发送完毕(done被置为1)
int done = 0;
while (!done)
{
if (REDIS_ERR == redisBufferWrite(this->_subscribe_context, &done))
{
cerr << "unsubscribe command failed!" << endl;
return false;
}
}
return true;
}
// 在独立线程中接收订阅通道中的消息
void Redis::observer_channel_message()
{
redisReply* reply = nullptr;
while (REDIS_OK == redisGetReply(this->_subscribe_context, (void**)&reply))
{
// 订阅收到的消息是一个带三元素的数组
if (reply != nullptr && reply->element[2] != nullptr && reply->element[2]->str != nullptr)
{
// 给业务层上报通道上发生的消息
_notify_message_handler(atoi(reply->element[1]->str), reply->element[2]->str);
}
freeReplyObject(reply);
}
cerr << ">>>>>>>>>>>> observer_channel_message quit <<<<<<<<<<<<" << endl;
}
void Redis::init_notify_handler(function<void(int, string)> fn)
{
this->_notify_message_handler = fn;
}
边栏推荐
- [attack and defense world web] difficulty four-star 12 point advanced question: flatscience
- Hezhou esp32c3 hardware configuration information serial port printout
- 第三届SLAM技术论坛-吴毅红教授
- 手机股票开户安全吗?
- Scala Programming (Junior)
- VLAN综合实验
- Oom mechanism
- vite3学习记录
- If the order is not paid within 30 minutes, it will be automatically cancelled
- Scala programming (intermediate advanced experimental application)
猜你喜欢

1309_ Add GPIO flip on STM32F103 and schedule test with FreeRTOS

【isprint函数判断字符是否可输出】

High numbers | calculation of double integral 2 | high numbers | handwritten notes

TypeScript基础

221. Largest square ● &1277. Square submatrix with statistics all 1 ● ●

OOM机制

第三届SLAM技术论坛-吴毅红教授

Problems and abuse of protocol buffers

Visual slam learning | basic chapter 01

The third slam Technology Forum - Professor wuyihong
随机推荐
221. Largest square ● &1277. Square submatrix with statistics all 1 ● ●
scala編程(初級)
Green-Tao 定理的证明 (2): Von Neumann 定理的推广
Flink principle and development summary (detailed)
MySql的DDL和DML和DQL的基本语法
[attack and defense world web] difficulty four-star 12 point advanced question: flatscience
基于速度、复杂性等因素比较KernelSHAP和TreeSHAP
When we talk about Chen Chunhua and Huawei, what are we talking about?
(Note)优化器Adam的学习率设置
Read the five flow indicators of R & D efficiency insight
Unity—3D数学-Vector3
Comment présenter votre expérience de projet lors d'une entrevue
如何在面試中介紹自己的項目經驗
Major upgrade of openim - group chat reading diffusion model release group management function upgrade
Oom mechanism
[cloud co creation] what magical features have you encountered when writing SQL every day?
Vite3 learning records
一道golang中关于for range常见坑的面试题
SQLite database
At 12 o'clock on July 23, 2022, the deviation from the top of the line of love life hour appeared, maintaining a downward trend and waiting for the rebound signal.