提交 556c6d22 编写于 作者: Z Zhijiang 提交者: zhijiang

[FLINK-16257][network] Remove useless ResultPartitionID from AddCredit message

The ResultPartitionID in AddCredit message is never used on upstream side, so we can remove it to cleanup the codes.
There would have another two benefits to do so:

1. Reduce the total message size from previous 52 bytes to 20 bytes.
2. Decouple the dependency with InputChannel#getPartitionId.
上级 6deca76e
......@@ -353,7 +353,6 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
//It is no need to notify credit for the released channel.
if (!inputChannel.isReleased()) {
AddCredit msg = new AddCredit(
inputChannel.getPartitionId(),
inputChannel.getAndResetUnannouncedCredit(),
inputChannel.getInputChannelId());
......
......@@ -647,16 +647,13 @@ public abstract class NettyMessage {
private static final byte ID = 6;
final ResultPartitionID partitionId;
final int credit;
final InputChannelID receiverId;
AddCredit(ResultPartitionID partitionId, int credit, InputChannelID receiverId) {
AddCredit(int credit, InputChannelID receiverId) {
checkArgument(credit > 0, "The announced credit should be greater than 0");
this.partitionId = partitionId;
this.credit = credit;
this.receiverId = receiverId;
}
......@@ -666,10 +663,7 @@ public abstract class NettyMessage {
ByteBuf result = null;
try {
result = allocateBuffer(allocator, ID, 16 + 16 + 4 + 16);
partitionId.getPartitionId().writeTo(result);
partitionId.getProducerId().writeTo(result);
result = allocateBuffer(allocator, ID, 4 + 16);
result.writeInt(credit);
receiverId.writeTo(result);
......@@ -685,14 +679,10 @@ public abstract class NettyMessage {
}
static AddCredit readFrom(ByteBuf buffer) {
ResultPartitionID partitionId =
new ResultPartitionID(
IntermediateResultPartitionID.fromByteBuf(buffer),
ExecutionAttemptID.fromByteBuf(buffer));
int credit = buffer.readInt();
InputChannelID receiverId = InputChannelID.fromByteBuf(buffer);
return new AddCredit(partitionId, credit, receiverId);
return new AddCredit(credit, receiverId);
}
@Override
......
......@@ -162,10 +162,9 @@ public class NettyMessageSerializationTest {
}
{
NettyMessage.AddCredit expected = new NettyMessage.AddCredit(new ResultPartitionID(), random.nextInt(Integer.MAX_VALUE) + 1, new InputChannelID());
NettyMessage.AddCredit expected = new NettyMessage.AddCredit(random.nextInt(Integer.MAX_VALUE) + 1, new InputChannelID());
NettyMessage.AddCredit actual = encodeAndDecode(expected);
assertEquals(expected.partitionId, actual.partitionId);
assertEquals(expected.credit, actual.credit);
assertEquals(expected.receiverId, actual.receiverId);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册