Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,35 @@
}

if (!isWorkMemTable) {
/*
* 1. Q1 queries this TVList while it is still in the working memtable and records a smaller
* visible row count.
* 2. Later writes append out-of-order rows to the same TVList, then FLUSH moves the
* memtable to the flushing list.
* 3. Q2 queries the flushing memtable. If Q2 directly reuses the original mutable TVList,
* Q2's query-side sort may reorder the indices in place.
* 4. Q1 continues to read with its old row count and the reordered indices. The converted
* value index can exceed Q1's bitmap range and cause out-of-bound access.
*
* Therefore, this flushing branch can reuse the original list only when it is already
* sorted or no active query is using it. Otherwise, Q2 should read from
* workingListForFlush.
*/
boolean canUseListDirectly = list.isSorted() || list.getQueryContextSet().isEmpty();
LOGGER.debug(
"Flushing MemTable - add current query context to mutable TVList's query list");
list.getQueryContextSet().add(context);
tvListQueryMap.put(list, list.rowCount());
if (canUseListDirectly) {
list.getQueryContextSet().add(context);
tvListQueryMap.put(list, list.rowCount());
} else {
TVList workingListForFlushSort = memChunk.initWorkingListForFlushIfNecessary(list, true);
// The flush list shares value arrays with the original list, so keep the original list
// referenced by this query until the query finishes.
list.getQueryContextSet().add(context);
workingListForFlushSort.getQueryContextSet().add(context);
context.addTVListToSet(Collections.singletonMap(list, 0));
tvListQueryMap.put(workingListForFlushSort, workingListForFlushSort.rowCount());
}
} else {
if (list.isSorted() || list.getQueryContextSet().isEmpty()) {
LOGGER.debug(
Expand Down Expand Up @@ -233,7 +258,7 @@
* have chunkMetadata, but query will use these, so we need to generate it for them.
*/
@Override
public AlignedTimeSeriesMetadata generateTimeSeriesMetadata(

Check warning on line 261 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 91 to 64, Complexity from 20 to 14, Nesting Level from 5 to 2, Number of Variables from 21 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ46wrKW1NKbCjkFQneK&open=AZ46wrKW1NKbCjkFQneK&pullRequest=17709
List<ReadOnlyMemChunk> readOnlyMemChunk,
List<IChunkMetadata> chunkMetadataList,
Filter globalTimeFilter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,6 @@ public void run() {
times = new long[MAX_NUMBER_OF_POINTS_IN_PAGE];
}
writableMemChunk.encode(ioTaskQueue, encodeInfo, times);
writableMemChunk.releaseTemporaryTvListForFlush();
long subTaskTime = System.currentTimeMillis() - starTime;
WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.ENCODING_TASK, subTaskTime);
memSerializeTime += subTaskTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,9 @@
/*
* Concurrency background:
*
* A query may start earlier and record the current row count (rows) of the TVList as its visible range.
* After that, new unseq writes may arrive and immediately trigger a flush, which will sort the TVList.
* A query may start earlier and record the current row count (rows) of the TVList as its
* visible range. After that, new unseq writes may arrive and immediately trigger a flush, which
* will sort the TVList.
*
* During sorting, the underlying indices array of the TVList may be reordered.
* If the query continues to use the previously recorded rows as its upper bound,
Expand All @@ -219,6 +220,9 @@
* To avoid this issue, when there are active queries on the working TVList, we must
* clone the times and indices before sorting, so that the flush sort does not mutate
* the data structures that concurrent queries rely on.
*
* Flushing-memtable queries may also reuse workingListForFlush instead of the original working
* TVList for the same reason.
*/
boolean needCloneTimesAndIndicesInWorkingTVList;
workingList.lockQueryList();
Expand All @@ -228,7 +232,7 @@
workingList.unlockQueryList();
}
workingListForFlush =
needCloneTimesAndIndicesInWorkingTVList ? workingList.cloneForFlushSort() : workingList;
initWorkingListForFlushIfNecessary(workingList, needCloneTimesAndIndicesInWorkingTVList);
workingListForFlush.sort();
}

Expand Down Expand Up @@ -267,4 +271,14 @@

@Override
public abstract int serializedSize();

@Override
public synchronized TVList initWorkingListForFlushIfNecessary(
TVList workingList, boolean needCloneTimesAndIndicesInWorkingTVList) {

Check warning on line 277 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Abbreviation in name 'needCloneTimesAndIndicesInWorkingTVList' must contain no more than '2' consecutive capital letters.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ46wrJV1NKbCjkFQneI&open=AZ46wrJV1NKbCjkFQneI&pullRequest=17709
if (workingListForFlush == null) {
workingListForFlush =
needCloneTimesAndIndicesInWorkingTVList ? workingList.cloneForFlushSort() : workingList;
}
return workingListForFlush;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,7 @@
TVList getWorkingTVList();

void setWorkingTVList(TVList list);

TVList initWorkingListForFlushIfNecessary(
TVList workingList, boolean needCloneTimesAndIndicesInWorkingTVList);

Check warning on line 131 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Abbreviation in name 'needCloneTimesAndIndicesInWorkingTVList' must contain no more than '2' consecutive capital letters.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ46wrJq1NKbCjkFQneJ&open=AZ46wrJq1NKbCjkFQneJ&pullRequest=17709
}
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,63 @@
}
}

@Test
public void testFlushingQueryDoesNotSortWorkingTVListUsedByPreviousQuery()

Check warning on line 226 in iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Abbreviation in name 'testFlushingQueryDoesNotSortWorkingTVListUsedByPreviousQuery' must contain no more than '2' consecutive capital letters.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ46wrIT1NKbCjkFQneH&open=AZ46wrIT1NKbCjkFQneH&pullRequest=17709
throws QueryProcessException, IOException, IllegalPathException {

PrimitiveMemTable memTable = new PrimitiveMemTable("root.test", "0");
List<IMeasurementSchema> measurementSchemas =
Arrays.asList(
new MeasurementSchema("s1", TSDataType.INT32),
new MeasurementSchema("s2", TSDataType.INT32),
new MeasurementSchema("s3", TSDataType.INT32));
PlainDeviceID deviceID = new PlainDeviceID("root.test.d1");
for (int i = 1000; i < 2000; i++) {
memTable.writeAlignedRow(deviceID, measurementSchemas, i, new Object[] {i, i, i});
}

ResourceByPathUtils resourcesByPathUtils =
ResourceByPathUtils.getResourceInstance(
new AlignedPath("root.test.d1", Arrays.asList("s1", "s2", "s3"), measurementSchemas));
AlignedReadOnlyMemChunk firstQueryMemChunk =
(AlignedReadOnlyMemChunk)
resourcesByPathUtils.getReadOnlyMemChunkFromMemTable(
new QueryContext(1), memTable, null, Long.MAX_VALUE, null);
TVList originalWorkingList = memTable.getWritableMemChunk(deviceID, "").getWorkingTVList();
Assert.assertSame(
originalWorkingList,
firstQueryMemChunk.getAligendTvListQueryMap().keySet().iterator().next());

for (int i = 1; i <= 50; i++) {
memTable.writeAlignedRow(deviceID, measurementSchemas, i, new Object[] {i, i, i});
}
MeasurementPath path = new MeasurementPath("root.test.d1.s1", TSDataType.INT32);
memTable.delete(path, path.getDevicePath(), 1, 10);
path = new MeasurementPath("root.test.d1.s2", TSDataType.INT32);
memTable.delete(path, path.getDevicePath(), 1, 10);
path = new MeasurementPath("root.test.d1.s3", TSDataType.INT32);
memTable.delete(path, path.getDevicePath(), 1, 10);
Assert.assertFalse(originalWorkingList.isSorted());

AlignedReadOnlyMemChunk flushingQueryMemChunk =
(AlignedReadOnlyMemChunk)
resourcesByPathUtils.getReadOnlyMemChunkFromMemTable(
new QueryContext(2), memTable, new ArrayList<>(), Long.MAX_VALUE, null);
TVList flushingQueryList =
flushingQueryMemChunk.getAligendTvListQueryMap().keySet().iterator().next();
Assert.assertNotSame(originalWorkingList, flushingQueryList);

flushingQueryMemChunk.sortTvLists();
Assert.assertFalse(originalWorkingList.isSorted());

firstQueryMemChunk.sortTvLists();
MemPointIterator memPointIterator =
firstQueryMemChunk.createMemPointIterator(Ordering.ASC, null);
while (memPointIterator.hasNextBatch()) {
memPointIterator.nextBatch();
}
}

@Test
public void memSeriesToStringTest() throws IOException {
TSDataType dataType = TSDataType.INT32;
Expand Down
Loading