Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -321,6 +324,8 @@ static class Builder extends AbstractReadContext.Builder<Builder, MultiUseReadOn
private TimestampBound bound;
private Timestamp timestamp;
private ByteString transactionId;
private Options.BeginTransactionOption beginTransactionOption =
Options.BeginTransactionOption.EXPLICIT;

private Builder() {}

Expand All @@ -339,6 +344,11 @@ Builder setTransactionId(ByteString transactionId) {
return this;
}

Builder setBeginTransactionOption(Options.BeginTransactionOption beginTransactionOption) {
this.beginTransactionOption = beginTransactionOption;
return this;
}

@Override
MultiUseReadOnlyTransaction build() {
return new MultiUseReadOnlyTransaction(this);
Expand All @@ -359,9 +369,15 @@ static Builder newBuilder() {
@GuardedBy("txnLock")
private ByteString transactionId;

@GuardedBy("txnLock")
private SettableApiFuture<ByteString> transactionIdFuture;

private final AtomicInteger pendingStarts = new AtomicInteger(0);

private static final long WAIT_FOR_INLINE_BEGIN_TIMEOUT_MILLIS = 60_000L;

private final Map<SpannerRpc.Option, ?> channelHint;
private final Options.BeginTransactionOption beginTransactionOption;

MultiUseReadOnlyTransaction(Builder builder) {
super(builder);
Expand All @@ -386,6 +402,7 @@ static Builder newBuilder() {
session.getOptions(),
ThreadLocalRandom.current().nextLong(Long.MAX_VALUE),
session.getSpanner().getOptions().isGrpcGcpExtensionEnabled());
this.beginTransactionOption = builder.beginTransactionOption;
}

@Override
Expand All @@ -398,21 +415,68 @@ protected boolean isRouteToLeader() {
return false;
}

private boolean shouldUseInlinedBegin() {
return beginTransactionOption == Options.BeginTransactionOption.INLINE;
}

@Override
void beforeReadOrQuery() {
super.beforeReadOrQuery();
initTransaction();
if (shouldUseInlinedBegin()) {
// Keep the same nested transaction guard as the explicit BeginTransaction path. This checks
// TransactionRunner's thread-local pending state, not the session's active transaction.
SessionImpl.throwIfTransactionsPending();
} else {
initTransaction();
}
Comment on lines +425 to +431
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The call to SessionImpl.throwIfTransactionsPending() is problematic for several reasons:

  1. Correctness: throwIfTransactionsPending() is an instance method in SessionImpl. Calling it as a static method will result in a compilation error.
  2. Logic Error: Even if called on the session instance, this check will likely throw an IllegalStateException because the current MultiUseReadOnlyTransaction is already set as the active transaction on the session.
  3. Redundancy: The check for pending transactions is already performed in SessionImpl.setActive() when the transaction is initialized.

For inlined begin, we should simply skip initTransaction() without adding this extra check.

      if (!shouldUseInlinedBegin()) {
        initTransaction();
      }

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stale review

}

@Override
@Nullable
TransactionSelector getTransactionSelector() {
// No need for synchronization: super.readInternal() is always preceded by a check of
// "transactionId" that provides a happens-before from initialization, and the value is never
// changed afterwards.
@SuppressWarnings("GuardedByChecker")
TransactionSelector selector = TransactionSelector.newBuilder().setId(transactionId).build();
return selector;
if (!shouldUseInlinedBegin()) {
// No need for synchronization: super.readInternal() is always preceded by a check of
// "transactionId" that provides a happens-before from initialization, and the value is
// never changed afterwards.
@SuppressWarnings("GuardedByChecker")
TransactionSelector selector =
TransactionSelector.newBuilder().setId(transactionId).build();
return selector;
}

ApiFuture<ByteString> futureToWaitFor = null;
txnLock.lock();
try {
if (transactionId != null) {
return TransactionSelector.newBuilder().setId(transactionId).build();
}
if (transactionIdFuture == null) {
transactionIdFuture = SettableApiFuture.create();
return TransactionSelector.newBuilder()
.setBegin(createReadOnlyTransactionOptions())
.build();
}
Comment on lines +453 to +458
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

There is a potential for transactionIdFuture to be leaked (never completed) if the thread that returns the begin selector fails after creating the future but before the RPC is successfully initiated. In readInternal or executeQueryInternal, if an exception is thrown after getTransactionSelector() returns but before the RPC consumer is registered, onError or onDone will never be called. Consider adding a mechanism to ensure the future is failed if the 'begin' operation fails to start.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After creating an inline-begin selector, both read and query start paths wrap
the RPC start in a try/catch and call onStartFailed(withBeginTransaction, t), which completes transactionIdFuture exceptionally. onError() and onDone() also fail the future if the stream starts but no transaction id is returned.

futureToWaitFor = transactionIdFuture;
} finally {
txnLock.unlock();
}

try {
return TransactionSelector.newBuilder()
.setId(futureToWaitFor.get(WAIT_FOR_INLINE_BEGIN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means that when using inline-begin-tx and you run multiple queries concurrently at the start of the transaction, the queries that do not include the BeginTransaction option effectively get a different timeout behavior than today; if the query that starts the transaction is slow, then the other queries could fail. It might be good to include this in the documentation for this option (or maybe more in general for the documentation: Basically advice against using this option when the read-only transaction executes queries in parallel)

.build();
} catch (ExecutionException e) {
throw SpannerExceptionFactory.asSpannerException(e.getCause());
} catch (TimeoutException e) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.DEADLINE_EXCEEDED,
"Timeout while waiting for an inlined read-only transaction to be returned by another"
+ " statement.",
e);
Comment thread
rahul2393 marked this conversation as resolved.
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw SpannerExceptionFactory.newSpannerExceptionForCancellation(null, e);
}
}

private void decrementPendingStartsAndSignal() {
Expand Down Expand Up @@ -503,6 +567,80 @@ public ListenableAsyncResultSet executeQueryAsync(Statement statement, QueryOpti
bufferRows);
}

@Override
public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) {
Timestamp readTimestamp = null;
if (transaction.hasReadTimestamp()) {
try {
readTimestamp = Timestamp.fromProto(transaction.getReadTimestamp());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this change does introduce a behavioral difference between inline-begin and explicit-begin, albeit one that is perfectly valid. With explicit-begin, you can call getReadTimestamp() right away starting the transaction. With inline-begin, the getReadTimestamp() method will throw an error if you try to call it before at least one PartialResultSet has been returned by the first query. This is an additional reason why we cannot make this behavior the default without a major version bump (or we would need to change this behavior into something that would be safe).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated the comment

} catch (IllegalArgumentException e) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL, "Bad value in transaction.read_timestamp metadata field", e);
}
}
if (shouldIncludeId && transaction.getId().isEmpty()) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION, NO_TRANSACTION_RETURNED_MSG);
}
txnLock.lock();
try {
if (timestamp == null) {
if (readTimestamp == null) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL, "Missing expected transaction.read_timestamp metadata field");
}
timestamp = readTimestamp;
}
if (shouldIncludeId && transactionId == null) {
transactionId = transaction.getId();
if (transactionIdFuture != null && !transactionIdFuture.isDone()) {
transactionIdFuture.set(transactionId);
}
}
} finally {
txnLock.unlock();
}
}

@Override
public SpannerException onError(
SpannerException e, boolean withBeginTransaction, boolean lastStatement) {
e = super.onError(e, withBeginTransaction, lastStatement);
if (withBeginTransaction) {
failTransactionIdFuture(e);
}
return e;
}

@Override
public void onDone(boolean withBeginTransaction) {
if (withBeginTransaction) {
failTransactionIdFuture(
SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION,
"ResultSet was closed before a read-only transaction id was returned"));
}
super.onDone(withBeginTransaction);
}

@Override
void onStartFailed(boolean withBeginTransaction, Throwable t) {
if (withBeginTransaction) {
failTransactionIdFuture(t);
}
}

private void failTransactionIdFuture(Throwable t) {
txnLock.lock();
try {
if (transactionIdFuture != null && !transactionIdFuture.isDone()) {
transactionIdFuture.setException(t);
}
} finally {
txnLock.unlock();
}
}

@Override
public Timestamp getReadTimestamp() {
txnLock.lock();
Expand Down Expand Up @@ -544,6 +682,19 @@ public void close() {
super.close();
}

private TransactionOptions createReadOnlyTransactionOptions() {
TransactionOptions.Builder options = TransactionOptions.newBuilder();
if (timestamp != null) {
options
.getReadOnlyBuilder()
.setReadTimestamp(timestamp.toProto())
.setReturnReadTimestamp(true);
} else {
bound.applyToBuilder(options.getReadOnlyBuilder()).setReturnReadTimestamp(true);
}
return options.build();
}

/**
* Initializes the transaction with the timestamp specified within MultiUseReadOnlyTransaction.
* This is used only for fallback of PartitionQueryRequest and PartitionReadRequest with
Expand All @@ -553,19 +704,10 @@ void initFallbackTransaction() {
txnLock.lock();
try {
span.addAnnotation("Creating Transaction");
TransactionOptions.Builder options = TransactionOptions.newBuilder();
if (timestamp != null) {
options
.getReadOnlyBuilder()
.setReadTimestamp(timestamp.toProto())
.setReturnReadTimestamp(true);
} else {
bound.applyToBuilder(options.getReadOnlyBuilder()).setReturnReadTimestamp(true);
}
final BeginTransactionRequest request =
BeginTransactionRequest.newBuilder()
.setSession(session.getName())
.setOptions(options)
.setOptions(createReadOnlyTransactionOptions())
.build();
initTransactionInternal(request);
} finally {
Expand All @@ -589,12 +731,10 @@ void initTransaction() {
return;
}
span.addAnnotation("Creating Transaction");
TransactionOptions.Builder options = TransactionOptions.newBuilder();
bound.applyToBuilder(options.getReadOnlyBuilder()).setReturnReadTimestamp(true);
final BeginTransactionRequest request =
BeginTransactionRequest.newBuilder()
.setSession(session.getName())
.setOptions(options)
.setOptions(createReadOnlyTransactionOptions())
.build();
initTransactionInternal(request);
} finally {
Expand Down Expand Up @@ -976,15 +1116,22 @@ CloseableIterator<PartialResultSet> startStream(
if (selector != null) {
request.setTransaction(selector);
}
SpannerRpc.StreamingCall call =
rpc.executeQuery(
request.build(),
stream.consumer(),
getTransactionChannelHint(),
requestId,
isRouteToLeader());
boolean withBeginTransaction = request.getTransaction().hasBegin();
SpannerRpc.StreamingCall call;
try {
call =
rpc.executeQuery(
request.build(),
stream.consumer(),
getTransactionChannelHint(),
requestId,
isRouteToLeader());
} catch (RuntimeException | Error t) {
onStartFailed(withBeginTransaction, t);
throw t;
}
session.markUsed(clock.instant());
stream.setCall(call, request.getTransaction().hasBegin());
stream.setCall(call, withBeginTransaction);
return stream;
}

Expand Down Expand Up @@ -1100,6 +1247,8 @@ public void onDone(boolean withBeginTransaction) {
this.session.onReadDone();
}

void onStartFailed(boolean withBeginTransaction, Throwable t) {}

/**
* For transactions other than read-write, the MultiplexedSessionPrecommitToken will not be
* present in the RPC response. In such cases, this method will be a no-op.
Expand Down Expand Up @@ -1199,15 +1348,22 @@ CloseableIterator<PartialResultSet> startStream(
builder.setTransaction(selector);
}
builder.setRequestOptions(buildRequestOptions(readOptions));
SpannerRpc.StreamingCall call =
rpc.read(
builder.build(),
stream.consumer(),
getTransactionChannelHint(),
requestId,
isRouteToLeader());
boolean withBeginTransaction = builder.getTransaction().hasBegin();
SpannerRpc.StreamingCall call;
try {
call =
rpc.read(
builder.build(),
stream.consumer(),
getTransactionChannelHint(),
requestId,
isRouteToLeader());
} catch (RuntimeException | Error t) {
onStartFailed(withBeginTransaction, t);
throw t;
}
session.markUsed(clock.instant());
stream.setCall(call, /* withBeginTransaction= */ builder.getTransaction().hasBegin());
stream.setCall(call, withBeginTransaction);
return stream;
}

Expand Down
Loading
Loading