/*
 * Copyright 2017 Dgraph Labs, Inc. and Contributors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package badger

import (
	stderrors "errors"
	"sync"
	"time"

	"github.com/dgraph-io/badger/v4/y"
	"github.com/dgraph-io/ristretto/v2/z"
)

// MergeOperator represents a Badger merge operator.
type MergeOperator struct {
	sync.RWMutex
	f      MergeFunc
	db     *DB
	key    []byte
	closer *z.Closer
}

// MergeFunc accepts two byte slices, one representing an existing value, and
// another representing a new value that needs to be ‘merged’ into it. MergeFunc
// contains the logic to perform the ‘merge’ and return an updated value.
// MergeFunc could perform operations like integer addition, list appends etc.
// Note that the ordering of the operands is maintained.
type MergeFunc func(existingVal, newVal []byte) []byte
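
// For illustration only (appendMerge is not part of this package): a minimal
// MergeFunc could simply concatenate values, preserving the order in which
// they were added.
//
//	func appendMerge(existingVal, newVal []byte) []byte {
//		// existingVal holds the earlier data, newVal the later data, so
//		// appending preserves insertion order.
//		return append(existingVal, newVal...)
//	}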

// GetMergeOperator creates a new MergeOperator for a given key and returns a
// pointer to it. It also fires off a background goroutine that periodically
// compacts the key's values using the merge function, at the interval
// specified by dur.
func (db *DB) GetMergeOperator(key []byte,
	f MergeFunc, dur time.Duration) *MergeOperator {
	op := &MergeOperator{
		f:      f,
		db:     db,
		key:    key,
		closer: z.NewCloser(1),
	}

	go op.runCompactions(dur)
	return op
}
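
// A usage sketch (the path, key, interval, and the appendMerge function above
// are illustrative, not part of this package): obtain a MergeOperator for a
// key and stop it once it is no longer needed.
//
//	db, err := badger.Open(badger.DefaultOptions("/tmp/badger"))
//	if err != nil {
//		// handle the error
//	}
//	defer db.Close()
//
//	m := db.GetMergeOperator([]byte("merge-key"), appendMerge, 200*time.Millisecond)
//	defer m.Stop()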

var errNoMerge = stderrors.New("No need for merge")

func (op *MergeOperator) iterateAndMerge() (newVal []byte, latest uint64, err error) {
	txn := op.db.NewTransaction(false)
	defer txn.Discard()
	opt := DefaultIteratorOptions
	opt.AllVersions = true
	it := txn.NewKeyIterator(op.key, opt)
	defer it.Close()

	var numVersions int
	for it.Rewind(); it.Valid(); it.Next() {
		item := it.Item()
		if item.IsDeletedOrExpired() {
			break
		}
		numVersions++
		if numVersions == 1 {
			// This should be the newVal, considering this is the latest version.
			newVal, err = item.ValueCopy(newVal)
			if err != nil {
				return nil, 0, err
			}
			latest = item.Version()
		} else {
			if err := item.Value(func(oldVal []byte) error {
				// The merge should always be on the newVal considering it has the merge result of
				// the latest version. The value read should be the oldVal.
				newVal = op.f(oldVal, newVal)
				return nil
			}); err != nil {
				return nil, 0, err
			}
		}
		if item.DiscardEarlierVersions() {
			break
		}
	}
	if numVersions == 0 {
		return nil, latest, ErrKeyNotFound
	} else if numVersions == 1 {
		return newVal, latest, errNoMerge
	}
	return newVal, latest, nil
}

func (op *MergeOperator) compact() error {
	op.Lock()
	defer op.Unlock()
	val, version, err := op.iterateAndMerge()
	if err == ErrKeyNotFound || err == errNoMerge {
		return nil
	} else if err != nil {
		return err
	}
	entries := []*Entry{
		{
			Key:   y.KeyWithTs(op.key, version),
			Value: val,
			meta:  bitDiscardEarlierVersions,
		},
	}
	// Write value back to the DB. It is important that we do not set the bitMergeEntry bit
	// here. When compaction happens, all the older merged entries will be removed.
	return op.db.batchSetAsync(entries, func(err error) {
		if err != nil {
			op.db.opt.Errorf("failed to insert the result of merge compaction: %s", err)
		}
	})
}

func (op *MergeOperator) runCompactions(dur time.Duration) {
	ticker := time.NewTicker(dur)
	defer op.closer.Done()
	var stop bool
	for {
		select {
		case <-op.closer.HasBeenClosed():
			stop = true
		case <-ticker.C: // wait for tick
		}
		if err := op.compact(); err != nil {
			op.db.opt.Errorf("failure while running merge operation: %s", err)
		}
		if stop {
			ticker.Stop()
			break
		}
	}
}

// Add records a value in Badger which will eventually be merged by a background
// routine into the values that were recorded by previous invocations of Add().
func (op *MergeOperator) Add(val []byte) error {
	return op.db.Update(func(txn *Txn) error {
		return txn.SetEntry(NewEntry(op.key, val).withMergeBit())
	})
}

// Get returns the latest value for the merge operator, which is derived by
// applying the merge function to all the values added so far.
//
// If Add has not been called even once, Get will return ErrKeyNotFound.
func (op *MergeOperator) Get() ([]byte, error) {
	op.RLock()
	defer op.RUnlock()
	var existing []byte
	err := op.db.View(func(txn *Txn) (err error) {
		existing, _, err = op.iterateAndMerge()
		return err
	})
	if err == errNoMerge {
		return existing, nil
	}
	return existing, err
}
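
// Continuing the sketch above (variable names and values are illustrative):
// values recorded with Add are folded together by the merge function, so with
// the append-based appendMerge the merged result reads back in insertion order.
//
//	_ = m.Add([]byte("A"))
//	_ = m.Add([]byte("B"))
//	_ = m.Add([]byte("C"))
//	res, err := m.Get()
//	// res == []byte("ABC") with appendMerge; err is nil unless the read fails.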

// Stop waits for any pending merge to complete and then stops the background
// goroutine.
func (op *MergeOperator) Stop() {
	op.closer.SignalAndWait()
}