diff --git a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java index 460cb7bcc1043..153afb99e94f9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java @@ -338,8 +338,8 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C this.factory = factory; - // [-44, 0..2, 42, 200..204, 210, 302] - Use in tests. - // [300..307, 350..352] - CalciteMessageFactory. + // [-44, 0..2, 42, 200..204, 210] - Use in tests. + // [300 - 500] - CalciteMessageFactory. // [-4..-22, -30..-35, -54..-57] - SQL // [5000 - 5500]: Utility messages. Most of them originally come from Discovery. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java index 55a90f49a3374..d24ab7d7982be 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java @@ -19,35 +19,37 @@ import java.util.Arrays; import java.util.Map; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.MarshallableMessage; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageFactory; /** * Partition update counters message. + * + * @see #finishUpdating() */ -public class PartitionUpdateCountersMessage implements MarshallableMessage { +public class PartitionUpdateCountersMessage implements Message { /** */ private static final int ITEM_SIZE = 4 /* partition */ + 8 /* initial counter */ + 8 /* updates count */; - /** Byte representation of partition counters. */ + /** */ @Order(0) - byte[] data; + int cacheId; - /** */ + /** Byte representation of partition counters. */ @Order(1) - int cacheId; + byte[] data; /** */ - private int size; + @Order(2) + int size; /** Used for assigning counters to cache entries during tx finish. */ private Map counters; - /** */ + /** Empty constructor for a {@link MessageFactory}. */ public PartitionUpdateCountersMessage() { // No-op. } @@ -120,6 +122,8 @@ public long updatesCount(int idx) { * @param part Partition number. * @param init Init partition counter. * @param updatesCnt Update counter delta. + * + * @see #finishUpdating() */ public void add(int part, long init, long updatesCnt) { ensureSpace(size + 1); @@ -131,6 +135,15 @@ public void add(int part, long init, long updatesCnt) { GridUnsafe.putLong(data, off, updatesCnt); } + /** Optimizes the memory used after adding counters with {@link #add(int, long, long)}. */ + public void finishUpdating() { + if (data != null && data.length != size * ITEM_SIZE) { + assert data.length > size * ITEM_SIZE; + + data = Arrays.copyOf(data, size * ITEM_SIZE); + } + } + /** * Calculate next counter for partition. * @@ -158,10 +171,9 @@ private void ensureSpace(int newSize) { int req = newSize * ITEM_SIZE; if (data.length < req) - data = Arrays.copyOf(data, data.length << 1); + data = Arrays.copyOf(data, (int)(data.length * 1.33f)); } - /** {@inheritDoc} */ @Override public String toString() { StringBuilder sb = new StringBuilder(); @@ -182,14 +194,4 @@ private void ensureSpace(int newSize) { ", cntrs=" + sb + '}'; } - - /** {@inheritDoc} */ - @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { - data = Arrays.copyOf(data, size * ITEM_SIZE); - } - - /** {@inheritDoc} */ - @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { - size = data == null ? 0 : data.length / ITEM_SIZE; - } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 10f38a3d105e1..392f37d32b060 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -2197,6 +2197,8 @@ public void applyPartitionsUpdatesCounters( resCntrs.add(part, partCntrs.initialCounter(i), partCntrs.updatesCount(i)); } + resCntrs.finishUpdating(); + if (resCntrs.size() > 0) res.add(resCntrs); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 71c79fe8f9bac..bf2ea8938c245 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -475,6 +475,8 @@ public void calculatePartitionUpdateCounters() throws IgniteTxRollbackCheckedExc } } + msg.finishUpdating(); + if (msg.size() > 0) cntrMsgs.add(msg); }