Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -587,16 +587,12 @@ protected void generateOuterNulls(VectorizedRowBatch batch, int[] noMatchs,
// key as null, too.
//
for (int column : outerSmallTableKeyColumnMap) {
ColumnVector colVector = batch.cols[column];
colVector.noNulls = false;
colVector.isNull[batchIndex] = true;
batch.cols[column].clearValue(batchIndex);
}

// Small table values are set to null.
for (int column : smallTableValueColumnMap) {
ColumnVector colVector = batch.cols[column];
colVector.noNulls = false;
colVector.isNull[batchIndex] = true;
batch.cols[column].clearValue(batchIndex);
}
}
}
Expand Down Expand Up @@ -746,15 +742,13 @@ protected void generateOuterNullsRepeatedAll(VectorizedRowBatch batch) throws Hi
//
for (int column : outerSmallTableKeyColumnMap) {
ColumnVector colVector = batch.cols[column];
colVector.noNulls = false;
colVector.isNull[0] = true;
colVector.clearValue(0);
colVector.isRepeating = true;
}

for (int column : smallTableValueColumnMap) {
ColumnVector colVector = batch.cols[column];
colVector.noNulls = false;
colVector.isNull[0] = true;
colVector.clearValue(0);
colVector.isRepeating = true;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hive.ql.exec.vector.mapjoin;

import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.IntervalDayTimeColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VoidColumnVector;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Tests that {@link VectorMapJoinOuterGenerateResultOperator} invokes
* {@code clearValue} on every small-table key and value column for each
* unmatched big-table row. The HIVE-29598 bug is that without that call,
* a stale {@code vector[i]} survives the null marking and leaks into
* downstream operators that read the slot without checking {@code isNull[i]}.
*/
class TestVectorMapJoinOuterGenerateResultOperator {

/**
* A concrete subclass of the abstract operator with no-op stubs for the
* abstract methods, just enough to invoke {@code generateOuterNulls} and
* {@code generateOuterNullsRepeatedAll} directly from tests.
*/
private static class TestableOuterOp extends VectorMapJoinOuterGenerateResultOperator {
@Override
protected String getLoggingPrefix() {
return "test";
}

@Override
public void processBatch(VectorizedRowBatch batch) {
// No-op: tests invoke the generateOuterNulls* methods directly.
}
}

/**
* LongColumnVector that records every {@code clearSlotValue} invocation.
* Used to assert that the operator dispatched through the new
* {@link org.apache.hadoop.hive.ql.exec.vector.ColumnVector#clearValue(int)}
* contract, rather than just observing the slot-clearing side effect
* (which could also be produced by another mechanism).
*/
private static class TrackingLongColumnVector extends LongColumnVector {
final List<Integer> clearedIndices = new ArrayList<>();

TrackingLongColumnVector(int size) {
super(size);
}

@Override
protected void clearSlotValue(int elementNum) {
super.clearSlotValue(elementNum);
clearedIndices.add(elementNum);
}
}

@Test
void generateOuterNullsCallsClearValueOnEachMappedColumnForEachUnmatchedRow() throws HiveException, IOException {
TestableOuterOp op = new TestableOuterOp();
op.outerSmallTableKeyColumnMap = new int[] {0};
op.smallTableValueColumnMap = new int[] {1, 2};

VectorizedRowBatch batch = new VectorizedRowBatch(3, 4);
TrackingLongColumnVector keyCol = new TrackingLongColumnVector(4);
TrackingLongColumnVector valCol1 = new TrackingLongColumnVector(4);
TrackingLongColumnVector valCol2 = new TrackingLongColumnVector(4);
keyCol.vector[1] = 99L; // stale values that should be cleared
valCol1.vector[1] = 88L;
valCol2.vector[3] = 77L;
batch.cols[0] = keyCol;
batch.cols[1] = valCol1;
batch.cols[2] = valCol2;

int[] noMatchs = new int[] {1, 3};
op.generateOuterNulls(batch, noMatchs, noMatchs.length);

// Each tracked column had clearSlotValue invoked at indices 1 and 3.
assertEquals(Arrays.asList(1, 3), keyCol.clearedIndices);
assertEquals(Arrays.asList(1, 3), valCol1.clearedIndices);
assertEquals(Arrays.asList(1, 3), valCol2.clearedIndices);

// Bookkeeping side effect of clearValue (final base-class method).
assertFalse(keyCol.noNulls);
assertTrue(keyCol.isNull[1]);
assertTrue(keyCol.isNull[3]);
assertFalse(keyCol.isNull[0]);
assertFalse(keyCol.isNull[2]);

// Stale slot values cleared to 0L.
assertEquals(0L, keyCol.vector[1]);
assertEquals(0L, valCol1.vector[1]);
assertEquals(0L, valCol2.vector[3]);
}

@Test
void generateOuterNullsRepeatedAllCallsClearValueAtIndexZeroForEachMappedColumn() throws HiveException, IOException {
TestableOuterOp op = new TestableOuterOp();
op.outerSmallTableKeyColumnMap = new int[] {0};
op.smallTableValueColumnMap = new int[] {1};

VectorizedRowBatch batch = new VectorizedRowBatch(2, 4);
TrackingLongColumnVector keyCol = new TrackingLongColumnVector(4);
TrackingLongColumnVector valCol = new TrackingLongColumnVector(4);
keyCol.vector[0] = 42L;
valCol.vector[0] = 84L;
batch.cols[0] = keyCol;
batch.cols[1] = valCol;

op.generateOuterNullsRepeatedAll(batch);

// Each tracked column had clearSlotValue invoked exactly once at index 0.
assertEquals(Arrays.asList(0), keyCol.clearedIndices);
assertEquals(Arrays.asList(0), valCol.clearedIndices);

// Bookkeeping plus isRepeating set by the operator after clearValue.
assertFalse(keyCol.noNulls);
assertTrue(keyCol.isNull[0]);
assertTrue(keyCol.isRepeating);
assertFalse(valCol.noNulls);
assertTrue(valCol.isNull[0]);
assertTrue(valCol.isRepeating);

// Stale slot values cleared to 0L.
assertEquals(0L, keyCol.vector[0]);
assertEquals(0L, valCol.vector[0]);
}

@Test
void generateOuterNullsSetsBookkeepingOnTypeWithNoClearSlotValueOverride() throws HiveException, IOException {
// VoidColumnVector inherits the base ColumnVector.clearSlotValue() no-op
// — no per-slot value to zero. This verifies the operator still drives
// the bookkeeping (isNull[i], noNulls) through the final clearValue()
// contract on a type that doesn't override the hook.
TestableOuterOp op = new TestableOuterOp();
op.outerSmallTableKeyColumnMap = new int[] {};
op.smallTableValueColumnMap = new int[] {0};

VectorizedRowBatch batch = new VectorizedRowBatch(1, 4);
VoidColumnVector voidCol = new VoidColumnVector(4);
batch.cols[0] = voidCol;

int[] noMatchs = new int[] {1, 3};
op.generateOuterNulls(batch, noMatchs, noMatchs.length);

assertFalse(voidCol.noNulls);
assertTrue(voidCol.isNull[1]);
assertTrue(voidCol.isNull[3]);
assertFalse(voidCol.isNull[0]);
assertFalse(voidCol.isNull[2]);
}

/**
* Verifies that for every {@link ColumnVector} subclass whose
* {@code clearSlotValue} the PR overrides, the operator's call through
* {@code clearValue} ultimately reaches that override and zeroes the slot
* to the type's cleared state. Complements
* {@link #generateOuterNullsCallsClearValueOnEachMappedColumnForEachUnmatchedRow}
* which proves the dispatch chain on Long alone.
*/
@ParameterizedTest(name = "{0}")
@MethodSource("modifiedColumnVectorTypes")
void generateOuterNullsClearsSlotForEachModifiedType(
String typeName,
ColumnVector cv,
Runnable preLoad,
Runnable assertSlotCleared) throws HiveException, IOException {

TestableOuterOp op = new TestableOuterOp();
op.outerSmallTableKeyColumnMap = new int[] {};
op.smallTableValueColumnMap = new int[] {0};

VectorizedRowBatch batch = new VectorizedRowBatch(1, 4);
preLoad.run();
batch.cols[0] = cv;

int[] noMatchs = new int[] {2};
op.generateOuterNulls(batch, noMatchs, noMatchs.length);

assertTrue(cv.isNull[2]);
assertFalse(cv.noNulls);
assertSlotCleared.run();
}

static Stream<Arguments> modifiedColumnVectorTypes() {
final LongColumnVector longCv = new LongColumnVector(4);
final DoubleColumnVector doubleCv = new DoubleColumnVector(4);
final BytesColumnVector bytesCv = new BytesColumnVector(4);
final DecimalColumnVector decCv = new DecimalColumnVector(4, 18, 4);
final TimestampColumnVector tsCv = new TimestampColumnVector(4);
final IntervalDayTimeColumnVector ivCv = new IntervalDayTimeColumnVector(4);

return Stream.of(
Arguments.of(
"LongColumnVector",
longCv,
(Runnable) () -> longCv.vector[2] = 999L,
(Runnable) () -> assertEquals(0L, longCv.vector[2])),
Arguments.of(
"DoubleColumnVector",
doubleCv,
(Runnable) () -> doubleCv.vector[2] = 3.14,
(Runnable) () -> assertEquals(0.0, doubleCv.vector[2])),
Arguments.of(
"BytesColumnVector",
bytesCv,
(Runnable) () -> {
bytesCv.vector[2] = "stale".getBytes(StandardCharsets.UTF_8);
bytesCv.start[2] = 1;
bytesCv.length[2] = 3;
},
(Runnable) () -> {
assertNull(bytesCv.vector[2]);
assertEquals(0, bytesCv.start[2]);
assertEquals(0, bytesCv.length[2]);
}),
Arguments.of(
"DecimalColumnVector",
decCv,
(Runnable) () -> decCv.vector[2].setFromLong(999L),
(Runnable) () -> assertEquals(0L, decCv.vector[2].serialize64(decCv.scale))),
Arguments.of(
"TimestampColumnVector",
tsCv,
(Runnable) () -> {
tsCv.time[2] = 1234567890000L;
tsCv.nanos[2] = 999;
},
(Runnable) () -> {
// setNullValue convention: time = 0, nanos = 1
assertEquals(0L, tsCv.time[2]);
assertEquals(1, tsCv.nanos[2]);
}),
Arguments.of(
"IntervalDayTimeColumnVector",
ivCv,
(Runnable) () -> ivCv.set(2, new HiveIntervalDayTime(5, 0)),
(Runnable) () -> {
// setNullValue convention: totalSeconds = 0, nanos = 1
assertEquals(0L, ivCv.getTotalSeconds(2));
assertEquals(1, ivCv.getNanos(2));
})
);
}
}
61 changes: 61 additions & 0 deletions ql/src/test/queries/clientpositive/vector_outer_join7.q
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
set hive.explain.user=false;
SET hive.auto.convert.join=true;
SET hive.auto.convert.join.noconditionaltask=true;
set hive.fetch.task.conversion=none;

-- SORT_QUERY_RESULTS

-- Regression test for HIVE-29598: vectorized outer-join MapJoin can leave a
-- stale typed value in a scratch slot that the vectorizer has aliased to a
-- smallTableValueMapping target. For unmatched rows, generateOuterNulls()
-- flips isNull[i] = true but does not clear vector[i], so a downstream
-- ColOrCol -> IfExprLongScalarLongScalar chain that reads vector[i] without
-- consulting isNull[i] propagates the stale value into the result.
--
-- Repro shape: a LEFT OUTER MapJoin whose ON predicate uses a CAST scratch
-- column that is then reused as the small-table boolean-value column; the
-- projection computes CAST((s.s_bool OR p.p_bool) AS INT) over the null-padded
-- rows. The MAX() aggregate barrier prevents Calcite from inlining and
-- simplifying the bug surface away.
--
-- Expected: zero rows. Every probe row's (s_bool OR p_bool) is TRUE per SQL
-- three-valued logic (matched: FALSE OR TRUE; unmatched: NULL OR TRUE), so
-- CAST(... AS INT) is always 1 and WHERE observed_value = 0 matches nothing.
-- Without the fix: 'C 3 0 1' and 'D 3 0 1' leak through, since the stale int
-- from CastStringToLong ORed with 1 fails the strict == 1 check in
-- IfExprLongScalarLongScalar and stores 0.

CREATE TABLE t (k STRING, v STRING) STORED AS ORC;

INSERT INTO t VALUES
('A','1'),('A','2'),('A','3'),
('B','2'),('B','3'),
('C','3'),
('D','1'),('D','3');

WITH
probe AS (
SELECT k, v, (CAST(v AS INT) > 0) AS p_bool
FROM t WHERE CAST(v AS INT) >= 3
),
small_side AS (
SELECT k, v, (CAST(v AS INT) > 9999) AS s_bool
FROM t
),
classified AS (
SELECT p.k, p.v, CAST((s.s_bool OR p.p_bool) AS INT) AS observed_value
FROM probe p
LEFT JOIN small_side s
ON p.k = s.k
AND CAST(p.v AS INT) - 1 = CAST(s.v AS INT)
),
diagnosed AS (
SELECT k, v, MAX(observed_value) AS observed_value
FROM classified
GROUP BY k, v
)
SELECT k, v,
observed_value AS observed_value_returned_by_select,
1 AS required_value_per_sql_semantics
FROM diagnosed
WHERE observed_value = 0;
Loading
Loading