/**
*
*/
package com.blazer.trans;
import java.util.ArrayList;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.blazer.db.TableDescribe;
import com.blazer.pipeline.PipeLineTask;
import com.blazer.trans.impl.*;
/**
* 实现不同数据库数据的同步
* 根据数据源判断当前的的来源和去向数据库类型
*
* @author mhshi
* @since 20180101
*
*/
public class TransData implements PipeLineTask{
private static final Logger _logger = LoggerFactory.getLogger(TransData.class);
DataSource sourceDataSource;
DataSource targetDataSource;
int commitNum = 2000;
int threadSize =1;
String fromUrl;
String fromUser;
String toUrl;
String toUser;
//
//
String transType="INCREMENT";
String trans2DBType=null;
String sourceDBType=null;
ArrayList tablesList = new ArrayList();
public int execute() throws Exception {
_logger.info("-- --From URL " + fromUrl+" , USER "+fromUser);
_logger.info("-- --To URL " + toUrl+" , USER "+toUser);
_logger.info("targetDataSource : "+targetDataSource.toString());
if(sourceDataSource.toString().toLowerCase().indexOf("oracle")>-1) {
sourceDBType="oracle";
}else if(sourceDataSource.toString().toLowerCase().indexOf("mysql")>-1) {
sourceDBType="mysql";
}else if(sourceDataSource.toString().toLowerCase().indexOf("greenplum")>-1) {
sourceDBType="greenplum";
}
_logger.info("targetDataSource : "+targetDataSource.toString());
if(targetDataSource.toString().toLowerCase().indexOf("oracle")>-1) {
trans2DBType="oracle";
}else if(targetDataSource.toString().toLowerCase().indexOf("mysql")>-1) {
trans2DBType="mysql";
}else if(targetDataSource.toString().toLowerCase().indexOf("greenplum")>-1) {
trans2DBType="greenplum";
}
int threadNum=tablesList.size()/threadSize;
ArrayList threadTablesList=new ArrayList();
int threadCount=1;
for(TableDescribe tableQuery : tablesList){
threadTablesList.add(tableQuery);
if(threadTablesList.size()>=threadNum){
runTranThread(sourceDataSource, targetDataSource, threadTablesList, transType, fromUser,
commitNum, threadCount);
threadCount++;
threadTablesList=new ArrayList();
}
}
if(threadTablesList.size()>0){
runTranThread(sourceDataSource, targetDataSource, threadTablesList, transType, fromUser,
commitNum, threadCount);
}
return 0;
}
public void runTranThread( DataSource sourceDataSource,
DataSource targetDataSource,
ArrayList listTables,
String transType,
String owner,
int commitNum,
int threadNum) {
Runnable transThread =null;
if(sourceDBType.equalsIgnoreCase("oracle")) {
if(trans2DBType.equals("oracle")) {
transThread = new TransOracle2Oracle(
sourceDataSource, targetDataSource, listTables, transType,fromUser, commitNum, threadNum);
}else if(trans2DBType.equals("mysql")) {
transThread = new TransOracle2MySql(
sourceDataSource, targetDataSource, listTables, transType,fromUser, commitNum, threadNum);
}else if(trans2DBType.equals("greenplum")) {
transThread = new TransOracle2Greenplum(
sourceDataSource, targetDataSource, listTables, transType,fromUser, commitNum, threadNum);
}
}
if(sourceDBType.equalsIgnoreCase("greenplum")) {
if(trans2DBType.equals("oracle")) {
transThread = new TransGreenplum2Oracle(
sourceDataSource, targetDataSource, listTables, transType,fromUser, commitNum, threadNum);
}
}
Thread tt=new Thread(transThread);
tt.start();
}
public DataSource getSourceDataSource() {
return sourceDataSource;
}
public void setSourceDataSource(DataSource sourceDataSource) {
this.sourceDataSource = sourceDataSource;
}
public DataSource getTargetDataSource() {
return targetDataSource;
}
public void setTargetDataSource(DataSource targetDataSource) {
this.targetDataSource = targetDataSource;
}
public int getCommitNum() {
return commitNum;
}
public void setCommitNum(int commitNum) {
this.commitNum = commitNum;
}
public String getFromUrl() {
return fromUrl;
}
public void setFromUrl(String fromUrl) {
this.fromUrl = fromUrl;
}
public String getFromUser() {
return fromUser;
}
public void setFromUser(String fromUser) {
this.fromUser = fromUser;
}
public String getToUrl() {
return toUrl;
}
public void setToUrl(String toUrl) {
this.toUrl = toUrl;
}
public String getToUser() {
return toUser;
}
public void setToUser(String toUser) {
this.toUser = toUser;
}
public String getTransType() {
return transType;
}
public void setTransType(String transType) {
this.transType = transType;
}
public void setTablesList(ArrayList tablesList) {
this.tablesList = tablesList;
}
}