[TinkerPop Notes] Full scan of graph data with Spark as the engine: a source-code walkthrough - Part 1
[TinkerPop Notes] Full scan of graph data with Spark as the engine: a source-code walkthrough - Part 2
[TinkerPop Notes] Full scan of graph data with Spark as the engine: a source-code walkthrough - Part 3
public static final String EDGESTORE_NAME = "edgestore";
public static final String INDEXSTORE_NAME = "graphindex";
public static final String METRICS_STOREMANAGER_NAME = "storeManager";
public static final String METRICS_MERGED_STORE = "stores";
public static final String METRICS_MERGED_CACHE = "caches";
public static final String METRICS_CACHE_SUFFIX = ".cache";
public static final String LOCK_STORE_SUFFIX = "_lock_";
public static final String SYSTEM_TX_LOG_NAME = "txlog";
public static final String SYSTEM_MGMT_LOG_NAME = "systemlog";
public static final double EDGESTORE_CACHE_PERCENT = 0.8;
public static final double INDEXSTORE_CACHE_PERCENT = 0.2;
private static final long ETERNAL_CACHE_EXPIRATION = 1000L * 3600 * 24 * 365 * 200; // 200 years
public static final int THREAD_POOL_SIZE_SCALE_FACTOR = 2;

private final KeyColumnValueStoreManager storeManager;
private final KeyColumnValueStoreManager storeManagerLocking;
private final StoreFeatures storeFeatures;
private KCVSCache edgeStore;
private KCVSCache indexStore;
private KCVSCache txLogStore;
private IDAuthority idAuthority;
private KCVSConfiguration systemConfig;
private KCVSConfiguration userConfig;
private boolean hasAttemptedClose;
private final StandardScanner scanner;
private final KCVSLogManager managementLogManager;
private final KCVSLogManager txLogManager;
private final LogManager userLogManager;
private final Map<String, IndexProvider> indexes;
private final int bufferSize;
private final Duration maxWriteTime;
private final Duration maxReadTime;
private final boolean cacheEnabled;
private final ExecutorService threadPool;
private final Function<String, Locker> lockerCreator;
private final ConcurrentHashMap<String, Locker> lockers = new ConcurrentHashMap<>();
private final Configuration configuration;

public Backend(Configuration configuration) {
    this.configuration = configuration;
    // The object obtained here is the HBaseStoreManager, i.e. the storage-management object.
    // KeyColumnValueStoreManager provides the persistence context to the graph database storage backend.
    KeyColumnValueStoreManager manager = getStorageManager(configuration);
    if (configuration.get(BASIC_METRICS)) {
        storeManager = new MetricInstrumentedStoreManager(manager, METRICS_STOREMANAGER_NAME,
            configuration.get(METRICS_MERGE_STORES), METRICS_MERGED_STORE);
    } else {
        storeManager = manager;
    }
    // Obtain the index providers. For Elasticsearch this returns an
    // org.janusgraph.diskstorage.es.ElasticSearchIndex instance.
    // If the ES cluster has Kerberos enabled, intermittent authentication failures can occur here.
    indexes = getIndexes(configuration);
    storeFeatures = storeManager.getFeatures();

    managementLogManager = getKCVSLogManager(MANAGEMENT_LOG);
    txLogManager = getKCVSLogManager(TRANSACTION_LOG);
    userLogManager = getLogManager(USER_LOG);

    cacheEnabled = !configuration.get(STORAGE_BATCH) && configuration.get(DB_CACHE);

    int bufferSizeTmp = configuration.get(BUFFER_SIZE);
    Preconditions.checkArgument(bufferSizeTmp > 0, "Buffer size must be positive");
    if (!storeFeatures.hasBatchMutation()) {
        bufferSize = Integer.MAX_VALUE;
    } else bufferSize = bufferSizeTmp;

    maxWriteTime = configuration.get(STORAGE_WRITE_WAITTIME);
    maxReadTime = configuration.get(STORAGE_READ_WAITTIME);

    if (!storeFeatures.hasLocking()) {
        Preconditions.checkArgument(storeFeatures.isKeyConsistent(), "Store needs to support some form of locking");
        storeManagerLocking = new ExpectedValueCheckingStoreManager(storeManager, LOCK_STORE_SUFFIX, this, maxReadTime);
    } else {
        storeManagerLocking = storeManager;
    }

    if (configuration.get(PARALLEL_BACKEND_OPS)) {
        int poolSize = Runtime.getRuntime().availableProcessors() * THREAD_POOL_SIZE_SCALE_FACTOR;
        threadPool = Executors.newFixedThreadPool(poolSize);
        log.info("Initiated backend operations thread pool of size {}", poolSize);
    } else {
        threadPool = null;
    }

    final String lockBackendName = configuration.get(LOCK_BACKEND);
    if (REGISTERED_LOCKERS.containsKey(lockBackendName)) {
        lockerCreator = REGISTERED_LOCKERS.get(lockBackendName);
    } else {
        throw new JanusGraphConfigurationException("Unknown lock backend \"" + lockBackendName +
            "\". Known lock backends: " + Joiner.on(", ").join(REGISTERED_LOCKERS.keySet()) + ".");
    }
    // Never used for backends that have innate transaction support, but we
    // want to maintain the non-null invariant regardless; it will default
    // to consistent-key implementation if none is specified
    Preconditions.checkNotNull(lockerCreator);

    scanner = new StandardScanner(storeManager);
}
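The constructor is driven entirely by configuration: DB_CACHE and STORAGE_BATCH decide whether the KCVSCache layer is enabled, BUFFER_SIZE caps mutation batching, PARALLEL_BACKEND_OPS spins up the thread pool, and the index.* options select the IndexProvider. Below is a minimal sketch (not from the article) of opening a graph with the corresponding user-facing settings; the hostnames are placeholders and assume a reachable HBase and Elasticsearch:

import org.janusgraph.core.JanusGraphFactory

// Minimal sketch: the settings below map onto the config options read by
// the Backend constructor above (DB_CACHE, BUFFER_SIZE, PARALLEL_BACKEND_OPS, ...).
object BackendConfigSketch {
  def main(args: Array[String]): Unit = {
    val graph = JanusGraphFactory.build()
      .set("storage.backend", "hbase")
      .set("storage.hostname", "hbase-host")        // placeholder host
      .set("cache.db-cache", true)                  // -> DB_CACHE
      .set("storage.buffer-size", 1024)             // -> BUFFER_SIZE, must be > 0
      .set("storage.parallel-backend-ops", true)    // -> PARALLEL_BACKEND_OPS
      .set("index.search.backend", "elasticsearch") // -> ElasticSearchIndex provider
      .set("index.search.hostname", "es-host")      // placeholder host
      .open()
    try {
      // The Backend shown above is constructed inside open()
      println(graph.isOpen)
    } finally graph.close()
  }
}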
12:BypassMergeSortShuffleWriter
@Override
public void write(Iterator<Product2<K, V>> records) throws IOException {
  assert (partitionWriters == null);
  // If there are no records, just commit an empty index file
  if (!records.hasNext()) {
    partitionLengths = new long[numPartitions];
    shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
    return;
  }
  final SerializerInstance serInstance = serializer.newInstance();
  final long openStartTime = System.nanoTime();
  // One disk writer per reduce partition
  partitionWriters = new DiskBlockObjectWriter[numPartitions];
  // One FileSegment per partition; each shuffle block is a FileSegment holding the data produced by the shuffle
  partitionWriterSegments = new FileSegment[numPartitions];
  for (int i = 0; i < numPartitions; i++) {
    final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
      blockManager.diskBlockManager().createTempShuffleBlock();
    // Path of the temp file on local disk that the data is written to
    final File file = tempShuffleBlockIdPlusFile._2();
    // Block id; a UUID guarantees each block id is unique
    final BlockId blockId = tempShuffleBlockIdPlusFile._1();
    partitionWriters[i] =
      blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
  }
  // Creating the file to write to and creating a disk writer both involve interacting with
  // the disk, and can take a long time in aggregate when we open many files, so should be
  // included in the shuffle write time.
  writeMetrics.incWriteTime(System.nanoTime() - openStartTime);

  while (records.hasNext()) {
    // Iterate over every record to be written; here the records look like [v[id], v[]]
    final Product2<K, V> record = records.next();
    final K key = record._1();
    partitionWriters[partitioner.getPartition(key)].write(key, record._2());
  }

  for (int i = 0; i < numPartitions; i++) {
    final DiskBlockObjectWriter writer = partitionWriters[i];
    partitionWriterSegments[i] = writer.commitAndGet();
    writer.close();
  }

  // Final on-disk output path (e.g. /data/5/hadoop/yarn/local/usercache/graph/appcache/)
  File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
  File tmp = Utils.tempFileWith(output);
  try {
    // Concatenate the per-partition temp files into the single data file
    partitionLengths = writePartitionedFile(tmp);
    // Write and commit the index file locally; it records, for each partition, the start
    // offset and length within the data file. The MapStatus below reports the lengths
    // and the shuffle server id to the driver.
    shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
  } finally {
    if (tmp.exists() && !tmp.delete()) {
      logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
    }
  }
  mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}
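The write path above is plain hash partitioning: one DiskBlockObjectWriter per reduce partition, each record routed by partitioner.getPartition(key), and the per-partition files concatenated at the end. The following self-contained sketch (plain files, no Spark classes; all names are illustrative) shows the same "bypass merge sort" idea:

import java.io.{File, FileOutputStream, PrintWriter}

// Illustrative sketch of the bypass strategy: hash every record straight into
// one file per reduce partition, then concatenate the files into one data file.
object BypassShuffleSketch {
  def main(args: Array[String]): Unit = {
    val numPartitions = 3
    val records = Seq("a" -> 1, "b" -> 2, "c" -> 3, "d" -> 4)

    // One temp file per partition, like partitionWriters[i]
    val partFiles = Array.tabulate(numPartitions)(i => File.createTempFile(s"part-$i-", ".tmp"))
    val writers = partFiles.map(f => new PrintWriter(f))
    for ((k, v) <- records) {
      val p = math.abs(k.hashCode) % numPartitions // partitioner.getPartition(key)
      writers(p).println(s"$k=$v")
    }
    writers.foreach(_.close())

    // Concatenate into a single data file and record each partition's length,
    // like writePartitionedFile(tmp) returning partitionLengths
    val output = File.createTempFile("shuffle-data-", ".bin")
    val out = new FileOutputStream(output)
    val lengths = partFiles.map { f =>
      val bytes = java.nio.file.Files.readAllBytes(f.toPath)
      out.write(bytes)
      f.delete()
      bytes.length.toLong
    }
    out.close()
    println(s"partitionLengths = ${lengths.mkString(", ")}")
  }
}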
13:DiskBlockObjectWriter
def commitAndGet(): FileSegment = {
  if (streamOpen) {
    // NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
    // serializer stream and the lower level stream.
    objOut.flush()
    bs.flush()
    objOut.close()
    streamOpen = false

    if (syncWrites) {
      // Force outstanding writes to disk and track how long it takes
      val start = System.nanoTime()
      fos.getFD.sync()
      writeMetrics.incWriteTime(System.nanoTime() - start)
    }

    // Current position in the data file, i.e. the end of the segment just written;
    // the segment starts at committedPosition
    val pos = channel.position()
    // A file segment is (file path, start offset, length)
    val fileSegment = new FileSegment(file, committedPosition, pos - committedPosition)
    committedPosition = pos
    // In certain compression codecs, more bytes are written after streams are closed
    writeMetrics.incBytesWritten(committedPosition - reportedPosition)
    reportedPosition = committedPosition
    numRecordsWritten = 0
    fileSegment
  } else {
    new FileSegment(file, committedPosition, 0)
  }
}
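Each commitAndGet() closes off exactly the byte range appended since the previous commit: the returned segment spans [committedPosition, pos). A tiny model of that bookkeeping (illustrative names, no Spark classes):

// Minimal model of DiskBlockObjectWriter's segment bookkeeping: each commit
// returns the byte range [committedPosition, position) written since the last commit.
final case class Segment(offset: Long, length: Long)

final class SegmentTracker {
  private var committedPosition = 0L
  private var position = 0L // stands in for channel.position()

  def write(nBytes: Long): Unit = position += nBytes

  def commitAndGet(): Segment = {
    val seg = Segment(committedPosition, position - committedPosition)
    committedPosition = position
    seg
  }
}

object SegmentTrackerDemo extends App {
  val t = new SegmentTracker
  t.write(100); println(t.commitAndGet()) // Segment(0,100)
  t.write(50);  println(t.commitAndGet()) // Segment(100,50)
}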
14:IndexShuffleBlockResolver
def writeIndexFileAndCommit(
    shuffleId: Int,
    mapId: Int,
    lengths: Array[Long],
    dataTmp: File): Unit = {
  val indexFile = getIndexFile(shuffleId, mapId)
  val indexTmp = Utils.tempFileWith(indexFile)
  try {
    val dataFile = getDataFile(shuffleId, mapId)
    // There is only one IndexShuffleBlockResolver per executor; this synchronization makes sure
    // the following check and rename are atomic.
    synchronized {
      val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
      if (existingLengths != null) {
        // Another attempt for the same task has already written our map outputs successfully,
        // so just use the existing partition lengths and delete our temporary map outputs.
        System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
        if (dataTmp != null && dataTmp.exists()) {
          dataTmp.delete()
        }
      } else {
        // This is the first successful attempt in writing the map outputs for this task,
        // so override any existing index and data files with the ones we wrote.
        val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
        Utils.tryWithSafeFinally {
          // We take in lengths of each block, need to convert it to offsets.
          var offset = 0L
          out.writeLong(offset)
          for (length <- lengths) {
            offset += length
            out.writeLong(offset)
          }
        } {
          out.close()
        }
        if (indexFile.exists()) {
          indexFile.delete()
        }
        if (dataFile.exists()) {
          dataFile.delete()
        }
        if (!indexTmp.renameTo(indexFile)) {
          throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
        }
        if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
          throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
        }
      }
    }
  } finally {
    if (indexTmp.exists() && !indexTmp.delete()) {
      logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}")
    }
  }
}
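The index format written above is simply numPartitions + 1 cumulative offsets stored as longs, starting at 0, so partition i's data spans [offsets(i), offsets(i + 1)) in the data file. A sketch of decoding that format with plain java.io (object name and error handling are illustrative):

import java.io.{DataInputStream, File, FileInputStream}

// Sketch of reading the shuffle index format: two adjacent 8-byte entries
// give a partition's (start offset, length) within the data file.
object IndexFileSketch {
  def readBlockLocation(indexFile: File, reduceId: Int): (Long, Long) = {
    val in = new DataInputStream(new FileInputStream(indexFile))
    try {
      in.skipBytes(reduceId * 8)    // each cumulative offset is one long
      val offset = in.readLong()
      val nextOffset = in.readLong()
      (offset, nextOffset - offset) // (start offset in data file, length)
    } finally in.close()
  }
}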
15:Executor (TaskReaper)
// Monitors a killed task until it exits; if it does not stop within the timeout,
// logs a thread dump and, in non-local mode, kills the executor JVM.
override def run(): Unit = {
  val startTimeMs = System.currentTimeMillis()
  def elapsedTimeMs = System.currentTimeMillis() - startTimeMs
  def timeoutExceeded(): Boolean = killTimeoutMs > 0 && elapsedTimeMs > killTimeoutMs
  try {
    // only attempt to kill the task once. If interruptThread = false then a second kill
    // attempt would be a no-op and if interruptThread = true then it may not be safe or
    // effective to interrupt multiple times:
    taskRunner.kill(interruptThread = interruptThread, reason = reason)
    // Monitor the killed task until it exits. The synchronization logic here is complicated
    // because we don't want to synchronize on the taskRunner while possibly taking a thread
    // dump, but we also need to be careful to avoid races between checking whether the task
    // has finished and wait()ing for it to finish.
    var finished: Boolean = false
    while (!finished && !timeoutExceeded()) {
      taskRunner.synchronized {
        // We need to synchronize on the TaskRunner while checking whether the task has
        // finished in order to avoid a race where the task is marked as finished right after
        // we check and before we call wait().
        if (taskRunner.isFinished) {
          finished = true
        } else {
          taskRunner.wait(killPollingIntervalMs)
        }
      }
      if (taskRunner.isFinished) {
        finished = true
      } else {
        logWarning(s"Killed task $taskId is still running after $elapsedTimeMs ms")
        if (takeThreadDump) {
          try {
            Utils.getThreadDumpForThread(taskRunner.getThreadId).foreach { thread =>
              if (thread.threadName == taskRunner.threadName) {
                logWarning(s"Thread dump from task $taskId:\n${thread.stackTrace}")
              }
            }
          } catch {
            case NonFatal(e) =>
              logWarning("Exception thrown while obtaining thread dump: ", e)
          }
        }
      }
    }

    if (!taskRunner.isFinished && timeoutExceeded()) {
      if (isLocal) {
        logError(s"Killed task $taskId could not be stopped within $killTimeoutMs ms; " +
          "not killing JVM because we are running in local mode.")
      } else {
        // In non-local-mode, the exception thrown here will bubble up to the uncaught exception
        // handler and cause the executor JVM to exit.
        throw new SparkException(
          s"Killing executor JVM because killed task $taskId could not be stopped within " +
          s"$killTimeoutMs ms.")
      }
    }
  } finally {
    // Clean up entries in the taskReaperForTask map.
    taskReaperForTask.synchronized {
      taskReaperForTask.get(taskId).foreach { taskReaperInMap =>
        if (taskReaperInMap eq this) {
          taskReaperForTask.remove(taskId)
        } else {
          // This must have been a TaskReaper where interruptThread == false where a subsequent
          // killTask() call for the same task had interruptThread == true and overwrote the
          // map entry.
        }
      }
    }
  }
}
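The reaper's loop is the classic monitor pattern: re-check the condition inside the synchronized block before calling wait(pollInterval), so a completion notification cannot slip in between the check and the wait. The same pattern in isolation (illustrative class, not Spark code):

// Isolated sketch of the TaskReaper's poll-with-wait pattern: the finished
// flag is re-checked inside the monitor so a notify cannot be missed.
final class FinishLatch {
  private var finished = false

  def markFinished(): Unit = synchronized {
    finished = true
    notifyAll() // wake any reaper waiting below
  }

  // Returns true if finished within timeoutMs, false otherwise.
  def awaitFinished(timeoutMs: Long, pollMs: Long): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutMs
    var done = false
    while (!done && System.currentTimeMillis() < deadline) {
      synchronized {
        if (finished) done = true
        else wait(pollMs) // releases the monitor while waiting
      }
    }
    done
  }
}

object FinishLatchDemo extends App {
  val latch = new FinishLatch
  new Thread(() => { Thread.sleep(200); latch.markFinished() }).start()
  println(latch.awaitFinished(timeoutMs = 1000, pollMs = 50)) // true
}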



