Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add support for unix socket connection in sync service (#1518) #1560

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/reference/flagd-cli/flagd_start.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ flagd start [flags]
-d, --socket-path string Flagd socket path. With grpc the service will become available on this address. With http(s) the grpc-gateway proxy will use this address internally.
-s, --sources string JSON representation of an array of SourceConfig objects. This object contains 2 required fields, uri (string) and provider (string). Documentation for this object: https://flagd.dev/reference/sync-configuration/#source-configuration
-g, --sync-port int32 gRPC Sync port (default 8015)
-e, --sync-socket-path string Flagd sync service socket path.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know the --sync-socket-path fits to --socket-path but maybe the description could use the word "unix domain socket" in some way to make it clear which socket is meant?

-f, --uri .yaml/.yml/.json Set a sync provider uri to read data from, this can be a filepath, URL (HTTP and gRPC), FeatureFlag custom resource, or GCS or Azure Blob. When flag keys are duplicated across multiple providers the merge priority follows the index of the flag arguments, as such flags from the uri at index 0 take the lowest precedence, with duplicated keys being overwritten by those from the uri at index 1. Please note that if you are using filepath, flagd only supports files with .yaml/.yml/.json extension.
```

Expand Down
2 changes: 1 addition & 1 deletion docs/reference/specifications/providers.md
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ precedence.
Below are the supported configuration parameters (note that not all apply to both resolver modes):

| Option name | Environment variable name | Explanation | Type & Values | Default | Compatible resolver |
|-----------------------|--------------------------------|------------------------------------------------------------------------|------------------------------|-------------------------------|-------------------------|
| --------------------- | ------------------------------ | ---------------------------------------------------------------------- | ---------------------------- | ----------------------------- | ----------------------- |
| resolver | FLAGD_RESOLVER | mode of operation | String - `rpc`, `in-process` | rpc | rpc & in-process |
| host | FLAGD_HOST | remote host | String | localhost | rpc & in-process |
| port | FLAGD_PORT | remote port | int | 8013 (rpc), 8015 (in-process) | rpc & in-process |
Expand Down
41 changes: 25 additions & 16 deletions flagd/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
syncbuilder "github.com/open-feature/flagd/core/pkg/sync/builder"
"github.com/open-feature/flagd/flagd/pkg/runtime"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/spf13/viper"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand All @@ -33,6 +34,7 @@ const (
socketPathFlagName = "socket-path"
sourcesFlagName = "sources"
syncPortFlagName = "sync-port"
syncSocketPathFlagName = "sync-socket-path"
uriFlagName = "uri"
contextValueFlagName = "context-value"
)
Expand All @@ -51,6 +53,7 @@ func init() {
flags.StringP(socketPathFlagName, "d", "", "Flagd socket path. "+
"With grpc the service will become available on this address. "+
"With http(s) the grpc-gateway proxy will use this address internally.")
flags.StringP(syncSocketPathFlagName, "e", "", "Flagd sync service socket path.")
flags.StringP(serverCertPathFlagName, "c", "", "Server side tls certificate path")
flags.StringP(serverKeyPathFlagName, "k", "", "Server side tls key path")
flags.StringSliceP(
Expand Down Expand Up @@ -81,6 +84,10 @@ func init() {
flags.StringToStringP(contextValueFlagName, "X", map[string]string{}, "add arbitrary key value pairs "+
"to the flag evaluation context")

bindFlags(flags)
}

func bindFlags(flags *pflag.FlagSet) {
_ = viper.BindPFlag(corsFlagName, flags.Lookup(corsFlagName))
_ = viper.BindPFlag(logFormatFlagName, flags.Lookup(logFormatFlagName))
_ = viper.BindPFlag(metricsExporter, flags.Lookup(metricsExporter))
Expand All @@ -96,6 +103,7 @@ func init() {
_ = viper.BindPFlag(sourcesFlagName, flags.Lookup(sourcesFlagName))
_ = viper.BindPFlag(uriFlagName, flags.Lookup(uriFlagName))
_ = viper.BindPFlag(syncPortFlagName, flags.Lookup(syncPortFlagName))
_ = viper.BindPFlag(syncSocketPathFlagName, flags.Lookup(syncSocketPathFlagName))
_ = viper.BindPFlag(ofrepPortFlagName, flags.Lookup(ofrepPortFlagName))
_ = viper.BindPFlag(contextValueFlagName, flags.Lookup(contextValueFlagName))
}
Expand Down Expand Up @@ -149,22 +157,23 @@ var startCmd = &cobra.Command{

// Build Runtime -----------------------------------------------------------
rt, err := runtime.FromConfig(logger, Version, runtime.Config{
CORS: viper.GetStringSlice(corsFlagName),
MetricExporter: viper.GetString(metricsExporter),
ManagementPort: viper.GetUint16(managementPortFlagName),
OfrepServicePort: viper.GetUint16(ofrepPortFlagName),
OtelCollectorURI: viper.GetString(otelCollectorURI),
OtelCertPath: viper.GetString(otelCertPathFlagName),
OtelKeyPath: viper.GetString(otelKeyPathFlagName),
OtelReloadInterval: viper.GetDuration(otelReloadIntervalFlagName),
OtelCAPath: viper.GetString(otelCAPathFlagName),
ServiceCertPath: viper.GetString(serverCertPathFlagName),
ServiceKeyPath: viper.GetString(serverKeyPathFlagName),
ServicePort: viper.GetUint16(portFlagName),
ServiceSocketPath: viper.GetString(socketPathFlagName),
SyncServicePort: viper.GetUint16(syncPortFlagName),
SyncProviders: syncProviders,
ContextValues: contextValuesToMap,
CORS: viper.GetStringSlice(corsFlagName),
MetricExporter: viper.GetString(metricsExporter),
ManagementPort: viper.GetUint16(managementPortFlagName),
OfrepServicePort: viper.GetUint16(ofrepPortFlagName),
OtelCollectorURI: viper.GetString(otelCollectorURI),
OtelCertPath: viper.GetString(otelCertPathFlagName),
OtelKeyPath: viper.GetString(otelKeyPathFlagName),
OtelReloadInterval: viper.GetDuration(otelReloadIntervalFlagName),
OtelCAPath: viper.GetString(otelCAPathFlagName),
ServiceCertPath: viper.GetString(serverCertPathFlagName),
ServiceKeyPath: viper.GetString(serverKeyPathFlagName),
ServicePort: viper.GetUint16(portFlagName),
ServiceSocketPath: viper.GetString(socketPathFlagName),
SyncServicePort: viper.GetUint16(syncPortFlagName),
SyncServiceSocketPath: viper.GetString(syncSocketPathFlagName),
SyncProviders: syncProviders,
ContextValues: contextValuesToMap,
})
if err != nil {
rtLogger.Fatal(err.Error())
Expand Down
2 changes: 1 addition & 1 deletion flagd/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/rs/cors v1.11.1
github.com/rs/xid v1.6.0
github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.19.0
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/otel v1.34.0
Expand Down Expand Up @@ -129,7 +130,6 @@ require (
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/twmb/murmur3 v1.1.8 // indirect
github.com/x448/float16 v0.8.4 // indirect
Expand Down
14 changes: 0 additions & 14 deletions flagd/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ buf.build/gen/go/open-feature/flagd/connectrpc/go v1.18.1-20250127221518-be6d114
buf.build/gen/go/open-feature/flagd/connectrpc/go v1.18.1-20250127221518-be6d1143b690.1/go.mod h1:BSerK2QrH0wQdgiFP6fKevMB4QXotvyXUfmE6mExwHY=
buf.build/gen/go/open-feature/flagd/grpc/go v1.5.1-20250127221518-be6d1143b690.2 h1:D3HI5RQbqgffyf+Z77+hReDx5kigFVAKGvttULD9/ms=
buf.build/gen/go/open-feature/flagd/grpc/go v1.5.1-20250127221518-be6d1143b690.2/go.mod h1:b9rfG6rbGXZAlLwQwedvZ0kI0nUcR+aLaYF70pj920E=
buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.36.4-20250127221518-be6d1143b690.1 h1:0vXmOkGv8nO5H1W8TkSz+GtZhvD2LNXiQaiucEio6vk=
buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.36.4-20250127221518-be6d1143b690.1/go.mod h1:wemFLfCpuNfhrBQ7NwzbtYxbg+IihAYqJcNeS+fLpLI=
buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.36.5-20250127221518-be6d1143b690.1 h1:eZKupK8gUTuc6zifAFQon8Gnt44fR4cd0GnTWjELvEw=
buf.build/gen/go/open-feature/flagd/protocolbuffers/go v1.36.5-20250127221518-be6d1143b690.1/go.mod h1:wJvVIADHM0IaBc5sYf8wgMMgSHi0nAtc6rgr5rfizhA=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
Expand Down Expand Up @@ -247,14 +245,8 @@ github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA
github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To=
github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk=
github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0=
github.com/open-feature/flagd-schemas v0.2.9-0.20250106184836-37baa2cdea48 h1:qtUAqQxMT252ZE2T/ESExiSOgx+enrOnhPBUMIerKNs=
github.com/open-feature/flagd-schemas v0.2.9-0.20250106184836-37baa2cdea48/go.mod h1:WKtwo1eW9/K6D+4HfgTXWBqCDzpvMhDa5eRxW7R5B2U=
github.com/open-feature/flagd-schemas v0.2.9-0.20250127221449-bb763438abc5 h1:0RKCLYeQpvSsKR95kc894tm8GAZmq7bcG48v0KJ0HCs=
github.com/open-feature/flagd-schemas v0.2.9-0.20250127221449-bb763438abc5/go.mod h1:WKtwo1eW9/K6D+4HfgTXWBqCDzpvMhDa5eRxW7R5B2U=
github.com/open-feature/flagd/core v0.10.8 h1:JDspNtQ/AspLgtoC4UVsk8j+iCxY4yo4Ro51qdbYzEE=
github.com/open-feature/flagd/core v0.10.8/go.mod h1:8o7NXJdknnbseuK2HmfKYlqpIqvtwK8SZ+0Tg9/H4ac=
github.com/open-feature/flagd/core v0.11.0 h1:uyT2keRVqVptqoNOaK4Pq3LueLkcVIZ/qfCnN2mdKiw=
github.com/open-feature/flagd/core v0.11.0/go.mod h1:jjMq0sMdCvvsoi4dWoNwUTP4jN0vC1e8OtY/giqZIIM=
github.com/open-feature/flagd/core v0.11.1 h1:0qBVXcRBZOFoZ5lNK/Yba2IyUDdxUHcLsv5OhUJtltA=
github.com/open-feature/flagd/core v0.11.1/go.mod h1:yzPjp7D9wNusvOyKt8wBND5PQGslcu+5e+xmaIBGgLE=
github.com/open-feature/open-feature-operator/apis v0.2.44 h1:0r4Z+RnJltuHdRBv79NFgAckhna6/M3Wcec6gzNX5vI=
Expand Down Expand Up @@ -382,8 +374,6 @@ golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1m
golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8 h1:yqrTHse8TCMW1M1ZCP+VAR/l0kKxwaAIqN/il7x4voA=
golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8/go.mod h1:tujkw807nyEEAamNbDrEGzRav+ilXA7PCRAd6xsmwiU=
golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c h1:KL/ZBHXgKGVmuZBZ01Lt57yE5ws8ZPSkkihmEyq7FXc=
golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c/go.mod h1:tujkw807nyEEAamNbDrEGzRav+ilXA7PCRAd6xsmwiU=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
Expand Down Expand Up @@ -426,8 +416,6 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w=
golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down Expand Up @@ -518,8 +506,6 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.36.4 h1:6A3ZDJHn/eNqc1i+IdefRzy/9PokBTPvcqMySR7NNIM=
google.golang.org/protobuf v1.36.4/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
28 changes: 15 additions & 13 deletions flagd/pkg/runtime/from_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,20 @@ const svcName = "flagd"

// Config is the configuration structure derived from startup arguments.
type Config struct {
MetricExporter string
ManagementPort uint16
OfrepServicePort uint16
OtelCollectorURI string
OtelCertPath string
OtelKeyPath string
OtelCAPath string
OtelReloadInterval time.Duration
ServiceCertPath string
ServiceKeyPath string
ServicePort uint16
ServiceSocketPath string
SyncServicePort uint16
MetricExporter string
ManagementPort uint16
OfrepServicePort uint16
OtelCollectorURI string
OtelCertPath string
OtelKeyPath string
OtelCAPath string
OtelReloadInterval time.Duration
ServiceCertPath string
ServiceKeyPath string
ServicePort uint16
ServiceSocketPath string
SyncServicePort uint16
SyncServiceSocketPath string

SyncProviders []sync.SourceConfig
CORS []string
Expand Down Expand Up @@ -119,6 +120,7 @@ func FromConfig(logger *logger.Logger, version string, config Config) (*Runtime,
ContextValues: config.ContextValues,
KeyPath: config.ServiceKeyPath,
CertPath: config.ServiceCertPath,
SocketPath: config.SyncServiceSocketPath,
})
if err != nil {
return nil, fmt.Errorf("error creating sync service: %w", err)
Expand Down
14 changes: 11 additions & 3 deletions flagd/pkg/service/flag-sync/sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type SvcConfigurations struct {
ContextValues map[string]any
CertPath string
KeyPath string
SocketPath string
}

type Service struct {
Expand Down Expand Up @@ -61,6 +62,7 @@ func loadTLSCredentials(certPath string, keyPath string) (credentials.TransportC
}

func NewSyncService(cfg SvcConfigurations) (*Service, error) {
var err error
l := cfg.Logger
mux, err := NewMux(cfg.Store, cfg.Sources)
if err != nil {
Expand All @@ -84,14 +86,20 @@ func NewSyncService(cfg SvcConfigurations) (*Service, error) {
contextValues: cfg.ContextValues,
})

l.Info(fmt.Sprintf("starting flag sync service on port %d", cfg.Port))
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", cfg.Port))
var lis net.Listener
if cfg.SocketPath != "" {
l.Info(fmt.Sprintf("starting flag sync service at %s", cfg.SocketPath))
lis, err = net.Listen("unix", cfg.SocketPath)
} else {
l.Info(fmt.Sprintf("starting flag sync service on port %d", cfg.Port))
lis, err = net.Listen("tcp", fmt.Sprintf(":%d", cfg.Port))
}
if err != nil {
return nil, fmt.Errorf("error creating listener: %w", err)
}

return &Service{
listener: listener,
listener: lis,
logger: l,
mux: mux,
server: server,
Expand Down
41 changes: 24 additions & 17 deletions flagd/pkg/service/flag-sync/sync_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,34 @@ import (

func TestSyncServiceEndToEnd(t *testing.T) {
testCases := []struct {
title string
certPath string
keyPath string
clientCertPath string
socketPath string
tls bool
wantErr bool
}{
{"./test-cert/server-cert.pem", "./test-cert/server-key.pem", "./test-cert/ca-cert.pem", true, false},
{"", "", "", false, false},
{"./lol/not/a/cert", "./test-cert/server-key.pem", "./test-cert/ca-cert.pem", true, true},
{title: "with TLS Connection", certPath: "./test-cert/server-cert.pem", keyPath: "./test-cert/server-key.pem", clientCertPath: "./test-cert/ca-cert.pem", socketPath: "", tls: true, wantErr: false},
{title: "witout TLS Connection", certPath: "", keyPath: "", clientCertPath: "", socketPath: "", tls: false, wantErr: false},
{title: "with invalid TLS certificate path", certPath: "./lol/not/a/cert", keyPath: "./test-cert/server-key.pem", clientCertPath: "./test-cert/ca-cert.pem", socketPath: "", tls: true, wantErr: true},
{title: "with unix socket connection", certPath: "", keyPath: "", clientCertPath: "", socketPath: "/tmp/flagd", tls: false, wantErr: false},
}

for _, tc := range testCases {
var testTitle string
if tc.tls {
testTitle = "Testing Sync Service with TLS Connection"
} else {
testTitle = "Testing Sync Service without TLS Connection"
}
t.Run(testTitle, func(t *testing.T) {
t.Run(fmt.Sprintf("Testing Sync Service %s", tc.title), func(t *testing.T) {
// given
port := 18016
store, sources := getSimpleFlagStore()

service, err := NewSyncService(SvcConfigurations{
Logger: logger.NewLogger(nil, false),
Port: uint16(port),
Sources: sources,
Store: store,
CertPath: tc.certPath,
KeyPath: tc.keyPath,
Logger: logger.NewLogger(nil, false),
Port: uint16(port),
Sources: sources,
Store: store,
CertPath: tc.certPath,
KeyPath: tc.keyPath,
SocketPath: tc.socketPath,
})

if tc.wantErr {
Expand Down Expand Up @@ -81,7 +79,16 @@ func TestSyncServiceEndToEnd(t *testing.T) {
}
con, err = grpc.Dial(fmt.Sprintf("0.0.0.0:%d", port), grpc.WithTransportCredentials(tlsCredentials))
} else {
con, err = grpc.DialContext(ctx, fmt.Sprintf("localhost:%d", port), grpc.WithTransportCredentials(insecure.NewCredentials()))
if tc.socketPath != "" {
con, err = grpc.Dial(
fmt.Sprintf("unix://%s", tc.socketPath),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
grpc.WithTimeout(2*time.Second),
)
} else {
con, err = grpc.DialContext(ctx, fmt.Sprintf("localhost:%d", port), grpc.WithTransportCredentials(insecure.NewCredentials()))
}
}
if err != nil {
t.Fatal(fmt.Printf("error creating grpc dial ctx: %v", err))
Expand Down
Loading