diff --git a/src/kit/shell/CMakeLists.txt b/src/kit/shell/CMakeLists.txt index b6babc5bc53aa254e0372dbbfd235bdd4cef878a..d36c1e3fccc4ee7c5eae359f975e2ac1faa6c135 100644 --- a/src/kit/shell/CMakeLists.txt +++ b/src/kit/shell/CMakeLists.txt @@ -36,6 +36,7 @@ ELSEIF (TD_DARWIN) LIST(APPEND SRC ./src/shellDarwin.c) LIST(APPEND SRC ./src/shellCommand.c) LIST(APPEND SRC ./src/shellImport.c) + LIST(APPEND SRC ./src/shellCheck.c) ADD_EXECUTABLE(shell ${SRC}) # linking with dylib TARGET_LINK_LIBRARIES(shell taos) diff --git a/src/kit/shell/inc/shell.h b/src/kit/shell/inc/shell.h index 50bceb1a7169373d1c832cadf03d6d601cace264..c403bd3de808976378ace04aee225fa17c96a5dc 100644 --- a/src/kit/shell/inc/shell.h +++ b/src/kit/shell/inc/shell.h @@ -52,6 +52,7 @@ typedef struct SShellArguments { char dir[TSDB_FILENAME_LEN]; int threadNum; char* commands; + int check; int abort; int port; int pktLen; @@ -72,6 +73,7 @@ void write_history(); void source_file(TAOS* con, char* fptr); void source_dir(TAOS* con, SShellArguments* args); void get_history_path(char* history); +void shellCheck(TAOS* con, SShellArguments* args); void cleanup_handler(void* arg); void exitShell(); int shellDumpResult(TAOS_RES* con, char* fname, int* error_no, bool printMode); diff --git a/src/kit/shell/src/shellCheck.c b/src/kit/shell/src/shellCheck.c new file mode 100644 index 0000000000000000000000000000000000000000..3ff2582407e4eac8624dae573440b36a57cc1409 --- /dev/null +++ b/src/kit/shell/src/shellCheck.c @@ -0,0 +1,199 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _GNU_SOURCE +#define _XOPEN_SOURCE +#define _DEFAULT_SOURCE + +#include "os.h" +#include "shell.h" +#include "shellCommand.h" +#include "tglobal.h" +#include "tutil.h" + +#define SHELL_SQL_LEN 1024 +static int32_t tbNum = 0; +static int32_t tbMallocNum = 0; +static char ** tbNames = NULL; +static int32_t checkedNum = 0; +static int32_t errorNum = 0; + +typedef struct { + pthread_t threadID; + int threadIndex; + int totalThreads; + void * taos; + char * db; +} ShellThreadObj; + +static int32_t shellUseDb(TAOS *con, char *db) { + if (db == NULL) { + fprintf(stdout, "no dbname input\n"); + return -1; + } + + char sql[SHELL_SQL_LEN] = {0}; + snprintf(sql, SHELL_SQL_LEN, "use %s", db); + + TAOS_RES *pSql = taos_query(con, sql); + int32_t code = taos_errno(pSql); + if (code != 0) { + fprintf(stdout, "failed to execute sql:%s since %s", sql, taos_errstr(pSql)); + } + + taos_free_result(pSql); + return code; +} + +static int32_t shellShowTables(TAOS *con, char *db) { + char sql[SHELL_SQL_LEN] = {0}; + snprintf(sql, SHELL_SQL_LEN, "show %s.tables", db); + + TAOS_RES *pSql = taos_query(con, sql); + int32_t code = taos_errno(pSql); + + if (code != 0) { + fprintf(stdout, "failed to execute sql:%s since %s\n", sql, taos_errstr(pSql)); + } else { + TAOS_ROW row; + while ((row = taos_fetch_row(pSql))) { + int32_t tbIndex = tbNum++; + if (tbMallocNum < tbNum) { + tbMallocNum = (tbMallocNum * 2 + 1); + tbNames = realloc(tbNames, tbMallocNum * sizeof(char *)); + if (tbNames == NULL) { + fprintf(stdout, "failed to malloc tablenames, num:%d\n", tbMallocNum); + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + break; + } + } + + tbNames[tbIndex] = malloc(TSDB_TABLE_NAME_LEN); + strncpy(tbNames[tbIndex], (const char *)row[0], TSDB_TABLE_NAME_LEN); + if (tbIndex % 100000 == 0 && tbIndex != 0) { + fprintf(stdout, "%d tablenames fetched\n", tbIndex); + } + } + } + + taos_free_result(pSql); + + fprintf(stdout, "total %d tablenames fetched, over\n", tbNum); + return code; +} + +static void shellFreeTbnames() { + for (int32_t i = 0; i < tbNum; ++i) { + free(tbNames[i]); + } + free(tbNames); +} + +static void *shellCheckThreadFp(void *arg) { + ShellThreadObj *pThread = (ShellThreadObj *)arg; + + int32_t interval = tbNum / pThread->totalThreads + 1; + int32_t start = pThread->threadIndex * interval; + int32_t end = (pThread->threadIndex + 1) * interval; + + if (end > tbNum) end = tbNum + 1; + + char file[32] = {0}; + snprintf(file, 32, "tb%d.txt", pThread->threadIndex); + + FILE *fp = fopen(file, "w"); + if (!fp) { + fprintf(stdout, "failed to open %s, reason:%s", file, strerror(errno)); + return NULL; + } + + char sql[SHELL_SQL_LEN]; + for (int32_t t = start; t < end; ++t) { + char *tbname = tbNames[t]; + if (tbname == NULL) break; + + snprintf(sql, SHELL_SQL_LEN, "select * from %s limit 1", tbname); + + TAOS_RES *pSql = taos_query(pThread->taos, sql); + int32_t code = taos_errno(pSql); + if (code != 0) { + int32_t len = snprintf(sql, SHELL_SQL_LEN, "drop table %s.%s;\n", pThread->db, tbname); + fwrite(sql, 1, len, fp); + atomic_add_fetch_32(&errorNum, 1); + } + + int32_t cnum = atomic_add_fetch_32(&checkedNum, 1); + if (cnum % 5000 == 0 && cnum != 0) { + fprintf(stdout, "%d tables checked\n", cnum); + } + + taos_free_result(pSql); + } + + fsync(fileno(fp)); + fclose(fp); + + return NULL; +} + +static void shellRunCheckThreads(TAOS *con, SShellArguments *args) { + pthread_attr_t thattr; + ShellThreadObj *threadObj = (ShellThreadObj *)calloc(args->threadNum, sizeof(ShellThreadObj)); + for (int t = 0; t < args->threadNum; ++t) { + ShellThreadObj *pThread = threadObj + t; + pThread->threadIndex = t; + pThread->totalThreads = args->threadNum; + pThread->taos = con; + pThread->db = args->database; + + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + + if (pthread_create(&(pThread->threadID), &thattr, shellCheckThreadFp, (void *)pThread) != 0) { + fprintf(stderr, "ERROR: thread:%d failed to start\n", pThread->threadIndex); + exit(0); + } + } + + for (int t = 0; t < args->threadNum; ++t) { + pthread_join(threadObj[t].threadID, NULL); + } + + for (int t = 0; t < args->threadNum; ++t) { + taos_close(threadObj[t].taos); + } + free(threadObj); +} + +void shellCheck(TAOS *con, SShellArguments *args) { + int64_t start = taosGetTimestampMs(); + + if (shellUseDb(con, args->database) != 0) { + shellFreeTbnames(); + return; + } + + if (shellShowTables(con, args->database) != 0) { + shellFreeTbnames(); + return; + } + + fprintf(stdout, "total %d tables will be checked by %d threads\n", tbNum, args->threadNum); + shellRunCheckThreads(con, args); + + int64_t end = taosGetTimestampMs(); + fprintf(stdout, "total %d tables checked, failed:%d, time spent %.2f seconds\n", checkedNum, errorNum, + (end - start) / 1000.0); +} \ No newline at end of file diff --git a/src/kit/shell/src/shellEngine.c b/src/kit/shell/src/shellEngine.c index a6f5869936beb7e5d9a17d0ff9ecc9fbd499db2f..66bb93a884a37d6f8888e5bd5435c5f2b9ad6a47 100644 --- a/src/kit/shell/src/shellEngine.c +++ b/src/kit/shell/src/shellEngine.c @@ -121,12 +121,17 @@ TAOS *shellInit(SShellArguments *args) { taos_close(con); exit(EXIT_SUCCESS); } + + if (args->check != 0) { + shellCheck(con, args); + taos_close(con); + exit(EXIT_SUCCESS); + } #endif return con; } - static bool isEmptyCommand(const char* cmd) { for (char c = *cmd++; c != 0; c = *cmd++) { if (c != ' ' && c != '\t' && c != ';') { @@ -412,7 +417,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { #ifdef WINDOWS if (tt < 0) tt = 0; #endif - if (tt < 0 && ms != 0) { + if (tt <= 0 && ms != 0) { tt--; if (precision == TSDB_TIME_PRECISION_MICRO) { ms += 1000000; diff --git a/src/kit/shell/src/shellLinux.c b/src/kit/shell/src/shellLinux.c index 07b21531a7f1cf9fc6346a27373fe9de8895f2b2..3f6b3da9bf4b1f80f58a06613e8571ca16892c46 100644 --- a/src/kit/shell/src/shellLinux.c +++ b/src/kit/shell/src/shellLinux.c @@ -45,6 +45,7 @@ static struct argp_option options[] = { {"file", 'f', "FILE", 0, "Script to run without enter the shell."}, {"directory", 'D', "DIRECTORY", 0, "Use multi-thread to import all SQL files in the directory separately."}, {"thread", 'T', "THREADNUM", 0, "Number of threads when using multi-thread to import data."}, + {"check", 'k', "CHECK", 0, "Check tables."}, {"database", 'd', "DATABASE", 0, "Database to use when connecting to the server."}, {"timezone", 't', "TIMEZONE", 0, "Time zone of the shell, default is local."}, {"netrole", 'n', "NETROLE", 0, "Net role when network connectivity test, default is startup, options: client|server|rpc|startup|sync."}, @@ -130,6 +131,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { return -1; } break; + case 'k': + arguments->check = atoi(arg); + break; case 'd': arguments->database = arg; break;