Skip to content

Commit

Permalink
Add override_starting_version cfg option
Browse files Browse the repository at this point in the history
  • Loading branch information
keyliaran committed Mar 25, 2024
1 parent e20ecb2 commit 7de84f4
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 3 deletions.
7 changes: 5 additions & 2 deletions rust/processor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ Indexer GRPC parser is to indexer data processor that leverages the indexer grpc
indexer_grpc_http2_ping_timeout_in_secs: 10
number_concurrent_processing_tasks: 10
auth_token: AUTH_TOKEN
starting_version: 0 # optional
ending_version: 0 # optional
version:
starting: 0
ending: 0 # optional
override_starting: false
transaction_filter:
# Only allow transactions from these contract addresses
# focus_contract_addresses:
Expand All @@ -47,6 +49,7 @@ Indexer GRPC parser is to indexer data processor that leverages the indexer grpc
- `indexer_grpc_http2_ping_timeout_in_secs`: client-side grpc HTTP2 ping timeout.
- `auth_token`: Auth token used for connection.
- `starting_version`: start processor at starting_version.
- `override_starting_version`: indicates whether to enforce the starting version from the configuration, disregarding any existing database values.
- `ending_version`: stop processor after ending_version.
- `number_concurrent_processing_tasks`: number of tasks to parse and insert; 1 means sequential processing, otherwise,
transactions are splitted into tasks and inserted with random order.
Expand Down
3 changes: 3 additions & 0 deletions rust/processor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub struct IndexerGrpcProcessorConfig {
pub auth_token: String,
// Version to start indexing from
pub starting_version: Option<u64>,
// Indicates whether to enforce the starting version from the configuration, disregarding any existing database values
pub override_starting_version: Option<bool>,
// Version to end indexing at
pub ending_version: Option<u64>,
// Number of tasks waiting to pull transaction batches from the channel and process them
Expand Down Expand Up @@ -77,6 +79,7 @@ impl RunnableConfig for IndexerGrpcProcessorConfig {
self.grpc_http2_config.clone(),
self.auth_token.clone(),
self.starting_version,
self.override_starting_version,
self.ending_version,
self.number_concurrent_processing_tasks,
self.db_pool_size,
Expand Down
12 changes: 11 additions & 1 deletion rust/processor/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub struct Worker {
pub grpc_http2_config: IndexerGrpcHttp2Config,
pub auth_token: String,
pub starting_version: Option<u64>,
pub override_starting_version: Option<bool>,
pub ending_version: Option<u64>,
pub number_concurrent_processing_tasks: usize,
pub gap_detection_batch_size: u64,
Expand All @@ -73,6 +74,7 @@ impl Worker {
grpc_http2_config: IndexerGrpcHttp2Config,
auth_token: String,
starting_version: Option<u64>,
override_starting_version: Option<bool>,
ending_version: Option<u64>,
number_concurrent_processing_tasks: Option<usize>,
db_pool_size: Option<u32>,
Expand Down Expand Up @@ -107,6 +109,7 @@ impl Worker {
indexer_grpc_data_service_address,
grpc_http2_config,
starting_version,
override_starting_version,
ending_version,
auth_token,
number_concurrent_processing_tasks,
Expand Down Expand Up @@ -154,14 +157,21 @@ impl Worker {
0
});

let starting_version = self.starting_version.unwrap_or(starting_version_from_db);
let mut starting_version = self.starting_version.unwrap_or(starting_version_from_db);

if let Some(override_starting_version) = self.override_starting_version {
if override_starting_version && self.starting_version.is_some() {
starting_version = self.starting_version.unwrap();
}
}

info!(
processor_name = processor_name,
service_type = PROCESSOR_SERVICE_TYPE,
stream_address = self.indexer_grpc_data_service_address.to_string(),
final_start_version = starting_version,
start_version_from_config = self.starting_version,
override_starting_version = self.override_starting_version,
start_version_from_db = starting_version_from_db,
"[Parser] Building processor",
);
Expand Down

0 comments on commit 7de84f4

Please sign in to comment.