提交 0fd098fe 编写于 作者: J jfeher 提交者: Stephan Ewen

[streaming] KafkaSink, added shutdown to KafkaSource

上级 b53e58f7
......@@ -12,11 +12,20 @@
<packaging>jar</packaging>
<properties>
<stratosphere.version>0.6-SNAPSHOT</stratosphere.version>
<stratosphere.version>0.5</stratosphere.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<repositories>
<repository>
<id>dms-repo</id>
<url>https://dms.sztaki.hu/maven-public</url>
<releases><enabled>true</enabled></releases>
<snapshots><enabled>true</enabled></snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>eu.stratosphere</groupId>
......
......@@ -213,10 +213,6 @@ public class DataStream<T extends Tuple> {
new FlatMapInvokable<T, R>(flatMapper), parallelism);
}
public <R extends Tuple> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapper) {
return flatMap(flatMapper, 1);
}
/**
* Applies a Map transformation on a DataStream. The transformation calls a
* MapFunction for each element of the DataStream. Each MapFunction call
......@@ -236,10 +232,6 @@ public class DataStream<T extends Tuple> {
parallelism);
}
public <R extends Tuple> DataStream<R> map(MapFunction<T, R> mapper) {
return map(mapper, 1);
}
/**
* Applies a reduce transformation on preset chunks of the DataStream. The
* transformation calls a GroupReduceFunction for each tuple batch of the
......@@ -263,11 +255,6 @@ public class DataStream<T extends Tuple> {
new BatchReduceInvokable<T, R>(reducer), parallelism);
}
public <R extends Tuple> DataStream<R> batchReduce(GroupReduceFunction<T, R> reducer,
int batchSize) {
return batchReduce(reducer, batchSize, 1);
}
/**
* Applies a Filter transformation on a DataStream. The transformation calls
* a FilterFunction for each element of the DataStream and retains only
......@@ -286,10 +273,6 @@ public class DataStream<T extends Tuple> {
new FilterInvokable<T>(filter), parallelism);
}
public DataStream<T> filter(FilterFunction<T> filter) {
return filter(filter, 1);
}
/**
* Sets the given sink function.
*
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
import java.util.Arrays;
import java.util.Collection;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.util.Collector;
/**
 * Source function that emits a fixed group of elements, each one wrapped in
 * a {@link Tuple1}. A single tuple instance is reused for every emitted
 * element.
 *
 * @param <T>
 *            type of the emitted elements
 */
public class FromElementsSource<T> extends SourceFunction<Tuple1<T>> {
	private static final long serialVersionUID = 1L;

	Iterable<T> iterable;
	Tuple1<T> outTuple = new Tuple1<T>();

	/**
	 * Creates a source over the given elements, viewed as a fixed-size list.
	 *
	 * @param elements
	 *            elements to emit, in order
	 */
	public FromElementsSource(T... elements) {
		this(Arrays.asList(elements));
	}

	/**
	 * Creates a source over the given collection. The collection is stored
	 * by reference, not copied.
	 *
	 * @param elements
	 *            elements to emit, in the collection's iteration order
	 */
	public FromElementsSource(Collection<T> elements) {
		this.iterable = elements;
	}

	/**
	 * Hands every stored element to the collector, one tuple per element.
	 *
	 * @param collector
	 *            receiver of the emitted tuples
	 */
	@Override
	public void invoke(Collector<Tuple1<T>> collector) throws Exception {
		for (T next : this.iterable) {
			this.outTuple.f0 = next;
			collector.collect(this.outTuple);
		}
	}
}
......@@ -28,6 +28,8 @@ import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
......@@ -35,8 +37,6 @@ import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
......@@ -63,8 +63,8 @@ public class JobGraphBuilder {
protected String maxParallelismVertexName;
protected int maxParallelism;
protected FaultToleranceType faultToleranceType;
private int batchSize = 1;
private long batchTimeout = 1000;
private int batchSize;
private long batchTimeout;
/**
* Creates a new JobGraph with the given name
......@@ -87,12 +87,24 @@ public class JobGraphBuilder {
this.faultToleranceType = faultToleranceType;
}
public void setDefaultBatchSize(int batchSize) {
this.batchSize = batchSize;
}
/**
* Creates a new JobGraph with the given parameters
*
* @param jobGraphName
* Name of the JobGraph
* @param faultToleranceType
* Type of fault tolerance
* @param defaultBatchSize
* Default number of records to send at one emit
* @param defaultBatchTimeoutMillis
* Default batch timeout boundary in milliseconds
*/
public void setBatchTimeout(int timeout) {
this.batchTimeout = timeout;
public JobGraphBuilder(String jobGraphName, FaultToleranceType faultToleranceType,
int defaultBatchSize, long defaultBatchTimeoutMillis) {
this(jobGraphName, faultToleranceType);
this.batchSize = defaultBatchSize;
this.batchTimeout = defaultBatchTimeoutMillis;
}
/**
......@@ -112,14 +124,14 @@ public class JobGraphBuilder {
* Number of parallel instances on one task manager
*/
public void setSource(String sourceName, UserSourceInvokable<? extends Tuple> InvokableObject,
String operatorName, byte[] serializedFunction, int parallelism) {
String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) {
final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
source.setInvokableClass(StreamSource.class);
source.setInputClass(StreamSource.class);
setComponent(sourceName, source, InvokableObject, operatorName, serializedFunction,
parallelism);
parallelism, subtasksPerInstance);
if (log.isDebugEnabled()) {
log.debug("SOURCE: " + sourceName);
......@@ -144,12 +156,12 @@ public class JobGraphBuilder {
*/
public void setTask(String taskName,
UserTaskInvokable<? extends Tuple, ? extends Tuple> TaskInvokableObject,
String operatorName, byte[] serializedFunction, int parallelism) {
String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) {
final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph);
task.setInvokableClass(StreamTask.class);
task.setTaskClass(StreamTask.class);
setComponent(taskName, task, TaskInvokableObject, operatorName, serializedFunction,
parallelism);
parallelism, subtasksPerInstance);
if (log.isDebugEnabled()) {
log.debug("TASK: " + taskName);
......@@ -173,11 +185,12 @@ public class JobGraphBuilder {
* Number of parallel instances on one task manager
*/
public void setSink(String sinkName, UserSinkInvokable<? extends Tuple> InvokableObject,
String operatorName, byte[] serializedFunction, int parallelism) {
String operatorName, byte[] serializedFunction, int parallelism, int subtasksPerInstance) {
final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph);
sink.setInvokableClass(StreamSink.class);
setComponent(sinkName, sink, InvokableObject, operatorName, serializedFunction, parallelism);
sink.setOutputClass(StreamSink.class);
setComponent(sinkName, sink, InvokableObject, operatorName, serializedFunction,
parallelism, subtasksPerInstance);
if (log.isDebugEnabled()) {
log.debug("SINK: " + sinkName);
......@@ -205,11 +218,10 @@ public class JobGraphBuilder {
*/
private void setComponent(String componentName, AbstractJobVertex component,
Serializable InvokableObject, String operatorName, byte[] serializedFunction,
int parallelism) {
int parallelism, int subtasksPerInstance) {
component.setNumberOfSubtasks(parallelism);
// TODO remove all NumberOfSubtasks setting
// component.setNumberOfSubtasksPerInstance(subtasksPerInstance);
component.setNumberOfSubtasksPerInstance(subtasksPerInstance);
if (parallelism > maxParallelism) {
maxParallelism = parallelism;
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
import eu.stratosphere.streaming.util.ClusterUtil;
/**
 * Stream execution environment whose {@link #execute()} runs the job graph
 * on a mini cluster through {@code ClusterUtil}.
 */
public class LocalStreamEnvironment extends StreamExecutionEnvironment {
/**
* Executes the JobGraph built by this environment's jobGraphBuilder on a
* mini cluster via {@code ClusterUtil.runOnMiniCluster}. The environment's
* degree of parallelism is passed as the second argument (presumably the
* number of parallel cores to use; confirm against ClusterUtil).
*/
@Override
public void execute() {
ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getDegreeOfParallelism());
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.util.Collector;
/**
 * Source function that emits every {@code long} value of the closed interval
 * {@code [from, to]} in ascending order, each wrapped in a {@link Tuple1}.
 * Nothing is emitted when {@code from > to}. A single tuple instance is
 * reused for every emitted value.
 */
public class SequenceSource extends SourceFunction<Tuple1<Long>> {
	private static final long serialVersionUID = 1L;

	long from;
	long to;
	Tuple1<Long> outTuple = new Tuple1<Long>();

	/**
	 * @param from
	 *            first value to emit (inclusive)
	 * @param to
	 *            last value to emit (inclusive)
	 */
	public SequenceSource(long from, long to) {
		this.from = from;
		this.to = to;
	}

	/**
	 * Emits the sequence to the collector.
	 *
	 * @param collector
	 *            receiver of the emitted tuples
	 */
	@Override
	public void invoke(Collector<Tuple1<Long>> collector) throws Exception {
		for (long current = from; current <= to; current++) {
			outTuple.f0 = current;
			collector.collect(outTuple);
			// Incrementing Long.MAX_VALUE wraps to Long.MIN_VALUE, which would
			// keep "current <= to" true forever; stop once the maximum is sent.
			if (current == Long.MAX_VALUE) {
				break;
			}
		}
	}
}
......@@ -16,8 +16,8 @@
package eu.stratosphere.streaming.api;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.ArrayStreamRecord;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
......
......@@ -19,8 +19,8 @@ import java.util.ArrayList;
import java.util.List;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
......
......@@ -35,62 +35,56 @@ import eu.stratosphere.util.Collector;
* construct streaming topologies.
*
*/
public abstract class StreamExecutionEnvironment {
protected JobGraphBuilder jobGraphBuilder;
public class StreamExecutionEnvironment {
JobGraphBuilder jobGraphBuilder;
private static int defaultLocalDop = Runtime.getRuntime().availableProcessors();
private int degreeOfParallelism = -1;
private float clusterSize = 1;
/**
* Constructor for creating StreamExecutionEnvironment
* General constructor specifying the batch size in which the tuples are
* transmitted and their timeout boundary.
*
* @param defaultBatchSize
* number of tuples in a batch
* @param defaultBatchTimeoutMillis
* timeout boundary in milliseconds
*/
protected StreamExecutionEnvironment() {
jobGraphBuilder = new JobGraphBuilder("jobGraph", FaultToleranceType.NONE);
}
public void setDefaultBatchSize(int batchSize) {
if (batchSize < 1) {
public StreamExecutionEnvironment(int defaultBatchSize, long defaultBatchTimeoutMillis) {
if (defaultBatchSize < 1) {
throw new IllegalArgumentException("Batch size must be positive.");
} else {
jobGraphBuilder.setDefaultBatchSize(batchSize);
}
}
public void setBatchTimeout(int timeout) {
if (timeout < 1) {
if (defaultBatchTimeoutMillis < 1) {
throw new IllegalArgumentException("Batch timeout must be positive.");
} else {
jobGraphBuilder.setBatchTimeout(timeout);
}
jobGraphBuilder = new JobGraphBuilder("jobGraph", FaultToleranceType.NONE,
defaultBatchSize, defaultBatchTimeoutMillis);
}
/**
* Partitioning strategy on the stream.
* Constructor for transmitting tuples individually with a 1 second timeout.
*/
public static enum ConnectionType {
SHUFFLE, BROADCAST, FIELD
}
public int getDegreeOfParallelism() {
return degreeOfParallelism;
public StreamExecutionEnvironment() {
this(1, 1000);
}
public void setDegreeOfParallelism(int degreeOfParallelism) {
if (degreeOfParallelism < 1)
throw new IllegalArgumentException("Degree of parallelism must be at least one.");
this.degreeOfParallelism = degreeOfParallelism;
}
public static LocalStreamEnvironment createLocalEnvironment() {
return createLocalEnvironment(defaultLocalDop);
/**
* Set the number of machines in the executing cluster. Used for setting
* task parallelism.
*
* @param clusterSize
* cluster size
* @return environment
*/
public StreamExecutionEnvironment setClusterSize(int clusterSize) {
this.clusterSize = clusterSize;
return this;
}
public static LocalStreamEnvironment createLocalEnvironment(int degreeOfParallelism) {
LocalStreamEnvironment lee = new LocalStreamEnvironment();
lee.setDegreeOfParallelism(degreeOfParallelism);
return lee;
/**
* Partitioning strategy on the stream.
*/
public static enum ConnectionType {
SHUFFLE, BROADCAST, FIELD
}
/**
......@@ -172,7 +166,8 @@ public abstract class StreamExecutionEnvironment {
DataStream<R> returnStream = new DataStream<R>(this, functionName);
jobGraphBuilder.setTask(returnStream.getId(), functionInvokable, functionName,
serializeToByteArray(function), parallelism);
serializeToByteArray(function), parallelism,
(int) Math.ceil(parallelism / clusterSize));
connectGraph(inputStream, returnStream.getId());
......@@ -197,7 +192,8 @@ public abstract class StreamExecutionEnvironment {
DataStream<T> returnStream = new DataStream<T>(this, "sink");
jobGraphBuilder.setSink(returnStream.getId(), new SinkInvokable<T>(sinkFunction), "sink",
serializeToByteArray(sinkFunction), parallelism);
serializeToByteArray(sinkFunction), parallelism,
(int) Math.ceil(parallelism / clusterSize));
connectGraph(inputStream, returnStream.getId());
......@@ -217,6 +213,33 @@ public abstract class StreamExecutionEnvironment {
return addSource(new SequenceSource(from, to), 1);
}
/**
 * Source function that emits every {@code long} value of the closed interval
 * {@code [from, to]} in ascending order, each wrapped in a {@link Tuple1}.
 * Nothing is emitted when {@code from > to}. A single tuple instance is
 * reused for every emitted value.
 */
private static final class SequenceSource extends SourceFunction<Tuple1<Long>> {
	private static final long serialVersionUID = 1L;

	long from;
	long to;
	Tuple1<Long> outTuple = new Tuple1<Long>();

	/**
	 * @param from
	 *            first value to emit (inclusive)
	 * @param to
	 *            last value to emit (inclusive)
	 */
	public SequenceSource(long from, long to) {
		this.from = from;
		this.to = to;
	}

	/**
	 * Emits the sequence to the collector.
	 *
	 * @param collector
	 *            receiver of the emitted tuples
	 */
	@Override
	public void invoke(Collector<Tuple1<Long>> collector) throws Exception {
		for (long current = from; current <= to; current++) {
			outTuple.f0 = current;
			collector.collect(outTuple);
			// Incrementing Long.MAX_VALUE wraps to Long.MIN_VALUE, which would
			// keep "current <= to" true forever; stop once the maximum is sent.
			if (current == Long.MAX_VALUE) {
				break;
			}
		}
	}
}
/**
* Creates a new DataStream that contains the given elements. The elements
* must all be of the same type, for example, all of the String or Integer.
......@@ -234,7 +257,7 @@ public abstract class StreamExecutionEnvironment {
DataStream<Tuple1<X>> returnStream = new DataStream<Tuple1<X>>(this, "elements");
jobGraphBuilder.setSource(returnStream.getId(), new FromElementsSource<X>(data),
"elements", serializeToByteArray(data[0]), 1);
"elements", serializeToByteArray(data[0]), 1, 1);
return returnStream.copy();
}
......@@ -255,11 +278,42 @@ public abstract class StreamExecutionEnvironment {
DataStream<Tuple1<X>> returnStream = new DataStream<Tuple1<X>>(this, "elements");
jobGraphBuilder.setSource(returnStream.getId(), new FromElementsSource<X>(data),
"elements", serializeToByteArray(data.toArray()[0]), 1);
"elements", serializeToByteArray(data.toArray()[0]), 1, 1);
return returnStream.copy();
}
/**
 * Source function backing fromElements and fromCollection: emits a fixed
 * group of elements, each one wrapped in a {@link Tuple1}. A single tuple
 * instance is reused for every emitted element.
 *
 * @param <T>
 *            type of the returned stream
 */
private static class FromElementsSource<T> extends SourceFunction<Tuple1<T>> {
	private static final long serialVersionUID = 1L;

	Iterable<T> iterable;
	Tuple1<T> outTuple = new Tuple1<T>();

	/**
	 * Creates a source over the given elements, viewed as a fixed-size list.
	 *
	 * @param elements
	 *            elements to emit, in order
	 */
	public FromElementsSource(T... elements) {
		this(Arrays.asList(elements));
	}

	/**
	 * Creates a source over the given collection. The collection is stored
	 * by reference, not copied.
	 *
	 * @param elements
	 *            elements to emit, in the collection's iteration order
	 */
	public FromElementsSource(Collection<T> elements) {
		this.iterable = elements;
	}

	/**
	 * Hands every stored element to the collector, one tuple per element.
	 *
	 * @param collector
	 *            receiver of the emitted tuples
	 */
	@Override
	public void invoke(Collector<Tuple1<T>> collector) throws Exception {
		for (T next : this.iterable) {
			this.outTuple.f0 = next;
			collector.collect(this.outTuple);
		}
	}
}
/**
* Adds a sink to the data stream, closing it. The parallelism defaults to
* 1.
......@@ -312,10 +366,13 @@ public abstract class StreamExecutionEnvironment {
return returnStream;
}
// TODO: Link to JobGraph and ClusterUtil
/**
* Executes the JobGraph.
**/
public abstract void execute();
* Executes the JobGraph on a mini cluster via ClusterUtil.
*/
public void execute() {
ClusterUtil.runOnMiniCluster(jobGraphBuilder.getJobGraph());
}
public void executeCluster() {
ClusterUtil.runOnLocalCluster(jobGraphBuilder.getJobGraph(), "10.1.3.150", 6123);
......@@ -338,7 +395,8 @@ public abstract class StreamExecutionEnvironment {
DataStream<T> returnStream = new DataStream<T>(this, "source");
jobGraphBuilder.setSource(returnStream.getId(), sourceFunction, "source",
serializeToByteArray(sourceFunction), parallelism);
serializeToByteArray(sourceFunction), parallelism,
(int) Math.ceil(parallelism / clusterSize));
return returnStream.copy();
}
......
......@@ -37,13 +37,13 @@ import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.event.task.EventListener;
import eu.stratosphere.nephele.io.AbstractRecordReader;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.MutableRecordReader;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.runtime.io.api.AbstractRecordReader;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.runtime.io.api.MutableRecordReader;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.StreamCollectorManager;
import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable;
......@@ -65,7 +65,7 @@ import eu.stratosphere.streaming.partitioner.DefaultPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
import eu.stratosphere.util.Collector;
public final class StreamComponentHelper {
public final class StreamComponentHelper<T extends AbstractInvokable> {
private static final Log log = LogFactory.getLog(StreamComponentHelper.class);
private static int numComponents = 0;
......@@ -194,30 +194,41 @@ public final class StreamComponentHelper {
}
}
public AbstractRecordReader getConfigInputs(AbstractInvokable taskBase,
Configuration taskConfiguration) throws StreamComponentException {
public AbstractRecordReader getConfigInputs(T taskBase, Configuration taskConfiguration)
throws StreamComponentException {
int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
if (numberOfInputs < 2) {
return new StreamRecordReader(taskBase, ArrayStreamRecord.class,
inDeserializationDelegate, inTupleSerializer);
if (taskBase instanceof StreamTask) {
return new StreamRecordReader((StreamTask) taskBase, ArrayStreamRecord.class,
inDeserializationDelegate, inTupleSerializer);
} else if (taskBase instanceof StreamSink) {
return new StreamRecordReader((StreamSink) taskBase, ArrayStreamRecord.class,
inDeserializationDelegate, inTupleSerializer);
} else {
throw new StreamComponentException("Nonsupported object passed to setConfigInputs");
}
} else {
@SuppressWarnings("unchecked")
MutableRecordReader<StreamRecord>[] recordReaders = (MutableRecordReader<StreamRecord>[]) new MutableRecordReader<?>[numberOfInputs];
for (int i = 0; i < numberOfInputs; i++) {
recordReaders[i] = new MutableRecordReader<StreamRecord>(taskBase);
if (taskBase instanceof StreamTask) {
recordReaders[i] = new MutableRecordReader<StreamRecord>((StreamTask) taskBase);
} else if (taskBase instanceof StreamSink) {
recordReaders[i] = new MutableRecordReader<StreamRecord>((StreamSink) taskBase);
} else {
throw new StreamComponentException(
"Nonsupported object passed to setConfigInputs");
}
}
return new UnionStreamRecordReader(recordReaders, ArrayStreamRecord.class,
inDeserializationDelegate, inTupleSerializer);
}
}
public void setConfigOutputs(AbstractInvokable taskBase, Configuration taskConfiguration,
public void setConfigOutputs(T taskBase, Configuration taskConfiguration,
List<RecordWriter<StreamRecord>> outputs,
List<ChannelSelector<StreamRecord>> partitioners) throws StreamComponentException {
......@@ -227,8 +238,15 @@ public final class StreamComponentHelper {
setPartitioner(taskConfiguration, i, partitioners);
ChannelSelector<StreamRecord> outputPartitioner = partitioners.get(i);
outputs.add(new RecordWriter<StreamRecord>(taskBase, outputPartitioner));
if (taskBase instanceof StreamTask) {
outputs.add(new RecordWriter<StreamRecord>((StreamTask) taskBase,
StreamRecord.class, outputPartitioner));
} else if (taskBase instanceof StreamSource) {
outputs.add(new RecordWriter<StreamRecord>((StreamSource) taskBase,
StreamRecord.class, outputPartitioner));
} else {
throw new StreamComponentException("Nonsupported object passed to setConfigOutputs");
}
if (outputsPartitioned.size() < batchSizesPartitioned.size()) {
outputsPartitioned.add(outputs.get(i));
} else {
......@@ -265,30 +283,30 @@ public final class StreamComponentHelper {
return userFunction;
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public UserSinkInvokable<Tuple> getSinkInvokable(Configuration config) {
@SuppressWarnings("rawtypes")
public UserSinkInvokable getSinkInvokable(Configuration config) {
Class<? extends UserSinkInvokable> userFunctionClass = config.getClass("userfunction",
DefaultSinkInvokable.class, UserSinkInvokable.class);
return (UserSinkInvokable<Tuple>) getInvokable(userFunctionClass, config);
return (UserSinkInvokable) getInvokable(userFunctionClass, config);
}
// TODO consider logging stack trace!
@SuppressWarnings({ "rawtypes", "unchecked" })
public UserTaskInvokable<Tuple, Tuple> getTaskInvokable(Configuration config) {
@SuppressWarnings("rawtypes")
public UserTaskInvokable getTaskInvokable(Configuration config) {
// Default value is a TaskInvokable even if it was called from a source
Class<? extends UserTaskInvokable> userFunctionClass = config.getClass("userfunction",
DefaultTaskInvokable.class, UserTaskInvokable.class);
return (UserTaskInvokable<Tuple, Tuple>) getInvokable(userFunctionClass, config);
return (UserTaskInvokable) getInvokable(userFunctionClass, config);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public UserSourceInvokable<Tuple> getSourceInvokable(Configuration config) {
@SuppressWarnings("rawtypes")
public UserSourceInvokable getSourceInvokable(Configuration config) {
// Default value is a DefaultSourceInvokable when no user function class is configured
Class<? extends UserSourceInvokable> userFunctionClass = config.getClass("userfunction",
DefaultSourceInvokable.class, UserSourceInvokable.class);
return (UserSourceInvokable<Tuple>) getInvokable(userFunctionClass, config);
return (UserSourceInvokable) getInvokable(userFunctionClass, config);
}
// TODO find a better solution for this
......@@ -310,7 +328,6 @@ public final class StreamComponentHelper {
private void setPartitioner(Configuration config, int numberOfOutputs,
List<ChannelSelector<StreamRecord>> partitioners) {
Class<? extends ChannelSelector<StreamRecord>> partitioner = config.getClass(
"partitionerClass_" + numberOfOutputs, DefaultPartitioner.class,
ChannelSelector.class);
......@@ -343,8 +360,8 @@ public final class StreamComponentHelper {
}
}
public void invokeRecords(StreamRecordInvokable<Tuple, Tuple> userFunction,
AbstractRecordReader inputs) throws Exception {
public void invokeRecords(StreamRecordInvokable userFunction, AbstractRecordReader inputs)
throws Exception {
if (inputs instanceof UnionStreamRecordReader) {
UnionStreamRecordReader recordReader = (UnionStreamRecordReader) inputs;
while (recordReader.hasNext()) {
......
......@@ -22,7 +22,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
......
/***********************************************************************************************************************
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
......@@ -9,6 +10,7 @@
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api.streamcomponent;
......@@ -17,19 +19,19 @@ import java.io.IOException;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.nephele.io.AbstractSingleGateRecordReader;
import eu.stratosphere.nephele.io.InputChannelResult;
import eu.stratosphere.nephele.io.MutableRecordDeserializerFactory;
import eu.stratosphere.nephele.io.Reader;
import eu.stratosphere.nephele.template.AbstractOutputTask;
import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
import eu.stratosphere.runtime.io.api.AbstractSingleGateRecordReader;
import eu.stratosphere.runtime.io.api.Reader;
import eu.stratosphere.runtime.io.gates.InputChannelResult;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
/**
* A record reader connects an input gate to an application. It allows the
* application to query for incoming records and read them from the input gate.
*
* @param <StreamRecord>
* The type of the record that can be read from this record reader.
*/
public class StreamRecordReader extends AbstractSingleGateRecordReader<StreamRecord> implements
Reader<StreamRecord> {
......@@ -37,6 +39,7 @@ public class StreamRecordReader extends AbstractSingleGateRecordReader<StreamRec
private final Class<? extends StreamRecord> recordType;
private DeserializationDelegate<Tuple> deserializationDelegate;
private TupleSerializer<Tuple> tupleSerializer;
/**
* Stores the last read record.
*/
......@@ -59,14 +62,41 @@ public class StreamRecordReader extends AbstractSingleGateRecordReader<StreamRec
* @param recordType
* The class of records that can be read from the record reader.
* @param deserializationDelegate
* deserializationDelegate
* Deserialization delegate
* @param tupleSerializer
* Tuple serializer
*/
public StreamRecordReader(AbstractTask taskBase, Class<? extends StreamRecord> recordType,
DeserializationDelegate<Tuple> deserializationDelegate,
TupleSerializer<Tuple> tupleSerializer) {
// super(taskBase, MutableRecordDeserializerFactory.<StreamRecord>
// get(), 0);
super(taskBase, MutableRecordDeserializerFactory.<StreamRecord> get(), 0);
this.recordType = recordType;
this.deserializationDelegate = deserializationDelegate;
this.tupleSerializer = tupleSerializer;
}
/**
* Constructs a new record reader and registers a new input gate with the
* application's environment.
*
* @param outputBase
* The application that instantiated the record reader.
* @param recordType
* The class of records that can be read from the record reader.
* @param deserializationDelegate
* Deserialization delegate
* @param tupleSerializer
* tupleSerializer
* Tuple serializer
*/
public StreamRecordReader(AbstractInvokable taskBase, Class<? extends StreamRecord> recordType,
public StreamRecordReader(AbstractOutputTask outputBase,
Class<? extends StreamRecord> recordType,
DeserializationDelegate<Tuple> deserializationDelegate,
TupleSerializer<Tuple> tupleSerializer) {
super(taskBase);
// super(outputBase, MutableRecordDeserializerFactory.<StreamRecord>
// get(), 0);
super(outputBase, MutableRecordDeserializerFactory.<StreamRecord> get(), 0);
this.recordType = recordType;
this.deserializationDelegate = deserializationDelegate;
this.tupleSerializer = tupleSerializer;
......@@ -103,11 +133,10 @@ public class StreamRecordReader extends AbstractSingleGateRecordReader<StreamRec
return true;
case END_OF_SUPERSTEP:
if (incrementEndOfSuperstepEventAndCheck()) {
if (incrementEndOfSuperstepEventAndCheck())
return false;
} else {
else
break; // fall through and wait for next record/event
}
case TASK_EVENT:
handleEvent(this.inputGate.getCurrentEvent());
......
......@@ -18,25 +18,25 @@ package eu.stratosphere.streaming.api.streamcomponent;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.runtime.io.api.AbstractRecordReader;
import eu.stratosphere.nephele.io.AbstractRecordReader;
import eu.stratosphere.nephele.template.AbstractOutputTask;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
public class StreamSink extends AbstractInvokable {
public class StreamSink extends AbstractOutputTask {
private static final Log log = LogFactory.getLog(StreamSink.class);
private AbstractRecordReader inputs;
private UserSinkInvokable<Tuple> userFunction;
private StreamComponentHelper streamSinkHelper;
private UserSinkInvokable userFunction;
private StreamComponentHelper<StreamSink> streamSinkHelper;
private String name;
public StreamSink() {
// TODO: Make configuration file visible and call setClassInputs() here
userFunction = null;
streamSinkHelper = new StreamComponentHelper();
streamSinkHelper = new StreamComponentHelper<StreamSink>();
}
@Override
......@@ -54,9 +54,8 @@ public class StreamSink extends AbstractInvokable {
}
}
// FaultToleranceType faultToleranceType =
// FaultToleranceType.from(taskConfiguration
// .getInteger("faultToleranceType", 0));
FaultToleranceType faultToleranceType = FaultToleranceType.from(taskConfiguration
.getInteger("faultToleranceType", 0));
userFunction = streamSinkHelper.getSinkInvokable(taskConfiguration);
}
......@@ -66,7 +65,6 @@ public class StreamSink extends AbstractInvokable {
if (log.isDebugEnabled()) {
log.debug("SINK " + name + " invoked");
}
streamSinkHelper.invokeRecords(userFunction, inputs);
if (log.isDebugEnabled()) {
log.debug("SINK " + name + " invoke finished");
......
......@@ -21,37 +21,50 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.examples.DummyIS;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
public class StreamSource extends AbstractInvokable {
public class StreamSource extends AbstractInputTask<DummyIS> {
private static final Log log = LogFactory.getLog(StreamSource.class);
private List<RecordWriter<StreamRecord>> outputs;
private List<ChannelSelector<StreamRecord>> partitioners;
private UserSourceInvokable<Tuple> userFunction;
private UserSourceInvokable userFunction;
private static int numSources;
private int sourceInstanceID;
private String name;
// private FaultToleranceUtil recordBuffer;
// private FaultToleranceType faultToleranceType;
StreamComponentHelper streamSourceHelper;
private FaultToleranceUtil recordBuffer;
private FaultToleranceType faultToleranceType;
StreamComponentHelper<StreamSource> streamSourceHelper;
public StreamSource() {
// TODO: Make configuration file visible and call setClassInputs() here
outputs = new LinkedList<RecordWriter<StreamRecord>>();
partitioners = new LinkedList<ChannelSelector<StreamRecord>>();
userFunction = null;
streamSourceHelper = new StreamComponentHelper();
streamSourceHelper = new StreamComponentHelper<StreamSource>();
numSources = StreamComponentHelper.newComponent();
sourceInstanceID = numSources;
}
/**
 * Input-split hook of the input-task base class; this implementation
 * computes nothing and always returns {@code null}, ignoring
 * {@code requestedMinNumber}.
 *
 * NOTE(review): presumably a streaming source has no splittable input and
 * the framework tolerates a null result here - confirm against the caller.
 *
 * @param requestedMinNumber
 *            requested minimum number of splits (unused)
 * @return always {@code null}
 */
@Override
public DummyIS[] computeInputSplits(int requestedMinNumber) throws Exception {
return null;
}
/**
 * Reports the input split type of this source. This implementation always
 * returns {@code null} rather than {@code DummyIS.class}.
 *
 * NOTE(review): any caller that dereferences the result would fail -
 * confirm that the framework accepts null for tasks without input splits.
 *
 * @return always {@code null}
 */
@Override
public Class<DummyIS> getInputSplitType() {
return null;
}
@Override
public void registerInputOutput() {
Configuration taskConfiguration = getTaskConfiguration();
......@@ -72,10 +85,10 @@ public class StreamSource extends AbstractInvokable {
numberOfOutputChannels[i] = taskConfiguration.getInteger("channels_" + i, 0);
}
userFunction = (UserSourceInvokable<Tuple>) streamSourceHelper
userFunction = (UserSourceInvokable) streamSourceHelper
.getSourceInvokable(taskConfiguration);
// streamSourceHelper.setAckListener(recordBuffer, sourceInstanceID, outputs);
// streamSourceHelper.setFailListener(recordBuffer, sourceInstanceID, outputs);
streamSourceHelper.setAckListener(recordBuffer, sourceInstanceID, outputs);
streamSourceHelper.setFailListener(recordBuffer, sourceInstanceID, outputs);
}
@Override
......@@ -83,10 +96,6 @@ public class StreamSource extends AbstractInvokable {
if (log.isDebugEnabled()) {
log.debug("SOURCE " + name + " invoked with instance id " + sourceInstanceID);
}
for (RecordWriter<StreamRecord> output : outputs) {
output.initializeSerializers();
}
userFunction.invoke(streamSourceHelper.collector);
}
......
......@@ -21,31 +21,32 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.runtime.io.api.AbstractRecordReader;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.nephele.io.AbstractRecordReader;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
public class StreamTask extends AbstractInvokable {
public class StreamTask extends AbstractTask {
private static final Log log = LogFactory.getLog(StreamTask.class);
private AbstractRecordReader inputs;
private List<RecordWriter<StreamRecord>> outputs;
private List<ChannelSelector<StreamRecord>> partitioners;
private UserTaskInvokable<Tuple, Tuple> userFunction;
private UserTaskInvokable userFunction;
private static int numTasks;
private int taskInstanceID;
private String name;
private StreamComponentHelper streamTaskHelper;
// private FaultToleranceType faultToleranceType;
private StreamComponentHelper<StreamTask> streamTaskHelper;
private FaultToleranceType faultToleranceType;
Configuration taskConfiguration;
// private FaultToleranceUtil recordBuffer;
private FaultToleranceUtil recordBuffer;
public StreamTask() {
// TODO: Make configuration file visible and call setClassInputs() here
......@@ -54,7 +55,7 @@ public class StreamTask extends AbstractInvokable {
userFunction = null;
numTasks = StreamComponentHelper.newComponent();
taskInstanceID = numTasks;
streamTaskHelper = new StreamComponentHelper();
streamTaskHelper = new StreamComponentHelper<StreamTask>();
}
@Override
......@@ -78,11 +79,10 @@ public class StreamTask extends AbstractInvokable {
numberOfOutputChannels[i] = taskConfiguration.getInteger("channels_" + i, 0);
}
userFunction = (UserTaskInvokable<Tuple, Tuple>) streamTaskHelper
.getTaskInvokable(taskConfiguration);
userFunction = (UserTaskInvokable) streamTaskHelper.getTaskInvokable(taskConfiguration);
// streamTaskHelper.setAckListener(recordBuffer, taskInstanceID, outputs);
// streamTaskHelper.setFailListener(recordBuffer, taskInstanceID, outputs);
streamTaskHelper.setAckListener(recordBuffer, taskInstanceID, outputs);
streamTaskHelper.setFailListener(recordBuffer, taskInstanceID, outputs);
}
@Override
......@@ -90,11 +90,6 @@ public class StreamTask extends AbstractInvokable {
if (log.isDebugEnabled()) {
log.debug("TASK " + name + " invoked with instance id " + taskInstanceID);
}
for (RecordWriter<StreamRecord> output : outputs) {
output.initializeSerializers();
}
streamTaskHelper.invokeRecords(userFunction, inputs);
if (log.isDebugEnabled()) {
......
......@@ -19,10 +19,10 @@ import java.io.IOException;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer;
import eu.stratosphere.nephele.io.AbstractUnionRecordReader;
import eu.stratosphere.nephele.io.MutableRecordReader;
import eu.stratosphere.nephele.io.Reader;
import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
import eu.stratosphere.runtime.io.api.AbstractUnionRecordReader;
import eu.stratosphere.runtime.io.api.MutableRecordReader;
import eu.stratosphere.runtime.io.api.Reader;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public final class UnionStreamRecordReader extends AbstractUnionRecordReader<StreamRecord>
......
......@@ -52,7 +52,7 @@ public class BasicTopology {
private static final int SOURCE_PARALELISM = 1;
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple1<String>> stream = env.addSource(new BasicSource(), SOURCE_PARALELISM)
.map(new BasicMap(), PARALELISM);
......
......@@ -111,7 +111,7 @@ public class CellInfoLocal {
// In this example, two different sources are created, their streams are connected, and a function is applied to the connected stream.
// TODO add arguments
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple4<Boolean, Integer, Long, Integer>> querySource = env.addSource(
new QuerySource(), SOURCE_PARALELISM);
......
......@@ -29,7 +29,7 @@ import eu.stratosphere.streaming.util.LogUtils;
public class KMeansLocal {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
// @SuppressWarnings("unused")
// DataStream<Tuple2<String, Integer>> dataStream = env
......
......@@ -29,7 +29,7 @@ import eu.stratosphere.streaming.util.LogUtils;
public class PageRankLocal {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
// @SuppressWarnings("unused")
// DataStream<Tuple2<String, Integer>> dataStream = env
......
......@@ -29,7 +29,7 @@ import eu.stratosphere.streaming.util.LogUtils;
public class SSSPLocal {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
// @SuppressWarnings("unused")
// DataStream<Tuple2<String, Integer>> dataStream = env
......
......@@ -34,7 +34,7 @@ public class JoinLocal {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple3<String, String, Integer>> source1 = env.addSource(new JoinSourceOne(),
SOURCE_PARALELISM);
......
......@@ -124,7 +124,7 @@ public class IncrementalLearningSkeleton {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple1<Integer>> model =
env.addSource(new TrainingDataSource(), SOURCE_PARALELISM)
......
......@@ -154,7 +154,7 @@ public class IncrementalOLS {
public static void main(String[] args) {
StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple2<Boolean, Double[]>> model =
env.addSource(new TrainingDataSource(), SOURCE_PARALELISM)
......
......@@ -36,7 +36,7 @@ public class WindowJoinLocal {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple4<String, String, Integer, Long>> dataStream1 = env.addSource(
new WindowJoinSourceOne(), SOURCE_PARALELISM);
......
......@@ -27,7 +27,7 @@ public class WindowSumLocal {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple2<Integer, Long>> dataStream = env
.addSource(new WindowSumSource(), SOURCE_PARALELISM)
......
......@@ -27,7 +27,7 @@ public class WindowWordCountLocal {
// This example will count the occurrence of each word in the input file with a sliding window.
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
@SuppressWarnings("unused")
DataStream<Tuple3<String, Integer, Long>> dataStream = env
......
......@@ -23,16 +23,18 @@ import eu.stratosphere.streaming.util.TestDataUtil;
public class WordCountLocal {
// This example will count the occurrence of each word in the input file.
public static void main(String[] args) {
TestDataUtil.downloadIfNotExists("hamlet.txt");
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env
.readTextFile("src/test/resources/testdata/hamlet.txt")
.flatMap(new WordCountSplitter()).partitionBy(0).map(new WordCountCounter());
.flatMap(new WordCountSplitter(), 1)
.partitionBy(0)
.map(new WordCountCounter(), 1);
dataStream.print();
env.execute();
......
......@@ -20,7 +20,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.api.streamrecord.UID;
import eu.stratosphere.streaming.util.PerformanceCounter;
......
package eu.stratosphere.streaming.kafka;
import java.util.*;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.SinkFunction;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
 * Sink that publishes the single String field of every incoming tuple to a
 * Kafka topic.
 *
 * A producer is created for each record and closed again once the record has
 * been sent. This is simple but costly: every record pays for a new broker
 * connection.
 */
public class KafkaSink extends SinkFunction<Tuple1<String>>{
	private static final long serialVersionUID = 1L;

	// Kept for backward compatibility with code that calls initialize() and
	// then reads these fields. They are static, i.e. shared by every
	// KafkaSink instance in the JVM, so invoke() deliberately does NOT use
	// them: parallel sink instances would otherwise close or replace each
	// other's producer in the middle of a send.
	static kafka.javaapi.producer.Producer<Integer, String> producer;
	static Properties props;

	private String topicId;
	private String brokerAddr;

	/**
	 * @param topicId
	 *            name of the Kafka topic the records are written to
	 * @param brokerAddr
	 *            value used for the producer's "metadata.broker.list" setting
	 */
	public KafkaSink(String topicId, String brokerAddr){
		this.topicId=topicId;
		this.brokerAddr=brokerAddr;
	}

	/**
	 * Builds the producer settings for this sink: broker list, String
	 * encoder for the payload and acknowledgement by the leader ("1").
	 */
	private Properties buildProperties() {
		Properties settings = new Properties();
		settings.put("metadata.broker.list", brokerAddr);
		settings.put("serializer.class", "kafka.serializer.StringEncoder");
		settings.put("request.required.acks", "1");
		return settings;
	}

	/**
	 * Creates a producer from this sink's settings and stores both in the
	 * shared static fields. The caller owns the stored producer and is
	 * responsible for closing it.
	 */
	public void initialize() {
		props = buildProperties();

		ProducerConfig config = new ProducerConfig(props);
		producer = new Producer<Integer, String>(config);
	}

	/**
	 * Sends {@code tuple.f0} to the configured topic.
	 *
	 * @param tuple
	 *            record whose first field is the message payload
	 */
	@Override
	public void invoke(Tuple1<String> tuple) {
		// Method-local producer: no shared state, so concurrent sink
		// instances cannot interfere with each other.
		Producer<Integer, String> localProducer = new Producer<Integer, String>(
				new ProducerConfig(buildProperties()));
		try {
			KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topicId, tuple.f0);
			localProducer.send(data);
		} finally {
			// Close even when send() throws, so the connection is not leaked.
			localProducer.close();
		}
	}
}
......@@ -81,5 +81,6 @@ public class KafkaSource extends SourceFunction<Tuple1<String>> {
outTuple.f0 = message;
collector.collect(outTuple);
}
consumer.shutdown();
}
}
......@@ -17,18 +17,36 @@ package eu.stratosphere.streaming.kafka;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.DataStream;
import eu.stratosphere.streaming.api.SourceFunction;
import eu.stratosphere.streaming.api.StreamExecutionEnvironment;
import eu.stratosphere.util.Collector;
public class KafkaTopology {
/**
 * Demo source that emits the decimal strings "0" through "9", one tuple per
 * value, and then returns.
 */
public static final class MySource extends SourceFunction<Tuple1<String>> {
	private static final long serialVersionUID = 1L;

	/**
	 * Collects ten single-field tuples holding "0", "1", ... "9" in order.
	 *
	 * @param collector
	 *            receives the generated tuples
	 */
	@Override
	public void invoke(Collector<Tuple1<String>> collector) throws Exception {
		int next = 0;
		while (next < 10) {
			String text = String.valueOf(next);
			collector.collect(new Tuple1<String>(text));
			next++;
		}
	}
}
private static final int SOURCE_PARALELISM = 1;
public static void main(String[] args) {
StreamExecutionEnvironment context = StreamExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple1<String>> stream = context.addSource(new KafkaSource("localhost:2181", "group", "test", 1), SOURCE_PARALELISM)
DataStream<Tuple1<String>> stream1 = env.addSource(new KafkaSource("localhost:2181", "group", "test", 1), SOURCE_PARALELISM)
.print();
context.execute();
DataStream<Tuple1<String>> stream2 = env
.addSource(new MySource(), 1)
.addSink(new KafkaSink("test", "localhost:9092"));
env.execute();
}
}
\ No newline at end of file
......@@ -15,7 +15,7 @@
package eu.stratosphere.streaming.partitioner;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class BroadcastPartitioner implements ChannelSelector<StreamRecord> {
......
......@@ -15,7 +15,7 @@
package eu.stratosphere.streaming.partitioner;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class DefaultPartitioner implements ChannelSelector<StreamRecord> {
......
......@@ -15,7 +15,7 @@
package eu.stratosphere.streaming.partitioner;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
//Grouping by a key
......
......@@ -15,7 +15,7 @@
package eu.stratosphere.streaming.partitioner;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
//Group to the partitioner with the lowest id
......
......@@ -17,7 +17,7 @@ package eu.stratosphere.streaming.partitioner;
import java.util.Random;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
//Randomly group, to distribute equally
......
......@@ -33,7 +33,7 @@ public class WordCountPerformanceLocal {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment env = new StreamExecutionEnvironment().setClusterSize(2);
DataStream<Tuple2<String, Integer>> dataStream = env
.readTextStream("/home/strato/stratosphere-distrib/resources/hamlet.txt", 4)
......
......@@ -24,7 +24,7 @@ public class RMQTopology {
private static final int SOURCE_PARALELISM = 1;
public static void main(String[] args) {
StreamExecutionEnvironment context = StreamExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment context = new StreamExecutionEnvironment();
DataStream<Tuple1<String>> stream = context.addSource(new RMQSource("localhost", "hello"),
SOURCE_PARALELISM).print();
......
......@@ -30,14 +30,14 @@ public class ClusterUtil {
*
* @param jobGraph
*/
public static void runOnMiniCluster(JobGraph jobGraph, int numberOfTaskTrackers) {
public static void runOnMiniCluster(JobGraph jobGraph) {
System.out.println("Running on mini cluster");
Configuration configuration = jobGraph.getJobConfiguration();
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.setNumTaskTracker(numberOfTaskTrackers);
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
System.out.println("Running on mini cluster");
try {
exec.start();
......@@ -45,8 +45,6 @@ public class ClusterUtil {
exec.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
......
......@@ -74,7 +74,7 @@ public class BatchReduceTest {
@Test
public void test() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple1<Double>> dataStream0 = env.addSource(new MySource(),1)
.batchReduce(new MyBatchReduce(), BATCH_SIZE, PARALELISM).addSink(new MySink());
......
......@@ -27,15 +27,15 @@ public class BatchTest {
private static final int PARALLELISM = 1;
private static final int SOURCE_PARALELISM = 1;
private static final int SINK_PARALELISM = 3;
private static final int SINK_PARALELISM = 5;
private static int count = 0;
private static boolean partitionCorrect = true;
private static final class MySource extends SourceFunction<Tuple1<String>> {
private static final long serialVersionUID = 1L;
private Tuple1<String> outTuple = new Tuple1<String>();
@Override
public void invoke(Collector<Tuple1<String>> collector) throws Exception {
for (int i = 0; i < 20; i++) {
......@@ -44,7 +44,7 @@ public class BatchTest {
}
}
}
private static final class MyMap extends FlatMapFunction<Tuple1<String>, Tuple1<String>> {
private static final long serialVersionUID = 1L;
......@@ -55,7 +55,6 @@ public class BatchTest {
}
private static final class MySink extends SinkFunction<Tuple1<String>> {
private static final long serialVersionUID = 1L;
@Override
......@@ -63,29 +62,25 @@ public class BatchTest {
count++;
}
}
private static final class MyPartitionSink extends SinkFunction<Tuple1<String>> {
int hash = -1000;
private static final long serialVersionUID = 1L;
int hash=-1000;
@Override
public void invoke(Tuple1<String> tuple) {
if (hash == -1000)
hash = tuple.f0.hashCode() % SINK_PARALELISM;
else {
if (hash != tuple.f0.hashCode() % SINK_PARALELISM)
partitionCorrect = false;
if(hash==-1000) hash=tuple.f0.hashCode() % SINK_PARALELISM;
else{
if(hash!=tuple.f0.hashCode() % SINK_PARALELISM) partitionCorrect=false;
}
}
}
@Test
public void test() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
//batchTest
DataStream<Tuple1<String>> dataStream1 = env
.addSource(new MySource(), SOURCE_PARALELISM)
.flatMap(new MyMap(), PARALLELISM).batch(4)
......
......@@ -151,7 +151,7 @@ public class FlatMapTest {
@Test
public void test() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
// flatmapTest
......
......@@ -266,7 +266,7 @@ public class MapTest {
public void mapTest() throws Exception {
//mapTest
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
fillFromCollectionSet();
......
......@@ -43,7 +43,7 @@ public class PrintTest {
@Test
public void test() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
env.fromElements(2, 3, 4).print();
env.generateSequence(1, 10).print();
Set<Integer> a = new HashSet<Integer>();
......@@ -53,5 +53,6 @@ public class PrintTest {
env.execute();
}
}
......@@ -15,7 +15,7 @@
package eu.stratosphere.streaming.api;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.*;
import java.util.ArrayList;
import java.util.List;
......@@ -24,7 +24,7 @@ import org.junit.Test;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.streamcomponent.MockRecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.util.MockRecordWriterFactory;
......
......@@ -47,6 +47,7 @@ public class TypeExtractTest {
Myclass f = new Myclass();
System.out.println(f.getClass().getGenericSuperclass());
TypeInformation<?> ts = TypeExtractor.createTypeInfo(MySuperlass.class, f.getClass(), 0,
null, null);
......
......@@ -16,16 +16,16 @@ package eu.stratosphere.streaming.api.streamcomponent;
import java.util.ArrayList;
import eu.stratosphere.pact.runtime.task.DataSourceTask;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class MockRecordWriter extends RecordWriter<StreamRecord> {
public ArrayList<StreamRecord> emittedRecords;
public MockRecordWriter(DataSourceTask<?> inputBase, Class<StreamRecord> outputClass) {
super(inputBase);
public MockRecordWriter(AbstractInputTask<?> inputBase, Class<StreamRecord> outputClass) {
super(inputBase, outputClass);
}
public boolean initList() {
......
......@@ -91,7 +91,7 @@ public class StreamComponentTest {
@BeforeClass
public static void runStream() {
StreamExecutionEnvironment context = StreamExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment context = new StreamExecutionEnvironment();
DataStream<Tuple2<Integer, Integer>> oneTask = context.addSource(new MySource(), SOURCE_PARALELISM)
.map(new MyTask(), PARALELISM).addSink(new MySink());
......
......@@ -21,7 +21,7 @@ import java.util.List;
import org.junit.Before;
import org.junit.Test;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class FaultToleranceUtilTest {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册