Skip to content

Commit

Permalink
Acquisition channels: Functions to calculate channels in clickhouse (#…
Browse files Browse the repository at this point in the history
…4701)

* Expose a few data migration functions, add quiet option to do_run

* Create functions and test acquisition channel logic in clickhouse

Tests were lifted from test/plausible_web/controllers/api/external_controller_test.exs

* Clean up test code a bit

* Property test for acquisition channels

* Handle empty strings properly in reference implementation

* Fix spelling, minor issues

* Revert "Property test for acquisition channels"

This reverts commit 3fa0e0e.

* Only test clickhouse functions

* Solve minor code issue

* update channels logic

* Revert "Only test clickhouse functions"

This reverts commit e127840.

* Add more tests

* Add small result assertion

* Make query options explicit in data migrations

* Move multi-query running logic to within datamigration lib

* Unbreak numeric ids migration

* Named params directly to Clickhouse

* Update reference test implementation

---------

Co-authored-by: Uku Taht <[email protected]>
  • Loading branch information
macobo and ukutaht authored Nov 6, 2024
1 parent c130c2a commit dbf7a09
Show file tree
Hide file tree
Showing 8 changed files with 414 additions and 25 deletions.
55 changes: 44 additions & 11 deletions lib/plausible/data_migration.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,10 @@ defmodule Plausible.DataMigration do
@dir dir
@repo repo

def run_sql_confirm(name, options \\ []) do
{prompt_options, assigns} =
Keyword.split(options, [:prompt_message, :prompt_default_choice])

def run_sql_confirm(name, assigns \\ [], options \\ []) do
query = unwrap_with_io(name, assigns)
message = prompt_options[:prompt_message] || "Execute?"
default_choice = Keyword.get(prompt_options, :prompt_default_choice, :yes)
message = Keyword.get(options, :prompt_message, "Execute?")
default_choice = Keyword.get(options, :prompt_default_choice, :yes)
confirm(message, fn -> do_run(name, query) end, default_choice)
end

Expand Down Expand Up @@ -52,7 +49,7 @@ defmodule Plausible.DataMigration do
end
end

defp unwrap(name, assigns) do
def unwrap(name, assigns \\ []) do
:plausible
|> :code.priv_dir()
|> Path.join("data_migrations")
Expand All @@ -62,16 +59,52 @@ defmodule Plausible.DataMigration do
|> EEx.eval_file(assigns: assigns)
end

@doc """
Runs a single SQL query in a file.
Valid options:
- `quiet` - reduces output from running the SQL
- `params` - List of query parameters.
- `query_options` - passed to Repo.query
"""
def run_sql(name, assigns \\ [], options \\ []) do
query = unwrap(name, assigns)

do_run(name, query, options)
end

defp do_run(name, query, options \\ []) do
case @repo.query(query, [], [timeout: :infinity] ++ options) do
@doc """
Runs multiple SQL queries from a single file.
Note that each query must be separated by semicolons.
"""
def run_sql_multi(name, assigns \\ [], options \\ []) do
unwrap(name, assigns)
|> String.trim()
|> String.split(";", trim: true)
|> Enum.with_index(1)
|> Enum.reduce_while(:ok, fn {query, index}, _ ->
case do_run("name-#{index}", query, options) do
{:ok, _} -> {:cont, :ok}
error -> {:halt, error}
end
end)
end

def do_run(name, query, options \\ []) do
params = Keyword.get(options, :params, [])
query_options = Keyword.get(options, :query_options, [])

case @repo.query(query, params, [timeout: :infinity] ++ query_options) do
{:ok, res} ->
IO.puts(" #{IO.ANSI.yellow()}#{name} #{IO.ANSI.green()}Done!#{IO.ANSI.reset()}\n")
IO.puts(String.duplicate("-", 78))
if not Keyword.get(options, :quiet, false) do
IO.puts(
" #{IO.ANSI.yellow()}#{name} #{IO.ANSI.green()}Done!#{IO.ANSI.reset()}\n"
)

IO.puts(String.duplicate("-", 78))
end

{:ok, res}

result ->
Expand Down
40 changes: 40 additions & 0 deletions lib/plausible/data_migration/acquisition_channel.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
defmodule Plausible.DataMigration.AcquisitionChannel do
@moduledoc """
Creates functions to calculate acquisition channel in ClickHouse
SQL files available at: priv/data_migrations/AcquisitionChannel/sql
"""
use Plausible.DataMigration, dir: "AcquisitionChannel", repo: Plausible.IngestRepo

def run(opts \\ []) do
source_categories =
Plausible.Ingestion.Acquisition.source_categories()
|> invert_map()

on_cluster_statement = Plausible.MigrationUtils.on_cluster_statement("sessions_v2")

run_sql_multi(
"acquisition_channel_functions",
[
on_cluster_statement: on_cluster_statement
],
params: %{
"source_category_shopping" => source_categories["SOURCE_CATEGORY_SHOPPING"],
"source_category_social" => source_categories["SOURCE_CATEGORY_SOCIAL"],
"source_category_video" => source_categories["SOURCE_CATEGORY_VIDEO"],
"source_category_search" => source_categories["SOURCE_CATEGORY_SEARCH"],
"source_category_email" => source_categories["SOURCE_CATEGORY_EMAIL"],
"paid_sources" => Plausible.Ingestion.Source.paid_sources()
},
quiet: Keyword.get(opts, :quiet, false)
)
end

defp invert_map(source_categories) do
source_categories
|> Enum.group_by(
fn {_source, category} -> category end,
fn {source, _category} -> source end
)
end
end
25 changes: 13 additions & 12 deletions lib/plausible/data_migration/numeric_ids.ex
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ defmodule Plausible.DataMigration.NumericIDs do

run_sql_fn =
if interactive? do
&run_sql_confirm/2
&run_sql_confirm/3
else
&run_sql/2
&run_sql/3
end

confirm_fn =
Expand Down Expand Up @@ -101,24 +101,25 @@ defmodule Plausible.DataMigration.NumericIDs do
end

{:ok, _} =
run_sql_fn.("drop-events-v2", [cluster?: cluster?] ++ drop_v2_extra_opts.("events_v2"))
run_sql_fn.("drop-events-v2", [cluster?: cluster?], drop_v2_extra_opts.("events_v2"))

{:ok, _} =
run_sql_fn.("drop-sessions-v2", [cluster?: cluster?] ++ drop_v2_extra_opts.("sessions_v2"))
run_sql_fn.("drop-sessions-v2", [cluster?: cluster?], drop_v2_extra_opts.("sessions_v2"))

{:ok, _} = run_sql_fn.("drop-tmp-events-v2", [])
{:ok, _} = run_sql_fn.("drop-tmp-sessions-v2", [])
{:ok, _} = run_sql_fn.("drop-domains-lookup", [])
{:ok, _} = run_sql_fn.("drop-tmp-events-v2", [], [])
{:ok, _} = run_sql_fn.("drop-tmp-sessions-v2", [], [])
{:ok, _} = run_sql_fn.("drop-domains-lookup", [], [])

{:ok, _} = run_sql_fn.("create-events-v2", table_settings: table_settings, cluster?: cluster?)
{:ok, _} =
run_sql_fn.("create-events-v2", [table_settings: table_settings, cluster?: cluster?], [])

{:ok, _} =
run_sql_fn.("create-sessions-v2", table_settings: table_settings, cluster?: cluster?)
run_sql_fn.("create-sessions-v2", [table_settings: table_settings, cluster?: cluster?], [])

{:ok, _} = run_sql_fn.("create-tmp-events-v2", table_settings: table_settings)
{:ok, _} = run_sql_fn.("create-tmp-sessions-v2", table_settings: table_settings)
{:ok, _} = run_sql_fn.("create-tmp-events-v2", [table_settings: table_settings], [])
{:ok, _} = run_sql_fn.("create-tmp-sessions-v2", [table_settings: table_settings], [])

case run_sql_fn.("create-domains-lookup", table_settings: table_settings) do
case run_sql_fn.("create-domains-lookup", [table_settings: table_settings], []) do
{:ok, _} ->
confirm_fn.("Populate domains-lookup with postgres sites", fn ->
mappings =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ defmodule Plausible.DataMigration.PopulateEventSessionColumns do
for partition <- partitions do
{:ok, _} =
run_sql("update-table", [cluster?: cluster?, partition: partition],
settings: [allow_nondeterministic_mutations: 1]
query_options: [settings: [allow_nondeterministic_mutations: 1]]
)
end

Expand Down
2 changes: 2 additions & 0 deletions lib/plausible/ingestion/acquisition.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ defmodule Plausible.Ingestion.Acquisition do
|> then(&(@custom_source_categories ++ &1))
|> Enum.into(%{})

def source_categories(), do: @source_categories

def get_channel(source, utm_medium, utm_campaign, utm_source, click_id_param) do
get_channel_lowered(
String.downcase(source || ""),
Expand Down
6 changes: 5 additions & 1 deletion lib/plausible/ingestion/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ defmodule Plausible.Ingestion.Source do
end)
end

def paid_sources() do
@paid_sources |> MapSet.to_list()
end

def paid_source?(source) do
MapSet.member?(@paid_sources, source)
end
Expand Down Expand Up @@ -129,7 +133,7 @@ defmodule Plausible.Ingestion.Source do

defp valid_referrer?(_), do: false

defp has_referral?(%Request{referrer: nil}), do: nil
defp has_referral?(%Request{referrer: nil}), do: false

defp has_referral?(%Request{referrer: referrer, uri: uri}) do
referrer_uri = URI.parse(referrer)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
CREATE OR REPLACE FUNCTION acquisition_channel_has_category_shopping <%= @on_cluster_statement %> AS
(referrer_source) ->
has({source_category_shopping:Array(String)}, referrer_source);

CREATE OR REPLACE FUNCTION acquisition_channel_has_category_social <%= @on_cluster_statement %> AS
(referrer_source) ->
has({source_category_social:Array(String)}, referrer_source);

CREATE OR REPLACE FUNCTION acquisition_channel_has_category_video <%= @on_cluster_statement %> AS
(referrer_source) ->
has({source_category_video:Array(String)}, referrer_source);

CREATE OR REPLACE FUNCTION acquisition_channel_has_category_search <%= @on_cluster_statement %> AS
(referrer_source) ->
has({source_category_search:Array(String)}, referrer_source);

CREATE OR REPLACE FUNCTION acquisition_channel_has_category_email <%= @on_cluster_statement %> AS
(referrer_source) ->
has({source_category_email:Array(String)}, referrer_source);

CREATE OR REPLACE FUNCTION acquisition_channel_paid_utm_source <%= @on_cluster_statement %> AS
(referrer_source) ->
has({paid_sources:Array(String)}, referrer_source);

CREATE OR REPLACE FUNCTION acquisition_channel_cross_network <%= @on_cluster_statement %> AS
(utm_campaign) ->
position(utm_campaign, 'cross-network') > 0;

CREATE OR REPLACE FUNCTION acquisition_channel_paid_shopping <%= @on_cluster_statement %> AS
(referrer_source, utm_medium, utm_campaign) ->
acquisition_channel_paid_medium(utm_medium) AND
(
acquisition_channel_has_category_shopping(referrer_source)
OR acquisition_channel_shopping_campaign(utm_campaign)
);

CREATE OR REPLACE FUNCTION acquisition_channel_paid_search <%= @on_cluster_statement %> AS
(referrer_source, utm_medium, utm_source, click_id_param) ->
(
acquisition_channel_has_category_search(referrer_source)
AND (
acquisition_channel_paid_medium(utm_medium)
OR acquisition_channel_paid_utm_source(utm_source)
)
) OR (
referrer_source == 'google'
AND click_id_param == 'gclid'
) OR (
referrer_source == 'bing'
AND click_id_param == 'msclkid'
);

CREATE OR REPLACE FUNCTION acquisition_channel_paid_social <%= @on_cluster_statement %> AS
(referrer_source, utm_medium, utm_source) ->
acquisition_channel_has_category_social(referrer_source)
AND (
acquisition_channel_paid_medium(utm_medium)
OR acquisition_channel_paid_utm_source(utm_source)
);

CREATE OR REPLACE FUNCTION acquisition_channel_paid_video <%= @on_cluster_statement %> AS
(referrer_source, utm_medium, utm_source) ->
acquisition_channel_has_category_video(referrer_source)
AND (
acquisition_channel_paid_medium(utm_medium)
OR acquisition_channel_paid_utm_source(utm_source)
);

CREATE OR REPLACE FUNCTION acquisition_channel_display <%= @on_cluster_statement %> AS
(utm_medium) ->
utm_medium IN ('display', 'banner', 'expandable', 'interstitial', 'cpm');

CREATE OR REPLACE FUNCTION acquisition_channel_paid_medium <%= @on_cluster_statement %> AS
(utm_medium) ->
match(utm_medium, '^(.*cp.*|ppc|retargeting|paid.*)$');

CREATE OR REPLACE FUNCTION acquisition_channel_shopping_campaign <%= @on_cluster_statement %> AS
(utm_campaign) ->
match(utm_campaign, '^(.*(([^a-df-z]|^)shop|shopping).*)$');

CREATE OR REPLACE FUNCTION acquisition_channel_organic_shopping <%= @on_cluster_statement %> AS
(referrer_source, utm_campaign) ->
acquisition_channel_has_category_shopping(referrer_source)
OR acquisition_channel_shopping_campaign(utm_campaign);

CREATE OR REPLACE FUNCTION acquisition_channel_organic_social <%= @on_cluster_statement %> AS
(referrer_source, utm_medium) ->
acquisition_channel_has_category_social(referrer_source)
OR utm_medium IN (
'social',
'social-network',
'social-media',
'sm',
'social network',
'social media'
);

CREATE OR REPLACE FUNCTION acquisition_channel_organic_video <%= @on_cluster_statement %> AS
(referrer_source, utm_medium) ->
acquisition_channel_has_category_video(referrer_source) OR position(utm_medium, 'video') > 0;

CREATE OR REPLACE FUNCTION acquisition_channel_email <%= @on_cluster_statement %> AS
(referrer_source, utm_source, utm_medium) ->
acquisition_channel_has_category_email(referrer_source)
OR acquisition_channel_contains_email(utm_source)
OR acquisition_channel_contains_email(utm_medium);

CREATE OR REPLACE FUNCTION acquisition_channel_affiliates <%= @on_cluster_statement %> AS
(utm_medium) ->
utm_medium == 'affiliate';

CREATE OR REPLACE FUNCTION acquisition_channel_audio <%= @on_cluster_statement %> AS
(utm_medium) ->
utm_medium == 'audio';

CREATE OR REPLACE FUNCTION acquisition_channel_sms <%= @on_cluster_statement %> AS
(column) ->
column == 'sms';

CREATE OR REPLACE FUNCTION acquisition_channel_mobile_push_notifications <%= @on_cluster_statement %> AS
(utm_medium, referrer_source) ->
endsWith(utm_medium, 'push') OR
multiSearchAny(utm_medium, ['mobile', 'notification']) OR
referrer_source == 'firebase';

CREATE OR REPLACE FUNCTION acquisition_channel_referral <%= @on_cluster_statement %> AS
(utm_medium, referrer_source) ->
utm_medium IN ('referral', 'app', 'link') OR
not empty(referrer_source);

CREATE OR REPLACE FUNCTION acquisition_channel_contains_email <%= @on_cluster_statement %> AS
(column) ->
match(column, 'e[-_ ]?mail|newsletter');

CREATE OR REPLACE FUNCTION acquisition_channel <%= @on_cluster_statement %> AS
(referrer_source, utm_medium, utm_campaign, utm_source, click_id_param) ->
acquisition_channel_lowered(
lower(referrer_source),
lower(utm_medium),
lower(utm_campaign),
lower(utm_source),
click_id_param
);

CREATE OR REPLACE FUNCTION acquisition_channel_lowered <%= @on_cluster_statement %> AS
(referrer_source, utm_medium, utm_campaign, utm_source, click_id_param) ->
multiIf(
acquisition_channel_cross_network(utm_campaign), 'Cross-network',
acquisition_channel_paid_shopping(referrer_source, utm_medium, utm_campaign), 'Paid Shopping',
acquisition_channel_paid_search(referrer_source, utm_medium, utm_source, click_id_param), 'Paid Search',
acquisition_channel_paid_social(referrer_source, utm_medium, utm_source), 'Paid Social',
acquisition_channel_paid_video(referrer_source, utm_medium, utm_source), 'Paid Video',
acquisition_channel_display(utm_medium), 'Display',
acquisition_channel_paid_medium(utm_medium), 'Paid Other',
acquisition_channel_organic_shopping(referrer_source, utm_campaign), 'Organic Shopping',
acquisition_channel_organic_social(referrer_source, utm_medium), 'Organic Social',
acquisition_channel_organic_video(referrer_source, utm_medium), 'Organic Video',
acquisition_channel_has_category_search(referrer_source), 'Organic Search',
acquisition_channel_email(referrer_source, utm_source, utm_medium), 'Email',
acquisition_channel_affiliates(utm_medium), 'Affiliates',
acquisition_channel_audio(utm_medium), 'Audio',
acquisition_channel_sms(utm_source), 'SMS',
acquisition_channel_sms(utm_medium), 'SMS',
acquisition_channel_mobile_push_notifications(utm_medium, referrer_source), 'Mobile Push Notifications',
acquisition_channel_referral(utm_medium, referrer_source), 'Referral',
'Direct'
);
Loading

0 comments on commit dbf7a09

Please sign in to comment.