0
点赞
收藏
分享

微信扫一扫

MetaQ技术内幕——源码分析(四)


前面,我们已经把Broker存储最重要的一个类具体分析了一遍,接下来,我们分析一下其删除的策略。前面介绍过Messagestore采用的多文件存储的组织方式,而存储空间不可能无限大,得有一定的删除策略对其进行删除以腾出空间给新的消息。

MetaQ允许自定义删除策略,需要实现接口DeletePolicy,默认提供了两种删除策略:过期删除(DiscardDeletePolicy)和过期打包删除(ArchiveDeletePolicy)。DiscardDeletePolicy和ArchiveDeletePolicy都比较简单,DiscardDeletePolicy主要是对于超过一定时期的文件进行删除,ArchiveDeletePolicy则是先打包备份再删除。

自定义策略是如何被识别和使用的呢,MetaQ定义了DeletePolicyFactory,所有删除策略的实例都由DeletePolicyFactory提供,DeletePolicyFactory对外提供了注册机制,利用反射机制生成实例,每个自定义的删除策略都必须有一个无参构造,DeletePolicyFactory生成实例代码如下:

1. public static DeletePolicy getDeletePolicy(String values) { 
2.         String[] tmps = values.split(","); 
3.         String name = tmps[0]; 
4.         Class<? extends DeletePolicy> clazz = policyMap.get(name); 
5.         if (clazz == null) { 
6.             throw new UnknownDeletePolicyException(name); 
7.         } 
8.         try { 
9.            //直接调用class的newInstance()方法,该方法必须要求有一个无参构造 
10.             DeletePolicy deletePolicy = clazz.newInstance(); 
11.             String[] initValues = null; 
12.             if (tmps.length >= 2) { 
13.                 initValues = new String[tmps.length - 1]; 
14.                 System.arraycopy(tmps, 1, initValues, 0, tmps.length - 1); 
15.             } 
16.             deletePolicy.init(initValues); 
17.             return deletePolicy; 
18.         } 
19.         catch (Exception e) { 
20.             throw new MetamorphosisServerStartupException("New delete policy `" + name + "` failed", e); 
21.         } 
22.     }

DeletePolicy和MessageStore如何结合在一起的呢?则是粘合剂MessageStoreManager,MessageStoreManager是存储模块的管家,负责与其他模块联系,也是MessageStore管理器,管理所有的MessageStore以及其删除策略,MessageStoreManager也是要好好分析的一个类。


1. private final ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* partition */, MessageStore>> stores = new ConcurrentHashMap<String, ConcurrentHashMap<Integer, MessageStore>>(); 
2. //前面的存储组织方式介绍过一个主题对应多一个分区,每个分区对应一个MessageStore实例,分区号使用数值来表示,stores就是按照该方式组织管理的 
3.     private final MetaConfig metaConfig; 
4. //参数配置 
5.     private ScheduledThreadPoolExecutor scheduledExecutorService;// = 
6.     // Executors.newScheduledThreadPool(2); 
7.   //调度服务,对不同的MessageStore实例flush,将数据提到到硬盘 
8.     private final DeletePolicy deletePolicy; 
9.   //删除策略选择器,这里采用的一个topic对应一种策略,而不是一个MessageStore对应一个策略实例,一个策略实例在同一个topic的不同MessageStore实例间是重用的 
10.     private DeletePolicySelector deletePolicySelector; 
11.    
12.     public static final int HALF_DAY = 1000 * 60 * 60 * 12; 
13.   //topic 集合 
14.     private final Set<Pattern> topicsPatSet = new HashSet<Pattern>(); 
15.  
16.     private final ConcurrentHashMap<Integer, ScheduledFuture<?>> unflushIntervalMap = new ConcurrentHashMap<Integer, ScheduledFuture<?>>(); 
17. //前面曾介绍过MessageStore的提交方式有两种:组提交和定时提交,unflushIntervalMap是存放 
18. //定时提交的任务 
19.     private Scheduler scheduler; 
20. //定时调度器,用于定时调度删除任务 
21.     public MessageStoreManager(final MetaConfig metaConfig, final DeletePolicy deletePolicy) { 
22.         this.metaConfig   = metaConfig; 
23.         this.deletePolicy = deletePolicy; 
24. //生成策略选择器 
25.         this.newDeletePolicySelector(); 
26. //添加匿名监听器,监听topic列表变化,如果列表发生变化,则新增列表并重新生成选择器 
27.         this.metaConfig.addPropertyChangeListener("topics", new PropertyChangeListener() { 
28.             public void propertyChange(final PropertyChangeEvent evt) { 
29.                 MessageStoreManager.this.makeTopicsPatSet(); 
30.                 MessageStoreManager.this.newDeletePolicySelector(); 
31.             } 
32.         }); 
33. //添加匿名监听,监听unflushInternal变化,如果发生变化 
34.         this.metaConfig.addPropertyChangeListener("unflushInterval", new PropertyChangeListener() { 
35.             public void propertyChange(final PropertyChangeEvent evt) { 
36.                 MessageStoreManager.this.scheduleFlushTask(); 
37.             } 
38.         }); 
39.         this.makeTopicsPatSet(); 
40.       //初始化调度 
41.         this.initScheduler(); 
42.         // 定时flush,该方法作者有详细注释就不在解释了 
43.         this.scheduleFlushTask(); 
44.     }


MessageStoreManager实现接口Service,在启动是会调用init方法,关闭时调用dispose方法

 
  
 
  
1. public void init() { 
2.         // 加载已有数据并校验 
3.         try { 
4.             this.loadMessageStores(this.metaConfig); 
5.         } catch (final IOException e) { 
6.             log.error("load message stores failed", e); 
7.             throw new MetamorphosisServerStartupException("Initilize message store manager failed", e); 
8.         } catch (InterruptedException e) { 
9.             Thread.currentThread().interrupt(); 
10.         } 
11.         this.startScheduleDeleteJobs(); 
12.     } 
13.  
14. // 
15. private Set<File> getDataDirSet(final MetaConfig metaConfig) throws IOException { 
16.         final Set<String> paths = new HashSet<String>(); 
17.         // public data path 
18.       //公共数据目录 
19.         paths.add(metaConfig.getDataPath()); 
20.         // topic data path 
21.       //私有数据目录 
22.         for (final String topic : metaConfig.getTopics()) { 
23.             final TopicConfig topicConfig = metaConfig.getTopicConfig(topic); 
24.             if (topicConfig != null) { 
25.                 paths.add(topicConfig.getDataPath()); 
26.             } 
27.         } 
28.         final Set<File> fileSet = new HashSet<File>(); 
29.         for (final String path : paths) { 
30.             //验证数据目录是否存在 
31.             fileSet.add(this.getDataDir(path)); 
32.         } 
33.         return fileSet; 
34.     } 
35.  
36. private void loadMessageStores(final MetaConfig metaConfig) throws IOException, InterruptedException { 
37. //加载数据目录列表,再加载每个目录下的数据 
38.         for (final File dir : this.getDataDirSet(metaConfig)) { 
39.             this.loadDataDir(metaConfig, dir); 
40.         } 
41.     } 
42.  
43.     private void loadDataDir(final MetaConfig metaConfig, final File dir) throws IOException, InterruptedException { 
44.         log.warn("Begin to scan data path:" + dir.getAbsolutePath()); 
45.         final long start = System.currentTimeMillis(); 
46.         final File[] ls = dir.listFiles(); 
47.         int nThreads = Runtime.getRuntime().availableProcessors() + 1; 
48.         ExecutorService executor = Executors.newFixedThreadPool(nThreads); 
49.         int count = 0; 
50.       //将加载验证每个分区的数据包装成一个个任务 
51.         List<Callable<MessageStore>> tasks = new ArrayList<Callable<MessageStore>>(); 
52.         for (final File subDir : ls) { 
53.             if (!subDir.isDirectory()) { 
54.                 log.warn("Ignore not directory path:" + subDir.getAbsolutePath()); 
55.             } else { 
56.                 final String name = subDir.getName(); 
57.                 final int index = name.lastIndexOf('-'); 
58.                 if (index < 0) { 
59.                     log.warn("Ignore invlaid directory:" + subDir.getAbsolutePath()); 
60.                     continue; 
61.                 } 
62.                   //包装任务 
63.                 tasks.add(new Callable<MessageStore>() { 
64.                     //回调方法,方法将具体的加载验证分区数据 
65. @Override 
66.                     public MessageStore call() throws Exception { 
67.                         log.warn("Loading data directory:" + subDir.getAbsolutePath() + "..."); 
68.                         final String topic = name.substring(0, index); 
69.                         final int partition = Integer.parseInt(name.substring(index + 1));                   //构造MessageStore实例的时候会自动加载验证数据,在初始化MessageStore实例的时候会给该MessageStore实例选择该topic的删除策略 
70.                         final MessageStore messageStore = new MessageStore(topic, partition, metaConfig, 
71.                                 MessageStoreManager.this.deletePolicySelector.select(topic, MessageStoreManager.this.deletePolicy)); 
72.                         return messageStore; 
73.                     } 
74.                 }); 
75.                 count++; 
76.                 if (count % nThreads == 0 || count == ls.length) { 
77. //如果配置了并行加载,则使用并行加载 
78.                     if (metaConfig.isLoadMessageStoresInParallel()) { 
79.                         this.loadStoresInParallel(executor, tasks); 
80.                     } else { 
81. //串行加载验证数据 
82.                         this.loadStores(tasks); 
83.                     } 
84.                 } 
85.             } 
86.         } 
87.         executor.shutdownNow(); 
88.         log.warn("End to scan data path in " + (System.currentTimeMillis() - start) / 1000 + " secs"); 
89.     }

在init方法中做的一件事情就是加载校验已有的数据,加载校验的方式有两种个,串行和并行。


1. //串行加载验证数据,则在主线程上完成验证加载工作,其缺点是较慢,好处是不会打乱日志顺序 
2. private void loadStores(List<Callable<MessageStore>> tasks) throws IOException, InterruptedException { 
3.         for (Callable<MessageStore> task : tasks) { 
4.             MessageStore messageStore; 
5.             try { 
6.                 messageStore = task.call(); 
7.                 ConcurrentHashMap<Integer/* partition */, MessageStore> map = this.stores.get(messageStore.getTopic()); 
8.                 if (map == null) { 
9.                     map = new ConcurrentHashMap<Integer, MessageStore>(); 
10.                     this.stores.put(messageStore.getTopic(), map); 
11.                 } 
12.                 map.put(messageStore.getPartition(), messageStore); 
13.             } catch (IOException e) { 
14.                 throw e; 
15.             } catch (InterruptedException e) { 
16.                 throw e; 
17.             } catch (Exception e) { 
18.                 throw new IllegalStateException(e); 
19.             } 
20.         } 
21.         tasks.clear(); 
22.     } 
23.  
24. //并行加载数据,当数据过多的时候,启动并行加载数据可以加快启动速度;但是会打乱启动的日志顺序,默认不启用。 
25. private void loadStoresInParallel(ExecutorService executor, List<Callable<MessageStore>> tasks) throws InterruptedException { 
26.         CompletionService<MessageStore> completionService = new ExecutorCompletionService<MessageStore>(executor); 
27.         for (Callable<MessageStore> task : tasks) { 
28.             completionService.submit(task); 
29.         } 
30.         for (int i = 0; i < tasks.size(); i++) { 
31.             try { 
32.                 //确保任务都已经运行完毕 
33.                 MessageStore messageStore = completionService.take().get(); 
34.  
35.                 ConcurrentHashMap<Integer/* partition */, MessageStore> map = this.stores.get(messageStore.getTopic()); 
36.                 if (map == null) { 
37.                     map = new ConcurrentHashMap<Integer, MessageStore>(); 
38.                     this.stores.put(messageStore.getTopic(), map); 
39.                 } 
40.                 map.put(messageStore.getPartition(), messageStore); 
41.             } catch (ExecutionException e) { 
42.                 throw ThreadUtils.launderThrowable(e); 
43.             } 
44.         } 
45.         tasks.clear(); 
46.     }


MessageStoreManager关闭时调用dispose方法,确保资源都正确释放。


1. public void dispose() { 
2. //关闭调度器和调度池 
3.         this.scheduledExecutorService.shutdown(); 
4.         if (this.scheduler != null) { 
5.             try { 
6.                 this.scheduler.shutdown(true); 
7.             } catch (final SchedulerException e) { 
8.                 log.error("Shutdown quartz scheduler failed", e); 
9.             } 
10.         } 
11. //确保每一个 MessageStore实例都正确关闭 
12.         for (final ConcurrentHashMap<Integer/* partition */, MessageStore> subMap : MessageStoreManager.this.stores 
13.                 .values()) { 
14.             if (subMap != null) { 
15.                 for (final MessageStore msgStore : subMap.values()) { 
16.                     if (msgStore != null) { 
17.                         try { 
18.                             msgStore.close(); 
19.                         } catch (final Throwable e) { 
20.                             log.error("Try to run close  " + msgStore.getTopic() + "," + msgStore.getPartition() + " failed", e); 
21.                         } 
22.                     } 
23.                 } 
24.             } 
25.         } 
26. //清空stores列表 
27.         this.stores.clear(); 
28.     }

MessageStoreManager对外提供了获取的MessageStore的方法getMessageStore(final String topic, final int partition)和getOrCreateMessageStore(final String topic, final int partition) throws IOException。

getMessageStore()从stores列表查找对应的MessageStore,如果不存在则返回空;而getOrCreateMessage()则先检查对应的topic是否曾经配置,如果没有则抛出异常,如果有则判断stores是否已有MessageStore实例,如果没有,则生成MessageStore实例放入到stores列表并返回,如果有,则直接返回。

1. public MessageStore getMessageStore(final String topic, final int partition) { 
2.         final ConcurrentHashMap<Integer/* partition */, MessageStore> map = this.stores.get(topic); 
3.         if (map == null) { 
4. //如果topic对应的MessageStore实例列表不存在,则直接返回null 
5.             return null; 
6.         } 
7.         return map.get(partition); 
8.     } 
9.  
10.     Collection<MessageStore> getMessageStoresByTopic(final String topic) { 
11.         final ConcurrentHashMap<Integer/* partition */, MessageStore> map = this.stores.get(topic); 
12.         if (map == null) { 
13.             return Collections.emptyList(); 
14.         } 
15.         return map.values(); 
16.     } 
17.  
18.     public MessageStore getOrCreateMessageStore(final String topic, final int partition) throws IOException { 
19.         return this.getOrCreateMessageStoreInner(topic, partition, 0); 
20.     } 
21.  
22.     public MessageStore getOrCreateMessageStore(final String topic, final int partition, final long offsetIfCreate) throws IOException { 
23.         return this.getOrCreateMessageStoreInner(topic, partition, offsetIfCreate); 
24.     } 
25.  
26.     private MessageStore getOrCreateMessageStoreInner(final String topic, final int partition, final long offsetIfCreate) throws IOException { 
27.       //判断topic是否可用,即是否在topicsPatSet列表中 
28.         if (!this.isLegalTopic(topic)) { 
29.             throw new IllegalTopicException("The server do not accept topic " + topic); 
30.         } 
31. //判断分区号是否正确 
32.         if (partition < 0 || partition >= this.getNumPartitions(topic)) { 
33.             log.warn("Wrong partition " + partition + ",valid partitions (0," + (this.getNumPartitions(topic) - 1) + ")"); 
34.             throw new WrongPartitionException("wrong partition " + partition); 
35.         } 
36.         ConcurrentHashMap<Integer/* partition */, MessageStore> map = this.stores.get(topic); 
37. //如果topic对应的列表不存在,则生成列表,放进stores中 
38.         if (map == null) { 
39.             map = new ConcurrentHashMap<Integer, MessageStore>(); 
40.             final ConcurrentHashMap<Integer/* partition */, MessageStore> oldMap = this.stores.putIfAbsent(topic, map); 
41.             if (oldMap != null) { 
42.                 map = oldMap; 
43.             } 
44.         } 
45. //判断列表中是否有存在分区号位partition为的MessageStore实例,如果有,直接返回;如果没有,则生成实例并放进列表中 
46.         MessageStore messageStore = map.get(partition); 
47.         if (messageStore != null) { 
48.             return messageStore; 
49.         } else { 
50.             // 对string加锁,特例 
51.             synchronized (topic.intern()) { 
52.                 messageStore = map.get(partition); 
53.                 // double check 
54.                 if (messageStore != null) { 
55.                     return messageStore; 
56.                 } 
57.                 messageStore = new MessageStore(topic, partition, this.metaConfig, this.deletePolicySelector.select(topic, this.deletePolicy), offsetIfCreate); 
58.                 log.info("Created a new message storage for topic=" + topic + ",partition=" + partition); 
59.                 map.put(partition, messageStore); 
60.             } 
61.         } 
62.         return messageStore; 
63.     } 
64.  
65.     boolean isLegalTopic(final String topic) { 
66.         for (final Pattern pat : this.topicsPatSet) { 
67.             if (pat.matcher(topic).matches()) { 
68.                 return true; 
69.             } 
70.         } 
71.         return false; 
72.     }


通过MessageStoreManager,我们把MessageStore和删除策略很好的组织在一起,并在MessageStoreManager提供定时提交的功能,提升了数据的可靠性;通过MessageStoreManager也为其他模块访问存储模块提供了接口。

我觉得MessageStoreManager设计不好的地方在于topicsPatSet,在topic列表发生变化的时候,没有先清空topicsPatSet,而是直接添加,而且没有对topic对应的MessageStore实例进行重新初始化,如果MessageStore实例已经存在,新删除策略配置不能生效。个人建议是一旦topic列表发生变化的时候,重新初始化整个存储模块,保证一致性。

至此, Broker的消息存储模块基本分析完毕。下一篇,进入Broker网络相关以及消息处理流程分析。

举报

相关推荐

0 条评论