未验证 提交 186ba3ba 编写于 作者: D dailidong 提交者: GitHub

fix oom when no master is active in dev-1.3.0 (#2918)

* fix worker group config no effect

* remove codehaus janino jar
the license about janino maybe not compatiable with Apache v2

* Merge remote-tracking branch 'upstream/dev-1.3.0' into dev-1.3.0

# Conflicts:
#	dolphinscheduler-server/src/main/resources/config/install_config.conf

* datasource config

* Update datasource.properties

* fix RunConfig bug

* remove param monitor server state

* fix table T_DS_ALERT

* update h2 database

* fix #2910 master server will show exception for some time when it restart

* fix oom when no master is active

* fix worker oom when master server restart

* fix oom

* fix

* add UT

* fix worker group config no effect
上级 422fe513
...@@ -349,7 +349,7 @@ public class NettyRemotingClient { ...@@ -349,7 +349,7 @@ public class NettyRemotingClient {
return channel; return channel;
} }
} catch (Exception ex) { } catch (Exception ex) {
logger.info("connect to {} error {}", host, ex); logger.warn(String.format("connect to %s error", host), ex);
} }
return null; return null;
} }
......
...@@ -87,7 +87,7 @@ public class LowerWeightHostManager extends CommonHostManager { ...@@ -87,7 +87,7 @@ public class LowerWeightHostManager extends CommonHostManager {
this.workerHostWeightsMap = new ConcurrentHashMap<>(); this.workerHostWeightsMap = new ConcurrentHashMap<>();
this.lock = new ReentrantLock(); this.lock = new ReentrantLock();
this.executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LowerWeightHostManagerExecutor")); this.executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LowerWeightHostManagerExecutor"));
this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(),0, 40, TimeUnit.SECONDS); this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(),0, 5, TimeUnit.SECONDS);
this.roundRobinHostManager = new RoundRobinHostManager(); this.roundRobinHostManager = new RoundRobinHostManager();
this.roundRobinHostManager.setZookeeperNodeManager(getZookeeperNodeManager()); this.roundRobinHostManager.setZookeeperNodeManager(getZookeeperNodeManager());
} }
......
...@@ -46,6 +46,7 @@ import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; ...@@ -46,6 +46,7 @@ import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
public class TaskCallbackService { public class TaskCallbackService {
private final Logger logger = LoggerFactory.getLogger(TaskCallbackService.class); private final Logger logger = LoggerFactory.getLogger(TaskCallbackService.class);
private static final int [] RETRY_BACKOFF = { 1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200, 200 };
/** /**
* remote channels * remote channels
...@@ -58,6 +59,7 @@ public class TaskCallbackService { ...@@ -58,6 +59,7 @@ public class TaskCallbackService {
@Autowired @Autowired
private ZookeeperRegistryCenter zookeeperRegistryCenter; private ZookeeperRegistryCenter zookeeperRegistryCenter;
/** /**
* netty remoting client * netty remoting client
*/ */
...@@ -99,14 +101,19 @@ public class TaskCallbackService { ...@@ -99,14 +101,19 @@ public class TaskCallbackService {
nettyRemoteChannel.getHost(), nettyRemoteChannel.getHost(),
taskInstanceId); taskInstanceId);
Set<String> masterNodes = null; Set<String> masterNodes = null;
int ntries = 0;
while (Stopper.isRunning()) { while (Stopper.isRunning()) {
masterNodes = zookeeperRegistryCenter.getMasterNodesDirectly(); masterNodes = zookeeperRegistryCenter.getMasterNodesDirectly();
if (CollectionUtils.isEmpty(masterNodes)) { if (CollectionUtils.isEmpty(masterNodes)) {
logger.info("try {} times but not find any master for task : {}.",
ntries + 1,
taskInstanceId);
masterNodes = null; masterNodes = null;
ThreadUtils.sleep(SLEEP_TIME_MILLIS); ThreadUtils.sleep(pause(ntries++));
continue; continue;
} }
logger.info("find {} masters for task : {}.", logger.info("try {} times to find {} masters for task : {}.",
ntries + 1,
masterNodes.size(), masterNodes.size(),
taskInstanceId); taskInstanceId);
for (String masterNode : masterNodes) { for (String masterNode : masterNodes) {
...@@ -116,12 +123,18 @@ public class TaskCallbackService { ...@@ -116,12 +123,18 @@ public class TaskCallbackService {
} }
} }
masterNodes = null; masterNodes = null;
ThreadUtils.sleep(SLEEP_TIME_MILLIS); ThreadUtils.sleep(pause(ntries++));
} }
throw new IllegalStateException(String.format("all available master nodes : %s are not reachable for task: {}", masterNodes, taskInstanceId)); throw new IllegalStateException(String.format("all available master nodes : %s are not reachable for task: {}", masterNodes, taskInstanceId));
} }
public int pause(int ntries){
return SLEEP_TIME_MILLIS * RETRY_BACKOFF[ntries % RETRY_BACKOFF.length];
}
private NettyRemoteChannel getRemoteChannel(Channel newChannel, long opaque, int taskInstanceId){ private NettyRemoteChannel getRemoteChannel(Channel newChannel, long opaque, int taskInstanceId){
NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel, opaque); NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel, opaque);
addRemoteChannel(taskInstanceId, remoteChannel); addRemoteChannel(taskInstanceId, remoteChannel);
......
...@@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.server.zk.SpringZKServer; ...@@ -39,6 +39,7 @@ import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Mockito; import org.mockito.Mockito;
...@@ -46,6 +47,7 @@ import org.springframework.beans.factory.annotation.Autowired; ...@@ -46,6 +47,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.io.IOException;
import java.util.Date; import java.util.Date;
/** /**
...@@ -91,12 +93,8 @@ public class TaskCallbackServiceTest { ...@@ -91,12 +93,8 @@ public class TaskCallbackServiceTest {
ackCommand.setStartTime(new Date()); ackCommand.setStartTime(new Date());
taskCallbackService.sendAck(1, ackCommand.convert2Command()); taskCallbackService.sendAck(1, ackCommand.convert2Command());
Thread.sleep(5000);
Stopper.stop(); Stopper.stop();
Thread.sleep(5000);
nettyRemotingServer.close(); nettyRemotingServer.close();
nettyRemotingClient.close(); nettyRemotingClient.close();
} }
...@@ -140,8 +138,13 @@ public class TaskCallbackServiceTest { ...@@ -140,8 +138,13 @@ public class TaskCallbackServiceTest {
Stopper.stop(); Stopper.stop();
} }
@Test(expected = IllegalStateException.class) @Test
public void testSendAckWithIllegalStateException1(){ public void testPause(){
Assert.assertEquals(5000, taskCallbackService.pause(3));;
}
@Test
public void testSendAck1(){
masterRegistry.registry(); masterRegistry.registry();
final NettyServerConfig serverConfig = new NettyServerConfig(); final NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(30000); serverConfig.setListenPort(30000);
...@@ -153,27 +156,20 @@ public class TaskCallbackServiceTest { ...@@ -153,27 +156,20 @@ public class TaskCallbackServiceTest {
NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig); NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig);
Channel channel = nettyRemotingClient.getChannel(Host.of("localhost:30000")); Channel channel = nettyRemotingClient.getChannel(Host.of("localhost:30000"));
taskCallbackService.addRemoteChannel(1, new NettyRemoteChannel(channel, 1)); taskCallbackService.addRemoteChannel(1, new NettyRemoteChannel(channel, 1));
channel.close(); // channel.close();
TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand(); TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();
ackCommand.setTaskInstanceId(1); ackCommand.setTaskInstanceId(1);
ackCommand.setStartTime(new Date()); ackCommand.setStartTime(new Date());
nettyRemotingServer.close();
taskCallbackService.sendAck(1, ackCommand.convert2Command()); taskCallbackService.sendAck(1, ackCommand.convert2Command());
try {
Thread.sleep(5000); Assert.assertEquals(true, channel.isOpen());
} catch (InterruptedException e) {
e.printStackTrace();
}
Stopper.stop(); Stopper.stop();
try { nettyRemotingServer.close();
Thread.sleep(5000); nettyRemotingClient.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
} }
// @Test(expected = IllegalStateException.class) // @Test(expected = IllegalStateException.class)
......
...@@ -95,8 +95,9 @@ public class ZKServer { ...@@ -95,8 +95,9 @@ public class ZKServer {
* @param port The port to listen on * @param port The port to listen on
*/ */
public static void startLocalZkServer(final int port) { public static void startLocalZkServer(final int port) {
String zkDataDir = System.getProperty("user.dir") +"/zookeeper_data";
startLocalZkServer(port, System.getProperty("user.dir") +"/zookeeper_data", ZooKeeperServer.DEFAULT_TICK_TIME,"20"); logger.info("zk server starting, data dir path:{}" , zkDataDir);
startLocalZkServer(port, zkDataDir, ZooKeeperServer.DEFAULT_TICK_TIME,"60");
} }
/** /**
......
...@@ -16,22 +16,16 @@ ...@@ -16,22 +16,16 @@
*/ */
package org.apache.dolphinscheduler.service.zk; package org.apache.dolphinscheduler.service.zk;
import org.junit.Ignore; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.*; // ZKServer is a process, can't unit test
@Ignore
public class ZKServerTest { public class ZKServerTest {
@Test
public void start() {
//ZKServer is a process, can't unit test
}
@Test @Test
public void isStarted() { public void isStarted() {
Assert.assertEquals(false, ZKServer.isStarted());
} }
@Test @Test
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册