Datanode.blockReceived 时,最后会调用 addStoredBlock 方法。传入方法的数据块参数对象与 blocksMap 中保存的数据块(即变量 storedBlock)做了 != 判断(即判断两个对象的引用是否相同)。问题是 Block 是通过远程传过来的,它可能与 blocksMap 中的对象引用相同吗?请赐教。
/**
 * Records that {@code node} holds a replica of {@code block} and updates the
 * replication bookkeeping for that block.
 *
 * <p>Processing order:
 * <ol>
 *   <li>Look the block up in {@code blocksMap}; if there is no exact match, retry
 *       ignoring the generation stamp and either reject the report or register the
 *       node as a recovery target of the under-construction last block.</li>
 *   <li>Add the stored block to the datanode descriptor.</li>
 *   <li>If the reported object is not the stored instance, reconcile the reported
 *       length with the stored length (possibly marking replicas corrupt) and
 *       adjust the consumed space of an under-construction file.</li>
 *   <li>Update the safe-block count and, unless the block is still being written
 *       or the namesystem is in safe mode, the under/over-replication queues and
 *       corrupt-replica state.</li>
 * </ol>
 *
 * @param block       the block as reported; after length reconciliation the local
 *                    reference is rebound to the instance held in {@code blocksMap}
 * @param node        the datanode that reported the replica
 * @param delNodeHint forwarded to {@code processOverReplicatedBlock} when the block
 *                    turns out to be over-replicated
 * @return the result of {@code rejectAddStoredBlock} when the report is rejected;
 *         the reported {@code block} when the node was only added as a target (or
 *         the under-construction file unexpectedly has no last block); otherwise the
 *         stored block instance
 */
synchronized Block addStoredBlock(Block block,
                                  DatanodeDescriptor node,
                                  DatanodeDescriptor delNodeHint) {
  BlockInfo storedBlock = blocksMap.getStoredBlock(block);
  if (storedBlock == null) {
    // If we have a block in the block map with the same ID, but a different
    // generation stamp, and the corresponding file is under construction,
    // then we need to do some special processing.
    storedBlock = blocksMap.getStoredBlockWithoutMatchingGS(block);
    if (storedBlock == null) {
      return rejectAddStoredBlock(
          block, node,
          "Block not in blockMap with any generation stamp");
    }

    INodeFile inode = storedBlock.getINode();
    if (inode == null) {
      return rejectAddStoredBlock(
          block, node,
          "Block does not correspond to any file");
    }

    boolean reportedOldGS = block.getGenerationStamp() < storedBlock.getGenerationStamp();
    boolean reportedNewGS = block.getGenerationStamp() > storedBlock.getGenerationStamp();
    boolean underConstruction = inode.isUnderConstruction();
    boolean isLastBlock = inode.getLastBlock() != null &&
        inode.getLastBlock().getBlockId() == block.getBlockId();

    // We can report a stale generation stamp for the last block under construction,
    // we just need to make sure it ends up in targets.
    if (reportedOldGS && !(underConstruction && isLastBlock)) {
      return rejectAddStoredBlock(
          block, node,
          "Reported block has old generation stamp but is not the last block of " +
          "an under-construction file. (current generation is " +
          storedBlock.getGenerationStamp() + ")");
    }

    // Don't add blocks to the DN when they're part of the in-progress last block
    // and have an inconsistent generation stamp. Instead just add them to targets
    // for recovery purposes. They will get added to the node when
    // commitBlockSynchronization runs
    if (underConstruction && isLastBlock && (reportedOldGS || reportedNewGS)) {
      NameNode.stateChangeLog.info(
          "BLOCK* NameSystem.addStoredBlock: "
          + "Targets updated: block " + block + " on " + node.getName() +
          " is added as a target for block " + storedBlock + " with size " +
          block.getNumBytes());
      ((INodeFileUnderConstruction)inode).addTarget(node);
      return block;
    }
  }

  // Checked before the first dereference so the assertion can actually fire.
  assert storedBlock != null : "Block must be stored by now";

  INodeFile fileINode = storedBlock.getINode();
  if (fileINode == null) {
    return rejectAddStoredBlock(
        block, node,
        "Block does not correspond to any file");
  }

  // add block to the data-node
  boolean added = node.addBlock(storedBlock);

  // Is the block being reported the last block of an underconstruction file?
  boolean blockUnderConstruction = false;
  if (fileINode.isUnderConstruction()) {
    Block last = fileINode.getLastBlock();
    if (last == null) {
      // This should never happen, but better to handle it properly than to throw
      // an NPE below.
      LOG.error("Null blocks for reported block=" + block + " stored=" + storedBlock +
          " inode=" + fileINode);
      return block;
    }
    blockUnderConstruction = last.equals(storedBlock);
  }

  // block == storedBlock when this addStoredBlock is the result of a block report.
  // The reference comparison is deliberate: it distinguishes "caller handed us the
  // very instance kept in blocksMap" from "caller handed us a separate Block object".
  // NOTE(review): presumably the block-report path passes the BlockInfo it obtained
  // from blocksMap, whereas a block arriving through a blockReceived RPC is a freshly
  // deserialized object and therefore always enters this branch - confirm in callers.
  if (block != storedBlock) {
    if (block.getNumBytes() >= 0) {
      long cursize = storedBlock.getNumBytes();
      INodeFile file = storedBlock.getINode();
      if (cursize == 0) {
        // No length recorded yet: adopt the reported one.
        storedBlock.setNumBytes(block.getNumBytes());
      } else if (cursize != block.getNumBytes()) {
        LOG.warn("Inconsistent size for block " + block +
            " reported from " + node.getName() +
            " current size is " + cursize +
            " reported size is " + block.getNumBytes());
        try {
          if (cursize > block.getNumBytes() && !blockUnderConstruction) {
            // new replica is smaller in size than existing block.
            // Mark the new replica as corrupt.
            LOG.warn("Mark new replica " + block + " from " + node.getName() +
                " as corrupt because its length is shorter than existing ones");
            markBlockAsCorrupt(block, node);
          } else {
            // new replica is larger in size than existing block.
            if (!blockUnderConstruction) {
              // Mark pre-existing replicas as corrupt.
              // Snapshot the other holders first, then mark them, so that marking
              // does not run while the node iterator is still being consumed.
              int numNodes = blocksMap.numNodes(block);
              int count = 0;
              DatanodeDescriptor[] nodes = new DatanodeDescriptor[numNodes];
              Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
              while (it != null && it.hasNext()) {
                DatanodeDescriptor dd = it.next();
                if (!dd.equals(node)) {
                  nodes[count++] = dd;
                }
              }
              for (int j = 0; j < count; j++) {
                // Log the holder of the stale replica, not the reporting node.
                LOG.warn("Mark existing replica "
                    + block
                    + " from "
                    + nodes[j].getName()
                    + " as corrupt because its length is shorter than the new one");
                markBlockAsCorrupt(block, nodes[j]);
              }
            }
            //
            // change the size of block in blocksMap
            //
            storedBlock.setNumBytes(block.getNumBytes());
          }
        } catch (IOException e) {
          LOG.warn("Error in deleting bad block " + block, e);
        }
      }

      // Updated space consumed if required.
      long diff = (file == null) ? 0 :
          (file.getPreferredBlockSize() - storedBlock.getNumBytes());

      if (diff > 0 && file.isUnderConstruction() &&
          cursize < storedBlock.getNumBytes()) {
        try {
          String path = /* For finding parents */
              leaseManager.findPath((INodeFileUnderConstruction)file);
          dir.updateSpaceConsumed(path, 0, -diff*file.getReplication());
        } catch (IOException e) {
          LOG.warn("Unexpected exception while updating disk space : " +
              e.getMessage(), e);
        }
      }
    }
    // From here on work with the instance that lives in blocksMap.
    block = storedBlock;
  }
  assert storedBlock == block : "Block must be stored by now";

  int curReplicaDelta = 0;

  if (added) {
    curReplicaDelta = 1;
    //
    // At startup time, because too many new blocks come in
    // they take up lots of space in the log file.
    // So, we log only when namenode is out of safemode.
    //
    if (!isInSafeMode()) {
      NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
          +"blockMap updated: "+node.getName()+" is added to "+block+" size "+block.getNumBytes());
    }
  } else {
    NameNode.stateChangeLog.warn("BLOCK* NameSystem.addStoredBlock: "
        + "Redundant addStoredBlock request received for "
        + block + " on " + node.getName()
        + " size " + block.getNumBytes());
  }

  // filter out containingNodes that are marked for decommission.
  NumberReplicas num = countNodes(storedBlock);
  int numLiveReplicas = num.liveReplicas();
  int numCurrentReplica = numLiveReplicas
      + pendingReplications.getNumReplicas(block);

  // check whether safe replication is reached for the block
  incrementSafeBlockCount(numCurrentReplica);

  //
  // if file is being actively written to, then do not check
  // replication-factor here. It will be checked when the file is closed.
  //
  if (blockUnderConstruction) {
    INodeFileUnderConstruction cons = (INodeFileUnderConstruction) fileINode;
    cons.addTarget(node);
    return block;
  }

  // do not handle mis-replicated blocks during startup
  if (isInSafeMode()) {
    return block;
  }

  // handle underReplication/overReplication
  short fileReplication = fileINode.getReplication();
  if (numCurrentReplica >= fileReplication) {
    neededReplications.remove(block, numCurrentReplica,
        num.decommissionedReplicas, fileReplication);
  } else {
    updateNeededReplications(block, curReplicaDelta, 0);
  }
  if (numCurrentReplica > fileReplication) {
    processOverReplicatedBlock(block, fileReplication, node, delNodeHint);
  }

  // If the file replication has reached desired value
  // we can remove any corrupt replicas the block may have
  int corruptReplicasCount = corruptReplicas.numCorruptReplicas(block);
  int numCorruptNodes = num.corruptReplicas();
  if (numCorruptNodes != corruptReplicasCount) {
    LOG.warn("Inconsistent number of corrupt replicas for " +
        block + " blockMap has " + numCorruptNodes +
        " but corrupt replicas map has " + corruptReplicasCount);
  }
  if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication)) {
    invalidateCorruptReplicas(block);
  }
  return block;
}