未验证 提交 06c357da 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Make sure the cluster register happens before streaming process. (#3471)

上级 c5b03ba7
......@@ -185,6 +185,12 @@ public class CoreModuleProvider extends ModuleProvider {
} catch (IOException | IllegalAccessException | InstantiationException e) {
throw new ModuleStartException(e.getMessage(), e);
}
if (CoreModuleConfig.Role.Mixed.name().equalsIgnoreCase(moduleConfig.getRole()) || CoreModuleConfig.Role.Aggregator.name().equalsIgnoreCase(moduleConfig.getRole())) {
RemoteInstance gRPCServerInstance = new RemoteInstance(new Address(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true));
this.getManager().find(ClusterModule.NAME).provider().getService(ClusterRegister.class).registerRemote(gRPCServerInstance);
}
}
@Override public void notifyAfterCompleted() throws ModuleStartException {
......@@ -195,11 +201,6 @@ public class CoreModuleProvider extends ModuleProvider {
throw new ModuleStartException(e.getMessage(), e);
}
if (CoreModuleConfig.Role.Mixed.name().equalsIgnoreCase(moduleConfig.getRole()) || CoreModuleConfig.Role.Aggregator.name().equalsIgnoreCase(moduleConfig.getRole())) {
RemoteInstance gRPCServerInstance = new RemoteInstance(new Address(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true));
this.getManager().find(ClusterModule.NAME).provider().getService(ClusterRegister.class).registerRemote(gRPCServerInstance);
}
PersistenceTimer.INSTANCE.start(getManager(), moduleConfig);
if (moduleConfig.isEnableDataKeeperExecutor()) {
......
......@@ -18,16 +18,25 @@
package org.apache.skywalking.oap.server.core.remote;
import java.util.List;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.remote.client.*;
import org.apache.skywalking.oap.server.core.remote.client.RemoteClient;
import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.selector.*;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.core.remote.selector.ForeverFirstSelector;
import org.apache.skywalking.oap.server.core.remote.selector.HashCodeSelector;
import org.apache.skywalking.oap.server.core.remote.selector.RollingSelector;
import org.apache.skywalking.oap.server.core.remote.selector.Selector;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class RemoteSenderService implements Service {
private static final Logger logger = LoggerFactory.getLogger(RemoteSenderService.class);
private final ModuleManager moduleManager;
private final HashCodeSelector hashCodeSelector;
......@@ -44,18 +53,24 @@ public class RemoteSenderService implements Service {
public void send(String nextWorkName, StreamData streamData, Selector selector) {
RemoteClientManager clientManager = moduleManager.find(CoreModule.NAME).provider().getService(RemoteClientManager.class);
List<RemoteClient> clientList = clientManager.getRemoteClient();
if (clientList.size() == 0) {
logger.warn("There is no available remote server for now, ignore the streaming data until the cluster metadata initialized.");
return;
}
RemoteClient remoteClient;
switch (selector) {
case HashCode:
remoteClient = hashCodeSelector.select(clientManager.getRemoteClient(), streamData);
remoteClient = hashCodeSelector.select(clientList, streamData);
remoteClient.push(nextWorkName, streamData);
break;
case Rolling:
remoteClient = rollingSelector.select(clientManager.getRemoteClient(), streamData);
remoteClient = rollingSelector.select(clientList, streamData);
remoteClient.push(nextWorkName, streamData);
break;
case ForeverFirst:
remoteClient = foreverFirstSelector.select(clientManager.getRemoteClient(), streamData);
remoteClient = foreverFirstSelector.select(clientList, streamData);
remoteClient.push(nextWorkName, streamData);
break;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册