巴氏旅人 2018-01-11 01:34 采纳率: 0%
浏览 763
已结题

addStoredBlock的block != storedBlock的引用判断

Datanode.blockReceived 时，最后会调用 addStoredBlock 方法。该方法把传入的数据块参数对象 block 与 blocksMap 中保存的数据块（即变量 storedBlock）做了 != 判断（即判断两个对象的引用是否相同）。问题是：Block 是通过远程调用传过来的，它有可能与 blocksMap 中的对象是同一个引用吗？请赐教。

/**
 * Records that {@code node} holds a replica of {@code block}, reconciling the
 * reported block with the entry kept in {@code blocksMap}.
 *
 * <p>The method proceeds in these steps:
 * <ol>
 *   <li>Look up the stored block; if there is no exact match, fall back to a
 *       lookup that ignores the generation stamp and either reject the report
 *       or merely register {@code node} as a target of the under-construction
 *       file.</li>
 *   <li>Add the stored block to {@code node}.</li>
 *   <li>If the reported object is not the stored object itself, reconcile the
 *       reported length with the stored length (possibly marking replicas as
 *       corrupt) and adjust the consumed space of an under-construction
 *       file.</li>
 *   <li>Update safe-mode accounting and, outside safe mode and for files that
 *       are not under construction, the under/over-replication queues and
 *       corrupt-replica bookkeeping.</li>
 * </ol>
 *
 * @param block       the block being reported for {@code node}
 * @param node        the datanode the replica was reported from
 * @param delNodeHint forwarded unchanged to {@code processOverReplicatedBlock}
 *                    when the block turns out to be over-replicated
 * @return the value of {@code rejectAddStoredBlock} when the report is
 *         rejected; the reported {@code block} when the node was only added
 *         as a target or the file has no last block; otherwise the stored
 *         block from {@code blocksMap}
 */
synchronized Block addStoredBlock(Block block,
                                  DatanodeDescriptor node,
                                  DatanodeDescriptor delNodeHint) {
  BlockInfo storedBlock = blocksMap.getStoredBlock(block);
  if (storedBlock == null) {
    // If we have a block in the block map with the same ID, but a different
    // generation stamp, and the corresponding file is under construction,
    // then we need to do some special processing.
    storedBlock = blocksMap.getStoredBlockWithoutMatchingGS(block);

    if (storedBlock == null) {
      return rejectAddStoredBlock(
        block, node,
        "Block not in blockMap with any generation stamp");
    }

    INodeFile inode = storedBlock.getINode();
    if (inode == null) {
      return rejectAddStoredBlock(
        block, node,
        "Block does not correspond to any file");
    }

    boolean reportedOldGS = block.getGenerationStamp() < storedBlock.getGenerationStamp();
    boolean reportedNewGS = block.getGenerationStamp() > storedBlock.getGenerationStamp();
    boolean underConstruction = inode.isUnderConstruction();
    boolean isLastBlock = inode.getLastBlock() != null &&
      inode.getLastBlock().getBlockId() == block.getBlockId();

    // We can report a stale generation stamp for the last block under construction,
    // we just need to make sure it ends up in targets.
    if (reportedOldGS && !(underConstruction && isLastBlock)) {
      return rejectAddStoredBlock(
        block, node,
        "Reported block has old generation stamp but is not the last block of " +
        "an under-construction file. (current generation is " +
        storedBlock.getGenerationStamp() + ")");
    }

    // Don't add blocks to the DN when they're part of the in-progress last block
    // and have an inconsistent generation stamp. Instead just add them to targets
    // for recovery purposes. They will get added to the node when
    // commitBlockSynchronization runs
    if (underConstruction && isLastBlock && (reportedOldGS || reportedNewGS)) {
      NameNode.stateChangeLog.info(
        "BLOCK* NameSystem.addStoredBlock: "
        + "Targets updated: block " + block + " on " + node.getName() +
        " is added as a target for block " + storedBlock + " with size " +
        block.getNumBytes());
      ((INodeFileUnderConstruction)inode).addTarget(node);
      return block;
    }
  }
  // Checked before the first dereference so the assertion can actually fire.
  assert storedBlock != null : "Block must be stored by now";

  INodeFile fileINode = storedBlock.getINode();
  if (fileINode == null) {
    return rejectAddStoredBlock(
      block, node,
      "Block does not correspond to any file");
  }

  // add block to the data-node
  boolean added = node.addBlock(storedBlock);

  // Is the block being reported the last block of an underconstruction file?
  boolean blockUnderConstruction = false;
  // Cast once here and reuse below when the node is registered as a target.
  INodeFileUnderConstruction pendingFile = null;
  if (fileINode.isUnderConstruction()) {
    pendingFile = (INodeFileUnderConstruction) fileINode;
    Block last = fileINode.getLastBlock();
    if (last == null) {
      // This should never happen, but better to handle it properly than to throw
      // an NPE below.
      LOG.error("Null blocks for reported block=" + block + " stored=" + storedBlock +
        " inode=" + fileINode);
      return block;
    }
    blockUnderConstruction = last.equals(storedBlock);
  }

  // block == storedBlock when this addStoredBlock is the result of a block report
  // (This reference comparison is the check the question in this post is about.)
  // NOTE(review): presumably the block-report caller passes the very object it
  // obtained from blocksMap, while other callers pass a separately created
  // Block - confirm against the callers, which are not visible here.
  if (block != storedBlock) {
    if (block.getNumBytes() >= 0) {
      long cursize = storedBlock.getNumBytes();
      INodeFile file = storedBlock.getINode();
      if (cursize == 0) {
        // No length recorded yet: adopt the reported one.
        storedBlock.setNumBytes(block.getNumBytes());
      } else if (cursize != block.getNumBytes()) {
        LOG.warn("Inconsistent size for block " + block +
                 " reported from " + node.getName() +
                 " current size is " + cursize +
                 " reported size is " + block.getNumBytes());
        try {
          if (cursize > block.getNumBytes() && !blockUnderConstruction) {
            // new replica is smaller in size than existing block.
            // Mark the new replica as corrupt.
            LOG.warn("Mark new replica " + block + " from " + node.getName() +
                " as corrupt because its length is shorter than existing ones");
            markBlockAsCorrupt(block, node);
          } else {
            // new replica is larger in size than existing block.
            if (!blockUnderConstruction) {
              // Mark pre-existing replicas as corrupt.
              // Collect them first, then mark them in a second pass.
              int numNodes = blocksMap.numNodes(block);
              int count = 0;
              DatanodeDescriptor nodes[] = new DatanodeDescriptor[numNodes];
              Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
              while (it != null && it.hasNext()) {
                DatanodeDescriptor dd = it.next();
                if (!dd.equals(node)) {
                  nodes[count++] = dd;
                }
              }
              for (int j = 0; j < count; j++) {
                // Log the node that actually holds the short replica, not the
                // node that reported the longer one.
                LOG.warn("Mark existing replica "
                    + block
                    + " from "
                    + nodes[j].getName()
                    + " as corrupt because its length is shorter than the new one");
                markBlockAsCorrupt(block, nodes[j]);
              }
            }
            //
            // change the size of block in blocksMap
            //
            storedBlock.setNumBytes(block.getNumBytes());
          }
        } catch (IOException e) {
          LOG.warn("Error in deleting bad block " + block + ": " + e);
        }
      }

      //Updated space consumed if required.
      // diff is the part of the preferred block size not covered by the
      // (possibly just updated) stored length; 0 when there is no file.
      long diff = (file == null) ? 0 :
                  (file.getPreferredBlockSize() - storedBlock.getNumBytes());

      if (diff > 0 && file.isUnderConstruction() &&
          cursize < storedBlock.getNumBytes()) {
        try {
          String path = /* For finding parents */
            leaseManager.findPath((INodeFileUnderConstruction)file);
          dir.updateSpaceConsumed(path, 0, -diff*file.getReplication());
        } catch (IOException e) {
          LOG.warn("Unexpected exception while updating disk space : " +
                   e.getMessage());
        }
      }
    }
    // From here on work with the stored object only.
    block = storedBlock;
  }
  assert storedBlock == block : "Block must be stored by now";

  int curReplicaDelta = 0;

  if (added) {
    curReplicaDelta = 1;
    //
    // At startup time, because too many new blocks come in
    // they take up lots of space in the log file.
    // So, we log only when namenode is out of safemode.
    //
    if (!isInSafeMode()) {
      NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
                                    +"blockMap updated: "+node.getName()+" is added to "+block+" size "+block.getNumBytes());
    }
  } else {
    NameNode.stateChangeLog.warn("BLOCK* NameSystem.addStoredBlock: "
                                 + "Redundant addStoredBlock request received for "
                                 + block + " on " + node.getName()
                                 + " size " + block.getNumBytes());
  }

  // filter out containingNodes that are marked for decommission.
  NumberReplicas num = countNodes(storedBlock);
  int numLiveReplicas = num.liveReplicas();
  int numCurrentReplica = numLiveReplicas
    + pendingReplications.getNumReplicas(block);

  // check whether safe replication is reached for the block
  incrementSafeBlockCount(numCurrentReplica);

  //
  // if file is being actively written to, then do not check
  // replication-factor here. It will be checked when the file is closed.
  //
  if (blockUnderConstruction) {
    pendingFile.addTarget(node);
    return block;
  }

  // do not handle mis-replicated blocks during startup
  if (isInSafeMode()) {
    return block;
  }

  // handle underReplication/overReplication
  short fileReplication = fileINode.getReplication();
  if (numCurrentReplica >= fileReplication) {
    neededReplications.remove(block, numCurrentReplica,
                              num.decommissionedReplicas, fileReplication);
  } else {
    updateNeededReplications(block, curReplicaDelta, 0);
  }
  if (numCurrentReplica > fileReplication) {
    processOverReplicatedBlock(block, fileReplication, node, delNodeHint);
  }
  // If the file replication has reached desired value
  // we can remove any corrupt replicas the block may have
  int corruptReplicasCount = corruptReplicas.numCorruptReplicas(block);
  int numCorruptNodes = num.corruptReplicas();
  if (numCorruptNodes != corruptReplicasCount) {
    LOG.warn("Inconsistent number of corrupt replicas for " +
        block + " blockMap has " + numCorruptNodes +
        " but corrupt replicas map has " + corruptReplicasCount);
  }
  if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication)) {
    invalidateCorruptReplicas(block);
  }
  return block;
}

  • 写回答

1条回答 默认 最新

  • devmiao 2018-01-11 15:47
    关注
    评论

报告相同问题?

悬赏问题

  • ¥15 错误 LNK2001 无法解析的外部符号
  • ¥50 安装pyaudiokits失败
  • ¥15 计组这些题应该咋做呀
  • ¥60 更换迈创SOL6M4AE卡的时候,驱动要重新装才能使用,怎么解决?
  • ¥15 让node服务器有自动加载文件的功能
  • ¥15 jmeter脚本回放有的是对的有的是错的
  • ¥15 r语言蛋白组学相关问题
  • ¥15 Python时间序列如何拟合疏系数模型
  • ¥15 求学软件的前人们指明方向🥺
  • ¥50 如何增强飞上天的树莓派的热点信号强度,以使得笔记本可以在地面实现远程桌面连接