未验证 提交 4e358ef9 编写于 作者: S Sijie Guo 提交者: GitHub

[PROTOBUF] Fix protobuf generation on handling repeated long number … (#7540)

*Motivation*

The code generation for `repeated long` is not handled properly. (I am not sure how changes were made to PulsarApi.proto)

*Modification*

This pull request adds the code to handle generating code for `repeated long`.

*Test*

Add unit test to ensure `repeated long` is processed. Add test cases to cover both packed and non-package serialization for `repeated long`.

See more details about packed serialization: https://developers.google.com/protocol-buffers/docs/encoding#optional
上级 b6ceec41
......@@ -1268,6 +1268,7 @@ flexible messaging model and an intuitive client API.</description>
<exclude>src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java</exclude>
<exclude>src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java</exclude>
<exclude>src/main/java/org/apache/pulsar/common/api/proto/*.java</exclude>
<exclude>src/test/java/org/apache/pulsar/common/api/proto/*.java</exclude>
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/CompressionType.java</exclude>
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionCtx.java</exclude>
<exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionKey.java</exclude>
......@@ -1343,6 +1344,7 @@ flexible messaging model and an intuitive client API.</description>
and are included in source tree for convenience -->
<exclude>src/main/java/org/apache/bookkeeper/mledger/proto/MLDataFormats.java</exclude>
<exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java</exclude>
<exclude>src/test/java/org/apache/pulsar/common/api/proto/TestApi.java</exclude>
<exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarMarkers.java</exclude>
<exclude>src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java</exclude>
<exclude>bin/proto/MLDataFormats_pb2.py</exclude>
......
......@@ -1575,6 +1575,15 @@ public final class PulsarApi {
ackSet_.add(input.readInt64());
break;
}
case 42: {
int length = input.readRawVarint32();
int limit = input.pushLimit(length);
while (input.getBytesUntilLimit() > 0) {
addAckSet(input.readInt64());
}
input.popLimit(limit);
break;
}
}
}
}
......@@ -18857,6 +18866,15 @@ public final class PulsarApi {
ackSet_.add(input.readInt64());
break;
}
case 34: {
int length = input.readRawVarint32();
int limit = input.pushLimit(length);
while (input.getBytesUntilLimit() > 0) {
addAckSet(input.readInt64());
}
input.popLimit(limit);
break;
}
}
}
}
......@@ -346,4 +346,34 @@ public class ByteBufCodedInputStream {
buf.readerIndex(buf.readerIndex() + size);
}
public int pushLimit(int byteLimit) throws InvalidProtocolBufferException {
if (byteLimit < 0) {
throw new InvalidProtocolBufferException("CodedInputStream encountered an embedded string or message"
+ " which claimed to have negative size.");
}
byteLimit += buf.readerIndex();
final int oldLimit = buf.writerIndex();
if (byteLimit > oldLimit) {
throw new InvalidProtocolBufferException("While parsing a protocol message, the input ended unexpectedly"
+ " in the middle of a field. This could mean either than the input has been truncated or that an"
+ " embedded message misreported its own length.");
}
buf.writerIndex(byteLimit);
return oldLimit;
}
/**
* Discards the current limit, returning to the previous limit.
*
* @param oldLimit The old limit, as returned by {@code pushLimit}.
*/
public void popLimit(final int oldLimit) {
buf.writerIndex(oldLimit);
}
public int getBytesUntilLimit() {
return buf.readableBytes();
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.protocol;
import static org.testng.Assert.assertEquals;
import io.netty.buffer.ByteBuf;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
import org.testng.annotations.Test;
public class RepeatedLongNonPackedTest {
@Test
public void testRepeatedLongPacked() throws Exception {
MessageIdData messageIdData = MessageIdData.newBuilder()
.setLedgerId(0L)
.setEntryId(0L)
.setPartition(0)
.setBatchIndex(0)
.addAckSet(1000)
.addAckSet(1001)
.addAckSet(1003)
.build();
int cmdSize = messageIdData.getSerializedSize();
ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(cmdSize);
ByteBufCodedOutputStream outputStream = ByteBufCodedOutputStream.get(buf);
messageIdData.writeTo(outputStream);
messageIdData.recycle();
outputStream.recycle();
ByteBufCodedInputStream inputStream = ByteBufCodedInputStream.get(buf);
MessageIdData newMessageIdData = MessageIdData.newBuilder()
.mergeFrom(inputStream, null)
.build();
inputStream.recycle();
assertEquals(3, newMessageIdData.getAckSetCount());
assertEquals(1000, newMessageIdData.getAckSet(0));
assertEquals(1001, newMessageIdData.getAckSet(1));
assertEquals(1003, newMessageIdData.getAckSet(2));
newMessageIdData.recycle();
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.protocol;
import static org.testng.Assert.assertEquals;
import io.netty.buffer.ByteBuf;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.TestApi.MessageIdData;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
import org.testng.annotations.Test;
public class RepeatedLongPackedTest {
@Test
public void testRepeatedLongPacked() throws Exception {
MessageIdData messageIdData = MessageIdData.newBuilder()
.setLedgerId(0L)
.setEntryId(0L)
.setPartition(0)
.setBatchIndex(0)
.addAckSet(1000)
.addAckSet(1001)
.addAckSet(1003)
.build();
int cmdSize = messageIdData.getSerializedSize();
ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(cmdSize);
ByteBufCodedOutputStream outputStream = ByteBufCodedOutputStream.get(buf);
messageIdData.writeTo(outputStream);
messageIdData.recycle();
outputStream.recycle();
ByteBufCodedInputStream inputStream = ByteBufCodedInputStream.get(buf);
MessageIdData newMessageIdData = MessageIdData.newBuilder()
.mergeFrom(inputStream, null)
.build();
inputStream.recycle();
assertEquals(3, newMessageIdData.getAckSetCount());
assertEquals(1000, newMessageIdData.getAckSet(0));
assertEquals(1001, newMessageIdData.getAckSet(1));
assertEquals(1003, newMessageIdData.getAckSet(2));
newMessageIdData.recycle();
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
syntax = "proto2";
package pulsar.proto;
option java_package = "org.apache.pulsar.common.api.proto";
option optimize_for = LITE_RUNTIME;
message MessageIdData {
required uint64 ledgerId = 1;
required uint64 entryId = 2;
optional int32 partition = 3 [default = -1];
optional int32 batch_index = 4 [default = -1];
repeated int64 ack_set = 5 [packed = true];
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册