Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
doujutun3207
flink
提交
df46f894
F
flink
项目概览
doujutun3207
/
flink
与 Fork 源项目一致
从无法访问的项目Fork
通知
24
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
F
flink
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
df46f894
编写于
12月 11, 2020
作者:
T
Timo Walther
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[FLINK-20522][table] Add built-in IFNULL function
This closes #14378.
上级
857a5b90
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
400 addition
and
10 deletion
+400
-10
docs/dev/table/functions/systemFunctions.md
docs/dev/table/functions/systemFunctions.md
+52
-1
flink-python/pyflink/table/expression.py
flink-python/pyflink/table/expression.py
+16
-0
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
.../org/apache/flink/table/api/internal/BaseExpressions.java
+122
-1
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
...che/flink/table/functions/BuiltInFunctionDefinitions.java
+19
-4
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
...ache/flink/table/types/inference/InputTypeStrategies.java
+14
-0
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategies.java
...rg/apache/flink/table/types/inference/TypeStrategies.java
+14
-2
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CommonArgumentTypeStrategy.java
...ypes/inference/strategies/CommonArgumentTypeStrategy.java
+73
-0
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
.../flink/table/types/inference/InputTypeStrategiesTest.java
+9
-1
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/MiscFunctionsITCase.java
...he/flink/table/planner/functions/MiscFunctionsITCase.java
+38
-1
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/scalar/IfNullFunction.java
.../flink/table/runtime/functions/scalar/IfNullFunction.java
+43
-0
未找到文件。
docs/dev/table/functions/systemFunctions.md
浏览文件 @
df46f894
...
...
@@ -4480,7 +4480,24 @@ IF(condition, true_value, false_value)
<p>
Only supported in blink planner.
</p>
<p>
E.g.,
<code>
IF(5 > 3, 5, 3)
</code>
returns 5.
</p>
</td>
</tr>
</tr>
<tr>
<td>
{% highlight text %}
IFNULL(input, null_replacement)
{% endhighlight %}
</td>
<td>
<p>
Returns
<i>
null_replacement
</i>
if
<i>
input
</i>
is NULL; otherwise
<i>
input
</i>
is returned.
</p>
<p>
Compared to
<code>
COALESCE
</code>
or
<code>
CASE WHEN
</code>
, this function returns a data
type that is very specific in terms of nullability. The returned type is the common type of
both arguments but only nullable if the
<i>
null_replacement
</i>
is nullable.
</p>
<p>
The function allows to pass nullable columns into a function or table that is declared with
a NOT NULL constraint.
</p>
<p>
E.g.,
<code>
IFNULL(nullable_column, 5)
</code>
returns never NULL.
</p>
</td>
</tr>
<tr>
<td>
...
...
@@ -4543,6 +4560,23 @@ BOOLEAN.?(VALUE1, VALUE2)
<p>
E.g.,
<code>
(42 > 5).?('A', 'B')
</code>
returns "A".
</p>
</td>
</tr>
<tr>
<td>
{% highlight java %}
input.ifNull(nullReplacement)
{% endhighlight %}
</td>
<td>
<p>
Returns
<i>
nullReplacement
</i>
if
<i>
input
</i>
is NULL; otherwise
<i>
input
</i>
is returned.
</p>
<p>
This function returns a data type that is very specific in terms of nullability. The returned
type is the common type of both arguments but only nullable if the
<i>
nullReplacement
</i>
is
nullable.
</p>
<p>
The function allows to pass nullable columns into a function or table that is declared with
a NOT NULL constraint.
</p>
<p>
E.g.,
<code>
$("nullable_column").ifNull(5)
</code>
returns never NULL.
</p>
</td>
</tr>
</tbody>
</table>
</div>
...
...
@@ -4568,6 +4602,23 @@ BOOLEAN.?(VALUE1, VALUE2)
<p>
E.g.,
<code>
(42 > 5).?("A", "B")
</code>
returns "A".
</p>
</td>
</tr>
<tr>
<td>
{% highlight scala %}
input.ifNull(nullReplacement)
{% endhighlight %}
</td>
<td>
<p>
Returns
<i>
nullReplacement
</i>
if
<i>
input
</i>
is NULL; otherwise
<i>
input
</i>
is returned.
</p>
<p>
This function returns a data type that is very specific in terms of nullability. The returned
type is the common type of both arguments but only nullable if the
<i>
nullReplacement
</i>
is
nullable.
</p>
<p>
The function allows to pass nullable columns into a function or table that is declared with
a NOT NULL constraint.
</p>
<p>
E.g.,
<code>
$("nullable_column").ifNull(5)
</code>
returns never NULL.
</p>
</td>
</tr>
</tbody>
</table>
</div>
...
...
flink-python/pyflink/table/expression.py
浏览文件 @
df46f894
...
...
@@ -578,6 +578,22 @@ class Expression(Generic[T]):
"""
return
_ternary_op
(
"then"
)(
self
,
if_true
,
if_false
)
def
if_null
(
self
,
null_replacement
)
->
'Expression'
:
"""
Returns null_replacement if the given expression is null; otherwise the expression is
returned.
This function returns a data type that is very specific in terms of nullability. The
returned type is the common type of both arguments but only nullable if the
null_replacement is nullable.
The function allows to pass nullable columns into a function or table that is declared
with a NOT NULL constraint.
e.g. col("nullable_column").if_null(5) returns never null.
"""
return
_binary_op
(
"if_null"
)(
self
,
null_replacement
)
@
property
def
is_null
(
self
)
->
'Expression[bool]'
:
"""
...
...
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
浏览文件 @
df46f894
...
...
@@ -43,7 +43,110 @@ import static org.apache.flink.table.expressions.ApiExpressionUtils.toMonthInter
import
static
org
.
apache
.
flink
.
table
.
expressions
.
ApiExpressionUtils
.
typeLiteral
;
import
static
org
.
apache
.
flink
.
table
.
expressions
.
ApiExpressionUtils
.
unresolvedCall
;
import
static
org
.
apache
.
flink
.
table
.
expressions
.
ApiExpressionUtils
.
valueLiteral
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.*;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
ABS
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
ACOS
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
AND
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
ARRAY_ELEMENT
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
ASIN
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
AT
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
ATAN
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
AVG
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
BETWEEN
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
BIN
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
CARDINALITY
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
CAST
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
CEIL
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
CHAR_LENGTH
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
COLLECT
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
COS
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
COSH
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
COT
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
COUNT
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
DEGREES
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
DISTINCT
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
DIVIDE
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
EQUALS
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
EXP
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
EXTRACT
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
FLATTEN
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
FLOOR
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
FROM_BASE64
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
GET
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
GREATER_THAN
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
GREATER_THAN_OR_EQUAL
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
HEX
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
IF
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
IF_NULL
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
IN
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
INIT_CAP
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
IS_FALSE
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
IS_NOT_FALSE
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
IS_NOT_NULL
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
IS_NOT_TRUE
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
IS_NULL
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
IS_TRUE
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
LESS_THAN
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
LESS_THAN_OR_EQUAL
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
LIKE
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
LN
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
LOG
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
LOG10
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
LOG2
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
LOWER
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
LPAD
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
LTRIM
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
MAX
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
MD5
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
MIN
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
MINUS
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
MOD
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
NOT_BETWEEN
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
NOT_EQUALS
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
OR
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
ORDER_ASC
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
ORDER_DESC
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
OVER
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
OVERLAY
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
PLUS
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
POSITION
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
POWER
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
PROCTIME
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
RADIANS
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
REGEXP_EXTRACT
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
REGEXP_REPLACE
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
REPEAT
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
REPLACE
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
ROUND
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
ROWTIME
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
RPAD
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
RTRIM
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
SHA1
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
SHA2
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
SHA224
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
SHA256
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
SHA384
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
SHA512
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
SIGN
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
SIMILAR
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
SIN
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
SINH
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
SQRT
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
STDDEV_POP
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
STDDEV_SAMP
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
SUBSTRING
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
SUM
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
SUM0
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
TAN
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
TANH
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
TIMES
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
TO_BASE64
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
TRIM
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
TRUNCATE
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
UPPER
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
VAR_POP
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
VAR_SAMP
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
WINDOW_END
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
BuiltInFunctionDefinitions
.
WINDOW_START
;
import
static
org
.
apache
.
flink
.
table
.
types
.
utils
.
TypeConversions
.
fromLegacyInfoToDataType
;
/**
...
...
@@ -204,6 +307,24 @@ public abstract class BaseExpressions<InType, OutType> {
IF
,
toExpr
(),
objectToExpression
(
ifTrue
),
objectToExpression
(
ifFalse
)));
}
/**
* Returns {@code nullReplacement} if the given expression is NULL; otherwise the expression is
* returned.
*
* <p>This function returns a data type that is very specific in terms of nullability. The
* returned type is the common type of both arguments but only nullable if the {@code
* nullReplacement} is nullable.
*
* <p>The function allows to pass nullable columns into a function or table that is declared
* with a NOT NULL constraint.
*
* <p>E.g., <code>$('nullable_column').ifNull(5)</code> returns never NULL.
*/
public
OutType
ifNull
(
InType
nullReplacement
)
{
return
toApiSpecificExpression
(
unresolvedCall
(
IF_NULL
,
toExpr
(),
objectToExpression
(
nullReplacement
)));
}
/** Returns true if the given expression is null. */
public
OutType
isNull
()
{
return
toApiSpecificExpression
(
unresolvedCall
(
IS_NULL
,
toExpr
()));
...
...
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
浏览文件 @
df46f894
...
...
@@ -40,6 +40,8 @@ import java.util.Set;
import
static
org
.
apache
.
flink
.
table
.
functions
.
FunctionKind
.
AGGREGATE
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
FunctionKind
.
OTHER
;
import
static
org
.
apache
.
flink
.
table
.
functions
.
FunctionKind
.
SCALAR
;
import
static
org
.
apache
.
flink
.
table
.
types
.
inference
.
InputTypeStrategies
.
ANY
;
import
static
org
.
apache
.
flink
.
table
.
types
.
inference
.
InputTypeStrategies
.
COMMON_ARG_NULLABLE
;
import
static
org
.
apache
.
flink
.
table
.
types
.
inference
.
InputTypeStrategies
.
LITERAL
;
import
static
org
.
apache
.
flink
.
table
.
types
.
inference
.
InputTypeStrategies
.
NO_ARGS
;
import
static
org
.
apache
.
flink
.
table
.
types
.
inference
.
InputTypeStrategies
.
OUTPUT_IF_NULL
;
...
...
@@ -74,7 +76,7 @@ import static org.apache.flink.table.types.inference.TypeStrategies.varyingStrin
public
final
class
BuiltInFunctionDefinitions
{
// --------------------------------------------------------------------------------------------
//
Debugging
functions
//
New stack built-in
functions
// --------------------------------------------------------------------------------------------
public
static
final
BuiltInFunctionDefinition
TYPE_OF
=
...
...
@@ -85,17 +87,30 @@ public final class BuiltInFunctionDefinitions {
or
(
sequence
(
new
String
[]
{
"input"
},
new
ArgumentTypeStrategy
[]
{
InputTypeStrategies
.
ANY
}),
new
ArgumentTypeStrategy
[]
{
ANY
}),
sequence
(
new
String
[]
{
"input"
,
"force_serializable"
},
new
ArgumentTypeStrategy
[]
{
InputTypeStrategies
.
ANY
,
and
(
logical
(
LogicalTypeRoot
.
BOOLEAN
),
LITERAL
)
ANY
,
and
(
logical
(
LogicalTypeRoot
.
BOOLEAN
),
LITERAL
)
})))
.
outputTypeStrategy
(
explicit
(
DataTypes
.
STRING
()))
.
runtimeClass
(
"org.apache.flink.table.runtime.functions.scalar.TypeOfFunction"
)
.
build
();
public
static
final
BuiltInFunctionDefinition
IF_NULL
=
BuiltInFunctionDefinition
.
newBuilder
()
.
name
(
"IFNULL"
)
.
kind
(
SCALAR
)
.
inputTypeStrategy
(
sequence
(
new
String
[]
{
"input"
,
"null_replacement"
},
new
ArgumentTypeStrategy
[]
{
COMMON_ARG_NULLABLE
,
COMMON_ARG_NULLABLE
}))
.
outputTypeStrategy
(
TypeStrategies
.
IF_NULL
)
.
runtimeClass
(
"org.apache.flink.table.runtime.functions.scalar.IfNullFunction"
)
.
build
();
// --------------------------------------------------------------------------------------------
// Logic functions
// --------------------------------------------------------------------------------------------
...
...
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/InputTypeStrategies.java
浏览文件 @
df46f894
...
...
@@ -24,6 +24,7 @@ import org.apache.flink.table.types.DataType;
import
org.apache.flink.table.types.inference.strategies.AndArgumentTypeStrategy
;
import
org.apache.flink.table.types.inference.strategies.AnyArgumentTypeStrategy
;
import
org.apache.flink.table.types.inference.strategies.CastInputTypeStrategy
;
import
org.apache.flink.table.types.inference.strategies.CommonArgumentTypeStrategy
;
import
org.apache.flink.table.types.inference.strategies.CommonInputTypeStrategy
;
import
org.apache.flink.table.types.inference.strategies.ComparableTypeStrategy
;
import
org.apache.flink.table.types.inference.strategies.CompositeArgumentTypeStrategy
;
...
...
@@ -200,6 +201,19 @@ public final class InputTypeStrategies {
/** Strategy that checks that the argument has a composite type. */
public
static
final
ArgumentTypeStrategy
COMPOSITE
=
new
CompositeArgumentTypeStrategy
();
/**
* Argument type strategy that checks and casts for a common, least restrictive type of all
* arguments.
*/
public
static
final
ArgumentTypeStrategy
COMMON_ARG
=
new
CommonArgumentTypeStrategy
(
false
);
/**
* Argument type strategy that checks and casts for a common, least restrictive type of all
* arguments. But leaves nullability untouched.
*/
public
static
final
ArgumentTypeStrategy
COMMON_ARG_NULLABLE
=
new
CommonArgumentTypeStrategy
(
true
);
/**
* Strategy for an argument that corresponds to an explicitly defined type casting. Implicit
* casts will be inserted if possible.
...
...
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeStrategies.java
浏览文件 @
df46f894
...
...
@@ -173,8 +173,8 @@ public final class TypeStrategies {
};
/**
*
<<<<<<< HEAD Type strategy that returns the sum of an exact numeric addition that includes at
*
least one
decimal.
*
Type strategy that returns the sum of an exact numeric addition that includes at least one
* decimal.
*/
public
static
final
TypeStrategy
DECIMAL_PLUS
=
callContext
->
{
...
...
@@ -405,6 +405,18 @@ public final class TypeStrategies {
});
};
/** Type strategy specific for avoiding nulls. */
public
static
final
TypeStrategy
IF_NULL
=
callContext
->
{
final
List
<
DataType
>
argumentDataTypes
=
callContext
.
getArgumentDataTypes
();
final
DataType
inputDataType
=
argumentDataTypes
.
get
(
0
);
final
DataType
nullReplacementDataType
=
argumentDataTypes
.
get
(
1
);
if
(!
inputDataType
.
getLogicalType
().
isNullable
())
{
return
Optional
.
of
(
inputDataType
);
}
return
Optional
.
of
(
nullReplacementDataType
);
};
// --------------------------------------------------------------------------------------------
@SuppressWarnings
(
"BooleanMethodIsAlwaysInverted"
)
...
...
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/CommonArgumentTypeStrategy.java
0 → 100644
浏览文件 @
df46f894
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.flink.table.types.inference.strategies
;
import
org.apache.flink.annotation.Internal
;
import
org.apache.flink.table.functions.FunctionDefinition
;
import
org.apache.flink.table.types.DataType
;
import
org.apache.flink.table.types.inference.ArgumentTypeStrategy
;
import
org.apache.flink.table.types.inference.CallContext
;
import
org.apache.flink.table.types.inference.Signature
;
import
org.apache.flink.table.types.logical.LogicalType
;
import
org.apache.flink.table.types.logical.utils.LogicalTypeMerging
;
import
org.apache.flink.table.types.utils.TypeConversions
;
import
java.util.List
;
import
java.util.Optional
;
import
java.util.stream.Collectors
;
/**
* Argument type strategy that checks and casts for a common, least restrictive type of all
* arguments.
*
* <p>Nullability can be preserved if needed.
*/
@Internal
public
final
class
CommonArgumentTypeStrategy
implements
ArgumentTypeStrategy
{
private
static
final
Signature
.
Argument
COMMON_ARGUMENT
=
Signature
.
Argument
.
of
(
"<COMMON>"
);
private
final
boolean
preserveNullability
;
public
CommonArgumentTypeStrategy
(
boolean
preserveNullability
)
{
this
.
preserveNullability
=
preserveNullability
;
}
@Override
public
Optional
<
DataType
>
inferArgumentType
(
CallContext
callContext
,
int
argumentPos
,
boolean
throwOnFailure
)
{
final
List
<
LogicalType
>
actualTypes
=
callContext
.
getArgumentDataTypes
().
stream
()
.
map
(
DataType:
:
getLogicalType
)
.
collect
(
Collectors
.
toList
());
return
LogicalTypeMerging
.
findCommonType
(
actualTypes
)
.
map
(
commonType
->
preserveNullability
?
commonType
.
copy
(
actualTypes
.
get
(
argumentPos
).
isNullable
())
:
commonType
)
.
map
(
TypeConversions:
:
fromLogicalToDataType
);
}
@Override
public
Signature
.
Argument
getExpectedArgument
(
FunctionDefinition
functionDefinition
,
int
argumentPos
)
{
return
COMMON_ARGUMENT
;
}
}
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/InputTypeStrategiesTest.java
浏览文件 @
df46f894
...
...
@@ -619,7 +619,15 @@ public class InputTypeStrategiesTest extends InputTypeStrategiesTestBase {
.
namedArguments
(
"sameName"
)
.
calledWithArgumentTypes
(
DataTypes
.
BOOLEAN
())
.
expectErrorMessage
(
"Invalid input arguments. Expected signatures are:\nf(STRING)\nf(INT)"
));
"Invalid input arguments. Expected signatures are:\nf(STRING)\nf(INT)"
),
TestSpec
.
forStrategy
(
"Common argument type strategy"
,
sequence
(
InputTypeStrategies
.
COMMON_ARG
,
InputTypeStrategies
.
COMMON_ARG
))
.
calledWithArgumentTypes
(
DataTypes
.
INT
(),
DataTypes
.
BIGINT
())
.
expectSignature
(
"f(<COMMON>, <COMMON>)"
)
.
expectArgumentTypes
(
DataTypes
.
BIGINT
(),
DataTypes
.
BIGINT
()));
}
/** Simple pojo that should be converted to a Structured type. */
...
...
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/functions/MiscFunctionsITCase.java
浏览文件 @
df46f894
...
...
@@ -20,9 +20,11 @@ package org.apache.flink.table.planner.functions;
import
org.apache.flink.table.api.DataTypes
;
import
org.apache.flink.table.functions.BuiltInFunctionDefinitions
;
import
org.apache.flink.table.functions.ScalarFunction
;
import
org.junit.runners.Parameterized
;
import
java.math.BigDecimal
;
import
java.util.Arrays
;
import
java.util.List
;
...
...
@@ -51,6 +53,41 @@ public class MiscFunctionsITCase extends BuiltInFunctionTestBase {
call
(
"TYPEOF"
,
$
(
"f1"
),
true
),
"CHAR(11) NOT NULL"
,
DataTypes
.
STRING
())
.
testSqlResult
(
"TYPEOF(NULL)"
,
"NULL"
,
DataTypes
.
STRING
()));
.
testSqlResult
(
"TYPEOF(NULL)"
,
"NULL"
,
DataTypes
.
STRING
()),
TestSpec
.
forFunction
(
BuiltInFunctionDefinitions
.
IF_NULL
)
.
onFieldsWithData
(
null
,
new
BigDecimal
(
"123.45"
))
.
andDataTypes
(
DataTypes
.
INT
().
nullable
(),
DataTypes
.
DECIMAL
(
5
,
2
).
notNull
())
.
withFunction
(
TakesNotNull
.
class
)
.
testResult
(
$
(
"f0"
).
ifNull
(
$
(
"f0"
)),
"IFNULL(f0, f0)"
,
null
,
DataTypes
.
INT
().
nullable
())
.
testResult
(
$
(
"f0"
).
ifNull
(
$
(
"f1"
)),
"IFNULL(f0, f1)"
,
new
BigDecimal
(
"123.45"
),
DataTypes
.
DECIMAL
(
12
,
2
).
notNull
())
.
testResult
(
$
(
"f1"
).
ifNull
(
$
(
"f0"
)),
"IFNULL(f1, f0)"
,
new
BigDecimal
(
"123.45"
),
DataTypes
.
DECIMAL
(
12
,
2
).
notNull
())
.
testResult
(
$
(
"f1"
).
ifNull
(
$
(
"f0"
)),
"IFNULL(f1, f0)"
,
new
BigDecimal
(
"123.45"
),
DataTypes
.
DECIMAL
(
12
,
2
).
notNull
())
.
testSqlResult
(
"TakesNotNull(IFNULL(f0, 12))"
,
12
,
DataTypes
.
INT
().
notNull
()));
}
// --------------------------------------------------------------------------------------------
/** Function that takes a NOT NULL argument. */
public
static
class
TakesNotNull
extends
ScalarFunction
{
public
int
eval
(
int
i
)
{
return
i
;
}
}
}
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/functions/scalar/IfNullFunction.java
0 → 100644
浏览文件 @
df46f894
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.flink.table.runtime.functions.scalar
;
import
org.apache.flink.annotation.Internal
;
import
org.apache.flink.table.functions.BuiltInFunctionDefinitions
;
import
org.apache.flink.table.functions.SpecializedFunction.SpecializedContext
;
import
javax.annotation.Nullable
;
/** Implementation of {@link BuiltInFunctionDefinitions#IF_NULL}. */
@Internal
public
class
IfNullFunction
extends
BuiltInScalarFunction
{
public
IfNullFunction
(
SpecializedContext
context
)
{
super
(
BuiltInFunctionDefinitions
.
IF_NULL
,
context
);
}
public
@Nullable
Object
eval
(
Object
input
,
Object
nullReplacement
)
{
// we rely on the casting functionality via input type strategy
// to determine the common data type
if
(
input
==
null
)
{
return
nullReplacement
;
}
return
input
;
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录