/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.apache.pulsar.functions.instance; import com.google.common.base.Stopwatch; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import io.netty.buffer.ByteBuf; import io.prometheus.client.CollectorRegistry; import lombok.AccessLevel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import net.jodah.typetools.TypeResolver; import org.apache.bookkeeper.api.StorageClient; import org.apache.bookkeeper.api.kv.Table; import org.apache.bookkeeper.clients.StorageClientBuilder; import org.apache.bookkeeper.clients.admin.StorageAdminClient; import org.apache.bookkeeper.clients.config.StorageClientSettings; import org.apache.bookkeeper.clients.exceptions.ClientException; import org.apache.bookkeeper.clients.exceptions.InternalServerException; import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException; import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException; import org.apache.bookkeeper.common.util.Backoff.Jitter; import org.apache.bookkeeper.common.util.Backoff.Jitter.Type; import org.apache.bookkeeper.stream.proto.NamespaceConfiguration; import org.apache.bookkeeper.stream.proto.StorageType; import org.apache.bookkeeper.stream.proto.StreamConfiguration; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.ThreadContext; import org.apache.logging.log4j.core.LoggerContext; import org.apache.logging.log4j.core.config.Configuration; import org.apache.logging.log4j.core.config.LoggerConfig; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.functions.ConsumerConfig; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.instance.state.StateContextImpl; import org.apache.pulsar.functions.instance.stats.ComponentStatsManager; import org.apache.pulsar.functions.proto.Function.SinkSpec; import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.Builder; import org.apache.pulsar.functions.secretsprovider.SecretsProvider; import org.apache.pulsar.functions.sink.PulsarSink; import org.apache.pulsar.functions.sink.PulsarSinkConfig; import org.apache.pulsar.functions.sink.PulsarSinkDisable; import org.apache.pulsar.functions.source.PulsarSource; import org.apache.pulsar.functions.source.PulsarSourceConfig; import org.apache.pulsar.functions.utils.Reflections; import org.apache.pulsar.functions.utils.FunctionCommon; import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager; import org.apache.pulsar.io.core.Sink; import org.apache.pulsar.io.core.Source; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.FileNotFoundException; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF; /** * A function container implemented using java thread. */ @Slf4j public class JavaInstanceRunnable implements AutoCloseable, Runnable { private final InstanceConfig instanceConfig; private final FunctionCacheManager fnCache; private final String jarFile; // input topic consumer & output topic producer private final PulsarClientImpl client; private LogAppender logAppender; // provide tables for storing states private final String stateStorageServiceUrl; @Getter(AccessLevel.PACKAGE) private StorageClient storageClient; @Getter(AccessLevel.PACKAGE) private Table stateTable; private JavaInstance javaInstance; @Getter private Throwable deathException; // function stats @Getter private ComponentStatsManager stats; private Record currentRecord; private Source source; private Sink sink; private final SecretsProvider secretsProvider; private CollectorRegistry collectorRegistry; private final String[] metricsLabels; private InstanceCache instanceCache; private final org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType componentType; private final Map properties; private final ClassLoader instanceClassLoader; private ClassLoader functionClassLoader; public JavaInstanceRunnable(InstanceConfig instanceConfig, FunctionCacheManager fnCache, String jarFile, PulsarClient pulsarClient, String stateStorageServiceUrl, SecretsProvider secretsProvider, CollectorRegistry collectorRegistry) { this.instanceConfig = instanceConfig; this.fnCache = fnCache; this.jarFile = jarFile; this.client = (PulsarClientImpl) pulsarClient; this.stateStorageServiceUrl = stateStorageServiceUrl; this.secretsProvider = secretsProvider; this.collectorRegistry = collectorRegistry; this.metricsLabels = new String[]{ instanceConfig.getFunctionDetails().getTenant(), String.format("%s/%s", instanceConfig.getFunctionDetails().getTenant(), instanceConfig.getFunctionDetails().getNamespace()), instanceConfig.getFunctionDetails().getName(), String.valueOf(instanceConfig.getInstanceId()), instanceConfig.getClusterName(), FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()) }; this.componentType = InstanceUtils.calculateSubjectType(instanceConfig.getFunctionDetails()); this.properties = InstanceUtils.getProperties(this.componentType, FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()), this.instanceConfig.getInstanceId()); // Declare function local collector registry so that it will not clash with other function instances' // metrics collection especially in threaded mode // In process mode the JavaInstanceMain will declare a CollectorRegistry and pass it down this.collectorRegistry = collectorRegistry; this.instanceClassLoader = Thread.currentThread().getContextClassLoader(); } /** * NOTE: this method should be called in the instance thread, in order to make class loading work. */ JavaInstance setupJavaInstance() throws Exception { // initialize the thread context ThreadContext.put("function", FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails())); ThreadContext.put("functionname", instanceConfig.getFunctionDetails().getName()); ThreadContext.put("instance", instanceConfig.getInstanceName()); log.info("Starting Java Instance {} : \n Details = {}", instanceConfig.getFunctionDetails().getName(), instanceConfig.getFunctionDetails()); // start the function thread functionClassLoader = loadJars(); Object object = Reflections.createInstance( instanceConfig.getFunctionDetails().getClassName(), functionClassLoader); if (!(object instanceof Function) && !(object instanceof java.util.function.Function)) { throw new RuntimeException("User class must either be Function or java.util.Function"); } // start the state table setupStateTable(); ContextImpl contextImpl = setupContext(); // start the output producer setupOutput(contextImpl); // start the input consumer setupInput(contextImpl); // start any log topic handler setupLogHandler(); return new JavaInstance(contextImpl, object); } ContextImpl setupContext() { Logger instanceLog = LoggerFactory.getLogger( "function-" + instanceConfig.getFunctionDetails().getName()); return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider, collectorRegistry, metricsLabels, this.componentType, this.stats, stateTable); } /** * The core logic that initialize the instance thread and executes the function. */ @Override public void run() { try { this.instanceCache = InstanceCache.getInstanceCache(); if (this.collectorRegistry == null) { this.collectorRegistry = new CollectorRegistry(); } this.stats = ComponentStatsManager.getStatsManager(this.collectorRegistry, this.metricsLabels, this.instanceCache.getScheduledExecutorService(), this.componentType); javaInstance = setupJavaInstance(); while (true) { currentRecord = readInput(); // increment number of records received from source stats.incrTotalReceived(); if (instanceConfig.getFunctionDetails().getProcessingGuarantees() == org.apache.pulsar.functions .proto.Function.ProcessingGuarantees.ATMOST_ONCE) { if (instanceConfig.getFunctionDetails().getAutoAck()) { currentRecord.ack(); } } addLogTopicHandler(); JavaExecutionResult result; // set last invocation time stats.setLastInvocation(System.currentTimeMillis()); // start time for process latency stat stats.processTimeStart(); // process the message Thread.currentThread().setContextClassLoader(functionClassLoader); result = javaInstance.handleMessage(currentRecord, currentRecord.getValue()); Thread.currentThread().setContextClassLoader(instanceClassLoader); // register end time stats.processTimeEnd(); removeLogTopicHandler(); if (log.isDebugEnabled()) { log.debug("Got result: {}", result.getResult()); } try { processResult(currentRecord, result); } catch (Exception e) { log.warn("Failed to process result of message {}", currentRecord, e); currentRecord.fail(); } } } catch (Throwable t) { log.error("[{}] Uncaught exception in Java Instance", FunctionCommon.getFullyQualifiedInstanceId( instanceConfig.getFunctionDetails().getTenant(), instanceConfig.getFunctionDetails().getNamespace(), instanceConfig.getFunctionDetails().getName(), instanceConfig.getInstanceId()), t); deathException = t; if (stats != null) { stats.incrSysExceptions(t); } return; } finally { log.info("Closing instance"); close(); } } private ClassLoader loadJars() throws Exception { ClassLoader fnClassLoader; try { log.info("Load JAR: {}", jarFile); // Let's first try to treat it as a nar archive fnCache.registerFunctionInstanceWithArchive( instanceConfig.getFunctionId(), instanceConfig.getInstanceName(), jarFile); } catch (FileNotFoundException e) { // create the function class loader fnCache.registerFunctionInstance( instanceConfig.getFunctionId(), instanceConfig.getInstanceName(), Arrays.asList(jarFile), Collections.emptyList()); } log.info("Initialize function class loader for function {} at function cache manager", instanceConfig.getFunctionDetails().getName()); fnClassLoader = fnCache.getClassLoader(instanceConfig.getFunctionId()); if (null == fnClassLoader) { throw new Exception("No function class loader available."); } return fnClassLoader; } private void createStateTable(String tableNs, String tableName, StorageClientSettings settings) throws Exception { try (StorageAdminClient storageAdminClient = StorageClientBuilder.newBuilder() .withSettings(settings) .buildAdmin()) { StreamConfiguration streamConf = StreamConfiguration.newBuilder(DEFAULT_STREAM_CONF) .setInitialNumRanges(4) .setMinNumRanges(4) .setStorageType(StorageType.TABLE) .build(); Stopwatch elapsedWatch = Stopwatch.createStarted(); while (elapsedWatch.elapsed(TimeUnit.MINUTES) < 1) { try { result(storageAdminClient.getStream(tableNs, tableName)); return; } catch (NamespaceNotFoundException nnfe) { try { result(storageAdminClient.createNamespace(tableNs, NamespaceConfiguration.newBuilder() .setDefaultStreamConf(streamConf) .build())); result(storageAdminClient.createStream(tableNs, tableName, streamConf)); } catch (Exception e) { // there might be two clients conflicting at creating table, so let's retrieve the table again // to make sure the table is created. } } catch (StreamNotFoundException snfe) { try { result(storageAdminClient.createStream(tableNs, tableName, streamConf)); } catch (Exception e) { // there might be two client conflicting at creating table, so let's retrieve it to make // sure the table is created. } } catch (ClientException ce) { log.warn("Encountered issue on fetching state stable metadata, re-attempting in 100 milliseconds", ce.getMessage()); TimeUnit.MILLISECONDS.sleep(100); } } } } private void setupStateTable() throws Exception { if (null == stateStorageServiceUrl) { return; } String tableNs = FunctionCommon.getStateNamespace( instanceConfig.getFunctionDetails().getTenant(), instanceConfig.getFunctionDetails().getNamespace() ); String tableName = instanceConfig.getFunctionDetails().getName(); StorageClientSettings settings = StorageClientSettings.newBuilder() .serviceUri(stateStorageServiceUrl) .clientName("function-" + tableNs + "/" + tableName) // configure a maximum 2 minutes jitter backoff for accessing table service .backoffPolicy(Jitter.of( Type.EXPONENTIAL, 100, 2000, 60 )) .build(); // we defer creation of the state table until a java instance is running here. createStateTable(tableNs, tableName, settings); log.info("Starting state table for function {}", instanceConfig.getFunctionDetails().getName()); this.storageClient = StorageClientBuilder.newBuilder() .withSettings(settings) .withNamespace(tableNs) .build(); // NOTE: this is a workaround until we bump bk version to 4.9.0 // table might just be created above, so it might not be ready for serving traffic Stopwatch openSw = Stopwatch.createStarted(); while (openSw.elapsed(TimeUnit.MINUTES) < 1) { try { this.stateTable = result(storageClient.openTable(tableName)); break; } catch (InternalServerException ise) { log.warn("Encountered internal server on opening table '{}', re-attempt in 100 milliseconds : {}", tableName, ise.getMessage()); TimeUnit.MILLISECONDS.sleep(100); } } } private void processResult(Record srcRecord, JavaExecutionResult result) throws Exception { if (result.getUserException() != null) { log.info("Encountered user exception when processing message {}", srcRecord, result.getUserException()); stats.incrUserExceptions(result.getUserException()); srcRecord.fail(); } else { if (result.getResult() != null) { sendOutputMessage(srcRecord, result.getResult()); } else { if (instanceConfig.getFunctionDetails().getAutoAck()) { // the function doesn't produce any result or the user doesn't want the result. srcRecord.ack(); } } // increment total successfully processed stats.incrTotalProcessedSuccessfully(); } } private void sendOutputMessage(Record srcRecord, Object output) { if (!(this.sink instanceof PulsarSink)) { Thread.currentThread().setContextClassLoader(functionClassLoader); } try { this.sink.write(new SinkRecord<>(srcRecord, output)); } catch (Exception e) { log.info("Encountered exception in sink write: ", e); stats.incrSinkExceptions(e); throw new RuntimeException(e); } finally { Thread.currentThread().setContextClassLoader(instanceClassLoader); } } private Record readInput() { Record record; if (!(this.source instanceof PulsarSource)) { Thread.currentThread().setContextClassLoader(functionClassLoader); } try { record = this.source.read(); } catch (Exception e) { stats.incrSourceExceptions(e); log.info("Encountered exception in source read: ", e); throw new RuntimeException(e); } finally { Thread.currentThread().setContextClassLoader(instanceClassLoader); } // check record is valid if (record == null) { throw new IllegalArgumentException("The record returned by the source cannot be null"); } else if (record.getValue() == null) { throw new IllegalArgumentException("The value in the record returned by the source cannot be null"); } return record; } /** * NOTE: this method is be syncrhonized because it is potentially called by two different places * one inside the run/finally clause and one inside the ThreadRuntime::stop */ @Override synchronized public void close() { if (stats != null) { stats.close(); stats = null; } if (source != null) { if (!(this.source instanceof PulsarSource)) { Thread.currentThread().setContextClassLoader(functionClassLoader); } try { source.close(); } catch (Throwable e) { log.error("Failed to close source {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e); } finally { Thread.currentThread().setContextClassLoader(instanceClassLoader); } source = null; } if (sink != null) { if (!(this.sink instanceof PulsarSink)) { Thread.currentThread().setContextClassLoader(functionClassLoader); } try { sink.close(); } catch (Throwable e) { log.error("Failed to close sink {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e); } finally { Thread.currentThread().setContextClassLoader(instanceClassLoader); } sink = null; } if (null != javaInstance) { javaInstance.close(); javaInstance = null; } // kill the state table if (null != stateTable) { stateTable.close(); stateTable = null; } if (null != storageClient) { storageClient.closeAsync() .exceptionally(cause -> { log.warn("Failed to close state storage client", cause); return null; }); storageClient = null; } if (instanceCache != null) { // once the thread quits, clean up the instance fnCache.unregisterFunctionInstance( instanceConfig.getFunctionId(), instanceConfig.getInstanceName()); log.info("Unloading JAR files for function {}", instanceConfig); instanceCache = null; } } public InstanceCommunication.MetricsData getAndResetMetrics() { InstanceCommunication.MetricsData metricsData = getMetrics(); stats.reset(); return metricsData; } public InstanceCommunication.MetricsData getMetrics() { InstanceCommunication.MetricsData.Builder bldr = createMetricsDataBuilder(); if (javaInstance != null) { Map userMetrics = javaInstance.getMetrics(); if (userMetrics != null) { bldr.putAllUserMetrics(userMetrics); } } return bldr.build(); } public void resetMetrics() { stats.reset(); javaInstance.resetMetrics(); } private Builder createMetricsDataBuilder() { InstanceCommunication.MetricsData.Builder bldr = InstanceCommunication.MetricsData.newBuilder(); if (stats != null) { bldr.setProcessedSuccessfullyTotal((long) stats.getTotalProcessedSuccessfully()); bldr.setSystemExceptionsTotal((long) stats.getTotalSysExceptions()); bldr.setUserExceptionsTotal((long) stats.getTotalUserExceptions()); bldr.setReceivedTotal((long) stats.getTotalRecordsReceived()); bldr.setAvgProcessLatency(stats.getAvgProcessLatency()); bldr.setLastInvocation((long) stats.getLastInvocation()); bldr.setProcessedSuccessfullyTotal1Min((long) stats.getTotalProcessedSuccessfully1min()); bldr.setSystemExceptionsTotal1Min((long) stats.getTotalSysExceptions1min()); bldr.setUserExceptionsTotal1Min((long) stats.getTotalUserExceptions1min()); bldr.setReceivedTotal1Min((long) stats.getTotalRecordsReceived1min()); bldr.setAvgProcessLatency1Min(stats.getAvgProcessLatency1min()); } return bldr; } public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() { InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder(); if (stats != null) { functionStatusBuilder.setNumReceived((long) stats.getTotalRecordsReceived()); functionStatusBuilder.setNumSuccessfullyProcessed((long) stats.getTotalProcessedSuccessfully()); functionStatusBuilder.setNumUserExceptions((long) stats.getTotalUserExceptions()); stats.getLatestUserExceptions().forEach(ex -> { functionStatusBuilder.addLatestUserExceptions(ex); }); functionStatusBuilder.setNumSystemExceptions((long) stats.getTotalSysExceptions()); stats.getLatestSystemExceptions().forEach(ex -> { functionStatusBuilder.addLatestSystemExceptions(ex); }); stats.getLatestSourceExceptions().forEach(ex -> { functionStatusBuilder.addLatestSourceExceptions(ex); }); stats.getLatestSinkExceptions().forEach(ex -> { functionStatusBuilder.addLatestSinkExceptions(ex); }); functionStatusBuilder.setAverageLatency(stats.getAvgProcessLatency()); functionStatusBuilder.setLastInvocationTime((long) stats.getLastInvocation()); } return functionStatusBuilder; } private void setupLogHandler() { if (instanceConfig.getFunctionDetails().getLogTopic() != null && !instanceConfig.getFunctionDetails().getLogTopic().isEmpty()) { logAppender = new LogAppender(client, instanceConfig.getFunctionDetails().getLogTopic(), FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails())); logAppender.start(); } } private void addLogTopicHandler() { if (logAppender == null) return; LoggerContext context = LoggerContext.getContext(false); Configuration config = context.getConfiguration(); config.addAppender(logAppender); for (final LoggerConfig loggerConfig : config.getLoggers().values()) { loggerConfig.addAppender(logAppender, null, null); } config.getRootLogger().addAppender(logAppender, null, null); } private void removeLogTopicHandler() { if (logAppender == null) return; LoggerContext context = LoggerContext.getContext(false); Configuration config = context.getConfiguration(); for (final LoggerConfig loggerConfig : config.getLoggers().values()) { loggerConfig.removeAppender(logAppender.getName()); } config.getRootLogger().removeAppender(logAppender.getName()); } public void setupInput(ContextImpl contextImpl) throws Exception { SourceSpec sourceSpec = this.instanceConfig.getFunctionDetails().getSource(); Object object; // If source classname is not set, we default pulsar source if (sourceSpec.getClassName().isEmpty()) { PulsarSourceConfig pulsarSourceConfig = new PulsarSourceConfig(); sourceSpec.getInputSpecsMap().forEach((topic, conf) -> { ConsumerConfig consumerConfig = ConsumerConfig.builder().isRegexPattern(conf.getIsRegexPattern()).build(); if (conf.getSchemaType() != null && !conf.getSchemaType().isEmpty()) { consumerConfig.setSchemaType(conf.getSchemaType()); } else if (conf.getSerdeClassName() != null && !conf.getSerdeClassName().isEmpty()) { consumerConfig.setSerdeClassName(conf.getSerdeClassName()); } if (conf.hasReceiverQueueSize()) { consumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize().getValue()); } pulsarSourceConfig.getTopicSchema().put(topic, consumerConfig); }); sourceSpec.getTopicsToSerDeClassNameMap().forEach((topic, serde) -> { pulsarSourceConfig.getTopicSchema().put(topic, ConsumerConfig.builder() .serdeClassName(serde) .isRegexPattern(false) .build()); }); if (!StringUtils.isEmpty(sourceSpec.getTopicsPattern())) { pulsarSourceConfig.getTopicSchema().get(sourceSpec.getTopicsPattern()).setRegexPattern(true); } pulsarSourceConfig.setSubscriptionName( StringUtils.isNotBlank(sourceSpec.getSubscriptionName()) ? sourceSpec.getSubscriptionName() : InstanceUtils.getDefaultSubscriptionName(instanceConfig.getFunctionDetails())); pulsarSourceConfig.setProcessingGuarantees( FunctionConfig.ProcessingGuarantees.valueOf( this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name())); switch (sourceSpec.getSubscriptionType()) { case FAILOVER: pulsarSourceConfig.setSubscriptionType(SubscriptionType.Failover); break; default: pulsarSourceConfig.setSubscriptionType(SubscriptionType.Shared); break; } pulsarSourceConfig.setTypeClassName(sourceSpec.getTypeClassName()); if (sourceSpec.getTimeoutMs() > 0 ) { pulsarSourceConfig.setTimeoutMs(sourceSpec.getTimeoutMs()); } if (this.instanceConfig.getFunctionDetails().hasRetryDetails()) { pulsarSourceConfig.setMaxMessageRetries(this.instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries()); pulsarSourceConfig.setDeadLetterTopic(this.instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic()); } object = new PulsarSource(this.client, pulsarSourceConfig, this.properties, this.functionClassLoader); } else { object = Reflections.createInstance( sourceSpec.getClassName(), this.functionClassLoader); } Class[] typeArgs; if (object instanceof Source) { typeArgs = TypeResolver.resolveRawArguments(Source.class, object.getClass()); assert typeArgs.length > 0; } else { throw new RuntimeException("Source does not implement correct interface"); } this.source = (Source) object; if (!(this.source instanceof PulsarSource)) { Thread.currentThread().setContextClassLoader(this.functionClassLoader); } try { if (sourceSpec.getConfigs().isEmpty()) { this.source.open(new HashMap<>(), contextImpl); } else { this.source.open(new Gson().fromJson(sourceSpec.getConfigs(), new TypeToken>() { }.getType()), contextImpl); } } catch (Exception e) { log.error("Source open produced uncaught exception: ", e); throw e; } finally { Thread.currentThread().setContextClassLoader(this.instanceClassLoader); } } public void setupOutput(ContextImpl contextImpl) throws Exception { SinkSpec sinkSpec = this.instanceConfig.getFunctionDetails().getSink(); Object object; // If sink classname is not set, we default pulsar sink if (sinkSpec.getClassName().isEmpty()) { if (StringUtils.isEmpty(sinkSpec.getTopic())) { object = PulsarSinkDisable.INSTANCE; } else { PulsarSinkConfig pulsarSinkConfig = new PulsarSinkConfig(); pulsarSinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.valueOf( this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name())); pulsarSinkConfig.setTopic(sinkSpec.getTopic()); if (!StringUtils.isEmpty(sinkSpec.getSchemaType())) { pulsarSinkConfig.setSchemaType(sinkSpec.getSchemaType()); } else if (!StringUtils.isEmpty(sinkSpec.getSerDeClassName())) { pulsarSinkConfig.setSerdeClassName(sinkSpec.getSerDeClassName()); } pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName()); object = new PulsarSink(this.client, pulsarSinkConfig, this.properties, this.stats, this.functionClassLoader); } } else { object = Reflections.createInstance( sinkSpec.getClassName(), this.functionClassLoader); } if (object instanceof Sink) { this.sink = (Sink) object; } else { throw new RuntimeException("Sink does not implement correct interface"); } if (!(this.sink instanceof PulsarSink)) { Thread.currentThread().setContextClassLoader(this.functionClassLoader); } try { if (sinkSpec.getConfigs().isEmpty()) { this.sink.open(new HashMap<>(), contextImpl); } else { this.sink.open(new Gson().fromJson(sinkSpec.getConfigs(), new TypeToken>() { }.getType()), contextImpl); } } catch (Exception e) { log.error("Sink open produced uncaught exception: ", e); throw e; } finally { Thread.currentThread().setContextClassLoader(this.instanceClassLoader); } } }