diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java index 0697d7456123033b05f012c93ad0f0252c3fb3ea..1c2ca5d245bdd843a164eaf86b076841ad9c5684 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java @@ -18,19 +18,17 @@ */ package org.apache.pulsar.functions.instance.state; +import static java.nio.charset.StandardCharsets.UTF_8; + import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.util.ReferenceCountUtil; import org.apache.bookkeeper.api.kv.Table; -import org.apache.bookkeeper.api.kv.options.DeleteOption; -import org.apache.bookkeeper.api.kv.options.Option; import org.apache.bookkeeper.api.kv.options.Options; import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; -import static java.nio.charset.StandardCharsets.UTF_8; - /** * This class accumulates the state updates from one function. * @@ -55,6 +53,10 @@ public class StateContextImpl implements StateContext { @Override public CompletableFuture put(String key, ByteBuffer value) { if(value != null) { + // Set position to off the buffer to the beginning. + // If a user used an operation like ByteBuffer.allocate(4).putInt(count) to create a ByteBuffer to store to the state store + // the position of the buffer will be at the end and nothing will be written to table service + value.position(0); return table.put( Unpooled.wrappedBuffer(key.getBytes(UTF_8)), Unpooled.wrappedBuffer(value)); @@ -81,6 +83,10 @@ public class StateContextImpl implements StateContext { if (data != null) { ByteBuffer result = ByteBuffer.allocate(data.readableBytes()); data.readBytes(result); + // Set position to off the buffer to the beginning, since the position after the read is going to be end of the buffer + // If we do not rewind to the begining here, users will have to explicitly do this in their function code + // in order to use any of the ByteBuffer operations + result.position(0); return result; } return null;