Backup (#71)
akrylysov authored Jan 16, 2025
1 parent f07a773 commit 76e9512
Showing 12 changed files with 262 additions and 83 deletions.
80 changes: 80 additions & 0 deletions backup.go
@@ -0,0 +1,80 @@
package pogreb

import (
    "io"
    "os"

    "github.com/akrylysov/pogreb/fs"
)

func touchFile(fsys fs.FileSystem, path string) error {
    f, err := fsys.OpenFile(path, os.O_CREATE|os.O_TRUNC, os.FileMode(0640))
    if err != nil {
        return err
    }
    return f.Close()
}

// Backup creates a database backup at the specified path.
func (db *DB) Backup(path string) error {
    // Make sure the compaction is not running during backup.
    db.maintenanceMu.Lock()
    defer db.maintenanceMu.Unlock()

    if err := db.opts.rootFS.MkdirAll(path, 0755); err != nil {
        return err
    }

    db.mu.RLock()
    var segments []*segment
    activeSegmentSizes := make(map[uint16]int64)
    for _, seg := range db.datalog.segmentsBySequenceID() {
        segments = append(segments, seg)
        if !seg.meta.Full {
            // Save the size of the active segments to copy only the data persisted up to the point
            // of when the backup started.
            activeSegmentSizes[seg.id] = seg.size
        }
    }
    db.mu.RUnlock()

    srcFS := db.opts.FileSystem
    dstFS := fs.Sub(db.opts.rootFS, path)

    for _, seg := range segments {
        name := segmentName(seg.id, seg.sequenceID)
        mode := os.FileMode(0640)
        srcFile, err := srcFS.OpenFile(name, os.O_RDONLY, mode)
        if err != nil {
            return err
        }

        dstFile, err := dstFS.OpenFile(name, os.O_CREATE|os.O_RDWR|os.O_TRUNC, mode)
        if err != nil {
            return err
        }

        if srcSize, ok := activeSegmentSizes[seg.id]; ok {
            if _, err := io.CopyN(dstFile, srcFile, srcSize); err != nil {
                return err
            }
        } else {
            if _, err := io.Copy(dstFile, srcFile); err != nil {
                return err
            }
        }

        if err := srcFile.Close(); err != nil {
            return err
        }
        if err := dstFile.Close(); err != nil {
            return err
        }
    }

    if err := touchFile(dstFS, lockName); err != nil {
        return err
    }

    return nil
}
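
Not part of the commit — a minimal usage sketch of the new API, assuming default options; the directory names below are placeholders. Backup holds the maintenance lock for the duration of the copy, so it never overlaps a compaction, while regular reads and writes stay available (db.mu is only taken briefly to snapshot the segment list).

package main

import (
    "log"

    "github.com/akrylysov/pogreb"
)

func main() {
    db, err := pogreb.Open("data.pogreb", nil)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    if err := db.Put([]byte("foo"), []byte("bar")); err != nil {
        log.Fatal(err)
    }

    // Snapshot the database into a separate directory.
    if err := db.Backup("data.pogreb.backup"); err != nil {
        log.Fatal(err)
    }

    // The backup is a regular pogreb database and can be opened directly,
    // which is how the tests below verify it.
    snapshot, err := pogreb.Open("data.pogreb.backup", nil)
    if err != nil {
        log.Fatal(err)
    }
    defer snapshot.Close()
}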
63 changes: 63 additions & 0 deletions backup_test.go
@@ -0,0 +1,63 @@
package pogreb

import (
    "testing"

    "github.com/akrylysov/pogreb/internal/assert"
)

const testDBBackupName = testDBName + ".backup"

func TestBackup(t *testing.T) {
    opts := &Options{
        maxSegmentSize:             1024,
        compactionMinSegmentSize:   520,
        compactionMinFragmentation: 0.02,
    }

    run := func(name string, f func(t *testing.T, db *DB)) bool {
        return t.Run(name, func(t *testing.T) {
            db, err := createTestDB(opts)
            assert.Nil(t, err)
            f(t, db)
            assert.Nil(t, db.Close())
            _ = cleanDir(testDBBackupName)
        })
    }

    run("empty", func(t *testing.T, db *DB) {
        assert.Nil(t, db.Backup(testDBBackupName))
        db2, err := Open(testDBBackupName, opts)
        assert.Nil(t, err)
        assert.Nil(t, db2.Close())
    })

    run("single segment", func(t *testing.T, db *DB) {
        assert.Nil(t, db.Put([]byte{0}, []byte{0}))
        assert.Equal(t, 1, countSegments(t, db))
        assert.Nil(t, db.Backup(testDBBackupName))
        db2, err := Open(testDBBackupName, opts)
        assert.Nil(t, err)
        v, err := db2.Get([]byte{0})
        assert.Equal(t, []byte{0}, v)
        assert.Nil(t, err)
        assert.Nil(t, db2.Close())
    })

    run("multiple segments", func(t *testing.T, db *DB) {
        for i := byte(0); i < 100; i++ {
            assert.Nil(t, db.Put([]byte{i}, []byte{i}))
        }
        assert.Equal(t, 3, countSegments(t, db))
        assert.Nil(t, db.Backup(testDBBackupName))
        db2, err := Open(testDBBackupName, opts)
        assert.Equal(t, 3, countSegments(t, db2))
        assert.Nil(t, err)
        for i := byte(0); i < 100; i++ {
            v, err := db2.Get([]byte{i})
            assert.Nil(t, err)
            assert.Equal(t, []byte{i}, v)
        }
        assert.Nil(t, db2.Close())
    })
}
6 changes: 2 additions & 4 deletions compaction.go
@@ -1,8 +1,6 @@
 package pogreb
 
 import (
-    "sync/atomic"
-
     "github.com/akrylysov/pogreb/internal/errors"
 )

@@ -135,11 +133,11 @@ func (db *DB) Compact() (CompactionResult, error) {
     cr := CompactionResult{}
 
     // Run only a single compaction at a time.
-    if !atomic.CompareAndSwapInt32(&db.compactionRunning, 0, 1) {
+    if !db.maintenanceMu.TryLock() {
         return cr, errBusy
     }
     defer func() {
-        atomic.StoreInt32(&db.compactionRunning, 0)
+        db.maintenanceMu.Unlock()
     }()
 
     db.mu.RLock()
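As an aside (not from this commit), the locking scheme introduced here can be sketched in isolation: Backup waits on the maintenance mutex, while Compact gives up immediately with errBusy when another maintenance task holds it. The store/backup/compact names below are invented for the sketch; sync.Mutex.TryLock requires Go 1.18 or newer.

package main

import (
    "errors"
    "fmt"
    "sync"
    "time"
)

var errBusy = errors.New("pogreb: busy") // stand-in for the package's errBusy

type store struct {
    maintenanceMu sync.Mutex // one maintenance task (backup or compaction) at a time
}

// backup waits for any running maintenance task and then holds the lock for the copy.
func (s *store) backup(started chan<- struct{}) {
    s.maintenanceMu.Lock()
    defer s.maintenanceMu.Unlock()
    close(started)                    // the lock is held from this point on
    time.Sleep(50 * time.Millisecond) // copying segments...
}

// compact refuses to run concurrently with another maintenance task.
func (s *store) compact() error {
    if !s.maintenanceMu.TryLock() {
        return errBusy
    }
    defer s.maintenanceMu.Unlock()
    return nil
}

func main() {
    s := &store{}
    started := make(chan struct{})
    go s.backup(started)
    <-started
    fmt.Println(s.compact()) // "pogreb: busy" while the backup holds the lock
}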
10 changes: 8 additions & 2 deletions compaction_test.go
@@ -3,6 +3,7 @@ package pogreb
 import (
     "os"
     "path/filepath"
+    "runtime"
     "sync"
     "sync/atomic"
     "testing"

@@ -224,15 +225,20 @@ func TestCompaction(t *testing.T) {
     wg := sync.WaitGroup{}
     wg.Add(1)
     db.mu.Lock()
+    var goroutineRunning int32
     go func() {
         // The compaction is blocked until we unlock the mutex.
         defer wg.Done()
+        atomic.StoreInt32(&goroutineRunning, 1)
         _, err := db.Compact()
         assert.Nil(t, err)
     }()
-    // Make sure the compaction is running.
+    // Make sure the compaction goroutine is running.
+    // It's unlikely, but still possible the compaction goroutine doesn't reach the maintenance
+    // lock, which makes the test flaky.
+    runtime.Gosched()
     assert.CompleteWithin(t, time.Minute, func() bool {
-        return atomic.LoadInt32(&db.compactionRunning) == 1
+        return atomic.LoadInt32(&goroutineRunning) == 1
     })
     _, err := db.Compact()
     assert.Equal(t, errBusy, err)
24 changes: 12 additions & 12 deletions db.go
@@ -30,17 +30,17 @@ const (
 // DB represents the key-value storage.
 // All DB methods are safe for concurrent use by multiple goroutines.
 type DB struct {
-    mu                sync.RWMutex // Allows multiple database readers or a single writer.
-    opts              *Options
-    index             *index
-    datalog           *datalog
-    lock              fs.LockFile // Prevents opening multiple instances of the same database.
-    hashSeed          uint32
-    metrics           *Metrics
-    syncWrites        bool
-    cancelBgWorker    context.CancelFunc
-    closeWg           sync.WaitGroup
-    compactionRunning int32 // Prevents running compactions concurrently.
+    mu             sync.RWMutex // Allows multiple database readers or a single writer.
+    opts           *Options
+    index          *index
+    datalog        *datalog
+    lock           fs.LockFile // Prevents opening multiple instances of the same database.
+    hashSeed       uint32
+    metrics        *Metrics
+    syncWrites     bool
+    cancelBgWorker context.CancelFunc
+    closeWg        sync.WaitGroup
+    maintenanceMu  sync.Mutex // Ensures there is only one maintenance task running at a time.
 }
 
 type dbMeta struct {

@@ -52,7 +52,7 @@ type dbMeta struct {
 func Open(path string, opts *Options) (*DB, error) {
     opts = opts.copyWithDefaults(path)
 
-    if err := os.MkdirAll(path, 0755); err != nil {
+    if err := opts.rootFS.MkdirAll(path, 0755); err != nil {
         return nil, err
     }
13 changes: 3 additions & 10 deletions db_test.go
@@ -41,19 +41,12 @@ func TestMain(m *testing.M) {
             fmt.Printf("DEBUG\tFS=%T\n", fsys)
             os.Exit(exitCode)
         }
+        _ = cleanDir(testDBName)
+        _ = cleanDir(testDBBackupName)
     }
-    _ = cleanDir(testDBName)
     os.Exit(0)
 }
 
-func touchFile(fsys fs.FileSystem, path string) error {
-    f, err := fsys.OpenFile(path, os.O_CREATE|os.O_TRUNC, os.FileMode(0640))
-    if err != nil {
-        return err
-    }
-    return f.Close()
-}
-
 func appendFile(path string, data []byte) error {
     f, err := testFS.OpenFile(path, os.O_RDWR, os.FileMode(0640))
     if err != nil {

@@ -93,7 +86,7 @@ func cleanDir(path string) error {
         return err
     }
     for _, file := range files {
-        _ = testFS.Remove(filepath.Join(testDBName, file.Name()))
+        _ = testFS.Remove(filepath.Join(path, file.Name()))
     }
     return nil
 }
4 changes: 4 additions & 0 deletions file_test.go
@@ -34,6 +34,10 @@ func (fs *errfs) ReadDir(name string) ([]os.DirEntry, error) {
     return nil, errfileError
 }
 
+func (fs *errfs) MkdirAll(path string, perm os.FileMode) error {
+    return errfileError
+}
+
 type errfile struct{}
 
 var errfileError = errors.New("errfile error")
4 changes: 4 additions & 0 deletions fs/fs.go
@@ -14,6 +14,7 @@ var (
 )
 
 // File is the interface compatible with os.File.
+// All methods are not thread-safe, except for ReadAt, Slice and Stat.
 type File interface {
     io.Closer
     io.Reader

@@ -60,4 +61,7 @@ type FileSystem interface {
 
     // CreateLockFile creates a lock file.
     CreateLockFile(name string, perm os.FileMode) (LockFile, bool, error)
+
+    // MkdirAll creates a directory named path.
+    MkdirAll(path string, perm os.FileMode) error
 }
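
For illustration only — one way a concrete FileSystem could satisfy the new method is to delegate to the standard library. The osFS type and package name below are hypothetical; pogreb's actual fs implementations may differ.

package fsexample

import "os"

// osFS is a hypothetical OS-backed implementation; pogreb's real fs types may differ.
type osFS struct{}

// MkdirAll creates the directory named path, along with any missing parents,
// by delegating to the standard library.
func (osFS) MkdirAll(path string, perm os.FileMode) error {
    return os.MkdirAll(path, perm)
}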