提交 5d52ccbb 编写于 作者: B Boyang Jerry Peng 提交者: Matteo Merli

fix function termination cleanup (#3751)

* fix function termination cleanup

* cleaning up
上级 0dba8035
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.utils;
import lombok.Data;
@Data
public class FunctionInstanceId {
private String tenant;
private String namespace;
private String name;
private int instanceId;
public FunctionInstanceId(String fullyQualifiedInstanceName) {
String[] t1 = fullyQualifiedInstanceName.split("/");
if (t1.length != 3) {
throw new IllegalArgumentException("Invalid format for fully qualified instance name: " + fullyQualifiedInstanceName);
}
this.tenant = t1[0];
this.namespace = t1[1];
String[] t2 = t1[2].split(":");
if (t2.length != 2) {
throw new IllegalArgumentException("Invalid format for fully qualified instance name: " + fullyQualifiedInstanceName);
}
this.name = t2[0];
this.instanceId = Integer.parseInt(t2[1]);
}
}
......@@ -32,7 +32,6 @@ import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.functions.api.Function;
......
......@@ -42,6 +42,7 @@ import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionInstanceId;
import org.apache.pulsar.functions.utils.Reflections;
import javax.ws.rs.WebApplicationException;
......@@ -291,7 +292,6 @@ public class FunctionRuntimeManager implements AutoCloseable{
return assignments;
}
/**
* Removes a collection of assignments
* @param assignments assignments to remove
......@@ -693,7 +693,22 @@ public class FunctionRuntimeManager implements AutoCloseable{
this.conditionallyStopFunction(functionRuntimeInfo);
} else {
// function doesn't exist anymore thus we should terminate
this.conditionallyTerminateFunction(functionRuntimeInfo);
FunctionInstanceId functionInstanceId
= new FunctionInstanceId(fullyQualifiedInstanceId);
String name = functionInstanceId.getName();
String namespace = functionInstanceId.getNamespace();
String tenant = functionInstanceId.getTenant();
// only run the termination logic if
// this is the last/only instance from a function left on the worker
Collection<Assignment> assignments = findFunctionAssignments(tenant, namespace, name, this
.workerIdToAssignments);
if (assignments.size() > 1) {
this.conditionallyStopFunction(functionRuntimeInfo);
} else {
this.conditionallyTerminateFunction(functionRuntimeInfo);
}
}
this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册