提交 6019f085 编写于 作者: T tedyu 提交者: Stephan Ewen

[FLINK-2826] [runtime] Fix race condition and locking in...

[FLINK-2826] [runtime] Fix race condition and locking in BroadcastVariableMaterialization#decrementReferenceInternal

This closes #1339
上级 976bacc6
...@@ -137,7 +137,7 @@ public class BroadcastVariableMaterialization<T, C> { ...@@ -137,7 +137,7 @@ public class BroadcastVariableMaterialization<T, C> {
while ((element = readerIterator.next(element)) != null); while ((element = readerIterator.next(element)) != null);
synchronized (materializationMonitor) { synchronized (materializationMonitor) {
while (!this.materialized) { while (!this.materialized && !disposed) {
materializationMonitor.wait(); materializationMonitor.wait();
} }
} }
...@@ -209,7 +209,7 @@ public class BroadcastVariableMaterialization<T, C> { ...@@ -209,7 +209,7 @@ public class BroadcastVariableMaterialization<T, C> {
throw new IllegalStateException("The Broadcast Variable has been disposed"); throw new IllegalStateException("The Broadcast Variable has been disposed");
} }
synchronized (this) { synchronized (references) {
if (transformed != null) { if (transformed != null) {
if (transformed instanceof List) { if (transformed instanceof List) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
...@@ -233,7 +233,7 @@ public class BroadcastVariableMaterialization<T, C> { ...@@ -233,7 +233,7 @@ public class BroadcastVariableMaterialization<T, C> {
throw new IllegalStateException("The Broadcast Variable has been disposed"); throw new IllegalStateException("The Broadcast Variable has been disposed");
} }
synchronized (this) { synchronized (references) {
if (transformed == null) { if (transformed == null) {
transformed = initializer.initializeBroadcastVariable(data); transformed = initializer.initializeBroadcastVariable(data);
data = null; data = null;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册