Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1018,7 +1018,7 @@ WHEN MATCHED THEN
UPDATE SET b = s.b, a = t.a + 1
WHEN NOT MATCHED THEN
INSERT VALUES (a, b);
ERROR: cannot update column in merge with distributed column
ERROR: permission denied for table mtarget
-- fail (no SELECT on t.b)
MERGE INTO mtarget t USING msource s ON t.a = s.a
WHEN MATCHED AND t.b IS NOT NULL THEN
Expand Down
39 changes: 32 additions & 7 deletions src/backend/cdb/cdbpath.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ static bool try_redistribute(PlannerInfo *root, CdbpathMfjRel *g,

static SplitUpdatePath *make_splitupdate_path(PlannerInfo *root, Path *subpath, Index rti);

static SplitMergePath *make_split_merge_path(PlannerInfo *root, Path *subpath, List* resultRelations, List *mergeActionLists);
static SplitMergePath *make_split_merge_path(PlannerInfo *root, Path *subpath, List* resultRelations, List *mergeActionLists, bool hasSplitUpdate);

static bool can_elide_explicit_motion(PlannerInfo *root, Index rti, Path *subpath, GpPolicy *policy);
/*
Expand Down Expand Up @@ -2798,19 +2798,22 @@ create_motion_path_for_merge(PlannerInfo *root, List *resultRelations, GpPolicy
{
/*
* If merge contain CMD_INSERT, we need split merge to let new
* insert tuple redistributed to correct segment. otherwise, we
* create motion as the same as update/delete in create_motion_path_for_upddel
* insert tuple redistributed to correct segment. If merge has
* UPDATE that modifies distribution key, we also need split merge
* to handle the DELETE+INSERT split.
*/
foreach(l, mergeActionLists)
{
List *mergeActionList = lfirst(l);
foreach(lc, mergeActionList)
foreach(lc, mergeActionList)
{
MergeAction *action = lfirst(lc);
if (action->commandType == CMD_INSERT)
need_split_merge = true;
}
}
if (root->merge_need_split_update)
need_split_merge = true;

if (need_split_merge)
{
Expand All @@ -2820,7 +2823,7 @@ create_motion_path_for_merge(PlannerInfo *root, List *resultRelations, GpPolicy
rel = build_simple_rel(root, linitial_int(resultRelations), NULL /*parent*/);
targetLocus = cdbpathlocus_from_baserel(root, rel, 0);

subpath = (Path *) make_split_merge_path(root, subpath, resultRelations, mergeActionLists);
subpath = (Path *) make_split_merge_path(root, subpath, resultRelations, mergeActionLists, root->merge_need_split_update);
subpath = cdbpath_create_explicit_motion_path(root,
subpath,
targetLocus);
Expand Down Expand Up @@ -2923,14 +2926,35 @@ turn_volatile_seggen_to_singleqe(PlannerInfo *root, Path *path, Node *node)
}

static SplitMergePath *
make_split_merge_path(PlannerInfo *root, Path *subpath, List *resultRelations, List *mergeActionLists)
make_split_merge_path(PlannerInfo *root, Path *subpath, List *resultRelations, List *mergeActionLists, bool hasSplitUpdate)
{
PathTarget *splitMergePathTarget;
SplitMergePath *splitmergepath;

splitMergePathTarget = copy_pathtarget(subpath->pathtarget);

/* populate information generated above into splitupdate node */
/*
* Same restriction as SplitUpdate: updating a distribution key
* is not allowed when the target relation has update triggers.
*/
if (hasSplitUpdate)
{
RangeTblEntry *rte = rt_fetch(root->parse->resultRelation,
root->parse->rtable);
if (has_update_triggers(rte->relid, true))
ereport(ERROR,
(errcode(ERRCODE_GP_FEATURE_NOT_YET),
errmsg("UPDATE on distributed key column not allowed on relation with update triggers")));
}

/* When hasSplitUpdate, add DMLAction column for split UPDATE handling */
if (hasSplitUpdate)
{
DMLActionExpr *actionExpr = makeNode(DMLActionExpr);
add_column_to_pathtarget(splitMergePathTarget, (Expr *) actionExpr, 0);
}

/* populate information generated above into splitmerge node */
splitmergepath = makeNode(SplitMergePath);
splitmergepath->path.pathtype = T_SplitMerge;
splitmergepath->path.parent = subpath->parent;
Expand All @@ -2947,6 +2971,7 @@ make_split_merge_path(PlannerInfo *root, Path *subpath, List *resultRelations, L
splitmergepath->subpath = subpath;
splitmergepath->resultRelations = resultRelations;
splitmergepath->mergeActionLists = mergeActionLists;
splitmergepath->hasSplitUpdate = hasSplitUpdate;

return splitmergepath;
}
Expand Down
79 changes: 77 additions & 2 deletions src/backend/executor/nodeModifyTable.c
Original file line number Diff line number Diff line change
Expand Up @@ -1095,7 +1095,7 @@ ExecInsert(ModifyTableContext *context,
*/
if (mtstate->operation == CMD_UPDATE)
wco_kind = WCO_RLS_UPDATE_CHECK;
else if (mtstate->operation == CMD_MERGE)
else if (mtstate->operation == CMD_MERGE && context->relaction != NULL)
wco_kind = (context->relaction->mas_action->commandType == CMD_UPDATE) ?
WCO_RLS_UPDATE_CHECK : WCO_RLS_INSERT_CHECK;
else
Expand Down Expand Up @@ -4372,7 +4372,77 @@ ExecModifyTable(PlanState *pstate)
break;

case CMD_MERGE:
slot = ExecMerge(&context, resultRelInfo, tupleid, node->canSetTag);
if (action == (int) DML_INSERT)
{
/*
* Split merge INSERT: extract non-junk columns (positions
* 1..N) from the plan slot into a properly-typed insert
* slot. We can't use ExecGetInsertNewTuple because the
* SplitMerge's ExecInitMergeTupleSlots may have set
* ri_projectNewInfoValid without building the INSERT
* projection.
*/
ResultRelInfo *saved = resultRelInfo;
TupleTableSlot *insertSlot;
int natts;

resultRelInfo = node->rootResultRelInfo;
natts = RelationGetNumberOfAttributes(resultRelInfo->ri_RelationDesc);

/*
* Lazily set up partition tuple routing for split-update
* MERGE INSERT on partitioned tables.
*/
if (resultRelInfo->ri_RelationDesc->rd_rel->relkind ==
RELKIND_PARTITIONED_TABLE &&
node->mt_partition_tuple_routing == NULL)
{
node->mt_partition_tuple_routing =
ExecSetupPartitionTupleRouting(estate,
resultRelInfo->ri_RelationDesc);
}

insertSlot = resultRelInfo->ri_newTupleSlot;
if (insertSlot == NULL)
{
insertSlot = table_slot_create(resultRelInfo->ri_RelationDesc,
&estate->es_tupleTable);
resultRelInfo->ri_newTupleSlot = insertSlot;
}

ExecClearTuple(insertSlot);
slot_getallattrs(context.planSlot);
memcpy(insertSlot->tts_values, context.planSlot->tts_values,
natts * sizeof(Datum));
memcpy(insertSlot->tts_isnull, context.planSlot->tts_isnull,
natts * sizeof(bool));
ExecStoreVirtualTuple(insertSlot);

/* Set relaction to NULL to avoid ExecInsert dereferencing it */
context.relaction = NULL;

slot = ExecInsert(&context, resultRelInfo, insertSlot,
node->canSetTag, NULL, NULL,
true /* splitUpdate */);
resultRelInfo = saved;
}
else if (action == (int) DML_DELETE)
{
/*
* Split merge DELETE: delete the old tuple on this segment.
*/
slot = ExecDelete(&context, resultRelInfo, tupleid, oldtuple, segid,
false, /* processReturning */
false, /* changingPart */
node->canSetTag,
NULL, NULL, NULL,
true /* splitUpdate */);
}
else
{
/* Normal MERGE processing (no split or pass-through) */
slot = ExecMerge(&context, resultRelInfo, tupleid, node->canSetTag);
}
break;

default:
Expand Down Expand Up @@ -4661,6 +4731,11 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
mtstate->mt_action_attno =
ExecFindJunkAttributeInTlist(subplan->targetlist, "DMLAction");
}
else if (operation == CMD_MERGE)
{
mtstate->mt_action_attno =
ExecFindJunkAttributeInTlist(subplan->targetlist, "DMLAction");
}
}
/*
* Do additional per-result-relation initialization.
Expand Down
Loading
Loading