FunctionCacheEntry.java 4.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

20
package org.apache.pulsar.functions.utils.functioncache;
21

22
import java.io.File;
23 24 25 26 27
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.Collection;
28 29
import java.util.Collections;
import java.util.HashSet;
30 31
import java.util.Set;
import java.util.stream.Collectors;
32

33 34 35
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

36 37 38
import static org.apache.commons.lang3.StringUtils.isNoneBlank;
import static org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY;

39 40
import org.apache.pulsar.common.nar.NarClassLoader;

41 42 43 44 45 46
/**
 * A cache entry in the function cache. Tracks which workers still reference
 * the dependencies. Once none reference it any more, the class loaders will
 * be cleaned up.
 */
@Slf4j
47 48 49
public class FunctionCacheEntry implements AutoCloseable {

    public static final String JAVA_INSTANCE_JAR_PROPERTY = "pulsar.functions.java.instance.jar";
50 51 52 53

    @Getter
    private final URLClassLoader classLoader;

54
    private final Set<String> executionHolders;
55 56 57 58 59 60 61 62 63

    @Getter
    private final Set<String> jarFiles;

    private final Set<String> classpaths;

    FunctionCacheEntry(Collection<String> requiredJarFiles,
                       Collection<URL> requiredClasspaths,
                       URL[] libraryURLs,
64 65 66
                       String initialInstanceId, ClassLoader rootClassLoader) {
        this.classLoader = FunctionClassLoaders.create(libraryURLs, rootClassLoader);

67 68 69
        this.classpaths = requiredClasspaths.stream()
            .map(URL::toString)
            .collect(Collectors.toSet());
70 71
        this.jarFiles = new HashSet<>(requiredJarFiles);
        this.executionHolders = new HashSet<>(Collections.singleton(initialInstanceId));
72 73
    }

74 75
    FunctionCacheEntry(String narArchive, String initialInstanceId, ClassLoader rootClassLoader) throws IOException {
        this.classLoader = NarClassLoader.getFromArchive(new File(narArchive), Collections.emptySet(), rootClassLoader);
76 77 78 79 80
        this.classpaths = Collections.emptySet();
        this.jarFiles = Collections.singleton(narArchive);
        this.executionHolders = new HashSet<>(Collections.singleton(initialInstanceId));
    }

81
    boolean isInstanceRegistered(String iid) {
82 83 84
        return executionHolders.contains(iid);
    }

85
    public void register(String eid,
86 87 88
                         Collection<String> requiredJarFiles,
                         Collection<URL> requiredClassPaths) {
        if (jarFiles.size() != requiredJarFiles.size()
89
            || !new HashSet<>(requiredJarFiles).containsAll(jarFiles)) {
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
            throw new IllegalStateException(
                "The function registration references a different set of jar files than "
                + " previous registrations for this function : old = " + jarFiles
                + ", new = " + requiredJarFiles);
        }

        if (classpaths.size() != requiredClassPaths.size()
            || !requiredClassPaths.stream().map(URL::toString).collect(Collectors.toSet())
                .containsAll(classpaths)) {
            throw new IllegalStateException(
                "The function registration references a different set of classpaths than "
                + " previous registrations for this function : old = " + classpaths
                + ", new = " + requiredClassPaths);
        }

        this.executionHolders.add(eid);
    }

108
    public boolean unregister(String eid) {
109 110 111 112 113 114 115 116 117 118 119 120 121 122
        this.executionHolders.remove(eid);
        return this.executionHolders.isEmpty();
    }

    @Override
    public void close() {
        try {
            classLoader.close();
        } catch (IOException e) {
            log.warn("Failed to release function code class loader for "
                + Arrays.toString(jarFiles.toArray()));
        }
    }
}