Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions config/spotbugs/exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@
-->

<!--
Every time you add an entry to this file, you must specify its MongoDB status and SpotBugs rank.

To determine the rank of a newly-detected bug that you would like to exclude,
run spotbugsMain task with xml.enabled, then examine the XML report in
<module>/build/reports/spotbugs/main.xml, where the rankings are
run the `spotbugsMain` Gradle task with `xmlReports.enabled`
(example: `./gradlew spotbugsMain -PxmlReports.enabled=true`),
then examine the XML report in
`<module>/build/reports/spotbugs/main.xml`, where the rankings are
included as part of the report, e.g.

<BugInstance type="PA_PUBLIC_PRIMITIVE_ATTRIBUTE" priority="2" rank="16" ...
Expand Down Expand Up @@ -289,5 +293,22 @@
<Class name="com.mongodb.internal.crypt.capi.CAPI$cstring"/>
<Bug pattern="NM_CLASS_NAMING_CONVENTION"/>
</Match>

<Match>
<!-- MongoDB status: "False Positive", SpotBugs rank: 18 -->
<!-- The second parameter of the Java SE API method we override is not annotated,
but we know the argument may be `null`, so we annotate the corresponding parameter
in the overriding method with `@Nullable`. -->
<Class name="com.mongodb.internal.thread.MongoScheduledThreadPoolExecutor"/>
<Method name="afterExecute"/>
<Bug pattern="NP_METHOD_PARAMETER_TIGHTENS_ANNOTATION"/>
</Match>
<Match>
<!-- MongoDB status: "False Positive", SpotBugs rank: 18 -->
<!-- The second parameter of the Java SE API method we override is not annotated,
but we know the argument may be `null`, so we annotate the corresponding parameter
in the overriding method with `@Nullable`. -->
<Class name="com.mongodb.internal.thread.MongoThreadPoolExecutor"/>
<Method name="afterExecute"/>
<Bug pattern="NP_METHOD_PARAMETER_TIGHTENS_ANNOTATION"/>
</Match>
</FindBugsFilter>
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

/**
* A driver-specific non-checked counterpart to {@link InterruptedException}.
* Before this exception is thrown, the {@linkplain Thread#isInterrupted() interrupt status} of the thread will have been set
* Before this exception is thrown, the {@linkplain Thread#isInterrupted() interrupted status} of the thread will have been set
* unless the {@linkplain #getCause() cause} is {@link InterruptedIOException}, in which case the driver leaves the status as is.
* <p>
* The Java SE API uses exceptions different from {@link InterruptedException} to communicate the same information:</p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,15 @@ private Builder() {
}

/**
* The executor service, intended to be used exclusively by the mongo
* client. Closing the mongo client will result in {@linkplain ExecutorService#shutdown() orderly shutdown}
* of the executor service.
*
* <p>When {@linkplain SslSettings#isEnabled() TLS is not enabled}, see
* The {@link ExecutorService}, intended to be used exclusively by the {@code MongoClient}.
* <p>
* {@linkplain AutoCloseable#close() Closing} the {@code MongoClient} results in
* {@linkplain ExecutorService#shutdown() orderly shutdown} of the {@code executorService}.
* The application must not directly shut down the {@code executorService}.
* <p>
* When {@linkplain SslSettings#isEnabled() TLS is not enabled}, see
* {@link java.nio.channels.AsynchronousChannelGroup#withThreadPool(ExecutorService)}
* for additional requirements for the executor service.
* for additional requirements for the {@code executorService}.
*
* @param executorService the executor service
* @return this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ public Builder socketChannelClass(final Class<? extends SocketChannel> socketCha

/**
* Sets the event loop group.
*
* <p>The application is responsible for shutting down the provided {@code eventLoopGroup}</p>
* <p>
* The application is responsible for shutting down the provided {@code eventLoopGroup}.
* It must not be shut down before or concurrently with {@linkplain AutoCloseable#close() closing} the {@code MongoClient}.
*
* @param eventLoopGroup the event loop group that all channels created by this factory will be a part of
* @return this
Expand Down
5 changes: 2 additions & 3 deletions driver-core/src/main/com/mongodb/internal/Locks.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException;

/**
* <p>This class is not part of the public API and may be removed or changed at any time</p>
* This class is not part of the public API and may be removed or changed at any time.
*/
public final class Locks {
public static void withLock(final Lock lock, final Runnable action) {
Expand All @@ -41,8 +41,7 @@ public static void withInterruptibleLock(final StampedLock lock, final Runnable
try {
stamp = lock.writeLockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new MongoInterruptedException("Interrupted waiting for lock", e);
throw interruptAndCreateMongoInterruptedException("Interrupted waiting for lock", e);
}
try {
runnable.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.mongodb.internal.async.function.LoopControl;
import com.mongodb.internal.async.function.RetryControl;
import com.mongodb.internal.async.function.RetryingAsyncCallbackSupplier;
import com.mongodb.internal.thread.AsyncClientExecutor;

import java.util.function.BooleanSupplier;
import java.util.function.Predicate;
Expand Down Expand Up @@ -233,6 +234,8 @@ default <R> AsyncSupplier<R> thenSupply(final AsyncSupplier<R> supplier) {
default AsyncRunnable thenRunRetryingWhile(final AsyncRunnable runnable, final Predicate<Throwable> shouldRetry) {
return thenRun(callback -> {
new RetryingAsyncCallbackSupplier<Void>(
// `AsyncClientExecutor` is not needed, given the contract of `SimpleRetryPolicy`, `RetryingAsyncCallbackSupplier`
AsyncClientExecutor.unimplemented(),
new RetryControl<>(new SimpleRetryPolicy(shouldRetry)),
// `finish` is required here instead of `unsafeFinish`
// because only `finish` meets the contract of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@
import com.mongodb.internal.async.function.RetryPolicy;
import com.mongodb.internal.async.function.RetryPolicy.Decision.RetryAttemptInfo;

import java.time.Duration;
import java.util.function.Predicate;

/**
* {@link RetryAttemptInfo#getBackoff()} is always {@link Duration#isZero() zero}.
*/
final class SimpleRetryPolicy implements RetryPolicy {
private final Predicate<Throwable> shouldRetry;

Expand All @@ -30,6 +34,6 @@ final class SimpleRetryPolicy implements RetryPolicy {

@Override
public Decision onAttemptFailure(final RetryContext retryContext, final Throwable attemptFailedResult) {
return new Decision(attemptFailedResult, shouldRetry.test(attemptFailedResult) ? new RetryAttemptInfo() : null);
return new Decision(attemptFailedResult, shouldRetry.test(attemptFailedResult) ? new RetryAttemptInfo(Duration.ZERO) : null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public interface SingleResultCallback<T> {
* @param result the result, which may be null. Always null if e is not null.
* @param t the throwable, or null if the operation completed normally
* @throws RuntimeException Never.
* @throws Error Never, on the best effort basis.
* @throws Error Never, on the best-effort basis.
*/
void onResult(@Nullable T result, @Nullable Throwable t);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public interface AsyncCallbackFunction<P, R> {
* @param callback A consumer of a result, {@link SingleResultCallback#onResult(Object, Throwable) completed} after
* (in the happens-before order) the asynchronous function completes.
* @throws RuntimeException Never. Exceptions must be relayed to the {@code callback}.
* @throws Error Never, on the best effort basis. Errors should be relayed to the {@code callback}.
* @throws Error Never, on the best-effort basis. Errors should be relayed to the {@code callback}.
*/
void apply(P a, SingleResultCallback<R> callback);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
import com.mongodb.annotations.NotThreadSafe;
import com.mongodb.lang.Nullable;

import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

import static com.mongodb.assertions.Assertions.assertFalse;
import static com.mongodb.assertions.Assertions.assertNotNull;

/**
Expand Down Expand Up @@ -87,12 +89,25 @@ public String toString() {
* The information needed to start a retry attempt.
*/
public static final class RetryAttemptInfo {
public RetryAttemptInfo() {
private final Duration backoff;

public RetryAttemptInfo(final Duration backoff) {
assertFalse(backoff.isNegative());
this.backoff = backoff;
}

/**
* A non-{@linkplain Duration#isNegative() negative} backoff.
*/
public Duration getBackoff() {
return backoff;
}

@Override
public String toString() {
return "RetryAttemptInfo{}";
return "RetryAttemptInfo{"
+ "backoff=" + backoff
+ '}';
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@
package com.mongodb.internal.async.function;

import com.mongodb.annotations.NotThreadSafe;
import com.mongodb.internal.async.MutableValue;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.lang.Nullable;
import com.mongodb.internal.async.function.RetryPolicy.Decision.RetryAttemptInfo;
import com.mongodb.internal.thread.AsyncClientExecutor;

import static com.mongodb.assertions.Assertions.assertNotNull;
import java.time.Duration;

import static com.mongodb.internal.async.AsyncRunnable.beginAsync;

/**
* A decorator that implements automatic retrying of failed executions of an {@link AsyncCallbackSupplier}.
Expand All @@ -34,53 +38,45 @@
*/
@NotThreadSafe
public final class RetryingAsyncCallbackSupplier<R> implements AsyncCallbackSupplier<R> {
private final AsyncClientExecutor clientExecutor;
private final RetryControl<?> control;
private final AsyncCallbackSupplier<R> asyncFunction;

/**
* @param clientExecutor For {@linkplain AsyncClientExecutor#sleepAsync(Duration, SingleResultCallback) delaying} attempts
* according to {@link RetryAttemptInfo#getBackoff()}.
* @param control The {@link RetryControl} to control the new {@link RetryingAsyncCallbackSupplier}.
* @param asyncFunction The retryable {@link AsyncCallbackSupplier} to be decorated.
*/
public RetryingAsyncCallbackSupplier(final RetryControl<?> control, final AsyncCallbackSupplier<R> asyncFunction) {
public RetryingAsyncCallbackSupplier(
final AsyncClientExecutor clientExecutor,
final RetryControl<?> control,
final AsyncCallbackSupplier<R> asyncFunction) {
this.clientExecutor = clientExecutor;
this.control = control;
this.asyncFunction = asyncFunction;
}

@Override
public void get(final SingleResultCallback<R> callback) {
// `asyncFunction` and `callback` are the only externally provided pieces of code for which we do not need to care about
// them throwing exceptions. If they do, that violates their contract and there is nothing we should do about it.
asyncFunction.get(new RetryingCallback(callback));
}

/**
* This callback is allowed to be completed more than once.
*/
@NotThreadSafe
private class RetryingCallback implements SingleResultCallback<R> {
private final SingleResultCallback<R> wrapped;

RetryingCallback(final SingleResultCallback<R> callback) {
wrapped = callback;
}

@Override
public void onResult(@Nullable final R attemptSuccessfulResult, @Nullable final Throwable attemptFailedResult) {
if (attemptFailedResult != null) {
MutableValue<MutableValue<R>> asyncFunctionSuccessfulResult = new MutableValue<>();
beginAsync().thenRunWhileLoop(() -> asyncFunctionSuccessfulResult.getNullable() == null, iterationCallback -> {
beginAsync().<R>thenSupply(asyncFunctionCallback -> {
asyncFunction.get(asyncFunctionCallback);
}).thenConsume((attemptSuccessfulResult, onAttemptSuccessCallback) -> {
// `attemptSuccessfulResult` may be `null`, so we have to wrap it in `MutableValue` for the while check to notice it
asyncFunctionSuccessfulResult.set(new MutableValue<>(attemptSuccessfulResult));
onAttemptSuccessCallback.complete(onAttemptSuccessCallback);
}).onErrorIf(e -> true, (attemptFailedResult, onAttemptFailureCallback) -> {
if (attemptFailedResult instanceof Error) {
wrapped.onResult(null, attemptFailedResult);
return;
}
try {
assertNotNull(control.advanceOrThrow(attemptFailedResult));
} catch (Throwable retryingSupplierFailedResult) {
wrapped.onResult(null, retryingSupplierFailedResult);
return;
onAttemptFailureCallback.completeExceptionally(attemptFailedResult);
} else {
RetryAttemptInfo retryAttemptInfo = control.advanceOrThrow(attemptFailedResult);
clientExecutor.sleepAsync(retryAttemptInfo.getBackoff(), onAttemptFailureCallback);
}
asyncFunction.get(this);
} else {
wrapped.onResult(attemptSuccessfulResult, null);
}
}
}).finish(iterationCallback);
}).<R>thenSupply(c -> {
c.complete(asyncFunctionSuccessfulResult.get().getNullable());
}).finish(callback);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@
package com.mongodb.internal.async.function;

import com.mongodb.annotations.NotThreadSafe;
import com.mongodb.internal.async.MutableValue;
import com.mongodb.internal.async.function.RetryPolicy.Decision.RetryAttemptInfo;
import com.mongodb.internal.thread.AsyncClientExecutor;
import com.mongodb.lang.Nullable;

import java.time.Duration;
import java.util.function.Supplier;

import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

/**
* A decorator that implements automatic retrying of failed executions of a {@link Supplier}.
Expand All @@ -37,23 +43,37 @@ public final class RetryingSyncSupplier<R> implements Supplier<R> {
private final Supplier<R> syncFunction;

/**
* See {@link RetryingAsyncCallbackSupplier#RetryingAsyncCallbackSupplier(RetryControl, AsyncCallbackSupplier)}.
* See {@link RetryingAsyncCallbackSupplier#RetryingAsyncCallbackSupplier(AsyncClientExecutor, RetryControl, AsyncCallbackSupplier)}.
*/
public RetryingSyncSupplier(final RetryControl<?> control, final Supplier<R> syncFunction) {
this.control = control;
this.syncFunction = syncFunction;
}

@Override
@Nullable
public R get() {
while (true) {
MutableValue<MutableValue<R>> asyncFunctionSuccessfulResult = new MutableValue<>();
while (asyncFunctionSuccessfulResult.getNullable() == null) {
try {
return syncFunction.get();
R attemptSuccessfulResult = syncFunction.get();
// `attemptSuccessfulResult` may be `null`, so we have to wrap it in `MutableValue` for the while check to notice it
asyncFunctionSuccessfulResult.set(new MutableValue<>(attemptSuccessfulResult));
} catch (Error attemptFailedResult) {
throw attemptFailedResult;
} catch (Throwable attemptFailedResult) {
assertNotNull(control.advanceOrThrow(attemptFailedResult));
RetryAttemptInfo retryAttemptInfo = control.advanceOrThrow(attemptFailedResult);
sleep(retryAttemptInfo.getBackoff());
}
}
return asyncFunctionSuccessfulResult.get().getNullable();
}

private void sleep(final Duration duration) {
try {
NANOSECONDS.sleep(duration.toNanos());
} catch (InterruptedException e) {
throw interruptAndCreateMongoInterruptedException(null, e);
}
}
}
Loading