diff --git a/cli/cmd/lib_apis.go b/cli/cmd/lib_apis.go index 23514342dc..bce36d0ca2 100644 --- a/cli/cmd/lib_apis.go +++ b/cli/cmd/lib_apis.go @@ -24,6 +24,11 @@ import ( func replicaCountTable(counts *status.ReplicaCounts) table.Table { var rows [][]interface{} for _, replicaCountType := range status.ReplicaCountTypes { + // skip up-to-date count + if replicaCountType == status.ReplicaCountUpToDate { + continue + } + count := counts.GetCountBy(replicaCountType) canBeHiddenIfZero := false switch replicaCountType { diff --git a/cli/cmd/lib_async_apis.go b/cli/cmd/lib_async_apis.go index e2e4441003..e534a9923d 100644 --- a/cli/cmd/lib_async_apis.go +++ b/cli/cmd/lib_async_apis.go @@ -60,8 +60,8 @@ func asyncDescribeAPITable(asyncAPI schema.APIResponse, env cliconfig.Environmen return "", errors.ErrorUnexpected("missing metadata from operator response") } - if asyncAPI.Status == nil { - return "", errors.ErrorUnexpected(fmt.Sprintf("missing status for %s api", asyncAPI.Metadata.Name)) + if asyncAPI.ReplicaCounts == nil { + return "", errors.ErrorUnexpected(fmt.Sprintf("missing replica counts for %s api", asyncAPI.Metadata.Name)) } t := asyncAPIsTable([]schema.APIResponse{asyncAPI}, []string{env.Name}) @@ -75,7 +75,7 @@ func asyncDescribeAPITable(asyncAPI schema.APIResponse, env cliconfig.Environmen out += "\n" + console.Bold("endpoint: ") + *asyncAPI.Endpoint + "\n" } - t = replicaCountTable(asyncAPI.Status.ReplicaCounts) + t = replicaCountTable(asyncAPI.ReplicaCounts) out += "\n" + t.MustFormat() return out, nil @@ -85,15 +85,27 @@ func asyncAPIsTable(asyncAPIs []schema.APIResponse, envNames []string) table.Tab rows := make([][]interface{}, 0, len(asyncAPIs)) for i, asyncAPI := range asyncAPIs { - if asyncAPI.Metadata == nil || asyncAPI.Status == nil { + if asyncAPI.Metadata == nil || (asyncAPI.Status == nil && asyncAPI.ReplicaCounts == nil) { continue } + + var ready, requested, upToDate int32 + if asyncAPI.Status != nil { + ready = asyncAPI.Status.Ready + requested 
= asyncAPI.Status.Requested + upToDate = asyncAPI.Status.UpToDate + } else { + ready = asyncAPI.ReplicaCounts.Ready + requested = asyncAPI.ReplicaCounts.Requested + upToDate = asyncAPI.ReplicaCounts.UpToDate + } + lastUpdated := time.Unix(asyncAPI.Metadata.LastUpdated, 0) rows = append(rows, []interface{}{ envNames[i], asyncAPI.Metadata.Name, - fmt.Sprintf("%d/%d", asyncAPI.Status.Ready, asyncAPI.Status.Requested), - asyncAPI.Status.UpToDate, + fmt.Sprintf("%d/%d", ready, requested), + upToDate, libtime.SinceStr(&lastUpdated), }) } diff --git a/cli/cmd/lib_realtime_apis.go b/cli/cmd/lib_realtime_apis.go index dd73db1282..92234a83f9 100644 --- a/cli/cmd/lib_realtime_apis.go +++ b/cli/cmd/lib_realtime_apis.go @@ -59,8 +59,8 @@ func realtimeDescribeAPITable(realtimeAPI schema.APIResponse, env cliconfig.Envi return "", errors.ErrorUnexpected("missing metadata from operator response") } - if realtimeAPI.Status == nil { - return "", errors.ErrorUnexpected(fmt.Sprintf("missing status for %s api", realtimeAPI.Metadata.Name)) + if realtimeAPI.ReplicaCounts == nil { + return "", errors.ErrorUnexpected(fmt.Sprintf("missing replica counts for %s api", realtimeAPI.Metadata.Name)) } t := realtimeAPIsTable([]schema.APIResponse{realtimeAPI}, []string{env.Name}) @@ -74,7 +74,7 @@ func realtimeDescribeAPITable(realtimeAPI schema.APIResponse, env cliconfig.Envi out += "\n" + console.Bold("endpoint: ") + *realtimeAPI.Endpoint + "\n" } - t = replicaCountTable(realtimeAPI.Status.ReplicaCounts) + t = replicaCountTable(realtimeAPI.ReplicaCounts) out += "\n" + t.MustFormat() return out, nil @@ -84,15 +84,27 @@ func realtimeAPIsTable(realtimeAPIs []schema.APIResponse, envNames []string) tab rows := make([][]interface{}, 0, len(realtimeAPIs)) for i, realtimeAPI := range realtimeAPIs { - if realtimeAPI.Metadata == nil || realtimeAPI.Status == nil { + if realtimeAPI.Metadata == nil || (realtimeAPI.Status == nil && realtimeAPI.ReplicaCounts == nil) { continue } + + var ready, requested, 
upToDate int32 + if realtimeAPI.Status != nil { + ready = realtimeAPI.Status.Ready + requested = realtimeAPI.Status.Requested + upToDate = realtimeAPI.Status.UpToDate + } else { + ready = realtimeAPI.ReplicaCounts.Ready + requested = realtimeAPI.ReplicaCounts.Requested + upToDate = realtimeAPI.ReplicaCounts.UpToDate + } + lastUpdated := time.Unix(realtimeAPI.Metadata.LastUpdated, 0) rows = append(rows, []interface{}{ envNames[i], realtimeAPI.Metadata.Name, - fmt.Sprintf("%d/%d", realtimeAPI.Status.Ready, realtimeAPI.Status.Requested), - realtimeAPI.Status.UpToDate, + fmt.Sprintf("%d/%d", ready, requested), + upToDate, libtime.SinceStr(&lastUpdated), }) } diff --git a/cmd/autoscaler/main.go b/cmd/autoscaler/main.go index 71e8bd034e..24035c2c38 100644 --- a/cmd/autoscaler/main.go +++ b/cmd/autoscaler/main.go @@ -28,6 +28,7 @@ import ( "time" "github.com/cortexlabs/cortex/pkg/autoscaler" + serverless "github.com/cortexlabs/cortex/pkg/crds/apis/serverless/v1alpha1" "github.com/cortexlabs/cortex/pkg/lib/aws" "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/k8s" @@ -107,6 +108,9 @@ func main() { defer telemetry.Close() scheme := runtime.NewScheme() + if err := serverless.AddToScheme(scheme); err != nil { + exit(log, err, "failed to add k8s serverless to scheme") + } if err := clientgoscheme.AddToScheme(scheme); err != nil { exit(log, err, "failed to add k8s client-go-scheme to scheme") } diff --git a/go.mod b/go.mod index 4381c8a46b..490aac0a00 100644 --- a/go.mod +++ b/go.mod @@ -26,7 +26,7 @@ require ( github.com/go-ole/go-ole v1.2.5 // indirect github.com/gobwas/glob v0.2.3 github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect - github.com/google/go-cmp v0.5.6 // indirect + github.com/google/go-cmp v0.5.6 github.com/google/gofuzz v1.2.0 // indirect github.com/google/uuid v1.2.0 github.com/googleapis/gnostic v0.5.5 // indirect diff --git a/manager/manifests/autoscaler.yaml.j2 
b/manager/manifests/autoscaler.yaml.j2 index ce875b24c3..2f00afb164 100644 --- a/manager/manifests/autoscaler.yaml.j2 +++ b/manager/manifests/autoscaler.yaml.j2 @@ -42,6 +42,13 @@ rules: - get - update - watch +- apiGroups: + - "serverless.cortex.dev" + resources: + - realtimeapis + verbs: + - get + - update --- diff --git a/pkg/activator/activator.go b/pkg/activator/activator.go index 7b68736951..f5400e5fcb 100644 --- a/pkg/activator/activator.go +++ b/pkg/activator/activator.go @@ -158,7 +158,7 @@ func (a *activator) getOrCreateReadinessTracker(apiName string) *readinessTracke } func (a *activator) addAPI(obj interface{}) { - apiMetadata, err := getAPIMeta(obj) + apiMetadata, err := getAPIMeta(obj, true) if err != nil { a.logger.Errorw("error during virtual service informer add callback", zap.Error(err)) telemetry.Error(err) @@ -182,7 +182,7 @@ func (a *activator) addAPI(obj interface{}) { } func (a *activator) updateAPI(oldObj interface{}, newObj interface{}) { - apiMetadata, err := getAPIMeta(newObj) + apiMetadata, err := getAPIMeta(newObj, true) if err != nil { a.logger.Errorw("error during virtual service informer update callback", zap.Error(err)) telemetry.Error(err) @@ -195,7 +195,7 @@ func (a *activator) updateAPI(oldObj interface{}, newObj interface{}) { apiName := apiMetadata.apiName - oldAPIMetatada, err := getAPIMeta(oldObj) + oldAPIMetatada, err := getAPIMeta(oldObj, true) if err != nil { a.logger.Errorw("error during virtual service informer update callback", zap.Error(err)) telemetry.Error(err) @@ -212,7 +212,7 @@ func (a *activator) updateAPI(oldObj interface{}, newObj interface{}) { } func (a *activator) removeAPI(obj interface{}) { - apiMetadata, err := getAPIMeta(obj) + apiMetadata, err := getAPIMeta(obj, false) if err != nil { a.logger.Errorw("error during virtual service informer delete callback", zap.Error(err)) telemetry.Error(err) @@ -250,7 +250,7 @@ func (a *activator) updateReadinessTracker(obj interface{}) { return } - api, err := 
getAPIMeta(obj) + api, err := getAPIMeta(obj, false) if err != nil { a.logger.Errorw("error during deployment informer callback", zap.Error(err)) telemetry.Error(err) @@ -271,7 +271,7 @@ func (a *activator) updateReadinessTracker(obj interface{}) { } func (a *activator) removeReadinessTracker(obj interface{}) { - api, err := getAPIMeta(obj) + api, err := getAPIMeta(obj, false) if err != nil { a.logger.Errorw("error during deployment informer callback", zap.Error(err)) telemetry.Error(err) diff --git a/pkg/activator/helpers.go b/pkg/activator/helpers.go index 5bce2cb7bf..e3d20b6f0f 100644 --- a/pkg/activator/helpers.go +++ b/pkg/activator/helpers.go @@ -31,7 +31,7 @@ type apiMeta struct { maxQueueLength int } -func getAPIMeta(obj interface{}) (apiMeta, error) { +func getAPIMeta(obj interface{}, includeAnnotations bool) (apiMeta, error) { resource, err := meta.Accessor(obj) if err != nil { return apiMeta{}, err @@ -48,16 +48,22 @@ func getAPIMeta(obj interface{}) (apiMeta, error) { return apiMeta{}, errors.ErrorUnexpected("got a virtual service without apiName label") } - maxQueueLength, maxConcurrency, err := userconfig.ConcurrencyFromAnnotations(resource) - if err != nil { - return apiMeta{}, err + var maxQueueLength, maxConcurrency int + var annotations map[string]string + + if includeAnnotations { + maxQueueLength, maxConcurrency, err = userconfig.ConcurrencyFromAnnotations(resource) + if err != nil { + return apiMeta{}, err + } + annotations = resource.GetAnnotations() } return apiMeta{ apiName: apiName, apiKind: userconfig.KindFromString(apiKind), labels: labels, - annotations: resource.GetAnnotations(), + annotations: annotations, maxConcurrency: maxConcurrency, maxQueueLength: maxQueueLength, }, nil diff --git a/pkg/autoscaler/autoscaler_test.go b/pkg/autoscaler/autoscaler_test.go index bd01d7e286..4dc461f9b7 100644 --- a/pkg/autoscaler/autoscaler_test.go +++ b/pkg/autoscaler/autoscaler_test.go @@ -297,12 +297,9 @@ func TestAutoscaler_Awake(t *testing.T) { 
ticker := time.NewTicker(250 * time.Millisecond) go func() { - for { - select { - case <-ticker.C: - err := autoscaleFn() - require.NoError(t, err) - } + for range ticker.C { + err := autoscaleFn() + require.NoError(t, err) } }() @@ -372,12 +369,9 @@ func TestAutoscaler_MinReplicas(t *testing.T) { ticker := time.NewTicker(250 * time.Millisecond) go func() { - for { - select { - case <-ticker.C: - err := autoscaleFn() - require.NoError(t, err) - } + for range ticker.C { + err := autoscaleFn() + require.NoError(t, err) } }() @@ -444,12 +438,9 @@ func TestAutoscaler_MaxReplicas(t *testing.T) { ticker := time.NewTicker(250 * time.Millisecond) go func() { - for { - select { - case <-ticker.C: - err := autoscaleFn() - require.NoError(t, err) - } + for range ticker.C { + err := autoscaleFn() + require.NoError(t, err) } }() diff --git a/pkg/autoscaler/helpers.go b/pkg/autoscaler/helpers.go new file mode 100644 index 0000000000..9f9e4af039 --- /dev/null +++ b/pkg/autoscaler/helpers.go @@ -0,0 +1,65 @@ +/* +Copyright 2021 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package autoscaler + +import ( + serverless "github.com/cortexlabs/cortex/pkg/crds/apis/serverless/v1alpha1" + "github.com/cortexlabs/cortex/pkg/lib/errors" + libstrings "github.com/cortexlabs/cortex/pkg/lib/strings" + "github.com/cortexlabs/cortex/pkg/types/userconfig" +) + +func generateAutoscalingFromServerlessRealtimeAPI(realtimeAPI serverless.RealtimeAPI) (*userconfig.Autoscaling, error) { + targetInFlight, ok := libstrings.ParseFloat64(realtimeAPI.Spec.Autoscaling.TargetInFlight) + if !ok { + return nil, errors.ErrorUnexpected("failed to parse target-in-flight requests from autoscaling spec") + } + + maxDownscaleFactor, ok := libstrings.ParseFloat64(realtimeAPI.Spec.Autoscaling.MaxDownscaleFactor) + if !ok { + return nil, errors.ErrorUnexpected("failed to parse max downscale factor from autoscaling spec") + } + + maxUpscaleFactor, ok := libstrings.ParseFloat64(realtimeAPI.Spec.Autoscaling.MaxUpscaleFactor) + if !ok { + return nil, errors.ErrorUnexpected("failed to parse max upscale factor from autoscaling spec") + } + + downscaleTolerance, ok := libstrings.ParseFloat64(realtimeAPI.Spec.Autoscaling.DownscaleTolerance) + if !ok { + return nil, errors.ErrorUnexpected("failed to parse downscale tolerance from autoscaling spec") + } + + upscaleTolerance, ok := libstrings.ParseFloat64(realtimeAPI.Spec.Autoscaling.UpscaleTolerance) + if !ok { + return nil, errors.ErrorUnexpected("failed to parse upscale tolerance from autoscaling spec") + } + + return &userconfig.Autoscaling{ + MinReplicas: realtimeAPI.Spec.Autoscaling.MinReplicas, + MaxReplicas: realtimeAPI.Spec.Autoscaling.MaxReplicas, + InitReplicas: realtimeAPI.Spec.Autoscaling.InitReplicas, + TargetInFlight: &targetInFlight, + Window: realtimeAPI.Spec.Autoscaling.Window.Duration, + DownscaleStabilizationPeriod: realtimeAPI.Spec.Autoscaling.DownscaleStabilizationPeriod.Duration, + UpscaleStabilizationPeriod: realtimeAPI.Spec.Autoscaling.UpscaleStabilizationPeriod.Duration, + MaxDownscaleFactor: 
maxDownscaleFactor, + MaxUpscaleFactor: maxUpscaleFactor, + DownscaleTolerance: downscaleTolerance, + UpscaleTolerance: upscaleTolerance, + }, nil +} diff --git a/pkg/autoscaler/realtime_scaler.go b/pkg/autoscaler/realtime_scaler.go index a0dbb60c28..9aa3e5c8ef 100644 --- a/pkg/autoscaler/realtime_scaler.go +++ b/pkg/autoscaler/realtime_scaler.go @@ -21,18 +21,13 @@ import ( "fmt" "time" - "github.com/cortexlabs/cortex/pkg/consts" + serverless "github.com/cortexlabs/cortex/pkg/crds/apis/serverless/v1alpha1" "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/k8s" - "github.com/cortexlabs/cortex/pkg/lib/pointer" - "github.com/cortexlabs/cortex/pkg/lib/telemetry" "github.com/cortexlabs/cortex/pkg/types/userconfig" - "github.com/cortexlabs/cortex/pkg/workloads" promv1 "github.com/prometheus/client_golang/api/prometheus/v1" "github.com/prometheus/common/model" "go.uber.org/zap" - kapps "k8s.io/api/apps/v1" - kmeta "k8s.io/apimachinery/pkg/apis/meta/v1" ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -54,46 +49,23 @@ func (s *RealtimeScaler) Scale(apiName string, request int32) error { ctx := context.Background() // we use the controller-runtime client to make use of the cache mechanism - var deployment kapps.Deployment + var realtimeAPI serverless.RealtimeAPI err := s.k8s.Get(ctx, ctrlclient.ObjectKey{ Namespace: s.k8s.Namespace, - Name: workloads.K8sName(apiName), - }, &deployment) + Name: apiName, + }, &realtimeAPI) if err != nil { - return errors.Wrap(err, "failed to get deployment") + return errors.Wrap(err, "failed to get realtimeapi") } - if deployment.Spec.Replicas == nil { - return errors.Wrap(err, "k8s deployment doesn't have the replicas field set") - } - - current := *deployment.Spec.Replicas + current := realtimeAPI.Spec.Replicas if current == request { return nil } + realtimeAPI.Spec.Replicas = request - if request == 0 { - if err = s.routeToActivator(&deployment); err != nil { - return errors.Wrap(err, "failed 
to re-route traffic to activator") - } - } - - deployment.Spec.Replicas = pointer.Int32(request) - - if err = s.k8s.Update(ctx, &deployment); err != nil { - return errors.Wrap(err, "failed to update deployment") - } - - if current == 0 && request > 0 { - go func() { - if err := s.routeToService(&deployment); err != nil { - s.logger.Errorw("failed to re-route traffic to API", - zap.Error(err), zap.String("apiName", apiName), - ) - telemetry.Error(err) - } - }() - + if err = s.k8s.Update(ctx, &realtimeAPI); err != nil { + return errors.Wrap(err, "failed to update realtimeapi") } return nil @@ -136,133 +108,31 @@ func (s *RealtimeScaler) GetInFlightRequests(apiName string, window time.Duratio } func (s *RealtimeScaler) GetAutoscalingSpec(apiName string) (*userconfig.Autoscaling, error) { - deployment, err := s.k8s.GetDeployment(workloads.K8sName(apiName)) - if err != nil { - return nil, errors.Wrap(err, "failed to get deployment") - } - - if deployment == nil { - return nil, errors.ErrorUnexpected("unable to find k8s deployment", apiName) - } + ctx := context.Background() - autoscalingSpec, err := userconfig.AutoscalingFromAnnotations(deployment) - if err != nil { + var api serverless.RealtimeAPI + if err := s.k8s.Get(ctx, ctrlclient.ObjectKey{ + Namespace: s.k8s.Namespace, + Name: apiName}, + &api, + ); err != nil { return nil, err } - return autoscalingSpec, nil + return generateAutoscalingFromServerlessRealtimeAPI(api) } func (s *RealtimeScaler) CurrentRequestedReplicas(apiName string) (int32, error) { ctx := context.Background() - // we use the controller-runtime client to make use of the cache mechanism - var deployment kapps.Deployment - err := s.k8s.Get(ctx, ctrlclient.ObjectKey{ + var api serverless.RealtimeAPI + if err := s.k8s.Get(ctx, ctrlclient.ObjectKey{ Namespace: s.k8s.Namespace, - Name: workloads.K8sName(apiName), - }, &deployment) - if err != nil { - return 0, errors.Wrap(err, "failed to get deployment") - } - - if deployment.Spec.Replicas == nil { - 
return 0, errors.Wrap(err, "k8s deployment doesn't have the replicas field set") + Name: apiName}, + &api, + ); err != nil { + return 0, err } - return *deployment.Spec.Replicas, nil -} - -func (s *RealtimeScaler) routeToService(deployment *kapps.Deployment) error { - ctx := context.Background() - vs, err := s.k8s.GetVirtualService(deployment.Name) - if err != nil { - return errors.Wrap(err, "failed to get virtual service") - } - - if len(vs.Spec.Http) < 1 { - return errors.ErrorUnexpected("virtual service does not have any http entries") - } - - if err = s.waitForReadyReplicas(ctx, deployment); err != nil { - return errors.Wrap(err, "no ready replicas available") - } - - for i := range vs.Spec.Http { - if len(vs.Spec.Http[i].Route) != 2 { - return errors.ErrorUnexpected("virtual service does not have the required number of 2 http routes") - } - - vs.Spec.Http[i].Route[0].Weight = 100 // service traffic - vs.Spec.Http[i].Route[1].Weight = 0 // activator traffic - } - - vsClient := s.k8s.IstioClientSet().NetworkingV1beta1().VirtualServices(s.k8s.Namespace) - if _, err = vsClient.Update(ctx, vs, kmeta.UpdateOptions{}); err != nil { - return errors.Wrap(err, "failed to update virtual service") - } - - return nil -} - -func (s *RealtimeScaler) routeToActivator(deployment *kapps.Deployment) error { - ctx := context.Background() - vs, err := s.k8s.GetVirtualService(deployment.Name) - if err != nil { - return errors.Wrap(err, "failed to get virtual service") - } - - if len(vs.Spec.Http) < 1 { - return errors.ErrorUnexpected("virtual service does not have any http entries") - } - - for i := range vs.Spec.Http { - if len(vs.Spec.Http[i].Route) != 2 { - return errors.ErrorUnexpected("virtual service does not have the required number of 2 http routes") - } - - vs.Spec.Http[i].Route[0].Weight = 0 // service traffic - vs.Spec.Http[i].Route[1].Weight = 100 // activator traffic - } - - vsClient := s.k8s.IstioClientSet().NetworkingV1beta1().VirtualServices(s.k8s.Namespace) - if _, 
err = vsClient.Update(ctx, vs, kmeta.UpdateOptions{}); err != nil { - return errors.Wrap(err, "failed to update virtual service") - } - - return nil -} - -func (s *RealtimeScaler) waitForReadyReplicas(ctx context.Context, deployment *kapps.Deployment) error { - watcher, err := s.k8s.ClientSet().AppsV1().Deployments(s.k8s.Namespace).Watch( - ctx, - kmeta.ListOptions{ - FieldSelector: fmt.Sprintf("metadata.name=%s", deployment.Name), - Watch: true, - }, - ) - if err != nil { - return errors.Wrap(err, "could not create deployment watcher") - } - - defer watcher.Stop() - - ctx, cancel := context.WithTimeout(ctx, consts.WaitForReadyReplicasTimeout) - defer cancel() - - for { - select { - case event := <-watcher.ResultChan(): - deploy, ok := event.Object.(*kapps.Deployment) - if !ok { - continue - } - - if deploy.Status.ReadyReplicas > 0 { - return nil - } - case <-ctx.Done(): - return ctx.Err() - } - } + return api.Spec.Replicas, nil } diff --git a/pkg/config/config.go b/pkg/config/config.go index eb7bd5e269..0e8b9bc566 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -24,6 +24,7 @@ import ( "github.com/DataDog/datadog-go/statsd" "github.com/cortexlabs/cortex/pkg/consts" batch "github.com/cortexlabs/cortex/pkg/crds/apis/batch/v1alpha1" + serverless "github.com/cortexlabs/cortex/pkg/crds/apis/serverless/v1alpha1" "github.com/cortexlabs/cortex/pkg/lib/aws" cr "github.com/cortexlabs/cortex/pkg/lib/configreader" "github.com/cortexlabs/cortex/pkg/lib/errors" @@ -55,6 +56,7 @@ var ( func init() { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) utilruntime.Must(batch.AddToScheme(scheme)) + utilruntime.Must(serverless.AddToScheme(scheme)) } func InitConfigs(clusterConfig *clusterconfig.Config, operatorMetadata *clusterconfig.OperatorMetadata) { diff --git a/pkg/crds/PROJECT b/pkg/crds/PROJECT index 97b1925b84..a80d48987a 100644 --- a/pkg/crds/PROJECT +++ b/pkg/crds/PROJECT @@ -3,7 +3,7 @@ layout: - go.kubebuilder.io/v3 multigroup: true projectName: operator 
-repo: github.com/cortexlabs/cortex +repo: github.com/cortexlabs/cortex/pkg/crds resources: - api: crdVersion: v1 @@ -14,4 +14,13 @@ resources: kind: BatchJob path: github.com/cortexlabs/cortex/pkg/crds/apis/batch/v1alpha1 version: v1alpha1 +- api: + crdVersion: v1 + namespaced: true + controller: true + domain: cortex.dev + group: serverless + kind: RealtimeAPI + path: github.com/cortexlabs/cortex/pkg/crds/apis/serverless/v1alpha1 + version: v1alpha1 version: "3" diff --git a/pkg/crds/apis/serverless/v1alpha1/groupversion_info.go b/pkg/crds/apis/serverless/v1alpha1/groupversion_info.go new file mode 100644 index 0000000000..f9193a464a --- /dev/null +++ b/pkg/crds/apis/serverless/v1alpha1/groupversion_info.go @@ -0,0 +1,36 @@ +/* +Copyright 2021 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package v1alpha1 contains API Schema definitions for the api v1alpha1 API group +//+kubebuilder:object:generate=true +//+groupName=serverless.cortex.dev +package v1alpha1 + +import ( + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/controller-runtime/pkg/scheme" +) + +var ( + // GroupVersion is group version used to register these objects + GroupVersion = schema.GroupVersion{Group: "serverless.cortex.dev", Version: "v1alpha1"} + + // SchemeBuilder is used to add go types to the GroupVersionKind scheme + SchemeBuilder = &scheme.Builder{GroupVersion: GroupVersion} + + // AddToScheme adds the types in this group-version to the given scheme. 
+ AddToScheme = SchemeBuilder.AddToScheme +) diff --git a/pkg/crds/apis/serverless/v1alpha1/realtimeapi_types.go b/pkg/crds/apis/serverless/v1alpha1/realtimeapi_types.go new file mode 100644 index 0000000000..cf0708a735 --- /dev/null +++ b/pkg/crds/apis/serverless/v1alpha1/realtimeapi_types.go @@ -0,0 +1,283 @@ +/* +Copyright 2021 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "bytes" + "fmt" + + "github.com/cortexlabs/cortex/pkg/lib/hash" + "github.com/cortexlabs/cortex/pkg/lib/k8s" + s "github.com/cortexlabs/cortex/pkg/lib/strings" + "github.com/cortexlabs/cortex/pkg/types/spec" + "github.com/cortexlabs/cortex/pkg/types/userconfig" + kcore "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + kmeta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" +) + +// RealtimeAPISpec defines the desired state of RealtimeAPI +type RealtimeAPISpec struct { + // +kubebuilder:validation:Required + // Number of desired replicas + Replicas int32 `json:"replicas"` + + // Pod configuration + // +kubebuilder:validation:Required + Pod PodSpec `json:"pod"` + + // +kubebuilder:validation:Optional + // Autoscaling configuration + Autoscaling AutoscalingSpec `json:"autoscaling"` + + // +kubebuilder:validation:Optional + // List of node groups on which this API can run (default: all node groups are eligible) + NodeGroups []string `json:"node_groups"` + + // +kubebuilder:validation:Optional + // Deployment strategy to 
use when replacing existing replicas with new ones + UpdateStrategy UpdateStrategySpec `json:"update_strategy"` + + // +kubebuilder:validation:Required + // Networking configuration + Networking NetworkingSpec `json:"networking"` +} + +type PodSpec struct { + // +kubebuilder:validation:Required + // Port to which requests will be sent to + Port int32 `json:"port"` + + // +kubebuilder:validation:Required + // Maximum number of requests that will be concurrently sent into the container + MaxConcurrency int32 `json:"max_concurrency"` + + // +kubebuilder:validation:Required + // Maximum number of requests per replica which will be queued + // (beyond max_concurrency) before requests are rejected with error code 503 + MaxQueueLength int32 `json:"max_queue_length"` + + // +kubebuilder:validation:Required + // Configurations for the containers to run + Containers []ContainerSpec `json:"containers"` +} + +type ContainerSpec struct { + // +kubebuilder:validation:Required + // Name of the container + Name string `json:"name"` + + // +kubebuilder:validation:Required + // Docker image to use for the container + Image string `json:"image"` + + // +kubebuilder:validation:Optional + // Entrypoint (not executed within a shell) + Command []string `json:"command,omitempty"` + + // +kubebuilder:validation:Optional + // Arguments to the entrypoint + Args []string `json:"args,omitempty"` + + // +kubebuilder:validation:Optional + // Environment variables to set in the container + Env []kcore.EnvVar `json:"env,omitempty"` + + // Compute resource requests + Compute *ComputeSpec `json:"compute,omitempty"` + + // +kubebuilder:validation:Optional + // Periodic probe of container readiness; + // traffic will not be sent into the pod unless all containers' readiness probes are succeeding + ReadinessProbe *kcore.Probe `json:"readiness_probe,omitempty"` + + // +kubebuilder:validation:Optional + // Periodic probe of container liveness; container will be restarted if the probe fails + 
LivenessProbe *kcore.Probe `json:"liveness_probe,omitempty"` +} + +type ComputeSpec struct { + // +kubebuilder:validation:Optional + // CPU request for the container; one unit of CPU corresponds to one virtual CPU; + // fractional requests are allowed, and can be specified as a floating point number or via the "m" suffix + CPU *resource.Quantity `json:"cpu,omitempty"` + + // +kubebuilder:validation:Optional + // GPU request for the container; one unit of GPU corresponds to one virtual GPU + GPU int64 `json:"gpu,omitempty"` + + // +kubebuilder:validation:Optional + // Inferentia request for the container; one unit of Inf corresponds to one virtual Inf chip + Inf int64 `json:"inf,omitempty"` + + // +kubebuilder:validation:Optional + // Memory request for the container; + // one unit of memory is one byte and can be expressed as an integer or by using one of these suffixes: K, M, G, T + // (or their power-of two counterparts: Ki, Mi, Gi, Ti) + Mem *resource.Quantity `json:"mem,omitempty"` + + // +kubebuilder:validation:Optional + // Size of shared memory (/dev/shm) for sharing data between multiple processes + Shm *resource.Quantity `json:"shm,omitempty"` +} + +type AutoscalingSpec struct { + // +kubebuilder:validation:Optional + // Initial number of replicas + InitReplicas int32 `json:"init_replicas,omitempty"` + + // +kubebuilder:validation:Optional + // Minimum number of replicas + MinReplicas int32 `json:"min_replicas,omitempty"` + + // +kubebuilder:validation:Optional + // Maximum number of replicas + MaxReplicas int32 `json:"max_replicas,omitempty"` + + // +kubebuilder:validation:Optional + // Desired number of in-flight requests per replica (including requests actively being processed as well as queued), + // which the autoscaler tries to maintain + TargetInFlight string `json:"target_in_flight,omitempty"` + + // +kubebuilder:validation:Optional + // Duration over which to average the API's in-flight requests per replica + Window kmeta.Duration 
`json:"window,omitempty"` + + // +kubebuilder:validation:Optional + // The API will not scale below the highest recommendation made during this period + DownscaleStabilizationPeriod kmeta.Duration `json:"downscale_stabilization_period,omitempty"` + + // +kubebuilder:validation:Optional + // The API will not scale above the lowest recommendation made during this period + UpscaleStabilizationPeriod kmeta.Duration `json:"upscale_stabilization_period,omitempty"` + + // +kubebuilder:validation:Optional + // Maximum factor by which to scale down the API on a single scaling event + MaxDownscaleFactor string `json:"max_downscale_factor,omitempty"` + + // +kubebuilder:validation:Optional + // Maximum factor by which to scale up the API on a single scaling event + MaxUpscaleFactor string `json:"max_upscale_factor,omitempty"` + + // +kubebuilder:validation:Optional + // Any recommendation falling within this factor below the current number of replicas will not trigger a + // scale down event + DownscaleTolerance string `json:"downscale_tolerance,omitempty"` + + // +kubebuilder:validation:Optional + // Any recommendation falling within this factor above the current number of replicas will not trigger a scale up event + UpscaleTolerance string `json:"upscale_tolerance,omitempty"` +} + +type UpdateStrategySpec struct { + // +kubebuilder:validation:Optional + // Maximum number of replicas that can be scheduled above the desired number of replicas during an update; + // can be an absolute number, e.g. 5, or a percentage of desired replicas, e.g. 10% (default: 25%) + // (set to 0 to disable rolling updates) + MaxSurge intstr.IntOrString `json:"max_surge"` + + // +kubebuilder:validation:Optional + // maximum number of replicas that can be unavailable during an update; can be an absolute number, + // e.g. 5, or a percentage of desired replicas, e.g. 
10% + MaxUnavailable intstr.IntOrString `json:"max_unavailable"` +} + +type NetworkingSpec struct { + // +kubebuilder:validation:Optional + // Endpoint for the API + Endpoint string `json:"endpoint,omitempty"` +} + +// RealtimeAPIStatus defines the observed state of RealtimeAPI +type RealtimeAPIStatus struct { + // +kubebuilder:validation:Optional + // Number of ready pods + Ready int32 `json:"ready"` + + // +kubebuilder:validation:Optional + // Number of requested pods + Requested int32 `json:"requested"` + + // +kubebuilder:validation:Optional + // Number of pods with the last requested spec + UpToDate int32 `json:"up_to_date"` + + // +kubebuilder:validation:Optional + // URL of the deployed API + Endpoint string `json:"endpoint,omitempty"` +} + +//+kubebuilder:object:root=true +//+kubebuilder:subresource:status +//+kubebuilder:printcolumn:JSONPath=".status.ready",name="Ready",type="integer" +//+kubebuilder:printcolumn:JSONPath=".status.requested",name="Requested",type="integer" +//+kubebuilder:printcolumn:JSONPath=".status.up_to_date",name="Up-To-Date",type="integer" +//+kubebuilder:printcolumn:JSONPath=".status.endpoint",name="Endpoint",type="string" + +// RealtimeAPI is the Schema for the realtimeapis API +type RealtimeAPI struct { + kmeta.TypeMeta `json:",inline"` + kmeta.ObjectMeta `json:"metadata,omitempty"` + + Spec RealtimeAPISpec `json:"spec,omitempty"` + Status RealtimeAPIStatus `json:"status,omitempty"` +} + +// GetOrCreateAPIIDs retrieves API ids from annotations or creates them if they don't exist +func (api RealtimeAPI) GetOrCreateAPIIDs() (deploymentID, podID, specID, apiID string) { + deploymentID = api.Annotations["cortex.dev/deployment-id"] + if deploymentID == "" { + deploymentID = k8s.RandomName()[:10] + } + + var buf bytes.Buffer + + buf.WriteString(api.Name) + buf.WriteString(userconfig.RealtimeAPIKind.String()) + buf.WriteString(s.Obj(api.Spec.Pod)) + podID = hash.Bytes(buf.Bytes()) + + buf.Reset() + buf.WriteString(podID) + 
buf.WriteString(s.Obj(api.Spec.Networking)) + buf.WriteString(s.Obj(api.Spec.Autoscaling)) + buf.WriteString(s.Obj(api.Spec.NodeGroups)) + buf.WriteString(s.Obj(api.Spec.UpdateStrategy)) + specID = hash.Bytes(buf.Bytes())[:32] + + apiID = api.Annotations["cortex.dev/api-id"] + if apiID == "" || + api.Annotations["cortex.dev/deployment-id"] != deploymentID || + api.Annotations["cortex.dev/spec-id"] != specID { + apiID = fmt.Sprintf("%s-%s-%s", spec.MonotonicallyDecreasingID(), deploymentID, specID) + } + return deploymentID, podID, specID, apiID +} + +//+kubebuilder:object:root=true + +// RealtimeAPIList contains a list of RealtimeAPI +type RealtimeAPIList struct { + kmeta.TypeMeta `json:",inline"` + kmeta.ListMeta `json:"metadata,omitempty"` + Items []RealtimeAPI `json:"items"` +} + +func init() { + SchemeBuilder.Register(&RealtimeAPI{}, &RealtimeAPIList{}) +} diff --git a/pkg/crds/apis/serverless/v1alpha1/zz_generated.deepcopy.go b/pkg/crds/apis/serverless/v1alpha1/zz_generated.deepcopy.go new file mode 100644 index 0000000000..c81d4ac6f6 --- /dev/null +++ b/pkg/crds/apis/serverless/v1alpha1/zz_generated.deepcopy.go @@ -0,0 +1,273 @@ +// +build !ignore_autogenerated + +/* +Copyright 2021 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by controller-gen. DO NOT EDIT. 
+ +package v1alpha1 + +import ( + "k8s.io/api/core/v1" + runtime "k8s.io/apimachinery/pkg/runtime" +) + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *AutoscalingSpec) DeepCopyInto(out *AutoscalingSpec) { + *out = *in + out.Window = in.Window + out.DownscaleStabilizationPeriod = in.DownscaleStabilizationPeriod + out.UpscaleStabilizationPeriod = in.UpscaleStabilizationPeriod +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AutoscalingSpec. +func (in *AutoscalingSpec) DeepCopy() *AutoscalingSpec { + if in == nil { + return nil + } + out := new(AutoscalingSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ComputeSpec) DeepCopyInto(out *ComputeSpec) { + *out = *in + if in.CPU != nil { + in, out := &in.CPU, &out.CPU + x := (*in).DeepCopy() + *out = &x + } + if in.Mem != nil { + in, out := &in.Mem, &out.Mem + x := (*in).DeepCopy() + *out = &x + } + if in.Shm != nil { + in, out := &in.Shm, &out.Shm + x := (*in).DeepCopy() + *out = &x + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ComputeSpec. +func (in *ComputeSpec) DeepCopy() *ComputeSpec { + if in == nil { + return nil + } + out := new(ComputeSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *ContainerSpec) DeepCopyInto(out *ContainerSpec) { + *out = *in + if in.Command != nil { + in, out := &in.Command, &out.Command + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.Args != nil { + in, out := &in.Args, &out.Args + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.Env != nil { + in, out := &in.Env, &out.Env + *out = make([]v1.EnvVar, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + if in.Compute != nil { + in, out := &in.Compute, &out.Compute + *out = new(ComputeSpec) + (*in).DeepCopyInto(*out) + } + if in.ReadinessProbe != nil { + in, out := &in.ReadinessProbe, &out.ReadinessProbe + *out = new(v1.Probe) + (*in).DeepCopyInto(*out) + } + if in.LivenessProbe != nil { + in, out := &in.LivenessProbe, &out.LivenessProbe + *out = new(v1.Probe) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ContainerSpec. +func (in *ContainerSpec) DeepCopy() *ContainerSpec { + if in == nil { + return nil + } + out := new(ContainerSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *NetworkingSpec) DeepCopyInto(out *NetworkingSpec) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NetworkingSpec. +func (in *NetworkingSpec) DeepCopy() *NetworkingSpec { + if in == nil { + return nil + } + out := new(NetworkingSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *PodSpec) DeepCopyInto(out *PodSpec) { + *out = *in + if in.Containers != nil { + in, out := &in.Containers, &out.Containers + *out = make([]ContainerSpec, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PodSpec. +func (in *PodSpec) DeepCopy() *PodSpec { + if in == nil { + return nil + } + out := new(PodSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RealtimeAPI) DeepCopyInto(out *RealtimeAPI) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + out.Status = in.Status +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RealtimeAPI. +func (in *RealtimeAPI) DeepCopy() *RealtimeAPI { + if in == nil { + return nil + } + out := new(RealtimeAPI) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *RealtimeAPI) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RealtimeAPIList) DeepCopyInto(out *RealtimeAPIList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]RealtimeAPI, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RealtimeAPIList. 
+func (in *RealtimeAPIList) DeepCopy() *RealtimeAPIList { + if in == nil { + return nil + } + out := new(RealtimeAPIList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *RealtimeAPIList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RealtimeAPISpec) DeepCopyInto(out *RealtimeAPISpec) { + *out = *in + in.Pod.DeepCopyInto(&out.Pod) + out.Autoscaling = in.Autoscaling + if in.NodeGroups != nil { + in, out := &in.NodeGroups, &out.NodeGroups + *out = make([]string, len(*in)) + copy(*out, *in) + } + out.UpdateStrategy = in.UpdateStrategy + out.Networking = in.Networking +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RealtimeAPISpec. +func (in *RealtimeAPISpec) DeepCopy() *RealtimeAPISpec { + if in == nil { + return nil + } + out := new(RealtimeAPISpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RealtimeAPIStatus) DeepCopyInto(out *RealtimeAPIStatus) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RealtimeAPIStatus. +func (in *RealtimeAPIStatus) DeepCopy() *RealtimeAPIStatus { + if in == nil { + return nil + } + out := new(RealtimeAPIStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *UpdateStrategySpec) DeepCopyInto(out *UpdateStrategySpec) { + *out = *in + out.MaxSurge = in.MaxSurge + out.MaxUnavailable = in.MaxUnavailable +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UpdateStrategySpec. +func (in *UpdateStrategySpec) DeepCopy() *UpdateStrategySpec { + if in == nil { + return nil + } + out := new(UpdateStrategySpec) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/crds/config/crd/bases/serverless.cortex.dev_realtimeapis.yaml b/pkg/crds/config/crd/bases/serverless.cortex.dev_realtimeapis.yaml new file mode 100644 index 0000000000..7a8c6a755b --- /dev/null +++ b/pkg/crds/config/crd/bases/serverless.cortex.dev_realtimeapis.yaml @@ -0,0 +1,615 @@ + +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.4.1 + creationTimestamp: null + name: realtimeapis.serverless.cortex.dev +spec: + group: serverless.cortex.dev + names: + kind: RealtimeAPI + listKind: RealtimeAPIList + plural: realtimeapis + singular: realtimeapi + scope: Namespaced + versions: + - additionalPrinterColumns: + - jsonPath: .status.ready + name: Ready + type: integer + - jsonPath: .status.requested + name: Requested + type: integer + - jsonPath: .status.up_to_date + name: Up-To-Date + type: integer + - jsonPath: .status.endpoint + name: Endpoint + type: string + name: v1alpha1 + schema: + openAPIV3Schema: + description: RealtimeAPI is the Schema for the realtimeapis API + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. 
Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + spec: + description: RealtimeAPISpec defines the desired state of RealtimeAPI + properties: + autoscaling: + description: Autoscaling configuration + properties: + downscale_stabilization_period: + description: The API will not scale below the highest recommendation + made during this period + type: string + downscale_tolerance: + description: Any recommendation falling within this factor below + the current number of replicas will not trigger a scale down + event + type: string + init_replicas: + description: Initial number of replicas + format: int32 + type: integer + max_downscale_factor: + description: Maximum factor by which to scale down the API on + a single scaling event + type: string + max_replicas: + description: Maximum number of replicas + format: int32 + type: integer + max_upscale_factor: + description: Maximum factor by which to scale up the API on a + single scaling event + type: string + min_replicas: + description: Minimum number of replicas + format: int32 + type: integer + target_in_flight: + description: Desired number of in-flight requests per replica + (including requests actively being processed as well as queued), + which the autoscaler tries to maintain + type: string + upscale_stabilization_period: + description: The API will not scale above the lowest recommendation + made during this period + type: string + upscale_tolerance: + description: Any recommendation falling within this factor above + the current number of replicas will not trigger a scale up event + type: string + window: + description: Duration over which to average the API's in-flight + requests per replica + type: string + type: object + networking: + description: Networking configuration + properties: + endpoint: + 
description: Endpoint for the API + type: string + type: object + node_groups: + description: 'List of node groups on which this API can run (default: + all node groups are eligible)' + items: + type: string + type: array + pod: + description: Pod configuration + properties: + containers: + description: Configurations for the containers to run + items: + properties: + args: + description: Arguments to the entrypoint + items: + type: string + type: array + command: + description: Entrypoint (not executed within a shell) + items: + type: string + type: array + compute: + description: Compute resource requests + properties: + cpu: + anyOf: + - type: integer + - type: string + description: CPU request for the container; one unit + of CPU corresponds to one virtual CPU; fractional + requests are allowed, and can be specified as a floating + point number or via the "m" suffix + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + gpu: + description: GPU request for the container; one unit + of GPU corresponds to one virtual GPU + format: int64 + type: integer + inf: + description: Inferentia request for the container; one + unit of Inf corresponds to one virtual Inf chip + format: int64 + type: integer + mem: + anyOf: + - type: integer + - type: string + description: 'Memory request for the container; one + unit of memory is one byte and can be expressed as + an integer or by using one of these suffixes: K, M, + G, T (or their power-of two counterparts: Ki, Mi, + Gi, Ti)' + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + shm: + anyOf: + - type: integer + - type: string + description: Size of shared memory (/dev/shm) for sharing + data between multiple processes + pattern: 
^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + type: object + env: + description: Environment variables to set in the container + items: + description: EnvVar represents an environment variable + present in a Container. + properties: + name: + description: Name of the environment variable. Must + be a C_IDENTIFIER. + type: string + value: + description: 'Variable references $(VAR_NAME) are + expanded using the previous defined environment + variables in the container and any service environment + variables. If a variable cannot be resolved, the + reference in the input string will be unchanged. + The $(VAR_NAME) syntax can be escaped with a double + $$, ie: $$(VAR_NAME). Escaped references will never + be expanded, regardless of whether the variable + exists or not. Defaults to "".' + type: string + valueFrom: + description: Source for the environment variable's + value. Cannot be used if value is not empty. + properties: + configMapKeyRef: + description: Selects a key of a ConfigMap. + properties: + key: + description: The key to select. + type: string + name: + description: 'Name of the referent. More info: + https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Add other useful fields. apiVersion, + kind, uid?' + type: string + optional: + description: Specify whether the ConfigMap + or its key must be defined + type: boolean + required: + - key + type: object + fieldRef: + description: 'Selects a field of the pod: supports + metadata.name, metadata.namespace, `metadata.labels['''']`, + `metadata.annotations['''']`, spec.nodeName, + spec.serviceAccountName, status.hostIP, status.podIP, + status.podIPs.' + properties: + apiVersion: + description: Version of the schema the FieldPath + is written in terms of, defaults to "v1". + type: string + fieldPath: + description: Path of the field to select in + the specified API version. 
+ type: string + required: + - fieldPath + type: object + resourceFieldRef: + description: 'Selects a resource of the container: + only resources limits and requests (limits.cpu, + limits.memory, limits.ephemeral-storage, requests.cpu, + requests.memory and requests.ephemeral-storage) + are currently supported.' + properties: + containerName: + description: 'Container name: required for + volumes, optional for env vars' + type: string + divisor: + anyOf: + - type: integer + - type: string + description: Specifies the output format of + the exposed resources, defaults to "1" + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + resource: + description: 'Required: resource to select' + type: string + required: + - resource + type: object + secretKeyRef: + description: Selects a key of a secret in the + pod's namespace + properties: + key: + description: The key of the secret to select + from. Must be a valid secret key. + type: string + name: + description: 'Name of the referent. More info: + https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Add other useful fields. apiVersion, + kind, uid?' + type: string + optional: + description: Specify whether the Secret or + its key must be defined + type: boolean + required: + - key + type: object + type: object + required: + - name + type: object + type: array + image: + description: Docker image to use for the container + type: string + liveness_probe: + description: Periodic probe of container liveness; container + will be restarted if the probe fails + properties: + exec: + description: One and only one of the following should + be specified. Exec specifies the action to take. + properties: + command: + description: Command is the command line to execute + inside the container, the working directory for + the command is root ('/') in the container's + filesystem. 
The command is simply exec'd, it is + not run inside a shell, so traditional shell instructions + ('|', etc) won't work. To use a shell, you need + to explicitly call out to that shell. Exit status + of 0 is treated as live/healthy and non-zero is + unhealthy. + items: + type: string + type: array + type: object + failureThreshold: + description: Minimum consecutive failures for the probe + to be considered failed after having succeeded. Defaults + to 3. Minimum value is 1. + format: int32 + type: integer + httpGet: + description: HTTPGet specifies the http request to perform. + properties: + host: + description: Host name to connect to, defaults to + the pod IP. You probably want to set "Host" in + httpHeaders instead. + type: string + httpHeaders: + description: Custom headers to set in the request. + HTTP allows repeated headers. + items: + description: HTTPHeader describes a custom header + to be used in HTTP probes + properties: + name: + description: The header field name + type: string + value: + description: The header field value + type: string + required: + - name + - value + type: object + type: array + path: + description: Path to access on the HTTP server. + type: string + port: + anyOf: + - type: integer + - type: string + description: Name or number of the port to access + on the container. Number must be in the range + 1 to 65535. Name must be an IANA_SVC_NAME. + x-kubernetes-int-or-string: true + scheme: + description: Scheme to use for connecting to the + host. Defaults to HTTP. + type: string + required: + - port + type: object + initialDelaySeconds: + description: 'Number of seconds after the container + has started before liveness probes are initiated. + More info: https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle#container-probes' + format: int32 + type: integer + periodSeconds: + description: How often (in seconds) to perform the probe. + Default to 10 seconds. Minimum value is 1. 
+ format: int32 + type: integer + successThreshold: + description: Minimum consecutive successes for the probe + to be considered successful after having failed. Defaults + to 1. Must be 1 for liveness and startup. Minimum + value is 1. + format: int32 + type: integer + tcpSocket: + description: 'TCPSocket specifies an action involving + a TCP port. TCP hooks not yet supported TODO: implement + a realistic TCP lifecycle hook' + properties: + host: + description: 'Optional: Host name to connect to, + defaults to the pod IP.' + type: string + port: + anyOf: + - type: integer + - type: string + description: Number or name of the port to access + on the container. Number must be in the range + 1 to 65535. Name must be an IANA_SVC_NAME. + x-kubernetes-int-or-string: true + required: + - port + type: object + timeoutSeconds: + description: 'Number of seconds after which the probe + times out. Defaults to 1 second. Minimum value is + 1. More info: https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle#container-probes' + format: int32 + type: integer + type: object + name: + description: Name of the container + type: string + readiness_probe: + description: Periodic probe of container readiness; traffic + will not be sent into the pod unless all containers' readiness + probes are succeeding + properties: + exec: + description: One and only one of the following should + be specified. Exec specifies the action to take. + properties: + command: + description: Command is the command line to execute + inside the container, the working directory for + the command is root ('/') in the container's + filesystem. The command is simply exec'd, it is + not run inside a shell, so traditional shell instructions + ('|', etc) won't work. To use a shell, you need + to explicitly call out to that shell. Exit status + of 0 is treated as live/healthy and non-zero is + unhealthy. 
+ items: + type: string + type: array + type: object + failureThreshold: + description: Minimum consecutive failures for the probe + to be considered failed after having succeeded. Defaults + to 3. Minimum value is 1. + format: int32 + type: integer + httpGet: + description: HTTPGet specifies the http request to perform. + properties: + host: + description: Host name to connect to, defaults to + the pod IP. You probably want to set "Host" in + httpHeaders instead. + type: string + httpHeaders: + description: Custom headers to set in the request. + HTTP allows repeated headers. + items: + description: HTTPHeader describes a custom header + to be used in HTTP probes + properties: + name: + description: The header field name + type: string + value: + description: The header field value + type: string + required: + - name + - value + type: object + type: array + path: + description: Path to access on the HTTP server. + type: string + port: + anyOf: + - type: integer + - type: string + description: Name or number of the port to access + on the container. Number must be in the range + 1 to 65535. Name must be an IANA_SVC_NAME. + x-kubernetes-int-or-string: true + scheme: + description: Scheme to use for connecting to the + host. Defaults to HTTP. + type: string + required: + - port + type: object + initialDelaySeconds: + description: 'Number of seconds after the container + has started before liveness probes are initiated. + More info: https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle#container-probes' + format: int32 + type: integer + periodSeconds: + description: How often (in seconds) to perform the probe. + Default to 10 seconds. Minimum value is 1. + format: int32 + type: integer + successThreshold: + description: Minimum consecutive successes for the probe + to be considered successful after having failed. Defaults + to 1. Must be 1 for liveness and startup. Minimum + value is 1. 
+ format: int32 + type: integer + tcpSocket: + description: 'TCPSocket specifies an action involving + a TCP port. TCP hooks not yet supported TODO: implement + a realistic TCP lifecycle hook' + properties: + host: + description: 'Optional: Host name to connect to, + defaults to the pod IP.' + type: string + port: + anyOf: + - type: integer + - type: string + description: Number or name of the port to access + on the container. Number must be in the range + 1 to 65535. Name must be an IANA_SVC_NAME. + x-kubernetes-int-or-string: true + required: + - port + type: object + timeoutSeconds: + description: 'Number of seconds after which the probe + times out. Defaults to 1 second. Minimum value is + 1. More info: https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle#container-probes' + format: int32 + type: integer + type: object + required: + - image + - name + type: object + type: array + max_concurrency: + description: Maximum number of requests that will be concurrently + sent into the container + format: int32 + type: integer + max_queue_length: + description: Maximum number of requests per replica which will + be queued (beyond max_concurrency) before requests are rejected + with error code 503 + format: int32 + type: integer + port: + description: Port to which requests will be sent to + format: int32 + type: integer + required: + - containers + - max_concurrency + - max_queue_length + - port + type: object + replicas: + description: Number of desired replicas + format: int32 + type: integer + update_strategy: + description: Deployment strategy to use when replacing existing replicas + with new ones + properties: + max_surge: + anyOf: + - type: integer + - type: string + description: 'Maximum number of replicas that can be scheduled + above the desired number of replicas during an update; can be + an absolute number, e.g. 5, or a percentage of desired replicas, + e.g. 
10% (default: 25%) (set to 0 to disable rolling updates)' + x-kubernetes-int-or-string: true + max_unavailable: + anyOf: + - type: integer + - type: string + description: maximum number of replicas that can be unavailable + during an update; can be an absolute number, e.g. 5, or a percentage + of desired replicas, e.g. 10% + x-kubernetes-int-or-string: true + type: object + required: + - networking + - pod + - replicas + type: object + status: + description: RealtimeAPIStatus defines the observed state of RealtimeAPI + properties: + endpoint: + description: URL of the deployed API + type: string + ready: + description: Number of ready pods + format: int32 + type: integer + requested: + description: Number of requested pods + format: int32 + type: integer + up_to_date: + description: Number of pods with the last requested spec + format: int32 + type: integer + type: object + type: object + served: true + storage: true + subresources: + status: {} +status: + acceptedNames: + kind: "" + plural: "" + conditions: [] + storedVersions: [] diff --git a/pkg/crds/config/crd/kustomization.yaml b/pkg/crds/config/crd/kustomization.yaml index 73e33703bb..77bbf7b21d 100644 --- a/pkg/crds/config/crd/kustomization.yaml +++ b/pkg/crds/config/crd/kustomization.yaml @@ -3,17 +3,20 @@ # It should be run by config/default resources: - bases/batch.cortex.dev_batchjobs.yaml +- bases/serverless.cortex.dev_realtimeapis.yaml #+kubebuilder:scaffold:crdkustomizeresource -patchesStrategicMerge: +#patchesStrategicMerge: # [WEBHOOK] To enable webhook, uncomment all the sections with [WEBHOOK] prefix. # patches here are for enabling the conversion webhook for each CRD #- patches/webhook_in_batchjobs.yaml +#- patches/webhook_in_realtimeapis.yaml #+kubebuilder:scaffold:crdkustomizewebhookpatch # [CERTMANAGER] To enable webhook, uncomment all the sections with [CERTMANAGER] prefix. 
# patches here are for enabling the CA injection for each CRD #- patches/cainjection_in_batchjobs.yaml +#- patches/cainjection_in_realtimeapis.yaml #+kubebuilder:scaffold:crdkustomizecainjectionpatch # the following config is for teaching kustomize how to do kustomization for CRDs. diff --git a/pkg/crds/config/crd/patches/cainjection_in_realtimeapis.yaml b/pkg/crds/config/crd/patches/cainjection_in_realtimeapis.yaml new file mode 100644 index 0000000000..a1311cf904 --- /dev/null +++ b/pkg/crds/config/crd/patches/cainjection_in_realtimeapis.yaml @@ -0,0 +1,7 @@ +# The following patch adds a directive for certmanager to inject CA into the CRD +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + cert-manager.io/inject-ca-from: $(CERTIFICATE_NAMESPACE)/$(CERTIFICATE_NAME) + name: realtimeapis.serverless.cortex.dev diff --git a/pkg/crds/config/crd/patches/webhook_in_realtimeapis.yaml b/pkg/crds/config/crd/patches/webhook_in_realtimeapis.yaml new file mode 100644 index 0000000000..4ee0f5880c --- /dev/null +++ b/pkg/crds/config/crd/patches/webhook_in_realtimeapis.yaml @@ -0,0 +1,14 @@ +# The following patch enables a conversion webhook for the CRD +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: realtimeapis.serverless.cortex.dev +spec: + conversion: + strategy: Webhook + webhook: + clientConfig: + service: + namespace: system + name: webhook-service + path: /convert diff --git a/pkg/crds/config/rbac/realtimeapi_editor_role.yaml b/pkg/crds/config/rbac/realtimeapi_editor_role.yaml new file mode 100644 index 0000000000..34e836e2e9 --- /dev/null +++ b/pkg/crds/config/rbac/realtimeapi_editor_role.yaml @@ -0,0 +1,24 @@ +# permissions for end users to edit realtimeapis.
+apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: realtimeapi-editor-role +rules: +- apiGroups: + - serverless.cortex.dev + resources: + - realtimeapis + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - serverless.cortex.dev + resources: + - realtimeapis/status + verbs: + - get diff --git a/pkg/crds/config/rbac/realtimeapi_viewer_role.yaml b/pkg/crds/config/rbac/realtimeapi_viewer_role.yaml new file mode 100644 index 0000000000..004387bf35 --- /dev/null +++ b/pkg/crds/config/rbac/realtimeapi_viewer_role.yaml @@ -0,0 +1,20 @@ +# permissions for end users to view realtimeapis. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: realtimeapi-viewer-role +rules: +- apiGroups: + - serverless.cortex.dev + resources: + - realtimeapis + verbs: + - get + - list + - watch +- apiGroups: + - serverless.cortex.dev + resources: + - realtimeapis/status + verbs: + - get diff --git a/pkg/crds/config/rbac/role.yaml b/pkg/crds/config/rbac/role.yaml index 4b64fb36ab..2c9787c377 100644 --- a/pkg/crds/config/rbac/role.yaml +++ b/pkg/crds/config/rbac/role.yaml @@ -23,6 +23,28 @@ rules: - get - list - watch +- apiGroups: + - "" + resources: + - services + verbs: + - create + - get + - list + - patch + - update + - watch +- apiGroups: + - apps + resources: + - deployments + verbs: + - create + - get + - list + - patch + - update + - watch - apiGroups: - batch resources: @@ -60,3 +82,40 @@ rules: - get - patch - update +- apiGroups: + - networking.istio.io + resources: + - virtualservices + verbs: + - create + - get + - list + - patch + - update + - watch +- apiGroups: + - serverless.cortex.dev + resources: + - realtimeapis + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - serverless.cortex.dev + resources: + - realtimeapis/finalizers + verbs: + - update +- apiGroups: + - serverless.cortex.dev + resources: + - realtimeapis/status + verbs: + - get + - patch + - update
diff --git a/pkg/crds/config/samples/serverless_v1alpha1_realtimeapi.yaml b/pkg/crds/config/samples/serverless_v1alpha1_realtimeapi.yaml new file mode 100644 index 0000000000..a3e9ab2f4f --- /dev/null +++ b/pkg/crds/config/samples/serverless_v1alpha1_realtimeapi.yaml @@ -0,0 +1,15 @@ +apiVersion: serverless.cortex.dev/v1alpha1 +kind: RealtimeAPI +metadata: + name: hello-world +spec: + pod: + containers: + - name: api + image: quay.io/cortexlabs-test/realtime-hello-world-cpu:latest + max_concurrency: 1 + max_queue_length: 100 + port: 8080 + replicas: 1 + networking: + endpoint: "/hello-world" diff --git a/pkg/crds/controllers/serverless/realtimeapi_controller.go b/pkg/crds/controllers/serverless/realtimeapi_controller.go new file mode 100644 index 0000000000..f422813acf --- /dev/null +++ b/pkg/crds/controllers/serverless/realtimeapi_controller.go @@ -0,0 +1,151 @@ +/* +Copyright 2021 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package serverlesscontroller + +import ( + "context" + "fmt" + + serverless "github.com/cortexlabs/cortex/pkg/crds/apis/serverless/v1alpha1" + "github.com/cortexlabs/cortex/pkg/crds/controllers" + "github.com/cortexlabs/cortex/pkg/types/clusterconfig" + "github.com/go-logr/logr" + istioclientnetworking "istio.io/client-go/pkg/apis/networking/v1beta1" + kapps "k8s.io/api/apps/v1" + kcore "k8s.io/api/core/v1" + kerrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const _terminationGracePeriodSeconds int64 = 60 // seconds + +// RealtimeAPIReconciler reconciles a RealtimeAPI object +type RealtimeAPIReconciler struct { + client.Client + ClusterConfig *clusterconfig.Config + Log logr.Logger + Scheme *runtime.Scheme +} + +// +kubebuilder:rbac:groups=serverless.cortex.dev,resources=realtimeapis,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=serverless.cortex.dev,resources=realtimeapis/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=serverless.cortex.dev,resources=realtimeapis/finalizers,verbs=update +// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch +// +kubebuilder:rbac:groups=networking.istio.io,resources=virtualservices,verbs=get;list;watch;create;update;patch +// +kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch +// +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch + +// Reconcile is part of the main kubernetes reconciliation loop which aims to +// move the current state of the cluster closer to the desired state. 
+func (r *RealtimeAPIReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + log := r.Log.WithValues("realtimeapi", req.NamespacedName) + + // Step 1: get resource from request + api := serverless.RealtimeAPI{} + log.V(1).Info("retrieving resource") + if err := r.Get(ctx, req.NamespacedName, &api); err != nil { + if !kerrors.IsNotFound(err) { + log.Error(err, "failed to retrieve resource") + } + return ctrl.Result{}, client.IgnoreNotFound(err) + } + + // Step 2: Update status + log.V(1).Info("getting deployment") + deployment, err := r.getDeployment(ctx, api) + if err != nil { + log.Error(err, "failed to get deployment") + return ctrl.Result{}, err + } + + log.V(1).Info("updating status") + if err = r.updateStatus(ctx, &api, deployment); err != nil { + if controllers.IsOptimisticLockError(err) { + log.Info("conflict during status update, retrying") + return ctrl.Result{Requeue: true}, nil + } + log.Error(err, "failed to update status") + return ctrl.Result{}, err + } + + // Step 3: Get or create deployment and API ids + desiredReplicasChanged := r.ensureDesiredReplicasRange(ctx, &api) + deploymentID, podID, specID, apiID := api.GetOrCreateAPIIDs() + + idsOutdated := api.Annotations["cortex.dev/deployment-id"] != deploymentID || + api.Annotations["cortex.dev/spec-id"] != specID || + api.Annotations["cortex.dev/api-id"] != apiID + + if api.Annotations == nil { + api.Annotations = map[string]string{} + } + + if api.Annotations["cortex.dev/deployment-id"] != deploymentID { + log.V(1).Info("updating deployment id annotation") + api.Annotations["cortex.dev/deployment-id"] = deploymentID + } + + if api.Annotations["cortex.dev/spec-id"] != specID { + log.V(1).Info("updating pod and spec id annotations") + api.Annotations["cortex.dev/pod-id"] = podID + api.Annotations["cortex.dev/spec-id"] = specID + } + + if api.Annotations["cortex.dev/api-id"] != apiID { + log.V(1).Info("updating api id annotation") + api.Annotations["cortex.dev/api-id"] = 
apiID + } + + if idsOutdated || desiredReplicasChanged { + if err = r.Update(ctx, &api); err != nil { + return ctrl.Result{}, err + } + } + + // Step 4: Create or Update Resources + svcOp, err := r.createOrUpdateService(ctx, api) + if err != nil { + return ctrl.Result{}, err + } + log.V(1).Info(fmt.Sprintf("service %s", svcOp)) + + vsOp, err := r.createOrUpdateVirtualService(ctx, api) + if err != nil { + return ctrl.Result{}, err + } + log.V(1).Info(fmt.Sprintf("virtual service %s", vsOp)) + + deployOp, err := r.createOrUpdateDeployment(ctx, api) + if err != nil { + return ctrl.Result{}, err + } + log.V(1).Info(fmt.Sprintf("deployment %s", deployOp)) + + return ctrl.Result{}, nil +} + +// SetupWithManager sets up the controller with the Manager. +func (r *RealtimeAPIReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&serverless.RealtimeAPI{}). + Owns(&kapps.Deployment{}). + Owns(&kcore.Service{}). + Owns(&istioclientnetworking.VirtualService{}). + Complete(r) +} diff --git a/pkg/crds/controllers/serverless/realtimeapi_controller_helpers.go b/pkg/crds/controllers/serverless/realtimeapi_controller_helpers.go new file mode 100644 index 0000000000..909c446475 --- /dev/null +++ b/pkg/crds/controllers/serverless/realtimeapi_controller_helpers.go @@ -0,0 +1,481 @@ +/* +Copyright 2021 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package serverlesscontroller + +import ( + "context" + "fmt" + + "github.com/cortexlabs/cortex/pkg/consts" + serverless "github.com/cortexlabs/cortex/pkg/crds/apis/serverless/v1alpha1" + "github.com/cortexlabs/cortex/pkg/lib/errors" + "github.com/cortexlabs/cortex/pkg/lib/k8s" + "github.com/cortexlabs/cortex/pkg/lib/maps" + "github.com/cortexlabs/cortex/pkg/lib/pointer" + s "github.com/cortexlabs/cortex/pkg/lib/strings" + "github.com/cortexlabs/cortex/pkg/lib/urls" + "github.com/cortexlabs/cortex/pkg/types/userconfig" + "github.com/cortexlabs/cortex/pkg/workloads" + istionetworking "istio.io/api/networking/v1beta1" + istioclientnetworking "istio.io/client-go/pkg/apis/networking/v1beta1" + kapps "k8s.io/api/apps/v1" + kcore "k8s.io/api/core/v1" + kerrors "k8s.io/apimachinery/pkg/api/errors" + kresource "k8s.io/apimachinery/pkg/api/resource" + kmeta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" +) + +func (r *RealtimeAPIReconciler) getDeployment(ctx context.Context, api serverless.RealtimeAPI) (*kapps.Deployment, error) { + req := client.ObjectKey{Namespace: api.Namespace, Name: workloads.K8sName(api.Name)} + deployment := kapps.Deployment{} + if err := r.Get(ctx, req, &deployment); err != nil { + if kerrors.IsNotFound(err) { + return nil, nil + } + return nil, err + } + return &deployment, nil +} + +func (r *RealtimeAPIReconciler) updateStatus(ctx context.Context, api *serverless.RealtimeAPI, deployment *kapps.Deployment) error { + var err error + api.Status.Endpoint, err = r.getEndpoint(ctx, api) + if err != nil { + return errors.Wrap(err, "failed to get api endpoint") + } + + if deployment != nil { + api.Status.Ready = deployment.Status.ReadyReplicas + api.Status.UpToDate = deployment.Status.UpdatedReplicas + if deployment.Spec.Replicas != nil { + api.Status.Requested = 
*deployment.Spec.Replicas + } + + if err = r.Status().Update(ctx, api); err != nil { + return err + } + } + + return nil +} + +func (r *RealtimeAPIReconciler) ensureDesiredReplicasRange(ctx context.Context, api *serverless.RealtimeAPI) bool { + replicasFieldChanged := false + desiredReplicas := api.Spec.Replicas + + if desiredReplicas < api.Spec.Autoscaling.MinReplicas { + desiredReplicas = api.Spec.Autoscaling.MinReplicas + replicasFieldChanged = true + } else if desiredReplicas > api.Spec.Autoscaling.MaxReplicas { + desiredReplicas = api.Spec.Autoscaling.MaxReplicas + replicasFieldChanged = true + } + + api.Spec.Replicas = desiredReplicas + return replicasFieldChanged +} + +func (r *RealtimeAPIReconciler) createOrUpdateDeployment(ctx context.Context, api serverless.RealtimeAPI) (controllerutil.OperationResult, error) { + deployment := kapps.Deployment{ + ObjectMeta: kmeta.ObjectMeta{ + Name: workloads.K8sName(api.Name), + Namespace: api.Namespace}, + } + op, err := controllerutil.CreateOrUpdate(ctx, r.Client, &deployment, func() error { + desiredDeployment := r.desiredDeployment(api) + deployment.Labels = desiredDeployment.Labels + deployment.Annotations = maps.MergeStrMapsString(deployment.Annotations, desiredDeployment.Annotations) + deployment.Spec = desiredDeployment.Spec + + if err := ctrl.SetControllerReference(&api, &deployment, r.Scheme); err != nil { + return err + } + return nil + }) + if err != nil { + return op, err + } + return op, nil +} + +func (r *RealtimeAPIReconciler) createOrUpdateService(ctx context.Context, api serverless.RealtimeAPI) (controllerutil.OperationResult, error) { + service := kcore.Service{ + ObjectMeta: kmeta.ObjectMeta{ + Name: workloads.K8sName(api.Name), + Namespace: api.Namespace}, + } + op, err := controllerutil.CreateOrUpdate(ctx, r.Client, &service, func() error { + desiredSvc := r.desiredService(api) + // We need to set fields individually because some are immutable + service.Labels = desiredSvc.Labels + 
service.Annotations = maps.MergeStrMapsString(service.Annotations, desiredSvc.Annotations) + service.Spec.Type = desiredSvc.Spec.Type + service.Spec.Ports = desiredSvc.Spec.Ports + service.Spec.Selector = desiredSvc.Spec.Selector + + if err := ctrl.SetControllerReference(&api, &service, r.Scheme); err != nil { + return err + } + + return nil + }) + if err != nil { + return op, err + } + return op, nil +} + +func (r *RealtimeAPIReconciler) createOrUpdateVirtualService(ctx context.Context, api serverless.RealtimeAPI) (controllerutil.OperationResult, error) { + vs := istioclientnetworking.VirtualService{ + ObjectMeta: kmeta.ObjectMeta{ + Name: workloads.K8sName(api.Name), + Namespace: api.Namespace}, + } + op, err := controllerutil.CreateOrUpdate(ctx, r.Client, &vs, func() error { + desiredVirtualService := r.desiredVirtualService(api) + vs.Labels = desiredVirtualService.Labels + vs.Annotations = maps.MergeStrMapsString(vs.Annotations, desiredVirtualService.Annotations) + vs.Spec = desiredVirtualService.Spec + + if err := ctrl.SetControllerReference(&api, &vs, r.Scheme); err != nil { + return err + } + + return nil + }) + if err != nil { + return op, err + } + return op, nil +} + +func (r *RealtimeAPIReconciler) getEndpoint(ctx context.Context, api *serverless.RealtimeAPI) (string, error) { + req := client.ObjectKey{Namespace: consts.IstioNamespace, Name: "ingressgateway-apis"} + svc := kcore.Service{} + if err := r.Get(ctx, req, &svc); err != nil { + return "", err + } + + ingress := svc.Status.LoadBalancer.Ingress + if len(ingress) == 0 { + return "", nil + } + + endpoint := urls.Join( + fmt.Sprintf("http://%s", svc.Status.LoadBalancer.Ingress[0].Hostname), + api.Spec.Networking.Endpoint, + ) + + return endpoint, nil +} + +func (r *RealtimeAPIReconciler) desiredDeployment(api serverless.RealtimeAPI) kapps.Deployment { + containers, volumes := r.desiredContainers(api) + + return *k8s.Deployment(&k8s.DeploymentSpec{ + Name: workloads.K8sName(api.Name), + Replicas: 
api.Spec.Replicas, + MaxSurge: pointer.String(api.Spec.UpdateStrategy.MaxSurge.String()), + MaxUnavailable: pointer.String(api.Spec.UpdateStrategy.MaxUnavailable.String()), + Labels: map[string]string{ + "apiName": api.Name, + "apiKind": userconfig.RealtimeAPIKind.String(), + "apiID": api.Annotations["cortex.dev/api-id"], + "podID": api.Annotations["cortex.dev/pod-id"], + "deploymentID": api.Annotations["cortex.dev/deployment-id"], + "cortex.dev/api": "true", + }, + Annotations: r.generateAPIAnnotations(api), + Selector: map[string]string{ + "apiName": api.Name, + "apiKind": userconfig.RealtimeAPIKind.String(), + }, + PodSpec: k8s.PodSpec{ + Labels: map[string]string{ + "apiName": api.Name, + "apiKind": userconfig.RealtimeAPIKind.String(), + "podID": api.Annotations["cortex.dev/pod-id"], + "deploymentID": api.Annotations["cortex.dev/deployment-id"], + "cortex.dev/api": "true", + }, + Annotations: map[string]string{ + "traffic.sidecar.istio.io/excludeOutboundIPRanges": "0.0.0.0/0", + }, + K8sPodSpec: kcore.PodSpec{ + RestartPolicy: kcore.RestartPolicyAlways, + TerminationGracePeriodSeconds: pointer.Int64(_terminationGracePeriodSeconds), + Containers: containers, + NodeSelector: workloads.NodeSelectors(), + Tolerations: workloads.GenerateResourceTolerations(), + Affinity: workloads.GenerateNodeAffinities(api.Spec.NodeGroups), + Volumes: volumes, + ServiceAccountName: workloads.ServiceAccountName, + }, + }, + }) +} + +func (r *RealtimeAPIReconciler) desiredContainers(api serverless.RealtimeAPI) ([]kcore.Container, []kcore.Volume) { + containers, volumes := r.userContainers(api) + proxyContainer, proxyVolume := r.proxyContainer(api) + + containers = append(containers, proxyContainer) + volumes = append(volumes, proxyVolume) + + return containers, volumes +} + +func (r *RealtimeAPIReconciler) desiredService(api serverless.RealtimeAPI) kcore.Service { + return *k8s.Service(&k8s.ServiceSpec{ + Name: workloads.K8sName(api.Name), + PortName: "http", + Port: 
consts.ProxyPortInt32, + TargetPort: consts.ProxyPortInt32, + Annotations: r.generateAPIAnnotations(api), + Labels: map[string]string{ + "apiName": api.Name, + "apiKind": userconfig.RealtimeAPIKind.String(), + "cortex.dev/api": "true", + }, + Selector: map[string]string{ + "apiName": api.Name, + "apiKind": userconfig.RealtimeAPIKind.String(), + }, + }) +} + +func (r *RealtimeAPIReconciler) desiredVirtualService(api serverless.RealtimeAPI) istioclientnetworking.VirtualService { + var activatorWeight int32 + if api.Spec.Replicas == 0 || api.Status.Ready == 0 { + activatorWeight = 100 + } + + return *k8s.VirtualService(&k8s.VirtualServiceSpec{ + Name: workloads.K8sName(api.Name), + Gateways: []string{"apis-gateway"}, + Destinations: []k8s.Destination{ + { + ServiceName: workloads.K8sName(api.Name), + Weight: 100 - activatorWeight, + Port: uint32(consts.ProxyPortInt32), + Headers: &istionetworking.Headers{ + Response: &istionetworking.Headers_HeaderOperations{ + Set: map[string]string{ + consts.CortexOriginHeader: "api", + }, + }, + }, + }, + { + ServiceName: consts.ActivatorName, + Weight: activatorWeight, + Port: uint32(consts.ActivatorPortInt32), + Headers: &istionetworking.Headers{ + Request: &istionetworking.Headers_HeaderOperations{ + Set: map[string]string{ + consts.CortexAPINameHeader: api.Name, + consts.CortexTargetServiceHeader: fmt.Sprintf( + "http://%s.%s:%d", + workloads.K8sName(api.Name), + consts.DefaultNamespace, + consts.ProxyPortInt32, + ), + }, + }, + Response: &istionetworking.Headers_HeaderOperations{ + Set: map[string]string{ + consts.CortexOriginHeader: consts.ActivatorName, + }, + }, + }, + }, + }, + PrefixPath: pointer.String(api.Spec.Networking.Endpoint), + Rewrite: pointer.String("/"), + Annotations: r.generateAPIAnnotations(api), + Labels: map[string]string{ + "apiName": api.Name, + "apiKind": userconfig.RealtimeAPIKind.String(), + "apiID": api.Annotations["cortex.dev/api-id"], + "podID": api.Annotations["cortex.dev/pod-id"], + 
"deploymentID": api.Annotations["cortex.dev/deployment-id"], + "cortex.dev/api": "true", + }, + }) +} + +func (r *RealtimeAPIReconciler) userContainers(api serverless.RealtimeAPI) ([]kcore.Container, []kcore.Volume) { + volumes := []kcore.Volume{ + workloads.MntVolume(), + workloads.CortexVolume(), + workloads.ClientConfigVolume(), + } + containerMounts := []kcore.VolumeMount{ + workloads.MntMount(), + workloads.CortexMount(), + workloads.ClientConfigMount(), + } + + containers := make([]kcore.Container, len(api.Spec.Pod.Containers)) + for i, container := range api.Spec.Pod.Containers { + containerResourceList := kcore.ResourceList{} + containerResourceLimitsList := kcore.ResourceList{} + securityContext := kcore.SecurityContext{ + Privileged: pointer.Bool(true), + } + + if container.Compute != nil { + if container.Compute.CPU != nil { + containerResourceList[kcore.ResourceCPU] = *k8s.QuantityPtr(container.Compute.CPU.DeepCopy()) + } + + if container.Compute.Mem != nil { + containerResourceList[kcore.ResourceMemory] = *k8s.QuantityPtr(container.Compute.Mem.DeepCopy()) + } + + if container.Compute.GPU > 0 { + containerResourceList["nvidia.com/gpu"] = *kresource.NewQuantity(container.Compute.GPU, kresource.DecimalSI) + containerResourceLimitsList["nvidia.com/gpu"] = *kresource.NewQuantity(container.Compute.GPU, kresource.DecimalSI) + } + + if container.Compute.Inf > 0 { + totalHugePages := container.Compute.Inf * workloads.HugePagesMemPerInf + containerResourceList["aws.amazon.com/neuron"] = *kresource.NewQuantity(container.Compute.Inf, kresource.DecimalSI) + containerResourceList["hugepages-2Mi"] = *kresource.NewQuantity(totalHugePages, kresource.BinarySI) + containerResourceLimitsList["aws.amazon.com/neuron"] = *kresource.NewQuantity(container.Compute.Inf, kresource.DecimalSI) + containerResourceLimitsList["hugepages-2Mi"] = *kresource.NewQuantity(totalHugePages, kresource.BinarySI) + + securityContext.Capabilities = &kcore.Capabilities{ + Add: []kcore.Capability{ 
+ "SYS_ADMIN", + "IPC_LOCK", + }, + } + } + + if container.Compute.Shm != nil { + volumes = append(volumes, workloads.ShmVolume(*container.Compute.Shm, "dshm-"+container.Name)) + containerMounts = append(containerMounts, workloads.ShmMount("dshm-"+container.Name)) + } + } + + containerEnvVars := workloads.BaseEnvVars + containerEnvVars = append(containerEnvVars, workloads.ClientConfigEnvVar()) + containerEnvVars = append(containerEnvVars, kcore.EnvVar{ + Name: "CORTEX_PORT", + Value: s.Int32(api.Spec.Pod.Port), + }) + containerEnvVars = append(containerEnvVars, container.Env...) + + containers[i] = kcore.Container{ + Name: container.Name, + Image: container.Image, + Command: container.Command, + Args: container.Args, + Env: containerEnvVars, + VolumeMounts: containerMounts, + LivenessProbe: container.LivenessProbe, + ReadinessProbe: container.ReadinessProbe, + Resources: kcore.ResourceRequirements{ + Requests: containerResourceList, + Limits: containerResourceLimitsList, + }, + ImagePullPolicy: kcore.PullAlways, + SecurityContext: &securityContext, + } + } + + return containers, volumes +} + +func (r *RealtimeAPIReconciler) proxyContainer(api serverless.RealtimeAPI) (kcore.Container, kcore.Volume) { + return kcore.Container{ + Name: workloads.ProxyContainerName, + Image: r.ClusterConfig.ImageProxy, + ImagePullPolicy: kcore.PullAlways, + Args: []string{ + "--cluster-config", + consts.DefaultInClusterConfigPath, + "--port", + consts.ProxyPortStr, + "--admin-port", + consts.AdminPortStr, + "--user-port", + s.Int32(api.Spec.Pod.Port), + "--max-concurrency", + s.Int32(api.Spec.Pod.MaxConcurrency), + "--max-queue-length", + s.Int32(api.Spec.Pod.MaxQueueLength), + }, + Ports: []kcore.ContainerPort{ + { + Name: consts.AdminPortName, + ContainerPort: consts.AdminPortInt32, + Protocol: kcore.ProtocolTCP, + }, + { + ContainerPort: consts.ProxyPortInt32, + Protocol: kcore.ProtocolTCP, + }, + }, + Env: workloads.BaseEnvVars, + EnvFrom: workloads.BaseClusterEnvVars(), + 
VolumeMounts: []kcore.VolumeMount{ + workloads.ClusterConfigMount(), + }, + Resources: kcore.ResourceRequirements{ + Requests: kcore.ResourceList{ + kcore.ResourceCPU: consts.CortexProxyCPU, + kcore.ResourceMemory: consts.CortexProxyMem, + }, + }, + ReadinessProbe: &kcore.Probe{ + Handler: kcore.Handler{ + HTTPGet: &kcore.HTTPGetAction{ + Path: "/healthz", + Port: intstr.FromInt(int(consts.AdminPortInt32)), + }, + }, + InitialDelaySeconds: 1, + TimeoutSeconds: 1, + PeriodSeconds: 10, + SuccessThreshold: 1, + FailureThreshold: 1, + }, + }, workloads.ClusterConfigVolume() +} + +func (r *RealtimeAPIReconciler) generateAPIAnnotations(api serverless.RealtimeAPI) map[string]string { + return map[string]string{ + userconfig.MinReplicasAnnotationKey: s.Int32(api.Spec.Autoscaling.MinReplicas), + userconfig.MaxReplicasAnnotationKey: s.Int32(api.Spec.Autoscaling.MaxReplicas), + userconfig.TargetInFlightAnnotationKey: api.Spec.Autoscaling.TargetInFlight, + userconfig.WindowAnnotationKey: api.Spec.Autoscaling.Window.Duration.String(), + userconfig.DownscaleStabilizationPeriodAnnotationKey: api.Spec.Autoscaling.DownscaleStabilizationPeriod.Duration.String(), + userconfig.UpscaleStabilizationPeriodAnnotationKey: api.Spec.Autoscaling.UpscaleStabilizationPeriod.Duration.String(), + userconfig.MaxDownscaleFactorAnnotationKey: api.Spec.Autoscaling.MaxDownscaleFactor, + userconfig.MaxUpscaleFactorAnnotationKey: api.Spec.Autoscaling.MaxUpscaleFactor, + userconfig.DownscaleToleranceAnnotationKey: api.Spec.Autoscaling.DownscaleTolerance, + userconfig.UpscaleToleranceAnnotationKey: api.Spec.Autoscaling.UpscaleTolerance, + userconfig.MaxQueueLengthAnnotationKey: s.Int32(api.Spec.Pod.MaxQueueLength), + userconfig.MaxConcurrencyAnnotationKey: s.Int32(api.Spec.Pod.MaxConcurrency), + } +} diff --git a/pkg/crds/controllers/serverless/suite_test.go b/pkg/crds/controllers/serverless/suite_test.go new file mode 100644 index 0000000000..96c9626946 --- /dev/null +++ 
b/pkg/crds/controllers/serverless/suite_test.go @@ -0,0 +1,79 @@ +/* +Copyright 2021 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package serverlesscontroller + +import ( + "path/filepath" + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/envtest" + "sigs.k8s.io/controller-runtime/pkg/envtest/printer" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + + serverless "github.com/cortexlabs/cortex/pkg/crds/apis/serverless/v1alpha1" + //+kubebuilder:scaffold:imports +) + +// These tests use Ginkgo (BDD-style Go testing framework). Refer to +// http://onsi.github.io/ginkgo/ to learn more about Ginkgo. 
+ +//var cfg *rest.Config +var k8sClient client.Client +var testEnv *envtest.Environment + +func TestAPIs(t *testing.T) { + RegisterFailHandler(Fail) + + RunSpecsWithDefaultAndCustomReporters(t, + "Controller Suite", + []Reporter{printer.NewlineReporter{}}) +} + +var _ = BeforeSuite(func() { + logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) + + By("bootstrapping test environment") + testEnv = &envtest.Environment{ + CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases")}, + ErrorIfCRDPathMissing: true, + } + + cfg, err := testEnv.Start() + Expect(err).NotTo(HaveOccurred()) + Expect(cfg).NotTo(BeNil()) + + err = serverless.AddToScheme(scheme.Scheme) + Expect(err).NotTo(HaveOccurred()) + + //+kubebuilder:scaffold:scheme + + k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme}) + Expect(err).NotTo(HaveOccurred()) + Expect(k8sClient).NotTo(BeNil()) + +}, 60) + +var _ = AfterSuite(func() { + By("tearing down the test environment") + err := testEnv.Stop() + Expect(err).NotTo(HaveOccurred()) +}) diff --git a/pkg/crds/main.go b/pkg/crds/main.go index ee8c0c476b..9948837832 100644 --- a/pkg/crds/main.go +++ b/pkg/crds/main.go @@ -34,6 +34,7 @@ import ( // to ensure that exec-entrypoint and run can make use of them. 
_ "k8s.io/client-go/plugin/pkg/client/auth" + istioscheme "istio.io/client-go/pkg/clientset/versioned/scheme" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" @@ -42,7 +43,9 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log/zap" batch "github.com/cortexlabs/cortex/pkg/crds/apis/batch/v1alpha1" + serverless "github.com/cortexlabs/cortex/pkg/crds/apis/serverless/v1alpha1" batchcontrollers "github.com/cortexlabs/cortex/pkg/crds/controllers/batch" + serverlesscontrollers "github.com/cortexlabs/cortex/pkg/crds/controllers/serverless" //+kubebuilder:scaffold:imports ) @@ -53,8 +56,10 @@ var ( func init() { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) + utilruntime.Must(istioscheme.AddToScheme(scheme)) utilruntime.Must(batch.AddToScheme(scheme)) + utilruntime.Must(serverless.AddToScheme(scheme)) //+kubebuilder:scaffold:scheme } @@ -151,7 +156,7 @@ func main() { if err = (&batchcontrollers.BatchJobReconciler{ Client: mgr.GetClient(), Config: batchcontrollers.BatchJobReconcilerConfig{}.ApplyDefaults(), - Log: ctrl.Log.WithName("controllers").WithName("BatchJob"), + Log: ctrl.Log.WithName("controllers").WithName("batch").WithName("BatchJob"), ClusterConfig: clusterConfig, AWS: awsClient, Prometheus: promv1.NewAPI(promClient), @@ -160,6 +165,15 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "BatchJob") os.Exit(1) } + if err = (&serverlesscontrollers.RealtimeAPIReconciler{ + Client: mgr.GetClient(), + ClusterConfig: clusterConfig, + Log: ctrl.Log.WithName("controllers").WithName("serverless").WithName("RealtimeAPI"), + Scheme: mgr.GetScheme(), + }).SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "RealtimeAPI") + os.Exit(1) + } //+kubebuilder:scaffold:builder if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { diff --git a/pkg/operator/endpoints/logs.go 
b/pkg/operator/endpoints/logs.go index 68f5d938d7..09651333d5 100644 --- a/pkg/operator/endpoints/logs.go +++ b/pkg/operator/endpoints/logs.go @@ -108,7 +108,7 @@ func GetLogURL(w http.ResponseWriter, r *http.Request) { LogURL: logURL, }) case userconfig.RealtimeAPIKind: - apiResponse, err := realtimeapi.GetAPIByName(deployedResource) + apiResponse, err := realtimeapi.GetAPIByName(apiName) if err != nil { respondError(w, r, err) return diff --git a/pkg/operator/operator/logging.go b/pkg/operator/operator/logging.go index f49746f64b..51e6ece794 100644 --- a/pkg/operator/operator/logging.go +++ b/pkg/operator/operator/logging.go @@ -101,40 +101,6 @@ func initializeLogger(key string, level userconfig.LogLevel, fields map[string]i return sugarLogger, nil } -func GetRealtimeAPILogger(apiName string, apiID string) (*zap.SugaredLogger, error) { - loggerCacheKey := fmt.Sprintf("apiName=%s,apiID=%s", apiName, apiID) - logger := getFromCacheOrNil(loggerCacheKey) - - if logger != nil { - return logger, nil - } - - apiSpec, err := DownloadAPISpec(apiName, apiID) - if err != nil { - return nil, err - } - - return initializeLogger(loggerCacheKey, userconfig.InfoLogLevel, map[string]interface{}{ - "apiName": apiSpec.Name, - "apiKind": apiSpec.Kind.String(), - "apiID": apiSpec.ID, - }) -} - -func GetRealtimeAPILoggerFromSpec(apiSpec *spec.API) (*zap.SugaredLogger, error) { - loggerCacheKey := fmt.Sprintf("apiName=%s,apiID=%s", apiSpec.Name, apiSpec.ID) - logger := getFromCacheOrNil(loggerCacheKey) - if logger != nil { - return logger, nil - } - - return initializeLogger(loggerCacheKey, userconfig.InfoLogLevel, map[string]interface{}{ - "apiName": apiSpec.Name, - "apiKind": apiSpec.Kind.String(), - "apiID": apiSpec.ID, - }) -} - func GetJobLogger(jobKey spec.JobKey) (*zap.SugaredLogger, error) { loggerCacheKey := fmt.Sprintf("apiName=%s,jobID=%s", jobKey.APIName, jobKey.ID) logger := getFromCacheOrNil(loggerCacheKey) diff --git a/pkg/operator/resources/asyncapi/api.go 
b/pkg/operator/resources/asyncapi/api.go index 508a7b5121..46e5efb2a8 100644 --- a/pkg/operator/resources/asyncapi/api.go +++ b/pkg/operator/resources/asyncapi/api.go @@ -333,7 +333,6 @@ func DescribeAPIByName(deployedResource *operator.DeployedResource) ([]schema.AP if err != nil { return nil, err } - apiStatus.ReplicaCounts = GetReplicaCounts(apiDeployment, apiPods) apiEndpoint, err := operator.APIEndpointFromResource(deployedResource) if err != nil { @@ -344,10 +343,10 @@ func DescribeAPIByName(deployedResource *operator.DeployedResource) ([]schema.AP return []schema.APIResponse{ { - Metadata: apiMetadata, - Status: apiStatus, - Endpoint: &apiEndpoint, - DashboardURL: dashboardURL, + Metadata: apiMetadata, + ReplicaCounts: GetReplicaCounts(apiStatus, apiDeployment, apiPods), + Endpoint: &apiEndpoint, + DashboardURL: dashboardURL, }, }, nil } @@ -524,7 +523,7 @@ func isAPIUpdating(deployment *kapps.Deployment) (bool, error) { return false, err } - replicaCounts := GetReplicaCounts(deployment, pods) + replicaCounts := GetReplicaCounts(nil, deployment, pods) autoscalingSpec, err := userconfig.AutoscalingFromAnnotations(deployment) if err != nil { diff --git a/pkg/operator/resources/asyncapi/status.go b/pkg/operator/resources/asyncapi/status.go index 7035c31c01..41b0d11fab 100644 --- a/pkg/operator/resources/asyncapi/status.go +++ b/pkg/operator/resources/asyncapi/status.go @@ -23,9 +23,12 @@ import ( kcore "k8s.io/api/core/v1" ) -func GetReplicaCounts(deployment *kapps.Deployment, pods []kcore.Pod) *status.ReplicaCounts { +func GetReplicaCounts(apiStatus *status.Status, deployment *kapps.Deployment, pods []kcore.Pod) *status.ReplicaCounts { counts := status.ReplicaCounts{} - counts.Requested = *deployment.Spec.Replicas + if apiStatus != nil { + counts.Requested = apiStatus.Requested + counts.UpToDate = apiStatus.UpToDate + } for i := range pods { pod := pods[i] diff --git a/pkg/operator/resources/realtimeapi/api.go b/pkg/operator/resources/realtimeapi/api.go index 
885c661366..52b995bcb7 100644 --- a/pkg/operator/resources/realtimeapi/api.go +++ b/pkg/operator/resources/realtimeapi/api.go @@ -17,147 +17,156 @@ limitations under the License. package realtimeapi import ( + "context" "fmt" - "path/filepath" "sort" "time" "github.com/cortexlabs/cortex/pkg/config" + "github.com/cortexlabs/cortex/pkg/consts" + serverless "github.com/cortexlabs/cortex/pkg/crds/apis/serverless/v1alpha1" "github.com/cortexlabs/cortex/pkg/lib/errors" - "github.com/cortexlabs/cortex/pkg/lib/k8s" + "github.com/cortexlabs/cortex/pkg/lib/maps" "github.com/cortexlabs/cortex/pkg/lib/parallel" "github.com/cortexlabs/cortex/pkg/lib/pointer" - "github.com/cortexlabs/cortex/pkg/operator/lib/routines" + s "github.com/cortexlabs/cortex/pkg/lib/strings" "github.com/cortexlabs/cortex/pkg/operator/operator" "github.com/cortexlabs/cortex/pkg/operator/schema" "github.com/cortexlabs/cortex/pkg/types/spec" "github.com/cortexlabs/cortex/pkg/types/status" "github.com/cortexlabs/cortex/pkg/types/userconfig" - "github.com/cortexlabs/cortex/pkg/workloads" - istioclientnetworking "istio.io/client-go/pkg/apis/networking/v1beta1" - kapps "k8s.io/api/apps/v1" + "github.com/google/go-cmp/cmp" kcore "k8s.io/api/core/v1" + kerrors "k8s.io/apimachinery/pkg/api/errors" + kmeta "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" ) -const _realtimeDashboardUID = "realtimeapi" - -func generateDeploymentID() string { - return k8s.RandomName()[:10] -} - func UpdateAPI(apiConfig *userconfig.API, force bool) (*spec.API, string, error) { - prevDeployment, prevService, prevVirtualService, err := getK8sResources(apiConfig.Name) - if err != nil { - return nil, "", err - } + ctx := context.Background() + var api serverless.RealtimeAPI + key := client.ObjectKey{Namespace: consts.DefaultNamespace, Name: apiConfig.Name} - initialDeploymentTime := time.Now().UnixNano() - deploymentID := generateDeploymentID() - if prevVirtualService != nil && 
prevVirtualService.Labels["initialDeploymentTime"] != "" { - var err error - initialDeploymentTime, err = k8s.ParseInt64Label(prevVirtualService, "initialDeploymentTime") - if err != nil { - return nil, "", err + err := config.K8s.Get(ctx, key, &api) + if err != nil { + if kerrors.IsNotFound(err) { + if kerrors.IsNotFound(err) { + api = k8sResourceFromAPIConfig(*apiConfig, nil) + if err = config.K8s.Create(ctx, &api); err != nil { + return nil, "", errors.Wrap(err, "failed to create realtime api resource") + } + + apiSpec := &spec.API{ + API: apiConfig, + ID: api.Annotations["cortex.dev/api-id"], + SpecID: api.Annotations["cortex.dev/spec-id"], + PodID: api.Annotations["cortex.dev/pod-id"], + DeploymentID: api.Annotations["cortex.dev/deployment-id"], + Key: spec.Key(apiConfig.Name, api.Annotations["cortex.dev/api-id"], config.ClusterConfig.ClusterUID), + InitialDeploymentTime: api.CreationTimestamp.Unix(), + LastUpdated: api.CreationTimestamp.Unix(), + MetadataRoot: spec.MetadataRoot(apiConfig.Name, config.ClusterConfig.ClusterUID), + } + + if err := config.AWS.UploadJSONToS3(apiSpec, config.ClusterConfig.Bucket, apiSpec.Key); err != nil { + return nil, "", errors.Wrap(err, "failed to upload api spec") + } + + return apiSpec, fmt.Sprintf("creating %s", apiConfig.Resource.UserString()), nil + } } - deploymentID = prevVirtualService.Labels["deploymentID"] + return nil, "", errors.Wrap(err, "failed to get realtime api resource") } - api := spec.GetAPISpec(apiConfig, initialDeploymentTime, deploymentID, config.ClusterConfig.ClusterUID) + desiredAPI := k8sResourceFromAPIConfig(*apiConfig, &api) - if prevDeployment == nil { - if err := config.AWS.UploadJSONToS3(api, config.ClusterConfig.Bucket, api.Key); err != nil { - return nil, "", errors.Wrap(err, "upload api spec") - } + apiSpec := &spec.API{ + API: apiConfig, + ID: desiredAPI.Annotations["cortex.dev/api-id"], + SpecID: desiredAPI.Annotations["cortex.dev/spec-id"], + PodID: 
desiredAPI.Annotations["cortex.dev/pod-id"], + DeploymentID: desiredAPI.Annotations["cortex.dev/deployment-id"], + Key: spec.Key(apiConfig.Name, desiredAPI.Annotations["cortex.dev/api-id"], config.ClusterConfig.ClusterUID), + InitialDeploymentTime: api.CreationTimestamp.Unix(), + MetadataRoot: spec.MetadataRoot(apiConfig.Name, config.ClusterConfig.ClusterUID), + } - if err := applyK8sResources(api, prevDeployment, prevService, prevVirtualService); err != nil { - routines.RunWithPanicHandler(func() { - _ = deleteK8sResources(api.Name) - }) - return nil, "", err - } + if !cmp.Equal(api.Spec, desiredAPI.Spec) || force { + api.Spec = desiredAPI.Spec + api.Annotations = maps.MergeStrMapsString(api.Annotations, desiredAPI.Annotations) - return api, fmt.Sprintf("creating %s", api.Resource.UserString()), nil - } + lastUpdated := time.Now().Unix() + api.Annotations["cortex.dev/last-updated"] = s.Int64(lastUpdated) + apiSpec.LastUpdated = lastUpdated - if prevVirtualService.Labels["specID"] != api.SpecID || prevVirtualService.Labels["deploymentID"] != api.DeploymentID { - isUpdating, err := isAPIUpdating(prevDeployment) - if err != nil { - return nil, "", err - } - if isUpdating && !force { - return nil, "", ErrorAPIUpdating(api.Name) + if err = config.K8s.Update(ctx, &api); err != nil { + return nil, "", errors.Wrap(err, "failed to update realtime api resource") } - if err := config.AWS.UploadJSONToS3(api, config.ClusterConfig.Bucket, api.Key); err != nil { - return nil, "", errors.Wrap(err, "upload api spec") + if err := config.AWS.UploadJSONToS3(apiSpec, config.ClusterConfig.Bucket, apiSpec.Key); err != nil { + return nil, "", errors.Wrap(err, "failed to upload api spec") } - if err := applyK8sResources(api, prevDeployment, prevService, prevVirtualService); err != nil { - return nil, "", err - } - return api, fmt.Sprintf("updating %s", api.Resource.UserString()), nil + return apiSpec, fmt.Sprintf("updating %s", apiConfig.Resource.UserString()), nil } - // deployment 
didn't change - isUpdating, err := isAPIUpdating(prevDeployment) - if err != nil { - return nil, "", err - } - if isUpdating { - return api, fmt.Sprintf("%s is already updating", api.Resource.UserString()), nil - } - return api, fmt.Sprintf("%s is up to date", api.Resource.UserString()), nil + return apiSpec, fmt.Sprintf("%s is up to date", apiConfig.Resource.UserString()), nil } -func RefreshAPI(apiName string, force bool) (string, error) { - prevDeployment, prevService, prevVirtualService, err := getK8sResources(apiName) - if err != nil { - return "", err - } else if prevDeployment == nil || prevVirtualService == nil { - return "", errors.ErrorUnexpected("unable to find deployment", apiName) - } +func RefreshAPI(apiName string) (string, error) { + ctx := context.Background() + var api serverless.RealtimeAPI + key := client.ObjectKey{Namespace: consts.DefaultNamespace, Name: apiName} - isUpdating, err := isAPIUpdating(prevDeployment) + err := config.K8s.Get(ctx, key, &api) if err != nil { - return "", err - } - - if isUpdating && !force { - return "", ErrorAPIUpdating(apiName) + return "", errors.Wrap(err, "failed to get realtime api resource") } - apiID, err := k8s.GetLabel(prevDeployment, "apiID") + apiSpec, err := operator.DownloadAPISpec(api.Name, api.Annotations["cortex.dev/api-id"]) if err != nil { return "", err } - api, err := operator.DownloadAPISpec(apiName, apiID) - if err != nil { - return "", err - } + // create new deployment + api.Annotations["cortex.dev/deployment-id"] = "" + deploymentID, _, _, apiID := api.GetOrCreateAPIIDs() + api.Annotations["cortex.dev/deployment-id"] = deploymentID - initialDeploymentTime, err := k8s.ParseInt64Label(prevVirtualService, "initialDeploymentTime") + err = config.K8s.Update(ctx, &api) if err != nil { - return "", err + return "", errors.Wrap(err, "failed to update realtime api resource") } - api = spec.GetAPISpec(api.API, initialDeploymentTime, generateDeploymentID(), config.ClusterConfig.ClusterUID) + apiSpec.ID 
= apiID + apiSpec.Key = spec.Key(apiName, apiID, config.ClusterConfig.ClusterUID) - if err := config.AWS.UploadJSONToS3(api, config.ClusterConfig.Bucket, api.Key); err != nil { - return "", errors.Wrap(err, "upload api spec") + if err := config.AWS.UploadJSONToS3(apiSpec, config.ClusterConfig.Bucket, apiSpec.Key); err != nil { + return "", errors.Wrap(err, "failed to upload api spec") } - if err := applyK8sResources(api, prevDeployment, prevService, prevVirtualService); err != nil { - return "", err + apiResource := userconfig.Resource{ + Name: apiName, + Kind: userconfig.RealtimeAPIKind, } - return fmt.Sprintf("updating %s", api.Resource.UserString()), nil + return fmt.Sprintf("updating %s", apiResource.UserString()), nil } func DeleteAPI(apiName string, keepCache bool) error { - err := parallel.RunFirstErr( + return parallel.RunFirstErr( func() error { - return deleteK8sResources(apiName) + ctx := context.Background() + api := serverless.RealtimeAPI{ + ObjectMeta: kmeta.ObjectMeta{ + Name: apiName, + Namespace: consts.DefaultNamespace, + }, + } + if err := config.K8s.Delete(ctx, &api); err != nil { + return errors.Wrap(err, "failed to delete realtime api resource") + } + return nil }, func() error { if keepCache { @@ -168,266 +177,116 @@ func DeleteAPI(apiName string, keepCache bool) error { return nil }, ) +} - if err != nil { - return err +func GetAllAPIs() ([]schema.APIResponse, error) { + ctx := context.Background() + apis := serverless.RealtimeAPIList{} + if err := config.K8s.List(ctx, &apis); err != nil { + return nil, errors.Wrap(err, "failed to list realtime api resources") } - return nil -} - -func GetAllAPIs(deployments []kapps.Deployment) ([]schema.APIResponse, error) { - realtimeAPIs := make([]schema.APIResponse, len(deployments)) - mappedRealtimeAPIs := make(map[string]schema.APIResponse, len(deployments)) - apiNames := make([]string, len(deployments)) + apiNames := make([]string, len(apis.Items)) + for i, api := range apis.Items { + apiNames[i] = 
api.Name + } - for i := range deployments { - apiName := deployments[i].Labels["apiName"] - apiNames[i] = apiName + realtimeAPIs := make([]schema.APIResponse, len(apis.Items)) + mappedRealtimeAPIs := map[string]schema.APIResponse{} + for i := range apis.Items { + api := apis.Items[i] - metadata, err := spec.MetadataFromDeployment(&deployments[i]) + metadata, err := metadataFromRealtimeAPI(&api) if err != nil { - return nil, errors.Wrap(err, fmt.Sprintf("api %s", apiName)) + return nil, errors.Wrap(err, fmt.Sprintf("api %s", api.Name)) } - mappedRealtimeAPIs[apiName] = schema.APIResponse{ - Status: status.FromDeployment(&deployments[i]), + + mappedRealtimeAPIs[api.Name] = schema.APIResponse{ Metadata: metadata, + Status: &status.Status{ + Ready: api.Status.Ready, + Requested: api.Status.Requested, + UpToDate: api.Status.UpToDate, + }, } } sort.Strings(apiNames) - for i := range apiNames { - realtimeAPIs[i] = mappedRealtimeAPIs[apiNames[i]] + for i, apiName := range apiNames { + realtimeAPIs[i] = mappedRealtimeAPIs[apiName] } return realtimeAPIs, nil } -func GetAPIByName(deployedResource *operator.DeployedResource) ([]schema.APIResponse, error) { - deployment, err := config.K8s.GetDeployment(workloads.K8sName(deployedResource.Name)) - if err != nil { - return nil, err - } +func GetAPIByName(apiName string) ([]schema.APIResponse, error) { + ctx := context.Background() - if deployment == nil { - return nil, errors.ErrorUnexpected("unable to find deployment", deployedResource.Name) + api := serverless.RealtimeAPI{} + key := client.ObjectKey{Namespace: consts.DefaultNamespace, Name: apiName} + if err := config.K8s.Get(ctx, key, &api); err != nil { + return nil, errors.Wrap(err, "failed to get realtime api resource") } - apiStatus := status.FromDeployment(deployment) - apiMetadata, err := spec.MetadataFromDeployment(deployment) - if err != nil { - return nil, errors.ErrorUnexpected("unable to obtain metadata", deployedResource.Name) - } - - api, err := 
operator.DownloadAPISpec(apiMetadata.Name, apiMetadata.APIID) + metadata, err := metadataFromRealtimeAPI(&api) if err != nil { return nil, err } - apiEndpoint, err := operator.APIEndpoint(api) + apiSpec, err := operator.DownloadAPISpec(api.Name, api.Annotations["cortex.dev/api-id"]) if err != nil { return nil, err } - dashboardURL := pointer.String(getDashboardURL(api.Name)) - return []schema.APIResponse{ { - Spec: api, - Metadata: apiMetadata, - Status: apiStatus, - Endpoint: &apiEndpoint, - DashboardURL: dashboardURL, + Spec: apiSpec, + Metadata: metadata, + Status: &status.Status{ + Ready: api.Status.Ready, + Requested: api.Status.Requested, + UpToDate: api.Status.UpToDate, + }, + Endpoint: &api.Status.Endpoint, + DashboardURL: pointer.String(getDashboardURL(apiName)), }, }, nil } -func DescribeAPIByName(deployedResource *operator.DeployedResource) ([]schema.APIResponse, error) { - deployment, err := config.K8s.GetDeployment(workloads.K8sName(deployedResource.Name)) - if err != nil { - return nil, err - } - - if deployment == nil { - return nil, errors.ErrorUnexpected("unable to find deployment", deployedResource.Name) - } +func DescribeAPIByName(apiName string) ([]schema.APIResponse, error) { + ctx := context.Background() - apiStatus := status.FromDeployment(deployment) - apiMetadata, err := spec.MetadataFromDeployment(deployment) - if err != nil { - return nil, errors.ErrorUnexpected("unable to obtain metadata", deployedResource.Name) + api := serverless.RealtimeAPI{} + key := client.ObjectKey{Namespace: consts.DefaultNamespace, Name: apiName} + if err := config.K8s.Get(ctx, key, &api); err != nil { + return nil, errors.Wrap(err, "failed to get realtime api resource") } - pods, err := config.K8s.ListPodsByLabel("apiName", deployment.Labels["apiName"]) + metadata, err := metadataFromRealtimeAPI(&api) if err != nil { return nil, err } - apiStatus.ReplicaCounts = GetReplicaCounts(deployment, pods) - apiEndpoint, err := 
operator.APIEndpointFromResource(deployedResource) - if err != nil { + var podList kcore.PodList + if err = config.K8s.List(ctx, &podList, client.MatchingLabels{ + "apiName": metadata.Name, + "apiKind": userconfig.RealtimeAPIKind.String(), + }); err != nil { return nil, err } - dashboardURL := pointer.String(getDashboardURL(deployedResource.Name)) + replicaCounts := getReplicaCounts(podList.Items, metadata) + replicaCounts.Requested = api.Status.Requested + replicaCounts.UpToDate = api.Status.UpToDate + + dashboardURL := pointer.String(getDashboardURL(api.Name)) return []schema.APIResponse{ { - Metadata: apiMetadata, - Status: apiStatus, - Endpoint: &apiEndpoint, - DashboardURL: dashboardURL, + Metadata: metadata, + ReplicaCounts: &replicaCounts, + Endpoint: &api.Status.Endpoint, + DashboardURL: dashboardURL, }, }, nil } - -func getK8sResources(apiName string) (*kapps.Deployment, *kcore.Service, *istioclientnetworking.VirtualService, error) { - var deployment *kapps.Deployment - var service *kcore.Service - var virtualService *istioclientnetworking.VirtualService - - err := parallel.RunFirstErr( - func() error { - var err error - deployment, err = config.K8s.GetDeployment(workloads.K8sName(apiName)) - return err - }, - func() error { - var err error - service, err = config.K8s.GetService(workloads.K8sName(apiName)) - return err - }, - func() error { - var err error - virtualService, err = config.K8s.GetVirtualService(workloads.K8sName(apiName)) - return err - }, - ) - - return deployment, service, virtualService, err -} - -func applyK8sResources(api *spec.API, prevDeployment *kapps.Deployment, prevService *kcore.Service, prevVirtualService *istioclientnetworking.VirtualService) error { - return parallel.RunFirstErr( - func() error { - return applyK8sDeployment(api, prevDeployment) - }, - func() error { - return applyK8sService(api, prevService) - }, - func() error { - return applyK8sVirtualService(api, prevVirtualService) - }, - ) -} - -func applyK8sDeployment(api 
*spec.API, prevDeployment *kapps.Deployment) error { - newDeployment := deploymentSpec(api, prevDeployment) - - if prevDeployment == nil { - _, err := config.K8s.CreateDeployment(newDeployment) - if err != nil { - return err - } - } else if prevDeployment.Status.ReadyReplicas == 0 { - // Delete deployment if it never became ready - _, _ = config.K8s.DeleteDeployment(workloads.K8sName(api.Name)) - _, err := config.K8s.CreateDeployment(newDeployment) - if err != nil { - return err - } - } else { - _, err := config.K8s.UpdateDeployment(newDeployment) - if err != nil { - return err - } - } - - return nil -} - -func applyK8sService(api *spec.API, prevService *kcore.Service) error { - newService := serviceSpec(api) - - if prevService == nil { - _, err := config.K8s.CreateService(newService) - return err - } - - _, err := config.K8s.UpdateService(prevService, newService) - return err -} - -func applyK8sVirtualService(api *spec.API, prevVirtualService *istioclientnetworking.VirtualService) error { - newVirtualService := virtualServiceSpec(api) - - if prevVirtualService == nil { - _, err := config.K8s.CreateVirtualService(newVirtualService) - return err - } - - _, err := config.K8s.UpdateVirtualService(prevVirtualService, newVirtualService) - return err -} - -func deleteK8sResources(apiName string) error { - return parallel.RunFirstErr( - func() error { - _, err := config.K8s.DeleteDeployment(workloads.K8sName(apiName)) - return err - }, - func() error { - _, err := config.K8s.DeleteService(workloads.K8sName(apiName)) - return err - }, - func() error { - _, err := config.K8s.DeleteVirtualService(workloads.K8sName(apiName)) - return err - }, - ) -} - -func deleteBucketResources(apiName string) error { - prefix := filepath.Join(config.ClusterConfig.ClusterUID, "apis", apiName) - return config.AWS.DeleteS3Dir(config.ClusterConfig.Bucket, prefix, true) -} - -// returns true if min_replicas are not ready and no updated replicas have errored -func isAPIUpdating(deployment 
*kapps.Deployment) (bool, error) { - pods, err := config.K8s.ListPodsByLabel("apiName", deployment.Labels["apiName"]) - if err != nil { - return false, err - } - - replicaCounts := GetReplicaCounts(deployment, pods) - - autoscalingSpec, err := userconfig.AutoscalingFromAnnotations(deployment) - if err != nil { - return false, err - } - - if replicaCounts.Ready < autoscalingSpec.MinReplicas && replicaCounts.TotalFailed() == 0 { - return true, nil - } - - return false, nil -} - -func isPodSpecLatest(deployment *kapps.Deployment, pod *kcore.Pod) bool { - return deployment.Spec.Template.Labels["podID"] == pod.Labels["podID"] && - deployment.Spec.Template.Labels["deploymentID"] == pod.Labels["deploymentID"] -} - -func getDashboardURL(apiName string) string { - loadBalancerURL, err := operator.LoadBalancerURL() - if err != nil { - return "" - } - - dashboardURL := fmt.Sprintf( - "%s/dashboard/d/%s/realtimeapi?orgId=1&refresh=30s&var-api_name=%s", - loadBalancerURL, _realtimeDashboardUID, apiName, - ) - - return dashboardURL -} diff --git a/pkg/operator/resources/realtimeapi/errors.go b/pkg/operator/resources/realtimeapi/errors.go deleted file mode 100644 index 58b60a7743..0000000000 --- a/pkg/operator/resources/realtimeapi/errors.go +++ /dev/null @@ -1,34 +0,0 @@ -/* -Copyright 2021 Cortex Labs, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package realtimeapi - -import ( - "fmt" - - "github.com/cortexlabs/cortex/pkg/lib/errors" -) - -const ( - ErrAPIUpdating = "realtimeapi.api_updating" -) - -func ErrorAPIUpdating(apiName string) error { - return errors.WithStack(&errors.Error{ - Kind: ErrAPIUpdating, - Message: fmt.Sprintf("%s is updating (override with --force)", apiName), - }) -} diff --git a/pkg/operator/resources/realtimeapi/helpers.go b/pkg/operator/resources/realtimeapi/helpers.go new file mode 100644 index 0000000000..979ad90ef3 --- /dev/null +++ b/pkg/operator/resources/realtimeapi/helpers.go @@ -0,0 +1,255 @@ +/* +Copyright 2021 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package realtimeapi + +import ( + "fmt" + "path/filepath" + + "github.com/cortexlabs/cortex/pkg/config" + "github.com/cortexlabs/cortex/pkg/consts" + "github.com/cortexlabs/cortex/pkg/crds/apis/serverless/v1alpha1" + serverless "github.com/cortexlabs/cortex/pkg/crds/apis/serverless/v1alpha1" + "github.com/cortexlabs/cortex/pkg/lib/k8s" + "github.com/cortexlabs/cortex/pkg/operator/operator" + "github.com/cortexlabs/cortex/pkg/types/spec" + "github.com/cortexlabs/cortex/pkg/types/status" + "github.com/cortexlabs/cortex/pkg/types/userconfig" + "github.com/cortexlabs/cortex/pkg/workloads" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + v12 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" +) + +const _realtimeDashboardUID = "realtimeapi" + +func generateDeploymentID() string { + return k8s.RandomName()[:10] +} + +func getDashboardURL(apiName string) string { + loadBalancerURL, err := operator.LoadBalancerURL() + if err != nil { + return "" + } + + dashboardURL := fmt.Sprintf( + "%s/dashboard/d/%s/realtimeapi?orgId=1&refresh=30s&var-api_name=%s", + loadBalancerURL, _realtimeDashboardUID, apiName, + ) + + return dashboardURL +} + +// k8sResourceFromAPIConfig converts a cortex API config into a realtime API CRD resource +func k8sResourceFromAPIConfig(apiConfig userconfig.API, prevAPI *serverless.RealtimeAPI) v1alpha1.RealtimeAPI { + containers := make([]v1alpha1.ContainerSpec, len(apiConfig.Pod.Containers)) + for i := range apiConfig.Pod.Containers { + container := apiConfig.Pod.Containers[i] + var env []v1.EnvVar + for k, v := range container.Env { + env = append(env, v1.EnvVar{ + Name: k, + Value: v, + }) + } + + var compute *v1alpha1.ComputeSpec + if container.Compute != nil { + var cpu *resource.Quantity + if container.Compute.CPU != nil { + cpu = &container.Compute.CPU.Quantity + } + var mem *resource.Quantity + if container.Compute.Mem != nil { + mem = &container.Compute.Mem.Quantity + } + var shm 
*resource.Quantity + if container.Compute.Shm != nil { + shm = &container.Compute.Shm.Quantity + } + + compute = &v1alpha1.ComputeSpec{ + CPU: cpu, + GPU: container.Compute.GPU, + Inf: container.Compute.Inf, + Mem: mem, + Shm: shm, + } + } + + containers[i] = v1alpha1.ContainerSpec{ + Name: container.Name, + Image: container.Image, + Command: container.Command, + Args: container.Args, + Env: env, + Compute: compute, + ReadinessProbe: workloads.GetProbeSpec(container.ReadinessProbe), + LivenessProbe: workloads.GetProbeSpec(container.LivenessProbe), + } + } + + api := v1alpha1.RealtimeAPI{ + ObjectMeta: v12.ObjectMeta{ + Name: apiConfig.Name, + Namespace: consts.DefaultNamespace, + }, + Spec: v1alpha1.RealtimeAPISpec{ + Replicas: apiConfig.Autoscaling.InitReplicas, + Pod: v1alpha1.PodSpec{ + Port: *apiConfig.Pod.Port, + MaxConcurrency: int32(apiConfig.Pod.MaxConcurrency), + MaxQueueLength: int32(apiConfig.Pod.MaxQueueLength), + Containers: containers, + }, + Autoscaling: v1alpha1.AutoscalingSpec{ + InitReplicas: apiConfig.Autoscaling.InitReplicas, + MinReplicas: apiConfig.Autoscaling.MinReplicas, + MaxReplicas: apiConfig.Autoscaling.MaxReplicas, + TargetInFlight: fmt.Sprintf("%f", *apiConfig.Autoscaling.TargetInFlight), + Window: v12.Duration{Duration: apiConfig.Autoscaling.Window}, + DownscaleStabilizationPeriod: v12.Duration{Duration: apiConfig.Autoscaling.DownscaleStabilizationPeriod}, + UpscaleStabilizationPeriod: v12.Duration{Duration: apiConfig.Autoscaling.UpscaleStabilizationPeriod}, + MaxDownscaleFactor: fmt.Sprintf("%f", apiConfig.Autoscaling.MaxDownscaleFactor), + MaxUpscaleFactor: fmt.Sprintf("%f", apiConfig.Autoscaling.MaxUpscaleFactor), + DownscaleTolerance: fmt.Sprintf("%f", apiConfig.Autoscaling.DownscaleTolerance), + UpscaleTolerance: fmt.Sprintf("%f", apiConfig.Autoscaling.UpscaleTolerance), + }, + NodeGroups: apiConfig.NodeGroups, + UpdateStrategy: v1alpha1.UpdateStrategySpec{ + MaxSurge: intstr.FromString(apiConfig.UpdateStrategy.MaxSurge), + 
MaxUnavailable: intstr.FromString(apiConfig.UpdateStrategy.MaxUnavailable), + }, + Networking: v1alpha1.NetworkingSpec{ + Endpoint: *apiConfig.Networking.Endpoint, + }, + }, + } + + if prevAPI != nil { + // we should keep the existing number of replicas instead of init_replicas + api.Spec.Replicas = prevAPI.Spec.Replicas + if prevDeployID := prevAPI.Annotations["cortex.dev/deployment-id"]; prevDeployID != "" { + api.Annotations = map[string]string{ + "cortex.dev/deployment-id": prevDeployID, + } + } + } + + deploymentID, podID, specID, apiID := api.GetOrCreateAPIIDs() + api.Annotations = map[string]string{ + "cortex.dev/deployment-id": deploymentID, + "cortex.dev/spec-id": specID, + "cortex.dev/pod-id": podID, + "cortex.dev/api-id": apiID, + } + + return api +} + +func deleteBucketResources(apiName string) error { + prefix := filepath.Join(config.ClusterConfig.ClusterUID, "apis", apiName) + return config.AWS.DeleteS3Dir(config.ClusterConfig.Bucket, prefix, true) +} + +func metadataFromRealtimeAPI(sv *v1alpha1.RealtimeAPI) (*spec.Metadata, error) { + lastUpdated, err := spec.TimeFromAPIID(sv.Annotations["cortex.dev/api-id"]) + if err != nil { + return nil, err + } + return &spec.Metadata{ + Resource: &userconfig.Resource{ + Name: sv.Name, + Kind: userconfig.RealtimeAPIKind, + }, + APIID: sv.Annotations["cortex.dev/api-id"], + DeploymentID: sv.Annotations["cortex.dev/deployment-id"], + PodID: sv.Annotations["cortex.dev/pod-id"], + LastUpdated: lastUpdated.Unix(), + }, nil +} + +func getReplicaCounts(pods []v1.Pod, metadata *spec.Metadata) status.ReplicaCounts { + counts := status.ReplicaCounts{} + + for i := range pods { + pod := pods[i] + if pod.Labels["apiName"] != metadata.Name { + continue + } + addPodToReplicaCounts(&pods[i], metadata, &counts) + } + + return counts +} + +func addPodToReplicaCounts(pod *v1.Pod, metadata *spec.Metadata, counts *status.ReplicaCounts) { + latest := false + if isPodSpecLatest(pod, metadata) { + latest = true + } + + isPodReady := 
k8s.IsPodReady(pod) + if latest && isPodReady { + counts.Ready++ + return + } else if !latest && isPodReady { + counts.ReadyOutOfDate++ + return + } + + podStatus := k8s.GetPodStatus(pod) + + if podStatus == k8s.PodStatusTerminating { + counts.Terminating++ + return + } + + if !latest { + return + } + + switch podStatus { + case k8s.PodStatusPending: + counts.Pending++ + case k8s.PodStatusStalled: + counts.Stalled++ + case k8s.PodStatusCreating: + counts.Creating++ + case k8s.PodStatusReady: + counts.Ready++ + case k8s.PodStatusNotReady: + counts.NotReady++ + case k8s.PodStatusErrImagePull: + counts.ErrImagePull++ + case k8s.PodStatusFailed: + counts.Failed++ + case k8s.PodStatusKilled: + counts.Killed++ + case k8s.PodStatusKilledOOM: + counts.KilledOOM++ + case k8s.PodStatusUnknown: + counts.Unknown++ + } +} + +func isPodSpecLatest(pod *v1.Pod, metadata *spec.Metadata) bool { + return metadata.DeploymentID == pod.Labels["deploymentID"] && + metadata.PodID == pod.Labels["podID"] +} diff --git a/pkg/operator/resources/realtimeapi/k8s_specs.go b/pkg/operator/resources/realtimeapi/k8s_specs.go deleted file mode 100644 index 7a6824d14d..0000000000 --- a/pkg/operator/resources/realtimeapi/k8s_specs.go +++ /dev/null @@ -1,182 +0,0 @@ -/* -Copyright 2021 Cortex Labs, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package realtimeapi - -import ( - "fmt" - - "github.com/cortexlabs/cortex/pkg/consts" - "github.com/cortexlabs/cortex/pkg/lib/k8s" - "github.com/cortexlabs/cortex/pkg/lib/pointer" - s "github.com/cortexlabs/cortex/pkg/lib/strings" - "github.com/cortexlabs/cortex/pkg/types/spec" - "github.com/cortexlabs/cortex/pkg/workloads" - istionetworking "istio.io/api/networking/v1beta1" - istioclientnetworking "istio.io/client-go/pkg/apis/networking/v1beta1" - kapps "k8s.io/api/apps/v1" - kcore "k8s.io/api/core/v1" -) - -var _terminationGracePeriodSeconds int64 = 60 // seconds - -func deploymentSpec(api *spec.API, prevDeployment *kapps.Deployment) *kapps.Deployment { - containers, volumes := workloads.RealtimeContainers(*api) - - return k8s.Deployment(&k8s.DeploymentSpec{ - Name: workloads.K8sName(api.Name), - Replicas: getRequestedReplicasFromDeployment(*api, prevDeployment), - MaxSurge: pointer.String(api.UpdateStrategy.MaxSurge), - MaxUnavailable: pointer.String(api.UpdateStrategy.MaxUnavailable), - Labels: map[string]string{ - "apiName": api.Name, - "apiKind": api.Kind.String(), - "apiID": api.ID, - "specID": api.SpecID, - "initialDeploymentTime": s.Int64(api.InitialDeploymentTime), - "deploymentID": api.DeploymentID, - "podID": api.PodID, - "cortex.dev/api": "true", - }, - Annotations: api.ToK8sAnnotations(), - Selector: map[string]string{ - "apiName": api.Name, - "apiKind": api.Kind.String(), - }, - PodSpec: k8s.PodSpec{ - Labels: map[string]string{ - "apiName": api.Name, - "apiKind": api.Kind.String(), - "initialDeploymentTime": s.Int64(api.InitialDeploymentTime), - "deploymentID": api.DeploymentID, - "podID": api.PodID, - "cortex.dev/api": "true", - }, - Annotations: map[string]string{ - "traffic.sidecar.istio.io/excludeOutboundIPRanges": "0.0.0.0/0", - }, - K8sPodSpec: kcore.PodSpec{ - RestartPolicy: "Always", - TerminationGracePeriodSeconds: pointer.Int64(_terminationGracePeriodSeconds), - Containers: containers, - NodeSelector: workloads.NodeSelectors(), - 
Tolerations: workloads.GenerateResourceTolerations(), - Affinity: workloads.GenerateNodeAffinities(api.NodeGroups), - Volumes: volumes, - ServiceAccountName: workloads.ServiceAccountName, - }, - }, - }) -} - -func serviceSpec(api *spec.API) *kcore.Service { - return k8s.Service(&k8s.ServiceSpec{ - Name: workloads.K8sName(api.Name), - PortName: "http", - Port: consts.ProxyPortInt32, - TargetPort: consts.ProxyPortInt32, - Annotations: api.ToK8sAnnotations(), - Labels: map[string]string{ - "apiName": api.Name, - "apiKind": api.Kind.String(), - "cortex.dev/api": "true", - }, - Selector: map[string]string{ - "apiName": api.Name, - "apiKind": api.Kind.String(), - }, - }) -} - -func virtualServiceSpec(api *spec.API) *istioclientnetworking.VirtualService { - var activatorWeight int32 - if api.Autoscaling.InitReplicas == 0 { - activatorWeight = 100 - } - - return k8s.VirtualService(&k8s.VirtualServiceSpec{ - Name: workloads.K8sName(api.Name), - Gateways: []string{"apis-gateway"}, - Destinations: []k8s.Destination{ - { - ServiceName: workloads.K8sName(api.Name), - Weight: 100 - activatorWeight, - Port: uint32(consts.ProxyPortInt32), - Headers: &istionetworking.Headers{ - Response: &istionetworking.Headers_HeaderOperations{ - Set: map[string]string{ - consts.CortexOriginHeader: "api", - }, - }, - }, - }, - { - ServiceName: consts.ActivatorName, - Weight: activatorWeight, - Port: uint32(consts.ActivatorPortInt32), - Headers: &istionetworking.Headers{ - Request: &istionetworking.Headers_HeaderOperations{ - Set: map[string]string{ - consts.CortexAPINameHeader: api.Name, - consts.CortexTargetServiceHeader: fmt.Sprintf( - "http://%s.%s:%d", - workloads.K8sName(api.Name), - consts.DefaultNamespace, - consts.ProxyPortInt32, - ), - }, - }, - Response: &istionetworking.Headers_HeaderOperations{ - Set: map[string]string{ - consts.CortexOriginHeader: consts.ActivatorName, - }, - }, - }, - }, - }, - PrefixPath: api.Networking.Endpoint, - Rewrite: pointer.String("/"), - Annotations: 
api.ToK8sAnnotations(), - Labels: map[string]string{ - "apiName": api.Name, - "apiKind": api.Kind.String(), - "apiID": api.ID, - "specID": api.SpecID, - "initialDeploymentTime": s.Int64(api.InitialDeploymentTime), - "deploymentID": api.DeploymentID, - "podID": api.PodID, - "cortex.dev/api": "true", - }, - }) -} - -func getRequestedReplicasFromDeployment(api spec.API, deployment *kapps.Deployment) int32 { - requestedReplicas := api.Autoscaling.InitReplicas - - if deployment != nil && deployment.Spec.Replicas != nil && *deployment.Spec.Replicas > 0 { - requestedReplicas = *deployment.Spec.Replicas - } - - if requestedReplicas < api.Autoscaling.MinReplicas { - requestedReplicas = api.Autoscaling.MinReplicas - } - - if requestedReplicas > api.Autoscaling.MaxReplicas { - requestedReplicas = api.Autoscaling.MaxReplicas - } - - return requestedReplicas -} diff --git a/pkg/operator/resources/realtimeapi/status.go b/pkg/operator/resources/realtimeapi/status.go deleted file mode 100644 index a90c42f387..0000000000 --- a/pkg/operator/resources/realtimeapi/status.go +++ /dev/null @@ -1,89 +0,0 @@ -/* -Copyright 2021 Cortex Labs, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package realtimeapi - -import ( - "github.com/cortexlabs/cortex/pkg/lib/k8s" - "github.com/cortexlabs/cortex/pkg/types/status" - kapps "k8s.io/api/apps/v1" - kcore "k8s.io/api/core/v1" -) - -func GetReplicaCounts(deployment *kapps.Deployment, pods []kcore.Pod) *status.ReplicaCounts { - counts := status.ReplicaCounts{} - counts.Requested = *deployment.Spec.Replicas - - for i := range pods { - pod := pods[i] - if pod.Labels["apiName"] != deployment.Labels["apiName"] { - continue - } - addPodToReplicaCounts(&pods[i], deployment, &counts) - } - - return &counts -} - -func addPodToReplicaCounts(pod *kcore.Pod, deployment *kapps.Deployment, counts *status.ReplicaCounts) { - latest := false - if isPodSpecLatest(deployment, pod) { - latest = true - } - - isPodReady := k8s.IsPodReady(pod) - if latest && isPodReady { - counts.Ready++ - return - } else if !latest && isPodReady { - counts.ReadyOutOfDate++ - return - } - - podStatus := k8s.GetPodStatus(pod) - - if podStatus == k8s.PodStatusTerminating { - counts.Terminating++ - return - } - - if !latest { - return - } - - switch podStatus { - case k8s.PodStatusPending: - counts.Pending++ - case k8s.PodStatusStalled: - counts.Stalled++ - case k8s.PodStatusCreating: - counts.Creating++ - case k8s.PodStatusReady: - counts.Ready++ - case k8s.PodStatusNotReady: - counts.NotReady++ - case k8s.PodStatusErrImagePull: - counts.ErrImagePull++ - case k8s.PodStatusFailed: - counts.Failed++ - case k8s.PodStatusKilled: - counts.Killed++ - case k8s.PodStatusKilledOOM: - counts.KilledOOM++ - case k8s.PodStatusUnknown: - counts.Unknown++ - } -} diff --git a/pkg/operator/resources/resources.go b/pkg/operator/resources/resources.go index 445571ad25..11cd50a5c8 100644 --- a/pkg/operator/resources/resources.go +++ b/pkg/operator/resources/resources.go @@ -174,7 +174,7 @@ func RefreshAPI(apiName string, force bool) (string, error) { switch deployedResource.Kind { case userconfig.RealtimeAPIKind: - return realtimeapi.RefreshAPI(apiName, force) 
+ return realtimeapi.RefreshAPI(apiName) case userconfig.AsyncAPIKind: return asyncapi.RefreshAPI(apiName, force) default: @@ -297,12 +297,9 @@ func GetAPIs() ([]schema.APIResponse, error) { return nil, err } - var realtimeAPIDeployments []kapps.Deployment var asyncAPIDeployments []kapps.Deployment for _, deployment := range deployments { switch deployment.Labels["apiKind"] { - case userconfig.RealtimeAPIKind.String(): - realtimeAPIDeployments = append(realtimeAPIDeployments, deployment) case userconfig.AsyncAPIKind.String(): asyncAPIDeployments = append(asyncAPIDeployments, deployment) } @@ -323,7 +320,7 @@ func GetAPIs() ([]schema.APIResponse, error) { } } - realtimeAPIList, err := realtimeapi.GetAllAPIs(realtimeAPIDeployments) + realtimeAPIList, err := realtimeapi.GetAllAPIs() if err != nil { return nil, err } @@ -370,7 +367,7 @@ func GetAPI(apiName string) ([]schema.APIResponse, error) { switch deployedResource.Kind { case userconfig.RealtimeAPIKind: - apiResponse, err = realtimeapi.GetAPIByName(deployedResource) + apiResponse, err = realtimeapi.GetAPIByName(apiName) if err != nil { return nil, err } @@ -494,7 +491,7 @@ func DescribeAPI(apiName string) ([]schema.APIResponse, error) { switch deployedResource.Kind { case userconfig.RealtimeAPIKind: - apiResponse, err = realtimeapi.DescribeAPIByName(deployedResource) + apiResponse, err = realtimeapi.DescribeAPIByName(apiName) if err != nil { return nil, err } diff --git a/pkg/operator/schema/schema.go b/pkg/operator/schema/schema.go index 93acfd7a6d..04da32599c 100644 --- a/pkg/operator/schema/schema.go +++ b/pkg/operator/schema/schema.go @@ -58,6 +58,7 @@ type APIResponse struct { Spec *spec.API `json:"spec,omitempty" yaml:"spec,omitempty"` Metadata *spec.Metadata `json:"metadata,omitempty" yaml:"metadata,omitempty"` Status *status.Status `json:"status,omitempty" yaml:"status,omitempty"` + ReplicaCounts *status.ReplicaCounts `json:"replica_counts,omitempty" yaml:"replica_counts,omitempty"` 
NumTrafficSplitterTargets *int32 `json:"num_traffic_splitter_targets,omitempty" yaml:"num_traffic_splitter_targets,omitempty"` Endpoint *string `json:"endpoint,omitempty" yaml:"endpoint,omitempty"` DashboardURL *string `json:"dashboard_url,omitempty" yaml:"dashboard_url,omitempty"` diff --git a/pkg/types/spec/api.go b/pkg/types/spec/api.go index 5b0d39210c..3361114078 100644 --- a/pkg/types/spec/api.go +++ b/pkg/types/spec/api.go @@ -51,6 +51,7 @@ type API struct { type Metadata struct { *userconfig.Resource APIID string `json:"id" yaml:"id"` + PodID string `json:"pod_id,omitempty" yaml:"pod_id,omitempty"` DeploymentID string `json:"deployment_id,omitempty" yaml:"deployment_id,omitempty"` LastUpdated int64 `json:"last_updated" yaml:"last_updated"` } diff --git a/pkg/types/status/status.go b/pkg/types/status/status.go index e0de4943ef..6c61b3ac6b 100644 --- a/pkg/types/status/status.go +++ b/pkg/types/status/status.go @@ -21,15 +21,30 @@ import ( ) type Status struct { - Ready int32 `json:"ready" yaml:"ready"` // deployment-reported number of ready replicas (latest + out of date) - Requested int32 `json:"requested" yaml:"requested"` // deployment-reported number of requested replicas - UpToDate int32 `json:"up_to_date" yaml:"up_to_date"` // deployment-reported number of up-to-date replicas (in whichever phase they are found in) - ReplicaCounts *ReplicaCounts `json:"replica_counts,omitempty" yaml:"replica_counts,omitempty"` + Ready int32 `json:"ready" yaml:"ready"` // deployment-reported number of ready replicas (latest + out of date) + Requested int32 `json:"requested" yaml:"requested"` // deployment-reported number of requested replicas + UpToDate int32 `json:"up_to_date" yaml:"up_to_date"` // deployment-reported number of up-to-date replicas (in whichever phase they are found in) +} + +type ReplicaCounts struct { + Status + Pending int32 `json:"pending" yaml:"pending"` + Creating int32 `json:"creating" yaml:"creating"` + NotReady int32 `json:"not_ready" 
yaml:"not_ready"` + ReadyOutOfDate int32 `json:"ready_out_of_date" yaml:"ready_out_of_date"` + ErrImagePull int32 `json:"err_image_pull" yaml:"err_image_pull"` + Terminating int32 `json:"terminating" yaml:"terminating"` // includes up-to-date and out-of-date pods + Failed int32 `json:"failed" yaml:"failed"` + Killed int32 `json:"killed" yaml:"killed"` + KilledOOM int32 `json:"killed_oom" yaml:"killed_oom"` + Stalled int32 `json:"stalled" yaml:"stalled"` // pending for a long time + Unknown int32 `json:"unknown" yaml:"unknown"` } type ReplicaCountType string const ( + ReplicaCountUpToDate ReplicaCountType = "UpToDate" // total up-to-date pods ReplicaCountRequested ReplicaCountType = "Requested" // requested number of replicas (for up-to-date pods) ReplicaCountPending ReplicaCountType = "Pending" // pods that are in the pending state (for up-to-date pods) ReplicaCountCreating ReplicaCountType = "Creating" // pods that that have their init/non-init containers in the process of being created (for up-to-date pods) @@ -50,23 +65,7 @@ var ReplicaCountTypes []ReplicaCountType = []ReplicaCountType{ ReplicaCountNotReady, ReplicaCountReady, ReplicaCountReadyOutOfDate, ReplicaCountErrImagePull, ReplicaCountTerminating, ReplicaCountFailed, ReplicaCountKilled, ReplicaCountKilledOOM, ReplicaCountStalled, - ReplicaCountUnknown, -} - -type ReplicaCounts struct { - Requested int32 `json:"requested" yaml:"requested"` - Pending int32 `json:"pending" yaml:"pending"` - Creating int32 `json:"creating" yaml:"creating"` - NotReady int32 `json:"not_ready" yaml:"not_ready"` - Ready int32 `json:"ready" yaml:"ready"` - ReadyOutOfDate int32 `json:"ready_out_of_date" yaml:"ready_out_of_date"` - ErrImagePull int32 `json:"err_image_pull" yaml:"err_image_pull"` - Terminating int32 `json:"terminating" yaml:"terminating"` // includes up-to-date and out-of-date pods - Failed int32 `json:"failed" yaml:"failed"` - Killed int32 `json:"killed" yaml:"killed"` - KilledOOM int32 `json:"killed_oom" 
yaml:"killed_oom"` - Stalled int32 `json:"stalled" yaml:"stalled"` // pending for a long time - Unknown int32 `json:"unknown" yaml:"unknown"` + ReplicaCountUnknown, ReplicaCountUpToDate, } // Worker counts don't have as many failure variations because Jobs clean up dead pods, so counting different failure scenarios isn't interesting @@ -86,15 +85,21 @@ type WorkerCounts struct { } func FromDeployment(deployment *kapps.Deployment) *Status { + var requested int32 + if deployment.Spec.Replicas != nil { + requested = *deployment.Spec.Replicas + } return &Status{ Ready: deployment.Status.ReadyReplicas, - Requested: deployment.Status.Replicas, + Requested: requested, UpToDate: deployment.Status.UpdatedReplicas, } } func (counts *ReplicaCounts) GetCountBy(replicaType ReplicaCountType) int32 { switch replicaType { + case ReplicaCountUpToDate: + return counts.UpToDate case ReplicaCountRequested: return counts.Requested case ReplicaCountPending: diff --git a/pkg/workloads/helpers.go b/pkg/workloads/helpers.go index 1d0bf847ba..c14cb77d7a 100644 --- a/pkg/workloads/helpers.go +++ b/pkg/workloads/helpers.go @@ -228,7 +228,7 @@ func APIConfigMount(name string) kcore.VolumeMount { func ClientConfigMount() kcore.VolumeMount { return kcore.VolumeMount{ Name: _clientConfigDirVolume, - MountPath: path.Join(_clientConfigDir, "cli.yaml"), + MountPath: path.Join(clientConfigDir, "cli.yaml"), SubPath: "cli.yaml", } } @@ -248,3 +248,10 @@ func ShmMount(volumeName string) kcore.VolumeMount { func KubexitMount() kcore.VolumeMount { return k8s.EmptyDirVolumeMount(_kubexitGraveyardName, _kubexitGraveyardMountPath) } + +func ClientConfigEnvVar() kcore.EnvVar { + return kcore.EnvVar{ + Name: "CORTEX_CLI_CONFIG_DIR", + Value: clientConfigDir, + } +} diff --git a/pkg/workloads/k8s.go b/pkg/workloads/k8s.go index 5e460aa7af..6e12445dc4 100644 --- a/pkg/workloads/k8s.go +++ b/pkg/workloads/k8s.go @@ -41,7 +41,7 @@ const ( const ( _cortexDirVolumeName = "cortex" _cortexDirMountPath = "/cortex" - 
_clientConfigDir = "/cortex/client" + clientConfigDir = "/cortex/client" _emptyDirVolumeName = "mnt" _emptyDirMountPath = "/mnt" @@ -67,7 +67,7 @@ var ( _statsdAddress = fmt.Sprintf("prometheus-statsd-exporter.%s:9125", consts.PrometheusNamespace) // each Inferentia chip requires 128 HugePages with each HugePage having a size of 2Mi - _hugePagesMemPerInf = int64(128 * 2 * 1024 * 1024) // bytes + HugePagesMemPerInf = int64(128 * 2 * 1024 * 1024) // bytes ) func asyncDequeuerProxyContainer(api spec.API, queueURL string) (kcore.Container, kcore.Volume) { @@ -349,7 +349,7 @@ func userPodContainers(api spec.API) ([]kcore.Container, []kcore.Volume) { } if container.Compute.Inf > 0 { - totalHugePages := container.Compute.Inf * _hugePagesMemPerInf + totalHugePages := container.Compute.Inf * HugePagesMemPerInf containerResourceList["aws.amazon.com/neuron"] = *kresource.NewQuantity(container.Compute.Inf, kresource.DecimalSI) containerResourceList["hugepages-2Mi"] = *kresource.NewQuantity(totalHugePages, kresource.BinarySI) containerResourceLimitsList["aws.amazon.com/neuron"] = *kresource.NewQuantity(container.Compute.Inf, kresource.DecimalSI) @@ -369,12 +369,7 @@ func userPodContainers(api spec.API) ([]kcore.Container, []kcore.Volume) { } containerEnvVars := BaseEnvVars - - containerEnvVars = append(containerEnvVars, kcore.EnvVar{ - Name: "CORTEX_CLI_CONFIG_DIR", - Value: _clientConfigDir, - }) - + containerEnvVars = append(containerEnvVars, ClientConfigEnvVar()) if api.Kind != userconfig.TaskAPIKind { containerEnvVars = append(containerEnvVars, kcore.EnvVar{ Name: "CORTEX_PORT",