Skip to content

Commit

Permalink
IGNITE-23747 Support timeouts for RO transactions
Browse files Browse the repository at this point in the history
* Timeouts for RO transactions are supported
* Data availability time is used as a default RO transaction timeout
* After the timeout expires, an RO transaction gets aborted; we rely on existing cleanup mechanism to close remote cursors of this transaction and unlock its LWM locks
* RO transaction is guaranteed to not be aborted automatically until its timeout expires, but it might live longer than its timeout
* TxManager API has been revamped to make distinction between implicit and explicit transaction initiation more visible
  • Loading branch information
rpuch committed Jan 14, 2025
1 parent 86fd823 commit 4800554
Show file tree
Hide file tree
Showing 50 changed files with 1,117 additions and 155 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@
* Ignite transaction options.
*/
public class TransactionOptions {
/** Transaction timeout. */
/** Transaction timeout. 0 means 'use default timeout'. */
private long timeoutMillis = 0;

/** Read-only transaction. */
private boolean readOnly = false;

/**
* Returns transaction timeout, in milliseconds.
* Returns transaction timeout, in milliseconds. 0 means 'use default timeout'.
*
* @return Transaction timeout, in milliseconds.
*/
Expand All @@ -39,10 +39,17 @@ public long timeoutMillis() {
/**
* Sets transaction timeout, in milliseconds.
*
* @param timeoutMillis Transaction timeout, in milliseconds.
* @param timeoutMillis Transaction timeout, in milliseconds. Cannot be negative; 0 means 'use default timeout'.
* For RO transactions, the default timeout is data availability time configured via ignite.gc.lowWatermark.dataAvailabilityTime
* configuration setting.
* For RW transactions, timeouts are not supported yet. TODO: IGNITE-15936
* @return {@code this} for chaining.
*/
public TransactionOptions timeoutMillis(long timeoutMillis) {
if (timeoutMillis < 0) {
throw new IllegalArgumentException("Negative timeoutMillis: " + timeoutMillis);
}

this.timeoutMillis = timeoutMillis;

return this;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.tx;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;

import org.junit.jupiter.api.Test;

class TransactionOptionsTest {
@Test
void readOnlyIsFalseByDefault() {
assertThat(new TransactionOptions().readOnly(), is(false));
}

@Test
void readOnlyStatusIsSet() {
var options = new TransactionOptions();

options.readOnly(true);

assertThat(options.readOnly(), is(true));
}

@Test
void readOnlySetterReturnsSameObject() {
var options = new TransactionOptions();

TransactionOptions afterSetting = options.readOnly(true);

assertSame(options, afterSetting);
}

@Test
void timeoutIsZeroByDefault() {
assertThat(new TransactionOptions().timeoutMillis(), is(0L));
}

@Test
void timeoutIsSet() {
var options = new TransactionOptions();

options.timeoutMillis(3333);

assertThat(options.timeoutMillis(), is(3333L));
}

@Test
void timeoutSetterReturnsSameObject() {
var options = new TransactionOptions();

TransactionOptions afterSetting = options.timeoutMillis(3333);

assertSame(options, afterSetting);
}

@Test
void positiveTimeoutIsAllowed() {
assertDoesNotThrow(() -> new TransactionOptions().timeoutMillis(0));
}

@Test
void zeroTimeoutIsAllowed() {
assertDoesNotThrow(() -> new TransactionOptions().timeoutMillis(0));
}

@Test
void negativeTimeoutIsRejected() {
IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, () -> new TransactionOptions().timeoutMillis(-1));

assertThat(ex.getMessage(), is("Negative timeoutMillis: -1"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.handler.ssl.SslContext;
Expand Down Expand Up @@ -130,7 +129,7 @@ public class ClientHandlerModule implements IgniteComponent {

@TestOnly
@SuppressWarnings("unused")
private volatile ChannelHandler handler;
private volatile ClientInboundMessageHandler handler;

/**
* Constructor.
Expand Down Expand Up @@ -396,4 +395,9 @@ private ClientInboundMessageHandler createInboundMessageHandler(ClientConnectorV
partitionOperationsExecutor
);
}

@TestOnly
public ClientInboundMessageHandler handler() {
return handler;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@
import org.apache.ignite.security.exception.UnsupportedAuthenticationTypeException;
import org.apache.ignite.sql.SqlBatchException;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;

/**
* Handles messages from thin clients.
Expand Down Expand Up @@ -1139,4 +1140,9 @@ private static Set<AuthenticationEvent> authenticationEventsToSubscribe() {
AuthenticationEvent.USER_REMOVED
);
}

@TestOnly
public ClientResourceRegistry resources() {
return resources;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.ignite.internal.sql.engine.property.SqlPropertiesHelper;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.InternalTxOptions;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.util.AsyncCursor.BatchedResult;
import org.apache.ignite.lang.CancelHandle;
Expand Down Expand Up @@ -189,8 +190,8 @@ public HybridTimestampTracker getTimestampTracker() {
}

private static SqlProperties createProperties(
JdbcStatementType stmtType,
boolean multiStatement,
JdbcStatementType stmtType,
boolean multiStatement,
ZoneId timeZoneId,
long queryTimeoutMillis
) {
Expand Down Expand Up @@ -455,7 +456,7 @@ ZoneId timeZoneId() {
* @return Transaction associated with the current connection.
*/
InternalTransaction getOrStartTransaction(HybridTimestampTracker timestampProvider) {
return tx == null ? tx = txManager.begin(timestampProvider, false) : tx;
return tx == null ? tx = txManager.beginExplicitRw(timestampProvider, InternalTxOptions.defaults()) : tx;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.InternalTxOptions;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.type.DecimalNativeType;
import org.apache.ignite.internal.type.NativeType;
Expand Down Expand Up @@ -439,32 +440,53 @@ public static TableNotFoundException tableIdNotFoundException(Integer tableId) {
if (tx == null) {
// Implicit transactions do not use an observation timestamp because RW never depends on it, and implicit RO is always direct.
// The direct transaction uses a current timestamp on the primary replica by definition.
tx = startTx(out, txManager, null, true, readOnly);
tx = startImplicitTx(out, txManager, null, readOnly);
}

return tx;
}

/**
* Start a transaction.
* Starts an explicit transaction.
*
* @param out Packer.
* @param txManager Ignite transactions.
* @param currentTs Current observation timestamp or {@code null} if it is not defined.
* @param implicit Implicit transaction flag.
* @param readOnly Read only flag.
* @param options Transaction options.
* @return Transaction.
*/
public static InternalTransaction startTx(
public static InternalTransaction startExplicitTx(
ClientMessagePacker out,
TxManager txManager,
@Nullable HybridTimestamp currentTs,
boolean readOnly,
InternalTxOptions options
) {
return txManager.beginExplicit(
HybridTimestampTracker.clientTracker(currentTs, ts -> {}),
readOnly,
options
);
}

/**
* Starts an implicit transaction.
*
* @param out Packer.
* @param txManager Ignite transactions.
* @param currentTs Current observation timestamp or {@code null} if it is not defined.
* @param readOnly Read only flag.
* @return Transaction.
*/
public static InternalTransaction startImplicitTx(
ClientMessagePacker out,
TxManager txManager,
@Nullable HybridTimestamp currentTs,
boolean implicit,
boolean readOnly
) {
return txManager.begin(
HybridTimestampTracker.clientTracker(currentTs, implicit ? out::meta : ts -> {}),
implicit,
return txManager.beginImplicit(
HybridTimestampTracker.clientTracker(currentTs, out::meta),
readOnly
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.ignite.client.handler.requests.tx;

import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.startTx;
import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.startExplicitTx;

import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
Expand All @@ -27,6 +27,7 @@
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.tx.InternalTxOptions;
import org.apache.ignite.internal.tx.TxManager;
import org.jetbrains.annotations.Nullable;

Expand All @@ -52,12 +53,20 @@ public class ClientTransactionBeginRequest {
ClientHandlerMetricSource metrics
) throws IgniteInternalCheckedException {
boolean readOnly = in.unpackBoolean();
long timeoutMillis = in.unpackLong();

// Timestamp makes sense only for read-only transactions.
HybridTimestamp observableTs = readOnly ? HybridTimestamp.nullableHybridTimestamp(in.unpackLong()) : null;
HybridTimestamp observableTs = null;
if (readOnly) {
// Timestamp makes sense only for read-only transactions.
observableTs = HybridTimestamp.nullableHybridTimestamp(in.unpackLong());
}

InternalTxOptions txOptions = InternalTxOptions.builder()
.timeoutMillis(timeoutMillis)
.build();

// NOTE: we don't use beginAsync here because it is synchronous anyway.
var tx = startTx(out, txManager, observableTs, false, readOnly);
var tx = startExplicitTx(out, txManager, observableTs, readOnly, txOptions);

if (readOnly) {
// For read-only tx, override observable timestamp that we send to the client:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -133,7 +132,7 @@ public void contextClosedDuringBatchQuery() throws Exception {
CountDownLatch registryCloseLatch = new CountDownLatch(1);
long connectionId = acquireConnectionId();

when(txManager.begin(any(), eq(false))).thenAnswer(v -> {
when(txManager.beginExplicitRw(any(), any())).thenAnswer(v -> {
registryCloseLatch.countDown();
assertThat(startTxLatch.await(timeout, TimeUnit.SECONDS), is(true));

Expand Down Expand Up @@ -161,13 +160,13 @@ public void explicitTxRollbackOnCloseRegistry() {
InternalTransaction tx = mock(InternalTransaction.class);

when(tx.rollbackAsync()).thenReturn(nullCompletedFuture());
when(txManager.begin(any(), eq(false))).thenReturn(tx);
when(txManager.beginExplicitRw(any(), any())).thenReturn(tx);

long connectionId = acquireConnectionId();

await(eventHandler.batchAsync(connectionId, createExecuteBatchRequest("x", "UPDATE 1")));

verify(txManager).begin(any(), eq(false));
verify(txManager).beginExplicitRw(any(), any());
verify(tx, times(0)).rollbackAsync();

resourceRegistry.close();
Expand All @@ -183,32 +182,32 @@ public void singleTxUsedForMultipleOperations() {
InternalTransaction tx = mock(InternalTransaction.class);
when(tx.commitAsync()).thenReturn(nullCompletedFuture());
when(tx.rollbackAsync()).thenReturn(nullCompletedFuture());
when(txManager.begin(any(), eq(false))).thenReturn(tx);
when(txManager.beginExplicitRw(any(), any())).thenReturn(tx);

long connectionId = acquireConnectionId();
verify(txManager, times(0)).begin(any(), eq(false));
verify(txManager, times(0)).beginExplicitRw(any(), any());

String schema = "schema";
JdbcStatementType type = JdbcStatementType.SELECT_STATEMENT_TYPE;

await(eventHandler.queryAsync(
connectionId, createExecuteRequest(schema, "SELECT 1", type)
));
verify(txManager, times(1)).begin(any(), eq(false));
verify(txManager, times(1)).beginExplicitRw(any(), any());
await(eventHandler.batchAsync(connectionId, createExecuteBatchRequest("schema", "UPDATE 1", "UPDATE 2")));
verify(txManager, times(1)).begin(any(), eq(false));
verify(txManager, times(1)).beginExplicitRw(any(), any());

await(eventHandler.finishTxAsync(connectionId, false));
verify(tx).rollbackAsync();

await(eventHandler.batchAsync(connectionId, createExecuteBatchRequest("schema", "UPDATE 1", "UPDATE 2")));
verify(txManager, times(2)).begin(any(), eq(false));
verify(txManager, times(2)).beginExplicitRw(any(), any());
await(eventHandler.queryAsync(
connectionId, createExecuteRequest(schema, "SELECT 2", type)
));
verify(txManager, times(2)).begin(any(), eq(false));
verify(txManager, times(2)).beginExplicitRw(any(), any());
await(eventHandler.batchAsync(connectionId, createExecuteBatchRequest("schema", "UPDATE 3", "UPDATE 4")));
verify(txManager, times(2)).begin(any(), eq(false));
verify(txManager, times(2)).beginExplicitRw(any(), any());

await(eventHandler.finishTxAsync(connectionId, true));
verify(tx).commitAsync();
Expand All @@ -223,7 +222,7 @@ void simpleQueryCancellation() {

long connectionId = acquireConnectionId();

JdbcQueryExecuteRequest executeRequest = createExecuteRequest("schema", "SELECT 1", JdbcStatementType.SELECT_STATEMENT_TYPE);
JdbcQueryExecuteRequest executeRequest = createExecuteRequest("schema", "SELECT 1", JdbcStatementType.SELECT_STATEMENT_TYPE);

CompletableFuture<? extends Response> resultFuture = eventHandler.queryAsync(connectionId, executeRequest);

Expand Down
Loading

0 comments on commit 4800554

Please sign in to comment.