Skip to content

Commit

Permalink
chore: fix
Browse files Browse the repository at this point in the history
  • Loading branch information
CookiePieWw committed Feb 14, 2025
1 parent 8458cb4 commit 5328ee4
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 34 deletions.
4 changes: 2 additions & 2 deletions src/common/meta/src/kv_backend/rds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ impl<T: Executor> ExecutorImpl<'_, T> {
#[allow(dead_code)]
async fn execute(&mut self, query: &str, params: &Vec<&Vec<u8>>) -> Result<()> {
match self {
Self::Default(executor) => executor.default_execute(query, params).await,
Self::Txn(executor) => executor.txn_execute(query, params).await,
Self::Default(executor) => executor.execute(query, params).await,
Self::Txn(executor) => executor.execute(query, params).await,
}
}

Expand Down
60 changes: 28 additions & 32 deletions src/common/meta/src/kv_backend/rds/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ use common_telemetry::debug;
use snafu::ResultExt;
use sqlx::mysql::MySqlRow;
use sqlx::pool::Pool;
use sqlx::{MySql, MySqlPool, Row, Transaction};
use sqlx::{MySql, MySqlPool, Row, Transaction as MySqlTransaction};

use crate::error::{CreateMySqlPoolSnafu, MySqlExecutionSnafu, MySqlTransactionSnafu, Result};
use crate::kv_backend::rds::{
DefaultQueryExecutor, ExecutorFactory, KvQueryExecutor, RdsQueryExecutor, RdsStore,
TxnQueryExecutor, RDS_STORE_TXN_RETRY_COUNT,
Executor, ExecutorFactory, ExecutorImpl, KvQueryExecutor, RdsStore, Transaction,
RDS_STORE_TXN_RETRY_COUNT,
};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::{
Expand All @@ -34,7 +34,7 @@ use crate::rpc::store::{
use crate::rpc::KeyValue;

type MySqlClient = Arc<Pool<MySql>>;
type MySqlTxnClient = Transaction<'static, MySql>;
pub struct MySqlTxnClient(MySqlTransaction<'static, MySql>);

fn key_value_from_row(row: MySqlRow) -> KeyValue {
// Safety: key and value are the first two columns in the row
Expand Down Expand Up @@ -219,8 +219,8 @@ impl MySqlTemplateSet {
}

#[async_trait::async_trait]
impl DefaultQueryExecutor for MySqlClient {
type TxnExecutor<'a>
impl Executor for MySqlClient {
type Transaction<'a>
= MySqlTxnClient
where
Self: 'a;
Expand All @@ -229,11 +229,7 @@ impl DefaultQueryExecutor for MySqlClient {
"MySql"
}

async fn default_query(
&mut self,
raw_query: &str,
params: &[&Vec<u8>],
) -> Result<Vec<KeyValue>> {
async fn query(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<Vec<KeyValue>> {
let query = sqlx::query(raw_query);
let query = params.iter().fold(query, |query, param| query.bind(param));
let rows = query
Expand All @@ -243,7 +239,7 @@ impl DefaultQueryExecutor for MySqlClient {
Ok(rows.into_iter().map(key_value_from_row).collect())
}

async fn default_execute(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<()> {
async fn execute(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<()> {
let query = sqlx::query(raw_query);
let query = params.iter().fold(query, |query, param| query.bind(param));
query
Expand All @@ -253,7 +249,7 @@ impl DefaultQueryExecutor for MySqlClient {
Ok(())
}

async fn txn_executor<'a>(&'a mut self) -> Result<Self::TxnExecutor<'a>> {
async fn txn_executor<'a>(&'a mut self) -> Result<Self::Transaction<'a>> {
// sqlx has no isolation level support for now, so we have to set it manually.
// TODO(CookiePie): Waiting for https://github.com/launchbadge/sqlx/pull/3614 and remove this.
sqlx::query("SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE")
Expand All @@ -266,38 +262,38 @@ impl DefaultQueryExecutor for MySqlClient {
.begin()
.await
.context(MySqlExecutionSnafu { sql: "begin" })?;
Ok(txn)
Ok(MySqlTxnClient(txn))
}
}

#[async_trait::async_trait]
impl TxnQueryExecutor<'_> for MySqlTxnClient {
async fn txn_query(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<Vec<KeyValue>> {
impl Transaction<'_> for MySqlTxnClient {
async fn query(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<Vec<KeyValue>> {
let query = sqlx::query(raw_query);
let query = params.iter().fold(query, |query, param| query.bind(param));
// As said in https://docs.rs/sqlx/latest/sqlx/trait.Executor.html, we need a `&mut *transaction`. Weird.
let rows = query
.fetch_all(&mut **self)
.fetch_all(&mut *(self.0))
.await
.context(MySqlExecutionSnafu { sql: raw_query })?;
Ok(rows.into_iter().map(key_value_from_row).collect())
}

async fn txn_execute(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<()> {
async fn execute(&mut self, raw_query: &str, params: &[&Vec<u8>]) -> Result<()> {
let query = sqlx::query(raw_query);
let query = params.iter().fold(query, |query, param| query.bind(param));
// As said in https://docs.rs/sqlx/latest/sqlx/trait.Executor.html, we need a `&mut *transaction`. Weird.
query
.execute(&mut **self)
.execute(&mut *(self.0))
.await
.context(MySqlExecutionSnafu { sql: raw_query })?;
Ok(())
}

/// Caution: sqlx will stuck on the query if two transactions conflict with each other.
/// Don't know if it's a feature or it depends on the database. Be careful.
async fn txn_commit(self) -> Result<()> {
self.commit().await.context(MySqlTransactionSnafu {
async fn commit(self) -> Result<()> {
self.0.commit().await.context(MySqlTransactionSnafu {
operation: "commit",
})?;
Ok(())
Expand Down Expand Up @@ -330,7 +326,7 @@ pub type MySqlStore = RdsStore<MySqlClient, MySqlExecutorFactory, MySqlTemplateS
impl KvQueryExecutor<MySqlClient> for MySqlStore {
async fn range_with_query_executor(
&self,
query_executor: &mut RdsQueryExecutor<'_, MySqlClient>,
query_executor: &mut ExecutorImpl<'_, MySqlClient>,
req: RangeRequest,
) -> Result<RangeResponse> {
let template_type = range_template(&req.key, &req.range_end);
Expand Down Expand Up @@ -358,7 +354,7 @@ impl KvQueryExecutor<MySqlClient> for MySqlStore {

async fn batch_put_with_query_executor(
&self,
query_executor: &mut RdsQueryExecutor<'_, MySqlClient>,
query_executor: &mut ExecutorImpl<'_, MySqlClient>,
req: BatchPutRequest,
) -> Result<BatchPutResponse> {
let mut in_params = Vec::with_capacity(req.kvs.len() * 3);
Expand All @@ -384,9 +380,9 @@ impl KvQueryExecutor<MySqlClient> for MySqlStore {
return Ok(BatchPutResponse::default());
}
// Should use transaction to ensure atomicity.
if let RdsQueryExecutor::Default(query_executor) = query_executor {
if let ExecutorImpl::Default(query_executor) = query_executor {
let txn = query_executor.txn_executor().await?;
let mut txn = RdsQueryExecutor::Txn(txn);
let mut txn = ExecutorImpl::Txn(txn);
let res = self.batch_put_with_query_executor(&mut txn, req).await;
txn.commit().await?;
return res;
Expand All @@ -398,7 +394,7 @@ impl KvQueryExecutor<MySqlClient> for MySqlStore {

async fn batch_get_with_query_executor(
&self,
query_executor: &mut RdsQueryExecutor<'_, MySqlClient>,
query_executor: &mut ExecutorImpl<'_, MySqlClient>,
req: BatchGetRequest,
) -> Result<BatchGetResponse> {
if req.keys.is_empty() {
Expand All @@ -414,14 +410,14 @@ impl KvQueryExecutor<MySqlClient> for MySqlStore {

async fn delete_range_with_query_executor(
&self,
query_executor: &mut RdsQueryExecutor<'_, MySqlClient>,
query_executor: &mut ExecutorImpl<'_, MySqlClient>,
req: DeleteRangeRequest,
) -> Result<DeleteRangeResponse> {
// Since we need to know the number of deleted keys, we have no fast path here.
// Should use transaction to ensure atomicity.
if let RdsQueryExecutor::Default(query_executor) = query_executor {
if let ExecutorImpl::Default(query_executor) = query_executor {
let txn = query_executor.txn_executor().await?;
let mut txn = RdsQueryExecutor::Txn(txn);
let mut txn = ExecutorImpl::Txn(txn);
let res = self.delete_range_with_query_executor(&mut txn, req).await;
txn.commit().await?;
return res;
Expand Down Expand Up @@ -450,7 +446,7 @@ impl KvQueryExecutor<MySqlClient> for MySqlStore {

async fn batch_delete_with_query_executor(
&self,
query_executor: &mut RdsQueryExecutor<'_, MySqlClient>,
query_executor: &mut ExecutorImpl<'_, MySqlClient>,
req: BatchDeleteRequest,
) -> Result<BatchDeleteResponse> {
if req.keys.is_empty() {
Expand All @@ -466,9 +462,9 @@ impl KvQueryExecutor<MySqlClient> for MySqlStore {
return Ok(BatchDeleteResponse::default());
}
// Should use transaction to ensure atomicity.
if let RdsQueryExecutor::Default(query_executor) = query_executor {
if let ExecutorImpl::Default(query_executor) = query_executor {
let txn = query_executor.txn_executor().await?;
let mut txn = RdsQueryExecutor::Txn(txn);
let mut txn = ExecutorImpl::Txn(txn);
let res = self.batch_delete_with_query_executor(&mut txn, req).await;
txn.commit().await?;
return res;
Expand Down

0 comments on commit 5328ee4

Please sign in to comment.