PulsarSink.java 13.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.functions.sink;

21 22
import com.google.common.annotations.VisibleForTesting;
import lombok.extern.slf4j.Slf4j;
23
import org.apache.commons.lang3.StringUtils;
24 25
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.HashingScheme;
26
import org.apache.pulsar.client.api.MessageId;
27
import org.apache.pulsar.client.api.MessageRoutingMode;
28
import org.apache.pulsar.client.api.Producer;
29
import org.apache.pulsar.client.api.ProducerBuilder;
30 31
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
32 33
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
34
import org.apache.pulsar.common.functions.FunctionConfig;
35
import org.apache.pulsar.functions.api.Record;
36
import org.apache.pulsar.functions.instance.FunctionResultRouter;
37
import org.apache.pulsar.functions.instance.SinkRecord;
38
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
39
import org.apache.pulsar.functions.source.PulsarRecord;
40
import org.apache.pulsar.functions.source.TopicSchema;
41
import org.apache.pulsar.functions.utils.Reflections;
42
import org.apache.pulsar.io.core.Sink;
43
import org.apache.pulsar.io.core.SinkContext;
44

45 46 47 48 49 50 51 52
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
53
import java.util.function.Function;
54

55
@Slf4j
56
public class PulsarSink<T> implements Sink<T> {
57

58 59
    private final PulsarClient client;
    private final PulsarSinkConfig pulsarSinkConfig;
60
    private final Map<String, String> properties;
61
    private final ClassLoader functionClassLoader;
62
    private ComponentStatsManager stats;
63

64 65
    @VisibleForTesting
    PulsarSinkProcessor<T> pulsarSinkProcessor;
66

67
    private final TopicSchema topicSchema;
68

69
    private interface PulsarSinkProcessor<T> {
70

71
        TypedMessageBuilder<T> newMessage(Record<T> record);
72

73
        void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record);
74

75
        void close() throws Exception;
76 77
    }

78 79 80 81 82 83 84 85
    private abstract class PulsarSinkProcessorBase implements PulsarSinkProcessor<T> {
        protected Map<String, Producer<T>> publishProducers = new ConcurrentHashMap<>();
        protected Schema schema;

        protected PulsarSinkProcessorBase(Schema schema) {
            this.schema = schema;
        }

86
        public <T> Producer<T> createProducer(PulsarClient client, String topic, String producerName, Schema<T> schema)
87 88 89 90
                throws PulsarClientException {
            ProducerBuilder<T> builder = client.newProducer(schema)
                    .blockIfQueueFull(true)
                    .enableBatching(true)
91
                    .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
92 93 94 95
                    .compressionType(CompressionType.LZ4)
                    .hashingScheme(HashingScheme.Murmur3_32Hash) //
                    .messageRoutingMode(MessageRoutingMode.CustomPartition)
                    .messageRouter(FunctionResultRouter.of())
96 97 98
                    // set send timeout to be infinity to prevent potential deadlock with consumer
                    // that might happen when consumer is blocked due to unacked messages
                    .sendTimeout(0, TimeUnit.SECONDS)
99 100 101 102
                    .topic(topic);
            if (producerName != null) {
                builder.producerName(producerName);
            }
103

104
            return builder.properties(properties).create();
105 106
        }

107 108
        protected Producer<T> getProducer(String destinationTopic) {
            return getProducer(destinationTopic, null, destinationTopic);
109 110
        }

111 112 113 114 115 116 117
        protected Producer<T> getProducer(String producerId, String producerName, String topicName) {
            return publishProducers.computeIfAbsent(producerId, s -> {
                try {
                    return createProducer(
                            client,
                            topicName,
                            producerName,
118
                            schema);
119 120 121 122 123
                } catch (PulsarClientException e) {
                    log.error("Failed to create Producer while doing user publish", e);
                    throw new RuntimeException(e);
                }
            });
124 125 126 127
        }

        @Override
        public void close() throws Exception {
128 129 130 131 132 133 134 135 136
            List<CompletableFuture<Void>> closeFutures = new ArrayList<>(publishProducers.size());
            for (Map.Entry<String, Producer<T>> entry: publishProducers.entrySet()) {
                Producer<T> producer = entry.getValue();
                closeFutures.add(producer.closeAsync());
            }
            try {
                org.apache.pulsar.common.util.FutureUtil.waitForAll(closeFutures);
            } catch (Exception e) {
                log.warn("Failed to close all the producers", e);
137 138
            }
        }
139

140
        public Function<Throwable, Void> getPublishErrorHandler(Record<T> record, boolean failSource) {
141 142 143 144

            return throwable -> {
                SinkRecord<T> sinkRecord = (SinkRecord<T>) record;
                Record<T> srcRecord = sinkRecord.getSourceRecord();
145 146 147
                if (failSource) {
                    srcRecord.fail();
                }
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162

                String topic = record.getDestinationTopic().orElse(pulsarSinkConfig.getTopic());

                String errorMsg = null;
                if (srcRecord instanceof PulsarRecord) {
                    errorMsg = String.format("Failed to publish to topic [%s] with error [%s] with src message id [%s]", topic, throwable.getMessage(), ((PulsarRecord) srcRecord).getMessageId());
                    log.error(errorMsg);
                } else {
                    errorMsg = String.format("Failed to publish to topic [%s] with error [%s] with src sequence id [%s]", topic, throwable.getMessage(), record.getRecordSequence().get());
                    log.error(errorMsg);
                }
                stats.incrSinkExceptions(new Exception(errorMsg));
                return null;
            };
        }
163 164
    }

165 166 167 168 169 170 171
    @VisibleForTesting
    class PulsarSinkAtMostOnceProcessor extends PulsarSinkProcessorBase {
        public PulsarSinkAtMostOnceProcessor(Schema schema) {
            super(schema);
            // initialize default topic
            try {
                publishProducers.put(pulsarSinkConfig.getTopic(),
172
                        createProducer(client, pulsarSinkConfig.getTopic(), null, schema));
173 174 175
            } catch (PulsarClientException e) {
                log.error("Failed to create Producer while doing user publish", e);
                throw new RuntimeException(e);            }
176 177 178
        }

        @Override
179
        public TypedMessageBuilder<T> newMessage(Record<T> record) {
180
            return getProducer(record.getDestinationTopic().orElse(pulsarSinkConfig.getTopic())).newMessage();
181 182 183
        }

        @Override
184
        public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record) {
185 186
            msg.sendAsync().thenAccept(messageId -> {
                //no op
187
            }).exceptionally(getPublishErrorHandler(record, false));
188 189 190 191 192 193 194
        }
    }

    @VisibleForTesting
    class PulsarSinkAtLeastOnceProcessor extends PulsarSinkAtMostOnceProcessor {
        public PulsarSinkAtLeastOnceProcessor(Schema schema) {
            super(schema);
195 196 197
        }

        @Override
198
        public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record) {
199 200
            msg.sendAsync()
                    .thenAccept(messageId -> record.ack())
201
                    .exceptionally(getPublishErrorHandler(record, true));
202 203 204
        }
    }

205 206
    @VisibleForTesting
    class PulsarSinkEffectivelyOnceProcessor extends PulsarSinkProcessorBase {
207

208 209
        public PulsarSinkEffectivelyOnceProcessor(Schema schema) {
            super(schema);
210 211 212
        }

        @Override
213
        public TypedMessageBuilder<T> newMessage(Record<T> record) {
214 215 216 217 218 219 220 221 222
            if (!record.getPartitionId().isPresent()) {
                throw new RuntimeException("PartitionId needs to be specified for every record while in Effectively-once mode");
            }

            return getProducer(
                    String.format("%s-%s",record.getDestinationTopic().orElse(pulsarSinkConfig.getTopic()), record.getPartitionId().get()),
                    record.getPartitionId().get(),
                    record.getDestinationTopic().orElse(pulsarSinkConfig.getTopic())
            ).newMessage();
223 224 225
        }

        @Override
226
        public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record) {
227

228 229
            if (!record.getRecordSequence().isPresent()) {
                throw new RuntimeException("RecordSequence needs to be specified for every record while in Effectively-once mode");
230
            }
231

232 233
            // assign sequence id to output message for idempotent producing
            msg.sequenceId(record.getRecordSequence().get());
234 235
            CompletableFuture<MessageId> future = msg.sendAsync();

236
            future.thenAccept(messageId -> record.ack()).exceptionally(getPublishErrorHandler(record, true));
237
            future.join();
238 239 240
        }
    }

241 242
    /**
     * Creates a sink; no producer is created until {@code open} is called.
     *
     * @param client              client used for producers and schema lookup
     * @param pulsarSinkConfig    sink configuration
     * @param properties          properties attached to every created producer
     * @param stats               receives sink exceptions
     * @param functionClassLoader class loader used to load the output type class
     */
    public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig, Map<String, String> properties,
                      ComponentStatsManager stats, ClassLoader functionClassLoader) {
        // collaborators built around the client
        this.client = client;
        this.topicSchema = new TopicSchema(client);

        // configuration and runtime context, stored as given
        this.pulsarSinkConfig = pulsarSinkConfig;
        this.properties = properties;
        this.functionClassLoader = functionClassLoader;
        this.stats = stats;
    }

    @Override
252
    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
S
Sanjeev Kulkarni 已提交
253 254
        log.info("Opening pulsar sink with config: {}", pulsarSinkConfig);

255
        Schema<T> schema = initializeSchema();
256 257 258 259
        if (schema == null) {
            log.info("Since output type is null, not creating any real sink");
            return;
        }
260 261 262 263

        FunctionConfig.ProcessingGuarantees processingGuarantees = this.pulsarSinkConfig.getProcessingGuarantees();
        switch (processingGuarantees) {
            case ATMOST_ONCE:
264
                this.pulsarSinkProcessor = new PulsarSinkAtMostOnceProcessor(schema);
265 266
                break;
            case ATLEAST_ONCE:
267
                this.pulsarSinkProcessor = new PulsarSinkAtLeastOnceProcessor(schema);
268 269
                break;
            case EFFECTIVELY_ONCE:
270
                this.pulsarSinkProcessor = new PulsarSinkEffectivelyOnceProcessor(schema);
271 272 273 274 275
                break;
        }
    }

    @Override
276
    public void write(Record<T> record) {
277
        TypedMessageBuilder<T> msg = pulsarSinkProcessor.newMessage(record);
278
        if (record.getKey().isPresent()) {
279
            msg.key(record.getKey().get());
280 281
        }

282
        msg.value(record.getValue());
283 284

        if (!record.getProperties().isEmpty()) {
285
            msg.properties(record.getProperties());
286 287 288 289 290
        }

        SinkRecord<T> sinkRecord = (SinkRecord<T>) record;
        if (sinkRecord.getSourceRecord() instanceof PulsarRecord) {
            PulsarRecord<T> pulsarRecord = (PulsarRecord<T>) sinkRecord.getSourceRecord();
291
            // forward user properties to sink-topic
292 293 294
            msg.property("__pfn_input_topic__", pulsarRecord.getTopicName().get())
               .property("__pfn_input_msg_id__",
                         new String(Base64.getEncoder().encode(pulsarRecord.getMessageId().toByteArray())));
295 296 297 298 299 300
        } else {
            // It is coming from some source
            Optional<Long> eventTime = sinkRecord.getSourceRecord().getEventTime();
            if (eventTime.isPresent()) {
                msg.eventTime(eventTime.get());
            }
301 302
        }

303
        pulsarSinkProcessor.sendOutputMessage(msg, record);
304 305 306 307
    }

    @Override
    public void close() throws Exception {
308 309 310
        if (this.pulsarSinkProcessor != null) {
            this.pulsarSinkProcessor.close();
        }
311 312
    }

313
    @SuppressWarnings("unchecked")
314
    @VisibleForTesting
315
    Schema<T> initializeSchema() throws ClassNotFoundException {
316
        if (StringUtils.isEmpty(this.pulsarSinkConfig.getTypeClassName())) {
317
            return (Schema<T>) Schema.BYTES;
318
        }
319

320
        Class<?> typeArg = Reflections.loadClass(this.pulsarSinkConfig.getTypeClassName(), functionClassLoader);
321 322
        if (Void.class.equals(typeArg)) {
            // return type is 'void', so there's no schema to check
323
            return null;
324 325 326 327
        }

        if (!StringUtils.isEmpty(pulsarSinkConfig.getSchemaType())) {
            return (Schema<T>) topicSchema.getSchema(pulsarSinkConfig.getTopic(), typeArg,
328
                    pulsarSinkConfig.getSchemaType(), false);
329 330
        } else {
            return (Schema<T>) topicSchema.getSchema(pulsarSinkConfig.getTopic(), typeArg,
331
                    pulsarSinkConfig.getSerdeClassName(), false);
332 333
        }
    }
334
}