
版本:0.23.11
DataNode.initDataXceiver
/**
 * Opens the server socket used for block data streaming and prepares the
 * {@link DataXceiverServer} daemon thread that will accept connections on it.
 * The daemon is only constructed here; this method does not start it.
 *
 * @param conf configuration from which the streaming address is resolved
 * @throws IOException if the socket cannot be opened, bound or configured
 */
private void initDataXceiver(Configuration conf) throws IOException {
  InetSocketAddress socAddr = DataNode.getStreamingAddr(conf);

  // Find a free port, or use the privileged socket provided by
  // secureResources. The socket is bound to the DataNode streaming address
  // (the original annotation names the key as dfs.datanode.address).
  ServerSocket ss;
  if (secureResources == null) {
    // A channel-backed socket is used when write timeouts are enabled.
    ss = (dnConf.socketWriteTimeout > 0) ?
        ServerSocketChannel.open().socket() : new ServerSocket();
    Server.bind(ss, socAddr, 0);
  } else {
    ss = secureResources.getStreamingSocket();
  }
  // Proposed receive buffer size; it is inherited by every socket returned
  // from accept() on this server socket.
  ss.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);

  // adjust machine name with the actual port
  int tmpPort = ss.getLocalPort();
  selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
      tmpPort);
  // This socket carries block data, not the HTTP info server, so say so.
  LOG.info("Opened streaming server at " + tmpPort);

  // 1. create the "dataXceiverServer" thread group
  // 2. wrap a new DataXceiverServer in a Daemon thread belonging to it
  // 3. mark the group as a daemon group so it is destroyed automatically
  //    once it becomes empty
  this.threadGroup = new ThreadGroup("dataXceiverServer");
  this.dataXceiverServer = new Daemon(threadGroup,
      new DataXceiverServer(ss, conf, this));
  this.threadGroup.setDaemon(true); // auto destroy when empty
}
DataXceiverServer
/**
* Server used for receiving/sending a block of data.
* This is created to listen for requests from clients or
* other DataNodes. This small server does not use the
* Hadoop IPC mechanism.
*/
class DataXceiverServer implements Runnable {
// Logger shared with DataNode.
public static final Log LOG = DataNode.LOG;
// Listening socket on which run() accepts incoming connections.
ServerSocket ss;
// Owning DataNode; consulted for shouldRun, the xceiver count and the thread group.
DataNode datanode;
// Record all sockets opened for data transfer
Set<Socket> childSockets = Collections.synchronizedSet(
new HashSet<Socket>());
/**
* Maximal number of concurrent xceivers per node.
* Enforcing the limit is required in order to avoid data-node
* running out of memory.
*/
// Maximum number of xceivers the DataNode streaming interface will serve.
// Configuration key names according to the original annotation (not verifiable from this file):
// 1.x {dfs.datanode.max.xcievers}
// 2.x {dfs.datanode.max.transfer.threads}
int maxXceiverCount =
DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;
/**
 * Throttler dedicated to cluster balancing so that balancing cannot
 * consume too many resources.
 *
 * Besides the bandwidth cap inherited from {@link DataTransferThrottler},
 * it bounds the number of block moves that may be in flight at once.
 */
static class BlockBalanceThrottler extends DataTransferThrottler {
  /** Number of block moves currently holding a slot. */
  private int numThreads;

  /**
   * Creates the throttler and logs the configured bandwidth.
   *
   * @param bandwidth Total amount of bandwidth can be used for balancing
   */
  private BlockBalanceThrottler(long bandwidth) {
    super(bandwidth);
    LOG.info("Balancing bandwith is " + bandwidth + " bytes/s");
  }

  /**
   * Tries to reserve a slot for a block move.
   *
   * @return true if a slot was free and has been reserved (the counter is
   *         incremented); false if the quota
   *         {@code Balancer.MAX_NUM_CONCURRENT_MOVES} is already reached
   */
  synchronized boolean acquire() {
    final boolean slotAvailable =
        numThreads < Balancer.MAX_NUM_CONCURRENT_MOVES;
    if (slotAvailable) {
      numThreads++;
    }
    return slotAvailable;
  }

  /** Gives back a slot once a move has completed (decrements the counter). */
  synchronized void release() {
    numThreads -= 1;
  }
}
BlockBalanceThrottler balanceThrottler;
/**
* We need an estimate for block size to check if the disk partition has
* enough space. For now we set it to be the default block size set
* in the server side configuration, which is not ideal because the
* default block size should be a client-size configuration.
* A better solution is to include in the header the estimated block size,
* i.e. either the actual block size or the default block size.
*/
long estimateBlockSize;
DataXceiverServer(ServerSocket ss, Configuration conf,
DataNode datanode) {
this.ss = ss;
this.datanode = datanode;
this.maxXceiverCount =
conf.getInt(DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY,
DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT);
this.estimateBlockSize = conf.getLongBytes(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
//set up parameter for cluster balancing
this.balanceThrottler = new BlockBalanceThrottler(
conf.getLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT));
}
/**
 * Accept loop: while the DataNode should run, accepts a connection, checks
 * the xceiver limit and hands the socket to a new DataXceiver daemon
 * thread. The listening socket is closed once the loop ends.
 */
@Override
public void run() {
  while (datanode.shouldRun) {
    Socket accepted = null;
    try {
      // block until a client connects
      accepted = ss.accept();
      accepted.setTcpNoDelay(true);
      // Timeouts are set within DataXceiver.run()

      // Make sure the xceiver count is not exceeded
      final int activeXceivers = datanode.getXceiverCount();
      final boolean overLimit = activeXceivers > maxXceiverCount;
      if (overLimit) {
        throw new IOException("Xceiver count " + activeXceivers
            + " exceeds the limit of concurrent xcievers: "
            + maxXceiverCount);
      }

      // create the DataXceiver that serves this connection and run it on
      // its own daemon thread
      final Daemon worker = new Daemon(datanode.threadGroup,
          DataXceiver.create(accepted, datanode, this));
      worker.start();
    } catch (SocketTimeoutException ignored) {
      // wake up to see if should continue to run
    } catch (AsynchronousCloseException closedByOther) {
      // another thread closed our listener socket - that's expected during
      // shutdown, but not in other circumstances
      if (datanode.shouldRun) {
        LOG.warn(datanode.getMachineName() + ":DataXceiverServer: ",
            closedByOther);
      }
    } catch (IOException ioFailure) {
      IOUtils.closeSocket(accepted);
      LOG.warn(datanode.getMachineName() + ":DataXceiverServer: ", ioFailure);
    } catch (OutOfMemoryError oom) {
      IOUtils.closeSocket(accepted);
      // DataNode can run out of memory if there is too many transfers.
      // Log the event, Sleep for 30 seconds, other transfers may complete by
      // then.
      LOG.warn("DataNode is out of memory. Will retry in 30 seconds.", oom);
      try {
        Thread.sleep(30 * 1000);
      } catch (InterruptedException e) {
        // ignore
      }
    } catch (Throwable fatal) {
      LOG.error(datanode.getMachineName()
          + ":DataXceiverServer: Exiting due to: ", fatal);
      datanode.shouldRun = false;
    }
  }

  // loop finished: release the listening socket
  try {
    ss.close();
  } catch (IOException closeFailure) {
    LOG.warn(datanode.getMachineName()
        + " :DataXceiverServer: close exception", closeFailure);
  }
}
/**
 * Shuts the server down: closes the listening socket and then every socket
 * recorded in {@code childSockets}. The caller is expected to have cleared
 * {@code datanode.shouldRun} first (checked by an assertion).
 */
void kill() {
  assert datanode.shouldRun == false :
      "shoudRun should be set to false before killing";
  try {
    this.ss.close();
  } catch (IOException ie) {
    LOG.warn(datanode.getMachineName() + ":DataXceiverServer.kill(): ", ie);
  }

  // close all the sockets that were accepted earlier
  // (iteration over a synchronized set must hold the set's own lock)
  synchronized (childSockets) {
    for (Socket thissock : childSockets) {
      try {
        thissock.close();
      } catch (IOException e) {
        // Closing is best effort, so keep going with the remaining sockets,
        // but leave a trace instead of silently dropping the failure.
        if (LOG.isDebugEnabled()) {
          LOG.debug(datanode.getMachineName()
              + ":DataXceiverServer.kill(): failed to close " + thissock, e);
        }
      }
    }
  }
}
}
DataXceiver
/**
* Thread for processing incoming/outgoing data stream.
*/
class DataXceiver extends Receiver implements Runnable {
public static final Log LOG = DataNode.LOG;
static final Log ClientTraceLog = DataNode.ClientTraceLog;
private final Socket s;
private final boolean isLocal; //is a local connection?
private final String remoteAddress; // address of remote side
private final String localAddress; // local address of this daemon
private final DataNode datanode;
private final DNConf dnConf;
private final DataXceiverServer dataXceiverServer;
private long opStartTime; //the start time of receiving an Op
private final SocketInputWrapper socketInputWrapper;
/**
 * Factory for a DataXceiver serving the given socket. The socket's input
 * stream is obtained once and handed to the constructor.
 *
 * @param s                 accepted connection
 * @param dn                owning DataNode
 * @param dataXceiverServer server that accepted the connection
 * @return a new, not yet started DataXceiver
 * @throws IOException if the input stream cannot be obtained or the
 *         constructor fails
 */
public static DataXceiver create(Socket s, DataNode dn,
    DataXceiverServer dataXceiverServer) throws IOException {
  final SocketInputWrapper wrappedInput = NetUtils.getInputStream(s);
  final DataXceiver xceiver =
      new DataXceiver(s, wrappedInput, dn, dataXceiverServer);
  return xceiver;
}
private DataXceiver(Socket s,
SocketInputWrapper socketInput,
DataNode datanode,
DataXceiverServer dataXceiverServer) throws IOException {
super(new DataInputStream(new BufferedInputStream(
socketInput, HdfsConstants.SMALL_BUFFER_SIZE)));
this.s = s;
this.socketInputWrapper = socketInput;
this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
this.datanode = datanode;
this.dnConf = datanode.getDnConf();
this.dataXceiverServer = dataXceiverServer;
remoteAddress = s.getRemoteSocketAddress().toString();
localAddress = s.getLocalSocketAddress().toString();
if (LOG.isDebugEnabled()) {
LOG.debug("Number of active connections is: "
+ datanode.getXceiverCount());
}
}
/**
 * Renames the calling thread so that it shows the remote client and,
 * optionally, what this xceiver is currently doing.
 * Call this only from the xceiver's own thread, i.e. never from the
 * constructor.
 *
 * @param status short description appended in square brackets; omitted
 *               when null
 */
private void updateCurrentThreadName(String status) {
  String threadName = "DataXceiver for client " + remoteAddress;
  if (status != null) {
    threadName = threadName + " [" + status + "]";
  }
  Thread.currentThread().setName(threadName);
}
/**
 * Accessor for the owning DataNode.
 *
 * @return the DataNode this xceiver was created with
 */
DataNode getDataNode() {
  return this.datanode;
}
/**
 * Read/write data from/to the DataXceiverServer.
 *
 * Registers the socket with the server, then reads and processes operations
 * in a loop for as long as the socket stays open and keepalive is enabled.
 * On exit the input stream and socket are closed and the socket is
 * unregistered again.
 */
public void run() {
  int opsProcessed = 0;
  Op op = null;
  dataXceiverServer.childSockets.add(s);
  try {
    // We process requests in a loop, and stay around for a short timeout.
    // This optimistic behaviour allows the other end to reuse connections.
    // Setting keepalive timeout to 0 disable this behavior.
    do {
      updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));

      try {
        if (opsProcessed != 0) {
          // waiting for a follow-up op on a reused connection
          assert dnConf.socketKeepaliveTimeout > 0;
          socketInputWrapper.setTimeout(dnConf.socketKeepaliveTimeout);
        } else {
          socketInputWrapper.setTimeout(dnConf.socketTimeout);
        }
        // Read the next operation. According to the original annotation,
        // readOp() also performs the protocol version check.
        op = readOp();
      } catch (InterruptedIOException ignored) {
        // Time out while we wait for client rpc
        break;
      } catch (IOException err) {
        // Since we optimistically expect the next op, it's quite normal to get EOF here.
        if (opsProcessed > 0 &&
            (err instanceof EOFException || err instanceof ClosedChannelException)) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Cached " + s.toString() + " closing after " + opsProcessed + " ops");
          }
        } else {
          throw err;
        }
        break;
      }

      // restore normal timeout
      // Go through the same wrapper that installed the keepalive timeout
      // above; setting SO_TIMEOUT on the raw socket would not undo a timeout
      // that the wrapper applied to its own stream.
      if (opsProcessed != 0) {
        socketInputWrapper.setTimeout(dnConf.socketTimeout);
      }

      opStartTime = now();
      processOp(op);
      ++opsProcessed;
    } while (!s.isClosed() && dnConf.socketKeepaliveTimeout > 0);
  } catch (Throwable t) {
    LOG.error(datanode.getMachineName() + ":DataXceiver error processing " +
              ((op == null) ? "unknown" : op.name()) + " operation " +
              " src: " + remoteAddress +
              " dest: " + localAddress, t);
  } finally {
    if (LOG.isDebugEnabled()) {
      LOG.debug(datanode.getMachineName() + ":Number of active connections is: "
          + datanode.getXceiverCount());
    }
    updateCurrentThreadName("Cleaning up");
    IOUtils.closeStream(in);
    IOUtils.closeSocket(s);
    dataXceiverServer.childSockets.remove(s);
  }
}
/**
* Serves a read request for a block on this connection: checks the block
* token, opens a BlockSender for the requested range, sends a success
* response carrying checksum info, streams the data and, when the entire
* byte range was sent, reads the client's status reply.
*
* @param block block to read
* @param blockToken access token checked for READ access
* @param clientName name of the requesting client; when empty, a plain
* "Served block" message is used instead of the client trace format
* @param blockOffset offset passed to the BlockSender
* @param length number of bytes passed to the BlockSender
* @throws IOException if the BlockSender cannot be created or sending
* fails with a non-socket I/O error
*/
@Override
public void readBlock(final ExtendedBlock block,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final long blockOffset,
final long length) throws IOException {
// baseStream is the raw socket output; out is a buffered view of it.
OutputStream baseStream = NetUtils.getOutputStream(s,
dnConf.socketWriteTimeout);
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
checkAccess(out, true, block, blockToken,
Op.READ_BLOCK, BlockTokenSecretManager.AccessMode.READ);
// send the block
BlockSender blockSender = null;
DatanodeRegistration dnR =
datanode.getDNRegistrationForBP(block.getBlockPoolId());
// The "%d" arguments keep literal %d placeholders in the resulting string.
final String clientTraceFmt =
clientName.length() > 0 && ClientTraceLog.isInfoEnabled()
? String.format(DN_CLIENTTRACE_FORMAT, localAddress, remoteAddress,
"%d", "HDFS_READ", clientName, "%d",
dnR.getStorageID(), block, "%d")
: dnR + " Served block " + block + " to " +
remoteAddress;
updateCurrentThreadName("Sending block " + block);
try {
try {
blockSender = new BlockSender(block, blockOffset, length,
true, false, datanode, clientTraceFmt);
} catch(IOException e) {
// Tell the client about the failure before propagating it.
String msg = "opReadBlock " + block + " received exception " + e;
LOG.info(msg);
sendResponse(s, ERROR, msg, dnConf.socketWriteTimeout);
throw e;
}
// send op status
writeSuccessWithChecksumInfo(blockSender,
getStreamWithTimeout(s, dnConf.socketWriteTimeout)); // acknowledge success to the client
long read = blockSender.sendBlock(out, baseStream, null); // send the block data
if (blockSender.didSendEntireByteRange()) {
// If we sent the entire range, then we should expect the client
// to respond with a Status enum.
try {
ClientReadStatusProto stat = ClientReadStatusProto.parseFrom(
HdfsProtoUtil.vintPrefixed(in));
if (!stat.hasStatus()) {
LOG.warn("Client " + s.getInetAddress() + " did not send a valid status " +
"code after reading. Will close connection.");
IOUtils.closeStream(out);
}
} catch (IOException ioe) {
LOG.debug("Error reading client status response. Will close connection.", ioe);
IOUtils.closeStream(out);
}
} else {
IOUtils.closeStream(out);
}
// The metric takes an int, so the byte count is narrowed here.
datanode.metrics.incrBytesRead((int) read);
datanode.metrics.incrBlocksRead();
} catch ( SocketException ignored ) {
if (LOG.isTraceEnabled()) {
LOG.trace(dnR + ":Ignoring exception while serving " + block + " to " +
remoteAddress, ignored);
}
// Its ok for remote side to close the connection anytime.
datanode.metrics.incrBlocksRead();
IOUtils.closeStream(out);
} catch ( IOException ioe ) {
/* What exactly should we do here?
* Earlier version shutdown() datanode if there is disk error.
*/
LOG.warn(dnR + ":Got exception while serving " + block + " to "
+ remoteAddress, ioe);
throw ioe;
} finally {
IOUtils.closeStream(blockSender);
}
//update metrics
datanode.metrics.addReadBlockOp(elapsed());
datanode.metrics.incrReadsFromClient(isLocal);
}
/**
* Handles a write request for a block on this connection. Depending on the
* stage it opens a BlockReceiver or recovers a close, connects to the first
* downstream target (if any) and forwards the request to it, relays the
* connect-ack upstream for client writes, receives the block and, for
* datanode-originated writes or close recovery, closes the block.
*
* @param block block being written; its length and, for close recovery,
* its generation stamp are modified by this method
* @param blockToken access token checked for WRITE access
* @param clientname name of the writing client; an empty name marks a
* request that comes from another datanode
* @param targets downstream nodes; only the first one is connected to here
* @param srcDataNode forwarded to the BlockReceiver and to the mirror
* @param stage pipeline stage of this write
* @param pipelineSize forwarded to the mirror
* @param minBytesRcvd forwarded to the BlockReceiver/mirror; also the length
* set on the block for close recovery
* @param maxBytesRcvd forwarded to the BlockReceiver and to the mirror
* @param latestGenerationStamp generation stamp applied on close recovery
* @param requestedChecksum forwarded to the BlockReceiver and to the mirror
* @throws IOException on a transfer stage with remaining targets, or when
* receiving or mirroring the block fails
*/
@Override
public void writeBlock(final ExtendedBlock block,
final Token<BlockTokenIdentifier> blockToken,
final String clientname,
final DatanodeInfo[] targets,
final DatanodeInfo srcDataNode,
final BlockConstructionStage stage,
final int pipelineSize,
final long minBytesRcvd,
final long maxBytesRcvd,
final long latestGenerationStamp,
DataChecksum requestedChecksum) throws IOException {
updateCurrentThreadName("Receiving block " + block + " client=" + clientname);
// An empty client name identifies a datanode-originated request.
final boolean isDatanode = clientname.length() == 0;
final boolean isClient = !isDatanode;
final boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
|| stage == BlockConstructionStage.TRANSFER_FINALIZED;
// check single target for transfer-RBW/Finalized
// (any remaining downstream target is rejected for these stages)
if (isTransfer && targets.length > 0) {
throw new IOException(stage + " does not support multiple targets "
+ Arrays.asList(targets));
}
if (LOG.isDebugEnabled()) {
LOG.debug("opWriteBlock: stage=" + stage + ", clientname=" + clientname
+ "\n block =" + block + ", newGs=" + latestGenerationStamp
+ ", bytesRcvd=[" + minBytesRcvd + ", " + maxBytesRcvd + "]"
+ "\n targets=" + Arrays.asList(targets)
+ "; pipelineSize=" + pipelineSize + ", srcDataNode=" + srcDataNode
);
LOG.debug("isDatanode=" + isDatanode
+ ", isClient=" + isClient
+ ", isTransfer=" + isTransfer);
LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
" tcp no delay " + s.getTcpNoDelay());
}
// We later mutate block's generation stamp and length, but we need to
// forward the original version of the block to downstream mirrors, so
// make a copy here.
final ExtendedBlock originalBlock = new ExtendedBlock(block);
block.setNumBytes(dataXceiverServer.estimateBlockSize);
LOG.info("Receiving block " + block +
" src: " + remoteAddress +
" dest: " + localAddress);
// reply to upstream datanode or client
final DataOutputStream replyOut = new DataOutputStream(
new BufferedOutputStream(
NetUtils.getOutputStream(s, dnConf.socketWriteTimeout),
HdfsConstants.SMALL_BUFFER_SIZE));
checkAccess(replyOut, isClient, block, blockToken,
Op.WRITE_BLOCK, BlockTokenSecretManager.AccessMode.WRITE);
DataOutputStream mirrorOut = null; // stream to next target
DataInputStream mirrorIn = null; // reply from next target
Socket mirrorSock = null; // socket to next target
BlockReceiver blockReceiver = null; // responsible for data handling
String mirrorNode = null; // the name:port of next target
String firstBadLink = ""; // first datanode that failed in connection setup
Status mirrorInStatus = SUCCESS;
try {
// Client-driven close recovery needs no receiver; every other case does.
if (isDatanode ||
stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
// open a block receiver
blockReceiver = new BlockReceiver(block, in,
s.getRemoteSocketAddress().toString(),
s.getLocalSocketAddress().toString(),
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, datanode, requestedChecksum);
} else {
datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
}
//
// Connect to downstream machine, if appropriate
//
if (targets.length > 0) {
InetSocketAddress mirrorTarget = null;
// Connect to backup machine
mirrorNode = targets[0].getName();
mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
mirrorSock = datanode.newSocket();
try {
// Both timeouts grow with the number of remaining targets.
int timeoutValue = dnConf.socketTimeout
+ (HdfsServerConstants.READ_TIMEOUT_EXTENSION * targets.length);
int writeTimeout = dnConf.socketWriteTimeout +
(HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * targets.length);
NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
mirrorSock.setSoTimeout(timeoutValue);
mirrorSock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
mirrorOut = new DataOutputStream(
new BufferedOutputStream(
NetUtils.getOutputStream(mirrorSock, writeTimeout),
HdfsConstants.SMALL_BUFFER_SIZE));
mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
// Forward the unmodified copy of the block to the mirror.
new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
clientname, targets, srcDataNode, stage, pipelineSize,
minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum);
mirrorOut.flush();
// read connect ack (only for clients, not for replication req)
if (isClient) {
BlockOpResponseProto connectAck =
BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(mirrorIn));
mirrorInStatus = connectAck.getStatus();
firstBadLink = connectAck.getFirstBadLink();
if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
LOG.info("Datanode " + targets.length +
" got response for connect ack " +
" from downstream datanode with firstbadlink as " +
firstBadLink);
}
}
} catch (IOException e) {
// For client writes, report this mirror as the first bad link upstream.
if (isClient) {
BlockOpResponseProto.newBuilder()
.setStatus(ERROR)
.setFirstBadLink(mirrorNode)
.build()
.writeDelimitedTo(replyOut);
replyOut.flush();
}
// Drop the mirror connection; the references are nulled so that later
// code sees there is no mirror.
IOUtils.closeStream(mirrorOut);
mirrorOut = null;
IOUtils.closeStream(mirrorIn);
mirrorIn = null;
IOUtils.closeSocket(mirrorSock);
mirrorSock = null;
if (isClient) {
LOG.error(datanode + ":Exception transfering block " +
block + " to mirror " + mirrorNode + ": " + e);
throw e;
} else {
LOG.info(datanode + ":Exception transfering block " +
block + " to mirror " + mirrorNode +
". continuing without the mirror.", e);
}
}
}
// send connect-ack to source for clients and not transfer-RBW/Finalized
if (isClient && !isTransfer) {
if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
LOG.info("Datanode " + targets.length +
" forwarding connect ack to upstream firstbadlink is " +
firstBadLink);
}
BlockOpResponseProto.newBuilder()
.setStatus(mirrorInStatus)
.setFirstBadLink(firstBadLink)
.build()
.writeDelimitedTo(replyOut);
replyOut.flush();
}
// receive the block and mirror to the next target
if (blockReceiver != null) {
String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
mirrorAddr, null, targets);
// send close-ack for transfer-RBW/Finalized
if (isTransfer) {
if (LOG.isTraceEnabled()) {
LOG.trace("TRANSFER: send close-ack");
}
writeResponse(SUCCESS, null, replyOut);
}
}
// update its generation stamp
if (isClient &&
stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
block.setGenerationStamp(latestGenerationStamp);
block.setNumBytes(minBytesRcvd);
}
// if this write is for a replication request or recovering
// a failed close for client, then confirm block. For other client-writes,
// the block is finalized in the PacketResponder.
if (isDatanode ||
stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
LOG.info("Received block " + block +
" src: " + remoteAddress +
" dest: " + localAddress +
" of size " + block.getNumBytes());
}
} catch (IOException ioe) {
LOG.info("opWriteBlock " + block + " received exception " + ioe);
throw ioe;
} finally {
// close all opened streams
IOUtils.closeStream(mirrorOut);
IOUtils.closeStream(mirrorIn);
IOUtils.closeStream(replyOut);
IOUtils.closeSocket(mirrorSock);
IOUtils.closeStream(blockReceiver);
}
//update metrics
datanode.metrics.addWriteBlockOp(elapsed());
datanode.metrics.incrWritesFromClient(isLocal);
}
/**
 * Handles a transfer request: after the COPY access check, asks the
 * DataNode to transfer the replica to the given targets for pipeline
 * recovery and then reports success on this connection.
 *
 * @param blk        block to transfer
 * @param blockToken access token checked for COPY access
 * @param clientName client on whose behalf the transfer happens
 * @param targets    nodes the replica is transferred to
 * @throws IOException if the access check, the transfer or the reply fails
 */
@Override
public void transferBlock(final ExtendedBlock blk,
    final Token<BlockTokenIdentifier> blockToken,
    final String clientName,
    final DatanodeInfo[] targets) throws IOException {
  checkAccess(null, true, blk, blockToken,
      Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY);

  updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);

  // unbuffered reply stream on the request socket
  final DataOutputStream replyStream =
      getStreamWithTimeout(s, dnConf.socketWriteTimeout);
  try {
    datanode.transferReplicaForPipelineRecovery(blk, targets, clientName);
    writeResponse(Status.SUCCESS, null, replyStream);
  } finally {
    IOUtils.closeStream(replyStream);
  }
}
/**
 * Handles a block checksum request: reads the block's metadata stream,
 * computes an MD5 digest over the bytes that follow the metadata header
 * and replies with it together with the bytes-per-CRC and the number of
 * CRCs in the block.
 *
 * @param block      block whose checksum is requested
 * @param blockToken access token checked for READ access
 * @throws IOException if the access check, reading the metadata or
 *         writing the reply fails
 */
@Override
public void blockChecksum(final ExtendedBlock block,
    final Token<BlockTokenIdentifier> blockToken) throws IOException {
  final DataOutputStream out = new DataOutputStream(
      NetUtils.getOutputStream(s, dnConf.socketWriteTimeout));
  checkAccess(out, true, block, blockToken,
      Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ);

  updateCurrentThreadName("Reading metadata for block " + block);
  final MetaDataInputStream metadataIn =
      datanode.data.getMetaDataInputStream(block);
  final BufferedInputStream bufferedMeta = new BufferedInputStream(
      metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE);
  final DataInputStream checksumIn = new DataInputStream(bufferedMeta);

  updateCurrentThreadName("Getting checksum for block " + block);
  try {
    //read metadata file
    final BlockMetadataHeader metaHeader =
        BlockMetadataHeader.readHeader(checksumIn);
    final DataChecksum checksum = metaHeader.getChecksum();
    final int bytesPerCRC = checksum.getBytesPerChecksum();
    final long checksumBytes =
        metadataIn.getLength() - BlockMetadataHeader.getHeaderSize();
    final long crcPerBlock = checksumBytes / checksum.getChecksumSize();

    //compute block checksum
    final MD5Hash md5 = MD5Hash.digest(checksumIn);

    if (LOG.isDebugEnabled()) {
      LOG.debug("block=" + block + ", bytesPerCRC=" + bytesPerCRC
          + ", crcPerBlock=" + crcPerBlock + ", md5=" + md5);
    }

    //write reply
    final OpBlockChecksumResponseProto.Builder checksumReply =
        OpBlockChecksumResponseProto.newBuilder()
            .setBytesPerCrc(bytesPerCRC)
            .setCrcPerBlock(crcPerBlock)
            .setMd5(ByteString.copyFrom(md5.getDigest()))
            .setCrcType(HdfsProtoUtil.toProto(checksum.getChecksumType()));
    final BlockOpResponseProto reply = BlockOpResponseProto.newBuilder()
        .setStatus(SUCCESS)
        .setChecksumResponse(checksumReply)
        .build();
    reply.writeDelimitedTo(out);
    out.flush();
  } finally {
    IOUtils.closeStream(out);
    IOUtils.closeStream(checksumIn);
    IOUtils.closeStream(metadataIn);
  }

  //update metrics
  datanode.metrics.addBlockChecksumOp(elapsed());
}
/**
 * Handles a copy request used for balancing: after the COPY token check and
 * acquiring a balancing slot, sends a success response with checksum info
 * followed by the whole block, throttled by the balance throttler. On
 * success a final character is written to tell the peer that the slot has
 * been released.
 *
 * @param block      block to copy to the requester
 * @param blockToken access token checked for COPY access when block tokens
 *                   are enabled
 * @throws IOException if opening or sending the block fails
 */
@Override
public void copyBlock(final ExtendedBlock block,
    final Token<BlockTokenIdentifier> blockToken) throws IOException {
  updateCurrentThreadName("Copying block " + block);
  // Read in the header
  if (datanode.isBlockTokenEnabled) {
    try {
      datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block,
          BlockTokenSecretManager.AccessMode.COPY);
    } catch (InvalidToken e) {
      LOG.warn("Invalid access token in request from " + remoteAddress
          + " for OP_COPY_BLOCK for block " + block + " : "
          + e.getLocalizedMessage());
      sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token", dnConf.socketWriteTimeout);
      return;
    }
  }

  if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
    String msg = "Not able to copy block " + block.getBlockId() + " to "
        + s.getRemoteSocketAddress() + " because threads quota is exceeded.";
    LOG.info(msg);
    sendResponse(s, ERROR, msg, dnConf.socketWriteTimeout);
    return;
  }

  BlockSender blockSender = null;
  DataOutputStream reply = null;
  boolean isOpSuccess = true;

  try {
    // check if the block exists or not
    blockSender = new BlockSender(block, 0, -1, false, false, datanode,
        null);

    // set up response stream
    OutputStream baseStream = NetUtils.getOutputStream(
        s, dnConf.socketWriteTimeout);
    reply = new DataOutputStream(new BufferedOutputStream(
        baseStream, HdfsConstants.SMALL_BUFFER_SIZE));

    // send status first
    writeSuccessWithChecksumInfo(blockSender, reply);
    // send block content to the target
    long read = blockSender.sendBlock(reply, baseStream,
        dataXceiverServer.balanceThrottler);

    // the metric takes an int, so the byte count is narrowed here
    datanode.metrics.incrBytesRead((int) read);
    datanode.metrics.incrBlocksRead();

    LOG.info("Copied block " + block + " to " + s.getRemoteSocketAddress());
  } catch (IOException ioe) {
    isOpSuccess = false;
    LOG.info("opCopyBlock " + block + " received exception " + ioe);
    throw ioe;
  } finally {
    dataXceiverServer.balanceThrottler.release();
    // reply is still null if an unchecked exception escaped before the
    // stream was set up; dereferencing it here would raise a
    // NullPointerException that hides the real failure and skips the
    // cleanup below.
    if (isOpSuccess && reply != null) {
      try {
        // send one last byte to indicate that the resource is cleaned.
        reply.writeChar('d');
      } catch (IOException ignored) {
        // best effort: the peer may already have gone away
      }
    }
    IOUtils.closeStream(reply);
    IOUtils.closeStream(blockSender);
  }

  //update metrics
  datanode.metrics.addCopyBlockOp(elapsed());
}
/**
 * Handles a replace request used for balancing: after the REPLACE token
 * check and acquiring a balancing slot, connects to the proxy source, asks
 * it to copy the block, receives the block through a BlockReceiver and
 * notifies the namenode. The outcome is always reported back on the request
 * socket, and all proxy resources are released.
 *
 * @param block       block to receive; its length is set to the server's
 *                    block size estimate before anything else happens
 * @param blockToken  access token checked for REPLACE access when block
 *                    tokens are enabled; also forwarded to the proxy
 * @param delHint     hint passed on to the namenode notification
 * @param proxySource node the block is copied from
 * @throws IOException if connecting to the proxy, the copy response or
 *         receiving the block fails
 */
@Override
public void replaceBlock(final ExtendedBlock block,
    final Token<BlockTokenIdentifier> blockToken,
    final String delHint,
    final DatanodeInfo proxySource) throws IOException {
  updateCurrentThreadName("Replacing block " + block + " from " + delHint);

  /* read header */
  block.setNumBytes(dataXceiverServer.estimateBlockSize);
  if (datanode.isBlockTokenEnabled) {
    try {
      datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block,
          BlockTokenSecretManager.AccessMode.REPLACE);
    } catch (InvalidToken e) {
      LOG.warn("Invalid access token in request from " + remoteAddress
          + " for OP_REPLACE_BLOCK for block " + block + " : "
          + e.getLocalizedMessage());
      sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token",
          dnConf.socketWriteTimeout);
      return;
    }
  }

  if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
    String msg = "Not able to receive block " + block.getBlockId() + " from "
        + s.getRemoteSocketAddress() + " because threads quota is exceeded.";
    LOG.warn(msg);
    sendResponse(s, ERROR, msg, dnConf.socketWriteTimeout);
    return;
  }

  Socket proxySock = null;
  DataOutputStream proxyOut = null;
  Status opStatus = SUCCESS;
  String errMsg = null;
  BlockReceiver blockReceiver = null;
  DataInputStream proxyReply = null;

  try {
    // get the output stream to the proxy
    InetSocketAddress proxyAddr = NetUtils.createSocketAddr(
        proxySource.getName());
    proxySock = datanode.newSocket();
    NetUtils.connect(proxySock, proxyAddr, dnConf.socketTimeout);
    proxySock.setSoTimeout(dnConf.socketTimeout);

    OutputStream baseStream = NetUtils.getOutputStream(proxySock,
        dnConf.socketWriteTimeout);
    proxyOut = new DataOutputStream(new BufferedOutputStream(baseStream,
        HdfsConstants.SMALL_BUFFER_SIZE));

    /* send request to the proxy */
    new Sender(proxyOut).copyBlock(block, blockToken);

    // receive the response from the proxy
    proxyReply = new DataInputStream(new BufferedInputStream(
        NetUtils.getInputStream(proxySock), HdfsConstants.IO_FILE_BUFFER_SIZE));
    BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom(
        HdfsProtoUtil.vintPrefixed(proxyReply));

    if (copyResponse.getStatus() != SUCCESS) {
      if (copyResponse.getStatus() == ERROR_ACCESS_TOKEN) {
        throw new IOException("Copy block " + block + " from "
            + proxySock.getRemoteSocketAddress()
            + " failed due to access token error");
      }
      throw new IOException("Copy block " + block + " from "
          + proxySock.getRemoteSocketAddress() + " failed");
    }

    // get checksum info about the block we're copying
    ReadOpChecksumInfoProto checksumInfo = copyResponse.getReadOpChecksumInfo();
    DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
        checksumInfo.getChecksum());
    // open a block receiver and check if the block does not exist
    blockReceiver = new BlockReceiver(
        block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
        proxySock.getLocalSocketAddress().toString(),
        null, 0, 0, 0, "", null, datanode, remoteChecksum);

    // receive a block
    blockReceiver.receiveBlock(null, null, null, null,
        dataXceiverServer.balanceThrottler, null);

    // notify name node
    datanode.notifyNamenodeReceivedBlock(block, delHint);

    LOG.info("Moved block " + block +
        " from " + s.getRemoteSocketAddress());
  } catch (IOException ioe) {
    opStatus = ERROR;
    errMsg = "opReplaceBlock " + block + " received exception " + ioe;
    LOG.info(errMsg);
    throw ioe;
  } finally {
    // receive the last byte that indicates the proxy released its thread resource
    // (proxyReply is still null if an unchecked exception escaped before the
    // stream was opened; guard so that the original failure is not masked by
    // a NullPointerException and the cleanup below still runs)
    if (opStatus == SUCCESS && proxyReply != null) {
      try {
        proxyReply.readChar();
      } catch (IOException ignored) {
        // best effort: the proxy may already have closed the connection
      }
    }

    // now release the thread resource
    dataXceiverServer.balanceThrottler.release();

    // send response back
    try {
      sendResponse(s, opStatus, errMsg, dnConf.socketWriteTimeout);
    } catch (IOException ioe) {
      LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
    }
    IOUtils.closeStream(proxyOut);
    IOUtils.closeStream(blockReceiver);
    IOUtils.closeStream(proxyReply);
    // Close the proxy socket explicitly: if connecting or setting up the
    // streams failed, no stream exists that could close it, and the socket
    // would leak otherwise.
    IOUtils.closeSocket(proxySock);
  }

  //update metrics
  datanode.metrics.addReplaceBlockOp(elapsed());
}
/**
 * Time spent on the current operation so far.
 *
 * @return the difference between {@code now()} and the recorded start time
 *         of the operation
 */
private long elapsed() {
  final long currentTime = now();
  return currentTime - opStartTime;
}
/**
 * Utility function for sending a response on a socket.
 *
 * @param s       socket to write to
 * @param status  status to report
 * @param message optional message to include; may be null
 * @param timeout send timeout
 * @throws IOException if the stream cannot be obtained or writing fails
 **/
private static void sendResponse(Socket s, Status status, String message,
    long timeout) throws IOException {
  writeResponse(status, message, getStreamWithTimeout(s, timeout));
}
/**
 * Opens an unbuffered data output stream on the socket.
 *
 * @param s       socket to write to
 * @param timeout write timeout handed to {@code NetUtils.getOutputStream}
 * @return a DataOutputStream wrapping the socket's output stream
 * @throws IOException if the output stream cannot be obtained
 */
private static DataOutputStream getStreamWithTimeout(Socket s, long timeout)
    throws IOException {
  final OutputStream socketOut = NetUtils.getOutputStream(s, timeout);
  return new DataOutputStream(socketOut);
}
/**
 * Writes a length-delimited response with the given status and flushes
 * the stream.
 *
 * @param status  status to report
 * @param message optional message; left unset in the response when null
 * @param out     stream to write to
 * @throws IOException if writing or flushing fails
 */
private static void writeResponse(Status status, String message, OutputStream out)
    throws IOException {
  final BlockOpResponseProto.Builder builder = BlockOpResponseProto.newBuilder();
  builder.setStatus(status);
  if (message != null) {
    builder.setMessage(message);
  }
  final BlockOpResponseProto proto = builder.build();
  proto.writeDelimitedTo(out);
  out.flush();
}
/**
 * Writes a SUCCESS response that carries the sender's checksum and chunk
 * offset, then flushes the stream.
 *
 * @param blockSender sender whose checksum and offset are reported
 * @param out         stream to write to
 * @throws IOException if writing or flushing fails
 */
private void writeSuccessWithChecksumInfo(BlockSender blockSender,
    DataOutputStream out) throws IOException {
  final ReadOpChecksumInfoProto.Builder checksumInfo =
      ReadOpChecksumInfoProto.newBuilder();
  checksumInfo.setChecksum(
      DataTransferProtoUtil.toProto(blockSender.getChecksum()));
  checksumInfo.setChunkOffset(blockSender.getOffset());

  final BlockOpResponseProto.Builder reply = BlockOpResponseProto.newBuilder();
  reply.setStatus(SUCCESS);
  reply.setReadOpChecksumInfo(checksumInfo.build());

  reply.build().writeDelimitedTo(out);
  out.flush();
}
/**
 * Verifies the block token for an operation. Nothing happens when block
 * tokens are disabled. On an invalid token an ERROR_ACCESS_TOKEN response
 * is optionally written, a warning is logged, the stream is closed and the
 * exception is rethrown.
 *
 * @param out   stream for the error reply; when null and a reply is
 *              wanted, a stream on the request socket is opened
 * @param reply whether an error response should be sent to the peer
 * @param blk   block being accessed
 * @param t     token to verify
 * @param op    operation being performed, used for logging
 * @param mode  required access mode; for WRITE the reply also names this
 *              datanode as the first bad link
 * @throws IOException the InvalidToken failure, or an error while replying
 */
private void checkAccess(DataOutputStream out, final boolean reply,
    final ExtendedBlock blk,
    final Token<BlockTokenIdentifier> t,
    final Op op,
    final BlockTokenSecretManager.AccessMode mode) throws IOException {
  if (!datanode.isBlockTokenEnabled) {
    return;
  }

  try {
    datanode.blockPoolTokenSecretManager.checkAccess(t, null, blk, mode);
  } catch (InvalidToken invalid) {
    try {
      if (reply) {
        if (out == null) {
          out = new DataOutputStream(
              NetUtils.getOutputStream(s, dnConf.socketWriteTimeout));
        }

        final BlockOpResponseProto.Builder errorReply =
            BlockOpResponseProto.newBuilder();
        errorReply.setStatus(ERROR_ACCESS_TOKEN);
        if (mode == BlockTokenSecretManager.AccessMode.WRITE) {
          final DatanodeRegistration registration =
              datanode.getDNRegistrationForBP(blk.getBlockPoolId());
          errorReply.setFirstBadLink(registration.getName());
        }
        errorReply.build().writeDelimitedTo(out);
        out.flush();
      }
      LOG.warn("Block token verification failed: op=" + op
          + ", remoteAddress=" + remoteAddress
          + ", message=" + invalid.getLocalizedMessage());
      throw invalid;
    } finally {
      IOUtils.closeStream(out);
    }
  }
}
}










