Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
doujutun3207
flink
提交
885b5e7a
F
flink
项目概览
doujutun3207
/
flink
与 Fork 源项目一致
从无法访问的项目Fork
通知
24
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
F
flink
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
885b5e7a
编写于
5月 13, 2014
作者:
S
Sebastian Kunert
提交者:
StephanEwen
5月 15, 2014
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
implemented parsing of the strings in SemanticPropUtil
上级
526a1ed3
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
202 addition
and
24 deletion
+202
-24
stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/FunctionAnnotation.java
...u/stratosphere/api/java/functions/FunctionAnnotation.java
+6
-6
stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/SemanticPropUtil.java
.../eu/stratosphere/api/java/functions/SemanticPropUtil.java
+192
-17
stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/translation/PlanFlatMapOperator.java
...e/api/java/operators/translation/PlanFlatMapOperator.java
+4
-1
未找到文件。
stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/FunctionAnnotation.java
浏览文件 @
885b5e7a
...
...
@@ -179,7 +179,7 @@ public class FunctionAnnotation {
@Target
(
ElementType
.
TYPE
)
@Retention
(
RetentionPolicy
.
RUNTIME
)
public
@interface
ConstantFieldsExcept
{
String
[]
value
();
String
value
();
}
/**
...
...
@@ -211,7 +211,7 @@ public class FunctionAnnotation {
@Target
(
ElementType
.
TYPE
)
@Retention
(
RetentionPolicy
.
RUNTIME
)
public
@interface
ConstantFieldsFirstExcept
{
String
[]
value
();
String
value
();
}
...
...
@@ -244,7 +244,7 @@ public class FunctionAnnotation {
@Target
(
ElementType
.
TYPE
)
@Retention
(
RetentionPolicy
.
RUNTIME
)
public
@interface
ConstantFieldsSecondExcept
{
String
[]
value
();
String
value
();
}
/**
...
...
@@ -255,7 +255,7 @@ public class FunctionAnnotation {
@Target
(
ElementType
.
TYPE
)
@Retention
(
RetentionPolicy
.
RUNTIME
)
public
@interface
ReadFields
{
String
[]
value
();
String
value
();
}
/**
...
...
@@ -266,7 +266,7 @@ public class FunctionAnnotation {
@Target
(
ElementType
.
TYPE
)
@Retention
(
RetentionPolicy
.
RUNTIME
)
public
@interface
ReadFieldsSecond
{
String
[]
value
();
String
value
();
}
/**
...
...
@@ -277,7 +277,7 @@ public class FunctionAnnotation {
@Target
(
ElementType
.
TYPE
)
@Retention
(
RetentionPolicy
.
RUNTIME
)
public
@interface
ReadFieldsFirst
{
String
[]
value
();
String
value
();
}
/**
* Private constructor to prevent instantiation. This class is intended only as a container.
...
...
stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/SemanticPropUtil.java
浏览文件 @
885b5e7a
...
...
@@ -3,19 +3,37 @@ package eu.stratosphere.api.java.functions;
import
java.lang.annotation.Annotation
;
import
java.util.Iterator
;
import
java.util.Set
;
import
java.util.regex.Matcher
;
import
java.util.regex.Pattern
;
import
eu.stratosphere.api.common.operators.DualInputSemanticProperties
;
import
eu.stratosphere.api.common.operators.SemanticProperties
;
import
eu.stratosphere.api.common.operators.SingleInputSemanticProperties
;
import
eu.stratosphere.api.common.operators.util.FieldSet
;
import
eu.stratosphere.api.java.functions.FunctionAnnotation.ConstantFields
;
import
eu.stratosphere.api.java.functions.FunctionAnnotation.ConstantFieldsFirst
;
import
eu.stratosphere.api.java.functions.FunctionAnnotation.ConstantFieldsSecond
;
import
eu.stratosphere.api.java.functions.FunctionAnnotation.ConstantFieldsFirstExcept
;
import
eu.stratosphere.api.java.functions.FunctionAnnotation.ConstantFieldsSecondExcept
;
import
eu.stratosphere.api.java.functions.FunctionAnnotation.ReadFieldsFirst
;
import
eu.stratosphere.api.java.functions.FunctionAnnotation.ReadFieldsSecond
;
import
eu.stratosphere.api.java.functions.FunctionAnnotation.ConstantFieldsExcept
;
import
eu.stratosphere.api.java.functions.FunctionAnnotation.ReadFields
;
import
eu.stratosphere.api.java.typeutils.TypeInformation
;
public
class
SemanticPropUtil
{
public
SingleInputSemanticProperties
getSemanticPropsSingle
(
Set
<
Annotation
>
set
,
TypeInformation
<?>
inType
,
TypeInformation
<?>
outType
)
{
private
final
static
String
REGEX_ANNOTATION
=
"\\s*(\\d+)\\s*->(\\s*(\\d+\\s*,\\s*)*(\\d+\\s*))"
;
public
static
SingleInputSemanticProperties
getSemanticPropsSingle
(
Set
<
Annotation
>
set
,
TypeInformation
<?>
inType
,
TypeInformation
<?>
outType
)
{
Iterator
<
Annotation
>
it
=
set
.
iterator
();
SingleInputSemanticProperties
result
=
null
;
//non tuple types are not yet supported for annotations
if
(!
inType
.
isTupleType
()
||
!
outType
.
isTupleType
())
{
return
null
;
}
while
(
it
.
hasNext
())
{
if
(
result
==
null
)
{
result
=
new
SingleInputSemanticProperties
();
...
...
@@ -25,28 +43,185 @@ public class SemanticPropUtil {
if
(
ann
instanceof
ConstantFields
)
{
ConstantFields
cf
=
(
ConstantFields
)
ann
;
}
else
if
(
ann
instanceof
ConstantFieldsExcept
)
{
parseConstantFields
(
cf
.
value
(),
result
,
inType
,
outType
);
}
else
if
(
ann
instanceof
ConstantFieldsExcept
)
{
ConstantFieldsExcept
cfe
=
(
ConstantFieldsExcept
)
ann
;
parseConstantFieldsExcept
(
cfe
.
value
(),
result
,
inType
,
outType
);
}
else
if
(
ann
instanceof
ReadFields
)
{
ReadFields
rf
=
(
ReadFields
)
ann
;
parseReadFields
(
rf
.
value
(),
result
,
inType
,
outType
);
}
}
return
null
;
}
private
void
parseConstantFields
(
ConstantFields
cf
,
SingleInputSemanticProperties
sm
)
{
}
private
void
parseConstantFieldsExcept
(
ConstantFieldsExcept
cfe
,
SingleInputSemanticProperties
sm
)
{
return
result
;
}
private
void
parseReadFields
(
ReadFields
rf
,
SingleInputSemanticProperties
sm
)
{
private
static
void
parseConstantFields
(
String
[]
cf
,
SingleInputSemanticProperties
sm
,
TypeInformation
<?>
inType
,
TypeInformation
<?>
outType
)
{
for
(
String
s:
cf
)
{
readConstantSet
(
sm
,
s
,
inType
,
outType
,
0
);
}
}
private
static
void
readConstantSet
(
SemanticProperties
sp
,
String
s
,
TypeInformation
<?>
inType
,
TypeInformation
<?>
outType
,
int
input
)
{
Pattern
check
=
Pattern
.
compile
(
REGEX_ANNOTATION
);
Matcher
matcher
=
check
.
matcher
(
s
);
int
sourceField
=
0
;
if
(!
matcher
.
matches
())
{
throw
new
RuntimeException
(
"Wrong annotation String format. Please read the documentation."
);
}
sourceField
=
Integer
.
valueOf
(
matcher
.
group
(
1
));
if
(!
isValidField
(
inType
,
sourceField
))
{
throw
new
IndexOutOfBoundsException
(
"Annotation: Field "
+
sourceField
+
" not available in the input tuple."
);
}
FieldSet
fs
=
readFieldSetFromString
(
matcher
.
group
(
2
),
inType
,
outType
);
if
(
sp
instanceof
SingleInputSemanticProperties
)
{
((
SingleInputSemanticProperties
)
sp
).
addForwardedField
(
sourceField
,
fs
);
}
else
if
(
sp
instanceof
DualInputSemanticProperties
)
{
if
(
input
==
0
)
{
((
DualInputSemanticProperties
)
sp
).
addForwardedField1
(
sourceField
,
fs
);
}
else
if
(
input
==
1
)
{
((
DualInputSemanticProperties
)
sp
).
addForwardedField2
(
sourceField
,
fs
);
}
}
}
private
static
void
parseConstantFieldsFirst
(
String
[]
cff
,
DualInputSemanticProperties
dm
,
TypeInformation
<?>
inType
,
TypeInformation
<?>
outType
)
{
Pattern
check
=
Pattern
.
compile
(
REGEX_ANNOTATION
);
for
(
String
s:
cff
)
{
readConstantSet
(
dm
,
s
,
inType
,
outType
,
0
);
}
}
private
static
void
parseConstantFieldsSecond
(
String
[]
cfs
,
DualInputSemanticProperties
dm
,
TypeInformation
<?>
inType
,
TypeInformation
<?>
outType
)
{
Pattern
check
=
Pattern
.
compile
(
REGEX_ANNOTATION
);
for
(
String
s:
cfs
)
{
readConstantSet
(
dm
,
s
,
inType
,
outType
,
1
);
}
}
private
static
void
parseConstantFieldsFirstExcept
(
String
cffe
,
DualInputSemanticProperties
dm
,
TypeInformation
<?>
inType
,
TypeInformation
<?>
outType
)
{
FieldSet
fs
=
readFieldSetFromString
(
cffe
,
inType
,
outType
);
for
(
int
i
=
0
;
i
<
outType
.
getArity
();
i
++)
{
if
(!
fs
.
contains
(
i
))
{
dm
.
addForwardedField1
(
i
,
i
);
}
}
}
private
static
void
parseConstantFieldsSecondExcept
(
String
cfse
,
DualInputSemanticProperties
dm
,
TypeInformation
<?>
inType
,
TypeInformation
<?>
outType
)
{
FieldSet
fs
=
readFieldSetFromString
(
cfse
,
inType
,
outType
);
for
(
int
i
=
0
;
i
<
outType
.
getArity
();
i
++)
{
if
(!
fs
.
contains
(
i
))
{
dm
.
addForwardedField2
(
i
,
i
);
}
}
}
private
static
void
parseReadFieldsFirst
(
String
rf
,
DualInputSemanticProperties
dm
,
TypeInformation
<?>
inType
,
TypeInformation
<?>
outType
)
{
FieldSet
fs
=
readFieldSetFromString
(
rf
,
inType
,
outType
);
dm
.
addReadFields1
(
fs
);
}
private
static
void
parseReadFieldsSecond
(
String
rf
,
DualInputSemanticProperties
dm
,
TypeInformation
<?>
inType
,
TypeInformation
<?>
outType
)
{
FieldSet
fs
=
readFieldSetFromString
(
rf
,
inType
,
outType
);
dm
.
addReadFields2
(
fs
);
}
private
static
boolean
isValidField
(
TypeInformation
<?>
type
,
int
field
)
{
if
(
field
>
type
.
getArity
()
||
field
<
0
)
{
return
false
;
}
return
true
;
}
private
static
void
parseConstantFieldsExcept
(
String
cfe
,
SingleInputSemanticProperties
sm
,
TypeInformation
<?>
inType
,
TypeInformation
<?>
outType
)
{
FieldSet
fs
=
readFieldSetFromString
(
cfe
,
inType
,
outType
);
for
(
int
i
=
0
;
i
<
outType
.
getArity
();
i
++)
{
if
(!
fs
.
contains
(
i
))
{
sm
.
addForwardedField
(
i
,
i
);
}
}
}
private
static
FieldSet
readFieldSetFromString
(
String
s
,
TypeInformation
<?>
inType
,
TypeInformation
<?>
outType
)
{
Pattern
check
=
Pattern
.
compile
(
"\\s*(\\d+\\s*,\\s*)*(\\d+\\s*)"
);
Pattern
digit
=
Pattern
.
compile
(
"\\d+"
);
Matcher
matcher
=
check
.
matcher
(
s
);
if
(!
matcher
.
matches
())
{
throw
new
RuntimeException
(
"Wrong annotation String format. Please read the documentation."
);
}
matcher
=
digit
.
matcher
(
s
);
FieldSet
fs
=
new
FieldSet
();
while
(
matcher
.
find
())
{
int
field
=
Integer
.
valueOf
(
matcher
.
group
());
if
(!
isValidField
(
outType
,
field
)
||
!
isValidField
(
inType
,
field
))
{
throw
new
IndexOutOfBoundsException
(
"Annotation: Field "
+
field
+
" not available in the output tuple."
);
}
fs
.
add
(
field
);
}
return
fs
;
}
private
static
void
parseReadFields
(
String
rf
,
SingleInputSemanticProperties
sm
,
TypeInformation
<?>
inType
,
TypeInformation
<?>
outType
)
{
FieldSet
fs
=
readFieldSetFromString
(
rf
,
inType
,
outType
);
sm
.
addReadFields
(
fs
);
}
public
DualInputSemanticProperties
getSemanticPropsDua
(
Set
<
Annotation
>
set
,
TypeInformation
<?>
inType1
,
TypeInformation
<?>
inType2
,
TypeInformation
<?>
outType
)
{
return
null
;
public
static
SingleInputSemanticProperties
getSemanticPropsSingleFromString
(
String
[]
ConstantSet
,
String
constantSetExcept
,
String
ReadSet
,
TypeInformation
<?>
inType
,
TypeInformation
<?>
outType
)
{
return
null
;
}
public
static
DualInputSemanticProperties
getSemanticPropsDualFromString
(
String
[]
constantSetFirst
,
String
[]
constantSetSecond
,
String
constantSetFirstExcept
,
String
constantSetSecondExcept
,
String
readFieldsFirst
,
String
readFieldsSecond
,
TypeInformation
<?>
inType1
,
TypeInformation
<?>
inType2
,
TypeInformation
<?>
outType
)
{
return
null
;
}
public
static
DualInputSemanticProperties
getSemanticPropsDual
(
Set
<
Annotation
>
set
,
TypeInformation
<?>
inType1
,
TypeInformation
<?>
inType2
,
TypeInformation
<?>
outType
)
{
Iterator
<
Annotation
>
it
=
set
.
iterator
();
DualInputSemanticProperties
result
=
null
;
//non tuple types are not yet supported for annotations
if
(!
inType1
.
isTupleType
()
||
!
inType2
.
isTupleType
()
||
!
outType
.
isTupleType
())
{
return
null
;
}
while
(
it
.
hasNext
())
{
if
(
result
==
null
)
{
result
=
new
DualInputSemanticProperties
();
}
Annotation
ann
=
it
.
next
();
if
(
ann
instanceof
ConstantFieldsFirst
)
{
ConstantFieldsFirst
cff
=
(
ConstantFieldsFirst
)
ann
;
parseConstantFieldsFirst
(
cff
.
value
(),
result
,
inType1
,
outType
);
}
else
if
(
ann
instanceof
ConstantFieldsSecond
)
{
ConstantFieldsSecond
cfs
=
(
ConstantFieldsSecond
)
ann
;
parseConstantFieldsSecond
(
cfs
.
value
(),
result
,
inType2
,
outType
);
}
else
if
(
ann
instanceof
ConstantFieldsFirstExcept
)
{
ConstantFieldsFirstExcept
cffe
=
(
ConstantFieldsFirstExcept
)
ann
;
parseConstantFieldsFirstExcept
(
cffe
.
value
(),
result
,
inType1
,
outType
);
}
else
if
(
ann
instanceof
ConstantFieldsSecondExcept
)
{
ConstantFieldsSecondExcept
cfse
=
(
ConstantFieldsSecondExcept
)
ann
;
parseConstantFieldsSecondExcept
(
cfse
.
value
(),
result
,
inType2
,
outType
);
}
else
if
(
ann
instanceof
ReadFieldsFirst
)
{
ReadFieldsFirst
rff
=
(
ReadFieldsFirst
)
ann
;
parseReadFieldsFirst
(
rff
.
value
(),
result
,
inType1
,
outType
);
}
else
if
(
ann
instanceof
ReadFieldsSecond
)
{
ReadFieldsSecond
rfs
=
(
ReadFieldsSecond
)
ann
;
parseReadFieldsSecond
(
rfs
.
value
(),
result
,
inType2
,
outType
);
}
}
return
result
;
}
}
stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/translation/PlanFlatMapOperator.java
浏览文件 @
885b5e7a
...
...
@@ -18,10 +18,12 @@ import java.lang.annotation.Annotation;
import
java.util.Set
;
import
eu.stratosphere.api.common.functions.GenericFlatMap
;
import
eu.stratosphere.api.common.operators.SingleInputSemanticProperties
;
import
eu.stratosphere.api.common.operators.base.FlatMapOperatorBase
;
import
eu.stratosphere.api.java.functions.FlatMapFunction
;
import
eu.stratosphere.api.java.functions.FunctionAnnotation
;
import
eu.stratosphere.api.java.functions.FunctionAnnotation.ConstantFields
;
import
eu.stratosphere.api.java.functions.SemanticPropUtil
;
import
eu.stratosphere.api.java.typeutils.TypeInformation
;
...
...
@@ -39,7 +41,8 @@ public class PlanFlatMapOperator<T, O> extends FlatMapOperatorBase<GenericFlatMa
this
.
outType
=
outType
;
Set
<
Annotation
>
annotations
=
FunctionAnnotation
.
readSingleConstantAnnotations
(
this
.
getUserCodeWrapper
());
System
.
out
.
println
(
annotations
);
SingleInputSemanticProperties
sp
=
SemanticPropUtil
.
getSemanticPropsSingle
(
annotations
,
this
.
inType
,
this
.
outType
);
}
@Override
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录