The Transport Module: Parsing and Handling REST Requests
Based on the Elasticsearch 6.7.2 source code.
Registering REST handlers
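Using the figure above, recall how the communication module is initialized: ActionModule registers the handlers for REST requests, and the registration happens in the initRestHandlers method. The classes that handle REST requests are named Rest*Action. They all extend BaseRestHandler, which in turn implements the RestHandler interface.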
public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
    List<AbstractCatAction> catActions = new ArrayList<>();
    Consumer<RestHandler> registerHandler = a -> {
        if (a instanceof AbstractCatAction) {
            catActions.add((AbstractCatAction) a);
        }
    };
    registerHandler.accept(new RestMainAction(settings, restController));
    registerHandler.accept(new RestNodesInfoAction(settings, restController, settingsFilter));
    registerHandler.accept(new RestRemoteClusterInfoAction(settings, restController));
    registerHandler.accept(new RestNodesStatsAction(settings, restController));
    registerHandler.accept(new RestNodesUsageAction(settings, restController));
    registerHandler.accept(new RestNodesHotThreadsAction(settings, restController));
    registerHandler.accept(new RestClusterAllocationExplainAction(settings, restController));
    registerHandler.accept(new RestClusterStatsAction(settings, restController));
    registerHandler.accept(new RestClusterStateAction(settings, restController, settingsFilter));
    registerHandler.accept(new RestClusterHealthAction(settings, restController));
    // ... ... ...
    for (ActionPlugin plugin : actionPlugins) {
        for (RestHandler handler : plugin.getRestHandlers(settings, restController, clusterSettings, indexScopedSettings,
                settingsFilter, indexNameExpressionResolver, nodesInCluster)) {
            registerHandler.accept(handler);
        }
    }
    registerHandler.accept(new RestCatAction(settings, restController, catActions));
}
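Take RestClusterHealthAction as an example. Its constructor registers this (the handler itself) for requests whose HTTP method is GET and whose URI is either /_cluster/health or the placeholder form /_cluster/health/{index}.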
public RestClusterHealthAction(Settings settings, RestController controller) {
    super(settings);
    controller.registerHandler(RestRequest.Method.GET, "/_cluster/health", this);
    controller.registerHandler(RestRequest.Method.GET, "/_cluster/health/{index}", this);
}
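To make the idea of registering one handler for both a fixed path and a placeholder path more concrete, here is a minimal sketch of a route table. It is not the Elasticsearch RestController: the class RouteTableSketch, its linear scan over routes, and the use of plain strings as handlers are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified route table; NOT the Elasticsearch RestController.
final class RouteTableSketch {
    /** Result of a successful lookup: the handler plus extracted placeholder values. */
    static final class Match {
        final String handler;
        final Map<String, String> params;

        Match(String handler, Map<String, String> params) {
            this.handler = handler;
            this.params = params;
        }
    }

    private static final class Route {
        final String method;
        final String[] segments;
        final String handler;

        Route(String method, String[] segments, String handler) {
            this.method = method;
            this.segments = segments;
            this.handler = handler;
        }
    }

    private final List<Route> routes = new ArrayList<>();

    /** Registers a handler for an HTTP method and a path template such as "/a/{name}". */
    void registerHandler(String method, String template, String handler) {
        routes.add(new Route(method, split(template), handler));
    }

    /** Returns the first route whose method and segments match the given path. */
    Optional<Match> resolve(String method, String path) {
        String[] actual = split(path);
        for (Route route : routes) {
            if (!route.method.equals(method) || route.segments.length != actual.length) {
                continue;
            }
            Map<String, String> params = new HashMap<>();
            boolean matched = true;
            for (int i = 0; i < actual.length; i++) {
                String expected = route.segments[i];
                if (expected.startsWith("{") && expected.endsWith("}")) {
                    // A placeholder segment matches anything and captures the value.
                    params.put(expected.substring(1, expected.length() - 1), actual[i]);
                } else if (!expected.equals(actual[i])) {
                    matched = false;
                    break;
                }
            }
            if (matched) {
                return Optional.of(new Match(route.handler, params));
            }
        }
        return Optional.empty();
    }

    private static String[] split(String path) {
        // Drop the leading slash so "/_cluster/health" becomes ["_cluster", "health"].
        String trimmed = path.startsWith("/") ? path.substring(1) : path;
        return trimmed.split("/");
    }

    public static void main(String[] args) {
        RouteTableSketch table = new RouteTableSketch();
        table.registerHandler("GET", "/_cluster/health", "clusterHealth");
        table.registerHandler("GET", "/_cluster/health/{index}", "clusterHealth");
        Match match = table.resolve("GET", "/_cluster/health/logs").get();
        System.out.println(match.handler + " " + match.params);
    }
}
```

In this sketch both registrations point at the same handler, mirroring how RestClusterHealthAction passes this twice. The captured placeholder value plays the role of request.param("index") in the real handler.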
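Because the class extends BaseRestHandler, it must implement the prepareRequest method, which does the preparatory work when a request arrives, such as validating parameters and converting the REST request into an internal RPC request.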
Handling requests
HTTP request execution path
The handleRequest method of BaseRestHandler
@Override
public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
    // Pre-process the request
    final RestChannelConsumer action = prepareRequest(request, client);
    // validate unconsumed params, but we must exclude params used to format the response
    // use a sorted set so the unconsumed parameters appear in a reliable sorted order
    final SortedSet<String> unconsumedParams =
        request.unconsumedParams().stream().filter(p -> !responseParams().contains(p)).collect(Collectors.toCollection(TreeSet::new));
    // validate the non-response params
    if (!unconsumedParams.isEmpty()) {
        final Set<String> candidateParams = new HashSet<>();
        candidateParams.addAll(request.consumedParams());
        candidateParams.addAll(responseParams());
        throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));
    }
    if (request.hasContent() && request.isContentConsumed() == false) {
        deprecationLogger.deprecated(
            "request [{} {}] does not support having a body; Elasticsearch 7.x+ will reject such requests",
            request.method(),
            request.path());
    }
    usageCount.increment();
    // Run the actual action
    action.accept(channel);
}
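The flow above is a template method: the handler first builds a deferred action, the base class then rejects any parameter the handler never read, and only after that is the action run. The following is a minimal sketch of that pattern, not the Elasticsearch implementation. TrackedRequest, HandlerSketch, HealthHandlerSketch, the use of a StringBuilder as the channel, and the "30s" default are all hypothetical, and the response-parameter filter is left out for brevity.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.Consumer;

// Hypothetical request type that remembers which parameters a handler has read.
final class TrackedRequest {
    private final Map<String, String> params;
    private final Set<String> consumed = new HashSet<>();

    TrackedRequest(Map<String, String> params) {
        this.params = new HashMap<>(params);
    }

    /** Reads a parameter and marks it as consumed, whether or not it is present. */
    String param(String key) {
        consumed.add(key);
        return params.get(key);
    }

    /** Parameters that were sent but never read, in sorted order. */
    SortedSet<String> unconsumedParams() {
        SortedSet<String> rest = new TreeSet<>(params.keySet());
        rest.removeAll(consumed);
        return rest;
    }
}

// Hypothetical base class: the fixed skeleton lives here, subclasses only prepare.
abstract class HandlerSketch {
    /** Subclasses read parameters here and return the deferred work. */
    protected abstract Consumer<StringBuilder> prepareRequest(TrackedRequest request);

    final void handleRequest(TrackedRequest request, StringBuilder channel) {
        Consumer<StringBuilder> action = prepareRequest(request);
        SortedSet<String> unconsumed = request.unconsumedParams();
        if (!unconsumed.isEmpty()) {
            // Fail before any work is done if the caller sent unknown parameters.
            throw new IllegalArgumentException("unrecognized parameters: " + unconsumed);
        }
        action.accept(channel);
    }
}

// Hypothetical concrete handler that understands a single "timeout" parameter.
final class HealthHandlerSketch extends HandlerSketch {
    @Override
    protected Consumer<StringBuilder> prepareRequest(TrackedRequest request) {
        String timeout = request.param("timeout");
        String effective = timeout != null ? timeout : "30s"; // assumed default, for illustration only
        return channel -> channel.append("health(timeout=").append(effective).append(")");
    }
}
```

Returning a deferred action rather than executing immediately is what lets the base class validate parameters after the handler has read them but before anything is sent.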
The prepareRequest method of RestClusterHealthAction
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
    // Build the internal request, clusterHealthRequest
    ClusterHealthRequest clusterHealthRequest = clusterHealthRequest(Strings.splitStringByCommaToArray(request.param("index")));
    clusterHealthRequest.local(request.paramAsBoolean("local", clusterHealthRequest.local()));
    clusterHealthRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterHealthRequest.masterNodeTimeout()));
    clusterHealthRequest.timeout(request.paramAsTime("timeout", clusterHealthRequest.timeout()));
    String waitForStatus = request.param("wait_for_status");
    if (waitForStatus != null) {
        clusterHealthRequest.waitForStatus(ClusterHealthStatus.valueOf(waitForStatus.toUpperCase(Locale.ROOT)));
    }
    clusterHealthRequest.waitForNoRelocatingShards(
        request.paramAsBoolean("wait_for_no_relocating_shards", clusterHealthRequest.waitForNoRelocatingShards()));
    clusterHealthRequest.waitForNoInitializingShards(
        request.paramAsBoolean("wait_for_no_initializing_shards", clusterHealthRequest.waitForNoRelocatingShards()));
    if (request.hasParam("wait_for_relocating_shards")) {
        // wait_for_relocating_shards has been removed in favor of wait_for_no_relocating_shards
        throw new IllegalArgumentException("wait_for_relocating_shards has been removed, " +
            "use wait_for_no_relocating_shards [true/false] instead");
    }
    String waitForActiveShards = request.param("wait_for_active_shards");
    if (waitForActiveShards != null) {
        clusterHealthRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
    }
    clusterHealthRequest.waitForNodes(request.param("wait_for_nodes", clusterHealthRequest.waitForNodes()));
    if (request.param("wait_for_events") != null) {
        clusterHealthRequest.waitForEvents(Priority.valueOf(request.param("wait_for_events").toUpperCase(Locale.ROOT)));
    }
    // AbstractClient maps the REST call to its transport (TCP) action; the matching handler class is looked up among the actions registered in ActionModule
    return channel -> client.admin().cluster().health(clusterHealthRequest, new RestStatusToXContentListener<>(channel));
}
The health method of AbstractClient
@Override
public void health(final ClusterHealthRequest request, final ActionListener<ClusterHealthResponse> listener) {
    execute(ClusterHealthAction.INSTANCE, request, listener);
}
The request is then dispatched through NodeClient.executeLocally, and from that point on it is processed as a transport (TCP) action.
public <    Request extends ActionRequest,
            Response extends ActionResponse
        > Task executeLocally(GenericAction<Request, Response> action, Request request, ActionListener<Response> listener) {
    return transportAction(action).execute(request, listener);
}
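The lookup performed by transportAction(action) can be pictured as a map from an action singleton to the code that executes it. The sketch below illustrates only that idea and is not the NodeClient implementation: ActionKey, LocalClientSketch, the BiConsumer-based transport action, and the error message are hypothetical stand-ins.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical key type playing the role of an action singleton
// such as ClusterHealthAction.INSTANCE; identity is used for lookup.
final class ActionKey<Req, Resp> {
    final String name;

    ActionKey(String name) {
        this.name = name;
    }
}

// Hypothetical client that resolves an action key to its local executor.
final class LocalClientSketch {
    private final Map<ActionKey<?, ?>, BiConsumer<?, ?>> actions = new HashMap<>();

    /** Associates an action key with the code that executes it on this node. */
    <Req, Resp> void register(ActionKey<Req, Resp> key, BiConsumer<Req, Consumer<Resp>> transportAction) {
        actions.put(key, transportAction);
    }

    /** Looks up the executor for the key and hands it the request and listener. */
    @SuppressWarnings("unchecked")
    <Req, Resp> void executeLocally(ActionKey<Req, Resp> key, Req request, Consumer<Resp> listener) {
        // The cast is safe because register() only stores executors whose types match the key.
        BiConsumer<Req, Consumer<Resp>> transportAction =
            (BiConsumer<Req, Consumer<Resp>>) actions.get(key);
        if (transportAction == null) {
            throw new IllegalStateException("no executor registered for action [" + key.name + "]");
        }
        transportAction.accept(request, listener);
    }
}
```

A usage example under the same assumptions: register a key typed as ActionKey<String, Integer> with an executor that reports the length of the request string, then call executeLocally with that key, a request, and a listener that stores the result.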
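From this it appears that REST is a wrapper layered on top of the internal communication, intended to make calls from clients simpler and more convenient for developers. The REST Client is the recommended client for Elasticsearch.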



