diff --git a/backup.go b/backup.go
new file mode 100644
index 0000000..bfd2eee
--- /dev/null
+++ b/backup.go
@@ -0,0 +1,80 @@
+package pogreb
+
+import (
+	"io"
+	"os"
+
+	"github.com/akrylysov/pogreb/fs"
+)
+
+func touchFile(fsys fs.FileSystem, path string) error {
+	f, err := fsys.OpenFile(path, os.O_CREATE|os.O_TRUNC, os.FileMode(0640))
+	if err != nil {
+		return err
+	}
+	return f.Close()
+}
+
+// Backup creates a database backup at the specified path.
+func (db *DB) Backup(path string) error {
+	// Make sure the compaction is not running during backup.
+	db.maintenanceMu.Lock()
+	defer db.maintenanceMu.Unlock()
+
+	if err := db.opts.rootFS.MkdirAll(path, 0755); err != nil {
+		return err
+	}
+
+	db.mu.RLock()
+	var segments []*segment
+	activeSegmentSizes := make(map[uint16]int64)
+	for _, seg := range db.datalog.segmentsBySequenceID() {
+		segments = append(segments, seg)
+		if !seg.meta.Full {
+			// Save the size of the active segments to copy only the data persisted
+			// up to the point when the backup started.
+			activeSegmentSizes[seg.id] = seg.size
+		}
+	}
+	db.mu.RUnlock()
+
+	srcFS := db.opts.FileSystem
+	dstFS := fs.Sub(db.opts.rootFS, path)
+
+	for _, seg := range segments {
+		name := segmentName(seg.id, seg.sequenceID)
+		mode := os.FileMode(0640)
+		srcFile, err := srcFS.OpenFile(name, os.O_RDONLY, mode)
+		if err != nil {
+			return err
+		}
+
+		dstFile, err := dstFS.OpenFile(name, os.O_CREATE|os.O_RDWR|os.O_TRUNC, mode)
+		if err != nil {
+			return err
+		}
+
+		if srcSize, ok := activeSegmentSizes[seg.id]; ok {
+			if _, err := io.CopyN(dstFile, srcFile, srcSize); err != nil {
+				return err
+			}
+		} else {
+			if _, err := io.Copy(dstFile, srcFile); err != nil {
+				return err
+			}
+		}
+
+		if err := srcFile.Close(); err != nil {
+			return err
+		}
+		if err := dstFile.Close(); err != nil {
+			return err
+		}
+	}
+
+	if err := touchFile(dstFS, lockName); err != nil {
+		return err
+	}
+
+	return nil
+}
diff --git a/backup_test.go b/backup_test.go
new file mode 100644
index 0000000..56f703b
--- /dev/null
+++ b/backup_test.go
@@ -0,0 +1,63 @@
+package pogreb
+
+import (
+	"testing"
+
+	"github.com/akrylysov/pogreb/internal/assert"
+)
+
+const testDBBackupName = testDBName + ".backup"
+
+func TestBackup(t *testing.T) {
+	opts := &Options{
+		maxSegmentSize:             1024,
+		compactionMinSegmentSize:   520,
+		compactionMinFragmentation: 0.02,
+	}
+
+	run := func(name string, f func(t *testing.T, db *DB)) bool {
+		return t.Run(name, func(t *testing.T) {
+			db, err := createTestDB(opts)
+			assert.Nil(t, err)
+			f(t, db)
+			assert.Nil(t, db.Close())
+			_ = cleanDir(testDBBackupName)
+		})
+	}
+
+	run("empty", func(t *testing.T, db *DB) {
+		assert.Nil(t, db.Backup(testDBBackupName))
+		db2, err := Open(testDBBackupName, opts)
+		assert.Nil(t, err)
+		assert.Nil(t, db2.Close())
+	})
+
+	run("single segment", func(t *testing.T, db *DB) {
+		assert.Nil(t, db.Put([]byte{0}, []byte{0}))
+		assert.Equal(t, 1, countSegments(t, db))
+		assert.Nil(t, db.Backup(testDBBackupName))
+		db2, err := Open(testDBBackupName, opts)
+		assert.Nil(t, err)
+		v, err := db2.Get([]byte{0})
+		assert.Equal(t, []byte{0}, v)
+		assert.Nil(t, err)
+		assert.Nil(t, db2.Close())
+	})
+
+	run("multiple segments", func(t *testing.T, db *DB) {
+		for i := byte(0); i < 100; i++ {
+			assert.Nil(t, db.Put([]byte{i}, []byte{i}))
+		}
+		assert.Equal(t, 3, countSegments(t, db))
+		assert.Nil(t, db.Backup(testDBBackupName))
+		db2, err := Open(testDBBackupName, opts)
+		assert.Equal(t, 3, countSegments(t, db2))
+		assert.Nil(t, err)
+		for i := byte(0); i < 100; i++ {
+			v, err := db2.Get([]byte{i})
+			assert.Nil(t, err)
+			assert.Equal(t, []byte{i}, v)
+		}
+		assert.Nil(t, db2.Close())
+	})
+}
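Reviewer note: a minimal usage sketch of the new API, assuming an on-disk database at an illustrative path (the paths and error handling below are examples, not part of this change). As the diff shows, Backup holds only maintenanceMu for its duration and takes db.mu.RLock just long enough to snapshot the segment list, so it excludes Compact but not regular reads and writes.

```go
package main

import (
	"log"

	"github.com/akrylysov/pogreb"
)

func main() {
	// Hypothetical paths, for illustration only.
	db, err := pogreb.Open("example.pogreb", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := db.Put([]byte("key"), []byte("value")); err != nil {
		log.Fatal(err)
	}

	// Backup cannot overlap with Compact (both take maintenanceMu),
	// but reads and writes can proceed while segments are copied.
	if err := db.Backup("example.pogreb.backup"); err != nil {
		log.Fatal(err)
	}

	// The backup is a regular database directory and can be opened directly.
	restored, err := pogreb.Open("example.pogreb.backup", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer restored.Close()
}
```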
diff --git a/compaction.go b/compaction.go
index 5823ef3..dddd5e0 100644
--- a/compaction.go
+++ b/compaction.go
@@ -1,8 +1,6 @@
 package pogreb
 
 import (
-	"sync/atomic"
-
 	"github.com/akrylysov/pogreb/internal/errors"
 )
 
@@ -135,11 +133,11 @@ func (db *DB) Compact() (CompactionResult, error) {
 	cr := CompactionResult{}
 
 	// Run only a single compaction at a time.
-	if !atomic.CompareAndSwapInt32(&db.compactionRunning, 0, 1) {
+	if !db.maintenanceMu.TryLock() {
 		return cr, errBusy
 	}
 	defer func() {
-		atomic.StoreInt32(&db.compactionRunning, 0)
+		db.maintenanceMu.Unlock()
 	}()
 
 	db.mu.RLock()
diff --git a/compaction_test.go b/compaction_test.go
index 3a91d48..5d12f89 100644
--- a/compaction_test.go
+++ b/compaction_test.go
@@ -3,6 +3,7 @@ package pogreb
 import (
 	"os"
 	"path/filepath"
+	"runtime"
 	"sync"
 	"sync/atomic"
 	"testing"
@@ -224,15 +225,20 @@ func TestCompaction(t *testing.T) {
 		wg := sync.WaitGroup{}
 		wg.Add(1)
 		db.mu.Lock()
+		var goroutineRunning int32
 		go func() {
 			// The compaction is blocked until we unlock the mutex.
 			defer wg.Done()
+			atomic.StoreInt32(&goroutineRunning, 1)
 			_, err := db.Compact()
 			assert.Nil(t, err)
 		}()
-		// Make sure the compaction is running.
+		// Make sure the compaction goroutine is running.
+		// It's unlikely, but still possible, that the goroutine hasn't reached the
+		// maintenance lock yet, which would make the test flaky.
+		runtime.Gosched()
 		assert.CompleteWithin(t, time.Minute, func() bool {
-			return atomic.LoadInt32(&db.compactionRunning) == 1
+			return atomic.LoadInt32(&goroutineRunning) == 1
 		})
 		_, err := db.Compact()
 		assert.Equal(t, errBusy, err)
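A standalone sketch of the locking pattern this change introduces (the store type below is a stand-in for DB, not the package's real type): Compact gives up immediately when another maintenance task holds maintenanceMu, while Backup blocks until the lock is free. Note that sync.Mutex.TryLock requires Go 1.18 or newer.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errBusy mirrors the package's sentinel error; the message here is illustrative.
var errBusy = errors.New("busy")

// store is a stand-in for DB, reduced to the maintenance lock.
type store struct {
	maintenanceMu sync.Mutex // held by at most one maintenance task at a time
}

func (s *store) Compact() error {
	// Give up immediately if another maintenance task is running.
	if !s.maintenanceMu.TryLock() {
		return errBusy
	}
	defer s.maintenanceMu.Unlock()
	// ... compaction work ...
	return nil
}

func (s *store) Backup() error {
	// Wait for a running compaction to finish instead of failing.
	s.maintenanceMu.Lock()
	defer s.maintenanceMu.Unlock()
	// ... backup work ...
	return nil
}

func main() {
	s := &store{}
	s.maintenanceMu.Lock()   // simulate a maintenance task in progress
	fmt.Println(s.Compact()) // busy
	s.maintenanceMu.Unlock()
	fmt.Println(s.Compact()) // <nil>
	fmt.Println(s.Backup())  // <nil>
}
```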
diff --git a/db.go b/db.go
index 03a9c74..2ca9eb6 100644
--- a/db.go
+++ b/db.go
@@ -30,17 +30,17 @@ const (
 // DB represents the key-value storage.
 // All DB methods are safe for concurrent use by multiple goroutines.
 type DB struct {
-	mu                sync.RWMutex // Allows multiple database readers or a single writer.
-	opts              *Options
-	index             *index
-	datalog           *datalog
-	lock              fs.LockFile // Prevents opening multiple instances of the same database.
-	hashSeed          uint32
-	metrics           *Metrics
-	syncWrites        bool
-	cancelBgWorker    context.CancelFunc
-	closeWg           sync.WaitGroup
-	compactionRunning int32 // Prevents running compactions concurrently.
+	mu             sync.RWMutex // Allows multiple database readers or a single writer.
+	opts           *Options
+	index          *index
+	datalog        *datalog
+	lock           fs.LockFile // Prevents opening multiple instances of the same database.
+	hashSeed       uint32
+	metrics        *Metrics
+	syncWrites     bool
+	cancelBgWorker context.CancelFunc
+	closeWg        sync.WaitGroup
+	maintenanceMu  sync.Mutex // Ensures there is only one maintenance task running at a time.
 }
 
 type dbMeta struct {
@@ -52,7 +52,7 @@ type dbMeta struct {
 func Open(path string, opts *Options) (*DB, error) {
 	opts = opts.copyWithDefaults(path)
 
-	if err := os.MkdirAll(path, 0755); err != nil {
+	if err := opts.rootFS.MkdirAll(path, 0755); err != nil {
 		return nil, err
 	}
 
diff --git a/db_test.go b/db_test.go
index bd29b84..831be4e 100644
--- a/db_test.go
+++ b/db_test.go
@@ -41,19 +41,12 @@ func TestMain(m *testing.M) {
 			fmt.Printf("DEBUG\tFS=%T\n", fsys)
 			os.Exit(exitCode)
 		}
+		_ = cleanDir(testDBName)
+		_ = cleanDir(testDBBackupName)
 	}
-	_ = cleanDir(testDBName)
 	os.Exit(0)
 }
 
-func touchFile(fsys fs.FileSystem, path string) error {
-	f, err := fsys.OpenFile(path, os.O_CREATE|os.O_TRUNC, os.FileMode(0640))
-	if err != nil {
-		return err
-	}
-	return f.Close()
-}
-
 func appendFile(path string, data []byte) error {
 	f, err := testFS.OpenFile(path, os.O_RDWR, os.FileMode(0640))
 	if err != nil {
@@ -93,7 +86,7 @@ func cleanDir(path string) error {
 		return err
 	}
 	for _, file := range files {
-		_ = testFS.Remove(filepath.Join(testDBName, file.Name()))
+		_ = testFS.Remove(filepath.Join(path, file.Name()))
 	}
 	return nil
 }
diff --git a/file_test.go b/file_test.go
index 41ce6e1..1009e5e 100644
--- a/file_test.go
+++ b/file_test.go
@@ -34,6 +34,10 @@ func (fs *errfs) ReadDir(name string) ([]os.DirEntry, error) {
 	return nil, errfileError
 }
 
+func (fs *errfs) MkdirAll(path string, perm os.FileMode) error {
+	return errfileError
+}
+
 type errfile struct{}
 
 var errfileError = errors.New("errfile error")
diff --git a/fs/fs.go b/fs/fs.go
index 5c5153f..9c62cbe 100644
--- a/fs/fs.go
+++ b/fs/fs.go
@@ -14,6 +14,7 @@ var (
 )
 
 // File is the interface compatible with os.File.
+// All methods except ReadAt, Slice and Stat are not thread-safe.
 type File interface {
 	io.Closer
 	io.Reader
@@ -60,4 +61,7 @@ type FileSystem interface {
 
 	// CreateLockFile creates a lock file.
 	CreateLockFile(name string, perm os.FileMode) (LockFile, bool, error)
+
+	// MkdirAll creates a directory named path.
+	MkdirAll(path string, perm os.FileMode) error
 }
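Since FileSystem is an exported interface, adding MkdirAll is a breaking change for any implementation outside this repository. A hypothetical wrapper (loggingFS below is illustrative, not part of this change) can stay compatible by embedding an existing implementation and overriding only the new method:

```go
package main

import (
	"log"
	"os"

	"github.com/akrylysov/pogreb/fs"
)

// loggingFS is a hypothetical custom FileSystem: embedding fs.FileSystem
// satisfies the rest of the interface, and only MkdirAll is wrapped.
type loggingFS struct {
	fs.FileSystem
}

func (l *loggingFS) MkdirAll(path string, perm os.FileMode) error {
	log.Printf("MkdirAll %q %v", path, perm)
	return l.FileSystem.MkdirAll(path, perm)
}

func main() {
	var fsys fs.FileSystem = &loggingFS{FileSystem: fs.Mem}
	if err := fsys.MkdirAll("backups", 0755); err != nil {
		log.Fatal(err)
	}
}
```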
diff --git a/fs/mem.go b/fs/mem.go
index 9ce08f0..6306900 100644
--- a/fs/mem.go
+++ b/fs/mem.go
@@ -12,6 +12,7 @@ type memFS struct {
 }
 
 // Mem is a file system backed by memory.
+// It should be used for testing only.
var Mem FileSystem = &memFS{files: map[string]*memFile{}}
 
 func (fs *memFS) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
@@ -21,24 +22,33 @@ func (fs *memFS) OpenFile(name string, flag int, perm os.FileMode) (File, error)
 		return nil, errAppendModeNotSupported
 	}
 	f := fs.files[name]
-	if f == nil || (flag&os.O_TRUNC) != 0 {
+	if f == nil {
+		// The file doesn't exist.
+		if (flag & os.O_CREATE) == 0 {
+			return nil, os.ErrNotExist
+		}
 		f = &memFile{
 			name: name,
 			perm: perm, // Perm is saved to return it in Mode; it isn't used for anything else yet.
+			refs: 1,
 		}
 		fs.files[name] = f
-	} else if !f.closed {
-		return nil, os.ErrExist
 	} else {
-		f.offset = 0
-		f.closed = false
+		if (flag & os.O_TRUNC) != 0 {
+			f.size = 0
+			f.buf = nil
+		}
+		f.refs += 1
 	}
-	return f, nil
+	return &seekableMemFile{memFile: f}, nil
 }
 
 func (fs *memFS) CreateLockFile(name string, perm os.FileMode) (LockFile, bool, error) {
-	_, exists := fs.files[name]
-	_, err := fs.OpenFile(name, 0, perm)
+	f, exists := fs.files[name]
+	if f != nil && f.refs > 0 {
+		return nil, false, os.ErrExist
+	}
+	_, err := fs.OpenFile(name, os.O_CREATE, perm)
 	if err != nil {
 		return nil, false, err
 	}
@@ -81,20 +91,25 @@ func (fs *memFS) ReadDir(dir string) ([]os.DirEntry, error) {
 	return entries, nil
 }
 
+func (fs *memFS) MkdirAll(path string, perm os.FileMode) error {
+	// FIXME: the implementation is incomplete.
+	// memFS allows creating a file even when the parent directory doesn't exist.
+	return nil
+}
+
 type memFile struct {
-	name   string
-	perm   os.FileMode
-	buf    []byte
-	size   int64
-	offset int64
-	closed bool
+	name string
+	perm os.FileMode
+	buf  []byte
+	size int64
+	refs int
 }
 
 func (f *memFile) Close() error {
-	if f.closed {
+	if f.refs == 0 {
 		return os.ErrClosed
 	}
-	f.closed = true
+	f.refs -= 1
 	return nil
 }
 
@@ -106,7 +121,7 @@ func (f *memFile) Unlock() error {
 }
 
 func (f *memFile) ReadAt(p []byte, off int64) (int, error) {
-	if f.closed {
+	if f.refs == 0 {
 		return 0, os.ErrClosed
 	}
 	if off >= f.size {
@@ -121,17 +136,8 @@ func (f *memFile) ReadAt(p []byte, off int64) (int, error) {
 	return int(n), nil
 }
 
-func (f *memFile) Read(p []byte) (int, error) {
-	n, err := f.ReadAt(p, f.offset)
-	if err != nil {
-		return n, err
-	}
-	f.offset += int64(n)
-	return n, err
-}
-
 func (f *memFile) WriteAt(p []byte, off int64) (int, error) {
-	if f.closed {
+	if f.refs == 0 {
 		return 0, os.ErrClosed
 	}
 	n := int64(len(p))
@@ -142,39 +148,15 @@ func (f *memFile) WriteAt(p []byte, off int64) (int, error) {
 	return int(n), nil
 }
 
-func (f *memFile) Write(p []byte) (int, error) {
-	n, err := f.WriteAt(p, f.offset)
-	if err != nil {
-		return n, err
-	}
-	f.offset += int64(n)
-	return n, err
-}
-
-func (f *memFile) Seek(offset int64, whence int) (int64, error) {
-	if f.closed {
-		return 0, os.ErrClosed
-	}
-	switch whence {
-	case io.SeekEnd:
-		f.offset = f.size + offset
-	case io.SeekStart:
-		f.offset = offset
-	case io.SeekCurrent:
-		f.offset += offset
-	}
-	return f.offset, nil
-}
-
 func (f *memFile) Stat() (os.FileInfo, error) {
-	if f.closed {
+	if f.refs == 0 {
 		return f, os.ErrClosed
 	}
 	return f, nil
 }
 
 func (f *memFile) Sync() error {
-	if f.closed {
+	if f.refs == 0 {
 		return os.ErrClosed
 	}
 	return nil
@@ -191,7 +173,7 @@ func (f *memFile) truncate(size int64) {
 }
 
 func (f *memFile) Truncate(size int64) error {
-	if f.closed {
+	if f.refs == 0 {
 		return os.ErrClosed
 	}
 	f.truncate(size)
@@ -232,7 +214,7 @@ func (f *memFile) Info() (os.FileInfo, error) {
 }
 
 func (f *memFile) Slice(start int64, end int64) ([]byte, error) {
-	if f.closed {
+	if f.refs == 0 {
 		return nil, os.ErrClosed
 	}
 	if end > f.size {
@@ -240,3 +222,41 @@
 	}
 	return f.buf[start:end], nil
 }
+
+type seekableMemFile struct {
+	*memFile
+	offset int64
+}
+
+func (f *seekableMemFile) Read(p []byte) (int, error) {
+	n, err := f.ReadAt(p, f.offset)
+	if err != nil {
+		return n, err
+	}
+	f.offset += int64(n)
+	return n, err
+}
+
+func (f *seekableMemFile) Write(p []byte) (int, error) {
+	n, err := f.WriteAt(p, f.offset)
+	if err != nil {
+		return n, err
+	}
+	f.offset += int64(n)
+	return n, err
+}
+
+func (f *seekableMemFile) Seek(offset int64, whence int) (int64, error) {
+	if f.refs == 0 {
+		return 0, os.ErrClosed
+	}
+	switch whence {
+	case io.SeekEnd:
+		f.offset = f.size + offset
+	case io.SeekStart:
+		f.offset = offset
+	case io.SeekCurrent:
+		f.offset += offset
+	}
+	return f.offset, nil
+}
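A small sketch of the new memFS semantics (illustrative only; file names are arbitrary): opening a missing file without os.O_CREATE now fails, and each OpenFile call returns a separate handle with its own offset over the shared, ref-counted buffer.

```go
package main

import (
	"fmt"
	"os"

	"github.com/akrylysov/pogreb/fs"
)

func main() {
	// Opening a non-existent file without O_CREATE is now an error.
	if _, err := fs.Mem.OpenFile("missing", os.O_RDWR, 0640); err != nil {
		fmt.Println(err) // file does not exist
	}

	// Two handles to the same in-memory file share the buffer but keep
	// independent offsets (seekableMemFile wraps the shared memFile).
	w, _ := fs.Mem.OpenFile("data", os.O_CREATE|os.O_RDWR, 0640)
	r, _ := fs.Mem.OpenFile("data", os.O_RDWR, 0640)

	_, _ = w.Write([]byte("hello")) // advances only w's offset

	buf := make([]byte, 5)
	n, _ := r.Read(buf)          // r still reads from offset 0
	fmt.Println(string(buf[:n])) // hello

	_ = w.Close()
	_ = r.Close()
}
```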
diff --git a/fs/os.go b/fs/os.go
index e778abc..04272fd 100644
--- a/fs/os.go
+++ b/fs/os.go
@@ -37,6 +37,10 @@ func (fs *osFS) ReadDir(name string) ([]os.DirEntry, error) {
 	return os.ReadDir(name)
 }
 
+func (fs *osFS) MkdirAll(path string, perm os.FileMode) error {
+	return os.MkdirAll(path, perm)
+}
+
 type osFile struct {
 	*os.File
 }
diff --git a/fs/sub.go b/fs/sub.go
index 678653c..3508554 100644
--- a/fs/sub.go
+++ b/fs/sub.go
@@ -49,4 +49,9 @@ func (fs *subFS) CreateLockFile(name string, perm os.FileMode) (LockFile, bool,
 	return fs.fsys.CreateLockFile(subName, perm)
 }
 
+func (fs *subFS) MkdirAll(path string, perm os.FileMode) error {
+	subPath := filepath.Join(fs.root, path)
+	return fs.fsys.MkdirAll(subPath, perm)
+}
+
 var _ FileSystem = &subFS{}
diff --git a/options.go b/options.go
index dc37332..78e7138 100644
--- a/options.go
+++ b/options.go
@@ -26,6 +26,7 @@ type Options struct {
 	//
 	// Default: fs.OSMMap.
 	FileSystem fs.FileSystem
+	rootFS     fs.FileSystem
 
 	maxSegmentSize             uint32
 	compactionMinSegmentSize   uint32
@@ -40,6 +41,7 @@ func (src *Options) copyWithDefaults(path string) *Options {
 	if opts.FileSystem == nil {
 		opts.FileSystem = fs.OSMMap
 	}
+	opts.rootFS = opts.FileSystem
 	opts.FileSystem = fs.Sub(opts.FileSystem, path)
 	if opts.maxSegmentSize == 0 {
 		opts.maxSegmentSize = math.MaxUint32