-
Notifications
You must be signed in to change notification settings - Fork 22
Expand file tree
/
Copy pathGrpcUtils.java
More file actions
157 lines (124 loc) · 6.67 KB
/
GrpcUtils.java
File metadata and controls
157 lines (124 loc) · 6.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package com.eventstore.dbclient;
import com.eventstore.dbclient.proto.shared.Shared;
import com.eventstore.dbclient.proto.streams.StreamsOuterClass;
import com.google.protobuf.ByteString;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
final class GrpcUtils {
static public <ReqT, RespT> ClientResponseObserver<ReqT, RespT> convertSingleResponse(
CompletableFuture<RespT> dest) {
return convertSingleResponse(dest, x -> x);
}
static public <ReqT, RespT, TargetT, ExceptionT extends Throwable> ClientResponseObserver<ReqT, RespT> convertSingleResponse(
CompletableFuture<TargetT> dest, ThrowingFunction<RespT, TargetT, ExceptionT> converter) {
return new ClientResponseObserver<ReqT, RespT>() {
@Override
public void beforeStart(ClientCallStreamObserver<ReqT> requestStream) {
}
@Override
public void onNext(RespT value) {
try {
TargetT converted = converter.apply(value);
dest.complete(converted);
} catch (Throwable e) {
dest.completeExceptionally(e);
}
}
@Override
public void onError(Throwable t) {
if (t instanceof StatusRuntimeException) {
StatusRuntimeException e = (StatusRuntimeException) t;
if (e.getStatus().getCode().equals(Status.FAILED_PRECONDITION.getCode())) {
String reason = e.getTrailers().get(Metadata.Key.of("exception", Metadata.ASCII_STRING_MARSHALLER));
String streamName = e.getTrailers().get(Metadata.Key.of("stream-name", Metadata.ASCII_STRING_MARSHALLER));
if (reason != null && reason.equals("stream-deleted")) {
dest.completeExceptionally(new StreamDeletedException(streamName));
return;
}
if (reason != null && reason.equals("wrong-expected-version")) {
String expectedStr = e.getTrailers().get(Metadata.Key.of("expected-version", Metadata.ASCII_STRING_MARSHALLER));
String actualStr = e.getTrailers().get(Metadata.Key.of("actual-version", Metadata.ASCII_STRING_MARSHALLER));
// Some old versions of the server, actual-version is not provided.
if (actualStr == null || actualStr.isEmpty()) {
// Equivalent of NoStream internally.
actualStr = "-1";
}
try {
StreamState expected = StreamState.fromRawLong(Long.parseLong(expectedStr));
StreamState actual = StreamState.fromRawLong(Long.parseLong(actualStr));
dest.completeExceptionally(new WrongExpectedVersionException(streamName, expected, actual));
return;
} catch (NumberFormatException ex) {
// Nothing to do here as it would mean the server sent
// invalid expected version numbers.
}
}
}
String leaderHost = e.getTrailers().get(Metadata.Key.of("leader-endpoint-host", Metadata.ASCII_STRING_MARSHALLER));
String leaderPort = e.getTrailers().get(Metadata.Key.of("leader-endpoint-port", Metadata.ASCII_STRING_MARSHALLER));
if (leaderHost != null && leaderPort != null) {
NotLeaderException reason = new NotLeaderException(leaderHost, Integer.valueOf(leaderPort));
dest.completeExceptionally(reason);
return;
}
}
dest.completeExceptionally(t);
}
@Override
public void onCompleted() {
}
};
}
static public StreamsOuterClass.ReadReq.Options.StreamOptions toStreamOptions(String streamName, StreamPosition<Long> revision) {
StreamsOuterClass.ReadReq.Options.StreamOptions.Builder builder = StreamsOuterClass.ReadReq.Options.StreamOptions.newBuilder()
.setStreamIdentifier(Shared.StreamIdentifier.newBuilder()
.setStreamName(ByteString.copyFromUtf8(streamName))
.build());
if (revision.isEnd()) {
return builder.setEnd(Shared.Empty.getDefaultInstance())
.build();
}
if (revision.isStart()) {
return builder.setStart(Shared.Empty.getDefaultInstance())
.build();
}
return builder.setRevision(revision.getPositionOrThrow())
.build();
}
static public <S extends AbstractAsyncStub<S>, O> S configureStub(S stub, EventStoreDBClientSettings settings, OptionsBase<O> options) {
return configureStub(stub, settings, options, null);
}
static public <S extends AbstractAsyncStub<S>, O> S configureStub(S stub, EventStoreDBClientSettings settings, OptionsBase<O> options, Long forceDeadlineInMs) {
S finalStub = stub;
ConnectionMetadata metadata = new ConnectionMetadata();
if (options.getKind() != OperationKind.Streaming) {
long deadlineInMs = 10_000;
if (forceDeadlineInMs != null) {
deadlineInMs = forceDeadlineInMs;
} else if (options.getDeadline() != null) {
deadlineInMs = options.getDeadline();
} else if (settings.getDefaultDeadline() != null) {
deadlineInMs = settings.getDefaultDeadline();
}
finalStub = finalStub.withDeadlineAfter(deadlineInMs, TimeUnit.MILLISECONDS);
}
UserCredentials credentials = null;
if (options.hasUserCredentials()) {
credentials = options.getCredentials();
} else if (settings.getDefaultCredentials() != null) {
credentials = settings.getDefaultCredentials();
}
if (credentials != null) {
metadata.authenticated(credentials);
}
if (options.isLeaderRequired() || settings.getNodePreference() == NodePreference.LEADER) {
metadata.requiresLeader();
}
metadata.headers(options.getHeaders());
return finalStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata.build()));
}
}