Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ private int mergeUpdatedAttributeBits(AttributeValueHolder<?>[] attrVals) {
}

/** Immutable container that stores an attribute and its corresponding value. */
private static class AttributeValueHolder<T> {
public static class AttributeValueHolder<T> {
/** */
private final OperationContextAttribute<T> attr;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class OperationContextAttribute<T> {
static final AtomicInteger ID_GEN = new AtomicInteger();

/** */
static final int MAX_ATTR_CNT = Integer.SIZE;
public static final int MAX_ATTR_CNT = Integer.SIZE;

/** */
private final int bitmask;
Expand All @@ -41,7 +41,7 @@ public class OperationContextAttribute<T> {
@Nullable private final T initVal;

/** */
private OperationContextAttribute(int bitmask, @Nullable T initVal) {
public OperationContextAttribute(int bitmask, @Nullable T initVal) {
this.bitmask = bitmask;
this.initVal = initVal;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.ignite.internal.managers.communication.CompressedMessage;
import org.apache.ignite.internal.managers.communication.ErrorMessage;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.managers.communication.GridIoSecurityAwareMessage;
import org.apache.ignite.internal.managers.communication.GridIoUserMessage;
import org.apache.ignite.internal.managers.communication.IgniteIoTestMessage;
import org.apache.ignite.internal.managers.communication.SessionChannelMessage;
Expand Down Expand Up @@ -229,6 +228,7 @@
import org.apache.ignite.internal.processors.query.stat.messages.StatisticsResponse;
import org.apache.ignite.internal.processors.rest.handlers.task.GridTaskResultRequest;
import org.apache.ignite.internal.processors.rest.handlers.task.GridTaskResultResponse;
import org.apache.ignite.internal.processors.security.SecuritySubjectMessage;
import org.apache.ignite.internal.processors.service.ServiceChangeBatchRequest;
import org.apache.ignite.internal.processors.service.ServiceClusterDeploymentResult;
import org.apache.ignite.internal.processors.service.ServiceClusterDeploymentResultBatch;
Expand Down Expand Up @@ -602,7 +602,7 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C
withNoSchema(GridIoMessage.class);
withNoSchema(IgniteIoTestMessage.class);
withSchema(GridIoUserMessage.class);
withSchema(GridIoSecurityAwareMessage.class);
++msgIdx; // Former GridIoSecurityAwareMessage.
withNoSchema(RecoveryLastReceivedMessage.class);
withNoSchema(TcpInverseConnectionResponseMessage.class);
withNoSchema(SessionChannelMessage.class);
Expand Down Expand Up @@ -670,6 +670,12 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C
withNoSchema(PartitionHashRecord.class);
withNoSchema(TransactionsHashRecord.class);

// [13400 - 13600]: Operation context messages.
msgIdx = 13400;
withNoSchema(OperationContexAttributeMessage.class);
withNoSchema(OperationContexMessage.class);
withNoSchema(SecuritySubjectMessage.class);

assert msgIdx <= MAX_MESSAGE_ID;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal;

import org.apache.ignite.internal.thread.context.OperationContextAttribute;
import org.apache.ignite.plugin.extensions.communication.Message;

/** Transport of {@link OperationContextAttribute}. */
public class OperationContexAttributeMessage implements Message {
/** Operation context attribute type. */
@Order(0)
OperationContextAttributeType type;

/** Operation context attribute value. */
@Order(1)
Message val;

/** Empty constructor for serialization purposes. */
public OperationContexAttributeMessage() {
// No-op.
}

/** Creates operation context attribute message. */
public OperationContexAttributeMessage(OperationContextAttributeType type, Message val) {
this.type = type;
this.val = val;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal;

import java.util.ArrayList;
import java.util.List;
import org.apache.ignite.internal.thread.context.OperationContext;
import org.apache.ignite.internal.thread.context.OperationContextAttribute;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;

/** Transport of {@link OperationContext}. */
public class OperationContexMessage implements Message {
/** Expected maximal number of operation context attributes. Is for message size optimization. */
private static final int DFLT_EXPECTED_MAX_ATTRIBUTES_NUMBER = 5;

/** Effective operation context attributes and their values. */
@Order(0)
List<OperationContexAttributeMessage> opCtxAttrs;

/**
* Mapping of the attributes: attribute id -> effective attribute index + 1.
* If attributes index is 0, corresponding attribute is not set.
*/
@Order(1)
byte[] attrsMapping = new byte[OperationContextAttribute.MAX_ATTR_CNT];

/** Empty constructor for serialization purposes. */
public OperationContexMessage() {
// No-op.
}

/** */
public boolean hasAttribute(OperationContextAttributeType attrType) {
assert attrType.id() >= 0 && attrType.id() < attrsMapping.length;

return attrsMapping[attrType.id()] > 0;
}

/** */
public <T> @Nullable T attributeValue(OperationContextAttributeType attrType) {
assert attrType.id() >= 0 && attrType.id() < attrsMapping.length;

byte idx = attrsMapping[attrType.id()];

if (idx < 1)
return null;

--idx;

assert idx < opCtxAttrs.size();

OperationContexAttributeMessage attrMsg = opCtxAttrs.get(idx);

assert attrMsg != null;
assert attrMsg.type == attrType;

return (T)attrMsg.val;
}

/** */
public static OperationContexMessage enrich(
@Nullable OperationContexMessage msg,
OperationContextAttributeType attrType,
Message attrVal
) {
if (msg == null) {
msg = new OperationContexMessage();

msg.opCtxAttrs = new ArrayList<>(DFLT_EXPECTED_MAX_ATTRIBUTES_NUMBER);
}

assert attrType.id() >= 0 && attrType.id() < msg.attrsMapping.length;

msg.opCtxAttrs.add(new OperationContexAttributeMessage(attrType, attrVal));

msg.attrsMapping[attrType.id()] = (byte)msg.opCtxAttrs.size();

return msg;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal;

import org.apache.ignite.internal.processors.security.SecuritySubjectMessage;
import org.apache.ignite.internal.thread.context.OperationContextAttribute;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;

/**
* Type of {@link OperationContextAttribute}.
*/
public enum OperationContextAttributeType {
/** */
SECURITY(SecuritySubjectMessage.class);

/** Attribute value type. */
private final Class<? extends Message> valType;

/** */
private OperationContextAttributeType(Class<? extends Message> valType) {
this.valType = valType;
}

/** */
public <T extends Message> OperationContextAttribute<T> create(OperationContextAttributeType type, @Nullable T initVal) {
assert type == null || initVal.getClass().isAssignableFrom(type());

return new OperationContextAttribute<>(type.id(), initVal);
}

/** */
public Class<? extends Message> type() {
return valType;
}

/**
* Attribute id (number). Limited by {@link OperationContextAttribute#MAX_ATTR_CNT}.
*
* @see OperationContextAttribute#bitmask()
*/
public byte id() {
assert (byte)ordinal() < OperationContextAttribute.MAX_ATTR_CNT;

return (byte)ordinal();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.OperationContexMessage;
import org.apache.ignite.internal.OperationContextAttributeType;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.direct.DirectMessageReader;
import org.apache.ignite.internal.direct.DirectMessageWriter;
Expand All @@ -97,6 +99,7 @@
import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
import org.apache.ignite.internal.processors.pool.PoolProcessor;
import org.apache.ignite.internal.processors.security.SecuritySubjectMessage;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings;
Expand Down Expand Up @@ -137,7 +140,6 @@
import org.apache.ignite.spi.communication.tcp.internal.ConnectionRequestor;
import org.apache.ignite.spi.communication.tcp.internal.TcpConnectionRequestDiscoveryMessage;
import org.apache.ignite.spi.communication.tcp.internal.TcpInverseConnectionResponseMessage;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
Expand Down Expand Up @@ -2025,28 +2027,27 @@ private long getInverseConnectionWaitTimeout() {
return ctx.config().getFailureDetectionTimeout();
}

/**
* @return One of two message wrappers. The first is {@link GridIoMessage}, the second is secured version {@link
* GridIoSecurityAwareMessage}.
*/
private @NotNull GridIoMessage createGridIoMessage(
/** @return A {@link GridIoMessage} wrapper for {@code msg}. */
private GridIoMessage createGridIoMessage(
Object topic,
Message msg,
byte plc,
boolean ordered,
long timeout,
boolean skipOnTimeout
) {
GridIoMessage res = new GridIoMessage(plc, topic, msg, ordered, timeout, skipOnTimeout);

if (ctx.security().enabled()) {
UUID secSubjId = null;
assert res.opCtxMsg == null : "Several context operation attributes aren't supported yet.";

if (!ctx.security().isDefaultContext())
secSubjId = ctx.security().securityContext().subject().id();
UUID secSubjId = ctx.security().isDefaultContext() ? null : ctx.security().securityContext().subject().id();

return new GridIoSecurityAwareMessage(secSubjId, plc, topic, msg, ordered, timeout, skipOnTimeout);
res.opCtxMsg = OperationContexMessage.enrich(res.opCtxMsg, OperationContextAttributeType.SECURITY,
new SecuritySubjectMessage(secSubjId));
}

return new GridIoMessage(plc, topic, msg, ordered, timeout, skipOnTimeout);
return res;
}

/**
Expand Down Expand Up @@ -4236,9 +4237,12 @@ public long binLatencyMcs() {
*/
private UUID secSubjId(GridIoMessage msg) {
if (ctx.security().enabled()) {
assert msg instanceof GridIoSecurityAwareMessage;
assert msg.opCtxMsg != null;
assert msg.opCtxMsg.hasAttribute(OperationContextAttributeType.SECURITY);

SecuritySubjectMessage secSubjMsg = msg.opCtxMsg.attributeValue(OperationContextAttributeType.SECURITY);

return ((GridIoSecurityAwareMessage)msg).securitySubjectId();
return secSubjMsg.id;
}

return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.ignite.internal.ExecutorAwareMessage;
import org.apache.ignite.internal.GridTopicMessage;
import org.apache.ignite.internal.OperationContexMessage;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
Expand Down Expand Up @@ -64,6 +65,11 @@ public class GridIoMessage implements Message, SpanTransport {
@Order(6)
byte[] span;

/** Effective operation context attributes. */
@Order(7)
@GridToStringInclude
public @Nullable OperationContexMessage opCtxMsg;

/**
* Default constructor.
*/
Expand Down
Loading