提交 0dba8035 编写于 作者: S Sanjeev Kulkarni 提交者: Matteo Merli

Simplified the workflow of functionruntime manager (#3551)

* Simplified the workflow of functionruntime manager

* Fix unittest

* Took feedback into account

* added missing imports
上级 8aa63aa7
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.worker;
import lombok.*;
import lombok.experimental.Accessors;
@Data
@Setter
@Getter
@EqualsAndHashCode
@ToString
@Accessors(chain = true)
public class FunctionAction {
public enum Action {
START,
STOP,
TERMINATE
}
private Action action;
private FunctionRuntimeInfo functionRuntimeInfo;
}
......@@ -18,9 +18,9 @@
*/
package org.apache.pulsar.functions.worker;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.MoreFiles;
import com.google.common.io.MoreFiles;
import com.google.common.io.RecursiveDeleteOption;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
......@@ -79,120 +79,74 @@ import static org.apache.pulsar.functions.utils.Utils.getSourceType;
@EqualsAndHashCode
@ToString
@Slf4j
public class FunctionActioner implements AutoCloseable {
public class FunctionActioner {
private final WorkerConfig workerConfig;
private final RuntimeFactory runtimeFactory;
private final Namespace dlogNamespace;
private LinkedBlockingQueue<FunctionAction> actionQueue;
private volatile boolean running;
private Thread actioner;
private final ConnectorsManager connectorsManager;
private final PulsarAdmin pulsarAdmin;
public FunctionActioner(WorkerConfig workerConfig,
RuntimeFactory runtimeFactory,
Namespace dlogNamespace,
LinkedBlockingQueue<FunctionAction> actionQueue,
ConnectorsManager connectorsManager, PulsarAdmin pulsarAdmin) {
this.workerConfig = workerConfig;
this.runtimeFactory = runtimeFactory;
this.dlogNamespace = dlogNamespace;
this.actionQueue = actionQueue;
this.connectorsManager = connectorsManager;
this.pulsarAdmin = pulsarAdmin;
actioner = new Thread(() -> {
log.info("Starting Actioner Thread...");
while(running) {
try {
FunctionAction action = actionQueue.poll(1, TimeUnit.SECONDS);
processAction(action);
} catch (InterruptedException ex) {
}
}
});
actioner.setName("FunctionActionerThread");
}
public void startFunction(FunctionRuntimeInfo functionRuntimeInfo) {
try {
FunctionMetaData functionMetaData = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData();
FunctionDetails functionDetails = functionMetaData.getFunctionDetails();
int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();
void processAction(FunctionAction action) {
if (action == null) return;
switch (action.getAction()) {
case START:
try {
startFunction(action.getFunctionRuntimeInfo());
} catch (Exception ex) {
FunctionDetails details = action.getFunctionRuntimeInfo().getFunctionInstance()
.getFunctionMetaData().getFunctionDetails();
log.info("{}/{}/{} Error starting function", details.getTenant(), details.getNamespace(),
details.getName(), ex);
action.getFunctionRuntimeInfo().setStartupException(ex);
}
break;
case STOP:
stopFunction(action.getFunctionRuntimeInfo());
break;
case TERMINATE:
terminateFunction(action.getFunctionRuntimeInfo());
break;
}
}
public void start() {
this.running = true;
actioner.start();
}
@Override
public void close() {
running = false;
}
public void join() throws InterruptedException {
actioner.join();
}
@VisibleForTesting
public void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Exception {
FunctionMetaData functionMetaData = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData();
FunctionDetails functionDetails = functionMetaData.getFunctionDetails();
int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();
log.info("{}/{}/{}-{} Starting function ...", functionDetails.getTenant(), functionDetails.getNamespace(),
functionDetails.getName(), instanceId);
log.info("{}/{}/{}-{} Starting function ...", functionDetails.getTenant(), functionDetails.getNamespace(),
functionDetails.getName(), instanceId);
String packageFile;
String packageFile;
String pkgLocation = functionMetaData.getPackageLocation().getPackagePath();
boolean isPkgUrlProvided = isFunctionPackageUrlSupported(pkgLocation);
String pkgLocation = functionMetaData.getPackageLocation().getPackagePath();
boolean isPkgUrlProvided = isFunctionPackageUrlSupported(pkgLocation);
if (runtimeFactory.externallyManaged()) {
packageFile = pkgLocation;
} else {
if (isPkgUrlProvided && pkgLocation.startsWith(FILE)) {
URL url = new URL(pkgLocation);
File pkgFile = new File(url.toURI());
packageFile = pkgFile.getAbsolutePath();
} else if (isFunctionCodeBuiltin(functionDetails)) {
File pkgFile = getBuiltinArchive(FunctionDetails.newBuilder(functionMetaData.getFunctionDetails()));
packageFile = pkgFile.getAbsolutePath();
if (runtimeFactory.externallyManaged()) {
packageFile = pkgLocation;
} else {
File pkgDir = new File(workerConfig.getDownloadDirectory(),
getDownloadPackagePath(functionMetaData, instanceId));
pkgDir.mkdirs();
File pkgFile = new File(
pkgDir,
new File(FunctionDetailsUtils.getDownloadFileName(functionMetaData.getFunctionDetails(), functionMetaData.getPackageLocation())).getName());
downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, instanceId);
packageFile = pkgFile.getAbsolutePath();
if (isPkgUrlProvided && pkgLocation.startsWith(FILE)) {
URL url = new URL(pkgLocation);
File pkgFile = new File(url.toURI());
packageFile = pkgFile.getAbsolutePath();
} else if (isFunctionCodeBuiltin(functionDetails)) {
File pkgFile = getBuiltinArchive(FunctionDetails.newBuilder(functionMetaData.getFunctionDetails()));
packageFile = pkgFile.getAbsolutePath();
} else {
File pkgDir = new File(workerConfig.getDownloadDirectory(),
getDownloadPackagePath(functionMetaData, instanceId));
pkgDir.mkdirs();
File pkgFile = new File(
pkgDir,
new File(FunctionDetailsUtils.getDownloadFileName(functionMetaData.getFunctionDetails(), functionMetaData.getPackageLocation())).getName());
downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, instanceId);
packageFile = pkgFile.getAbsolutePath();
}
}
}
RuntimeSpawner runtimeSpawner = getRuntimeSpawner(functionRuntimeInfo.getFunctionInstance(), packageFile);
functionRuntimeInfo.setRuntimeSpawner(runtimeSpawner);
runtimeSpawner.start();
RuntimeSpawner runtimeSpawner = getRuntimeSpawner(functionRuntimeInfo.getFunctionInstance(), packageFile);
functionRuntimeInfo.setRuntimeSpawner(runtimeSpawner);
runtimeSpawner.start();
return;
} catch (Exception ex) {
FunctionDetails details = functionRuntimeInfo.getFunctionInstance()
.getFunctionMetaData().getFunctionDetails();
log.info("{}/{}/{} Error starting function", details.getTenant(), details.getNamespace(),
details.getName(), ex);
functionRuntimeInfo.setStartupException(ex);
return;
}
}
RuntimeSpawner getRuntimeSpawner(Function.Instance instance, String packageFile) {
......@@ -306,12 +260,13 @@ public class FunctionActioner implements AutoCloseable {
}
}
private void terminateFunction(FunctionRuntimeInfo functionRuntimeInfo) {
FunctionDetails details = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails();
public void terminateFunction(FunctionRuntimeInfo functionRuntimeInfo) {
FunctionDetails details = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
.getFunctionDetails();
log.info("{}/{}/{}-{} Terminating function...", details.getTenant(), details.getNamespace(), details.getName(),
functionRuntimeInfo.getFunctionInstance().getInstanceId());
String fqfn = FunctionDetailsUtils.getFullyQualifiedName(details);
log.info("{}-{} Terminating function...", fqfn,functionRuntimeInfo.getFunctionInstance().getInstanceId());
stopFunction(functionRuntimeInfo);
//cleanup subscriptions
if (details.getSource().getCleanupSubscription()) {
......@@ -483,4 +438,4 @@ public class FunctionActioner implements AutoCloseable {
}
}
}
\ No newline at end of file
}
......@@ -57,7 +57,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
/**
......@@ -73,8 +72,6 @@ public class FunctionRuntimeManager implements AutoCloseable{
// All the runtime info related to functions executed by this worker
// Fully Qualified InstanceId - > FunctionRuntimeInfo
// NOTE: please use setFunctionRuntimeInfo and deleteFunctionRuntimeInfo methods to modify this data structure
// Since during initialization phase nothing should be modified
@VisibleForTesting
Map<String, FunctionRuntimeInfo> functionRuntimeInfoMap = new ConcurrentHashMap<>();
......@@ -82,12 +79,10 @@ public class FunctionRuntimeManager implements AutoCloseable{
@Getter
final WorkerConfig workerConfig;
@VisibleForTesting
LinkedBlockingQueue<FunctionAction> actionQueue;
private FunctionAssignmentTailer functionAssignmentTailer;
@Setter
@Getter
private FunctionActioner functionActioner;
@Getter
......@@ -174,10 +169,8 @@ public class FunctionRuntimeManager implements AutoCloseable{
throw new RuntimeException("Either Thread, Process or Kubernetes Container Factory need to be set");
}
this.actionQueue = new LinkedBlockingQueue<>();
this.functionActioner = new FunctionActioner(this.workerConfig, runtimeFactory,
dlogNamespace, actionQueue, connectorsManager, workerService.getBrokerAdmin());
dlogNamespace, connectorsManager, workerService.getBrokerAdmin());
this.membershipManager = membershipManager;
this.functionMetaDataManager = functionMetaDataManager;
......@@ -226,8 +219,6 @@ public class FunctionRuntimeManager implements AutoCloseable{
public void start() {
log.info("/** Starting Function Runtime Manager **/");
log.info("Initialize metrics sink...");
log.info("Starting function actioner...");
this.functionActioner.start();
log.info("Starting function assignment tailer...");
this.functionAssignmentTailer.start();
}
......@@ -447,10 +438,10 @@ public class FunctionRuntimeManager implements AutoCloseable{
log.info("[{}] {}..", restart ? "restarting" : "stopping", fullyQualifiedInstanceId);
FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo(fullyQualifiedInstanceId);
if (functionRuntimeInfo != null) {
this.functionActioner.stopFunction(functionRuntimeInfo);
this.conditionallyStopFunction(functionRuntimeInfo);
try {
if(restart) {
this.functionActioner.startFunction(functionRuntimeInfo);
this.conditionallyStartFunction(functionRuntimeInfo);
}
} catch (Exception ex) {
log.info("{} Error re-starting function", fullyQualifiedInstanceId, ex);
......@@ -630,7 +621,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
if (!assignment.getInstance().equals(existingAssignment.getInstance())) {
//stop function
if (functionRuntimeInfo != null) {
this.insertStopAction(functionRuntimeInfo);
this.conditionallyStopFunction(functionRuntimeInfo);
}
// still assigned to me, need to restart
if (assignment.getWorkerId().equals(this.workerConfig.getWorkerId())) {
......@@ -639,11 +630,11 @@ public class FunctionRuntimeManager implements AutoCloseable{
FunctionRuntimeInfo newFunctionRuntimeInfo = new FunctionRuntimeInfo();
newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance());
this.insertStartAction(newFunctionRuntimeInfo);
this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, newFunctionRuntimeInfo);
this.conditionallyStartFunction(newFunctionRuntimeInfo);
this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, newFunctionRuntimeInfo);
}
} else {
deleteFunctionRuntimeInfo(fullyQualifiedInstanceId);
this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
}
} else {
// if assignment got transferred to me just set function runtime
......@@ -655,15 +646,15 @@ public class FunctionRuntimeManager implements AutoCloseable{
assignment.getInstance().getFunctionMetaData().getPackageLocation().getPackagePath());
newFunctionRuntimeInfo.setRuntimeSpawner(runtimeSpawner);
this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, newFunctionRuntimeInfo);
this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, newFunctionRuntimeInfo);
} else {
deleteFunctionRuntimeInfo(fullyQualifiedInstanceId);
this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
}
}
} else {
//stop function
if (functionRuntimeInfo != null) {
this.insertStopAction(functionRuntimeInfo);
this.conditionallyStopFunction(functionRuntimeInfo);
}
// still assigned to me, need to restart
if (assignment.getWorkerId().equals(this.workerConfig.getWorkerId())) {
......@@ -671,11 +662,11 @@ public class FunctionRuntimeManager implements AutoCloseable{
//start again
FunctionRuntimeInfo newFunctionRuntimeInfo = new FunctionRuntimeInfo();
newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance());
this.insertStartAction(newFunctionRuntimeInfo);
this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, newFunctionRuntimeInfo);
this.conditionallyStartFunction(newFunctionRuntimeInfo);
this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, newFunctionRuntimeInfo);
}
} else {
deleteFunctionRuntimeInfo(fullyQualifiedInstanceId);
this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
}
}
......@@ -699,12 +690,12 @@ public class FunctionRuntimeManager implements AutoCloseable{
// TODO could be a race condition here if functionMetaDataTailer somehow does not receive the functionMeta prior to the functionAssignmentsTailer gets the assignment for the function.
if (this.functionMetaDataManager.containsFunction(functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName())) {
// function still exists thus probably an update or stop operation
this.insertStopAction(functionRuntimeInfo);
this.conditionallyStopFunction(functionRuntimeInfo);
} else {
// function doesn't exist anymore thus we should terminate
this.insertTerminateAction(functionRuntimeInfo);
this.conditionallyTerminateFunction(functionRuntimeInfo);
}
this.deleteFunctionRuntimeInfo(fullyQualifiedInstanceId);
this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
}
String workerId = null;
......@@ -751,16 +742,14 @@ public class FunctionRuntimeManager implements AutoCloseable{
if (functionRuntimeInfo == null) {
functionRuntimeInfo = new FunctionRuntimeInfo()
.setFunctionInstance(assignment.getInstance());
this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, functionRuntimeInfo);
this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, functionRuntimeInfo);
} else {
//Somehow this function is already started
log.warn("Function {} already running. Going to restart function.",
functionRuntimeInfo);
this.insertStopAction(functionRuntimeInfo);
this.conditionallyStopFunction(functionRuntimeInfo);
}
this.insertStartAction(functionRuntimeInfo);
this.conditionallyStartFunction(functionRuntimeInfo);
}
public Map<String, FunctionRuntimeInfo> getFunctionRuntimeInfos() {
......@@ -770,48 +759,6 @@ public class FunctionRuntimeManager implements AutoCloseable{
/**
* Private methods for internal use. Should not be used outside of this class
*/
@VisibleForTesting
void insertStopAction(FunctionRuntimeInfo functionRuntimeInfo) {
if (!this.isInitializePhase) {
FunctionAction functionAction = new FunctionAction();
functionAction.setAction(FunctionAction.Action.STOP);
functionAction.setFunctionRuntimeInfo(functionRuntimeInfo);
try {
actionQueue.put(functionAction);
} catch (InterruptedException ex) {
throw new RuntimeException("Interrupted while putting action");
}
}
}
@VisibleForTesting
void insertStartAction(FunctionRuntimeInfo functionRuntimeInfo) {
if (!this.isInitializePhase) {
FunctionAction functionAction = new FunctionAction();
functionAction.setAction(FunctionAction.Action.START);
functionAction.setFunctionRuntimeInfo(functionRuntimeInfo);
try {
actionQueue.put(functionAction);
} catch (InterruptedException ex) {
throw new RuntimeException("Interrupted while putting action");
}
}
}
void insertTerminateAction(FunctionRuntimeInfo functionRuntimeInfo) {
if (!this.isInitializePhase) {
FunctionAction functionAction = new FunctionAction();
functionAction.setAction(FunctionAction.Action.TERMINATE);
functionAction.setFunctionRuntimeInfo(functionRuntimeInfo);
try {
actionQueue.put(functionAction);
} catch (InterruptedException ex) {
throw new RuntimeException("Interrupted while putting action");
}
}
}
private Assignment findAssignment(String tenant, String namespace, String functionName, int instanceId) {
String fullyQualifiedInstanceId
= org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(tenant, namespace, functionName, instanceId);
......@@ -844,22 +791,8 @@ public class FunctionRuntimeManager implements AutoCloseable{
assignment);
}
private void deleteFunctionRuntimeInfo(String fullyQualifiedInstanceId) {
if (!this.isInitializePhase) {
this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
}
}
private void setFunctionRuntimeInfo(String fullyQualifiedInstanceId, FunctionRuntimeInfo functionRuntimeInfo) {
// Don't modify Function Runtime Infos when initializing
if (!this.isInitializePhase) {
this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, functionRuntimeInfo);
}
}
@Override
public void close() throws Exception {
this.functionActioner.close();
this.functionAssignmentTailer.close();
stopAllOwnedFunctions();
......@@ -908,4 +841,22 @@ public class FunctionRuntimeManager implements AutoCloseable{
}
return toStart;
}
private void conditionallyStartFunction(FunctionRuntimeInfo functionRuntimeInfo) {
if (!this.isInitializePhase) {
this.functionActioner.startFunction(functionRuntimeInfo);
}
}
private void conditionallyStopFunction(FunctionRuntimeInfo functionRuntimeInfo) {
if (!this.isInitializePhase) {
this.functionActioner.stopFunction(functionRuntimeInfo);
}
}
private void conditionallyTerminateFunction(FunctionRuntimeInfo functionRuntimeInfo) {
if (!this.isInitializePhase) {
this.functionActioner.terminateFunction(functionRuntimeInfo);
}
}
}
......@@ -25,11 +25,6 @@ import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
import java.net.UnknownHostException;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.admin.PulsarAdmin;
......@@ -40,6 +35,8 @@ import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
import org.testng.annotations.Test;
import static org.apache.pulsar.common.functions.Utils.FILE;
import static org.testng.Assert.*;
import static org.testng.AssertJUnit.assertFalse;
/**
* Unit test of {@link FunctionActioner}.
......@@ -66,10 +63,9 @@ public class FunctionActionerTest {
// throw exception when dlogNamespace is accessed by actioner and verify it
final String exceptionMsg = "dl namespace not-found";
doThrow(new IllegalArgumentException(exceptionMsg)).when(dlogNamespace).openLog(any());
LinkedBlockingQueue<FunctionAction> queue = new LinkedBlockingQueue<>();
@SuppressWarnings("resource")
FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace, queue,
FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace,
new ConnectorsManager(workerConfig), mock(PulsarAdmin.class));
Runtime runtime = mock(Runtime.class);
Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder()
......@@ -80,13 +76,14 @@ public class FunctionActionerTest {
.build();
FunctionRuntimeInfo functionRuntimeInfo = mock(FunctionRuntimeInfo.class);
doReturn(instance).when(functionRuntimeInfo).getFunctionInstance();
doThrow(new IllegalStateException("StartupException")).when(functionRuntimeInfo).setStartupException(any());
// actioner should try to download file from bk-dlogNamespace and fails with exception
try {
actioner.startFunction(functionRuntimeInfo);
fail("should have failed with dlogNamespace open");
} catch (IllegalArgumentException ie) {
assertEquals(ie.getMessage(), exceptionMsg);
assertFalse(true);
} catch (IllegalStateException ex) {
assertEquals(ex.getMessage(), "StartupException");
}
}
......@@ -109,10 +106,9 @@ public class FunctionActionerTest {
Namespace dlogNamespace = mock(Namespace.class);
final String exceptionMsg = "dl namespace not-found";
doThrow(new IllegalArgumentException(exceptionMsg)).when(dlogNamespace).openLog(any());
LinkedBlockingQueue<FunctionAction> queue = new LinkedBlockingQueue<>();
@SuppressWarnings("resource")
FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace, queue,
FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace,
new ConnectorsManager(workerConfig), mock(PulsarAdmin.class));
// (1) test with file url. functionActioner should be able to consider file-url and it should be able to call
......@@ -141,12 +137,13 @@ public class FunctionActionerTest {
instance = Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0).build();
functionRuntimeInfo = mock(FunctionRuntimeInfo.class);
doReturn(instance).when(functionRuntimeInfo).getFunctionInstance();
doThrow(new IllegalStateException("StartupException")).when(functionRuntimeInfo).setStartupException(any());
try {
actioner.startFunction(functionRuntimeInfo);
fail("Function-Actioner should have tried to donwload file from http-location");
} catch (UnknownHostException ue) {
// ok
assertFalse(true);
} catch (IllegalStateException ex) {
assertEquals(ex.getMessage(), "StartupException");
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册