0
点赞
收藏
分享

微信扫一扫

MetaQ技术内幕——源码分析(八)


上一篇以及上上篇基本介绍了MetaQ如何使用Gecko框架在网络上传输数据,今天将继续进一步介绍在Broker,各种命令的处理逻辑(暂时将不涉及到事务处理)。

依旧是在MetaMorphosisBroker的registerProcessors()方法中,我们可以注意到一点,每个Processor的实例在构造的时候都注入了一个brokerProcessor的变量,该变量的类型为CommandProcessor。其实,各个Processor的业务逻辑又委托给了CommandProcessor进行处理,比如我们看看其中的GetProcessor的源码:



1. public class GetProcessor implements RequestProcessor<GetCommand> { 
2.     public static final Logger log = LoggerFactory.getLogger(GetProcessor.class); 
3.  
4.     private final ThreadPoolExecutor executor; 
5.  
6.     private final CommandProcessor processor; 
7.  
8.     public GetProcessor(final CommandProcessor processor, final ThreadPoolExecutor executor) { 
9.         this.processor = processor; 
10.         this.executor = executor; 
11.     } 
12.  
13.     @Override 
14.     public ThreadPoolExecutor getExecutor() { 
15.         return this.executor; 
16.     } 
17.  
18.     @Override 
19.     public void handleRequest(final GetCommand request, final Connection conn) { 
20.         // Processor并没有处理具体的业务逻辑,而是将业务逻辑交给CommandProcessor的processGetCommand()进行处理,Processor只是将处理结果简单的返回给客户端 
21. final ResponseCommand response = this.processor.processGetCommand(request, SessionContextHolder.getOrCreateSessionContext(conn, null)); 
22.         if (response != null) { 
23.             RemotingUtils.response(conn, response); 
24.         } 
25.     } 
26. }


CommandProcessor业务逻辑的处理模块采用责任链的处理方式,目前来说只有两个类型的业务逻辑处理单元:带有事务处理(TransactionalCommandProcessor)的和不带有事务处理(BrokerCommandProcessor)的。老习惯,先上类图:

MetaQ技术内幕——源码分析(八)_sed

CommandProcessor接口定义如下:




1. public interface CommandProcessor extends Service { 
2.     //处理Put命令,结果通过PutCallback的回调返回 
3.     public void processPutCommand(final PutCommand request, final SessionContext sessionContext, final PutCallback cb) throws Exception; 
4.      //处理Get命令 
5.     public ResponseCommand processGetCommand(GetCommand request, final SessionContext ctx); 
6.  
7.     /**
8.      * Under conditions that cannot use notify-remoting directly.
9.      */ 
10.      //处理Get命令,并根据条件zeroCopy是否使用zeroCopy 
11.     public ResponseCommand processGetCommand(GetCommand request, final SessionContext ctx, final boolean zeroCopy); 
12.     //处理查询最近可用offset位置请求 
13.     public ResponseCommand processOffsetCommand(OffsetCommand request, final SessionContext ctx); 
14.     //处理退出请求 
15.     public void processQuitCommand(QuitCommand request, final SessionContext ctx); 
16.     
17.     public ResponseCommand processVesionCommand(VersionCommand request, final SessionContext ctx); 
18.     //处理统计请求 
19.     public ResponseCommand processStatCommand(StatsCommand request, final SessionContext ctx); 
20.     //下面主要定义与事务相关的方法,暂时先不介绍 
21.     public void removeTransaction(final XATransactionId xid); 
22.  
23.     public Transaction getTransaction(final SessionContext context, final TransactionId xid) throws MetamorphosisException, XAException; 
24.  
25.     public void forgetTransaction(final SessionContext context, final TransactionId xid) throws Exception; 
26.  
27.     public void rollbackTransaction(final SessionContext context, final TransactionId xid) throws Exception; 
28.  
29.     public void commitTransaction(final SessionContext context, final TransactionId xid, final boolean onePhase) throws Exception; 
30.  
31.     public int prepareTransaction(final SessionContext context, final TransactionId xid) throws Exception; 
32.  
33.     public void beginTransaction(final SessionContext context, final TransactionId xid, final int seconds) throws Exception; 
34.  
35.     public TransactionId[] getPreparedTransactions(final SessionContext context, String uniqueQualifier) throws Exception; 
36. }





细心的读者会发现,每个定义的方法的参数都有一个参数SessionContext,SessionContext携带了连接的信息,由Broker创建,具体代码见SessionContextHolder的getOrCreateSessionContext()方法,getOrCreateSessionContext()方法在Processor委托给CommandProcessor处理业务逻辑时被调用。

BrokerCommandProcessor和TransactionalCommandProcessor其实就是各模块的粘合剂,将各模块的功能统一协调形成整体对外提供功能。BrokerCommandProcessor的实现并不难理解,下面让我们来具体分析一下BrokerCommandProcessor这个类:




1. //Put请求的业务逻辑处理 
2. @Override 
3. public void processPutCommand(final PutCommand request, final SessionContext sessionContext, final PutCallback cb) { 
4.         final String partitionString = this.metaConfig.getBrokerId() + "-" + request.getPartition(); 
5. //统计计算 
6.         this.statsManager.statsPut(request.getTopic(), partitionString, 1); 
7.         this.statsManager.statsMessageSize(request.getTopic(), request.getData().length); 
8.         int partition = -1; 
9.         try { 
10. //如果对应存储的分区已经关闭,则拒绝该消息 
11.             if (this.metaConfig.isClosedPartition(request.getTopic(), request.getPartition())) { 
12.                 log.warn("Can not put message to partition " + request.getPartition() + " for topic=" + request.getTopic() + ",it was closed"); 
13.                 if (cb != null) { 
14.                     cb.putComplete(new BooleanCommand(HttpStatus.Forbidden, this.genErrorMessage(request.getTopic(), request.getPartition()) + "Detail:partition[" + partitionString + "] has been closed", request.getOpaque())); 
15.                 } 
16.                 return; 
17.             } 
18.  
19.             partition = this.getPartition(request); 
20. //获取对应Topic分区的MessageStore实例 
21.             final MessageStore store = this.storeManager.getOrCreateMessageStore(request.getTopic(), partition); 
22.             // 如果是动态添加的topic,需要注册到zk 
23. //就到目前为止,我着实没想明白下面这句代码的用途是什么?  
24. //如果topic没有在该Broker的配置中配置,在MessageStoreManager中的isLegalTopic()方法中检查就通不过而抛出异常,那么下面这句代码怎么样都不会被执行,而Client要向Broker发送消息,一定要先发布topic,保证topic在zk发布;  
25.             this.brokerZooKeeper.registerTopicInZk(request.getTopic(), false); 
26.             // 设置唯一id 
27.             final long messageId = this.idWorker.nextId(); 
28.             //存储消息,之前的文章介绍过Broker的存储使用回调的方式,易于异步的实现,代码简单不分析 
29. store.append(messageId, request, new StoreAppendCallback(partition, partitionString, request, messageId, cb)); 
30.         } catch (final Exception e) { 
31. //发生异常,统计计算回滚 
32.             this.statsManager.statsPutFailed(request.getTopic(), partitionString, 1); 
33.             log.error("Put message failed", e); 
34.             if (cb != null) { 
35. //返回结果 
36.                 cb.putComplete(new BooleanCommand(HttpStatus.InternalServerError, this.genErrorMessage(request.getTopic(), partition) + "Detail:" + e.getMessage(), request.getOpaque())); 
37.             } 
38.         } 
39.     } 
40.  
41. @Override 
42. // GET请求的业务逻辑处理 
43. public ResponseCommand processGetCommand(final GetCommand request, final SessionContext ctx) { 
44. //默认为zeroCopy 
45.         return this.processGetCommand(request, ctx, true); 
46.     } 
47.  
48.     @Override 
49.     public ResponseCommand processGetCommand(final GetCommand request, final SessionContext ctx, final boolean zeroCopy) { 
50. //获取查询信息 
51.         final String group = request.getGroup(); 
52.         final String topic = request.getTopic(); 
53. //统计计数(请求数统计) 
54.         this.statsManager.statsGet(topic, group, 1); 
55.  
56.         // 如果分区被关闭,禁止读数据 --wuhua 
57.         if (this.metaConfig.isClosedPartition(topic, request.getPartition())) { 
58.             log.warn("can not get message for topic=" + topic + " from partition " + request.getPartition() + ",it closed,"); 
59.             return new BooleanCommand(HttpStatus.Forbidden, "Partition[" + this.metaConfig.getBrokerId() + "-" + request.getPartition() + "] has been closed", request.getOpaque()); 
60.         } 
61. //获取topic对应分区的MessageStore实例,如果实例不存在,则返回NotFound 
62.         final MessageStore store = this.storeManager.getMessageStore(topic, request.getPartition()); 
63.         if (store == null) { 
64. //统计计数 
65.             this.statsManager.statsGetMiss(topic, group, 1); 
66.             return new BooleanCommand(HttpStatus.NotFound, "The topic `" + topic + "` in partition `" + request.getPartition() + "` is not exists", request.getOpaque()); 
67.         } 
68. //如果请求的起始位置<0,判定该请求无效 
69.         if (request.getMaxSize() <= 0) { 
70.             return new BooleanCommand(HttpStatus.BadRequest, "Bad request,invalid max size:" + request.getMaxSize(), request.getOpaque()); 
71.         } 
72.         try { 
73. //读取由request.getOffset()开始的消息集合 
74.             final MessageSet set = store.slice(request.getOffset(), Math.min(this.metaConfig.getMaxTransferSize(), request.getMaxSize())); 
75. //如果当前消息集不为空 
76.             if (set != null) { 
77. //判断是否zeroCopy,如果是zeroCopy,则直接写;如果不是,则将消息集包装成DataCommand,这也就是前面为什么说DataCommand要实现encode()方法的缘故 
78.                 if (zeroCopy) { 
79.                     set.write(request, ctx); 
80.                     return null; 
81.                 } else { 
82.                     // refer to the code of line 440 in MessageStore 
83.                     // create two copies of byte array including the byteBuffer 
84.                     // and new bytes 
85.                     // this may not a good use case of Buffer 
86.                     final ByteBuffer byteBuffer = ByteBuffer.allocate(Math.min(this.metaConfig.getMaxTransferSize(), request.getMaxSize())); 
87.                     set.read(byteBuffer); 
88.                     byteBuffer.flip(); 
89.                     final byte[] bytes = new byte[byteBuffer.remaining()]; 
90.                     byteBuffer.get(bytes); 
91.                     return new DataCommand(bytes, request.getOpaque()); 
92.                 } 
93.             } else { 
94. //如果为空消息集,则认为请求无效 
95. //统计计数 
96.                 this.statsManager.statsGetMiss(topic, group, 1); 
97.                 this.statsManager.statsGetFailed(topic, group, 1); 
98.  
99.                 // 当请求的偏移量大于实际最大值时,返回给客户端实际最大的偏移量. 
100.                 final long maxOffset = store.getMaxOffset(); 
101.                 final long requestOffset = request.getOffset(); 
102.                 if (requestOffset > maxOffset && (this.metaConfig.isUpdateConsumerOffsets() || requestOffset == Long.MAX_VALUE)) { 
103.                     log.info("offset[" + requestOffset + "] is exceeded,tell the client real max offset: " + maxOffset + ",topic=" + topic + ",group=" + group); 
104.                     this.statsManager.statsOffset(topic, group, 1); 
105.                     return new BooleanCommand(HttpStatus.Moved, String.valueOf(maxOffset), request.getOpaque()); 
106.                 } else { 
107.                     return new BooleanCommand(HttpStatus.NotFound, "Could not find message at position " + requestOffset, request.getOpaque()); 
108.                 } 
109.             } 
110.         } catch (final ArrayIndexOutOfBoundsException e) { 
111.             log.error("Could not get message from position " + request.getOffset() + ",it is out of bounds,topic=" + topic); 
112.             // 告知最近可用的offset 
113.             this.statsManager.statsGetMiss(topic, group, 1); 
114.             this.statsManager.statsGetFailed(topic, group, 1); 
115.             final long validOffset = store.getNearestOffset(request.getOffset()); 
116.             this.statsManager.statsOffset(topic, group, 1); 
117.             return new BooleanCommand(HttpStatus.Moved, String.valueOf(validOffset), request.getOpaque()); 
118.         } catch (final Throwable e) { 
119.             log.error("Could not get message from position " + request.getOffset(), e); 
120.             this.statsManager.statsGetFailed(topic, group, 1); 
121.             return new BooleanCommand(HttpStatus.InternalServerError, this.genErrorMessage(request.getTopic(), request.getPartition()) + "Detail:" + e.getMessage(), request.getOpaque()); 
122.         } 
123.     } 
124.  
125. //查询最近可用offset请求的业务逻辑处理 
126. @Override 
127.     public ResponseCommand processOffsetCommand(final OffsetCommand request, final SessionContext ctx) { 
128. //统计计数 
129.         this.statsManager.statsOffset(request.getTopic(), request.getGroup(), 1); 
130. //获取topic对应分区的MessageStore实例 
131.         final MessageStore store = this.storeManager.getMessageStore(request.getTopic(), request.getPartition()); 
132. //如果为空,则返回未找到 
133.         if (store == null) { 
134.             return new BooleanCommand(HttpStatus.NotFound, "The topic `" + request.getTopic() + "` in partition `" + request.getPartition() + "` is not exists", request.getOpaque()); 
135.         } 
136.         //获取topic对应分区最近可用的offset 
137. final long offset = store.getNearestOffset(request.getOffset()); 
138.         return new BooleanCommand(HttpStatus.Success, String.valueOf(offset), request.getOpaque()); 
139.     } 
140.  
141. //退出请求业务逻辑处理 
142.     @Override 
143.     public void processQuitCommand(final QuitCommand request, final SessionContext ctx) { 
144.         try { 
145.             if (ctx.getConnection() != null) { 
146.                 //关闭与客户端的连接 
147.                 ctx.getConnection().close(false); 
148.             } 
149.         } catch (final NotifyRemotingException e) { 
150.             // ignore 
151.         } 
152.     } 
153.  
154. //版本查询请求业务逻辑处理 
155. @Override 
156.     public ResponseCommand processVesionCommand(final VersionCommand request, final SessionContext ctx) { 
157. //返回当前Broker版本 
158.         return new BooleanCommand(HttpStatus.Success, BuildProperties.VERSION, request.getOpaque()); 
159.     } 
160.  
161. //统计请求查询业务逻辑处理 
162.     @Override 
163.     public ResponseCommand processStatCommand(final StatsCommand request, final SessionContext ctx) { 
164. //判断类型,如果类型以config 开头,则传输整个配置文件 
165.         final String item = request.getItem(); 
166.         if ("config".equals(item)) { 
167.             return this.processStatsConfig(request, ctx); 
168.         } else { 
169. //如果是获取统计结果,则从统计模块获取响应结果并返回给客户端 
170.             final String statsInfo = this.statsManager.getStatsInfo(item); 
171.             return new BooleanCommand(HttpStatus.Success, statsInfo, request.getOpaque()); 
172.         } 
173.     } 
174.  
175.     //获取配置文件内容,使用zeroCopy将文件内容发送到客户端,构造的响应用BooleanCommand 
176. @SuppressWarnings("resource") 
177.     private ResponseCommand processStatsConfig(final StatsCommand request, final SessionContext ctx) { 
178.         try { 
179.             final FileChannel fc = new FileInputStream(this.metaConfig.getConfigFilePath()).getChannel(); 
180.             // result code length opaque\r\n 
181.             IoBuffer buf = IoBuffer.allocate(11 + 3 + ByteUtils.stringSize(fc.size()) + ByteUtils.stringSize(request.getOpaque())); 
182.             ByteUtils.setArguments(buf, MetaEncodeCommand.RESULT_CMD, HttpStatus.Success, fc.size(), request.getOpaque()); 
183.             buf.flip(); 
184.             ctx.getConnection().transferFrom(buf, null, fc, 0, fc.size(), request.getOpaque(), 
185.                     new SingleRequestCallBackListener() { 
186.                         @Override 
187.                         public void onResponse(ResponseCommand responseCommand, Connection conn) { 
188.                             this.closeChannel(); 
189.                         } 
190.  
191.                         @Override 
192.                         public void onException(Exception e) { 
193.                             this.closeChannel(); 
194.                         } 
195.  
196.                         private void closeChannel() { 
197.                             try { 
198.                                 fc.close(); 
199.                             } catch (IOException e) { 
200.                                 log.error("IOException while stats config", e); 
201.                             } 
202.                         } 
203.  
204.                         @Override 
205.                         public ThreadPoolExecutor getExecutor() { 
206.                             return null; 
207.                         } 
208.                     }, 5000, TimeUnit.MILLISECONDS); 
209.         } catch (FileNotFoundException e) { 
210.             log.error("Config file not found:" + this.metaConfig.getConfigFilePath(), e); 
211.             return new BooleanCommand(HttpStatus.InternalServerError, "Config file not found:" + this.metaConfig.getConfigFilePath(), request.getOpaque()); 
212.         } catch (IOException e) { 
213.             log.error("IOException while stats config", e); 
214.             return new BooleanCommand(HttpStatus.InternalServerError, "Read config file error:" + e.getMessage(), request.getOpaque()); 
215.         } catch (NotifyRemotingException e) { 
216.             log.error("NotifyRemotingException while stats config", e); 
217.         } 
218.         return null; 
219.     }


如果不使用内容的事务,Broker已经完成了从网络接收数据—>处理请求(存储消息/查询结果等)—>返回结果的流程,Broker最基础的流程已经基本分析完毕。

举报

相关推荐

0 条评论