return nil, err
}
//创建KubeAPIServer并注册路由
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
if err != nil {
return nil, err
}
// aggregator comes last in the chain
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer)
if err != nil {
return nil, err
}
//创建aggregatorServer并注册路由
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
if err != nil {
// we don’t need special handling for innerStopCh because the aggregator server doesn’t create any go routines
return nil, err
}
}
创建每个server都要有对应它的config。apiExtensionServer和aggregatorServer的Config需要依赖kubeAPIServerConfig,而这几个ServerConfig都需要依赖GenericConfig,CreateKubeAPIServerConfig创建kubeAPIServerConfig,而CreateKubeAPIServerConfig调用buildGenericConfig创建GenericConfig。
func buildGenericConfig(
s *options.ServerRunOptions,
proxyTransport *http.Transport,
)(…){
//创建一个genericConfig对象
genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
//设置genericConfig的字段,代码不展示
//创建认证实例
if lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, clientgoExternalClient, versionedInformers); lastErr != nil {
return
}
//创建鉴权实例
genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
//准入控制器
err = s.Admission.ApplyTo(
genericConfig,
versionedInformers,
kubeClientConfig,
feature.DefaultFeatureGate,
pluginInitializers…)
}
APIExtensionServer
APIExtensionServer的创建流程大致包含以下几个步骤
-
创建GeneriAPIServer
-
实例化CustomResourceDefinitions
-
实例化APIGroupInfo
-
InstallAPIGroup
三种类型的Server底层都需要依赖GeneriAPIServer。第二步创建的CustomResourceDefinitions是本类型Server的对象,用于后续进行路由注册。APIGroupInfo是用于每个版本、每个资源类型对应的存储对象。最后调用InstallAPIGroup进行路由注册,把每一个资源的版本,类型映射到一个URI地址中。代码如下所示
func createAPIExtensionsServer(apiextensionsConfig *apiextensionsapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget) (*apiextensionsapiserver.CustomResourceDefinitions, error) {
return apiextensionsConfig.Complete().New(delegateAPIServer)
}
//代码位于 /vendor/k8s.io/apiextensions-apiserver/apiserver.go
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
//创建Generic
genericServer, err := c.GenericConfig.New(“apiextensions-apiserver”, delegationTarget)
//实例化 CustomResourceDefinitions
s := &CustomResourceDefinitions{
GenericAPIServer: genericServer,
}
//实例化APIGroupInfo
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)
if apiResourceConfig.VersionEnabled(v1beta1.SchemeGroupVersion) {
storage := map[string]rest.Storage{}
// customresourcedefinitions
customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
if err != nil {
return nil, err
}
storage[“customresourcedefinitions”] = customResourceDefinitionStorage
storage[“customresourcedefinitions/status”] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)
apiGroupInfo.VersionedResourcesStorageMap[v1beta1.SchemeGroup
《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》
【docs.qq.com/doc/DSmxTbFJ1cmN1R2dB】 完整内容开源分享
Version.Version] = storage
}
//另一个版本的类似,不作展示
//InstallAPIGroup注册
if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
return nil, err
}
}
KubeAPIServer
KubeAPIServer处理k8s内置资源请求,它的创建流程与APIExtensionServer类似,包含下面几个步骤
-
创建GeneriAPIServer
-
实例化Instance
-
installLegacyAPI
-
installAPI
其中Instance是KubeAPIServer的Server对象。KubeAPIServer创建和Install的APIGroup需要调用两个方法,一个是installLegacyAPI,另一个是installAPI,原因在于k8s的apiGroup分了/api和/apis两种。初期的资源其实没有apiGroup这个概念,而后期引入了groupVersion为了兼容原有的才把旧的资源类型的URI地址都归属于/api这个路径下的,新的全部在/apis这个路径下,因此在创建注册APIGroup时都分了两类。代码如下
func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPIServer genericapiserver.DelegationTarget) (*controlplane.Instance, error) {
kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
if err != nil {
return nil, err
}
return kubeAPIServer, nil
}
//代码位于 /vendor/k8s.io/kube-aggregrator/pkg/apiserver/apiserver.go
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
//创建Generic
s, err := c.GenericConfig.New(“kube-apiserver”, delegationTarget)
//1.14版本的是Master,当前版本是Instance
m := &Instance{
GenericAPIServer: s,
ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo,
}
//实例化核心API
if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
StorageFactory: c.ExtraConfig.StorageFactory,
ProxyTransport: c.ExtraConfig.ProxyTransport,
KubeletClientConfig: c.ExtraConfig.KubeletClientConfig,
EventTTL: c.ExtraConfig.EventTTL,
ServiceIPRange: c.ExtraConfig.ServiceIPRange,
SecondaryServiceIPRange: c.ExtraConfig.SecondaryServiceIPRange,
ServiceNodePortRange: c.ExtraConfig.ServiceNodePortRange,
LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
ServiceAccountIssuer: c.ExtraConfig.ServiceAccountIssuer,
ExtendExpiration: c.ExtraConfig.ExtendExpiration,
ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
APIAudiences: c.GenericConfig.Authentication.APIAudiences,
}
if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
return nil, err
}
}
restStorageProviders := []RESTStorageProvider{…}
//InstallAPIs,内部包含InstallAPIGroup
if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders…); err != nil {
return nil, err
}
}
注册核心apiGroups
进入m.InstallLegacyAPI,这个方法包含了实例化ApiGroupInfo和InstalAPIGroup两个操作,这部分资源是k8s的核心资源
func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) error {
//实例化ApiGroupInfo
legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
if err != nil {
return fmt.Errorf(“error building core storage: %v”, err)
}
controllerName := “bootstrap-controller”
coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient())
m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)
//相当于调用InstallAPIGroup
if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
return fmt.Errorf(“error in registering group versions: %v”, err)
}
return nil
}
实例化APIGroupInfo的代码局部如下,代码篇幅较长,只摘取pod一部分的源码展示,代码位于/pkg/registry/core/rest/storage_core.go
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
apiGroupInfo := genericapiserver.APIGroupInfo{
PrioritizedVersions: legacyscheme.Scheme.PrioritizedVersionsForGroup(""),
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
Scheme: legacyscheme.Scheme,
ParameterCodec: legacyscheme.ParameterCodec,
NegotiatedSerializer: legacyscheme.Codecs,
}
podStorage, err := podstore.NewStorage(
restOptionsGetter,
nodeStorage.KubeletConnectionInfo,
c.ProxyTransport,
podDisruptionClient,
)
restStorageMap := map[string]rest.Storage{
“pods”: podStorage.Pod,
“pods/attach”: podStorage.Attach,
“pods/status”: podStorage.Status,
“pods/log”: podStorage.Log,
“pods/exec”: podStorage.Exec,
“pods/portforward”: podStorage.PortForward,
“pods/proxy”: podStorage.Proxy,
“pods/binding”: podStorage.Binding,
“bindings”: podStorage.LegacyBinding,
…
}
}
m.GenericAPIServer.InstallLegacyAPIGroup的第一个参数是apiPrefix,值是/api;第二个参数是上面创建好的,包含资源存储方式的apiGroupInfo
与InstallAPIGroup类似地,InstallLegacyAPIGroup需要经过两层调用才会到达InstallREST,调用链如下
m.GenericAPIServer.InstallLegacyAPIGroup
|–s.installAPIResources
|–apiGroupVersion.InstallREST
InstallREST的入参是restful.Container,他是golang http框架go-restful里面的一个重要对象,在InstallREST里面构造出installer,installer包含资源的存储方法和资源对应api的前缀,利用installer.Install()来创建出go-restful的webservice,webservice加入到传入得container,即完成api的注册。
代码位于/vendor/k8s.io/apiserver/pkg/endpoints/groupversion.go
func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
installer := &APIInstaller{
group: g,
prefix: prefix,
minRequestTimeout: g.MinRequestTimeout,
}
apiResources, ws, registrationErrors := installer.Install()
versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
versionDiscoveryHandler.AddToWebService(ws)
container.Add(ws)
return utilerrors.NewAggregate(registrationErrors)
}
installer.Install()方法的边幅很长,它既然创建了webservice,api中各个URL的路由注册,handler的绑定也会在里面实现。由于这部分代码也涉及到apiserver如何响应处理一个http请求,本篇先不探讨
go-restful框架
不过上面提及到go-restful框架的几个概念,在这里进行一个简单的科普
-
container:在http的角度就是一个Server,里面就包含若干个webservice
-
webservice:webservice从结构来说是承上启下的一个角色,它包含了一组route,而且这组route都会有一个共同的basePath或者说他们的URL的prefix是相同的
-
route:route对应具体的一个URL,它需要指定具体的路径Path,请求方法Method和处理函数Handler,以及一些参数Parameter等等。
他们的层次结构如下
container
|–webservice
|–Route
AggregratorServer
用于处理聚合进来的api请求,实际是做七层转发,它的创建流程与APIExtensionServer的最为相似
-
创建GeneriAPIServer
-
实例化Aggregrator
-
实例化APIGroupInfo
-
InstallAPIGroup
实际创建AggregratorServer的代码位于/vendor/k8s.io/kube-aggregrator/pkg/apiserver/apiserver.go
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
//创建GeneriAPIServer
genericServer, err := c.GenericConfig.New(“kube-aggregator”, delegationTarget)
//实例化Aggregrator
s := &APIAggregator{…}
//实例化APIGroupInfo
apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter)
//InstallAPIGroup
if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
return nil, err
}
}
运行http serverapi的路由绑定完毕,最后就是要把http server跑起来,prepared.Run调用的是由preparedGenericAPIServer实现的Run方法,经过多层调用最终把server跑起来,调用链如下



