提交 63751058 编写于 作者: J jfeher 提交者: Stephan Ewen

[streaming] RMQSink and Test

上级 04dbc25f
......@@ -28,16 +28,25 @@ public class WordCountPerformanceSplitter extends FlatMapFunction<Tuple1<String>
PerformanceCounter pCounter = new
PerformanceCounter("SplitterEmitCounter", 1000, 1000, 30000,
"/home/mbalassi/strato-perf.csv");
"/home/judit/strato/perf/broadcast4.csv");
@Override
public void flatMap(Tuple1<String> inTuple, Collector<Tuple1<String>> out) throws Exception {
for (String word : inTuple.f0.split(" ")) {
outTuple.f0 = word;
// pTimer.startTimer();
out.collect(outTuple);
// pTimer.stopTimer();
pCounter.count();
}
}
// @Override
// public String getResult() {
// pCounter.writeCSV();
// pTimer.writeCSV();
// return "";
// }
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.rabbitmq;
import java.io.IOException;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.SinkFunction;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
/**
* Source for sending messages to a RabbitMQ queue. The source currently only
* support string messages. Other types will be added soon.
*
*/
public class RMQSink extends SinkFunction<Tuple1<String>>{
private static final long serialVersionUID = 1L;
private String QUEUE_NAME;
private String HOST_NAME;
private transient ConnectionFactory factory;
private transient Connection connection;
private transient Channel channel;
public RMQSink(String HOST_NAME, String QUEUE_NAME) {
this.HOST_NAME = HOST_NAME;
this.QUEUE_NAME = QUEUE_NAME;
}
public void initializeConnection(){
factory = new ConnectionFactory();
factory.setHost(HOST_NAME);
try {
connection = factory.newConnection();
channel = connection.createChannel();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void invoke(Tuple1<String> tuple) {
initializeConnection();
try {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = tuple.f0;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
try {
channel.close();
connection.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
......@@ -46,6 +46,7 @@ public class MapTest {
@Override
public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
for (int i = 0; i < 5; i++) {
System.out.println("s1: "+i);
collector.collect(new Tuple1<Integer>(i));
}
}
......@@ -56,6 +57,7 @@ public class MapTest {
@Override
public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
for (int i = 5; i < 10; i++) {
System.out.println("s2: "+i);
collector.collect(new Tuple1<Integer>(i));
}
}
......@@ -66,6 +68,7 @@ public class MapTest {
@Override
public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
for (int i = 10; i < 15; i++) {
System.out.println("s3: "+i);
collector.collect(new Tuple1<Integer>(i));
}
}
......@@ -85,6 +88,7 @@ public class MapTest {
@Override
public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
joinSetResult.add(value.f0);
System.out.println(value.f0);
return new Tuple1<Integer>(value.f0);
}
}
......@@ -167,6 +171,7 @@ public class MapTest {
@Override
public void invoke(Tuple1<Integer> tuple) {
System.out.println("doing nothing");
}
}
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
import static org.junit.Assert.*;
import java.util.HashSet;
import java.util.Set;
import org.junit.Test;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.rabbitmq.RMQSink;
import eu.stratosphere.streaming.rabbitmq.RMQSource;
import eu.stratosphere.util.Collector;
public class RMQTest {
public static final class MySink extends SinkFunction<Tuple1<String>> {
@Override
public void invoke(Tuple1<String> tuple) {
result.add(tuple.f0);
}
}
private static Set<String> expected = new HashSet<String>();
private static Set<String> result = new HashSet<String>();
private static void fillExpected() {
expected.add("one");
expected.add("two");
expected.add("three");
expected.add("four");
expected.add("five");
}
@Test
public void RMQTest1() throws Exception {
StreamExecutionEnvironment env = new StreamExecutionEnvironment();
DataStream<Tuple1<String>> dataStream1 = env
.addSource(new RMQSource("localhost", "hello"), 1)
.addSink(new MySink());
DataStream<Tuple1<String>> dataStream2 = env
.fromElements("one", "two", "three", "four", "five", "q")
.addSink(new RMQSink("localhost", "hello"));
env.execute();
fillExpected();
assertEquals(expected, result);
}
}
......@@ -29,7 +29,7 @@ import eu.stratosphere.streaming.api.streamcomponent.MockRecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.util.MockRecordWriterFactory;
public class StreamCollectorManagerTest {
public class StreamCollector2Test {
StreamCollectorManager<Tuple> collector;
......@@ -55,7 +55,9 @@ public class StreamCollectorManagerTest {
fOut.add(rw2);
collector = new StreamCollectorManager<Tuple>(batchSizesOfNotPartitioned, batchSizesOfPartitioned, parallelismOfOutput, keyPosition, batchTimeout, channelID, null, fOut,fOut);
Tuple1<Integer> t = new Tuple1<Integer>();
StreamCollector<Tuple> sc1 = new StreamCollector<Tuple>(1, batchTimeout, channelID, null);
t.f0 = 0;
collector.collect(t);
......
......@@ -16,24 +16,31 @@
package eu.stratosphere.streaming.api;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mock;
import java.util.ArrayList;
import org.junit.Test;
import org.mockito.Mockito;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.streamcomponent.MockRecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.util.MockRecordWriterFactory;
public class StreamCollectorTest {
@Test
public void testStreamCollector() {
StreamCollector<Tuple1<Integer>> collector = new StreamCollector<Tuple1<Integer>>(10, 1000, 0, null);
StreamCollector collector = new StreamCollector(10, 1000, 0, null);
assertEquals(10, collector.batchSize);
}
@Test
public void testCollect() {
StreamCollector<Tuple1<Integer>> collector = new StreamCollector<Tuple1<Integer>>(2, 1000, 0, null);
StreamCollector collector = new StreamCollector(2, 1000, 0, null);
collector.collect(new Tuple1<Integer>(3));
collector.collect(new Tuple1<Integer>(4));
collector.collect(new Tuple1<Integer>(5));
......@@ -43,7 +50,8 @@ public class StreamCollectorTest {
@Test
public void testBatchSize() throws InterruptedException {
StreamCollector<Tuple1<Integer>> collector = new StreamCollector<Tuple1<Integer>>(3, 100, 0, null);
System.out.println("---------------");
StreamCollector collector = new StreamCollector(3, 100, 0, null);
collector.collect(new Tuple1<Integer>(0));
collector.collect(new Tuple1<Integer>(0));
collector.collect(new Tuple1<Integer>(0));
......@@ -51,13 +59,14 @@ public class StreamCollectorTest {
Thread.sleep(200);
collector.collect(new Tuple1<Integer>(2));
collector.collect(new Tuple1<Integer>(3));
System.out.println("---------------");
}
@Test
public void recordWriter() {
MockRecordWriter recWriter = MockRecordWriterFactory.create();
StreamCollector<Tuple1<Integer>> collector = new StreamCollector<Tuple1<Integer>>(2, 1000, 0, null, recWriter);
StreamCollector collector = new StreamCollector(2, 1000, 0, null, recWriter);
collector.collect(new Tuple1<Integer>(3));
collector.collect(new Tuple1<Integer>(4));
collector.collect(new Tuple1<Integer>(5));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册