MysqlSourceGenerator.java 5.6 KB
Newer Older
_和's avatar
_和 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources;

import org.apache.commons.lang.StringUtils;
20
import org.apache.dolphinscheduler.common.enums.DbType;
_和's avatar
_和 已提交
21 22 23 24
import org.apache.dolphinscheduler.common.enums.QueryType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter;
张世鸣 已提交
25
import org.apache.dolphinscheduler.common.utils.*;
_和's avatar
_和 已提交
26 27
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
28 29
import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
_和's avatar
_和 已提交
30 31 32 33 34 35 36 37 38 39 40 41 42 43
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ISourceGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
 * mysql source generator
 */
public class MysqlSourceGenerator implements ISourceGenerator {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
44
    public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) {
_和's avatar
_和 已提交
45 46 47 48 49
        StringBuilder result = new StringBuilder();
        try {
            SourceMysqlParameter sourceMysqlParameter
                    = JSONUtils.parseObject(sqoopParameters.getSourceParams(),SourceMysqlParameter.class);

50 51
            SqoopTaskExecutionContext sqoopTaskExecutionContext = taskExecutionContext.getSqoopTaskExecutionContext();

_和's avatar
_和 已提交
52
            if(sourceMysqlParameter != null){
53 54
                BaseDataSource baseDataSource = DataSourceFactory.getDatasource(DbType.of(sqoopTaskExecutionContext.getSourcetype()),
                        sqoopTaskExecutionContext.getSourceConnectionParams());
_和's avatar
_和 已提交
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
                if(baseDataSource != null){
                    result.append(" --connect ")
                            .append(baseDataSource.getJdbcUrl())
                            .append(" --username ")
                            .append(baseDataSource.getUser())
                            .append(" --password ")
                            .append(baseDataSource.getPassword());

                    if(sourceMysqlParameter.getSrcQueryType() == QueryType.FORM.ordinal()){
                        if(StringUtils.isNotEmpty(sourceMysqlParameter.getSrcTable())){
                            result.append(" --table ").append(sourceMysqlParameter.getSrcTable());
                        }

                        if(StringUtils.isNotEmpty(sourceMysqlParameter.getSrcColumns())){
                            result.append(" --columns ").append(sourceMysqlParameter.getSrcColumns());
                        }

72 73 74 75 76 77 78
                    }else if(sourceMysqlParameter.getSrcQueryType() == QueryType.SQL.ordinal()
                            && StringUtils.isNotEmpty(sourceMysqlParameter.getSrcQuerySql())){
                        String srcQuery = sourceMysqlParameter.getSrcQuerySql();
                        if(srcQuery.toLowerCase().contains("where")){
                            srcQuery += " AND "+"$CONDITIONS";
                        }else{
                            srcQuery += " WHERE $CONDITIONS";
_和's avatar
_和 已提交
79
                        }
E
eights 已提交
80
                        result.append(" --query \'").append(srcQuery).append("\'");
81

_和's avatar
_和 已提交
82 83 84 85 86
                    }

                    List<Property>  mapColumnHive = sourceMysqlParameter.getMapColumnHive();

                    if(mapColumnHive != null && !mapColumnHive.isEmpty()){
E
eights 已提交
87
                        StringBuilder columnMap = new StringBuilder();
_和's avatar
_和 已提交
88
                        for(Property item:mapColumnHive){
E
eights 已提交
89
                            columnMap.append(item.getProp()).append("=").append(item.getValue()).append(",");
_和's avatar
_和 已提交
90 91
                        }

E
eights 已提交
92
                        if(StringUtils.isNotEmpty(columnMap.toString())){
_和's avatar
_和 已提交
93
                            result.append(" --map-column-hive ")
94
                                    .append(columnMap.substring(0,columnMap.length()-1));
_和's avatar
_和 已提交
95 96 97 98 99 100
                        }
                    }

                    List<Property>  mapColumnJava = sourceMysqlParameter.getMapColumnJava();

                    if(mapColumnJava != null && !mapColumnJava.isEmpty()){
E
eights 已提交
101
                        StringBuilder columnMap = new StringBuilder();
_和's avatar
_和 已提交
102
                        for(Property item:mapColumnJava){
E
eights 已提交
103
                            columnMap.append(item.getProp()).append("=").append(item.getValue()).append(",");
_和's avatar
_和 已提交
104 105
                        }

E
eights 已提交
106
                        if(StringUtils.isNotEmpty(columnMap.toString())){
_和's avatar
_和 已提交
107
                            result.append(" --map-column-java ")
108
                                    .append(columnMap.substring(0,columnMap.length()-1));
_和's avatar
_和 已提交
109 110 111 112 113 114 115 116 117 118 119
                        }
                    }
                }
            }
        }catch (Exception e){
            logger.error(e.getMessage());
        }

        return result.toString();
    }
}