-
Notifications
You must be signed in to change notification settings - Fork 22
Expand file tree
/
Copy pathAppendToStream.java
More file actions
122 lines (101 loc) · 5.87 KB
/
AppendToStream.java
File metadata and controls
122 lines (101 loc) · 5.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package io.kurrent.dbclient;
import io.kurrent.dbclient.proto.shared.Shared;
import io.kurrent.dbclient.proto.streams.StreamsGrpc;
import io.kurrent.dbclient.proto.streams.StreamsOuterClass;
import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
class AppendToStream {
private final GrpcClient client;
private final String streamName;
private final List<EventData> events;
private final AppendToStreamOptions options;
public AppendToStream(GrpcClient client, String streamName, Iterator<EventData> events, AppendToStreamOptions options) {
this.client = client;
this.streamName = streamName;
this.events = new ArrayList<>();
while (events.hasNext()) {
this.events.add(events.next());
}
this.options = options;
}
public CompletableFuture<WriteResult> execute() {
return this.client.run(channel -> ClientTelemetry.traceAppend(
this::append,
channel,
events,
this.streamName,
this.client.getSettings(),
this.options.getCredentials()));
}
private CompletableFuture<WriteResult> append(ManagedChannel channel, List<EventData> events) {
CompletableFuture<WriteResult> result = new CompletableFuture<>();
StreamsOuterClass.AppendReq.Options.Builder options = this.options.getStreamState().applyOnWire(StreamsOuterClass.AppendReq.Options.newBuilder()
.setStreamIdentifier(Shared.StreamIdentifier.newBuilder()
.setStreamName(ByteString.copyFromUtf8(streamName))
.build()));
StreamsGrpc.StreamsStub client = GrpcUtils.configureStub(StreamsGrpc.newStub(channel), this.client.getSettings(), this.options);
StreamObserver<StreamsOuterClass.AppendReq> requestStream = client.append(GrpcUtils.convertSingleResponse(result, resp -> {
if (resp.hasSuccess()) {
StreamsOuterClass.AppendResp.Success success = resp.getSuccess();
Position logPosition = null;
if (success.getPositionOptionCase() == StreamsOuterClass.AppendResp.Success.PositionOptionCase.POSITION) {
StreamsOuterClass.AppendResp.Position p = success.getPosition();
logPosition = new Position(p.getCommitPosition(), p.getPreparePosition());
}
StreamState nextExpectedRevision = success.hasNoStream() ? StreamState.noStream()
: StreamState.streamRevision(success.getCurrentRevision());
return new WriteResult(nextExpectedRevision, logPosition);
}
if (resp.hasWrongExpectedVersion()) {
StreamsOuterClass.AppendResp.WrongExpectedVersion wev = resp.getWrongExpectedVersion();
StreamState expectedRevision;
if (wev.getExpectedRevisionOptionCase() == StreamsOuterClass.AppendResp.WrongExpectedVersion.ExpectedRevisionOptionCase.EXPECTED_ANY) {
expectedRevision = StreamState.any();
} else if (wev.getExpectedRevisionOptionCase() == StreamsOuterClass.AppendResp.WrongExpectedVersion.ExpectedRevisionOptionCase.EXPECTED_STREAM_EXISTS) {
expectedRevision = StreamState.streamExists();
} else if (wev.getExpectedRevisionOptionCase() == StreamsOuterClass.AppendResp.WrongExpectedVersion.ExpectedRevisionOptionCase.EXPECTED_NO_STREAM) {
expectedRevision = StreamState.noStream();
} else {
expectedRevision = StreamState.streamRevision(wev.getExpectedRevision());
}
StreamState streamState;
if (wev.getCurrentRevisionOptionCase() == StreamsOuterClass.AppendResp.WrongExpectedVersion.CurrentRevisionOptionCase.CURRENT_NO_STREAM) {
streamState = StreamState.noStream();
} else {
streamState = StreamState.streamRevision(wev.getCurrentRevision());
}
String streamName = options.getStreamIdentifier().getStreamName().toStringUtf8();
throw new WrongExpectedVersionException(streamName, expectedRevision, streamState);
}
throw new IllegalStateException("AppendResponse has neither Success or WrongExpectedVersion variants");
}));
try {
requestStream.onNext(StreamsOuterClass.AppendReq.newBuilder().setOptions(options).build());
for (EventData e : events) {
StreamsOuterClass.AppendReq.ProposedMessage.Builder msgBuilder = StreamsOuterClass.AppendReq.ProposedMessage.newBuilder()
.setId(Shared.UUID.newBuilder()
.setStructured(Shared.UUID.Structured.newBuilder()
.setMostSignificantBits(e.getEventId().getMostSignificantBits())
.setLeastSignificantBits(e.getEventId().getLeastSignificantBits())))
.setData(ByteString.copyFrom(e.getEventData()))
.putMetadata(SystemMetadataKeys.CONTENT_TYPE, e.getContentType())
.putMetadata(SystemMetadataKeys.TYPE, e.getEventType());
if (e.getUserMetadata() != null) {
msgBuilder.setCustomMetadata(ByteString.copyFrom(e.getUserMetadata()));
}
requestStream.onNext(StreamsOuterClass.AppendReq.newBuilder()
.setProposedMessage(msgBuilder)
.build());
}
requestStream.onCompleted();
} catch (RuntimeException e) {
result.completeExceptionally(e);
}
return result;
}
}