/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.functions.instance;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.prometheus.client.CollectorRegistry;

import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.apache.bookkeeper.api.kv.Table;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.state.StateContextImpl;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
import org.slf4j.Logger;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Unit test {@link ContextImpl}.
 */
public class ContextImplTest {

    private InstanceConfig config;
    private Logger logger;
63
    private PulsarClientImpl client;
64
    private ContextImpl context;
65
    private Producer producer = mock(Producer.class);
66

L
Like 已提交
67
    @BeforeMethod
68 69 70 71 72 73 74
    public void setup() {
        config = new InstanceConfig();
        FunctionDetails functionDetails = FunctionDetails.newBuilder()
            .setUserConfig("")
            .build();
        config.setFunctionDetails(functionDetails);
        logger = mock(Logger.class);
75 76
        client = mock(PulsarClientImpl.class);
        when(client.newProducer()).thenReturn(new ProducerBuilderImpl(client, Schema.BYTES));
M
Matteo Merli 已提交
77
        when(client.createProducerAsync(any(ProducerConfigurationData.class), any(), any()))
78 79 80
                .thenReturn(CompletableFuture.completedFuture(producer));
        when(client.getSchema(anyString())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
        when(producer.sendAsync(anyString())).thenReturn(CompletableFuture.completedFuture(null));
81

82 83 84
        TypedMessageBuilder messageBuilder = spy(new TypedMessageBuilderImpl(mock(ProducerBase.class), Schema.STRING));
        doReturn(new CompletableFuture<>()).when(messageBuilder).sendAsync();
        when(producer.newMessage()).thenReturn(messageBuilder);
85 86 87 88
        context = new ContextImpl(
            config,
            logger,
            client,
89
            new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), new String[0],
90
                FunctionDetails.ComponentType.FUNCTION, null, null);
91
        context.setCurrentMessageContext((Record<String>) () -> null);
92 93
    }

L
Like 已提交
94
    @Test(expectedExceptions = IllegalStateException.class)
95 96 97 98
    public void testIncrCounterStateDisabled() {
        context.incrCounter("test-key", 10);
    }

L
Like 已提交
99
    @Test(expectedExceptions = IllegalStateException.class)
100
    public void testGetCounterStateDisabled() {
101

102 103 104
        context.getCounter("test-key");
    }

L
Like 已提交
105
    @Test(expectedExceptions = IllegalStateException.class)
106 107 108 109
    public void testPutStateStateDisabled() {
        context.putState("test-key", ByteBuffer.wrap("test-value".getBytes(UTF_8)));
    }

L
Like 已提交
110
    @Test(expectedExceptions = IllegalStateException.class)
111 112 113 114 115 116
    public void testGetStateStateDisabled() {
        context.getState("test-key");
    }

    @Test
    public void testIncrCounterStateEnabled() throws Exception {
117
        context.stateContext = mock(StateContextImpl.class);
118
        context.incrCounterAsync("test-key", 10L);
119
        verify(context.stateContext, times(1)).incrCounter(eq("test-key"), eq(10L));
120 121 122 123
    }

    @Test
    public void testGetCounterStateEnabled() throws Exception {
124
        context.stateContext = mock(StateContextImpl.class);
125
        context.getCounterAsync("test-key");
126
        verify(context.stateContext, times(1)).getCounter(eq("test-key"));
127 128 129 130
    }

    @Test
    public void testPutStateStateEnabled() throws Exception {
131
        context.stateContext = mock(StateContextImpl.class);
132
        ByteBuffer buffer = ByteBuffer.wrap("test-value".getBytes(UTF_8));
133
        context.putStateAsync("test-key", buffer);
134
        verify(context.stateContext, times(1)).put(eq("test-key"), same(buffer));
135 136 137 138
    }

    @Test
    public void testGetStateStateEnabled() throws Exception {
139
        context.stateContext = mock(StateContextImpl.class);
140
        context.getStateAsync("test-key");
141
        verify(context.stateContext, times(1)).get(eq("test-key"));
142 143
    }

144 145
    @Test
    public void testPublishUsingDefaultSchema() throws Exception {
146
        context.newOutputMessage("sometopic", null).value("Somevalue").sendAsync();
147 148
    }
 }