提交 7ea384b9 编写于 作者: H huangxingbo 提交者: Dian Fu

[FLINK-20712][python] Support join_lateral/left_outer_join_lateral to accept...

[FLINK-20712][python] Support join_lateral/left_outer_join_lateral to accept table function directly

This closes #14500.
上级 0c36b666
......@@ -310,7 +310,7 @@ class Table(object):
right._j_table, _get_java_expression(join_predicate)), self._t_env)
def join_lateral(self,
table_function_call: Union[str, Expression],
table_function_call: Union[str, Expression, UserDefinedTableFunctionWrapper],
join_predicate: Union[str, Expression[bool]] = None) -> 'Table':
"""
Joins this Table with an user-defined TableFunction. This join is similar to a SQL inner
......@@ -326,12 +326,27 @@ class Table(object):
>>> from pyflink.table import expressions as expr
>>> tab.join_lateral(expr.call('split', ' ').alias('b'), expr.col('a') == expr.col('b'))
>>> # take all the columns as inputs
>>> @udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
... def split_row(row: Row):
... for s in row[1].split(","):
... yield row[0], s
>>> tab.join_lateral(split_row.alias("a", "b"))
:param table_function_call: An expression representing a table function call.
:param join_predicate: Optional, The join predicate expression string, join ON TRUE if not
exist.
:return: The result Table.
"""
if isinstance(table_function_call, UserDefinedTableFunctionWrapper):
table_function_call._set_takes_row_as_input()
if hasattr(table_function_call, "_alias_names"):
alias_names = getattr(table_function_call, "_alias_names")
table_function_call = table_function_call(with_columns(col("*"))) \
.alias(*alias_names)
else:
raise AttributeError('table_function_call must be followed by a alias function'
'e.g. table_function.alias("a", "b")')
if join_predicate is None:
return Table(self._j_table.joinLateral(
_get_java_expression(table_function_call)), self._t_env)
......@@ -342,7 +357,8 @@ class Table(object):
self._t_env)
def left_outer_join_lateral(self,
table_function_call: Union[str, Expression],
table_function_call: Union[str, Expression,
UserDefinedTableFunctionWrapper],
join_predicate: Union[str, Expression[bool]] = None) -> 'Table':
"""
Joins this Table with an user-defined TableFunction. This join is similar to
......@@ -358,12 +374,27 @@ class Table(object):
>>> tab.left_outer_join_lateral("split(text, ' ') as (b)")
>>> from pyflink.table import expressions as expr
>>> tab.left_outer_join_lateral(expr.call('split', ' ').alias('b'))
>>> # take all the columns as inputs
>>> @udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
... def split_row(row: Row):
... for s in row[1].split(","):
... yield row[0], s
>>> tab.left_outer_join_lateral(split_row.alias("a", "b"))
:param table_function_call: An expression representing a table function call.
:param join_predicate: Optional, The join predicate expression string, join ON TRUE if not
exist.
:return: The result Table.
"""
if isinstance(table_function_call, UserDefinedTableFunctionWrapper):
table_function_call._set_takes_row_as_input()
if hasattr(table_function_call, "_alias_names"):
alias_names = getattr(table_function_call, "_alias_names")
table_function_call = table_function_call(with_columns(col("*"))) \
.alias(*alias_names)
else:
raise AttributeError('table_function_call must be followed by a alias function'
'e.g. table_function.alias("a", "b")')
if join_predicate is None:
return Table(self._j_table.leftOuterJoinLateral(
_get_java_expression(table_function_call)), self._t_env)
......
......@@ -97,8 +97,9 @@ class RowBasedOperationTests(object):
DataTypes.FIELD("b", DataTypes.STRING())]))
table_sink = source_sink_utils.TestAppendSink(
['a', 'b'],
[DataTypes.BIGINT(), DataTypes.STRING()])
['a', 'b', 'c', 'd', 'e', 'f'],
[DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.BIGINT(),
DataTypes.STRING(), DataTypes.BIGINT(), DataTypes.STRING()])
self.t_env.register_table_sink("Results", table_sink)
@udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
......@@ -108,10 +109,13 @@ class RowBasedOperationTests(object):
t.flat_map(split) \
.flat_map(split) \
.join_lateral(split.alias("a", "b")) \
.left_outer_join_lateral(split.alias("c", "d")) \
.execute_insert("Results") \
.wait()
actual = source_sink_utils.results()
self.assert_equals(actual, ["1,2", "1,3", "2,1", "1,5", "1,6", "1,7"])
self.assert_equals(actual, ["1,2,1,2,1,2", "1,3,1,3,1,3", "2,1,2,1,2,1",
"1,5,1,5,1,5", "1,6,1,6,1,6", "1,7,1,7,1,7"])
class BatchRowBasedOperationITTests(RowBasedOperationTests, PyFlinkBlinkBatchTableTestCase):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册