diff --git a/contrib/pax_storage/src/test/regress/expected/privileges.out b/contrib/pax_storage/src/test/regress/expected/privileges.out index a8d7eabea88..470314067c6 100644 --- a/contrib/pax_storage/src/test/regress/expected/privileges.out +++ b/contrib/pax_storage/src/test/regress/expected/privileges.out @@ -1018,7 +1018,7 @@ WHEN MATCHED THEN UPDATE SET b = s.b, a = t.a + 1 WHEN NOT MATCHED THEN INSERT VALUES (a, b); -ERROR: cannot update column in merge with distributed column +ERROR: permission denied for table mtarget -- fail (no SELECT on t.b) MERGE INTO mtarget t USING msource s ON t.a = s.a WHEN MATCHED AND t.b IS NOT NULL THEN diff --git a/src/backend/cdb/cdbpath.c b/src/backend/cdb/cdbpath.c index 1f835283dcc..cc703b6e1a7 100644 --- a/src/backend/cdb/cdbpath.c +++ b/src/backend/cdb/cdbpath.c @@ -75,7 +75,7 @@ static bool try_redistribute(PlannerInfo *root, CdbpathMfjRel *g, static SplitUpdatePath *make_splitupdate_path(PlannerInfo *root, Path *subpath, Index rti); -static SplitMergePath *make_split_merge_path(PlannerInfo *root, Path *subpath, List* resultRelations, List *mergeActionLists); +static SplitMergePath *make_split_merge_path(PlannerInfo *root, Path *subpath, List* resultRelations, List *mergeActionLists, bool hasSplitUpdate); static bool can_elide_explicit_motion(PlannerInfo *root, Index rti, Path *subpath, GpPolicy *policy); /* @@ -2798,19 +2798,22 @@ create_motion_path_for_merge(PlannerInfo *root, List *resultRelations, GpPolicy { /* * If merge contain CMD_INSERT, we need split merge to let new - * insert tuple redistributed to correct segment. otherwise, we - * create motion as the same as update/delete in create_motion_path_for_upddel + * insert tuple redistributed to correct segment. If merge has + * UPDATE that modifies distribution key, we also need split merge + * to handle the DELETE+INSERT split. 
*/ foreach(l, mergeActionLists) { List *mergeActionList = lfirst(l); - foreach(lc, mergeActionList) + foreach(lc, mergeActionList) { MergeAction *action = lfirst(lc); if (action->commandType == CMD_INSERT) need_split_merge = true; } } + if (root->merge_need_split_update) + need_split_merge = true; if (need_split_merge) { @@ -2820,7 +2823,7 @@ create_motion_path_for_merge(PlannerInfo *root, List *resultRelations, GpPolicy rel = build_simple_rel(root, linitial_int(resultRelations), NULL /*parent*/); targetLocus = cdbpathlocus_from_baserel(root, rel, 0); - subpath = (Path *) make_split_merge_path(root, subpath, resultRelations, mergeActionLists); + subpath = (Path *) make_split_merge_path(root, subpath, resultRelations, mergeActionLists, root->merge_need_split_update); subpath = cdbpath_create_explicit_motion_path(root, subpath, targetLocus); @@ -2923,14 +2926,35 @@ turn_volatile_seggen_to_singleqe(PlannerInfo *root, Path *path, Node *node) } static SplitMergePath * -make_split_merge_path(PlannerInfo *root, Path *subpath, List *resultRelations, List *mergeActionLists) +make_split_merge_path(PlannerInfo *root, Path *subpath, List *resultRelations, List *mergeActionLists, bool hasSplitUpdate) { PathTarget *splitMergePathTarget; SplitMergePath *splitmergepath; splitMergePathTarget = copy_pathtarget(subpath->pathtarget); - /* populate information generated above into splitupdate node */ + /* + * Same restriction as SplitUpdate: updating a distribution key + * is not allowed when the target relation has update triggers. 
+ */ + if (hasSplitUpdate) + { + RangeTblEntry *rte = rt_fetch(root->parse->resultRelation, + root->parse->rtable); + if (has_update_triggers(rte->relid, true)) + ereport(ERROR, + (errcode(ERRCODE_GP_FEATURE_NOT_YET), + errmsg("UPDATE on distributed key column not allowed on relation with update triggers"))); + } + + /* When hasSplitUpdate, add DMLAction column for split UPDATE handling */ + if (hasSplitUpdate) + { + DMLActionExpr *actionExpr = makeNode(DMLActionExpr); + add_column_to_pathtarget(splitMergePathTarget, (Expr *) actionExpr, 0); + } + + /* populate information generated above into splitmerge node */ splitmergepath = makeNode(SplitMergePath); splitmergepath->path.pathtype = T_SplitMerge; splitmergepath->path.parent = subpath->parent; @@ -2947,6 +2971,7 @@ make_split_merge_path(PlannerInfo *root, Path *subpath, List *resultRelations, L splitmergepath->subpath = subpath; splitmergepath->resultRelations = resultRelations; splitmergepath->mergeActionLists = mergeActionLists; + splitmergepath->hasSplitUpdate = hasSplitUpdate; return splitmergepath; } diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index 94f5d0678f8..c1f70ae9ba2 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -1095,7 +1095,7 @@ ExecInsert(ModifyTableContext *context, */ if (mtstate->operation == CMD_UPDATE) wco_kind = WCO_RLS_UPDATE_CHECK; - else if (mtstate->operation == CMD_MERGE) + else if (mtstate->operation == CMD_MERGE && context->relaction != NULL) wco_kind = (context->relaction->mas_action->commandType == CMD_UPDATE) ? WCO_RLS_UPDATE_CHECK : WCO_RLS_INSERT_CHECK; else @@ -4372,7 +4372,77 @@ ExecModifyTable(PlanState *pstate) break; case CMD_MERGE: - slot = ExecMerge(&context, resultRelInfo, tupleid, node->canSetTag); + if (action == (int) DML_INSERT) + { + /* + * Split merge INSERT: extract non-junk columns (positions + * 1..N) from the plan slot into a properly-typed insert + * slot. 
We can't use ExecGetInsertNewTuple because the + * SplitMerge's ExecInitMergeTupleSlots may have set + * ri_projectNewInfoValid without building the INSERT + * projection. + */ + ResultRelInfo *saved = resultRelInfo; + TupleTableSlot *insertSlot; + int natts; + + resultRelInfo = node->rootResultRelInfo; + natts = RelationGetNumberOfAttributes(resultRelInfo->ri_RelationDesc); + + /* + * Lazily set up partition tuple routing for split-update + * MERGE INSERT on partitioned tables. + */ + if (resultRelInfo->ri_RelationDesc->rd_rel->relkind == + RELKIND_PARTITIONED_TABLE && + node->mt_partition_tuple_routing == NULL) + { + node->mt_partition_tuple_routing = + ExecSetupPartitionTupleRouting(estate, + resultRelInfo->ri_RelationDesc); + } + + insertSlot = resultRelInfo->ri_newTupleSlot; + if (insertSlot == NULL) + { + insertSlot = table_slot_create(resultRelInfo->ri_RelationDesc, + &estate->es_tupleTable); + resultRelInfo->ri_newTupleSlot = insertSlot; + } + + ExecClearTuple(insertSlot); + slot_getallattrs(context.planSlot); + memcpy(insertSlot->tts_values, context.planSlot->tts_values, + natts * sizeof(Datum)); + memcpy(insertSlot->tts_isnull, context.planSlot->tts_isnull, + natts * sizeof(bool)); + ExecStoreVirtualTuple(insertSlot); + + /* Set relaction to NULL to avoid ExecInsert dereferencing it */ + context.relaction = NULL; + + slot = ExecInsert(&context, resultRelInfo, insertSlot, + node->canSetTag, NULL, NULL, + true /* splitUpdate */); + resultRelInfo = saved; + } + else if (action == (int) DML_DELETE) + { + /* + * Split merge DELETE: delete the old tuple on this segment. 
+ */ + slot = ExecDelete(&context, resultRelInfo, tupleid, oldtuple, segid, + false, /* processReturning */ + false, /* changingPart */ + node->canSetTag, + NULL, NULL, NULL, + true /* splitUpdate */); + } + else + { + /* Normal MERGE processing (no split or pass-through) */ + slot = ExecMerge(&context, resultRelInfo, tupleid, node->canSetTag); + } break; default: @@ -4661,6 +4731,11 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) mtstate->mt_action_attno = ExecFindJunkAttributeInTlist(subplan->targetlist, "DMLAction"); } + else if (operation == CMD_MERGE) + { + mtstate->mt_action_attno = + ExecFindJunkAttributeInTlist(subplan->targetlist, "DMLAction"); + } } /* * Do additional per-result-relation initialization. diff --git a/src/backend/executor/nodeSplitMerge.c b/src/backend/executor/nodeSplitMerge.c index 09b5a0d4b97..5916a661dba 100644 --- a/src/backend/executor/nodeSplitMerge.c +++ b/src/backend/executor/nodeSplitMerge.c @@ -24,8 +24,14 @@ #include "executor/instrument.h" #include "executor/nodeSplitMerge.h" +#include "nodes/nodeFuncs.h" #include "utils/memutils.h" +/* + * Action value for rows that should be processed by ModifyTable's + * normal ExecMerge path (NOT MATCHED or MATCHED pass-through). + */ +#define SPLITMERGE_ACTION_PASSTHROUGH (-1) typedef struct MTTargetRelLookup { @@ -68,122 +74,142 @@ evalHashKey(SplitMergeState *node, Datum *values, bool *isnulls) return target_seg; } +/* + * Compute target segment ID from the given slot's values. + * Returns 0 if no hash is configured (DISTRIBUTED RANDOMLY). + */ +static int32 +computeTargetSegment(SplitMergeState *node, TupleTableSlot *slot) +{ + SplitMerge *plannode = (SplitMerge *) node->ps.plan; + + if (node->cdbhash) + return evalHashKey(node, slot->tts_values, slot->tts_isnull); + else + return cdbhashrandomseg(plannode->numHashSegments); +} + +/* + * Build a tuple in the N+M+1 format for hasSplitUpdate. 
+ * + * The output slot layout is: + * [0..N-1] target table columns (from projSlot, or NULL) + * [N..N+M-1] subplan columns (from inputSlot) + * [N+M] DMLAction + * + * N = node->subplan_offset, M = inputSlot column count. + */ +static void +BuildSplitMergeTuple(SplitMergeState *node, TupleTableSlot *outSlot, + TupleTableSlot *inputSlot, TupleTableSlot *projSlot, + int dmlAction, int32 segid) +{ + int offset = node->subplan_offset; + int natts_input = inputSlot->tts_tupleDescriptor->natts; + int natts_out = outSlot->tts_tupleDescriptor->natts; + + ExecClearTuple(outSlot); + + memset(outSlot->tts_values, 0, natts_out * sizeof(Datum)); + memset(outSlot->tts_isnull, true, natts_out * sizeof(bool)); + + /* Positions 0..N-1: projected target table values (if provided) */ + if (projSlot) + { + int natts_proj = projSlot->tts_tupleDescriptor->natts; + slot_getallattrs(projSlot); + memcpy(outSlot->tts_values, projSlot->tts_values, + natts_proj * sizeof(Datum)); + memcpy(outSlot->tts_isnull, projSlot->tts_isnull, + natts_proj * sizeof(bool)); + } + /* Positions N..N+M-1: subplan columns */ + slot_getallattrs(inputSlot); + memcpy(outSlot->tts_values + offset, inputSlot->tts_values, + natts_input * sizeof(Datum)); + memcpy(outSlot->tts_isnull + offset, inputSlot->tts_isnull, + natts_input * sizeof(bool)); + /* gp_segment_id within subplan region */ + outSlot->tts_values[offset + node->segid_attno - 1] = Int32GetDatum(segid); + outSlot->tts_isnull[offset + node->segid_attno - 1] = false; + + /* DMLAction at the end */ + outSlot->tts_values[node->action_attno - 1] = Int32GetDatum(dmlAction); + outSlot->tts_isnull[node->action_attno - 1] = false; + + ExecStoreVirtualTuple(outSlot); +} + +/* + * MergeTupleTableSlot + * + * Handle a NOT MATCHED row: evaluate WHEN NOT MATCHED actions, + * project INSERT values, and compute target segment for routing. 
+ */ static TupleTableSlot * -MergeTupleTableSlot(TupleTableSlot *slot, SplitMerge *plannode, SplitMergeState *node, ResultRelInfo *resultRelInfo) +MergeTupleTableSlot(TupleTableSlot *slot, SplitMerge *plannode, + SplitMergeState *node, ResultRelInfo *resultRelInfo) { ExprContext *econtext = node->ps.ps_ExprContext; - - List *actionStates = NIL; ListCell *l; TupleTableSlot *newslot = NULL; + int32 target_seg = 0; - /* - * For INSERT actions, the root relation's merge action is OK since the - * INSERT's targetlist and the WHEN conditions can only refer to the - * source relation and hence it does not matter which result relation we - * work with. - * - * XXX does this mean that we can avoid creating copies of actionStates on - * partitioned tables, for not-matched actions? - */ - actionStates = resultRelInfo->ri_notMatchedMergeAction; - - /* - * Make source tuple available to ExecQual and ExecProject. We don't need - * the target tuple, since the WHEN quals and targetlist can't refer to - * the target columns. - */ econtext->ecxt_scantuple = NULL; econtext->ecxt_innertuple = slot; econtext->ecxt_outertuple = NULL; - foreach(l, actionStates) + /* Evaluate NOT MATCHED actions to find INSERT projection */ + foreach(l, resultRelInfo->ri_notMatchedMergeAction) { MergeActionState *action = (MergeActionState *) lfirst(l); - CmdType commandType = action->mas_action->commandType; - /* - * Test condition, if any. - * - * In the absence of any condition, we perform the action - * unconditionally (no need to check separately since ExecQual() will - * return true if there are no conditions to evaluate). - */ if (!ExecQual(action->mas_whenqual, econtext)) continue; - /* Perform stated action */ - switch (commandType) - { - case CMD_INSERT: - - /* - * Project the tuple. In case of a partitioned table, the - * projection was already built to use the root's descriptor, - * so we don't need to map the tuple here. 
- */ - newslot = ExecProject(action->mas_proj); - - break; - case CMD_NOTHING: - /* Do nothing */ - break; - default: - elog(ERROR, "unknown action in MERGE WHEN NOT MATCHED clause"); - } + if (action->mas_action->commandType == CMD_INSERT) + newslot = ExecProject(action->mas_proj); + /* else CMD_NOTHING: do nothing */ - /* - * We've activated one of the WHEN clauses, so we don't search - * further. This is required behaviour, not an optimization. - */ - break; + break; /* only first matching action */ } + /* Compute target segment for INSERT, or 0 for DO NOTHING */ if (newslot) - { - /* Compute segment ID for the new row */ - int32 target_seg; - - if (node->cdbhash) - target_seg = evalHashKey(node, newslot->tts_values, newslot->tts_isnull); - else - target_seg = cdbhashrandomseg(plannode->numHashSegments); + target_seg = computeTargetSegment(node, newslot); - slot->tts_values[node->segid_attno - 1] = Int32GetDatum(target_seg); - slot->tts_isnull[node->segid_attno - 1] = false; - } - else + /* Build output in the appropriate format */ + if (plannode->hasSplitUpdate) { - /* - * No newslot generated means that insert action will not be triggered. - * So we just redistributed tuple to any segment, like segment 0. - */ - slot->tts_values[node->segid_attno - 1] = Int32GetDatum(0); - slot->tts_isnull[node->segid_attno - 1] = false; + BuildSplitMergeTuple(node, node->ps.ps_ResultTupleSlot, + slot, NULL, SPLITMERGE_ACTION_PASSTHROUGH, + target_seg); + return node->ps.ps_ResultTupleSlot; } + /* Non-hasSplitUpdate: modify slot in-place */ + slot->tts_values[node->segid_attno - 1] = Int32GetDatum(target_seg); + slot->tts_isnull[node->segid_attno - 1] = false; return slot; } /* * ExecLookupResultRelByOid - * If the table with given OID is among the result relations to be - * updated by the given ModifyTable node, return its ResultRelInfo. + * If the table with given OID is among the result relations to be + * updated by the given SplitMerge node, return its ResultRelInfo. 
* * If not found, return NULL if missing_ok, else raise error. * - * If update_cache is true, then upon successful lookup, update the node's - * one-element cache. ONLY ExecModifyTable may pass true for this. + * If update_cache is true, update the node's one-element cache. */ static ResultRelInfo * MergeExecLookupResultRelByOid(SplitMergeState *node, Oid resultoid, - bool missing_ok, bool update_cache) + bool missing_ok, bool update_cache) { if (node->mt_resultOidHash) { - /* Use the pre-built hash table to locate the rel */ MTTargetRelLookup *mtlookup; mtlookup = (MTTargetRelLookup *) @@ -200,7 +226,6 @@ MergeExecLookupResultRelByOid(SplitMergeState *node, Oid resultoid, } else { - /* With few target rels, just search the ResultRelInfo array */ for (int ndx = 0; ndx < node->nrel; ndx++) { ResultRelInfo *rInfo = node->resultRelInfo + ndx; @@ -222,64 +247,163 @@ MergeExecLookupResultRelByOid(SplitMergeState *node, Oid resultoid, return NULL; } -/** - * Splits every TupleTableSlot into two TupleTableSlots: DELETE and INSERT. +/* + * SwitchResultRelForPartition + * + * For partitioned tables, look up the correct ResultRelInfo based on tableoid + * from the slot. Updates the cached result relation if it changes. + */ +static ResultRelInfo * +SwitchResultRelForPartition(SplitMergeState *node, TupleTableSlot *slot, + ResultRelInfo *resultRelInfo) +{ + bool isNull; + Datum d; + Oid resultoid; + + if (!AttributeNumberIsValid(node->mt_resultOidAttno)) + return resultRelInfo; + + d = ExecGetJunkAttribute(slot, node->mt_resultOidAttno, &isNull); + Assert(!isNull); + resultoid = DatumGetObjectId(d); + + if (resultoid != node->mt_lastResultOid) + resultRelInfo = MergeExecLookupResultRelByOid(node, resultoid, + false, true); + return resultRelInfo; +} + +/* + * MergeMatchedSplitUpdate + * + * Handle a MATCHED row when hasSplitUpdate is true. + * + * For UPDATE: splits into DELETE + INSERT tuples with DMLAction markers. 
+ * Returns DELETE first; INSERT is saved for the next call. + * For DELETE: emits a single DELETE tuple. + * For DO NOTHING / no match: emits a pass-through tuple. + */ +static TupleTableSlot * +MergeMatchedSplitUpdate(TupleTableSlot *slot, SplitMerge *plannode, + SplitMergeState *node, ResultRelInfo *resultRelInfo) +{ + ExprContext *econtext = node->ps.ps_ExprContext; + ListCell *l; + int32 old_segid; + + econtext->ecxt_scantuple = slot; + econtext->ecxt_innertuple = slot; + econtext->ecxt_outertuple = NULL; + + old_segid = DatumGetInt32(slot->tts_values[node->segid_attno - 1]); + + foreach(l, resultRelInfo->ri_matchedMergeAction) + { + MergeActionState *action = (MergeActionState *) lfirst(l); + CmdType commandType = action->mas_action->commandType; + + if (!ExecQual(action->mas_whenqual, econtext)) + continue; + + switch (commandType) + { + case CMD_UPDATE: + { + TupleTableSlot *newslot; + int32 new_segid; + + newslot = ExecProject(action->mas_proj); + new_segid = computeTargetSegment(node, newslot); + + /* DELETE tuple: routed to old segment */ + BuildSplitMergeTuple(node, node->deleteTuple, slot, NULL, + (int) DML_DELETE, old_segid); + + /* INSERT tuple: projected new values, routed to new segment */ + BuildSplitMergeTuple(node, node->insertTuple, slot, newslot, + (int) DML_INSERT, new_segid); + + /* Return DELETE first, INSERT on next call */ + node->processInsert = true; + return node->deleteTuple; + } + + case CMD_DELETE: + BuildSplitMergeTuple(node, node->ps.ps_ResultTupleSlot, + slot, NULL, (int) DML_DELETE, old_segid); + return node->ps.ps_ResultTupleSlot; + + case CMD_NOTHING: + break; /* fall through to pass-through below */ + + default: + elog(ERROR, "unknown action in MERGE WHEN MATCHED clause"); + } + + break; /* only first matching action */ + } + + /* No UPDATE/DELETE action matched - pass-through */ + BuildSplitMergeTuple(node, node->ps.ps_ResultTupleSlot, slot, NULL, + SPLITMERGE_ACTION_PASSTHROUGH, old_segid); + return 
node->ps.ps_ResultTupleSlot; +} + +/* + * ExecSplitMerge + * + * Main entry point. For each input tuple from the JOIN: + * - NOT MATCHED: compute target segment for INSERT routing + * - MATCHED + hasSplitUpdate: split UPDATE into DELETE + INSERT + * - MATCHED (no split): pass through */ static TupleTableSlot * ExecSplitMerge(PlanState *pstate) { SplitMergeState *node = castNode(SplitMergeState, pstate); - PlanState *outerNode = outerPlanState(node); + PlanState *outerNode = outerPlanState(node); SplitMerge *plannode = (SplitMerge *) node->ps.plan; ResultRelInfo *resultRelInfo = node->resultRelInfo + node->mt_lastResultIndex; Datum datum; bool isNull; - Oid resultoid; - - TupleTableSlot *slot = NULL; - TupleTableSlot *result = NULL; + TupleTableSlot *slot; Assert(outerNode != NULL); - slot = ExecProcNode(outerNode); + /* Return pending INSERT tuple from a previous split UPDATE */ + if (node->processInsert) + { + node->processInsert = false; + return node->insertTuple; + } + slot = ExecProcNode(outerNode); if (TupIsNull(slot)) - { return NULL; - } datum = ExecGetJunkAttribute(slot, resultRelInfo->ri_RowIdAttNo, &isNull); - /* ctid is NULL means that not matched, then check the insert action */ if (isNull) - result = MergeTupleTableSlot(slot, plannode, node, resultRelInfo); - else { - /* if partion table must switch resultRelInfo */ - if (AttributeNumberIsValid(node->mt_resultOidAttno)) - { - datum = ExecGetJunkAttribute(slot, node->mt_resultOidAttno, &isNull); - Assert(!isNull); - resultoid = DatumGetObjectId(datum); - if (resultoid != node->mt_lastResultOid) - resultRelInfo = MergeExecLookupResultRelByOid(node, resultoid, - false, true); - } - result = slot; + /* NOT MATCHED: compute target segment for INSERT routing */ + return MergeTupleTableSlot(slot, plannode, node, resultRelInfo); } - return result; -} + /* MATCHED: switch to correct partition if needed */ + resultRelInfo = SwitchResultRelForPartition(node, slot, resultRelInfo); + if (plannode->hasSplitUpdate) 
+ return MergeMatchedSplitUpdate(slot, plannode, node, resultRelInfo); + /* No split update: pass through */ + return slot; +} /* * Initializes the tuple slots in a ResultRelInfo for any MERGE action. - * - * We mark 'projectNewInfoValid' even though the projections themselves - * are not initialized here. */ static void ExecInitMergeTupleSlots(SplitMergeState *mtstate, @@ -297,9 +421,51 @@ ExecInitMergeTupleSlots(SplitMergeState *mtstate, &estate->es_tupleTable); resultRelInfo->ri_projectNewInfoValid = true; } + /* - * Init SplitMerge Node. A memory context is created to hold Split Tuples. - * */ + * Build a TupleDesc for the root table's column layout from the plan's + * non-junk target list entries. Used for UPDATE projections so the result + * matches the SplitMerge output's first N columns regardless of which + * child partition is being updated. + */ +static TupleDesc +BuildRootUpdateTupleDesc(List *targetlist) +{ + TupleDesc desc; + ListCell *lc; + int nnonjunk = 0; + int col = 0; + + foreach(lc, targetlist) + { + TargetEntry *tle = (TargetEntry *) lfirst(lc); + if (!tle->resjunk) + nnonjunk++; + } + + if (nnonjunk == 0) + return NULL; + + desc = CreateTemplateTupleDesc(nnonjunk); + foreach(lc, targetlist) + { + TargetEntry *tle = (TargetEntry *) lfirst(lc); + if (!tle->resjunk) + { + col++; + TupleDescInitEntry(desc, col, tle->resname, + exprType((Node *) tle->expr), + exprTypmod((Node *) tle->expr), 0); + TupleDescInitEntryCollation(desc, col, + exprCollation((Node *) tle->expr)); + } + } + return desc; +} + +/* + * ExecInitSplitMerge + */ SplitMergeState* ExecInitSplitMerge(SplitMerge *node, EState *estate, int eflags) { @@ -307,62 +473,64 @@ ExecInitSplitMerge(SplitMerge *node, EState *estate, int eflags) ResultRelInfo *resultRelInfo; ExprContext *econtext; ListCell *lc; - int i; - + int i; + Plan *outerPlan = outerPlan(node); - /* Check for unsupported flags */ Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK | EXEC_FLAG_REWIND))); splitmergestate 
= makeNode(SplitMergeState); - splitmergestate->ps.plan = (Plan *)node; + splitmergestate->ps.plan = (Plan *) node; splitmergestate->ps.state = estate; splitmergestate->ps.ExecProcNode = ExecSplitMerge; + splitmergestate->processInsert = false; - /* - * then initialize outer plan - */ - Plan *outerPlan = outerPlan(node); outerPlanState(splitmergestate) = ExecInitNode(outerPlan, estate, eflags); - ExecAssignExprContext(estate, &splitmergestate->ps); econtext = splitmergestate->ps.ps_ExprContext; + /* Initialize result relations */ splitmergestate->nrel = list_length(node->resultRelations); - splitmergestate->resultRelInfo = (ResultRelInfo *)palloc(splitmergestate->nrel * sizeof(ResultRelInfo)); + splitmergestate->resultRelInfo = (ResultRelInfo *) + palloc(splitmergestate->nrel * sizeof(ResultRelInfo)); resultRelInfo = splitmergestate->resultRelInfo; - i = 0; foreach(lc, node->resultRelations) { Index resultRelation = lfirst_int(lc); - - ExecInitResultRelation(estate, resultRelInfo, resultRelation); - resultRelInfo->ri_RowIdAttNo = ExecFindJunkAttributeInTlist(outerPlan->targetlist, "ctid"); + ExecInitResultRelation(estate, resultRelInfo, resultRelation); + resultRelInfo->ri_RowIdAttNo = + ExecFindJunkAttributeInTlist(outerPlan->targetlist, "ctid"); if (!AttributeNumberIsValid(resultRelInfo->ri_RowIdAttNo)) elog(ERROR, "could not find junk ctid column"); - resultRelInfo++; - i++; } splitmergestate->mt_lastResultIndex = 0; splitmergestate->mt_lastResultOid = InvalidOid; + /* Build root-table-format slot for UPDATE projections (hasSplitUpdate only) */ + TupleTableSlot *rootUpdateSlot = NULL; + TupleDesc rootUpdateDesc = NULL; + if (node->hasSplitUpdate) + { + rootUpdateDesc = BuildRootUpdateTupleDesc(node->plan.targetlist); + if (rootUpdateDesc) + rootUpdateSlot = ExecInitExtraTupleSlot(estate, rootUpdateDesc, + &TTSOpsVirtual); + } + /* Initialize merge action states and projections */ i = 0; foreach(lc, node->mergeActionLists) { List *mergeActionList = lfirst(lc); - 
TupleDesc relationDesc; ListCell *l; resultRelInfo = splitmergestate->resultRelInfo + i; i++; - relationDesc = RelationGetDescr(resultRelInfo->ri_RelationDesc); - /* initialize slots for MERGE fetches from this rel */ if (unlikely(!resultRelInfo->ri_projectNewInfoValid)) ExecInitMergeTupleSlots(splitmergestate, resultRelInfo); @@ -370,24 +538,13 @@ ExecInitSplitMerge(SplitMerge *node, EState *estate, int eflags) { MergeAction *action = (MergeAction *) lfirst(l); MergeActionState *action_state; - TupleTableSlot *tgtslot; - TupleDesc tgtdesc; List **list; - /* - * Build action merge state for this rel. (For partitions, - * equivalent code exists in ExecInitPartitionInfo.) - */ action_state = makeNode(MergeActionState); action_state->mas_action = action; action_state->mas_whenqual = ExecInitQual((List *) action->qual, &splitmergestate->ps); - /* - * We create two lists - one for WHEN MATCHED actions and one for - * WHEN NOT MATCHED actions - and stick the MergeActionState into - * the appropriate list. - */ if (action_state->mas_action->matched) list = &resultRelInfo->ri_matchedMergeAction; else @@ -397,29 +554,22 @@ ExecInitSplitMerge(SplitMerge *node, EState *estate, int eflags) switch (action->commandType) { case CMD_INSERT: - - /* - * If the MERGE targets a partitioned table, any INSERT - * actions must be routed through it, not the child - * relations. Initialize the routing struct and the root - * table's "new" tuple slot for that, if not already done. - * The projection we prepare, for all relations, uses the - * root relation descriptor, and targets the plan's root - * slot. (This is consistent with the fact that we - * checked the plan output to match the root relation, - * above.) - */ - /* not partitioned? 
use the stock relation and slot */ - tgtslot = resultRelInfo->ri_newTupleSlot; - tgtdesc = RelationGetDescr(resultRelInfo->ri_RelationDesc); - action_state->mas_proj = ExecBuildProjectionInfo(action->targetList, econtext, - tgtslot, + resultRelInfo->ri_newTupleSlot, &splitmergestate->ps, - tgtdesc); + RelationGetDescr(resultRelInfo->ri_RelationDesc)); break; case CMD_UPDATE: + if (node->hasSplitUpdate && rootUpdateSlot != NULL) + { + action_state->mas_proj = + ExecBuildProjectionInfo(action->targetList, econtext, + rootUpdateSlot, + &splitmergestate->ps, + rootUpdateDesc); + } + break; case CMD_DELETE: case CMD_NOTHING: break; @@ -430,55 +580,68 @@ ExecInitSplitMerge(SplitMerge *node, EState *estate, int eflags) } } - /* - * Look up the positions of the gp_segment_id in the subplan's target - * list, and in the result. - */ + /* Look up junk attribute positions in subplan output */ splitmergestate->segid_attno = ExecFindJunkAttributeInTlist(outerPlan->targetlist, "gp_segment_id"); - splitmergestate->mt_resultOidAttno = ExecFindJunkAttributeInTlist(outerPlan->targetlist, "tableoid"); - - Assert(AttributeNumberIsValid(splitmergestate->mt_resultOidAttno) || splitmergestate->nrel == 1); - /* - * DML nodes do not project. 
- */ + Assert(AttributeNumberIsValid(splitmergestate->mt_resultOidAttno) || + splitmergestate->nrel == 1); + + /* Initialize hasSplitUpdate-specific state */ + if (node->hasSplitUpdate) + { + splitmergestate->action_attno = + ExecFindJunkAttributeInTlist(node->plan.targetlist, "DMLAction"); + Assert(AttributeNumberIsValid(splitmergestate->action_attno)); + + /* subplan_offset = N = total output columns - subplan columns - DMLAction */ + splitmergestate->subplan_offset = + list_length(node->plan.targetlist) - + list_length(outerPlan->targetlist) - 1; + Assert(splitmergestate->subplan_offset > 0); + + /* Dedicated slots for split DELETE + INSERT tuple pair */ + { + TupleDesc tupDesc = ExecTypeFromTL(node->plan.targetlist); + splitmergestate->deleteTuple = + ExecInitExtraTupleSlot(estate, tupDesc, &TTSOpsVirtual); + splitmergestate->insertTuple = + ExecInitExtraTupleSlot(estate, tupDesc, &TTSOpsVirtual); + } + } + else + { + splitmergestate->action_attno = InvalidAttrNumber; + splitmergestate->subplan_offset = 0; + } + ExecInitResultTupleSlotTL(&splitmergestate->ps, &TTSOpsVirtual); splitmergestate->ps.ps_ProjInfo = NULL; - /* - * Initialize for computing hash key - */ + /* Initialize hash for computing target segment */ if (node->numHashAttrs > 0) { splitmergestate->cdbhash = makeCdbHash(node->numHashSegments, - node->numHashAttrs, - node->hashFuncs); + node->numHashAttrs, + node->hashFuncs); } if (estate->es_instrument && (estate->es_instrument & INSTRUMENT_CDB)) - { splitmergestate->ps.cdbexplainbuf = makeStringInfo(); - } return splitmergestate; } -/* Release Resources Requested by SplitMerge node. */ +/* Release resources requested by SplitMerge node. */ void ExecEndSplitMerge(SplitMergeState *node) { - for (int i = 0; i < node->nrel; i++) { ResultRelInfo *resultRelInfo = node->resultRelInfo + i; - /* - * Cleanup the initialized batch slots. This only matters for FDWs - * with batching, but the other cases will have ri_NumSlotsInitialized - * == 0. 
- */ + for (int j = 0; j < resultRelInfo->ri_NumSlotsInitialized; j++) { ExecDropSingleTupleTableSlot(resultRelInfo->ri_Slots[j]); @@ -486,17 +649,14 @@ ExecEndSplitMerge(SplitMergeState *node) } } - /* - * Free the exprcontext - */ ExecFreeExprContext(&node->ps); - - - /* - * clean out the tuple table - */ + if (node->ps.ps_ResultTupleSlot) ExecClearTuple(node->ps.ps_ResultTupleSlot); + if (node->insertTuple) + ExecClearTuple(node->insertTuple); + if (node->deleteTuple) + ExecClearTuple(node->deleteTuple); + ExecEndNode(outerPlanState(node)); } - diff --git a/src/backend/nodes/copyfuncs.funcs.c b/src/backend/nodes/copyfuncs.funcs.c index e774ba2a549..6f312d17574 100644 --- a/src/backend/nodes/copyfuncs.funcs.c +++ b/src/backend/nodes/copyfuncs.funcs.c @@ -6475,6 +6475,8 @@ _copySplitMerge(const SplitMerge *from) COPY_SCALAR_FIELD(numHashSegments); COPY_NODE_FIELD(resultRelations); COPY_NODE_FIELD(mergeActionLists); + COPY_SCALAR_FIELD(hasSplitUpdate); + COPY_SCALAR_FIELD(rootResultRelation); return newnode; } diff --git a/src/backend/nodes/outfuncs_common.c b/src/backend/nodes/outfuncs_common.c index a4ba4c77e98..c518e38db0d 100644 --- a/src/backend/nodes/outfuncs_common.c +++ b/src/backend/nodes/outfuncs_common.c @@ -463,6 +463,8 @@ _outSplitMerge(StringInfo str, const SplitMerge *node) WRITE_OID_ARRAY(hashFuncs, node->numHashAttrs); WRITE_NODE_FIELD(resultRelations); WRITE_NODE_FIELD(mergeActionLists); + WRITE_BOOL_FIELD(hasSplitUpdate); + WRITE_UINT_FIELD(rootResultRelation); _outPlanInfo(str, (Plan *) node); } diff --git a/src/backend/nodes/readfast.c b/src/backend/nodes/readfast.c index 3222f8187ab..ff3cb5eaddf 100644 --- a/src/backend/nodes/readfast.c +++ b/src/backend/nodes/readfast.c @@ -1167,6 +1167,8 @@ _readSplitMerge(void) READ_NODE_FIELD(resultRelations); READ_NODE_FIELD(mergeActionLists); + READ_BOOL_FIELD(hasSplitUpdate); + READ_UINT_FIELD(rootResultRelation); ReadCommonPlan(&local_node->plan); diff --git a/src/backend/optimizer/plan/createplan.c 
b/src/backend/optimizer/plan/createplan.c index ad0e41ae065..585c1385068 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -3188,7 +3188,7 @@ create_modifytable_plan(PlannerInfo *root, ModifyTablePath *best_path) * root->processed_tlist. The code to create the Split Update node * takes care to label junk columns correctly, instead. */ - if (!best_path->splitUpdate) + if (!best_path->splitUpdate && !root->merge_need_split_update) apply_tlist_labeling(subplan->targetlist, root->processed_tlist); } @@ -3669,6 +3669,7 @@ create_splitmerge_plan(PlannerInfo *root, SplitMergePath *path) int lastresno; Oid *hashFuncs; int i; + int nrels; // RelOptInfo *relOptInfo = root->simple_rel_array[linitial_int(path->resultRelations)]; @@ -3703,20 +3704,92 @@ create_splitmerge_plan(PlannerInfo *root, SplitMergePath *path) copy_generic_path_info(&splitmerge->plan, (Path *) path); - lc = list_head(subplan->targetlist); lastresno = 0; - /* Copy all attributes. */ - for (; lc != NULL; lc = lnext(subplan->targetlist, lc)) + if (path->hasSplitUpdate) { - TargetEntry *tle = (TargetEntry *) lfirst(lc); - TargetEntry *newtle; + /* + * When hasSplitUpdate, build a targetlist with target table columns + * as non-junk entries (positions 1..N), followed by all subplan + * entries as junk, plus a DMLAction junk column. + * + * This allows ModifyTable to use ExecGetInsertNewTuple to extract + * the target table columns for DML_INSERT tuples. + */ + int natts = resultDesc->natts; - newtle = makeTargetEntry(tle->expr, - ++lastresno, - tle->resname, - tle->resjunk); - splitmerge->plan.targetlist = lappend(splitmerge->plan.targetlist, newtle); + /* + * First: target table columns as non-junk. + * Use Var nodes (not Const) so that ExecInitInsertProjection + * on the Motion output can extract actual values at runtime. + * These Vars use a dummy varno (0) that set_splitmerge_tlist_references + * will convert to OUTER_VAR with varattno = tle->resno. 
+ */ + for (i = 0; i < natts; i++) + { + Form_pg_attribute attr = &resultDesc->attrs[i]; + TargetEntry *tle; + + if (attr->attisdropped) + { + Const *nullConst = makeConst(INT4OID, -1, InvalidOid, + sizeof(int32), (Datum) 0, + true, true); + tle = makeTargetEntry((Expr *) nullConst, + ++lastresno, + pstrdup(NameStr(attr->attname)), + false); + } + else + { + Var *var = makeVar(0, /* dummy varno, fixed in setrefs */ + (AttrNumber) (i + 1), + attr->atttypid, + attr->atttypmod, + attr->attcollation, + 0); + tle = makeTargetEntry((Expr *) var, + ++lastresno, + pstrdup(NameStr(attr->attname)), + false); + } + splitmerge->plan.targetlist = lappend(splitmerge->plan.targetlist, tle); + } + + /* Then: all subplan entries as junk */ + foreach(lc, subplan->targetlist) + { + TargetEntry *tle = (TargetEntry *) lfirst(lc); + TargetEntry *newtle; + + newtle = makeTargetEntry(tle->expr, + ++lastresno, + tle->resname, + true); /* mark as junk */ + splitmerge->plan.targetlist = lappend(splitmerge->plan.targetlist, newtle); + } + + /* Finally: DMLAction junk column */ + splitmerge->plan.targetlist = lappend(splitmerge->plan.targetlist, + makeTargetEntry((Expr *) makeNode(DMLActionExpr), + ++lastresno, + "DMLAction", + true)); + } + else + { + /* Without hasSplitUpdate: copy subplan targetlist as-is */ + foreach(lc, subplan->targetlist) + { + TargetEntry *tle = (TargetEntry *) lfirst(lc); + TargetEntry *newtle; + + newtle = makeTargetEntry(tle->expr, + ++lastresno, + tle->resname, + tle->resjunk); + splitmerge->plan.targetlist = lappend(splitmerge->plan.targetlist, newtle); + } } /* Look up the right hash functions for the hash expressions */ @@ -3737,6 +3810,50 @@ create_splitmerge_plan(PlannerInfo *root, SplitMergePath *path) splitmerge->hashFuncs = hashFuncs; splitmerge->numHashSegments = cdbpolicy->numsegments; + splitmerge->resultRelations = path->resultRelations; + splitmerge->hasSplitUpdate = path->hasSplitUpdate; + splitmerge->rootResultRelation = relOptInfo->relid; + + /* + * 
Always use the root table's action lists (not per-partition adjusted + * lists). SplitMerge's hashAttnos use root table attribute numbers + * (from cdbpolicy->attrs), so the INSERT projection must produce values + * in root table column order for evalHashKey to read the correct columns. + * Per-partition lists from adjust_appendrel_attrs_multilevel reorder + * resnos to match child partition layout, which would cause hash + * computation on wrong columns for partitions with reordered columns. + */ + nrels = list_length(path->resultRelations); + splitmerge->mergeActionLists = NIL; + for (i = 0; i < nrels; i++) + splitmerge->mergeActionLists = lappend(splitmerge->mergeActionLists, + copyObject(root->parse->mergeActionList)); + + /* + * For split-update MERGE, expand UPDATE action targetlists to include + * all target table columns (not just the SET columns). SplitMerge needs + * complete rows to project INSERT tuples. Uses root table RTI for Vars + * so they match the subplan output. + */ + if (path->hasSplitUpdate) + { + ListCell *lca; + foreach(lca, splitmerge->mergeActionLists) + { + List *actionList = lfirst(lca); + ListCell *lc2; + foreach(lc2, actionList) + { + MergeAction *action = (MergeAction *) lfirst(lc2); + if (action->commandType == CMD_UPDATE) + action->targetList = expand_insert_targetlist(root, + action->targetList, + resultRel, + relOptInfo->relid); + } + } + } + relation_close(resultRel, NoLock); /* @@ -3745,9 +3862,6 @@ create_splitmerge_plan(PlannerInfo *root, SplitMergePath *path) */ root->numMotions++; - splitmerge->mergeActionLists = path->mergeActionLists; - splitmerge->resultRelations = path->resultRelations; - return (Plan *) splitmerge; } @@ -8908,7 +9022,7 @@ cdbpathtoplan_create_motion_plan(PlannerInfo *root, * the DMLActionExpr column, so we cannot apply the * labeling here even if we wanted. 
*/ - if (!IsA(subplan, SplitUpdate)) + if (!IsA(subplan, SplitUpdate) && !IsA(subplan, SplitMerge)) apply_tlist_labeling(subplan->targetlist, root->processed_tlist); segmentid_tle = find_junk_tle(subplan->targetlist, "gp_segment_id"); diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index 67d03390c61..99c5b1372a6 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -186,6 +186,7 @@ static Node *convert_combining_aggrefs(Node *node, void *context); static Node *convert_deduplicated_aggrefs(Node *node, void *context); static void set_dummy_tlist_references(Plan *plan, int rtoffset); static void set_splitupdate_tlist_references(Plan *plan, int rtoffset); +static void set_splitmerge_tlist_references(Plan *plan, int rtoffset); static indexed_tlist *build_tlist_index(List *tlist); static Var *search_indexed_tlist_for_var(Var *var, indexed_tlist *itlist, @@ -1630,8 +1631,61 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) set_splitupdate_tlist_references(plan, rtoffset); break; case T_SplitMerge: - /* mergeActionLists will be process in T_ModifyTable */ - set_dummy_tlist_references(plan, rtoffset); + { + SplitMerge *sm = (SplitMerge *) plan; + Plan *childplan = plan->lefttree; + + set_splitmerge_tlist_references(plan, rtoffset); + + /* + * Fix SplitMerge's own mergeActionLists to reference the + * child plan's output positions. This is separate from + * ModifyTable's mergeActionLists which reference the + * Motion output. + */ + if (sm->mergeActionLists != NIL && childplan != NULL) + { + indexed_tlist *itlist; + ListCell *lca; + + itlist = build_tlist_index(childplan->targetlist); + + foreach(lca, sm->mergeActionLists) + { + List *mergeActionList = lfirst(lca); + ListCell *lc2; + + /* + * SplitMerge always uses root table action lists, + * so action Vars reference root table attributes. + * Use root table RTI as acceptable_rel. 
+ */ + Index acceptable_rel = sm->rootResultRelation; + + foreach(lc2, mergeActionList) + { + MergeAction *action = (MergeAction *) lfirst(lc2); + + action->targetList = fix_join_expr(root, + action->targetList, + NULL, itlist, + acceptable_rel, + rtoffset, + NRM_EQUAL, + NUM_EXEC_TLIST(plan)); + + action->qual = (Node *) fix_join_expr(root, + (List *) action->qual, + NULL, itlist, + acceptable_rel, + rtoffset, + NRM_EQUAL, + NUM_EXEC_QUAL(plan)); + } + } + pfree(itlist); + } + } break; default: elog(ERROR, "unrecognized node type: %d", @@ -3185,7 +3239,91 @@ set_splitupdate_tlist_references(Plan *plan, int rtoffset) /* We don't touch plan->qual here */ } +/* + * set_splitmerge_tlist_references + * Like set_splitupdate_tlist_references, but handles the case where + * hasSplitUpdate prepends N target-table columns before the child plan's + * columns. Non-junk entries (positions 1..N) become OUTER_VAR references + * to themselves (tle->resno), allowing the Motion node to pass them + * through and ModifyTable to extract them via projection. + * Junk entries (positions N+1..N+M) become OUTER_VAR references to + * the child plan's output positions (tracked via child_resno). + * Const entries (dropped columns) and DMLActionExpr are kept as-is. + */ +static void +set_splitmerge_tlist_references(Plan *plan, int rtoffset) +{ + List *output_targetlist; + ListCell *l; + int child_resno = 0; + output_targetlist = NIL; + foreach(l, plan->targetlist) + { + TargetEntry *tle = (TargetEntry *) lfirst(l); + Var *oldvar = (Var *) tle->expr; + Var *newvar; + + if (IsA(tle->expr, DMLActionExpr)) + { + output_targetlist = lappend(output_targetlist, tle); + continue; + } + else if (IsA(tle->expr, Const)) + { + /* Dropped column placeholder - keep as Const */ + output_targetlist = lappend(output_targetlist, tle); + continue; + } + + if (!tle->resjunk) + { + /* + * Non-junk entry (target table column, position 1..N). + * Create OUTER_VAR reference to tle->resno (self-referencing). 
+ * At runtime, SplitMerge fills these with actual INSERT values. + * The Motion node passes them through, and ModifyTable extracts them. + */ + newvar = makeVar(OUTER_VAR, + tle->resno, + exprType((Node *) oldvar), + exprTypmod((Node *) oldvar), + exprCollation((Node *) oldvar), + 0); + newvar->varnosyn = 0; + newvar->varattnosyn = 0; + } + else + { + /* + * Junk entry (subplan column, position N+1..N+M). + * Create OUTER_VAR reference to the child plan's output position. + */ + child_resno++; + newvar = makeVar(OUTER_VAR, + child_resno, + exprType((Node *) oldvar), + exprTypmod((Node *) oldvar), + exprCollation((Node *) oldvar), + 0); + if (IsA(oldvar, Var)) + { + newvar->varnosyn = oldvar->varnosyn + rtoffset; + newvar->varattnosyn = oldvar->varattnosyn; + } + else + { + newvar->varnosyn = 0; + newvar->varattnosyn = 0; + } + } + + tle = flatCopyTargetEntry(tle); + tle->expr = (Expr *) newvar; + output_targetlist = lappend(output_targetlist, tle); + } + plan->targetlist = output_targetlist; +} /* * build_tlist_index --- build an index data structure for a child tlist diff --git a/src/backend/optimizer/prep/preptlist.c b/src/backend/optimizer/prep/preptlist.c index ca2dd95f284..5625cae3854 100644 --- a/src/backend/optimizer/prep/preptlist.c +++ b/src/backend/optimizer/prep/preptlist.c @@ -60,7 +60,7 @@ static List *supplement_simply_updatable_targetlist(PlannerInfo *root, List *range_table, List *tlist); -static List *expand_insert_targetlist(PlannerInfo *root, List *tlist, Relation rel, Index split_update_result_relation); +List *expand_insert_targetlist(PlannerInfo *root, List *tlist, Relation rel, Index split_update_result_relation); @@ -143,15 +143,48 @@ preprocess_targetlist(PlannerInfo *root) } else if (command_type == CMD_MERGE) { - /* update distributed column in merge is not supported now */ + /* Check if any MERGE UPDATE action modifies distribution key columns */ foreach(lc, parse->mergeActionList) { MergeAction *action = lfirst(lc); - 
if(action->commandType == CMD_UPDATE) + if (action->commandType == CMD_UPDATE && + check_splitupdate(action->targetList, result_relation, target_relation)) { - if(check_splitupdate(action->targetList, result_relation, target_relation)) - ereport(ERROR, (errcode(ERRCODE_GP_FEATURE_NOT_YET), - errmsg("cannot update column in merge with distributed column"))); + root->merge_need_split_update = true; + break; + } + } + + /* + * When merge_need_split_update, the SplitMerge node needs all target + * table columns in the subplan output so that the UPDATE projection + * can read unchanged columns from the old row. Add all non-dropped + * user columns of the target table to the subplan targetlist. + */ + if (root->merge_need_split_update) + { + int natts = RelationGetNumberOfAttributes(target_relation); + + for (int attno = 1; attno <= natts; attno++) + { + Form_pg_attribute att = TupleDescAttr(target_relation->rd_att, + attno - 1); + Var *var; + + if (att->attisdropped) + continue; + + var = makeVar(result_relation, attno, + att->atttypid, att->atttypmod, + att->attcollation, 0); + + if (tlist_member((Expr *) var, tlist)) + continue; /* already present */ + + tlist = lappend(tlist, + makeTargetEntry((Expr *) var, + list_length(tlist) + 1, + NULL, true)); } } } @@ -225,8 +258,9 @@ preprocess_targetlist(PlannerInfo *root) Var *var = (Var *) lfirst(l2); TargetEntry *tle; - if (IsA(var, Var) && var->varno == result_relation) - continue; /* don't need it */ + if (IsA(var, Var) && var->varno == result_relation && + !root->merge_need_split_update) + continue; /* don't need it unless split update */ if (tlist_member((Expr *) var, tlist)) continue; /* already got it */ @@ -411,7 +445,7 @@ extract_update_targetlist_colnos(List *tlist, bool reorder_resno) * Once upon a time we also did more or less this with UPDATE targetlists, * but now this code is only applied to INSERT targetlists. 
*/ -static List * +List * expand_insert_targetlist(PlannerInfo *root, List *tlist, Relation rel, Index split_update_result_relation) { List *new_tlist = NIL; diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index d6b96ebe88e..ce0c00347be 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -3413,6 +3413,13 @@ typedef struct SplitMergeState AttrNumber mt_resultOidAttno; + /* Fields for split update in MERGE */ + TupleTableSlot *insertTuple; /* pending INSERT tuple for split update */ + TupleTableSlot *deleteTuple; /* DELETE tuple for split update */ + bool processInsert; /* true = next call returns insertTuple */ + AttrNumber action_attno; /* attribute number of DMLAction column in output */ + int subplan_offset; /* number of target table columns prepended to output */ + } SplitMergeState; /* diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h index 9face599e2a..d5556d66c4b 100644 --- a/src/include/nodes/pathnodes.h +++ b/src/include/nodes/pathnodes.h @@ -663,6 +663,8 @@ struct PlannerInfo int upd_del_replicated_table; bool is_split_update; /* true if UPDATE that modifies * distribution key columns */ + bool merge_need_split_update; /* true if MERGE has UPDATE that + * modifies distribution key columns */ bool is_correlated_subplan; /* true for correlated subqueries nested within subplans */ int numPureOrderedAggs; /* CDB: number that use ORDER BY/WITHIN GROUP, not counting DISTINCT */ @@ -2808,6 +2810,7 @@ typedef struct SplitMergePath List *resultRelations; List *mergeActionLists; /* per-target-table lists of actions for * MERGE */ + bool hasSplitUpdate; /* true if UPDATE modifies distribution key */ } SplitMergePath; /* diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 00ebe7f493e..f759a976ec6 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -1956,6 +1956,8 @@ typedef struct SplitMerge List *mergeActionLists; /* 
per-target-table lists of actions for * MERGE */ + bool hasSplitUpdate; /* true if UPDATE modifies distribution key */ + Index rootResultRelation; /* root table RTI for partitioned tables */ } SplitMerge; /* diff --git a/src/include/optimizer/prep.h b/src/include/optimizer/prep.h index 0526d7c0526..dcc247f6256 100644 --- a/src/include/optimizer/prep.h +++ b/src/include/optimizer/prep.h @@ -18,6 +18,7 @@ #include "nodes/pathnodes.h" #include "nodes/plannodes.h" +#include "utils/relcache.h" /* @@ -44,6 +45,10 @@ extern void preprocess_targetlist(PlannerInfo *root); extern List *extract_update_targetlist_colnos(List *tlist, bool reorder_resno); +extern List *expand_insert_targetlist(PlannerInfo *root, List *tlist, + Relation rel, + Index split_update_result_relation); + extern PlanRowMark *get_plan_rowmark(List *rowmarks, Index rtindex); /* diff --git a/src/test/regress/expected/merge.out b/src/test/regress/expected/merge.out index d6568e4b271..18a8a6a3eb0 100644 --- a/src/test/regress/expected/merge.out +++ b/src/test/regress/expected/merge.out @@ -1847,7 +1847,7 @@ MERGE INTO pa_target t ON t.tid = s.sid AND t.tid IN (1,2,3,4) WHEN MATCHED THEN UPDATE SET tid = tid - 1; -ERROR: cannot update column in merge with distributed column +ERROR: new row violates row-level security policy for table "pa_target" ROLLBACK; DROP TABLE pa_source; DROP TABLE pa_target CASCADE; @@ -2233,6 +2233,259 @@ DROP TABLE test1; DROP TABLE target, target2; DROP TABLE source, source2; DROP FUNCTION merge_trigfunc(); +-- Test MERGE with distribution key updates (split-update) +-- These tests verify that MERGE ... WHEN MATCHED THEN UPDATE SET dist_col = ... +-- works correctly by using the SplitMerge mechanism (DELETE old + INSERT new). 
+-- Basic: update distribution key with constant +CREATE TABLE merge_dist_t (id int, val text) DISTRIBUTED BY (id); +CREATE TABLE merge_dist_s (id int, val text) DISTRIBUTED BY (id); +INSERT INTO merge_dist_t VALUES (1, 'old'); +INSERT INTO merge_dist_s VALUES (1, 'new'); +-- Check segment before merge +SELECT gp_segment_id, * FROM merge_dist_t ORDER BY id; + gp_segment_id | id | val +---------------+----+----- + 1 | 1 | old +(1 row) + +MERGE INTO merge_dist_t t USING merge_dist_s s ON t.id = s.id +WHEN MATCHED THEN UPDATE SET id = 101, val = s.val; +-- Check segment after merge: should match direct INSERT of 101 +SELECT gp_segment_id, * FROM merge_dist_t ORDER BY id; + gp_segment_id | id | val +---------------+-----+----- + 0 | 101 | new +(1 row) + +INSERT INTO merge_dist_t VALUES (101, 'direct'); +SELECT gp_segment_id, * FROM merge_dist_t WHERE id = 101 ORDER BY val; + gp_segment_id | id | val +---------------+-----+-------- + 0 | 101 | direct + 0 | 101 | new +(2 rows) + +DELETE FROM merge_dist_t WHERE val = 'direct'; +-- EXPLAIN VERBOSE: SplitMerge plan for simple table dist key update +EXPLAIN (VERBOSE, COSTS OFF) +MERGE INTO merge_dist_t t USING merge_dist_s s ON t.id = s.id +WHEN MATCHED THEN UPDATE SET id = t.id + 100, val = s.val +WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.val); + QUERY PLAN +------------------------------------------------------------------------------------------------------------------- + Merge on public.merge_dist_t t + -> Explicit Redistribute Motion 3:3 (slice1; segments: 3) + Output: t.id, t.val, t.id, t.val, t.ctid, t.gp_segment_id, s.val, s.id, s.ctid, (DMLAction) + -> Split Merge + Output: t.id, t.val, t.id, t.val, t.ctid, t.gp_segment_id, s.val, s.id, s.ctid, DMLAction + -> Hash Left Join + Output: t.id, t.val, t.ctid, t.gp_segment_id, s.val, s.id, s.ctid + Hash Cond: (s.id = t.id) + -> Seq Scan on public.merge_dist_s s + Output: s.val, s.id, s.ctid + -> Hash + Output: t.id, t.val, t.ctid, t.gp_segment_id + -> Seq Scan on 
public.merge_dist_t t + Output: t.id, t.val, t.ctid, t.gp_segment_id + Optimizer: Postgres query optimizer +(15 rows) + +-- Update distribution key with expression referencing old row +TRUNCATE merge_dist_t, merge_dist_s; +INSERT INTO merge_dist_t VALUES (1, 'a'), (2, 'b'); +INSERT INTO merge_dist_s VALUES (1, 'x'), (2, 'y'); +MERGE INTO merge_dist_t t USING merge_dist_s s ON t.id = s.id +WHEN MATCHED THEN UPDATE SET id = t.id + 100, val = s.val; +SELECT * FROM merge_dist_t ORDER BY id; + id | val +-----+----- + 101 | x + 102 | y +(2 rows) + +-- Mixed: MATCHED update dist key + NOT MATCHED insert +TRUNCATE merge_dist_t, merge_dist_s; +INSERT INTO merge_dist_t VALUES (1, 'old'); +INSERT INTO merge_dist_s VALUES (1, 'new'), (2, 'two'); +MERGE INTO merge_dist_t t USING merge_dist_s s ON t.id = s.id +WHEN MATCHED THEN UPDATE SET id = t.id + 100, val = s.val +WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.val); +SELECT * FROM merge_dist_t ORDER BY id; + id | val +-----+----- + 2 | two + 101 | new +(2 rows) + +-- Multi-column distribution key: only update one dist column +CREATE TABLE merge_dist_mc (a int, b int, val text) DISTRIBUTED BY (a, b); +CREATE TABLE merge_dist_mc_s (a int, b int, val text) DISTRIBUTED BY (a); +INSERT INTO merge_dist_mc VALUES (1, 1, 'old'); +INSERT INTO merge_dist_mc_s VALUES (1, 1, 'new'); +MERGE INTO merge_dist_mc t USING merge_dist_mc_s s ON t.a = s.a AND t.b = s.b +WHEN MATCHED THEN UPDATE SET a = t.a + 100, val = s.val; +SELECT * FROM merge_dist_mc ORDER BY a; + a | b | val +-----+---+----- + 101 | 1 | new +(1 row) + +-- Conditional WHEN: first clause updates dist key, second does not +TRUNCATE merge_dist_t, merge_dist_s; +INSERT INTO merge_dist_t VALUES (1, 'one'), (2, 'two'); +INSERT INTO merge_dist_s VALUES (1, 'new1'), (2, 'new2'); +MERGE INTO merge_dist_t t USING merge_dist_s s ON t.id = s.id +WHEN MATCHED AND t.id = 1 THEN UPDATE SET id = t.id + 100, val = s.val +WHEN MATCHED THEN UPDATE SET val = s.val; +SELECT * FROM merge_dist_t 
ORDER BY id; + id | val +-----+------ + 2 | new2 + 101 | new1 +(2 rows) + +-- Normal MERGE still works alongside split-update tests +TRUNCATE merge_dist_t, merge_dist_s; +INSERT INTO merge_dist_t VALUES (1, 'old'); +INSERT INTO merge_dist_s VALUES (1, 'updated'), (2, 'inserted'); +MERGE INTO merge_dist_t t USING merge_dist_s s ON t.id = s.id +WHEN MATCHED THEN UPDATE SET val = s.val +WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.val); +SELECT * FROM merge_dist_t ORDER BY id; + id | val +----+---------- + 1 | updated + 2 | inserted +(2 rows) + +DROP TABLE merge_dist_t, merge_dist_s; +DROP TABLE merge_dist_mc, merge_dist_mc_s; +-- Partitioned table: MERGE with distribution key update +CREATE TABLE merge_dist_pa_s (sid integer, delta float) DISTRIBUTED BY (sid); +INSERT INTO merge_dist_pa_s VALUES (1, 5), (2, 10), (4, 30); +CREATE TABLE merge_dist_pa_t (tid integer, balance float, val text) + PARTITION BY LIST (tid) DISTRIBUTED BY (tid); +CREATE TABLE merge_dist_pa_p1 (tid integer, balance float, val text) DISTRIBUTED BY (tid); +CREATE TABLE merge_dist_pa_p2 (balance float, tid integer, val text) DISTRIBUTED BY (tid); +CREATE TABLE merge_dist_pa_p4 (extraid text, tid integer, balance float, val text) DISTRIBUTED BY (tid); +ALTER TABLE merge_dist_pa_p4 DROP COLUMN extraid; +ALTER TABLE merge_dist_pa_t ATTACH PARTITION merge_dist_pa_p1 FOR VALUES IN (0, 1); +ALTER TABLE merge_dist_pa_t ATTACH PARTITION merge_dist_pa_p2 FOR VALUES IN (2, 3); +ALTER TABLE merge_dist_pa_t ATTACH PARTITION merge_dist_pa_p4 FOR VALUES IN (4, 5); +INSERT INTO merge_dist_pa_t VALUES (1, 100, 'p1'), (2, 200, 'p2'), (4, 400, 'p4'); +-- EXPLAIN VERBOSE: SplitMerge plan for partitioned table dist key update +EXPLAIN (VERBOSE, COSTS OFF) +MERGE INTO merge_dist_pa_t t USING merge_dist_pa_s s ON t.tid = s.sid +WHEN MATCHED THEN UPDATE SET tid = tid - 1 +WHEN NOT MATCHED THEN INSERT VALUES (s.sid, s.delta, 'new'); + QUERY PLAN 
+--------------------------------------------------------------------------------------------------------------------------------------------------------- + Merge on public.merge_dist_pa_t t + Merge on public.merge_dist_pa_p1 t_1 + Merge on public.merge_dist_pa_p2 t_2 + Merge on public.merge_dist_pa_p4 t_3 + -> Explicit Redistribute Motion 3:3 (slice1; segments: 3) + Output: t.tid, t.balance, t.val, t.tid, t.balance, t.val, s.sid, s.delta, s.ctid, t.tableoid, t.ctid, t.gp_segment_id, (DMLAction) + -> Split Merge + Output: t.tid, t.balance, t.val, t.tid, t.balance, t.val, s.sid, s.delta, s.ctid, t.tableoid, t.ctid, t.gp_segment_id, DMLAction + -> Hash Right Join + Output: t.tid, t.balance, t.val, s.sid, s.delta, s.ctid, t.tableoid, t.ctid, t.gp_segment_id + Hash Cond: (t.tid = s.sid) + -> Append + Partition Selectors: $1 + -> Seq Scan on public.merge_dist_pa_p1 t_1 + Output: t_1.tid, t_1.balance, t_1.val, t_1.tableoid, t_1.ctid, t_1.gp_segment_id + -> Seq Scan on public.merge_dist_pa_p2 t_2 + Output: t_2.tid, t_2.balance, t_2.val, t_2.tableoid, t_2.ctid, t_2.gp_segment_id + -> Seq Scan on public.merge_dist_pa_p4 t_3 + Output: t_3.tid, t_3.balance, t_3.val, t_3.tableoid, t_3.ctid, t_3.gp_segment_id + -> Hash + Output: s.sid, s.delta, s.ctid + -> Partition Selector (selector id: $1) + Output: s.sid, s.delta, s.ctid + -> Seq Scan on public.merge_dist_pa_s s + Output: s.sid, s.delta, s.ctid + Optimizer: Postgres query optimizer +(26 rows) + +-- Update distribution key on partitioned table with reordered/dropped columns +MERGE INTO merge_dist_pa_t t USING merge_dist_pa_s s ON t.tid = s.sid +WHEN MATCHED THEN UPDATE SET tid = tid - 1; +SELECT gp_segment_id, tableoid::regclass, * FROM merge_dist_pa_t ORDER BY tid; + gp_segment_id | tableoid | tid | balance | val +---------------+------------------+-----+---------+----- + 1 | merge_dist_pa_p1 | 0 | 100 | p1 + 1 | merge_dist_pa_p1 | 1 | 200 | p2 + 0 | merge_dist_pa_p2 | 3 | 400 | p4 +(3 rows) + +-- Verify segment placement 
matches direct INSERT +CREATE TABLE merge_dist_pa_verify (tid integer, balance float, val text) DISTRIBUTED BY (tid); +INSERT INTO merge_dist_pa_verify VALUES (0, 100, 'p1'), (1, 200, 'p2'), (3, 400, 'p4'); +SELECT gp_segment_id, * FROM merge_dist_pa_verify ORDER BY tid; + gp_segment_id | tid | balance | val +---------------+-----+---------+----- + 1 | 0 | 100 | p1 + 1 | 1 | 200 | p2 + 0 | 3 | 400 | p4 +(3 rows) + +DROP TABLE merge_dist_pa_verify; +-- Mixed: dist key update + insert on partitioned table +TRUNCATE merge_dist_pa_t; +INSERT INTO merge_dist_pa_t VALUES (1, 100, 'p1'), (2, 200, 'p2'); +MERGE INTO merge_dist_pa_t t USING merge_dist_pa_s s ON t.tid = s.sid +WHEN MATCHED THEN UPDATE SET tid = tid - 1 +WHEN NOT MATCHED THEN INSERT VALUES (s.sid, s.delta, 'new'); +SELECT * FROM merge_dist_pa_t ORDER BY tid; +NOTICE: One or more columns in the following table(s) do not have statistics: merge_dist_pa_t +HINT: For non-partitioned tables, run analyze (). For partitioned tables, run analyze rootpartition (). See log for columns missing statistics. 
+ tid | balance | val +-----+---------+----- + 0 | 100 | p1 + 1 | 200 | p2 + 4 | 30 | new +(3 rows) + +DROP TABLE merge_dist_pa_t CASCADE; +DROP TABLE merge_dist_pa_s; +-- MERGE with dist key update + INSERT trigger (not UPDATE trigger) +-- INSERT triggers should fire; UPDATE triggers would block split merge +CREATE TABLE merge_trig_dist (id int, val text) DISTRIBUTED BY (id); +CREATE TABLE merge_trig_dist_s (id int, val text) DISTRIBUTED BY (id); +CREATE OR REPLACE FUNCTION merge_trig_dist_fn() RETURNS trigger AS $$ +BEGIN + RAISE NOTICE '% trigger on %, id=%, val=%', TG_OP, TG_TABLE_NAME, NEW.id, NEW.val; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; +-- Only INSERT trigger, no UPDATE trigger — split merge is allowed +CREATE TRIGGER merge_trig_bi BEFORE INSERT ON merge_trig_dist + FOR EACH ROW EXECUTE FUNCTION merge_trig_dist_fn(); +INSERT INTO merge_trig_dist VALUES (1, 'old'); +NOTICE: INSERT trigger on merge_trig_dist, id=1, val=old +INSERT INTO merge_trig_dist_s VALUES (1, 'new'), (2, 'two'); +-- dist key update + insert, INSERT trigger should fire for new rows +MERGE INTO merge_trig_dist t USING merge_trig_dist_s s ON t.id = s.id +WHEN MATCHED THEN UPDATE SET id = t.id + 100, val = s.val +WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.val); +NOTICE: INSERT trigger on merge_trig_dist, id=2, val=two +SELECT * FROM merge_trig_dist ORDER BY id; + id | val +-----+----- + 2 | two + 101 | new +(2 rows) + +-- Now add UPDATE trigger — split merge should be blocked +CREATE TRIGGER merge_trig_bu BEFORE UPDATE ON merge_trig_dist + FOR EACH ROW EXECUTE FUNCTION merge_trig_dist_fn(); +MERGE INTO merge_trig_dist t USING merge_trig_dist_s s ON t.id = s.id +WHEN MATCHED THEN UPDATE SET id = t.id + 100, val = s.val +WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.val); +ERROR: UPDATE on distributed key column not allowed on relation with update triggers +DROP TABLE merge_trig_dist, merge_trig_dist_s; +DROP FUNCTION merge_trig_dist_fn; REVOKE ALL ON SCHEMA public FROM 
regress_merge_privs; DROP USER regress_merge_privs; DROP USER regress_merge_no_privs; diff --git a/src/test/regress/expected/privileges.out b/src/test/regress/expected/privileges.out index 6412e900473..4de1b96726e 100644 --- a/src/test/regress/expected/privileges.out +++ b/src/test/regress/expected/privileges.out @@ -1023,7 +1023,7 @@ WHEN MATCHED THEN UPDATE SET b = s.b, a = t.a + 1 WHEN NOT MATCHED THEN INSERT VALUES (a, b); -ERROR: cannot update column in merge with distributed column +ERROR: permission denied for table mtarget -- fail (no SELECT on t.b) MERGE INTO mtarget t USING msource s ON t.a = s.a WHEN MATCHED AND t.b IS NOT NULL THEN diff --git a/src/test/regress/expected/triggers.out b/src/test/regress/expected/triggers.out index dceeca83b42..cfe30213241 100644 --- a/src/test/regress/expected/triggers.out +++ b/src/test/regress/expected/triggers.out @@ -2217,7 +2217,7 @@ ERROR: UPDATE on distributed key column not allowed on relation with update tri -- update action in merge should behave the same merge into parted_trig using (select 1) as ss on true when matched and a = 2 then update set a = 1; -ERROR: cannot update column in merge with distributed column +ERROR: UPDATE on distributed key column not allowed on relation with update triggers drop table parted_trig; -- Verify propagation of trigger arguments to partitions create table parted_trig (a int) partition by list (a); diff --git a/src/test/regress/sql/merge.sql b/src/test/regress/sql/merge.sql index 4e99ef36bfd..a22a0805ec7 100644 --- a/src/test/regress/sql/merge.sql +++ b/src/test/regress/sql/merge.sql @@ -1555,6 +1555,158 @@ DROP TABLE test1; DROP TABLE target, target2; DROP TABLE source, source2; DROP FUNCTION merge_trigfunc(); + +-- Test MERGE with distribution key updates (split-update) +-- These tests verify that MERGE ... WHEN MATCHED THEN UPDATE SET dist_col = ... +-- works correctly by using the SplitMerge mechanism (DELETE old + INSERT new). 
+
+-- Tests for MERGE that updates the distribution key: per the planner change,
+-- such a MERGE needs a split merge so the UPDATE is handled as DELETE+INSERT
+-- and the new tuple is redistributed to the correct segment.
+-- Basic: update distribution key with constant
+CREATE TABLE merge_dist_t (id int, val text) DISTRIBUTED BY (id);
+CREATE TABLE merge_dist_s (id int, val text) DISTRIBUTED BY (id);
+INSERT INTO merge_dist_t VALUES (1, 'old');
+INSERT INTO merge_dist_s VALUES (1, 'new');
+-- Check segment before merge
+SELECT gp_segment_id, * FROM merge_dist_t ORDER BY id;
+MERGE INTO merge_dist_t t USING merge_dist_s s ON t.id = s.id
+WHEN MATCHED THEN UPDATE SET id = 101, val = s.val;
+-- Check segment after merge: should match direct INSERT of 101
+SELECT gp_segment_id, * FROM merge_dist_t ORDER BY id;
+INSERT INTO merge_dist_t VALUES (101, 'direct');
+SELECT gp_segment_id, * FROM merge_dist_t WHERE id = 101 ORDER BY val;
+DELETE FROM merge_dist_t WHERE val = 'direct';
+
+-- EXPLAIN VERBOSE: SplitMerge plan for simple table dist key update
+EXPLAIN (VERBOSE, COSTS OFF)
+MERGE INTO merge_dist_t t USING merge_dist_s s ON t.id = s.id
+WHEN MATCHED THEN UPDATE SET id = t.id + 100, val = s.val
+WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.val);
+
+-- Update distribution key with expression referencing old row
+TRUNCATE merge_dist_t, merge_dist_s;
+INSERT INTO merge_dist_t VALUES (1, 'a'), (2, 'b');
+INSERT INTO merge_dist_s VALUES (1, 'x'), (2, 'y');
+MERGE INTO merge_dist_t t USING merge_dist_s s ON t.id = s.id
+WHEN MATCHED THEN UPDATE SET id = t.id + 100, val = s.val;
+SELECT * FROM merge_dist_t ORDER BY id;
+
+-- Mixed: MATCHED update dist key + NOT MATCHED insert
+TRUNCATE merge_dist_t, merge_dist_s;
+INSERT INTO merge_dist_t VALUES (1, 'old');
+INSERT INTO merge_dist_s VALUES (1, 'new'), (2, 'two');
+MERGE INTO merge_dist_t t USING merge_dist_s s ON t.id = s.id
+WHEN MATCHED THEN UPDATE SET id = t.id + 100, val = s.val
+WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.val);
+SELECT * FROM merge_dist_t ORDER BY id;
+
+-- Multi-column distribution key: only update one dist column
+-- NOTE(review): the source table is distributed by (a) only while the target
+-- uses (a, b), so source and target rows need not be co-located.
+CREATE TABLE merge_dist_mc (a int, b int, val text) DISTRIBUTED BY (a, b);
+CREATE TABLE merge_dist_mc_s (a int, b int, val text) DISTRIBUTED BY (a);
+INSERT INTO merge_dist_mc VALUES (1, 1, 'old');
+INSERT INTO merge_dist_mc_s VALUES (1, 1, 'new');
+MERGE INTO merge_dist_mc t USING merge_dist_mc_s s ON t.a = s.a AND t.b = s.b
+WHEN MATCHED THEN UPDATE SET a = t.a + 100, val = s.val;
+SELECT * FROM merge_dist_mc ORDER BY a;
+
+-- Conditional WHEN: first clause updates dist key, second does not
+TRUNCATE merge_dist_t, merge_dist_s;
+INSERT INTO merge_dist_t VALUES (1, 'one'), (2, 'two');
+INSERT INTO merge_dist_s VALUES (1, 'new1'), (2, 'new2');
+MERGE INTO merge_dist_t t USING merge_dist_s s ON t.id = s.id
+WHEN MATCHED AND t.id = 1 THEN UPDATE SET id = t.id + 100, val = s.val
+WHEN MATCHED THEN UPDATE SET val = s.val;
+SELECT * FROM merge_dist_t ORDER BY id;
+
+-- Normal MERGE still works alongside split-update tests
+TRUNCATE merge_dist_t, merge_dist_s;
+INSERT INTO merge_dist_t VALUES (1, 'old');
+INSERT INTO merge_dist_s VALUES (1, 'updated'), (2, 'inserted');
+MERGE INTO merge_dist_t t USING merge_dist_s s ON t.id = s.id
+WHEN MATCHED THEN UPDATE SET val = s.val
+WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.val);
+SELECT * FROM merge_dist_t ORDER BY id;
+
+DROP TABLE merge_dist_t, merge_dist_s;
+DROP TABLE merge_dist_mc, merge_dist_mc_s;
+
+-- Partitioned table: MERGE with distribution key update
+CREATE TABLE merge_dist_pa_s (sid integer, delta float) DISTRIBUTED BY (sid);
+INSERT INTO merge_dist_pa_s VALUES (1, 5), (2, 10), (4, 30);
+
+-- Child partitions deliberately differ in physical layout: p2 reorders
+-- columns and p4 has a dropped column, to exercise tuple remapping.
+CREATE TABLE merge_dist_pa_t (tid integer, balance float, val text)
+  PARTITION BY LIST (tid) DISTRIBUTED BY (tid);
+CREATE TABLE merge_dist_pa_p1 (tid integer, balance float, val text) DISTRIBUTED BY (tid);
+CREATE TABLE merge_dist_pa_p2 (balance float, tid integer, val text) DISTRIBUTED BY (tid);
+CREATE TABLE merge_dist_pa_p4 (extraid text, tid integer, balance float, val text) DISTRIBUTED BY (tid);
+ALTER TABLE merge_dist_pa_p4 DROP COLUMN extraid;
+
+ALTER TABLE merge_dist_pa_t ATTACH PARTITION merge_dist_pa_p1 FOR VALUES IN (0, 1);
+ALTER TABLE merge_dist_pa_t ATTACH PARTITION merge_dist_pa_p2 FOR VALUES IN (2, 3);
+ALTER TABLE merge_dist_pa_t ATTACH PARTITION merge_dist_pa_p4 FOR VALUES IN (4, 5);
+
+INSERT INTO merge_dist_pa_t VALUES (1, 100, 'p1'), (2, 200, 'p2'), (4, 400, 'p4');
+
+-- EXPLAIN VERBOSE: SplitMerge plan for partitioned table dist key update
+EXPLAIN (VERBOSE, COSTS OFF)
+MERGE INTO merge_dist_pa_t t USING merge_dist_pa_s s ON t.tid = s.sid
+WHEN MATCHED THEN UPDATE SET tid = tid - 1
+WHEN NOT MATCHED THEN INSERT VALUES (s.sid, s.delta, 'new');
+
+-- Update distribution key on partitioned table with reordered/dropped columns
+MERGE INTO merge_dist_pa_t t USING merge_dist_pa_s s ON t.tid = s.sid
+WHEN MATCHED THEN UPDATE SET tid = tid - 1;
+SELECT gp_segment_id, tableoid::regclass, * FROM merge_dist_pa_t ORDER BY tid;
+-- Verify segment placement matches direct INSERT
+CREATE TABLE merge_dist_pa_verify (tid integer, balance float, val text) DISTRIBUTED BY (tid);
+INSERT INTO merge_dist_pa_verify VALUES (0, 100, 'p1'), (1, 200, 'p2'), (3, 400, 'p4');
+SELECT gp_segment_id, * FROM merge_dist_pa_verify ORDER BY tid;
+DROP TABLE merge_dist_pa_verify;
+
+-- Mixed: dist key update + insert on partitioned table
+TRUNCATE merge_dist_pa_t;
+INSERT INTO merge_dist_pa_t VALUES (1, 100, 'p1'), (2, 200, 'p2');
+MERGE INTO merge_dist_pa_t t USING merge_dist_pa_s s ON t.tid = s.sid
+WHEN MATCHED THEN UPDATE SET tid = tid - 1
+WHEN NOT MATCHED THEN INSERT VALUES (s.sid, s.delta, 'new');
+SELECT * FROM merge_dist_pa_t ORDER BY tid;
+
+DROP TABLE merge_dist_pa_t CASCADE;
+DROP TABLE merge_dist_pa_s;
+
+-- MERGE with dist key update + INSERT trigger (not UPDATE trigger)
+-- INSERT triggers should fire; UPDATE triggers would block split merge
+CREATE TABLE merge_trig_dist (id int, val text) DISTRIBUTED BY (id);
+CREATE TABLE merge_trig_dist_s (id int, val text) DISTRIBUTED BY (id);
+CREATE OR REPLACE FUNCTION merge_trig_dist_fn() RETURNS trigger AS $$
+BEGIN
+  RAISE NOTICE '% trigger on %, id=%, val=%', TG_OP, TG_TABLE_NAME, NEW.id, NEW.val;
+  RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+
+-- Only INSERT trigger, no UPDATE trigger — split merge is allowed
+CREATE TRIGGER merge_trig_bi BEFORE INSERT ON merge_trig_dist
+  FOR EACH ROW EXECUTE FUNCTION merge_trig_dist_fn();
+
+INSERT INTO merge_trig_dist VALUES (1, 'old');
+INSERT INTO merge_trig_dist_s VALUES (1, 'new'), (2, 'two');
+
+-- dist key update + insert, INSERT trigger should fire for new rows
+MERGE INTO merge_trig_dist t USING merge_trig_dist_s s ON t.id = s.id
+WHEN MATCHED THEN UPDATE SET id = t.id + 100, val = s.val
+WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.val);
+SELECT * FROM merge_trig_dist ORDER BY id;
+
+-- Now add UPDATE trigger — split merge should be blocked
+-- NOTE(review): presumably this MERGE raises an error (same restriction as
+-- SplitUpdate per the planner comment) — confirm against the expected output.
+CREATE TRIGGER merge_trig_bu BEFORE UPDATE ON merge_trig_dist
+  FOR EACH ROW EXECUTE FUNCTION merge_trig_dist_fn();
+
+MERGE INTO merge_trig_dist t USING merge_trig_dist_s s ON t.id = s.id
+WHEN MATCHED THEN UPDATE SET id = t.id + 100, val = s.val
+WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.val);
+
+DROP TABLE merge_trig_dist, merge_trig_dist_s;
+DROP FUNCTION merge_trig_dist_fn;
+
 REVOKE ALL ON SCHEMA public FROM regress_merge_privs;
 DROP USER regress_merge_privs;
 DROP USER regress_merge_no_privs;