Skip to content

Commit

Permalink
Channels: Migration to add materialized column, backfill code (#4798)
Browse files Browse the repository at this point in the history
* Channels: Migration to add column, backfill code

This change adds `acquisition_channel` columns to events_v2 and
sessions_v2 tables. These columns are materialized - we don't ingest
into them directly. Instead they're calculated based on other columns.

The data migration now also allows backfilling the column.

Tested the ability to change definitions by changing the function
definitions and re-running the migration with backfill. Confirmed that
the underlying data changed as expected.

* quiet option

* Exclude data migrations from validation

* Migration consistency
  • Loading branch information
macobo authored Nov 12, 2024
1 parent 3759db9 commit 4aa7dec
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 17 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/migrations-validation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@ jobs:
id: changes
with:
list-files: json
predicate-quantifier: 'every'
filters: |
lib:
- 'lib/**'
- '!lib/plausible/data_migration/**'
extra:
- 'extra/**'
config:
- 'config/**'
- if: steps.changes.outputs.lib == 'true'
- if: steps.changes.outputs.lib == 'true' || steps.changes.outputs.extra == 'true' || steps.changes.outputs.config == 'true'
run: |
echo "::error file=${{ fromJSON(steps.changes.outputs.lib_files)[0] }}::Code and migrations shouldn't be changed at the same time"
exit 1
89 changes: 75 additions & 14 deletions lib/plausible/data_migration/acquisition_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@ defmodule Plausible.DataMigration.AcquisitionChannel do
@moduledoc """
Creates dictionaries and functions to calculate acquisition channel in ClickHouse
Creates `acquisition_channel` columns in `events_v2` and `sessions_v2` tables.
Run via `Plausible.DataMigration.AcquisitionChannel.run(options)`
Options:
- `add_column` - creates the materialized column. Already done in a migration
- `update_column` - Updates the column definition to use new function definitions. Defaults to true.
Note that historical data is only updated if `backfill` is set to true or if it was never materialized.
- `backfill` - backfills the data for the column. Speeds up calculations on historical data.
SQL files available at: priv/data_migrations/AcquisitionChannel/sql
"""
use Plausible.DataMigration, dir: "AcquisitionChannel", repo: Plausible.IngestRepo
Expand All @@ -11,19 +20,71 @@ defmodule Plausible.DataMigration.AcquisitionChannel do
# In distributed environments, wait for insert to all temporary tables.
insert_quorum = Plausible.IngestRepo.replica_count("sessions_v2")

run_sql_multi(
"acquisition_channel_functions",
[
on_cluster_statement: on_cluster_statement,
dictionary_connection_params: Plausible.MigrationUtils.dictionary_connection_params(),
insert_quorum: insert_quorum
],
params: %{
"source_categories" =>
Plausible.Ingestion.Acquisition.source_categories() |> Map.to_list(),
"paid_sources" => Plausible.Ingestion.Source.paid_sources()
},
quiet: Keyword.get(opts, :quiet, false)
)
:ok =
run_sql_multi(
"acquisition_channel_functions",
[
on_cluster_statement: on_cluster_statement,
dictionary_connection_params: Plausible.MigrationUtils.dictionary_connection_params(),
insert_quorum: insert_quorum
],
params: %{
"source_categories" =>
Plausible.Ingestion.Acquisition.source_categories() |> Map.to_list(),
"paid_sources" => Plausible.Ingestion.Source.paid_sources()
},
quiet: Keyword.get(opts, :quiet, false)
)

cond do
Keyword.get(opts, :add_column) ->
alter_data_tables(
"acquisition_channel_add_materialized_column",
on_cluster_statement,
opts
)

Keyword.get(opts, :update_column, true) ->
alter_data_tables(
"acquisition_channel_update_materialized_column",
on_cluster_statement,
opts
)

true ->
nil
end

if Keyword.get(opts, :backfill) do
alter_data_tables(
"acquisition_channel_backfill_materialized_column",
on_cluster_statement,
opts
)
end

:ok
end

# Renders the `sql_name` template once per data table (`events_v2`, then
# `sessions_v2`) and runs it, asserting that each statement returns `{:ok, _}`.
# The `:quiet` option (default `false`) is forwarded to `run_sql`.
defp alter_data_tables(sql_name, on_cluster_statement, opts) do
  quiet = Keyword.get(opts, :quiet, false)

  alter_table = fn table_name ->
    assigns = [table: table_name, on_cluster_statement: on_cluster_statement]
    run_sql(sql_name, assigns, quiet: quiet)
  end

  {:ok, _} = alter_table.("events_v2")
  {:ok, _} = alter_table.("sessions_v2")
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ALTER TABLE <%= @table %>
<%= @on_cluster_statement %>
-- Adds acquisition_channel as a MATERIALIZED column: it is not ingested directly,
-- its value is calculated by the acquisition_channel SQL function from the
-- source columns listed below.
-- IF NOT EXISTS keeps the statement from failing when the column is already present.
ADD COLUMN IF NOT EXISTS acquisition_channel LowCardinality(String)
MATERIALIZED acquisition_channel(referrer_source, utm_medium, utm_campaign, utm_source, click_id_param)
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE <%= @table %>
<%= @on_cluster_statement %>
-- Backfill step: materializes acquisition_channel for data that already exists.
-- The ON CLUSTER placeholder is rendered here as in the add and update column
-- templates, so the caller-supplied cluster clause is no longer silently dropped.
MATERIALIZE COLUMN acquisition_channel
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ ENGINE = MergeTree()
<% end %>
ORDER BY referrer_source;

TRUNCATE TABLE acquisition_channel_source_category SETTINGS alter_sync=2;
TRUNCATE TABLE acquisition_channel_source_category <%= @on_cluster_statement %>;

INSERT INTO acquisition_channel_source_category(referrer_source, category)
SELECT t.1 AS referrer_source, t.2 AS category
Expand Down Expand Up @@ -43,7 +43,7 @@ ENGINE = MergeTree()
<% end %>
ORDER BY referrer_source;

TRUNCATE TABLE acquisition_channel_paid_sources SETTINGS alter_sync=2;
TRUNCATE TABLE acquisition_channel_paid_sources <%= @on_cluster_statement %>;

INSERT INTO acquisition_channel_paid_sources(referrer_source)
SELECT arrayJoin({paid_sources:Array(String)}) AS referrer_source
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ALTER TABLE <%= @table %>
<%= @on_cluster_statement %>
-- Re-declares the MATERIALIZED expression so the column uses the current
-- acquisition_channel function definition. Historical data is only updated
-- when the backfill step is run or if it was never materialized.
MODIFY COLUMN acquisition_channel LowCardinality(String)
MATERIALIZED acquisition_channel(referrer_source, utm_medium, utm_campaign, utm_source, click_id_param)
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
defmodule Plausible.IngestRepo.Migrations.CreateAcquisitionChannelColumn do
  @moduledoc """
  Creates the materialized `acquisition_channel` column by delegating to
  `Plausible.DataMigration.AcquisitionChannel.run` with `add_column: true`.

  Backfilling is explicitly disabled here. The migration cannot be rolled back.
  """

  use Ecto.Migration

  alias Plausible.DataMigration.AcquisitionChannel

  # Runs the data migration in add-column mode, without backfilling.
  def up, do: AcquisitionChannel.run(add_column: true, backfill: false)

  # No rollback path exists, so reverting always raises.
  def down, do: raise("irreversible")
end

0 comments on commit 4aa7dec

Please sign in to comment.