From d281808fb8aeda3df1a70d30d1a1f87ef90c3f9f Mon Sep 17 00:00:00 2001 From: Yi Peng Date: Wed, 14 Jan 2026 20:51:12 +0100 Subject: [PATCH 1/6] concurrency tests --- .../io/FrameBlockConcurrentCopyTest.java | 274 ++++++++++++++++++ 1 file changed, 274 insertions(+) create mode 100644 src/test/java/org/apache/sysds/test/functions/io/FrameBlockConcurrentCopyTest.java diff --git a/src/test/java/org/apache/sysds/test/functions/io/FrameBlockConcurrentCopyTest.java b/src/test/java/org/apache/sysds/test/functions/io/FrameBlockConcurrentCopyTest.java new file mode 100644 index 00000000000..35f247d62c2 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/functions/io/FrameBlockConcurrentCopyTest.java @@ -0,0 +1,274 @@ +package org.apache.sysds.test.functions.io; + +import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.runtime.frame.data.FrameBlock; +import org.apache.sysds.runtime.util.UtilFunctions; +import org.apache.sysds.test.AutomatedTestBase; +import org.apache.sysds.test.TestConfiguration; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; + +@RunWith(value = Parameterized.class) +@net.jcip.annotations.NotThreadSafe +public class FrameBlockConcurrentCopyTest extends AutomatedTestBase { + + private final static String TEST_NAME = "FrameBlockConcurrentCopyTest"; + private final static String TEST_DIR = "functions/frame/"; + private final static String TEST_CLASS_DIR = TEST_DIR + FrameBlockConcurrentCopyTest.class.getSimpleName() + "/"; + + private final int _threads; + + public FrameBlockConcurrentCopyTest(int threads) { + _threads = threads; + } + + @Override + public void setUp() { + addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"B"})); + } + + @Parameters + public static Collection data() { + return Arrays.asList(new Object[][] { + {20}, {50} + }); + } + + /** + * Boolean Array Consistency: Verifies that concurrent writes of "All True" or "All False" + * rows do not result in mixed rows. + */ + @Test + public void testBooleanArrayConsistency() { + final int COLS = 64; + final int ITERATIONS = 100; + + for(int iter = 0; iter < ITERATIONS; iter++) { + ValueType[] schema = UtilFunctions.nCopies(COLS, ValueType.BOOLEAN); + FrameBlock target = new FrameBlock(schema, 1); + target.ensureAllocatedColumns(1); + + // all false + for(int c = 0; c < COLS; c++) target.set(0, c, false); + + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch doneLatch = new CountDownLatch(_threads); + AtomicInteger errors = new AtomicInteger(0); + + for(int t = 0; t < _threads; t++) { + final int threadId = t; + new Thread(() -> { + try { + startLatch.await(); + + // pattern + boolean pattern = (threadId % 2 == 0); + ValueType[] subSchema = UtilFunctions.nCopies(COLS, ValueType.BOOLEAN); + + // source blocks + FrameBlock source = new FrameBlock(subSchema, 1); + source.ensureAllocatedColumns(1); + for(int c = 0; c < COLS; c++) source.set(0, c, pattern); + + // repeatedly copy to the shared target + for(int i = 0; i < 50; i++) { + // change pattern + for(int c = 0; c < COLS; c++) source.set(0, c, !pattern); + pattern = !pattern; + + target.copy(0, 0, 0, COLS - 1, source); + } + } catch (Exception e) { + errors.incrementAndGet(); + } finally { + doneLatch.countDown(); + } + }).start(); + } + + startLatch.countDown(); + try { + doneLatch.await(); + } catch (InterruptedException e) { + Assert.fail("Test interrupted: " + e.getMessage()); + } + + Assert.assertEquals("Exceptions occurred inside worker threads", 0, errors.get()); + + // The row must be uniform (All True/False), mix row indicates overwriting + Boolean firstVal = (Boolean) target.get(0, 0); + for(int c = 1; c < COLS; c++) { + Boolean val = (Boolean) target.get(0, c); + if(val != firstVal) { + Assert.fail("Inconsistent array detected! Found mixed values in what should be an atomic block write."); + } + } + } + } + + /** + * Non-Boolean Type Safety: Verifies that standard types are thread-safe + * when threads write to disjoint rows. + */ + @Test + public void testNonBooleanTypesNoSync() { + final int ITERATIONS = 20; + final int COLS = 5; + ValueType[] types = {ValueType.INT64, ValueType.FP64, ValueType.STRING}; + + for(ValueType type : types) { + for(int iter = 0; iter < ITERATIONS; iter++) { + // 1 row per threat to check disjoint writing + ValueType[] schema = UtilFunctions.nCopies(COLS, type); + FrameBlock target = new FrameBlock(schema, _threads); + target.ensureAllocatedColumns(_threads); + + CyclicBarrier barrier = new CyclicBarrier(_threads); + AtomicInteger errors = new AtomicInteger(0); + Thread[] threads = new Thread[_threads]; + + for(int t = 0; t < _threads; t++) { + final int threadId = t; + threads[t] = new Thread(() -> { + try { + barrier.await(); + + FrameBlock source = new FrameBlock(schema, 1); + source.ensureAllocatedColumns(1); + + // value based on threadID + for(int c = 0; c < COLS; c++) { + initializeCell(source, 0, c, type, threadId + c); + } + + // Copy source into target at specific row index + target.copy(threadId, threadId, 0, COLS - 1, source); + + } catch (Exception e) { + errors.incrementAndGet(); + } + }); + threads[t].start(); + } + + try { + for(Thread t : threads) t.join(); + } catch (InterruptedException e) { + Assert.fail("Test interrupted: " + e.getMessage()); + } + + Assert.assertEquals("Thread errors detected for type " + type, 0, errors.get()); + + for(int r = 0; r < _threads; r++) { + for(int c = 0; c < COLS; c++) { + Object val = target.get(r, c); + verifyCell(val, type, r + c); + } + } + } + } + } + + /** + * Test 3: High-Contention Bit Packing Stress Test + * Race-condition test + * Threads focus rows within shared column + */ + @Test + public void testBooleanBitPacking() { + for(int iter = 0; iter < 10; iter++) { + final int ROWS = 1024; + final int STRESS_LOOPS = 2000; + + ValueType[] schema = new ValueType[]{ValueType.BOOLEAN}; + FrameBlock target = new FrameBlock(schema, ROWS); + target.ensureAllocatedColumns(ROWS); + + //all false + for(int r = 0; r < ROWS; r++) target.set(r, 0, false); + + // pre-allocate source blocks + FrameBlock[] sources = new FrameBlock[ROWS]; + for(int i = 0; i < ROWS; i++) { + sources[i] = new FrameBlock(schema, 1); + sources[i].ensureAllocatedColumns(1); + sources[i].set(0, 0, true); + } + + CyclicBarrier barrier = new CyclicBarrier(ROWS); + AtomicInteger exceptions = new AtomicInteger(0); + Thread[] threads = new Thread[ROWS]; + + for(int i = 0; i < ROWS; i++) { + final int rowIndex = i; + threads[i] = new Thread(() -> { + try { + barrier.await(); + + // try to maximize overlapp + for(int k=0; k Date: Wed, 14 Jan 2026 20:51:37 +0100 Subject: [PATCH 2/6] add copy() re-implementation --- .../sysds/runtime/frame/data/FrameBlock.java | 61 +++++++++++++++---- 1 file changed, 49 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java b/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java index 63cadb43cf4..3dbf4e77508 100644 --- a/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java +++ b/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java @@ -1302,23 +1302,60 @@ public void copy(int rl, int ru, int cl, int cu, FrameBlock src) { ensureAllocateMeta(); if(_coldata == null) // allocate column data. _coldata = new Array[_schema.length]; - synchronized(this) { // make sync locks - // TODO remove sync locks on array types where they are not needed. - if(_columnLocks == null) { - Object[] locks = new Object[_schema.length]; - for(int i = 0; i < locks.length; i++) - locks[i] = new Object(); - _columnLocks = new SoftReference<>(locks); - } - } - Object[] locks = _columnLocks.get(); - for(int j = cl; j <= cu; j++) { // for each column - synchronized(locks[j]) { // synchronize on the column. + + for(int j = cl; j <= cu; j++) { + if(_schema[j] == ValueType.BOOLEAN) { + // option 1: Boolean columns and needs lock + // double check logging. logging implementation made using chatgpt + Object[] locks = (_columnLocks != null) ? _columnLocks.get() : null; + if(locks == null) { + synchronized(this) { + locks = (_columnLocks != null) ? _columnLocks.get() : null; + if(locks == null) { + locks = new Object[_schema.length]; + for(int i = 0; i < locks.length; i++) + locks[i] = new Object(); + _columnLocks = new java.lang.ref.SoftReference<>(locks); + } + } + } + // read/write inside lock, for safest write and most accurate read + synchronized(locks[j]) { + _coldata[j] = ArrayFactory.set(_coldata[j], src._coldata[j - cl], rl, ru, _nRow); + } + } else { + // option 2: not boolean so no locking _coldata[j] = ArrayFactory.set(_coldata[j], src._coldata[j - cl], rl, ru, _nRow); } } } +// public void copy(int rl, int ru, int cl, int cu, FrameBlock src) { +// // If full copy, fall back to default copy +// if(rl == 0 && cl == 0 && ru + 1 == this.getNumRows() && cu + 1 == this.getNumColumns()) { +// copy(src); +// return; +// } +// ensureAllocateMeta(); +// if(_coldata == null) // allocate column data. +// _coldata = new Array[_schema.length]; +// synchronized(this) { // make sync locks +// // TODO remove sync locks on array types where they are not needed. +// if(_columnLocks == null) { +// Object[] locks = new Object[_schema.length]; +// for(int i = 0; i < locks.length; i++) +// locks[i] = new Object(); +// _columnLocks = new SoftReference<>(locks); +// } +// } +// Object[] locks = _columnLocks.get(); +// for(int j = cl; j <= cu; j++) { // for each column +// synchronized(locks[j]) { // synchronize on the column. +// _coldata[j] = ArrayFactory.set(_coldata[j], src._coldata[j - cl], rl, ru, _nRow); +// } +// } +// } + /** * This function will split every Recode map in the column using delimiter Lop.DATATYPE_PREFIX, as Recode map * generated earlier in the form of Code+Lop.DATATYPE_PREFIX+Token and store it in a map which contains token and From a1a08a223421f30ee4d7251db03ad7f0e1d665bc Mon Sep 17 00:00:00 2001 From: Yi Peng Date: Wed, 28 Jan 2026 21:56:40 +0100 Subject: [PATCH 3/6] add copy() re-implementation --- .../sysds/runtime/frame/data/FrameBlock.java | 56 ++++++------------- 1 file changed, 16 insertions(+), 40 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java b/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java index 3dbf4e77508..b40bb1e4078 100644 --- a/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java +++ b/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java @@ -52,9 +52,7 @@ import org.apache.sysds.runtime.codegen.CodegenUtils; import org.apache.sysds.runtime.controlprogram.caching.CacheBlock; import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence; -import org.apache.sysds.runtime.frame.data.columns.Array; -import org.apache.sysds.runtime.frame.data.columns.ArrayFactory; -import org.apache.sysds.runtime.frame.data.columns.ColumnMetadata; +import org.apache.sysds.runtime.frame.data.columns.*; import org.apache.sysds.runtime.frame.data.iterators.IteratorFactory; import org.apache.sysds.runtime.frame.data.lib.FrameFromMatrixBlock; import org.apache.sysds.runtime.frame.data.lib.FrameLibAppend; @@ -1304,58 +1302,36 @@ public void copy(int rl, int ru, int cl, int cu, FrameBlock src) { _coldata = new Array[_schema.length]; for(int j = cl; j <= cu; j++) { - if(_schema[j] == ValueType.BOOLEAN) { - // option 1: Boolean columns and needs lock - // double check logging. logging implementation made using chatgpt - Object[] locks = (_columnLocks != null) ? _columnLocks.get() : null; - if(locks == null) { + Array col = _coldata[j]; + boolean isUnsafe = (col == null) || + (col instanceof OptionalArray) || + (col instanceof BitSetArray) || + (col instanceof RaggedArray) || + (col instanceof ACompressedArray); + + if(isUnsafe) { + if(_columnLocks == null || _columnLocks.get() == null) { synchronized(this) { - locks = (_columnLocks != null) ? _columnLocks.get() : null; - if(locks == null) { - locks = new Object[_schema.length]; + if(_columnLocks == null || _columnLocks.get() == null) { + Object[] locks = new Object[_schema.length]; for(int i = 0; i < locks.length; i++) locks[i] = new Object(); - _columnLocks = new java.lang.ref.SoftReference<>(locks); + _columnLocks = new SoftReference<>(locks); } } } + Object[] locks = _columnLocks.get(); // read/write inside lock, for safest write and most accurate read synchronized(locks[j]) { _coldata[j] = ArrayFactory.set(_coldata[j], src._coldata[j - cl], rl, ru, _nRow); } - } else { - // option 2: not boolean so no locking + } + else { _coldata[j] = ArrayFactory.set(_coldata[j], src._coldata[j - cl], rl, ru, _nRow); } } } -// public void copy(int rl, int ru, int cl, int cu, FrameBlock src) { -// // If full copy, fall back to default copy -// if(rl == 0 && cl == 0 && ru + 1 == this.getNumRows() && cu + 1 == this.getNumColumns()) { -// copy(src); -// return; -// } -// ensureAllocateMeta(); -// if(_coldata == null) // allocate column data. -// _coldata = new Array[_schema.length]; -// synchronized(this) { // make sync locks -// // TODO remove sync locks on array types where they are not needed. -// if(_columnLocks == null) { -// Object[] locks = new Object[_schema.length]; -// for(int i = 0; i < locks.length; i++) -// locks[i] = new Object(); -// _columnLocks = new SoftReference<>(locks); -// } -// } -// Object[] locks = _columnLocks.get(); -// for(int j = cl; j <= cu; j++) { // for each column -// synchronized(locks[j]) { // synchronize on the column. -// _coldata[j] = ArrayFactory.set(_coldata[j], src._coldata[j - cl], rl, ru, _nRow); -// } -// } -// } - /** * This function will split every Recode map in the column using delimiter Lop.DATATYPE_PREFIX, as Recode map * generated earlier in the form of Code+Lop.DATATYPE_PREFIX+Token and store it in a map which contains token and From 4f507b12f4c6189424949b10ca696a777ef00bbc Mon Sep 17 00:00:00 2001 From: Yi Peng Date: Wed, 28 Jan 2026 21:56:49 +0100 Subject: [PATCH 4/6] add new tests --- .../io/FrameBlockConcurrentCopyTest.java | 675 +++++++++++------- 1 file changed, 435 insertions(+), 240 deletions(-) diff --git a/src/test/java/org/apache/sysds/test/functions/io/FrameBlockConcurrentCopyTest.java b/src/test/java/org/apache/sysds/test/functions/io/FrameBlockConcurrentCopyTest.java index 35f247d62c2..d02a1282f48 100644 --- a/src/test/java/org/apache/sysds/test/functions/io/FrameBlockConcurrentCopyTest.java +++ b/src/test/java/org/apache/sysds/test/functions/io/FrameBlockConcurrentCopyTest.java @@ -1,7 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.sysds.test.functions.io; +import javassist.bytecode.ByteArray; +import org.apache.hadoop.security.SaslOutputStream; import org.apache.sysds.common.Types.ValueType; import org.apache.sysds.runtime.frame.data.FrameBlock; +import org.apache.sysds.runtime.frame.data.columns.*; import org.apache.sysds.runtime.util.UtilFunctions; import org.apache.sysds.test.AutomatedTestBase; import org.apache.sysds.test.TestConfiguration; @@ -10,6 +32,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; +import scala.Enumeration; import java.util.Arrays; import java.util.Collection; @@ -21,20 +44,20 @@ @net.jcip.annotations.NotThreadSafe public class FrameBlockConcurrentCopyTest extends AutomatedTestBase { - private final static String TEST_NAME = "FrameBlockConcurrentCopyTest"; - private final static String TEST_DIR = "functions/frame/"; - private final static String TEST_CLASS_DIR = TEST_DIR + FrameBlockConcurrentCopyTest.class.getSimpleName() + "/"; + private final static String TEST_NAME = "FrameBlockConcurrentCopyTest"; + private final static String TEST_DIR = "functions/frame/"; + private final static String TEST_CLASS_DIR = TEST_DIR + FrameBlockConcurrentCopyTest.class.getSimpleName() + "/"; - private final int _threads; + private final int _threads; - public FrameBlockConcurrentCopyTest(int threads) { - _threads = threads; - } + public FrameBlockConcurrentCopyTest(int threads) { + _threads = threads; + } - @Override - public void setUp() { - addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"B"})); - } + @Override + public void setUp() { + addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"B"})); + } @Parameters public static Collection data() { @@ -43,232 +66,404 @@ public static Collection data() { }); } - /** - * Boolean Array Consistency: Verifies that concurrent writes of "All True" or "All False" - * rows do not result in mixed rows. - */ - @Test - public void testBooleanArrayConsistency() { - final int COLS = 64; - final int ITERATIONS = 100; - - for(int iter = 0; iter < ITERATIONS; iter++) { - ValueType[] schema = UtilFunctions.nCopies(COLS, ValueType.BOOLEAN); - FrameBlock target = new FrameBlock(schema, 1); - target.ensureAllocatedColumns(1); - - // all false - for(int c = 0; c < COLS; c++) target.set(0, c, false); - - CountDownLatch startLatch = new CountDownLatch(1); - CountDownLatch doneLatch = new CountDownLatch(_threads); - AtomicInteger errors = new AtomicInteger(0); - - for(int t = 0; t < _threads; t++) { - final int threadId = t; - new Thread(() -> { - try { - startLatch.await(); - - // pattern - boolean pattern = (threadId % 2 == 0); - ValueType[] subSchema = UtilFunctions.nCopies(COLS, ValueType.BOOLEAN); - - // source blocks - FrameBlock source = new FrameBlock(subSchema, 1); - source.ensureAllocatedColumns(1); - for(int c = 0; c < COLS; c++) source.set(0, c, pattern); - - // repeatedly copy to the shared target - for(int i = 0; i < 50; i++) { - // change pattern - for(int c = 0; c < COLS; c++) source.set(0, c, !pattern); - pattern = !pattern; - - target.copy(0, 0, 0, COLS - 1, source); - } - } catch (Exception e) { - errors.incrementAndGet(); - } finally { - doneLatch.countDown(); - } - }).start(); - } - - startLatch.countDown(); - try { - doneLatch.await(); - } catch (InterruptedException e) { - Assert.fail("Test interrupted: " + e.getMessage()); - } - - Assert.assertEquals("Exceptions occurred inside worker threads", 0, errors.get()); - - // The row must be uniform (All True/False), mix row indicates overwriting - Boolean firstVal = (Boolean) target.get(0, 0); - for(int c = 1; c < COLS; c++) { - Boolean val = (Boolean) target.get(0, c); - if(val != firstVal) { - Assert.fail("Inconsistent array detected! Found mixed values in what should be an atomic block write."); - } - } - } - } - - /** - * Non-Boolean Type Safety: Verifies that standard types are thread-safe - * when threads write to disjoint rows. - */ - @Test - public void testNonBooleanTypesNoSync() { - final int ITERATIONS = 20; - final int COLS = 5; - ValueType[] types = {ValueType.INT64, ValueType.FP64, ValueType.STRING}; - - for(ValueType type : types) { - for(int iter = 0; iter < ITERATIONS; iter++) { - // 1 row per threat to check disjoint writing - ValueType[] schema = UtilFunctions.nCopies(COLS, type); - FrameBlock target = new FrameBlock(schema, _threads); - target.ensureAllocatedColumns(_threads); - - CyclicBarrier barrier = new CyclicBarrier(_threads); - AtomicInteger errors = new AtomicInteger(0); - Thread[] threads = new Thread[_threads]; - - for(int t = 0; t < _threads; t++) { - final int threadId = t; - threads[t] = new Thread(() -> { - try { - barrier.await(); - - FrameBlock source = new FrameBlock(schema, 1); - source.ensureAllocatedColumns(1); - - // value based on threadID - for(int c = 0; c < COLS; c++) { - initializeCell(source, 0, c, type, threadId + c); - } - - // Copy source into target at specific row index - target.copy(threadId, threadId, 0, COLS - 1, source); - - } catch (Exception e) { - errors.incrementAndGet(); - } - }); - threads[t].start(); - } - - try { - for(Thread t : threads) t.join(); - } catch (InterruptedException e) { - Assert.fail("Test interrupted: " + e.getMessage()); - } - - Assert.assertEquals("Thread errors detected for type " + type, 0, errors.get()); - - for(int r = 0; r < _threads; r++) { - for(int c = 0; c < COLS; c++) { - Object val = target.get(r, c); - verifyCell(val, type, r + c); - } - } - } - } - } - - /** - * Test 3: High-Contention Bit Packing Stress Test - * Race-condition test - * Threads focus rows within shared column - */ - @Test - public void testBooleanBitPacking() { - for(int iter = 0; iter < 10; iter++) { - final int ROWS = 1024; - final int STRESS_LOOPS = 2000; - - ValueType[] schema = new ValueType[]{ValueType.BOOLEAN}; - FrameBlock target = new FrameBlock(schema, ROWS); - target.ensureAllocatedColumns(ROWS); - - //all false - for(int r = 0; r < ROWS; r++) target.set(r, 0, false); - - // pre-allocate source blocks - FrameBlock[] sources = new FrameBlock[ROWS]; - for(int i = 0; i < ROWS; i++) { - sources[i] = new FrameBlock(schema, 1); - sources[i].ensureAllocatedColumns(1); - sources[i].set(0, 0, true); - } - - CyclicBarrier barrier = new CyclicBarrier(ROWS); - AtomicInteger exceptions = new AtomicInteger(0); - Thread[] threads = new Thread[ROWS]; - - for(int i = 0; i < ROWS; i++) { - final int rowIndex = i; - threads[i] = new Thread(() -> { - try { - barrier.await(); - - // try to maximize overlapp - for(int k=0; k { + try { + startLatch.await(); + + // pattern + boolean pattern = (threadId % 2 == 0); + ValueType[] subSchema = UtilFunctions.nCopies(COLS, ValueType.BOOLEAN); + + // source blocks + FrameBlock source = new FrameBlock(subSchema, 1); + source.ensureAllocatedColumns(1); + for(int c = 0; c < COLS; c++) + source.set(0, c, pattern); + + // repeatedly copy to the shared target + for(int i = 0; i < 50; i++) { + // change pattern + for(int c = 0; c < COLS; c++) + source.set(0, c, !pattern); + pattern = !pattern; + + target.copy(0, 0, 0, COLS - 1, source); + } + } + catch(Exception e) { + errors.incrementAndGet(); + } + finally { + doneLatch.countDown(); + } + }).start(); + } + + startLatch.countDown(); + try { + doneLatch.await(); + } + catch(InterruptedException e) { + Assert.fail("Test interrupted: " + e.getMessage()); + } + + Assert.assertEquals("Exceptions occurred inside worker threads", 0, errors.get()); + + // The row must be uniform (All True/False), mix row indicates overwriting + Boolean firstVal = (Boolean) target.get(0, 0); + for(int c = 1; c < COLS; c++) { + Boolean val = (Boolean) target.get(0, c); + if(val != firstVal) { + Assert.fail( + "Inconsistent array detected! Found mixed values in what should be an atomic block write."); + } + } + } + } + + /** + * Non-Boolean Type Safety: Verifies that standard types are thread-safe when threads write to disjoint rows. + */ + @Test + public void testSafeTypesNoSync() { + final int ITERATIONS = 20; + final int COLS = 5; + ValueType[] types = { + ValueType.BOOLEAN, + ValueType.INT64, + ValueType.FP64, + ValueType.STRING, + ValueType.INT32, + ValueType.FP32, + ValueType.UINT4, + ValueType.CHARACTER, + ValueType.HASH32, + ValueType.HASH64 + }; + + for(ValueType type : types) { + for(int iter = 0; iter < ITERATIONS; iter++) { + // 1 row per threat to check disjoint writing + ValueType[] schema = UtilFunctions.nCopies(COLS, type); + FrameBlock target = new FrameBlock(schema, _threads); + target.ensureAllocatedColumns(_threads); + + CyclicBarrier barrier = new CyclicBarrier(_threads); + AtomicInteger errors = new AtomicInteger(0); + Thread[] threads = new Thread[_threads]; + + for(int t = 0; t < _threads; t++) { + final int threadId = t; + threads[t] = new Thread(() -> { + try { + barrier.await(); + + FrameBlock source = new FrameBlock(schema, 1); + source.ensureAllocatedColumns(1); + // value based on threadID + for(int c = 0; c < COLS; c++) { + initializeCell(source, 0, c, type, threadId + c); + } + // Copy source into target at specific row index + target.copy(threadId, threadId, 0, COLS - 1, source); + + } + catch(Exception e) { + errors.incrementAndGet(); + } + }); + threads[t].start(); + } + + try { + for(Thread t : threads) + t.join(); + } + catch(InterruptedException e) { + Assert.fail("Test interrupted: " + e.getMessage()); + } + + Assert.assertEquals("Thread errors detected for type " + type, 0, errors.get()); + + for(int r = 0; r < _threads; r++) { + for(int c = 0; c < COLS; c++) { + Object val = target.get(r, c); + verifyCell(val, type, r + c); + } + } + } + } + } + + /** + * Test 3: High-Contention Bit Packing Stress Test Race-condition test Threads focus rows within shared column + */ + @Test + public void testBooleanBitPacking() { + for(int iter = 0; iter < 10; iter++) { + final int ROWS = 1024; + final int STRESS_LOOPS = 2000; + + ValueType[] schema = new ValueType[] {ValueType.BOOLEAN}; + FrameBlock target = new FrameBlock(schema, ROWS); + target.ensureAllocatedColumns(ROWS); + + //all false + for(int r = 0; r < ROWS; r++) + target.set(r, 0, false); + + // pre-allocate source blocks + FrameBlock[] sources = new FrameBlock[ROWS]; + for(int i = 0; i < ROWS; i++) { + sources[i] = new FrameBlock(schema, 1); + sources[i].ensureAllocatedColumns(1); + sources[i].set(0, 0, true); + } + + CyclicBarrier barrier = new CyclicBarrier(ROWS); + AtomicInteger exceptions = new AtomicInteger(0); + Thread[] threads = new Thread[ROWS]; + + for(int i = 0; i < ROWS; i++) { + final int rowIndex = i; + threads[i] = new Thread(() -> { + try { + barrier.await(); + + // try to maximize overlapp + for(int k = 0; k < STRESS_LOOPS; k++) { + target.copy(rowIndex, rowIndex, 0, 0, sources[rowIndex]); + } + } + catch(Exception e) { + exceptions.incrementAndGet(); + } + }); + threads[i].start(); + } + + try { + for(Thread t : threads) + t.join(); + } + catch(InterruptedException e) { + Assert.fail("Test interrupted: " + e.getMessage()); + } + + Assert.assertEquals("Exceptions occurred in threads", 0, exceptions.get()); + + // row should be TRUE, any FALSE value means update was overwritten by another thread + int trueCount = 0; + for(int r = 0; r < ROWS; r++) { + Object val = target.get(r, 0); + if(val != null && (Boolean) val) + trueCount++; + } + + Assert.assertEquals("Race condition. Lost updates in iteration " + iter, ROWS, trueCount); + } + } + + /** + * Test 4: OptionalArray Locking Test + */ + @Test + public void testOptionalArray() { + for(int i = 0; i < 10; i++) { + System.out.println("--- RUN " + (i+1) + " ---"); + runOptional(); + } + } + + @SuppressWarnings("unchecked") + private void runOptional() { + int rows = 50000; + ValueType type = ValueType.FP64; + FrameBlock target = new FrameBlock(new ValueType[]{type}, rows); + target.ensureAllocatedColumns(rows); + + // target (optional array) + target.setColumn(0, ArrayFactory.allocateOptional(type, rows)); + + // source 1 (non-null) + FrameBlock sourceValid = new FrameBlock(new ValueType[]{type}, rows); + sourceValid.ensureAllocatedColumns(rows); + Array sCol1 = ArrayFactory.allocateOptional(type, rows); + ((OptionalArray)sCol1).fill(1.0d); + sourceValid.setColumn(0, sCol1); + + // source 2 (all null) + FrameBlock sourceNull = new FrameBlock(new ValueType[]{type}, rows); + sourceNull.ensureAllocatedColumns(rows); + Array sCol2 = ArrayFactory.allocateOptional(type, rows); + ((OptionalArray)sCol2).fill((Double) null); + sourceNull.setColumn(0, sCol2); + + CyclicBarrier barrier = new CyclicBarrier(2); + AtomicInteger errors = new AtomicInteger(0); + + Thread t1 = new Thread(() -> { + try { + barrier.await(); + target.copy(1, rows-1, 0, 0, sourceValid); + } catch (Exception e) { e.printStackTrace(); errors.incrementAndGet(); } + }); + + Thread t2 = new Thread(() -> { + try { + barrier.await(); + target.copy(1, rows-1, 0, 0, sourceNull); + } catch (Exception e) { e.printStackTrace(); errors.incrementAndGet(); } + }); + + t1.start(); t2.start(); + try { t1.join(); t2.join(); } catch (InterruptedException e) {} + + int validCount = 0; + int nullCount = 0; + + for(int r = 1; r < rows; r++) { + if(target.get(r, 0) == null) nullCount++; + else validCount++; + } + + System.out.println("Valid: " + validCount + ", Null: " + nullCount); + + if (validCount > 0 && nullCount > 0) { + Assert.fail("Race condition OptionalArray:\n" + + "Thread 1 (non-null) and Thread 2 (null) mixed instructions!\n" + + "Result: " + validCount + " valid, " + nullCount + " null."); + } + } + + /** + * Test 5: BitSetArray Locking Test + */ + @Test + public void testBitSetArray() { + for(int i = 0; i < 20; i++) { + System.out.println("--- BitSet RUN " + (i+1) + " ---"); + runBitSet(); + } + } + + private void runBitSet() { + int rows = 50000; + ValueType type = ValueType.BOOLEAN; + FrameBlock target = new FrameBlock(new ValueType[]{type}, rows); + target.ensureAllocatedColumns(rows); + // initialize false + target.setColumn(0, ArrayFactory.allocate(type, rows)); + + // Source 1 (all true) + FrameBlock sourceTrue = new FrameBlock(new ValueType[]{type}, rows); + sourceTrue.ensureAllocatedColumns(rows); + Array sCol1 = ArrayFactory.allocate(type, rows); + // Fill with TRUE + for(int k=0; k sCol2 = ArrayFactory.allocate(type, rows); + // fill with false + for(int k=0; k { + try { + barrier.await(); + target.copy(1, rows-1, 0, 0, sourceTrue); + } catch (Exception e) { e.printStackTrace(); errors.incrementAndGet(); } + }); + + Thread t2 = new Thread(() -> { + try { + barrier.await(); + target.copy(1, rows-1, 0, 0, sourceFalse); + } catch (Exception e) { e.printStackTrace(); errors.incrementAndGet(); } + }); + + t1.start(); t2.start(); + try { t1.join(); t2.join(); } catch (InterruptedException e) {} + + // Verification + int trueCount = 0; + int falseCount = 0; + + for(int r = 3; r < rows; r++) { + Boolean val = (Boolean) target.get(r, 0); + if(val != null && val) trueCount++; + else falseCount++; + } + + System.out.println("True: " + trueCount + ", False: " + falseCount); + + if (trueCount > 0 && falseCount > 0) { + Assert.fail("Race condition in BitSetArray:\n" + + "Thread True and Thread False mixed instructions!\n" + + "Result: " + trueCount + " True, " + falseCount + " False."); + } + } + + private void initializeCell(FrameBlock fb, int row, int col, ValueType type, int seed) { + switch(type) { + case BOOLEAN: fb.set(row, col, (seed % 2 == 0)); break; + case INT32: fb.set(row, col, seed); break; + case INT64: fb.set(row, col, (long) seed); break; + case FP32: fb.set(row, col, (float) seed); break; + case FP64: fb.set(row, col, (double) seed); break; + case UINT8: fb.set(row, col, (int) (seed % 127)); break; + case UINT4: fb.set(row, col, (int) (seed % 15)); break; + case CHARACTER: fb.set(row, col, (char) ('a' + (seed % 26))); break; + case STRING: fb.set(row, col, "v" + seed); break; + case HASH32: fb.set(row, col, seed); break; + case HASH64: fb.set(row, col, (long) seed); break; + default: + fb.set(row, col, String.valueOf(seed)); + } + } + + private void verifyCell(Object val, ValueType type, int expectedSeed) { + Assert.assertNotNull("Value should not be null", val); + switch(type) { + case INT32: Assert.assertEquals((int)expectedSeed, ((Integer)val).intValue()); break; + case INT64: Assert.assertEquals((long)expectedSeed, ((Long)val).longValue()); break; + case FP32: Assert.assertEquals((float)expectedSeed, ((Float)val).floatValue(), 0.0001); break; + case FP64: Assert.assertEquals((double)expectedSeed, ((Double)val).doubleValue(), 0.0001); break; + case UINT8: Assert.assertEquals((int)(expectedSeed % 127), ((Integer)val).intValue()); break; + case CHARACTER: Assert.assertEquals((char)('a' + (expectedSeed%26)), ((Character)val).charValue()); break; + case STRING: Assert.assertEquals("v" + expectedSeed, val); break; + case BOOLEAN: Assert.assertEquals((expectedSeed % 2 == 0), val); break; + default: break; + } + } +} From 4170adc28173ea8af78d9887ec7807010a7b02a4 Mon Sep 17 00:00:00 2001 From: Yi Peng Date: Wed, 28 Jan 2026 21:59:34 +0100 Subject: [PATCH 5/6] Remove unused imports --- .../sysds/test/functions/io/FrameBlockConcurrentCopyTest.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/test/java/org/apache/sysds/test/functions/io/FrameBlockConcurrentCopyTest.java b/src/test/java/org/apache/sysds/test/functions/io/FrameBlockConcurrentCopyTest.java index d02a1282f48..8b87e8a3ce2 100644 --- a/src/test/java/org/apache/sysds/test/functions/io/FrameBlockConcurrentCopyTest.java +++ b/src/test/java/org/apache/sysds/test/functions/io/FrameBlockConcurrentCopyTest.java @@ -19,8 +19,6 @@ package org.apache.sysds.test.functions.io; -import javassist.bytecode.ByteArray; -import org.apache.hadoop.security.SaslOutputStream; import org.apache.sysds.common.Types.ValueType; import org.apache.sysds.runtime.frame.data.FrameBlock; import org.apache.sysds.runtime.frame.data.columns.*; @@ -32,7 +30,6 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; -import scala.Enumeration; import java.util.Arrays; import java.util.Collection; From 38562dcbd7e4dd4a846c31448b5556302fffd7c4 Mon Sep 17 00:00:00 2001 From: Yi Peng Date: Thu, 29 Jan 2026 17:31:07 +0100 Subject: [PATCH 6/6] Code refactoring White-Space Checkstyle --- .../sysds/runtime/frame/data/FrameBlock.java | 140 ++++++------ .../io/FrameBlockConcurrentCopyTest.java | 209 +++++++++++------- 2 files changed, 208 insertions(+), 141 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java b/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java index b40bb1e4078..fa88a7504fa 100644 --- a/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java +++ b/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java @@ -154,7 +154,7 @@ public FrameBlock(ValueType[] schema, String[][] data) { /** * FrameBlock constructor with constant - * + * * @param schema The schema to allocate (also specifying number of columns) * @param constant The constant to allocate in all cells * @param nRow the number of rows @@ -169,9 +169,9 @@ public FrameBlock(ValueType[] schema, String constant, int nRow) { /** * allocate a FrameBlock with the given data arrays. - * + * * The data is in row major, making the first dimension number of rows. second number of columns. - * + * * @param schema the schema to allocate * @param names The names of the column * @param data The data. @@ -200,7 +200,7 @@ public FrameBlock(ValueType[] schema, String[] colNames, ColumnMetadata[] meta, /** * Create a FrameBlock containing columns of the specified arrays - * + * * @param data The column data contained */ public FrameBlock(Array[] data) { @@ -216,15 +216,16 @@ public FrameBlock(Array[] data) { if(debug) { for(int i = 0; i < data.length; i++) { if(data[i].size() != getNumRows()) - throw new DMLRuntimeException("Invalid Frame allocation with different size arrays " - + data[i].size() + " vs " + getNumRows()); + throw new DMLRuntimeException( + "Invalid Frame allocation with different size arrays " + data[i].size() + " vs " + + getNumRows()); } } } /** * Create a FrameBlock containing columns of the specified arrays and names - * + * * @param data The column data contained * @param colnames The column names of the contained columns */ @@ -241,8 +242,9 @@ public FrameBlock(Array[] data, String[] colnames) { if(debug) { for(int i = 0; i < data.length; i++) { if(data[i].size() != getNumRows()) - throw new DMLRuntimeException("Invalid Frame allocation with different size arrays " - + data[i].size() + " vs " + getNumRows()); + throw new DMLRuntimeException( + "Invalid Frame allocation with different size arrays " + data[i].size() + " vs " + + getNumRows()); } } } @@ -445,7 +447,7 @@ private void ensureAllocateMeta() { /** * Checks for matching column sizes in case of existing columns. - * + * * If the check parses the number of rows is reassigned to the given newLen * * @param newLen number of rows to compare with existing number of rows @@ -522,7 +524,7 @@ public void set(int r, int c, Object val) { /** * Sets the value in position (r,c), to the input string value, and at the individual arrays, convert to correct * type. - * + * * @param r row index * @param c column index * @param val value to set at specified position @@ -555,11 +557,12 @@ public void reset() { /** * Sets row at position r to the input array of objects, corresponding to the schema. - * @param r row index + * + * @param r row index * @param row array of objects */ public void setRow(int r, Object[] row) { - for (int i = 0; i < row.length; i++) { + for(int i = 0; i < row.length; i++) { set(r, i, row[i]); } } @@ -568,7 +571,7 @@ public void setRow(int r, Object[] row) { * Append a row to the end of the data frame, where all row fields are boxed objects according to the schema. * * Append row should be avoided if possible. - * + * * @param row array of objects */ public void appendRow(Object[] row) { @@ -594,7 +597,7 @@ public void appendRow(Object[] row) { * Append a row to the end of the data frame, where all row fields are string encoded. * * Append row should be avoided if possible - * + * * @param row array of strings */ public void appendRow(String[] row) { @@ -690,7 +693,7 @@ public void appendColumn(double[] col) { /** * Append the metadata associated with adding a column. - * + * * @param vt The Value type */ private void appendColumnMetaData(ValueType vt) { @@ -715,8 +718,8 @@ public void appendColumns(double[][] cols) { for(int j = 0; j < ncol; j++) tmpData[j] = ArrayFactory.create(cols[j]); _colnames = empty ? null : ArrayUtils.addAll(getColumnNames(), createColNames(getNumColumns(), ncol)); // before - // schema - // modification + // schema + // modification _schema = empty ? tmpSchema : ArrayUtils.addAll(_schema, tmpSchema); _coldata = empty ? tmpData : ArrayUtils.addAll(_coldata, tmpData); _nRow = cols[0].length; @@ -729,7 +732,7 @@ public static FrameBlock convertToFrameBlock(MatrixBlock mb, ValueType[] schema, /** * Add a column of already allocated Array type. - * + * * @param col column to add. */ public void appendColumn(Array col) { @@ -764,19 +767,20 @@ else if(column != null && column.size() != _nRow) /** * Appends a chunk of data to the end of a specified column. - * + * * @param c column index * @param chunk chunk of data to append */ public void appendColumnChunk(int c, Array chunk) { - if (_coldata == null) { + if(_coldata == null) { _coldata = new Array[getNumColumns()]; } - if (_coldata[c] == null) { + if(_coldata[c] == null) { _coldata[c] = chunk; _nRow = chunk.size(); - } else { + } + else { _coldata[c] = ArrayFactory.append(_coldata[c], chunk); _nRow += chunk.size(); } @@ -786,31 +790,31 @@ public void appendColumnChunk(int c, Array chunk) { /** * Sets a chunk of data to a specified column, starting at the specified offset. - * - * @param c column index + * + * @param c column index * @param chunk chunk of data to set * @param offset offset position where it should set the chunk * @param colSize size of columns, in case columns aren't initialized yet */ public void setColumnChunk(int c, Array chunk, int offset, int colSize) { - if (_coldata == null) { + if(_coldata == null) { _coldata = new Array[getNumColumns()]; _nRow = colSize; } - if (_coldata[c] == null) { + if(_coldata[c] == null) { _coldata[c] = ArrayFactory.allocate(chunk.getValueType(), _nRow); } - if (_coldata[c].getValueType() != chunk.getValueType()) { - throw new DMLRuntimeException("ValueType mismatch in setColumnChunk: expected " + - _coldata[c].getValueType() + " but got " + chunk.getValueType()); + if(_coldata[c].getValueType() != chunk.getValueType()) { + throw new DMLRuntimeException( + "ValueType mismatch in setColumnChunk: expected " + _coldata[c].getValueType() + " but got " + + chunk.getValueType()); } ArrayFactory.set(_coldata[c], chunk, offset, offset + chunk.size() - 1, _nRow); } - @Override public void write(DataOutput out) throws IOException { final boolean isDefaultMeta = isColNamesDefault() && isColumnMetadataDefault(); @@ -904,7 +908,7 @@ public long getInMemorySize() { // meta data array (overhead and entries) size += MemoryEstimates.objectArrayCost(clen); - if( _colmeta != null ) + if(_colmeta != null) for(ColumnMetadata mtd : _colmeta) size += mtd == null ? 8 : mtd.getInMemorySize(); @@ -923,7 +927,7 @@ private double arraysSizeInMemory() { for(int j = 0; j < clen; j++) size += ArrayFactory.getInMemorySize(_schema[j], rlen, true); else {// allocated - if((rlen > 1000 || clen > 10 )&& ConfigurationManager.isParallelIOEnabled()) { + if((rlen > 1000 || clen > 10) && ConfigurationManager.isParallelIOEnabled()) { final ExecutorService pool = CommonThreadPool.get(); try { List> f = new ArrayList<>(clen); @@ -983,7 +987,7 @@ public boolean isShallowSerialize() { public boolean isShallowSerialize(boolean inclConvert) { // shallow serialize if non-string schema because a frame block // is always dense but strings have large array overhead per cell - if( _schema != null ) + if(_schema != null) for(int j = 0; j < _schema.length; j++) if(!_coldata[j].isShallowSerialize()) return false; @@ -1012,8 +1016,9 @@ public void compactEmptyBlock() { */ public FrameBlock binaryOperations(BinaryOperator bop, FrameBlock that, FrameBlock out) { if(getNumColumns() != that.getNumColumns() && getNumRows() != that.getNumColumns()) - throw new DMLRuntimeException("Frame dimension mismatch " + getNumRows() + " * " + getNumColumns() + " != " - + that.getNumRows() + " * " + that.getNumColumns()); + throw new DMLRuntimeException( + "Frame dimension mismatch " + getNumRows() + " * " + getNumColumns() + " != " + that.getNumRows() + + " * " + that.getNumColumns()); String[][] outputData = new String[getNumRows()][getNumColumns()]; // compare output value, incl implicit type promotion if necessary if(bop.fn instanceof ValueComparisonFunction) { @@ -1089,15 +1094,16 @@ public FrameBlock leftIndexingOperations(FrameBlock rhsFrame, int rl, int ru, in if(rl < 0 || rl >= getNumRows() || ru < rl || ru >= getNumRows() || cl < 0 || cu >= getNumColumns() || cu < cl || cu >= getNumColumns()) { throw new DMLRuntimeException( - "Invalid values for frame indexing: [" + (rl + 1) + ":" + (ru + 1) + "," + (cl + 1) + ":" + (cu + 1) - + "] " + "must be within frame dimensions [" + getNumRows() + "," + getNumColumns() + "]."); + "Invalid values for frame indexing: [" + (rl + 1) + ":" + (ru + 1) + "," + (cl + 1) + ":" + (cu + 1) + + "] " + "must be within frame dimensions [" + getNumRows() + "," + getNumColumns() + "]."); } if((ru - rl + 1) < rhsFrame.getNumRows() || (cu - cl + 1) < rhsFrame.getNumColumns()) { throw new DMLRuntimeException( - "Invalid values for frame indexing: " + "dimensions of the source frame [" + rhsFrame.getNumRows() + "x" - + rhsFrame.getNumColumns() + "] " + "do not match the shape of the frame specified by indices [" - + (rl + 1) + ":" + (ru + 1) + ", " + (cl + 1) + ":" + (cu + 1) + "]."); + "Invalid values for frame indexing: " + "dimensions of the source frame [" + rhsFrame.getNumRows() + + "x" + rhsFrame.getNumColumns() + "] " + + "do not match the shape of the frame specified by indices [" + (rl + 1) + ":" + (ru + 1) + ", " + + (cl + 1) + ":" + (cu + 1) + "]."); } // allocate output frame (incl deep copy schema) @@ -1117,7 +1123,7 @@ public FrameBlock leftIndexingOperations(FrameBlock rhsFrame, int rl, int ru, in // fast-path for homogeneous column schemas if(_schema[j] == rhsFrame._schema[j - cl]) tmp.set(rl, ru, rhsFrame._coldata[j - cl]); - // general-path for heterogeneous column schemas + // general-path for heterogeneous column schemas else { for(int i = rl; i <= ru; i++) tmp.set(i, UtilFunctions.objectToObject(_schema[j], rhsFrame._coldata[j - cl].get(i - rl))); @@ -1209,8 +1215,8 @@ protected void validateSliceArgument(int rl, int ru, int cl, int cu) { if(rl < 0 || rl >= getNumRows() || ru < rl || ru >= getNumRows() || cl < 0 || cu >= getNumColumns() || cu < cl || cu >= getNumColumns()) { throw new DMLRuntimeException( - "Invalid values for frame indexing: [" + (rl + 1) + ":" + (ru + 1) + "," + (cl + 1) + ":" + (cu + 1) - + "] " + "must be within frame dimensions [" + getNumRows() + "," + getNumColumns() + "]"); + "Invalid values for frame indexing: [" + (rl + 1) + ":" + (ru + 1) + "," + (cl + 1) + ":" + (cu + 1) + + "] " + "must be within frame dimensions [" + getNumRows() + "," + getNumColumns() + "]"); } } @@ -1264,10 +1270,10 @@ public void copy(FrameBlock src) { _msize = -1; } - public FrameBlock copyShallow(){ + public FrameBlock copyShallow() { FrameBlock ret = new FrameBlock(); ret._nRow = _nRow; - ret._msize = _msize; + ret._msize = _msize; final int nCol = getNumColumns(); if(_coldata != null) ret._coldata = Arrays.copyOf(_coldata, nCol); @@ -1284,7 +1290,7 @@ public FrameBlock copyShallow(){ * Copy src matrix into the index range of the existing current matrix. * * This is used to copy smaller blocks into a larger block, for instance in binary reading. - * + * * @param rl row start * @param ru row end inclusive * @param cl col start @@ -1303,11 +1309,8 @@ public void copy(int rl, int ru, int cl, int cu, FrameBlock src) { for(int j = cl; j <= cu; j++) { Array col = _coldata[j]; - boolean isUnsafe = (col == null) || - (col instanceof OptionalArray) || - (col instanceof BitSetArray) || - (col instanceof RaggedArray) || - (col instanceof ACompressedArray); + boolean isUnsafe = (col == null) || (col instanceof OptionalArray) || (col instanceof BitSetArray) || + (col instanceof RaggedArray) || (col instanceof ACompressedArray); if(isUnsafe) { if(_columnLocks == null || _columnLocks.get() == null) { @@ -1356,8 +1359,9 @@ public FrameBlock merge(FrameBlock that) { // check dimensions (before potentially copy to prevent implicit dimension change) if(getNumRows() != that.getNumRows() || getNumColumns() != that.getNumColumns()) - throw new DMLRuntimeException("Dimension mismatch on merge disjoint (target=" + getNumRows() + "x" - + getNumColumns() + ", source=" + that.getNumRows() + "x" + that.getNumColumns() + ")"); + throw new DMLRuntimeException( + "Dimension mismatch on merge disjoint (target=" + getNumRows() + "x" + getNumColumns() + ", source=" + + that.getNumRows() + "x" + that.getNumColumns() + ")"); // meta data copy if necessary for(int j = 0; j < getNumColumns(); j++) @@ -1453,15 +1457,16 @@ public final FrameBlock applySchema(FrameBlock schema, int k) { /** * Drop the cell value which does not confirms to the data type of its column - * + * * @param schema of the frame * @return original frame where invalid values are replaced with null */ public FrameBlock dropInvalidType(FrameBlock schema) { // sanity checks if(this.getNumColumns() != schema.getNumColumns()) - throw new DMLException("mismatch in number of columns in frame and its schema " + this.getNumColumns() - + " != " + schema.getNumColumns()); + throw new DMLException( + "mismatch in number of columns in frame and its schema " + this.getNumColumns() + " != " + + schema.getNumColumns()); // extract the schema in String array String[] schemaString = IteratorFactory.getStringRowIterator(schema).next(); @@ -1488,8 +1493,9 @@ else if(schemaCol.contains("STRING")) if(!dataType.toString().contains(type) && !(dataType == ValueType.BOOLEAN && type.equals("INT")) && !(dataType == ValueType.BOOLEAN && type.equals("FP"))) { - LOG.warn("Datatype detected: " + dataType + " where expected: " + schemaString[i] + " col: " - + (i + 1) + ", row:" + (j + 1)); + LOG.warn( + "Datatype detected: " + dataType + " where expected: " + schemaString[i] + " col: " + (i + 1) + + ", row:" + (j + 1)); this.set(j, i, null); } @@ -1735,8 +1741,8 @@ else if(varname.length == 2) { return (FrameMapFunction) CodegenUtils.compileClass(cname, sb.toString()).getDeclaredConstructor() .newInstance(); } - catch(InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException - | NoSuchMethodException | SecurityException e) { + catch(InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException | + NoSuchMethodException | SecurityException e) { throw new DMLRuntimeException("Failed to compile FrameMapFunction.", e); } } @@ -1762,12 +1768,12 @@ public FrameBlock replaceOperations(String pattern, String replacement) { boolean NaNp = "NaN".equals(pattern); boolean NaNr = "NaN".equals(replacement); - ValueType patternType = UtilFunctions - .isBoolean(pattern) ? ValueType.BOOLEAN : (NumberUtils.isCreatable(pattern) | - NaNp ? (UtilFunctions.isIntegerNumber(pattern) ? ValueType.INT64 : ValueType.FP64) : ValueType.STRING); - ValueType replacementType = UtilFunctions.isBoolean(replacement) ? ValueType.BOOLEAN : (NumberUtils - .isCreatable(replacement) | - NaNr ? (UtilFunctions.isIntegerNumber(replacement) ? ValueType.INT64 : ValueType.FP64) : ValueType.STRING); + ValueType patternType = UtilFunctions.isBoolean(pattern) ? ValueType.BOOLEAN : ( + NumberUtils.isCreatable(pattern) | NaNp ? (UtilFunctions.isIntegerNumber( + pattern) ? ValueType.INT64 : ValueType.FP64) : ValueType.STRING); + ValueType replacementType = UtilFunctions.isBoolean(replacement) ? ValueType.BOOLEAN : ( + NumberUtils.isCreatable(replacement) | NaNr ? (UtilFunctions.isIntegerNumber( + replacement) ? ValueType.INT64 : ValueType.FP64) : ValueType.STRING); if(patternType != replacementType || !ValueType.isSameTypeString(patternType, replacementType)) throw new DMLRuntimeException( diff --git a/src/test/java/org/apache/sysds/test/functions/io/FrameBlockConcurrentCopyTest.java b/src/test/java/org/apache/sysds/test/functions/io/FrameBlockConcurrentCopyTest.java index 8b87e8a3ce2..c97d0f05243 100644 --- a/src/test/java/org/apache/sysds/test/functions/io/FrameBlockConcurrentCopyTest.java +++ b/src/test/java/org/apache/sysds/test/functions/io/FrameBlockConcurrentCopyTest.java @@ -56,12 +56,10 @@ public void setUp() { addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"B"})); } - @Parameters - public static Collection data() { - return Arrays.asList(new Object[][] { - {20}, {50} - }); - } + @Parameters + public static Collection data() { + return Arrays.asList(new Object[][] {{20}, {50}}); + } /** * Boolean Array Consistency: Verifies that concurrent writes of "All True" or "All False" rows do not result in @@ -149,18 +147,8 @@ public void testBooleanArrayConsistency() { public void testSafeTypesNoSync() { final int ITERATIONS = 20; final int COLS = 5; - ValueType[] types = { - ValueType.BOOLEAN, - ValueType.INT64, - ValueType.FP64, - ValueType.STRING, - ValueType.INT32, - ValueType.FP32, - ValueType.UINT4, - ValueType.CHARACTER, - ValueType.HASH32, - ValueType.HASH64 - }; + ValueType[] types = {ValueType.BOOLEAN, ValueType.INT64, ValueType.FP64, ValueType.STRING, ValueType.INT32, + ValueType.FP32, ValueType.UINT4, ValueType.CHARACTER, ValueType.HASH32, ValueType.HASH64}; for(ValueType type : types) { for(int iter = 0; iter < ITERATIONS; iter++) { @@ -291,7 +279,7 @@ public void testBooleanBitPacking() { @Test public void testOptionalArray() { for(int i = 0; i < 10; i++) { - System.out.println("--- RUN " + (i+1) + " ---"); + System.out.println("--- RUN " + (i + 1) + " ---"); runOptional(); } } @@ -300,24 +288,24 @@ public void testOptionalArray() { private void runOptional() { int rows = 50000; ValueType type = ValueType.FP64; - FrameBlock target = new FrameBlock(new ValueType[]{type}, rows); + FrameBlock target = new FrameBlock(new ValueType[] {type}, rows); target.ensureAllocatedColumns(rows); // target (optional array) target.setColumn(0, ArrayFactory.allocateOptional(type, rows)); // source 1 (non-null) - FrameBlock sourceValid = new FrameBlock(new ValueType[]{type}, rows); + FrameBlock sourceValid = new FrameBlock(new ValueType[] {type}, rows); sourceValid.ensureAllocatedColumns(rows); Array sCol1 = ArrayFactory.allocateOptional(type, rows); - ((OptionalArray)sCol1).fill(1.0d); + ((OptionalArray) sCol1).fill(1.0d); sourceValid.setColumn(0, sCol1); // source 2 (all null) - FrameBlock sourceNull = new FrameBlock(new ValueType[]{type}, rows); + FrameBlock sourceNull = new FrameBlock(new ValueType[] {type}, rows); sourceNull.ensureAllocatedColumns(rows); Array sCol2 = ArrayFactory.allocateOptional(type, rows); - ((OptionalArray)sCol2).fill((Double) null); + ((OptionalArray) sCol2).fill((Double) null); sourceNull.setColumn(0, sCol2); CyclicBarrier barrier = new CyclicBarrier(2); @@ -326,34 +314,50 @@ private void runOptional() { Thread t1 = new Thread(() -> { try { barrier.await(); - target.copy(1, rows-1, 0, 0, sourceValid); - } catch (Exception e) { e.printStackTrace(); errors.incrementAndGet(); } + target.copy(1, rows - 1, 0, 0, sourceValid); + } + catch(Exception e) { + e.printStackTrace(); + errors.incrementAndGet(); + } }); Thread t2 = new Thread(() -> { try { barrier.await(); - target.copy(1, rows-1, 0, 0, sourceNull); - } catch (Exception e) { e.printStackTrace(); errors.incrementAndGet(); } + target.copy(1, rows - 1, 0, 0, sourceNull); + } + catch(Exception e) { + e.printStackTrace(); + errors.incrementAndGet(); + } }); - t1.start(); t2.start(); - try { t1.join(); t2.join(); } catch (InterruptedException e) {} + t1.start(); + t2.start(); + try { + t1.join(); + t2.join(); + } + catch(InterruptedException e) { + } int validCount = 0; int nullCount = 0; for(int r = 1; r < rows; r++) { - if(target.get(r, 0) == null) nullCount++; - else validCount++; + if(target.get(r, 0) == null) + nullCount++; + else + validCount++; } System.out.println("Valid: " + validCount + ", Null: " + nullCount); - if (validCount > 0 && nullCount > 0) { - Assert.fail("Race condition OptionalArray:\n" + - "Thread 1 (non-null) and Thread 2 (null) mixed instructions!\n" + - "Result: " + validCount + " valid, " + nullCount + " null."); + if(validCount > 0 && nullCount > 0) { + Assert.fail( + "Race condition OptionalArray:\n" + "Thread 1 (non-null) and Thread 2 (null) mixed instructions!\n" + + "Result: " + validCount + " valid, " + nullCount + " null."); } } @@ -363,7 +367,7 @@ private void runOptional() { @Test public void testBitSetArray() { for(int i = 0; i < 20; i++) { - System.out.println("--- BitSet RUN " + (i+1) + " ---"); + System.out.println("--- BitSet RUN " + (i + 1) + " ---"); runBitSet(); } } @@ -371,25 +375,27 @@ public void testBitSetArray() { private void runBitSet() { int rows = 50000; ValueType type = ValueType.BOOLEAN; - FrameBlock target = new FrameBlock(new ValueType[]{type}, rows); + FrameBlock target = new FrameBlock(new ValueType[] {type}, rows); target.ensureAllocatedColumns(rows); // initialize false target.setColumn(0, ArrayFactory.allocate(type, rows)); // Source 1 (all true) - FrameBlock sourceTrue = new FrameBlock(new ValueType[]{type}, rows); + FrameBlock sourceTrue = new FrameBlock(new ValueType[] {type}, rows); sourceTrue.ensureAllocatedColumns(rows); Array sCol1 = ArrayFactory.allocate(type, rows); // Fill with TRUE - for(int k=0; k sCol2 = ArrayFactory.allocate(type, rows); // fill with false - for(int k=0; k { try { barrier.await(); - target.copy(1, rows-1, 0, 0, sourceTrue); - } catch (Exception e) { e.printStackTrace(); errors.incrementAndGet(); } + target.copy(1, rows - 1, 0, 0, sourceTrue); + } + catch(Exception e) { + e.printStackTrace(); + errors.incrementAndGet(); + } }); Thread t2 = new Thread(() -> { try { barrier.await(); - target.copy(1, rows-1, 0, 0, sourceFalse); - } catch (Exception e) { e.printStackTrace(); errors.incrementAndGet(); } + target.copy(1, rows - 1, 0, 0, sourceFalse); + } + catch(Exception e) { + e.printStackTrace(); + errors.incrementAndGet(); + } }); - t1.start(); t2.start(); - try { t1.join(); t2.join(); } catch (InterruptedException e) {} + t1.start(); + t2.start(); + try { + t1.join(); + t2.join(); + } + catch(InterruptedException e) { + } // Verification int trueCount = 0; @@ -418,32 +438,56 @@ private void runBitSet() { for(int r = 3; r < rows; r++) { Boolean val = (Boolean) target.get(r, 0); - if(val != null && val) trueCount++; - else falseCount++; + if(val != null && val) + trueCount++; + else + falseCount++; } System.out.println("True: " + trueCount + ", False: " + falseCount); - if (trueCount > 0 && falseCount > 0) { - Assert.fail("Race condition in BitSetArray:\n" + - "Thread True and Thread False mixed instructions!\n" + - "Result: " + trueCount + " True, " + falseCount + " False."); + if(trueCount > 0 && falseCount > 0) { + Assert.fail( + "Race condition in BitSetArray:\n" + "Thread True and Thread False mixed instructions!\n" + "Result: " + + trueCount + " True, " + falseCount + " False."); } } private void initializeCell(FrameBlock fb, int row, int col, ValueType type, int seed) { switch(type) { - case BOOLEAN: fb.set(row, col, (seed % 2 == 0)); break; - case INT32: fb.set(row, col, seed); break; - case INT64: fb.set(row, col, (long) seed); break; - case FP32: fb.set(row, col, (float) seed); break; - case FP64: fb.set(row, col, (double) seed); break; - case UINT8: fb.set(row, col, (int) (seed % 127)); break; - case UINT4: fb.set(row, col, (int) (seed % 15)); break; - case CHARACTER: fb.set(row, col, (char) ('a' + (seed % 26))); break; - case STRING: fb.set(row, col, "v" + seed); break; - case HASH32: fb.set(row, col, seed); break; - case HASH64: fb.set(row, col, (long) seed); break; + case BOOLEAN: + fb.set(row, col, (seed % 2 == 0)); + break; + case INT32: + fb.set(row, col, seed); + break; + case INT64: + fb.set(row, col, (long) seed); + break; + case FP32: + fb.set(row, col, (float) seed); + break; + case FP64: + fb.set(row, col, (double) seed); + break; + case UINT8: + fb.set(row, col, (int) (seed % 127)); + break; + case UINT4: + fb.set(row, col, (int) (seed % 15)); + break; + case CHARACTER: + fb.set(row, col, (char) ('a' + (seed % 26))); + break; + case STRING: + fb.set(row, col, "v" + seed); + break; + case HASH32: + fb.set(row, col, seed); + break; + case HASH64: + fb.set(row, col, (long) seed); + break; default: fb.set(row, col, String.valueOf(seed)); } @@ -452,15 +496,32 @@ private void initializeCell(FrameBlock fb, int row, int col, ValueType type, int private void verifyCell(Object val, ValueType type, int expectedSeed) { Assert.assertNotNull("Value should not be null", val); switch(type) { - case INT32: Assert.assertEquals((int)expectedSeed, ((Integer)val).intValue()); break; - case INT64: Assert.assertEquals((long)expectedSeed, ((Long)val).longValue()); break; - case FP32: Assert.assertEquals((float)expectedSeed, ((Float)val).floatValue(), 0.0001); break; - case FP64: Assert.assertEquals((double)expectedSeed, ((Double)val).doubleValue(), 0.0001); break; - case UINT8: Assert.assertEquals((int)(expectedSeed % 127), ((Integer)val).intValue()); break; - case CHARACTER: Assert.assertEquals((char)('a' + (expectedSeed%26)), ((Character)val).charValue()); break; - case STRING: Assert.assertEquals("v" + expectedSeed, val); break; - case BOOLEAN: Assert.assertEquals((expectedSeed % 2 == 0), val); break; - default: break; + case INT32: + Assert.assertEquals((int) expectedSeed, ((Integer) val).intValue()); + break; + case INT64: + Assert.assertEquals((long) expectedSeed, ((Long) val).longValue()); + break; + case FP32: + Assert.assertEquals((float) expectedSeed, ((Float) val).floatValue(), 0.0001); + break; + case FP64: + Assert.assertEquals((double) expectedSeed, ((Double) val).doubleValue(), 0.0001); + break; + case UINT8: + Assert.assertEquals((int) (expectedSeed % 127), ((Integer) val).intValue()); + break; + case CHARACTER: + Assert.assertEquals((char) ('a' + (expectedSeed % 26)), ((Character) val).charValue()); + break; + case STRING: + Assert.assertEquals("v" + expectedSeed, val); + break; + case BOOLEAN: + Assert.assertEquals((expectedSeed % 2 == 0), val); + break; + default: + break; } } }