Add back reverse migration so tests pass
phacops committed Nov 11, 2024
1 parent e93c50c commit f1b0280
Showing 1 changed file with 144 additions and 4 deletions.
@@ -1,10 +1,22 @@
 from __future__ import annotations

-from typing import Sequence
+from typing import List, Sequence

 from snuba.clusters.storage_sets import StorageSetKey
-from snuba.migrations import migration, operations
-from snuba.migrations.operations import OperationTarget, SqlOperation
+from snuba.migrations import migration, operations, table_engines
+from snuba.migrations.columns import MigrationModifiers as Modifiers
+from snuba.migrations.operations import AddIndicesData, OperationTarget, SqlOperation
+from snuba.utils.schemas import (
+    UUID,
+    Column,
+    DateTime,
+    DateTime64,
+    Float,
+    Int,
+    Map,
+    String,
+    UInt,
+)

 unused_tables = {
     "eap_spans_local",
@@ -14,6 +26,102 @@
     "spans_str_attrs_2_mv",
 }

+storage_set_name = StorageSetKey.EVENTS_ANALYTICS_PLATFORM
+local_table_name = "eap_spans_local"
+dist_table_name = "eap_spans_dist"
+num_attr_buckets = 20
+
+columns: List[Column[Modifiers]] = [
+    Column("organization_id", UInt(64)),
+    Column("project_id", UInt(64)),
+    Column("service", String(Modifiers(codecs=["ZSTD(1)"]))),
+    Column("trace_id", UUID()),
+    Column("span_id", UInt(64)),
+    Column("parent_span_id", UInt(64, Modifiers(codecs=["ZSTD(1)"]))),
+    Column("segment_id", UInt(64, Modifiers(codecs=["ZSTD(1)"]))),
+    Column("segment_name", String(Modifiers(codecs=["ZSTD(1)"]))),
+    Column("is_segment", UInt(8, Modifiers(codecs=["T64", "ZSTD(1)"]))),
+    Column("_sort_timestamp", DateTime(Modifiers(codecs=["DoubleDelta", "ZSTD(1)"]))),
+    Column(
+        "start_timestamp",
+        DateTime64(6, modifiers=Modifiers(codecs=["DoubleDelta", "ZSTD(1)"])),
+    ),
+    Column(
+        "end_timestamp",
+        DateTime64(6, modifiers=Modifiers(codecs=["DoubleDelta", "ZSTD(1)"])),
+    ),
+    Column(
+        "duration_ms",
+        UInt(32, modifiers=Modifiers(codecs=["DoubleDelta", "ZSTD(1)"])),
+    ),
+    Column("exclusive_time_ms", Float(64, modifiers=Modifiers(codecs=["ZSTD(1)"]))),
+    Column(
+        "retention_days",
+        UInt(16, modifiers=Modifiers(codecs=["DoubleDelta", "ZSTD(1)"])),
+    ),
+    Column("name", String(modifiers=Modifiers(codecs=["ZSTD(1)"]))),
+    Column("sampling_factor", Float(64, modifiers=Modifiers(codecs=["ZSTD(1)"]))),
+    Column("sampling_weight", Float(64, modifiers=Modifiers(codecs=["ZSTD(1)"]))),
+    Column("sign", Int(8, modifiers=Modifiers(codecs=["DoubleDelta"]))),
+]
+columns.extend(
+    [
+        Column(
+            f"attr_str_{i}",
+            Map(String(), String(), modifiers=Modifiers(codecs=["ZSTD(1)"])),
+        )
+        for i in range(num_attr_buckets)
+    ]
+)
+
+columns.extend(
+    [
+        Column(
+            f"attr_num_{i}",
+            Map(String(), Float(64), modifiers=Modifiers(codecs=["ZSTD(1)"])),
+        )
+        for i in range(num_attr_buckets)
+    ]
+)
+
+indices: Sequence[AddIndicesData] = (
+    [
+        AddIndicesData(
+            name="bf_trace_id",
+            expression="trace_id",
+            type="bloom_filter",
+            granularity=1,
+        )
+    ]
+    + [
+        AddIndicesData(
+            name=f"bf_attr_str_{i}",
+            expression=f"mapKeys(attr_str_{i})",
+            type="bloom_filter",
+            granularity=1,
+        )
+        for i in range(num_attr_buckets)
+    ]
+    + [
+        AddIndicesData(
+            name=f"bf_attr_str_val_{i}",
+            expression=f"mapValues(attr_str_{i})",
+            type="ngrambf_v1(4, 1024, 10, 1)",
+            granularity=1,
+        )
+        for i in range(num_attr_buckets)
+    ]
+    + [
+        AddIndicesData(
+            name=f"bf_attr_num_{i}",
+            expression=f"mapKeys(attr_num_{i})",
+            type="bloom_filter",
+            granularity=1,
+        )
+        for i in range(num_attr_buckets)
+    ]
+)
+

 class Migration(migration.ClickhouseNodeMigration):
     blocking = False
@@ -30,4 +138,36 @@ def forwards_ops(self) -> Sequence[SqlOperation]:
         ]

     def backwards_ops(self) -> Sequence[SqlOperation]:
-        return []
+        return [
+            operations.CreateTable(
+                storage_set=storage_set_name,
+                table_name=local_table_name,
+                columns=columns,
+                engine=table_engines.CollapsingMergeTree(
+                    primary_key="(organization_id, _sort_timestamp, trace_id)",
+                    order_by="(organization_id, _sort_timestamp, trace_id, span_id)",
+                    sign_column="sign",
+                    partition_by="(toMonday(_sort_timestamp))",
+                    settings={"index_granularity": "8192"},
+                    storage_set=storage_set_name,
+                    ttl="_sort_timestamp + toIntervalDay(retention_days)",
+                ),
+                target=OperationTarget.LOCAL,
+            ),
+            operations.CreateTable(
+                storage_set=storage_set_name,
+                table_name=dist_table_name,
+                columns=columns,
+                engine=table_engines.Distributed(
+                    local_table_name=local_table_name,
+                    sharding_key="cityHash64(reinterpretAsUInt128(trace_id))",  # sharding keys must be at most 64 bits
+                ),
+                target=OperationTarget.DISTRIBUTED,
+            ),
+            operations.AddIndices(
+                storage_set=storage_set_name,
+                table_name=local_table_name,
+                indices=indices,
+                target=OperationTarget.LOCAL,
+            ),
+        ]
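
The reverse migration recreates the eap_spans tables with string and numeric attributes spread across 20 attr_str_{i} / attr_num_{i} Map columns rather than one large map. The commit itself does not show the write path; below is a minimal, hypothetical sketch of how a writer could route an attribute key to one of those buckets, assuming a simple hash-modulo scheme (the choice of FNV-1a here is an illustrative assumption, not taken from this diff):

# Hypothetical illustration (not part of this commit): route an attribute
# key to one of the num_attr_buckets Map columns via hash-modulo.
def fnv_1a_32(data: bytes) -> int:
    # 32-bit FNV-1a; the specific hash function is an assumption.
    h = 2166136261
    for byte in data:
        h = ((h ^ byte) * 16777619) & 0xFFFFFFFF
    return h

def attr_str_column(attribute_key: str, num_attr_buckets: int = 20) -> str:
    bucket = fnv_1a_32(attribute_key.encode("utf-8")) % num_attr_buckets
    return f"attr_str_{bucket}"

print(attr_str_column("span.op"))  # prints one of attr_str_0 .. attr_str_19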
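
The inline comment on sharding_key is worth unpacking: trace_id is a 128-bit UUID, but ClickHouse sharding keys must fit in 64 bits, hence reinterpretAsUInt128 followed by cityHash64. A rough Python analogue of that reduction, with blake2b as an illustrative stand-in since cityHash64 only exists server-side:

import hashlib
import uuid

def shard_for_trace(trace_id: uuid.UUID, shard_count: int) -> int:
    # Collapse the 128-bit UUID to a 64-bit key, then pick a shard.
    # blake2b is only a stand-in for ClickHouse's cityHash64.
    digest = hashlib.blake2b(trace_id.bytes, digest_size=8).digest()
    return int.from_bytes(digest, "little") % shard_count

print(shard_for_trace(uuid.uuid4(), shard_count=4))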
