Skip to content

✨ Switch to Unstructured for all pod data handling so it supports all… #23

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jun 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
run: |
os=$(go env GOOS)
arch=$(go env GOARCH)
version=1.29.3
version=1.30.0
curl -L https://storage.googleapis.com/kubebuilder-tools/kubebuilder-tools-${version}-${os}-${arch}.tar.gz | tar -xz -C /tmp/
sudo mv /tmp/kubebuilder /usr/local/kubebuilder
- run: make test
Expand Down
199 changes: 118 additions & 81 deletions components/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@ package components

import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

cu "github.com/coderanger/controller-utils"
"github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -40,7 +41,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"

migrationsv1beta1 "github.com/coderanger/migrations-operator/api/v1beta1"
argoprojstubv1alpha1 "github.com/coderanger/migrations-operator/stubs/argoproj/v1alpha1"
"github.com/coderanger/migrations-operator/utils"
"github.com/coderanger/migrations-operator/webhook"
)
Expand Down Expand Up @@ -83,6 +83,24 @@ func (comp *migrationsComponent) Setup(ctx *cu.Context, bldr *ctrl.Builder) erro
return nil
}

func deepCopyJSON(src map[string]interface{}, dest map[string]interface{}) error {
if src == nil {
return errors.New("src is nil. You cannot read from a nil map")
}
if dest == nil {
return errors.New("dest is nil. You cannot insert to a nil map")
}
jsonStr, err := json.Marshal(src)
if err != nil {
return err
}
err = json.Unmarshal(jsonStr, &dest)
if err != nil {
return err
}
return nil
}

func (comp *migrationsComponent) Reconcile(ctx *cu.Context) (cu.Result, error) {
obj := ctx.Object.(*migrationsv1beta1.Migrator)

Expand All @@ -105,16 +123,18 @@ func (comp *migrationsComponent) Reconcile(ctx *cu.Context) (cu.Result, error) {
}

// Find a template pod to start from.
allPods := &corev1.PodList{}
allPods := &unstructured.UnstructuredList{}
allPods.SetAPIVersion("v1")
allPods.SetKind("Pod")
err = ctx.Client.List(ctx, allPods, &client.ListOptions{Namespace: obj.Namespace})
if err != nil {
return cu.Result{}, errors.Wrapf(err, "error listing pods in namespace %s", obj.Namespace)
}
pods := []*corev1.Pod{}
var templatePod *corev1.Pod
pods := []*unstructured.Unstructured{}
var templatePod *unstructured.Unstructured
for i := range allPods.Items {
pod := &allPods.Items[i]
labelSet := labels.Set(pod.Labels)
labelSet := labels.Set(pod.GetLabels())
if selector.Matches(labelSet) {
pods = append(pods, pod)
if templatePod == nil && templateSelector.Matches(labelSet) {
Expand All @@ -138,54 +158,67 @@ func (comp *migrationsComponent) Reconcile(ctx *cu.Context) (cu.Result, error) {
}

// Find the template container.
var templateContainer *corev1.Container
templatePodSpecContainers := templatePodSpec["containers"].([]interface{})
var templateContainer map[string]interface{}
if obj.Spec.Container != "" {
// Looking for a specific container name.
for _, c := range templatePodSpec.Containers {
if c.Name == obj.Spec.Container {
templateContainer = &c
for _, c := range templatePodSpecContainers {
container := c.(map[string]interface{})
if container["name"].(string) == obj.Spec.Container {
templateContainer = container
break
}
}
} else if len(templatePodSpec.Containers) > 0 {
templateContainer = &templatePodSpec.Containers[0]
} else if len(templatePodSpecContainers) > 0 {
templateContainer = templatePodSpecContainers[0].(map[string]interface{})
}
if templateContainer == nil {
// Welp, either nothing matched the name or somehow there are no containers.
return cu.Result{}, errors.New("no template container found")
}

// Build a migration job object.
migrationContainer := templateContainer.DeepCopy()
migrationContainer.Name = "migrations"
migrationContainer := make(map[string]interface{})
err = deepCopyJSON(templateContainer, migrationContainer)
if err != nil {
return cu.Result{}, errors.Wrap(err, "error copying template container")
}
migrationContainer["name"] = "migrations"
if obj.Spec.Image != "" {
migrationContainer.Image = obj.Spec.Image
migrationContainer["image"] = obj.Spec.Image
}
if obj.Spec.Command != nil {
migrationContainer.Command = *obj.Spec.Command
migrationContainer["command"] = *obj.Spec.Command
}
if obj.Spec.Args != nil {
migrationContainer.Args = *obj.Spec.Args
migrationContainer["args"] = *obj.Spec.Args
}
// TODO resources?

// Remove the probes since they will rarely work.
migrationContainer.ReadinessProbe = nil
migrationContainer.LivenessProbe = nil
migrationContainer.StartupProbe = nil
migrationContainer["readinessProbe"] = nil
migrationContainer["livenessProbe"] = nil
migrationContainer["startupProbe"] = nil

migrationPodSpec := templatePodSpec.DeepCopy()
migrationPodSpec.Containers = []corev1.Container{*migrationContainer}
migrationPodSpec.RestartPolicy = corev1.RestartPolicyNever
migrationPodSpec := make(map[string]interface{})
err = deepCopyJSON(templatePodSpec, migrationPodSpec)
if err != nil {
return cu.Result{}, errors.Wrap(err, "error copying template pod spec")
}
migrationPodSpec["containers"] = []map[string]interface{}{migrationContainer}
migrationPodSpec["restartPolicy"] = corev1.RestartPolicyNever

// Purge any migration wait initContainers since that would be a yodawg situation.
initContainers := []corev1.Container{}
for _, c := range migrationPodSpec.InitContainers {
if !strings.HasPrefix(c.Name, "migrate-wait-") {
initContainers = append(initContainers, c)
initContainers := []map[string]interface{}{}
if migrationPodSpec["initContainers"] != nil {
for _, c := range migrationPodSpec["initContainers"].([]interface{}) {
container := c.(map[string]interface{})
if !strings.HasPrefix(container["name"].(string), "migrate-wait-") {
initContainers = append(initContainers, container)
}
}
}
migrationPodSpec.InitContainers = initContainers
migrationPodSpec["initContainers"] = initContainers

// add labels to the job's pod template
jobTemplateLabels := map[string]string{"migrations": obj.Name}
Expand All @@ -205,21 +238,22 @@ func (comp *migrationsComponent) Reconcile(ctx *cu.Context) (cu.Result, error) {
}
}

migrationJob := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: obj.Name + "-migrations",
Namespace: obj.Namespace,
Labels: obj.Labels,
Annotations: map[string]string{},
},
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: jobTemplateLabels,
Annotations: jobTemplateAnnotations,
},
Spec: *migrationPodSpec,
migrationJobName := obj.Name + "-migrations"
migrationJobNamespace := obj.Namespace
migrationJobImage := migrationContainer["image"].(string)
migrationJob := &unstructured.Unstructured{}
migrationJob.SetAPIVersion("batch/v1")
migrationJob.SetKind("Job")
migrationJob.SetName(migrationJobName)
migrationJob.SetNamespace(migrationJobNamespace)
migrationJob.SetLabels(obj.Labels)
migrationJob.UnstructuredContent()["spec"] = map[string]interface{}{
"template": map[string]interface{}{
"metadata": map[string]interface{}{
"labels": jobTemplateLabels,
"annotations": jobTemplateAnnotations,
},
"spec": migrationPodSpec,
},
}
err = controllerutil.SetControllerReference(obj, migrationJob, ctx.Scheme)
Expand All @@ -233,13 +267,13 @@ func (comp *migrationsComponent) Reconcile(ctx *cu.Context) (cu.Result, error) {
if err != nil {
return cu.Result{}, errors.Wrap(err, "error getting latest migrator for status")
}
if uncachedObj.Status.LastSuccessfulMigration == migrationContainer.Image {
ctx.Conditions.SetfTrue(comp.GetReadyCondition(), "MigrationsUpToDate", "Migration %s already run", migrationContainer.Image)
if uncachedObj.Status.LastSuccessfulMigration == migrationJobImage {
ctx.Conditions.SetfTrue(comp.GetReadyCondition(), "MigrationsUpToDate", "Migration %s already run", migrationJobImage)
return cu.Result{}, nil
}

existingJob := &batchv1.Job{}
err = ctx.Client.Get(ctx, types.NamespacedName{Name: migrationJob.Name, Namespace: migrationJob.Namespace}, existingJob)
err = ctx.Client.Get(ctx, types.NamespacedName{Name: migrationJobName, Namespace: migrationJobNamespace}, existingJob)
if err != nil {
if kerrors.IsNotFound(err) {
// Try to start the migrations.
Expand All @@ -250,11 +284,11 @@ func (comp *migrationsComponent) Reconcile(ctx *cu.Context) (cu.Result, error) {
ctx.Conditions.SetfUnknown(comp.GetReadyCondition(), "CreateError", "Error on create, possible conflict: %v", err)
return cu.Result{Requeue: true}, nil
}
ctx.Events.Eventf(obj, "Normal", "MigrationsStarted", "Started migration job %s/%s using image %s", migrationJob.Namespace, migrationJob.Name, migrationContainer.Image)
ctx.Conditions.SetfFalse(comp.GetReadyCondition(), "MigrationsRunning", "Started migration job %s/%s using image %s", migrationJob.Namespace, migrationJob.Name, migrationContainer.Image)
ctx.Events.Eventf(obj, "Normal", "MigrationsStarted", "Started migration job %s/%s using image %s", migrationJobNamespace, migrationJobName, migrationJobImage)
ctx.Conditions.SetfFalse(comp.GetReadyCondition(), "MigrationsRunning", "Started migration job %s/%s using image %s", migrationJobNamespace, migrationJobName, migrationJobImage)
return cu.Result{}, nil
} else {
return cu.Result{}, errors.Wrapf(err, "error getting existing migration job %s/%s", migrationJob.Namespace, migrationJob.Name)
return cu.Result{}, errors.Wrapf(err, "error getting existing migration job %s/%s", migrationJobNamespace, migrationJobName)
}
}

Expand All @@ -263,15 +297,15 @@ func (comp *migrationsComponent) Reconcile(ctx *cu.Context) (cu.Result, error) {
if len(existingJob.Spec.Template.Spec.Containers) > 0 {
existingImage = existingJob.Spec.Template.Spec.Containers[0].Image
}
if existingImage == "" || existingImage != migrationContainer.Image {
if existingImage == "" || existingImage != migrationJobImage {
// Old, stale migration. Remove it and try again.
policy := metav1.DeletePropagationForeground
err = ctx.Client.Delete(ctx, existingJob, &client.DeleteOptions{PropagationPolicy: &policy})
if err != nil {
return cu.Result{}, errors.Wrapf(err, "error deleting stale migration job %s/%s", existingJob.Namespace, existingJob.Name)
}
ctx.Events.Eventf(obj, "Normal", "StaleJob", "Deleted stale migration job %s/%s (%s)", migrationJob.Namespace, migrationJob.Name, existingImage)
ctx.Conditions.SetfFalse(comp.GetReadyCondition(), "StaleJob", "Deleted stale migration job %s/%s (%s)", migrationJob.Namespace, migrationJob.Name, existingImage)
ctx.Events.Eventf(obj, "Normal", "StaleJob", "Deleted stale migration job %s/%s (%s)", migrationJobNamespace, migrationJobName, existingImage)
ctx.Conditions.SetfFalse(comp.GetReadyCondition(), "StaleJob", "Deleted stale migration job %s/%s (%s)", migrationJobNamespace, migrationJobName, existingImage)
return cu.Result{RequeueAfter: 1 * time.Second, SkipRemaining: true}, nil
}

Expand All @@ -284,7 +318,7 @@ func (comp *migrationsComponent) Reconcile(ctx *cu.Context) (cu.Result, error) {
}
ctx.Events.Eventf(obj, "Normal", "MigrationsSucceeded", "Migration job %s/%s using image %s succeeded", existingJob.Namespace, existingJob.Name, existingImage)
ctx.Conditions.SetfTrue(comp.GetReadyCondition(), "MigrationsSucceeded", "Migration job %s/%s using image %s succeeded", existingJob.Namespace, existingJob.Name, existingImage)
obj.Status.LastSuccessfulMigration = migrationContainer.Image
obj.Status.LastSuccessfulMigration = migrationJobImage
return cu.Result{}, nil
}

Expand All @@ -301,27 +335,20 @@ func (comp *migrationsComponent) Reconcile(ctx *cu.Context) (cu.Result, error) {
return cu.Result{}, nil
}

func (_ *migrationsComponent) findOwners(ctx *cu.Context, obj client.Object) ([]client.Object, error) {
func (_ *migrationsComponent) findOwners(ctx *cu.Context, obj *unstructured.Unstructured) ([]*unstructured.Unstructured, error) {
namespace := obj.GetNamespace()
owners := []client.Object{}
owners := []*unstructured.Unstructured{}
for {
owners = append(owners, obj)
ref := metav1.GetControllerOfNoCopy(obj)
if ref == nil {
break
}
gvk := schema.FromAPIVersionAndKind(ref.APIVersion, ref.Kind)
ownerObj, err := ctx.Scheme.New(gvk)
if err != nil {
// Gracefully handle kinds that we haven't registered. Useful when a Rollout or Deployment is
// owned by someone's in-house operator
if runtime.IsNotRegisteredError(err) {
break
}
return nil, errors.Wrapf(err, "error finding object type for owner reference %v", ref)
}
obj = ownerObj.(client.Object)
err = ctx.Client.Get(ctx, types.NamespacedName{Name: ref.Name, Namespace: namespace}, obj)
obj = &unstructured.Unstructured{}
obj.SetGroupVersionKind(gvk)
obj.SetName(ref.Name) // Is this needed?
err := ctx.Client.Get(ctx, types.NamespacedName{Name: ref.Name, Namespace: namespace}, obj)
if err != nil {
// Gracefully handle objects we don't have access to
if kerrors.IsForbidden(err) {
Expand All @@ -337,34 +364,44 @@ func (_ *migrationsComponent) findOwners(ctx *cu.Context, obj client.Object) ([]
return owners, nil
}

func (_ *migrationsComponent) findSpecFor(ctx *cu.Context, obj client.Object) *corev1.PodSpec {
switch v := obj.(type) {
case *corev1.Pod:
return &v.Spec
case *appsv1.Deployment:
return &v.Spec.Template.Spec
case *argoprojstubv1alpha1.Rollout:
if v.Spec.WorkloadRef != nil {
if v.Spec.WorkloadRef.Kind == "Deployment" {
deployment := appsv1.Deployment{}
err := ctx.Client.Get(ctx, client.ObjectKey{Namespace: v.Namespace, Name: v.Spec.WorkloadRef.Name}, &deployment)
func (_ *migrationsComponent) findSpecFor(ctx *cu.Context, obj *unstructured.Unstructured) map[string]interface{} {
gvk := obj.GetObjectKind().GroupVersionKind()
switch fmt.Sprintf("%s/%s", gvk.Group, gvk.Kind) {
case "/Pod":
return obj.UnstructuredContent()["spec"].(map[string]interface{})
case "apps/Deployment":
spec := obj.UnstructuredContent()["spec"].(map[string]interface{})
template := spec["template"].(map[string]interface{})
return template["spec"].(map[string]interface{})
case "argoproj.io/Rollout":
spec := obj.UnstructuredContent()["spec"].(map[string]interface{})
if spec["workloadRef"] != nil {
workloadRef := spec["workloadRef"].(map[string]interface{})
workloadKind := workloadRef["kind"].(string)
if workloadKind == "Deployment" {
deployment := &unstructured.Unstructured{}
deployment.SetAPIVersion(workloadRef["apiVersion"].(string))
deployment.SetKind(workloadKind)
err := ctx.Client.Get(ctx, types.NamespacedName{Name: workloadRef["name"].(string), Namespace: obj.GetNamespace()}, deployment)
if err != nil {
return nil
}
return &deployment.Spec.Template.Spec
deploymentSpec := deployment.UnstructuredContent()["spec"].(map[string]interface{})
deploymentTemplate := deploymentSpec["template"].(map[string]interface{})
return deploymentTemplate["spec"].(map[string]interface{})
} else {
// TODO handle other WorkloadRef types
return nil
}
}
return &v.Spec.Template.Spec
// TODO other types. lots of them.
template := spec["template"].(map[string]interface{})
return template["spec"].(map[string]interface{})
default:
return nil
}
}

func (comp *migrationsComponent) findOwnerSpec(ctx *cu.Context, obj client.Object) (*corev1.PodSpec, error) {
func (comp *migrationsComponent) findOwnerSpec(ctx *cu.Context, obj *unstructured.Unstructured) (map[string]interface{}, error) {
owners, err := comp.findOwners(ctx, obj)
if err != nil {
return nil, err
Expand Down
Loading
Loading