提交 15c0c607 编写于 作者: 公众号-WU双's avatar 公众号-WU双 提交者: avalon566

#3704 Config optimization for ShardingScaling (#3706)

* Config optimization

* For checkstyle

* use basic type
上级 576985bf
......@@ -20,10 +20,8 @@ cd `dirname $0`
cd ..
DEPLOY_DIR=`pwd`
JAVA_OPTS=" -Dport=8080 "
CLASS_PATH=.:${DEPLOY_DIR}/conf:${DEPLOY_DIR}/lib/*
MAIN_CLASS=org.apache.shardingsphere.shardingscaling.Bootstrap
java ${JAVA_OPTS} -classpath ${CLASS_PATH} ${MAIN_CLASS}
java -classpath ${CLASS_PATH} ${MAIN_CLASS}
......@@ -32,9 +32,7 @@
"username": "root"
}
},
"serverConfiguration": {
"blockQueueSize": 10000,
"pushTimeout": 1000,
"jobConfiguration": {
"concurrency": 3
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# port: 8888
# blockQueueSize: 10000
# pushTimeout: 1000
# workerThread: 30
......@@ -17,6 +17,7 @@
package org.apache.shardingsphere.shardingscaling;
import com.google.common.base.Preconditions;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
......@@ -28,10 +29,14 @@ import io.netty.handler.logging.LoggingHandler;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.log4j.PropertyConfigurator;
import org.apache.shardingsphere.core.yaml.engine.YamlEngine;
import org.apache.shardingsphere.shardingscaling.core.config.ScalingContext;
import org.apache.shardingsphere.shardingscaling.core.config.ServerConfiguration;
import org.apache.shardingsphere.shardingscaling.core.web.HttpServerInitializer;
import org.apache.shardingsphere.shardingscaling.utils.RuntimeUtil;
import java.io.File;
import java.io.IOException;
/**
* Bootstrap of ShardingScaling.
......@@ -41,7 +46,7 @@ import java.io.File;
@Slf4j
public class Bootstrap {
private static final int PORT = Integer.parseInt(System.getProperty("port", "8080"));
private static final String SERVER_CONFIG_FILE = "/conf/server.yaml";
static {
PropertyConfigurator.configure(RuntimeUtil.getBasePath() + "conf" + File.separator + "log4j.properties");
......@@ -54,6 +59,8 @@ public class Bootstrap {
*/
@SneakyThrows
public static void main(final String[] args) {
log.info("Init server config");
initServerConfig();
log.info("ShardingScaling Startup");
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
......@@ -65,12 +72,20 @@ public class Bootstrap {
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new HttpServerInitializer());
Channel channel = bootstrap.bind(PORT).sync().channel();
log.info("Shardingscaling is server on http://127.0.0.1:" + PORT + '/');
int port = ScalingContext.getInstance().getServerConfiguration().getPort();
Channel channel = bootstrap.bind(port).sync().channel();
log.info("Shardingscaling is server on http://127.0.0.1:" + port + '/');
channel.closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private static void initServerConfig() throws IOException {
File yamlFile = new File(Bootstrap.class.getResource(SERVER_CONFIG_FILE).getFile());
ServerConfiguration serverConfiguration = YamlEngine.unmarshal(yamlFile, ServerConfiguration.class);
Preconditions.checkNotNull(serverConfiguration, "Server configuration file `%s` is invalid.", yamlFile.getName());
ScalingContext.getInstance().init(serverConfiguration);
}
}
......@@ -32,9 +32,7 @@
"username": "root"
}
},
"serverConfiguration": {
"blockQueueSize": 10000,
"pushTimeout": 1000,
"jobConfiguration": {
"concurrency": 3
}
}
\ No newline at end of file
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# port: 8888
# blockQueueSize: 10000
# pushTimeout: 1000
# workerThread: 30
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.shardingscaling.core.config;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@AllArgsConstructor
@NoArgsConstructor
@Data
public class JobConfiguration {
private int concurrency = 3;
}
......@@ -26,5 +26,5 @@ public class ScalingConfiguration {
private RuleConfiguration ruleConfiguration;
private ServerConfiguration serverConfiguration;
private JobConfiguration jobConfiguration;
}
......@@ -27,8 +27,6 @@ public final class ScalingContext {
private static final ScalingContext INSTANCE = new ScalingContext();
private RuleConfiguration ruleConfiguration;
private ServerConfiguration serverConfiguration;
/**
......@@ -43,11 +41,9 @@ public final class ScalingContext {
/**
* Initialize Scaling context.
*
* @param ruleConfiguration ruleConfiguration
* @param serverConfiguration serverConfiguration
*/
public void init(final RuleConfiguration ruleConfiguration, final ServerConfiguration serverConfiguration) {
this.ruleConfiguration = ruleConfiguration;
public void init(final ServerConfiguration serverConfiguration) {
this.serverConfiguration = serverConfiguration;
}
......
......@@ -17,19 +17,24 @@
package org.apache.shardingsphere.shardingscaling.core.config;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.core.yaml.config.YamlConfiguration;
@AllArgsConstructor
/**
* Global server configuration.
*
* @author ssxlulu
*/
@NoArgsConstructor
@Data
public class ServerConfiguration {
public class ServerConfiguration implements YamlConfiguration {
private Integer blockQueueSize;
private int port = 8080;
private Integer pushTimeout;
private int blockQueueSize = 10000;
private Integer concurrency;
private int pushTimeout = 1000;
private int workerThread = 30;
}
......@@ -21,6 +21,7 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.shardingscaling.core.config.ScalingContext;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.SyncRunner;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.channel.Channel;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.reader.Reader;
......@@ -37,7 +38,7 @@ import java.util.List;
@Slf4j
public class ExecuteUtil {
private static final SyncTaskExecuteEngine EXECUTE_ENGINE = new DefaultSyncTaskExecuteEngine(30);
private static final SyncTaskExecuteEngine EXECUTE_ENGINE = new DefaultSyncTaskExecuteEngine(ScalingContext.getInstance().getServerConfiguration().getWorkerThread());
/**
* Execute.
......
......@@ -17,6 +17,7 @@
package org.apache.shardingsphere.shardingscaling.core.execute.executor.channel;
import org.apache.shardingsphere.shardingscaling.core.config.ScalingContext;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.record.Record;
import java.util.ArrayList;
......@@ -34,9 +35,9 @@ import java.util.concurrent.TimeUnit;
*/
public class MemoryChannel implements Channel {
private static final int PUSH_TIMEOUT = 1000;
private static final int PUSH_TIMEOUT = ScalingContext.getInstance().getServerConfiguration().getPushTimeout();
private final BlockingQueue<Record> queue = new ArrayBlockingQueue<>(10000);
private final BlockingQueue<Record> queue = new ArrayBlockingQueue<>(ScalingContext.getInstance().getServerConfiguration().getBlockQueueSize());
private final List<AckCallback> ackCallbacks;
......
......@@ -56,7 +56,7 @@ public class SyncConfigurationUtil {
ruleConfig.getDestinationDataSources().getUsername(),
ruleConfig.getDestinationDataSources().getPassword());
writerConfiguration.setDataSourceConfiguration(writerDataSourceConfiguration);
syncConfigurations.add(new SyncConfiguration(scalingConfiguration.getServerConfiguration().getConcurrency(), readerConfiguration, writerConfiguration));
syncConfigurations.add(new SyncConfiguration(scalingConfiguration.getJobConfiguration().getConcurrency(), readerConfiguration, writerConfiguration));
}
return syncConfigurations;
}
......
......@@ -21,6 +21,8 @@ import lombok.SneakyThrows;
import org.apache.shardingsphere.shardingscaling.core.config.DataSourceConfiguration;
import org.apache.shardingsphere.shardingscaling.core.config.JdbcDataSourceConfiguration;
import org.apache.shardingsphere.shardingscaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.shardingscaling.core.config.ScalingContext;
import org.apache.shardingsphere.shardingscaling.core.config.ServerConfiguration;
import org.apache.shardingsphere.shardingscaling.core.config.SyncConfiguration;
import org.apache.shardingsphere.shardingscaling.core.controller.task.ReportCallback;
import org.apache.shardingsphere.shardingscaling.core.execute.Event;
......@@ -52,6 +54,7 @@ public class HistoryDataSyncTaskTest {
public void setUp() {
RdbmsConfiguration readerConfig = mockReaderConfig();
RdbmsConfiguration writerConfig = mockWriterConfig();
ScalingContext.getInstance().init(new ServerConfiguration());
syncConfiguration = new SyncConfiguration(3, readerConfig, writerConfig);
dataSourceFactory = new DataSourceFactory();
}
......
......@@ -32,9 +32,7 @@
"username": "root"
}
},
"serverConfiguration": {
"blockQueueSize": 10000,
"pushTimeout": 1000,
"jobConfiguration": {
"concurrency": 3
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册