提交 166cf444 编写于 作者: B Boyang Jerry Peng 提交者: Jia Zhai

Need to reinitialize certain components for externally managed runtimes when...

Need to reinitialize certain components for externally managed runtimes when moving functions (#5007)

(cherry picked from commit 27a9f628)
上级 057e7d28
...@@ -247,9 +247,19 @@ public class KubernetesRuntime implements Runtime { ...@@ -247,9 +247,19 @@ public class KubernetesRuntime implements Runtime {
throw e; throw e;
} }
if (channel == null && stub == null) { setupGrpcChannelIfNeeded();
}
@Override
public void reinitialize() {
setupGrpcChannelIfNeeded();
}
private synchronized void setupGrpcChannelIfNeeded() {
if (channel == null || stub == null) {
channel = new ManagedChannel[instanceConfig.getFunctionDetails().getParallelism()]; channel = new ManagedChannel[instanceConfig.getFunctionDetails().getParallelism()];
stub = new InstanceControlGrpc.InstanceControlFutureStub[instanceConfig.getFunctionDetails().getParallelism()]; stub = new InstanceControlGrpc.InstanceControlFutureStub[instanceConfig.getFunctionDetails().getParallelism()];
String jobName = createJobName(instanceConfig.getFunctionDetails()); String jobName = createJobName(instanceConfig.getFunctionDetails());
for (int i = 0; i < instanceConfig.getFunctionDetails().getParallelism(); ++i) { for (int i = 0; i < instanceConfig.getFunctionDetails().getParallelism(); ++i) {
String address = getServiceUrl(jobName, jobNamespace, i); String address = getServiceUrl(jobName, jobNamespace, i);
...@@ -332,16 +342,16 @@ public class KubernetesRuntime implements Runtime { ...@@ -332,16 +342,16 @@ public class KubernetesRuntime implements Runtime {
@Override @Override
public CompletableFuture<InstanceCommunication.MetricsData> getMetrics(int instanceId) { public CompletableFuture<InstanceCommunication.MetricsData> getMetrics(int instanceId) {
CompletableFuture<InstanceCommunication.MetricsData> retval = new CompletableFuture<>(); CompletableFuture<InstanceCommunication.MetricsData> retval = new CompletableFuture<>();
if (instanceId < 0 || instanceId >= stub.length) {
if (stub == null) { if (stub == null) {
retval.completeExceptionally(new RuntimeException("Invalid InstanceId")); retval.completeExceptionally(new RuntimeException("Not alive"));
return retval; return retval;
} }
}
if (stub == null) { if (instanceId < 0 || instanceId >= stub.length) {
retval.completeExceptionally(new RuntimeException("Not alive")); retval.completeExceptionally(new RuntimeException("Invalid InstanceId"));
return retval; return retval;
} }
ListenableFuture<InstanceCommunication.MetricsData> response = stub[instanceId].withDeadlineAfter(GRPC_TIMEOUT_SECS, TimeUnit.SECONDS).getMetrics(Empty.newBuilder().build()); ListenableFuture<InstanceCommunication.MetricsData> response = stub[instanceId].withDeadlineAfter(GRPC_TIMEOUT_SECS, TimeUnit.SECONDS).getMetrics(Empty.newBuilder().build());
Futures.addCallback(response, new FutureCallback<InstanceCommunication.MetricsData>() { Futures.addCallback(response, new FutureCallback<InstanceCommunication.MetricsData>() {
@Override @Override
......
...@@ -31,6 +31,10 @@ public interface Runtime { ...@@ -31,6 +31,10 @@ public interface Runtime {
void start() throws Exception; void start() throws Exception;
default void reinitialize() {
}
void join() throws Exception; void join() throws Exception;
void stop() throws Exception; void stop() throws Exception;
......
...@@ -215,9 +215,6 @@ public class FunctionRuntimeManager implements AutoCloseable{ ...@@ -215,9 +215,6 @@ public class FunctionRuntimeManager implements AutoCloseable{
} }
} }
} }
// start assignment tailer
this.functionAssignmentTailer.start();
} catch (Exception e) { } catch (Exception e) {
log.error("Failed to initialize function runtime manager: ", e.getMessage(), e); log.error("Failed to initialize function runtime manager: ", e.getMessage(), e);
throw new RuntimeException(e); throw new RuntimeException(e);
...@@ -229,7 +226,6 @@ public class FunctionRuntimeManager implements AutoCloseable{ ...@@ -229,7 +226,6 @@ public class FunctionRuntimeManager implements AutoCloseable{
*/ */
public void start() { public void start() {
log.info("/** Starting Function Runtime Manager **/"); log.info("/** Starting Function Runtime Manager **/");
log.info("Initialize metrics sink...");
log.info("Starting function assignment tailer..."); log.info("Starting function assignment tailer...");
this.functionAssignmentTailer.start(); this.functionAssignmentTailer.start();
} }
...@@ -629,7 +625,6 @@ public class FunctionRuntimeManager implements AutoCloseable{ ...@@ -629,7 +625,6 @@ public class FunctionRuntimeManager implements AutoCloseable{
// changes to the function meta data of the instance // changes to the function meta data of the instance
if (runtimeFactory.externallyManaged()) { if (runtimeFactory.externallyManaged()) {
// change in metadata thus need to potentially restart // change in metadata thus need to potentially restart
if (!assignment.getInstance().equals(existingAssignment.getInstance())) { if (!assignment.getInstance().equals(existingAssignment.getInstance())) {
//stop function //stop function
...@@ -657,6 +652,8 @@ public class FunctionRuntimeManager implements AutoCloseable{ ...@@ -657,6 +652,8 @@ public class FunctionRuntimeManager implements AutoCloseable{
RuntimeSpawner runtimeSpawner = functionActioner.getRuntimeSpawner( RuntimeSpawner runtimeSpawner = functionActioner.getRuntimeSpawner(
assignment.getInstance(), assignment.getInstance(),
assignment.getInstance().getFunctionMetaData().getPackageLocation().getPackagePath()); assignment.getInstance().getFunctionMetaData().getPackageLocation().getPackagePath());
// re-initialize if necessary
runtimeSpawner.getRuntime().reinitialize();
newFunctionRuntimeInfo.setRuntimeSpawner(runtimeSpawner); newFunctionRuntimeInfo.setRuntimeSpawner(runtimeSpawner);
this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, newFunctionRuntimeInfo); this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, newFunctionRuntimeInfo);
......
...@@ -648,7 +648,8 @@ public class FunctionRuntimeManagerTest { ...@@ -648,7 +648,8 @@ public class FunctionRuntimeManagerTest {
doNothing().when(kubernetesRuntimeFactory).setupClient(); doNothing().when(kubernetesRuntimeFactory).setupClient();
doReturn(true).when(kubernetesRuntimeFactory).externallyManaged(); doReturn(true).when(kubernetesRuntimeFactory).externallyManaged();
doReturn(mock(KubernetesRuntime.class)).when(kubernetesRuntimeFactory).createContainer(any(), any(), any(), any()); KubernetesRuntime kubernetesRuntime = mock(KubernetesRuntime.class);
doReturn(kubernetesRuntime).when(kubernetesRuntimeFactory).createContainer(any(), any(), any(), any());
FunctionActioner functionActioner = spy(new FunctionActioner( FunctionActioner functionActioner = spy(new FunctionActioner(
workerConfig, workerConfig,
...@@ -743,7 +744,10 @@ public class FunctionRuntimeManagerTest { ...@@ -743,7 +744,10 @@ public class FunctionRuntimeManagerTest {
instance.getInstanceId()); instance.getInstanceId());
Assert.assertTrue( Assert.assertTrue(
functionRuntimeManager.functionRuntimeInfoMap.get("test-tenant/test-namespace/func-1:0").getRuntimeSpawner().getRuntimeFactory() instanceof KubernetesRuntimeFactory); functionRuntimeManager.functionRuntimeInfoMap.get("test-tenant/test-namespace/func-1:0").getRuntimeSpawner().getRuntimeFactory() instanceof KubernetesRuntimeFactory);
Assert.assertTrue( Assert.assertTrue(
functionRuntimeManager.functionRuntimeInfoMap.get("test-tenant/test-namespace/func-1:0").getRuntimeSpawner().getRuntime() != null); functionRuntimeManager.functionRuntimeInfoMap.get("test-tenant/test-namespace/func-1:0").getRuntimeSpawner().getRuntime() != null);
verify(kubernetesRuntime, times(1)).reinitialize();
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册