提交 0e74fa1e 编写于 作者: L lta

Reorganized the start and close of sync thread

上级 abc189cd
......@@ -29,7 +29,7 @@ set IOTDB_CONF=%IOTDB_HOME%\conf
set IOTDB_LOGS=%IOTDB_HOME%\logs
if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.db.sync.sender.FileSenderImpl
if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.db.sync.sender.SyncSenderImpl
if NOT DEFINED JAVA_HOME goto :err
@REM -----------------------------------------------------------------------------
......
......@@ -47,7 +47,7 @@ for f in ${IOTDB_HOME}/lib/*.jar; do
CLASSPATH=${CLASSPATH}":"$f
done
MAIN_CLASS=org.apache.iotdb.db.sync.sender.FileSenderImpl
MAIN_CLASS=org.apache.iotdb.db.sync.sender.SyncSenderImpl
"$JAVA" -DIOTDB_HOME=${IOTDB_HOME} -DTSFILE_HOME=${IOTDB_HOME} -DIOTDB_CONF=${IOTDB_CONF} -Dlogback.configurationFile=${IOTDB_CONF}/logback.xml $IOTDB_DERBY_OPTS $IOTDB_JMX_OPTS -Dname=SyncClient -cp "$CLASSPATH" "$MAIN_CLASS"
......
......@@ -33,8 +33,8 @@ import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.exception.builder.ExceptionBuilder;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.monitor.StatMonitor;
import org.apache.iotdb.db.sync.receiver.ServerManager;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.sync.receiver.SyncServerManager;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.db.writelog.manager.WriteLogNodeManager;
import org.slf4j.Logger;
......@@ -46,7 +46,6 @@ public class IoTDB implements IoTDBMBean {
private final String mbeanName = String.format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE,
IoTDBConstant.JMX_TYPE, "IoTDB");
private RegisterManager registerManager = new RegisterManager();
private ServerManager serverManager = ServerManager.getInstance();
public static final IoTDB getInstance() {
return IoTDBHolder.INSTANCE;
......@@ -107,18 +106,17 @@ public class IoTDB implements IoTDBMBean {
registerManager.register(StatMonitor.getInstance());
registerManager.register(BasicMemController.getInstance());
registerManager.register(FileReaderManager.getInstance());
registerManager.register(SyncServerManager.getInstance());
JMXService.registerMBean(getInstance(), mbeanName);
initErrorInformation();
serverManager.startServer();
LOGGER.info("IoTDB is set up.");
}
public void deactivate() {
LOGGER.info("Deactivating IoTDB...");
serverManager.closeServer();
registerManager.deregisterAll();
JMXService.deregisterMBean(mbeanName);
LOGGER.info("IoTDB is deactivated.");
......@@ -141,7 +139,7 @@ public class IoTDB implements IoTDBMBean {
* Recover data using system log.
*
* @throws RecoverException if FileNode(Manager)Exception is encountered during the recovery.
* @throws IOException if IOException is encountered during the recovery.
* @throws IOException if IOException is encountered during the recovery.
*/
private void systemDataRecovery() throws RecoverException {
LOGGER.info("{}: start checking write log...", IoTDBConstant.GLOBAL_DB_NAME);
......
......@@ -29,7 +29,9 @@ public enum ServiceType {
CLOSE_MERGE_SERVICE("Close&Merge ServerService", ""),
JVM_MEM_CONTROL_SERVICE("Memory Controller", ""),
AUTHORIZATION_SERVICE("Authorization ServerService", ""),
FILE_READER_MANAGER_SERVICE("File reader manager ServerService", "");
FILE_READER_MANAGER_SERVICE("File reader manager ServerService", ""),
SYNC_SERVICE("SYNC ServerService", "");
private String name;
private String jmxName;
......
......@@ -18,12 +18,16 @@
*/
package org.apache.iotdb.db.sync.receiver;
import java.net.InetSocketAddress;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.ServiceType;
import org.apache.iotdb.service.sync.thrift.SyncService;
import org.apache.thrift.TProcessor;
import org.apache.iotdb.service.sync.thrift.SyncService.Processor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.apache.thrift.server.TServer;
......@@ -36,66 +40,104 @@ import org.slf4j.LoggerFactory;
/**
* sync receiver server.
*/
public class ServerManager {
public class SyncServerManager implements IService {
private static final Logger LOGGER = LoggerFactory.getLogger(ServerManager.class);
private TServerSocket serverTransport;
private TServer poolServer;
private static final Logger LOGGER = LoggerFactory.getLogger(SyncServerManager.class);
private Thread syncServerThread;
private IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
private ServerManager() {
private SyncServerManager() {
}
public static final ServerManager getInstance() {
public static final SyncServerManager getInstance() {
return ServerManagerHolder.INSTANCE;
}
/**
* start sync receiver's server.
* Start sync receiver's server.
*/
public void startServer() throws StartupException {
Factory protocolFactory;
TProcessor processor;
TThreadPoolServer.Args poolArgs;
@Override
public void start() throws StartupException {
if (!conf.isSyncEnable()) {
return;
}
try {
if (conf.getIpWhiteList() == null) {
LOGGER.error(
"Sync server failed to start because IP white list is null, please set IP white list.");
return;
}
conf.setIpWhiteList(conf.getIpWhiteList().replaceAll(" ", ""));
serverTransport = new TServerSocket(conf.getSyncServerPort());
protocolFactory = new TBinaryProtocol.Factory();
processor = new SyncService.Processor<>(new ServerServiceImpl());
poolArgs = new TThreadPoolServer.Args(serverTransport);
poolArgs.processor(processor);
poolArgs.protocolFactory(protocolFactory);
poolServer = new TThreadPoolServer(poolArgs);
LOGGER.info("Sync server has started.");
Runnable syncServerRunnable = () -> poolServer.serve();
Thread syncServerThread = new Thread(syncServerRunnable, ThreadName.SYNC_SERVER.getName());
syncServerThread.start();
} catch (TTransportException e) {
throw new StartupException("Cannot start sync server.", e);
if (conf.getIpWhiteList() == null) {
LOGGER.error(
"Sync server failed to start because IP white list is null, please set IP white list.");
return;
}
conf.setIpWhiteList(conf.getIpWhiteList().replaceAll(" ", ""));
syncServerThread = new SyncServiceThread();
syncServerThread.setName(ThreadName.SYNC_SERVER.getName());
syncServerThread.start();
LOGGER.info("Sync server has started.");
}
/**
* close sync receiver's server.
* Close sync receiver's server.
*/
public void closeServer() {
if (conf.isSyncEnable() && poolServer != null) {
poolServer.stop();
serverTransport.close();
LOGGER.info("Stop sync server.");
@Override
public void stop() {
if (conf.isSyncEnable()) {
((SyncServiceThread) syncServerThread).close();
}
}
@Override
public ServiceType getID() {
return ServiceType.SYNC_SERVICE;
}
private static class ServerManagerHolder {
private static final ServerManager INSTANCE = new ServerManager();
private static final SyncServerManager INSTANCE = new SyncServerManager();
}
private class SyncServiceThread extends Thread {
private TServerSocket serverTransport;
private TServer poolServer;
private Factory protocolFactory;
private Processor<SyncService.Iface> processor;
private TThreadPoolServer.Args poolArgs;
public SyncServiceThread() {
processor = new SyncService.Processor<>(new SyncServiceImpl());
}
@Override
public void run() {
try {
serverTransport = new TServerSocket(
new InetSocketAddress(conf.getRpcAddress(), conf.getSyncServerPort()));
protocolFactory = new TBinaryProtocol.Factory();
processor = new SyncService.Processor<>(new SyncServiceImpl());
poolArgs = new TThreadPoolServer.Args(serverTransport);
poolArgs.processor(processor);
poolArgs.protocolFactory(protocolFactory);
poolServer = new TThreadPoolServer(poolArgs);
poolServer.serve();
} catch (TTransportException e) {
LOGGER.error("{}: failed to start {}, because ", IoTDBConstant.GLOBAL_DB_NAME,
getID().getName(), e);
} catch (Exception e) {
LOGGER.error("{}: {} exit, because ", IoTDBConstant.GLOBAL_DB_NAME, getID().getName(), e);
} finally {
close();
LOGGER.info("{}: close TThreadPoolServer and TServerSocket for {}",
IoTDBConstant.GLOBAL_DB_NAME, getID().getName());
}
}
private synchronized void close() {
if (poolServer != null) {
poolServer.stop();
poolServer = null;
}
if (serverTransport != null) {
serverTransport.close();
serverTransport = null;
}
}
}
}
\ No newline at end of file
......@@ -77,9 +77,9 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ServerServiceImpl implements SyncService.Iface {
public class SyncServiceImpl implements SyncService.Iface {
private static final Logger logger = LoggerFactory.getLogger(ServerServiceImpl.class);
private static final Logger logger = LoggerFactory.getLogger(SyncServiceImpl.class);
private static final FileNodeManager fileNodeManager = FileNodeManager.getInstance();
/**
......
......@@ -38,11 +38,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* FileManager is used to pick up those tsfiles need to sync.
* SyncFileManager is used to pick up those tsfiles need to sync.
*/
public class FileManager {
public class SyncFileManager {
private static final Logger LOGGER = LoggerFactory.getLogger(FileManager.class);
private static final Logger LOGGER = LoggerFactory.getLogger(SyncFileManager.class);
/**
* Files that need to be synchronized
......@@ -65,15 +65,15 @@ public class FileManager {
private static final String RESTORE_SUFFIX = ".restore";
private FileManager() {
private SyncFileManager() {
}
public static final FileManager getInstance() {
public static final SyncFileManager getInstance() {
return FileManagerHolder.INSTANCE;
}
/**
* Initialize FileManager.
* Initialize SyncFileManager.
*/
public void init() throws IOException {
validAllFiles.clear();
......@@ -199,6 +199,6 @@ public class FileManager {
private static class FileManagerHolder {
private static final FileManager INSTANCE = new FileManager();
private static final SyncFileManager INSTANCE = new SyncFileManager();
}
}
\ No newline at end of file
......@@ -23,9 +23,9 @@ import java.util.Set;
import org.apache.iotdb.db.exception.SyncConnectionException;
/**
* FileSender defines the methods of a sender in sync module.
* SyncSender defines the methods of a sender in sync module.
*/
public interface FileSender {
public interface SyncSender {
/**
* Connect to server.
......
......@@ -33,7 +33,6 @@ import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
......@@ -62,11 +61,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* FileSenderImpl is used to transfer tsfiles that needs to sync to receiver.
* SyncSenderImpl is used to transfer tsfiles that needs to sync to receiver.
*/
public class FileSenderImpl implements FileSender {
public class SyncSenderImpl implements SyncSender {
private static final Logger LOGGER = LoggerFactory.getLogger(FileSenderImpl.class);
private static final Logger LOGGER = LoggerFactory.getLogger(SyncSenderImpl.class);
private TTransport transport;
private SyncService.Client serviceClient;
private List<String> schema = new ArrayList<>();
......@@ -96,7 +95,7 @@ public class FileSenderImpl implements FileSender {
**/
private Map<String, Set<String>> validFileSnapshot = new HashMap<>();
private FileManager fileManager = FileManager.getInstance();
private SyncFileManager syncFileManager = SyncFileManager.getInstance();
private SyncSenderConfig config = SyncSenderDescriptor.getInstance().getConfig();
/**
......@@ -119,10 +118,10 @@ public class FileSenderImpl implements FileSender {
}
};
private FileSenderImpl() {
private SyncSenderImpl() {
}
public static final FileSenderImpl getInstance() {
public static final SyncSenderImpl getInstance() {
return InstanceHolder.INSTANCE;
}
......@@ -134,7 +133,7 @@ public class FileSenderImpl implements FileSender {
public static void main(String[] args)
throws InterruptedException, IOException, SyncConnectionException {
Thread.currentThread().setName(ThreadName.SYNC_CLIENT.getName());
FileSenderImpl fileSenderImpl = new FileSenderImpl();
SyncSenderImpl fileSenderImpl = new SyncSenderImpl();
fileSenderImpl.verifySingleton();
fileSenderImpl.startMonitor();
fileSenderImpl.timedTask();
......@@ -185,9 +184,9 @@ public class FileSenderImpl implements FileSender {
}
// 2. Acquire valid files and check
fileManager.init();
validAllFiles = fileManager.getValidAllFiles();
currentLocalFiles = fileManager.getCurrentLocalFiles();
syncFileManager.init();
validAllFiles = syncFileManager.getValidAllFiles();
currentLocalFiles = syncFileManager.getCurrentLocalFiles();
if (SyncUtils.isEmpty(validAllFiles)) {
LOGGER.info("There has no file to sync !");
return;
......@@ -255,8 +254,8 @@ public class FileSenderImpl implements FileSender {
syncData(validSnapshot);
if (afterSynchronization()) {
currentLocalFiles.get(entry.getKey()).addAll(validFiles);
fileManager.setCurrentLocalFiles(currentLocalFiles);
fileManager.backupNowLocalFileInfo(config.getLastFileInfo());
syncFileManager.setCurrentLocalFiles(currentLocalFiles);
syncFileManager.backupNowLocalFileInfo(config.getLastFileInfo());
LOGGER.info("Sync process has finished storage group {}.", entry.getKey());
} else {
LOGGER.error("Receiver cannot sync data, abandon this synchronization of storage group {}", entry.getKey());
......@@ -531,7 +530,7 @@ public class FileSenderImpl implements FileSender {
private static class InstanceHolder {
private static final FileSenderImpl INSTANCE = new FileSenderImpl();
private static final SyncSenderImpl INSTANCE = new SyncSenderImpl();
}
public void setConfig(SyncSenderConfig config) {
......
......@@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory;
*/
public class SingleClientSyncTest {
FileSenderImpl fileSenderImpl = FileSenderImpl.getInstance();
SyncSenderImpl fileSenderImpl = SyncSenderImpl.getInstance();
private IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
private String serverIpTest = "192.168.130.7";
private SyncSenderConfig config = SyncSenderDescriptor.getInstance().getConfig();
......
......@@ -33,14 +33,14 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FileManagerTest {
public class SyncFileManagerTest {
private static final String POST_BACK_DIRECTORY_TEST = Constans.SYNC_CLIENT + File.separator;
private static final String LAST_FILE_INFO_TEST =
POST_BACK_DIRECTORY_TEST + Constans.LAST_LOCAL_FILE_NAME;
private static final String SENDER_FILE_PATH_TEST = POST_BACK_DIRECTORY_TEST + "data";
private FileManager manager = FileManager.getInstance();
private static final Logger LOGGER = LoggerFactory.getLogger(FileManagerTest.class);
private SyncFileManager manager = SyncFileManager.getInstance();
private static final Logger LOGGER = LoggerFactory.getLogger(SyncFileManagerTest.class);
@Before
public void setUp() throws IOException, InterruptedException {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册