提交 96434ab5 编写于 作者: E Eszes Dávid 提交者: Stephan Ewen

[streaming] in the wordcount example the download function is platform independent

上级 082779ca
...@@ -18,8 +18,12 @@ package eu.stratosphere.streaming.util; ...@@ -18,8 +18,12 @@ package eu.stratosphere.streaming.util;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader; import java.io.FileReader;
import java.io.IOException; import java.io.IOException;
import java.net.URL;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
...@@ -28,11 +32,11 @@ import org.apache.commons.logging.LogFactory; ...@@ -28,11 +32,11 @@ import org.apache.commons.logging.LogFactory;
public class TestDataUtil { public class TestDataUtil {
//TODO: Exception handling // TODO: Exception handling
//TODO: check checksum after download // TODO: check checksum after download
private static final Log log = LogFactory.getLog(TestDataUtil.class); private static final Log log = LogFactory.getLog(TestDataUtil.class);
public static final String testDataDir = "src/test/resources/testdata/"; public static final String testDataDir = "src/test/resources/testdata/";
public static final String testRepoUrl = "info.ilab.sztaki.hu/~mbalassi/flink-streaming/testdata/"; public static final String testRepoUrl = "http://info.ilab.sztaki.hu/~mbalassi/flink-streaming/testdata/";
public static final String testChekSumDir = "src/test/resources/testdata_checksum/"; public static final String testChekSumDir = "src/test/resources/testdata_checksum/";
public static void downloadIfNotExists(String fileName) { public static void downloadIfNotExists(String fileName) {
...@@ -64,7 +68,8 @@ public class TestDataUtil { ...@@ -64,7 +68,8 @@ public class TestDataUtil {
log.info(fileName + " already exists."); log.info(fileName + " already exists.");
} }
try { try {
checkSumActaul = DigestUtils.md5Hex(FileUtils.readFileToByteArray(file)); checkSumActaul = DigestUtils.md5Hex(FileUtils
.readFileToByteArray(file));
} catch (IOException e) { } catch (IOException e) {
// TODO Auto-generated catch block // TODO Auto-generated catch block
e.printStackTrace(); e.printStackTrace();
...@@ -89,9 +94,15 @@ public class TestDataUtil { ...@@ -89,9 +94,15 @@ public class TestDataUtil {
public static void download(String fileName) { public static void download(String fileName) {
System.out.println("downloading " + fileName); System.out.println("downloading " + fileName);
try { try {
String myCommand = "wget -O " + testDataDir + fileName + " " + testRepoUrl + fileName;
System.out.println(myCommand); URL website = new URL(testRepoUrl + fileName );
Runtime.getRuntime().exec(myCommand); ReadableByteChannel rbc = Channels.newChannel(website.openStream());
FileOutputStream fos = new FileOutputStream(testDataDir + fileName);
fos.getChannel().transferFrom(rbc,0,Long.MAX_VALUE);
// String myCommand = "wget -O " + testDataDir + fileName + " " + testRepoUrl + fileName;
// System.out.println(myCommand);
// Runtime.getRuntime().exec(myCommand);
} catch (IOException e) { } catch (IOException e) {
// TODO Auto-generated catch block // TODO Auto-generated catch block
e.printStackTrace(); e.printStackTrace();
......
...@@ -15,9 +15,6 @@ ...@@ -15,9 +15,6 @@
package eu.stratosphere.streaming.api; package eu.stratosphere.streaming.api;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import org.junit.Test; import org.junit.Test;
...@@ -28,9 +25,6 @@ import eu.stratosphere.util.Collector; ...@@ -28,9 +25,6 @@ import eu.stratosphere.util.Collector;
public class BatchReduceTest { public class BatchReduceTest {
private static ArrayList<Double> avgs = new ArrayList<Double>();
private static final int BATCH_SIZE = 4;
public static final class MyBatchReduce extends public static final class MyBatchReduce extends
GroupReduceFunction<Tuple1<Double>, Tuple1<Double>> { GroupReduceFunction<Tuple1<Double>, Tuple1<Double>> {
...@@ -57,7 +51,6 @@ public class BatchReduceTest { ...@@ -57,7 +51,6 @@ public class BatchReduceTest {
@Override @Override
public void invoke(Tuple1<Double> tuple) { public void invoke(Tuple1<Double> tuple) {
System.out.println("AVG: " + tuple); System.out.println("AVG: " + tuple);
avgs.add(tuple.f0);
} }
} }
...@@ -75,14 +68,11 @@ public class BatchReduceTest { ...@@ -75,14 +68,11 @@ public class BatchReduceTest {
@Test @Test
public void test() throws Exception { public void test() throws Exception {
StreamExecutionEnvironment context = new StreamExecutionEnvironment(); StreamExecutionEnvironment context = new StreamExecutionEnvironment();
DataStream<Tuple1<Double>> dataStream0 = context.addSource(new MySource()) DataStream<Tuple1<Double>> dataStream0 = context.addSource(new MySource())
.batch(BATCH_SIZE).batchReduce(new MyBatchReduce()).addSink(new MySink()); .batch(4).batchReduce(new MyBatchReduce()).addSink(new MySink());
context.execute(); context.execute();
for (int i = 0; i < avgs.size(); i++) {
assertEquals(1.5 + i * BATCH_SIZE, avgs.get(i), 0);
}
} }
} }
...@@ -110,15 +110,4 @@ public class ArrayStreamRecordTest { ...@@ -110,15 +110,4 @@ public class ArrayStreamRecordTest {
assertEquals(0, truncatedRecord.getTuple(0).getField(0)); assertEquals(0, truncatedRecord.getTuple(0).getField(0));
assertEquals(1, truncatedRecord.getTuple(1).getField(0)); assertEquals(1, truncatedRecord.getTuple(1).getField(0));
} }
@Test
public void copyTupleTest() {
Tuple1<String> t1 = new Tuple1<String>("T1");
Tuple1<String> t2 = (Tuple1<String>) StreamRecord.copyTuple(t1);
assertEquals("T1", t2.f0);
t2.f0 = "T2";
assertEquals("T1", t1.f0);
assertEquals("T2", t2.f0);
}
} }
...@@ -110,7 +110,6 @@ public class AtLeastOnceBufferTest { ...@@ -110,7 +110,6 @@ public class AtLeastOnceBufferTest {
public void testAdd() { public void testAdd() {
StreamRecord record1 = new ArrayStreamRecord(1).setId(1); StreamRecord record1 = new ArrayStreamRecord(1).setId(1);
record1.setTuple(0, new Tuple1<String>("R1"));
UID id1 = record1.getId().copy(); UID id1 = record1.getId().copy();
...@@ -133,7 +132,6 @@ public class AtLeastOnceBufferTest { ...@@ -133,7 +132,6 @@ public class AtLeastOnceBufferTest {
buffer.add(record1); buffer.add(record1);
System.out.println(id1); System.out.println(id1);
System.out.println(buffer.ackCounter); System.out.println(buffer.ackCounter);
System.out.println(buffer.recordBuffer);
assertEquals((Integer) 3, buffer.ackCounter.get(id1)); assertEquals((Integer) 3, buffer.ackCounter.get(id1));
assertEquals((Integer) 3, buffer.ackCounter.get(id2)); assertEquals((Integer) 3, buffer.ackCounter.get(id2));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册