本文作者:包子也沉默

[开源] .NETCore websocket 即时通讯组件---ImCore

包子也沉默 3年前 (2019-07-31) ( 07-31 ) 4241 1条评论
摘要: hanne1imServer2订阅redisChanne2imServer3订阅redisChanne3imServer4订阅redisChanne4业务方(webApi)端根据接收方的clientId后四位16进制与节点总数取模,定位到对应的redisChannel,进行redis->publish操作将消息定位到相应的imServer。每个imServer管理着对应的终端连接,当接收到r

前言

ImCore 是一款 .NETCore 下利用 WebSocket 实现的简易、高性能、集群即时通讯组件,支持点对点通讯、群聊通讯、上线下线事件消息等众多实用性功能。

rncc;16}创建MQClientInstance实例:1publicMQClientInstance(ClientConfigclientConfig,intinstanceIndex,Strin

开源地址:https://github.com/2881099/im ,求 star~~

快速开始

dotnet add package ImCore

<String,MQConsumerInner>entry=it.next();60MQConsumerInnerimpl=entry.getValue();61if(impl!=null

IM服务端

public void Configure(IApplicationBuilder app)
{
    app.UseImServer(new ImServerOptions
    {
        Redis = new CSRedis.CSRedisClient("127.0.0.1:6379,poolsize=5"),
        Servers = new[] { "127.0.0.1:6001" }, //集群配置
        Server = "127.0.0.1:6001"
    });
}

一套永远不需要迭代更新的IM服务端

回调处理?我们可以这样设定,所有用户的主动行为走业务方(webApi),imServer只负责即时消息推送。什么意思?用户A向好友B发送消息:客户端请求业务方(webApi)接口,由业务方(webAp

WebApi业务端

public void Configure(IApplicationBuilder app)
{
    //...

    ImHelper.Initialization(new ImClientOptions
    {
        Redis = new CSRedis.CSRedisClient("127.0.0.1:6379,poolsize=5"),
        Servers = new[] { "127.0.0.1:6001" }
    });

    ImHelper.EventBus(
        t => Console.WriteLine(t.clientId + "上线了"), 
        t => Console.WriteLine(t.clientId + "下线了"));
}
ImHelper方法 参数 描述
PrevConnectServer (clientId, string) 在终端准备连接 WebSocket 前调用
SendMessage (发送者, 接收者, 消息内容, 是否回执) 发送消息
GetClientListByOnline - 返回所有在线clientId
EventBus (上线委托, 离线委托) socket上线与下线事件
群聊频道 参数 描述
JoinChan (clientId, 频道名) 加入
LeaveChan (clientId, 频道名) 离开
GetChanClientList (频道名) 获取群聊频道所有clientId
GetChanList - 获取所有群聊频道和在线人数
GetChanListByClientId (clientId) 获取用户参与的所有群聊频道
GetChanOnline (频道名) 获取群聊频道的在线人数
SendChanMessage (clientId, 频道名, 消息内容) 发送群聊消息,所有在线的用户将收到消息

说明:clientId 应该与 webApi的用户id相同,或者有关联。

f(prev!=null){10instance=prev;11log.warn("ReturnedPreviousMQClientInstanceforclientId:[{}]",clientId

Html5终端

本方案支持集群分区,前端连接 websocket 前,应该先请求 webApi 获得地址(ImHelper.PrevConnectServer)。

24return"ThreadLocalIndex{"+25"threadLocalIndex="+threadLocalIndex.get()+26"}";27}28}通过Threa

运行示例

运行环境:.NETCore 2.1 + redis-server 2.8

oFromNameServer(topic,1000*3);23}24if(topicRouteData!=null){25TopicRouteDataold=this.topicRouteTable

下载Redis-x64-2.8.2402.zip,点击 start.bat 运行;

cd imServer && dotnet run

AULT_PRODUCER_GROUP产生冲突1publicstaticfinalStringDEFAULT_PRODUCER_GROUP="DEFAULT_PRODUCER";接下来实例化mQCli

cd web && dotnet run

alse;3privatebooleanhaveTopicRouterInfo=false;4privateList<MessageQueue>messageQueueList=newAr

打开多个浏览器,访问 http://127.0.0.1:5000 发送群消息

image

设计思路

imServer 是 websocket 服务中心,可部署多实例,按clientId分区管理socket连接;

QProducer.getProducerGroup(),this);12if(!registerOK){13this.serviceState=ServiceState.CREATE_JUST;14

webApi 或其他应用端,使用 ImHelper 调用相关方法(如:SendMessage、群聊相关方法);

stalready.",group);9returnfalse;10}1112returntrue;13}在MQClientInstance初始化时,会创建producerTable、consumer

消息发送利用了 redis 订阅发布技术。每个 imServer 订阅相应的频道,收到消息,指派 websocket 向终端(如浏览器)发送消息;

tione){48log.error("ScheduledTaskpersistAllConsumerOffsetexception",e);49}50}51},1000*10,this.client

1、可缓解并发推送消息过多的问题;

查:1privatebooleanisBrokerAddrExistInTopicRouteTable(finalStringaddr){2Iterator<Entry<String,To

2、可解决连接数过多的问题;

();30}3132log.info("theproducer[{}]startOK.sendMessageWithVIPChannel={}",this.defaultMQProducer.getP

客户端连接流程:client -> websocket -> imserver

motingClient进行更新,而此时的remotingClient也就是刚才创建的NettyRemotingClientNettyRemotingClient的updateNameServerAd

imserver 订阅消息:client <- imserver <- redis channel

==this.clientConfig.getNamesrvAddr()){3this.scheduledExecutorService.scheduleAtFixedRate(newRunnable

推送消息流程:web1 -> sendmsg方法 -> redis channel -> imserver

67Iterator<Entry<String,HashMap<Long,String>>>itBrokerTable=this.brokerAddrTable.e

imserver 充当消息转发,及维护连接中心,代码万年不变不需要重启维护;

tcher){4try{5traceDispatcher.start(this.getNamesrvAddr());6}catch(MQClientExceptione){7log.warn("tra

WebSocket

比较笨的办法是浏览器端使用websocket,其他端socket,这种混乱的设计非常难维护。

nnewThread(r,"NettyClientPublicExecutor_"+this.threadIndex.incrementAndGet());22}23});2425this.event

强烈建议所有端都使用websocket协议,adorid/ios/h5/小程序全部支持websocket客户端。

sponse返回,在这里接收到进行处理,封装成TopicRouteData在invokeSync方法中采用懒加载的方式,尝试获取已经建立好连接的Channel,若是没有,则需要通过bootstrap的

业务与通讯协议

im系统一般涉及【我的好友】、【我的群】、【历史消息】等等。。

SSAGE_WITH_VIP_CHANNEL_PROPERTY和vipChannelEnabled:决定是否使用VIP通道,即高优先级回到DefaultMQProducer的构造方法,其会创建Defa

那么,imServer与业务方(webApi)该保持何种关系呢?

tConfigclientConfig){4this.clientConfig=clientConfig;5topAddressing=newTopAddressing(MixAll.getWSAdd

用户A向好友B发送消息,分析一下:

取名称地址HttpTinyClient的httpGet方法:1staticpublicHttpResulthttpGet(Stringurl,List<String>headers,Lis

  • 需要判断B是否为A好友;
  • 需要判断A是否有权限;
  • 等等。。

诸如此类业务判断会很复杂,我们试想一下,如果使用imServer做业务协议,它是不是会变成巨无霸难以维护?

returngetTopicRouteInfoFromNameServer(topic,timeoutMillis,true);4}56publicTopicRouteDatagetTopicRout

又比如获取历史聊天记录,难道客户端要先websocket.send("gethistory"),再在onmessage里定位回调处理?

efaultMQProducer(finalStringproducerGroup){6this(producerGroup,null);7}89publicDefaultMQProducer(fin


我们可以这样设定,所有用户的主动行为走业务方(webApi),imServer只负责即时消息推送。什么意思?

r()!=null){11this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());12

用户A向好友B发送消息:客户端请求业务方(webApi)接口,由业务方(webApi)后端向imServer发起推送请求,imServer收到指令后,向前端用户B的websocket发送数据,用户B收到了消息。

nnameSrvAddr;16}这里首先根据topAddressing的fetchNSAddr方法获取名称服务地址,若是获取到了,则判断是否需要更新名称服务列表以及原来的nameSrvAddrtopA

获取历史消息:客户端请求业务方(webApi)接口,返回json(历史消息)

verException",e);76}77}finally{78this.lockNamesrv.unlock();79}80}else{81log.warn("updateTopicRouteIn

回执:用户A如何知道消息发送状态(成功或失败或不在线)?imServer端向用户B发送消息时,把状态以消息的方式推给用户A即可(按上面的逻辑),具体请看源码吧。。。

数据。事件消息IM系统比较常用的有上线、下线,在imServer层才能准确捕捉事件,但业务代码就不合适在这上面编写了。采用redis发布订阅技术,将上线、下线等事件向指定频道发布,业务方(webApi

发送消息

采用 redis 轻量级的订阅发布功能,实现消息缓冲发送。

tance这里在实例化MQClientInstance时,并没有直接传入clientConfig,而是通过cloneClientConfig方法复制了一份,来保证安全性:1publicClientCo

集群分区

单个imServer实例支持多少个客户端连接,两千个没问题?

dr()){8this.mQClientAPIImpl.fetchNameServerAddr();9}10//Startrequest-responsechannel11this.mQClientA

如果在线用户有10万人,怎么办???

lishInfo(topic,topicRouteData);43publishInfo.setHaveTopicRouterInfo(true);44Iterator<Entry<Str

比如部署4个imServer:

rver订阅相应的频道,收到消息,指派websocket向终端(如浏览器)发送消息;1、可缓解并发推送消息过多的问题;2、可解决连接数过多的问题;客户端连接流程:client->websocke

imServer1 订阅 redisChanne1
imServer2 订阅 redisChanne2
imServer3 订阅 redisChanne3
imServer4 订阅 redisChanne4

Manager(){9}1011publicstaticMQClientManagergetInstance(){12returninstance;13}14}其中factoryTable是所有生产者

业务方(webApi)端根据接收方的clientId后四位16进制与节点总数取模,定位到对应的redisChannel,进行redis->publish操作将消息定位到相应的imServer。

his.brokerName=brokerName;13this.queueId=queueId;14}15}可以看到这是一个简单的pojo,其封装了topic,brokerName以及queueId

每个 imServer 管理着对应的终端连接,当接收到 redis 订阅消息后,向对应的终端连接推送数据。

r){7try{8if(this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS,TimeUnit.MILLISECONDS)){9try{10TopicRouteDa

事件消息

IM 系统比较常用的有上线、下线,在 imServer 层才能准确捕捉事件,但业务代码就不合适在这上面编写了。

,//集群配置Server="127.0.0.1:6001"});}一套永远不需要迭代更新的IM服务端WebApi业务端publicvoidConfigure(IApplicati

采用 redis 发布订阅技术,将上线、下线等事件向指定频道发布,业务方(webApi) 通过 ImHelper.EventBus 方法进行订阅捕捉。

27this.serviceState=ServiceState.RUNNING;28break;首先更改serviceState状态为START_FAILED,防止中途的失败checkConfig方

image

结束语

谢谢支持!

>topicList=newHashSet<String>();34//Consumer5{6Iterator<Entry<String,MQConsumerInner&

文章版权声明:除非注明,否则均为本站原创文章,转载或复制请以超链接形式并注明出处。
分享到:

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏

发表评论

快捷回复:

评论列表 (有 1条评论, 4241人围观) 参与讨论
网友昵称:爱吃鱼的加菲猫
爱吃鱼的加菲猫 游客 2020-05-18 1楼 回复
博主逻辑清晰,写得很好,不过要是想简单点的话可以试试第三方websocket推送框架,像GoEasy,有免费版的,提供完整的websocket前后端解决方案,支持多种前后端语言的使用,我一般都用这个,自己构建太麻烦了,还费时间,博主有兴趣可以试一下。地址:https://www.goeasy.io