/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.apache.pulsar.tests.integration.functions; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.functions.FunctionState; import org.apache.pulsar.common.policies.data.SinkStatus; import org.apache.pulsar.common.policies.data.SourceStatus; import org.apache.pulsar.tests.integration.docker.ContainerExecException; import org.apache.pulsar.tests.integration.docker.ContainerExecResult; import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator; import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime; import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite; import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import org.testng.annotations.Test; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR; import static org.apache.pulsar.tests.integration.suites.PulsarTestSuite.retryStrategically; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; /** * State related test cases. */ @Slf4j public class PulsarStateTest extends PulsarStandaloneTestSuite { public static final String WORDCOUNT_PYTHON_CLASS = "wordcount_function.WordCountFunction"; public static final String WORDCOUNT_PYTHON_FILE = "wordcount_function.py"; @Test public void testPythonWordCountFunction() throws Exception { String inputTopicName = "test-wordcount-py-input-" + randomName(8); String outputTopicName = "test-wordcount-py-output-" + randomName(8); String functionName = "test-wordcount-py-fn-" + randomName(8); final int numMessages = 10; // submit the exclamation function submitExclamationFunction( Runtime.PYTHON, inputTopicName, outputTopicName, functionName); // get function info getFunctionInfoSuccess(functionName); // publish and consume result publishAndConsumeMessages(inputTopicName, outputTopicName, numMessages); // get function status getFunctionStatus(functionName, numMessages); // get state queryState(functionName, "hello", numMessages); queryState(functionName, "test", numMessages); for (int i = 0; i < numMessages; i++) { queryState(functionName, "message-" + i, 1); } // delete function deleteFunction(functionName); // get function info getFunctionInfoNotFound(functionName); } @Test public void testSourceState() throws Exception { String outputTopicName = "test-state-source-output-" + randomName(8); String sourceName = "test-state-source-" + randomName(8); submitSourceConnector(sourceName, outputTopicName, "org.apache.pulsar.tests.integration.io.TestStateSource", JAVAJAR); // get source info getSourceInfoSuccess(sourceName); // get source status getSourceStatus(sourceName); try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) { retryStrategically((test) -> { try { SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName); return status.getInstances().size() > 0 && status.getInstances().get(0).getStatus().numWritten > 0; } catch (PulsarAdminException e) { return false; } }, 10, 200); SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName); assertEquals(status.getInstances().size(), 1); assertTrue(status.getInstances().get(0).getStatus().numWritten > 0); FunctionState functionState = admin.functions().getFunctionState("public", "default", sourceName, "initial"); assertEquals(functionState.getStringValue(), "val1"); functionState = admin.functions().getFunctionState("public", "default", sourceName, "now"); assertTrue(functionState.getStringValue().matches("val1-.*")); } // delete source deleteSource(sourceName); getSourceInfoNotFound(sourceName); } @Test public void testSinkState() throws Exception { String inputTopicName = "test-state-sink-input-" + randomName(8); String sinkName = "test-state-sink-" + randomName(8); int numMessages = 10; submitSinkConnector(sinkName, inputTopicName, "org.apache.pulsar.tests.integration.io.TestStateSink", JAVAJAR); // get sink info getSinkInfoSuccess(sinkName); // get sink status getSinkStatus(sinkName); try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) { // java supports schema @Cleanup PulsarClient client = PulsarClient.builder() .serviceUrl(container.getPlainTextServiceUrl()) .build(); @Cleanup Producer producer = client.newProducer(Schema.STRING) .topic(inputTopicName) .create(); FunctionState functionState = admin.functions().getFunctionState("public", "default", sinkName, "initial"); assertEquals(functionState.getStringValue(), "val1"); for (int i = 0; i < numMessages; i++) { producer.send("foo"); } retryStrategically((test) -> { try { SinkStatus status = admin.sinks().getSinkStatus("public", "default", sinkName); return status.getInstances().size() > 0 && status.getInstances().get(0).getStatus().numWrittenToSink > 0; } catch (PulsarAdminException e) { return false; } }, 10, 200); SinkStatus status = admin.sinks().getSinkStatus("public", "default", sinkName); assertEquals(status.getInstances().size(), 1); assertTrue(status.getInstances().get(0).getStatus().numWrittenToSink > 0); functionState = admin.functions().getFunctionState("public", "default", sinkName, "now"); assertEquals(functionState.getStringValue(), String.format("val1-%d", numMessages - 1)); } // delete source deleteSink(sinkName); getSinkInfoNotFound(sinkName); } private void submitSourceConnector(String sourceName, String outputTopicName, String className, String archive) throws Exception { String[] commands = { PulsarCluster.ADMIN_SCRIPT, "sources", "create", "--name", sourceName, "--destinationTopicName", outputTopicName, "--archive", archive, "--classname", className }; log.info("Run command : {}", StringUtils.join(commands, ' ')); ContainerExecResult result = container.execCmd(commands); assertTrue( result.getStdout().contains("\"Created successfully\""), result.getStdout()); } private void submitSinkConnector(String sinkName, String inputTopicName, String className, String archive) throws Exception { String[] commands = { PulsarCluster.ADMIN_SCRIPT, "sinks", "create", "--name", sinkName, "--inputs", inputTopicName, "--archive", archive, "--classname", className }; log.info("Run command : {}", StringUtils.join(commands, ' ')); ContainerExecResult result = container.execCmd(commands); assertTrue( result.getStdout().contains("\"Created successfully\""), result.getStdout()); } private static void submitExclamationFunction(Runtime runtime, String inputTopicName, String outputTopicName, String functionName) throws Exception { submitFunction( runtime, inputTopicName, outputTopicName, functionName, getExclamationClass(runtime), Schema.BYTES); } protected static String getExclamationClass(Runtime runtime) { if (Runtime.PYTHON == runtime) { return WORDCOUNT_PYTHON_CLASS; } else { throw new IllegalArgumentException("Unsupported runtime : " + runtime); } } private static void submitFunction(Runtime runtime, String inputTopicName, String outputTopicName, String functionName, String functionClass, Schema inputTopicSchema) throws Exception { CommandGenerator generator; generator = CommandGenerator.createDefaultGenerator(inputTopicName, functionClass); generator.setSinkTopic(outputTopicName); generator.setFunctionName(functionName); String command; if (Runtime.JAVA == runtime) { command = generator.generateCreateFunctionCommand(); } else if (Runtime.PYTHON == runtime) { generator.setRuntime(runtime); command = generator.generateCreateFunctionCommand(WORDCOUNT_PYTHON_FILE); } else { throw new IllegalArgumentException("Unsupported runtime : " + runtime); } String[] commands = { "sh", "-c", command }; ContainerExecResult result = container.execCmd( commands); assertTrue(result.getStdout().contains("\"Created successfully\"")); ensureSubscriptionCreated(inputTopicName, String.format("public/default/%s", functionName), inputTopicSchema); } private static void ensureSubscriptionCreated(String inputTopicName, String subscriptionName, Schema inputTopicSchema) throws Exception { // ensure the function subscription exists before we start producing messages try (PulsarClient client = PulsarClient.builder() .serviceUrl(container.getPlainTextServiceUrl()) .build()) { try (Consumer ignored = client.newConsumer(inputTopicSchema) .topic(inputTopicName) .subscriptionType(SubscriptionType.Shared) .subscriptionName(subscriptionName) .subscribe()) { } } } private static void getSinkInfoSuccess(String sinkName) throws Exception { ContainerExecResult result = container.execCmd( PulsarCluster.ADMIN_SCRIPT, "sinks", "get", "--tenant", "public", "--namespace", "default", "--name", sinkName ); assertTrue(result.getStdout().contains("\"name\": \"" + sinkName + "\"")); } private static void getSourceInfoSuccess(String sourceName) throws Exception { ContainerExecResult result = container.execCmd( PulsarCluster.ADMIN_SCRIPT, "sources", "get", "--tenant", "public", "--namespace", "default", "--name", sourceName ); assertTrue(result.getStdout().contains("\"name\": \"" + sourceName + "\"")); } private static void getFunctionInfoSuccess(String functionName) throws Exception { ContainerExecResult result = container.execCmd( PulsarCluster.ADMIN_SCRIPT, "functions", "get", "--tenant", "public", "--namespace", "default", "--name", functionName ); assertTrue(result.getStdout().contains("\"name\": \"" + functionName + "\"")); } private static void getFunctionInfoNotFound(String functionName) throws Exception { try { container.execCmd( PulsarCluster.ADMIN_SCRIPT, "functions", "get", "--tenant", "public", "--namespace", "default", "--name", functionName); fail("Command should have exited with non-zero"); } catch (ContainerExecException e) { assertTrue(e.getResult().getStderr().contains("Reason: Function " + functionName + " doesn't exist")); } } private static void getSinkStatus(String sinkName) throws Exception { ContainerExecResult result = container.execCmd( PulsarCluster.ADMIN_SCRIPT, "sinks", "status", "--tenant", "public", "--namespace", "default", "--name", sinkName ); assertTrue(result.getStdout().contains("\"running\" : true")); } private static void getSourceStatus(String sourceName) throws Exception { ContainerExecResult result = container.execCmd( PulsarCluster.ADMIN_SCRIPT, "sources", "status", "--tenant", "public", "--namespace", "default", "--name", sourceName ); assertTrue(result.getStdout().contains("\"running\" : true")); } private static void getFunctionStatus(String functionName, int numMessages) throws Exception { ContainerExecResult result = container.execCmd( PulsarCluster.ADMIN_SCRIPT, "functions", "getstatus", "--tenant", "public", "--namespace", "default", "--name", functionName ); assertTrue(result.getStdout().contains("\"running\" : true")); assertTrue(result.getStdout().contains("\"numSuccessfullyProcessed\" : " + numMessages)); } private static void queryState(String functionName, String key, int amount) throws Exception { ContainerExecResult result = container.execCmd( PulsarCluster.ADMIN_SCRIPT, "functions", "querystate", "--tenant", "public", "--namespace", "default", "--name", functionName, "--key", key ); assertTrue(result.getStdout().contains("\"numberValue\": " + amount)); } private static void publishAndConsumeMessages(String inputTopic, String outputTopic, int numMessages) throws Exception { @Cleanup PulsarClient client = PulsarClient.builder() .serviceUrl(container.getPlainTextServiceUrl()) .build(); @Cleanup Consumer consumer = client.newConsumer(Schema.BYTES) .topic(outputTopic) .subscriptionType(SubscriptionType.Exclusive) .subscriptionName("test-sub") .subscribe(); @Cleanup Producer producer = client.newProducer(Schema.BYTES) .topic(inputTopic) .create(); for (int i = 0; i < numMessages; i++) { producer.send(("hello test message-" + i).getBytes(UTF_8)); } for (int i = 0; i < numMessages; i++) { Message msg = consumer.receive(); assertEquals("hello test message-" + i + "!", new String(msg.getValue(), UTF_8)); } } private static void deleteFunction(String functionName) throws Exception { ContainerExecResult result = container.execCmd( PulsarCluster.ADMIN_SCRIPT, "functions", "delete", "--tenant", "public", "--namespace", "default", "--name", functionName ); assertTrue(result.getStdout().contains("Deleted successfully")); assertTrue(result.getStderr().isEmpty()); } private static void deleteSource(String sourceName) throws Exception { ContainerExecResult result = container.execCmd( PulsarCluster.ADMIN_SCRIPT, "sources", "delete", "--tenant", "public", "--namespace", "default", "--name", sourceName ); assertTrue(result.getStdout().contains("Delete source successfully")); assertTrue(result.getStderr().isEmpty()); } private static void deleteSink(String sinkName) throws Exception { ContainerExecResult result = container.execCmd( PulsarCluster.ADMIN_SCRIPT, "sinks", "delete", "--tenant", "public", "--namespace", "default", "--name", sinkName ); assertTrue(result.getStdout().contains("Deleted successfully")); assertTrue(result.getStderr().isEmpty()); } private static void getSourceInfoNotFound(String sourceName) throws Exception { try { container.execCmd( PulsarCluster.ADMIN_SCRIPT, "sources", "get", "--tenant", "public", "--namespace", "default", "--name", sourceName); fail("Command should have exited with non-zero"); } catch (ContainerExecException e) { assertTrue(e.getResult().getStderr().contains("Reason: Source " + sourceName + " doesn't exist")); } } private static void getSinkInfoNotFound(String sinkName) throws Exception { try { container.execCmd( PulsarCluster.ADMIN_SCRIPT, "sinks", "get", "--tenant", "public", "--namespace", "default", "--name", sinkName); fail("Command should have exited with non-zero"); } catch (ContainerExecException e) { assertTrue(e.getResult().getStderr().contains("Reason: Sink " + sinkName + " doesn't exist")); } } }