
Commit

Merge branch 'master' into pierre/eap-spans-cleanup-unused-tables
phacops authored Nov 11, 2024
2 parents f1b0280 + 7ea53d8 commit 42dce9d
Showing 26 changed files with 887 additions and 54 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/admin-sourcemaps.yml
@@ -15,7 +15,7 @@ jobs:
       - uses: actions/setup-python@v5
         with:
           python-version: 3.8
-      - uses: actions/setup-node@v3
+      - uses: actions/setup-node@v4
         with:
           node-version: ${{env.NODE_VERSION}}
       - name: Build admin sourcemaps
2 changes: 1 addition & 1 deletion .github/workflows/validate-pipelines.yml
@@ -41,7 +41,7 @@ jobs:
     steps:
       - uses: actions/checkout@v3
       - id: 'auth'
-        uses: google-github-actions/auth@v1
+        uses: google-github-actions/auth@v2
         with:
           workload_identity_provider: 'projects/868781662168/locations/global/workloadIdentityPools/prod-github/providers/github-oidc-pool'
           service_account: '[email protected]'
1 change: 1 addition & 0 deletions devservices/config.yml
@@ -65,6 +65,7 @@ services:
       ENABLE_ISSUE_OCCURRENCE_CONSUMER: ${ENABLE_ISSUE_OCCURRENCE_CONSUMER:-}
       ENABLE_AUTORUN_MIGRATION_SEARCH_ISSUES: 1
       ENABLE_GROUP_ATTRIBUTES_CONSUMER: ${ENABLE_GROUP_ATTRIBUTES_CONSUMER:-}
+      ENABLE_LW_DELETIONS_CONSUMER: ${ENABLE_LW_DELETIONS_CONSUMER:-}
     platform: linux/amd64
     extra_hosts:
       host.docker.internal: host-gateway
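The new ENABLE_LW_DELETIONS_CONSUMER variable follows the pattern of the other consumer toggles above: it defaults to empty, and the devserver change further down only spawns the deletions consumer when settings.ENABLE_LW_DELETIONS_CONSUMER is truthy. The settings wiring itself is not part of the hunks shown here; as a rough, hypothetical sketch, the toggle would typically be read from the environment along these lines:

import os

# Hypothetical sketch, not code from this commit: the ":-" default in
# devservices/config.yml passes an empty string through, which stays falsy,
# so the consumer only starts when the variable is explicitly set.
ENABLE_LW_DELETIONS_CONSUMER = bool(os.environ.get("ENABLE_LW_DELETIONS_CONSUMER"))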
8 changes: 4 additions & 4 deletions requirements.txt
@@ -14,11 +14,11 @@ googleapis-common-protos==1.63.2
 google-api-core==2.19.1
 honcho==1.1.0
 python-jose[cryptography]==3.3.0
-jsonschema==4.16.0
+jsonschema==4.23.0
 fastjsonschema==2.16.2
-packaging==21.3
+packaging==24.1
 parsimonious==0.10.0
-progressbar2==4.0.0
+progressbar2==4.2.0
 protobuf==5.27.3
 proto-plus==1.24.0
 pytest==7.1.3
@@ -44,6 +44,6 @@ Werkzeug==3.0.5
 PyYAML==6.0
 sqlparse==0.5.0
 google-api-python-client==2.88.0
-sentry-usage-accountant==0.0.10
+sentry-usage-accountant==0.0.11
 freezegun==1.2.2
 sentry-protos==0.1.31
2 changes: 1 addition & 1 deletion rust_snuba/src/processors/eap_spans.rs
@@ -217,7 +217,7 @@ impl From<FromSpanMessage> for EAPSpan {

         // lower precision to compensate floating point errors
         res.sampling_factor = (res.sampling_factor * 1e9).round() / 1e9;
-        res.sampling_weight = (1.0 / res.sampling_factor) as u64;
+        res.sampling_weight = (1.0 / res.sampling_factor).round() as u64;

         if let Some(data) = from.data {
             for (k, v) in data {
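The functional change here: Rust's as u64 cast truncates toward zero, so when floating-point error leaves the reciprocal just below its intended integer, the old code dropped a whole unit; rounding before the cast recovers it. A tiny illustration of the difference, in Python, using a constructed stand-in value rather than a real sampling factor:

# Stand-in value: a reciprocal that floating point left just shy of its intended integer.
weight = 1000.0 - 1e-9

print(int(weight))    # 999   -- a truncating cast keeps this (old `as u64` behavior)
print(round(weight))  # 1000  -- rounding first keeps this (new `.round() as u64` behavior)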
18 changes: 18 additions & 0 deletions snuba/cli/devserver.py
@@ -481,6 +481,24 @@ def devserver(*, bootstrap: bool, workers: bool) -> None:
             ),
         ]

+    if settings.ENABLE_LW_DELETIONS_CONSUMER:
+        daemons += [
+            (
+                "lw-deletions-consumer",
+                [
+                    "snuba",
+                    "lw-deletions-consumer",
+                    "--storage=search_issues",
+                    "--consumer-group=search_issues_deletes_group",
+                    "--max-rows-batch-size=10",
+                    "--max-batch-time-ms=1000",
+                    "--auto-offset-reset=latest",
+                    "--no-strict-offset-reset",
+                    "--log-level=debug",
+                ],
+            ),
+        ]
+
     manager = Manager()
     for name, cmd in daemons:
         manager.add_process(
174 changes: 174 additions & 0 deletions snuba/cli/lw_deletions_consumer.py
@@ -0,0 +1,174 @@
import logging
import signal
from typing import Any, Optional, Sequence

import click
import sentry_sdk
from arroyo import configure_metrics
from arroyo.backends.kafka import KafkaPayload
from arroyo.processing import StreamProcessor

from snuba import environment, settings
from snuba.consumers.consumer_builder import (
ConsumerBuilder,
KafkaParameters,
ProcessingParameters,
)
from snuba.consumers.consumer_config import resolve_consumer_config
from snuba.datasets.deletion_settings import MAX_ROWS_TO_DELETE_DEFAULT
from snuba.datasets.storages.factory import get_writable_storage
from snuba.datasets.storages.storage_key import StorageKey
from snuba.environment import setup_logging, setup_sentry
from snuba.lw_deletions.formatters import STORAGE_FORMATTER
from snuba.lw_deletions.strategy import LWDeletionsConsumerStrategyFactory
from snuba.utils.metrics.wrapper import MetricsWrapper
from snuba.utils.streams.metrics_adapter import StreamMetricsAdapter
from snuba.web.bulk_delete_query import STORAGE_TOPIC

# A longer batch time for deletes is reasonable
# since we want fewer mutations
DEFAULT_DELETIONS_MAX_BATCH_TIME_MS = 60000 * 2

logger = logging.getLogger(__name__)


@click.command()
@click.option(
"--consumer-group",
help="Consumer group use for consuming the deletion topic.",
required=True,
)
@click.option(
"--bootstrap-server",
multiple=True,
help="Kafka bootstrap server to use for consuming.",
)
@click.option("--storage", help="Storage name to consume from", required=True)
@click.option(
"--max-rows-batch-size",
default=MAX_ROWS_TO_DELETE_DEFAULT,
type=int,
help="Max amount of rows to delete at one time.",
)
@click.option(
"--max-batch-time-ms",
default=DEFAULT_DELETIONS_MAX_BATCH_TIME_MS,
type=int,
help="Max duration to buffer messages in memory for.",
)
@click.option(
"--auto-offset-reset",
default="earliest",
type=click.Choice(["error", "earliest", "latest"]),
help="Kafka consumer auto offset reset.",
)
@click.option(
"--no-strict-offset-reset",
is_flag=True,
help="Forces the kafka consumer auto offset reset.",
)
@click.option(
"--queued-max-messages-kbytes",
default=settings.DEFAULT_QUEUED_MAX_MESSAGE_KBYTES,
type=int,
help="Maximum number of kilobytes per topic+partition in the local consumer queue.",
)
@click.option(
"--queued-min-messages",
default=settings.DEFAULT_QUEUED_MIN_MESSAGES,
type=int,
help="Minimum number of messages per topic+partition the local consumer queue should contain before messages are sent to kafka.",
)
@click.option("--log-level", help="Logging level to use.")
def lw_deletions_consumer(
*,
consumer_group: str,
bootstrap_server: Sequence[str],
storage: str,
max_rows_batch_size: int,
max_batch_time_ms: int,
auto_offset_reset: str,
no_strict_offset_reset: bool,
queued_max_messages_kbytes: int,
queued_min_messages: int,
log_level: str,
) -> None:
setup_logging(log_level)
setup_sentry()

logger.info("Consumer Starting")

sentry_sdk.set_tag("storage", storage)
shutdown_requested = False
consumer: Optional[StreamProcessor[KafkaPayload]] = None

def handler(signum: int, frame: Any) -> None:
nonlocal shutdown_requested
shutdown_requested = True

if consumer is not None:
consumer.signal_shutdown()

signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)

topic = STORAGE_TOPIC[storage]

while not shutdown_requested:
metrics_tags = {
"consumer_group": consumer_group,
"storage": storage,
}
metrics = MetricsWrapper(
environment.metrics, "lw_deletions_consumer", tags=metrics_tags
)
configure_metrics(StreamMetricsAdapter(metrics), force=True)
consumer_config = resolve_consumer_config(
storage_names=[storage],
raw_topic=topic.value,
commit_log_topic=None,
replacements_topic=None,
bootstrap_servers=bootstrap_server,
commit_log_bootstrap_servers=[],
replacement_bootstrap_servers=[],
slice_id=None,
max_batch_size=max_rows_batch_size,
max_batch_time_ms=max_batch_time_ms,
group_instance_id=consumer_group,
)

consumer_builder = ConsumerBuilder(
consumer_config=consumer_config,
kafka_params=KafkaParameters(
group_id=consumer_group,
auto_offset_reset=auto_offset_reset,
strict_offset_reset=not no_strict_offset_reset,
queued_max_messages_kbytes=queued_max_messages_kbytes,
queued_min_messages=queued_min_messages,
),
processing_params=ProcessingParameters(None, None, None),
max_batch_size=max_rows_batch_size,
max_batch_time_ms=max_batch_time_ms,
max_insert_batch_size=0,
max_insert_batch_time_ms=0,
metrics=metrics,
slice_id=None,
join_timeout=None,
enforce_schema=False,
metrics_tags=metrics_tags,
)

writable_storage = get_writable_storage(StorageKey(storage))
formatter = STORAGE_FORMATTER[storage]()
strategy_factory = LWDeletionsConsumerStrategyFactory(
max_batch_size=max_rows_batch_size,
max_batch_time_ms=max_batch_time_ms,
storage=writable_storage,
formatter=formatter,
metrics=metrics,
)

consumer = consumer_builder.build_lw_deletions_consumer(strategy_factory)

consumer.run()
consumer_builder.flush()
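Structurally, the new CLI installs SIGINT/SIGTERM handlers, then loops building and running a consumer until shutdown is requested, flushing the builder after each run. A stripped-down sketch of just that signal-and-loop pattern, with a dummy worker standing in for the Arroyo StreamProcessor (the names below are illustrative, not Snuba or Arroyo APIs):

import signal
import time
from typing import Any, Optional


class DummyWorker:
    """Stand-in for the stream processor built by ConsumerBuilder."""

    def __init__(self) -> None:
        self._running = True

    def signal_shutdown(self) -> None:
        self._running = False

    def run(self) -> None:
        while self._running:
            time.sleep(0.1)  # stand-in for consuming and deleting a batch


shutdown_requested = False
worker: Optional[DummyWorker] = None


def handler(signum: int, frame: Any) -> None:
    # Same shape as the handler above: flip the flag and stop the active worker.
    global shutdown_requested
    shutdown_requested = True
    if worker is not None:
        worker.signal_shutdown()


signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)

while not shutdown_requested:
    worker = DummyWorker()  # the real CLI builds a consumer and strategy factory here
    worker.run()            # returns once signal_shutdown() has been called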
9 changes: 9 additions & 0 deletions snuba/consumers/consumer_builder.py
@@ -375,6 +375,15 @@ def build_dlq_consumer(
             dlq_policy,
         )

+    def build_lw_deletions_consumer(
+        self, strategy_factory: ProcessingStrategyFactory[KafkaPayload]
+    ) -> StreamProcessor[KafkaPayload]:
+        return self.__build_consumer(
+            strategy_factory,
+            self.raw_topic,
+            self.__build_default_dlq_policy(),
+        )
+
     def __build_default_dlq_policy(self) -> Optional[DlqPolicy[KafkaPayload]]:
         """
         Default DLQ policy applies to the base consumer or the DLQ consumer when
Empty file added snuba/lw_deletions/__init__.py
(Diffs for the remaining changed files are not shown here.)

0 comments on commit 42dce9d
