以get请求为例
action: 处理网络层 http, rpc 请求的行为。
TransportAction 是处理请求的基类：
public final Task execute(Request request, ActionListenerlistener) { Task task = taskManager.register("transport", actionName, request); execute(task, request, new ActionListener () { @Override public void onResponse(Response response) { taskManager.unregister(task); listener.onResponse(response); } @Override public void onFailure(Exception e) { taskManager.unregister(task); listener.onFailure(e); } }); return task; } protected abstract void doExecute(Task task, Request request, ActionListener listener);
TransportAction 的实现类（如 TransportGetAction）需要实现以下抽象方法：
protected abstract void doExecute(Task task, Request request, ActionListenerlistener);
索引级get的处理
/**
 * Sends the request to the next shard routing returned by {@code shardIt},
 * or fails the listener when there is none left.
 *
 * <p>When the target node is unknown, or the transport call fails, the
 * failure is handed to {@code onFailure(shardRouting, ...)}.
 *
 * @param currentFailure the failure from the previous attempt, or {@code null} on the first attempt
 */
private void perform(@Nullable final Exception currentFailure) {
    // Remember the first failure; a later one replaces it only if it is a "read override" exception.
    Exception lastFailure = this.lastFailure;
    if (lastFailure == null || TransportActions.isReadOverrideException(currentFailure)) {
        lastFailure = currentFailure;
        this.lastFailure = currentFailure;
    }

    final ShardRouting shardRouting = shardIt.nextOrNull();
    if (shardRouting == null) {
        // Nothing left to try: report the recorded failure, or a "no shard available" error.
        Exception failure = lastFailure;
        if (failure == null || isShardNotAvailableException(failure)) {
            failure = new NoShardAvailableActionException(null,
                LoggerMessageFormat.format("No shard available for [{}]", internalRequest.request()), failure);
        } else {
            logger.debug(() -> new ParameterizedMessage("{}: failed to execute [{}]", null,
                internalRequest.request()), failure);
        }
        listener.onFailure(failure);
        return;
    }

    DiscoveryNode node = nodes.get(shardRouting.currentNodeId());
    if (node == null) {
        // The routing points at a node that is not in the known node set.
        onFailure(shardRouting, new NoShardAvailableActionException(shardRouting.shardId()));
    } else {
        // Record the concrete target shard on the request before sending it.
        internalRequest.request().internalShardId = shardRouting.shardId();
        if (logger.isTraceEnabled()) {
            logger.trace(
                "sending request [{}] to shard [{}] on node [{}]",
                internalRequest.request(),
                internalRequest.request().internalShardId,
                node
            );
        }
        transportService.sendRequest(node, transportShardAction, internalRequest.request(),
            // Parameterized (not raw) so the overrides below match the handler's generic signatures.
            new TransportResponseHandler<Response>() {
                @Override
                public Response read(StreamInput in) throws IOException {
                    Response response = newResponse();
                    response.readFrom(in);
                    return response;
                }

                @Override
                public String executor() {
                    // NOTE(review): SAME presumably means "handle on the calling thread" — confirm.
                    return ThreadPool.Names.SAME;
                }

                @Override
                public void handleResponse(final Response response) {
                    listener.onResponse(response);
                }

                @Override
                public void handleException(TransportException exp) {
                    // Hand the failure to onFailure together with the routing that was tried.
                    onFailure(shardRouting, exp);
                }
            });
    }
}
分片级get的处理过程
/**
 * Executes the GET against a single shard and wraps the result in a response.
 *
 * <p>If the request asks for a refresh and is not a realtime read, the shard
 * is refreshed before the document is fetched.
 *
 * @param request the get request (type, id, stored fields, version constraints, source context)
 * @param shardId the shard to read from
 * @return the response wrapping the shard-level get result
 */
protected GetResponse shardOperation(GetRequest request, ShardId shardId) {
    final IndexShard shard = indicesService
        .indexServiceSafe(shardId.getIndex())
        .getShard(shardId.id());

    final boolean refreshBeforeRead = request.refresh() && !request.realtime();
    if (refreshBeforeRead) {
        shard.refresh("refresh_flag_get");
    }

    return new GetResponse(
        shard.getService().get(
            request.type(),
            request.id(),
            request.storedFields(),
            request.realtime(),
            request.version(),
            request.versionType(),
            request.fetchSourceContext()
        )
    );
}



