From 2b21a75ae04a4c7b591cb12169f1197ddea6c2c0 Mon Sep 17 00:00:00 2001 From: slguan Date: Mon, 16 Dec 2019 18:59:18 +0800 Subject: [PATCH] #928 --- src/kit/shell/inc/shell.h | 4 + src/kit/shell/src/shellEngine.c | 10 +- src/kit/shell/src/shellImport.c | 261 ++++++++++++++++++++++++++++++++ src/kit/shell/src/shellLinux.c | 13 ++ src/kit/shell/src/shellMain.c | 14 +- 5 files changed, 300 insertions(+), 2 deletions(-) create mode 100644 src/kit/shell/src/shellImport.c diff --git a/src/kit/shell/inc/shell.h b/src/kit/shell/inc/shell.h index 12024d0b86..499c93e0ec 100644 --- a/src/kit/shell/inc/shell.h +++ b/src/kit/shell/inc/shell.h @@ -58,6 +58,8 @@ struct arguments { bool is_raw_time; bool is_use_passwd; char file[TSDB_FILENAME_LEN]; + char dir[TSDB_FILENAME_LEN]; + int threadNum; char* commands; int abort; }; @@ -74,12 +76,14 @@ void shellRunCommandOnServer(TAOS* con, char command[]); void read_history(); void write_history(); void source_file(TAOS* con, char* fptr); +void source_dir(TAOS* con, struct arguments* args); void get_history_path(char* history); void cleanup_handler(void* arg); void exitShell(); int shellDumpResult(TAOS* con, char* fname, int* error_no, bool printMode); void shellPrintNChar(char* str, int width, bool printMode); void shellGetGrantInfo(void *con); +int isCommentLine(char *line); #define max(a, b) ((int)(a) < (int)(b) ? (int)(b) : (int)(a)) /**************** Global variable declarations ****************/ diff --git a/src/kit/shell/src/shellEngine.c b/src/kit/shell/src/shellEngine.c index 116209b784..c1caf6147d 100644 --- a/src/kit/shell/src/shellEngine.c +++ b/src/kit/shell/src/shellEngine.c @@ -110,6 +110,14 @@ TAOS *shellInit(struct arguments *args) { exit(EXIT_SUCCESS); } +#ifdef LINUX + if (args->dir[0] != 0) { + source_dir(con, args); + taos_close(con); + exit(EXIT_SUCCESS); + } +#endif + printf(SERVER_VERSION, taos_get_server_info(con)); return con; @@ -762,7 +770,7 @@ void taos_error(TAOS *con) { taos_free_result(pRes); } -static int isCommentLine(char *line) { +int isCommentLine(char *line) { if (line == NULL) return 1; return regex_match(line, "^\\s*#.*", REG_EXTENDED); diff --git a/src/kit/shell/src/shellImport.c b/src/kit/shell/src/shellImport.c new file mode 100644 index 0000000000..e4baf07edf --- /dev/null +++ b/src/kit/shell/src/shellImport.c @@ -0,0 +1,261 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _XOPEN_SOURCE +#define _DEFAULT_SOURCE + +#include "os.h" +#include "shell.h" +#include "shellCommand.h" +#include "ttime.h" +#include "tutil.h" + +static char **shellSQLFiles = NULL; +static int32_t shellSQLFileNum = 0; +static char shellTablesSQLFile[TSDB_FILENAME_LEN] = {0}; + +typedef struct { + pthread_t threadID; + int threadIndex; + int totalThreads; + void *taos; +} ShellThreadObj; + +static int shellGetFilesNum(const char *directoryName, const char *prefix) +{ + char cmd[1024] = { 0 }; + sprintf(cmd, "ls %s/*.%s | wc -l ", directoryName, prefix); + + FILE *fp = popen(cmd, "r"); + if (fp == NULL) { + fprintf(stderr, "ERROR: failed to execute:%s, error:%s\n", cmd, strerror(errno)); + exit(0); + } + + int fileNum = 0; + if (fscanf(fp, "%d", &fileNum) != 1) { + fprintf(stderr, "ERROR: failed to execute:%s, parse result error\n", cmd); + exit(0); + } + + if (fileNum <= 0) { + fprintf(stderr, "ERROR: directory:%s is empry\n", directoryName); + exit(0); + } + + pclose(fp); + return fileNum; +} + +static void shellParseDirectory(const char *directoryName, const char *prefix, char **fileArray, int totalFiles) +{ + char cmd[1024] = { 0 }; + sprintf(cmd, "ls %s/*.%s | sort", directoryName, prefix); + + FILE *fp = popen(cmd, "r"); + if (fp == NULL) { + fprintf(stderr, "ERROR: failed to execute:%s, error:%s\n", cmd, strerror(errno)); + exit(0); + } + + int fileNum = 0; + while (fscanf(fp, "%s", fileArray[fileNum++])) { + if (strcmp(fileArray[fileNum-1], shellTablesSQLFile) == 0) { + fileNum--; + } + if (fileNum >= totalFiles) { + break; + } + } + + if (fileNum != totalFiles) { + fprintf(stderr, "ERROR: directory:%s changed while read\n", directoryName); + exit(0); + } + + pclose(fp); +} + +static void shellCheckTablesSQLFile(const char *directoryName) +{ + char cmd[1024] = { 0 }; + sprintf(cmd, "ls %s/tables.sql", directoryName); + + FILE *fp = popen(cmd, "r"); + if (fp == NULL) { + fprintf(stderr, "ERROR: failed to execute:%s, error:%s\n", cmd, strerror(errno)); + exit(0); + } + + while (fscanf(fp, "%s", shellTablesSQLFile)) { + break; + } + + pclose(fp); +} + +static void shellMallocSQLFiles() +{ + shellSQLFiles = (char**)calloc(shellSQLFileNum, sizeof(char*)); + for (int i = 0; i < shellSQLFileNum; i++) { + shellSQLFiles[i] = calloc(1, TSDB_FILENAME_LEN); + } +} + +static void shellGetDirectoryFileList(char *inputDir) +{ + struct stat fileStat; + if (stat(inputDir, &fileStat) < 0) { + fprintf(stderr, "ERROR: %s not exist\n", inputDir); + exit(0); + } + + if (fileStat.st_mode & S_IFDIR) { + shellCheckTablesSQLFile(inputDir); + shellSQLFileNum = shellGetFilesNum(inputDir, "sql"); + int totalSQLFileNum = shellSQLFileNum; + if (shellTablesSQLFile[0] != 0) { + shellSQLFileNum--; + } + shellMallocSQLFiles(); + shellParseDirectory(inputDir, "sql", shellSQLFiles, shellSQLFileNum); + fprintf(stdout, "start to dispose %d files in %s\n", totalSQLFileNum, inputDir); + } + else { + fprintf(stderr, "ERROR: %s is not a directory\n", inputDir); + exit(0); + } +} + +static void shellSourceFile(TAOS *con, char *fptr) { + wordexp_t full_path; + int read_len = 0; + char * cmd = malloc(MAX_COMMAND_SIZE); + size_t cmd_len = 0; + char * line = NULL; + size_t line_len = 0; + + if (wordexp(fptr, &full_path, 0) != 0) { + fprintf(stderr, "ERROR: illegal file name\n"); + return; + } + + char *fname = full_path.we_wordv[0]; + + if (access(fname, R_OK) == -1) { + fprintf(stderr, "ERROR: file %s is not readable\n", fptr); + wordfree(&full_path); + return; + } + + FILE *f = fopen(fname, "r"); + if (f == NULL) { + fprintf(stderr, "ERROR: failed to open file %s\n", fname); + wordfree(&full_path); + return; + } + + fprintf(stdout, "start to dispose file:%s\n", fname); + + while ((read_len = getline(&line, &line_len, f)) != -1) { + if (read_len >= MAX_COMMAND_SIZE) continue; + line[--read_len] = '\0'; + + if (read_len == 0 || isCommentLine(line)) { // line starts with # + continue; + } + + if (line[read_len - 1] == '\\') { + line[read_len - 1] = ' '; + memcpy(cmd + cmd_len, line, read_len); + cmd_len += read_len; + continue; + } + + memcpy(cmd + cmd_len, line, read_len); + if (taos_query(con, cmd)) { + taos_error(con); + } + + memset(cmd, 0, MAX_COMMAND_SIZE); + cmd_len = 0; + } + + free(cmd); + if (line) free(line); + wordfree(&full_path); + fclose(f); +} + +void* shellImportThreadFp(void *arg) +{ + ShellThreadObj *pThread = (ShellThreadObj*)arg; + for (int f = 0; f < shellSQLFileNum; ++f) { + if (f % pThread->totalThreads == pThread->threadIndex) { + char *SQLFileName = shellSQLFiles[f]; + shellSourceFile(pThread->taos, SQLFileName); + } + } + + return NULL; +} + +static void shellRunImportThreads(struct arguments* args) +{ + pthread_attr_t thattr; + ShellThreadObj *threadObj = (ShellThreadObj *)calloc(args->threadNum, sizeof(ShellThreadObj)); + for (int t = 0; t < args->threadNum; ++t) { + ShellThreadObj *pThread = threadObj + t; + pThread->threadIndex = t; + pThread->totalThreads = args->threadNum; + pThread->taos = taos_connect(args->host, args->user, args->password, args->database, tsMgmtShellPort); + if (pThread->taos == NULL) { + fprintf(stderr, "ERROR: thread:%d failed connect to TDengine, error:%s\n", pThread->threadIndex, taos_errstr(pThread->taos)); + exit(0); + } + + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + + if (pthread_create(&(pThread->threadID), &thattr, shellImportThreadFp, (void*)pThread) != 0) { + fprintf(stderr, "ERROR: thread:%d failed to start\n", pThread->threadIndex); + exit(0); + } + } + + for (int t = 0; t < args->threadNum; ++t) { + pthread_join(threadObj[t].threadID, NULL); + } + + for (int t = 0; t < args->threadNum; ++t) { + taos_close(threadObj[t].taos); + } + free(threadObj); +} + +void source_dir(TAOS* con, struct arguments* args) { + shellGetDirectoryFileList(args->dir); + int64_t start = taosGetTimestampMs(); + + if (shellTablesSQLFile[0] != 0) { + shellSourceFile(con, shellTablesSQLFile); + int64_t end = taosGetTimestampMs(); + fprintf(stdout, "import %s finished, time spent %.2f seconds\n", shellTablesSQLFile, (end - start) / 1000.0); + } + + shellRunImportThreads(args); + int64_t end = taosGetTimestampMs(); + fprintf(stdout, "import %s finished, time spent %.2f seconds\n", args->dir, (end - start) / 1000.0); +} diff --git a/src/kit/shell/src/shellLinux.c b/src/kit/shell/src/shellLinux.c index ad8bf6c5c3..67df2ea161 100644 --- a/src/kit/shell/src/shellLinux.c +++ b/src/kit/shell/src/shellLinux.c @@ -40,6 +40,8 @@ static struct argp_option options[] = { {"commands", 's', "COMMANDS", 0, "Commands to run without enter the shell."}, {"raw-time", 'r', 0, 0, "Output time as uint64_t."}, {"file", 'f', "FILE", 0, "Script to run without enter the shell."}, + {"directory", 'D', "DIRECTORY", 0, "Use multi-thread to import all SQL files in the directory separately."}, + {"thread", 'T', "THREADNUM", 0, "Number of threads when using multi-thread to import data."}, {"database", 'd', "DATABASE", 0, "Database to use when connecting to the server."}, {"timezone", 't', "TIMEZONE", 0, "Time zone of the shell, default is local."}, {0}}; @@ -89,6 +91,17 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { strcpy(arguments->file, full_path.we_wordv[0]); wordfree(&full_path); break; + case 'D': + if (wordexp(arg, &full_path, 0) != 0) { + fprintf(stderr, "Invalid path %s\n", arg); + return -1; + } + strcpy(arguments->dir, full_path.we_wordv[0]); + wordfree(&full_path); + break; + case 'T': + arguments->threadNum = atoi(arg); + break; case 'd': arguments->database = arg; break; diff --git a/src/kit/shell/src/shellMain.c b/src/kit/shell/src/shellMain.c index 82333020f1..a7b7e8383b 100644 --- a/src/kit/shell/src/shellMain.c +++ b/src/kit/shell/src/shellMain.c @@ -62,7 +62,19 @@ int checkVersion() { } // Global configurations -struct arguments args = {NULL, NULL, NULL, NULL, NULL, false, false, "\0", NULL}; +struct arguments args = { + .host = NULL, + .password = NULL, + .user = NULL, + .database = NULL, + .timezone = NULL, + .is_raw_time = false, + .is_use_passwd = false, + .file = "\0", + .dir = "\0", + .threadNum = 5, + .commands = NULL +}; /* * Main function. -- GitLab