diff --git a/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java b/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java index 63cadb43cf4..fa88a7504fa 100644 --- a/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java +++ b/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java @@ -52,9 +52,7 @@ import org.apache.sysds.runtime.codegen.CodegenUtils; import org.apache.sysds.runtime.controlprogram.caching.CacheBlock; import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence; -import org.apache.sysds.runtime.frame.data.columns.Array; -import org.apache.sysds.runtime.frame.data.columns.ArrayFactory; -import org.apache.sysds.runtime.frame.data.columns.ColumnMetadata; +import org.apache.sysds.runtime.frame.data.columns.*; import org.apache.sysds.runtime.frame.data.iterators.IteratorFactory; import org.apache.sysds.runtime.frame.data.lib.FrameFromMatrixBlock; import org.apache.sysds.runtime.frame.data.lib.FrameLibAppend; @@ -156,7 +154,7 @@ public FrameBlock(ValueType[] schema, String[][] data) { /** * FrameBlock constructor with constant - * + * * @param schema The schema to allocate (also specifying number of columns) * @param constant The constant to allocate in all cells * @param nRow the number of rows @@ -171,9 +169,9 @@ public FrameBlock(ValueType[] schema, String constant, int nRow) { /** * allocate a FrameBlock with the given data arrays. - * + * * The data is in row major, making the first dimension number of rows. second number of columns. - * + * * @param schema the schema to allocate * @param names The names of the column * @param data The data. @@ -202,7 +200,7 @@ public FrameBlock(ValueType[] schema, String[] colNames, ColumnMetadata[] meta, /** * Create a FrameBlock containing columns of the specified arrays - * + * * @param data The column data contained */ public FrameBlock(Array[] data) { @@ -218,15 +216,16 @@ public FrameBlock(Array[] data) { if(debug) { for(int i = 0; i < data.length; i++) { if(data[i].size() != getNumRows()) - throw new DMLRuntimeException("Invalid Frame allocation with different size arrays " - + data[i].size() + " vs " + getNumRows()); + throw new DMLRuntimeException( + "Invalid Frame allocation with different size arrays " + data[i].size() + " vs " + + getNumRows()); } } } /** * Create a FrameBlock containing columns of the specified arrays and names - * + * * @param data The column data contained * @param colnames The column names of the contained columns */ @@ -243,8 +242,9 @@ public FrameBlock(Array[] data, String[] colnames) { if(debug) { for(int i = 0; i < data.length; i++) { if(data[i].size() != getNumRows()) - throw new DMLRuntimeException("Invalid Frame allocation with different size arrays " - + data[i].size() + " vs " + getNumRows()); + throw new DMLRuntimeException( + "Invalid Frame allocation with different size arrays " + data[i].size() + " vs " + + getNumRows()); } } } @@ -447,7 +447,7 @@ private void ensureAllocateMeta() { /** * Checks for matching column sizes in case of existing columns. - * + * * If the check parses the number of rows is reassigned to the given newLen * * @param newLen number of rows to compare with existing number of rows @@ -524,7 +524,7 @@ public void set(int r, int c, Object val) { /** * Sets the value in position (r,c), to the input string value, and at the individual arrays, convert to correct * type. - * + * * @param r row index * @param c column index * @param val value to set at specified position @@ -557,11 +557,12 @@ public void reset() { /** * Sets row at position r to the input array of objects, corresponding to the schema. - * @param r row index + * + * @param r row index * @param row array of objects */ public void setRow(int r, Object[] row) { - for (int i = 0; i < row.length; i++) { + for(int i = 0; i < row.length; i++) { set(r, i, row[i]); } } @@ -570,7 +571,7 @@ public void setRow(int r, Object[] row) { * Append a row to the end of the data frame, where all row fields are boxed objects according to the schema. * * Append row should be avoided if possible. - * + * * @param row array of objects */ public void appendRow(Object[] row) { @@ -596,7 +597,7 @@ public void appendRow(Object[] row) { * Append a row to the end of the data frame, where all row fields are string encoded. * * Append row should be avoided if possible - * + * * @param row array of strings */ public void appendRow(String[] row) { @@ -692,7 +693,7 @@ public void appendColumn(double[] col) { /** * Append the metadata associated with adding a column. - * + * * @param vt The Value type */ private void appendColumnMetaData(ValueType vt) { @@ -717,8 +718,8 @@ public void appendColumns(double[][] cols) { for(int j = 0; j < ncol; j++) tmpData[j] = ArrayFactory.create(cols[j]); _colnames = empty ? null : ArrayUtils.addAll(getColumnNames(), createColNames(getNumColumns(), ncol)); // before - // schema - // modification + // schema + // modification _schema = empty ? tmpSchema : ArrayUtils.addAll(_schema, tmpSchema); _coldata = empty ? tmpData : ArrayUtils.addAll(_coldata, tmpData); _nRow = cols[0].length; @@ -731,7 +732,7 @@ public static FrameBlock convertToFrameBlock(MatrixBlock mb, ValueType[] schema, /** * Add a column of already allocated Array type. - * + * * @param col column to add. */ public void appendColumn(Array col) { @@ -766,19 +767,20 @@ else if(column != null && column.size() != _nRow) /** * Appends a chunk of data to the end of a specified column. - * + * * @param c column index * @param chunk chunk of data to append */ public void appendColumnChunk(int c, Array chunk) { - if (_coldata == null) { + if(_coldata == null) { _coldata = new Array[getNumColumns()]; } - if (_coldata[c] == null) { + if(_coldata[c] == null) { _coldata[c] = chunk; _nRow = chunk.size(); - } else { + } + else { _coldata[c] = ArrayFactory.append(_coldata[c], chunk); _nRow += chunk.size(); } @@ -788,31 +790,31 @@ public void appendColumnChunk(int c, Array chunk) { /** * Sets a chunk of data to a specified column, starting at the specified offset. - * - * @param c column index + * + * @param c column index * @param chunk chunk of data to set * @param offset offset position where it should set the chunk * @param colSize size of columns, in case columns aren't initialized yet */ public void setColumnChunk(int c, Array chunk, int offset, int colSize) { - if (_coldata == null) { + if(_coldata == null) { _coldata = new Array[getNumColumns()]; _nRow = colSize; } - if (_coldata[c] == null) { + if(_coldata[c] == null) { _coldata[c] = ArrayFactory.allocate(chunk.getValueType(), _nRow); } - if (_coldata[c].getValueType() != chunk.getValueType()) { - throw new DMLRuntimeException("ValueType mismatch in setColumnChunk: expected " + - _coldata[c].getValueType() + " but got " + chunk.getValueType()); + if(_coldata[c].getValueType() != chunk.getValueType()) { + throw new DMLRuntimeException( + "ValueType mismatch in setColumnChunk: expected " + _coldata[c].getValueType() + " but got " + + chunk.getValueType()); } ArrayFactory.set(_coldata[c], chunk, offset, offset + chunk.size() - 1, _nRow); } - @Override public void write(DataOutput out) throws IOException { final boolean isDefaultMeta = isColNamesDefault() && isColumnMetadataDefault(); @@ -906,7 +908,7 @@ public long getInMemorySize() { // meta data array (overhead and entries) size += MemoryEstimates.objectArrayCost(clen); - if( _colmeta != null ) + if(_colmeta != null) for(ColumnMetadata mtd : _colmeta) size += mtd == null ? 8 : mtd.getInMemorySize(); @@ -925,7 +927,7 @@ private double arraysSizeInMemory() { for(int j = 0; j < clen; j++) size += ArrayFactory.getInMemorySize(_schema[j], rlen, true); else {// allocated - if((rlen > 1000 || clen > 10 )&& ConfigurationManager.isParallelIOEnabled()) { + if((rlen > 1000 || clen > 10) && ConfigurationManager.isParallelIOEnabled()) { final ExecutorService pool = CommonThreadPool.get(); try { List> f = new ArrayList<>(clen); @@ -985,7 +987,7 @@ public boolean isShallowSerialize() { public boolean isShallowSerialize(boolean inclConvert) { // shallow serialize if non-string schema because a frame block // is always dense but strings have large array overhead per cell - if( _schema != null ) + if(_schema != null) for(int j = 0; j < _schema.length; j++) if(!_coldata[j].isShallowSerialize()) return false; @@ -1014,8 +1016,9 @@ public void compactEmptyBlock() { */ public FrameBlock binaryOperations(BinaryOperator bop, FrameBlock that, FrameBlock out) { if(getNumColumns() != that.getNumColumns() && getNumRows() != that.getNumColumns()) - throw new DMLRuntimeException("Frame dimension mismatch " + getNumRows() + " * " + getNumColumns() + " != " - + that.getNumRows() + " * " + that.getNumColumns()); + throw new DMLRuntimeException( + "Frame dimension mismatch " + getNumRows() + " * " + getNumColumns() + " != " + that.getNumRows() + + " * " + that.getNumColumns()); String[][] outputData = new String[getNumRows()][getNumColumns()]; // compare output value, incl implicit type promotion if necessary if(bop.fn instanceof ValueComparisonFunction) { @@ -1091,15 +1094,16 @@ public FrameBlock leftIndexingOperations(FrameBlock rhsFrame, int rl, int ru, in if(rl < 0 || rl >= getNumRows() || ru < rl || ru >= getNumRows() || cl < 0 || cu >= getNumColumns() || cu < cl || cu >= getNumColumns()) { throw new DMLRuntimeException( - "Invalid values for frame indexing: [" + (rl + 1) + ":" + (ru + 1) + "," + (cl + 1) + ":" + (cu + 1) - + "] " + "must be within frame dimensions [" + getNumRows() + "," + getNumColumns() + "]."); + "Invalid values for frame indexing: [" + (rl + 1) + ":" + (ru + 1) + "," + (cl + 1) + ":" + (cu + 1) + + "] " + "must be within frame dimensions [" + getNumRows() + "," + getNumColumns() + "]."); } if((ru - rl + 1) < rhsFrame.getNumRows() || (cu - cl + 1) < rhsFrame.getNumColumns()) { throw new DMLRuntimeException( - "Invalid values for frame indexing: " + "dimensions of the source frame [" + rhsFrame.getNumRows() + "x" - + rhsFrame.getNumColumns() + "] " + "do not match the shape of the frame specified by indices [" - + (rl + 1) + ":" + (ru + 1) + ", " + (cl + 1) + ":" + (cu + 1) + "]."); + "Invalid values for frame indexing: " + "dimensions of the source frame [" + rhsFrame.getNumRows() + + "x" + rhsFrame.getNumColumns() + "] " + + "do not match the shape of the frame specified by indices [" + (rl + 1) + ":" + (ru + 1) + ", " + + (cl + 1) + ":" + (cu + 1) + "]."); } // allocate output frame (incl deep copy schema) @@ -1119,7 +1123,7 @@ public FrameBlock leftIndexingOperations(FrameBlock rhsFrame, int rl, int ru, in // fast-path for homogeneous column schemas if(_schema[j] == rhsFrame._schema[j - cl]) tmp.set(rl, ru, rhsFrame._coldata[j - cl]); - // general-path for heterogeneous column schemas + // general-path for heterogeneous column schemas else { for(int i = rl; i <= ru; i++) tmp.set(i, UtilFunctions.objectToObject(_schema[j], rhsFrame._coldata[j - cl].get(i - rl))); @@ -1211,8 +1215,8 @@ protected void validateSliceArgument(int rl, int ru, int cl, int cu) { if(rl < 0 || rl >= getNumRows() || ru < rl || ru >= getNumRows() || cl < 0 || cu >= getNumColumns() || cu < cl || cu >= getNumColumns()) { throw new DMLRuntimeException( - "Invalid values for frame indexing: [" + (rl + 1) + ":" + (ru + 1) + "," + (cl + 1) + ":" + (cu + 1) - + "] " + "must be within frame dimensions [" + getNumRows() + "," + getNumColumns() + "]"); + "Invalid values for frame indexing: [" + (rl + 1) + ":" + (ru + 1) + "," + (cl + 1) + ":" + (cu + 1) + + "] " + "must be within frame dimensions [" + getNumRows() + "," + getNumColumns() + "]"); } } @@ -1266,10 +1270,10 @@ public void copy(FrameBlock src) { _msize = -1; } - public FrameBlock copyShallow(){ + public FrameBlock copyShallow() { FrameBlock ret = new FrameBlock(); ret._nRow = _nRow; - ret._msize = _msize; + ret._msize = _msize; final int nCol = getNumColumns(); if(_coldata != null) ret._coldata = Arrays.copyOf(_coldata, nCol); @@ -1286,7 +1290,7 @@ public FrameBlock copyShallow(){ * Copy src matrix into the index range of the existing current matrix. * * This is used to copy smaller blocks into a larger block, for instance in binary reading. - * + * * @param rl row start * @param ru row end inclusive * @param cl col start @@ -1302,18 +1306,30 @@ public void copy(int rl, int ru, int cl, int cu, FrameBlock src) { ensureAllocateMeta(); if(_coldata == null) // allocate column data. _coldata = new Array[_schema.length]; - synchronized(this) { // make sync locks - // TODO remove sync locks on array types where they are not needed. - if(_columnLocks == null) { - Object[] locks = new Object[_schema.length]; - for(int i = 0; i < locks.length; i++) - locks[i] = new Object(); - _columnLocks = new SoftReference<>(locks); + + for(int j = cl; j <= cu; j++) { + Array col = _coldata[j]; + boolean isUnsafe = (col == null) || (col instanceof OptionalArray) || (col instanceof BitSetArray) || + (col instanceof RaggedArray) || (col instanceof ACompressedArray); + + if(isUnsafe) { + if(_columnLocks == null || _columnLocks.get() == null) { + synchronized(this) { + if(_columnLocks == null || _columnLocks.get() == null) { + Object[] locks = new Object[_schema.length]; + for(int i = 0; i < locks.length; i++) + locks[i] = new Object(); + _columnLocks = new SoftReference<>(locks); + } + } + } + Object[] locks = _columnLocks.get(); + // read/write inside lock, for safest write and most accurate read + synchronized(locks[j]) { + _coldata[j] = ArrayFactory.set(_coldata[j], src._coldata[j - cl], rl, ru, _nRow); + } } - } - Object[] locks = _columnLocks.get(); - for(int j = cl; j <= cu; j++) { // for each column - synchronized(locks[j]) { // synchronize on the column. + else { _coldata[j] = ArrayFactory.set(_coldata[j], src._coldata[j - cl], rl, ru, _nRow); } } @@ -1343,8 +1359,9 @@ public FrameBlock merge(FrameBlock that) { // check dimensions (before potentially copy to prevent implicit dimension change) if(getNumRows() != that.getNumRows() || getNumColumns() != that.getNumColumns()) - throw new DMLRuntimeException("Dimension mismatch on merge disjoint (target=" + getNumRows() + "x" - + getNumColumns() + ", source=" + that.getNumRows() + "x" + that.getNumColumns() + ")"); + throw new DMLRuntimeException( + "Dimension mismatch on merge disjoint (target=" + getNumRows() + "x" + getNumColumns() + ", source=" + + that.getNumRows() + "x" + that.getNumColumns() + ")"); // meta data copy if necessary for(int j = 0; j < getNumColumns(); j++) @@ -1440,15 +1457,16 @@ public final FrameBlock applySchema(FrameBlock schema, int k) { /** * Drop the cell value which does not confirms to the data type of its column - * + * * @param schema of the frame * @return original frame where invalid values are replaced with null */ public FrameBlock dropInvalidType(FrameBlock schema) { // sanity checks if(this.getNumColumns() != schema.getNumColumns()) - throw new DMLException("mismatch in number of columns in frame and its schema " + this.getNumColumns() - + " != " + schema.getNumColumns()); + throw new DMLException( + "mismatch in number of columns in frame and its schema " + this.getNumColumns() + " != " + + schema.getNumColumns()); // extract the schema in String array String[] schemaString = IteratorFactory.getStringRowIterator(schema).next(); @@ -1475,8 +1493,9 @@ else if(schemaCol.contains("STRING")) if(!dataType.toString().contains(type) && !(dataType == ValueType.BOOLEAN && type.equals("INT")) && !(dataType == ValueType.BOOLEAN && type.equals("FP"))) { - LOG.warn("Datatype detected: " + dataType + " where expected: " + schemaString[i] + " col: " - + (i + 1) + ", row:" + (j + 1)); + LOG.warn( + "Datatype detected: " + dataType + " where expected: " + schemaString[i] + " col: " + (i + 1) + + ", row:" + (j + 1)); this.set(j, i, null); } @@ -1722,8 +1741,8 @@ else if(varname.length == 2) { return (FrameMapFunction) CodegenUtils.compileClass(cname, sb.toString()).getDeclaredConstructor() .newInstance(); } - catch(InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException - | NoSuchMethodException | SecurityException e) { + catch(InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException | + NoSuchMethodException | SecurityException e) { throw new DMLRuntimeException("Failed to compile FrameMapFunction.", e); } } @@ -1749,12 +1768,12 @@ public FrameBlock replaceOperations(String pattern, String replacement) { boolean NaNp = "NaN".equals(pattern); boolean NaNr = "NaN".equals(replacement); - ValueType patternType = UtilFunctions - .isBoolean(pattern) ? ValueType.BOOLEAN : (NumberUtils.isCreatable(pattern) | - NaNp ? (UtilFunctions.isIntegerNumber(pattern) ? ValueType.INT64 : ValueType.FP64) : ValueType.STRING); - ValueType replacementType = UtilFunctions.isBoolean(replacement) ? ValueType.BOOLEAN : (NumberUtils - .isCreatable(replacement) | - NaNr ? (UtilFunctions.isIntegerNumber(replacement) ? ValueType.INT64 : ValueType.FP64) : ValueType.STRING); + ValueType patternType = UtilFunctions.isBoolean(pattern) ? ValueType.BOOLEAN : ( + NumberUtils.isCreatable(pattern) | NaNp ? (UtilFunctions.isIntegerNumber( + pattern) ? ValueType.INT64 : ValueType.FP64) : ValueType.STRING); + ValueType replacementType = UtilFunctions.isBoolean(replacement) ? ValueType.BOOLEAN : ( + NumberUtils.isCreatable(replacement) | NaNr ? (UtilFunctions.isIntegerNumber( + replacement) ? ValueType.INT64 : ValueType.FP64) : ValueType.STRING); if(patternType != replacementType || !ValueType.isSameTypeString(patternType, replacementType)) throw new DMLRuntimeException( diff --git a/src/test/java/org/apache/sysds/test/functions/io/FrameBlockConcurrentCopyTest.java b/src/test/java/org/apache/sysds/test/functions/io/FrameBlockConcurrentCopyTest.java new file mode 100644 index 00000000000..c97d0f05243 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/functions/io/FrameBlockConcurrentCopyTest.java @@ -0,0 +1,527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.functions.io; + +import org.apache.sysds.common.Types.ValueType; +import org.apache.sysds.runtime.frame.data.FrameBlock; +import org.apache.sysds.runtime.frame.data.columns.*; +import org.apache.sysds.runtime.util.UtilFunctions; +import org.apache.sysds.test.AutomatedTestBase; +import org.apache.sysds.test.TestConfiguration; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; + +@RunWith(value = Parameterized.class) +@net.jcip.annotations.NotThreadSafe +public class FrameBlockConcurrentCopyTest extends AutomatedTestBase { + + private final static String TEST_NAME = "FrameBlockConcurrentCopyTest"; + private final static String TEST_DIR = "functions/frame/"; + private final static String TEST_CLASS_DIR = TEST_DIR + FrameBlockConcurrentCopyTest.class.getSimpleName() + "/"; + + private final int _threads; + + public FrameBlockConcurrentCopyTest(int threads) { + _threads = threads; + } + + @Override + public void setUp() { + addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"B"})); + } + + @Parameters + public static Collection data() { + return Arrays.asList(new Object[][] {{20}, {50}}); + } + + /** + * Boolean Array Consistency: Verifies that concurrent writes of "All True" or "All False" rows do not result in + * mixed rows. + */ + @Test + public void testBooleanArrayConsistency() { + final int COLS = 64; + final int ITERATIONS = 100; + + for(int iter = 0; iter < ITERATIONS; iter++) { + ValueType[] schema = UtilFunctions.nCopies(COLS, ValueType.BOOLEAN); + FrameBlock target = new FrameBlock(schema, 1); + target.ensureAllocatedColumns(1); + + // all false + for(int c = 0; c < COLS; c++) + target.set(0, c, false); + + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch doneLatch = new CountDownLatch(_threads); + AtomicInteger errors = new AtomicInteger(0); + + for(int t = 0; t < _threads; t++) { + final int threadId = t; + new Thread(() -> { + try { + startLatch.await(); + + // pattern + boolean pattern = (threadId % 2 == 0); + ValueType[] subSchema = UtilFunctions.nCopies(COLS, ValueType.BOOLEAN); + + // source blocks + FrameBlock source = new FrameBlock(subSchema, 1); + source.ensureAllocatedColumns(1); + for(int c = 0; c < COLS; c++) + source.set(0, c, pattern); + + // repeatedly copy to the shared target + for(int i = 0; i < 50; i++) { + // change pattern + for(int c = 0; c < COLS; c++) + source.set(0, c, !pattern); + pattern = !pattern; + + target.copy(0, 0, 0, COLS - 1, source); + } + } + catch(Exception e) { + errors.incrementAndGet(); + } + finally { + doneLatch.countDown(); + } + }).start(); + } + + startLatch.countDown(); + try { + doneLatch.await(); + } + catch(InterruptedException e) { + Assert.fail("Test interrupted: " + e.getMessage()); + } + + Assert.assertEquals("Exceptions occurred inside worker threads", 0, errors.get()); + + // The row must be uniform (All True/False), mix row indicates overwriting + Boolean firstVal = (Boolean) target.get(0, 0); + for(int c = 1; c < COLS; c++) { + Boolean val = (Boolean) target.get(0, c); + if(val != firstVal) { + Assert.fail( + "Inconsistent array detected! Found mixed values in what should be an atomic block write."); + } + } + } + } + + /** + * Non-Boolean Type Safety: Verifies that standard types are thread-safe when threads write to disjoint rows. + */ + @Test + public void testSafeTypesNoSync() { + final int ITERATIONS = 20; + final int COLS = 5; + ValueType[] types = {ValueType.BOOLEAN, ValueType.INT64, ValueType.FP64, ValueType.STRING, ValueType.INT32, + ValueType.FP32, ValueType.UINT4, ValueType.CHARACTER, ValueType.HASH32, ValueType.HASH64}; + + for(ValueType type : types) { + for(int iter = 0; iter < ITERATIONS; iter++) { + // 1 row per threat to check disjoint writing + ValueType[] schema = UtilFunctions.nCopies(COLS, type); + FrameBlock target = new FrameBlock(schema, _threads); + target.ensureAllocatedColumns(_threads); + + CyclicBarrier barrier = new CyclicBarrier(_threads); + AtomicInteger errors = new AtomicInteger(0); + Thread[] threads = new Thread[_threads]; + + for(int t = 0; t < _threads; t++) { + final int threadId = t; + threads[t] = new Thread(() -> { + try { + barrier.await(); + + FrameBlock source = new FrameBlock(schema, 1); + source.ensureAllocatedColumns(1); + // value based on threadID + for(int c = 0; c < COLS; c++) { + initializeCell(source, 0, c, type, threadId + c); + } + // Copy source into target at specific row index + target.copy(threadId, threadId, 0, COLS - 1, source); + + } + catch(Exception e) { + errors.incrementAndGet(); + } + }); + threads[t].start(); + } + + try { + for(Thread t : threads) + t.join(); + } + catch(InterruptedException e) { + Assert.fail("Test interrupted: " + e.getMessage()); + } + + Assert.assertEquals("Thread errors detected for type " + type, 0, errors.get()); + + for(int r = 0; r < _threads; r++) { + for(int c = 0; c < COLS; c++) { + Object val = target.get(r, c); + verifyCell(val, type, r + c); + } + } + } + } + } + + /** + * Test 3: High-Contention Bit Packing Stress Test Race-condition test Threads focus rows within shared column + */ + @Test + public void testBooleanBitPacking() { + for(int iter = 0; iter < 10; iter++) { + final int ROWS = 1024; + final int STRESS_LOOPS = 2000; + + ValueType[] schema = new ValueType[] {ValueType.BOOLEAN}; + FrameBlock target = new FrameBlock(schema, ROWS); + target.ensureAllocatedColumns(ROWS); + + //all false + for(int r = 0; r < ROWS; r++) + target.set(r, 0, false); + + // pre-allocate source blocks + FrameBlock[] sources = new FrameBlock[ROWS]; + for(int i = 0; i < ROWS; i++) { + sources[i] = new FrameBlock(schema, 1); + sources[i].ensureAllocatedColumns(1); + sources[i].set(0, 0, true); + } + + CyclicBarrier barrier = new CyclicBarrier(ROWS); + AtomicInteger exceptions = new AtomicInteger(0); + Thread[] threads = new Thread[ROWS]; + + for(int i = 0; i < ROWS; i++) { + final int rowIndex = i; + threads[i] = new Thread(() -> { + try { + barrier.await(); + + // try to maximize overlapp + for(int k = 0; k < STRESS_LOOPS; k++) { + target.copy(rowIndex, rowIndex, 0, 0, sources[rowIndex]); + } + } + catch(Exception e) { + exceptions.incrementAndGet(); + } + }); + threads[i].start(); + } + + try { + for(Thread t : threads) + t.join(); + } + catch(InterruptedException e) { + Assert.fail("Test interrupted: " + e.getMessage()); + } + + Assert.assertEquals("Exceptions occurred in threads", 0, exceptions.get()); + + // row should be TRUE, any FALSE value means update was overwritten by another thread + int trueCount = 0; + for(int r = 0; r < ROWS; r++) { + Object val = target.get(r, 0); + if(val != null && (Boolean) val) + trueCount++; + } + + Assert.assertEquals("Race condition. Lost updates in iteration " + iter, ROWS, trueCount); + } + } + + /** + * Test 4: OptionalArray Locking Test + */ + @Test + public void testOptionalArray() { + for(int i = 0; i < 10; i++) { + System.out.println("--- RUN " + (i + 1) + " ---"); + runOptional(); + } + } + + @SuppressWarnings("unchecked") + private void runOptional() { + int rows = 50000; + ValueType type = ValueType.FP64; + FrameBlock target = new FrameBlock(new ValueType[] {type}, rows); + target.ensureAllocatedColumns(rows); + + // target (optional array) + target.setColumn(0, ArrayFactory.allocateOptional(type, rows)); + + // source 1 (non-null) + FrameBlock sourceValid = new FrameBlock(new ValueType[] {type}, rows); + sourceValid.ensureAllocatedColumns(rows); + Array sCol1 = ArrayFactory.allocateOptional(type, rows); + ((OptionalArray) sCol1).fill(1.0d); + sourceValid.setColumn(0, sCol1); + + // source 2 (all null) + FrameBlock sourceNull = new FrameBlock(new ValueType[] {type}, rows); + sourceNull.ensureAllocatedColumns(rows); + Array sCol2 = ArrayFactory.allocateOptional(type, rows); + ((OptionalArray) sCol2).fill((Double) null); + sourceNull.setColumn(0, sCol2); + + CyclicBarrier barrier = new CyclicBarrier(2); + AtomicInteger errors = new AtomicInteger(0); + + Thread t1 = new Thread(() -> { + try { + barrier.await(); + target.copy(1, rows - 1, 0, 0, sourceValid); + } + catch(Exception e) { + e.printStackTrace(); + errors.incrementAndGet(); + } + }); + + Thread t2 = new Thread(() -> { + try { + barrier.await(); + target.copy(1, rows - 1, 0, 0, sourceNull); + } + catch(Exception e) { + e.printStackTrace(); + errors.incrementAndGet(); + } + }); + + t1.start(); + t2.start(); + try { + t1.join(); + t2.join(); + } + catch(InterruptedException e) { + } + + int validCount = 0; + int nullCount = 0; + + for(int r = 1; r < rows; r++) { + if(target.get(r, 0) == null) + nullCount++; + else + validCount++; + } + + System.out.println("Valid: " + validCount + ", Null: " + nullCount); + + if(validCount > 0 && nullCount > 0) { + Assert.fail( + "Race condition OptionalArray:\n" + "Thread 1 (non-null) and Thread 2 (null) mixed instructions!\n" + + "Result: " + validCount + " valid, " + nullCount + " null."); + } + } + + /** + * Test 5: BitSetArray Locking Test + */ + @Test + public void testBitSetArray() { + for(int i = 0; i < 20; i++) { + System.out.println("--- BitSet RUN " + (i + 1) + " ---"); + runBitSet(); + } + } + + private void runBitSet() { + int rows = 50000; + ValueType type = ValueType.BOOLEAN; + FrameBlock target = new FrameBlock(new ValueType[] {type}, rows); + target.ensureAllocatedColumns(rows); + // initialize false + target.setColumn(0, ArrayFactory.allocate(type, rows)); + + // Source 1 (all true) + FrameBlock sourceTrue = new FrameBlock(new ValueType[] {type}, rows); + sourceTrue.ensureAllocatedColumns(rows); + Array sCol1 = ArrayFactory.allocate(type, rows); + // Fill with TRUE + for(int k = 0; k < rows; k++) + sCol1.set(k, String.valueOf(true)); + sourceTrue.setColumn(0, sCol1); + + // Source 2 (all false) + FrameBlock sourceFalse = new FrameBlock(new ValueType[] {type}, rows); + sourceFalse.ensureAllocatedColumns(rows); + Array sCol2 = ArrayFactory.allocate(type, rows); + // fill with false + for(int k = 0; k < rows; k++) + sCol2.set(k, String.valueOf(false)); + sourceFalse.setColumn(0, sCol2); + + CyclicBarrier barrier = new CyclicBarrier(2); + AtomicInteger errors = new AtomicInteger(0); + + Thread t1 = new Thread(() -> { + try { + barrier.await(); + target.copy(1, rows - 1, 0, 0, sourceTrue); + } + catch(Exception e) { + e.printStackTrace(); + errors.incrementAndGet(); + } + }); + + Thread t2 = new Thread(() -> { + try { + barrier.await(); + target.copy(1, rows - 1, 0, 0, sourceFalse); + } + catch(Exception e) { + e.printStackTrace(); + errors.incrementAndGet(); + } + }); + + t1.start(); + t2.start(); + try { + t1.join(); + t2.join(); + } + catch(InterruptedException e) { + } + + // Verification + int trueCount = 0; + int falseCount = 0; + + for(int r = 3; r < rows; r++) { + Boolean val = (Boolean) target.get(r, 0); + if(val != null && val) + trueCount++; + else + falseCount++; + } + + System.out.println("True: " + trueCount + ", False: " + falseCount); + + if(trueCount > 0 && falseCount > 0) { + Assert.fail( + "Race condition in BitSetArray:\n" + "Thread True and Thread False mixed instructions!\n" + "Result: " + + trueCount + " True, " + falseCount + " False."); + } + } + + private void initializeCell(FrameBlock fb, int row, int col, ValueType type, int seed) { + switch(type) { + case BOOLEAN: + fb.set(row, col, (seed % 2 == 0)); + break; + case INT32: + fb.set(row, col, seed); + break; + case INT64: + fb.set(row, col, (long) seed); + break; + case FP32: + fb.set(row, col, (float) seed); + break; + case FP64: + fb.set(row, col, (double) seed); + break; + case UINT8: + fb.set(row, col, (int) (seed % 127)); + break; + case UINT4: + fb.set(row, col, (int) (seed % 15)); + break; + case CHARACTER: + fb.set(row, col, (char) ('a' + (seed % 26))); + break; + case STRING: + fb.set(row, col, "v" + seed); + break; + case HASH32: + fb.set(row, col, seed); + break; + case HASH64: + fb.set(row, col, (long) seed); + break; + default: + fb.set(row, col, String.valueOf(seed)); + } + } + + private void verifyCell(Object val, ValueType type, int expectedSeed) { + Assert.assertNotNull("Value should not be null", val); + switch(type) { + case INT32: + Assert.assertEquals((int) expectedSeed, ((Integer) val).intValue()); + break; + case INT64: + Assert.assertEquals((long) expectedSeed, ((Long) val).longValue()); + break; + case FP32: + Assert.assertEquals((float) expectedSeed, ((Float) val).floatValue(), 0.0001); + break; + case FP64: + Assert.assertEquals((double) expectedSeed, ((Double) val).doubleValue(), 0.0001); + break; + case UINT8: + Assert.assertEquals((int) (expectedSeed % 127), ((Integer) val).intValue()); + break; + case CHARACTER: + Assert.assertEquals((char) ('a' + (expectedSeed % 26)), ((Character) val).charValue()); + break; + case STRING: + Assert.assertEquals("v" + expectedSeed, val); + break; + case BOOLEAN: + Assert.assertEquals((expectedSeed % 2 == 0), val); + break; + default: + break; + } + } +}