提交 5030bec4 编写于 作者: M Márton Balassi 提交者: Stephan Ewen

[streaming] TupleTest removed

上级 63346f7e
......@@ -19,12 +19,36 @@ This tutorial shows how to build Stratosphere Streaming on your own system. Plea
```
git clone https://github.com/stratosphere/stratosphere-streaming.git
cd stratosphere
mvn clean package
mvn clean assembly:assembly
```
### Run an example
To run an example counting the frequencies of unique words in the text of Shakespeare's [Hamlet](http://www.gutenberg.org/cache/epub/1787/pg1787.txt)
```
git clone https://github.com/stratosphere/stratosphere-streaming.git
cd stratosphere
mvn clean package
java -cp target/stratosphere-streaming-0.5-SNAPSHOT-jar-with-dependencies.jar eu.stratosphere.streaming.examples.wordcount.WordCountLocal
```
## Support
Don’t hesitate to ask!
[Open an issue](https://github.com/stratosphere/stratosphere-streaming/issues/new) on Github, if you found a bug or need any help.
The main project also has a [mailing list](https://groups.google.com/d/forum/stratosphere-dev) for both users and developers.
## Fork and Contribute
This is an active open-source project. We are always open to people who want to use the system or contribute to it.
Contact us if you are looking for implementation tasks that fit your skills.
The main project a list of [starter jobs](https://github.com/stratosphere/stratosphere/wiki/Starter-Jobs) in our wiki.
We use the GitHub Pull Request system for the development of Stratosphere. Just open a request if you want to contribute.
### What to contribute
* Bug reports
* Bug fixes
* Documentation
* Tools that ease the use and development of Stratosphere
* Well-written Stratosphere jobs
Let us know if you have created a system that uses Stratosphere, so that we can link to you.
package eu.stratosphere.streaming.api.streamrecord;
import static org.junit.Assert.*;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import org.junit.Test;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
import eu.stratosphere.api.java.typeutils.TypeInformation;
import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer;
import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.types.StringValue;
public class TupleTest {
public Tuple readTuple(DataInput in) throws IOException {
StringValue typeVal = new StringValue();
typeVal.read(in);
// TODO: use Tokenizer
String[] types = typeVal.getValue().split(",");
Class[] basicTypes = new Class[types.length];
for (int i = 0; i < types.length; i++) {
try {
basicTypes[i] = Class.forName(types[i]);
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
TypeInformation<? extends Tuple> typeInfo = TupleTypeInfo
.getBasicTupleTypeInfo(basicTypes);
TupleSerializer<Tuple> tupleSerializer = (TupleSerializer<Tuple>) typeInfo
.createSerializer();
DeserializationDelegate<Tuple> dd = new DeserializationDelegate<Tuple>(
tupleSerializer);
dd.setInstance(tupleSerializer.createInstance());
dd.read(in);
return dd.getInstance();
}
private void writeTuple(Tuple tuple, DataOutput out) {
Class[] basicTypes = new Class[tuple.getArity()];
StringBuilder basicTypeNames = new StringBuilder();
for (int i = 0; i < basicTypes.length; i++) {
basicTypes[i] = tuple.getField(i).getClass();
basicTypeNames.append(basicTypes[i].getName() + ",");
}
TypeInformation<? extends Tuple> typeInfo = TupleTypeInfo
.getBasicTupleTypeInfo(basicTypes);
StringValue typeVal = new StringValue(basicTypeNames.toString());
@SuppressWarnings("unchecked")
TupleSerializer<Tuple> tupleSerializer = (TupleSerializer<Tuple>) typeInfo
.createSerializer();
SerializationDelegate<Tuple> serializationDelegate = new SerializationDelegate<Tuple>(
tupleSerializer);
serializationDelegate.setInstance(tuple);
try {
typeVal.write(out);
serializationDelegate.write(out);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Test
public void test() {
ByteArrayOutputStream buff = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(buff);
int num = 42;
String str = "above clouds";
Tuple2<Integer, String> tuple = new Tuple2<Integer, String>(num, str);
try {
writeTuple(tuple, out);
DataInputStream in = new DataInputStream(new ByteArrayInputStream(
buff.toByteArray()));
Tuple2<Integer, String> tupleOut = (Tuple2<Integer, String>) readTuple(
in);
assertEquals(tupleOut.getField(0), 42);
} catch (IOException e) {
fail();
e.printStackTrace();
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册