提交 1fdd7e60 编写于 作者: S Stephan Ewen

Fix buffer leak in TaskManager / test tasks

上级 ae57c7c0
......@@ -44,6 +44,7 @@ import org.apache.flink.runtime.io.network.gates.OutputGate;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.protocols.ChannelLookupProtocol;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.util.ExceptionUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
......@@ -596,6 +597,10 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
} catch (CancelTaskException e) {
releaseEnvelope(envelope);
throw e;
} catch (Throwable t) {
releaseEnvelope(envelope);
ExceptionUtils.rethrow(t, "Error while requesting receiver list.");
return null; // silence the compiler
}
}
......
......@@ -138,4 +138,15 @@ public class RecordWriter<T extends IOReadableWritable> extends BufferWriter {
}
}
}
public void clearBuffers() {
if (this.serializers != null) {
for (RecordSerializer<?> s: this.serializers) {
Buffer b = s.getCurrentBuffer();
if (b != null) {
b.recycleBuffer();
}
}
}
}
}
......@@ -16,7 +16,6 @@
* limitations under the License.
*/
package org.apache.flink.runtime.io.network.bufferprovider;
import java.util.Queue;
......
......@@ -877,10 +877,15 @@ public class JobManagerITCase {
@Override
public void invoke() throws Exception {
writer.initializeSerializers();
writer.emit(new IntegerRecord(42));
writer.emit(new IntegerRecord(1337));
writer.flush();
try {
writer.initializeSerializers();
writer.emit(new IntegerRecord(42));
writer.emit(new IntegerRecord(1337));
writer.flush();
}
finally {
writer.clearBuffers();
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册