0
点赞
收藏
分享

微信扫一扫

【转载】相对平均分布


 

有一个项目用来负责调度集群中的"cron任务",比如一个application中可以配置N个定时任务,这些任务信息最终注册到zookeeper上,并开发了一系列代码用于维护这些任务的"活性";当applicaton中一个server故障,那么这个server上接管的任务,需要迁移到其他server上,如果多个server存活的话,还需要这些任务能够"均衡"的分布.

   其中"负载均衡",很好理解,比如有6个任务,3个server,那么就需要每个server上尽可能的运行2个任务;其实这个事情想起来很简单,但是做起来似乎有些不得不考虑的问题:

    1) "相对平均"怎么设计

    2) 迁移任务时,是否会丢失任务的触发时机;比如一个任务凌晨3点执行,刚好此时运行了一次"均衡",任务在原来的server上没有触发,在新的server上又过了时间..

    3) 迁移任务时,还需要考虑"最少移动"次数,不能大面积迁移任务;只能从"负载高"的server上迁移到"负载低"的.

 

例如:

    sid1: w1 w2 w3 w4

    sid2: w5

    sid3:w6

期望迁移之后:

    sid1:w1 w2

    sid2:w5 w3

    sid3:w4 w6

 

而不是(这种结果迁移的面积太大,只需要把"多余"的任务迁移出去即可,而不是重新洗牌再均衡)

    sid1:w6 w5

    sid2:w2 w3

    sid3:w1 w4

   

经过提取,"相对平均"的设计代码如下,仅作备忘:



【转载】相对平均分布_d3



  1. package
  2.   
  3. import
  4. import
  5. import
  6.   
  7. public class
  8. private List<String> servers = new
  9. private Map<String, List<String>> current = new
  10. private Set<String> workers = new
  11.   
  12. public static void
  13. new BufferedReader(new
  14.         String line;  
  15. new
  16. new
  17. try
  18. while ((line = br.readLine()) != null) {  
  19. if (line.startsWith("addWorker")) {  
  20.                     balancer.addWorkers(line);  
  21. else if (line.startsWith("addServer")) {  
  22.                     balancer.addServers(line);  
  23. else
  24. "???");  
  25. continue;  
  26.                 }  
  27.                 balancer.rebalance();  
  28.             }  
  29. catch
  30.             e.printStackTrace();  
  31.         }  
  32. "--END---");  
  33.     }  
  34.   
  35. public void
  36. int index = source.indexOf(" ");  
  37. if (index == -1) {  
  38. return;  
  39.         }  
  40. 1).split(" ");  
  41. if (values == null || values.length == 0) {  
  42. return;  
  43.         }  
  44. for
  45.             servers.add(server);  
  46. if(current.get(server) == null){  
  47. new
  48.             }  
  49.         }  
  50.     }  
  51.   
  52. public void
  53. int index = source.indexOf(" ");  
  54. if (index == -1) {  
  55. return;  
  56.         }  
  57. 1).split(" ");  
  58. if (values == null || values.length == 0) {  
  59. return;  
  60.         }  
  61. //当有新的worker提交时,将咱有一台机器接管
  62. 0);  
  63.         List<String> sw = current.get(sid);  
  64. if(sw == null){  
  65. new
  66.         }  
  67. for
  68.             workers.add(worker);  
  69.             sw.add(worker);  
  70.         }  
  71.   
  72.     }  
  73.   
  74. public void
  75. try
  76. if
  77. return;  
  78.             }  
  79. for
  80. if (current.get(sid) == null) {  
  81. new
  82.                 }  
  83.             }  
  84. //根据每个sid上的worker个数,整理成一个排序的map
  85. new
  86. for
  87. int
  88.                 List<String> sl = counterMap.get(total);  
  89. if (sl == null) {  
  90. new
  91.                     counterMap.put(total, sl);  
  92.                 }  
  93. //sid
  94.             }  
  95. int
  96. int
  97. int avg = totalWorkers / totalServers;//每个server实例可以接管任务的平均数
  98. while (true) {  
  99. //大于平均数的列表, >= avg + 1
  100. //与平均数差值为2的 <= arg  - 1
  101. //允许任务个数与avg上线浮动1各个,不是绝对的平均
  102.   
  103. if (gt == null || lt == null) {  
  104. break;  
  105.                 }  
  106.                 Integer gtKey = gt.getKey();  
  107.                 Integer ltKey = lt.getKey();  
  108. if (gtKey - ltKey < 2) {  
  109. break;  
  110.                 }  
  111. if (gt.getValue().size() == 0) {  
  112.                     counterMap.remove(gt.getKey());  
  113.                 }  
  114. if (lt.getValue().size() == 0) {  
  115.                     counterMap.remove(lt.getKey());  
  116.                 }  
  117. //sid列表
  118. while
  119.                     String _fromSid = it.next();  
  120.                     List<String> _currentWorkers = current.get(_fromSid);  
  121. if (_currentWorkers == null
  122.                         it.remove();  
  123.                         current.remove(_fromSid);  
  124. continue;  
  125.                     }  
  126.                     List<String> _ltServers = lt.getValue();  
  127. if
  128.                         counterMap.remove(ltKey);  
  129. break;  
  130.                     }  
  131. //取出需要交换出去的任务id
  132. int
  133. 1);  
  134. 0);  
  135. //从_fromSid的worker列表中移除低workerId
  136. //注意:移除最后一个,和_ltWorkers.add(_wid)对应,_ltWorkers将新任务添加到list的尾部
  137. //即从尾部移除,从尾部添加,基本保证"原任务,最少迁移次数"
  138. 1);  
  139.                     it.remove();  
  140. 0);  
  141. //将此workerId添加到_toSid的worker列表中
  142.                     List<String> _ltWorkers = current.get(_toSid);  
  143. if (_ltWorkers == null) {  
  144. new
  145.                         current.put(_toSid, _ltWorkers);  
  146.                     }  
  147.                     _ltWorkers.add(_wid);  
  148. //将gt的key降低一个数字
  149. 1);  
  150. if (_next == null) {  
  151. new
  152. 1, _next);  
  153.                     }  
  154.                     _next.add(_fromSid);  
  155. //将lt的key提升一个数字
  156. 1);  
  157. //从lt的countMap中移除,因为它将被放置在key + 1的新位置
  158.                     Iterator<String> _ltIt = _ltServers.iterator();  
  159. while
  160. if
  161.                             _ltIt.remove();  
  162. break;  
  163.                         }  
  164.                     }  
  165. if (_prev == null) {  
  166. new
  167. 1, _prev);  
  168.                     }  
  169.                     _prev.add(_toSid);  
  170.                 }  
  171.             }  
  172. //dump info
  173. for
  174. "Sid:"
  175.                 System.out.println(entry.getValue().toString());  
  176.             }  
  177. catch
  178.             e.printStackTrace();  
  179.         }  
  180.   
  181.     }  
  182.   
  183. }  



 

举报

相关推荐

css 文本平均分布

计算平均分

平均分 凑字

总分和平均分

2071:【例2.14】平均分

java 平均分配

0 条评论