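In the previous posts we walked through MessageStore, the most important class in the Broker's storage layer. Next, let's look at its deletion policies. As described earlier, MessageStore organizes its data as multiple files, and since disk space is finite, some deletion policy has to reclaim space for new messages.
MetaQ allows custom deletion policies: you implement the DeletePolicy interface. Two policies ship by default: delete on expiry (DiscardDeletePolicy) and archive, then delete on expiry (ArchiveDeletePolicy). Both are simple. DiscardDeletePolicy deletes files older than a configured age, while ArchiveDeletePolicy first packs the file into a backup archive and then deletes it.
How are custom policies discovered and used? MetaQ defines DeletePolicyFactory, which supplies every policy instance. The factory exposes a registration mechanism and creates instances by reflection, so every custom policy must have a no-argument constructor. The instance-creation code in DeletePolicyFactory is as follows: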
public static DeletePolicy getDeletePolicy(String values) {
    // values has the form "name,arg1,arg2,...": the first token selects the policy
    String[] tmps = values.split(",");
    String name = tmps[0];
    Class<? extends DeletePolicy> clazz = policyMap.get(name);
    if (clazz == null) {
        throw new UnknownDeletePolicyException(name);
    }
    try {
        // Calls Class.newInstance() directly, which requires a no-argument constructor
        DeletePolicy deletePolicy = clazz.newInstance();
        // The remaining tokens, if any, are passed to init(); otherwise init() receives null
        String[] initValues = null;
        if (tmps.length >= 2) {
            initValues = new String[tmps.length - 1];
            System.arraycopy(tmps, 1, initValues, 0, tmps.length - 1);
        }
        deletePolicy.init(initValues);
        return deletePolicy;
    }
    catch (Exception e) {
        throw new MetamorphosisServerStartupException("New delete policy `" + name + "` failed", e);
    }
}
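To make the register-then-instantiate flow concrete, here is a minimal, self-contained sketch. It is not MetaQ's code: SimpleDeletePolicy, ExpirePolicy, and SimplePolicyFactory are hypothetical stand-ins, the interface is trimmed to two methods, and the "name,hours" spec format is assumed purely for illustration. It uses getDeclaredConstructor().newInstance() because Class.newInstance() is deprecated on newer JDKs. Unlike the factory above, it passes an empty array rather than null when the spec carries no arguments.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, trimmed-down stand-in for a deletion-policy interface.
interface SimpleDeletePolicy {
    void init(String... values);

    boolean canDelete(long fileLastModified, long now);
}

// Illustrative policy: a file may be deleted once it is older than the configured hours.
class ExpirePolicy implements SimpleDeletePolicy {
    long maxAgeMillis = Long.MAX_VALUE;

    @Override
    public void init(String... values) {
        if (values != null && values.length > 0) {
            maxAgeMillis = Long.parseLong(values[0].trim()) * 60L * 60L * 1000L;
        }
    }

    @Override
    public boolean canDelete(long fileLastModified, long now) {
        return now - fileLastModified > maxAgeMillis;
    }
}

class SimplePolicyFactory {
    private static final Map<String, Class<? extends SimpleDeletePolicy>> POLICIES =
            new ConcurrentHashMap<>();

    static void register(String name, Class<? extends SimpleDeletePolicy> clazz) {
        POLICIES.put(name, clazz);
    }

    // Parses "name,arg1,arg2,..." and builds the policy through its no-arg constructor.
    static SimpleDeletePolicy create(String spec) {
        String[] parts = spec.split(",");
        Class<? extends SimpleDeletePolicy> clazz = POLICIES.get(parts[0]);
        if (clazz == null) {
            throw new IllegalArgumentException("Unknown delete policy: " + parts[0]);
        }
        try {
            SimpleDeletePolicy policy = clazz.getDeclaredConstructor().newInstance();
            policy.init(Arrays.copyOfRange(parts, 1, parts.length));
            return policy;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate policy " + parts[0], e);
        }
    }
}
```

With this sketch, registering ExpirePolicy under the name "expire" and calling create("expire,2") yields a policy that treats files older than two hours as deletable.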
How do DeletePolicy and MessageStore come together? MessageStoreManager is the glue. It is the steward of the storage module: it talks to the other modules and manages every MessageStore along with its deletion policy, so it deserves a careful look as well.
// As described in the storage layout earlier, one topic has multiple partitions and each
// partition maps to one MessageStore instance; partitions are identified by number, and
// stores is organized the same way
private final ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* partition */, MessageStore>> stores = new ConcurrentHashMap<String, ConcurrentHashMap<Integer, MessageStore>>();
// Configuration
private final MetaConfig metaConfig;
// Scheduled executor that flushes the MessageStore instances, forcing data to disk
private ScheduledThreadPoolExecutor scheduledExecutorService; // = Executors.newScheduledThreadPool(2);
private final DeletePolicy deletePolicy;
// Deletion-policy selector. Policies are chosen per topic rather than per MessageStore:
// one policy instance is shared by all MessageStore instances of the same topic
private DeletePolicySelector deletePolicySelector;

public static final int HALF_DAY = 1000 * 60 * 60 * 12;
// Set of topic patterns
private final Set<Pattern> topicsPatSet = new HashSet<Pattern>();

// As noted earlier, MessageStore has two commit modes, group commit and timed commit;
// unflushIntervalMap holds the timed-commit tasks
private final ConcurrentHashMap<Integer, ScheduledFuture<?>> unflushIntervalMap = new ConcurrentHashMap<Integer, ScheduledFuture<?>>();
// Scheduler used to run the deletion jobs on a timer
private Scheduler scheduler;

public MessageStoreManager(final MetaConfig metaConfig, final DeletePolicy deletePolicy) {
    this.metaConfig = metaConfig;
    this.deletePolicy = deletePolicy;
    // Build the policy selector
    this.newDeletePolicySelector();
    // Anonymous listener on the topic list: when it changes, add the new topics
    // to the pattern set and rebuild the selector
    this.metaConfig.addPropertyChangeListener("topics", new PropertyChangeListener() {
        public void propertyChange(final PropertyChangeEvent evt) {
            MessageStoreManager.this.makeTopicsPatSet();
            MessageStoreManager.this.newDeletePolicySelector();
        }
    });
    // Anonymous listener on unflushInterval: when it changes, reschedule the flush tasks
    this.metaConfig.addPropertyChangeListener("unflushInterval", new PropertyChangeListener() {
        public void propertyChange(final PropertyChangeEvent evt) {
            MessageStoreManager.this.scheduleFlushTask();
        }
    });
    this.makeTopicsPatSet();
    // Initialize the scheduler
    this.initScheduler();
    // Schedule the periodic flush; the method is well commented in the source, so it is not explained here
    this.scheduleFlushTask();
}
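The constructor relies on property-change listeners so the manager can react to configuration updates at runtime. The sketch below shows that pattern in isolation using the JDK's java.beans.PropertyChangeSupport. MiniConfig and MiniManager are hypothetical classes invented for this example, and whether MetaConfig uses PropertyChangeSupport internally is an assumption, not something the code above confirms.

```java
import java.beans.PropertyChangeSupport;
import java.beans.PropertyChangeListener;
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of a config object that announces property changes.
class MiniConfig {
    private final PropertyChangeSupport support = new PropertyChangeSupport(this);
    private List<String> topics = new ArrayList<>();

    void addPropertyChangeListener(String property, PropertyChangeListener listener) {
        support.addPropertyChangeListener(property, listener);
    }

    void setTopics(List<String> newTopics) {
        List<String> old = topics;
        topics = new ArrayList<>(newTopics);
        // No event is fired when the old and new values are equal and non-null.
        support.firePropertyChange("topics", old, topics);
    }

    List<String> getTopics() {
        return topics;
    }
}

// Hypothetical manager that refreshes its derived state whenever "topics" changes.
class MiniManager {
    int rebuildCount = 0;
    List<String> knownTopics = new ArrayList<>();

    MiniManager(MiniConfig config) {
        config.addPropertyChangeListener("topics", evt -> {
            knownTopics = new ArrayList<>(config.getTopics());
            rebuildCount++;
        });
    }
}
```

Setting the same topic list twice triggers only one rebuild here, because firePropertyChange suppresses events whose old and new values are equal.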
MessageStoreManager implements the Service interface: init() is called at startup and dispose() at shutdown.
public void init() {
    // Load and verify existing data
    try {
        this.loadMessageStores(this.metaConfig);
    } catch (final IOException e) {
        log.error("load message stores failed", e);
        throw new MetamorphosisServerStartupException("Initilize message store manager failed", e);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    this.startScheduleDeleteJobs();
}

private Set<File> getDataDirSet(final MetaConfig metaConfig) throws IOException {
    final Set<String> paths = new HashSet<String>();
    // Public (shared) data path
    paths.add(metaConfig.getDataPath());
    // Per-topic (private) data paths
    for (final String topic : metaConfig.getTopics()) {
        final TopicConfig topicConfig = metaConfig.getTopicConfig(topic);
        if (topicConfig != null) {
            paths.add(topicConfig.getDataPath());
        }
    }
    final Set<File> fileSet = new HashSet<File>();
    for (final String path : paths) {
        // Check that the data directory exists
        fileSet.add(this.getDataDir(path));
    }
    return fileSet;
}

private void loadMessageStores(final MetaConfig metaConfig) throws IOException, InterruptedException {
    // Collect the data directories, then load the data under each one
    for (final File dir : this.getDataDirSet(metaConfig)) {
        this.loadDataDir(metaConfig, dir);
    }
}

private void loadDataDir(final MetaConfig metaConfig, final File dir) throws IOException, InterruptedException {
    log.warn("Begin to scan data path:" + dir.getAbsolutePath());
    final long start = System.currentTimeMillis();
    final File[] ls = dir.listFiles();
    int nThreads = Runtime.getRuntime().availableProcessors() + 1;
    ExecutorService executor = Executors.newFixedThreadPool(nThreads);
    int count = 0;
    // Wrap the load-and-verify work for each partition as a task
    List<Callable<MessageStore>> tasks = new ArrayList<Callable<MessageStore>>();
    for (final File subDir : ls) {
        if (!subDir.isDirectory()) {
            log.warn("Ignore not directory path:" + subDir.getAbsolutePath());
        } else {
            // Partition directories are named "<topic>-<partition>"
            final String name = subDir.getName();
            final int index = name.lastIndexOf('-');
            if (index < 0) {
                log.warn("Ignore invlaid directory:" + subDir.getAbsolutePath());
                continue;
            }
            // Wrap the task
            tasks.add(new Callable<MessageStore>() {
                // Callback that does the actual loading and verification of the partition data
                @Override
                public MessageStore call() throws Exception {
                    log.warn("Loading data directory:" + subDir.getAbsolutePath() + "...");
                    final String topic = name.substring(0, index);
                    final int partition = Integer.parseInt(name.substring(index + 1));
                    // Constructing a MessageStore loads and verifies its data automatically;
                    // the deletion policy for the topic is selected at construction time
                    final MessageStore messageStore = new MessageStore(topic, partition, metaConfig,
                        MessageStoreManager.this.deletePolicySelector.select(topic, MessageStoreManager.this.deletePolicy));
                    return messageStore;
                }
            });
            count++;
            // Note: as quoted here, count only advances for valid partition directories, so if
            // any entry was skipped above, count never reaches ls.length and a trailing
            // partial batch is not loaded by this check
            if (count % nThreads == 0 || count == ls.length) {
                // Load in parallel if so configured
                if (metaConfig.isLoadMessageStoresInParallel()) {
                    this.loadStoresInParallel(executor, tasks);
                } else {
                    // Otherwise load and verify serially
                    this.loadStores(tasks);
                }
            }
        }
    }
    executor.shutdownNow();
    log.warn("End to scan data path in " + (System.currentTimeMillis() - start) / 1000 + " secs");
}
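Two details of loadDataDir are easy to miss: the directory name is split at the last '-' (so a topic name may itself contain '-'), and tasks are processed in batches of nThreads. The sketch below isolates both ideas. TopicPartition and PartitionDirs are hypothetical helpers written for this post, not MetaQ classes, and the batching helper flushes the trailing partial batch after the loop, which is one way to make sure no leftover task is skipped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical value holder for a parsed "<topic>-<partition>" directory name.
class TopicPartition {
    final String topic;
    final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

class PartitionDirs {
    // Splits at the LAST '-' so topic names may contain '-' themselves.
    // Returns null when the name does not follow the convention.
    static TopicPartition parse(String dirName) {
        int index = dirName.lastIndexOf('-');
        if (index <= 0 || index == dirName.length() - 1) {
            return null;
        }
        try {
            int partition = Integer.parseInt(dirName.substring(index + 1));
            return new TopicPartition(dirName.substring(0, index), partition);
        } catch (NumberFormatException e) {
            return null;
        }
    }

    // Groups items into batches of at most batchSize, including a trailing partial batch.
    static <T> List<List<T>> batches(List<T> items, int batchSize) {
        List<List<T>> result = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : items) {
            current.add(item);
            if (current.size() == batchSize) {
                result.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }
}
```

For example, parse("order-events-3") gives topic "order-events" and partition 3, while a name such as "README" is rejected.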
One of the things init() does is load and verify existing data, which can be done in two ways: serially or in parallel.
// Serial loading: the work is done on the calling thread. It is slower, but the log order stays intact
private void loadStores(List<Callable<MessageStore>> tasks) throws IOException, InterruptedException {
    for (Callable<MessageStore> task : tasks) {
        MessageStore messageStore;
        try {
            messageStore = task.call();
            ConcurrentHashMap<Integer/* partition */, MessageStore> map = this.stores.get(messageStore.getTopic());
            if (map == null) {
                map = new ConcurrentHashMap<Integer, MessageStore>();
                this.stores.put(messageStore.getTopic(), map);
            }
            map.put(messageStore.getPartition(), messageStore);
        } catch (IOException e) {
            throw e;
        } catch (InterruptedException e) {
            throw e;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
    tasks.clear();
}

// Parallel loading: with a lot of data this speeds up startup, but it scrambles
// the order of the startup log, so it is disabled by default
private void loadStoresInParallel(ExecutorService executor, List<Callable<MessageStore>> tasks) throws InterruptedException {
    CompletionService<MessageStore> completionService = new ExecutorCompletionService<MessageStore>(executor);
    for (Callable<MessageStore> task : tasks) {
        completionService.submit(task);
    }
    for (int i = 0; i < tasks.size(); i++) {
        try {
            // take() blocks until the next task finishes, so every task has completed when the loop ends
            MessageStore messageStore = completionService.take().get();

            ConcurrentHashMap<Integer/* partition */, MessageStore> map = this.stores.get(messageStore.getTopic());
            if (map == null) {
                map = new ConcurrentHashMap<Integer, MessageStore>();
                this.stores.put(messageStore.getTopic(), map);
            }
            map.put(messageStore.getPartition(), messageStore);
        } catch (ExecutionException e) {
            throw ThreadUtils.launderThrowable(e);
        }
    }
    tasks.clear();
}
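The parallel path is a standard use of ExecutorCompletionService: submit every task, then call take() once per task to collect results in completion order. Here is a minimal runnable sketch of the same idea. FakeStore and ParallelLoader are hypothetical stand-ins (a real MessageStore does file I/O in its constructor), and computeIfAbsent replaces the get/put sequence for the nested map, which requires Java 8 or later.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a loaded store; it only remembers its coordinates.
class FakeStore {
    final String topic;
    final int partition;

    FakeStore(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

class ParallelLoader {
    // Runs all tasks on a fixed pool and indexes the results by topic and partition.
    static ConcurrentHashMap<String, ConcurrentHashMap<Integer, FakeStore>> loadAll(
            List<Callable<FakeStore>> tasks, int nThreads)
            throws InterruptedException, ExecutionException {
        ConcurrentHashMap<String, ConcurrentHashMap<Integer, FakeStore>> stores =
                new ConcurrentHashMap<>();
        ExecutorService executor = Executors.newFixedThreadPool(nThreads);
        try {
            CompletionService<FakeStore> completion = new ExecutorCompletionService<>(executor);
            for (Callable<FakeStore> task : tasks) {
                completion.submit(task);
            }
            // One take() per submitted task: each call blocks until some task has finished.
            for (int i = 0; i < tasks.size(); i++) {
                FakeStore store = completion.take().get();
                stores.computeIfAbsent(store.topic, t -> new ConcurrentHashMap<>())
                      .put(store.partition, store);
            }
        } finally {
            executor.shutdown();
        }
        return stores;
    }
}
```

Because results arrive in completion order rather than submission order, any logging done inside the tasks interleaves, which matches the remark above about scrambled startup logs.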
When MessageStoreManager shuts down, dispose() is called to make sure all resources are released properly.
public void dispose() {
    // Shut down the flush executor and the scheduler
    this.scheduledExecutorService.shutdown();
    if (this.scheduler != null) {
        try {
            this.scheduler.shutdown(true);
        } catch (final SchedulerException e) {
            log.error("Shutdown quartz scheduler failed", e);
        }
    }
    // Make sure every MessageStore instance is closed properly
    for (final ConcurrentHashMap<Integer/* partition */, MessageStore> subMap : MessageStoreManager.this.stores
        .values()) {
        if (subMap != null) {
            for (final MessageStore msgStore : subMap.values()) {
                if (msgStore != null) {
                    try {
                        msgStore.close();
                    } catch (final Throwable e) {
                        // Log and continue so that one failure does not keep the other stores open
                        log.error("Try to run close " + msgStore.getTopic() + "," + msgStore.getPartition() + " failed", e);
                    }
                }
            }
        }
    }
    // Clear the stores map
    this.stores.clear();
}
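MessageStoreManager exposes two methods for obtaining a MessageStore: getMessageStore(final String topic, final int partition) and getOrCreateMessageStore(final String topic, final int partition) throws IOException.
getMessageStore() looks up the MessageStore in stores and returns null if none exists. getOrCreateMessageStore() first checks that the topic is configured and throws an exception if it is not, then validates the partition number. After that, if stores already holds the instance, it is returned directly; otherwise a new MessageStore is created, put into stores, and returned.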
public MessageStore getMessageStore(final String topic, final int partition) {
    final ConcurrentHashMap<Integer/* partition */, MessageStore> map = this.stores.get(topic);
    if (map == null) {
        // No MessageStore map exists for this topic, so return null
        return null;
    }
    return map.get(partition);
}

Collection<MessageStore> getMessageStoresByTopic(final String topic) {
    final ConcurrentHashMap<Integer/* partition */, MessageStore> map = this.stores.get(topic);
    if (map == null) {
        return Collections.emptyList();
    }
    return map.values();
}

public MessageStore getOrCreateMessageStore(final String topic, final int partition) throws IOException {
    return this.getOrCreateMessageStoreInner(topic, partition, 0);
}

public MessageStore getOrCreateMessageStore(final String topic, final int partition, final long offsetIfCreate) throws IOException {
    return this.getOrCreateMessageStoreInner(topic, partition, offsetIfCreate);
}

private MessageStore getOrCreateMessageStoreInner(final String topic, final int partition, final long offsetIfCreate) throws IOException {
    // Check that the topic is accepted, i.e. that it matches a pattern in topicsPatSet
    if (!this.isLegalTopic(topic)) {
        throw new IllegalTopicException("The server do not accept topic " + topic);
    }
    // Check that the partition number is valid
    if (partition < 0 || partition >= this.getNumPartitions(topic)) {
        log.warn("Wrong partition " + partition + ",valid partitions (0," + (this.getNumPartitions(topic) - 1) + ")");
        throw new WrongPartitionException("wrong partition " + partition);
    }
    ConcurrentHashMap<Integer/* partition */, MessageStore> map = this.stores.get(topic);
    // If the topic has no map yet, create one and put it into stores
    if (map == null) {
        map = new ConcurrentHashMap<Integer, MessageStore>();
        final ConcurrentHashMap<Integer/* partition */, MessageStore> oldMap = this.stores.putIfAbsent(topic, map);
        if (oldMap != null) {
            // Another thread won the race; use its map
            map = oldMap;
        }
    }
    // If the map already holds a MessageStore for this partition, return it;
    // otherwise create one and put it into the map
    MessageStore messageStore = map.get(partition);
    if (messageStore != null) {
        return messageStore;
    } else {
        // Lock on the interned topic string (a special case)
        synchronized (topic.intern()) {
            messageStore = map.get(partition);
            // double check
            if (messageStore != null) {
                return messageStore;
            }
            messageStore = new MessageStore(topic, partition, this.metaConfig, this.deletePolicySelector.select(topic, this.deletePolicy), offsetIfCreate);
            log.info("Created a new message storage for topic=" + topic + ",partition=" + partition);
            map.put(partition, messageStore);
        }
    }
    return messageStore;
}

boolean isLegalTopic(final String topic) {
    for (final Pattern pat : this.topicsPatSet) {
        if (pat.matcher(topic).matches()) {
            return true;
        }
    }
    return false;
}
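The create path above combines putIfAbsent with double-checked locking on topic.intern() so that each (topic, partition) pair gets exactly one store. On Java 8 and later the same guarantee can be sketched with ConcurrentHashMap.computeIfAbsent. This is an alternative shown for comparison, not what MetaQ does; Store and StoreRegistry are hypothetical names. A real MessageStore constructor performs file I/O and can throw IOException, which would have to be wrapped inside the mapping function and would block other updates to the same map bin while it runs.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a store; it only remembers its coordinates.
class Store {
    final String topic;
    final int partition;

    Store(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

class StoreRegistry {
    private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Store>> stores =
            new ConcurrentHashMap<>();

    // Counts constructor calls so the at-most-once property can be observed.
    final AtomicInteger created = new AtomicInteger();

    Store getOrCreate(String topic, int partition) {
        // computeIfAbsent runs the mapping function atomically, at most once per absent key.
        return stores
                .computeIfAbsent(topic, t -> new ConcurrentHashMap<>())
                .computeIfAbsent(partition, p -> {
                    created.incrementAndGet();
                    return new Store(topic, p);
                });
    }
}
```

Calling getOrCreate("orders", 0) from several threads at once returns the same Store instance to all of them, and the counter ends at 1.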
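Through MessageStoreManager, MessageStore and the deletion policies are tied together neatly. It also provides the timed flush, which improves data durability, and it gives other modules an interface into the storage module.
In my view, the weak spot in MessageStoreManager's design is topicsPatSet. When the topic list changes, the set is not cleared first and the new patterns are simply added. The existing MessageStore instances of a topic are not reinitialized either, so a new deletion-policy configuration does not take effect for a MessageStore that already exists. My suggestion is to reinitialize the whole storage module whenever the topic list changes, to keep things consistent.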
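For the stale-pattern half of that criticism, a lighter fix than reinitializing everything is to build a fresh set on every change and swap it in. The sketch below illustrates this under stated assumptions: TopicMatcher is a hypothetical class, and treating '*' in a configured topic as a wildcard (translated to the regex ".*") is assumed for the example rather than taken from the code shown in this post. It does not address reinitializing existing MessageStore instances.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

class TopicMatcher {
    // Replaced wholesale on every change, so patterns of removed topics never linger.
    private volatile Set<Pattern> patterns = Collections.emptySet();

    void rebuild(Collection<String> topics) {
        Set<Pattern> fresh = new HashSet<>();
        for (String topic : topics) {
            // Assumption for this sketch: '*' in a configured topic is a wildcard.
            fresh.add(Pattern.compile(topic.replace("*", ".*")));
        }
        // Single volatile write: readers see either the old set or the new one, never a mix.
        patterns = Collections.unmodifiableSet(fresh);
    }

    boolean isLegalTopic(String topic) {
        for (Pattern pattern : patterns) {
            if (pattern.matcher(topic).matches()) {
                return true;
            }
        }
        return false;
    }
}
```

After rebuild is called with a list that no longer contains "log-*", a topic such as "log-app" stops matching, which is the behavior the add-only set cannot provide.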
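That largely wraps up the analysis of the Broker's message storage module. The next post moves on to the Broker's networking and the message-processing flow.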