提交 de706dde 编写于 作者: D Dawid Wysakowicz 提交者: Dawid Wysakowicz

[FLINK-17307] Add collector to deserialize in RMQ

上级 0b699b4d
......@@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourc
import org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import com.rabbitmq.client.Channel;
......@@ -209,17 +210,11 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
@Override
public void run(SourceContext<OUT> ctx) throws Exception {
final RMQCollector collector = new RMQCollector(ctx);
while (running) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
synchronized (ctx.getCheckpointLock()) {
OUT result = schema.deserialize(delivery.getBody());
if (schema.isEndOfStream(result)) {
break;
}
if (!autoAck) {
final long deliveryTag = delivery.getEnvelope().getDeliveryTag();
if (usesCorrelationId) {
......@@ -235,11 +230,44 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
sessionIds.add(deliveryTag);
}
ctx.collect(result);
schema.deserialize(delivery.getBody(), collector);
if (collector.isEndOfStreamSignalled()) {
this.running = false;
return;
}
}
}
}
private class RMQCollector implements Collector<OUT> {
private final SourceContext<OUT> ctx;
private boolean endOfStreamSignalled = false;
private RMQCollector(SourceContext<OUT> ctx) {
this.ctx = ctx;
}
@Override
public void collect(OUT record) {
if (endOfStreamSignalled || schema.isEndOfStream(record)) {
this.endOfStreamSignalled = true;
return;
}
ctx.collect(record);
}
public boolean isEndOfStreamSignalled() {
return endOfStreamSignalled;
}
@Override
public void close() {
}
}
@Override
public void cancel() {
running = false;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册