当前有两种方式触发全量块汇报
- 通过命令行
./bin/hdfs dfsadmin -triggerBlockReport dnIp:50020
- 通过 hdfs-site.xml 参数控制
dfs.blockreport.intervalMsec 600000 Determines block reporting interval in milliseconds.(注:600000 毫秒即 10 分钟,为示例配置值;代码中的默认值为 6 * 60 * 60 * 1000 毫秒,即 6 小时。)
datanode---BPServiceActor
ListblockReport(long fullBrLeaseId) throws IOException { LOG.info("Trigger a full block report to " + nnAddr); final ArrayList cmds = new ArrayList (); // Flush any block information that precedes the block report. Otherwise // we have a chance that we will miss the delHint information // or we will report an RBW replica after the BlockReport already reports // a FINALIZED one. ibrManager.sendIBRs(bpNamenode, bpRegistration, bpos.getBlockPoolId(), dn.getMetrics()); long brCreateStartTime = monotonicNow(); Map perVolumeBlockLists = dn.getFSDataset().getBlockReports(bpos.getBlockPoolId()); // Convert the reports to the format expected by the NN. int i = 0; int totalBlockCount = 0; StorageBlockReport reports[] = new StorageBlockReport[perVolumeBlockLists.size()]; for (Map.Entry kvPair : perVolumeBlockLists.entrySet()) { BlockListAsLongs blockList = kvPair.getValue(); reports[i++] = new StorageBlockReport(kvPair.getKey(), blockList); totalBlockCount += blockList.getNumberOfBlocks(); } // Send the reports to the NN. int numReportsSent = 0; int numRPCs = 0; Boolean success = false; long brSendStartTime = monotonicNow(); long reportId = generateUniqueBlockReportId(); try { if (totalBlockCount < dnConf.blockReportSplitThreshold) { // Below split threshold, send all reports in a single message. DatanodeCommand cmd = bpNamenode.blockReport( bpRegistration, bpos.getBlockPoolId(), reports, new BlockReportContext(1, 0, reportId, fullBrLeaseId, true)); numRPCs = 1; numReportsSent = reports.length; if (cmd != null) { cmds.add(cmd); } } else { // Send one block report per message. 
for (int r = 0; r < reports.length; r++) { StorageBlockReport singleReport[] = { reports[r] }; DatanodeCommand cmd = bpNamenode.blockReport( bpRegistration, bpos.getBlockPoolId(), singleReport, new BlockReportContext(reports.length, r, reportId, fullBrLeaseId, true)); numReportsSent++; numRPCs++; if (cmd != null) { cmds.add(cmd); } } } success = true; } finally { // Log the block report processing stats from Datanode perspective long brSendCost = monotonicNow() - brSendStartTime; long brCreateCost = brSendStartTime - brCreateStartTime; dn.getMetrics().addBlockReport(brSendCost); if (!success) { dn.getMetrics().incrBlockReportFailedNumOps(); } else { scheduler.lastBlockReportTime = Time.monotonicNow(); } if (scheduler.isFirstBlockReport) { synchronized (bpos) { if (dn.getFirstBlockReportLatency() != null && dn.getFirstBlockReportLatency().get(bpos.bpNSInfo.getBlockPoolID()) == null) { dn.getFirstBlockReportLatency().put( bpos.bpNSInfo.getBlockPoolID(), new HashMap ()); dn.getFirstBlockReportLatency().get(bpos.bpNSInfo.getBlockPoolID()) .put(nnAddr.toString(), monotonicNow() - actorStartTime); } } dn.getMetrics().addBlockReportsStartComplete(monotonicNow() - actorStartTime); scheduler.isFirstBlockReport = false; } final int nCmds = cmds.size(); LOG.info((success ? "S" : "Uns") + "uccessfully sent block report 0x" + long.toHexString(reportId) + ", containing " + reports.length + " storage report(s), of which we sent " + numReportsSent + "." + " The reports had " + totalBlockCount + " total blocks and used " + numRPCs + " RPC(s). This took " + brCreateCost + " msec to generate and " + brSendCost + " msecs for RPC and NN processing." + " Got back " + ((nCmds == 0) ? "no commands" : ((nCmds == 1) ? "one command: " + cmds.get(0) : (nCmds + " commands: " + Joiner.on("; ").join(cmds)))) + "."); } scheduler.scheduleNextBlockReport(); return cmds.size() == 0 ? null : cmds; }
全量块汇报的触发都是在 BPServiceActor.java 的 offerService() 方法中完成的
// Excerpt from offerService(): decides whether to send a full block report.
// When dnConf.requestBPLease is set, a report is sent if a non-zero lease ID
// is held or a full report has been forced.
if(dnConf.requestBPLease) {
if ((fullBlockReportLeaseId != 0) || forceFullBr) {
cmds = blockReport(fullBlockReportLeaseId);
// Clear the lease ID once it has been passed to blockReport().
fullBlockReportLeaseId = 0;
}
} else {
// Otherwise a report is sent when the scheduler says one is due, or when a
// full report has been forced.
if (scheduler.isBlockReportDue(startTime) || forceFullBr) {
cmds = blockReport(fullBlockReportLeaseId);
fullBlockReportLeaseId = 0;
}
}
手动触发全量块汇报
手动命令行触发
} else if ("-triggerBlockReport".equals(cmd)) {
exitCode = triggerBlockReport(argv);
DFSAdmin.java
/**
 * Handles the {@code -triggerBlockReport} admin command: asks the DataNode
 * at the given host:port to send a block report.
 *
 * @param argv command-line arguments; element 0 is skipped, the remaining
 *        elements may contain {@code -incremental} and must contain exactly
 *        one host:port pair
 * @return 0 when the request was sent, 1 on a usage error or when the RPC
 *         to the DataNode failed with an {@link IOException}
 * @throws IOException declared by the signature; the RPC failure itself is
 *         caught and reported through the exit code
 */
public int triggerBlockReport(String[] argv) throws IOException {
  List<String> args = new LinkedList<>();
  for (int j = 1; j < argv.length; j++) {
    args.add(argv[j]);
  }
  boolean incremental = StringUtils.popOption("-incremental", args);
  String hostPort = StringUtils.popFirstNonOption(args);
  if (hostPort == null) {
    System.err.println("You must specify a host:port pair.");
    return 1;
  }
  // Anything still left in args was neither the option nor the host:port.
  if (!args.isEmpty()) {
    System.err.print("Can't understand arguments: " +
        Joiner.on(" ").join(args) + "\n");
    return 1;
  }
  ClientDatanodeProtocol dnProxy = getDataNodeProxy(hostPort);
  try {
    dnProxy.triggerBlockReport(
        new BlockReportOptions.Factory().
            setIncremental(incremental).
            build());
  } catch (IOException e) {
    System.err.println("triggerBlockReport error: " + e);
    return 1;
  }
  System.out.println("Triggering " +
      (incremental ? "an incremental " : "a full ") +
      "block report on " + hostPort + ".");
  return 0;
}
Datanode.java
/**
 * Forwards a block report request to every BPServiceActor of every
 * non-null BPOfferService known to the block pool manager.
 *
 * @param options passed unchanged to each actor
 * @throws IOException declared by the signature; in this method only
 *         checkSuperuserPrivilege() is a candidate source
 */
@Override // ClientDatanodeProtocol
public void triggerBlockReport(BlockReportOptions options)
    throws IOException {
  checkSuperuserPrivilege();
  for (BPOfferService offerService
      : blockPoolManager.getAllNamenodeThreads()) {
    if (offerService == null) {
      // Null entries are skipped.
      continue;
    }
    for (BPServiceActor serviceActor : offerService.getBPServiceActors()) {
      serviceActor.triggerBlockReport(options);
    }
  }
}
BPServiceActor.java
强制触发全量块汇报(FBR)
/**
 * Schedules a block report on this actor.
 *
 * For a full report the force flag is set through the scheduler and waiters
 * on ibrManager are notified; for an incremental report the IBR manager is
 * triggered.
 *
 * @param options selects between an incremental and a full report
 */
void triggerBlockReport(BlockReportOptions options) {
  if (!options.isIncremental()) {
    LOG.info(bpos.toString() + ": scheduling a full block report.");
    // Set the flag and notify while holding the ibrManager monitor.
    synchronized (ibrManager) {
      scheduler.forceFullBlockReportNow();
      ibrManager.notifyAll();
    }
    return;
  }
  LOG.info(bpos.toString() + ": scheduling an incremental block report.");
  ibrManager.triggerIBR(true);
}
/**
 * Requests a full block report on the next pass of the service loop.
 *
 * Sets the forceFullBlockReport flag, which offerService() reads and clears
 * with getAndSet(false), and sets resetBlockReportTime so that the next
 * scheduleNextBlockReport() call picks a new randomized report time.
 */
void forceFullBlockReportNow() {
forceFullBlockReport.set(true);
resetBlockReportTime = true;
}
判断是否需要全量FBR
// Read the force flag and reset it to false in one call, so a forced report
// is consumed by a single loop pass (presumably an AtomicBoolean -- confirm).
boolean forceFullBr = scheduler.forceFullBlockReport.getAndSet(false);
回到刚刚的 offerService() 方法
// Excerpt from offerService(): after a manual trigger, forceFullBr is true,
// so whichever branch is selected by dnConf.requestBPLease sends the report.
if(dnConf.requestBPLease) {
// Manual trigger path: forceFullBr == true satisfies this condition.
if ((fullBlockReportLeaseId != 0) || forceFullBr) {
cmds = blockReport(fullBlockReportLeaseId);
fullBlockReportLeaseId = 0;
}
} else {
if (scheduler.isBlockReportDue(startTime) || forceFullBr) {
cmds = blockReport(fullBlockReportLeaseId);
fullBlockReportLeaseId = 0;
}
}
默认每隔6小时自动触发
在DNConf中会读取全量块汇报的间隔配置
// Full block report interval: read from the configuration under
// DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, with
// DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT passed as the fallback value.
this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
BPServiceActor
**
* Main loop for each BP thread. Run until shutdown,
* forever calling remote NameNode functions.
*/
private void offerService() throws Exception {
LOG.info("For namenode " + nnAddr + " using"
+ " BLOCKREPORT_INTERVAL of " + dnConf.blockReportInterval + "msec"
+ " CACHEREPORT_INTERVAL of " + dnConf.cacheReportInterval + "msec"
+ " Initial delay: " + dnConf.initialBlockReportDelayMs + "msec"
+ "; heartBeatInterval=" + dnConf.heartBeatInterval
+ (lifelineSender != null ?
"; lifelineIntervalMs=" + dnConf.getLifelineIntervalMs() : ""));
long fullBlockReportLeaseId = 0;
//
// Now loop for a long time....
//
while (shouldRun()) {
try {
final long startTime = scheduler.monotonicNow();
//
// Every so often, send heartbeat or block-report
//
final Boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);
HeartbeatResponse resp = null;
if (sendHeartbeat) {
//
// All heartbeat messages include following info:
// -- Datanode name
// -- data transfer port
// -- Total capacity
// -- Bytes remaining
//
Boolean requestBlockReportLease = (fullBlockReportLeaseId == 0) &&
scheduler.isBlockReportDue(startTime);
if (!dn.areHeartbeatsDisabledForTests()) {
try {
dn.getMetrics().addHeartbeatInterval(scheduler.getHeartbeatInterval());
resp = sendHeartBeat(requestBlockReportLease);
scheduler.setLastHeartbeatTime();
}
catch (IOException e) {
dn.getMetrics().incrFailedHeartBeatsINumOps();
throw e;
}
assert resp != null;
if (resp.getFullBlockReportLeaseId() != 0) {
if (fullBlockReportLeaseId != 0) {
LOG.warn(nnAddr + " sent back a full block report lease " +
"ID of 0x" +
long.toHexString(resp.getFullBlockReportLeaseId()) +
", but we already have a lease ID of 0x" +
long.toHexString(fullBlockReportLeaseId) + ". " +
"Overwriting old lease ID.");
}
fullBlockReportLeaseId = resp.getFullBlockReportLeaseId();
}
dn.getMetrics().addHeartbeat(scheduler.monotonicNow() - startTime);
// If the state of this NN has changed (eg STANDBY->ACTIVE)
// then let the BPOfferService update itself.
//
// important that this happens before processCommand below,
// since the first heartbeat to a new active might have commands
// that we should actually process.
bpos.updateActorStatesFromHeartbeat(
this, resp.getNameNodeHaState());
state = resp.getNameNodeHaState().getState();
if (state == HAServiceState.ACTIVE) {
handleRollingUpgradeStatus(resp);
}
long startProcessCommands = monotonicNow();
if (dnProcessCmdAsynchronouslyEnabled) {
commandProcessingThread.enqueue(resp.getCommands());
} else {
if (!processCommand(resp.getCommands()))
continue;
}
long endProcessCommands = monotonicNow();
if (endProcessCommands - startProcessCommands > 2000) {
LOG.info("Took " + (endProcessCommands - startProcessCommands)
+ "ms to process " + resp.getCommands().length
+ " commands from NN");
}
}
}
if (ibrManager.sendImmediately() || sendHeartbeat) {
ibrManager.sendIBRs(bpNamenode, bpRegistration,
bpos.getBlockPoolId(), dn.getMetrics());
}
List cmds = null;
Boolean forceFullBr =
scheduler.forceFullBlockReport.getAndSet(false);
if (forceFullBr) {
LOG.info("Forcing a full block report to " + nnAddr);
}
if(dnConf.requestBPLease) {
if ((fullBlockReportLeaseId != 0) || forceFullBr) {
cmds = blockReport(fullBlockReportLeaseId);
fullBlockReportLeaseId = 0;
}
} else {
if (scheduler.isBlockReportDue(startTime) || forceFullBr) {
cmds = blockReport(fullBlockReportLeaseId);
fullBlockReportLeaseId = 0;
}
}
if (dnProcessCmdAsynchronouslyEnabled) {
commandProcessingThread.enqueue(cmds);
} else {
processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));
}
if (!dn.areCacheReportsDisabledForTests()) {
DatanodeCommand cmd = cacheReport();
if (dnProcessCmdAsynchronouslyEnabled) {
commandProcessingThread.enqueue(cmd);
} else {
processCommand(new DatanodeCommand[]{
cmd
}
);
}
}
// There is no work to do; sleep until hearbeat timer elapses,
// or work arrives, and then iterate again.
ibrManager.waitTillNextIBR(scheduler.getHeartbeatWaitTime());
}
catch(RemoteException re) {
String reClass = re.getClassName();
if (UnregisteredNodeException.class.getName().equals(reClass) ||
DisallowedDatanodeException.class.getName().equals(reClass) ||
IncorrectVersionException.class.getName().equals(reClass)) {
LOG.warn(this + " is shutting down", re);
shouldServiceRun = false;
return;
}
LOG.warn("RemoteException in offerService", re);
sleepAfterException();
}
catch (IOException e) {
LOG.warn("IOException in offerService", e);
sleepAfterException();
}
finally {
// 记录长时间没进行全量汇报的Metric
if (Time.monotonicNow() - scheduler.lastBlockReportTime >
(dnConf.blockReportInterval + TimeUnit.HOURS.toMillis(1))) {
dn.getMetrics().incrStaleBlockReportNumOps();
}
}
processQueueMessages();
}
// while (shouldRun())
}
// offerService
还是这一段
// Excerpt from offerService(): the same decision, seen from the periodic
// (configured interval) trigger.
if(dnConf.requestBPLease) {
if ((fullBlockReportLeaseId != 0) || forceFullBr) {
cmds = blockReport(fullBlockReportLeaseId);
fullBlockReportLeaseId = 0;
}
} else {
// Periodic trigger path: taken when the scheduler reports that the
// configured interval has elapsed.
if (scheduler.isBlockReportDue(startTime) || forceFullBr) {
cmds = blockReport(fullBlockReportLeaseId);
fullBlockReportLeaseId = 0;
}
}
如果下一次全量块汇报的计划时间不晚于当前时间(即 nextBlockReportTime - curTime <= 0),说明该开始全量块汇报了
/**
 * Tells whether a full block report is due at the given time.
 *
 * @param curTime the time to test against
 * @return true when nextBlockReportTime is at or before curTime
 */
Boolean isBlockReportDue(long curTime) {
  // The check is made on the difference, not by comparing the two values.
  final long untilNextReport = nextBlockReportTime - curTime;
  return untilNextReport <= 0;
}
计算nextBlockReportTime(每次blockReport都会计算下一次要report的时间)
/**
 * Computes the time of the next full block report.
 *
 * When resetBlockReportTime is set, the next report is placed at a random
 * offset in [0, blockReportIntervalMs) from now and the flag is cleared.
 * Otherwise nextBlockReportTime is advanced by a whole number of intervals.
 */
void scheduleNextBlockReport() {
  // If we have sent the first set of block reports, then wait a random
  // time before we start the periodic block reports.
  if (resetBlockReportTime) {
    nextBlockReportTime = monotonicNow() +
        DFSUtil.getRandom().nextInt((int) (blockReportIntervalMs));
    resetBlockReportTime = false;
  } else {
    // Integer division yields the number of whole intervals to add, so the
    // new time stays on the same interval grid as the previous one.
    nextBlockReportTime +=
        (((monotonicNow() - nextBlockReportTime + blockReportIntervalMs) /
            blockReportIntervalMs)) * blockReportIntervalMs;
  }
}
初始化 Scheduler 的时候会赋值配置文件里的值或者默认值
DFSConfigKeys.java
// Configuration key for the full block report interval, and its default
// value in milliseconds: 6 * 60 * 60 * 1000 ms = 6 hours.
public static final String DFS_BLOCKREPORT_INTERVAL_MSEC_KEY = "dfs.blockreport.intervalMsec"; public static final long DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT = 6 * 60 * 60 * 1000;
/**
 * Creates a scheduler that stores the three timer intervals it is given.
 *
 * @param heartbeatIntervalMs stored as the heartbeat interval
 * @param lifelineIntervalMs stored as the lifeline interval
 * @param blockReportIntervalMs stored as the full block report interval
 */
Scheduler(long heartbeatIntervalMs, long lifelineIntervalMs,
    long blockReportIntervalMs) {
  this.blockReportIntervalMs = blockReportIntervalMs;
  this.lifelineIntervalMs = lifelineIntervalMs;
  this.heartbeatIntervalMs = heartbeatIntervalMs;
}
// Build the scheduler from the DataNode configuration: heartbeat interval,
// lifeline interval and full block report interval, in that order.
scheduler = new Scheduler(dnConf.heartBeatInterval,
dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval);
// Source of dnConf.blockReportInterval: the value configured under
// DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, with
// DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT passed as the fallback value.
this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);



