diff --git a/cmd/recorder/main.go b/cmd/recorder/main.go
index cfaad51208..f935b43f45 100644
--- a/cmd/recorder/main.go
+++ b/cmd/recorder/main.go
@@ -35,23 +35,16 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 	flag "github.com/spf13/pflag"
-	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
 	"k8s.io/client-go/rest"
 	"k8s.io/klog/v2"
-	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/client/config"
 	crlog "sigs.k8s.io/controller-runtime/pkg/log"
 )
 
-const (
-	NumberOfWorkers    = 20
-	MaximumListResults = 50
-)
-
 var (
 	logger           = crlog.Log.WithName("setup")
 	appliedResources = metrics.NewAppliedResourcesCollector()
@@ -128,12 +121,6 @@ func run(ctx context.Context) error {
 		return fmt.Errorf("error getting kubernetes configuration: %w", err)
 	}
-	// Get a client to talk to the APIServer
-	kubeClient, err := client.New(restConfig, client.Options{})
-	if err != nil {
-		return fmt.Errorf("building kubernetes client: %w", err)
-	}
-
 	restHTTPClient, err := rest.HTTPClientFor(restConfig)
 	if err != nil {
 		return fmt.Errorf("building kubernetes http client: %w", err)
 	}
@@ -147,76 +134,82 @@ func run(ctx context.Context) error {
 	crdGVR := schema.GroupVersionResource{Group: "apiextensions.k8s.io", Version: "v1", Resource: "customresourcedefinitions"}
 	crdInfos := kube.WatchKube(ctx, kubeTarget, crdGVR, buildCRDInfo)
 
+	statViews := make(map[CRDInfo]*kube.KubeView[ResourceStats])
+
 	for {
 		time.Sleep(time.Duration(metricInterval) * time.Second)
+
+		// Reset all metrics before updating.
+		appliedResources.Reset()
+
+		// Skip reporting if CRDs aren't synced up.
 		if !crdInfos.HasSyncedOnce() {
-			logger.Info("CRDs have not yet synced, skipping metric collection")
+			logger.Info("CRDs have not yet synced, skipping metric reporting")
 			continue
 		}
-		var gvks []schema.GroupVersionKind
+
+		seenCRDs := make(map[CRDInfo]bool)
 		for _, crdInfo := range crdInfos.Snapshot() {
+			// Skip non-KCC resources.
 			if !strings.HasSuffix(crdInfo.GVK.Group, ".cnrm.cloud.google.com") {
 				continue
 			}
-			gvks = append(gvks, crdInfo.GVK)
-		}
-		if err := doRecord(ctx, kubeClient, gvks); err != nil {
-			logger.Error(err, "error recording metrics")
-		}
-	}
-}
+			// Skip ignored CRDs.
+			crdName := gvkToCRDName(crdInfo.GVK)
+			if _, ok := opk8s.IgnoredCRDList[crdName]; ok {
+				logger.Error(fmt.Errorf("unexpected CRD %s", crdName),
+					fmt.Sprintf("please run `kubectl delete crd %s` to "+
+						"delete the orphaned CRD", crdName),
+					"crd", crdName)
+				continue
+			}
 
-func doRecord(ctx context.Context, c client.Client, gvks []schema.GroupVersionKind) error {
-	logger := klog.FromContext(ctx)
+			// Record all KCC CRDs we see, so we can clean up unused watches.
+			seenCRDs[crdInfo] = true
 
-	// reset all metrics in this vector before the new run of recording
-	appliedResources.Reset()
-	// worker pool with semaphore
-	sem := make(chan struct{}, NumberOfWorkers)
-	for _, gvk := range gvks {
-		gvk := gvk
-		sem <- struct{}{}
-		go func() {
-			defer func() { <-sem }()
-			err := recordMetricsForGVK(ctx, c, gvk)
-			if err != nil {
-				logger.Error(err, "error recording metrics for CRD", "gvk", gvk.String())
+			// Register watch for this resource if we haven't already.
+			if _, ok := statViews[crdInfo]; !ok {
+				statView := kube.WatchKube(ctx, kubeTarget, crdInfo.GVR, gatherResourceStats)
+				statViews[crdInfo] = statView
 			}
-		}()
-	}
-	for i := 0; i < NumberOfWorkers; i++ {
-		sem <- struct{}{}
-	}
-	logger.Info("finished one run of recording resource metrics.")
-	return nil
-}
 
-func forEach(ctx context.Context, c client.Client, gvk schema.GroupVersionKind, listOptions *client.ListOptions, fn func(unstructured.Unstructured) error) error {
-	logger := klog.FromContext(ctx)
-	for ok := true; ok; ok = listOptions.Continue != "" {
-		crdName := gvkToCRDName(gvk)
-		if _, ok := opk8s.IgnoredCRDList[crdName]; ok {
-			logger.Error(fmt.Errorf("unexpected CRD %s", crdName),
-				fmt.Sprintf("please run `kubectl delete crd %s` to "+
-					"delete the orphaned CRD", crdName),
-				"crd", crdName)
-			continue
-		}
-		list := unstructured.UnstructuredList{}
-		list.SetGroupVersionKind(gvk)
-		err := c.List(context.Background(), &list, listOptions)
-		if err != nil {
-			return fmt.Errorf("error listing objects: %w", err)
+			// Skip reporting for this resource if we aren't synced up.
+			if !statViews[crdInfo].HasSyncedOnce() {
+				logger.Info("CRs have not yet synced, skipping metric reporting", "gvk", crdInfo.GVK)
+				continue
+			}
+
+			// Aggregate stats for each namespace.
+			nsAggStats := make(map[string]*AggregatedResourceStats)
+			for i, s := range statViews[crdInfo].Snapshot() {
+				ns := i.Namespace
+				nsStats, ok := nsAggStats[ns]
+				if !ok {
+					nsStats = NewAggregatedResourceStats()
+					nsAggStats[ns] = nsStats
+				}
+				nsStats.lastConditionCounts[s.lastCondition]++
+			}
+
+			// Record stats.
+			for ns, stats := range nsAggStats {
+				for condition, count := range stats.lastConditionCounts {
+					logger.V(2).Info("posting metrics", "namespace", ns, "gvk", crdInfo.GVK.String(), "status", condition, "count", count)
+					appliedResources.WithLabelValues(ns, crdInfo.GVK.GroupKind().String(), condition).Set(float64(count))
+				}
+			}
 		}
-		for _, item := range list.Items {
-			if err := fn(item); err != nil {
-				return err
+
+		// Cleanup stale watches.
+		for crdInfo, view := range statViews {
+			if _, ok := seenCRDs[crdInfo]; !ok {
+				logger.Info("removing stale watch for resource", "gvk", crdInfo.GVK.String())
+				view.Close()
+				delete(statViews, crdInfo)
 			}
 		}
-		listOptions.Continue = list.GetContinue()
 	}
-	return nil
 }
 
 func gvkToCRDName(gvk schema.GroupVersionKind) string {
@@ -224,47 +217,33 @@ func gvkToCRDName(gvk schema.GroupVersionKind) string {
 	return pluralLowercaseKind + "." + gvk.Group
 }
 
-func recordMetricsForGVK(ctx context.Context, c client.Client, gvk schema.GroupVersionKind) error {
-	logger := klog.FromContext(ctx)
+type ResourceStats struct {
+	lastCondition string
+}
+
+type AggregatedResourceStats struct {
+	lastConditionCounts map[string]int64
+}
 
-	opts := &client.ListOptions{
-		Limit: MaximumListResults,
-		Raw:   &v1.ListOptions{},
-	}
-	statsNamespaceMap := make(map[string]*Stats)
-	if err := forEach(ctx, c, gvk, opts, func(obj unstructured.Unstructured) error {
-		namespace := obj.GetNamespace()
-		s := statsNamespaceMap[namespace]
-		if s == nil {
-			s = &Stats{make(map[string]int64)}
-			statsNamespaceMap[namespace] = s
-		}
-		lastCondition, err := getTheLastCondition(obj)
-		if err != nil {
-			logger.Error(err, "error getting the last condition for metrics", "gvk", gvk.String())
-			return nil
-		}
-		s.countByStatus[lastCondition]++
-		return nil
-	}); err != nil {
-		return fmt.Errorf("error listing objects for %v: %w", gvk.String(), err)
+func NewAggregatedResourceStats() *AggregatedResourceStats {
+	return &AggregatedResourceStats{
+		lastConditionCounts: make(map[string]int64),
 	}
-	for ns, stats := range statsNamespaceMap {
-		for status, count := range stats.countByStatus {
-			logger.V(2).Info("posting metrics", "namespace", ns, "gvk", gvk.String(), "status", status, "count", count)
-			appliedResources.WithLabelValues(ns, gvk.GroupKind().String(), status).Set(float64(count))
-		}
-	}
-	return nil
 }
 
-type Stats struct {
-	countByStatus map[string]int64
+func gatherResourceStats(u *unstructured.Unstructured) ResourceStats {
+	lastCondition, err := getLastCondition(u)
+	if err != nil {
+		logger.Error(err, "error getting last condition for CR", "gvk", u.GroupVersionKind(), "name", u.GetName(), "namespace", u.GetNamespace())
+	}
+	return ResourceStats{
+		lastCondition: lastCondition,
+	}
 }
 
 // TODO: consolidate the logic with krmtotf.GetReadyCondition
-func getTheLastCondition(obj unstructured.Unstructured) (string, error) {
-	currConditionsRaw, found, err := unstructured.NestedSlice(obj.Object, "status", "conditions")
+func getLastCondition(u *unstructured.Unstructured) (string, error) {
+	currConditionsRaw, found, err := unstructured.NestedSlice(u.Object, "status", "conditions")
 	if err != nil {
 		return "", err
 	}