From 0fc2985bf23b1ca3fd85253d63f6da39d0568687 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 14 Sep 2021 23:20:38 +0800 Subject: [PATCH] [TD-5757]: taosdump precision. (#7896) --- src/kit/taosdump/taosdump.c | 252 ++++++++++++++++-- tests/pytest/fulltest.sh | 2 +- .../taosdumpTestNanoSupport.py | 6 +- 3 files changed, 228 insertions(+), 32 deletions(-) rename tests/pytest/tools/{taosdemoAllTest/NanoTestCase => }/taosdumpTestNanoSupport.py (98%) diff --git a/src/kit/taosdump/taosdump.c b/src/kit/taosdump/taosdump.c index fe7616fa17..ef9e584978 100644 --- a/src/kit/taosdump/taosdump.c +++ b/src/kit/taosdump/taosdump.c @@ -181,6 +181,7 @@ typedef struct { int32_t threadIndex; int32_t totalThreads; char dbName[TSDB_DB_NAME_LEN]; + int precision; void *taosCon; int64_t rowsOfDumpOut; int64_t tablesOfDumpOut; @@ -246,11 +247,6 @@ static struct argp_option options[] = { {"avro", 'v', 0, 0, "Dump apache avro format data file. By default, dump sql command sequence.", 2}, {"start-time", 'S', "START_TIME", 0, "Start time to dump. Either epoch or ISO8601/RFC3339 format is acceptable. ISO8601 format example: 2017-10-01T00:00:00.000+0800 or 2017-10-0100:00:00:000+0800 or '2017-10-01 00:00:00.000+0800'", 4}, {"end-time", 'E', "END_TIME", 0, "End time to dump. Either epoch or ISO8601/RFC3339 format is acceptable. ISO8601 format example: 2017-10-01T00:00:00.000+0800 or 2017-10-0100:00:00.000+0800 or '2017-10-01 00:00:00.000+0800'", 5}, -#if TSDB_SUPPORT_NANOSECOND == 1 - {"precision", 'C', "PRECISION", 0, "Specify precision for converting human-readable time to epoch. Valid value is one of ms, us, and ns. Default is ms.", 6}, -#else - {"precision", 'C', "PRECISION", 0, "Use specified precision to convert human-readable time. Valid value is one of ms and us. Default is ms.", 6}, -#endif {"data-batch", 'B', "DATA_BATCH", 0, "Number of data point per insert statement. Max value is 32766. Default is 1.", 3}, {"max-sql-len", 'L', "SQL_LEN", 0, "Max length of one sql. Default is 65480.", 3}, {"table-batch", 't', "TABLE_BATCH", 0, "Number of table dumpout into one output file. Default is 1.", 3}, @@ -281,8 +277,11 @@ typedef struct arguments { bool with_property; bool avro; int64_t start_time; + char humanStartTime[28]; int64_t end_time; + char humanEndTime[28]; char precision[8]; + int32_t data_batch; int32_t max_sql_len; int32_t table_batch; // num of table which will be dump into one output file. @@ -296,6 +295,8 @@ typedef struct arguments { bool debug_print; bool verbose_print; bool performance_print; + + int dbCount; } SArguments; /* Our argp parser. */ @@ -318,13 +319,17 @@ static void taosDumpCreateTableClause(STableDef *tableDes, int numOfCols, static void taosDumpCreateMTableClause(STableDef *tableDes, char *metric, int numOfCols, FILE *fp, char* dbName); static int32_t taosDumpTable(char *tbName, char *metric, - FILE *fp, TAOS* taosCon, char* dbName); + FILE *fp, TAOS* taosCon, char* dbName, int precision); static int taosDumpTableData(FILE *fp, char *tbName, TAOS* taosCon, char* dbName, + int precision, char *jsonAvroSchema); static int taosCheckParam(struct arguments *arguments); static void taosFreeDbInfos(); -static void taosStartDumpOutWorkThreads(int32_t numOfThread, char *dbName); +static void taosStartDumpOutWorkThreads( + int32_t numOfThread, + char *dbName, + int precision); struct arguments g_args = { // connection option @@ -349,8 +354,10 @@ struct arguments g_args = { false, // schemeonly true, // with_property false, // avro format - -INT64_MAX, // start_time + -INT64_MAX + 1, // start_time + {0}, // humanStartTime INT64_MAX, // end_time + {0}, // humanEndTime "ms", // precision 1, // data_batch TSDB_MAX_SQL_LEN, // max_sql_len @@ -364,7 +371,8 @@ struct arguments g_args = { false, // isDumpIn false, // debug_print false, // verbose_print - false // performance_print + false, // performance_print + 0, // dbCount }; static void errorPrintReqArg2(char *program, char *wrong_arg) @@ -472,12 +480,8 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { break; case 'S': // parse time here. - g_args.start_time = atol(arg); break; case 'E': - g_args.end_time = atol(arg); - break; - case 'C': break; case 'B': g_args.data_batch = atoi(arg); @@ -550,7 +554,7 @@ static int queryDbImpl(TAOS *taos, char *command) { return 0; } -static void parse_precision_first( +UNUSED_FUNC static void parse_precision_first( int argc, char *argv[], SArguments *arguments) { for (int i = 1; i < argc; i++) { if (strcmp(argv[i], "-C") == 0) { @@ -616,6 +620,73 @@ static void parse_args( } } +static void copyHumanTimeToArg(char *timeStr, bool isStartTime) +{ + if (isStartTime) + strcpy(g_args.humanStartTime, timeStr); + else + strcpy(g_args.humanEndTime, timeStr); +} + +static void copyTimestampToArg(char *timeStr, bool isStartTime) +{ + if (isStartTime) + g_args.start_time = atol(timeStr); + else + g_args.end_time = atol(timeStr); +} + +static void parse_timestamp( + int argc, char *argv[], SArguments *arguments) { + for (int i = 1; i < argc; i++) { + char *tmp; + bool isStartTime = false; + bool isEndTime = false; + + if (strcmp(argv[i], "-S") == 0) { + isStartTime = true; + } else if (strcmp(argv[i], "-E") == 0) { + isEndTime = true; + } + + if (isStartTime || isEndTime) { + if (NULL == argv[i+1]) { + errorPrint("%s need a valid value following!\n", argv[i]); + exit(-1); + } + tmp = strdup(argv[i+1]); + + if (strchr(tmp, ':') && strchr(tmp, '-')) { + copyHumanTimeToArg(tmp, isStartTime); + } else { + copyTimestampToArg(tmp, isStartTime); + } + } + } +} + +static int getPrecisionByString(char *precision) +{ + if (0 == strncasecmp(precision, + "ms", 2)) { + return TSDB_TIME_PRECISION_MILLI; + } else if (0 == strncasecmp(precision, + "us", 2)) { + return TSDB_TIME_PRECISION_MICRO; +#if TSDB_SUPPORT_NANOSECOND == 1 + } else if (0 == strncasecmp(precision, + "ns", 2)) { + return TSDB_TIME_PRECISION_NANO; +#endif + } else { + errorPrint("Invalid time precision: %s", + precision); + } + + return -1; +} + +/* static void parse_timestamp( int argc, char *argv[], SArguments *arguments) { for (int i = 1; i < argc; i++) { @@ -634,6 +705,7 @@ static void parse_timestamp( int64_t tmpEpoch; if (strchr(tmp, ':') && strchr(tmp, '-')) { + strcpy(g_args.humanStartTime, tmp) int32_t timePrec; if (0 == strncasecmp(arguments->precision, "ms", strlen("ms"))) { @@ -672,6 +744,7 @@ static void parse_timestamp( } } } +*/ int main(int argc, char *argv[]) { static char verType[32] = {0}; @@ -682,7 +755,7 @@ int main(int argc, char *argv[]) { /* Parse our arguments; every option seen by parse_opt will be reflected in arguments. */ if (argc > 1) { - parse_precision_first(argc, argv, &g_args); +// parse_precision_first(argc, argv, &g_args); parse_timestamp(argc, argv, &g_args); parse_args(argc, argv, &g_args); } @@ -714,7 +787,9 @@ int main(int argc, char *argv[]) { printf("with_property: %s\n", g_args.with_property?"true":"false"); printf("avro format: %s\n", g_args.avro?"true":"false"); printf("start_time: %" PRId64 "\n", g_args.start_time); + printf("human readable start time: %s \n", g_args.humanStartTime); printf("end_time: %" PRId64 "\n", g_args.end_time); + printf("human readable end time: %s \n", g_args.humanEndTime); printf("precision: %s\n", g_args.precision); printf("data_batch: %d\n", g_args.data_batch); printf("max_sql_len: %d\n", g_args.max_sql_len); @@ -759,7 +834,9 @@ int main(int argc, char *argv[]) { fprintf(g_fpOfResult, "with_property: %s\n", g_args.with_property?"true":"false"); fprintf(g_fpOfResult, "avro format: %s\n", g_args.avro?"true":"false"); fprintf(g_fpOfResult, "start_time: %" PRId64 "\n", g_args.start_time); + fprintf(g_fpOfResult, "human readable start time: %s \n", g_args.humanStartTime); fprintf(g_fpOfResult, "end_time: %" PRId64 "\n", g_args.end_time); + fprintf(g_fpOfResult, "human readable end time: %s \n", g_args.humanEndTime); fprintf(g_fpOfResult, "precision: %s\n", g_args.precision); fprintf(g_fpOfResult, "data_batch: %d\n", g_args.data_batch); fprintf(g_fpOfResult, "max_sql_len: %d\n", g_args.max_sql_len); @@ -816,7 +893,8 @@ int main(int argc, char *argv[]) { static void taosFreeDbInfos() { if (g_dbInfos == NULL) return; - for (int i = 0; i < 128; i++) tfree(g_dbInfos[i]); + for (int i = 0; i < g_args.dbCount; i++) + tfree(g_dbInfos[i]); tfree(g_dbInfos); } @@ -1046,6 +1124,88 @@ static int32_t taosSaveTableOfMetricToTempFile( return 0; } +static int getDbCount() +{ + int count; + + TAOS *taos = NULL; + TAOS_RES *result = NULL; + char *command = NULL; + TAOS_ROW row; + + command = (char *)malloc(COMMAND_SIZE); + if (command == NULL) { + errorPrint("%s() LN%d, failed to allocate command buffer\n", __func__, __LINE__); + return 0; + } + + /* Connect to server */ + taos = taos_connect(g_args.host, g_args.user, g_args.password, + NULL, g_args.port); + if (NULL == taos) { + errorPrint("Failed to connect to TDengine server %s\n", g_args.host); + free(command); + return 0; + } + + sprintf(command, "show databases"); + result = taos_query(taos, command); + int32_t code = taos_errno(result); + + if (0 != code) { + errorPrint("%s() LN%d, failed to run command: %s, reason: %s\n", + __func__, __LINE__, command, taos_errstr(result)); + free(command); + return 0; + } + + TAOS_FIELD *fields = taos_fetch_fields(result); + + while ((row = taos_fetch_row(result)) != NULL) { + // sys database name : 'log', but subsequent version changed to 'log' + if ((strncasecmp(row[TSDB_SHOW_DB_NAME_INDEX], "log", + fields[TSDB_SHOW_DB_NAME_INDEX].bytes) == 0) + && (!g_args.allow_sys)) { + continue; + } + + if (g_args.databases) { // input multi dbs + for (int i = 0; g_args.arg_list[i]; i++) { + if (strncasecmp(g_args.arg_list[i], + (char *)row[TSDB_SHOW_DB_NAME_INDEX], + fields[TSDB_SHOW_DB_NAME_INDEX].bytes) == 0) + goto _dump_db_point; + } + continue; + } else if (!g_args.all_databases) { // only input one db + if (strncasecmp(g_args.arg_list[0], + (char *)row[TSDB_SHOW_DB_NAME_INDEX], + fields[TSDB_SHOW_DB_NAME_INDEX].bytes) == 0) + goto _dump_db_point; + else + continue; + } + +_dump_db_point: + + count++; + + if (g_args.databases) { + if (count > g_args.arg_list_len) break; + + } else if (!g_args.all_databases) { + if (count >= 1) break; + } + } + + if (count == 0) { + errorPrint("%d databases valid to dump\n", count); + } + + free(command); + return count; +} + static int taosDumpOut() { TAOS *taos = NULL; TAOS_RES *result = NULL; @@ -1070,7 +1230,14 @@ static int taosDumpOut() { return -1; } - g_dbInfos = (SDbInfo **)calloc(128, sizeof(SDbInfo *)); + g_args.dbCount = getDbCount(); + + if (0 == g_args.dbCount) { + errorPrint("%d databases valid to dump\n", g_args.dbCount); + return -1; + } + + g_dbInfos = (SDbInfo **)calloc(g_args.dbCount, sizeof(SDbInfo *)); if (g_dbInfos == NULL) { errorPrint("%s() LN%d, failed to allocate memory\n", __func__, __LINE__); @@ -1165,9 +1332,9 @@ _dump_db_point: g_dbInfos[count]->comp = (int8_t)(*((int8_t *)row[TSDB_SHOW_DB_COMP_INDEX])); g_dbInfos[count]->cachelast = (int8_t)(*((int8_t *)row[TSDB_SHOW_DB_CACHELAST_INDEX])); - tstrncpy(g_dbInfos[count]->precision, (char *)row[TSDB_SHOW_DB_PRECISION_INDEX], - min(8, fields[TSDB_SHOW_DB_PRECISION_INDEX].bytes + 1)); - //g_dbInfos[count]->precision = *((int8_t *)row[TSDB_SHOW_DB_PRECISION_INDEX]); + tstrncpy(g_dbInfos[count]->precision, + (char *)row[TSDB_SHOW_DB_PRECISION_INDEX], + DB_PRECISION_LEN); g_dbInfos[count]->update = *((int8_t *)row[TSDB_SHOW_DB_UPDATE_INDEX]); } count++; @@ -1263,8 +1430,10 @@ _dump_db_point: } // start multi threads to dumpout + taosStartDumpOutWorkThreads(totalNumOfThread, - g_dbInfos[0]->name); + g_dbInfos[0]->name, + getPrecisionByString(g_dbInfos[0]->precision)); char tmpFileName[MAX_FILE_NAME_LEN]; _clean_tmp_file: @@ -1453,7 +1622,7 @@ static int convertSchemaToAvroSchema(STableDef *stableDes, char **avroSchema) static int32_t taosDumpTable( char *tbName, char *metric, - FILE *fp, TAOS* taosCon, char* dbName) { + FILE *fp, TAOS* taosCon, char* dbName, int precision) { int count = 0; STableDef *tableDes = (STableDef *)calloc(1, sizeof(STableDef) @@ -1504,7 +1673,7 @@ static int32_t taosDumpTable( int32_t ret = 0; if (!g_args.schemaonly) { - ret = taosDumpTableData(fp, tbName, taosCon, dbName, + ret = taosDumpTableData(fp, tbName, taosCon, dbName, precision, jsonAvroSchema); } @@ -1595,7 +1764,8 @@ static void* taosDumpOutWorkThreadFp(void *arg) int ret = taosDumpTable( tableRecord.name, tableRecord.metric, - fp, pThread->taosCon, pThread->dbName); + fp, pThread->taosCon, pThread->dbName, + pThread->precision); if (ret >= 0) { // TODO: sum table count and table rows by self pThread->tablesOfDumpOut++; @@ -1644,7 +1814,7 @@ static void* taosDumpOutWorkThreadFp(void *arg) return NULL; } -static void taosStartDumpOutWorkThreads(int32_t numOfThread, char *dbName) +static void taosStartDumpOutWorkThreads(int32_t numOfThread, char *dbName, int precision) { pthread_attr_t thattr; SThreadParaObj *threadObj = @@ -1663,6 +1833,7 @@ static void taosStartDumpOutWorkThreads(int32_t numOfThread, char *dbName) pThread->threadIndex = t; pThread->totalThreads = numOfThread; tstrncpy(pThread->dbName, dbName, TSDB_DB_NAME_LEN); + pThread->precision = precision; pThread->taosCon = taos_connect(g_args.host, g_args.user, g_args.password, NULL, g_args.port); if (pThread->taosCon == NULL) { @@ -1912,7 +2083,8 @@ static int taosDumpDb(SDbInfo *dbInfo, FILE *fp, TAOS *taosCon) { } // start multi threads to dumpout - taosStartDumpOutWorkThreads(numOfThread, dbInfo->name); + taosStartDumpOutWorkThreads(numOfThread, dbInfo->name, + getPrecisionByString(dbInfo->precision)); for (int loopCnt = 0; loopCnt < numOfThread; loopCnt++) { sprintf(tmpBuf, ".tables.tmp.%d", loopCnt); (void)remove(tmpBuf); @@ -2190,14 +2362,38 @@ static int64_t writeResultToSql(TAOS_RES *res, FILE *fp, char *dbName, char *tbN } static int taosDumpTableData(FILE *fp, char *tbName, - TAOS* taosCon, char* dbName, + TAOS* taosCon, char* dbName, int precision, char *jsonAvroSchema) { int64_t totalRows = 0; char sqlstr[1024] = {0}; + + int64_t start_time, end_time; + if (strlen(g_args.humanStartTime)) { + if (TSDB_CODE_SUCCESS != taosParseTime( + g_args.humanStartTime, &start_time, strlen(g_args.humanStartTime), + precision, 0)) { + errorPrint("Input %s, time format error!\n", g_args.humanStartTime); + return -1; + } + } else { + start_time = g_args.start_time; + } + + if (strlen(g_args.humanEndTime)) { + if (TSDB_CODE_SUCCESS != taosParseTime( + g_args.humanEndTime, &end_time, strlen(g_args.humanEndTime), + precision, 0)) { + errorPrint("Input %s, time format error!\n", g_args.humanEndTime); + return -1; + } + } else { + end_time = g_args.end_time; + } + sprintf(sqlstr, "select * from %s.%s where _c0 >= %" PRId64 " and _c0 <= %" PRId64 " order by _c0 asc;", - dbName, tbName, g_args.start_time, g_args.end_time); + dbName, tbName, start_time, end_time); TAOS_RES* res = taos_query(taosCon, sqlstr); int32_t code = taos_errno(res); diff --git a/tests/pytest/fulltest.sh b/tests/pytest/fulltest.sh index db846f2bd7..3df42cbf33 100755 --- a/tests/pytest/fulltest.sh +++ b/tests/pytest/fulltest.sh @@ -181,7 +181,7 @@ python3 test.py -f tools/taosdemoAllTest/NanoTestCase/taosdemoTestSupportNanoIns python3 test.py -f tools/taosdemoAllTest/NanoTestCase/taosdemoTestSupportNanoQuery.py python3 test.py -f tools/taosdemoAllTest/NanoTestCase/taosdemoTestSupportNanosubscribe.py python3 test.py -f tools/taosdemoAllTest/NanoTestCase/taosdemoTestInsertTime_step.py -python3 test.py -f tools/taosdemoAllTest/NanoTestCase/taosdumpTestNanoSupport.py +python3 test.py -f tools/taosdumpTestNanoSupport.py # python3 ./test.py -f tsdb/tsdbComp.py diff --git a/tests/pytest/tools/taosdemoAllTest/NanoTestCase/taosdumpTestNanoSupport.py b/tests/pytest/tools/taosdumpTestNanoSupport.py similarity index 98% rename from tests/pytest/tools/taosdemoAllTest/NanoTestCase/taosdumpTestNanoSupport.py rename to tests/pytest/tools/taosdumpTestNanoSupport.py index a2059ec924..55f1671daa 100644 --- a/tests/pytest/tools/taosdemoAllTest/NanoTestCase/taosdumpTestNanoSupport.py +++ b/tests/pytest/tools/taosdumpTestNanoSupport.py @@ -136,7 +136,7 @@ class TDTestCase: # dump part data with -S -E os.system( - '%staosdump --databases timedb1 -S 1625068810000000000 -E 1625068860000000000 -C ns -o ./taosdumptest/dumptmp2 ' % + '%staosdump --databases timedb1 -S 1625068810000000000 -E 1625068860000000000 -o ./taosdumptest/dumptmp2 ' % binPath) os.system( '%staosdump --databases timedb1 -S 1625068810000000000 -o ./taosdumptest/dumptmp3 ' % @@ -218,7 +218,7 @@ class TDTestCase: "%staosdump --databases timedb1 -o ./taosdumptest/dumptmp1" % binPath) os.system( - '%staosdump --databases timedb1 -S 1625068810000000 -E 1625068860000000 -C us -o ./taosdumptest/dumptmp2 ' % + '%staosdump --databases timedb1 -S 1625068810000000 -E 1625068860000000 -o ./taosdumptest/dumptmp2 ' % binPath) os.system( '%staosdump --databases timedb1 -S 1625068810000000 -o ./taosdumptest/dumptmp3 ' % @@ -299,7 +299,7 @@ class TDTestCase: "%staosdump --databases timedb1 -o ./taosdumptest/dumptmp1" % binPath) os.system( - '%staosdump --databases timedb1 -S 1625068810000 -E 1625068860000 -C ms -o ./taosdumptest/dumptmp2 ' % + '%staosdump --databases timedb1 -S 1625068810000 -E 1625068860000 -o ./taosdumptest/dumptmp2 ' % binPath) os.system( '%staosdump --databases timedb1 -S 1625068810000 -o ./taosdumptest/dumptmp3 ' % -- GitLab