连接管理模块
这个模块的作用就:
向⽤⼾提供⼀个⽤于实现⽹络通信的Connection对象,从其内部可创建出粒度更轻的Channel对象,⽤于与服务端进⾏⽹络通信。因为我的这个项目并没有使用原生的套接字而是使用了muduo库提供的网络套接字,只需要管理好这个muduo库中的连接即可。但是完全依靠这个muduo也是不可行的,因为muduo库中的连接对象是基于muduo库中的需求进行制作的,和我们现在的需求是不一样的,我现在的需求是一个连接还需要继续细分出信道,也就是每一个连接需要具有创建信道和销毁信道的功能,这个功能muduo库并没有实现,而我们现在要实现的这模块就是提供这两个功能的,所以创建信道需要什么信息,这些信息就会作为成员变量在这个模块中。
1. 管理信息:
a. 连接关联的实际⽤于通信的muduo::net::Connection连接
b. 连接关联的信管理句柄(实现信道的增删查)
c. 连接关联的EventLoop异步循环⼯作线程
d. 异步⼯作线程池(⽤于对收到服务器推送过来的消息进⾏处理的线程池)
由此才能够提供信道的创建和删除功能,最后再写一个类来实现对连接的管理。
下面就是这个连接管理模块的基本框架的实现了:
上图中多了一个成员:_cid,信道id是没有必要存在的
然后就是去实现构造函数和这两个函数了,同时当执行完两个函数之后还会去给客户端发送响应:
因为底层的代码已经基本完成了,所以上层代码只需要去进行调用即可。所以越是上层越要理解每个文件中的类的作用是什么。
下面要完成的就是连接的管理类了,这个类中的成员也就两个一个是互斥锁,一个就是连接的map了,这个管理类也就提供三个操作,增加,删除和查询连接。这个map我打算让muduo库中的连接作为key而我现在写的这个连接作为value,这样一般的连接还是使用muduo库中的连接,而特别的功能由我自己的连接来完成。
首先就是增加连接的功能:函数逻辑就是寻找这个连接是否存在,不存在就创建,存在什么都不做
然后是删除连接功能:函数逻辑寻找连接是否存在,存在就删除,不存在什么都不做
最后是获取一个连接的功能:
到这里这个连接管理也就完成了,因为这个模块依旧是和网络有关,所有无法进行逻辑测试,只能先进行编译测试。这里我在channel类和consummer类的构造和析构函数中增加了一些打印。让看是否存在资源泄露。
然后就是编译测试了:
可以看到编译错误是不存在的。
服务器设计
这里是将上面所有服务端的功能整合到一起成为一个完整的服务。这里先回顾一下客户端回向客户端发送的协议的格式:首先在应用层协议使用的是一个muduo库封装的protobuf协议
使用的是lv的结构,lv结构也就是规定一个长度字段和一个value字段,长度字段的长度是固定的,长度字段中描述的长度就是value的长度,由此解决粘包问题,而在具体的value中又设定了各种不同的响应格式,以为客户端提供不同的服务。这些请求协议格式在上面已经说明了。这里主要理解应用层的协议要解决的第一件事情就是粘包问题,然后再value中针对不同的请求有不同的协议字段,不同的协议字段这里采用的是使用protobuf来完成这些请求协议的序列化和反序列化的功能。当服务端收到这些不同的请求之后识别这些请求,知道客户端想要什么样的服务,然后服务器再根据自己所管理的资源来给客户端提供相应的服务,这就是搭建服务器的基本目的。在之前介绍muduo库的时候已经说明过如何使用muduo库搭建一个服务器。在那个例子中使用服务器对象接收客户端的连接,再通过分发器识别不同的请求,以提供不同的服务。所以如果使用muduo库的话,服务器的搭建并不算困难,也因为分发器能够识别请求然后调用不同的解决方法,也让我们将精力集中在业务处理上,服务器搭建不难,资源整合也不难,因为现在服务器需要的各种子模块已经全部完成了。
下面就是在搭建服务器的时候需要使用到的资源:
- _virtual_host:服务器持有的虚拟主机。队列、交换机、绑定、消息等数据都是通过虚拟主机管理。
- _server:Muduo库提供的一个通用TCP服务器,我们可以封装这个服务器进行TCP通信。
- baseloop:主事件循环器,用于响应IO事件和定时器事件,主loop主要是为了响应监听描述符的IO事件。
- _dispatcher:一个消息分发器,当Socket接收到一个报文消息后,我们需要按照消息的类型,即上面提到的typeName进行消息分发,会不同类型的消息分发相对应的处理函数中,下边具体讲解。
- _codec:一个编解码器,我们在TCP服务器上设计了一层应用层协议,这个编解码器主要就是负责实现应用层协议的解析和封装,下边具体讲解。
- _consumer:服务器中的消费者信息管理句柄。
- _threadpool:异步工作线程池,主要用于队列消息的推送工作。
- _connections:连接管理句柄,管理当前服务器上的所有已经建立的通信连接
下面要做的就是将服务器的框架搭建出来了。
其实就是将之前写的demo中的工作函数删除,直接将代码复制过来,再增加其它的成员:
还保留有两个函数:
然后就是来看我们的应用层请求有哪些了
然后就是先去完成这些函数的声明了。
然后需要将这些函数在构造函数中绑定给分发器。
这些业务函数的绑定一定不能出现错误,否则就会出现服务无法对应的情况。
然后要做的事情就是去完成这些业务函数了,这些业务函数的完成也很简单首先反序列化,得到数据,调用服务,然后序列化响应发送到客户端。
下面就是来完善这个类了,首先是构造函数,这个服务器的构造函数,现在只是将Tcpconnection以及分发器完成了初始化,对于其它的内部资源,比如虚拟机,消费者等等都没有进行初始化。需要对这些成员进行初始化。
但是这样是不够的,因为只是将这个类中需要管理的资源实例化出来是肯定不够的,因为不要放了这个项目是需要完成读取之间持久化后的数据的,对于消息队列中的消息,这些的实例化在底层代码已经完成了,自然不需要关心,但是如果后序存在某个消费者订阅了某个数据恢复的消息队列,但是这个消息队列的消费者队列并没有完成创建不就出现问题了吗?所以在这个初始化函数还需要完成一件是事情就是获取所有恢复后的队列名字,然后使用这些队列名字去完成消费者队列的实例化。这需要virtualhost提供一个接口,这个接口就会返回当前所有的队列信息。
其它的就没有特殊的了,因为在取消订阅的时候这个消费者队列会自动进行移除
下一个当一个连接建立的时候需要去创建一个我这个服务的连接对象。
在连接创建完毕的时候需要去销毁这个连接。
到这里前置的操作就基本完成了,构造函数已经建立和销毁连接的函数都完成了。
然后就是业务处理函数了,首先是打开信道的函数。
这个函数的逻辑就是调用连接中的openchannl函数,所以需要先找到连接对象。
找到了就使用连接中的打开信道函数,没有找到就关闭这个连接,返回false
然后是关闭信道的业务函数,逻辑和上一个基本是一样的
下一个声明交换机的业务函数:函数逻辑依旧是通过TCP连接去寻找我自己的连接,然后通过我自己的连接去获取信道对象,然后使用信道对象去进行交换机的声明。
这里就需要connection提供一个通过信道id获取信道对象的函数:
然后就可以去完成这个业务函数:
下一个业务函数删除交换机,逻辑和上面一个是一样的
只不过最后调用的函数不一样。
然后是声明队列和删除队列的业务函数,业务逻辑也是一样的
声明队列:
删除队列:
队列的绑定和解绑函数逻辑依旧是一致的
队列绑定:
队列解绑:
然后就是消息发布的业务函数了,本来消息发布的函数是比较复杂的,但是因为底层的函数我已经写完了,所以这里的业务函数依旧是去进行调用即可,后面的函数基本都是这个逻辑就直接上代码了:
消息发布:
消息确认:
队列消息订阅:
队列消息取消订阅:
可以看到这个服务器类除了连接函数和构造函数以及连接到来时和取消时的处理函数需要特殊一点,其余的业务函数基本都是一致的。
然后就是这个服务器模块的编译测试了:
也就是创建一个服务器对象,然后进行编译了。
因为要连接的库有点多了,所以这里展示一下g++编译指令
编译成功,运行这个服务器
可以看到date文件夹,以及数据库文件都已经存在了,并且可以看到这个端口也处于listen状态了,说明服务器已经没有问题了
此时这个服务器就创建完成了,就等待客户端连接上来了,客户端声明一个消息队列,这里就会创建队列,客户端订阅一个队列,就会生成一个消费者。各种消息文件也会进行创建。大的逻辑没有问题了,现在就剩下完成客户端,然后进行联合调试了。
到这里整个服务端代码就写完了。
下面就是去完成客户端的代码了。
客户端模块
在RabbitMQ中,提供服务的是信道,因此在客⼾端的实现中,弱化了Client客⼾端的概念,也就是说在RabbitMQ中并不会向⽤⼾展⽰⽹络通信的概念出来,⽽是以⼀种提供服务的形式来体现。其实现思想类似于普通的功能接⼝封装,⼀个接⼝实现⼀个功能,接⼝内部完成向客⼾端请求的过 程,但是对外并不需要体现出客⼾端与服务端通信的概念,⽤⼾需要什么服务就调⽤什么接⼝就⾏。
基于以上的思想,客⼾端的实现共分为四⼤模块:
- 订阅者模块:
- 一个并不直接对用户展示的模块,其在客户端体现的作用是对角色的描述,表示这是一个消费者。
- 信道模块:
- 一个直接面向用户的模块,内部包含多个向外提供的服务接口,用户需要什么服务,调用对应接口即可。其包含交换机声明/删除,队列声明/删除,绑定/解绑,消息发布/确认,订阅/解除订阅等服务。这里可以这么理解:服务端中的信道是为了给客户端进行提供的,而客户端中的信道是为了给用户提供服务的。
- 连接模块:
- 这是唯一能体现出网络通信概念的一个模块了,它向用户提供的功能就是用于打开/关闭信道。
- 异步线程模块:
- 虽然客户端部分,并不对外体现网络通信的概念,但是本质上内部还是包含有网络通信的,因此既然有网络通信,那么就必须包含有一个网络通信IO事件监控线程模块,用于进行客户端连接的IO事件监控,以便于在事件出发后进行IO操作。
- 其次,在客户端部分存在一个情况就是,当一个信道作为消费者而存在的时候,服务端会向信道推送消息,而用户这边需要对收到的消息进行不同的业务处理,而这个消息的处理需要一个异步的工作线程池来完成。 因此异步线程模块包含两个部分:
- 客户端连接的IO事件监控线程
- 推送过来的消息异步处理线程
基于以上模块,实现一个客户端的流程也就比较简单了:
- 实例化异步线程对象
- 实例化连接对象
- 通过连接对象,创建信道
- 根据信道获取自己所需服务
- 关闭信道
- 关闭连接
以上就是简单说明了一下客户端的实现思想。
订阅者模块
这里为了让思想更加简单一点,所以一个队列只允许一个订阅者,由此对于客户端来说,也不需要订阅者的管理,只需要一个订阅者的模块,进行描述:
- 描述当前信道订阅了哪个队列的消息。
- 描述了收到消息后该如何对这条消息进⾏处理。
- 描述收到消息后是否需要进⾏确认回复。
所以这个模块也就是一个简单结构体的定义,需要下面四个信息:
- a. 订阅者标识
- b. 订阅队列名
- c. 是否⾃动确认标志
- d. 回调处理函数(收到消息后该如何处理的回调函数对象)
不要忘了在消息推送响应的协议中也是存在一个消费者标识(订阅者)和信道id的,这两个信息就能够描述出来,当客户端收到这个响应之后,应该交给哪一个信道中的哪一个订阅者来进行消息处理。相当于每一个订阅者内部都保存有自己对于不同消息的处理函数。
还有一个订阅的队列名字,描述的是当前订阅者订阅的队列的消息,存在这个是因为,当客户端收到了这个消息之后还需要向服务端对收到的消息进行一下确认。此时就需要使用到这个订阅队列名字。
从上面就能够看到这个模块的思想比较简单,实现也就很简单了。因为在服务端实现过一个差不多的模块,也就是一个消费者的模块(不是管理模块)。
这样这个模块也就完成了,其实此时是出现了代码冗余的,这里是可以将这个消费者模块设计到公用模块中,这样服务端和客户端都能够进行使用,但是我这里就不这么做了。
下面要去完成的就是客户端的信道管理模块了
信道模块
服务端具有信道同样的客户端也是具有信道的,其功能与服务端几乎一致,或者说不管是客户端的 channel 还是服务端的 channel 都是为了用户提供具体服务而存在的,只不过服务端是为客户端的对应请求提供服务,而客户端的接口服务是为了用户具体需要服务,也可以理解是用户通过客户端 channel 的接口调用来向服务端发送对应请求,获取请求的服务。下面就是这个信道的各个成员和需要提供的服务了。
- 信道信息:
a. 信道ID
b. 信道关联的网络通信连接对象
c. protobuf协议处理对象
d. 信道关联的消费者(当客户端订阅了一个队列之后,就成为了消费者,订阅了哪一个队列,收到消息后是否要进行确认回复,收到消息后要如何处理,都需要这个消费者进行处理<智能指针管理>,毕竟也是存在生产型客户读端的)
e. 请求对应的响应信息队列(这里队列使用<请求id,响应>的hash表,以便于查找指定的响应)
f. 互斥锁&条件变量(大部分的请求都是阻塞操作,发送请求后需要等到响应才能继续,但是muduo库的通信是异步的,因此需要我们自己在收到响应后,通过判断是否是等待的指定响应来进行同步,比如说我客户端需要创建一个交换机然后进行队列的绑定,但是当我客户端继续往下运行的时候,在服务端中这个交换机可能并没有完成创建/绑定,此时就需要使用同步的机制,或者说没有绑定成功,此时也需要同步机制,由此当一个服务只有存在响应了,被要求等待的执行流才能进行返回,因为客户端可能会去申请多个服务,所以需要使用一个响应信息表来储存响应)
有了上面的成员才能提供下面的服务:
- 信道操作:
a. 提供创建信道操作
b. 提供删除信道操作
c. 提供声明交换机操作(强断言-有则OK,没有则创建)
d. 提供删除交换机
e. 提供创建队列操作(强断言-有则OK,没有则创建)
f. 提供删除队列操作
g. 提供交换机-队列绑定操作
h. 提供交换机-队列解除绑定操作
i. 提供添加订阅操作
j. 提供取消订阅操作
k. 提供发布消息操作
l. 提供确认消息操作
最后就是信道的管理:
增加信道,查询信道,删除信道
下面就来实现信道模块的大体框架。
还剩下一个map成员,这个成员中的key和value值分别应该是请求的id,以及这个请求id对应的响应,这样就可以通过请求id快速的得到这个请求的响应是否存在了。这里需要分析一下客户端一共会收到两种响应,一种是通用的响应,另外一种就是消息推送的响应。这里先完成一个标识消息的哈希表。
然后就是功能函数和构造析构函数的声明了:
这些函数的参数其实就是使用了服务端的virtual_host端口中对应的函数,因为说白了这些参数都要传递过去的
下一个需要进行声明的函数就是消息传递函数了,这个函数的参数就存在一些不同了,因为客户端要发送的消息到达了服务端之后应该交给服务单的交换机,交换机在对消息进行路由将这个消息放入到对应的消息队列中。然后消息确认也就是向服务器确认某一个队列中的某一个消息id
然后就是添加订阅的接口了:
这个接口需要说明消费者的标识,是否需要自动确认,要订阅的队列的名字,以及最后收到消息之后的消息处理函数。
然后是取消订阅的函数取消订阅,只需要知道要取消订阅的是哪一个消费者,这样服务器就会通过消费者标识知道这个消费者所在的队列,然后对队列中的这个消费者进行删除
以上就是给用户提供的接口,还有一些给连接提供的接口:
当连接收到响应之后需要往map中进行数据的插入,此时就需要向连接提供一个接口让其能够往map中进行数据的放入
以及当一个连接收到了一个消息推送之后,连接是不知道要如何进行处理的,此时就需要连接通过一个接口这个接口能够让连接收到推送的消息之后,通过信道找到对应的消费者对象,然后调用回调函数进行处理,但是其实这个消息处理并不会在主执行流上进行而是会将推送的消息内容和处理函数一起打包放入到线程池中,让线程池进行处理,最后就是构造函数了:
现在函数的框架已经完成了,下面就是去完成这些函数了。
首先去完成构造函数,需要构造的东西首先是连接,然后是信道的id以及protobuf的协议解析器,还有一个点就是当这个客户端的信道调用了消费函数之后,消费者对象才会产生,在没有调用消费函数之前这个消费者对象是不需要进行初始化的,不然如果这个客户端的功能只是为了发布消息,那么将这个消费者构造出来是没有意义的
然后是析构函数,但是因为当前并没有特别需要进行析构的函数所以暂时不加内容,或者暂时删除掉也是可行的(使用默认的析构函数)。下面是第一个业务函数,声明一个交换机
逻辑就是:构造一个声明交换机的请求对象,然后向服务器发送响应,最后等待服务器的响应,看声明交换机的声明是否正常,声明成功服务器返回正确,否则返回错误。这里在根据返回的结果继续给上层返回。这里面存在一些细节点,比如现在请求已经准备完成,也调用连接完成了发送,此时我这个函数就能够直接进行返回了吗?当然不能,因为此时虽然已经发送了请求但是服务器不一定完成了对这个请求的处理,此时就需要使用同步机制了,如果用户认为交换机已经声明完成了,去进行使用就会出现问题,所以这里需要完成一个给客户端连接的函数:
这个函数就能够让执行流等待响应的到达,并且最后将响应返回给执行流。
这样上面的构造请求函数就能直接调用这个函数完成对响应的获得。
由此就能完成这个函数的编写了:
然后剩下的函数模式几乎和上面这个是一样的。
删除交换机的业务函数:依旧是创建请求,然后等待响应的返回,对于delete来说因为不需要返回值,所以这里就不需要根据响应中的字段进行返回了。
然后就是队列的声明了,基本上和交换机的声明是一样的,只不过构建的请求不一样。
然后就是队列的删除了
然后是队列的绑定
然后是队列的解绑:
下一个消息发布函数
然后是消息确认函数
然后是取消订阅的函数:逻辑依旧是一样的,只不过当取消定于了当前队列之后要将消费者信息进行清空,并且因为取消订阅这个函数需要在析构函数中进行一次统一的调用,对于不是消费者对象的信道就不需要进行取消订阅操作。
最后就是添加订阅这个函数了,这个函数和之前的函数有一些不同首先之前的创建请求,然后发送请求,等待响应的步骤依旧是一样的,但是当信道得到了响应之后,判断响应的结果为true,就代表这一次订阅成功了,那么当前信道就成为了消费者就需要创建一个消费者对象了,然后赋值给成员中的消费者了,并且还有一个点,我当前的设计为一个信道只能订阅一个消费队列,所以如果当前信道已经订阅了一个队列之后,是不允许去订阅其它的队列的。如何判断呢?也很简单如果当前队列已经订阅了一个队列,那么消费者的智能指针就不是空的。
由此这个函数也就完成了。主体的功能也就完成了,还剩下两个函数一个收到响应之后往map中进行插入,还有一个函数是收到消息推送之后要调用消费者中的回调函数进行处理。
我们先来完成当收到基础的响应之后往哈希表中添加基础的响应信息,这个函数很简单,逻辑就是加锁之后往map中进行数据的添加,然后唤醒线程去进行处理即可:
下一个函数当信道收到了推送的消息之后要如何处理呢?就需要通过消费者中的回调函数进行处理,但是有几个小细节首先当前信道的消费者对象不能为空。然后就是推送过来的响应中的消费者标识要和我的消费者标识应该是一样的
这样这个函数也就完成了,这两个函数都是给连接对象提供的。其它的业务函数都是给用户提供的。到这里信道能够提供的操作就完成了,但是在一个连接中是可以存在多个信道的,所以最后需要完成对信道的管理。使用map对信道进行管理key就是信道的id,value就是信道的对象。在去进行管理的书写之前,还有一个点那就是这个信道类的析构函数,这里我希望当这个信道被析构的时候在析构函数中能够去调用一次取消订阅的函数,这样即使用户没有去执行取消订阅的函数,当用户退出的时候用户的订阅信息也会被取消。
然后就是信道管理类中的第一个函数创建一个信道:这个函数传入一个tcp连接和一个protobuf协议处理对象,然后就会创建一个信道对象,还会对对象进行返回。
这里我在channel类中又实现了一个函数用于返回成员变量_cid。下一个函数remove函数,这个函数也就是你传入一个信道的id,然后将其从map中删除即可
最后还有一个获取信道id的信息,实现也很简单通过外界传入的信道id在map中进行查询即可。
到这里整个信道模块就完成了。因为客户端涉及到了网络通信所以这里是无法进行单元测试的,这里先继续写代码,当客户端写完之后再去进行功能的联调。
异步工作线程模块
要完成这个模块首先就要知道客户端这边存在几个异步工作线程
- 一个是muduo库中客户端连接的异步循环线程EventLoopThread,
- 一个是当收到消息后进行异步处理的工作线程池。
第一个模块是为了监控客户端收到的连接,第二个模块是为了处理当连接到来后的IO请求
这两个模块还有一个特点并不专门是为了某一个连接或者信道做服务,而是可以应用在多个连接多个信道多个消费者。由此就需要用户单独创建一个异步工作线程模块对象,创建出来之后将其交给连接,连接会在合适的时候调用线程池去使用这个对象,进行合适的操作。这个模块的使用逻辑就是这样,下面就是去实现这个模块了
下面就是这个类的实现,很简单。
连接管理模块
这个模块是客户端的最后一个子模块,完成这个模块需要将上面的模块一起整合起来就是客户端了,但是这里需要回忆一下,在RabitMQ设计中客户端的概念是被弱化了的。想要获取服务的时候并不需要实例化一个客户端然后再去发送请求获取服务,而是通过创建一个信道,然后通过信道来向用户提供服务,所以这个模块同样是针对muduo库客户端连接的二次封装,向用户提供创建channel信道的接口,创建信道后,可以通过信道来获取指定服务。
但是从本质来说,这个连接管理模块就是创建了一个客户端,只不过说法上这里是创建了一个连接,然后创建信道来实现了服务。所以说这个模块和服务端那里的client在客户端的实现上代码是一样的,需要告诉客户端收到消息之后要怎么处理,然后当收到消息之后通过协议分发器就能够将不同的响应做出处理了,所以我才说和服务端那里是差不多一样的。只不过客户端要处理的响应很少只有两种,一种是基础的响应,另外一种就是消息推送过来的一个响应。收到基础响应的处理逻辑就是将这个响应放入到基础响应的哈希map中去,收到消息推送响应的处理逻辑就是将这个响应通过调用信道的业务处理接口,将这个推送消息封装为一个任务放入到线程池中,这就是这个客户端的实现逻辑(连接管理模块),这里只不过是将客户端修改为了连接而已。
下面就是这个模块的实现了,基本就是将之前写的服务器的代码上修改一下即可,首先要修改的就是修改成员函数为一下这些
然后就是构造模块了:
然后就是两个业务处理函数了:
然后两个向外提供的接口
然后要做的事情就是去实现信道的创建与关闭,以及两个业务函数实现。到这里连接模块的架子就搭建出来了。这里我将连接服务器的功能直接放到了构造函数中,只要这个对象能够构造出来,那么连接服务器也一定是成功的。下面要做的事情就是去完成这几个业务函数了。首先就是创建信道的函数,逻辑就是创建一个信道对象,但是我本地创建是不够的服务端也必须存在一个一样的信道才能行,这样服务端才能为客户端提供服务。所以在我本地创建了信道之后,还需要向服务器发送一个信道创建请求:
这样我就需要在这个连接模块中去完成一个同步的逻辑了,但是这个模块是没有条件变量的,难道又要加上条件变量吗?也不需要,我们可以将打开信道这个操作直接实现到信道提供的服务中。这样这里的这个函数直接调用一下这个接口就可以了。
将这个实现放到channel这个类的功能实现中,然后在连接管理模块中的这个函数就可以变成
然后就是关闭信道的功能了,这个功能依旧需要向远端的服务器进行请求的发送,所以依旧会在channel类中实现一个功能。
然后就可以去实现这个关闭信道的函数了:但是不要忘了所有的信道还有一个管理对象,当这个信道关闭的时候对应的管理对象也需要知道这个信道被删除了,所以代码就是下面这样的:
然后就是第一个业务函数了,当客户端收到了一个普通的响应的业务函数,这个业务函数的逻辑就是首先找到这个信道,然后将这个请求id和这个响应一起放入到信道的普通响应map表中
然后就是收到消息推送响应之后的处理了,实现逻辑:首先依旧是找到这个信道,然后封装异步任务将其抛入到线程池中进行执行。
这样这个业务函数也就完成了。到这里这个模块也就完成了。
下面要做的事情就是去编写两个客户端一个生产者客户端,一个消费者客户端,然后就可以和服务器一起进行功能联调了。
功能联调测试
测试逻辑:
搭建客户端:
- 发布消息的生产者客户端
- 订阅消息的消费者客户端
思想:
- 必须有一个生产者客户端
- 声明一个交换机 exchange1
- 声明一个队列 queue1, binding_key=queue1
- 声明一个队列 queue2, binding_key= news.music.*
- 将两个队列和交换机绑定起来
- 搭建两个消费者客户端,分别各自订阅一个队列的消息
测试:
- 第一次,将交换机类型,定义为广播交换模式:理论结果,两个消费者客户端分别都能拿到消息
- 第二次,将交换机类型,定义为直接交换模式:routing_key=queue1 理论结果,只有queue1能拿到消息
- 第三次,将交换机类型,定义为主题交换模式:routing_key="news.music.pop",理论结果,只有queue2能拿到消息
上面就是这个测试的逻辑,下面就来根据这个逻辑进行代码的编写以及测试,下面首先来实现一个消息发布客户端,实现逻辑如下:
下面就是这个发布消息客户端的实现
然后就是对这个客户端进行编译了,首先就是makefile文件中的内容了:
编译测试成功:
这样发布者客户端也就完成了。
下面就是消费者客户端的实现了,为了完成上面的测试消费者客户端需要完成两个,操作流程和生产者客户端并没有太大的区别,需要先去声明交换机,声明队列(即使这个交换机/队列在服务器中已经存在了,这是为了防止如果这个交换机没有存在呢),和上面的代码不同点在于消费者需要去订阅一个队列中的信息。并不需要去进行消息的发布,并且这个客户端还需要一个消息的处理方法。
这样这个模块的大体框架就完成了,然后就是去完成消费函数了,在完成这个消费函数的时候,我就发现了我之前的信道模块设计的不合理之处,这里我打算直接将这个消息打印出来,但是将消息打印处理之后,我需要向服务器发送消息确认的请求,发送请求需要使用信道对象,这很好解决,我直接将增加一个函数参数即可。但是如果我调用这个信道中的消息确认函数:
就会发现一个问题,我怎么找到这个队列呢?这就是设计的不合理之处了,用户是知道当前自己订阅的队列的名字的,但是我这里因为函数设计的缘故是不知道的,但是这个成为消费者的信道自己是有记录的,所以这个ack函数其实并不需要外部传入队列的名字,只需要从信道的成员的_consummer中获取队列的名字即可。但是这就需要这个成员对象一定是要存在的。
这样这个接口才是合理的。
这样就能够完成这个消费函数了,但是因为我增加了一个参数,所以需要修改一下信道的消费者函数绑定的代码:
这样这个客户端才是完成了,下面就是对这个客户端的编译了。也就是makefile文件:
编译成功
下面就是进行功能联调了。
首先启动服务器:
然后是三个客户端一个生产者两个消费者,一个消费者消费queue1,一个消费者消费queue2,因为这一次的交换模式是广播交换所以在生产者生产消息后两个消费者都能够得到消息。
这里的提示是正常的,因为推送客户端没有进行消费所以不是消费者客户端。
然后两个消费者客户端都能够收到消息:
这里我再将所有的客户端都退出,因为进行了消息确认,所以在服务器重新启动之后,不会加载任何的有效信息:
符合逻辑。广播交换的测试就通过了。
下面就是测试直接交换了:
需要修改的代码:
将两个客户端中的广播交换替换成为直接交换。
消费者就不需要进行修改了,但是在发布的时候就需要设定routingkey了,决定消息发布给哪一个队列
此时就只有queue1才会收到消息了,也就是运行之后订阅queue2的消费者不会得到消息,而订阅queue1的消费者才会得到消息
这也就证明了,直接交换模式也是没有问题的。
下面要测试的就是主题交换了,两个客户端的修改方式和上面是一样的,是将客户端的方式修改为了主题交换。然后发布客户端的routingkey就不是queue1了,而是下面的这个,并且我在循环外还设定了一个routingkey
这样就看queue2能否收到条消息了,最后一条消息queue也收不到,因为不符合条件。
下面就是测试了
queue2成功收到了11条信息,而queue1则一条信息都没有收到,因为匹配失败了。到这里简化的消息队列的发布和订阅功能就完成了。
项目总结以及项目可以继续深入的部分
从这个项目能够知道的就是这个消息队列能够解决的就是忙先不均,起到负载均衡作用,并且是在各个不同的主机上,而不再是在一个主机上。
首先明确我们所实现的项目:仿RabbitMQ实现一个简化版的消息队列组件,其内部实现了消息队列服务器以及客户端的搭建,并支持不同主机间消息的发布与订阅及消息推送功能(这里还应该有客户端的按需获取功能,也就是消费者客户端能够根据自己的需求从服务器上拉取消息过来)。
其次项目中所用到的技术:基于muduo库实现底层网络通信服务器和客户端的搭建,在应用层基于protobuf协议设计应用层协议接口,在数据管理上使用了轻量数据库sqlite来进行数据的持久化管理,以及基于AMQP模型的理解,实现整个消息队列项目技术的整合,并在项目的实现过程中使用gtest框架进行单元测试,完成项目的最终实现。但是对于rabitMQ中的其它很多细节的功能这里并没有实现,比如这个项目的虚拟机只有一个,但是在RabbitMQ中是具有多个虚拟机的,并且RabbitMQ完整项目中是支持网站对虚拟机进行配置的,但是我这里并没有实现这些细节的功能。
- 项目的扩展功能
- 虚拟主机管理(多个虚拟机然后进行管理)
- 用户管理/用户认证(多个虚拟机就会存在多个虚拟机,此时就能够去实现队列的独占功能)
- 交换机/队列的独占模式和自动删除
- 发送方确认(broker给生产者的确认应答,现在生产者发布了一条信息,但是服务端并没有给这个客户端进行消息的确认,以及服务端将信息推送给客户端之后,如果客户端没有对消息进行确认怎么办,难道就不管了吗?此时就有一直死等或者一段时间后重新对消息进行消费等等的方法)
- 消息的管理方式
- 管理接口
- 管理页面