# Patch summary (text before the first "diff --git" header is skipped by patch tools).
#
# What the patch does, as shown by the hunks below:
#   * AcquisitionChannel.run/1 reads Plausible.IngestRepo.replica_count("sessions_v2")
#     and passes it to the SQL template as the `insert_quorum` assign.
#   * Plausible.IngestRepo gains replica_count/1, which returns
#     sum(active_replicas) from system.replicas for the given table name;
#     clustered_table?/1 becomes `replica_count(table) > 1`.
#   * The SQL template replaces `CREATE OR REPLACE TABLE ... AS SELECT` with
#     `CREATE TABLE IF NOT EXISTS` + `TRUNCATE TABLE ... SETTINGS alter_sync=2` +
#     `INSERT ... SETTINGS insert_quorum = <n>`, and uses ReplicatedMergeTree when
#     an ON CLUSTER statement is present.
#
# NOTE(review): the line structure of this patch was reconstructed from a copy whose
#   newlines had been collapsed. Hunk line counts match the @@ headers, but the
#   indentation and blank lines inside hunks are a best-effort reconstruction --
#   verify against the target tree (or apply with whitespace tolerance).
# NOTE(review): clustered_table?/1 used to return true for any row in
#   system.replicas; it now requires more than one active replica. Confirm that a
#   replicated table with a single active replica should count as non-clustered.
# NOTE(review): replica_count/1 filters on table name only (no database filter),
#   so the sum spans every matching row in system.replicas -- confirm this cannot
#   exceed the real replica count, since the value is used as insert_quorum.
# NOTE(review): the ReplicatedMergeTree ZooKeeper paths hard-code `plausible_prod`;
#   confirm this is intended for every deployment that runs the migration.
# NOTE(review): the table name is interpolated into the SQL string; the only caller
#   visible here passes a literal. Keep it that way or switch to a bound parameter.

diff --git a/lib/plausible/data_migration/acquisition_channel.ex b/lib/plausible/data_migration/acquisition_channel.ex
index 48992140cae3..6366aaeb37ff 100644
--- a/lib/plausible/data_migration/acquisition_channel.ex
+++ b/lib/plausible/data_migration/acquisition_channel.ex
@@ -8,12 +8,15 @@ defmodule Plausible.DataMigration.AcquisitionChannel do
 
   def run(opts \\ []) do
     on_cluster_statement = Plausible.MigrationUtils.on_cluster_statement("sessions_v2")
+    # In distributed environments, wait for insert to all temporary tables.
+    insert_quorum = Plausible.IngestRepo.replica_count("sessions_v2")
 
     run_sql_multi(
       "acquisition_channel_functions",
       [
         on_cluster_statement: on_cluster_statement,
-        dictionary_connection_params: Plausible.MigrationUtils.dictionary_connection_params()
+        dictionary_connection_params: Plausible.MigrationUtils.dictionary_connection_params(),
+        insert_quorum: insert_quorum
       ],
       params: %{
         "source_categories" =>
diff --git a/lib/plausible/ingest_repo.ex b/lib/plausible/ingest_repo.ex
index 703082e1ce20..79c825647251 100644
--- a/lib/plausible/ingest_repo.ex
+++ b/lib/plausible/ingest_repo.ex
@@ -16,9 +16,13 @@ defmodule Plausible.IngestRepo do
   end
 
   def clustered_table?(table) do
-    case query("SELECT 1 FROM system.replicas WHERE table = '#{table}'") do
-      {:ok, %{rows: []}} -> false
-      {:ok, _} -> true
-    end
+    replica_count(table) > 1
+  end
+
+  def replica_count(table) do
+    {:ok, %{rows: [[count]]}} =
+      query("SELECT sum(active_replicas) FROM system.replicas WHERE table = '#{table}'")
+
+    count
   end
 end
diff --git a/priv/data_migrations/AcquisitionChannel/sql/acquisition_channel_functions.sql.eex b/priv/data_migrations/AcquisitionChannel/sql/acquisition_channel_functions.sql.eex
index 48250998b641..f324edf4defe 100644
--- a/priv/data_migrations/AcquisitionChannel/sql/acquisition_channel_functions.sql.eex
+++ b/priv/data_migrations/AcquisitionChannel/sql/acquisition_channel_functions.sql.eex
@@ -1,12 +1,24 @@
-CREATE OR REPLACE TABLE acquisition_channel_source_category(referrer_source String, category LowCardinality(String))
+CREATE TABLE IF NOT EXISTS acquisition_channel_source_category
 <%= @on_cluster_statement %>
-Engine = MergeTree()
-ORDER BY referrer_source
-AS
+(
+  referrer_source String,
+  category LowCardinality(String)
+)
+<%= if @on_cluster_statement != "" do %>
+ENGINE = ReplicatedMergeTree('/clickhouse/{cluster}/tables/{shard}/plausible_prod/acquisition_channel_source_category', '{replica}')
+<% else %>
+ENGINE = MergeTree()
+<% end %>
+ORDER BY referrer_source;
+
+TRUNCATE TABLE acquisition_channel_source_category SETTINGS alter_sync=2;
+
+INSERT INTO acquisition_channel_source_category(referrer_source, category)
 SELECT t.1 AS referrer_source, t.2 AS category
 FROM (
     SELECT arrayJoin({source_categories:Array(Tuple(String, String))}) AS t
-);
+)
+SETTINGS insert_quorum = <%= @insert_quorum %>;
 
 CREATE OR REPLACE DICTIONARY acquisition_channel_source_category_dict
 <%= @on_cluster_statement %>
@@ -19,12 +31,23 @@ SOURCE(CLICKHOUSE(TABLE acquisition_channel_source_category <%= @dictionary_conn
 LIFETIME(0)
 LAYOUT(hashed());
 
-CREATE OR REPLACE TABLE acquisition_channel_paid_sources(referrer_source String)
+CREATE TABLE IF NOT EXISTS acquisition_channel_paid_sources
 <%= @on_cluster_statement %>
-Engine = MergeTree()
-ORDER BY referrer_source
-AS
-SELECT arrayJoin({paid_sources:Array(String)}) AS referrer_source;
+(
+  referrer_source String
+)
+<%= if @on_cluster_statement != "" do %>
+ENGINE = ReplicatedMergeTree('/clickhouse/{cluster}/tables/{shard}/plausible_prod/acquisition_channel_paid_sources', '{replica}')
+<% else %>
+ENGINE = MergeTree()
+<% end %>
+ORDER BY referrer_source;
+
+TRUNCATE TABLE acquisition_channel_paid_sources SETTINGS alter_sync=2;
+
+INSERT INTO acquisition_channel_paid_sources(referrer_source)
+SELECT arrayJoin({paid_sources:Array(String)}) AS referrer_source
+SETTINGS insert_quorum = <%= @insert_quorum %>;
 
 CREATE OR REPLACE DICTIONARY acquisition_channel_paid_sources_dict
 <%= @on_cluster_statement %>