Skip to content

Commit

Permalink
Fix to pass CRI test
Browse files Browse the repository at this point in the history
- Add "listen_path" field to the config to specify a custom socket path for the
  CRI image service. Especially fuse manager needs this to locate the image
  service socket.
- Add JSON tag to configuration fields. This is needed to send configuration to
  fuse manager in JSON format.

Signed-off-by: Kohei Tokunaga <[email protected]>
  • Loading branch information
ktock committed Jan 7, 2025
1 parent 3cdf294 commit 2fe7581
Show file tree
Hide file tree
Showing 11 changed files with 173 additions and 80 deletions.
51 changes: 40 additions & 11 deletions cmd/containerd-stargz-grpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,33 +70,33 @@ type snapshotterConfig struct {
service.Config

// MetricsAddress is address for the metrics API
MetricsAddress string `toml:"metrics_address"`
MetricsAddress string `toml:"metrics_address" json:"metrics_address"`

// NoPrometheus is a flag to disable the emission of the metrics
NoPrometheus bool `toml:"no_prometheus"`
NoPrometheus bool `toml:"no_prometheus" json:"no_prometheus"`

// DebugAddress is a Unix domain socket address where the snapshotter exposes /debug/ endpoints.
DebugAddress string `toml:"debug_address"`
DebugAddress string `toml:"debug_address" json:"debug_address"`

// IPFS is a flag to enbale lazy pulling from IPFS.
IPFS bool `toml:"ipfs"`
IPFS bool `toml:"ipfs" json:"ipfs"`

// MetadataStore is the type of the metadata store to use.
MetadataStore string `toml:"metadata_store" default:"memory"`
MetadataStore string `toml:"metadata_store" default:"memory" json:"metadata_store"`

// FuseManagerConfig is configuration for fusemanager
FuseManagerConfig `toml:"fusemanager"`
FuseManagerConfig `toml:"fusemanager" json:"fusemanager"`
}

type FuseManagerConfig struct {
// EnableFuseManager is whether detach fusemanager or not
EnableFuseManager bool `toml:"enable_fusemanager" default:"false"`
EnableFuseManager bool `toml:"enable_fusemanager" default:"false" json:"enable_fusemanager"`

// FuseManagerAddress is address for the fusemanager's GRPC server (default: "/run/containerd-stargz-grpc/fuse-manager.sock")
FuseManagerAddress string `toml:"fusemanager_address"`
FuseManagerAddress string `toml:"fusemanager_address" json:"fusemanager_address"`

// FuseManagerPath is path to the fusemanager's executable (default: looking for a binary "stargz-fuse-manager")
FuseManagerPath string `toml:"fusemanager_path"`
FuseManagerPath string `toml:"fusemanager_path" json:"fusemanager_path"`
}

func main() {
Expand Down Expand Up @@ -177,7 +177,7 @@ func main() {
}

fuseManagerConfig := fusemanager.Config{
Config: &config.Config,
Config: config.Config,
IPFS: config.IPFS,
MetadataStore: config.MetadataStore,
DefaultImageServiceAddress: defaultImageServiceAddress,
Expand All @@ -193,10 +193,39 @@ func main() {
}
log.G(ctx).Infof("Start snapshotter with fusemanager mode")
} else {
credsFuncs, err := keychainconfig.ConfigKeychain(ctx, rpc, &keyChainConfig)
crirpc := rpc
// For CRI keychain, if listening path is different from stargz-snapshotter's socket, prepare for the dedicated grpc server and the socket.
serveCRISocket := config.Config.CRIKeychainConfig.EnableKeychain && config.Config.CRIKeychainConfig.ListenPath != "" && config.Config.CRIKeychainConfig.ListenPath != *address
if serveCRISocket {
crirpc = grpc.NewServer()
}
credsFuncs, err := keychainconfig.ConfigKeychain(ctx, crirpc, &keyChainConfig)
if err != nil {
log.G(ctx).WithError(err).Fatalf("failed to configure keychain")
}
if serveCRISocket {
addr := config.Config.CRIKeychainConfig.ListenPath
// Prepare the directory for the socket
if err := os.MkdirAll(filepath.Dir(addr), 0700); err != nil {
log.G(ctx).WithError(err).Fatalf("failed to create directory %q", filepath.Dir(addr))
}

// Try to remove the socket file to avoid EADDRINUSE
if err := os.RemoveAll(addr); err != nil {
log.G(ctx).WithError(err).Fatalf("failed to remove %q", addr)
}

// Listen and serve
l, err := net.Listen("unix", addr)
if err != nil {
log.G(ctx).WithError(err).Fatalf("error on listen socket %q", addr)
}
go func() {
if err := crirpc.Serve(l); err != nil {
log.G(ctx).WithError(err).Errorf("error on serving CRI via socket %q", addr)
}
}()
}

fsConfig := fsopts.Config{
EnableIpfs: config.IPFS,
Expand Down
41 changes: 40 additions & 1 deletion cmd/stargz-fuse-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,18 @@
package main

import (
"fmt"
"net"
"os"
"path/filepath"

"github.com/containerd/log"

"github.com/containerd/stargz-snapshotter/cmd/containerd-stargz-grpc/fsopts"
fusemanager "github.com/containerd/stargz-snapshotter/fusemanager"
"github.com/containerd/stargz-snapshotter/service"
"github.com/containerd/stargz-snapshotter/service/keychain/keychainconfig"
"google.golang.org/grpc"
)

func init() {
Expand All @@ -44,10 +52,41 @@ func init() {
DefaultImageServiceAddress: cc.Config.DefaultImageServiceAddress,
ImageServicePath: cc.Config.Config.CRIKeychainConfig.ImageServicePath,
}
credsFuncs, err := keychainconfig.ConfigKeychain(cc.Ctx, cc.Server, &keyChainConfig)
if cc.Config.Config.CRIKeychainConfig.EnableKeychain && cc.Config.Config.CRIKeychainConfig.ListenPath == "" || cc.Config.Config.CRIKeychainConfig.ListenPath == cc.Address {
return nil, fmt.Errorf("listen path must be specified as a separated socket")
}
// For CRI keychain, if listening path is different from stargz-snapshotter's socket, prepare for the dedicated grpc server and the socket.
serveCRISocket := cc.Config.Config.CRIKeychainConfig.EnableKeychain && cc.Config.Config.CRIKeychainConfig.ListenPath != "" && cc.Config.Config.CRIKeychainConfig.ListenPath != cc.Address
if serveCRISocket {
cc.CRIServer = grpc.NewServer()
}
credsFuncs, err := keychainconfig.ConfigKeychain(cc.Ctx, cc.CRIServer, &keyChainConfig)
if err != nil {
return nil, err
}
if serveCRISocket {
addr := cc.Config.Config.CRIKeychainConfig.ListenPath
// Prepare the directory for the socket
if err := os.MkdirAll(filepath.Dir(addr), 0700); err != nil {
return nil, fmt.Errorf("failed to create directory %q: %w", filepath.Dir(addr), err)
}

// Try to remove the socket file to avoid EADDRINUSE
if err := os.RemoveAll(addr); err != nil {
return nil, fmt.Errorf("failed to remove %q: %w", addr, err)
}

// Listen and serve
l, err := net.Listen("unix", addr)
if err != nil {
return nil, fmt.Errorf("error on listen socket %q: %w", addr, err)
}
go func() {
if err := cc.CRIServer.Serve(l); err != nil {
log.G(cc.Ctx).WithError(err).Errorf("error on serving CRI via socket %q", addr)
}
}()
}
return []service.Option{service.WithCredsFuncs(credsFuncs...)}, nil
})
}
Expand Down
64 changes: 32 additions & 32 deletions fs/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,118 +37,118 @@ const (
type Config struct {
// Type of cache for compressed contents fetched from the registry. "memory" stores them on memory.
// Other values default to cache them on disk.
HTTPCacheType string `toml:"http_cache_type"`
HTTPCacheType string `toml:"http_cache_type" json:"http_cache_type"`

// Type of cache for uncompressed files contents. "memory" stores them on memory. Other values
// default to cache them on disk.
FSCacheType string `toml:"filesystem_cache_type"`
FSCacheType string `toml:"filesystem_cache_type" json:"filesystem_cache_type"`

// ResolveResultEntryTTLSec is TTL (in sec) to cache resolved layers for
// future use. (default 120s)
ResolveResultEntryTTLSec int `toml:"resolve_result_entry_ttl_sec"`
ResolveResultEntryTTLSec int `toml:"resolve_result_entry_ttl_sec" json:"resolve_result_entry_ttl_sec"`

// PrefetchSize is the default size (in bytes) to prefetch when mounting a layer. Default is 0. Stargz-snapshotter still
// uses the value specified by the image using "containerd.io/snapshot/remote/stargz.prefetch" or the landmark file.
PrefetchSize int64 `toml:"prefetch_size"`
PrefetchSize int64 `toml:"prefetch_size" json:"prefetch_size"`

// PrefetchTimeoutSec is the default timeout (in seconds) when the prefetching takes long. Default is 10s.
PrefetchTimeoutSec int64 `toml:"prefetch_timeout_sec"`
PrefetchTimeoutSec int64 `toml:"prefetch_timeout_sec" json:"prefetch_timeout_sec"`

// NoPrefetch disables prefetching. Default is false.
NoPrefetch bool `toml:"noprefetch"`
NoPrefetch bool `toml:"noprefetch" json:"noprefetch"`

// NoBackgroundFetch disables the behaviour of fetching the entire layer contents in background. Default is false.
NoBackgroundFetch bool `toml:"no_background_fetch"`
NoBackgroundFetch bool `toml:"no_background_fetch" json:"no_background_fetch"`

// Debug enables filesystem debug log.
Debug bool `toml:"debug"`
Debug bool `toml:"debug" json:"debug"`

// AllowNoVerification allows mouting images without verification. Default is false.
AllowNoVerification bool `toml:"allow_no_verification"`
AllowNoVerification bool `toml:"allow_no_verification" json:"allow_no_verification"`

// DisableVerification disables verifying layer contents. Default is false.
DisableVerification bool `toml:"disable_verification"`
DisableVerification bool `toml:"disable_verification" json:"disable_verification"`

// MaxConcurrency is max number of concurrent background tasks for fetching layer contents. Default is 2.
MaxConcurrency int64 `toml:"max_concurrency"`
MaxConcurrency int64 `toml:"max_concurrency" json:"max_concurrency"`

// NoPrometheus disables exposing filesystem-related metrics. Default is false.
NoPrometheus bool `toml:"no_prometheus"`
NoPrometheus bool `toml:"no_prometheus" json:"no_prometheus"`

// BlobConfig is config for layer blob management.
BlobConfig `toml:"blob"`
BlobConfig `toml:"blob" json:"blob"`

// DirectoryCacheConfig is config for directory-based cache.
DirectoryCacheConfig `toml:"directory_cache"`
DirectoryCacheConfig `toml:"directory_cache" json:"directory_cache"`

// FuseConfig is configurations for FUSE fs.
FuseConfig `toml:"fuse"`
FuseConfig `toml:"fuse" json:"fuse"`

// ResolveResultEntry is a deprecated field.
ResolveResultEntry int `toml:"resolve_result_entry"` // deprecated
ResolveResultEntry int `toml:"resolve_result_entry" json:"resolve_result_entry"` // deprecated
}

// BlobConfig is configuration for the logic to fetching blobs.
type BlobConfig struct {
// ValidInterval specifies a duration (in seconds) during which the layer can be reused without
// checking the connection to the registry. Default is 60.
ValidInterval int64 `toml:"valid_interval"`
ValidInterval int64 `toml:"valid_interval" json:"valid_interval"`

// CheckAlways overwrites ValidInterval to 0 if it's true. Default is false.
CheckAlways bool `toml:"check_always"`
CheckAlways bool `toml:"check_always" json:"check_always"`

// ChunkSize is the granularity (in bytes) at which background fetch and on-demand reads
// are fetched from the remote registry. Default is 50000.
ChunkSize int64 `toml:"chunk_size"`
ChunkSize int64 `toml:"chunk_size" json:"chunk_size"`

// FetchTimeoutSec is a timeout duration (in seconds) for fetching chunks from the registry. Default is 300.
FetchTimeoutSec int64 `toml:"fetching_timeout_sec"`
FetchTimeoutSec int64 `toml:"fetching_timeout_sec" json:"fetching_tieout_sec"`

// ForceSingleRangeMode disables using of multiple ranges in a Range Request and always specifies one larger
// region that covers them. Default is false.
ForceSingleRangeMode bool `toml:"force_single_range_mode"`
ForceSingleRangeMode bool `toml:"force_single_range_mode" json:"force_single_range_mode"`

// PrefetchChunkSize is the maximum bytes transferred per http GET from remote registry
// during prefetch. It is recommended to have PrefetchChunkSize > ChunkSize.
// If PrefetchChunkSize < ChunkSize prefetch bytes will be fetched as a single http GET,
// else total GET requests for prefetch = ceil(PrefetchSize / PrefetchChunkSize).
// Default is 0.
PrefetchChunkSize int64 `toml:"prefetch_chunk_size"`
PrefetchChunkSize int64 `toml:"prefetch_chunk_size" json:"prefetch_chunk_size"`

// MaxRetries is a max number of reries of a HTTP request. Default is 5.
MaxRetries int `toml:"max_retries"`
MaxRetries int `toml:"max_retries" json:"max_retries"`

// MinWaitMSec is minimal delay (in seconds) for the next retrying after a request failure. Default is 30.
MinWaitMSec int `toml:"min_wait_msec"`
MinWaitMSec int `toml:"min_wait_msec" json:"min_wait_msec"`

// MinWaitMSec is maximum delay (in seconds) for the next retrying after a request failure. Default is 30.
MaxWaitMSec int `toml:"max_wait_msec"`
MaxWaitMSec int `toml:"max_wait_msec" json:"max_wait_msec"`
}

// DirectoryCacheConfig is configuration for the disk-based cache.
type DirectoryCacheConfig struct {
// MaxLRUCacheEntry is the number of entries of LRU cache to cache data on memory. Default is 10.
MaxLRUCacheEntry int `toml:"max_lru_cache_entry"`
MaxLRUCacheEntry int `toml:"max_lru_cache_entry" json:"max_lru_cache_entry"`

// MaxCacheFds is the number of entries of LRU cache to hold fds of files of cached contents. Default is 10.
MaxCacheFds int `toml:"max_cache_fds"`
MaxCacheFds int `toml:"max_cache_fds" json:"max_cache_fds"`

// SyncAdd being true means that each adding of data to the cache blocks until the data is fully written to the
// cache directory. Default is false.
SyncAdd bool `toml:"sync_add"`
SyncAdd bool `toml:"sync_add" json:"sync_add"`

// Direct disables on-memory data cache. Default is true for saving memory usage.
Direct bool `toml:"direct" default:"true"`
Direct bool `toml:"direct" default:"true" json:"direct"`
}

// FuseConfig is configuration for FUSE fs.
type FuseConfig struct {
// AttrTimeout defines overall timeout attribute for a file system in seconds.
AttrTimeout int64 `toml:"attr_timeout"`
AttrTimeout int64 `toml:"attr_timeout" json:"attr_timeout"`

// EntryTimeout defines TTL for directory, name lookup in seconds.
EntryTimeout int64 `toml:"entry_timeout"`
EntryTimeout int64 `toml:"entry_timeout" json:"entry_timeout"`

// PassThrough indicates whether to enable FUSE passthrough mode to improve local file read performance. Default is false.
PassThrough bool `toml:"passthrough" default:"false"`
PassThrough bool `toml:"passthrough" default:"false" json:"passthrough"`
}
2 changes: 1 addition & 1 deletion fusemanager/fusemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func runFuseManager(ctx context.Context) error {
}

server := grpc.NewServer()
fm, err := NewFuseManager(ctx, l, server, fuseStoreAddr)
fm, err := NewFuseManager(ctx, l, server, fuseStoreAddr, address)
if err != nil {
return fmt.Errorf("failed to configure manager server: %w", err)
}
Expand Down
9 changes: 5 additions & 4 deletions fusemanager/fusemanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ type mockServer struct {
initErr error
}

func newMockServer(ctx context.Context, listener net.Listener, server *grpc.Server, fuseStoreAddr string) (*mockServer, error) {
s, err := NewFuseManager(ctx, listener, server, fuseStoreAddr)
func newMockServer(ctx context.Context, listener net.Listener, server *grpc.Server, fuseStoreAddr, serverAddr string) (*mockServer, error) {
s, err := NewFuseManager(ctx, listener, server, fuseStoreAddr, serverAddr)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -121,6 +121,7 @@ func TestFuseManager(t *testing.T) {

socketPath := filepath.Join(tmpDir, "test.sock")
fuseStorePath := filepath.Join(tmpDir, "fusestore.db")
fuseManagerSocketPath := filepath.Join(tmpDir, "test-fusemanager.sock")

l, err := net.Listen("unix", socketPath)
if err != nil {
Expand All @@ -131,7 +132,7 @@ func TestFuseManager(t *testing.T) {
// Create server with mock
grpcServer := grpc.NewServer()
mockFs := newMockFileSystem(t)
fm, err := newMockServer(context.Background(), l, grpcServer, fuseStorePath)
fm, err := newMockServer(context.Background(), l, grpcServer, fuseStorePath, fuseManagerSocketPath)
if err != nil {
t.Fatalf("failed to create fuse manager: %v", err)
}
Expand Down Expand Up @@ -187,7 +188,7 @@ func TestFuseManager(t *testing.T) {
fm.initCalled = false

config := &Config{
Config: &service.Config{},
Config: service.Config{},
}
client, err := NewManagerClient(context.Background(), tmpDir, socketPath, config)
if err != nil {
Expand Down
Loading

0 comments on commit 2fe7581

Please sign in to comment.