-
Notifications
You must be signed in to change notification settings - Fork 465
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
compute: persistent worker command channel #31523
base: main
Are you sure you want to change the base?
Conversation
2523011
to
75f7aae
Compare
/// | ||
/// The [`Worker`] expects a pair of persistent channels, with punctuation marking reconnects, | ||
/// while the [`ClusterClient`] provides a new pair of channels on each reconnect. | ||
fn spawn_channel_adapter( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I considered making this a wrapper around client_rx
instead to transform the command/response stream without spawning additional threads, similar to how the GenericClients
work. But the thread version ends up much simpler to implement, so I stuck with that.
If we are worried with the overhead of having one additional thread per worker (no clue if we should be), we could make this a tokio task instead.
75f7aae
to
8d4647f
Compare
This commit makes the compute worker reuse its broadcast command channel across client reconnects, rather than creating a new one for each connection. Doing so avoids bugs caused by workers rendering the channel dataflow with different operator IDs upon reconnecting multiple times. This commit also includes a small refactor pulling the command channel dataflow into its own module and cleaning up its implementation a bit. Note that in the current state compute workers are not able to recognize reconnects and thus to handle those correctly. This will be fixed in the next commit.
Now that compute's internal command channel isn't recreated for each client connection anymore, we need something else to distinguish client connections and know when a reconciliation must be performed. To this end, the `ClusterClient` is modified to pass through `CreateTimely` commands to the workers. The channel adapter is modified to extract for each new connection its epoch from the `CreateTimely` command and uses it to tag commands sent through the command channel. The compute worker learns to monitor for epoch changes and initiate a reconciliation when it observes one. Additionally, the compute worker is modified to tag responses with the current epoch and the channel adapter learns to filter out responses with outdated epochs, which were intended for previous client connections.
8d4647f
to
9a18e4b
Compare
This PR makes the compute workers reuse their broadcast command channels across client reconnects, rather than creating a new ones for each connection. Doing so avoids bugs caused by workers rendering the channel dataflow with different operator IDs upon reconnecting multiple times.
The change only applies to the compute cluster, so the
ClusterClient
, which is shared between both compute and storage, still retains its previous behavior of using a new set of channels for each client connection. To interface between theClusterClient
and the compute workers, a "channel adapter" is introduced to bridge between the two.Now that the command channel isn't recreated for each client connection anymore, channel disconnects can't be used to detect client disconnects anymore. Instead, the
CreateTimely
command is used as punctuation. TheClusterClient
is modified to pass throughCreateTimely
commands to the workers. The compute worker is modified to monitor the command stream forCreateTimely
commands and initiate a reconciliation when it observes one. Additionally, the epoch is also used to tag responses internally, so they can be discarded if they were not meant for the current connection.Now that the command channel isn't recreated for each client connection anymore, channel disconnects can't be used to detect client disconnects anymore. Instead, the
CreateTimely
command is used as punctuation. TheClusterClient
is modified to pass throughCreateTimely
commands to the workers. The channel adapter is modified to extract for each new connection its epoch from theCreateTimely
command and use it to tag commands sent through the command channel. The compute worker learns to monitor for epoch changes and initiate a reconciliation when it observes one. Likewise, the compute worker is modified to tag responses with the current epoch and the channel adapter learns to filter out responses with outdated epochs, which were intended for previous client connections.Motivation
Fixes https://github.com/MaterializeInc/database-issues/issues/8964
Also connected to https://github.com/MaterializeInc/incidents-and-escalations/issues/174
Tips for reviewer
Checklist
$T ⇔ Proto$T
mapping (possibly in a backwards-incompatible way), then it is tagged with aT-proto
label.