-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathstatemodel.go
57 lines (47 loc) · 1.43 KB
/
statemodel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package kinesumer
import (
"fmt"
"time"
)
var buildKeyFn = fmt.Sprintf
const (
shardCacheKeyFmt = "shard_cache#%s" // shard_cache#<app>.
clientKeyFmt = "client#%s" // client#<app>.
checkPointKeyFmt = "check_point#%s#%s" // check_point#<app>#<stream>.
)
// stateShardCache manages shard id list cache.
type stateShardCache struct {
ShardCacheKey string `dynamo:"pk,pk"`
Stream string `dynamo:"sk,sk"`
Shards Shards `dynamo:"shards"`
ShardIDs []string `dynamo:"shard_ids"` // Deprecated.
}
func buildShardCacheKey(app string) string {
return buildKeyFn(shardCacheKeyFmt, app)
}
// stateClient manages consumer client.
type stateClient struct {
ClientKey string `dynamo:"pk,pk"`
ClientID string `dynamo:"sk,sk"`
LastUpdate time.Time `dynamo:"last_update,lsi1"`
}
func buildClientKey(app string) string {
return buildKeyFn(clientKeyFmt, app)
}
// ShardCheckPoint manages a shard check point.
type ShardCheckPoint struct {
Stream string
ShardID string
SequenceNumber string
UpdatedAt time.Time
}
// stateCheckPoint manages record check points.
type stateCheckPoint struct {
StreamKey string `dynamo:"pk,pk"`
ShardID string `dynamo:"sk,sk"`
SequenceNumber string `dynamo:"sequence_number"`
LastUpdate time.Time `dynamo:"last_update"`
}
func buildCheckPointKey(app, stream string) string {
return buildKeyFn(checkPointKeyFmt, app, stream)
}