Skip to content

Commit

Permalink
feat: Watch resources to record metrics instead of List
Browse files Browse the repository at this point in the history
This will help with system load. With this change, K8s API server no
longer needs to collect all resources when we fetch metrics.
  • Loading branch information
jasonvigil committed Dec 27, 2024
1 parent 5978c11 commit 8d9f03d
Showing 1 changed file with 78 additions and 99 deletions.
177 changes: 78 additions & 99 deletions cmd/recorder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,16 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
flag "github.com/spf13/pflag"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
crlog "sigs.k8s.io/controller-runtime/pkg/log"
)

const (
NumberOfWorkers = 20
MaximumListResults = 50
)

var (
logger = crlog.Log.WithName("setup")
appliedResources = metrics.NewAppliedResourcesCollector()
Expand Down Expand Up @@ -128,12 +121,6 @@ func run(ctx context.Context) error {
return fmt.Errorf("error getting kubernetes configuration: %w", err)
}

// Get a client to talk to the APIServer
kubeClient, err := client.New(restConfig, client.Options{})
if err != nil {
return fmt.Errorf("building kubernetes client: %w", err)
}

restHTTPClient, err := rest.HTTPClientFor(restConfig)
if err != nil {
return fmt.Errorf("building kubernetes http client: %w", err)
Expand All @@ -147,124 +134,116 @@ func run(ctx context.Context) error {
crdGVR := schema.GroupVersionResource{Group: "apiextensions.k8s.io", Version: "v1", Resource: "customresourcedefinitions"}
crdInfos := kube.WatchKube(ctx, kubeTarget, crdGVR, buildCRDInfo)

statViews := make(map[CRDInfo]*kube.KubeView[ResourceStats])

for {
time.Sleep(time.Duration(metricInterval) * time.Second)

// Reset all metrics before updating.
appliedResources.Reset()

// Skip reporting if CRDs aren't synced up.
if !crdInfos.HasSyncedOnce() {
logger.Info("CRDs have not yet synced, skipping metric collection")
logger.Info("CRDs have not yet synced, skipping metric reporting")
continue
}
var gvks []schema.GroupVersionKind

seenCRDs := make(map[CRDInfo]bool)
for _, crdInfo := range crdInfos.Snapshot() {
// Skip non-KCC resources.
if !strings.HasSuffix(crdInfo.GVK.Group, ".cnrm.cloud.google.com") {
continue
}
gvks = append(gvks, crdInfo.GVK)
}

if err := doRecord(ctx, kubeClient, gvks); err != nil {
logger.Error(err, "error recording metrics")
}
}
}
// Skip ignored CRDs.
crdName := gvkToCRDName(crdInfo.GVK)
if _, ok := opk8s.IgnoredCRDList[crdName]; ok {
logger.Error(fmt.Errorf("unexpected CRD %s", crdName),
fmt.Sprintf("please run `kubectl delete crd %s` to "+
"delete the orphaned CRD", crdName),
"crd", crdName)
continue
}

func doRecord(ctx context.Context, c client.Client, gvks []schema.GroupVersionKind) error {
logger := klog.FromContext(ctx)
// Record all KCC CRDs we see, so we can clean up unused watches.
seenCRDs[crdInfo] = true

// reset all metrics in this vector before the new run of recording
appliedResources.Reset()
// worker pool with semaphore
sem := make(chan struct{}, NumberOfWorkers)
for _, gvk := range gvks {
gvk := gvk
sem <- struct{}{}
go func() {
defer func() { <-sem }()
err := recordMetricsForGVK(ctx, c, gvk)
if err != nil {
logger.Error(err, "error recording metrics for CRD", "gvk", gvk.String())
// Register watch for this resource if we haven't already.
if _, ok := statViews[crdInfo]; !ok {
statView := kube.WatchKube(ctx, kubeTarget, crdInfo.GVR, gatherResourceStats)
statViews[crdInfo] = statView
}
}()
}
for i := 0; i < NumberOfWorkers; i++ {
sem <- struct{}{}
}
logger.Info("finished one run of recording resource metrics.")
return nil
}

func forEach(ctx context.Context, c client.Client, gvk schema.GroupVersionKind, listOptions *client.ListOptions, fn func(unstructured.Unstructured) error) error {
logger := klog.FromContext(ctx)
for ok := true; ok; ok = listOptions.Continue != "" {
crdName := gvkToCRDName(gvk)
if _, ok := opk8s.IgnoredCRDList[crdName]; ok {
logger.Error(fmt.Errorf("unexpected CRD %s", crdName),
fmt.Sprintf("please run `kubectl delete crd %s` to "+
"delete the orphaned CRD", crdName),
"crd", crdName)
continue
}
list := unstructured.UnstructuredList{}
list.SetGroupVersionKind(gvk)
err := c.List(context.Background(), &list, listOptions)
if err != nil {
return fmt.Errorf("error listing objects: %w", err)
// Skip reporting for this resource if we aren't synced up.
if !statViews[crdInfo].HasSyncedOnce() {
logger.Info("CRs have not yet synced, skipping metric reporting", "gvk", crdInfo.GVK)
continue
}

// Aggregate stats for each namespace.
nsAggStats := make(map[string]*AggregatedResourceStats)
for i, s := range statViews[crdInfo].Snapshot() {
ns := i.Namespace
nsStats, ok := nsAggStats[ns]
if !ok {
nsStats = NewAggregatedResourceStats()
nsAggStats[ns] = nsStats
}
nsStats.lastConditionCounts[s.lastCondition]++
}

// Record stats.
for ns, stats := range nsAggStats {
for condition, count := range stats.lastConditionCounts {
logger.V(2).Info("posting metrics", "namespace", ns, "gvk", crdInfo.GVK.String(), "status", condition, "count", count)
appliedResources.WithLabelValues(ns, crdInfo.GVK.GroupKind().String(), condition).Set(float64(count))
}
}
}
for _, item := range list.Items {
if err := fn(item); err != nil {
return err

// Cleanup stale watches.
for crdInfo, view := range statViews {
if _, ok := seenCRDs[crdInfo]; !ok {
logger.Info("removing stale watch for resource", "gvk", crdInfo.GVK.String())
view.Close()
delete(statViews, crdInfo)
}
}
listOptions.Continue = list.GetContinue()
}
return nil
}

func gvkToCRDName(gvk schema.GroupVersionKind) string {
pluralLowercaseKind := strings.ToLower(gvk.Kind) + "s"
return pluralLowercaseKind + "." + gvk.Group
}

func recordMetricsForGVK(ctx context.Context, c client.Client, gvk schema.GroupVersionKind) error {
logger := klog.FromContext(ctx)
type ResourceStats struct {
lastCondition string
}

type AggregatedResourceStats struct {
lastConditionCounts map[string]int64
}

opts := &client.ListOptions{
Limit: MaximumListResults,
Raw: &v1.ListOptions{},
}
statsNamespaceMap := make(map[string]*Stats)
if err := forEach(ctx, c, gvk, opts, func(obj unstructured.Unstructured) error {
namespace := obj.GetNamespace()
s := statsNamespaceMap[namespace]
if s == nil {
s = &Stats{make(map[string]int64)}
statsNamespaceMap[namespace] = s
}
lastCondition, err := getTheLastCondition(obj)
if err != nil {
logger.Error(err, "error getting the last condition for metrics", "gvk", gvk.String())
return nil
}
s.countByStatus[lastCondition]++
return nil
}); err != nil {
return fmt.Errorf("error listing objects for %v: %w", gvk.String(), err)
func NewAggregatedResourceStats() *AggregatedResourceStats {
return &AggregatedResourceStats{
lastConditionCounts: make(map[string]int64),
}
for ns, stats := range statsNamespaceMap {
for status, count := range stats.countByStatus {
logger.V(2).Info("posting metrics", "namespace", ns, "gvk", gvk.String(), "status", status, "count", count)
appliedResources.WithLabelValues(ns, gvk.GroupKind().String(), status).Set(float64(count))
}
}
return nil
}

type Stats struct {
countByStatus map[string]int64
func gatherResourceStats(u *unstructured.Unstructured) ResourceStats {
lastCondition, err := getLastCondition(u)
if err != nil {
logger.Error(err, "error getting last condition for CR", "gvk", u.GroupVersionKind(), "name", u.GetName(), "namespace", u.GetNamespace())
}
return ResourceStats{
lastCondition: lastCondition,
}
}

// TODO: consolidate the logic with krmtotf.GetReadyCondition
func getTheLastCondition(obj unstructured.Unstructured) (string, error) {
currConditionsRaw, found, err := unstructured.NestedSlice(obj.Object, "status", "conditions")
func getLastCondition(u *unstructured.Unstructured) (string, error) {
currConditionsRaw, found, err := unstructured.NestedSlice(u.Object, "status", "conditions")
if err != nil {
return "", err
}
Expand Down

0 comments on commit 8d9f03d

Please sign in to comment.