【Tinkerpop整理】以Spark为引擎全量检索图库数据流程源码解析-part1
【Tinkerpop整理】以Spark为引擎全量检索图库数据流程源码解析-part2
【Tinkerpop整理】以Spark为引擎全量检索图库数据流程源码解析-part3
// Static initializer: creates the shared, reference-counted holder for the
// JanusGraphVertexDeserializer. Each deserializer wraps a new JanusGraphHadoopSetupImpl,
// and that constructor opens the graph.
static {
    refCounter = new RefCountedCloseable<>(hadoopConf ->
            new JanusGraphVertexDeserializer(
                    new JanusGraphHadoopSetupImpl(hadoopConf)));
}
/**
 * Builds the HadoopRecordReader for one input split.
 *
 * <p>The split is first opened by the delegate {@code inputFormat}. Per the original author's
 * note, this is the HBaseBinaryInputFormat set in the configuration file. The resulting reader
 * is then wrapped together with the shared {@code refCounter}.
 */
@Override
public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
    final RecordReader delegate = inputFormat.createRecordReader(split, context);
    return new HadoopRecordReader(refCounter, delegate);
}
7, JanusGraphHadoopSetupImpl
/**
 * Opens the graph described by the Hadoop job configuration and starts a read-only
 * transaction on it.
 *
 * @param config Hadoop configuration whose {@code JanusGraphHadoopConfiguration.MAPRED_NS}
 *               namespace holds the JanusGraph settings
 */
public JanusGraphHadoopSetupImpl(final Configuration config) {
    this.scanConf = ModifiableHadoopConfiguration.of(JanusGraphHadoopConfiguration.MAPRED_NS, config);
    // Open the graph from the JanusGraph settings embedded in the job configuration.
    this.graph = (StandardJanusGraph) JanusGraphFactory.open(scanConf.getJanusGraphConf());
    // Read-only transaction with a vertex cache size of 200.
    this.tx = (StandardJanusGraphTx) graph.buildTransaction()
            .readOnly()
            .vertexCacheSize(200)
            .start();
}
8, JanusGraphFactory
public static JanusGraph open(ReadConfiguration configuration, String backupName) {
final ModifiableConfiguration config = new ModifiableConfiguration(ROOT_NS, (WriteConfiguration) configuration, BasicConfiguration.Restriction.NONE);
final String graphName = config.has(GRAPH_NAME) ? config.get(GRAPH_NAME) : backupName;
final JanusGraphManager jgm = JanusGraphManagerUtility.getInstance();
if (null != graphName) {
Preconditions.checkNotNull(jgm, JANUS_GRAPH_MANAGER_EXPECTED_STATE_MSG);
return (JanusGraph) jgm.openGraph(graphName, gName -> new StandardJanusGraph(new GraphDatabaseConfigurationBuilder().build(configuration)));
} else {
if (jgm != null) {
log.warn("You should supply "graph.graphname" in your .properties file configuration if you are opening " +
"a graph that has not already been opened at server start, i.e. it was " +
"defined in your YAML file. This will ensure the graph is tracked by the JanusGraphManager, " +
"which will enable autocommit and rollback functionality upon all gremlin script executions. " +
"Note that JanusGraphFactory#open(String === shortcut notation) does not support consuming the property " +
""graph.graphname" so these graphs should be accessed dynamically by supplying a .properties file here " +
"or by using the ConfiguredGraphFactory.");
}
//构建图对象
return new StandardJanusGraph(new GraphDatabaseConfigurationBuilder().build(configuration));
}
}
9, GraphDatabaseConfigurationBuilder
构建图存储库的配置信息(例如:后端存储为 HBase 时,即 HBase 库的配置信息)
/**
 * Assembles the complete GraphDatabaseConfiguration for a graph. For example, with an HBase
 * backend this includes the HBase store settings.
 *
 * <p>The local configuration is combined with a global configuration obtained through the
 * storage manager. Derived settings (unique instance id, transaction-log and
 * system-management-log options) are collected in a separate overrides configuration. That
 * overrides configuration is merged with the combined configuration at the end.
 *
 * @param localConfig the caller-supplied local configuration; must not be null
 * @return the assembled graph database configuration
 */
public GraphDatabaseConfiguration build(ReadConfiguration localConfig){
    Preconditions.checkNotNull(localConfig);
    final BasicConfiguration localView =
            new BasicConfiguration(ROOT_NS, localConfig, BasicConfiguration.Restriction.NONE);
    // Collects the derived settings computed below.
    final ModifiableConfiguration overrides =
            new ModifiableConfiguration(ROOT_NS, new CommonsConfiguration(), BasicConfiguration.Restriction.NONE);

    // Store manager for the configured backend (HBaseStoreManager when the backend is HBase).
    final KeyColumnValueStoreManager backendManager = Backend.getStorageManager(localView);
    final StoreFeatures features = backendManager.getFeatures();

    // Global configuration, built from the local settings and the store manager.
    final ReadConfiguration global = new ReadConfigurationBuilder().buildGlobalConfiguration(
            localConfig, localView, overrides, backendManager,
            new ModifiableConfigurationBuilder(), new KCVSConfigurationBuilder());

    // Copy over local config options, i.e. the parameters supplied when the job was created.
    final ModifiableConfiguration localOnly =
            new ModifiableConfiguration(ROOT_NS, new CommonsConfiguration(), BasicConfiguration.Restriction.LOCAL);
    localOnly.setAll(getLocalSubset(localView.getAll()));

    final Configuration globalAndLocal = new MixedConfiguration(ROOT_NS, global, localConfig);

    // Compute the unique instance id and record it, plus the log settings, as overrides.
    final String instanceId =
            UniqueInstanceIdRetriever.getInstance().getOrGenerateUniqueInstanceId(globalAndLocal);
    overrides.set(UNIQUE_INSTANCE_ID, instanceId);
    checkAndOverwriteTransactionLogConfiguration(globalAndLocal, overrides, features);
    checkAndOverwriteSystemManagementLogConfiguration(globalAndLocal, overrides);

    final MergedConfiguration merged = new MergedConfiguration(overrides, globalAndLocal);
    return new GraphDatabaseConfiguration(localConfig, localOnly, instanceId, merged);
}
10, StandardJanusGraph
静态代码块,注册相关策略
static {
TraversalStrategies graphStrategies =
TraversalStrategies.GlobalCache.getStrategies(Graph.class)
.clone()
.addStrategies(AdjacentVertexFilterOptimizerStrategy.instance(),
AdjacentVertexHasIdOptimizerStrategy.instance(),
AdjacentVertexIsOptimizerStrategy.instance(),
JanusGraphLocalQueryOptimizerStrategy.instance(),
JanusGraphStepStrategy.instance(),
JanusGraphIoRegistrationStrategy.instance());
//Register with cache
TraversalStrategies.GlobalCache.registerStrategies(StandardJanusGraph.class, graphStrategies);
TraversalStrategies.GlobalCache.registerStrategies(StandardJanusGraphTx.class, graphStrategies);
}
// Configuration and storage backend.
private final GraphDatabaseConfiguration config;
private final Backend backend;
// ID management / assignment and time source.
private final IDManager idManager;
private final VertexIDAssigner idAssigner;
private final TimestampProvider times;
//Serializers
protected final IndexSerializer indexSerializer;
protected final EdgeSerializer edgeSerializer;
protected final Serializer serializer;
//Caches
public final SliceQuery vertexExistenceQuery;
private final RelationQueryCache queryCache;
private final SchemaCache schemaCache;
//Log
private final ManagementLogger managementLogger;
//Shutdown hook
private volatile ShutdownThread shutdownHook;
private volatile boolean isOpen;
private final AtomicLong txCounter;
// Typed set of transactions (presumably those currently open on this graph); was a raw Set.
private final Set<StandardJanusGraphTx> openTransactions;
private final String name;
/**
 * Creates a graph instance on top of an assembled database configuration.
 *
 * <p>Wires up the storage backend, ID assignment, serializers and caches. It then registers
 * this instance's unique id with its registration time in the global system configuration,
 * attaches a ManagementLogger reader to the system management log, and installs a JVM
 * shutdown hook.
 *
 * @param configuration the assembled graph database configuration
 * @throws JanusGraphException if an instance with the same unique id is already registered
 *                             and {@code REPLACE_INSTANCE_IF_EXISTS} is not enabled
 */
public StandardJanusGraph(GraphDatabaseConfiguration configuration) {
    this.config = configuration;
    // Storage backend obtained from the configuration.
    this.backend = configuration.getBackend();
    this.name = configuration.getGraphName();
    this.idAssigner = configuration.getIDAssigner(backend);
    this.idManager = idAssigner.getIDManager();
    this.serializer = configuration.getSerializer();
    final StoreFeatures storeFeatures = backend.getStoreFeatures();
    this.indexSerializer = new IndexSerializer(configuration.getConfiguration(), this.serializer,
            this.backend.getIndexInformation(), storeFeatures.isDistributed() && storeFeatures.isKeyOrdered());
    this.edgeSerializer = new EdgeSerializer(this.serializer);
    // Limit-1 slice query on the VertexExists system key (presumably used for vertex-existence checks).
    this.vertexExistenceQuery = edgeSerializer.getQuery(BaseKey.VertexExists, Direction.OUT, new EdgeSerializer.TypedInterval[0]).setLimit(1);
    this.queryCache = new RelationQueryCache(this.edgeSerializer);
    this.schemaCache = configuration.getTypeCache(typeCacheRetrieval);
    this.times = configuration.getTimestampProvider();
    isOpen = true;
    txCounter = new AtomicLong(0);
    // Concurrent set view backed by a ConcurrentHashMap (initial capacity 100, concurrency level 1).
    openTransactions = Collections.newSetFromMap(new ConcurrentHashMap<>(100, 0.75f, 1));

    //Register instance and ensure uniqueness
    final String uniqueInstanceId = configuration.getUniqueGraphId();
    final ModifiableConfiguration globalConfig = getGlobalSystemConfig(backend);
    final boolean instanceExists = globalConfig.has(REGISTRATION_TIME, uniqueInstanceId);
    final boolean replaceExistingInstance = configuration.getConfiguration().get(REPLACE_INSTANCE_IF_EXISTS);
    if (instanceExists) {
        if (!replaceExistingInstance) {
            throw new JanusGraphException(String.format("A JanusGraph graph with the same instance id [%s] is already open. Might required forced shutdown.", uniqueInstanceId));
        }
        log.debug("Instance [{}] already exists. Opening the graph per {} configuration.",
                uniqueInstanceId, REPLACE_INSTANCE_IF_EXISTS.getName());
    }
    globalConfig.set(REGISTRATION_TIME, times.getTime(), uniqueInstanceId);

    // Listen to the system management log from now on.
    final Log managementLog = backend.getSystemMgmtLog();
    managementLogger = new ManagementLogger(this, managementLog, schemaCache, this.times);
    managementLog.registerReader(ReadMarker.fromNow(), managementLogger);

    shutdownHook = new ShutdownThread(this);
    Runtime.getRuntime().addShutdownHook(shutdownHook);
    log.debug("Installed shutdown hook {}", shutdownHook, new Throwable("Hook creation trace"));
}



