From 2822a72b26da615678335c8171fa5a4b48839d44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=82=B1=E9=B9=BF=20Lucas?= Date: Tue, 8 Sep 2020 14:21:50 +0800 Subject: [PATCH] Add shardingsphere-scaling-elasticjob module (#7310) * add shardingsphere-scaling-elasticjob module * add a blank line in the end of file Co-authored-by: qiulu3 --- pom.xml | 21 ++- shardingsphere-scaling/pom.xml | 1 + .../src/main/resources/conf/server.yaml | 5 + .../scaling/core/config/JobConfiguration.java | 2 + .../scaling/core/config/ScalingContext.java | 13 +- .../core/datasource/DataSourceManager.java | 6 +- .../scaling/core/spi/ElasticJobEntry.java | 34 ++++ .../core/spi/ElasticJobEntryLoader.java | 43 +++++ .../InventoryDataScalingTaskGroupTest.java | 11 +- .../shardingsphere-scaling-elasticjob/pom.xml | 56 ++++++ .../elasticjob/ScalingElasticJobEntry.java | 161 ++++++++++++++++++ .../elasticjob/job/ScalingElasticJob.java | 68 ++++++++ ...ingsphere.scaling.core.spi.ElasticJobEntry | 18 ++ 13 files changed, 429 insertions(+), 10 deletions(-) create mode 100644 shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ElasticJobEntry.java create mode 100644 shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ElasticJobEntryLoader.java create mode 100644 shardingsphere-scaling/shardingsphere-scaling-elasticjob/pom.xml create mode 100644 shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/ScalingElasticJobEntry.java create mode 100644 shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/ScalingElasticJob.java create mode 100644 shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.spi.ElasticJobEntry diff --git a/pom.xml b/pom.xml index 44b3bc5e96..e3a062ade0 100644 --- a/pom.xml +++ b/pom.xml @@ -132,7 +132,8 @@ 1.5.0.Final 1.4.6 2.3.0 - + 3.0.0-alpha + ${java.home}/../bin/javadoc false -Xmx1024m -XX:MaxMetaspaceSize=256m @@ -366,7 +367,23 @@ ${lombok.version} provided - + + + org.apache.shardingsphere.elasticjob + elasticjob-lite-core + ${elasticjob.version} + + + org.apache.shardingsphere.elasticjob + elasticjob-api + ${elasticjob.version} + + + org.apache.shardingsphere.elasticjob + elasticjob-lite-lifecycle + ${elasticjob.version} + + org.springframework.boot spring-boot-configuration-processor diff --git a/shardingsphere-scaling/pom.xml b/shardingsphere-scaling/pom.xml index 48987d219d..143b50b8ea 100755 --- a/shardingsphere-scaling/pom.xml +++ b/shardingsphere-scaling/pom.xml @@ -33,5 +33,6 @@ shardingsphere-scaling-bootstrap shardingsphere-scaling-mysql shardingsphere-scaling-postgresql + shardingsphere-scaling-elasticjob diff --git a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/resources/conf/server.yaml b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/resources/conf/server.yaml index 41b507cd0e..8ff859244c 100644 --- a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/resources/conf/server.yaml +++ b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/main/resources/conf/server.yaml @@ -19,3 +19,8 @@ port: 8888 blockQueueSize: 10000 pushTimeout: 1000 workerThread: 30 +#name: elasticjob +#registryCenter: +# type: zookeeper +# serverLists: localhost:2181 +# props: diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java index ca34fc5d7d..74e583c619 100644 --- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java +++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java @@ -35,6 +35,8 @@ public final class JobConfiguration { private String jobName; + private boolean running = true; + private String[] shardingTables; private int shardingItem; diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ScalingContext.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ScalingContext.java index f9ba2848ca..0a7c578848 100644 --- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ScalingContext.java +++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ScalingContext.java @@ -17,11 +17,13 @@ package org.apache.shardingsphere.scaling.core.config; -import org.apache.shardingsphere.scaling.core.execute.engine.ShardingScalingExecuteEngine; - +import com.google.common.base.Strings; import lombok.AccessLevel; import lombok.Getter; import lombok.NoArgsConstructor; +import org.apache.shardingsphere.governance.core.yaml.swapper.GovernanceCenterConfigurationYamlSwapper; +import org.apache.shardingsphere.scaling.core.execute.engine.ShardingScalingExecuteEngine; +import org.apache.shardingsphere.scaling.core.spi.ElasticJobEntryLoader; /** * ShardingSphere-Scaling context. @@ -56,5 +58,12 @@ public final class ScalingContext { this.serverConfiguration = serverConfiguration; taskExecuteEngine = new ShardingScalingExecuteEngine(serverConfiguration.getWorkerThread()); importerExecuteEngine = new ShardingScalingExecuteEngine(serverConfiguration.getWorkerThread()); + initElasticJobEntry(serverConfiguration); + } + + private void initElasticJobEntry(final ServerConfiguration serverConfiguration) { + if (!Strings.isNullOrEmpty(serverConfiguration.getName()) && null != serverConfiguration.getRegistryCenter()) { + ElasticJobEntryLoader.init(serverConfiguration.getName(), new GovernanceCenterConfigurationYamlSwapper().swapToObject(serverConfiguration.getRegistryCenter())); + } } } diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/datasource/DataSourceManager.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/datasource/DataSourceManager.java index f24d568789..4eaa300a61 100644 --- a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/datasource/DataSourceManager.java +++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/datasource/DataSourceManager.java @@ -56,9 +56,9 @@ public final class DataSourceManager implements AutoCloseable { private void createSourceDatasources(final List syncConfigs) { for (SyncConfiguration syncConfiguration : syncConfigs) { DataSourceConfiguration dataSourceConfig = syncConfiguration.getDumperConfiguration().getDataSourceConfiguration(); - DataSourceWrapper hikariDataSource = dataSourceFactory.newInstance(dataSourceConfig); - cachedDataSources.put(dataSourceConfig, hikariDataSource); - sourceDatasources.put(dataSourceConfig, hikariDataSource); + DataSourceWrapper dataSource = dataSourceFactory.newInstance(dataSourceConfig); + cachedDataSources.put(dataSourceConfig, dataSource); + sourceDatasources.put(dataSourceConfig, dataSource); } } diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ElasticJobEntry.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ElasticJobEntry.java new file mode 100644 index 0000000000..425da752b8 --- /dev/null +++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ElasticJobEntry.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.scaling.core.spi; + +import org.apache.shardingsphere.governance.repository.api.config.GovernanceCenterConfiguration; + +/** + * Elastic job entry. + */ +public interface ElasticJobEntry { + + /** + * Init elastic job. + * + * @param namespace registry center namespace + * @param registryCenter registry center + */ + void init(String namespace, GovernanceCenterConfiguration registryCenter); +} diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ElasticJobEntryLoader.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ElasticJobEntryLoader.java new file mode 100644 index 0000000000..ead7b5342b --- /dev/null +++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ElasticJobEntryLoader.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.scaling.core.spi; + +import org.apache.shardingsphere.governance.repository.api.config.GovernanceCenterConfiguration; +import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader; + +import java.util.Collection; + +/** + * Elastic job entry loader. + */ +public final class ElasticJobEntryLoader { + + /** + * Init elastic job entry. + * + * @param namespace registry center namespace + * @param registryCenter registry center + */ + public static void init(final String namespace, final GovernanceCenterConfiguration registryCenter) { + ShardingSphereServiceLoader.register(ElasticJobEntry.class); + Collection elasticJobEntries = ShardingSphereServiceLoader.newServiceInstances(ElasticJobEntry.class); + for (ElasticJobEntry each : elasticJobEntries) { + each.init(namespace, registryCenter); + } + } +} diff --git a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroupTest.java b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroupTest.java index 87d34fbb90..5028ec87f2 100644 --- a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroupTest.java +++ b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskGroupTest.java @@ -19,22 +19,29 @@ package org.apache.shardingsphere.scaling.core.job.task.inventory; import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager; import org.apache.shardingsphere.scaling.core.job.SyncProgress; +import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition; import org.apache.shardingsphere.scaling.core.job.task.ScalingTask; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; import java.util.Collections; import static org.hamcrest.CoreMatchers.instanceOf; import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +@RunWith(MockitoJUnitRunner.class) public final class InventoryDataScalingTaskGroupTest { private DataSourceManager dataSourceManager; + @Mock + private ScalingTask scalingTask; + @Before public void setUp() { dataSourceManager = new DataSourceManager(); @@ -47,7 +54,6 @@ public final class InventoryDataScalingTaskGroupTest { @Test public void assertStart() { - ScalingTask scalingTask = mock(ScalingTask.class); InventoryDataScalingTaskGroup inventoryDataSyncTaskGroup = new InventoryDataScalingTaskGroup(Collections.singletonList(scalingTask)); inventoryDataSyncTaskGroup.start(); verify(scalingTask).start(); @@ -55,7 +61,6 @@ public final class InventoryDataScalingTaskGroupTest { @Test public void assertStop() { - ScalingTask scalingTask = mock(ScalingTask.class); InventoryDataScalingTaskGroup inventoryDataSyncTaskGroup = new InventoryDataScalingTaskGroup(Collections.singletonList(scalingTask)); inventoryDataSyncTaskGroup.stop(); verify(scalingTask).stop(); diff --git a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/pom.xml b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/pom.xml new file mode 100644 index 0000000000..694c37eb46 --- /dev/null +++ b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/pom.xml @@ -0,0 +1,56 @@ + + + + + 4.0.0 + + org.apache.shardingsphere + shardingsphere-scaling + 5.0.0-RC1-SNAPSHOT + + shardingsphere-scaling-elasticjob + ${project.artifactId} + + + + org.apache.shardingsphere + shardingsphere-scaling-core + ${project.version} + + + org.apache.shardingsphere.elasticjob + elasticjob-lite-core + + + org.apache.shardingsphere.elasticjob + elasticjob-api + + + org.apache.shardingsphere.elasticjob + elasticjob-lite-lifecycle + + + + ch.qos.logback + logback-classic + runtime + + + diff --git a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/ScalingElasticJobEntry.java b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/ScalingElasticJobEntry.java new file mode 100644 index 0000000000..debbd3b9ce --- /dev/null +++ b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/ScalingElasticJobEntry.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.scaling.elasticjob; + +import com.google.common.base.Strings; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonSyntaxException; +import lombok.extern.slf4j.Slf4j; +import org.apache.shardingsphere.elasticjob.api.JobConfiguration; +import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO; +import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap; +import org.apache.shardingsphere.elasticjob.lite.internal.election.LeaderService; +import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobAPIFactory; +import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobConfigurationAPI; +import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter; +import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration; +import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter; +import org.apache.shardingsphere.governance.repository.api.config.GovernanceCenterConfiguration; +import org.apache.shardingsphere.governance.repository.api.listener.DataChangedEvent; +import org.apache.shardingsphere.governance.repository.zookeeper.CuratorZookeeperRepository; +import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration; +import org.apache.shardingsphere.scaling.core.spi.ElasticJobEntry; +import org.apache.shardingsphere.scaling.elasticjob.job.ScalingElasticJob; + +import java.util.Optional; + +/** + * Scaling elastic job entry. + */ +@Slf4j +public final class ScalingElasticJobEntry implements ElasticJobEntry { + + private static final String SCALING_JOB_NAME = "ScalingJob"; + + private static final String SCALING_JOB_CONFIG = "/__scalingjob_config"; + + private static final Gson GSON = new GsonBuilder().disableHtmlEscaping().serializeNulls().create(); + + private final CuratorZookeeperRepository curatorZookeeperRepository = new CuratorZookeeperRepository(); + + private OneOffJobBootstrap scalingJobBootstrap; + + private boolean running; + + private String namespace; + + private GovernanceCenterConfiguration registryCenter; + + @Override + public void init(final String namespace, final GovernanceCenterConfiguration registryCenter) { + log.info("Scaling elastic job start..."); + this.namespace = namespace; + this.registryCenter = registryCenter; + initConfigurationRepository(); + watchConfigurationRepository(); + } + + private void initConfigurationRepository() { + scalingJobBootstrap = new OneOffJobBootstrap(createRegistryCenter(), new ScalingElasticJob(), createJobConfiguration()); + curatorZookeeperRepository.init(namespace, registryCenter); + } + + private CoordinatorRegistryCenter createRegistryCenter() { + ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(registryCenter.getServerLists(), namespace); + zkConfig.setMaxSleepTimeMilliseconds(getProperty("max.sleep.time.milliseconds", zkConfig.getMaxSleepTimeMilliseconds())); + zkConfig.setBaseSleepTimeMilliseconds(getProperty("base.sleep.time.milliseconds", zkConfig.getBaseSleepTimeMilliseconds())); + zkConfig.setConnectionTimeoutMilliseconds(getProperty("connection.timeout.milliseconds", zkConfig.getConnectionTimeoutMilliseconds())); + zkConfig.setSessionTimeoutMilliseconds(getProperty("session.timeout.milliseconds", zkConfig.getSessionTimeoutMilliseconds())); + CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zkConfig); + regCenter.init(); + return regCenter; + } + + private int getProperty(final String key, final int defaultValue) { + if (Strings.isNullOrEmpty(registryCenter.getProps().getProperty(key))) { + return defaultValue; + } + return Integer.parseInt(registryCenter.getProps().getProperty(key)); + } + + private JobConfiguration createJobConfiguration() { + return createJobConfiguration(1, null); + } + + private JobConfiguration createJobConfiguration(final int shardingTotalCount, final String jobParameter) { + return JobConfiguration.newBuilder(SCALING_JOB_NAME, shardingTotalCount).jobParameter(jobParameter).build(); + } + + private void watchConfigurationRepository() { + curatorZookeeperRepository.watch(SCALING_JOB_CONFIG, event -> { + Optional scalingConfiguration = getScalingConfiguration(event); + if (!scalingConfiguration.isPresent()) { + return; + } + switch (event.getChangedType()) { + case ADDED: + case UPDATED: + executeJob(scalingConfiguration.get()); + break; + case DELETED: + deleteJob(scalingConfiguration.get()); + break; + default: + break; + } + }); + } + + private Optional getScalingConfiguration(final DataChangedEvent event) { + try { + log.info("{} scaling config: {}", event.getChangedType(), event.getValue()); + return Optional.of(GSON.fromJson(event.getValue(), ScalingConfiguration.class)); + } catch (JsonSyntaxException ex) { + log.error("analyze scaling config failed.", ex); + } + return Optional.empty(); + } + + private void executeJob(final ScalingConfiguration scalingConfiguration) { + if (running && scalingConfiguration.getJobConfiguration().isRunning()) { + log.warn("scaling elastic job has already running, ignore current config."); + return; + } + if (running == scalingConfiguration.getJobConfiguration().isRunning()) { + return; + } + if (new LeaderService(createRegistryCenter(), SCALING_JOB_NAME).isLeader()) { + log.info("leader worker update config."); + updateJobConfiguration(scalingConfiguration); + } + scalingJobBootstrap.execute(); + running = scalingConfiguration.getJobConfiguration().isRunning(); + } + + private void deleteJob(final ScalingConfiguration scalingConfiguration) { + scalingConfiguration.getJobConfiguration().setRunning(false); + executeJob(scalingConfiguration); + } + + private void updateJobConfiguration(final ScalingConfiguration scalingConfiguration) { + JobConfigurationAPI jobConfigurationAPI = JobAPIFactory.createJobConfigurationAPI(registryCenter.getServerLists(), namespace, null); + jobConfigurationAPI.updateJobConfiguration( + JobConfigurationPOJO.fromJobConfiguration(createJobConfiguration(scalingConfiguration.getJobConfiguration().getShardingTables().length, GSON.toJson(scalingConfiguration)))); + } +} diff --git a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/ScalingElasticJob.java b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/ScalingElasticJob.java new file mode 100644 index 0000000000..a90ae7b1e5 --- /dev/null +++ b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/java/org/apache/shardingsphere/scaling/elasticjob/job/ScalingElasticJob.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.scaling.elasticjob.job; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import lombok.extern.slf4j.Slf4j; +import org.apache.shardingsphere.elasticjob.api.ShardingContext; +import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob; +import org.apache.shardingsphere.scaling.core.ScalingJobController; +import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration; +import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob; +import org.apache.shardingsphere.scaling.core.utils.SyncConfigurationUtil; + +/** + * Scaling elastic job. + */ +@Slf4j +public final class ScalingElasticJob implements SimpleJob { + + private static final Gson GSON = new GsonBuilder().disableHtmlEscaping().serializeNulls().create(); + + private static final ScalingJobController SCALING_JOB_CONTROLLER = new ScalingJobController(); + + private ShardingScalingJob shardingScalingJob; + + @Override + public void execute(final ShardingContext shardingContext) { + log.info("execute job: {} - {}/{}", shardingContext.getTaskId(), shardingContext.getShardingItem(), shardingContext.getShardingTotalCount()); + ScalingConfiguration scalingConfiguration = GSON.fromJson(shardingContext.getJobParameter(), ScalingConfiguration.class); + if (scalingConfiguration.getJobConfiguration().isRunning()) { + startJob(scalingConfiguration, shardingContext); + return; + } + stopJob(shardingContext); + } + + private void startJob(final ScalingConfiguration scalingConfiguration, final ShardingContext shardingContext) { + log.info("start job: {} - {}", shardingContext.getJobName(), shardingContext.getShardingItem()); + scalingConfiguration.getJobConfiguration().setShardingItem(shardingContext.getShardingItem()); + shardingScalingJob = new ShardingScalingJob(scalingConfiguration.getJobConfiguration().getJobName(), scalingConfiguration.getJobConfiguration().getShardingItem()); + shardingScalingJob.getSyncConfigurations().addAll(SyncConfigurationUtil.toSyncConfigurations(scalingConfiguration)); + SCALING_JOB_CONTROLLER.start(shardingScalingJob); + } + + private void stopJob(final ShardingContext shardingContext) { + log.info("stop job: {} - {}", shardingContext.getJobName(), shardingContext.getShardingItem()); + if (null != shardingScalingJob) { + SCALING_JOB_CONTROLLER.stop(shardingScalingJob.getJobId()); + shardingScalingJob = null; + } + } +} diff --git a/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.spi.ElasticJobEntry b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.spi.ElasticJobEntry new file mode 100644 index 0000000000..908881663b --- /dev/null +++ b/shardingsphere-scaling/shardingsphere-scaling-elasticjob/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.spi.ElasticJobEntry @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.shardingsphere.scaling.elasticjob.ScalingElasticJobEntry -- GitLab