From e994a2d4c9c04afedcf617164945b876721f87d6 Mon Sep 17 00:00:00 2001 From: shimingxy Date: Mon, 23 Mar 2020 23:55:38 +0800 Subject: [PATCH] u --- .../blazer/export/file/TransDataExport.java | 6 + .../file/runner/TransDataExport2Sql.java | 218 ++++++++++++++++++ 2 files changed, 224 insertions(+) create mode 100644 blazer-trans/src/main/java/com/blazer/export/file/runner/TransDataExport2Sql.java diff --git a/blazer-trans/src/main/java/com/blazer/export/file/TransDataExport.java b/blazer-trans/src/main/java/com/blazer/export/file/TransDataExport.java index ef8fbca..ea13772 100644 --- a/blazer-trans/src/main/java/com/blazer/export/file/TransDataExport.java +++ b/blazer-trans/src/main/java/com/blazer/export/file/TransDataExport.java @@ -21,6 +21,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.blazer.export.file.runner.TransDataExport2Csv; +import com.blazer.export.file.runner.TransDataExport2Sql; import com.blazer.export.file.runner.TransDataExport2Xlsx; import com.blazer.pipeline.PipeLineTask; @@ -309,6 +310,11 @@ public class TransDataExport extends BasicConfigure implements PipeLineTask{ commitNumber, threadNumber, tableName, outFileName, fileNameSuffix, terminatedString, threadSql, exportFilePath,limitTextSize); + }if(this.fileType.equalsIgnoreCase("sql")) { + transThread =new TransDataExport2Sql(sourceDataSource, + commitNumber, threadNumber, + tableName, outFileName, fileNameSuffix, + terminatedString, threadSql, exportFilePath,limitTextSize); }else { transThread =new TransDataExport2Xlsx(sourceDataSource, diff --git a/blazer-trans/src/main/java/com/blazer/export/file/runner/TransDataExport2Sql.java b/blazer-trans/src/main/java/com/blazer/export/file/runner/TransDataExport2Sql.java new file mode 100644 index 0000000..b3fb6e8 --- /dev/null +++ b/blazer-trans/src/main/java/com/blazer/export/file/runner/TransDataExport2Sql.java @@ -0,0 +1,218 @@ +/** + * + */ +package com.blazer.export.file.runner; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.Reader; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; + +import javax.sql.DataSource; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.blazer.db.TableColumns; +import com.blazer.export.file.BasicConfigure; +import com.blazer.export.file.TransDataExport; + + +/** + * @author mhshi + * + */ +public class TransDataExport2Sql extends BasicConfigure implements Runnable{ + private static final Logger _logger = LoggerFactory.getLogger(TransDataExport2Sql.class); + + String selectSqlString; + int threadNumber =1; + FileOutputStream fop = null; + + long commitCount=0; + + ArrayList listTableColumns ; + + + public TransDataExport2Sql(DataSource sourceDataSource,int commitNumber, int threadNumber, + String tableName, String outFileName, String fileNameSuffix, + String terminatedString, String selectSqlString, String exportFilePath,int limitTextSize) { + super(); + this.commitNumber = commitNumber; + this.threadNumber = threadNumber; + this.tableName = tableName; + this.outFileName = outFileName; + this.fileNameSuffix = fileNameSuffix; + this.terminatedString = terminatedString; + this.selectSqlString = selectSqlString; + this.exportFilePath = exportFilePath; + this.sourceDataSource=sourceDataSource; + this.limitTextSize=limitTextSize; + } + + @Override + public void run() { + // TODO Auto-generated method stub + File exportFilePathName = new File(exportFilePath+outFileName+"_"+threadNumber+fileNameSuffix); + + try { + if(exportFilePathName.exists()){ + exportFilePathName.delete(); + } + exportFilePathName.createNewFile(); + this.fop = new FileOutputStream(exportFilePathName); + } catch (IOException e) { + e.printStackTrace(); + } + + listTableColumns = new ArrayList(); + try { + Connection sourcConn=sourceDataSource.getConnection(); + Statement sourcStmt = sourcConn.createStatement(); + ResultSet sourceRs=sourcStmt.executeQuery(selectSqlString); + buildMetaData(sourceRs); + batchWrite(sourceRs); + sourceRs.close(); + sourcStmt.close(); + sourcConn.close(); + fop.close(); + + Thread.sleep(2000); + + successThread(); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + _logger.info("--thread "+threadNumber+" -- Fail ."); + _logger.error("导出数据错误", e); + } + + TransDataExport.mCountDownLatch.countDown(); + } + + //TODO + public synchronized void successThread() { + TransDataExport.successThread[threadNumber-1]=commitCount; + _logger.info("--thread "+threadNumber+" data count "+TransDataExport.successThread[threadNumber-1]+" -- Complete ."); + } + + public void batchWrite(ResultSet rs) throws Exception{ + + long insertNum=0; + + StringBuffer stringBufferLines=new StringBuffer(""); + while(rs.next()){ + for(int ccount=0;ccount0){//NUMBER + stringBufferLines.append(rs.getLong(tc.getColumnName())); + //targetPstmt.setLong(pos, rs.getLong(tc.getColumnName())); + }else if(tc.getDataLength()==22&&tc.getDataScale()==0){//INTEGER + stringBufferLines.append(rs.getInt(tc.getColumnName())); + //targetPstmt.setInt(pos, rs.getInt(tc.getColumnName())); + }else if(tc.getDataPrecision()==0||tc.getDataScale()==0||tc.getDataScale()==0){//LONG + stringBufferLines.append(rs.getLong(tc.getColumnName())); + //targetPstmt.setLong(pos, rs.getLong(tc.getColumnName())); + }else{//DOUBLE + stringBufferLines.append(rs.getDouble(tc.getColumnName())); + //targetPstmt.setDouble(pos, rs.getDouble(tc.getColumnName())); + } + }else if(tc.getDataType().equalsIgnoreCase("blob")){ + stringBufferLines.append(getBlob(rs,tc.getColumnName())); + }else if(tc.getDataType().equalsIgnoreCase("clob")||tc.getDataType().equalsIgnoreCase("NCLOB")){ + String lobString =getClob(rs,tc.getColumnName()); + //_logger.info(lobString); + if(0= this.commitNumber) { + insertNum = 0; + _logger.info("--thread "+threadNumber+"--Commit Count "+commitCount ); + fop.write(stringBufferLines.toString().getBytes()); + stringBufferLines=new StringBuffer(""); + } + } + if (insertNum >0){ + _logger.info("--thread "+threadNumber+"--Commit Count "+commitCount +" Complete ."); + fop.write(stringBufferLines.toString().getBytes()); + } + + } + + public String getClob(ResultSet sourceRs,String column) throws Exception{ + oracle.sql.CLOB sb=(oracle.sql.CLOB)sourceRs.getClob(column); + if(sb==null)return ""; + Reader is=sb.getCharacterStream(); + char[]data=new char[(int)sb.length()]; + is.read(data); + is.close(); + return new String(data); + } + + public String getBlob(ResultSet sourceRs,String column) throws Exception{ + oracle.sql.BLOB sb=(oracle.sql.BLOB)sourceRs.getBlob(column); + if(sb==null)return ""; + InputStream is=sb.getBinaryStream(); + byte[]data=new byte[(int)sb.length()]; + is.read(data); + is.close(); + return new String(data); + } + public void buildMetaData(ResultSet rs) throws SQLException{ + ResultSetMetaData metaData = rs.getMetaData(); + _logger.debug("--thread "+threadNumber+"--column Count "+metaData.getColumnCount() ); + for (int i = 1; i <= metaData.getColumnCount(); i++) { + TableColumns tc=new TableColumns(); + tc.setColumnName(metaData.getColumnName(i)); + tc.setDataType(metaData.getColumnTypeName(i)); + tc.setTableName(metaData.getTableName(i)); + tc.setDataPrecision(metaData.getPrecision(i)); + tc.setDataScale(metaData.getScale(i)); + _logger.debug("--thread "+threadNumber+"--No. "+i+" , Column "+tc.getColumnName()+" , DataType "+tc.getDataType() ); + listTableColumns.add(tc); + } + } + + + + +} -- GitLab