-
Notifications
You must be signed in to change notification settings - Fork 637
/
watch.go
102 lines (87 loc) · 1.85 KB
/
watch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package rosedb
import (
"sync"
"time"
)
// WatchActionType identifies the kind of modification carried by an Event.
// It is an alias for byte, so plain byte values are interchangeable with it.
type WatchActionType = byte
// Valid WatchActionType values, numbered from zero via iota.
const (
// WatchActionPut is the action for a put operation (value 0).
WatchActionPut WatchActionType = iota
// WatchActionDelete is the action for a delete operation (value 1).
WatchActionDelete
)
// Event describes a single modification of the database.
// Events are buffered by a Watcher and delivered through the DB's watch channel.
type Event struct {
// Action is the kind of modification: WatchActionPut or WatchActionDelete.
Action WatchActionType
// Key is presumably the key that was modified — confirm against the code
// that constructs events.
Key []byte
// Value is presumably the value written for a put; what it holds for a
// delete is not visible here — TODO confirm.
Value []byte
// BatchId presumably identifies the batch that produced the change —
// confirm against the code that constructs events.
BatchId uint64
}
// Watcher temporarily buffers events as they are generated, until they are
// forwarded to the DB's watch channel.
//
// If the buffer overflows, the oldest events are discarded,
// even if they have not been read yet.
type Watcher struct {
queue eventQueue // ring buffer of pending events
mu sync.RWMutex // guards queue; held by putEvent and getEvent around every queue access
}
// NewWatcher returns a Watcher whose event queue is backed by a slice of
// capacity slots.
//
// The queue indexes are reduced modulo capacity, so capacity must be non-zero
// for the Watcher to be usable. putEvent evicts the oldest event as soon as
// the queue reports full, so at most capacity-2 events stay buffered.
func NewWatcher(capacity uint64) *Watcher {
	w := new(Watcher)
	w.queue.Capacity = capacity
	w.queue.Events = make([]*Event, capacity)
	return w
}
// putEvent appends e to the queue while holding the write lock.
//
// If the queue reports full after the append, the oldest event is dropped
// immediately, even if it has not been read, so that the next push cannot
// make the write index catch up with the read index.
func (w *Watcher) putEvent(e *Event) {
	w.mu.Lock()
	q := &w.queue
	q.push(e)
	if full := q.isFull(); full {
		// Evict the oldest entry to keep a free slot for the next push.
		q.frontTakeAStep()
	}
	w.mu.Unlock()
}
// getEvent removes and returns the oldest buffered event.
// It returns nil if the queue is empty.
//
// The exclusive lock is taken rather than a read lock: pop advances the
// queue's read index, and a read lock would let concurrent callers mutate
// that index at the same time.
func (w *Watcher) getEvent() *Event {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.queue.isEmpty() {
		return nil
	}
	return w.queue.pop()
}
// sendEvent forwards buffered events to c, the DB's watch channel.
//
// It loops forever and never returns. Each event obtained from getEvent is
// sent on c, blocking until the send completes. When no event is available
// it sleeps for 100 milliseconds before polling again.
func (w *Watcher) sendEvent(c chan *Event) {
	const idleWait = 100 * time.Millisecond
	for {
		if ev := w.getEvent(); ev != nil {
			c <- ev
			continue
		}
		// Nothing buffered: back off before the next poll.
		time.Sleep(idleWait)
	}
}
// eventQueue is a fixed-size ring buffer of events addressed by two indexes
// that wrap around modulo Capacity.
//
// The queue is empty when Front == Back, and isFull reports true when
// advancing Back by one more slot would make it equal to Front.
// The type does no locking of its own.
type eventQueue struct {
Events []*Event // backing storage; NewWatcher allocates it with Capacity slots
Capacity uint64 // modulus for index wrap-around; must be non-zero, since a modulo by zero panics
Front uint64 // read point: index of the next event pop returns
Back uint64 // write point: index the next push writes to
}
// push stores e at the write point and advances the write point by one,
// wrapping around at Capacity. It does not check whether the queue is full;
// whatever occupied the slot before is overwritten.
func (eq *eventQueue) push(e *Event) {
	slot := eq.Back
	eq.Events[slot] = e
	eq.Back = (slot + 1) % eq.Capacity
}
// pop returns the event at the read point and advances the read point by one.
// It does not check for emptiness; callers are expected to test isEmpty first.
func (eq *eventQueue) pop() *Event {
	head := eq.Events[eq.Front]
	eq.frontTakeAStep()
	return head
}
// isFull reports whether advancing the write point by one more slot would
// make it coincide with the read point.
func (eq *eventQueue) isFull() bool {
	next := (eq.Back + 1) % eq.Capacity
	return next == eq.Front
}
// isEmpty reports whether the read point has caught up with the write point,
// meaning there is no event left to pop.
func (eq *eventQueue) isEmpty() bool {
	return eq.Front == eq.Back
}
// frontTakeAStep advances the read point by one slot, wrapping around at
// Capacity. The event that was at the old read point is skipped, not cleared.
func (eq *eventQueue) frontTakeAStep() {
	next := eq.Front + 1
	eq.Front = next % eq.Capacity
}