Commit 99d3c11e authored by tuteng, committed by Sijie Guo

Support Pulsar schema for pulsar kafka client wrapper (#4534)

Fixes https://github.com/apache/pulsar/issues/4228

Master Issue: https://github.com/apache/pulsar/issues/4228

### Motivation

Allow the pulsar kafka client wrapper to use Pulsar schemas.

### Modifications

Add support for Pulsar schemas in the pulsar kafka client wrapper: new `KafkaProducer`/`KafkaConsumer` constructor overloads accept Pulsar `Schema<K>`/`Schema<V>` instances, and a new `PulsarKafkaSchema` adapter wraps existing Kafka serializers/deserializers as Pulsar schemas.

### Verifying this change

Added unit tests and integration tests.
Parent commit: 1b64a6e1
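At a glance, the change adds constructor overloads on the Kafka-compatible producer and consumer that accept Pulsar `Schema` instances directly. A minimal sketch of the new API surface (the class name is hypothetical; it assumes the `Foo`/`Bar` POJOs and broker URL used in the example files below):

```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.kafka.compat.examples.utils.Bar;
import org.apache.pulsar.client.kafka.compat.examples.utils.Foo;

public class SchemaOverloadSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "pulsar://localhost:6650");
        props.put("group.id", "my-subscription-name");

        // Pulsar Avro schemas derived from plain POJOs
        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());

        // New constructor overloads added by this change: schemas are passed
        // explicitly, so no key/value (de)serializer classes are needed in the config.
        Producer<Foo, Bar> producer = new KafkaProducer<>(props, fooSchema, barSchema);
        Consumer<Foo, Bar> consumer = new KafkaConsumer<>(props, fooSchema, barSchema);
        consumer.subscribe(Arrays.asList("persistent://public/default/test-avro"));
    }
}
```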
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.kafka.compat.examples;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.kafka.compat.examples.utils.Bar;
import org.apache.pulsar.client.kafka.compat.examples.utils.Foo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerAvroExample {

    public static void main(String[] args) {
        String topic = "persistent://public/default/test-avro";

        Properties props = new Properties();
        props.put("bootstrap.servers", "pulsar://localhost:6650");
        props.put("group.id", "my-subscription-name");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", IntegerDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());

        @SuppressWarnings("resource")
        Consumer<Foo, Bar> consumer = new KafkaConsumer<>(props, fooSchema, barSchema);
        consumer.subscribe(Arrays.asList(topic));

        while (true) {
            ConsumerRecords<Foo, Bar> records = consumer.poll(100);
            records.forEach(record -> log.info("Received record: {}", record));

            // Commit last offset
            consumer.commitSync();
        }
    }

    private static final Logger log = LoggerFactory.getLogger(ConsumerAvroExample.class);
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.kafka.compat.examples;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.kafka.compat.examples.utils.Bar;
import org.apache.pulsar.client.kafka.compat.examples.utils.Foo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class ProducerAvroExample {

    public static void main(String[] args) {
        String topic = "persistent://public/default/test-avro";

        Properties props = new Properties();
        props.put("bootstrap.servers", "pulsar://localhost:6650");
        props.put("key.serializer", IntegerSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());

        Bar bar = new Bar();
        bar.setField1(true);

        Foo foo = new Foo();
        foo.setField1("field1");
        foo.setField2("field2");
        foo.setField3(3);

        Producer<Foo, Bar> producer = new KafkaProducer<>(props, fooSchema, barSchema);

        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>(topic, i, foo, bar));
            log.info("Message {} sent successfully", i);
        }

        producer.close();
    }

    private static final Logger log = LoggerFactory.getLogger(ProducerAvroExample.class);
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.kafka.compat.examples.utils;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;

@Data
@ToString
@EqualsAndHashCode
public class Bar {
    private boolean field1;
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.kafka.compat.examples.utils;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.avro.reflect.Nullable;

@Data
@ToString
@EqualsAndHashCode
public class Foo {
    @Nullable
    private String field1;
    @Nullable
    private String field2;
    private int field3;
}
@@ -41,7 +41,6 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
@@ -49,20 +48,22 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarConsumerKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
@@ -74,8 +75,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
private final PulsarClient client;
private final Deserializer<K> keyDeserializer;
private final Deserializer<V> valueDeserializer;
private final Schema<K> keySchema;
private final Schema<V> valueSchema;
private final String groupId;
private final boolean isAutoCommit;
@@ -110,66 +111,75 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
private final BlockingQueue<QueueItem> receivedMessages = new ArrayBlockingQueue<>(1000);
public PulsarKafkaConsumer(Map<String, Object> configs) {
this(configs, null, null);
this(new ConsumerConfig(configs), null, null);
}
public PulsarKafkaConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(configs, keyDeserializer, valueDeserializer)),
keyDeserializer, valueDeserializer);
Deserializer<V> valueDeserializer) {
this(new ConsumerConfig(configs),
new PulsarKafkaSchema<K>(keyDeserializer), new PulsarKafkaSchema<V>(valueDeserializer));
}
public PulsarKafkaConsumer(Map<String, Object> configs, Schema<K> keySchema, Schema<V> valueSchema) {
this(new ConsumerConfig(configs), keySchema, valueSchema);
}
public PulsarKafkaConsumer(Properties properties) {
this(properties, null, null);
this(new ConsumerConfig(properties), null, null);
}
public PulsarKafkaConsumer(Properties properties, Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(properties, keyDeserializer, valueDeserializer)),
keyDeserializer, valueDeserializer);
Deserializer<V> valueDeserializer) {
this(new ConsumerConfig(properties),
new PulsarKafkaSchema<>(keyDeserializer), new PulsarKafkaSchema<>(valueDeserializer));
}
public PulsarKafkaConsumer(Properties properties, Schema<K> keySchema, Schema<V> valueSchema) {
this(new ConsumerConfig(properties), keySchema, valueSchema);
}
@SuppressWarnings("unchecked")
private PulsarKafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
private PulsarKafkaConsumer(ConsumerConfig consumerConfig, Schema<K> keySchema, Schema<V> valueSchema) {
if (keyDeserializer == null) {
this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
Deserializer.class);
this.keyDeserializer.configure(config.originals(), true);
if (keySchema == null) {
Deserializer<K> kafkaKeyDeserializer = consumerConfig.getConfiguredInstance(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
kafkaKeyDeserializer.configure(consumerConfig.originals(), true);
this.keySchema = new PulsarKafkaSchema<>(kafkaKeyDeserializer);
} else {
this.keyDeserializer = keyDeserializer;
config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
this.keySchema = keySchema;
consumerConfig.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
}
if (valueDeserializer == null) {
this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
Deserializer.class);
this.valueDeserializer.configure(config.originals(), true);
if (valueSchema == null) {
Deserializer<V> kafkaValueDeserializer = consumerConfig.getConfiguredInstance(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
kafkaValueDeserializer.configure(consumerConfig.originals(), true);
this.valueSchema = new PulsarKafkaSchema<>(kafkaValueDeserializer);
} else {
this.valueDeserializer = valueDeserializer;
config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
this.valueSchema = valueSchema;
consumerConfig.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
}
groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
isAutoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
strategy = getStrategy(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
groupId = consumerConfig.getString(ConsumerConfig.GROUP_ID_CONFIG);
isAutoCommit = consumerConfig.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
strategy = getStrategy(consumerConfig.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
log.info("Offset reset strategy has been assigned value {}", strategy);
String serviceUrl = config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
String serviceUrl = consumerConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
// If MAX_POLL_RECORDS_CONFIG is provided then use the config, else use default value.
if(config.values().containsKey(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)){
maxRecordsInSinglePoll = config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
if(consumerConfig.values().containsKey(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)){
maxRecordsInSinglePoll = consumerConfig.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
} else {
maxRecordsInSinglePoll = 1000;
}
interceptors = (List) config.getConfiguredInstances(
interceptors = (List) consumerConfig.getConfiguredInstances(
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptor.class);
this.properties = new Properties();
config.originals().forEach((k, v) -> properties.put(k, v));
consumerConfig.originals().forEach((k, v) -> properties.put(k, v));
ClientBuilder clientBuilder = PulsarClientKafkaConfig.getClientBuilder(properties);
// Since this client instance is going to be used just for the consumers, we can enable Nagle to group
// all the acknowledgments sent to broker within a short time frame
@@ -352,7 +362,10 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
}
K key = getKey(topic, msg);
V value = valueDeserializer.deserialize(topic, msg.getData());
if (valueSchema instanceof PulsarKafkaSchema) {
((PulsarKafkaSchema<V>) valueSchema).setTopic(topic);
}
V value = valueSchema.decode(msg.getData());
TimestampType timestampType = TimestampType.LOG_APPEND_TIME;
long timestamp = msg.getPublishTime();
@@ -403,13 +416,18 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
return null;
}
if (keyDeserializer instanceof StringDeserializer) {
return (K) msg.getKey();
} else {
// Assume base64 encoding
byte[] data = Base64.getDecoder().decode(msg.getKey());
return keyDeserializer.deserialize(topic, data);
if (keySchema instanceof PulsarKafkaSchema) {
PulsarKafkaSchema<K> pulsarKafkaSchema = (PulsarKafkaSchema) keySchema;
Deserializer<K> kafkaDeserializer = pulsarKafkaSchema.getKafkaDeserializer();
if (kafkaDeserializer instanceof StringDeserializer) {
return (K) msg.getKey();
}
pulsarKafkaSchema.setTopic(topic);
}
// Assume base64 encoding
byte[] data = Base64.getDecoder().decode(msg.getKey());
return keySchema.decode(data);
}
@Override
......
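For reference, the key handling above keeps the wrapper's existing convention: a key handled by a `StringDeserializer` is carried verbatim as the Pulsar message key, while any other key is serialized to bytes and Base64-encoded. A minimal, self-contained sketch of that round trip (illustrative values only):

```java
import java.util.Arrays;
import java.util.Base64;

public class KeyRoundTrip {
    public static void main(String[] args) {
        // Producer side (PulsarKafkaProducer.getKey): non-string keys are
        // serialized to bytes and Base64-encoded into the Pulsar message key.
        byte[] keyBytes = {0, 0, 0, 42}; // e.g. what an IntegerSerializer would emit for 42
        String pulsarKey = Base64.getEncoder().encodeToString(keyBytes);

        // Consumer side (PulsarKafkaConsumer.getKey): the key is Base64-decoded
        // before being handed to the schema or deserializer.
        byte[] decoded = Base64.getDecoder().decode(pulsarKey);
        System.out.println(Arrays.equals(decoded, keyBytes)); // true
    }
}
```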
@@ -43,20 +43,21 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
import org.apache.pulsar.client.kafka.compat.KafkaMessageRouter;
import org.apache.pulsar.client.kafka.compat.KafkaProducerInterceptorWrapper;
import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,59 +68,68 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
private final ConcurrentMap<String, org.apache.pulsar.client.api.Producer<byte[]>> producers = new ConcurrentHashMap<>();
private final Serializer<K> keySerializer;
private final Serializer<V> valueSerializer;
private final Schema<K> keySchema;
private final Schema<V> valueSchema;
private final Partitioner partitioner;
private volatile Cluster cluster = Cluster.empty();
private List<ProducerInterceptor<K, V>> interceptors;
private final Properties properties;
public PulsarKafkaProducer(Map<String, Object> configs) {
this(configs, null, null);
this(new ProducerConfig(configs), null, null);
}
public PulsarKafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer,
Serializer<V> valueSerializer) {
this(configs, new Properties(), keySerializer, valueSerializer);
Serializer<V> valueSerializer) {
this(new ProducerConfig(configs), new PulsarKafkaSchema<>(keySerializer), new PulsarKafkaSchema<>(valueSerializer));
}
public PulsarKafkaProducer(Map<String, Object> configs, Schema<K> keySchema, Schema<V> valueSchema) {
this(new ProducerConfig(configs), keySchema, valueSchema);
}
public PulsarKafkaProducer(Properties properties) {
this(properties, null, null);
this(new ProducerConfig(properties), null, null);
}
public PulsarKafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this(new HashMap<>(), properties, keySerializer, valueSerializer);
this(new ProducerConfig(properties), new PulsarKafkaSchema<>(keySerializer), new PulsarKafkaSchema<>(valueSerializer));
}
@SuppressWarnings({ "unchecked", "deprecation" })
private PulsarKafkaProducer(Map<String, Object> conf, Properties properties, Serializer<K> keySerializer,
Serializer<V> valueSerializer) {
properties.forEach((k, v) -> conf.put((String) k, v));
public PulsarKafkaProducer(Properties properties, Schema<K> keySchema, Schema<V> valueSchema) {
this(new ProducerConfig(properties), keySchema, valueSchema);
}
ProducerConfig producerConfig = new ProducerConfig(conf);
@SuppressWarnings({ "unchecked", "deprecation" })
private PulsarKafkaProducer(ProducerConfig producerConfig, Schema<K> keySchema, Schema<V> valueSchema) {
if (keySerializer == null) {
this.keySerializer = producerConfig.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Serializer.class);
this.keySerializer.configure(producerConfig.originals(), true);
if (keySchema == null) {
Serializer<K> kafkaKeySerializer = producerConfig.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
kafkaKeySerializer.configure(producerConfig.originals(), true);
this.keySchema = new PulsarKafkaSchema<>(kafkaKeySerializer);
} else {
this.keySerializer = keySerializer;
this.keySchema = keySchema;
producerConfig.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
}
if (valueSerializer == null) {
this.valueSerializer = producerConfig.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
Serializer.class);
this.valueSerializer.configure(producerConfig.originals(), false);
if (valueSchema == null) {
Serializer<V> kafkaValueSerializer = producerConfig.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
kafkaValueSerializer.configure(producerConfig.originals(), false);
this.valueSchema = new PulsarKafkaSchema<>(kafkaValueSerializer);
} else {
this.valueSerializer = valueSerializer;
this.valueSchema = valueSchema;
producerConfig.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
}
partitioner = producerConfig.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
partitioner.configure(producerConfig.originals());
this.properties = new Properties();
producerConfig.originals().forEach((k, v) -> properties.put(k, v));
long keepAliveIntervalMs = Long.parseLong(properties.getProperty(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "30000"));
String serviceUrl = producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
@@ -275,7 +285,7 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
// Add the partitions info for the new topic
cluster = cluster.withPartitions(readPartitionsInfo(topic));
List<org.apache.pulsar.client.api.ProducerInterceptor> wrappedInterceptors = interceptors.stream()
.map(interceptor -> new KafkaProducerInterceptorWrapper(interceptor, keySerializer, valueSerializer, topic))
.map(interceptor -> new KafkaProducerInterceptorWrapper(interceptor, keySchema, valueSchema, topic))
.collect(Collectors.toList());
return pulsarProducerBuilder.clone()
.topic(topic)
@@ -312,7 +322,10 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
builder.eventTime(record.timestamp());
}
byte[] value = valueSerializer.serialize(record.topic(), record.value());
if (valueSchema instanceof PulsarKafkaSchema) {
((PulsarKafkaSchema<V>) valueSchema).setTopic(record.topic());
}
byte[] value = valueSchema.encode(record.value());
builder.value(value);
if (record.partition() != null) {
@@ -329,12 +342,14 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
private String getKey(String topic, K key) {
// If key is a String, we can use it as it is, otherwise, serialize to byte[] and encode in base64
if (keySerializer instanceof StringSerializer) {
if (key instanceof String) {
return (String) key;
} else {
byte[] keyBytes = keySerializer.serialize(topic, key);
return Base64.getEncoder().encodeToString(keyBytes);
}
if (keySchema instanceof PulsarKafkaSchema) {
((PulsarKafkaSchema) keySchema).setTopic(topic);
}
byte[] keyBytes = keySchema.encode(key);
return Base64.getEncoder().encodeToString(keyBytes);
}
private RecordMetadata getRecordMetadata(String topic, TypedMessageBuilder<byte[]> msgBuilder, MessageId messageId,
......
@@ -67,9 +67,9 @@ public class KafkaProducerInterceptorWrapper<K, V> implements ProducerIntercepto
final private org.apache.kafka.clients.producer.ProducerInterceptor<K, V> kafkaProducerInterceptor;
// For serializer key/value, and to determine the deserializer for key/value.
private final Serializer<K> keySerializer;
private final Schema<K> keySchema;
private final Serializer<V> valueSerializer;
private final Schema<V> valueSchema;
// Keep the topic, as each Pulsar producer will tie to a Kafka topic, and ProducerInterceptor will tie to a Pulsar
// producer, it's safe to set it as final.
@@ -91,12 +91,12 @@ public class KafkaProducerInterceptorWrapper<K, V> implements ProducerIntercepto
* @param topic Topic this {@link ProducerInterceptor} will be associated to.
*/
public KafkaProducerInterceptorWrapper(org.apache.kafka.clients.producer.ProducerInterceptor<K, V> kafkaProducerInterceptor,
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
Schema<K> keySchema,
Schema<V> valueSchema,
String topic) {
this.kafkaProducerInterceptor = kafkaProducerInterceptor;
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
this.keySchema = keySchema;
this.valueSchema = valueSchema;
this.topic = topic;
}
@@ -163,7 +163,10 @@ public class KafkaProducerInterceptorWrapper<K, V> implements ProducerIntercepto
private Message<byte[]> toPulsarMessage(ProducerRecord<K, V> producerRecord) {
TypedMessageBuilderImpl typedMessageBuilder = new TypedMessageBuilderImpl(null, scheme);
typedMessageBuilder.key(serializeKey(topic, producerRecord.key()));
typedMessageBuilder.value(valueSerializer.serialize(topic, producerRecord.value()));
if (valueSchema instanceof PulsarKafkaSchema) {
((PulsarKafkaSchema<V>) valueSchema).setTopic(topic);
}
typedMessageBuilder.value(valueSchema.encode(producerRecord.value()));
typedMessageBuilder.eventTime(eventTime);
typedMessageBuilder.property(KafkaMessageRouter.PARTITION_ID, partitionID);
return typedMessageBuilder.getMessage();
@@ -178,8 +181,14 @@ public class KafkaProducerInterceptorWrapper<K, V> implements ProducerIntercepto
* @return Kafka record.
*/
private ProducerRecord<K, V> toKafkaRecord(Message<byte[]> message) {
Deserializer valueDeserializer = getDeserializer(valueSerializer);
V value = (V) valueDeserializer.deserialize(topic, message.getValue());
V value;
if (valueSchema instanceof PulsarKafkaSchema) {
PulsarKafkaSchema<V> pulsarValueKafkaSchema = (PulsarKafkaSchema<V>) valueSchema;
Deserializer valueDeserializer = getDeserializer(pulsarValueKafkaSchema.getKafkaSerializer());
value = (V) valueDeserializer.deserialize(topic, message.getValue());
} else {
value = valueSchema.decode(message.getValue());
}
try {
scheme = (Schema<byte[]>) FieldUtils.readField(message, "schema", true);
PulsarApi.MessageMetadata.Builder messageMetadataBuilder = ((MessageImpl<byte[]>)message).getMessageBuilder();
@@ -198,22 +207,28 @@ public class KafkaProducerInterceptorWrapper<K, V> implements ProducerIntercepto
private String serializeKey(String topic, K key) {
// If key is a String, we can use it as it is, otherwise, serialize to byte[] and encode in base64
if (keySerializer instanceof StringSerializer) {
if (key instanceof String) {
return (String) key;
} else {
byte[] keyBytes = keySerializer.serialize(topic, key);
return Base64.getEncoder().encodeToString(keyBytes);
}
if (keySchema instanceof PulsarKafkaSchema) {
((PulsarKafkaSchema<K>) keySchema).setTopic(topic);
}
byte[] keyBytes = keySchema.encode(key);
return Base64.getEncoder().encodeToString(keyBytes);
}
private K deserializeKey(String topic, String key) {
// If key is a String, we can use it as it is, otherwise, serialize to byte[] and encode in base64
if (keySerializer instanceof StringSerializer) {
return (K) key;
} else {
Deserializer keyDeserializer = getDeserializer(keySerializer);
if (keySchema instanceof PulsarKafkaSchema) {
PulsarKafkaSchema<K> pulsarKeyKafkaSchema = (PulsarKafkaSchema<K>) keySchema;
// If key is a String, we can use it as it is, otherwise, serialize to byte[] and encode in base64
if (pulsarKeyKafkaSchema.getKafkaSerializer() instanceof StringSerializer) {
return (K) key;
}
Deserializer keyDeserializer = getDeserializer(pulsarKeyKafkaSchema.getKafkaSerializer());
return (K) keyDeserializer.deserialize(topic, Base64.getDecoder().decode(key));
}
return keySchema.decode(Base64.getDecoder().decode(key));
}
/**
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.kafka.compat;

import static com.google.common.base.Preconditions.checkArgument;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.SchemaInfo;

public class PulsarKafkaSchema<T> implements Schema<T> {

    private final Serializer<T> kafkaSerializer;

    private final Deserializer<T> kafkaDeserializer;

    private String topic;

    public PulsarKafkaSchema(Serializer<T> serializer) {
        this(serializer, null);
    }

    public PulsarKafkaSchema(Deserializer<T> deserializer) {
        this(null, deserializer);
    }

    public PulsarKafkaSchema(Serializer<T> serializer, Deserializer<T> deserializer) {
        this.kafkaSerializer = serializer;
        this.kafkaDeserializer = deserializer;
    }

    public Serializer<T> getKafkaSerializer() {
        return kafkaSerializer;
    }

    public Deserializer<T> getKafkaDeserializer() {
        return kafkaDeserializer;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    @Override
    public byte[] encode(T message) {
        checkArgument(kafkaSerializer != null, "Kafka serializer is not initialized yet");
        return kafkaSerializer.serialize(this.topic, message);
    }

    @Override
    public T decode(byte[] message) {
        checkArgument(kafkaDeserializer != null, "Kafka deserializer is not initialized yet");
        return kafkaDeserializer.deserialize(this.topic, message);
    }

    @Override
    public SchemaInfo getSchemaInfo() {
        // Kafka (de)serializers carry no schema information, so advertise raw bytes
        return Schema.BYTES.getSchemaInfo();
    }
}
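As a usage note, `PulsarKafkaSchema` simply delegates to the wrapped Kafka (de)serializer, forwarding whatever topic was last set via `setTopic`. A minimal round-trip sketch using Kafka's built-in string (de)serializers (class and topic names hypothetical):

```java
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;

public class PulsarKafkaSchemaExample {
    public static void main(String[] args) {
        PulsarKafkaSchema<String> schema =
                new PulsarKafkaSchema<>(new StringSerializer(), new StringDeserializer());
        schema.setTopic("my-topic"); // forwarded to the Kafka (de)serializer calls

        byte[] encoded = schema.encode("hello"); // StringSerializer.serialize("my-topic", "hello")
        String decoded = schema.decode(encoded); // StringDeserializer.deserialize("my-topic", bytes)
        System.out.println(decoded);             // prints "hello"
    }
}
```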
@@ -18,19 +18,23 @@
*/
package org.apache.kafka.clients.producer;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.apache.avro.reflect.Nullable;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
import org.mockito.Matchers;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
@@ -64,6 +68,24 @@ import static org.mockito.Mockito.when;
@PowerMockIgnore({"org.apache.logging.log4j.*", "org.apache.kafka.clients.producer.ProducerInterceptor"})
public class PulsarKafkaProducerTest {
@Data
@ToString
@EqualsAndHashCode
public static class Foo {
@Nullable
private String field1;
@Nullable
private String field2;
private int field3;
}
@Data
@ToString
@EqualsAndHashCode
public static class Bar {
private boolean field1;
}
@ObjectFactory
// Necessary to make PowerMockito.mockStatic work with TestNG.
public IObjectFactory getObjectFactory() {
@@ -103,7 +125,7 @@ public class PulsarKafkaProducerTest {
properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000");
properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");
new PulsarKafkaProducer<>(properties, null, null);
new PulsarKafkaProducer<>(properties);
verify(mockClientBuilder, times(1)).keepAliveInterval(1000, TimeUnit.SECONDS);
verify(mockProducerBuilder, times(1)).sendTimeout(1000000, TimeUnit.MILLISECONDS);
@@ -149,7 +171,7 @@ public class PulsarKafkaProducerTest {
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
// Act
PulsarKafkaProducer<String, String> pulsarKafkaProducer = new PulsarKafkaProducer<>(properties, null, null);
PulsarKafkaProducer<String, String> pulsarKafkaProducer = new PulsarKafkaProducer<>(properties);
pulsarKafkaProducer.send(new ProducerRecord<>("topic", 1,"key", "value"));
@@ -157,6 +179,65 @@ public class PulsarKafkaProducerTest {
verify(mockProducerBuilder, times(1)).intercept(anyVararg());
}
@Test
public void testPulsarKafkaSendAvro() throws PulsarClientException {
// Arrange
PulsarClient mockClient = mock(PulsarClient.class);
ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class);
org.apache.pulsar.client.api.Producer mockProducer = mock(org.apache.pulsar.client.api.Producer.class);
ClientBuilder mockClientBuilder = mock(ClientBuilder.class);
CompletableFuture mockPartitionFuture = new CompletableFuture();
CompletableFuture mockSendAsyncFuture = new CompletableFuture();
TypedMessageBuilder mockTypedMessageBuilder = mock(TypedMessageBuilderImpl.class);
mockPartitionFuture.complete(new ArrayList<>());
mockSendAsyncFuture.complete(new MessageIdImpl(1, 1, 1));
doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString());
doReturn(mockClientBuilder).when(mockClientBuilder).keepAliveInterval(anyInt(), any(TimeUnit.class));
doReturn(mockClient).when(mockClientBuilder).build();
doReturn(mockPartitionFuture).when(mockClient).getPartitionsForTopic(anyString());
doReturn(mockProducerBuilder).when(mockProducerBuilder).topic(anyString());
doReturn(mockProducerBuilder).when(mockProducerBuilder).clone();
doReturn(mockProducerBuilder).when(mockProducerBuilder).intercept(anyVararg());
doReturn(mockProducer).when(mockProducerBuilder).create();
doReturn(mockTypedMessageBuilder).when(mockProducer).newMessage();
doReturn(mockSendAsyncFuture).when(mockTypedMessageBuilder).sendAsync();
PowerMockito.mockStatic(PulsarClientKafkaConfig.class);
PowerMockito.mockStatic(PulsarProducerKafkaConfig.class);
when(PulsarClientKafkaConfig.getClientBuilder(any(Properties.class))).thenReturn(mockClientBuilder);
when(PulsarProducerKafkaConfig.getProducerBuilder(any(PulsarClient.class), any(Properties.class))).thenReturn(mockProducerBuilder);
Properties properties = new Properties();
List interceptors = new ArrayList();
interceptors.add("org.apache.kafka.clients.producer.PulsarKafkaProducerTest$PulsarKafkaProducerInterceptor");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000");
properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
// Act
PulsarKafkaProducer<Foo, Bar> pulsarKafkaProducer = new PulsarKafkaProducer<>(properties, fooSchema, barSchema);
Bar bar = new Bar();
bar.setField1(true);
Foo foo = new Foo();
foo.setField1("field1");
foo.setField2("field2");
foo.setField3(3);
pulsarKafkaProducer.send(new ProducerRecord<>("topic", 1,foo, bar));
// Verify
verify(mockTypedMessageBuilder, times(1)).sendAsync();
verify(mockProducerBuilder, times(1)).intercept(anyVararg());
}
@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Invalid value 2147483648000 for 'connections.max.idle.ms'. Please use a value smaller than 2147483647000 milliseconds.")
public void testPulsarKafkaProducerKeepAliveIntervalIllegalArgumentException() {
Properties properties = new Properties();
@@ -166,7 +247,7 @@ public class PulsarKafkaProducerTest {
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, Long.toString((Integer.MAX_VALUE + 1L) * 1000));
new PulsarKafkaProducer<>(properties, null, null);
new PulsarKafkaProducer<>(properties);
}
public static class PulsarKafkaProducerInterceptor implements org.apache.kafka.clients.producer.ProducerInterceptor<String, String> {
......
@@ -36,6 +36,7 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.client.api.ProducerInterceptor;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ProducerInterceptors;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.schema.BytesSchema;
@@ -97,9 +98,12 @@ public class KafkaProducerInterceptorWrapperTest {
}
}).when(mockInterceptor2).onSend(any(ProducerRecord.class));
Schema<String> pulsarKeySerializeSchema = new PulsarKafkaSchema<>(new StringSerializer());
Schema<byte[]> pulsarValueSerializeSchema = new PulsarKafkaSchema<>(new ByteArraySerializer());
ProducerInterceptors producerInterceptors = new ProducerInterceptors(Arrays.asList(new ProducerInterceptor[]{
new KafkaProducerInterceptorWrapper(mockInterceptor1, new StringSerializer(), new ByteArraySerializer(), topic),
new KafkaProducerInterceptorWrapper(mockInterceptor2, new StringSerializer(), new ByteArraySerializer(), topic)}));
new KafkaProducerInterceptorWrapper(mockInterceptor1, pulsarKeySerializeSchema, pulsarValueSerializeSchema, topic),
new KafkaProducerInterceptorWrapper(mockInterceptor2, pulsarKeySerializeSchema, pulsarValueSerializeSchema, topic)}));
TypedMessageBuilderImpl typedMessageBuilder = new TypedMessageBuilderImpl(null, new BytesSchema());
typedMessageBuilder.key("original key");
......
@@ -57,6 +57,11 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-kafka</artifactId>
......
@@ -33,8 +33,12 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.reflect.Nullable;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -53,12 +57,37 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.StringSchema;
import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
import org.testng.Assert;
import org.testng.annotations.Test;
@Slf4j
public class KafkaApiTest extends PulsarStandaloneTestSuite {
@Data
@ToString
@EqualsAndHashCode
public static class Foo {
@Nullable
private String field1;
@Nullable
private String field2;
private int field3;
}
@Data
@ToString
@EqualsAndHashCode
public static class Bar {
private boolean field1;
}
private static String getPlainTextServiceUrl() {
return container.getPlainTextServiceUrl();
}
@@ -609,4 +638,252 @@ public class KafkaApiTest extends PulsarStandaloneTestSuite {
producer.close();
}
@Test
public void testProducerAvroSchemaWithPulsarKafkaClient() throws Exception {
String topic = "testProducerAvroSchemaWithPulsarKafkaClient";
AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
@Cleanup
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
org.apache.pulsar.client.api.Consumer<byte[]> pulsarConsumer =
pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("my-subscription")
.subscribe();
Properties props = new Properties();
props.put("bootstrap.servers", getPlainTextServiceUrl());
props.put("key.serializer", IntegerSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
Producer<Bar, Foo> producer = new KafkaProducer<>(props, barSchema, fooSchema);
for (int i = 0; i < 10; i++) {
Bar bar = new Bar();
bar.setField1(true);
Foo foo = new Foo();
foo.setField1("field1");
foo.setField2("field2");
foo.setField3(i);
producer.send(new ProducerRecord<Bar, Foo>(topic, bar, foo));
}
producer.flush();
producer.close();
for (int i = 0; i < 10; i++) {
Message<byte[]> msg = pulsarConsumer.receive(1, TimeUnit.SECONDS);
Foo value = fooSchema.decode(msg.getValue());
Assert.assertEquals(value.getField1(), "field1");
Assert.assertEquals(value.getField2(), "field2");
Assert.assertEquals(value.getField3(), i);
pulsarConsumer.acknowledge(msg);
}
}
@Test
public void testConsumerAvroSchemaWithPulsarKafkaClient() throws Exception {
String topic = "testConsumerAvroSchemaWithPulsarKafkaClient";
StringSchema stringSchema = new StringSchema();
AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
Properties props = new Properties();
props.put("bootstrap.servers", getPlainTextServiceUrl());
props.put("group.id", "my-subscription-name");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
@Cleanup
Consumer<String, Foo> consumer = new KafkaConsumer<String, Foo>(props, new StringSchema(), fooSchema);
consumer.subscribe(Arrays.asList(topic));
@Cleanup
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPlainTextServiceUrl()).build();
org.apache.pulsar.client.api.Producer<Foo> pulsarProducer = pulsarClient.newProducer(fooSchema).topic(topic).create();
for (int i = 0; i < 10; i++) {
Foo foo = new Foo();
foo.setField1("field1");
foo.setField2("field2");
foo.setField3(i);
pulsarProducer.newMessage().keyBytes(stringSchema.encode(Integer.toString(i))).value(foo).send();
}
AtomicInteger received = new AtomicInteger();
while (received.get() < 10) {
ConsumerRecords<String, Foo> records = consumer.poll(100);
if (!records.isEmpty()) {
records.forEach(record -> {
Assert.assertEquals(record.key(), Integer.toString(received.get()));
Foo value = record.value();
Assert.assertEquals(value.getField1(), "field1");
Assert.assertEquals(value.getField2(), "field2");
Assert.assertEquals(value.getField3(), received.get());
received.incrementAndGet();
});
consumer.commitSync();
}
}
}
@Test
public void testProducerConsumerAvroSchemaWithPulsarKafkaClient() throws Exception {
String topic = "testProducerConsumerAvroSchemaWithPulsarKafkaClient";
AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
Properties props = new Properties();
props.put("bootstrap.servers", getPlainTextServiceUrl());
props.put("group.id", "my-subscription-name");
props.put("enable.auto.commit", "false");
props.put("key.serializer", IntegerSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
@Cleanup
Consumer<Bar, Foo> consumer = new KafkaConsumer<>(props, barSchema, fooSchema);
consumer.subscribe(Arrays.asList(topic));
Producer<Bar, Foo> producer = new KafkaProducer<>(props, barSchema, fooSchema);
for (int i = 0; i < 10; i++) {
Bar bar = new Bar();
bar.setField1(true);
Foo foo = new Foo();
foo.setField1("field1");
foo.setField2("field2");
foo.setField3(i);
producer.send(new ProducerRecord<>(topic, bar, foo));
}
producer.flush();
producer.close();
AtomicInteger received = new AtomicInteger();
while (received.get() < 10) {
ConsumerRecords<Bar, Foo> records = consumer.poll(100);
if (!records.isEmpty()) {
records.forEach(record -> {
Bar key = record.key();
Assert.assertTrue(key.isField1());
Foo value = record.value();
Assert.assertEquals(value.getField1(), "field1");
Assert.assertEquals(value.getField2(), "field2");
Assert.assertEquals(value.getField3(), received.get());
received.incrementAndGet();
});
consumer.commitSync();
}
}
}
@Test
public void testProducerConsumerJsonSchemaWithPulsarKafkaClient() throws Exception {
String topic = "testProducerConsumerJsonSchemaWithPulsarKafkaClient";
JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
Properties props = new Properties();
props.put("bootstrap.servers", getPlainTextServiceUrl());
props.put("group.id", "my-subscription-name");
props.put("enable.auto.commit", "false");
props.put("key.serializer", IntegerSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
@Cleanup
Consumer<Bar, Foo> consumer = new KafkaConsumer<>(props, barSchema, fooSchema);
consumer.subscribe(Arrays.asList(topic));
Producer<Bar, Foo> producer = new KafkaProducer<>(props, barSchema, fooSchema);
for (int i = 0; i < 10; i++) {
Bar bar = new Bar();
bar.setField1(true);
Foo foo = new Foo();
foo.setField1("field1");
foo.setField2("field2");
foo.setField3(i);
producer.send(new ProducerRecord<>(topic, bar, foo));
}
producer.flush();
producer.close();
AtomicInteger received = new AtomicInteger();
while (received.get() < 10) {
ConsumerRecords<Bar, Foo> records = consumer.poll(100);
if (!records.isEmpty()) {
records.forEach(record -> {
Bar key = record.key();
Assert.assertTrue(key.isField1());
Foo value = record.value();
Assert.assertEquals(value.getField1(), "field1");
Assert.assertEquals(value.getField2(), "field2");
Assert.assertEquals(value.getField3(), received.get());
received.incrementAndGet();
});
consumer.commitSync();
}
}
}
@Test
public void testProducerConsumerMixedSchemaWithPulsarKafkaClient() throws Exception {
String topic = "testProducerConsumerMixedSchemaWithPulsarKafkaClient";
Schema<String> keySchema = new PulsarKafkaSchema<>(new StringSerializer(), new StringDeserializer());
JSONSchema<Foo> valueSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
Properties props = new Properties();
props.put("bootstrap.servers", getPlainTextServiceUrl());
props.put("group.id", "my-subscription-name");
props.put("enable.auto.commit", "false");
props.put("key.serializer", IntegerSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
@Cleanup
Consumer<String, Foo> consumer = new KafkaConsumer<>(props, keySchema, valueSchema);
consumer.subscribe(Arrays.asList(topic));
Producer<String, Foo> producer = new KafkaProducer<>(props, keySchema, valueSchema);
for (int i = 0; i < 10; i++) {
Foo foo = new Foo();
foo.setField1("field1");
foo.setField2("field2");
foo.setField3(i);
producer.send(new ProducerRecord<>(topic, "hello" + i, foo));
}
producer.flush();
producer.close();
AtomicInteger received = new AtomicInteger();
while (received.get() < 10) {
ConsumerRecords<String, Foo> records = consumer.poll(100);
if (!records.isEmpty()) {
records.forEach(record -> {
String key = record.key();
Assert.assertEquals(key, "hello" + received.get());
Foo value = record.value();
Assert.assertEquals(value.getField1(), "field1");
Assert.assertEquals(value.getField2(), "field2");
Assert.assertEquals(value.getField3(), received.get());
received.incrementAndGet();
});
consumer.commitSync();
}
}
}
}