Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
doujutun3207
flink
提交
5308efd5
F
flink
项目概览
doujutun3207
/
flink
与 Fork 源项目一致
从无法访问的项目Fork
通知
24
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
F
flink
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
5308efd5
编写于
5月 07, 2014
作者:
S
sebastian kunert
提交者:
StephanEwen
5月 15, 2014
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
implemented infrastructure for string style annotations
上级
fe9627c7
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
71 addition
and
226 deletion
+71
-226
stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/FunctionAnnotation.java
...u/stratosphere/api/java/functions/FunctionAnnotation.java
+62
-223
stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/translation/PlanFlatMapOperator.java
...e/api/java/operators/translation/PlanFlatMapOperator.java
+9
-3
未找到文件。
stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/FunctionAnnotation.java
浏览文件 @
5308efd5
...
...
@@ -13,10 +13,13 @@
**********************************************************************************************************************/
package
eu.stratosphere.api.java.functions
;
import
java.lang.annotation.Annotation
;
import
java.lang.annotation.ElementType
;
import
java.lang.annotation.Retention
;
import
java.lang.annotation.RetentionPolicy
;
import
java.lang.annotation.Target
;
import
java.util.HashSet
;
import
java.util.Set
;
import
com.google.common.primitives.Ints
;
...
...
@@ -81,35 +84,9 @@ public class FunctionAnnotation {
@Target
(
ElementType
.
TYPE
)
@Retention
(
RetentionPolicy
.
RUNTIME
)
public
@interface
ConstantFields
{
int
[]
value
()
default
{};
int
[]
outTuplePos
()
default
{};
String
[]
inCustomPos
()
default
{};
String
[]
outCustomPos
()
default
{};
String
[]
value
();
}
/**
* Specifies that all fields of an input tuple or custom object that are unchanged in the output of
* a {@link MapFunction}, or {@link ReduceFunction}).
*
* A field is considered to be constant if its value is not changed and copied to the same position of
* output record.
*
* <b>
* It is very important to follow a conservative strategy when specifying constant fields.
* Only fields that are always constant (regardless of value, stub call, etc.) to the output may be
* inserted! Otherwise, the correct execution of a program can not be guaranteed.
* So if in doubt, do not add a field to this set.
* </b>
*
* This annotation is mutually exclusive with the {@link ConstantFieldsExcept} annotation.
*
* If this annotation and the {@link ConstantFieldsExcept} annotation is not set, it is
* assumed that <i>no</i> field is constant.
*/
@Target
(
ElementType
.
TYPE
)
@Retention
(
RetentionPolicy
.
RUNTIME
)
public
@interface
AllFieldsConstants
{}
/**
* Specifies the fields of an input tuple or custom object of the first input that are unchanged in
* the output of a stub with two inputs ( {@link CrossFunction}, {@link JoinFunction}, {@link CoGroupFunction})
...
...
@@ -139,10 +116,7 @@ public class FunctionAnnotation {
@Target
(
ElementType
.
TYPE
)
@Retention
(
RetentionPolicy
.
RUNTIME
)
public
@interface
ConstantFieldsFirst
{
int
[]
value
()
default
{};
int
[]
outTuplePos
()
default
{};
String
[]
inCustomPos
()
default
{};
String
[]
outCustomPos
()
default
{};
String
[]
value
();
}
/**
...
...
@@ -173,10 +147,7 @@ public class FunctionAnnotation {
@Target
(
ElementType
.
TYPE
)
@Retention
(
RetentionPolicy
.
RUNTIME
)
public
@interface
ConstantFieldsSecond
{
int
[]
value
()
default
{};
int
[]
outTuplePos
()
default
{};
String
[]
outCustomPos
()
default
{};
String
[]
inCustomPos
()
default
{};
String
[]
value
();
}
/**
...
...
@@ -208,8 +179,7 @@ public class FunctionAnnotation {
@Target
(
ElementType
.
TYPE
)
@Retention
(
RetentionPolicy
.
RUNTIME
)
public
@interface
ConstantFieldsExcept
{
int
[]
value
()
default
{};
String
[]
inCustomPos
()
default
{};
String
[]
value
();
}
/**
...
...
@@ -241,8 +211,7 @@ public class FunctionAnnotation {
@Target
(
ElementType
.
TYPE
)
@Retention
(
RetentionPolicy
.
RUNTIME
)
public
@interface
ConstantFieldsFirstExcept
{
int
[]
value
()
default
{};
String
[]
inCustomPos
()
default
{};
String
[]
value
();
}
...
...
@@ -275,8 +244,7 @@ public class FunctionAnnotation {
@Target
(
ElementType
.
TYPE
)
@Retention
(
RetentionPolicy
.
RUNTIME
)
public
@interface
ConstantFieldsSecondExcept
{
int
[]
value
()
default
{};
String
[]
inCustomPos
()
default
{};
String
[]
value
();
}
/**
...
...
@@ -287,8 +255,7 @@ public class FunctionAnnotation {
@Target
(
ElementType
.
TYPE
)
@Retention
(
RetentionPolicy
.
RUNTIME
)
public
@interface
ReadFields
{
int
[]
value
()
default
{};
String
[]
inCustomPos
()
default
{};
String
[]
value
();
}
/**
...
...
@@ -299,8 +266,7 @@ public class FunctionAnnotation {
@Target
(
ElementType
.
TYPE
)
@Retention
(
RetentionPolicy
.
RUNTIME
)
public
@interface
ReadFieldsSecond
{
int
[]
value
()
default
{};
String
[]
inCustomPos
()
default
{};
String
[]
value
();
}
/**
...
...
@@ -311,8 +277,7 @@ public class FunctionAnnotation {
@Target
(
ElementType
.
TYPE
)
@Retention
(
RetentionPolicy
.
RUNTIME
)
public
@interface
ReadFieldsFirst
{
int
[]
value
()
default
{};
String
[]
inCustomPos
()
default
{};
String
[]
value
();
}
/**
* Private constructor to prevent instantiation. This class is intended only as a container.
...
...
@@ -322,77 +287,6 @@ public class FunctionAnnotation {
// --------------------------------------------------------------------------------------------
// Function Annotation Handling
// --------------------------------------------------------------------------------------------
private
static
boolean
checkValidity
(
ConstantFields
constantSet
)
{
int
counter
=
0
;
if
(
constantSet
.
value
().
length
>
0
)
{
counter
++;
};
if
(
constantSet
.
outTuplePos
().
length
>
0
)
{
counter
++;
};
if
(
constantSet
.
outCustomPos
().
length
>
0
)
{
counter
++;
};
if
(
constantSet
.
inCustomPos
().
length
>
0
)
{
counter
++;
};
if
(
counter
>
2
)
{
return
false
;
}
return
true
;
}
private
static
boolean
checkValidity
(
ConstantFieldsFirst
constantSet
)
{
int
counter
=
0
;
if
(
constantSet
.
value
().
length
>
0
)
{
counter
++;
};
if
(
constantSet
.
outTuplePos
().
length
>
0
)
{
counter
++;
};
if
(
constantSet
.
outCustomPos
().
length
>
0
)
{
counter
++;
};
if
(
constantSet
.
inCustomPos
().
length
>
0
)
{
counter
++;
};
if
(
counter
>
2
)
{
return
false
;
}
return
true
;
}
private
static
boolean
checkValidity
(
ConstantFieldsSecond
constantSet
)
{
int
counter
=
0
;
if
(
constantSet
.
value
().
length
>
0
)
{
counter
++;
};
if
(
constantSet
.
outTuplePos
().
length
>
0
)
{
counter
++;
};
if
(
constantSet
.
outCustomPos
().
length
>
0
)
{
counter
++;
};
if
(
constantSet
.
inCustomPos
().
length
>
0
)
{
counter
++;
};
if
(
counter
>
2
)
{
return
false
;
}
return
true
;
}
/**
* Reads the annotations of a user defined function with one input and returns semantic properties according to the constant fields annotated.
...
...
@@ -402,68 +296,38 @@ public class FunctionAnnotation {
* @return The DualInputSemanticProperties containing the constant fields.
*/
public
static
SingleInputSemanticProperties
readSingleConstantAnnotations
(
UserCodeWrapper
<?>
udf
,
TypeInformation
<?>
input
,
TypeInformation
<?>
output
)
{
if
(!
input
.
isTupleType
()
||
!
output
.
isTupleType
())
{
return
null
;
}
AllFieldsConstants
allConstants
=
udf
.
getUserCodeAnnotation
(
AllFieldsConstants
.
class
);
public
static
Set
<
Annotation
>
readSingleConstantAnnotations
(
UserCodeWrapper
<?>
udf
)
{
ConstantFields
constantSet
=
udf
.
getUserCodeAnnotation
(
ConstantFields
.
class
);
ConstantFieldsExcept
notConstantSet
=
udf
.
getUserCodeAnnotation
(
ConstantFieldsExcept
.
class
);
ReadFields
readfieldSet
=
udf
.
getUserCodeAnnotation
(
ReadFields
.
class
);
int
inputArity
=
input
.
getArity
();
int
outputArity
=
output
.
getArity
();
if
(
notConstantSet
!=
null
&&
(
constantSet
!=
null
||
allConstants
!=
null
))
{
throw
new
RuntimeException
(
"Either ConstantFields or ConstantFieldsExcept can be specified, not both."
);
}
Set
<
Annotation
>
result
=
null
;
if
(
constantSet
!=
null
&&
!
checkValidity
(
constantSet
))
{
throw
new
RuntimeException
(
"Only two parameters of the annotation should be used at once."
);
}
SingleInputSemanticProperties
semanticProperties
=
new
SingleInputSemanticProperties
();
if
(
readfieldSet
!=
null
&&
readfieldSet
.
value
().
length
>
0
)
{
semanticProperties
.
setReadFields
(
new
FieldSet
(
readfieldSet
.
value
()));
if
(
notConstantSet
!=
null
&&
constantSet
!=
null
)
{
throw
new
RuntimeException
(
"Either ConstantFields or ConstantFieldsExcept can be specified, not both."
);
}
// extract notConstantSet from annotation
if
(
notConstantSet
!=
null
&&
notConstantSet
.
value
().
length
>
0
)
{
for
(
int
i
=
0
;
i
<
inputArity
&&
i
<
outputArity
;
i
++)
{
if
(!
Ints
.
contains
(
notConstantSet
.
value
(),
i
))
{
semanticProperties
.
addForwardedField
(
i
,
i
);
};
if
(
notConstantSet
!=
null
)
{
if
(
result
==
null
)
{
result
=
new
HashSet
<
Annotation
>();
}
result
.
add
(
notConstantSet
);
}
if
(
allConstants
!=
null
)
{
for
(
int
i
=
0
;
i
<
inputArity
&&
i
<
outputArity
;
i
++)
{
semanticProperties
.
addForwardedField
(
i
,
i
);
if
(
constantSet
!=
null
)
{
if
(
result
==
null
)
{
result
=
new
HashSet
<
Annotation
>();
}
result
.
add
(
constantSet
);
}
// extract constantSet from annotation
if
(
constantSet
!=
null
)
{
if
(
constantSet
.
outTuplePos
().
length
==
0
&&
constantSet
.
value
().
length
>
0
)
{
for
(
int
value:
constantSet
.
value
())
{
semanticProperties
.
addForwardedField
(
value
,
value
);
}
}
else
if
(
constantSet
.
value
().
length
==
constantSet
.
outTuplePos
().
length
&&
constantSet
.
value
().
length
>
0
)
{
for
(
int
i
=
0
;
i
<
constantSet
.
value
().
length
;
i
++)
{
semanticProperties
.
addForwardedField
(
constantSet
.
value
()[
i
],
constantSet
.
outTuplePos
()[
i
]);
}
}
else
{
throw
new
RuntimeException
(
"Field 'from' and 'to' of the annotation should have the same length."
);
if
(
readfieldSet
!=
null
)
{
if
(
result
==
null
)
{
result
=
new
HashSet
<
Annotation
>();
}
result
.
add
(
readfieldSet
);
}
return
semanticProperties
;
return
result
;
}
// --------------------------------------------------------------------------------------------
...
...
@@ -476,15 +340,8 @@ public class FunctionAnnotation {
* @return The DualInputSemanticProperties containing the constant fields.
*/
public
static
DualInputSemanticProperties
readDualConstantAnnotations
(
UserCodeWrapper
<?>
udf
,
TypeInformation
<?>
input1
,
TypeInformation
<?>
input2
,
TypeInformation
<?>
output
)
{
if
(!
input1
.
isTupleType
()
||
!
input2
.
isTupleType
()
||
!
output
.
isTupleType
())
{
return
null
;
}
int
input1Arity
=
input1
.
getArity
();
int
input2Arity
=
input2
.
getArity
();
int
outputArity
=
output
.
getArity
();
public
static
Set
<
Annotation
>
readDualConstantAnnotations
(
UserCodeWrapper
<?>
udf
)
{
// get readSet annotation from stub
ConstantFieldsFirst
constantSet1
=
udf
.
getUserCodeAnnotation
(
ConstantFieldsFirst
.
class
);
ConstantFieldsSecond
constantSet2
=
udf
.
getUserCodeAnnotation
(
ConstantFieldsSecond
.
class
);
...
...
@@ -504,67 +361,49 @@ public class FunctionAnnotation {
throw
new
RuntimeException
(
"Either ConstantFieldsSecond or ConstantFieldsSecondExcept can be specified, not both."
);
}
if
(
constantSet1
!=
null
&&
constantSet2
!=
null
&&
(!
checkValidity
(
constantSet1
)
||
!
checkValidity
(
constantSet2
)))
{
throw
new
RuntimeException
(
"Only two parameters of the annotation should be used at once."
);
}
DualInputSemanticProperties
semanticProperties
=
new
DualInputSemanticProperties
();
Set
<
Annotation
>
result
=
null
;
if
(
readfieldSet1
!=
null
&&
readfieldSet2
.
value
().
length
>
0
)
{
semanticProperties
.
setReadFields1
(
new
FieldSet
(
readfieldSet1
.
value
()));
if
(
notConstantSet2
!=
null
)
{
if
(
result
==
null
)
{
result
=
new
HashSet
<
Annotation
>();
}
result
.
add
(
notConstantSet2
);
}
if
(
readfieldSet2
!=
null
&&
readfieldSet2
.
value
().
length
>
0
)
{
semanticProperties
.
setReadFields2
(
new
FieldSet
(
readfieldSet2
.
value
()));
if
(
constantSet2
!=
null
)
{
if
(
result
==
null
)
{
result
=
new
HashSet
<
Annotation
>();
}
result
.
add
(
constantSet2
);
}
// extract readSets from annotations
if
(
notConstantSet1
!=
null
&&
notConstantSet1
.
value
().
length
>
0
)
{
for
(
int
i
=
0
;
i
<
input1Arity
&&
i
<
outputArity
;
i
++)
{
if
(!
Ints
.
contains
(
notConstantSet1
.
value
(),
i
))
{
semanticProperties
.
addForwardedField1
(
i
,
i
);;
};
if
(
readfieldSet2
!=
null
)
{
if
(
result
==
null
)
{
result
=
new
HashSet
<
Annotation
>();
}
result
.
add
(
readfieldSet2
);
}
if
(
notConstantSet2
!=
null
&&
notConstantSet2
.
value
().
length
>
0
)
{
for
(
int
i
=
0
;
i
<
input2Arity
&&
i
<
outputArity
;
i
++)
{
if
(!
Ints
.
contains
(
notConstantSet2
.
value
(),
i
))
{
semanticProperties
.
addForwardedField2
(
i
,
i
);;
};
}
if
(
notConstantSet1
!=
null
)
{
if
(
result
==
null
)
{
result
=
new
HashSet
<
Annotation
>();
}
result
.
add
(
notConstantSet1
);
}
// extract readSets from annotations
if
(
constantSet1
!=
null
)
{
if
(
constantSet1
.
outTuplePos
().
length
==
0
&&
constantSet1
.
value
().
length
>
0
)
{
for
(
int
value:
constantSet1
.
value
())
{
semanticProperties
.
addForwardedField1
(
value
,
value
);
}
}
else
if
(
constantSet1
.
value
().
length
==
constantSet1
.
outTuplePos
().
length
&&
constantSet1
.
value
().
length
>
0
)
{
for
(
int
i
=
0
;
i
<
constantSet1
.
value
().
length
;
i
++)
{
semanticProperties
.
addForwardedField1
(
constantSet1
.
value
()[
i
],
constantSet1
.
outTuplePos
()[
i
]);
}
}
else
{
throw
new
RuntimeException
(
"Field 'from' and 'to' of the annotation should have the same length."
);
if
(
result
==
null
)
{
result
=
new
HashSet
<
Annotation
>();
}
result
.
add
(
constantSet1
);
}
if
(
constantSet2
!=
null
)
{
if
(
constantSet2
.
outTuplePos
().
length
==
0
&&
constantSet1
.
value
().
length
>
0
)
{
for
(
int
value:
constantSet2
.
value
())
{
semanticProperties
.
addForwardedField1
(
value
,
value
);
}
}
else
if
(
constantSet2
.
value
().
length
==
constantSet2
.
outTuplePos
().
length
&&
constantSet2
.
value
().
length
>
0
)
{
for
(
int
i
=
0
;
i
<
constantSet2
.
value
().
length
;
i
++)
{
semanticProperties
.
addForwardedField2
(
constantSet2
.
value
()[
i
],
constantSet2
.
outTuplePos
()[
i
]);
}
}
else
{
throw
new
RuntimeException
(
"Field 'from' and 'to' of the ConstantFields annotation should have the same length."
);
if
(
readfieldSet1
!=
null
)
{
if
(
result
==
null
)
{
result
=
new
HashSet
<
Annotation
>();
}
result
.
add
(
readfieldSet1
);
}
return
semanticProperties
;
return
result
;
}
...
...
stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/translation/PlanFlatMapOperator.java
浏览文件 @
5308efd5
...
...
@@ -14,14 +14,17 @@
**********************************************************************************************************************/
package
eu.stratosphere.api.java.operators.translation
;
import
java.lang.annotation.Annotation
;
import
java.util.Set
;
import
eu.stratosphere.api.common.functions.GenericFlatMap
;
import
eu.stratosphere.api.common.operators.base.FlatMapOperatorBase
;
import
eu.stratosphere.api.java.functions.FlatMapFunction
;
import
eu.stratosphere.api.java.functions.FunctionAnnotation
;
import
eu.stratosphere.api.java.functions.FunctionAnnotation.ConstantFields
;
import
eu.stratosphere.api.java.typeutils.TypeInformation
;
/**
*
*/
public
class
PlanFlatMapOperator
<
T
,
O
>
extends
FlatMapOperatorBase
<
GenericFlatMap
<
T
,
O
>>
implements
UnaryJavaPlanNode
<
T
,
O
>
{
...
...
@@ -34,6 +37,9 @@ public class PlanFlatMapOperator<T, O> extends FlatMapOperatorBase<GenericFlatMa
super
(
udf
,
name
);
this
.
inType
=
inType
;
this
.
outType
=
outType
;
Set
<
Annotation
>
annotations
=
FunctionAnnotation
.
readSingleConstantAnnotations
(
this
.
getUserCodeWrapper
());
System
.
out
.
println
(
annotations
);
}
@Override
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录