Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.hive.ql.optimizer.lineage;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -713,158 +714,178 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Obje
Operator<? extends OperatorDesc> inpOp = getParent(stack);
lCtx.getIndex().copyPredicates(inpOp, op);

Dependency dep = new Dependency();
DependencyType newType = DependencyType.EXPRESSION;
dep.setType(newType);

Set<String> columns = new HashSet<>();
PartitionedTableFunctionDef funcDef = op.getConf().getFuncDef();
StringBuilder sb = new StringBuilder();
WindowFrameDef windowFrameDef = null;

if (!(funcDef.getTFunction() instanceof Noop)) {

if (funcDef instanceof WindowTableFunctionDef) {
// function name
WindowFunctionDef windowFunctionDef = ((WindowTableFunctionDef) funcDef).getWindowFunctions().getFirst();
sb.append(windowFunctionDef.getName()).append("(");
int numWinFns = 1;
if (funcDef instanceof WindowTableFunctionDef) {
numWinFns = ((WindowTableFunctionDef) funcDef).getWindowFunctions().size();
}
List<Dependency> windowDeps = new ArrayList<>(numWinFns);

addArgs(sb, columns, lCtx, inpOp, op.getSchema(), windowFunctionDef.getArgs());
for (int winIdx = 0; winIdx < numWinFns; winIdx++) {
Dependency dep = new Dependency();
DependencyType newType = DependencyType.EXPRESSION;
dep.setType(newType);

windowFrameDef = windowFunctionDef.getWindowFrame();
Set<String> columns = new HashSet<>();
StringBuilder sb = new StringBuilder();
WindowFrameDef windowFrameDef = null;

if (sb.charAt(sb.length() - 2) == ',') {
sb.delete(sb.length() - 2, sb.length());
}
sb.append(")");
sb.append(" over (");
} else /* PartitionedTableFunctionDef */ {
// function name
sb.append(funcDef.getName()).append("(");
addArgs(sb, columns, lCtx, inpOp, funcDef.getRawInputShape().getRr().getRowSchema(), funcDef.getArgs());
if (!(funcDef.getTFunction() instanceof Noop)) {

// matchpath has argument pattern like matchpath(<input expression>, <argument methods: arg1(), arg2()...>)
if (funcDef.getInput() != null) {
sb.append("on ").append(funcDef.getInput().getAlias()).append(" ");
if (funcDef instanceof WindowTableFunctionDef) {
// function name
WindowFunctionDef windowFunctionDef = ((WindowTableFunctionDef) funcDef).getWindowFunctions().get(winIdx);
sb.append(windowFunctionDef.getName()).append("(");

int counter = 1;
for (PTFExpressionDef arg : funcDef.getArgs()) {
ExprNodeDesc exprNode = arg.getExprNode();
addArgs(sb, columns, lCtx, inpOp, op.getSchema(), windowFunctionDef.getArgs());

addIfNotNull(columns, exprNode.getCols());
windowFrameDef = windowFunctionDef.getWindowFrame();

sb.append("arg").append(counter++).append("(");
sb.append(ExprProcFactory.getExprString(funcDef.getRawInputShape().getRr().getRowSchema(), arg.getExprNode(), lCtx, inpOp, null));
sb.append("), ");
if (sb.charAt(sb.length() - 2) == ',') {
sb.delete(sb.length() - 2, sb.length());
}
sb.append(")");
sb.append(" over (");
} else /* PartitionedTableFunctionDef */ {
// function name
sb.append(funcDef.getName()).append("(");
addArgs(sb, columns, lCtx, inpOp, funcDef.getRawInputShape().getRr().getRowSchema(), funcDef.getArgs());

// matchpath has argument pattern like matchpath(<input expression>, <argument methods: arg1(), arg2()...>)
if (funcDef.getInput() != null) {
sb.append("on ").append(funcDef.getInput().getAlias()).append(" ");

int counter = 1;
for (PTFExpressionDef arg : funcDef.getArgs()) {
ExprNodeDesc exprNode = arg.getExprNode();

addIfNotNull(columns, exprNode.getCols());

sb.append("arg").append(counter++).append("(");
sb.append(
ExprProcFactory.getExprString(funcDef.getRawInputShape().getRr().getRowSchema(), arg.getExprNode(),
lCtx, inpOp, null));
sb.append("), ");
}

sb.delete(sb.length() - 2, sb.length());
sb.delete(sb.length() - 2, sb.length());
}
}
}
}

/*
Collect partition by and distribute by information.
Please note, at the expression node level, there is no difference between those.
That means distribute by is rendered as "partition by" in the expression string.
*/
if (funcDef.getPartition() != null ) {
List<PTFExpressionDef> partitionExpressions = funcDef.getPartition().getExpressions();
if (funcDef.getPartition() != null) {
List<PTFExpressionDef> partitionExpressions = funcDef.getPartition().getExpressions();

boolean isPartitionByAdded = false;
for (PTFExpressionDef partitionExpr : partitionExpressions) {
ExprNodeDesc partitionExprNode = partitionExpr.getExprNode();
boolean isPartitionByAdded = false;
for (PTFExpressionDef partitionExpr : partitionExpressions) {
ExprNodeDesc partitionExprNode = partitionExpr.getExprNode();

if (partitionExprNode.getCols() != null && !partitionExprNode.getCols().isEmpty()) {
if (!isPartitionByAdded) {
sb.append("partition by ");
isPartitionByAdded = true;
}
if (partitionExprNode.getCols() != null && !partitionExprNode.getCols().isEmpty()) {
if (!isPartitionByAdded) {
sb.append("partition by ");
isPartitionByAdded = true;
}

addIfNotNull(columns, partitionExprNode.getCols());
addIfNotNull(columns, partitionExprNode.getCols());

if (partitionExprNode instanceof ExprNodeColumnDesc) {
sb.append(ExprProcFactory.getExprString(funcDef.getRawInputShape().getRr().getRowSchema(), partitionExprNode, lCtx, inpOp, null));
sb.append(", ");
}
if (partitionExprNode instanceof ExprNodeColumnDesc) {
sb.append(
ExprProcFactory.getExprString(funcDef.getRawInputShape().getRr().getRowSchema(), partitionExprNode,
lCtx, inpOp, null));
sb.append(", ");
}

sb.delete(sb.length() - 2, sb.length());
sb.delete(sb.length() - 2, sb.length());
}
}
}

}
}

/*
Collects the order by and sort by information.
Please note, at the expression node level, there is no difference between those.
That means sort by is rendered as "order by" in the expression string.
*/
if (funcDef.getOrder() != null) {
if (funcDef.getOrder() != null) {
/*
Order by is sometimes added by the compiler to make the PTF call deterministic.
At this point of the code execution, we don't know if it is added by the compiler or
it was originally part of the query string.
*/
List<OrderExpressionDef> orderExpressions = funcDef.getOrder().getExpressions();
List<OrderExpressionDef> orderExpressions = funcDef.getOrder().getExpressions();

if (!sb.isEmpty() && sb.charAt(sb.length() - 1) != '(') {
sb.append(" ");
}
sb.append("order by ");
if (!sb.isEmpty() && sb.charAt(sb.length() - 1) != '(') {
sb.append(" ");
}
sb.append("order by ");

for (OrderExpressionDef orderExpr : orderExpressions) {
ExprNodeDesc orderExprNode = orderExpr.getExprNode();
addIfNotNull(columns, orderExprNode.getCols());
for (OrderExpressionDef orderExpr : orderExpressions) {
ExprNodeDesc orderExprNode = orderExpr.getExprNode();
addIfNotNull(columns, orderExprNode.getCols());

sb.append(ExprProcFactory.getExprString(funcDef.getRawInputShape().getRr().getRowSchema(), orderExprNode, lCtx, inpOp, null));
if (PTFInvocationSpec.Order.DESC.equals(orderExpr.getOrder())) {
sb.append(" desc");
sb.append(
ExprProcFactory.getExprString(funcDef.getRawInputShape().getRr().getRowSchema(), orderExprNode, lCtx,
inpOp, null));
if (PTFInvocationSpec.Order.DESC.equals(orderExpr.getOrder())) {
sb.append(" desc");
}
sb.append(", ");
}
sb.append(", ");
}

sb.delete(sb.length() - 2, sb.length());
}
sb.delete(sb.length() - 2, sb.length());
}

/*
Window frame is sometimes added by the compiler to make the PTF call deterministic.
At this point of the code execution, we don't know if it is added by the compiler or
it was originally part of the query string.
*/
if (windowFrameDef != null) {
sb.append(" ").append(windowFrameDef.getWindowType()).append(" between ");
if (windowFrameDef != null) {
sb.append(" ").append(windowFrameDef.getWindowType()).append(" between ");

appendBoundary(windowFrameDef.getStart(), sb, " preceding");
appendBoundary(windowFrameDef.getStart(), sb, " preceding");

sb.append(" and ");
sb.append(" and ");

appendBoundary(windowFrameDef.getEnd(), sb, " following");
}
appendBoundary(windowFrameDef.getEnd(), sb, " following");
}

sb.append(")");
dep.setExpr(sb.toString());
sb.append(")");
dep.setExpr(sb.toString());

LinkedHashSet<BaseColumnInfo> colSet = new LinkedHashSet<>();
for(ColumnInfo ci : inpOp.getSchema().getSignature()) {
Dependency d = lCtx.getIndex().getDependency(inpOp, ci);
if (d != null) {
newType = LineageCtx.getNewDependencyType(d.getType(), newType);
if (!ci.isHiddenVirtualCol() && columns.contains(ci.getInternalName())) {
colSet.addAll(d.getBaseCols());
LinkedHashSet<BaseColumnInfo> colSet = new LinkedHashSet<>();
for (ColumnInfo ci : inpOp.getSchema().getSignature()) {
Dependency d = lCtx.getIndex().getDependency(inpOp, ci);
if (d != null) {
newType = LineageCtx.getNewDependencyType(d.getType(), newType);
if (!ci.isHiddenVirtualCol() && columns.contains(ci.getInternalName())) {
colSet.addAll(d.getBaseCols());
}
}
}
}

dep.setType(newType);
dep.setBaseCols(colSet);
dep.setType(newType);
dep.setBaseCols(colSet);
windowDeps.add(dep);
}

// This dependency is then set for all the colinfos of the script operator
int windowIdx = 0;
for(ColumnInfo ci : op.getSchema().getSignature()) {
Dependency d = dep;
Dependency depCi = lCtx.getIndex().getDependency(inpOp, ci);
if (depCi != null) {
d = depCi;
}
Dependency d = null;
Dependency depCi = lCtx.getIndex().getDependency(inpOp, ci);
if (depCi != null) {
d = depCi;
} else if (windowIdx < windowDeps.size() && windowDeps.size() > 1) {
d = windowDeps.get(windowIdx++);
} else {
d = windowDeps.getFirst();
}
lCtx.getIndex().putDependency(op, ci, d);
}

Expand Down
8 changes: 8 additions & 0 deletions ql/src/test/queries/clientpositive/lineage7.q
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
set hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.LineageLogger;

create table table_1 (id1 int, id2 int, key string);

create table table_2 as select
sum(id1) over(partition by key ) sum1,
sum(id2) over(partition by key ) sum2
from table_1
Original file line number Diff line number Diff line change
Expand Up @@ -1295,7 +1295,7 @@ POSTHOOK: Lineage: part_1_n1.p_mfgr SIMPLE [(part)part.FieldSchema(name:p_mfgr,
POSTHOOK: Lineage: part_1_n1.p_name SIMPLE [(part)part.FieldSchema(name:p_name, type:string, comment:null), ]
POSTHOOK: Lineage: part_1_n1.p_size SIMPLE [(part)part.FieldSchema(name:p_size, type:int, comment:null), ]
POSTHOOK: Lineage: part_1_n1.r EXPRESSION [(part)part.FieldSchema(name:p_name, type:string, comment:null), (part)part.FieldSchema(name:p_mfgr, type:string, comment:null), ]
POSTHOOK: Lineage: part_1_n1.s EXPRESSION [(part)part.FieldSchema(name:p_name, type:string, comment:null), (part)part.FieldSchema(name:p_mfgr, type:string, comment:null), ]
POSTHOOK: Lineage: part_1_n1.s EXPRESSION [(part)part.FieldSchema(name:p_name, type:string, comment:null), (part)part.FieldSchema(name:p_mfgr, type:string, comment:null), (part)part.FieldSchema(name:p_retailprice, type:double, comment:null), ]
POSTHOOK: Lineage: part_2_n1.cud EXPRESSION [(part)part.FieldSchema(name:p_name, type:string, comment:null), (part)part.FieldSchema(name:p_mfgr, type:string, comment:null), ]
POSTHOOK: Lineage: part_2_n1.dr EXPRESSION [(part)part.FieldSchema(name:p_name, type:string, comment:null), (part)part.FieldSchema(name:p_mfgr, type:string, comment:null), ]
POSTHOOK: Lineage: part_2_n1.fv1 EXPRESSION [(part)part.FieldSchema(name:p_name, type:string, comment:null), (part)part.FieldSchema(name:p_mfgr, type:string, comment:null), (part)part.FieldSchema(name:p_size, type:int, comment:null), ]
Expand All @@ -1305,7 +1305,7 @@ POSTHOOK: Lineage: part_2_n1.p_size SIMPLE [(part)part.FieldSchema(name:p_size,
POSTHOOK: Lineage: part_2_n1.r EXPRESSION [(part)part.FieldSchema(name:p_name, type:string, comment:null), (part)part.FieldSchema(name:p_mfgr, type:string, comment:null), ]
POSTHOOK: Lineage: part_2_n1.s2 EXPRESSION [(part)part.FieldSchema(name:p_mfgr, type:string, comment:null), (part)part.FieldSchema(name:p_size, type:int, comment:null), ]
POSTHOOK: Lineage: part_3_n1.c EXPRESSION [(part)part.FieldSchema(name:p_name, type:string, comment:null), (part)part.FieldSchema(name:p_mfgr, type:string, comment:null), ]
POSTHOOK: Lineage: part_3_n1.ca EXPRESSION [(part)part.FieldSchema(name:p_name, type:string, comment:null), (part)part.FieldSchema(name:p_mfgr, type:string, comment:null), ]
POSTHOOK: Lineage: part_3_n1.ca EXPRESSION [(part)part.FieldSchema(name:p_name, type:string, comment:null), (part)part.FieldSchema(name:p_mfgr, type:string, comment:null), (part)part.FieldSchema(name:p_size, type:int, comment:null), ]
POSTHOOK: Lineage: part_3_n1.fv EXPRESSION [(part)part.FieldSchema(name:p_name, type:string, comment:null), (part)part.FieldSchema(name:p_mfgr, type:string, comment:null), (part)part.FieldSchema(name:p_size, type:int, comment:null), ]
POSTHOOK: Lineage: part_3_n1.p_mfgr SIMPLE [(part)part.FieldSchema(name:p_mfgr, type:string, comment:null), ]
POSTHOOK: Lineage: part_3_n1.p_name SIMPLE [(part)part.FieldSchema(name:p_name, type:string, comment:null), ]
Expand Down
14 changes: 14 additions & 0 deletions ql/src/test/results/clientpositive/llap/lineage7.q.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
PREHOOK: query: create table table_1 (id1 int, id2 int, key string)
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@table_1
PREHOOK: query: create table table_2 as select
sum(id1) over(partition by key ) sum1,
sum(id2) over(partition by key ) sum2
from table_1
PREHOOK: type: CREATETABLE_AS_SELECT
PREHOOK: Input: default@table_1
PREHOOK: Output: database:default
PREHOOK: Output: default@table_2
Result schema has 2 fields, but we don't get as many dependencies
{"version":"1.0","engine":"tez","database":"default","hash":"f81777f9774d12cc77dd583ea9ff99b3","queryText":"create table table_2 as select\nsum(id1) over(partition by key ) sum1,\nsum(id2) over(partition by key ) sum2\nfrom table_1","edges":[{"sources":[2,3],"targets":[0],"expression":"sum(table_1.id1) over (partition by table_1.key order by table_1.key ROWS between unbounded and unbounded)","edgeType":"PROJECTION"},{"sources":[4,3],"targets":[1],"expression":"sum(table_1.id2) over (partition by table_1.key order by table_1.key ROWS between unbounded and unbounded)","edgeType":"PROJECTION"}],"vertices":[{"id":0,"vertexType":"COLUMN","vertexId":"default.table_2.sum1"},{"id":1,"vertexType":"COLUMN","vertexId":"default.table_2.sum2"},{"id":2,"vertexType":"COLUMN","vertexId":"default.table_1.id1"},{"id":3,"vertexType":"COLUMN","vertexId":"default.table_1.key"},{"id":4,"vertexType":"COLUMN","vertexId":"default.table_1.id2"}]}
Original file line number Diff line number Diff line change
Expand Up @@ -6667,7 +6667,7 @@ POSTHOOK: Lineage: part_1_n0.p_mfgr SIMPLE [(part)part.FieldSchema(name:p_mfgr,
POSTHOOK: Lineage: part_1_n0.p_name SIMPLE [(part)part.FieldSchema(name:p_name, type:string, comment:null), ]
POSTHOOK: Lineage: part_1_n0.p_size SIMPLE [(part)part.FieldSchema(name:p_size, type:int, comment:null), ]
POSTHOOK: Lineage: part_1_n0.r EXPRESSION [(part)part.FieldSchema(name:p_name, type:string, comment:null), (part)part.FieldSchema(name:p_mfgr, type:string, comment:null), ]
POSTHOOK: Lineage: part_1_n0.s EXPRESSION [(part)part.FieldSchema(name:p_name, type:string, comment:null), (part)part.FieldSchema(name:p_mfgr, type:string, comment:null), ]
POSTHOOK: Lineage: part_1_n0.s EXPRESSION [(part)part.FieldSchema(name:p_name, type:string, comment:null), (part)part.FieldSchema(name:p_mfgr, type:string, comment:null), (part)part.FieldSchema(name:p_retailprice, type:double, comment:null), ]
POSTHOOK: Lineage: part_2_n0.cud EXPRESSION [(part)part.FieldSchema(name:p_name, type:string, comment:null), (part)part.FieldSchema(name:p_mfgr, type:string, comment:null), ]
POSTHOOK: Lineage: part_2_n0.dr EXPRESSION [(part)part.FieldSchema(name:p_name, type:string, comment:null), (part)part.FieldSchema(name:p_mfgr, type:string, comment:null), ]
POSTHOOK: Lineage: part_2_n0.fv1 EXPRESSION [(part)part.FieldSchema(name:p_name, type:string, comment:null), (part)part.FieldSchema(name:p_mfgr, type:string, comment:null), (part)part.FieldSchema(name:p_size, type:int, comment:null), ]
Expand All @@ -6677,7 +6677,7 @@ POSTHOOK: Lineage: part_2_n0.p_size SIMPLE [(part)part.FieldSchema(name:p_size,
POSTHOOK: Lineage: part_2_n0.r EXPRESSION [(part)part.FieldSchema(name:p_name, type:string, comment:null), (part)part.FieldSchema(name:p_mfgr, type:string, comment:null), ]
POSTHOOK: Lineage: part_2_n0.s2 EXPRESSION [(part)part.FieldSchema(name:p_mfgr, type:string, comment:null), (part)part.FieldSchema(name:p_size, type:int, comment:null), ]
POSTHOOK: Lineage: part_3_n0.c EXPRESSION [(part)part.FieldSchema(name:p_name, type:string, comment:null), (part)part.FieldSchema(name:p_mfgr, type:string, comment:null), ]
POSTHOOK: Lineage: part_3_n0.ca EXPRESSION [(part)part.FieldSchema(name:p_name, type:string, comment:null), (part)part.FieldSchema(name:p_mfgr, type:string, comment:null), ]
POSTHOOK: Lineage: part_3_n0.ca EXPRESSION [(part)part.FieldSchema(name:p_name, type:string, comment:null), (part)part.FieldSchema(name:p_mfgr, type:string, comment:null), (part)part.FieldSchema(name:p_size, type:int, comment:null), ]
POSTHOOK: Lineage: part_3_n0.fv EXPRESSION [(part)part.FieldSchema(name:p_name, type:string, comment:null), (part)part.FieldSchema(name:p_mfgr, type:string, comment:null), (part)part.FieldSchema(name:p_size, type:int, comment:null), ]
POSTHOOK: Lineage: part_3_n0.p_mfgr SIMPLE [(part)part.FieldSchema(name:p_mfgr, type:string, comment:null), ]
POSTHOOK: Lineage: part_3_n0.p_name SIMPLE [(part)part.FieldSchema(name:p_name, type:string, comment:null), ]
Expand Down
Loading