Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.metrics.Counter;
import org.apache.zookeeper.proto.AddWatchRequest;
import org.apache.zookeeper.proto.CheckWatchesRequest;
import org.apache.zookeeper.proto.Create2Response;
Expand Down Expand Up @@ -213,20 +214,23 @@ public void processRequest(Request request) {
switch (request.type) {
case OpCode.ping: {
lastOp = "PING";
incrementOpCount(ServerMetrics.getMetrics().OP_COUNT_PING);
updateStats(request, lastOp, lastZxid);

responseSize = cnxn.sendResponse(new ReplyHeader(ClientCnxn.PING_XID, lastZxid, 0), null, "response");
return;
}
case OpCode.createSession: {
lastOp = "SESS";
incrementOpCount(ServerMetrics.getMetrics().OP_COUNT_CREATE_SESSION);
updateStats(request, lastOp, lastZxid);

zks.finishSessionInit(request.cnxn, true);
return;
}
case OpCode.multi: {
lastOp = "MULT";
incrementOpCount(ServerMetrics.getMetrics().OP_COUNT_MULTI);
rsp = new MultiResponse();

for (ProcessTxnResult subTxnResult : rc.multiResult) {
Expand Down Expand Up @@ -269,6 +273,7 @@ public void processRequest(Request request) {
}
case OpCode.multiRead: {
lastOp = "MLTR";
incrementOpCount(ServerMetrics.getMetrics().OP_COUNT_MULTI_READ);
MultiOperationRecord multiReadRecord = request.readRequestRecord(MultiOperationRecord::new);
rsp = new MultiResponse();
OpResult subResult;
Expand Down Expand Up @@ -297,6 +302,7 @@ public void processRequest(Request request) {
}
case OpCode.create: {
lastOp = "CREA";
incrementOpCount(ServerMetrics.getMetrics().OP_COUNT_CREATE);
rsp = new CreateResponse(rc.path);
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
Expand All @@ -306,6 +312,7 @@ public void processRequest(Request request) {
case OpCode.createTTL:
case OpCode.createContainer: {
lastOp = "CREA";
incrementOpCount(ServerMetrics.getMetrics().OP_COUNT_CREATE);
rsp = new Create2Response(rc.path, rc.stat);
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
Expand All @@ -314,52 +321,60 @@ public void processRequest(Request request) {
case OpCode.delete:
case OpCode.deleteContainer: {
lastOp = "DELE";
incrementOpCount(ServerMetrics.getMetrics().OP_COUNT_DELETE);
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.setData: {
lastOp = "SETD";
incrementOpCount(ServerMetrics.getMetrics().OP_COUNT_SET_DATA);
rsp = new SetDataResponse(rc.stat);
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.reconfig: {
lastOp = "RECO";
incrementOpCount(ServerMetrics.getMetrics().OP_COUNT_RECONFIG);
rsp = new GetDataResponse(
((QuorumZooKeeperServer) zks).self.getQuorumVerifier().toString().getBytes(UTF_8),
rc.stat);
((QuorumZooKeeperServer) zks).self.getQuorumVerifier().toString().getBytes(UTF_8),
rc.stat);
err = Code.get(rc.err);
break;
}
case OpCode.setACL: {
lastOp = "SETA";
incrementOpCount(ServerMetrics.getMetrics().OP_COUNT_SET_ACL);
rsp = new SetACLResponse(rc.stat);
err = Code.get(rc.err);
requestPathMetricsCollector.registerRequest(request.type, rc.path);
break;
}
case OpCode.closeSession: {
lastOp = "CLOS";
incrementOpCount(ServerMetrics.getMetrics().OP_COUNT_CLOSE_SESSION);
err = Code.get(rc.err);
break;
}
case OpCode.sync: {
lastOp = "SYNC";
incrementOpCount(ServerMetrics.getMetrics().OP_COUNT_SYNC);
SyncRequest syncRequest = request.readRequestRecord(SyncRequest::new);
rsp = new SyncResponse(syncRequest.getPath());
requestPathMetricsCollector.registerRequest(request.type, syncRequest.getPath());
break;
}
case OpCode.check: {
lastOp = "CHEC";
incrementOpCount(ServerMetrics.getMetrics().OP_COUNT_CHECK);
rsp = new SetDataResponse(rc.stat);
err = Code.get(rc.err);
break;
}
case OpCode.exists: {
lastOp = "EXIS";
incrementOpCount(ServerMetrics.getMetrics().OP_COUNT_EXISTS);
ExistsRequest existsRequest = request.readRequestRecord(ExistsRequest::new);
path = existsRequest.getPath();
if (path.indexOf('\0') != -1) {
Expand All @@ -382,6 +397,7 @@ public void processRequest(Request request) {
}
case OpCode.getData: {
lastOp = "GETD";
incrementOpCount(ServerMetrics.getMetrics().OP_COUNT_GET_DATA);
GetDataRequest getDataRequest = request.readRequestRecord(GetDataRequest::new);
path = getDataRequest.getPath();
rsp = handleGetDataRequest(getDataRequest, cnxn, request.authInfo);
Expand All @@ -390,6 +406,7 @@ public void processRequest(Request request) {
}
case OpCode.setWatches: {
lastOp = "SETW";
incrementOpCount(ServerMetrics.getMetrics().OP_COUNT_SET_WATCHES);
SetWatches setWatches = request.readRequestRecord(SetWatches::new);
long relativeZxid = setWatches.getRelativeZxid();
zks.getZKDatabase()
Expand All @@ -405,6 +422,7 @@ public void processRequest(Request request) {
}
case OpCode.setWatches2: {
lastOp = "STW2";
incrementOpCount(ServerMetrics.getMetrics().OP_COUNT_SET_WATCHES);
SetWatches2 setWatches = request.readRequestRecord(SetWatches2::new);
long relativeZxid = setWatches.getRelativeZxid();
zks.getZKDatabase().setWatches(relativeZxid,
Expand All @@ -418,13 +436,15 @@ public void processRequest(Request request) {
}
case OpCode.addWatch: {
lastOp = "ADDW";
incrementOpCount(ServerMetrics.getMetrics().OP_COUNT_ADD_WATCH);
AddWatchRequest addWatcherRequest = request.readRequestRecord(AddWatchRequest::new);
zks.getZKDatabase().addWatch(addWatcherRequest.getPath(), cnxn, addWatcherRequest.getMode());
rsp = new ErrorResponse(0);
break;
}
case OpCode.getACL: {
lastOp = "GETA";
incrementOpCount(ServerMetrics.getMetrics().OP_COUNT_GET_ACL);
GetACLRequest getACLRequest = request.readRequestRecord(GetACLRequest::new);
path = getACLRequest.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
Expand Down Expand Up @@ -467,6 +487,7 @@ public void processRequest(Request request) {
}
case OpCode.getChildren: {
lastOp = "GETC";
incrementOpCount(ServerMetrics.getMetrics().OP_COUNT_GET_CHILDREN);
GetChildrenRequest getChildrenRequest = request.readRequestRecord(GetChildrenRequest::new);
path = getChildrenRequest.getPath();
rsp = handleGetChildrenRequest(getChildrenRequest, cnxn, request.authInfo);
Expand All @@ -475,6 +496,7 @@ public void processRequest(Request request) {
}
case OpCode.getAllChildrenNumber: {
lastOp = "GETACN";
incrementOpCount(ServerMetrics.getMetrics().OP_COUNT_GET_ALL_CHILDREN_NUMBER);
GetAllChildrenNumberRequest getAllChildrenNumberRequest = request.readRequestRecord(GetAllChildrenNumberRequest::new);
path = getAllChildrenNumberRequest.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
Expand All @@ -494,6 +516,7 @@ public void processRequest(Request request) {
}
case OpCode.getChildren2: {
lastOp = "GETC";
incrementOpCount(ServerMetrics.getMetrics().OP_COUNT_GET_CHILDREN);
GetChildren2Request getChildren2Request = request.readRequestRecord(GetChildren2Request::new);
Stat stat = new Stat();
path = getChildren2Request.getPath();
Expand All @@ -515,6 +538,7 @@ public void processRequest(Request request) {
}
case OpCode.checkWatches: {
lastOp = "CHKW";
incrementOpCount(ServerMetrics.getMetrics().OP_COUNT_CHECK_WATCHES);
CheckWatchesRequest checkWatches = request.readRequestRecord(CheckWatchesRequest::new);
WatcherType type = WatcherType.fromInt(checkWatches.getType());
path = checkWatches.getPath();
Expand All @@ -528,6 +552,7 @@ public void processRequest(Request request) {
}
case OpCode.removeWatches: {
lastOp = "REMW";
incrementOpCount(ServerMetrics.getMetrics().OP_COUNT_REMOVE_WATCHES);
RemoveWatchesRequest removeWatches = request.readRequestRecord(RemoveWatchesRequest::new);
WatcherType type = WatcherType.fromInt(removeWatches.getType());
path = removeWatches.getPath();
Expand All @@ -541,11 +566,13 @@ public void processRequest(Request request) {
}
case OpCode.whoAmI: {
lastOp = "HOMI";
incrementOpCount(ServerMetrics.getMetrics().OP_COUNT_WHO_AM_I);
rsp = new WhoAmIResponse(AuthUtil.getClientInfos(request.authInfo));
break;
}
case OpCode.getEphemerals: {
lastOp = "GETE";
incrementOpCount(ServerMetrics.getMetrics().OP_COUNT_GET_EPHEMERALS);
GetEphemeralsRequest getEphemerals = request.readRequestRecord(GetEphemeralsRequest::new);
String prefixPath = getEphemerals.getPrefixPath();
Set<String> allEphems = zks.getZKDatabase().getDataTree().getEphemerals(request.sessionId);
Expand Down Expand Up @@ -677,4 +704,9 @@ private void updateStats(Request request, String lastOp, long lastZxid) {
request.cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp, request.createTime, currentTime);
}

private void incrementOpCount(final Counter specificCounter) {
final ServerMetrics metrics = ServerMetrics.getMetrics();
specificCounter.add(1);
metrics.OP_COUNT_TOTAL.add(1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,32 @@ private ServerMetrics(MetricsProvider metricsProvider) {

TTL_NODE_DELETED_COUNT = metricsContext.getCounter("ttl_node_deleted_count");
TTL_NODE_CREATED_COUNT = metricsContext.getCounter("ttl_node_created_count");

// Operation count metrics
OP_COUNT_TOTAL = metricsContext.getCounter("op_count_total");
OP_COUNT_ADD_WATCH = metricsContext.getCounter("op_count_add_watch");
OP_COUNT_CHECK = metricsContext.getCounter("op_count_check");
OP_COUNT_CHECK_WATCHES = metricsContext.getCounter("op_count_check_watches");
OP_COUNT_CLOSE_SESSION = metricsContext.getCounter("op_count_close_session");
OP_COUNT_CREATE = metricsContext.getCounter("op_count_create");
OP_COUNT_CREATE_SESSION = metricsContext.getCounter("op_count_create_session");
OP_COUNT_DELETE = metricsContext.getCounter("op_count_delete");
OP_COUNT_EXISTS = metricsContext.getCounter("op_count_exists");
OP_COUNT_GET_ACL = metricsContext.getCounter("op_count_get_acl");
OP_COUNT_GET_ALL_CHILDREN_NUMBER = metricsContext.getCounter("op_count_get_all_children_number");
OP_COUNT_GET_CHILDREN = metricsContext.getCounter("op_count_get_children");
OP_COUNT_GET_DATA = metricsContext.getCounter("op_count_get_data");
OP_COUNT_GET_EPHEMERALS = metricsContext.getCounter("op_count_get_ephemerals");
OP_COUNT_MULTI = metricsContext.getCounter("op_count_multi");
OP_COUNT_MULTI_READ = metricsContext.getCounter("op_count_multi_read");
OP_COUNT_PING = metricsContext.getCounter("op_count_ping");
OP_COUNT_RECONFIG = metricsContext.getCounter("op_count_reconfig");
OP_COUNT_REMOVE_WATCHES = metricsContext.getCounter("op_count_remove_watches");
OP_COUNT_SET_ACL = metricsContext.getCounter("op_count_set_acl");
OP_COUNT_SET_DATA = metricsContext.getCounter("op_count_set_data");
OP_COUNT_SET_WATCHES = metricsContext.getCounter("op_count_set_watches");
OP_COUNT_SYNC = metricsContext.getCounter("op_count_sync");
OP_COUNT_WHO_AM_I = metricsContext.getCounter("op_count_who_am_i");
}

/**
Expand Down Expand Up @@ -556,6 +582,33 @@ private ServerMetrics(MetricsProvider metricsProvider) {
public final Counter TTL_NODE_DELETED_COUNT;
public final Counter TTL_NODE_CREATED_COUNT;

// Operation count metrics
public final Counter OP_COUNT_TOTAL;
public final Counter OP_COUNT_ADD_WATCH;
public final Counter OP_COUNT_CHECK;
public final Counter OP_COUNT_CHECK_WATCHES;
public final Counter OP_COUNT_CLOSE_SESSION;
public final Counter OP_COUNT_CREATE;
public final Counter OP_COUNT_CREATE_SESSION;
public final Counter OP_COUNT_DELETE;
public final Counter OP_COUNT_EXISTS;
public final Counter OP_COUNT_GET_ACL;
public final Counter OP_COUNT_GET_ALL_CHILDREN_NUMBER;
public final Counter OP_COUNT_GET_CHILDREN;
public final Counter OP_COUNT_GET_DATA;
public final Counter OP_COUNT_GET_EPHEMERALS;
public final Counter OP_COUNT_MULTI;
public final Counter OP_COUNT_MULTI_READ;
public final Counter OP_COUNT_PING;
public final Counter OP_COUNT_RECONFIG;
public final Counter OP_COUNT_REMOVE_WATCHES;
public final Counter OP_COUNT_SET_ACL;
public final Counter OP_COUNT_SET_DATA;
public final Counter OP_COUNT_SET_WATCHES;
public final Counter OP_COUNT_SYNC;
public final Counter OP_COUNT_WHO_AM_I;


private final MetricsProvider metricsProvider;

public void resetAll() {
Expand Down
Loading