提交 83e6a2b5 编写于 作者: M Maximilian Michels

Revert "[FLINK-3265] [rabbitmq] Fix concurrency bug in RabbitMQ"

This reverts commit 6b01a890. The
introduced locks are not necessary. The checkpointing test case only
needs to be adapted to the checkpointing runtime behavior.
上级 5e2fb3cb
......@@ -196,9 +196,7 @@ public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OU
continue;
}
}
synchronized (sessionIdsPerSnapshot) {
sessionIds.add(deliveryTag);
}
sessionIds.add(deliveryTag);
}
ctx.collect(result);
......
......@@ -23,7 +23,6 @@ import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.QueueingConsumer;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
......@@ -32,7 +31,6 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
......@@ -105,83 +103,6 @@ public class RMQSourceTest {
sourceThread.join();
}
/**
* Make sure concurrent access to snapshotState() and notifyCheckpointComplete() don't cause
* an issue.
*
* Without proper synchronization, the test will fail with a concurrent modification exception
*
*/
@Test
public void testConcurrentAccess() throws Exception {
source.autoAck = false;
sourceThread.start();
final Tuple1<Throwable> error = new Tuple1<>(null);
Thread.sleep(5);
Thread snapshotThread = new Thread(new Runnable() {
public long id = 0;
@Override
public void run() {
while (!Thread.interrupted()) {
try {
source.snapshotState(id++, 0);
} catch (Exception e) {
error.f0 = e;
break; // stop thread
}
}
}
});
Thread notifyThread = new Thread(new Runnable() {
@Override
public void run() {
while (!Thread.interrupted()) {
try {
// always remove all checkpoints
source.notifyCheckpointComplete(Long.MAX_VALUE);
} catch (Exception e) {
error.f0 = e;
break; // stop thread
}
}
}
});
snapshotThread.start();
notifyThread.start();
long deadline = System.currentTimeMillis() + 1000L;
while(System.currentTimeMillis() < deadline) {
if(!snapshotThread.isAlive()) {
notifyThread.interrupt();
break;
}
if(!notifyThread.isAlive()) {
snapshotThread.interrupt();
break;
}
Thread.sleep(10);
}
if(snapshotThread.isAlive()) {
snapshotThread.interrupt();
snapshotThread.join();
}
if(notifyThread.isAlive()) {
notifyThread.interrupt();
notifyThread.join();
}
if(error.f0 != null) {
error.f0.printStackTrace();
Assert.fail("Test failed with " + error.f0.getClass().getCanonicalName());
}
}
@Test
public void testCheckpointing() throws Exception {
source.autoAck = false;
......
......@@ -76,7 +76,6 @@ import org.slf4j.LoggerFactory;
* @param <Type> The type of the messages created by the source.
* @param <UId> The type of unique IDs which may be used to acknowledge elements.
*/
@SuppressWarnings("SynchronizeOnNonFinalField")
public abstract class MessageAcknowledgingSourceBase<Type, UId>
extends RichSourceFunction<Type>
implements Checkpointed<SerializedCheckpointData[]>, CheckpointNotifier {
......@@ -167,45 +166,41 @@ public abstract class MessageAcknowledgingSourceBase<Type, UId>
LOG.debug("Snapshotting state. Messages: {}, checkpoint id: {}, timestamp: {}",
idsForCurrentCheckpoint, checkpointId, checkpointTimestamp);
synchronized (pendingCheckpoints) {
pendingCheckpoints.addLast(new Tuple2<>(checkpointId, idsForCurrentCheckpoint));
pendingCheckpoints.addLast(new Tuple2<>(checkpointId, idsForCurrentCheckpoint));
idsForCurrentCheckpoint = new ArrayList<>(64);
idsForCurrentCheckpoint = new ArrayList<>(64);
return SerializedCheckpointData.fromDeque(pendingCheckpoints, idSerializer);
}
return SerializedCheckpointData.fromDeque(pendingCheckpoints, idSerializer);
}
@Override
public void restoreState(SerializedCheckpointData[] state) throws Exception {
synchronized (pendingCheckpoints) {
pendingCheckpoints = SerializedCheckpointData.toDeque(state, idSerializer);
// build a set which contains all processed ids. It may be used to check if we have
// already processed an incoming message.
for (Tuple2<Long, List<UId>> checkpoint : pendingCheckpoints) {
idsProcessedButNotAcknowledged.addAll(checkpoint.f1);
}
pendingCheckpoints = SerializedCheckpointData.toDeque(state, idSerializer);
// build a set which contains all processed ids. It may be used to check if we have
// already processed an incoming message.
for (Tuple2<Long, List<UId>> checkpoint : pendingCheckpoints) {
idsProcessedButNotAcknowledged.addAll(checkpoint.f1);
}
}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
LOG.debug("Committing Messages externally for checkpoint {}", checkpointId);
synchronized (pendingCheckpoints) {
for (Iterator<Tuple2<Long, List<UId>>> iter = pendingCheckpoints.iterator(); iter.hasNext(); ) {
Tuple2<Long, List<UId>> checkpoint = iter.next();
long id = checkpoint.f0;
if (id <= checkpointId) {
LOG.trace("Committing Messages with following IDs {}", checkpoint.f1);
acknowledgeIDs(checkpointId, checkpoint.f1);
// remove deduplication data
idsProcessedButNotAcknowledged.removeAll(checkpoint.f1);
// remove checkpoint data
iter.remove();
} else {
break;
}
for (Iterator<Tuple2<Long, List<UId>>> iter = pendingCheckpoints.iterator(); iter.hasNext();) {
Tuple2<Long, List<UId>> checkpoint = iter.next();
long id = checkpoint.f0;
if (id <= checkpointId) {
LOG.trace("Committing Messages with following IDs {}", checkpoint.f1);
acknowledgeIDs(checkpointId, checkpoint.f1);
// remove deduplication data
idsProcessedButNotAcknowledged.removeAll(checkpoint.f1);
// remove checkpoint data
iter.remove();
}
else {
break;
}
}
}
......
......@@ -107,16 +107,14 @@ public abstract class MultipleIdsMessageAcknowledgingSourceBase<Type, UId, Sessi
*/
protected final void acknowledgeIDs(long checkpointId, List<UId> uniqueIds) {
LOG.debug("Acknowledging ids for checkpoint {}", checkpointId);
synchronized (sessionIdsPerSnapshot) {
Iterator<Tuple2<Long, List<SessionId>>> iterator = sessionIdsPerSnapshot.iterator();
while (iterator.hasNext()) {
final Tuple2<Long, List<SessionId>> next = iterator.next();
long id = next.f0;
if (id <= checkpointId) {
acknowledgeSessionIDs(next.f1);
// remove ids for this session
iterator.remove();
}
Iterator<Tuple2<Long, List<SessionId>>> iterator = sessionIdsPerSnapshot.iterator();
while (iterator.hasNext()) {
final Tuple2<Long, List<SessionId>> next = iterator.next();
long id = next.f0;
if (id <= checkpointId) {
acknowledgeSessionIDs(next.f1);
// remove ids for this session
iterator.remove();
}
}
}
......@@ -134,10 +132,8 @@ public abstract class MultipleIdsMessageAcknowledgingSourceBase<Type, UId, Sessi
@Override
public SerializedCheckpointData[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
synchronized (sessionIdsPerSnapshot) {
sessionIdsPerSnapshot.add(new Tuple2<>(checkpointId, sessionIds));
sessionIds = new ArrayList<>(64);
}
sessionIdsPerSnapshot.add(new Tuple2<>(checkpointId, sessionIds));
sessionIds = new ArrayList<>(64);
return super.snapshotState(checkpointId, checkpointTimestamp);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册