有一个项目用来负责调度集群中的"cron任务",比如一个application中可以配置N个定时任务,这些任务信息最终注册到zookeeper上,并开发了一系列代码用于维护这些任务的"活性";当applicaton中一个server故障,那么这个server上接管的任务,需要迁移到其他server上,如果多个server存活的话,还需要这些任务能够"均衡"的分布.
其中"负载均衡",很好理解,比如有6个任务,3个server,那么就需要每个server上尽可能的运行2个任务;其实这个事情想起来很简单,但是做起来似乎有些不得不考虑的问题:
1) "相对平均"怎么设计
2) 迁移任务时,是否会丢失任务的触发时机;比如一个任务凌晨3点执行,刚好此时运行了一次"均衡",任务在原来的server上没有触发,在新的server上又过了时间..
3) 迁移任务时,还需要考虑"最少移动"次数,不能大面积迁移任务;只能从"负载高"的server上迁移到"负载低"的.
例如:
sid1: w1 w2 w3 w4
sid2: w5
sid3:w6
期望迁移之后:
sid1:w1 w2
sid2:w5 w3
sid3:w4 w6
而不是(这种结果迁移的面积太大,只需要把"多余"的任务迁移出去即可,而不是重新洗牌再均衡)
sid1:w6 w5
sid2:w2 w3
sid3:w1 w4
经过提取,"相对平均"的设计代码如下,仅作备忘:
- package
- import
- import
- import
- public class
- private List<String> servers = new
- private Map<String, List<String>> current = new
- private Set<String> workers = new
- public static void
- new BufferedReader(new
- String line;
- new
- new
- try
- while ((line = br.readLine()) != null) {
- if (line.startsWith("addWorker")) {
- balancer.addWorkers(line);
- else if (line.startsWith("addServer")) {
- balancer.addServers(line);
- else
- "???");
- continue;
- }
- balancer.rebalance();
- }
- catch
- e.printStackTrace();
- }
- "--END---");
- }
- public void
- int index = source.indexOf(" ");
- if (index == -1) {
- return;
- }
- 1).split(" ");
- if (values == null || values.length == 0) {
- return;
- }
- for
- servers.add(server);
- if(current.get(server) == null){
- new
- }
- }
- }
- public void
- int index = source.indexOf(" ");
- if (index == -1) {
- return;
- }
- 1).split(" ");
- if (values == null || values.length == 0) {
- return;
- }
- //当有新的worker提交时,将咱有一台机器接管
- 0);
- List<String> sw = current.get(sid);
- if(sw == null){
- new
- }
- for
- workers.add(worker);
- sw.add(worker);
- }
- }
- public void
- try
- if
- return;
- }
- for
- if (current.get(sid) == null) {
- new
- }
- }
- //根据每个sid上的worker个数,整理成一个排序的map
- new
- for
- int
- List<String> sl = counterMap.get(total);
- if (sl == null) {
- new
- counterMap.put(total, sl);
- }
- //sid
- }
- int
- int
- int avg = totalWorkers / totalServers;//每个server实例可以接管任务的平均数
- while (true) {
- //大于平均数的列表, >= avg + 1
- //与平均数差值为2的 <= arg - 1
- //允许任务个数与avg上线浮动1各个,不是绝对的平均
- if (gt == null || lt == null) {
- break;
- }
- Integer gtKey = gt.getKey();
- Integer ltKey = lt.getKey();
- if (gtKey - ltKey < 2) {
- break;
- }
- if (gt.getValue().size() == 0) {
- counterMap.remove(gt.getKey());
- }
- if (lt.getValue().size() == 0) {
- counterMap.remove(lt.getKey());
- }
- //sid列表
- while
- String _fromSid = it.next();
- List<String> _currentWorkers = current.get(_fromSid);
- if (_currentWorkers == null
- it.remove();
- current.remove(_fromSid);
- continue;
- }
- List<String> _ltServers = lt.getValue();
- if
- counterMap.remove(ltKey);
- break;
- }
- //取出需要交换出去的任务id
- int
- 1);
- 0);
- //从_fromSid的worker列表中移除低workerId
- //注意:移除最后一个,和_ltWorkers.add(_wid)对应,_ltWorkers将新任务添加到list的尾部
- //即从尾部移除,从尾部添加,基本保证"原任务,最少迁移次数"
- 1);
- it.remove();
- 0);
- //将此workerId添加到_toSid的worker列表中
- List<String> _ltWorkers = current.get(_toSid);
- if (_ltWorkers == null) {
- new
- current.put(_toSid, _ltWorkers);
- }
- _ltWorkers.add(_wid);
- //将gt的key降低一个数字
- 1);
- if (_next == null) {
- new
- 1, _next);
- }
- _next.add(_fromSid);
- //将lt的key提升一个数字
- 1);
- //从lt的countMap中移除,因为它将被放置在key + 1的新位置
- Iterator<String> _ltIt = _ltServers.iterator();
- while
- if
- _ltIt.remove();
- break;
- }
- }
- if (_prev == null) {
- new
- 1, _prev);
- }
- _prev.add(_toSid);
- }
- }
- //dump info
- for
- "Sid:"
- System.out.println(entry.getValue().toString());
- }
- catch
- e.printStackTrace();
- }
- }
- }