提交 38f84496 编写于 作者: S sebastian kunert 提交者: StephanEwen

implemented semanticPropUtils class, added compatibility to operators

上级 885b5e7a
/***********************************************************************************************************************
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
**********************************************************************************************************************/
package eu.stratosphere.api.java.functions;
import java.lang.annotation.Annotation;
......@@ -23,205 +36,257 @@ import eu.stratosphere.api.java.typeutils.TypeInformation;
public class SemanticPropUtil {
private final static String REGEX_ANNOTATION = "\\s*(\\d+)\\s*->(\\s*(\\d+\\s*,\\s*)*(\\d+\\s*))";
private final static String REGEX_ANNOTATION = "\\s*(\\d+)\\s*->(\\s*(\\d+\\s*,\\s*)*(\\d+\\s*))";
public static SingleInputSemanticProperties getSemanticPropsSingle(Set<Annotation> set, TypeInformation<?> inType, TypeInformation<?> outType) {
public static SingleInputSemanticProperties getSemanticPropsSingle(Set<Annotation> set, TypeInformation<?> inType, TypeInformation<?> outType) {
if (set == null) {
return null;
}
Iterator<Annotation> it = set.iterator();
SingleInputSemanticProperties result = null;
//non tuple types are not yet supported for annotations
if (!inType.isTupleType() || !outType.isTupleType()) {
return null;
}
//non tuple types are not yet supported for annotations
if (!inType.isTupleType() || !outType.isTupleType()) {
return null;
}
while (it.hasNext()) {
if (result == null) {
result = new SingleInputSemanticProperties();
}
Annotation ann = it.next();
if (ann instanceof ConstantFields) {
ConstantFields cf = (ConstantFields) ann;
parseConstantFields(cf.value(), result, inType, outType);
} else if (ann instanceof ConstantFieldsExcept) {
parseConstantFields(cf.value(), result, inType, outType);
} else if (ann instanceof ConstantFieldsExcept) {
ConstantFieldsExcept cfe = (ConstantFieldsExcept) ann;
parseConstantFieldsExcept(cfe.value(), result, inType, outType);
parseConstantFieldsExcept(cfe.value(), result, inType, outType);
} else if (ann instanceof ReadFields) {
ReadFields rf = (ReadFields) ann;
parseReadFields(rf.value(), result, inType, outType);
parseReadFields(rf.value(), result, inType, outType);
}
}
return result;
}
private static void parseConstantFields(String[] cf, SingleInputSemanticProperties sm, TypeInformation<?> inType, TypeInformation<?> outType) {
for (String s: cf) {
readConstantSet(sm, s, inType, outType, 0);
}
}
private static void readConstantSet(SemanticProperties sp, String s, TypeInformation<?> inType, TypeInformation<?> outType, int input) {
Pattern check = Pattern.compile(REGEX_ANNOTATION);
Matcher matcher = check.matcher(s);
int sourceField = 0;
if (!matcher.matches()) {
throw new RuntimeException("Wrong annotation String format. Please read the documentation.");
}
sourceField = Integer.valueOf(matcher.group(1));
if (!isValidField(inType, sourceField)) {
throw new IndexOutOfBoundsException("Annotation: Field " + sourceField + " not available in the input tuple.");
}
FieldSet fs = readFieldSetFromString(matcher.group(2), inType, outType);
if (sp instanceof SingleInputSemanticProperties) {
((SingleInputSemanticProperties) sp).addForwardedField(sourceField, fs);
} else if (sp instanceof DualInputSemanticProperties) {
if (input == 0) {
((DualInputSemanticProperties) sp).addForwardedField1(sourceField, fs);
} else if (input == 1) {
((DualInputSemanticProperties) sp).addForwardedField2(sourceField, fs);
}
}
}
private static void parseConstantFieldsFirst(String[] cff, DualInputSemanticProperties dm, TypeInformation<?> inType, TypeInformation<?> outType) {
Pattern check = Pattern.compile(REGEX_ANNOTATION);
for (String s: cff) {
readConstantSet(dm, s, inType, outType, 0);
}
}
private static void parseConstantFieldsSecond(String[] cfs, DualInputSemanticProperties dm, TypeInformation<?> inType, TypeInformation<?> outType) {
Pattern check = Pattern.compile(REGEX_ANNOTATION);
for (String s: cfs) {
readConstantSet(dm, s, inType, outType, 1);
}
}
private static void parseConstantFieldsFirstExcept(String cffe, DualInputSemanticProperties dm, TypeInformation<?> inType, TypeInformation<?> outType) {
FieldSet fs = readFieldSetFromString(cffe, inType, outType);
for (int i = 0; i < outType.getArity(); i++) {
if (!fs.contains(i)) {
dm.addForwardedField1(i, i);
}
}
}
private static void parseConstantFieldsSecondExcept(String cfse, DualInputSemanticProperties dm, TypeInformation<?> inType, TypeInformation<?> outType) {
FieldSet fs = readFieldSetFromString(cfse, inType, outType);
for (int i = 0; i < outType.getArity(); i++) {
if (!fs.contains(i)) {
dm.addForwardedField2(i, i);
}
}
}
private static void parseReadFieldsFirst(String rf, DualInputSemanticProperties dm, TypeInformation<?> inType, TypeInformation<?> outType) {
FieldSet fs = readFieldSetFromString(rf, inType, outType);
dm.addReadFields1(fs);
}
private static void parseReadFieldsSecond(String rf, DualInputSemanticProperties dm, TypeInformation<?> inType, TypeInformation<?> outType) {
FieldSet fs = readFieldSetFromString(rf, inType, outType);
dm.addReadFields2(fs);
}
private static boolean isValidField(TypeInformation<?> type, int field) {
if (field > type.getArity() || field < 0) {
return false;
}
return true;
}
if (cf == null) {
return;
}
for (String s : cf) {
readConstantSet(sm, s, inType, outType, 0);
}
}
private static void readConstantSet(SemanticProperties sp, String s, TypeInformation<?> inType, TypeInformation<?> outType, int input) {
Pattern check = Pattern.compile(REGEX_ANNOTATION);
Matcher matcher = check.matcher(s);
int sourceField = 0;
if (!matcher.matches()) {
throw new RuntimeException("Wrong annotation String format. Please read the documentation.");
}
sourceField = Integer.valueOf(matcher.group(1));
if (!isValidField(inType, sourceField)) {
throw new IndexOutOfBoundsException("Annotation: Field " + sourceField + " not available in the input tuple.");
}
FieldSet fs = readFieldSetFromString(matcher.group(2), inType, outType);
if (sp instanceof SingleInputSemanticProperties) {
((SingleInputSemanticProperties) sp).addForwardedField(sourceField, fs);
} else if (sp instanceof DualInputSemanticProperties) {
if (input == 0) {
((DualInputSemanticProperties) sp).addForwardedField1(sourceField, fs);
} else if (input == 1) {
((DualInputSemanticProperties) sp).addForwardedField2(sourceField, fs);
}
}
}
private static void parseConstantFieldsFirst(String[] cff, DualInputSemanticProperties dm, TypeInformation<?> inType, TypeInformation<?> outType) {
if (cff == null) {
return;
}
Pattern check = Pattern.compile(REGEX_ANNOTATION);
for (String s : cff) {
readConstantSet(dm, s, inType, outType, 0);
}
}
private static void parseConstantFieldsSecond(String[] cfs, DualInputSemanticProperties dm, TypeInformation<?> inType, TypeInformation<?> outType) {
if (cfs == null) {
return;
}
Pattern check = Pattern.compile(REGEX_ANNOTATION);
for (String s : cfs) {
readConstantSet(dm, s, inType, outType, 1);
}
}
private static void parseConstantFieldsFirstExcept(String cffe, DualInputSemanticProperties dm, TypeInformation<?> inType, TypeInformation<?> outType) {
if (cffe == null) {
return;
}
FieldSet fs = readFieldSetFromString(cffe, inType, outType);
for (int i = 0; i < outType.getArity(); i++) {
if (!fs.contains(i)) {
dm.addForwardedField1(i, i);
}
}
}
private static void parseConstantFieldsSecondExcept(String cfse, DualInputSemanticProperties dm, TypeInformation<?> inType, TypeInformation<?> outType) {
if (cfse == null) {
return;
}
FieldSet fs = readFieldSetFromString(cfse, inType, outType);
for (int i = 0; i < outType.getArity(); i++) {
if (!fs.contains(i)) {
dm.addForwardedField2(i, i);
}
}
}
private static void parseReadFieldsFirst(String rf, DualInputSemanticProperties dm, TypeInformation<?> inType, TypeInformation<?> outType) {
if (rf == null) {
return;
}
FieldSet fs = readFieldSetFromString(rf, inType, outType);
dm.addReadFields1(fs);
}
private static void parseReadFieldsSecond(String rf, DualInputSemanticProperties dm, TypeInformation<?> inType, TypeInformation<?> outType) {
if (rf == null) {
return;
}
FieldSet fs = readFieldSetFromString(rf, inType, outType);
dm.addReadFields2(fs);
}
private static boolean isValidField(TypeInformation<?> type, int field) {
if (field > type.getArity() || field < 0) {
return false;
}
return true;
}
private static void parseConstantFieldsExcept(String cfe, SingleInputSemanticProperties sm, TypeInformation<?> inType, TypeInformation<?> outType) {
FieldSet fs = readFieldSetFromString(cfe, inType, outType);
for (int i = 0; i < outType.getArity(); i++) {
if (!fs.contains(i)) {
sm.addForwardedField(i,i);
}
}
}
private static FieldSet readFieldSetFromString(String s, TypeInformation<?> inType, TypeInformation<?> outType) {
Pattern check = Pattern.compile("\\s*(\\d+\\s*,\\s*)*(\\d+\\s*)");
Pattern digit = Pattern.compile("\\d+");
Matcher matcher = check.matcher(s);
if (!matcher.matches()) {
throw new RuntimeException("Wrong annotation String format. Please read the documentation.");
}
matcher = digit.matcher(s);
FieldSet fs = new FieldSet();
while (matcher.find()) {
int field = Integer.valueOf(matcher.group());
if (!isValidField(outType, field) || !isValidField(inType, field)) {
throw new IndexOutOfBoundsException("Annotation: Field " + field + " not available in the output tuple.");
}
fs.add(field);
}
return fs;
}
if (cfe == null) {
return;
}
FieldSet fs = readFieldSetFromString(cfe, inType, outType);
for (int i = 0; i < outType.getArity(); i++) {
if (!fs.contains(i)) {
sm.addForwardedField(i, i);
}
}
}
private static FieldSet readFieldSetFromString(String s, TypeInformation<?> inType, TypeInformation<?> outType) {
Pattern check = Pattern.compile("\\s*(\\d+\\s*,\\s*)*(\\d+\\s*)");
Pattern digit = Pattern.compile("\\d+");
Matcher matcher = check.matcher(s);
if (!matcher.matches()) {
throw new RuntimeException("Wrong annotation String format. Please read the documentation.");
}
matcher = digit.matcher(s);
FieldSet fs = new FieldSet();
while (matcher.find()) {
int field = Integer.valueOf(matcher.group());
if (!isValidField(outType, field) || !isValidField(inType, field)) {
throw new IndexOutOfBoundsException("Annotation: Field " + field + " not available in the output tuple.");
}
fs.add(field);
}
return fs;
}
private static void parseReadFields(String rf, SingleInputSemanticProperties sm, TypeInformation<?> inType, TypeInformation<?> outType) {
FieldSet fs = readFieldSetFromString(rf, inType, outType);
sm.addReadFields(fs);
if (rf == null) {
return;
}
FieldSet fs = readFieldSetFromString(rf, inType, outType);
sm.addReadFields(fs);
}
public static SingleInputSemanticProperties getSemanticPropsSingleFromString(String[] ConstantSet, String constantSetExcept, String ReadSet, TypeInformation<?> inType, TypeInformation<?> outType) {
return null;
}
public static SingleInputSemanticProperties getSemanticPropsSingleFromString(String[] constantSet, String constantSetExcept, String readSet, TypeInformation<?> inType, TypeInformation<?> outType) {
SingleInputSemanticProperties result = new SingleInputSemanticProperties();
parseConstantFields(constantSet, result, inType, outType);
parseConstantFieldsExcept(constantSetExcept, result, inType, outType);
parseReadFields(readSet, result, inType, outType);
return result;
}
public static DualInputSemanticProperties getSemanticPropsDualFromString(String[] constantSetFirst, String[] constantSetSecond, String constantSetFirstExcept,
String constantSetSecondExcept, String readFieldsFirst, String readFieldsSecond, TypeInformation<?> inType1, TypeInformation<?> inType2, TypeInformation<?> outType) {
return null;
}
public static DualInputSemanticProperties getSemanticPropsDualFromString(String[] constantSetFirst, String[] constantSetSecond, String constantSetFirstExcept,
String constantSetSecondExcept, String readFieldsFirst, String readFieldsSecond, TypeInformation<?> inType1, TypeInformation<?> inType2, TypeInformation<?> outType) {
DualInputSemanticProperties result = new DualInputSemanticProperties();
parseConstantFieldsFirst(constantSetFirst, result, inType1, outType);
parseConstantFieldsSecond(constantSetSecond, result, inType2, outType);
parseConstantFieldsFirstExcept(constantSetFirstExcept, result, inType1, outType);
parseConstantFieldsSecondExcept(constantSetSecondExcept, result, inType2, outType);
parseReadFieldsFirst(readFieldsFirst, result, inType1, outType);
parseReadFieldsSecond(readFieldsSecond, result, inType2, outType);
return result;
}
public static DualInputSemanticProperties getSemanticPropsDual(Set<Annotation> set, TypeInformation<?> inType1, TypeInformation<?> inType2, TypeInformation<?> outType) {
Iterator<Annotation> it = set.iterator();
DualInputSemanticProperties result = null;
//non tuple types are not yet supported for annotations
if (!inType1.isTupleType() || !inType2.isTupleType() || !outType.isTupleType()) {
return null;
}
while (it.hasNext()) {
if (result == null) {
result = new DualInputSemanticProperties();
}
Annotation ann = it.next();
if (ann instanceof ConstantFieldsFirst) {
ConstantFieldsFirst cff = (ConstantFieldsFirst) ann;
parseConstantFieldsFirst(cff.value(), result, inType1, outType);
} else if (ann instanceof ConstantFieldsSecond) {
ConstantFieldsSecond cfs = (ConstantFieldsSecond) ann;
parseConstantFieldsSecond(cfs.value(), result, inType2, outType);
} else if (ann instanceof ConstantFieldsFirstExcept) {
ConstantFieldsFirstExcept cffe = (ConstantFieldsFirstExcept) ann;
parseConstantFieldsFirstExcept(cffe.value(), result, inType1, outType);
} else if (ann instanceof ConstantFieldsSecondExcept) {
ConstantFieldsSecondExcept cfse = (ConstantFieldsSecondExcept) ann;
parseConstantFieldsSecondExcept(cfse.value(), result, inType2, outType);
} else if (ann instanceof ReadFieldsFirst) {
ReadFieldsFirst rff = (ReadFieldsFirst) ann;
parseReadFieldsFirst(rff.value(), result, inType1, outType);
} else if (ann instanceof ReadFieldsSecond) {
ReadFieldsSecond rfs = (ReadFieldsSecond) ann;
parseReadFieldsSecond(rfs.value(), result, inType2, outType);
}
}
return result;
if (set == null) {
return null;
}
Iterator<Annotation> it = set.iterator();
DualInputSemanticProperties result = null;
//non tuple types are not yet supported for annotations
if (!inType1.isTupleType() || !inType2.isTupleType() || !outType.isTupleType()) {
return null;
}
while (it.hasNext()) {
if (result == null) {
result = new DualInputSemanticProperties();
}
Annotation ann = it.next();
if (ann instanceof ConstantFieldsFirst) {
ConstantFieldsFirst cff = (ConstantFieldsFirst) ann;
parseConstantFieldsFirst(cff.value(), result, inType1, outType);
} else if (ann instanceof ConstantFieldsSecond) {
ConstantFieldsSecond cfs = (ConstantFieldsSecond) ann;
parseConstantFieldsSecond(cfs.value(), result, inType2, outType);
} else if (ann instanceof ConstantFieldsFirstExcept) {
ConstantFieldsFirstExcept cffe = (ConstantFieldsFirstExcept) ann;
parseConstantFieldsFirstExcept(cffe.value(), result, inType1, outType);
} else if (ann instanceof ConstantFieldsSecondExcept) {
ConstantFieldsSecondExcept cfse = (ConstantFieldsSecondExcept) ann;
parseConstantFieldsSecondExcept(cfse.value(), result, inType2, outType);
} else if (ann instanceof ReadFieldsFirst) {
ReadFieldsFirst rff = (ReadFieldsFirst) ann;
parseReadFieldsFirst(rff.value(), result, inType1, outType);
} else if (ann instanceof ReadFieldsSecond) {
ReadFieldsSecond rfs = (ReadFieldsSecond) ann;
parseReadFieldsSecond(rfs.value(), result, inType2, outType);
}
}
return result;
}
}
......@@ -15,14 +15,20 @@
package eu.stratosphere.api.java.operators.translation;
import eu.stratosphere.api.common.functions.GenericCoGrouper;
import eu.stratosphere.api.common.operators.DualInputSemanticProperties;
import eu.stratosphere.api.common.operators.base.CoGroupOperatorBase;
import eu.stratosphere.api.java.functions.CoGroupFunction;
import eu.stratosphere.api.java.functions.FunctionAnnotation;
import eu.stratosphere.api.java.functions.SemanticPropUtil;
import eu.stratosphere.api.java.typeutils.TypeInformation;
public class PlanCogroupOperator<IN1, IN2, OUT>
import java.lang.annotation.Annotation;
import java.util.Set;
public class PlanCogroupOperator<IN1, IN2, OUT>
extends CoGroupOperatorBase<GenericCoGrouper<IN1, IN2, OUT>>
implements BinaryJavaPlanNode<IN1, IN2, OUT> {
private final TypeInformation<IN1> inType1;
private final TypeInformation<IN2> inType2;
private final TypeInformation<OUT> outType;
......@@ -31,10 +37,14 @@ public class PlanCogroupOperator<IN1, IN2, OUT>
CoGroupFunction<IN1, IN2, OUT> udf,
int[] keyPositions1, int[] keyPositions2, String name, TypeInformation<IN1> inType1, TypeInformation<IN2> inType2, TypeInformation<OUT> outType) {
super(udf, keyPositions1, keyPositions2, name);
this.inType1 = inType1;
this.inType2 = inType2;
this.outType = outType;
Set<Annotation> annotations = FunctionAnnotation.readDualConstantAnnotations(this.getUserCodeWrapper());
DualInputSemanticProperties dsp = SemanticPropUtil.getSemanticPropsDual(annotations, this.getInputType1(), this.getInputType2(), this.getReturnType());
this.setSemanticProperties(dsp);
}
@Override
......
......@@ -15,31 +15,41 @@
package eu.stratosphere.api.java.operators.translation;
import eu.stratosphere.api.common.functions.GenericCrosser;
import eu.stratosphere.api.common.operators.DualInputSemanticProperties;
import eu.stratosphere.api.common.operators.base.CrossOperatorBase;
import eu.stratosphere.api.java.functions.CrossFunction;
import eu.stratosphere.api.java.functions.FunctionAnnotation;
import eu.stratosphere.api.java.functions.SemanticPropUtil;
import eu.stratosphere.api.java.typeutils.TypeInformation;
public class PlanCrossOperator<IN1, IN2, OUT>
import java.lang.annotation.Annotation;
import java.util.Set;
public class PlanCrossOperator<IN1, IN2, OUT>
extends CrossOperatorBase<GenericCrosser<IN1, IN2, OUT>>
implements BinaryJavaPlanNode<IN1, IN2, OUT>{
private final TypeInformation<IN1> inType1;
private final TypeInformation<IN2> inType2;
private final TypeInformation<OUT> outType;
public PlanCrossOperator(
CrossFunction<IN1, IN2, OUT> udf,
String name,
TypeInformation<IN1> inType1, TypeInformation<IN2> inType2, TypeInformation<OUT> outType) {
super(udf, name);
this.inType1 = inType1;
this.inType2 = inType2;
this.outType = outType;
Set<Annotation> annotations = FunctionAnnotation.readDualConstantAnnotations(this.getUserCodeWrapper());
DualInputSemanticProperties dsp = SemanticPropUtil.getSemanticPropsDual(annotations, this.getInputType1(), this.getInputType2(), this.getReturnType());
this.setSemanticProperties(dsp);
}
@Override
public TypeInformation<OUT> getReturnType() {
return this.outType;
......
......@@ -14,37 +14,36 @@
**********************************************************************************************************************/
package eu.stratosphere.api.java.operators.translation;
import java.lang.annotation.Annotation;
import java.util.Set;
import eu.stratosphere.api.common.functions.GenericFlatMap;
import eu.stratosphere.api.common.operators.SingleInputSemanticProperties;
import eu.stratosphere.api.common.operators.base.FlatMapOperatorBase;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.functions.FunctionAnnotation;
import eu.stratosphere.api.java.functions.FunctionAnnotation.ConstantFields;
import eu.stratosphere.api.java.functions.SemanticPropUtil;
import eu.stratosphere.api.java.typeutils.TypeInformation;
import java.lang.annotation.Annotation;
import java.util.Set;
public class PlanFlatMapOperator<T, O> extends FlatMapOperatorBase<GenericFlatMap<T, O>>
implements UnaryJavaPlanNode<T, O>
{
private final TypeInformation<T> inType;
private final TypeInformation<O> outType;
public PlanFlatMapOperator(FlatMapFunction<T, O> udf, String name, TypeInformation<T> inType, TypeInformation<O> outType) {
super(udf, name);
this.inType = inType;
this.outType = outType;
Set<Annotation> annotations = FunctionAnnotation.readSingleConstantAnnotations(this.getUserCodeWrapper());
SingleInputSemanticProperties sp = SemanticPropUtil.getSemanticPropsSingle(annotations, this.inType, this.outType);
Set<Annotation> annotations = FunctionAnnotation.readSingleConstantAnnotations(this.getUserCodeWrapper());
SingleInputSemanticProperties sp = SemanticPropUtil.getSemanticPropsSingle(annotations, this.getInputType(), this.getReturnType());
setSemanticProperties(sp);
}
@Override
public TypeInformation<O> getReturnType() {
return this.outType;
......
......@@ -15,11 +15,17 @@
package eu.stratosphere.api.java.operators.translation;
import eu.stratosphere.api.common.functions.GenericGroupReduce;
import eu.stratosphere.api.common.operators.SingleInputSemanticProperties;
import eu.stratosphere.api.common.operators.base.GroupReduceOperatorBase;
import eu.stratosphere.api.java.functions.FunctionAnnotation;
import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.functions.GroupReduceFunction.Combinable;
import eu.stratosphere.api.java.functions.SemanticPropUtil;
import eu.stratosphere.api.java.typeutils.TypeInformation;
import java.lang.annotation.Annotation;
import java.util.Set;
/**
*
*/
......@@ -28,20 +34,24 @@ public class PlanGroupReduceOperator<IN, OUT> extends GroupReduceOperatorBase<Ge
{
private final TypeInformation<IN> inType;
private final TypeInformation<OUT> outType;
public PlanGroupReduceOperator(GroupReduceFunction<IN, OUT> udf, int[] logicalGroupingFields, String name,
public PlanGroupReduceOperator(GroupReduceFunction<IN, OUT> udf, int[] logicalGroupingFields, String name,
TypeInformation<IN> inputType, TypeInformation<OUT> outputType)
{
super(udf, logicalGroupingFields, name);
this.inType = inputType;
this.outType = outputType;
super.setCombinable(getUserCodeWrapper().getUserCodeAnnotation(Combinable.class) != null);
Set<Annotation> annotations = FunctionAnnotation.readSingleConstantAnnotations(this.getUserCodeWrapper());
SingleInputSemanticProperties sp = SemanticPropUtil.getSemanticPropsSingle(annotations, this.inType, this.outType);
setSemanticProperties(sp);
}
@Override
public TypeInformation<OUT> getReturnType() {
return this.outType;
......
......@@ -15,14 +15,20 @@
package eu.stratosphere.api.java.operators.translation;
import eu.stratosphere.api.common.functions.GenericJoiner;
import eu.stratosphere.api.common.operators.DualInputSemanticProperties;
import eu.stratosphere.api.common.operators.base.JoinOperatorBase;
import eu.stratosphere.api.java.functions.FunctionAnnotation;
import eu.stratosphere.api.java.functions.JoinFunction;
import eu.stratosphere.api.java.functions.SemanticPropUtil;
import eu.stratosphere.api.java.typeutils.TypeInformation;
public class PlanJoinOperator<IN1, IN2, OUT>
import java.lang.annotation.Annotation;
import java.util.Set;
public class PlanJoinOperator<IN1, IN2, OUT>
extends JoinOperatorBase<GenericJoiner<IN1, IN2, OUT>>
implements BinaryJavaPlanNode<IN1, IN2, OUT> {
private final TypeInformation<IN1> inType1;
private final TypeInformation<IN2> inType2;
private final TypeInformation<OUT> outType;
......@@ -31,12 +37,16 @@ public class PlanJoinOperator<IN1, IN2, OUT>
JoinFunction<IN1, IN2, OUT> udf,
int[] keyPositions1, int[] keyPositions2, String name, TypeInformation<IN1> inType1, TypeInformation<IN2> inType2, TypeInformation<OUT> outType) {
super(udf, keyPositions1, keyPositions2, name);
this.inType1 = inType1;
this.inType2 = inType2;
this.outType = outType;
Set<Annotation> annotations = FunctionAnnotation.readDualConstantAnnotations(this.getUserCodeWrapper());
DualInputSemanticProperties dsp = SemanticPropUtil.getSemanticPropsDual(annotations, this.getInputType1(), this.getInputType2(), this.getReturnType());
this.setSemanticProperties(dsp);
}
@Override
public TypeInformation<OUT> getReturnType() {
return this.outType;
......
......@@ -15,10 +15,16 @@
package eu.stratosphere.api.java.operators.translation;
import eu.stratosphere.api.common.functions.GenericMap;
import eu.stratosphere.api.common.operators.SingleInputSemanticProperties;
import eu.stratosphere.api.common.operators.base.PlainMapOperatorBase;
import eu.stratosphere.api.java.functions.FunctionAnnotation;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.functions.SemanticPropUtil;
import eu.stratosphere.api.java.typeutils.TypeInformation;
import java.lang.annotation.Annotation;
import java.util.Set;
/**
*
*/
......@@ -27,16 +33,20 @@ public class PlanMapOperator<T, O> extends PlainMapOperatorBase<GenericMap<T, O>
{
private final TypeInformation<T> inType;
private final TypeInformation<O> outType;
public PlanMapOperator(MapFunction<T, O> udf, String name, TypeInformation<T> inType, TypeInformation<O> outType) {
super(udf, name);
this.inType = inType;
this.outType = outType;
Set<Annotation> annotations = FunctionAnnotation.readSingleConstantAnnotations(this.getUserCodeWrapper());
SingleInputSemanticProperties sp = SemanticPropUtil.getSemanticPropsSingle(annotations, this.inType, this.outType);
setSemanticProperties(sp);
}
@Override
public TypeInformation<O> getReturnType() {
return this.outType;
......
......@@ -17,8 +17,12 @@ package eu.stratosphere.api.java.operators.translation;
import eu.stratosphere.api.common.functions.GenericReduce;
import eu.stratosphere.api.common.operators.base.ReduceOperatorBase;
import eu.stratosphere.api.java.functions.ReduceFunction;
import eu.stratosphere.api.java.functions.SemanticPropUtil;
import eu.stratosphere.api.java.typeutils.TypeInformation;
import java.lang.annotation.Annotation;
import java.util.Set;
/**
*
*/
......@@ -32,6 +36,10 @@ public class PlanReduceOperator<T> extends ReduceOperatorBase<GenericReduce<T>>
public PlanReduceOperator(ReduceFunction<T> udf, int[] logicalGroupingFields, String name, TypeInformation<T> type) {
super(udf, logicalGroupingFields, name);
this.type = type;
Set<Annotation> annotations = FunctionAnnotation.readSingleConstantAnnotations(this.getUserCodeWrapper());
SingleInputSemanticProperties sp = SemanticPropUtil.getSemanticPropsSingle(annotations, this.getInputType(), this.getReturnType());
setSemanticProperties(sp);
}
......@@ -44,5 +52,4 @@ public class PlanReduceOperator<T> extends ReduceOperatorBase<GenericReduce<T>>
public TypeInformation<T> getInputType() {
return this.type;
}
}
/***********************************************************************************************************************
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
**********************************************************************************************************************/
package eu.stratosphere.test.semanticprops;
import eu.stratosphere.api.common.operators.DualInputSemanticProperties;
import eu.stratosphere.api.common.operators.SingleInputSemanticProperties;
import eu.stratosphere.api.common.operators.util.FieldSet;
import eu.stratosphere.api.java.functions.SemanticPropUtil;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.api.java.typeutils.BasicTypeInfo;
import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
import eu.stratosphere.api.java.typeutils.TypeInformation;
import junit.framework.Assert;
import org.junit.Test;
public class SemanticPropUtilTest {
@Test
public void testSimpleCase() {
String[] constantFields = {"1->1,2", "2->3"};
TypeInformation<?> type = new TupleTypeInfo<Tuple3<Integer, Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
SingleInputSemanticProperties sp = SemanticPropUtil.getSemanticPropsSingleFromString(constantFields, null, null,type, type);
FieldSet fs = sp.getForwardedField(1);
Assert.assertTrue(fs.size() == 2);
Assert.assertTrue(fs.contains(1));
Assert.assertTrue(fs.contains(2));
fs = sp.getForwardedField(2);
Assert.assertTrue(fs.size() == 1);
Assert.assertTrue(fs.contains(3));
}
@Test
public void testConstantFieldsExcept() {
String constantFieldsExcept = "1";
TypeInformation<?> type = new TupleTypeInfo<Tuple3<Integer, Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
SingleInputSemanticProperties sp = SemanticPropUtil.getSemanticPropsSingleFromString(null, constantFieldsExcept, null,type, type);
FieldSet fs = sp.getForwardedField(0);
Assert.assertTrue(fs.size() == 1);
Assert.assertTrue(fs.contains(0));
fs = sp.getForwardedField(1);
Assert.assertTrue(fs == null);
fs = sp.getForwardedField(2);
Assert.assertTrue(fs.size() == 1);
Assert.assertTrue(fs.contains(2));
}
@Test
public void testReadFields() {
String readFields = "1, 2";
TypeInformation<?> type = new TupleTypeInfo<Tuple3<Integer, Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
SingleInputSemanticProperties sp = SemanticPropUtil.getSemanticPropsSingleFromString(null, null, readFields,type, type);
FieldSet fs = sp.getReadFields();
Assert.assertTrue(fs.size() == 2);
Assert.assertTrue(fs.contains(2));
Assert.assertTrue(fs.contains(1));
}
@Test
public void testSimpleCaseDual() {
String[] constantFieldsFirst = {"1->1,2", "2->3"};
String[] constantFieldsSecond = {"1->1,2", "2->3"};
TypeInformation<?> type = new TupleTypeInfo<Tuple3<Integer, Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
DualInputSemanticProperties dsp = SemanticPropUtil.getSemanticPropsDualFromString(constantFieldsFirst, constantFieldsSecond, null, null, null, null, type, type, type);
FieldSet fs = dsp.getForwardedField1(1);
Assert.assertTrue(fs.size() == 2);
Assert.assertTrue(fs.contains(1));
Assert.assertTrue(fs.contains(2));
fs = dsp.getForwardedField1(2);
Assert.assertTrue(fs.size() == 1);
Assert.assertTrue(fs.contains(3));
}
@Test
public void testFieldsExceptDual() {
String constantFieldsFirstExcept = "1,2";
String[] constantFieldsSecond = {"0->1"};
TypeInformation<?> type = new TupleTypeInfo<Tuple3<Integer, Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
DualInputSemanticProperties dsp = SemanticPropUtil.getSemanticPropsDualFromString(null, constantFieldsSecond, constantFieldsFirstExcept, null, null, null, type, type, type);
FieldSet fs = dsp.getForwardedField1(0);
Assert.assertTrue(fs.size() == 1);
Assert.assertTrue(fs.contains(0));
fs = dsp.getForwardedField1(1);
Assert.assertTrue(fs == null);
fs = dsp.getForwardedField1(2);
Assert.assertTrue(fs == null);
fs = dsp.getForwardedField2(0);
Assert.assertTrue(fs.size() == 1);
Assert.assertTrue(fs.contains(1));
}
@Test
public void testStringParse1() {
String[] constantFields = {" 1-> 1 , 2", "2 ->3"};
TypeInformation<?> type = new TupleTypeInfo<Tuple3<Integer, Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
SingleInputSemanticProperties sp = SemanticPropUtil.getSemanticPropsSingleFromString(constantFields, null, null,type, type);
FieldSet fs = sp.getForwardedField(1);
Assert.assertTrue(fs.size() == 2);
Assert.assertTrue(fs.contains(1));
Assert.assertTrue(fs.contains(2));
fs = sp.getForwardedField(2);
Assert.assertTrue(fs.size() == 1);
Assert.assertTrue(fs.contains(3));
}
@Test
public void testStringParse2() {
String[] constantFields = {"notValid"};
TypeInformation<?> type = new TupleTypeInfo<Tuple3<Integer, Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
try {
SingleInputSemanticProperties sp = SemanticPropUtil.getSemanticPropsSingleFromString(constantFields, null, null, type, type);
} catch (Exception e) {
return;
}
Assert.fail();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册