diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 4ccdb6b327a72c9f7072ea7b3b247a25d246ec1a..37bbd93ec610692fc538d0a6b4d5524ddfb22cd7 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -469,6 +469,9 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) { if (pCmd->command > TSDB_SQL_MGMT) { tscProcessMgmtRedirect(pSql, pMsg->content + 1); + } else if (pCmd->command == TSDB_SQL_INSERT){ + pSql->index++; + pSql->maxRetry = TSDB_VNODES_SUPPORT * 2; } else { pSql->index++; } @@ -1400,7 +1403,7 @@ void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char *buf) { pShellMsg = (SShellSubmitMsg *)pMsg; pShellMsg->vnode = htons(pMeterMeta->vpeerDesc[pSql->index].vnode); - tscTrace("%p update submit msg vnode:%d", pSql, htons(pShellMsg->vnode)); + tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pMeterMeta->vpeerDesc[pSql->index].ip), htons(pShellMsg->vnode)); } int tscBuildSubmitMsg(SSqlObj *pSql) { @@ -1421,7 +1424,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql) { // pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; - tscTrace("%p update submit msg vnode:%d", pSql, htons(pShellMsg->vnode)); + tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pMeterMeta->vpeerDesc[pMeterMeta->index].ip), htons(pShellMsg->vnode)); return msgLen; } diff --git a/src/kit/shell/inc/shell.h b/src/kit/shell/inc/shell.h index 12024d0b8673e1349e5a789d60632a278c7a696a..499c93e0ec96ddf0b3b655286207cf23b1f5694d 100644 --- a/src/kit/shell/inc/shell.h +++ b/src/kit/shell/inc/shell.h @@ -58,6 +58,8 @@ struct arguments { bool is_raw_time; bool is_use_passwd; char file[TSDB_FILENAME_LEN]; + char dir[TSDB_FILENAME_LEN]; + int threadNum; char* commands; int abort; }; @@ -74,12 +76,14 @@ void shellRunCommandOnServer(TAOS* con, char command[]); void read_history(); void write_history(); void source_file(TAOS* con, char* fptr); +void source_dir(TAOS* con, struct arguments* args); void get_history_path(char* history); void cleanup_handler(void* arg); void exitShell(); int shellDumpResult(TAOS* con, char* fname, int* error_no, bool printMode); void shellPrintNChar(char* str, int width, bool printMode); void shellGetGrantInfo(void *con); +int isCommentLine(char *line); #define max(a, b) ((int)(a) < (int)(b) ? (int)(b) : (int)(a)) /**************** Global variable declarations ****************/ diff --git a/src/kit/shell/src/shellEngine.c b/src/kit/shell/src/shellEngine.c index 04381f5382195bf5aa4326e6cb957a5dd8a51fdd..ac66ab15d90ddaf3ceada296e713b1a9b3a9f75b 100644 --- a/src/kit/shell/src/shellEngine.c +++ b/src/kit/shell/src/shellEngine.c @@ -111,6 +111,14 @@ TAOS *shellInit(struct arguments *args) { exit(EXIT_SUCCESS); } +#ifdef LINUX + if (args->dir[0] != 0) { + source_dir(con, args); + taos_close(con); + exit(EXIT_SUCCESS); + } +#endif + printf(SERVER_VERSION, taos_get_server_info(con)); return con; @@ -765,7 +773,7 @@ void taos_error(TAOS *con) { taos_free_result(pRes); } -static int isCommentLine(char *line) { +int isCommentLine(char *line) { if (line == NULL) return 1; return regex_match(line, "^\\s*#.*", REG_EXTENDED); diff --git a/src/kit/shell/src/shellImport.c b/src/kit/shell/src/shellImport.c new file mode 100644 index 0000000000000000000000000000000000000000..3292aa8e04bba29b8e3636a3b4988ae2f4cd8362 --- /dev/null +++ b/src/kit/shell/src/shellImport.c @@ -0,0 +1,266 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _XOPEN_SOURCE +#define _DEFAULT_SOURCE + +#include "os.h" +#include "shell.h" +#include "shellCommand.h" +#include "ttime.h" +#include "tutil.h" + +static char **shellSQLFiles = NULL; +static int32_t shellSQLFileNum = 0; +static char shellTablesSQLFile[TSDB_FILENAME_LEN] = {0}; + +typedef struct { + pthread_t threadID; + int threadIndex; + int totalThreads; + void *taos; +} ShellThreadObj; + +static int shellGetFilesNum(const char *directoryName, const char *prefix) +{ + char cmd[1024] = { 0 }; + sprintf(cmd, "ls %s/*.%s | wc -l ", directoryName, prefix); + + FILE *fp = popen(cmd, "r"); + if (fp == NULL) { + fprintf(stderr, "ERROR: failed to execute:%s, error:%s\n", cmd, strerror(errno)); + exit(0); + } + + int fileNum = 0; + if (fscanf(fp, "%d", &fileNum) != 1) { + fprintf(stderr, "ERROR: failed to execute:%s, parse result error\n", cmd); + exit(0); + } + + if (fileNum <= 0) { + fprintf(stderr, "ERROR: directory:%s is empry\n", directoryName); + exit(0); + } + + pclose(fp); + return fileNum; +} + +static void shellParseDirectory(const char *directoryName, const char *prefix, char **fileArray, int totalFiles) +{ + char cmd[1024] = { 0 }; + sprintf(cmd, "ls %s/*.%s | sort", directoryName, prefix); + + FILE *fp = popen(cmd, "r"); + if (fp == NULL) { + fprintf(stderr, "ERROR: failed to execute:%s, error:%s\n", cmd, strerror(errno)); + exit(0); + } + + int fileNum = 0; + while (fscanf(fp, "%s", fileArray[fileNum++])) { + if (strcmp(fileArray[fileNum-1], shellTablesSQLFile) == 0) { + fileNum--; + } + if (fileNum >= totalFiles) { + break; + } + } + + if (fileNum != totalFiles) { + fprintf(stderr, "ERROR: directory:%s changed while read\n", directoryName); + exit(0); + } + + pclose(fp); +} + +static void shellCheckTablesSQLFile(const char *directoryName) +{ + char cmd[1024] = { 0 }; + sprintf(cmd, "ls %s/tables.sql", directoryName); + + FILE *fp = popen(cmd, "r"); + if (fp == NULL) { + fprintf(stderr, "ERROR: failed to execute:%s, error:%s\n", cmd, strerror(errno)); + exit(0); + } + + while (fscanf(fp, "%s", shellTablesSQLFile)) { + break; + } + + pclose(fp); +} + +static void shellMallocSQLFiles() +{ + shellSQLFiles = (char**)calloc(shellSQLFileNum, sizeof(char*)); + for (int i = 0; i < shellSQLFileNum; i++) { + shellSQLFiles[i] = calloc(1, TSDB_FILENAME_LEN); + } +} + +static void shellGetDirectoryFileList(char *inputDir) +{ + struct stat fileStat; + if (stat(inputDir, &fileStat) < 0) { + fprintf(stderr, "ERROR: %s not exist\n", inputDir); + exit(0); + } + + if (fileStat.st_mode & S_IFDIR) { + shellCheckTablesSQLFile(inputDir); + shellSQLFileNum = shellGetFilesNum(inputDir, "sql"); + int totalSQLFileNum = shellSQLFileNum; + if (shellTablesSQLFile[0] != 0) { + shellSQLFileNum--; + } + shellMallocSQLFiles(); + shellParseDirectory(inputDir, "sql", shellSQLFiles, shellSQLFileNum); + fprintf(stdout, "\nstart to dispose %d files in %s\n", totalSQLFileNum, inputDir); + } + else { + fprintf(stderr, "ERROR: %s is not a directory\n", inputDir); + exit(0); + } +} + +static void shellSourceFile(TAOS *con, char *fptr) { + wordexp_t full_path; + int read_len = 0; + char * cmd = malloc(MAX_COMMAND_SIZE); + size_t cmd_len = 0; + char * line = NULL; + size_t line_len = 0; + + if (wordexp(fptr, &full_path, 0) != 0) { + fprintf(stderr, "ERROR: illegal file name\n"); + return; + } + + char *fname = full_path.we_wordv[0]; + + if (access(fname, R_OK) == -1) { + fprintf(stderr, "ERROR: file %s is not readable\n", fptr); + wordfree(&full_path); + return; + } + + FILE *f = fopen(fname, "r"); + if (f == NULL) { + fprintf(stderr, "ERROR: failed to open file %s\n", fname); + wordfree(&full_path); + return; + } + + fprintf(stdout, "begin import file:%s\n", fname); + + int lineNo = 0; + while ((read_len = getline(&line, &line_len, f)) != -1) { + ++lineNo; + if (read_len >= MAX_COMMAND_SIZE) continue; + line[--read_len] = '\0'; + + if (read_len == 0 || isCommentLine(line)) { // line starts with # + continue; + } + + if (line[read_len - 1] == '\\') { + line[read_len - 1] = ' '; + memcpy(cmd + cmd_len, line, read_len); + cmd_len += read_len; + continue; + } + + memcpy(cmd + cmd_len, line, read_len); + if (taos_query(con, cmd)) { + fprintf(stderr, "DB error: %s: %s (%d)\n", taos_errstr(con), fname, lineNo); + /* free local resouce: allocated memory/metric-meta refcnt */ + TAOS_RES *pRes = taos_use_result(con); + taos_free_result(pRes); + } + + memset(cmd, 0, MAX_COMMAND_SIZE); + cmd_len = 0; + } + + free(cmd); + if (line) free(line); + wordfree(&full_path); + fclose(f); +} + +void* shellImportThreadFp(void *arg) +{ + ShellThreadObj *pThread = (ShellThreadObj*)arg; + for (int f = 0; f < shellSQLFileNum; ++f) { + if (f % pThread->totalThreads == pThread->threadIndex) { + char *SQLFileName = shellSQLFiles[f]; + shellSourceFile(pThread->taos, SQLFileName); + } + } + + return NULL; +} + +static void shellRunImportThreads(struct arguments* args) +{ + pthread_attr_t thattr; + ShellThreadObj *threadObj = (ShellThreadObj *)calloc(args->threadNum, sizeof(ShellThreadObj)); + for (int t = 0; t < args->threadNum; ++t) { + ShellThreadObj *pThread = threadObj + t; + pThread->threadIndex = t; + pThread->totalThreads = args->threadNum; + pThread->taos = taos_connect(args->host, args->user, args->password, args->database, tsMgmtShellPort); + if (pThread->taos == NULL) { + fprintf(stderr, "ERROR: thread:%d failed connect to TDengine, error:%s\n", pThread->threadIndex, taos_errstr(pThread->taos)); + exit(0); + } + + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + + if (pthread_create(&(pThread->threadID), &thattr, shellImportThreadFp, (void*)pThread) != 0) { + fprintf(stderr, "ERROR: thread:%d failed to start\n", pThread->threadIndex); + exit(0); + } + } + + for (int t = 0; t < args->threadNum; ++t) { + pthread_join(threadObj[t].threadID, NULL); + } + + for (int t = 0; t < args->threadNum; ++t) { + taos_close(threadObj[t].taos); + } + free(threadObj); +} + +void source_dir(TAOS* con, struct arguments* args) { + shellGetDirectoryFileList(args->dir); + int64_t start = taosGetTimestampMs(); + + if (shellTablesSQLFile[0] != 0) { + shellSourceFile(con, shellTablesSQLFile); + int64_t end = taosGetTimestampMs(); + fprintf(stdout, "import %s finished, time spent %.2f seconds\n", shellTablesSQLFile, (end - start) / 1000.0); + } + + shellRunImportThreads(args); + int64_t end = taosGetTimestampMs(); + fprintf(stdout, "import %s finished, time spent %.2f seconds\n", args->dir, (end - start) / 1000.0); +} diff --git a/src/kit/shell/src/shellLinux.c b/src/kit/shell/src/shellLinux.c index 70e87525256b4907baaf8662f1df683de117d88e..e9e988c6c687a8eaefd858d6628f50f45a142af7 100644 --- a/src/kit/shell/src/shellLinux.c +++ b/src/kit/shell/src/shellLinux.c @@ -41,6 +41,8 @@ static struct argp_option options[] = { {"commands", 's', "COMMANDS", 0, "Commands to run without enter the shell."}, {"raw-time", 'r', 0, 0, "Output time as uint64_t."}, {"file", 'f', "FILE", 0, "Script to run without enter the shell."}, + {"directory", 'D', "DIRECTORY", 0, "Use multi-thread to import all SQL files in the directory separately."}, + {"thread", 'T', "THREADNUM", 0, "Number of threads when using multi-thread to import data."}, {"database", 'd', "DATABASE", 0, "Database to use when connecting to the server."}, {"timezone", 't', "TIMEZONE", 0, "Time zone of the shell, default is local."}, {0}}; @@ -90,6 +92,17 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { strcpy(arguments->file, full_path.we_wordv[0]); wordfree(&full_path); break; + case 'D': + if (wordexp(arg, &full_path, 0) != 0) { + fprintf(stderr, "Invalid path %s\n", arg); + return -1; + } + strcpy(arguments->dir, full_path.we_wordv[0]); + wordfree(&full_path); + break; + case 'T': + arguments->threadNum = atoi(arg); + break; case 'd': arguments->database = arg; break; diff --git a/src/kit/shell/src/shellMain.c b/src/kit/shell/src/shellMain.c index 82333020f15fd1c1c760c5efb5e29080c9311201..a7b7e8383bafab2f76682488d131c0d2bfbe65d3 100644 --- a/src/kit/shell/src/shellMain.c +++ b/src/kit/shell/src/shellMain.c @@ -62,7 +62,19 @@ int checkVersion() { } // Global configurations -struct arguments args = {NULL, NULL, NULL, NULL, NULL, false, false, "\0", NULL}; +struct arguments args = { + .host = NULL, + .password = NULL, + .user = NULL, + .database = NULL, + .timezone = NULL, + .is_raw_time = false, + .is_use_passwd = false, + .file = "\0", + .dir = "\0", + .threadNum = 5, + .commands = NULL +}; /* * Main function. diff --git a/src/system/detail/src/mgmtConn.c b/src/system/detail/src/mgmtConn.c index 13275300a6f2cc024b17a48346fcb148ec85e1f5..b440a1042e30756f50481f63afb5d82c37c5afc7 100644 --- a/src/system/detail/src/mgmtConn.c +++ b/src/system/detail/src/mgmtConn.c @@ -48,7 +48,7 @@ int mgmtGetConns(SShowObj *pShow, SConnObj *pConn) { pConn = pAcct->pConn; SConnInfo *pConnInfo = pConnShow->connInfo; - while (pConn) { + while (pConn && pConn->pUser) { strcpy(pConnInfo->user, pConn->pUser->user); pConnInfo->ip = pConn->ip; pConnInfo->port = pConn->port; diff --git a/src/system/detail/src/mgmtVgroup.c b/src/system/detail/src/mgmtVgroup.c index 60ce5c9fa72ad7197627c9385ad6fcd52d81e9b9..c9052c094b737842ce6ceca73e44ca237cbc8ad8 100644 --- a/src/system/detail/src/mgmtVgroup.c +++ b/src/system/detail/src/mgmtVgroup.c @@ -40,6 +40,7 @@ void *mgmtVgroupActionAfterBatchUpdate(void *row, char *str, int size, int *ssiz void *mgmtVgroupActionReset(void *row, char *str, int size, int *ssize); void *mgmtVgroupActionDestroy(void *row, char *str, int size, int *ssize); bool mgmtCheckVnodeReady(SDnodeObj *pDnode, SVgObj *pVgroup, SVnodeGid *pVnode); +char *mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode); void mgmtVgroupActionInit() { mgmtVgroupActionFp[SDB_TYPE_INSERT] = mgmtVgroupActionInsert; @@ -333,8 +334,8 @@ int mgmtRetrieveVgroups(SShowObj *pShow, char *data, int rows, SConnObj *pConn) pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; if (pVgroup->vnodeGid[i].ip != 0) { - bool ready = mgmtCheckVnodeReady(NULL, pVgroup, pVgroup->vnodeGid + i); - strcpy(pWrite, ready ? "ready" : "unsynced"); + char *vnodeStatus = mgmtGetVnodeStatus(pVgroup, pVgroup->vnodeGid + i); + strcpy(pWrite, vnodeStatus); } else { strcpy(pWrite, "null"); } diff --git a/src/system/detail/src/vnodeShell.c b/src/system/detail/src/vnodeShell.c index 55aa39f747cab5b4837f56dcf0e0544a409a8e9b..3d0b9eb51fdf076834973f1b9abf0f660177b5a7 100644 --- a/src/system/detail/src/vnodeShell.c +++ b/src/system/detail/src/vnodeShell.c @@ -117,6 +117,9 @@ void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) { } else if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) { if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_MASTER) { vnodeProcessShellSubmitRequest((char *) pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj); + } else if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_SLAVE) { + taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_REDIRECT); + dTrace("vid:%d sid:%d, shell submit msg is redirect since in status:%s", vnode, sid, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus)); } else { taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_NOT_READY); dTrace("vid:%d sid:%d, shell submit msg is ignored since in status:%s", vnode, sid, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus)); diff --git a/src/system/lite/src/mgmtBalance.spec.c b/src/system/lite/src/mgmtBalance.spec.c index f55bada0a2f9676bdf86596c8b81f0ba2332f6a3..cf3e999e4f48c09df5da03d55b8084b0d1169168 100644 --- a/src/system/lite/src/mgmtBalance.spec.c +++ b/src/system/lite/src/mgmtBalance.spec.c @@ -52,6 +52,8 @@ bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int moduleType) { return tsModule[moduleType].num != 0; } +char *mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode) { return "master"; } + bool mgmtCheckVnodeReady(SDnodeObj *pDnode, SVgObj *pVgroup, SVnodeGid *pVnode) { return true; } void mgmtUpdateDnodeState(SDnodeObj *pDnode, int lbStatus) {} diff --git a/src/util/src/version.c b/src/util/src/version.c index 4f88d6170c672925f20d2bb009f2b5b0dfa0b594..c85289fb8ad2f661ec4786c20b3e8ec6207b561d 100644 --- a/src/util/src/version.c +++ b/src/util/src/version.c @@ -1,5 +1,5 @@ -char version[64] = "1.6.4.2"; +char version[64] = "1.6.4.4"; char compatible_version[64] = "1.6.1.0"; -char gitinfo[128] = "b9a62d60dc1d4a41452a9bc94e3a0924485c3a75"; -char gitinfoOfInternal[128] = "e6445addc77e8c96dcb57221fa6ab5dcde0458f7"; -char buildinfo[512] = "Built by root at 2019-12-10 10:31"; +char gitinfo[128] = "d62c5c30231d04a736d437cf428af6e12599bd9f"; +char gitinfoOfInternal[128] = "8094a32d78dc519bd883d01ac2ba6ec49ac57a80"; +char buildinfo[512] = "Built by ubuntu at 2019-12-16 21:40";