提交 04f54a87 编写于 作者: C carlin cheung

modify csvLoadReadMe

上级 cb23c397
......@@ -28,13 +28,11 @@ import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.edu.thu.tsfiledb.exception.ArgsErrorException;
/**
* CSV File To TsFileDB
......@@ -70,6 +68,7 @@ public class CSVToTsfile {
private static final String TSFILEDB_CLI_PREFIX = "CSV_To_TsFile";
private static final String STRING_DATA_TYPE = "TEXT";
private static final int BATCH_EXCUTE_COUNT = 10;
private static String host;
private static String port;
......@@ -77,7 +76,8 @@ public class CSVToTsfile {
private static String password;
private static String filename;
private static String timeformat = "timestamps";
private static String errorInsertInfo = "${TSFILE_HOME}/csvInsertError.txt";
private static HashMap<String, String> timeseriesToType = null;
private static HashMap<String, ArrayList<Integer>> deviceToColumn; // storage csv table head info
......@@ -95,50 +95,27 @@ public class CSVToTsfile {
help.setRequired(false);
options.addOption(help);
Option op_host = OptionBuilder.withArgName(HOST_NAME).hasArg().withDescription("Host Name (required)")
.create(HOST_ARGS);
options.addOption(op_host);
Option opHost = Option.builder(HOST_ARGS).argName(HOST_NAME).hasArg().desc("Host Name (required)").build();
options.addOption(opHost);
Option os_port = OptionBuilder.withArgName(PORT_NAME).hasArg().withDescription("Port (required)")
.create(PORT_ARGS);
options.addOption(os_port);
Option opPort = Option.builder(PORT_ARGS).argName(PORT_NAME).hasArg().desc("Port (required)").build();
options.addOption(opPort);
Option os_username = OptionBuilder.withArgName(USERNAME_NAME).hasArg().withDescription("User name (required)")
.create(USERNAME_ARGS);
options.addOption(os_username);
Option opUsername = Option.builder(USERNAME_ARGS).argName(USERNAME_NAME).hasArg().desc("User Name (required)").build();
options.addOption(opUsername);
Option os_password = OptionBuilder.withArgName(PASSWORD_NAME).hasArg().withDescription("Password (required)")
.create(PASSWORD_ARGS);
options.addOption(os_password);
Option opPassword = Option.builder(PASSWORD_ARGS).optionalArg(true).argName(PASSWORD_NAME).hasArg().desc("Password (optional)").build();
options.addOption(opPassword);
Option os_file = OptionBuilder.withArgName(FILE_NAME).hasArg().withDescription("csv file path (required)")
.create(FILE_ARGS);
options.addOption(os_file);
Option opFile = Option.builder(FILE_ARGS).optionalArg(true).argName(FILE_NAME).hasArg().desc("csv file path (required)").build();
options.addOption(opFile);
Option os_timeformat = OptionBuilder.withArgName(TIMEFORMAT_NAME).hasArg()
.withDescription("timeFormat (not required)").create(TIMEFORMAT_ARGS);
options.addOption(os_timeformat);
Option opTimeformat = Option.builder(TIMEFORMAT_ARGS).optionalArg(true).argName(TIMEFORMAT_NAME).hasArg().desc("timeFormat (not required)").build();
options.addOption(opTimeformat);
return options;
}
/**
*
* @param arg argument for commandline e:h,f,p..
* @param name the name of argument
* @param commandLine
* @return the value of the option e: the argument h's value is 127.0.0.1
* @throws ArgsErrorException
*/
private static String checkRequiredArg(String arg, String name, CommandLine commandLine) throws ArgsErrorException {
String str = commandLine.getOptionValue(arg);
if (str == null) {
String msg = String.format("%s: Required values for option '%s' not provided", TSFILEDB_CLI_PREFIX, name);
throw new ArgsErrorException(msg);
}
return str;
}
/**
*
* @param tf time format, optioned:ISO8601, timestamps, self-defined:
......@@ -146,14 +123,9 @@ public class CSVToTsfile {
* @param timestamp the time column of the csv file
* @return the timestamps will insert into SQL
*/
private static String setTimeFormat(String tf, String timestamp) {
private static String setTimestamp(String tf, String timestamp) {
if (tf.equals("ISO8601")) {
try {
Long.parseLong(timestamp);
}catch (NumberFormatException nfe) {
return timestamp;
}
return "";
return timestamp;
} else if (tf.equals("timestamps")) {
try {
Long.parseLong(timestamp);
......@@ -184,11 +156,11 @@ public class CSVToTsfile {
Statement statement = null;
BufferedReader br = null;
BufferedWriter bw = null;
File file = new File(errorInsertInfo);
try {
Class.forName("cn.edu.thu.tsfiledb.jdbc.TsfileDriver");
connection = DriverManager.getConnection("jdbc:tsfile://" + host + ":" + port + "/", username, password);
br = new BufferedReader(new InputStreamReader(new FileInputStream(new File(filename))));
File file = new File("csvInsertError.txt");
bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file)));
String line = "";
String[] strHeadInfo = br.readLine().split(",");
......@@ -213,7 +185,9 @@ public class CSVToTsfile {
timeseriesToType.put(resultSet.getString(0), resultSet.getString(1));
} else {
LOGGER.error("database Cannot find " + strHeadInfo[i] + ",stop import!");
bw.write("database Cannot find " + strHeadInfo[i] + ",stop import!");
br.close();
bw.close();
connection.close();
System.exit(1);
}
......@@ -229,22 +203,33 @@ public class CSVToTsfile {
}
statement = connection.createStatement();
int count =0;
while ((line = br.readLine()) != null) {
ArrayList<String> sqls = createInsertSQL(line);
ArrayList<String> sqls = createInsertSQL(line,bw);
for (String str : sqls) {
try {
statement.execute(str);
LOGGER.debug("{} :excuted successful!", str);
count++;
statement.addBatch(str);
if(count == BATCH_EXCUTE_COUNT) {
statement.executeBatch();
statement.clearBatch();
count = 0;
}
} catch (SQLException e) {
LOGGER.error("{} :excuted fail!", str);
LOGGER.error("{} :excuted fail!");
bw.write(e.getMessage());
bw.newLine();
}
}
}
if(file.exists() && (file.length() == 0)) {
file.delete();
try {
statement.executeBatch();
LOGGER.debug("excuted finish!");
} catch (SQLException e) {
bw.write(e.getMessage());
bw.newLine();
}
} catch (FileNotFoundException e) {
LOGGER.error("The csv file {} can't find!",filename, e);
} catch (IOException e) {
......@@ -263,7 +248,9 @@ public class CSVToTsfile {
} catch (IOException e) {
e.printStackTrace();
}
}
if(file.exists() && (file.length() == 0)) {
file.delete();
}
}
......@@ -276,12 +263,13 @@ public class CSVToTsfile {
/**
* create Insert SQL statement according to every line csv data
*
* @param line
* csv line data
* @param line csv line data
* @param bwToErrorFile the error info insert to file
* @return ArrayList<String> SQL statement list
* @throws IOException
*/
private static ArrayList<String> createInsertSQL(String line) {
String[] words = line.split(",", headInfo.size() + 1);
private static ArrayList<String> createInsertSQL(String line, BufferedWriter bwToErrorFile) throws IOException {
String[] data = line.split(",", headInfo.size() + 1);
ArrayList<String> sqls = new ArrayList<String>();
Iterator<Map.Entry<String, ArrayList<Integer>>> it = deviceToColumn.entrySet().iterator();
......@@ -292,7 +280,7 @@ public class CSVToTsfile {
sbd.append("insert into " + entry.getKey() + "(timestamp");
int skipcount = 0;
for (int j = 0; j < colIndex.size(); ++j) {
if (words[entry.getValue().get(j) + 1].equals("")) {
if (data[entry.getValue().get(j) + 1].equals("")) {
skipcount++;
continue;
}
......@@ -303,20 +291,22 @@ public class CSVToTsfile {
if (skipcount == entry.getValue().size())
continue;
String timestampsStr = setTimeFormat(timeformat, words[0]);
String timestampsStr = setTimestamp(timeformat, data[0]);
if (timestampsStr == "") {
LOGGER.error("Time Format Error!");
bwToErrorFile.write("Time Format Error!");
bwToErrorFile.newLine();
continue;
}
sbd.append(") values(").append(timestampsStr);
for (int j = 0; j < colIndex.size(); ++j) {
if (words[entry.getValue().get(j) + 1].equals(""))
if (data[entry.getValue().get(j) + 1].equals(""))
continue;
if (typeIdentify(headInfo.get(colIndex.get(j))) == STRING_DATA_TYPE) {
sbd.append(", \'" + words[colIndex.get(j) + 1] + "\'");
sbd.append(", \'" + data[colIndex.get(j) + 1] + "\'");
} else {
sbd.append("," + words[colIndex.get(j) + 1]);
sbd.append("," + data[colIndex.get(j) + 1]);
}
}
sbd.append(")");
......@@ -340,39 +330,38 @@ public class CSVToTsfile {
scanner.close();
return;
}
try {
commandLine = parser.parse(options, args);
if (commandLine.hasOption(HELP_ARGS)) {
hf.printHelp(TSFILEDB_CLI_PREFIX, options, true);
scanner.close();
return;
}
host = checkRequiredArg(HOST_ARGS, HOST_NAME, commandLine);
port = checkRequiredArg(PORT_ARGS, PORT_NAME, commandLine);
username = checkRequiredArg(USERNAME_ARGS, USERNAME_NAME, commandLine);
password = commandLine.getOptionValue(PASSWORD_ARGS);
if (password == null) {
System.out.print(TSFILEDB_CLI_PREFIX + "> please input password: ");
password = scanner.nextLine();
}
filename = checkRequiredArg(FILE_ARGS, FILE_NAME, commandLine);
} catch (ParseException e) {
LOGGER.error("problems encountered while parsing the command line tokens.", e);
scanner.close();
System.exit(1);
} catch (ArgsErrorException e) {
}
if (commandLine.hasOption(HELP_ARGS)) {
hf.printHelp(TSFILEDB_CLI_PREFIX, options, true);
scanner.close();
System.exit(1);
return;
}
try {
timeformat = checkRequiredArg(TIMEFORMAT_ARGS, TIMEFORMAT_NAME, commandLine);
} catch (ArgsErrorException e) {
host = commandLine.getOptionValue(HOST_ARGS);
port = commandLine.getOptionValue(PORT_ARGS);
username = commandLine.getOptionValue(USERNAME_ARGS);
password = commandLine.getOptionValue(PASSWORD_ARGS);
if (password == null) {
System.out.print(TSFILEDB_CLI_PREFIX + "> please input password: ");
password = scanner.nextLine();
}
filename = commandLine.getOptionValue(FILE_ARGS);
if(host == null || port == null || username == null || filename == null) {
hf.printHelp(TSFILEDB_CLI_PREFIX, options, true);
scanner.close();
return;
}
timeformat = commandLine.getOptionValue(TIMEFORMAT_ARGS);
if(timeformat == null) {
timeformat = "timestamps";
System.out.println("time format use default value long type!");
}
loadDataFromCSV();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册