提交 d8fd04a7 编写于 作者: N Nico Kruber 提交者: Stephan Ewen

[FLINK-5521] [runtime] remove unused KvStateRequestSerializer#serializeList

Also make sure that the serialization via the state backends' list states
matches the deserialization of the KvStateRequestSerializer#deserializeList
method.
So far, it was used this way but not made sure via tests.

This closes #3135
上级 ac815d78
......@@ -36,7 +36,6 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
......@@ -434,40 +433,6 @@ public final class KvStateRequestSerializer {
}
}
/**
* Serializes all values of the Iterable with the given serializer.
*
* @param values Values of type T to serialize
* @param serializer Serializer for T
* @param <T> Type of the values
* @return Serialized values or <code>null</code> if values <code>null</code> or empty
* @throws IOException On failure during serialization
*/
public static <T> byte[] serializeList(Iterable<T> values, TypeSerializer<T> serializer) throws IOException {
if (values != null) {
Iterator<T> it = values.iterator();
if (it.hasNext()) {
// Serialize
DataOutputSerializer dos = new DataOutputSerializer(32);
while (it.hasNext()) {
serializer.serialize(it.next(), dos);
// This byte added here in order to have the binary format
// prescribed by RocksDB.
dos.write(0);
}
return dos.getCopyOfBuffer();
} else {
return null;
}
} else {
return null;
}
}
/**
* Deserializes all values with the given serializer.
*
......
......@@ -21,11 +21,19 @@ package org.apache.flink.runtime.query.netty.message;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.junit.Test;
import java.util.ArrayList;
......@@ -34,6 +42,7 @@ import java.util.concurrent.ThreadLocalRandom;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
public class KvStateRequestSerializerTest {
......@@ -228,17 +237,62 @@ public class KvStateRequestSerializerTest {
*/
@Test
public void testListSerialization() throws Exception {
final long key = 0l;
// objects for heap state list serialisation
final HeapKeyedStateBackend<Long> longHeapKeyedStateBackend =
new HeapKeyedStateBackend<>(
mock(TaskKvStateRegistry.class),
LongSerializer.INSTANCE,
ClassLoader.getSystemClassLoader(),
1, new KeyGroupRange(0, 0)
);
longHeapKeyedStateBackend.setCurrentKey(key);
final ListState<Long> listState = longHeapKeyedStateBackend
.createListState(VoidNamespaceSerializer.INSTANCE,
new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
testListSerialization(key, listState);
}
/**
* Verifies that the serialization of a list using the given list state
* matches the deserialization with {@link KvStateRequestSerializer#deserializeList}.
*
* @param key
* key of the list state
* @param listState
* list state using the {@link VoidNamespace}, must also be a {@link
* KvState} instance
*
* @throws Exception
*/
public static void testListSerialization(final long key,
final ListState<Long> listState) throws Exception {
TypeSerializer<Long> valueSerializer = LongSerializer.INSTANCE;
final KvState<VoidNamespace> listKvState =
(KvState<VoidNamespace>) listState;
listKvState.setCurrentNamespace(VoidNamespace.INSTANCE);
// List
int numElements = 10;
final int numElements = 10;
List<Long> expectedValues = new ArrayList<>();
final List<Long> expectedValues = new ArrayList<>();
for (int i = 0; i < numElements; i++) {
expectedValues.add(ThreadLocalRandom.current().nextLong());
final long value = ThreadLocalRandom.current().nextLong();
expectedValues.add(value);
listState.add(value);
}
byte[] serializedValues = KvStateRequestSerializer.serializeList(expectedValues, valueSerializer);
final byte[] serializedKey =
KvStateRequestSerializer.serializeKeyAndNamespace(
key, LongSerializer.INSTANCE,
VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
final byte[] serializedValues =
listKvState.getSerializedValue(serializedKey);
List<Long> actualValues = KvStateRequestSerializer.deserializeList(serializedValues, valueSerializer);
assertEquals(expectedValues, actualValues);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.query;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializerTest;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import java.io.File;
import static org.mockito.Mockito.mock;
/**
* Additional tests for the serialization and deserialization of {@link
* KvStateRequestSerializer} with a RocksDB state back-end.
*/
public final class KVStateRequestSerializerRocksDBTest {
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
/**
* Extension of {@link RocksDBKeyedStateBackend} to make {@link
* #createListState(TypeSerializer, ListStateDescriptor)} public for use in
* the tests.
*
* @param <K> key type
*/
final static class RocksDBKeyedStateBackend2<K> extends RocksDBKeyedStateBackend<K> {
RocksDBKeyedStateBackend2(final JobID jobId,
final String operatorIdentifier,
final ClassLoader userCodeClassLoader,
final File instanceBasePath, final DBOptions dbOptions,
final ColumnFamilyOptions columnFamilyOptions,
final TaskKvStateRegistry kvStateRegistry,
final TypeSerializer<K> keySerializer, final int numberOfKeyGroups,
final KeyGroupRange keyGroupRange) throws Exception {
super(jobId, operatorIdentifier, userCodeClassLoader,
instanceBasePath,
dbOptions, columnFamilyOptions, kvStateRegistry, keySerializer,
numberOfKeyGroups, keyGroupRange);
}
@Override
public <N, T> ListState<T> createListState(
final TypeSerializer<N> namespaceSerializer,
final ListStateDescriptor<T> stateDesc) throws Exception {
return super.createListState(namespaceSerializer, stateDesc);
}
}
/**
* Tests list serialization and deserialization match.
*
* @see KvStateRequestSerializerTest#testListSerialization()
* KvStateRequestSerializerTest#testListSerialization() using the heap state back-end
* test
*/
@Test
public void testListSerialization() throws Exception {
final long key = 0l;
TypeSerializer<Long> valueSerializer = LongSerializer.INSTANCE;
// objects for RocksDB state list serialisation
DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions();
dbOptions.setCreateIfMissing(true);
ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions();
final RocksDBKeyedStateBackend2<Long> longHeapKeyedStateBackend =
new RocksDBKeyedStateBackend2<>(
new JobID(), "no-op",
ClassLoader.getSystemClassLoader(),
temporaryFolder.getRoot(),
dbOptions,
columnFamilyOptions,
mock(TaskKvStateRegistry.class),
LongSerializer.INSTANCE,
1, new KeyGroupRange(0, 0)
);
longHeapKeyedStateBackend.setCurrentKey(key);
final ListState<Long> listState = longHeapKeyedStateBackend
.createListState(VoidNamespaceSerializer.INSTANCE,
new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
KvStateRequestSerializerTest.testListSerialization(key, listState);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册