Skip to content

Commit

Permalink
Merge pull request GoogleCloudPlatform#2853 from jasonvigil/metrics-watchers
Browse files Browse the repository at this point in the history

feat: Watch resources to record metrics instead of List
  • Loading branch information
google-oss-prow[bot] authored Jan 22, 2025
2 parents 3761d2b + 8d9f03d commit 431347a
Showing 1 changed file with 78 additions and 99 deletions.
177 changes: 78 additions & 99 deletions cmd/recorder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,16 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
flag "github.com/spf13/pflag"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
crlog "sigs.k8s.io/controller-runtime/pkg/log"
)

// Tuning constants for the list-based metric recording path.
const (
// NumberOfWorkers is the capacity of the semaphore channel that bounds how
// many per-GVK recording goroutines run at the same time.
NumberOfWorkers = 20
// MaximumListResults is the Limit placed on the list options used when
// listing objects of a GVK.
MaximumListResults = 50
)

var (
logger = crlog.Log.WithName("setup")
appliedResources = metrics.NewAppliedResourcesCollector()
Expand Down Expand Up @@ -128,12 +121,6 @@ func run(ctx context.Context) error {
return fmt.Errorf("error getting kubernetes configuration: %w", err)
}

// Get a client to talk to the APIServer
kubeClient, err := client.New(restConfig, client.Options{})
if err != nil {
return fmt.Errorf("building kubernetes client: %w", err)
}

restHTTPClient, err := rest.HTTPClientFor(restConfig)
if err != nil {
return fmt.Errorf("building kubernetes http client: %w", err)
Expand All @@ -147,124 +134,116 @@ func run(ctx context.Context) error {
crdGVR := schema.GroupVersionResource{Group: "apiextensions.k8s.io", Version: "v1", Resource: "customresourcedefinitions"}
crdInfos := kube.WatchKube(ctx, kubeTarget, crdGVR, buildCRDInfo)

statViews := make(map[CRDInfo]*kube.KubeView[ResourceStats])

for {
time.Sleep(time.Duration(metricInterval) * time.Second)

// Reset all metrics before updating.
appliedResources.Reset()

// Skip reporting if CRDs aren't synced up.
if !crdInfos.HasSyncedOnce() {
logger.Info("CRDs have not yet synced, skipping metric collection")
logger.Info("CRDs have not yet synced, skipping metric reporting")
continue
}
var gvks []schema.GroupVersionKind

seenCRDs := make(map[CRDInfo]bool)
for _, crdInfo := range crdInfos.Snapshot() {
// Skip non-KCC resources.
if !strings.HasSuffix(crdInfo.GVK.Group, ".cnrm.cloud.google.com") {
continue
}
gvks = append(gvks, crdInfo.GVK)
}

if err := doRecord(ctx, kubeClient, gvks); err != nil {
logger.Error(err, "error recording metrics")
}
}
}
// Skip ignored CRDs.
crdName := gvkToCRDName(crdInfo.GVK)
if _, ok := opk8s.IgnoredCRDList[crdName]; ok {
logger.Error(fmt.Errorf("unexpected CRD %s", crdName),
fmt.Sprintf("please run `kubectl delete crd %s` to "+
"delete the orphaned CRD", crdName),
"crd", crdName)
continue
}

func doRecord(ctx context.Context, c client.Client, gvks []schema.GroupVersionKind) error {
logger := klog.FromContext(ctx)
// Record all KCC CRDs we see, so we can clean up unused watches.
seenCRDs[crdInfo] = true

// reset all metrics in this vector before the new run of recording
appliedResources.Reset()
// worker pool with semaphore
sem := make(chan struct{}, NumberOfWorkers)
for _, gvk := range gvks {
gvk := gvk
sem <- struct{}{}
go func() {
defer func() { <-sem }()
err := recordMetricsForGVK(ctx, c, gvk)
if err != nil {
logger.Error(err, "error recording metrics for CRD", "gvk", gvk.String())
// Register watch for this resource if we haven't already.
if _, ok := statViews[crdInfo]; !ok {
statView := kube.WatchKube(ctx, kubeTarget, crdInfo.GVR, gatherResourceStats)
statViews[crdInfo] = statView
}
}()
}
for i := 0; i < NumberOfWorkers; i++ {
sem <- struct{}{}
}
logger.Info("finished one run of recording resource metrics.")
return nil
}

func forEach(ctx context.Context, c client.Client, gvk schema.GroupVersionKind, listOptions *client.ListOptions, fn func(unstructured.Unstructured) error) error {
logger := klog.FromContext(ctx)
for ok := true; ok; ok = listOptions.Continue != "" {
crdName := gvkToCRDName(gvk)
if _, ok := opk8s.IgnoredCRDList[crdName]; ok {
logger.Error(fmt.Errorf("unexpected CRD %s", crdName),
fmt.Sprintf("please run `kubectl delete crd %s` to "+
"delete the orphaned CRD", crdName),
"crd", crdName)
continue
}
list := unstructured.UnstructuredList{}
list.SetGroupVersionKind(gvk)
err := c.List(context.Background(), &list, listOptions)
if err != nil {
return fmt.Errorf("error listing objects: %w", err)
// Skip reporting for this resource if we aren't synced up.
if !statViews[crdInfo].HasSyncedOnce() {
logger.Info("CRs have not yet synced, skipping metric reporting", "gvk", crdInfo.GVK)
continue
}

// Aggregate stats for each namespace.
nsAggStats := make(map[string]*AggregatedResourceStats)
for i, s := range statViews[crdInfo].Snapshot() {
ns := i.Namespace
nsStats, ok := nsAggStats[ns]
if !ok {
nsStats = NewAggregatedResourceStats()
nsAggStats[ns] = nsStats
}
nsStats.lastConditionCounts[s.lastCondition]++
}

// Record stats.
for ns, stats := range nsAggStats {
for condition, count := range stats.lastConditionCounts {
logger.V(2).Info("posting metrics", "namespace", ns, "gvk", crdInfo.GVK.String(), "status", condition, "count", count)
appliedResources.WithLabelValues(ns, crdInfo.GVK.GroupKind().String(), condition).Set(float64(count))
}
}
}
for _, item := range list.Items {
if err := fn(item); err != nil {
return err

// Cleanup stale watches.
for crdInfo, view := range statViews {
if _, ok := seenCRDs[crdInfo]; !ok {
logger.Info("removing stale watch for resource", "gvk", crdInfo.GVK.String())
view.Close()
delete(statViews, crdInfo)
}
}
listOptions.Continue = list.GetContinue()
}
return nil
}

// gvkToCRDName derives the CRD name for gvk in the form "<plural>.<group>".
//
// The plural is produced by lowercasing the Kind and appending "s"; no other
// pluralization rule is applied.
func gvkToCRDName(gvk schema.GroupVersionKind) string {
	plural := strings.ToLower(gvk.Kind) + "s"
	return strings.Join([]string{plural, gvk.Group}, ".")
}

func recordMetricsForGVK(ctx context.Context, c client.Client, gvk schema.GroupVersionKind) error {
logger := klog.FromContext(ctx)
// ResourceStats holds the stats gathered for a single resource object.
type ResourceStats struct {
// lastCondition is the value getLastCondition returned for the object.
lastCondition string
}

// AggregatedResourceStats accumulates ResourceStats across many objects; the
// reporting loop keeps one instance per namespace.
type AggregatedResourceStats struct {
// lastConditionCounts maps a lastCondition value to the number of objects
// that reported it.
lastConditionCounts map[string]int64
}

opts := &client.ListOptions{
Limit: MaximumListResults,
Raw: &v1.ListOptions{},
}
statsNamespaceMap := make(map[string]*Stats)
if err := forEach(ctx, c, gvk, opts, func(obj unstructured.Unstructured) error {
namespace := obj.GetNamespace()
s := statsNamespaceMap[namespace]
if s == nil {
s = &Stats{make(map[string]int64)}
statsNamespaceMap[namespace] = s
}
lastCondition, err := getTheLastCondition(obj)
if err != nil {
logger.Error(err, "error getting the last condition for metrics", "gvk", gvk.String())
return nil
}
s.countByStatus[lastCondition]++
return nil
}); err != nil {
return fmt.Errorf("error listing objects for %v: %w", gvk.String(), err)
func NewAggregatedResourceStats() *AggregatedResourceStats {
return &AggregatedResourceStats{
lastConditionCounts: make(map[string]int64),
}
for ns, stats := range statsNamespaceMap {
for status, count := range stats.countByStatus {
logger.V(2).Info("posting metrics", "namespace", ns, "gvk", gvk.String(), "status", status, "count", count)
appliedResources.WithLabelValues(ns, gvk.GroupKind().String(), status).Set(float64(count))
}
}
return nil
}

type Stats struct {
countByStatus map[string]int64
// gatherResourceStats builds the ResourceStats for u, which currently consist
// only of its last condition as reported by getLastCondition.
//
// An error from getLastCondition is logged, not returned: the stats are still
// produced, carrying whatever condition string accompanied the error.
func gatherResourceStats(u *unstructured.Unstructured) ResourceStats {
	condition, err := getLastCondition(u)
	if err == nil {
		return ResourceStats{lastCondition: condition}
	}

	logger.Error(err, "error getting last condition for CR",
		"gvk", u.GroupVersionKind(),
		"name", u.GetName(),
		"namespace", u.GetNamespace())
	return ResourceStats{lastCondition: condition}
}

// TODO: consolidate the logic with krmtotf.GetReadyCondition
func getTheLastCondition(obj unstructured.Unstructured) (string, error) {
currConditionsRaw, found, err := unstructured.NestedSlice(obj.Object, "status", "conditions")
func getLastCondition(u *unstructured.Unstructured) (string, error) {
currConditionsRaw, found, err := unstructured.NestedSlice(u.Object, "status", "conditions")
if err != nil {
return "", err
}
Expand Down

0 comments on commit 431347a

Please sign in to comment.