提交 7081836e 编写于 作者: S Stephan Ewen

[FLINK-3303] [core] Move Tuple classes to flink-core

上级 54743866
......@@ -32,10 +32,10 @@ import org.apache.flink.api.java.tuple.Tuple10;
public class Tuple10Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> {
private List<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> tuples = new ArrayList<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>();
private List<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> tuples = new ArrayList<>();
public Tuple10Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9){
tuples.add(new Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9));
tuples.add(new Tuple10<>(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9));
return this;
}
......
......@@ -32,10 +32,10 @@ import org.apache.flink.api.java.tuple.Tuple11;
public class Tuple11Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10> {
private List<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> tuples = new ArrayList<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>();
private List<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> tuples = new ArrayList<>();
public Tuple11Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10> add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10){
tuples.add(new Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10));
tuples.add(new Tuple11<>(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10));
return this;
}
......
......@@ -32,10 +32,10 @@ import org.apache.flink.api.java.tuple.Tuple12;
public class Tuple12Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11> {
private List<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> tuples = new ArrayList<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>();
private List<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> tuples = new ArrayList<>();
public Tuple12Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11> add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 value11){
tuples.add(new Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11));
tuples.add(new Tuple12<>(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11));
return this;
}
......
......@@ -32,10 +32,10 @@ import org.apache.flink.api.java.tuple.Tuple13;
public class Tuple13Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12> {
private List<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> tuples = new ArrayList<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>();
private List<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> tuples = new ArrayList<>();
public Tuple13Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12> add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 value11, T12 value12){
tuples.add(new Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12));
tuples.add(new Tuple13<>(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12));
return this;
}
......
......@@ -32,10 +32,10 @@ import org.apache.flink.api.java.tuple.Tuple14;
public class Tuple14Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13> {
private List<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> tuples = new ArrayList<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>();
private List<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> tuples = new ArrayList<>();
public Tuple14Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13> add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 value11, T12 value12, T13 value13){
tuples.add(new Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13));
tuples.add(new Tuple14<>(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13));
return this;
}
......
......@@ -32,10 +32,10 @@ import org.apache.flink.api.java.tuple.Tuple15;
public class Tuple15Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14> {
private List<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> tuples = new ArrayList<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>();
private List<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> tuples = new ArrayList<>();
public Tuple15Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14> add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 value11, T12 value12, T13 value13, T14 value14){
tuples.add(new Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14));
tuples.add(new Tuple15<>(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14));
return this;
}
......
......@@ -32,10 +32,10 @@ import org.apache.flink.api.java.tuple.Tuple16;
public class Tuple16Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15> {
private List<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> tuples = new ArrayList<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>();
private List<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> tuples = new ArrayList<>();
public Tuple16Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15> add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 value11, T12 value12, T13 value13, T14 value14, T15 value15){
tuples.add(new Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14, value15));
tuples.add(new Tuple16<>(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14, value15));
return this;
}
......
......@@ -32,10 +32,10 @@ import org.apache.flink.api.java.tuple.Tuple17;
public class Tuple17Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16> {
private List<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> tuples = new ArrayList<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>();
private List<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> tuples = new ArrayList<>();
public Tuple17Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16> add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 value11, T12 value12, T13 value13, T14 value14, T15 value15, T16 value16){
tuples.add(new Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14, value15, value16));
tuples.add(new Tuple17<>(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14, value15, value16));
return this;
}
......
......@@ -32,10 +32,10 @@ import org.apache.flink.api.java.tuple.Tuple18;
public class Tuple18Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17> {
private List<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> tuples = new ArrayList<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>();
private List<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> tuples = new ArrayList<>();
public Tuple18Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17> add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 value11, T12 value12, T13 value13, T14 value14, T15 value15, T16 value16, T17 value17){
tuples.add(new Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14, value15, value16, value17));
tuples.add(new Tuple18<>(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14, value15, value16, value17));
return this;
}
......
......@@ -32,10 +32,10 @@ import org.apache.flink.api.java.tuple.Tuple19;
public class Tuple19Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18> {
private List<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> tuples = new ArrayList<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>();
private List<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> tuples = new ArrayList<>();
public Tuple19Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18> add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 value11, T12 value12, T13 value13, T14 value14, T15 value15, T16 value16, T17 value17, T18 value18){
tuples.add(new Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14, value15, value16, value17, value18));
tuples.add(new Tuple19<>(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14, value15, value16, value17, value18));
return this;
}
......
......@@ -32,10 +32,10 @@ import org.apache.flink.api.java.tuple.Tuple1;
public class Tuple1Builder<T0> {
private List<Tuple1<T0>> tuples = new ArrayList<Tuple1<T0>>();
private List<Tuple1<T0>> tuples = new ArrayList<>();
public Tuple1Builder<T0> add(T0 value0){
tuples.add(new Tuple1<T0>(value0));
tuples.add(new Tuple1<>(value0));
return this;
}
......
......@@ -32,10 +32,10 @@ import org.apache.flink.api.java.tuple.Tuple20;
public class Tuple20Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19> {
private List<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> tuples = new ArrayList<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>();
private List<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> tuples = new ArrayList<>();
public Tuple20Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19> add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 value11, T12 value12, T13 value13, T14 value14, T15 value15, T16 value16, T17 value17, T18 value18, T19 value19){
tuples.add(new Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14, value15, value16, value17, value18, value19));
tuples.add(new Tuple20<>(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14, value15, value16, value17, value18, value19));
return this;
}
......
......@@ -32,10 +32,10 @@ import org.apache.flink.api.java.tuple.Tuple21;
public class Tuple21Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20> {
private List<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> tuples = new ArrayList<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>();
private List<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> tuples = new ArrayList<>();
public Tuple21Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20> add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 value11, T12 value12, T13 value13, T14 value14, T15 value15, T16 value16, T17 value17, T18 value18, T19 value19, T20 value20){
tuples.add(new Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14, value15, value16, value17, value18, value19, value20));
tuples.add(new Tuple21<>(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14, value15, value16, value17, value18, value19, value20));
return this;
}
......
......@@ -32,10 +32,10 @@ import org.apache.flink.api.java.tuple.Tuple22;
public class Tuple22Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21> {
private List<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> tuples = new ArrayList<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>();
private List<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> tuples = new ArrayList<>();
public Tuple22Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21> add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 value11, T12 value12, T13 value13, T14 value14, T15 value15, T16 value16, T17 value17, T18 value18, T19 value19, T20 value20, T21 value21){
tuples.add(new Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14, value15, value16, value17, value18, value19, value20, value21));
tuples.add(new Tuple22<>(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14, value15, value16, value17, value18, value19, value20, value21));
return this;
}
......
......@@ -32,10 +32,10 @@ import org.apache.flink.api.java.tuple.Tuple23;
public class Tuple23Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22> {
private List<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> tuples = new ArrayList<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>();
private List<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> tuples = new ArrayList<>();
public Tuple23Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22> add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 value11, T12 value12, T13 value13, T14 value14, T15 value15, T16 value16, T17 value17, T18 value18, T19 value19, T20 value20, T21 value21, T22 value22){
tuples.add(new Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14, value15, value16, value17, value18, value19, value20, value21, value22));
tuples.add(new Tuple23<>(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14, value15, value16, value17, value18, value19, value20, value21, value22));
return this;
}
......
......@@ -32,10 +32,10 @@ import org.apache.flink.api.java.tuple.Tuple24;
public class Tuple24Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23> {
private List<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> tuples = new ArrayList<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>();
private List<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> tuples = new ArrayList<>();
public Tuple24Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23> add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 value11, T12 value12, T13 value13, T14 value14, T15 value15, T16 value16, T17 value17, T18 value18, T19 value19, T20 value20, T21 value21, T22 value22, T23 value23){
tuples.add(new Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14, value15, value16, value17, value18, value19, value20, value21, value22, value23));
tuples.add(new Tuple24<>(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14, value15, value16, value17, value18, value19, value20, value21, value22, value23));
return this;
}
......
......@@ -32,10 +32,10 @@ import org.apache.flink.api.java.tuple.Tuple25;
public class Tuple25Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24> {
private List<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> tuples = new ArrayList<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>();
private List<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> tuples = new ArrayList<>();
public Tuple25Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24> add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8, T9 value9, T10 value10, T11 value11, T12 value12, T13 value13, T14 value14, T15 value15, T16 value16, T17 value17, T18 value18, T19 value19, T20 value20, T21 value21, T22 value22, T23 value23, T24 value24){
tuples.add(new Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14, value15, value16, value17, value18, value19, value20, value21, value22, value23, value24));
tuples.add(new Tuple25<>(value0, value1, value2, value3, value4, value5, value6, value7, value8, value9, value10, value11, value12, value13, value14, value15, value16, value17, value18, value19, value20, value21, value22, value23, value24));
return this;
}
......
......@@ -32,10 +32,10 @@ import org.apache.flink.api.java.tuple.Tuple2;
public class Tuple2Builder<T0, T1> {
private List<Tuple2<T0, T1>> tuples = new ArrayList<Tuple2<T0, T1>>();
private List<Tuple2<T0, T1>> tuples = new ArrayList<>();
public Tuple2Builder<T0, T1> add(T0 value0, T1 value1){
tuples.add(new Tuple2<T0, T1>(value0, value1));
tuples.add(new Tuple2<>(value0, value1));
return this;
}
......
......@@ -32,10 +32,10 @@ import org.apache.flink.api.java.tuple.Tuple3;
public class Tuple3Builder<T0, T1, T2> {
private List<Tuple3<T0, T1, T2>> tuples = new ArrayList<Tuple3<T0, T1, T2>>();
private List<Tuple3<T0, T1, T2>> tuples = new ArrayList<>();
public Tuple3Builder<T0, T1, T2> add(T0 value0, T1 value1, T2 value2){
tuples.add(new Tuple3<T0, T1, T2>(value0, value1, value2));
tuples.add(new Tuple3<>(value0, value1, value2));
return this;
}
......
......@@ -32,10 +32,10 @@ import org.apache.flink.api.java.tuple.Tuple4;
public class Tuple4Builder<T0, T1, T2, T3> {
private List<Tuple4<T0, T1, T2, T3>> tuples = new ArrayList<Tuple4<T0, T1, T2, T3>>();
private List<Tuple4<T0, T1, T2, T3>> tuples = new ArrayList<>();
public Tuple4Builder<T0, T1, T2, T3> add(T0 value0, T1 value1, T2 value2, T3 value3){
tuples.add(new Tuple4<T0, T1, T2, T3>(value0, value1, value2, value3));
tuples.add(new Tuple4<>(value0, value1, value2, value3));
return this;
}
......
......@@ -32,10 +32,10 @@ import org.apache.flink.api.java.tuple.Tuple5;
public class Tuple5Builder<T0, T1, T2, T3, T4> {
private List<Tuple5<T0, T1, T2, T3, T4>> tuples = new ArrayList<Tuple5<T0, T1, T2, T3, T4>>();
private List<Tuple5<T0, T1, T2, T3, T4>> tuples = new ArrayList<>();
public Tuple5Builder<T0, T1, T2, T3, T4> add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4){
tuples.add(new Tuple5<T0, T1, T2, T3, T4>(value0, value1, value2, value3, value4));
tuples.add(new Tuple5<>(value0, value1, value2, value3, value4));
return this;
}
......
......@@ -32,10 +32,10 @@ import org.apache.flink.api.java.tuple.Tuple6;
public class Tuple6Builder<T0, T1, T2, T3, T4, T5> {
private List<Tuple6<T0, T1, T2, T3, T4, T5>> tuples = new ArrayList<Tuple6<T0, T1, T2, T3, T4, T5>>();
private List<Tuple6<T0, T1, T2, T3, T4, T5>> tuples = new ArrayList<>();
public Tuple6Builder<T0, T1, T2, T3, T4, T5> add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5){
tuples.add(new Tuple6<T0, T1, T2, T3, T4, T5>(value0, value1, value2, value3, value4, value5));
tuples.add(new Tuple6<>(value0, value1, value2, value3, value4, value5));
return this;
}
......
......@@ -32,10 +32,10 @@ import org.apache.flink.api.java.tuple.Tuple7;
public class Tuple7Builder<T0, T1, T2, T3, T4, T5, T6> {
private List<Tuple7<T0, T1, T2, T3, T4, T5, T6>> tuples = new ArrayList<Tuple7<T0, T1, T2, T3, T4, T5, T6>>();
private List<Tuple7<T0, T1, T2, T3, T4, T5, T6>> tuples = new ArrayList<>();
public Tuple7Builder<T0, T1, T2, T3, T4, T5, T6> add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6){
tuples.add(new Tuple7<T0, T1, T2, T3, T4, T5, T6>(value0, value1, value2, value3, value4, value5, value6));
tuples.add(new Tuple7<>(value0, value1, value2, value3, value4, value5, value6));
return this;
}
......
......@@ -32,10 +32,10 @@ import org.apache.flink.api.java.tuple.Tuple8;
public class Tuple8Builder<T0, T1, T2, T3, T4, T5, T6, T7> {
private List<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> tuples = new ArrayList<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>();
private List<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> tuples = new ArrayList<>();
public Tuple8Builder<T0, T1, T2, T3, T4, T5, T6, T7> add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7){
tuples.add(new Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>(value0, value1, value2, value3, value4, value5, value6, value7));
tuples.add(new Tuple8<>(value0, value1, value2, value3, value4, value5, value6, value7));
return this;
}
......
......@@ -32,10 +32,10 @@ import org.apache.flink.api.java.tuple.Tuple9;
public class Tuple9Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8> {
private List<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> tuples = new ArrayList<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>();
private List<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> tuples = new ArrayList<>();
public Tuple9Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8> add(T0 value0, T1 value1, T2 value2, T3 value3, T4 value4, T5 value5, T6 value6, T7 value7, T8 value8){
tuples.add(new Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>(value0, value1, value2, value3, value4, value5, value6, value7, value8));
tuples.add(new Tuple9<>(value0, value1, value2, value3, value4, value5, value6, value7, value8));
return this;
}
......
......@@ -18,15 +18,15 @@
package org.apache.flink.api.java.tuple;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Scanner;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
/**
* Source code generator for tuple classes and classes which depend on the arity of tuples.
*/
......@@ -34,7 +34,7 @@ class TupleGenerator {
// Parameters for tuple classes
private static final String ROOT_DIRECTORY = "./src/main/java";
private static final String ROOT_DIRECTORY = "./flink-core/src/main/java";
private static final String PACKAGE = "org.apache.flink.api.java.tuple";
......@@ -47,36 +47,17 @@ class TupleGenerator {
private static final String END_INDICATOR = "END_OF_TUPLE_DEPENDENT_CODE";
// Parameters for CsvReader
private static final String CSV_READER_PACKAGE = "org.apache.flink.api.java.io";
private static final String CSV_READER_CLASSNAME = "CsvReader";
// Parameters for TupleTypeInfo
private static final String TUPLE_PACKAGE = "org.apache.flink.api.java.tuple";
private static final String TUPLE_CLASSNAME = "Tuple";
// Parameters for ProjectOperator
private static final String PROJECT_OPERATOR_PACKAGE = "org.apache.flink.api.java.operators";
private static final String PROJECT_OPERATOR_CLASSNAME = "ProjectOperator";
// Parameters for JoinOperator
private static final String JOIN_OPERATOR_PACKAGE = "org.apache.flink.api.java.operators";
private static final String JOIN_OPERATOR_CLASSNAME = "JoinOperator";
// parameters for CrossOperator
private static final String CROSS_OPERATOR_PACKAGE = "org.apache.flink.api.java.operators";
private static final String CROSS_OPERATOR_CLASSNAME = "CrossOperator";
// min. and max. tuple arity
private static final int FIRST = 1;
private static final int LAST = 25;
public static void main(String[] args) throws Exception {
System.err.println("Current directory "+System.getProperty("user.dir"));
String rootDir = ROOT_DIRECTORY;
......@@ -90,16 +71,8 @@ class TupleGenerator {
createTupleBuilderClasses(root);
modifyCsvReader(root);
modifyTupleType(root);
modifyProjectOperator(root);
modifyJoinProjectOperator(root);
modifyCrossProjectOperator(root);
}
private static File getPackage(File root, String packageString) {
......@@ -113,298 +86,57 @@ class TupleGenerator {
private static void insertCodeIntoFile(String code, File file) throws IOException {
String fileContent = Files.toString(file, Charsets.UTF_8);
Scanner s = new Scanner(fileContent);
StringBuilder sb = new StringBuilder();
String line = null;
boolean indicatorFound = false;
// add file beginning
while (s.hasNextLine() && (line = s.nextLine()) != null) {
sb.append(line + "\n");
if (line.contains(BEGIN_INDICATOR)) {
indicatorFound = true;
break;
try (Scanner s = new Scanner(fileContent)) {
StringBuilder sb = new StringBuilder();
String line;
boolean indicatorFound = false;
// add file beginning
while (s.hasNextLine() && (line = s.nextLine()) != null) {
sb.append(line).append("\n");
if (line.contains(BEGIN_INDICATOR)) {
indicatorFound = true;
break;
}
}
}
if(!indicatorFound) {
System.out.println("No indicator found in '" + file + "'. Will skip code generation.");
s.close();
return;
}
// add generator signature
sb.append("\t// GENERATED FROM " + TupleGenerator.class.getName() + ".\n");
// add tuple dependent code
sb.append(code + "\n");
// skip generated code
while (s.hasNextLine() && (line = s.nextLine()) != null) {
if (line.contains(END_INDICATOR)) {
sb.append(line + "\n");
break;
if(!indicatorFound) {
System.out.println("No indicator found in '" + file + "'. Will skip code generation.");
s.close();
return;
}
// add generator signature
sb.append("\t// GENERATED FROM ").append(TupleGenerator.class.getName()).append(".\n");
// add tuple dependent code
sb.append(code).append("\n");
// skip generated code
while (s.hasNextLine() && (line = s.nextLine()) != null) {
if (line.contains(END_INDICATOR)) {
sb.append(line).append("\n");
break;
}
}
// add file ending
while (s.hasNextLine() && (line = s.nextLine()) != null) {
sb.append(line).append("\n");
}
Files.write(sb.toString(), file, Charsets.UTF_8);
}
// add file ending
while (s.hasNextLine() && (line = s.nextLine()) != null) {
sb.append(line + "\n");
}
s.close();
Files.write(sb.toString(), file, Charsets.UTF_8);
}
private static void modifyCrossProjectOperator(File root) throws IOException {
// generate code
StringBuilder sb = new StringBuilder();
// method begin
sb.append("\n");
// method comment
sb.append("\t\t/**\n");
sb.append("\t\t * Chooses a projectTupleX according to the length of\n");
sb.append("\t\t * {@link org.apache.flink.api.java.operators.CrossOperator.CrossProjection#fieldIndexes} \n");
sb.append("\t\t * \n");
sb.append("\t\t * @return The projected DataSet.\n");
sb.append("\t\t */\n");
// method signature
sb.append("\t\t@SuppressWarnings(\"unchecked\")\n");
sb.append("\t\tpublic <OUT extends Tuple> ProjectCross<I1, I2, OUT> projectTupleX() {\n");
sb.append("\t\t\tProjectCross<I1, I2, OUT> projectionCross = null;\n\n");
sb.append("\t\t\tswitch (fieldIndexes.length) {\n");
for (int numFields = FIRST; numFields <= LAST; numFields++) {
sb.append("\t\t\tcase " + numFields +":" + " projectionCross = (ProjectCross<I1, I2, OUT>) projectTuple"+numFields+"(); break;\n");
}
sb.append("\t\t\tdefault: throw new IllegalStateException(\"Excessive arity in tuple.\");\n");
sb.append("\t\t\t}\n\n");
sb.append("\t\t\treturn projectionCross;\n");
// method end
sb.append("\t\t}\n");
for (int numFields = FIRST; numFields <= LAST; numFields++) {
// method begin
sb.append("\n");
// method comment
sb.append("\t\t/**\n");
sb.append("\t\t * Projects a pair of crossed elements to a {@link Tuple} with the previously selected fields. \n");
sb.append("\t\t * \n");
sb.append("\t\t * @return The projected data set.\n");
sb.append("\t\t * \n");
sb.append("\t\t * @see Tuple\n");
sb.append("\t\t * @see DataSet\n");
sb.append("\t\t */\n");
// method signature
sb.append("\t\tpublic <");
appendTupleTypeGenerics(sb, numFields);
sb.append("> ProjectCross<I1, I2, Tuple"+numFields+"<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">> projectTuple"+numFields+"(");
sb.append(") {\n");
// extract field types
sb.append("\t\t\tTypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes);\n");
// create new tuple type info
sb.append("\t\t\tTupleTypeInfo<Tuple"+numFields+"<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">> tType = new TupleTypeInfo<Tuple"+numFields+"<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">>(fTypes);\n\n");
// create and return new project operator
sb.append("\t\t\treturn new ProjectCross<I1, I2, Tuple"+numFields+"<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">>(this.ds1, this.ds2, this.fieldIndexes, this.isFieldInFirst, tType, this, hint);\n");
// method end
sb.append("\t\t}\n");
}
// insert code into file
File dir = getPackage(root, CROSS_OPERATOR_PACKAGE);
File projectOperatorClass = new File(dir, CROSS_OPERATOR_CLASSNAME + ".java");
insertCodeIntoFile(sb.toString(), projectOperatorClass);
}
private static void modifyProjectOperator(File root) throws IOException {
// generate code
StringBuilder sb = new StringBuilder();
// method begin
sb.append("\n");
// method comment
sb.append("\t\t/**\n");
sb.append("\t\t * Chooses a projectTupleX according to the length of\n");
sb.append("\t\t * {@link org.apache.flink.api.java.operators.ProjectOperator.Projection#fieldIndexes} \n");
sb.append("\t\t * \n");
sb.append("\t\t * @return The projected DataSet.\n");
sb.append("\t\t * \n");
sb.append("\t\t * @see org.apache.flink.api.java.operators.ProjectOperator.Projection\n");
sb.append("\t\t */\n");
// method signature
sb.append("\t\t@SuppressWarnings(\"unchecked\")\n");
sb.append("\t\tpublic <OUT extends Tuple> ProjectOperator<T, OUT> projectTupleX() {\n");
sb.append("\t\t\tProjectOperator<T, OUT> projOperator;\n\n");
sb.append("\t\t\tswitch (fieldIndexes.length) {\n");
for (int numFields = FIRST; numFields <= LAST; numFields++) {
sb.append("\t\t\tcase " + numFields +":" + " projOperator = (ProjectOperator<T, OUT>) projectTuple"+numFields+"(); break;\n");
}
sb.append("\t\t\tdefault: throw new IllegalStateException(\"Excessive arity in tuple.\");\n");
sb.append("\t\t\t}\n\n");
sb.append("\t\t\treturn projOperator;\n");
// method end
sb.append("\t\t}\n");
for (int numFields = FIRST; numFields <= LAST; numFields++) {
// method begin
sb.append("\n");
// method comment
sb.append("\t\t/**\n");
sb.append("\t\t * Projects a {@link Tuple} {@link DataSet} to the previously selected fields. \n");
sb.append("\t\t * \n");
sb.append("\t\t * @return The projected DataSet.\n");
sb.append("\t\t * \n");
sb.append("\t\t * @see Tuple\n");
sb.append("\t\t * @see DataSet\n");
sb.append("\t\t */\n");
// method signature
sb.append("\t\tpublic <");
appendTupleTypeGenerics(sb, numFields);
sb.append("> ProjectOperator<T, Tuple"+numFields+"<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">> projectTuple"+numFields+"(");
sb.append(") {\n");
// extract field types
sb.append("\t\t\tTypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());\n");
// create new tuple type info
sb.append("\t\t\tTupleTypeInfo<Tuple"+numFields+"<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">> tType = new TupleTypeInfo<Tuple"+numFields+"<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">>(fTypes);\n\n");
// create and return new project operator
sb.append("\t\t\treturn new ProjectOperator<T, Tuple"+numFields+"<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">>(this.ds, this.fieldIndexes, tType);\n");
// method end
sb.append("\t\t}\n");
}
// insert code into file
File dir = getPackage(root, PROJECT_OPERATOR_PACKAGE);
File projectOperatorClass = new File(dir, PROJECT_OPERATOR_CLASSNAME + ".java");
insertCodeIntoFile(sb.toString(), projectOperatorClass);
}
private static void modifyJoinProjectOperator(File root) throws IOException {
// generate code
StringBuilder sb = new StringBuilder();
// method begin
sb.append("\n");
// method comment
sb.append("\t\t/**\n");
sb.append("\t\t * Chooses a projectTupleX according to the length of\n");
sb.append("\t\t * {@link org.apache.flink.api.java.operators.JoinOperator.JoinProjection#fieldIndexes}\n");
sb.append("\t\t * \n");
sb.append("\t\t * @return The projected DataSet.\n");
sb.append("\t\t * \n");
sb.append("\t\t * @see org.apache.flink.api.java.operators.JoinOperator.ProjectJoin\n");
sb.append("\t\t */\n");
// method signature
sb.append("\t\t@SuppressWarnings(\"unchecked\")\n");
sb.append("\t\tpublic <OUT extends Tuple> ProjectJoin<I1, I2, OUT> projectTupleX() {\n");
sb.append("\t\t\tProjectJoin<I1, I2, OUT> projectJoin = null;\n\n");
sb.append("\t\t\tswitch (fieldIndexes.length) {\n");
for (int numFields = FIRST; numFields <= LAST; numFields++) {
sb.append("\t\t\tcase " + numFields +":" + " projectJoin = (ProjectJoin<I1, I2, OUT>) projectTuple"+numFields+"(); break;\n");
}
sb.append("\t\t\tdefault: throw new IllegalStateException(\"Excessive arity in tuple.\");\n");
sb.append("\t\t\t}\n\n");
sb.append("\t\t\treturn projectJoin;\n");
// method end
sb.append("\t\t}\n");
for (int numFields = FIRST; numFields <= LAST; numFields++) {
// method begin
sb.append("\n");
// method comment
sb.append("\t\t/**\n");
sb.append("\t\t * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields. \n");
sb.append("\t\t * Requires the classes of the fields of the resulting tuples. \n");
sb.append("\t\t * \n");
sb.append("\t\t * @return The projected data set.\n");
sb.append("\t\t * \n");
sb.append("\t\t * @see Tuple\n");
sb.append("\t\t * @see DataSet\n");
sb.append("\t\t */\n");
// method signature
sb.append("\t\tpublic <");
appendTupleTypeGenerics(sb, numFields);
sb.append("> ProjectJoin<I1, I2, Tuple"+numFields+"<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">> projectTuple"+numFields+"(");
sb.append(") {\n");
// extract field types
sb.append("\t\t\tTypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes);\n");
// create new tuple type info
sb.append("\t\t\tTupleTypeInfo<Tuple"+numFields+"<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">> tType = new TupleTypeInfo<Tuple"+numFields+"<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">>(fTypes);\n\n");
// create and return new project operator
sb.append("\t\t\treturn new ProjectJoin<I1, I2, Tuple"+numFields+"<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">>(this.ds1, this.ds2, this.keys1, this.keys2, this.hint, this.fieldIndexes, this.isFieldInFirst, tType, this);\n");
// method end
sb.append("\t\t}\n");
}
// insert code into file
File dir = getPackage(root, JOIN_OPERATOR_PACKAGE);
File projectOperatorClass = new File(dir, JOIN_OPERATOR_CLASSNAME + ".java");
insertCodeIntoFile(sb.toString(), projectOperatorClass);
}
private static void modifyTupleType(File root) throws IOException {
// generate code
StringBuilder sb = new StringBuilder();
sb.append("\tprivate static final Class<?>[] CLASSES = new Class<?>[] {\n\t\tTuple0.class");
for (int i = FIRST; i <= LAST; i++) {
sb.append(", Tuple" + i + ".class");
sb.append(", Tuple").append(i).append(".class");
}
sb.append("\n\t};");
......@@ -414,92 +146,6 @@ class TupleGenerator {
insertCodeIntoFile(sb.toString(), tupleTypeInfoClass);
}
private static void modifyCsvReader(File root) throws IOException {
// generate code
StringBuilder sb = new StringBuilder(1000);
for (int numFields = FIRST; numFields <= LAST; numFields++) {
// method begin
sb.append("\n");
// java doc
sb.append("\t/**\n");
sb.append("\t * Specifies the types for the CSV fields. This method parses the CSV data to a ").append(numFields).append("-tuple\n");
sb.append("\t * which has fields of the specified types.\n");
sb.append("\t * This method is overloaded for each possible length of the tuples to support type safe\n");
sb.append("\t * creation of data sets through CSV parsing.\n");
sb.append("\t *\n");
for (int pos = 0; pos < numFields; pos++) {
sb.append("\t * @param type").append(pos);
sb.append(" The type of CSV field ").append(pos).append(" and the type of field ");
sb.append(pos).append(" in the returned tuple type.\n");
}
sb.append("\t * @return The {@link org.apache.flink.api.java.DataSet} representing the parsed CSV data.\n");
sb.append("\t */\n");
// method signature
sb.append("\tpublic <");
appendTupleTypeGenerics(sb, numFields);
sb.append("> DataSource<Tuple" + numFields + "<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">> types(");
for (int i = 0; i < numFields; i++) {
if (i > 0) {
sb.append(", ");
}
sb.append("Class<");
sb.append(GEN_TYPE_PREFIX + i);
sb.append("> type" + i);
}
sb.append(") {\n");
// get TupleTypeInfo
sb.append("\t\tTupleTypeInfo<Tuple" + numFields + "<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(");
for (int i = 0; i < numFields; i++) {
if (i > 0) {
sb.append(", ");
}
sb.append("type" + i);
}
sb.append(");\n");
// create csv input format
sb.append("\t\tCsvInputFormat<Tuple" + numFields + "<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">> inputFormat = new TupleCsvInputFormat<Tuple" + numFields + "<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">>(path, types, this.includedMask);\n");
// configure input format
sb.append("\t\tconfigureInputFormat(inputFormat);\n");
// return
sb.append("\t\treturn new DataSource<Tuple" + numFields + "<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">>(executionContext, inputFormat, types, Utils.getCallLocationName());\n");
// end of method
sb.append("\t}\n");
}
// insert code into file
File dir = getPackage(root, CSV_READER_PACKAGE);
File csvReaderClass = new File(dir, CSV_READER_CLASSNAME + ".java");
insertCodeIntoFile(sb.toString(), csvReaderClass);
}
private static void appendTupleTypeGenerics(StringBuilder sb, int numFields) {
for (int i = 0; i < numFields; i++) {
if (i > 0) {
sb.append(", ");
}
sb.append(GEN_TYPE_PREFIX + i);
}
}
private static void createTupleClasses(File root) throws FileNotFoundException {
File dir = getPackage(root, PACKAGE);
......@@ -809,9 +455,7 @@ class TupleGenerator {
// Class-Attributes - a list of tuples
w.print("\tprivate List<Tuple" + numFields);
printGenericsString(w, numFields);
w.print("> tuples = new ArrayList<Tuple" + numFields );
printGenericsString(w, numFields);
w.println(">();");
w.println("> tuples = new ArrayList<>();");
w.println();
// add(...) function for adding a single tuple
......@@ -825,9 +469,7 @@ class TupleGenerator {
w.print(GEN_TYPE_PREFIX + i + " value" + i);
}
w.println("){");
w.print("\t\ttuples.add(new Tuple" + numFields);
printGenericsString(w, numFields);
w.print("(");
w.print("\t\ttuples.add(new Tuple" + numFields + "<>(");
for (int i = 0; i < numFields; i++) {
if (i > 0) {
w.print(", ");
......
......@@ -27,7 +27,7 @@ public class Tuple2Test {
@Test
public void testSwapValues() {
Tuple2<String, Integer> toSwap = new Tuple2<String, Integer>(new String("Test case"), 25);
Tuple2<String, Integer> toSwap = new Tuple2<>("Test case", 25);
Tuple2<Integer, String> swapped = toSwap.swap();
Assert.assertEquals(swapped.f0, toSwap.f1);
......@@ -37,7 +37,7 @@ public class Tuple2Test {
@Test(expected = NullFieldException.class)
public void testGetFieldNotNull() {
Tuple2<String, Integer> tuple = new Tuple2<String, Integer>(new String("Test case"), null);
Tuple2<String, Integer> tuple = new Tuple2<>("Test case", null);
Assert.assertEquals("Test case", tuple.getFieldNotNull(0));
tuple.getFieldNotNull(1);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.tuple;
import java.io.File;
import java.io.IOException;
import java.util.Scanner;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
/**
* Source code generator for tuple classes and classes which depend on the arity of tuples.
*/
class TupleGenerator {
// Parameters for tuple classes
private static final String ROOT_DIRECTORY = "./flink-java/src/main/java";
private static final String GEN_TYPE_PREFIX = "T";
// Parameters for tuple-dependent classes
private static final String BEGIN_INDICATOR = "BEGIN_OF_TUPLE_DEPENDENT_CODE";
private static final String END_INDICATOR = "END_OF_TUPLE_DEPENDENT_CODE";
// Parameters for CsvReader
private static final String CSV_READER_PACKAGE = "org.apache.flink.api.java.io";
private static final String CSV_READER_CLASSNAME = "CsvReader";
// Parameters for ProjectOperator
private static final String PROJECT_OPERATOR_PACKAGE = "org.apache.flink.api.java.operators";
private static final String PROJECT_OPERATOR_CLASSNAME = "ProjectOperator";
// Parameters for JoinOperator
private static final String JOIN_OPERATOR_PACKAGE = "org.apache.flink.api.java.operators";
private static final String JOIN_OPERATOR_CLASSNAME = "JoinOperator";
// parameters for CrossOperator
private static final String CROSS_OPERATOR_PACKAGE = "org.apache.flink.api.java.operators";
private static final String CROSS_OPERATOR_CLASSNAME = "CrossOperator";
// min. and max. tuple arity
private static final int FIRST = 1;
private static final int LAST = 25;
public static void main(String[] args) throws Exception {
System.err.println("Current directory "+System.getProperty("user.dir"));
String rootDir = ROOT_DIRECTORY;
if(args.length > 0) {
rootDir = args[0] + "/" + ROOT_DIRECTORY;
}
System.err.println("Using root directory: "+rootDir);
File root = new File(rootDir);
modifyCsvReader(root);
modifyProjectOperator(root);
modifyJoinProjectOperator(root);
modifyCrossProjectOperator(root);
}
private static File getPackage(File root, String packageString) {
File dir = new File(root, packageString.replace('.', '/'));
if (!dir.exists() && dir.isDirectory()) {
System.err.println("None existent directory: " + dir.getAbsolutePath());
System.exit(1);
}
return dir;
}
private static void insertCodeIntoFile(String code, File file) throws IOException {
String fileContent = Files.toString(file, Charsets.UTF_8);
try (Scanner s = new Scanner(fileContent)) {
StringBuilder sb = new StringBuilder();
String line;
boolean indicatorFound = false;
// add file beginning
while (s.hasNextLine() && (line = s.nextLine()) != null) {
sb.append(line).append("\n");
if (line.contains(BEGIN_INDICATOR)) {
indicatorFound = true;
break;
}
}
if(!indicatorFound) {
System.out.println("No indicator found in '" + file + "'. Will skip code generation.");
s.close();
return;
}
// add generator signature
sb.append("\t// GENERATED FROM ").append(TupleGenerator.class.getName()).append(".\n");
// add tuple dependent code
sb.append(code).append("\n");
// skip generated code
while (s.hasNextLine() && (line = s.nextLine()) != null) {
if (line.contains(END_INDICATOR)) {
sb.append(line).append("\n");
break;
}
}
// add file ending
while (s.hasNextLine() && (line = s.nextLine()) != null) {
sb.append(line).append("\n");
}
s.close();
Files.write(sb.toString(), file, Charsets.UTF_8);
}
}
private static void modifyCrossProjectOperator(File root) throws IOException {
// generate code
StringBuilder sb = new StringBuilder();
// method begin
sb.append("\n");
// method comment
sb.append("\t\t/**\n");
sb.append("\t\t * Chooses a projectTupleX according to the length of\n");
sb.append("\t\t * {@link org.apache.flink.api.java.operators.CrossOperator.CrossProjection#fieldIndexes} \n");
sb.append("\t\t * \n");
sb.append("\t\t * @return The projected DataSet.\n");
sb.append("\t\t */\n");
// method signature
sb.append("\t\t@SuppressWarnings(\"unchecked\")\n");
sb.append("\t\tpublic <OUT extends Tuple> ProjectCross<I1, I2, OUT> projectTupleX() {\n");
sb.append("\t\t\tProjectCross<I1, I2, OUT> projectionCross = null;\n\n");
sb.append("\t\t\tswitch (fieldIndexes.length) {\n");
for (int numFields = FIRST; numFields <= LAST; numFields++) {
sb.append("\t\t\tcase " + numFields +":" + " projectionCross = (ProjectCross<I1, I2, OUT>) projectTuple"+numFields+"(); break;\n");
}
sb.append("\t\t\tdefault: throw new IllegalStateException(\"Excessive arity in tuple.\");\n");
sb.append("\t\t\t}\n\n");
sb.append("\t\t\treturn projectionCross;\n");
// method end
sb.append("\t\t}\n");
for (int numFields = FIRST; numFields <= LAST; numFields++) {
// method begin
sb.append("\n");
// method comment
sb.append("\t\t/**\n");
sb.append("\t\t * Projects a pair of crossed elements to a {@link Tuple} with the previously selected fields. \n");
sb.append("\t\t * \n");
sb.append("\t\t * @return The projected data set.\n");
sb.append("\t\t * \n");
sb.append("\t\t * @see Tuple\n");
sb.append("\t\t * @see DataSet\n");
sb.append("\t\t */\n");
// method signature
sb.append("\t\tpublic <");
appendTupleTypeGenerics(sb, numFields);
sb.append("> ProjectCross<I1, I2, Tuple"+numFields+"<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">> projectTuple"+numFields+"(");
sb.append(") {\n");
// extract field types
sb.append("\t\t\tTypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes);\n");
// create new tuple type info
sb.append("\t\t\tTupleTypeInfo<Tuple"+numFields+"<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">> tType = new TupleTypeInfo<Tuple"+numFields+"<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">>(fTypes);\n\n");
// create and return new project operator
sb.append("\t\t\treturn new ProjectCross<I1, I2, Tuple"+numFields+"<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">>(this.ds1, this.ds2, this.fieldIndexes, this.isFieldInFirst, tType, this, hint);\n");
// method end
sb.append("\t\t}\n");
}
// insert code into file
File dir = getPackage(root, CROSS_OPERATOR_PACKAGE);
File projectOperatorClass = new File(dir, CROSS_OPERATOR_CLASSNAME + ".java");
insertCodeIntoFile(sb.toString(), projectOperatorClass);
}
private static void modifyProjectOperator(File root) throws IOException {
// generate code
StringBuilder sb = new StringBuilder();
// method begin
sb.append("\n");
// method comment
sb.append("\t\t/**\n");
sb.append("\t\t * Chooses a projectTupleX according to the length of\n");
sb.append("\t\t * {@link org.apache.flink.api.java.operators.ProjectOperator.Projection#fieldIndexes} \n");
sb.append("\t\t * \n");
sb.append("\t\t * @return The projected DataSet.\n");
sb.append("\t\t * \n");
sb.append("\t\t * @see org.apache.flink.api.java.operators.ProjectOperator.Projection\n");
sb.append("\t\t */\n");
// method signature
sb.append("\t\t@SuppressWarnings(\"unchecked\")\n");
sb.append("\t\tpublic <OUT extends Tuple> ProjectOperator<T, OUT> projectTupleX() {\n");
sb.append("\t\t\tProjectOperator<T, OUT> projOperator;\n\n");
sb.append("\t\t\tswitch (fieldIndexes.length) {\n");
for (int numFields = FIRST; numFields <= LAST; numFields++) {
sb.append("\t\t\tcase " + numFields +":" + " projOperator = (ProjectOperator<T, OUT>) projectTuple"+numFields+"(); break;\n");
}
sb.append("\t\t\tdefault: throw new IllegalStateException(\"Excessive arity in tuple.\");\n");
sb.append("\t\t\t}\n\n");
sb.append("\t\t\treturn projOperator;\n");
// method end
sb.append("\t\t}\n");
for (int numFields = FIRST; numFields <= LAST; numFields++) {
// method begin
sb.append("\n");
// method comment
sb.append("\t\t/**\n");
sb.append("\t\t * Projects a {@link Tuple} {@link DataSet} to the previously selected fields. \n");
sb.append("\t\t * \n");
sb.append("\t\t * @return The projected DataSet.\n");
sb.append("\t\t * \n");
sb.append("\t\t * @see Tuple\n");
sb.append("\t\t * @see DataSet\n");
sb.append("\t\t */\n");
// method signature
sb.append("\t\tpublic <");
appendTupleTypeGenerics(sb, numFields);
sb.append("> ProjectOperator<T, Tuple"+numFields+"<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">> projectTuple"+numFields+"(");
sb.append(") {\n");
// extract field types
sb.append("\t\t\tTypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, ds.getType());\n");
// create new tuple type info
sb.append("\t\t\tTupleTypeInfo<Tuple"+numFields+"<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">> tType = new TupleTypeInfo<Tuple"+numFields+"<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">>(fTypes);\n\n");
// create and return new project operator
sb.append("\t\t\treturn new ProjectOperator<T, Tuple"+numFields+"<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">>(this.ds, this.fieldIndexes, tType);\n");
// method end
sb.append("\t\t}\n");
}
// insert code into file
File dir = getPackage(root, PROJECT_OPERATOR_PACKAGE);
File projectOperatorClass = new File(dir, PROJECT_OPERATOR_CLASSNAME + ".java");
insertCodeIntoFile(sb.toString(), projectOperatorClass);
}
private static void modifyJoinProjectOperator(File root) throws IOException {
// generate code
StringBuilder sb = new StringBuilder();
// method begin
sb.append("\n");
// method comment
sb.append("\t\t/**\n");
sb.append("\t\t * Chooses a projectTupleX according to the length of\n");
sb.append("\t\t * {@link org.apache.flink.api.java.operators.JoinOperator.JoinProjection#fieldIndexes}\n");
sb.append("\t\t * \n");
sb.append("\t\t * @return The projected DataSet.\n");
sb.append("\t\t * \n");
sb.append("\t\t * @see org.apache.flink.api.java.operators.JoinOperator.ProjectJoin\n");
sb.append("\t\t */\n");
// method signature
sb.append("\t\t@SuppressWarnings(\"unchecked\")\n");
sb.append("\t\tpublic <OUT extends Tuple> ProjectJoin<I1, I2, OUT> projectTupleX() {\n");
sb.append("\t\t\tProjectJoin<I1, I2, OUT> projectJoin = null;\n\n");
sb.append("\t\t\tswitch (fieldIndexes.length) {\n");
for (int numFields = FIRST; numFields <= LAST; numFields++) {
sb.append("\t\t\tcase " + numFields +":" + " projectJoin = (ProjectJoin<I1, I2, OUT>) projectTuple"+numFields+"(); break;\n");
}
sb.append("\t\t\tdefault: throw new IllegalStateException(\"Excessive arity in tuple.\");\n");
sb.append("\t\t\t}\n\n");
sb.append("\t\t\treturn projectJoin;\n");
// method end
sb.append("\t\t}\n");
for (int numFields = FIRST; numFields <= LAST; numFields++) {
// method begin
sb.append("\n");
// method comment
sb.append("\t\t/**\n");
sb.append("\t\t * Projects a pair of joined elements to a {@link Tuple} with the previously selected fields. \n");
sb.append("\t\t * Requires the classes of the fields of the resulting tuples. \n");
sb.append("\t\t * \n");
sb.append("\t\t * @return The projected data set.\n");
sb.append("\t\t * \n");
sb.append("\t\t * @see Tuple\n");
sb.append("\t\t * @see DataSet\n");
sb.append("\t\t */\n");
// method signature
sb.append("\t\tpublic <");
appendTupleTypeGenerics(sb, numFields);
sb.append("> ProjectJoin<I1, I2, Tuple"+numFields+"<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">> projectTuple"+numFields+"(");
sb.append(") {\n");
// extract field types
sb.append("\t\t\tTypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes);\n");
// create new tuple type info
sb.append("\t\t\tTupleTypeInfo<Tuple"+numFields+"<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">> tType = new TupleTypeInfo<Tuple"+numFields+"<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">>(fTypes);\n\n");
// create and return new project operator
sb.append("\t\t\treturn new ProjectJoin<I1, I2, Tuple"+numFields+"<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">>(this.ds1, this.ds2, this.keys1, this.keys2, this.hint, this.fieldIndexes, this.isFieldInFirst, tType, this);\n");
// method end
sb.append("\t\t}\n");
}
// insert code into file
File dir = getPackage(root, JOIN_OPERATOR_PACKAGE);
File projectOperatorClass = new File(dir, JOIN_OPERATOR_CLASSNAME + ".java");
insertCodeIntoFile(sb.toString(), projectOperatorClass);
}
private static void modifyCsvReader(File root) throws IOException {
// generate code
StringBuilder sb = new StringBuilder(1000);
for (int numFields = FIRST; numFields <= LAST; numFields++) {
// method begin
sb.append("\n");
// java doc
sb.append("\t/**\n");
sb.append("\t * Specifies the types for the CSV fields. This method parses the CSV data to a ").append(numFields).append("-tuple\n");
sb.append("\t * which has fields of the specified types.\n");
sb.append("\t * This method is overloaded for each possible length of the tuples to support type safe\n");
sb.append("\t * creation of data sets through CSV parsing.\n");
sb.append("\t *\n");
for (int pos = 0; pos < numFields; pos++) {
sb.append("\t * @param type").append(pos);
sb.append(" The type of CSV field ").append(pos).append(" and the type of field ");
sb.append(pos).append(" in the returned tuple type.\n");
}
sb.append("\t * @return The {@link org.apache.flink.api.java.DataSet} representing the parsed CSV data.\n");
sb.append("\t */\n");
// method signature
sb.append("\tpublic <");
appendTupleTypeGenerics(sb, numFields);
sb.append("> DataSource<Tuple" + numFields + "<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">> types(");
for (int i = 0; i < numFields; i++) {
if (i > 0) {
sb.append(", ");
}
sb.append("Class<");
sb.append(GEN_TYPE_PREFIX + i);
sb.append("> type" + i);
}
sb.append(") {\n");
// get TupleTypeInfo
sb.append("\t\tTupleTypeInfo<Tuple" + numFields + "<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">> types = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(");
for (int i = 0; i < numFields; i++) {
if (i > 0) {
sb.append(", ");
}
sb.append("type" + i);
}
sb.append(");\n");
// create csv input format
sb.append("\t\tCsvInputFormat<Tuple" + numFields + "<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">> inputFormat = new TupleCsvInputFormat<Tuple" + numFields + "<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">>(path, types, this.includedMask);\n");
// configure input format
sb.append("\t\tconfigureInputFormat(inputFormat);\n");
// return
sb.append("\t\treturn new DataSource<Tuple" + numFields + "<");
appendTupleTypeGenerics(sb, numFields);
sb.append(">>(executionContext, inputFormat, types, Utils.getCallLocationName());\n");
// end of method
sb.append("\t}\n");
}
// insert code into file
File dir = getPackage(root, CSV_READER_PACKAGE);
File csvReaderClass = new File(dir, CSV_READER_CLASSNAME + ".java");
insertCodeIntoFile(sb.toString(), csvReaderClass);
}
private static void appendTupleTypeGenerics(StringBuilder sb, int numFields) {
for (int i = 0; i < numFields; i++) {
if (i > 0) {
sb.append(", ");
}
sb.append(GEN_TYPE_PREFIX + i);
}
}
private static String HEADER =
"/*\n"
+ " * Licensed to the Apache Software Foundation (ASF) under one\n"
+ " * or more contributor license agreements. See the NOTICE file\n"
+ " * distributed with this work for additional information\n"
+ " * regarding copyright ownership. The ASF licenses this file\n"
+ " * to you under the Apache License, Version 2.0 (the\n"
+ " * \"License\"); you may not use this file except in compliance\n"
+ " * with the License. You may obtain a copy of the License at\n"
+ " *\n"
+ " * http://www.apache.org/licenses/LICENSE-2.0\n"
+ " *\n"
+ " * Unless required by applicable law or agreed to in writing, software\n"
+ " * distributed under the License is distributed on an \"AS IS\" BASIS,\n"
+ " * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n"
+ " * See the License for the specific language governing permissions and\n"
+ " * limitations under the License.\n"
+ " */" +
"\n" +
"\n" +
"\n" +
"// --------------------------------------------------------------\n" +
"// THIS IS A GENERATED SOURCE FILE. DO NOT EDIT!\n" +
"// GENERATED FROM " + TupleGenerator.class.getName() + ".\n" +
"// --------------------------------------------------------------\n\n\n";
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册