Skip to content

Commit

Permalink
Channels: Fix ON CLUSTER behavior (#4801)
Browse files Browse the repository at this point in the history
* Channels: Fix cluster behavior

The `CREATE TABLE ... AS SELECT` syntax did not work with `ON CLUSTER`.

Instead, let's do a normal insert. For safety and to avoid timing
issues, ensure that INSERT waits for data to be inserted on all active
replicas.

* Proper replicated tables
  • Loading branch information
macobo authored Nov 11, 2024
1 parent b22b357 commit 3759db9
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 15 deletions.
5 changes: 4 additions & 1 deletion lib/plausible/data_migration/acquisition_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ defmodule Plausible.DataMigration.AcquisitionChannel do

def run(opts \\ []) do
on_cluster_statement = Plausible.MigrationUtils.on_cluster_statement("sessions_v2")
# In clustered environments, make INSERTs wait until the data has been written to every active replica.
insert_quorum = Plausible.IngestRepo.replica_count("sessions_v2")

run_sql_multi(
"acquisition_channel_functions",
[
on_cluster_statement: on_cluster_statement,
dictionary_connection_params: Plausible.MigrationUtils.dictionary_connection_params()
dictionary_connection_params: Plausible.MigrationUtils.dictionary_connection_params(),
insert_quorum: insert_quorum
],
params: %{
"source_categories" =>
Expand Down
12 changes: 8 additions & 4 deletions lib/plausible/ingest_repo.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@ defmodule Plausible.IngestRepo do
end

@doc """
Returns `true` when `replica_count/1` reports more than one active replica
for `table`, and `false` otherwise.

Raises if the underlying `system.replicas` query does not succeed, because
`replica_count/1` matches assertively on the `{:ok, _}` result.
"""
def clustered_table?(table) do
  # A single query is enough: the answer is derived solely from the active
  # replica count, so no separate existence check against system.replicas
  # is needed.
  replica_count(table) > 1
end

@doc """
Returns the sum of `active_replicas` over all `system.replicas` rows whose
`table` column equals `table`.

Raises a `MatchError` unless the query succeeds and yields exactly one row
with one column.
"""
def replica_count(table) do
  sql = "SELECT sum(active_replicas) FROM system.replicas WHERE table = '#{table}'"

  # NOTE(review): presumably the sum is 0 when the table has no entry in
  # system.replicas (i.e. it is not replicated) - confirm against ClickHouse.
  {:ok, %{rows: [[total]]}} = query(sql)

  total
end
end
Original file line number Diff line number Diff line change
@@ -1,12 +1,24 @@
CREATE OR REPLACE TABLE acquisition_channel_source_category(referrer_source String, category LowCardinality(String))
CREATE TABLE IF NOT EXISTS acquisition_channel_source_category
<%= @on_cluster_statement %>
Engine = MergeTree()
ORDER BY referrer_source
AS
(
referrer_source String,
category LowCardinality(String)
)
<%= if @on_cluster_statement != "" do %>
ENGINE = ReplicatedMergeTree('/clickhouse/{cluster}/tables/{shard}/plausible_prod/acquisition_channel_source_category', '{replica}')
<% else %>
ENGINE = MergeTree()
<% end %>
ORDER BY referrer_source;

TRUNCATE TABLE acquisition_channel_source_category SETTINGS alter_sync=2;

INSERT INTO acquisition_channel_source_category(referrer_source, category)
SELECT t.1 AS referrer_source, t.2 AS category
FROM (
SELECT arrayJoin({source_categories:Array(Tuple(String, String))}) AS t
);
)
SETTINGS insert_quorum = <%= @insert_quorum %>;

CREATE OR REPLACE DICTIONARY acquisition_channel_source_category_dict
<%= @on_cluster_statement %>
Expand All @@ -19,12 +31,23 @@ SOURCE(CLICKHOUSE(TABLE acquisition_channel_source_category <%= @dictionary_conn
LIFETIME(0)
LAYOUT(hashed());

CREATE OR REPLACE TABLE acquisition_channel_paid_sources(referrer_source String)
CREATE TABLE IF NOT EXISTS acquisition_channel_paid_sources
<%= @on_cluster_statement %>
Engine = MergeTree()
ORDER BY referrer_source
AS
SELECT arrayJoin({paid_sources:Array(String)}) AS referrer_source;
(
referrer_source String
)
<%= if @on_cluster_statement != "" do %>
ENGINE = ReplicatedMergeTree('/clickhouse/{cluster}/tables/{shard}/plausible_prod/acquisition_channel_paid_sources', '{replica}')
<% else %>
ENGINE = MergeTree()
<% end %>
ORDER BY referrer_source;

TRUNCATE TABLE acquisition_channel_paid_sources SETTINGS alter_sync=2;

INSERT INTO acquisition_channel_paid_sources(referrer_source)
SELECT arrayJoin({paid_sources:Array(String)}) AS referrer_source
SETTINGS insert_quorum = <%= @insert_quorum %>;

CREATE OR REPLACE DICTIONARY acquisition_channel_paid_sources_dict
<%= @on_cluster_statement %>
Expand Down

0 comments on commit 3759db9

Please sign in to comment.