上一篇以及上上篇基本介绍了MetaQ如何使用Gecko框架在网络上传输数据,今天将继续进一步介绍在Broker,各种命令的处理逻辑(暂时将不涉及到事务处理)。
依旧是在MetaMorphosisBroker的registerProcessors()方法中,我们可以注意到一点,每个Processor的实例在构造的时候都注入了一个brokerProcessor的变量,该变量的类型为CommandProcessor。其实,各个Processor的业务逻辑又委托给了CommandProcessor进行处理,比如我们看看其中的GetProcessor的源码:
1. public class GetProcessor implements RequestProcessor<GetCommand> {
2. public static final Logger log = LoggerFactory.getLogger(GetProcessor.class);
3.
4. private final ThreadPoolExecutor executor;
5.
6. private final CommandProcessor processor;
7.
8. public GetProcessor(final CommandProcessor processor, final ThreadPoolExecutor executor) {
9. this.processor = processor;
10. this.executor = executor;
11. }
12.
13. @Override
14. public ThreadPoolExecutor getExecutor() {
15. return this.executor;
16. }
17.
18. @Override
19. public void handleRequest(final GetCommand request, final Connection conn) {
20. // Processor并没有处理具体的业务逻辑,而是将业务逻辑交给CommandProcessor的processGetCommand()进行处理,Processor只是将处理结果简单的返回给客户端
21. final ResponseCommand response = this.processor.processGetCommand(request, SessionContextHolder.getOrCreateSessionContext(conn, null));
22. if (response != null) {
23. RemotingUtils.response(conn, response);
24. }
25. }
26. }
CommandProcessor业务逻辑的处理模块采用责任链的处理方式,目前来说只有两个类型的业务逻辑处理单元:带有事务处理(TransactionalCommandProcessor)的和不带有事务处理(BrokerCommandProcessor)的。老习惯,先上类图:
CommandProcessor接口定义如下:
1. public interface CommandProcessor extends Service {
2. //处理Put命令,结果通过PutCallback的回调返回
3. public void processPutCommand(final PutCommand request, final SessionContext sessionContext, final PutCallback cb) throws Exception;
4. //处理Get命令
5. public ResponseCommand processGetCommand(GetCommand request, final SessionContext ctx);
6.
7. /**
8. * Under conditions that cannot use notify-remoting directly.
9. */
10. //处理Get命令,并根据条件zeroCopy是否使用zeroCopy
11. public ResponseCommand processGetCommand(GetCommand request, final SessionContext ctx, final boolean zeroCopy);
12. //处理查询最近可用offset位置请求
13. public ResponseCommand processOffsetCommand(OffsetCommand request, final SessionContext ctx);
14. //处理退出请求
15. public void processQuitCommand(QuitCommand request, final SessionContext ctx);
16.
17. public ResponseCommand processVesionCommand(VersionCommand request, final SessionContext ctx);
18. //处理统计请求
19. public ResponseCommand processStatCommand(StatsCommand request, final SessionContext ctx);
20. //下面主要定义与事务相关的方法,暂时先不介绍
21. public void removeTransaction(final XATransactionId xid);
22.
23. public Transaction getTransaction(final SessionContext context, final TransactionId xid) throws MetamorphosisException, XAException;
24.
25. public void forgetTransaction(final SessionContext context, final TransactionId xid) throws Exception;
26.
27. public void rollbackTransaction(final SessionContext context, final TransactionId xid) throws Exception;
28.
29. public void commitTransaction(final SessionContext context, final TransactionId xid, final boolean onePhase) throws Exception;
30.
31. public int prepareTransaction(final SessionContext context, final TransactionId xid) throws Exception;
32.
33. public void beginTransaction(final SessionContext context, final TransactionId xid, final int seconds) throws Exception;
34.
35. public TransactionId[] getPreparedTransactions(final SessionContext context, String uniqueQualifier) throws Exception;
36. }
细心的读者会发现,每个定义的方法的参数都有一个参数SessionContext,SessionContext携带了连接的信息,由Broker创建,具体代码见SessionContextHolder的getOrCreateSessionContext()方法,getOrCreateSessionContext()方法在Processor委托给CommandProcessor处理业务逻辑时被调用。
BrokerCommandProcessor和TransactionalCommandProcessor其实就是各模块的粘合剂,将各模块的功能统一协调形成整体对外提供功能。BrokerCommandProcessor的实现并不难理解,下面让我们来具体分析一下BrokerCommandProcessor这个类:
1. //Put请求的业务逻辑处理
2. @Override
3. public void processPutCommand(final PutCommand request, final SessionContext sessionContext, final PutCallback cb) {
4. final String partitionString = this.metaConfig.getBrokerId() + "-" + request.getPartition();
5. //统计计算
6. this.statsManager.statsPut(request.getTopic(), partitionString, 1);
7. this.statsManager.statsMessageSize(request.getTopic(), request.getData().length);
8. int partition = -1;
9. try {
10. //如果对应存储的分区已经关闭,则拒绝该消息
11. if (this.metaConfig.isClosedPartition(request.getTopic(), request.getPartition())) {
12. log.warn("Can not put message to partition " + request.getPartition() + " for topic=" + request.getTopic() + ",it was closed");
13. if (cb != null) {
14. cb.putComplete(new BooleanCommand(HttpStatus.Forbidden, this.genErrorMessage(request.getTopic(), request.getPartition()) + "Detail:partition[" + partitionString + "] has been closed", request.getOpaque()));
15. }
16. return;
17. }
18.
19. partition = this.getPartition(request);
20. //获取对应Topic分区的MessageStore实例
21. final MessageStore store = this.storeManager.getOrCreateMessageStore(request.getTopic(), partition);
22. // 如果是动态添加的topic,需要注册到zk
23. //就到目前为止,我着实没想明白下面这句代码的用途是什么?
24. //如果topic没有在该Broker的配置中配置,在MessageStoreManager中的isLegalTopic()方法中检查就通不过而抛出异常,那么下面这句代码怎么样都不会被执行,而Client要向Broker发送消息,一定要先发布topic,保证topic在zk发布;
25. this.brokerZooKeeper.registerTopicInZk(request.getTopic(), false);
26. // 设置唯一id
27. final long messageId = this.idWorker.nextId();
28. //存储消息,之前的文章介绍过Broker的存储使用回调的方式,易于异步的实现,代码简单不分析
29. store.append(messageId, request, new StoreAppendCallback(partition, partitionString, request, messageId, cb));
30. } catch (final Exception e) {
31. //发生异常,统计计算回滚
32. this.statsManager.statsPutFailed(request.getTopic(), partitionString, 1);
33. log.error("Put message failed", e);
34. if (cb != null) {
35. //返回结果
36. cb.putComplete(new BooleanCommand(HttpStatus.InternalServerError, this.genErrorMessage(request.getTopic(), partition) + "Detail:" + e.getMessage(), request.getOpaque()));
37. }
38. }
39. }
40.
41. @Override
42. // GET请求的业务逻辑处理
43. public ResponseCommand processGetCommand(final GetCommand request, final SessionContext ctx) {
44. //默认为zeroCopy
45. return this.processGetCommand(request, ctx, true);
46. }
47.
48. @Override
49. public ResponseCommand processGetCommand(final GetCommand request, final SessionContext ctx, final boolean zeroCopy) {
50. //获取查询信息
51. final String group = request.getGroup();
52. final String topic = request.getTopic();
53. //统计计数(请求数统计)
54. this.statsManager.statsGet(topic, group, 1);
55.
56. // 如果分区被关闭,禁止读数据 --wuhua
57. if (this.metaConfig.isClosedPartition(topic, request.getPartition())) {
58. log.warn("can not get message for topic=" + topic + " from partition " + request.getPartition() + ",it closed,");
59. return new BooleanCommand(HttpStatus.Forbidden, "Partition[" + this.metaConfig.getBrokerId() + "-" + request.getPartition() + "] has been closed", request.getOpaque());
60. }
61. //获取topic对应分区的MessageStore实例,如果实例不存在,则返回NotFound
62. final MessageStore store = this.storeManager.getMessageStore(topic, request.getPartition());
63. if (store == null) {
64. //统计计数
65. this.statsManager.statsGetMiss(topic, group, 1);
66. return new BooleanCommand(HttpStatus.NotFound, "The topic `" + topic + "` in partition `" + request.getPartition() + "` is not exists", request.getOpaque());
67. }
68. //如果请求的起始位置<0,判定该请求无效
69. if (request.getMaxSize() <= 0) {
70. return new BooleanCommand(HttpStatus.BadRequest, "Bad request,invalid max size:" + request.getMaxSize(), request.getOpaque());
71. }
72. try {
73. //读取由request.getOffset()开始的消息集合
74. final MessageSet set = store.slice(request.getOffset(), Math.min(this.metaConfig.getMaxTransferSize(), request.getMaxSize()));
75. //如果当前消息集不为空
76. if (set != null) {
77. //判断是否zeroCopy,如果是zeroCopy,则直接写;如果不是,则将消息集包装成DataCommand,这也就是前面为什么说DataCommand要实现encode()方法的缘故
78. if (zeroCopy) {
79. set.write(request, ctx);
80. return null;
81. } else {
82. // refer to the code of line 440 in MessageStore
83. // create two copies of byte array including the byteBuffer
84. // and new bytes
85. // this may not a good use case of Buffer
86. final ByteBuffer byteBuffer = ByteBuffer.allocate(Math.min(this.metaConfig.getMaxTransferSize(), request.getMaxSize()));
87. set.read(byteBuffer);
88. byteBuffer.flip();
89. final byte[] bytes = new byte[byteBuffer.remaining()];
90. byteBuffer.get(bytes);
91. return new DataCommand(bytes, request.getOpaque());
92. }
93. } else {
94. //如果为空消息集,则认为请求无效
95. //统计计数
96. this.statsManager.statsGetMiss(topic, group, 1);
97. this.statsManager.statsGetFailed(topic, group, 1);
98.
99. // 当请求的偏移量大于实际最大值时,返回给客户端实际最大的偏移量.
100. final long maxOffset = store.getMaxOffset();
101. final long requestOffset = request.getOffset();
102. if (requestOffset > maxOffset && (this.metaConfig.isUpdateConsumerOffsets() || requestOffset == Long.MAX_VALUE)) {
103. log.info("offset[" + requestOffset + "] is exceeded,tell the client real max offset: " + maxOffset + ",topic=" + topic + ",group=" + group);
104. this.statsManager.statsOffset(topic, group, 1);
105. return new BooleanCommand(HttpStatus.Moved, String.valueOf(maxOffset), request.getOpaque());
106. } else {
107. return new BooleanCommand(HttpStatus.NotFound, "Could not find message at position " + requestOffset, request.getOpaque());
108. }
109. }
110. } catch (final ArrayIndexOutOfBoundsException e) {
111. log.error("Could not get message from position " + request.getOffset() + ",it is out of bounds,topic=" + topic);
112. // 告知最近可用的offset
113. this.statsManager.statsGetMiss(topic, group, 1);
114. this.statsManager.statsGetFailed(topic, group, 1);
115. final long validOffset = store.getNearestOffset(request.getOffset());
116. this.statsManager.statsOffset(topic, group, 1);
117. return new BooleanCommand(HttpStatus.Moved, String.valueOf(validOffset), request.getOpaque());
118. } catch (final Throwable e) {
119. log.error("Could not get message from position " + request.getOffset(), e);
120. this.statsManager.statsGetFailed(topic, group, 1);
121. return new BooleanCommand(HttpStatus.InternalServerError, this.genErrorMessage(request.getTopic(), request.getPartition()) + "Detail:" + e.getMessage(), request.getOpaque());
122. }
123. }
124.
125. //查询最近可用offset请求的业务逻辑处理
126. @Override
127. public ResponseCommand processOffsetCommand(final OffsetCommand request, final SessionContext ctx) {
128. //统计计数
129. this.statsManager.statsOffset(request.getTopic(), request.getGroup(), 1);
130. //获取topic对应分区的MessageStore实例
131. final MessageStore store = this.storeManager.getMessageStore(request.getTopic(), request.getPartition());
132. //如果为空,则返回未找到
133. if (store == null) {
134. return new BooleanCommand(HttpStatus.NotFound, "The topic `" + request.getTopic() + "` in partition `" + request.getPartition() + "` is not exists", request.getOpaque());
135. }
136. //获取topic对应分区最近可用的offset
137. final long offset = store.getNearestOffset(request.getOffset());
138. return new BooleanCommand(HttpStatus.Success, String.valueOf(offset), request.getOpaque());
139. }
140.
141. //退出请求业务逻辑处理
142. @Override
143. public void processQuitCommand(final QuitCommand request, final SessionContext ctx) {
144. try {
145. if (ctx.getConnection() != null) {
146. //关闭与客户端的连接
147. ctx.getConnection().close(false);
148. }
149. } catch (final NotifyRemotingException e) {
150. // ignore
151. }
152. }
153.
154. //版本查询请求业务逻辑处理
155. @Override
156. public ResponseCommand processVesionCommand(final VersionCommand request, final SessionContext ctx) {
157. //返回当前Broker版本
158. return new BooleanCommand(HttpStatus.Success, BuildProperties.VERSION, request.getOpaque());
159. }
160.
161. //统计请求查询业务逻辑处理
162. @Override
163. public ResponseCommand processStatCommand(final StatsCommand request, final SessionContext ctx) {
164. //判断类型,如果类型以config 开头,则传输整个配置文件
165. final String item = request.getItem();
166. if ("config".equals(item)) {
167. return this.processStatsConfig(request, ctx);
168. } else {
169. //如果是获取统计结果,则从统计模块获取响应结果并返回给客户端
170. final String statsInfo = this.statsManager.getStatsInfo(item);
171. return new BooleanCommand(HttpStatus.Success, statsInfo, request.getOpaque());
172. }
173. }
174.
175. //获取配置文件内容,使用zeroCopy将文件内容发送到客户端,构造的响应用BooleanCommand
176. @SuppressWarnings("resource")
177. private ResponseCommand processStatsConfig(final StatsCommand request, final SessionContext ctx) {
178. try {
179. final FileChannel fc = new FileInputStream(this.metaConfig.getConfigFilePath()).getChannel();
180. // result code length opaque\r\n
181. IoBuffer buf = IoBuffer.allocate(11 + 3 + ByteUtils.stringSize(fc.size()) + ByteUtils.stringSize(request.getOpaque()));
182. ByteUtils.setArguments(buf, MetaEncodeCommand.RESULT_CMD, HttpStatus.Success, fc.size(), request.getOpaque());
183. buf.flip();
184. ctx.getConnection().transferFrom(buf, null, fc, 0, fc.size(), request.getOpaque(),
185. new SingleRequestCallBackListener() {
186. @Override
187. public void onResponse(ResponseCommand responseCommand, Connection conn) {
188. this.closeChannel();
189. }
190.
191. @Override
192. public void onException(Exception e) {
193. this.closeChannel();
194. }
195.
196. private void closeChannel() {
197. try {
198. fc.close();
199. } catch (IOException e) {
200. log.error("IOException while stats config", e);
201. }
202. }
203.
204. @Override
205. public ThreadPoolExecutor getExecutor() {
206. return null;
207. }
208. }, 5000, TimeUnit.MILLISECONDS);
209. } catch (FileNotFoundException e) {
210. log.error("Config file not found:" + this.metaConfig.getConfigFilePath(), e);
211. return new BooleanCommand(HttpStatus.InternalServerError, "Config file not found:" + this.metaConfig.getConfigFilePath(), request.getOpaque());
212. } catch (IOException e) {
213. log.error("IOException while stats config", e);
214. return new BooleanCommand(HttpStatus.InternalServerError, "Read config file error:" + e.getMessage(), request.getOpaque());
215. } catch (NotifyRemotingException e) {
216. log.error("NotifyRemotingException while stats config", e);
217. }
218. return null;
219. }
如果不使用内容的事务,Broker已经完成了从网络接收数据—>处理请求(存储消息/查询结果等)—>返回结果的流程,Broker最基础的流程已经基本分析完毕。