-
Notifications
You must be signed in to change notification settings - Fork 22
Expand file tree
/
Copy pathGrpcClient.java
More file actions
133 lines (107 loc) · 4.63 KB
/
GrpcClient.java
File metadata and controls
133 lines (107 loc) · 4.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package io.kurrent.dbclient;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.kurrent.dbclient.serialization.MessageSerializer;
import io.kurrent.dbclient.serialization.MessageSerializerBuilder;
import io.kurrent.dbclient.serialization.OperationSerializationSettings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
class GrpcClient {
private static final Logger logger = LoggerFactory.getLogger(GrpcClient.class);
private final AtomicBoolean closed;
private final LinkedBlockingQueue<Msg> queue;
private final KurrentDBClientSettings settings;
private final MessageSerializer serializer;
GrpcClient(KurrentDBClientSettings settings, AtomicBoolean closed, LinkedBlockingQueue<Msg> queue) {
this.settings = settings;
this.closed = closed;
this.queue = queue;
this.serializer = MessageSerializerBuilder.get(settings.getSerializationSettings());
}
public boolean isShutdown() {
return this.closed.get();
}
private void push(Msg msg) {
try {
if (this.closed.get()) {
if (msg instanceof RunWorkItem) {
RunWorkItem args = (RunWorkItem) msg;
args.reportError(new ConnectionShutdownException());
}
if (msg instanceof Shutdown) {
((Shutdown) msg).complete();
}
return;
}
this.queue.put(msg);
logger.debug("Scheduled msg: {}", msg);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public CompletableFuture<WorkItemArgs> getWorkItemArgs() {
final CompletableFuture<WorkItemArgs> result = new CompletableFuture<>();
this.push(new RunWorkItem(UUID.randomUUID(), (args, error) -> {
if (error != null) {
result.completeExceptionally(error);
return;
}
result.complete(args);
}));
return result;
}
public CompletableFuture<Optional<ServerVersion>> getServerVersion() {
return runWithArgs(args -> CompletableFuture.completedFuture(args.getServerVersion()));
}
public <A> CompletableFuture<A> run(Function<ManagedChannel, CompletableFuture<A>> action) {
return runWithArgs(args -> action.apply(args.getChannel()));
}
public <A> CompletableFuture<A> runWithArgs(Function<WorkItemArgs, CompletableFuture<A>> action) {
return getWorkItemArgs().thenComposeAsync((args) -> {
return action.apply(args).handleAsync((outcome, e) -> {
if (e != null) {
if (e instanceof CompletionException) {
e = e.getCause();
}
if (e instanceof NotLeaderException) {
NotLeaderException ex = (NotLeaderException) e;
// TODO - Currently we don't retry on not leader exception but we might consider
// allowing this on a case-by-case basis.
this.push(new CreateChannel(args.getId(), ex.getLeaderEndpoint()));
}
if (e instanceof StatusRuntimeException) {
StatusRuntimeException ex = (StatusRuntimeException) e;
if (ex.getStatus().getCode().equals(Status.Code.UNAVAILABLE)) {
this.push(new CreateChannel(args.getId()));
}
}
logger.debug("RunWorkItem[{}] completed exceptionally: {}", args.getId(), e.toString());
if (e instanceof RuntimeException)
throw (RuntimeException) e;
else
throw new RuntimeException(e);
}
return outcome;
});
});
}
public CompletableFuture<Void> shutdown() {
final CompletableFuture<Void> completion = new CompletableFuture<>();
this.push(new Shutdown(completion::complete));
return completion;
}
public KurrentDBClientSettings getSettings() {
return this.settings;
}
public MessageSerializer getSerializer(OperationSerializationSettings serializationSettings) {
return this.serializer.with(serializationSettings);
}
}