Skip to content

Commit

Permalink
feat: support yaml in blob, file, and http syncs (#1522)
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Beemer <[email protected]>
  • Loading branch information
beeme1mr authored Jan 28, 2025
1 parent 7a06567 commit 76d673a
Show file tree
Hide file tree
Showing 13 changed files with 485 additions and 290 deletions.
133 changes: 2 additions & 131 deletions core/go.sum

Large diffs are not rendered by default.

19 changes: 15 additions & 4 deletions core/pkg/sync/blob/blob_sync.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package blob

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"path/filepath"
"time"

"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/sync"
"github.com/open-feature/flagd/core/pkg/utils"
"gocloud.dev/blob"
_ "gocloud.dev/blob/azureblob" // needed to initialize Azure Blob Storage driver
_ "gocloud.dev/blob/gcsblob" // needed to initialize GCS driver
Expand Down Expand Up @@ -126,11 +128,20 @@ func (hs *Sync) fetchObjectModificationTime(ctx context.Context, bucket *blob.Bu
}

func (hs *Sync) fetchObject(ctx context.Context, bucket *blob.Bucket) (string, error) {
buf := bytes.NewBuffer(nil)
err := bucket.Download(ctx, hs.Object, buf, nil)
r, err := bucket.NewReader(ctx, hs.Object, nil)
if err != nil {
return "", fmt.Errorf("error opening reader for object %s/%s: %w", hs.Bucket, hs.Object, err)
}
defer r.Close()

data, err := io.ReadAll(r)
if err != nil {
return "", fmt.Errorf("error downloading object %s/%s: %w", hs.Bucket, hs.Object, err)
}

return buf.String(), nil
json, err := utils.ConvertToJSON(data, filepath.Ext(hs.Object), r.ContentType())
if err != nil {
return "", fmt.Errorf("error converting blob data to json: %w", err)
}
return json, nil
}
121 changes: 73 additions & 48 deletions core/pkg/sync/blob/blob_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,57 +12,77 @@ import (
"go.uber.org/mock/gomock"
)

const (
scheme = "xyz"
bucket = "b"
object = "o"
)

func TestSync(t *testing.T) {
ctrl := gomock.NewController(t)
mockCron := synctesting.NewMockCron(ctrl)
mockCron.EXPECT().AddFunc(gomock.Any(), gomock.Any()).DoAndReturn(func(spec string, cmd func()) error {
return nil
})
mockCron.EXPECT().Start().Times(1)

blobSync := &Sync{
Bucket: scheme + "://" + bucket,
Object: object,
Cron: mockCron,
Logger: logger.NewLogger(nil, false),
func TestBlobSync(t *testing.T) {
tests := map[string]struct {
scheme string
bucket string
object string
content string
convertedContent string
}{
"json file type": {
scheme: "xyz",
bucket: "b",
object: "flags.json",
content: "{\"flags\":{}}",
convertedContent: "{\"flags\":{}}",
},
"yaml file type": {
scheme: "xyz",
bucket: "b",
object: "flags.yaml",
content: "flags: []",
convertedContent: "{\"flags\":[]}",
},
}
blobMock := NewMockBlob(scheme, func() *Sync {
return blobSync
})
blobSync.BlobURLMux = blobMock.URLMux()

ctx := context.Background()
dataSyncChan := make(chan sync.DataSync, 1)

config := "my-config"
blobMock.AddObject(object, config)

go func() {
err := blobSync.Sync(ctx, dataSyncChan)
if err != nil {
log.Fatalf("Error start sync: %s", err.Error())
return
}
}()

data := <-dataSyncChan // initial sync
if data.FlagData != config {
t.Errorf("expected content: %s, but received content: %s", config, data.FlagData)
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
ctrl := gomock.NewController(t)
mockCron := synctesting.NewMockCron(ctrl)
mockCron.EXPECT().AddFunc(gomock.Any(), gomock.Any()).DoAndReturn(func(spec string, cmd func()) error {
return nil
})
mockCron.EXPECT().Start().Times(1)

blobSync := &Sync{
Bucket: tt.scheme + "://" + tt.bucket,
Object: tt.object,
Cron: mockCron,
Logger: logger.NewLogger(nil, false),
}
blobMock := NewMockBlob(tt.scheme, func() *Sync {
return blobSync
})
blobSync.BlobURLMux = blobMock.URLMux()

ctx := context.Background()
dataSyncChan := make(chan sync.DataSync, 1)

blobMock.AddObject(tt.object, tt.content)

go func() {
err := blobSync.Sync(ctx, dataSyncChan)
if err != nil {
log.Fatalf("Error start sync: %s", err.Error())
return
}
}()

data := <-dataSyncChan // initial sync
if data.FlagData != tt.convertedContent {
t.Errorf("expected content: %s, but received content: %s", tt.convertedContent, data.FlagData)
}
tickWithConfigChange(t, mockCron, dataSyncChan, blobMock, tt.object, tt.convertedContent)
tickWithoutConfigChange(t, mockCron, dataSyncChan)
tickWithConfigChange(t, mockCron, dataSyncChan, blobMock, tt.object, tt.convertedContent)
tickWithoutConfigChange(t, mockCron, dataSyncChan)
tickWithoutConfigChange(t, mockCron, dataSyncChan)
})
}
tickWithConfigChange(t, mockCron, dataSyncChan, blobMock, "new config")
tickWithoutConfigChange(t, mockCron, dataSyncChan)
tickWithConfigChange(t, mockCron, dataSyncChan, blobMock, "new config 2")
tickWithoutConfigChange(t, mockCron, dataSyncChan)
tickWithoutConfigChange(t, mockCron, dataSyncChan)
}

func tickWithConfigChange(t *testing.T, mockCron *synctesting.MockCron, dataSyncChan chan sync.DataSync, blobMock *MockBlob, newConfig string) {
func tickWithConfigChange(t *testing.T, mockCron *synctesting.MockCron, dataSyncChan chan sync.DataSync, blobMock *MockBlob, object string, newConfig string) {
time.Sleep(1 * time.Millisecond) // sleep so the new file has different modification date
blobMock.AddObject(object, newConfig)
mockCron.Tick()
Expand All @@ -73,7 +93,7 @@ func tickWithConfigChange(t *testing.T, mockCron *synctesting.MockCron, dataSync
t.Errorf("expected content: %s, but received content: %s", newConfig, data.FlagData)
}
} else {
t.Errorf("data channel unexpecdly closed")
t.Errorf("data channel unexpectedly closed")
}
default:
t.Errorf("data channel has no expected update")
Expand All @@ -87,13 +107,18 @@ func tickWithoutConfigChange(t *testing.T, mockCron *synctesting.MockCron, dataS
if ok {
t.Errorf("unexpected update: %s", data.FlagData)
} else {
t.Errorf("data channel unexpecdly closed")
t.Errorf("data channel unexpectedly closed")
}
default:
}
}

func TestReSync(t *testing.T) {
const (
scheme = "xyz"
bucket = "b"
object = "flags.json"
)
ctrl := gomock.NewController(t)
mockCron := synctesting.NewMockCron(ctrl)

Expand Down
50 changes: 14 additions & 36 deletions core/pkg/sync/file/filepath_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@ package file

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"strings"
"path/filepath"
msync "sync"

"github.com/fsnotify/fsnotify"
"github.com/open-feature/flagd/core/pkg/logger"
"github.com/open-feature/flagd/core/pkg/sync"
"gopkg.in/yaml.v3"
"github.com/open-feature/flagd/core/pkg/utils"
)

const (
Expand All @@ -32,8 +32,6 @@ type Watcher interface {
type Sync struct {
URI string
Logger *logger.Logger
// FileType indicates the file type e.g., json, yaml/yml etc.,
fileType string
// watchType indicates how to watch the file FSNOTIFY|FILEINFO
watchType string
watcher Watcher
Expand Down Expand Up @@ -176,42 +174,22 @@ func (fs *Sync) fetch(_ context.Context) (string, error) {
if fs.URI == "" {
return "", errors.New("no filepath string set")
}
if fs.fileType == "" {
uriSplit := strings.Split(fs.URI, ".")
fs.fileType = uriSplit[len(uriSplit)-1]
}
rawFile, err := os.ReadFile(fs.URI)
if err != nil {
return "", fmt.Errorf("error reading file %s: %w", fs.URI, err)
}

switch fs.fileType {
case "yaml", "yml":
return yamlToJSON(rawFile)
case "json":
return string(rawFile), nil
default:
return "", fmt.Errorf("filepath extension for URI: '%s' is not supported", fs.URI)
}
}

// yamlToJSON is a generic helper function to convert
// yaml to json
func yamlToJSON(rawFile []byte) (string, error) {
if len(rawFile) == 0 {
return "", nil
file, err := os.Open(fs.URI)
if err != nil {
return "", fmt.Errorf("error opening file %s: %w", fs.URI, err)
}
defer file.Close()

var ms map[string]interface{}
// yaml.Unmarshal unmarshals to map[interface]interface{}
if err := yaml.Unmarshal(rawFile, &ms); err != nil {
return "", fmt.Errorf("unmarshal yaml: %w", err)
data, err := io.ReadAll(file)
if err != nil {
return "", fmt.Errorf("error reading file %s: %w", fs.URI, err)
}

r, err := json.Marshal(ms)
// File extension is used to determine the content type, so media type is unnecessary
json, err := utils.ConvertToJSON(data, filepath.Ext(fs.URI), "")
if err != nil {
return "", fmt.Errorf("convert yaml to json: %w", err)
return "", fmt.Errorf("error converting file content to json: %w", err)
}

return string(r), err
return json, nil
}
34 changes: 3 additions & 31 deletions core/pkg/sync/file/filepath_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func TestSimpleSync(t *testing.T) {

func TestFilePathSync_Fetch(t *testing.T) {
successDirName := t.TempDir()
falureDirName := t.TempDir()
failureDirName := t.TempDir()
tests := map[string]struct {
fpSync Sync
handleResponse func(t *testing.T, fetched string, err error)
Expand All @@ -213,9 +213,9 @@ func TestFilePathSync_Fetch(t *testing.T) {
},
},
"not found": {
fetchDirName: falureDirName,
fetchDirName: failureDirName,
fpSync: Sync{
URI: fmt.Sprintf("%s/%s", falureDirName, "not_found"),
URI: fmt.Sprintf("%s/%s", failureDirName, "not_found"),
Logger: logger.NewLogger(nil, false),
},
handleResponse: func(t *testing.T, fetched string, err error) {
Expand Down Expand Up @@ -309,31 +309,3 @@ func writeToFile(t *testing.T, fetchDirName, fileContents string) {
t.Fatal(err)
}
}

func TestFilePathSync_yamlToJSON(t *testing.T) {
tests := map[string]struct {
input []byte
handleResponse func(t *testing.T, output string, err error)
}{
"empty": {
input: []byte(""),
handleResponse: func(t *testing.T, output string, err error) {
if err != nil {
t.Fatalf("expect no err, got err = %v", err)
}

if output != "" {
t.Fatalf("expect output = '', got output = '%v'", output)
}
},
},
}

for name, tt := range tests {
t.Run(name, func(t *testing.T) {
output, err := yamlToJSON(tt.input)

tt.handleResponse(t, output, err)
})
}
}
Loading

0 comments on commit 76d673a

Please sign in to comment.