Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,8 @@

this.factory = factory;

// [-44, 0..2, 42, 200..204, 210, 302] - Use in tests.
// [300..307, 350..352] - CalciteMessageFactory.
// [-44, 0..2, 42, 200..204, 210] - Use in tests.
// [300 - 500] - CalciteMessageFactory.
// [-4..-22, -30..-35, -54..-57] - SQL

// [5000 - 5500]: Utility messages. Most of them originally come from Discovery.
Expand Down Expand Up @@ -448,7 +448,7 @@
withNoSchema(ExchangeFailureMessage.class);
withNoSchema(CacheStatisticsClearMessage.class);
withNoSchema(ClientCacheChangeDummyDiscoveryMessage.class);
// TODO: revise using resolved class loader, https://issues.apache.org/jira/browse/IGNITE-28637

Check warning on line 451 in modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Complete the task associated to this TODO comment.

See more on https://sonarcloud.io/project/issues?id=apache_ignite&issues=AZ5P9s8TOZg2hyVe6k6Y&open=AZ5P9s8TOZg2hyVe6k6Y&pullRequest=13140
withNoSchemaResolvedClassLoader(DynamicCacheChangeBatch.class);

// [10000 - 10200]: Transaction and lock related messages. Most of them originally comes from Communication.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,35 +19,37 @@

import java.util.Arrays;
import java.util.Map;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.MarshallableMessage;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;

/**
* Partition update counters message.
*
* @see #finishUpdating()
*/
public class PartitionUpdateCountersMessage implements MarshallableMessage {
public class PartitionUpdateCountersMessage implements Message {
/** */
private static final int ITEM_SIZE = 4 /* partition */ + 8 /* initial counter */ + 8 /* updates count */;

/** Byte representation of partition counters. */
/** */
@Order(0)
byte[] data;
int cacheId;

/** */
/** Byte representation of partition counters. */
@Order(1)
int cacheId;
byte[] data;

/** */
private int size;
@Order(2)
int size;

/** Used for assigning counters to cache entries during tx finish. */
private Map<Integer, Long> counters;

/** */
/** Empty constructor for a {@link MessageFactory}. */
public PartitionUpdateCountersMessage() {
// No-op.
}
Expand Down Expand Up @@ -120,6 +122,8 @@ public long updatesCount(int idx) {
* @param part Partition number.
* @param init Init partition counter.
* @param updatesCnt Update counter delta.
*
* @see #finishUpdating()
*/
public void add(int part, long init, long updatesCnt) {
ensureSpace(size + 1);
Expand All @@ -131,6 +135,15 @@ public void add(int part, long init, long updatesCnt) {
GridUnsafe.putLong(data, off, updatesCnt);
}

/** Optimizes the memory used after adding counters with {@link #add(int, long, long)}. */
public void finishUpdating() {
if (data != null && data.length != size * ITEM_SIZE) {
assert data.length > size * ITEM_SIZE;

data = Arrays.copyOf(data, size * ITEM_SIZE);
}
}

/**
* Calculate next counter for partition.
*
Expand Down Expand Up @@ -158,10 +171,9 @@ private void ensureSpace(int newSize) {
int req = newSize * ITEM_SIZE;

if (data.length < req)
data = Arrays.copyOf(data, data.length << 1);
data = Arrays.copyOf(data, (int)(data.length * 1.33f));
Comment thread
shishkovilja marked this conversation as resolved.
}


/** {@inheritDoc} */
@Override public String toString() {
StringBuilder sb = new StringBuilder();
Expand All @@ -182,14 +194,4 @@ private void ensureSpace(int newSize) {
", cntrs=" + sb +
'}';
}

/** {@inheritDoc} */
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
data = Arrays.copyOf(data, size * ITEM_SIZE);
}

/** {@inheritDoc} */
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException {
size = data == null ? 0 : data.length / ITEM_SIZE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2197,6 +2197,8 @@ public void applyPartitionsUpdatesCounters(
resCntrs.add(part, partCntrs.initialCounter(i), partCntrs.updatesCount(i));
}

resCntrs.finishUpdating();

if (resCntrs.size() > 0)
res.add(resCntrs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,8 @@ public void calculatePartitionUpdateCounters() throws IgniteTxRollbackCheckedExc
}
}

msg.finishUpdating();

if (msg.size() > 0)
cntrMsgs.add(msg);
}
Expand Down
Loading