Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backup #71

Merged
merged 1 commit into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions backup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package pogreb

import (
"io"
"os"

"github.com/akrylysov/pogreb/fs"
)

func touchFile(fsys fs.FileSystem, path string) error {
f, err := fsys.OpenFile(path, os.O_CREATE|os.O_TRUNC, os.FileMode(0640))
if err != nil {
return err
}
return f.Close()
}

// Backup creates a database backup at the specified path.
func (db *DB) Backup(path string) error {
// Make sure the compaction is not running during backup.
db.maintenanceMu.Lock()
defer db.maintenanceMu.Unlock()

if err := db.opts.rootFS.MkdirAll(path, 0755); err != nil {
return err
}

db.mu.RLock()
var segments []*segment
activeSegmentSizes := make(map[uint16]int64)
for _, seg := range db.datalog.segmentsBySequenceID() {
segments = append(segments, seg)
if !seg.meta.Full {
// Save the size of the active segments to copy only the data persisted up to the point
// of when the backup started.
activeSegmentSizes[seg.id] = seg.size
}
}
db.mu.RUnlock()

srcFS := db.opts.FileSystem
dstFS := fs.Sub(db.opts.rootFS, path)

for _, seg := range segments {
name := segmentName(seg.id, seg.sequenceID)
mode := os.FileMode(0640)
srcFile, err := srcFS.OpenFile(name, os.O_RDONLY, mode)
if err != nil {
return err
}

dstFile, err := dstFS.OpenFile(name, os.O_CREATE|os.O_RDWR|os.O_TRUNC, mode)
if err != nil {
return err
}

if srcSize, ok := activeSegmentSizes[seg.id]; ok {
if _, err := io.CopyN(dstFile, srcFile, srcSize); err != nil {
return err
}
} else {
if _, err := io.Copy(dstFile, srcFile); err != nil {
return err
}
}

if err := srcFile.Close(); err != nil {
return err
}
if err := dstFile.Close(); err != nil {
return err
}
}

if err := touchFile(dstFS, lockName); err != nil {
return err
}

return nil
}
63 changes: 63 additions & 0 deletions backup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package pogreb

import (
"testing"

"github.com/akrylysov/pogreb/internal/assert"
)

const testDBBackupName = testDBName + ".backup"

func TestBackup(t *testing.T) {
opts := &Options{
maxSegmentSize: 1024,
compactionMinSegmentSize: 520,
compactionMinFragmentation: 0.02,
}

run := func(name string, f func(t *testing.T, db *DB)) bool {
return t.Run(name, func(t *testing.T) {
db, err := createTestDB(opts)
assert.Nil(t, err)
f(t, db)
assert.Nil(t, db.Close())
_ = cleanDir(testDBBackupName)
})
}

run("empty", func(t *testing.T, db *DB) {
assert.Nil(t, db.Backup(testDBBackupName))
db2, err := Open(testDBBackupName, opts)
assert.Nil(t, err)
assert.Nil(t, db2.Close())
})

run("single segment", func(t *testing.T, db *DB) {
assert.Nil(t, db.Put([]byte{0}, []byte{0}))
assert.Equal(t, 1, countSegments(t, db))
assert.Nil(t, db.Backup(testDBBackupName))
db2, err := Open(testDBBackupName, opts)
assert.Nil(t, err)
v, err := db2.Get([]byte{0})
assert.Equal(t, []byte{0}, v)
assert.Nil(t, err)
assert.Nil(t, db2.Close())
})

run("multiple segments", func(t *testing.T, db *DB) {
for i := byte(0); i < 100; i++ {
assert.Nil(t, db.Put([]byte{i}, []byte{i}))
}
assert.Equal(t, 3, countSegments(t, db))
assert.Nil(t, db.Backup(testDBBackupName))
db2, err := Open(testDBBackupName, opts)
assert.Equal(t, 3, countSegments(t, db2))
assert.Nil(t, err)
for i := byte(0); i < 100; i++ {
v, err := db2.Get([]byte{i})
assert.Nil(t, err)
assert.Equal(t, []byte{i}, v)
}
assert.Nil(t, db2.Close())
})
}
6 changes: 2 additions & 4 deletions compaction.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package pogreb

import (
"sync/atomic"

"github.com/akrylysov/pogreb/internal/errors"
)

Expand Down Expand Up @@ -135,11 +133,11 @@ func (db *DB) Compact() (CompactionResult, error) {
cr := CompactionResult{}

// Run only a single compaction at a time.
if !atomic.CompareAndSwapInt32(&db.compactionRunning, 0, 1) {
if !db.maintenanceMu.TryLock() {
return cr, errBusy
}
defer func() {
atomic.StoreInt32(&db.compactionRunning, 0)
db.maintenanceMu.Unlock()
}()

db.mu.RLock()
Expand Down
10 changes: 8 additions & 2 deletions compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pogreb
import (
"os"
"path/filepath"
"runtime"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -224,15 +225,20 @@ func TestCompaction(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(1)
db.mu.Lock()
var goroutineRunning int32
go func() {
// The compaction is blocked until we unlock the mutex.
defer wg.Done()
atomic.StoreInt32(&goroutineRunning, 1)
_, err := db.Compact()
assert.Nil(t, err)
}()
// Make sure the compaction is running.
// Make sure the compaction goroutine is running.
// It's unlikely, but still possible the compaction goroutine doesn't reach the maintenance
// lock, which makes the test flaky.
runtime.Gosched()
assert.CompleteWithin(t, time.Minute, func() bool {
return atomic.LoadInt32(&db.compactionRunning) == 1
return atomic.LoadInt32(&goroutineRunning) == 1
})
_, err := db.Compact()
assert.Equal(t, errBusy, err)
Expand Down
24 changes: 12 additions & 12 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,17 @@ const (
// DB represents the key-value storage.
// All DB methods are safe for concurrent use by multiple goroutines.
type DB struct {
mu sync.RWMutex // Allows multiple database readers or a single writer.
opts *Options
index *index
datalog *datalog
lock fs.LockFile // Prevents opening multiple instances of the same database.
hashSeed uint32
metrics *Metrics
syncWrites bool
cancelBgWorker context.CancelFunc
closeWg sync.WaitGroup
compactionRunning int32 // Prevents running compactions concurrently.
mu sync.RWMutex // Allows multiple database readers or a single writer.
opts *Options
index *index
datalog *datalog
lock fs.LockFile // Prevents opening multiple instances of the same database.
hashSeed uint32
metrics *Metrics
syncWrites bool
cancelBgWorker context.CancelFunc
closeWg sync.WaitGroup
maintenanceMu sync.Mutex // Ensures there only one maintenance task running at a time.
}

type dbMeta struct {
Expand All @@ -52,7 +52,7 @@ type dbMeta struct {
func Open(path string, opts *Options) (*DB, error) {
opts = opts.copyWithDefaults(path)

if err := os.MkdirAll(path, 0755); err != nil {
if err := opts.rootFS.MkdirAll(path, 0755); err != nil {
return nil, err
}

Expand Down
13 changes: 3 additions & 10 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,12 @@ func TestMain(m *testing.M) {
fmt.Printf("DEBUG\tFS=%T\n", fsys)
os.Exit(exitCode)
}
_ = cleanDir(testDBName)
_ = cleanDir(testDBBackupName)
}
_ = cleanDir(testDBName)
os.Exit(0)
}

func touchFile(fsys fs.FileSystem, path string) error {
f, err := fsys.OpenFile(path, os.O_CREATE|os.O_TRUNC, os.FileMode(0640))
if err != nil {
return err
}
return f.Close()
}

func appendFile(path string, data []byte) error {
f, err := testFS.OpenFile(path, os.O_RDWR, os.FileMode(0640))
if err != nil {
Expand Down Expand Up @@ -93,7 +86,7 @@ func cleanDir(path string) error {
return err
}
for _, file := range files {
_ = testFS.Remove(filepath.Join(testDBName, file.Name()))
_ = testFS.Remove(filepath.Join(path, file.Name()))
}
return nil
}
Expand Down
4 changes: 4 additions & 0 deletions file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ func (fs *errfs) ReadDir(name string) ([]os.DirEntry, error) {
return nil, errfileError
}

func (fs *errfs) MkdirAll(path string, perm os.FileMode) error {
return errfileError
}

type errfile struct{}

var errfileError = errors.New("errfile error")
Expand Down
4 changes: 4 additions & 0 deletions fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ var (
)

// File is the interface compatible with os.File.
// All methods are not thread-safe, except for ReadAt, Slice and Stat.
type File interface {
io.Closer
io.Reader
Expand Down Expand Up @@ -60,4 +61,7 @@ type FileSystem interface {

// CreateLockFile creates a lock file.
CreateLockFile(name string, perm os.FileMode) (LockFile, bool, error)

// MkdirAll creates a directory named path.
MkdirAll(path string, perm os.FileMode) error
}
Loading
Loading