Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor indexer version config #332

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion rust/processor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Indexer GRPC parser is to indexer data processor that leverages the indexer grpc
number_concurrent_processing_tasks: 10
auth_token: AUTH_TOKEN
starting_version: 0 # optional
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: update the document

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry for being late on this.

a question about this: if starting_version is specific, does it also mean that processing start from this version regardless of db version?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a new field called "override_starting_version." The logic is straightforward:

  1. If "override_starting_version" exists and "starting_version" is configured, use it regardless of the database version.
  2. If "override_starting_version" doesn't exist, use the database version.

starting_version_if_nothing_in_db: 0 # optional
ending_version: 0 # optional
transaction_filter:
# Only allow transactions from these contract addresses
Expand All @@ -46,7 +47,8 @@ Indexer GRPC parser is to indexer data processor that leverages the indexer grpc
- `indexer_grpc_http2_ping_interval_in_secs`: client-side grpc HTTP2 ping interval.
- `indexer_grpc_http2_ping_timeout_in_secs`: client-side grpc HTTP2 ping timeout.
- `auth_token`: Auth token used for connection.
- `starting_version`: start processor at starting_version.
- `starting_version`: start processor at starting_version regardless of the database version.
- `starting_version_if_nothing_in_db`: start processor at the starting_version if nothing is set in the database.
- `ending_version`: stop processor after ending_version.
- `number_concurrent_processing_tasks`: number of tasks to parse and insert; 1 means sequential processing, otherwise,
transactions are splitted into tasks and inserted with random order.
Expand Down
3 changes: 3 additions & 0 deletions rust/processor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub struct IndexerGrpcProcessorConfig {
pub auth_token: String,
// Version to start indexing from
pub starting_version: Option<u64>,
// Version to start indexing from if nothing in db
pub starting_version_if_nothing_in_db: Option<u64>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we probably want a better name..

also, for these two parameters, it's not clear which one will take precedence. we probably should either use an enum here, or ensure they are mutually exclusive.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we should avoid using enums to maintain backward compatibility. The current implementation works as follows:

  • If starting_version exists in the config, it will override all existing values (both starting_version_if_nothing_in_db and the database value).
  • If starting_version is not configured, we'll take a look to the database value. If the database has no value and starting_version_if_nothing_in_db is configured, we'll use that value; otherwise, we'll default to 0.

// Version to end indexing at
pub ending_version: Option<u64>,
// Number of tasks waiting to pull transaction batches from the channel and process them
Expand Down Expand Up @@ -77,6 +79,7 @@ impl RunnableConfig for IndexerGrpcProcessorConfig {
self.grpc_http2_config.clone(),
self.auth_token.clone(),
self.starting_version,
self.starting_version_if_nothing_in_db,
self.ending_version,
self.number_concurrent_processing_tasks,
self.db_pool_size,
Expand Down
27 changes: 20 additions & 7 deletions rust/processor/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub struct Worker {
pub grpc_http2_config: IndexerGrpcHttp2Config,
pub auth_token: String,
pub starting_version: Option<u64>,
pub starting_version_if_nothing_in_db: Option<u64>,
pub ending_version: Option<u64>,
pub number_concurrent_processing_tasks: usize,
pub gap_detection_batch_size: u64,
Expand All @@ -73,6 +74,7 @@ impl Worker {
grpc_http2_config: IndexerGrpcHttp2Config,
auth_token: String,
starting_version: Option<u64>,
starting_version_if_nothing_in_db: Option<u64>,
ending_version: Option<u64>,
number_concurrent_processing_tasks: Option<usize>,
db_pool_size: Option<u32>,
Expand Down Expand Up @@ -107,6 +109,7 @@ impl Worker {
indexer_grpc_data_service_address,
grpc_http2_config,
starting_version,
starting_version_if_nothing_in_db,
ending_version,
auth_token,
number_concurrent_processing_tasks,
Expand Down Expand Up @@ -141,27 +144,37 @@ impl Worker {
"[Parser] Finished migrations"
);

let starting_version_from_db = self
let mut starting_version_from_db = None;

let mut starting_version = match self
.get_start_version()
.await
.expect("[Parser] Database error when getting starting version")
.unwrap_or_else(|| {
.expect("[Parser] Database error when getting starting version") {
None => {
info!(
processor_name = processor_name,
service_type = PROCESSOR_SERVICE_TYPE,
"[Parser] No starting version from db so starting from version 0"
);
0
});
self.starting_version_if_nothing_in_db.unwrap_or(0)
}
Some(version) => {
starting_version_from_db = Some(version);
version
}
};

let starting_version = self.starting_version.unwrap_or(starting_version_from_db);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for making the change! This line is still necessary to override with starting_version

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure! Fixed

if let Some(force_start_version) = self.starting_version {
starting_version = force_start_version
}

info!(
processor_name = processor_name,
service_type = PROCESSOR_SERVICE_TYPE,
stream_address = self.indexer_grpc_data_service_address.to_string(),
final_start_version = starting_version,
start_version_from_config = self.starting_version,
starting_version_if_nothing_in_db = self.starting_version_if_nothing_in_db,
start_version_from_db = starting_version_from_db,
"[Parser] Building processor",
);
Expand Down Expand Up @@ -746,4 +759,4 @@ pub fn build_processor(
UserTransactionProcessor::new(db_pool, per_table_chunk_sizes),
),
}
}
}