-
Notifications
You must be signed in to change notification settings - Fork 107
Expand file tree
/
Copy pathNonBlockingDirectStatsDClient.java
More file actions
217 lines (175 loc) · 7.32 KB
/
NonBlockingDirectStatsDClient.java
File metadata and controls
217 lines (175 loc) · 7.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
package com.timgroup.statsd;
class NonBlockingDirectStatsDClient extends NonBlockingStatsDClient implements DirectStatsDClient {
/**
 * Creates a direct client; all configuration is delegated to the
 * {@link NonBlockingStatsDClient} constructor.
 *
 * @param builder the builder handed unchanged to the superclass constructor
 * @throws StatsDClientException propagated from the superclass constructor
 */
public NonBlockingDirectStatsDClient(final NonBlockingStatsDClientBuilder builder) throws StatsDClientException {
super(builder);
}
/**
 * Wraps the given {@code double} values in a single distribution message and passes it to
 * {@code sendMetric}. A {@code null} or empty array is ignored.
 *
 * @param aspect     the metric name, written after the client prefix
 * @param values     the values to report; stored by reference in the message, not copied
 * @param sampleRate the sample rate written to the payload; {@code NaN} leaves it out
 * @param tags       the tags attached to the message
 */
@Override
public void recordDistributionValues(String aspect, double[] values, double sampleRate, String... tags) {
    if (values == null || values.length == 0) {
        return;
    }
    // A timestamp of 0 means "no timestamp" for multi-valued messages.
    final DoublesStatsDMessage message =
            new DoublesStatsDMessage(aspect, Message.Type.DISTRIBUTION, values, sampleRate, 0, tags);
    sendMetric(message);
}
/**
 * Wraps the given {@code long} values in a single distribution message and passes it to
 * {@code sendMetric}. A {@code null} or empty array is ignored.
 *
 * @param aspect     the metric name, written after the client prefix
 * @param values     the values to report; stored by reference in the message, not copied
 * @param sampleRate the sample rate written to the payload; {@code NaN} leaves it out
 * @param tags       the tags attached to the message
 */
@Override
public void recordDistributionValues(String aspect, long[] values, double sampleRate, String... tags) {
    if (values == null || values.length == 0) {
        return;
    }
    // A timestamp of 0 means "no timestamp" for multi-valued messages.
    final LongsStatsDMessage message =
            new LongsStatsDMessage(aspect, Message.Type.DISTRIBUTION, values, sampleRate, 0, tags);
    sendMetric(message);
}
/**
 * Wraps the given values in a sketch message and passes it to {@code sendMetric}.
 * A {@code null} or empty array is ignored.
 *
 * @param aspect     the metric name, written after the client prefix
 * @param values     the values the sketch is built from; stored by reference, not copied
 * @param sampleRate the sample rate handed to the sketch builder together with the values
 * @param timestamp  the timestamp passed to the sketch serializer; a value of 0 leaves the
 *                   {@code |T} field out of the payload
 * @param tags       the tags attached to the message
 */
@Override
public void recordSketchWithTimestamp(String aspect, long[] values, double sampleRate, long timestamp, String... tags) {
    if (values == null || values.length == 0) {
        return;
    }
    final LongSketchMessage message = new LongSketchMessage(aspect, values, sampleRate, timestamp, tags);
    sendMetric(message);
}
/**
 * Base class for messages that pack several values of one metric into a single line:
 * prefix and aspect first, then each value preceded by {@code ':'}, then the type, the
 * optional sample rate and timestamp, whatever {@code tagString} appends, the optional
 * container ID and a trailing newline.
 *
 * <p>The message keeps track of how many values it has already written, so each
 * {@link #writeTo} call starts at the first value that has not been emitted yet.
 */
abstract class MultiValuedStatsDMessage extends Message {
    /** Sample rate to report; {@code NaN} means none. */
    private final double sampleRate;
    /** Timestamp to report; zero means none. */
    private final long timestamp;
    /** Cached encoded size of the metadata; -1 until it has been computed. */
    private int metadataSize = -1;
    /** Index of the first value that has not been written yet. */
    private int offset = 0;

    MultiValuedStatsDMessage(String aspect, Message.Type type, String[] tags, double sampleRate, long timestamp) {
        super(aspect, type, tags);
        this.sampleRate = sampleRate;
        this.timestamp = timestamp;
    }

    /** Always {@code false}: messages of this kind are never aggregated. */
    @Override
    public final boolean canAggregate() {
        return false;
    }

    /** No-op; this message reports {@code canAggregate() == false}. */
    @Override
    public final void aggregate(Message message) {
    }

    /**
     * Appends one line made of the head metadata, as many pending values as fit, and the
     * tail metadata.
     *
     * @param builder     the buffer the line is appended to
     * @param capacity    the size budget for the line; the encoded metadata size is
     *                    subtracted from it before values are added
     * @param containerID the container ID to append, skipped when {@code null} or empty
     * @return {@code true} when the budget ran out and some values are still unwritten,
     *         {@code false} otherwise
     */
    @Override
    public final boolean writeTo(StringBuilder builder, int capacity, String containerID) {
        final int overhead = metadataSize(builder, containerID);
        writeHeadMetadata(builder);
        final boolean valuesLeft = writeValuesTo(builder, capacity - overhead);
        writeTailMetadata(builder, containerID);
        return valuesLeft;
    }

    /**
     * Returns the encoded size of head plus tail metadata. The size is measured once by
     * writing both parts into {@code builder} and then truncating the builder back to its
     * previous length; later calls return the cached value.
     */
    private int metadataSize(StringBuilder builder, String containerID) {
        if (metadataSize != -1) {
            return metadataSize;
        }
        final int charsBefore = builder.length();
        final int encodedBefore = Utf8.encodedLength(builder);
        writeHeadMetadata(builder);
        writeTailMetadata(builder, containerID);
        metadataSize = Utf8.encodedLength(builder) - encodedBefore;
        // Drop the trial output so the builder looks untouched to the caller.
        builder.setLength(charsBefore);
        return metadataSize;
    }

    /** Appends the part of the line that precedes the values: prefix and aspect. */
    private void writeHeadMetadata(StringBuilder builder) {
        builder.append(prefix);
        builder.append(aspect);
    }

    /**
     * Appends the part of the line that follows the values: type, optional sample rate,
     * optional timestamp, tags, optional container ID and the terminating newline.
     */
    private void writeTailMetadata(StringBuilder builder, String containerID) {
        builder.append('|').append(type);

        final boolean hasSampleRate = !Double.isNaN(sampleRate);
        if (hasSampleRate) {
            builder.append('|').append('@').append(format(SAMPLE_RATE_FORMATTER, sampleRate));
        }

        final boolean hasTimestamp = timestamp != 0;
        if (hasTimestamp) {
            builder.append("|T").append(timestamp);
        }

        tagString(tags, builder);

        final boolean hasContainerId = containerID != null && !containerID.isEmpty();
        if (hasContainerId) {
            builder.append("|c:").append(containerID);
        }

        builder.append('\n');
    }

    /**
     * Appends pending values, each preceded by {@code ':'}, starting at {@code offset}.
     * The first pending value is always written; further values are kept only while the
     * builder length stays within the limit.
     *
     * @return {@code true} when a value had to be rolled back because it did not fit,
     *         {@code false} when no value was pending or all pending values were written
     */
    private boolean writeValuesTo(StringBuilder builder, int remainingCapacity) {
        if (offset >= lengthOfValues()) {
            // Nothing left to write.
            return false;
        }

        final int limit = builder.length() + remainingCapacity;

        // The first pending value goes in unconditionally, even if it exceeds the limit.
        builder.append(':');
        writeValueTo(builder, offset);
        int committedLength = builder.length();

        int next = offset + 1;
        while (next < lengthOfValues()) {
            builder.append(':');
            writeValueTo(builder, next);
            if (builder.length() > limit) {
                // Undo the value that overflowed; the next call starts from it.
                builder.setLength(committedLength);
                offset = next;
                return true;
            }
            committedLength = builder.length();
            next++;
        }

        offset = lengthOfValues();
        return false;
    }

    /** Returns the total number of values carried by this message. */
    protected abstract int lengthOfValues();

    /** Appends the textual form of the value at {@code index} to {@code buffer}. */
    protected abstract void writeValueTo(StringBuilder buffer, int index);
}
/**
 * Multi-valued message whose values come from a {@code long[]}. The array is stored by
 * reference, not copied.
 */
final class LongsStatsDMessage extends MultiValuedStatsDMessage {
    private final long[] values;

    LongsStatsDMessage(String aspect, Message.Type type, long[] values, double sampleRate, long timestamp,
                       String[] tags) {
        super(aspect, type, tags, sampleRate, timestamp);
        this.values = values;
    }

    /** Returns the number of elements in the backing array. */
    @Override
    protected int lengthOfValues() {
        final int count = this.values.length;
        return count;
    }

    /** Appends the decimal form of the {@code long} at {@code index}. */
    @Override
    protected void writeValueTo(StringBuilder buffer, int index) {
        final long value = this.values[index];
        buffer.append(value);
    }
}
/**
 * Multi-valued message whose values come from a {@code double[]}. The array is stored by
 * reference, not copied.
 */
final class DoublesStatsDMessage extends MultiValuedStatsDMessage {
    private final double[] values;

    DoublesStatsDMessage(String aspect, Message.Type type, double[] values, double sampleRate, long timestamp,
                         String[] tags) {
        super(aspect, type, tags, sampleRate, timestamp);
        this.values = values;
    }

    /** Returns the number of elements in the backing array. */
    @Override
    protected int lengthOfValues() {
        final int count = this.values.length;
        return count;
    }

    /** Appends the {@code StringBuilder.append(double)} form of the value at {@code index}. */
    @Override
    protected void writeValueTo(StringBuilder buffer, int index) {
        final double value = this.values[index];
        buffer.append(value);
    }
}
// Scratch writer shared by every LongSketchMessage created by this client: writeTo() clears it,
// lets sk serialize into it, flips it, and then encodes its content into the outgoing builder.
// NOTE(review): this is shared mutable state, so it presumably relies on writeTo() never running
// concurrently for the same client — confirm against the sender's threading model.
final ProtobufWriter pw = new ProtobufWriter();
// Sketch instance shared in the same way: it is rebuilt from the message's values and sample rate
// at the start of each LongSketchMessage.writeTo() call.
final DirectSketch sk = new DirectSketch();
/**
 * Message that turns a {@code long[]} into a sketch when it is written. The sketch is built
 * and serialized through the enclosing client's shared {@code sk} and {@code pw} instances.
 */
final class LongSketchMessage extends Message {
    /** Values the sketch is built from; stored by reference, not copied. */
    final long[] values;
    /** Sample rate passed to the sketch builder along with the values. */
    final double sampleRate;
    /** Timestamp passed to the sketch serializer; zero leaves the {@code |T} field out. */
    final long timestamp;

    LongSketchMessage(String aspect, long[] values, double sampleRate, long timestamp, String[] tags) {
        super(aspect, Message.Type.SKETCH, tags);
        this.values = values;
        this.sampleRate = sampleRate;
        this.timestamp = timestamp;
    }

    /** Always {@code false}: sketch messages are never aggregated. */
    @Override
    public final boolean canAggregate() {
        return false;
    }

    /** No-op; this message reports {@code canAggregate() == false}. */
    @Override
    public final void aggregate(Message message) {
    }

    /**
     * Appends the whole message as one line: prefix, aspect, {@code ':'}, the serialized
     * sketch as produced by {@code pw.encodeAscii}, {@code "|S"}, the optional timestamp,
     * the optional container ID, whatever {@code tagString} appends, and a newline.
     *
     * @param builder     the buffer the line is appended to
     * @param capacity    not consulted by this implementation
     * @param containerID the container ID to append, skipped when {@code null} or empty
     * @return always {@code false}
     */
    @Override
    public final boolean writeTo(StringBuilder builder, int capacity, String containerID) {
        // Build the sketch and serialize it into the shared writer.
        sk.build(values, sampleRate);
        pw.clear();
        sk.serialize(pw, timestamp);

        builder.append(prefix);
        builder.append(aspect);
        builder.append(":");

        // Switch the writer over for reading, then emit its content into the line.
        pw.flip();
        pw.encodeAscii(builder);

        builder.append("|S");

        final boolean hasTimestamp = timestamp != 0;
        if (hasTimestamp) {
            builder.append("|T").append(timestamp);
        }

        final boolean hasContainerId = containerID != null && !containerID.isEmpty();
        if (hasContainerId) {
            builder.append("|c:").append(containerID);
        }

        tagString(tags, builder);
        builder.append("\n");
        return false;
    }
}
}