提交 cad38ede 编写于 作者: J jfeher 提交者: Stephan Ewen

[streaming] connectors logging and deploy update

上级 0b721125
......@@ -107,6 +107,22 @@ under the License.
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
......
......@@ -18,11 +18,14 @@
package org.apache.flink.streaming.connectors.flume;
import org.apache.commons.lang.SerializationUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
public class FlumeTopology {
private static final Log LOG = LogFactory.getLog(FlumeTopology.class);
public static class MyFlumeSink extends FlumeSink<String> {
private static final long serialVersionUID = 1L;
......@@ -45,6 +48,17 @@ public class FlumeTopology {
}
public static final class MyFlumePrintSink implements SinkFunction<String> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(String value) {
LOG.info("String: <" + value + "> arrived from Flume");
}
}
public static class MyFlumeSource extends FlumeSource<String> {
private static final long serialVersionUID = 1L;
......@@ -70,7 +84,7 @@ public class FlumeTopology {
@SuppressWarnings("unused")
DataStream<String> dataStream1 = env
.addSource(new MyFlumeSource("localhost", 41414))
.print();
.addSink(new MyFlumePrintSink());
@SuppressWarnings("unused")
DataStream<String> dataStream2 = env
......
......@@ -18,13 +18,17 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
public class KafkaTopology {
private static final Log LOG = LogFactory.getLog(KafkaTopology.class);
public static final class MySource implements SourceFunction<Tuple1<String>> {
private static final long serialVersionUID = 1L;
......@@ -72,6 +76,15 @@ public class KafkaTopology {
}
}
public static final class MyKafkaPrintSink implements SinkFunction<Tuple1<String>> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(Tuple1<String> value) {
LOG.info("String: " + value + " arrived from Kafka");
}
}
private static final int SOURCE_PARALELISM = 1;
......@@ -82,7 +95,7 @@ public class KafkaTopology {
@SuppressWarnings("unused")
DataStream<Tuple1<String>> stream1 = env
.addSource(new MyKafkaSource("localhost:2181", "group", "test", 1), SOURCE_PARALELISM)
.print();
.addSink(new MyKafkaPrintSink());
@SuppressWarnings("unused")
DataStream<Tuple1<String>> stream2 = env
......
......@@ -17,12 +17,17 @@
package org.apache.flink.streaming.connectors.rabbitmq;
import org.apache.commons.lang.SerializationUtils;
import org.apache.commons.lang.SerializationUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.KafkaTopology;
public class RMQTopology {
private static final Log LOG = LogFactory.getLog(KafkaTopology.class);
public static final class MyRMQSink extends RMQSink<String> {
public MyRMQSink(String HOST_NAME, String QUEUE_NAME) {
super(HOST_NAME, QUEUE_NAME);
......@@ -39,7 +44,18 @@ public class RMQTopology {
}
}
public static final class MyRMQPrintSink implements SinkFunction<String> {
private static final long serialVersionUID = 1L;
@Override
public void invoke(String value) {
LOG.info("String: <" + value + "> arrived from RMQ");
}
}
public static final class MyRMQSource extends RMQSource<String> {
public MyRMQSource(String HOST_NAME, String QUEUE_NAME) {
......@@ -66,7 +82,7 @@ public class RMQTopology {
@SuppressWarnings("unused")
DataStream<String> dataStream1 = env
.addSource(new MyRMQSource("localhost", "hello"))
.print();
.addSink(new MyRMQPrintSink());
@SuppressWarnings("unused")
DataStream<String> dataStream2 = env
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册