-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathpartial_msg.go
413 lines (386 loc) · 15.6 KB
/
partial_msg.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
package f3
import (
"context"
"fmt"
"time"
"github.com/filecoin-project/go-f3/chainexchange"
"github.com/filecoin-project/go-f3/gpbft"
"github.com/filecoin-project/go-f3/manifest"
lru "github.com/hashicorp/golang-lru/v2"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)
// partialMessageManager implements chainexchange.Listener to receive
// notifications of chains discovered via chain exchange.
var _ chainexchange.Listener = (*partialMessageManager)(nil)

// PartialGMessage is a GMessage whose vote value (and justification vote
// value) has been stripped and replaced by the chain's key. The full chain is
// re-populated later from chainexchange, once discovered. See CompleteMessage.
type PartialGMessage struct {
	*gpbft.GMessage
	// VoteValueKey is the key of the vote value chain that was stripped from the
	// embedded GMessage. A zero key means the vote value was zero (e.g. COMMIT
	// for bottom) and there is nothing to re-populate.
	VoteValueKey gpbft.ECChainKey `cborgen:"maxlen=32"`
}
// partialMessageKey uniquely identifies a buffered partial message by its
// sender and the instance/round/phase at which it was cast.
type partialMessageKey struct {
	sender  gpbft.ActorID
	instant gpbft.Instant
}
// discoveredChain carries a chain discovered by chainexchange along with the
// instance at which it was observed, pending processing by the manager.
type discoveredChain struct {
	instance uint64
	chain    *gpbft.ECChain
}
// partialMessageManager buffers partially validated messages whose vote value
// chain is not yet known, and completes them as the corresponding chains are
// discovered through chainexchange. It also handles the (re-)broadcast of
// chains and the pruning of state for completed instances.
type partialMessageManager struct {
	chainex *chainexchange.PubSubChainExchange
	// pmByInstance is a map of instance to a buffer of partial messages that are
	// keyed by sender+instance+round+phase.
	pmByInstance map[uint64]*lru.Cache[partialMessageKey, *PartiallyValidatedMessage]
	// pmkByInstanceByChainKey is used for an auxiliary lookup of all partial
	// messages for a given vote value at an instance.
	pmkByInstanceByChainKey map[uint64]map[gpbft.ECChainKey][]partialMessageKey
	// pendingPartialMessages is a channel of partial messages that are pending to be buffered.
	pendingPartialMessages chan *PartiallyValidatedMessage
	// pendingDiscoveredChains is a channel of chains discovered by chainexchange
	// that are pending to be processed.
	pendingDiscoveredChains chan *discoveredChain
	// pendingChainBroadcasts is a channel of chains that are pending to be broadcasted.
	pendingChainBroadcasts chan chainexchange.Message
	// pendingInstanceRemoval is a channel of instances that are pending to be removed.
	pendingInstanceRemoval chan uint64
	// rebroadcastInterval is the interval at which chains are re-broadcasted.
	rebroadcastInterval time.Duration
	// stop cancels the background goroutines started by Start. Nil until Start
	// is called.
	stop func()
}
// newPartialMessageManager constructs a partial message manager configured
// from the given manifest, wiring itself up as the chainexchange listener so
// that chain discoveries flow back into the manager. The returned manager is
// inert until Start is called.
func newPartialMessageManager(progress gpbft.Progress, ps *pubsub.PubSub, m *manifest.Manifest) (*partialMessageManager, error) {
	manager := &partialMessageManager{
		pmByInstance:            make(map[uint64]*lru.Cache[partialMessageKey, *PartiallyValidatedMessage]),
		pmkByInstanceByChainKey: make(map[uint64]map[gpbft.ECChainKey][]partialMessageKey),
		pendingDiscoveredChains: make(chan *discoveredChain, 100),            // TODO: parameterize buffer size.
		pendingPartialMessages:  make(chan *PartiallyValidatedMessage, 100),  // TODO: parameterize buffer size.
		pendingChainBroadcasts:  make(chan chainexchange.Message, 100),       // TODO: parameterize buffer size.
		pendingInstanceRemoval:  make(chan uint64, 10),
		rebroadcastInterval:     m.ChainExchange.RebroadcastInterval,
	}
	exchange, err := chainexchange.NewPubSubChainExchange(
		chainexchange.WithListener(manager),
		chainexchange.WithProgress(progress),
		chainexchange.WithPubSub(ps),
		chainexchange.WithMaxChainLength(m.ChainExchange.MaxChainLength),
		chainexchange.WithMaxDiscoveredChainsPerInstance(m.ChainExchange.MaxDiscoveredChainsPerInstance),
		chainexchange.WithMaxInstanceLookahead(m.ChainExchange.MaxInstanceLookahead),
		chainexchange.WithMaxWantedChainsPerInstance(m.ChainExchange.MaxWantedChainsPerInstance),
		chainexchange.WithMaxTimestampAge(m.ChainExchange.MaxTimestampAge),
		chainexchange.WithSubscriptionBufferSize(m.ChainExchange.SubscriptionBufferSize),
		chainexchange.WithTopicName(manifest.ChainExchangeTopicFromNetworkName(m.NetworkName)),
	)
	if err != nil {
		return nil, err
	}
	manager.chainex = exchange
	return manager, nil
}
// Start starts the underlying chain exchange and the manager's background
// goroutines, returning a channel of messages whose vote values have been
// completed and which are ready for full validation by the gpbft runner. The
// channel is closed once the manager stops.
//
// Note that the goroutines' lifecycle is bound to a fresh context derived from
// context.Background (cancelled via Shutdown), not to the ctx argument, which
// is only used to start chain exchange.
func (pmm *partialMessageManager) Start(ctx context.Context) (<-chan *PartiallyValidatedMessage, error) {
	if err := pmm.chainex.Start(ctx); err != nil {
		return nil, fmt.Errorf("starting chain exchange: %w", err)
	}
	completedMessages := make(chan *PartiallyValidatedMessage, 100) // TODO: parameterize buffer size.
	// Deliberately shadow ctx: the goroutines below outlive the caller's ctx and
	// are stopped only via pmm.stop (called from Shutdown).
	ctx, pmm.stop = context.WithCancel(context.Background())
	go func() {
		defer func() {
			close(completedMessages)
			log.Debugw("Partial message manager stopped.")
		}()
		for ctx.Err() == nil {
			select {
			case <-ctx.Done():
				return
			case discovered, ok := <-pmm.pendingDiscoveredChains:
				if !ok {
					return
				}
				partialMessageKeysAtInstance, found := pmm.pmkByInstanceByChainKey[discovered.instance]
				if !found {
					// There's no known instance with a partial message. Ignore the discovered chain.
					// There's also no need to optimistically store them here. Because, chainexchange
					// does this with safe caps on max future instances.
					continue
				}
				chainkey := discovered.chain.Key()
				partialMessageKeys, found := partialMessageKeysAtInstance[chainkey]
				if !found {
					// There's no known partial message at the instance for the discovered chain.
					// Ignore the discovery for the same reason as above.
					continue
				}
				// Complete every buffered partial message that was waiting for this chain.
				buffer := pmm.getOrInitPartialMessageBuffer(discovered.instance)
				for _, messageKey := range partialMessageKeys {
					if pgmsg, found := buffer.Get(messageKey); found {
						pgmsg.Vote.Value = discovered.chain
						inferJustificationVoteValue(pgmsg.PartialGMessage)
						select {
						case <-ctx.Done():
							return
						case completedMessages <- pgmsg:
						default:
							// Non-blocking send: a slow consumer drops the completed message. The
							// message is still removed from the buffer below; GPBFT re-broadcast is
							// relied upon for recovery.
							log.Warnw("Dropped completed message as the gpbft runner is too slow to consume them.", "msg", pgmsg.GMessage)
						}
						buffer.Remove(messageKey)
						metrics.partialMessages.Add(ctx, -1)
					}
				}
				delete(partialMessageKeysAtInstance, chainkey)
			case pgmsg, ok := <-pmm.pendingPartialMessages:
				if !ok {
					return
				}
				key := partialMessageKey{
					sender: pgmsg.Sender,
					instant: gpbft.Instant{
						ID:    pgmsg.Vote.Instance,
						Round: pgmsg.Vote.Round,
						Phase: pgmsg.Vote.Phase,
					},
				}
				buffer := pmm.getOrInitPartialMessageBuffer(pgmsg.Vote.Instance)
				if known, found, _ := buffer.PeekOrAdd(key, pgmsg); !found {
					// Newly buffered message: index it by its vote value key so a future chain
					// discovery can find and complete it.
					pmkByChainKey := pmm.pmkByInstanceByChainKey[pgmsg.Vote.Instance]
					pmkByChainKey[pgmsg.VoteValueKey] = append(pmkByChainKey[pgmsg.VoteValueKey], key)
					metrics.partialMessages.Add(ctx, 1)
				} else {
					// The message is a duplicate. This can happen when a message is re-broadcasted.
					// But the vote value key must remain consistent for the same instance, sender,
					// round and phase. If it's not, then it's an equivocation.
					equivocation := known.VoteValueKey != pgmsg.VoteValueKey
					metrics.partialMessageDuplicates.Add(ctx, 1,
						metric.WithAttributes(attribute.Bool("equivocation", equivocation)))
				}
			case instance, ok := <-pmm.pendingInstanceRemoval:
				if !ok {
					return
				}
				// Prune all buffered state strictly before the given instance.
				for i, pmsgs := range pmm.pmByInstance {
					if i < instance {
						delete(pmm.pmByInstance, i)
						metrics.partialMessageInstances.Add(ctx, -1)
						metrics.partialMessages.Add(ctx, -int64(pmsgs.Len()))
					}
				}
				for i := range pmm.pmkByInstanceByChainKey {
					if i < instance {
						delete(pmm.pmkByInstanceByChainKey, i)
					}
				}
				if err := pmm.chainex.RemoveChainsByInstance(ctx, instance); err != nil {
					log.Errorw("Failed to remove chains by instance form chainexchange.", "instance", instance, "error", err)
				}
			}
		}
	}()
	// Use a dedicated goroutine for chain broadcast to avoid any delay in
	// broadcasting chains as it can fundamentally affect progress across the system.
	go func() {
		ticker := time.NewTicker(pmm.rebroadcastInterval)
		defer func() {
			ticker.Stop()
			log.Debugw("Partial message manager rebroadcast stopped.")
		}()
		// current is the latest chain to keep re-broadcasting on every tick; nil
		// until the first broadcast request arrives.
		var current *chainexchange.Message
		for ctx.Err() == nil {
			select {
			case <-ctx.Done():
				return
			case t := <-ticker.C:
				if current != nil {
					current.Timestamp = roundDownToUnixTime(t, pmm.rebroadcastInterval)
					if err := pmm.chainex.Broadcast(ctx, *current); err != nil {
						log.Debugw("Failed to re-broadcast chain.", "instance", current.Instance, "chain", current.Chain, "error", err)
					}
				}
			case pending, ok := <-pmm.pendingChainBroadcasts:
				if !ok {
					return
				}
				switch {
				case current == nil, pending.Instance > current.Instance:
					// Either there's no prior chain broadcast or a new instance has started.
					// Broadcast immediately and reset the timer to tick from now onwards. This is to
					// re-align the chain rebroadcast relative to instance start.
					current = &pending
					current.Timestamp = roundDownToUnixTime(time.Now(), pmm.rebroadcastInterval)
					if err := pmm.chainex.Broadcast(ctx, *current); err != nil {
						log.Debugw("Failed to immediately re-broadcast chain.", "instance", current.Instance, "chain", current.Chain, "error", err)
					}
					ticker.Reset(pmm.rebroadcastInterval)
				case pending.Instance == current.Instance:
					// When the instance is the same as the current instance, only switch if the
					// current chain doesn't contain the pending chain as a prefix. Because,
					// broadcasting the longer chain offers more information to the network and
					// improves the chances of consensus on a longer chain at the price of slightly
					// higher bandwidth usage.
					if !current.Chain.HasPrefix(pending.Chain) {
						current = &pending
					}
					// TODO: Maybe parameterise this in manifest in case we need to save as much
					// bandwidth as possible?
				case pending.Instance < current.Instance:
					log.Debugw("Dropped chain broadcast message as it's too old.", "messageInstance", pending.Instance, "latestInstance", current.Instance)
					continue
				}
			}
		}
	}()
	return completedMessages, nil
}
func roundDownToUnixTime(t time.Time, interval time.Duration) int64 {
return (t.Unix() / int64(interval)) * int64(interval)
}
// BroadcastChain schedules the broadcast of the given chain at the given
// instance via chainexchange. Zero chains are silently ignored. If the
// broadcast pipeline is saturated the request is dropped rather than blocking;
// GPBFT re-broadcast is relied upon to request the chain broadcast again.
func (pmm *partialMessageManager) BroadcastChain(ctx context.Context, instance uint64, chain *gpbft.ECChain) error {
	if chain.IsZero() {
		// Nothing to broadcast.
		return nil
	}
	request := chainexchange.Message{Instance: instance, Chain: chain}
	select {
	case pmm.pendingChainBroadcasts <- request:
	case <-ctx.Done():
		return ctx.Err()
	default:
		// The chain broadcast is too slow. Drop the request and rely on GPBFT
		// re-broadcast to request chain broadcast again instead of blocking.
		log.Debugw("Dropped chain broadcast as chain rebroadcast is too slow.", "instance", instance, "chain", chain)
	}
	return nil
}
// toPartialGMessage converts a full GMessage into its partial form: the vote
// value chain is replaced by its key, and the justification vote value (when
// present and non-zero) is zeroed out entirely. The input message is not
// mutated; shallow copies are taken where needed.
func (pmm *partialMessageManager) toPartialGMessage(msg *gpbft.GMessage) (*PartialGMessage, error) {
	shallow := *msg
	partial := &PartialGMessage{
		GMessage: &shallow,
	}
	if value := partial.Vote.Value; !value.IsZero() {
		partial.VoteValueKey = value.Key()
		partial.Vote.Value = &gpbft.ECChain{}
	}
	if justification := msg.Justification; justification != nil && !justification.Vote.Value.IsZero() {
		// The justification vote value is either zero or the same as the vote value.
		// Anything else is considered to be an in invalid justification. In fact, for
		// any given phase and round we can always infer:
		//  1. whether a message should have a justification, and
		//  2. if so, for what chain at which round.
		//
		// Therefore, we can entirely ignore the justification vote value as far as
		// chainexchange is concerned and override it with a zero value. Upon arrival of
		// a partial message, the receiver can always infer the justification chain from
		// the message phase and round. In a case where justification is invalid, the
		// signature won't match anyway, so it seems harmless to always infer the
		// justification chain.
		//
		// In fact, it probably should have been omitted altogether at the time of
		// protocol design.
		copied := *justification
		copied.Vote.Value = &gpbft.ECChain{}
		partial.Justification = &copied
	}
	return partial, nil
}
// NotifyChainDiscovered implements chainexchange.Listener. It queues the
// discovered chain for processing by the manager's event loop, dropping the
// notification (with a warning) rather than blocking if the manager is
// saturated.
func (pmm *partialMessageManager) NotifyChainDiscovered(ctx context.Context, instance uint64, chain *gpbft.ECChain) {
	discovery := &discoveredChain{instance: instance, chain: chain}
	select {
	case <-ctx.Done():
		return
	case pmm.pendingDiscoveredChains <- discovery:
	default:
		// The message completion looks up the key on chain exchange anyway. The net
		// effect of this is delay in delivering messages assuming they're re-broadcasted
		// by GPBFT. This is probably the best we can do if the partial message manager
		// is too slow.
		log.Warnw("Dropped chain discovery notification as partial message manager is too slow.", "instance", instance, "chain", chain)
	}
}
// bufferPartialMessage queues a partially validated message to be buffered by
// the manager's event loop until its vote value chain is discovered. If the
// manager is saturated the message is dropped with a warning instead of
// blocking the caller.
func (pmm *partialMessageManager) bufferPartialMessage(ctx context.Context, msg *PartiallyValidatedMessage) {
	select {
	case pmm.pendingPartialMessages <- msg:
	case <-ctx.Done():
		return
	default:
		// Choosing to rely on GPBFT re-broadcast to compensate for a partial message
		// being dropped. The key thing here is that partial message manager should never
		// be too slow. If it is, then there are bigger problems to solve. Hence, the
		// failsafe is to drop the message instead of halting further message processing.
		log.Warnw("Dropped partial message as partial message manager is too slow.", "msg", msg)
	}
}
// getOrInitPartialMessageBuffer returns the partial message buffer for the
// given instance, lazily creating it (and the corresponding chain-key index)
// on first use.
//
// Note: the previous implementation called log.Fatalf followed by panic(err);
// since Fatalf terminates the process, the panic was unreachable dead code and
// the process died without a stack trace. The branch is unreachable in
// practice anyway, as lru.New only fails for non-positive sizes.
func (pmm *partialMessageManager) getOrInitPartialMessageBuffer(instance uint64) *lru.Cache[partialMessageKey, *PartiallyValidatedMessage] {
	buffer, found := pmm.pmByInstance[instance]
	if !found {
		// TODO: parameterize this in the manifest?
		// Based on 5 phases, 2K network size at a couple of rounds plus some headroom.
		const maxBufferedMessagesPerInstance = 25_000
		var err error
		buffer, err = lru.New[partialMessageKey, *PartiallyValidatedMessage](maxBufferedMessagesPerInstance)
		if err != nil {
			// Defensive: log for context, then panic so a stack trace is produced.
			log.Errorw("Failed to create partial message buffer.", "instance", instance, "error", err)
			panic(err)
		}
		pmm.pmByInstance[instance] = buffer
		metrics.partialMessageInstances.Add(context.Background(), 1)
	}
	if _, ok := pmm.pmkByInstanceByChainKey[instance]; !ok {
		pmm.pmkByInstanceByChainKey[instance] = make(map[gpbft.ECChainKey][]partialMessageKey)
	}
	return buffer
}
// CompleteMessage re-populates the vote value (and inferred justification vote
// value) of the given partial message from chainexchange. It returns the
// completed message and true on success, or nil and false when the message is
// nil or its chain is not yet known.
func (pmm *partialMessageManager) CompleteMessage(ctx context.Context, pgmsg *PartialGMessage) (*gpbft.GMessage, bool) {
	switch {
	case pgmsg == nil:
		// For sanity assert that the message isn't nil.
		return nil, false
	case pgmsg.VoteValueKey.IsZero():
		// A zero VoteValueKey indicates that there's no partial chain value, for
		// example, COMMIT for bottom. Return the message as is.
		return pgmsg.GMessage, true
	}
	chain, found := pmm.chainex.GetChainByInstance(ctx, pgmsg.Vote.Instance, pgmsg.VoteValueKey)
	if !found {
		// The chain hasn't been discovered yet; the message stays incomplete.
		return nil, false
	}
	pgmsg.Vote.Value = chain
	inferJustificationVoteValue(pgmsg)
	return pgmsg.GMessage, true
}
// inferJustificationVoteValue fills in the justification's vote value from the
// message's own vote value, based on the vote phase. A valid message with
// non-nil justification must justify the vote value chain at:
//   - CONVERGE_PHASE, with justification of PREPARE.
//   - PREPARE_PHASE, with justification of PREPARE.
//   - COMMIT_PHASE, with justification of PREPARE.
//   - DECIDE_PHASE, with justification of COMMIT.
//
// Future work should get rid of chains in justification entirely. See:
//   - https://github.com/filecoin-project/go-f3/issues/806
func inferJustificationVoteValue(pgmsg *PartialGMessage) {
	justification := pgmsg.Justification
	if justification == nil {
		// Nothing to infer.
		return
	}
	switch pgmsg.Vote.Phase {
	case gpbft.CONVERGE_PHASE, gpbft.PREPARE_PHASE, gpbft.COMMIT_PHASE:
		if justification.Vote.Phase == gpbft.PREPARE_PHASE {
			justification.Vote.Value = pgmsg.Vote.Value
		}
	case gpbft.DECIDE_PHASE:
		if justification.Vote.Phase == gpbft.COMMIT_PHASE {
			justification.Vote.Value = pgmsg.Vote.Value
		}
	default:
		// The message must be invalid. But let the flow proceed and have the validator
		// reject it.
	}
}
// RemoveMessagesBeforeInstance queues the removal of all buffered partial
// messages (and related chainexchange state) for instances strictly before the
// given instance. The request is dropped with a warning, rather than blocking,
// if the manager is saturated.
func (pmm *partialMessageManager) RemoveMessagesBeforeInstance(ctx context.Context, instance uint64) {
	select {
	case pmm.pendingInstanceRemoval <- instance:
	case <-ctx.Done():
	default:
		log.Warnw("Dropped instance removal request as partial message manager is too slow.", "instance", instance)
	}
}
// Shutdown stops the manager's background goroutines, if started, and shuts
// down the underlying chain exchange, returning any error from the latter.
func (pmm *partialMessageManager) Shutdown(ctx context.Context) error {
	if stop := pmm.stop; stop != nil {
		stop()
	}
	return pmm.chainex.Shutdown(ctx)
}