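Contents

1 Introduction

1.1 What is the CP architecture?

1.2 Characteristics of the CP architecture in Nacos

1.3 Advantages and disadvantages

1.4 Applicable scenarios

2 Leader election in the CP architecture

2.1 Election flow

2.2 Summary

3 Leader heartbeats in the CP architecture

3.1 The leader sends heartbeats

3.2 Followers receive heartbeats

3.3 Summary

4 Service registration in the CP architecture

4.1 Registration flow

4.2 Summary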


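1 Introduction

Nacos (Dynamic Naming and Configuration Service) is a service discovery and configuration management platform open-sourced by Alibaba. It supports service registration, service discovery, and configuration management, and is widely used in microservice architectures. Nacos supports two main distributed consistency modes: CP and AP. This article covers the CP architecture.

1.1 What is the CP architecture?

The CP architecture follows Consistency and Partition Tolerance from the CAP theorem. When the system hits a network partition, a CP architecture keeps data consistent but may sacrifice Availability. In other words, during a partition the system gives priority to keeping data consistent across all nodes, even if some services have to be suspended.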

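1.2 Characteristics of the CP architecture in Nacos

•  Strong data consistency: in CP mode the data in a Nacos cluster is strongly consistent. Every write must be confirmed by a majority of nodes before it takes effect, so a write accepted by one node is replicated to a majority of the cluster.

•  Raft protocol: the CP implementation in Nacos is based on the Raft consensus protocol, a distributed consensus algorithm that keeps data consistent across nodes. Raft elects a leader to manage writes, while the other nodes act as followers and replicate the data.

•  Leader/follower architecture: in CP mode the leader handles all write requests and replicates them to the followers. If the leader becomes unavailable, the remaining nodes hold a new election.

•  Data reliability: because data is only written after a majority of nodes confirm it, CP mode offers higher data reliability and suits scenarios with strict consistency requirements, such as financial or order systems.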
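
As a rough illustration of the majority rule described above, the sketch below computes the quorum size for a cluster and checks whether a write has enough acknowledgements. The class and method names are hypothetical and are not part of Nacos; the `n / 2 + 1` formula is the usual Raft majority.

```java
/** Hypothetical helper illustrating the majority (quorum) rule; not Nacos code. */
final class QuorumSketch {

    /** Smallest number of nodes that forms a strict majority of the cluster. */
    static int majorityCount(int clusterSize) {
        if (clusterSize <= 0) {
            throw new IllegalArgumentException("clusterSize must be positive");
        }
        return clusterSize / 2 + 1;
    }

    /** A write takes effect only once a majority (the leader included) has acknowledged it. */
    static boolean isCommitted(int acks, int clusterSize) {
        return acks >= majorityCount(clusterSize);
    }

    /** Number of node failures the cluster can absorb while still reaching a majority. */
    static int toleratedFailures(int clusterSize) {
        return clusterSize - majorityCount(clusterSize);
    }

    public static void main(String[] args) {
        for (int n : new int[] {1, 3, 4, 5}) {
            System.out.println("nodes=" + n + " majority=" + majorityCount(n)
                    + " toleratedFailures=" + toleratedFailures(n));
        }
    }
}
```

Under this rule a three-node cluster keeps accepting writes with one node down, and a four-node cluster tolerates no more failures than a three-node one, which is why odd cluster sizes are the common choice.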

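1.3 Advantages and disadvantages

Advantages

•  Data consistency: data stays strongly consistent in a distributed environment, which reduces inconsistency problems.

•  Reliability: data does not diverge or conflict across nodes during failures or network partitions.

Disadvantages

•  Reduced availability: during a network partition or node failure, the service may be unable to handle requests promptly.

•  Performance overhead: because each write needs confirmation from a majority of nodes, write performance can be comparatively low.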

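1.4 Applicable scenarios

The CP architecture suits applications with very strict consistency requirements, such as:

•  Financial systems: every transaction must be accurate.

•  Order processing systems: data must stay strictly consistent while orders are created and modified.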

2 Leader election in the CP architecture

2.1 Election flow

Core class: RaftCore.java. Core method: init


public void init() throws Exception {
    Loggers.RAFT.info("initializing Raft sub-system");
    final long start = System.currentTimeMillis();

    raftStore.loadDatums(notifier, datums);

    setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));

    Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());

    initialized = true;

    Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));
    // 1. Register the periodic leader-election task
    masterTask = GlobalExecutor.registerMasterElection(new MasterElection());
    // 2. Register the periodic heartbeat task
    heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat());

    versionJudgement.registerObserver(isAllNewVersion -> {
        stopWork = isAllNewVersion;
        if (stopWork) {
            try {
                shutdown();
                raftListener.removeOldRaftMetadata();
            } catch (NacosException e) {
                throw new NacosRuntimeException(NacosException.SERVER_ERROR, e);
            }
        }
    }, 100);

    NotifyCenter.registerSubscriber(notifier);

    Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
            GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
}


// run method of MasterElection
public void run() {
    try {
        if (stopWork) {
            return;
        }
        if (!peers.isReady()) {
            return;
        }
        // 1. Count down the election timeout; do nothing until it has expired
        RaftPeer local = peers.local();
        local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;

        if (local.leaderDueMs > 0) {
            return;
        }

        // reset timeout
        local.resetLeaderDue();
        local.resetHeartbeatDue();
        // 2. Start a vote
        sendVote();
    } catch (Exception e) {
        Loggers.RAFT.warn("[RAFT] error while master election {}", e);
    }

}
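
The run method above does not actually sleep: it is invoked on every tick and subtracts the tick period from a countdown, starting a vote only when the countdown reaches zero. A minimal sketch of that pattern follows. The class is hypothetical, and the three constants are illustrative values chosen for the sketch rather than values taken from the article.

```java
import java.util.Random;

/** Hypothetical sketch of a tick-driven election timeout; constants are illustrative. */
final class ElectionTimerSketch {
    static final long TICK_PERIOD_MS = 500L;        // assumed tick interval
    static final long LEADER_TIMEOUT_MS = 15_000L;  // assumed base timeout
    static final long RANDOM_MS = 5_000L;           // assumed upper bound of the random jitter

    private final Random random;
    private long leaderDueMs;

    ElectionTimerSketch(Random random) {
        this.random = random;
        resetLeaderDue();
    }

    /** Restart the countdown with a randomized timeout so nodes rarely expire together. */
    void resetLeaderDue() {
        leaderDueMs = LEADER_TIMEOUT_MS + (long) (random.nextDouble() * RANDOM_MS);
    }

    /** Called once per tick; returns true when the timeout expired and a vote should start. */
    boolean tick() {
        leaderDueMs -= TICK_PERIOD_MS;
        if (leaderDueMs > 0) {
            return false;
        }
        resetLeaderDue();
        return true;
    }

    long remainingMs() {
        return leaderDueMs;
    }

    public static void main(String[] args) {
        ElectionTimerSketch timer = new ElectionTimerSketch(new Random());
        int ticks = 1;
        while (!timer.tick()) {
            ticks++;
        }
        System.out.println("vote triggered after " + ticks + " ticks");
    }
}
```

A follower that keeps receiving heartbeats would call resetLeaderDue() each time, so its countdown never reaches zero and it never starts a vote.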


private void sendVote() {

    RaftPeer local = peers.get(NetUtils.localServer());
    Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}", JacksonUtils.toJson(getLeader()),
            local.term);

    peers.reset();
    // 1. Increment the election term
    local.term.incrementAndGet();
    // 2. Vote for itself
    local.voteFor = local.ip;
    // 3. Become a candidate
    local.state = RaftPeer.State.CANDIDATE;

    Map<String, String> params = new HashMap<>(1);
    params.put("vote", JacksonUtils.toJson(local));
    // 4. Send the vote request to every other node
    for (final String server : peers.allServersWithoutMySelf()) {
        final String url = buildUrl(server, API_VOTE);
        try {
            HttpClient.asyncHttpPost(url, null, params, new Callback<String>() {
                @Override
                public void onReceive(RestResult<String> result) {
                    if (!result.ok()) {
                        Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", result.getCode(), url);
                        return;
                    }

                    RaftPeer peer = JacksonUtils.toObj(result.getData(), RaftPeer.class);

                    Loggers.RAFT.info("received approve from peer: {}", JacksonUtils.toJson(peer));
                    // 5. Tally the returned vote; with more than half of the votes this node becomes leader
                    peers.decideLeader(peer);

                }

                @Override
                public void onError(Throwable throwable) {
                    Loggers.RAFT.error("error while sending vote to server: {}", server, throwable);
                }

                @Override
                public void onCancel() {

                }
            });
        } catch (Exception e) {
            Loggers.RAFT.warn("error while sending vote to server: {}", server);
        }
    }
}
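
2.2 Summary
  1. Every node first waits for a random period (its election timeout counts down on each tick).

  2. When the timeout expires, the node increments its election term, votes for itself, and becomes a candidate.

  3. It then sends its vote request to all other nodes.

  4. If more than half of the nodes agree, the node becomes the leader. If no leader is chosen, the next round of election starts.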
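
Step 4 can be sketched as a simple vote tally. The article does not list the body of `peers.decideLeader`, so the code below is only an assumption about the general idea (count the `voteFor` values and accept a candidate once it holds a majority). The class name and signature are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical vote tally; illustrates the majority rule, not the actual Nacos method. */
final class VoteTallySketch {

    /**
     * @param votes       the voteFor value reported by each node (the candidate's own vote included)
     * @param clusterSize total number of nodes in the cluster
     * @return the candidate holding a majority of votes, or empty if nobody won this round
     */
    static Optional<String> decideLeader(List<String> votes, int clusterSize) {
        int majority = clusterSize / 2 + 1;
        Map<String, Integer> counts = new HashMap<>();
        for (String votedFor : votes) {
            if (votedFor == null || votedFor.isEmpty()) {
                continue; // a node that has not voted yet
            }
            int count = counts.merge(votedFor, 1, Integer::sum);
            if (count >= majority) {
                // At most one candidate can hold more than half of the votes.
                return Optional.of(votedFor);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(decideLeader(Arrays.asList("10.0.0.1", "10.0.0.1", "10.0.0.2"), 3));
        System.out.println(decideLeader(Arrays.asList("10.0.0.1", "10.0.0.2", "10.0.0.3"), 3));
    }
}
```

When the votes are split and the result is empty, nobody becomes leader and the nodes simply wait for their next randomized timeout, which is the "next round" mentioned in step 4.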

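3 Leader heartbeats in the CP architecture

3.1 The leader sends heartbeats

Core class: RaftCore.java. Core method: init (the same method as in section 2.1; besides the election task it registers the HeartBeat task through GlobalExecutor.registerHeartbeat)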



// run method of HeartBeat
public void run() {
    try {
        if (stopWork) {
            return;
        }
        if (!peers.isReady()) {
            return;
        }

        RaftPeer local = peers.local();
        local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
        if (local.heartbeatDueMs > 0) {
            return;
        }

        local.resetHeartbeatDue();
        // Send the heartbeat
        sendBeat();
    } catch (Exception e) {
        Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
    }

}


private void sendBeat() throws IOException, InterruptedException {
    RaftPeer local = peers.local();
    // 1. Only a leader in cluster mode sends heartbeats
    if (EnvUtil.getStandaloneMode() || local.state != RaftPeer.State.LEADER) {
        return;
    }
    if (Loggers.RAFT.isDebugEnabled()) {
        Loggers.RAFT.debug("[RAFT] send beat with {} keys.", datums.size());
    }

    local.resetLeaderDue();

    // build data
    ObjectNode packet = JacksonUtils.createEmptyJsonNode();
    packet.replace("peer", JacksonUtils.transferToJsonNode(local));

    ArrayNode array = JacksonUtils.createEmptyArrayNode();

    if (switchDomain.isSendBeatOnly()) {
        Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", switchDomain.isSendBeatOnly());
    }
    // 2. Build the payload: the key and timestamp of every datum
    if (!switchDomain.isSendBeatOnly()) {
        for (Datum datum : datums.values()) {

            ObjectNode element = JacksonUtils.createEmptyJsonNode();

            if (KeyBuilder.matchServiceMetaKey(datum.key)) {
                element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
            } else if (KeyBuilder.matchInstanceListKey(datum.key)) {
                element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
            }
            element.put("timestamp", datum.timestamp.get());

            array.add(element);
        }
    }

    packet.replace("datums", array);
    // broadcast
    Map<String, String> params = new HashMap<String, String>(1);
    params.put("beat", JacksonUtils.toJson(packet));
    // 3. Compress the message with GZIP
    String content = JacksonUtils.toJson(params);

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    GZIPOutputStream gzip = new GZIPOutputStream(out);
    gzip.write(content.getBytes(StandardCharsets.UTF_8));
    gzip.close();

    byte[] compressedBytes = out.toByteArray();
    String compressedContent = new String(compressedBytes, StandardCharsets.UTF_8);

    if (Loggers.RAFT.isDebugEnabled()) {
        Loggers.RAFT.debug("raw beat data size: {}, size of compressed data: {}", content.length(),
                compressedContent.length());
    }
    // 4. Send the message to every other node
    for (final String server : peers.allServersWithoutMySelf()) {
        try {
            final String url = buildUrl(server, API_BEAT);
            if (Loggers.RAFT.isDebugEnabled()) {
                Loggers.RAFT.debug("send beat to server " + server);
            }
            HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new Callback<String>() {
                @Override
                public void onReceive(RestResult<String> result) {
                    if (!result.ok()) {
                        Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}", result.getCode(), server);
                        MetricsMonitor.getLeaderSendBeatFailedException().increment();
                        return;
                    }

                    peers.update(JacksonUtils.toObj(result.getData(), RaftPeer.class));
                    if (Loggers.RAFT.isDebugEnabled()) {
                        Loggers.RAFT.debug("receive beat response from: {}", url);
                    }
                }

                @Override
                public void onError(Throwable throwable) {
                    Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", server,
                            throwable);
                    MetricsMonitor.getLeaderSendBeatFailedException().increment();
                }

                @Override
                public void onCancel() {

                }
            });
        } catch (Exception e) {
            Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);
            MetricsMonitor.getLeaderSendBeatFailedException().increment();
        }
    }

}
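
The compression step in sendBeat, and the matching decompression a follower has to perform, can be tried in isolation with the JDK alone. The sketch below is a standalone round trip under that assumption. The class and method names are hypothetical, and the payload string is made up for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Hypothetical GZIP round trip for a heartbeat payload; not Nacos code. */
final class GzipBeatSketch {

    /** Compress a UTF-8 string into GZIP bytes, as the leader does before sending. */
    static byte[] compress(String content) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(content.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    /** Decompress GZIP bytes back into the original UTF-8 string, as a follower would. */
    static String decompress(byte[] compressed) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] buffer = new byte[4096];
            int read;
            while ((read = gzip.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String payload = "{\"beat\":\"example heartbeat payload\"}"; // made-up payload
        byte[] compressed = compress(payload);
        System.out.println("raw=" + payload.length() + " chars, compressed=" + compressed.length + " bytes");
        System.out.println("round trip ok: " + payload.equals(decompress(compressed)));
    }
}
```

Compressed output is binary, so it has to travel as a byte array. That matches the listing above, where compressedBytes is what gets posted and the compressedContent string is only used to log a size.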

3.2 Followers receive heartbeats

Core class: RaftController.java. Core method: beat


public JsonNode beat(HttpServletRequest request, HttpServletResponse response) throws Exception {
    if (versionJudgement.allMemberIsNewVersion()) {
        throw new IllegalStateException("old raft protocol already stop");
    }
    String entity = new String(IoUtils.tryDecompress(request.getInputStream()), StandardCharsets.UTF_8);
    String value = URLDecoder.decode(entity, "UTF-8");
    value = URLDecoder.decode(value, "UTF-8");

    JsonNode json = JacksonUtils.toObj(value);
    // 1. Hand the decoded heartbeat to RaftCore
    RaftPeer peer = raftCore.receivedBeat(JacksonUtils.toObj(json.get("beat").asText()));

    return JacksonUtils.transferToJsonNode(peer);
}


public RaftPeer receivedBeat(JsonNode beat) throws Exception {
    if (stopWork) {
        throw new IllegalStateException("old raft protocol already stop work");
    }
    final RaftPeer local = peers.local();
    final RaftPeer remote = new RaftPeer();
    JsonNode peer = beat.get("peer");
    remote.ip = peer.get("ip").asText();
    remote.state = RaftPeer.State.valueOf(peer.get("state").asText());
    remote.term.set(peer.get("term").asLong());
    remote.heartbeatDueMs = peer.get("heartbeatDueMs").asLong();
    remote.leaderDueMs = peer.get("leaderDueMs").asLong();
    remote.voteFor = peer.get("voteFor").asText();
    // 1. Only heartbeats sent by a leader are processed
    if (remote.state != RaftPeer.State.LEADER) {
        Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}", remote.state,
                JacksonUtils.toJson(remote));
        throw new IllegalArgumentException("invalid state from master, state: " + remote.state);
    }

    if (local.term.get() > remote.term.get()) {
        Loggers.RAFT
                .info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}",
                        remote.term.get(), local.term.get(), JacksonUtils.toJson(remote), local.leaderDueMs);
        throw new IllegalArgumentException(
                "out of date beat, beat-from-term: " + remote.term.get() + ", beat-to-term: " + local.term.get());
    }
    // 2. If this node is not a follower yet, turn it into one
    if (local.state != RaftPeer.State.FOLLOWER) {

        Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JacksonUtils.toJson(remote));
        // mk follower
        local.state = RaftPeer.State.FOLLOWER;
        local.voteFor = remote.ip;
    }

    final JsonNode beatDatums = beat.get("datums");
    local.resetLeaderDue();
    local.resetHeartbeatDue();

    peers.makeLeader(remote);

    if (!switchDomain.isSendBeatOnly()) {

        Map<String, Integer> receivedKeysMap = new HashMap<>(datums.size());

        for (Map.Entry<String, Datum> entry : datums.entrySet()) {
            // 3. Mark every key held in local memory as 0 (not yet seen in this beat)
            receivedKeysMap.put(entry.getKey(), 0);
        }

        // now check datums
        List<String> batch = new ArrayList<>();

        int processedCount = 0;
        if (Loggers.RAFT.isDebugEnabled()) {
            Loggers.RAFT
                    .debug("[RAFT] received beat with {} keys, RaftCore.datums' size is {}, remote server: {}, term: {}, local term: {}",
                            beatDatums.size(), datums.size(), remote.ip, remote.term, local.term);
        }
        // 4. Walk through the keys sent by the leader
        for (Object object : beatDatums) {
            processedCount = processedCount + 1;

            JsonNode entry = (JsonNode) object;
            String key = entry.get("key").asText();
            final String datumKey;

            if (KeyBuilder.matchServiceMetaKey(key)) {
                datumKey = KeyBuilder.detailServiceMetaKey(key);
            } else if (KeyBuilder.matchInstanceListKey(key)) {
                datumKey = KeyBuilder.detailInstanceListkey(key);
            } else {
                // ignore corrupted key:
                continue;
            }

            long timestamp = entry.get("timestamp").asLong();
            // 5. Mark the key as 1 (present in the leader's beat)
            receivedKeysMap.put(datumKey, 1);

            try {
                // 6. Skip a key whose local timestamp is at least the leader's
                //    (the last entry is not skipped, so a pending batch still gets flushed)
                if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp
                        && processedCount < beatDatums.size()) {
                    continue;
                }
                // 7. A key that is missing locally or older than the leader's goes into the batch
                if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {
                    batch.add(datumKey);
                }
                // 8. Keep collecting until the batch holds 50 keys or the last entry is reached
                if (batch.size() < 50 && processedCount < beatDatums.size()) {
                    continue;
                }

                String keys = StringUtils.join(batch, ",");

                if (batch.size() <= 0) {
                    continue;
                }

                Loggers.RAFT.info("get datums from leader: {}, batch size is {}, processedCount is {}"
                                + ", datums' size is {}, RaftCore.datums' size is {}", getLeader().ip, batch.size(),
                        processedCount, beatDatums.size(), datums.size());

                // update datum entry
                // 9. Call the leader's API to fetch the data for the stale keys
                String url = buildUrl(remote.ip, API_GET);
                Map<String, String> queryParam = new HashMap<>(1);
                queryParam.put("keys", URLEncoder.encode(keys, "UTF-8"));
                HttpClient.asyncHttpGet(url, null, queryParam, new Callback<String>() {
                    @Override
                    public void onReceive(RestResult<String> result) {
                        if (!result.ok()) {
                            return;
                        }

                        List<JsonNode> datumList = JacksonUtils
                                .toObj(result.getData(), new TypeReference<List<JsonNode>>() {
                                });

                        for (JsonNode datumJson : datumList) {
                            Datum newDatum = null;
                            OPERATE_LOCK.lock();
                            try {

                                Datum oldDatum = getDatum(datumJson.get("key").asText());

                                if (oldDatum != null && datumJson.get("timestamp").asLong() <= oldDatum.timestamp
                                        .get()) {
                                    Loggers.RAFT
                                            .info("[NACOS-RAFT] timestamp is smaller than that of mine, key: {}, remote: {}, local: {}",
                                                    datumJson.get("key").asText(),
                                                    datumJson.get("timestamp").asLong(), oldDatum.timestamp);
                                    continue;
                                }

                                if (KeyBuilder.matchServiceMetaKey(datumJson.get("key").asText())) {
                                    Datum<Service> serviceDatum = new Datum<>();
                                    serviceDatum.key = datumJson.get("key").asText();
                                    serviceDatum.timestamp.set(datumJson.get("timestamp").asLong());
                                    serviceDatum.value = JacksonUtils
                                            .toObj(datumJson.get("value").toString(), Service.class);
                                    newDatum = serviceDatum;
                                }

                                if (KeyBuilder.matchInstanceListKey(datumJson.get("key").asText())) {
                                    Datum<Instances> instancesDatum = new Datum<>();
                                    instancesDatum.key = datumJson.get("key").asText();
                                    instancesDatum.timestamp.set(datumJson.get("timestamp").asLong());
                                    instancesDatum.value = JacksonUtils
                                            .toObj(datumJson.get("value").toString(), Instances.class);
                                    newDatum = instancesDatum;
                                }

                                if (newDatum == null || newDatum.value == null) {
                                    Loggers.RAFT.error("receive null datum: {}", datumJson);
                                    continue;
                                }

                                raftStore.write(newDatum);

                                datums.put(newDatum.key, newDatum);
                                notifier.notify(newDatum.key, DataOperation.CHANGE, newDatum.value);

                                local.resetLeaderDue();

                                if (local.term.get() + 100 > remote.term.get()) {
                                    getLeader().term.set(remote.term.get());
                                    local.term.set(getLeader().term.get());
                                } else {
                                    local.term.addAndGet(100);
                                }

                                raftStore.updateTerm(local.term.get());

                                Loggers.RAFT.info("data updated, key: {}, timestamp: {}, from {}, local term: {}",
                                        newDatum.key, newDatum.timestamp, JacksonUtils.toJson(remote), local.term);

                            } catch (Throwable e) {
                                Loggers.RAFT
                                        .error("[RAFT-BEAT] failed to sync datum from leader, datum: {}", newDatum,
                                                e);
                            } finally {
                                OPERATE_LOCK.unlock();
                            }
                        }
                        try {
                            TimeUnit.MILLISECONDS.sleep(200);
                        } catch (InterruptedException e) {
                            Loggers.RAFT.error("[RAFT-BEAT] Interrupted error ", e);
                        }
                        return;
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader", throwable);
                    }

                    @Override
                    public void onCancel() {

                    }

                });

                batch.clear();

            } catch (Exception e) {
                Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey);
            }

        }

        List<String> deadKeys = new ArrayList<>();
        // 10. Delete the keys the leader has already removed (still marked 0)
        for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) {
            if (entry.getValue() == 0) {
                deadKeys.add(entry.getKey());
            }
        }

        for (String deadKey : deadKeys) {
            try {
                deleteDatum(deadKey);
            } catch (Exception e) {
                Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", deadKey, e);
            }
        }

    }

    return local;
}
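
3.3 Summary
  1. The leader sends a heartbeat to the followers at a fixed interval. The heartbeat mainly carries each datum's key and timestamp.

  2. When a follower receives the heartbeat, it compares the keys and timestamps with its local data and pulls the stale entries from the leader again to update its local copy.

  3. If a local key does not appear in the leader's heartbeat, the key has been deleted on the leader and must be deleted locally too.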
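
The comparison in steps 2 and 3 boils down to a diff between two key-to-timestamp maps. The sketch below shows that diff on its own, leaving out batching, HTTP calls, and locking. The class and its names are hypothetical and only mirror the idea of the receivedBeat listing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical diff between local datums and a leader heartbeat; not Nacos code. */
final class BeatDiffSketch {

    /** Keys the follower must fetch from the leader and keys it must delete locally. */
    static final class Result {
        final List<String> toFetch = new ArrayList<>();
        final List<String> toDelete = new ArrayList<>();
    }

    static Result diff(Map<String, Long> local, Map<String, Long> leaderBeat) {
        Result result = new Result();
        for (Map.Entry<String, Long> entry : leaderBeat.entrySet()) {
            Long localTimestamp = local.get(entry.getKey());
            // Missing locally, or older than the leader's copy: fetch it again.
            if (localTimestamp == null || localTimestamp < entry.getValue()) {
                result.toFetch.add(entry.getKey());
            }
        }
        for (String key : local.keySet()) {
            // Absent from the heartbeat: the leader no longer has this key.
            if (!leaderBeat.containsKey(key)) {
                result.toDelete.add(key);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> local = new TreeMap<>();
        local.put("a", 1L);
        local.put("b", 2L);
        local.put("c", 5L);
        Map<String, Long> beat = new TreeMap<>();
        beat.put("a", 1L);
        beat.put("b", 3L);
        beat.put("d", 1L);
        Result result = diff(local, beat);
        System.out.println("fetch=" + result.toFetch + " delete=" + result.toDelete);
    }
}
```

Because the heartbeat carries only keys and timestamps, the full values travel over the network only for entries that actually changed.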

4 Service registration in the CP architecture

4.1 Registration flow

Core class: RaftConsistencyServiceImpl.java. Core method: put


public void put(String key, Record value) throws NacosException {
    checkIsStopWork();
    try {
        // 1. Register the service information through Raft
        raftCore.signalPublish(key, value);
    } catch (Exception e) {
        Loggers.RAFT.error("Raft put failed.", e);
        throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value,
                e);
    }
}


public void signalPublish(String key, Record value) throws Exception {
    if (stopWork) {
        throw new IllegalStateException("old raft protocol already stop work");
    }
    // 1. Check whether this node is the leader
    if (!isLeader()) {
        ObjectNode params = JacksonUtils.createEmptyJsonNode();
        params.put("key", key);
        params.replace("value", JacksonUtils.transferToJsonNode(value));
        Map<String, String> parameters = new HashMap<>(1);
        parameters.put("key", key);

        final RaftPeer leader = getLeader();
        // 2. Not the leader: forward the request to the leader
        raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
        return;
    }

    OPERATE_LOCK.lock();
    try {
        final long start = System.currentTimeMillis();
        final Datum datum = new Datum();
        datum.key = key;
        datum.value = value;
        if (getDatum(key) == null) {
            datum.timestamp.set(1L);
        } else {
            datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
        }

        ObjectNode json = JacksonUtils.createEmptyJsonNode();
        json.replace("datum", JacksonUtils.transferToJsonNode(datum));
        json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
        // 3. Persist the change to the local file and publish a change event
        onPublish(datum, peers.local());

        final String content = json.toString();
        // 4. Notify the other nodes of the change; acknowledgements from a majority are enough
        final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
        for (final String server : peers.allServersIncludeMyself()) {
            if (isLeader(server)) {
                latch.countDown();
                continue;
            }
            final String url = buildUrl(server, API_ON_PUB);
            HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {
                @Override
                public void onReceive(RestResult<String> result) {
                    if (!result.ok()) {
                        Loggers.RAFT
                                .warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                        datum.key, server, result.getCode());
                        return;
                    }
                    latch.countDown();
                }

                @Override
                public void onError(Throwable throwable) {
                    Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);
                }

                @Override
                public void onCancel() {

                }
            });

        }
        // 5. No majority within the timeout: throw an exception
        if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
            // only majority servers return success can we consider this update success
            Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
            throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
        }

        long end = System.currentTimeMillis();
        Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
    } finally {
        OPERATE_LOCK.unlock();
    }
}
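
4.2 Summary
  1. When a client sends a service registration request to a follower, the follower forwards it to the leader.

  2. When the leader receives the registration request, it first stores the service information in a local file and then broadcasts it to the followers so that they store it as well.

  3. The leader then waits for the followers' responses. The registration succeeds once more than half of the nodes (the leader included) have acknowledged it within the timeout; otherwise the leader throws an exception.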
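
The majority wait in step 3 rests on a CountDownLatch sized to the quorum, with the leader counting itself first. The sketch below isolates that pattern using only the JDK. The class is hypothetical, and the `replicate` predicate is a stand-in for the HTTP call to a follower, assumed to return true on a successful response.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

/** Hypothetical majority-acknowledgement wait built on CountDownLatch; not Nacos code. */
final class MajorityPublishSketch {

    /**
     * Replicates to all followers in parallel and reports whether a majority of the
     * cluster (the leader included) acknowledged within the timeout.
     */
    static boolean publish(List<String> followers, Predicate<String> replicate, long timeoutMs) {
        int clusterSize = followers.size() + 1;            // followers plus the leader
        CountDownLatch latch = new CountDownLatch(clusterSize / 2 + 1);
        latch.countDown();                                  // the leader already stored the data locally
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            for (String follower : followers) {
                pool.execute(() -> {
                    try {
                        if (replicate.test(follower)) {
                            latch.countDown();              // one more acknowledgement
                        }
                    } catch (RuntimeException e) {
                        // A failing follower simply contributes no acknowledgement.
                    }
                });
            }
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Three-node cluster: the leader plus one healthy follower is already a majority.
        System.out.println(publish(Arrays.asList("f1", "f2"), f -> f.equals("f1"), 1000));
        // Five-node cluster: the leader plus one follower is not enough.
        System.out.println(publish(Arrays.asList("f1", "f2", "f3", "f4"), f -> f.equals("f1"), 200));
    }
}
```

Sizing the latch to the majority rather than to the whole cluster means slow or failed followers do not block the write; they catch up later through the heartbeat comparison.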
