diff --git a/src/kit/taosdump/taosdump.c b/src/kit/taosdump/taosdump.c index 4dfecabf95c74b0546520b17b7e1384a7c775462..b0235a7dcaaecb8e9f9a2ea651358380f26cd42b 100644 --- a/src/kit/taosdump/taosdump.c +++ b/src/kit/taosdump/taosdump.c @@ -25,7 +25,6 @@ #include "tsclient.h" #include "tsdb.h" #include "tutil.h" -#include #define TSDB_SUPPORT_NANOSECOND 1 @@ -120,7 +119,7 @@ enum _show_tables_index { TSDB_MAX_SHOW_TABLES }; -// ---------------------------------- DESCRIBE METRIC CONFIGURE ------------------------------ +// ---------------------------------- DESCRIBE STABLE CONFIGURE ------------------------------ enum _describe_table_index { TSDB_DESCRIBE_METRIC_FIELD_INDEX, TSDB_DESCRIBE_METRIC_TYPE_INDEX, @@ -148,10 +147,28 @@ extern char version[]; #define DB_PRECISION_LEN 8 #define DB_STATUS_LEN 16 +typedef struct { + char name[TSDB_TABLE_NAME_LEN]; + bool belongStb; + char stable[TSDB_TABLE_NAME_LEN]; +} TableInfo; + +typedef struct { + char name[TSDB_TABLE_NAME_LEN]; + char stable[TSDB_TABLE_NAME_LEN]; +} TableRecord; + +typedef struct { + bool isStable; + int64_t dumpNtbCount; + TableRecord **dumpNtbInfos; + TableRecord tableRecord; +} TableRecordInfo; + typedef struct { char name[TSDB_DB_NAME_LEN]; char create_time[32]; - int32_t ntables; + int64_t ntables; int32_t vgroups; int16_t replica; int16_t quorum; @@ -171,28 +188,22 @@ typedef struct { char precision[DB_PRECISION_LEN]; // time resolution int8_t update; char status[DB_STATUS_LEN]; + int64_t dumpTbCount; + TableRecordInfo **dumpTbInfos; } SDbInfo; -typedef struct { - char name[TSDB_TABLE_NAME_LEN]; - char metric[TSDB_TABLE_NAME_LEN]; -} STableRecord; - -typedef struct { - bool isMetric; - STableRecord tableRecord; -} STableRecordInfo; - typedef struct { pthread_t threadID; int32_t threadIndex; int32_t totalThreads; char dbName[TSDB_DB_NAME_LEN]; - int precision; - void *taosCon; + char stbName[TSDB_TABLE_NAME_LEN]; + int precision; + TAOS *taos; int64_t rowsOfDumpOut; int64_t tablesOfDumpOut; -} SThreadParaObj; + int64_t tableFrom; +} threadInfo; typedef struct { int64_t totalRowsOfDumpOut; @@ -204,6 +215,7 @@ typedef struct { static int64_t g_totalDumpOutRows = 0; SDbInfo **g_dbInfos = NULL; +TableInfo *g_tablesList = NULL; const char *argp_program_version = version; const char *argp_program_bug_address = ""; @@ -300,13 +312,13 @@ typedef struct arguments { int32_t thread_num; int abort; char **arg_list; - int arg_list_len; - bool isDumpIn; - bool debug_print; - bool verbose_print; - bool performance_print; + int arg_list_len; + bool isDumpIn; + bool debug_print; + bool verbose_print; + bool performance_print; - int dbCount; + int dumpDbCount; } SArguments; /* Our argp parser. */ @@ -321,25 +333,21 @@ static int taosDumpOut(); static int taosDumpIn(); static void taosDumpCreateDbClause(SDbInfo *dbInfo, bool isDumpProperty, FILE *fp); -static int taosDumpDb(SDbInfo *dbInfo, FILE *fp, TAOS *taosCon); -static int32_t taosDumpStable(char *table, FILE *fp, TAOS* taosCon, - char* dbName); -static void taosDumpCreateTableClause(STableDef *tableDes, int numOfCols, +//static int taosDumpDb(SDbInfo *dbInfo, FILE *fp, TAOS *taos); +static int dumpStable(char *table, FILE *fp, TAOS* taos, + SDbInfo *dbInfo); +static int taosDumpCreateTableClause(STableDef *tableDes, int numOfCols, FILE *fp, char* dbName); -static void taosDumpCreateMTableClause(STableDef *tableDes, char *metric, +static void taosDumpCreateMTableClause(STableDef *tableDes, char *stable, int numOfCols, FILE *fp, char* dbName); -static int32_t taosDumpTable(char *tbName, char *metric, - FILE *fp, TAOS* taosCon, char* dbName, int precision); -static int taosDumpTableData(FILE *fp, char *tbName, - TAOS* taosCon, char* dbName, +static int64_t taosDumpTable(char *tbName, char *stable, + FILE *fp, TAOS* taos, char* dbName, int precision); +static int64_t taosDumpTableData(FILE *fp, char *tbName, + TAOS* taos, char* dbName, int precision, char *jsonAvroSchema); static int taosCheckParam(struct arguments *arguments); static void taosFreeDbInfos(); -static void taosStartDumpOutWorkThreads( - int32_t numOfThread, - char *dbName, - int precision); struct arguments g_args = { // connection option @@ -383,7 +391,7 @@ struct arguments g_args = { false, // debug_print false, // verbose_print false, // performance_print - 0, // dbCount + 0, // dumpDbCount }; // get taosdump commit number version @@ -585,66 +593,29 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { } static int queryDbImpl(TAOS *taos, char *command) { - int i; TAOS_RES *res = NULL; int32_t code = -1; - for (i = 0; i < 5; i++) { - if (NULL != res) { - taos_free_result(res); - res = NULL; - } - - res = taos_query(taos, command); - code = taos_errno(res); - if (0 == code) { - break; - } + if (NULL != res) { + taos_free_result(res); + res = NULL; } + res = taos_query(taos, command); + code = taos_errno(res); + if (code != 0) { - errorPrint("Failed to run <%s>, reason: %s\n", command, taos_errstr(res)); + errorPrint("Failed to run <%s>, reason: %s\n", + command, taos_errstr(res)); taos_free_result(res); //taos_close(taos); - return -1; + return code; } taos_free_result(res); return 0; } -UNUSED_FUNC static void parse_precision_first( - int argc, char *argv[], SArguments *arguments) { - for (int i = 1; i < argc; i++) { - if (strcmp(argv[i], "-C") == 0) { - if (NULL == argv[i+1]) { - errorPrint("%s need a valid value following!\n", argv[i]); - exit(-1); - } - char *tmp = strdup(argv[i+1]); - if (tmp == NULL) { - errorPrint("%s() LN%d, strdup() cannot allocate memory\n", - __func__, __LINE__); - exit(-1); - } - if ((0 != strncasecmp(tmp, "ms", strlen("ms"))) - && (0 != strncasecmp(tmp, "us", strlen("us"))) -#if TSDB_SUPPORT_NANOSECOND == 1 - && (0 != strncasecmp(tmp, "ns", strlen("ns"))) -#endif - ) { - // - errorPrint("input precision: %s is invalid value\n", tmp); - free(tmp); - exit(-1); - } - tstrncpy(g_args.precision, tmp, - min(DB_PRECISION_LEN, strlen(tmp) + 1)); - free(tmp); - } - } -} - static void parse_args( int argc, char *argv[], SArguments *arguments) { @@ -779,245 +750,42 @@ static int getPrecisionByString(char *precision) return -1; } -/* -static void parse_timestamp( - int argc, char *argv[], SArguments *arguments) { - for (int i = 1; i < argc; i++) { - if ((strcmp(argv[i], "-S") == 0) - || (strcmp(argv[i], "-E") == 0)) { - if (NULL == argv[i+1]) { - errorPrint("%s need a valid value following!\n", argv[i]); - exit(-1); - } - char *tmp = strdup(argv[i+1]); - if (NULL == tmp) { - errorPrint("%s() LN%d, strdup() cannot allocate memory\n", - __func__, __LINE__); - exit(-1); - } - - int64_t tmpEpoch; - if (strchr(tmp, ':') && strchr(tmp, '-')) { - strcpy(g_args.humanStartTime, tmp) - int32_t timePrec; - if (0 == strncasecmp(arguments->precision, - "ms", strlen("ms"))) { - timePrec = TSDB_TIME_PRECISION_MILLI; - } else if (0 == strncasecmp(arguments->precision, - "us", strlen("us"))) { - timePrec = TSDB_TIME_PRECISION_MICRO; -#if TSDB_SUPPORT_NANOSECOND == 1 - } else if (0 == strncasecmp(arguments->precision, - "ns", strlen("ns"))) { - timePrec = TSDB_TIME_PRECISION_NANO; -#endif - } else { - errorPrint("Invalid time precision: %s", - arguments->precision); - free(tmp); - return; - } - - if (TSDB_CODE_SUCCESS != taosParseTime( - tmp, &tmpEpoch, strlen(tmp), - timePrec, 0)) { - errorPrint("Input %s, end time error!\n", tmp); - free(tmp); - return; - } - } else { - tstrncpy(arguments->precision, "n/a", strlen("n/a") + 1); - tmpEpoch = atoll(tmp); - } - - sprintf(argv[i+1], "%"PRId64"", tmpEpoch); - debugPrint("%s() LN%d, tmp is: %s, argv[%d]: %s\n", - __func__, __LINE__, tmp, i, argv[i]); - free(tmp); - } - } -} -*/ - -int main(int argc, char *argv[]) { - static char verType[32] = {0}; - sprintf(verType, "version: %s\n", version); - argp_program_version = verType; - - int ret = 0; - /* Parse our arguments; every option seen by parse_opt will be - reflected in arguments. */ - if (argc > 1) { -// parse_precision_first(argc, argv, &g_args); - parse_timestamp(argc, argv, &g_args); - parse_args(argc, argv, &g_args); - } - - argp_parse(&argp, argc, argv, 0, 0, &g_args); - - if (g_args.abort) { -#ifndef _ALPINE - error(10, 0, "ABORTED"); -#else - abort(); -#endif - } - - printf("====== arguments config ======\n"); - { - printf("host: %s\n", g_args.host); - printf("user: %s\n", g_args.user); - printf("password: %s\n", g_args.password); - printf("port: %u\n", g_args.port); - printf("mysqlFlag: %d\n", g_args.mysqlFlag); - printf("outpath: %s\n", g_args.outpath); - printf("inpath: %s\n", g_args.inpath); - printf("resultFile: %s\n", g_args.resultFile); - printf("encode: %s\n", g_args.encode); - printf("all_databases: %s\n", g_args.all_databases?"true":"false"); - printf("databases: %d\n", g_args.databases); - printf("databasesSeq: %s\n", g_args.databasesSeq); - printf("schemaonly: %s\n", g_args.schemaonly?"true":"false"); - printf("with_property: %s\n", g_args.with_property?"true":"false"); - printf("avro format: %s\n", g_args.avro?"true":"false"); - printf("start_time: %" PRId64 "\n", g_args.start_time); - printf("human readable start time: %s \n", g_args.humanStartTime); - printf("end_time: %" PRId64 "\n", g_args.end_time); - printf("human readable end time: %s \n", g_args.humanEndTime); - printf("precision: %s\n", g_args.precision); - printf("data_batch: %d\n", g_args.data_batch); - printf("max_sql_len: %d\n", g_args.max_sql_len); - printf("table_batch: %d\n", g_args.table_batch); - printf("thread_num: %d\n", g_args.thread_num); - printf("allow_sys: %d\n", g_args.allow_sys); - printf("abort: %d\n", g_args.abort); - printf("isDumpIn: %d\n", g_args.isDumpIn); - printf("arg_list_len: %d\n", g_args.arg_list_len); - printf("debug_print: %d\n", g_args.debug_print); - - for (int32_t i = 0; i < g_args.arg_list_len; i++) { - printf("arg_list[%d]: %s\n", i, g_args.arg_list[i]); - } - } - printf("==============================\n"); - if (taosCheckParam(&g_args) < 0) { - exit(EXIT_FAILURE); - } - - g_fpOfResult = fopen(g_args.resultFile, "a"); - if (NULL == g_fpOfResult) { - errorPrint("Failed to open %s for save result\n", g_args.resultFile); - exit(-1); - }; - - fprintf(g_fpOfResult, "#############################################################################\n"); - fprintf(g_fpOfResult, "============================== arguments config =============================\n"); - { - fprintf(g_fpOfResult, "host: %s\n", g_args.host); - fprintf(g_fpOfResult, "user: %s\n", g_args.user); - fprintf(g_fpOfResult, "password: %s\n", g_args.password); - fprintf(g_fpOfResult, "port: %u\n", g_args.port); - fprintf(g_fpOfResult, "mysqlFlag: %d\n", g_args.mysqlFlag); - fprintf(g_fpOfResult, "outpath: %s\n", g_args.outpath); - fprintf(g_fpOfResult, "inpath: %s\n", g_args.inpath); - fprintf(g_fpOfResult, "resultFile: %s\n", g_args.resultFile); - fprintf(g_fpOfResult, "encode: %s\n", g_args.encode); - fprintf(g_fpOfResult, "all_databases: %s\n", g_args.all_databases?"true":"false"); - fprintf(g_fpOfResult, "databases: %d\n", g_args.databases); - fprintf(g_fpOfResult, "databasesSeq: %s\n", g_args.databasesSeq); - fprintf(g_fpOfResult, "schemaonly: %s\n", g_args.schemaonly?"true":"false"); - fprintf(g_fpOfResult, "with_property: %s\n", g_args.with_property?"true":"false"); - fprintf(g_fpOfResult, "avro format: %s\n", g_args.avro?"true":"false"); - fprintf(g_fpOfResult, "start_time: %" PRId64 "\n", g_args.start_time); - fprintf(g_fpOfResult, "human readable start time: %s \n", g_args.humanStartTime); - fprintf(g_fpOfResult, "end_time: %" PRId64 "\n", g_args.end_time); - fprintf(g_fpOfResult, "human readable end time: %s \n", g_args.humanEndTime); - fprintf(g_fpOfResult, "precision: %s\n", g_args.precision); - fprintf(g_fpOfResult, "data_batch: %d\n", g_args.data_batch); - fprintf(g_fpOfResult, "max_sql_len: %d\n", g_args.max_sql_len); - fprintf(g_fpOfResult, "table_batch: %d\n", g_args.table_batch); - fprintf(g_fpOfResult, "thread_num: %d\n", g_args.thread_num); - fprintf(g_fpOfResult, "allow_sys: %d\n", g_args.allow_sys); - fprintf(g_fpOfResult, "abort: %d\n", g_args.abort); - fprintf(g_fpOfResult, "isDumpIn: %d\n", g_args.isDumpIn); - fprintf(g_fpOfResult, "arg_list_len: %d\n", g_args.arg_list_len); - - for (int32_t i = 0; i < g_args.arg_list_len; i++) { - fprintf(g_fpOfResult, "arg_list[%d]: %s\n", i, g_args.arg_list[i]); - } - } - - g_numOfCores = (int32_t)sysconf(_SC_NPROCESSORS_ONLN); - - time_t tTime = time(NULL); - struct tm tm = *localtime(&tTime); - - if (g_args.isDumpIn) { - fprintf(g_fpOfResult, "============================== DUMP IN ============================== \n"); - fprintf(g_fpOfResult, "# DumpIn start time: %d-%02d-%02d %02d:%02d:%02d\n", - tm.tm_year + 1900, tm.tm_mon + 1, - tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec); - if (taosDumpIn() < 0) { - ret = -1; - } - } else { - fprintf(g_fpOfResult, "============================== DUMP OUT ============================== \n"); - fprintf(g_fpOfResult, "# DumpOut start time: %d-%02d-%02d %02d:%02d:%02d\n", - tm.tm_year + 1900, tm.tm_mon + 1, - tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec); - if (taosDumpOut() < 0) { - ret = -1; - } else { - fprintf(g_fpOfResult, "\n============================== TOTAL STATISTICS ============================== \n"); - fprintf(g_fpOfResult, "# total database count: %d\n", - g_resultStatistics.totalDatabasesOfDumpOut); - fprintf(g_fpOfResult, "# total super table count: %d\n", - g_resultStatistics.totalSuperTblsOfDumpOut); - fprintf(g_fpOfResult, "# total child table count: %"PRId64"\n", - g_resultStatistics.totalChildTblsOfDumpOut); - fprintf(g_fpOfResult, "# total row count: %"PRId64"\n", - g_resultStatistics.totalRowsOfDumpOut); - } - } - - fprintf(g_fpOfResult, "\n"); - fclose(g_fpOfResult); - - return ret; -} - static void taosFreeDbInfos() { if (g_dbInfos == NULL) return; - for (int i = 0; i < g_args.dbCount; i++) + for (int i = 0; i < g_args.dumpDbCount; i++) tfree(g_dbInfos[i]); tfree(g_dbInfos); } // check table is normal table or super table static int taosGetTableRecordInfo( - char *table, STableRecordInfo *pTableRecordInfo, TAOS *taosCon) { + char *dbName, + char *table, TableRecordInfo *pTableRecordInfo, TAOS *taos) { TAOS_ROW row = NULL; bool isSet = false; TAOS_RES *result = NULL; - memset(pTableRecordInfo, 0, sizeof(STableRecordInfo)); + memset(pTableRecordInfo, 0, sizeof(TableRecordInfo)); - char* tempCommand = (char *)malloc(COMMAND_SIZE); - if (tempCommand == NULL) { - errorPrint("%s() LN%d, failed to allocate memory\n", - __func__, __LINE__); - return -1; + char command[COMMAND_SIZE]; + + sprintf(command, "USE %s", dbName); + result = taos_query(taos, command); + int32_t code = taos_errno(result); + if (code != 0) { + errorPrint("invalid database %s, reason: %s\n", + dbName, taos_errstr(result)); + return 0; } - sprintf(tempCommand, "show tables like %s", table); + sprintf(command, "SHOW TABLES LIKE \'%s\'", table); - result = taos_query(taosCon, tempCommand); - int32_t code = taos_errno(result); + result = taos_query(taos, command); + code = taos_errno(result); if (code != 0) { - errorPrint("%s() LN%d, failed to run command %s\n", - __func__, __LINE__, tempCommand); - free(tempCommand); + errorPrint("%s() LN%d, failed to run command <%s>. reason: %s\n", + __func__, __LINE__, command, taos_errstr(result)); taos_free_result(result); return -1; } @@ -1026,12 +794,12 @@ static int taosGetTableRecordInfo( while ((row = taos_fetch_row(result)) != NULL) { isSet = true; - pTableRecordInfo->isMetric = false; + pTableRecordInfo->isStable = false; tstrncpy(pTableRecordInfo->tableRecord.name, (char *)row[TSDB_SHOW_TABLES_NAME_INDEX], min(TSDB_TABLE_NAME_LEN, fields[TSDB_SHOW_TABLES_NAME_INDEX].bytes + 1)); - tstrncpy(pTableRecordInfo->tableRecord.metric, + tstrncpy(pTableRecordInfo->tableRecord.stable, (char *)row[TSDB_SHOW_TABLES_METRIC_INDEX], min(TSDB_TABLE_NAME_LEN, fields[TSDB_SHOW_TABLES_METRIC_INDEX].bytes + 1)); @@ -1042,27 +810,25 @@ static int taosGetTableRecordInfo( result = NULL; if (isSet) { - free(tempCommand); return 0; } - sprintf(tempCommand, "show stables like %s", table); + sprintf(command, "SHOW STABLES LIKE \'%s\'", table); - result = taos_query(taosCon, tempCommand); + result = taos_query(taos, command); code = taos_errno(result); if (code != 0) { - errorPrint("%s() LN%d, failed to run command %s\n", - __func__, __LINE__, tempCommand); - free(tempCommand); + errorPrint("%s() LN%d, failed to run command <%s>. reason: %s\n", + __func__, __LINE__, command, taos_errstr(result)); taos_free_result(result); return -1; } while ((row = taos_fetch_row(result)) != NULL) { isSet = true; - pTableRecordInfo->isMetric = true; - tstrncpy(pTableRecordInfo->tableRecord.metric, table, + pTableRecordInfo->isStable = true; + tstrncpy(pTableRecordInfo->tableRecord.stable, table, TSDB_TABLE_NAME_LEN); break; } @@ -1071,248 +837,540 @@ static int taosGetTableRecordInfo( result = NULL; if (isSet) { - free(tempCommand); return 0; } - errorPrint("%s() LN%d, invalid table/metric %s\n", + errorPrint("%s() LN%d, invalid table/stable %s\n", __func__, __LINE__, table); - free(tempCommand); return -1; } +static int inDatabasesSeq( + char *name, + int len) +{ + if (strstr(g_args.databasesSeq, ",") == NULL) { + if (0 == strncmp(g_args.databasesSeq, name, len)) { + return 0; + } + } else { + char *dupSeq = strdup(g_args.databasesSeq); + char *running = dupSeq; + char *dbname = strsep(&running, ","); + while (dbname) { + if (0 == strncmp(dbname, name, len)) { + tfree(dupSeq); + return 0; + } -static int32_t taosSaveAllNormalTableToTempFile(TAOS *taosCon, char*meter, - char* metric, int* fd) { - STableRecord tableRecord; - - if (-1 == *fd) { - *fd = open(".tables.tmp.0", - O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH); - if (*fd == -1) { - errorPrint("%s() LN%d, failed to open temp file: .tables.tmp.0\n", - __func__, __LINE__); - return -1; + dbname = strsep(&running, ","); } - } - memset(&tableRecord, 0, sizeof(STableRecord)); - tstrncpy(tableRecord.name, meter, TSDB_TABLE_NAME_LEN); - tstrncpy(tableRecord.metric, metric, TSDB_TABLE_NAME_LEN); + } - taosWrite(*fd, &tableRecord, sizeof(STableRecord)); - return 0; + return -1; } -static int32_t taosSaveTableOfMetricToTempFile( - TAOS *taosCon, char* metric, - int32_t* totalNumOfThread) { - TAOS_ROW row; - int fd = -1; - STableRecord tableRecord; +static int getDumpDbCount() +{ + int count = 0; - char* tmpCommand = (char *)malloc(COMMAND_SIZE); - if (tmpCommand == NULL) { - errorPrint("%s() LN%d, failed to allocate memory\n", __func__, __LINE__); - return -1; + TAOS *taos = NULL; + TAOS_RES *result = NULL; + char *command = "show databases"; + TAOS_ROW row; + + /* Connect to server */ + taos = taos_connect(g_args.host, g_args.user, g_args.password, + NULL, g_args.port); + if (NULL == taos) { + errorPrint("Failed to connect to TDengine server %s\n", g_args.host); + return 0; } - sprintf(tmpCommand, "select tbname from %s", metric); + result = taos_query(taos, command); + int32_t code = taos_errno(result); - TAOS_RES *res = taos_query(taosCon, tmpCommand); - int32_t code = taos_errno(res); - if (code != 0) { - errorPrint("%s() LN%d, failed to run command %s\n", - __func__, __LINE__, tmpCommand); - free(tmpCommand); - taos_free_result(res); - return -1; + if (0 != code) { + errorPrint("%s() LN%d, failed to run command <%s>, reason: %s\n", + __func__, __LINE__, command, taos_errstr(result)); + return 0; + } + + TAOS_FIELD *fields = taos_fetch_fields(result); + + while ((row = taos_fetch_row(result)) != NULL) { + // sys database name : 'log', but subsequent version changed to 'log' + if ((strncasecmp(row[TSDB_SHOW_DB_NAME_INDEX], "log", + fields[TSDB_SHOW_DB_NAME_INDEX].bytes) == 0) + && (!g_args.allow_sys)) { + continue; + } + + if (g_args.databases) { // input multi dbs + if (inDatabasesSeq( + (char *)row[TSDB_SHOW_DB_NAME_INDEX], + fields[TSDB_SHOW_DB_NAME_INDEX].bytes) != 0) + continue; + } else if (!g_args.all_databases) { // only input one db + if (strncasecmp(g_args.arg_list[0], + (char *)row[TSDB_SHOW_DB_NAME_INDEX], + fields[TSDB_SHOW_DB_NAME_INDEX].bytes) != 0) + continue; + } + + count++; } - free(tmpCommand); - char tmpBuf[MAX_FILE_NAME_LEN]; - memset(tmpBuf, 0, MAX_FILE_NAME_LEN); - sprintf(tmpBuf, ".select-tbname.tmp"); - fd = open(tmpBuf, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH); - if (fd == -1) { - errorPrint("%s() LN%d, failed to open temp file: %s\n", + if (count == 0) { + errorPrint("%d databases valid to dump\n", count); + } + + return count; +} + +static int64_t dumpNormalTableWithoutStb(TAOS *taos, SDbInfo *dbInfo, char *ntbName) +{ + int64_t count = 0; + + char tmpBuf[4096] = {0}; + FILE *fp = NULL; + + if (g_args.outpath[0] != 0) { + sprintf(tmpBuf, "%s/%s.%s.sql", + g_args.outpath, dbInfo->name, ntbName); + } else { + sprintf(tmpBuf, "%s.%s.sql", + dbInfo->name, ntbName); + } + + fp = fopen(tmpBuf, "w"); + if (fp == NULL) { + errorPrint("%s() LN%d, failed to open file %s\n", __func__, __LINE__, tmpBuf); - taos_free_result(res); return -1; } - TAOS_FIELD *fields = taos_fetch_fields(res); + count = taosDumpTable(ntbName, NULL, + fp, taos, dbInfo->name, getPrecisionByString(dbInfo->precision)); - int32_t numOfTable = 0; - while ((row = taos_fetch_row(res)) != NULL) { + fclose(fp); + return count; +} - memset(&tableRecord, 0, sizeof(STableRecord)); - tstrncpy(tableRecord.name, (char *)row[0], fields[0].bytes); - tstrncpy(tableRecord.metric, metric, TSDB_TABLE_NAME_LEN); +static int64_t dumpNormalTable(FILE *fp, TAOS *taos, char *dbName, char *tbName, + char *stbName, + int precision) +{ + int64_t count = 0; + count = taosDumpTable(tbName, stbName, + fp, taos, dbName, precision); + + return count; +} + +static void *dumpNtbOfDb(void *arg) { + threadInfo *pThreadInfo = (threadInfo *)arg; + + debugPrint("dump table from = \t%"PRId64"\n", pThreadInfo->tableFrom); + debugPrint("dump table count = \t%"PRId64"\n", + pThreadInfo->tablesOfDumpOut); + + FILE *fp = NULL; + char tmpBuf[4096] = {0}; - taosWrite(fd, &tableRecord, sizeof(STableRecord)); - numOfTable++; + if (g_args.outpath[0] != 0) { + sprintf(tmpBuf, "%s/%s.%d.sql", + g_args.outpath, pThreadInfo->dbName, pThreadInfo->threadIndex); + } else { + sprintf(tmpBuf, "%s.%d.sql", + pThreadInfo->dbName, pThreadInfo->threadIndex); } - taos_free_result(res); - lseek(fd, 0, SEEK_SET); - int maxThreads = g_args.thread_num; - int tableOfPerFile ; - if (numOfTable <= g_args.thread_num) { - tableOfPerFile = 1; - maxThreads = numOfTable; + fp = fopen(tmpBuf, "w"); + + if (fp == NULL) { + errorPrint("%s() LN%d, failed to open file %s\n", + __func__, __LINE__, tmpBuf); + return NULL; + } + + for (int64_t i = 0; i < pThreadInfo->tablesOfDumpOut; i++) { + debugPrint("[%d] No.\t%"PRId64" table name: %s\n", + pThreadInfo->threadIndex, i, + ((TableInfo *)(g_tablesList + pThreadInfo->tableFrom+i))->name); + dumpNormalTable(fp, + pThreadInfo->taos, + pThreadInfo->dbName, + ((TableInfo *)(g_tablesList + pThreadInfo->tableFrom+i))->name, + ((TableInfo *)(g_tablesList + pThreadInfo->tableFrom+i))->stable, + pThreadInfo->precision); + } + + fclose(fp); + + return NULL; +} + +static void *dumpNormalTablesOfStb(void *arg) { + threadInfo *pThreadInfo = (threadInfo *)arg; + + debugPrint("dump table from = \t%"PRId64"\n", pThreadInfo->tableFrom); + debugPrint("dump table count = \t%"PRId64"\n", pThreadInfo->tablesOfDumpOut); + + char command[COMMAND_SIZE]; + + sprintf(command, "SELECT TBNAME FROM %s.%s LIMIT %"PRId64" OFFSET %"PRId64"", + pThreadInfo->dbName, pThreadInfo->stbName, + pThreadInfo->tablesOfDumpOut, pThreadInfo->tableFrom); + + TAOS_RES *res = taos_query(pThreadInfo->taos, command); + int32_t code = taos_errno(res); + if (code) { + errorPrint("%s() LN%d, failed to run command <%s>. reason: %s\n", + __func__, __LINE__, command, taos_errstr(res)); + taos_free_result(res); + return NULL; + } + + FILE *fp = NULL; + char tmpBuf[4096] = {0}; + + if (g_args.outpath[0] != 0) { + sprintf(tmpBuf, "%s/%s.%d.sql", + g_args.outpath, pThreadInfo->dbName, pThreadInfo->threadIndex); } else { - tableOfPerFile = numOfTable / g_args.thread_num; - if (0 != numOfTable % g_args.thread_num) { - tableOfPerFile += 1; - } + sprintf(tmpBuf, "%s.%d.sql", + pThreadInfo->dbName, pThreadInfo->threadIndex); } - char* tblBuf = (char*)calloc(1, tableOfPerFile * sizeof(STableRecord)); - if (NULL == tblBuf){ - errorPrint("%s() LN%d, failed to calloc %" PRIzu "\n", - __func__, __LINE__, tableOfPerFile * sizeof(STableRecord)); - close(fd); - return -1; + fp = fopen(tmpBuf, "w"); + + if (fp == NULL) { + errorPrint("%s() LN%d, failed to open file %s\n", + __func__, __LINE__, tmpBuf); + return NULL; } - int32_t numOfThread = *totalNumOfThread; - int subFd = -1; - for (; numOfThread <= maxThreads; numOfThread++) { - memset(tmpBuf, 0, MAX_FILE_NAME_LEN); - sprintf(tmpBuf, ".tables.tmp.%d", numOfThread); - subFd = open(tmpBuf, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH); - if (subFd == -1) { - errorPrint("%s() LN%d, failed to open temp file: %s\n", - __func__, __LINE__, tmpBuf); - for (int32_t loopCnt = 0; loopCnt < numOfThread; loopCnt++) { - sprintf(tmpBuf, ".tables.tmp.%d", loopCnt); - (void)remove(tmpBuf); - } - sprintf(tmpBuf, ".select-tbname.tmp"); - (void)remove(tmpBuf); - free(tblBuf); - close(fd); + TAOS_ROW row = NULL; + int64_t i = 0; + while((row = taos_fetch_row(res)) != NULL) { + debugPrint("[%d] sub table %"PRId64": name: %s\n", + pThreadInfo->threadIndex, i++, (char *)row[TSDB_SHOW_TABLES_NAME_INDEX]); + + dumpNormalTable(fp, + pThreadInfo->taos, + pThreadInfo->dbName, + (char *)row[TSDB_SHOW_TABLES_NAME_INDEX], + (char *)row[TSDB_SHOW_TABLES_METRIC_INDEX], + pThreadInfo->precision); + } + + fclose(fp); + return NULL; +} + +static int64_t dumpNtbOfDbByThreads( + SDbInfo *dbInfo, + int64_t ntbCount) +{ + if (ntbCount <= 0) { + return 0; + } + + int threads = g_args.thread_num; + + int64_t a = ntbCount / threads; + if (a < 1) { + threads = ntbCount; + a = 1; + } + + assert(threads); + int64_t b = ntbCount % threads; + + threadInfo *infos = calloc(1, threads * sizeof(threadInfo)); + pthread_t *pids = calloc(1, threads * sizeof(pthread_t)); + assert(pids); + assert(infos); + + for (int64_t i = 0; i < threads; i++) { + threadInfo *pThreadInfo = infos + i; + pThreadInfo->taos = taos_connect( + g_args.host, + g_args.user, + g_args.password, + dbInfo->name, + g_args.port + ); + if (NULL == pThreadInfo->taos) { + errorPrint("%s() LN%d, Failed to connect to TDengine, reason: %s\n", + __func__, + __LINE__, + taos_errstr(NULL)); + free(pids); + free(infos); + return -1; } - // read tableOfPerFile for fd, write to subFd - ssize_t readLen = read(fd, tblBuf, tableOfPerFile * sizeof(STableRecord)); - if (readLen <= 0) { - close(subFd); - break; - } - taosWrite(subFd, tblBuf, readLen); - close(subFd); + pThreadInfo->threadIndex = i; + pThreadInfo->tablesOfDumpOut = (itableFrom = (i==0)?0: + ((threadInfo *)(infos + i - 1))->tableFrom + + ((threadInfo *)(infos + i - 1))->tablesOfDumpOut; + strcpy(pThreadInfo->dbName, dbInfo->name); + pThreadInfo->precision = getPrecisionByString(dbInfo->precision); + + pthread_create(pids + i, NULL, dumpNtbOfDb, pThreadInfo); } - sprintf(tmpBuf, ".select-tbname.tmp"); - (void)remove(tmpBuf); + for (int64_t i = 0; i < threads; i++) { + pthread_join(pids[i], NULL); + } - if (fd >= 0) { - close(fd); - fd = -1; + for (int64_t i = 0; i < threads; i++) { + threadInfo *pThreadInfo = infos + i; + taos_close(pThreadInfo->taos); } - *totalNumOfThread = numOfThread; + free(pids); + free(infos); - free(tblBuf); return 0; } -static int inDatabasesSeq( - char *name, - int len) +static int64_t getNtbCountOfStb(TAOS *taos, char *dbName, char *stbName) { - if (strstr(g_args.databasesSeq, ",") == NULL) { - if (0 == strncmp(g_args.databasesSeq, name, len)) { - return 0; - } - } else { - char *dupSeq = strdup(g_args.databasesSeq); - char *running = dupSeq; - char *dbname = strsep(&running, ","); - while (dbname) { - if (0 == strncmp(dbname, name, len)) { - tfree(dupSeq); - return 0; - } + int64_t count = 0; - dbname = strsep(&running, ","); + char command[COMMAND_SIZE]; + + sprintf(command, "SELECT COUNT(TBNAME) FROM %s.%s", dbName, stbName); + + TAOS_RES *res = taos_query(taos, command); + int32_t code = taos_errno(res); + if (code != 0) { + errorPrint("%s() LN%d, failed to run command <%s>. reason: %s\n", + __func__, __LINE__, command, taos_errstr(res)); + taos_free_result(res); + return -1; + } + + TAOS_ROW row = NULL; + + if ((row = taos_fetch_row(res)) != NULL) { + count = *(int64_t*)row[TSDB_SHOW_TABLES_NAME_INDEX]; + } + + return count; +} + +static int64_t dumpNtbOfStbByThreads( + TAOS *taos, + SDbInfo *dbInfo, char *stbName) +{ + int64_t ntbCount = getNtbCountOfStb(taos, dbInfo->name, stbName); + + if (ntbCount <= 0) { + return 0; + } + + int threads = g_args.thread_num; + + int64_t a = ntbCount / threads; + if (a < 1) { + threads = ntbCount; + a = 1; + } + + assert(threads); + int64_t b = ntbCount % threads; + + pthread_t *pids = calloc(1, threads * sizeof(pthread_t)); + threadInfo *infos = calloc(1, threads * sizeof(threadInfo)); + assert(pids); + assert(infos); + + for (int64_t i = 0; i < threads; i++) { + threadInfo *pThreadInfo = infos + i; + pThreadInfo->taos = taos_connect( + g_args.host, + g_args.user, + g_args.password, + dbInfo->name, + g_args.port + ); + if (NULL == pThreadInfo->taos) { + errorPrint("%s() LN%d, Failed to connect to TDengine, reason: %s\n", + __func__, + __LINE__, + taos_errstr(NULL)); + free(pids); + free(infos); + + return -1; } - free(dupSeq); + pThreadInfo->threadIndex = i; + pThreadInfo->tablesOfDumpOut = (itableFrom = (i==0)?0: + ((threadInfo *)(infos + i - 1))->tableFrom + + ((threadInfo *)(infos + i - 1))->tablesOfDumpOut; + strcpy(pThreadInfo->dbName, dbInfo->name); + pThreadInfo->precision = getPrecisionByString(dbInfo->precision); + + strcpy(pThreadInfo->stbName, stbName); + pthread_create(pids + i, NULL, dumpNormalTablesOfStb, pThreadInfo); } - return -1; + for (int64_t i = 0; i < threads; i++) { + pthread_join(pids[i], NULL); + } + + + int64_t records = 0; + for (int64_t i = 0; i < threads; i++) { + threadInfo *pThreadInfo = infos + i; + records += pThreadInfo->rowsOfDumpOut; + taos_close(pThreadInfo->taos); + } + + free(pids); + free(infos); + + return records; } -static int getDbCount() +static int64_t dumpCreateSTableClauseOfDb( + SDbInfo *dbInfo, FILE *fp) { - int count = 0; + TAOS *taos = taos_connect(g_args.host, + g_args.user, g_args.password, dbInfo->name, g_args.port); + if (NULL == taos) { + errorPrint( + "Failed to connect to TDengine server %s by specified database %s\n", + g_args.host, dbInfo->name); + return 0; + } - TAOS *taos = NULL; - TAOS_RES *result = NULL; - char *command = "show databases"; TAOS_ROW row; + char command[COMMAND_SIZE] = {0}; - /* Connect to server */ - taos = taos_connect(g_args.host, g_args.user, g_args.password, - NULL, g_args.port); + sprintf(command, "SHOW %s.STABLES", dbInfo->name); + + TAOS_RES* res = taos_query(taos, command); + int32_t code = taos_errno(res); + if (code != 0) { + errorPrint("%s() LN%d, failed to run command <%s>, reason: %s\n", + __func__, __LINE__, command, taos_errstr(res)); + taos_free_result(res); + taos_close(taos); + exit(-1); + } + + int64_t superTblCnt = 0; + while ((row = taos_fetch_row(res)) != NULL) { + if (0 == dumpStable(row[TSDB_SHOW_TABLES_NAME_INDEX], fp, taos, dbInfo)) { + superTblCnt ++; + } + } + + taos_free_result(res); + + fprintf(g_fpOfResult, + "# super table counter: %"PRId64"\n", + superTblCnt); + g_resultStatistics.totalSuperTblsOfDumpOut += superTblCnt; + + taos_close(taos); + + return superTblCnt; +} + +static int64_t dumpNTablesOfDb(SDbInfo *dbInfo) +{ + TAOS *taos = taos_connect(g_args.host, + g_args.user, g_args.password, dbInfo->name, g_args.port); if (NULL == taos) { - errorPrint("Failed to connect to TDengine server %s\n", g_args.host); + errorPrint( + "Failed to connect to TDengine server %s by specified database %s\n", + g_args.host, dbInfo->name); return 0; } - result = taos_query(taos, command); - int32_t code = taos_errno(result); + char command[COMMAND_SIZE]; + TAOS_RES *result; + int32_t code; - if (0 != code) { - errorPrint("%s() LN%d, failed to run command: %s, reason: %s\n", - __func__, __LINE__, command, taos_errstr(result)); + sprintf(command, "USE %s", dbInfo->name); + result = taos_query(taos, command); + code = taos_errno(result); + if (code != 0) { + errorPrint("invalid database %s, reason: %s\n", + dbInfo->name, taos_errstr(result)); + taos_close(taos); return 0; } - TAOS_FIELD *fields = taos_fetch_fields(result); - - while ((row = taos_fetch_row(result)) != NULL) { - // sys database name : 'log', but subsequent version changed to 'log' - if ((strncasecmp(row[TSDB_SHOW_DB_NAME_INDEX], "log", - fields[TSDB_SHOW_DB_NAME_INDEX].bytes) == 0) - && (!g_args.allow_sys)) { - continue; - } + sprintf(command, "SHOW TABLES"); + result = taos_query(taos, command); + code = taos_errno(result); + if (code != 0) { + errorPrint("Failed to show %s\'s tables, reason: %s\n", + dbInfo->name, taos_errstr(result)); + taos_close(taos); + return 0; + } - if (g_args.databases) { // input multi dbs - if (inDatabasesSeq( - (char *)row[TSDB_SHOW_DB_NAME_INDEX], - fields[TSDB_SHOW_DB_NAME_INDEX].bytes) != 0) - continue; - } else if (!g_args.all_databases) { // only input one db - if (strncasecmp(g_args.arg_list[0], - (char *)row[TSDB_SHOW_DB_NAME_INDEX], - fields[TSDB_SHOW_DB_NAME_INDEX].bytes) != 0) - continue; - } + g_tablesList = calloc(1, dbInfo->ntables * sizeof(TableInfo)); - count++; + TAOS_ROW row; + int64_t count = 0; + while(NULL != (row = taos_fetch_row(result))) { + debugPrint("%s() LN%d, No.\t%"PRId64" table name: %s\n", + __func__, __LINE__, + count, (char *)row[TSDB_SHOW_TABLES_NAME_INDEX]); + tstrncpy(((TableInfo *)(g_tablesList + count))->name, + (char *)row[TSDB_SHOW_TABLES_NAME_INDEX], TSDB_TABLE_NAME_LEN); + char *stbName = (char *) row[TSDB_SHOW_TABLES_METRIC_INDEX]; + if (stbName) { + tstrncpy(((TableInfo *)(g_tablesList + count))->stable, + (char *)row[TSDB_SHOW_TABLES_METRIC_INDEX], TSDB_TABLE_NAME_LEN); + ((TableInfo *)(g_tablesList + count))->belongStb = true; + } + count ++; } + taos_close(taos); - if (count == 0) { - errorPrint("%d databases valid to dump\n", count); - } + int64_t records = dumpNtbOfDbByThreads(dbInfo, count); - return count; + free(g_tablesList); + g_tablesList = NULL; + + return records; +} + +static int64_t dumpWholeDatabase(SDbInfo *dbInfo, FILE *fp) +{ + taosDumpCreateDbClause(dbInfo, g_args.with_property, fp); + + fprintf(g_fpOfResult, "\n#### database: %s\n", + dbInfo->name); + g_resultStatistics.totalDatabasesOfDumpOut++; + + dumpCreateSTableClauseOfDb(dbInfo, fp); + + return dumpNTablesOfDb(dbInfo); } static int taosDumpOut() { TAOS *taos = NULL; TAOS_RES *result = NULL; - char *command = NULL; TAOS_ROW row; FILE *fp = NULL; int32_t count = 0; - STableRecordInfo tableRecordInfo; + TableRecordInfo tableRecordInfo; char tmpBuf[4096] = {0}; if (g_args.outpath[0] != 0) { @@ -1328,26 +1386,24 @@ static int taosDumpOut() { return -1; } - g_args.dbCount = getDbCount(); + g_args.dumpDbCount = getDumpDbCount(); + debugPrint("%s() LN%d, dump db count: %d\n", + __func__, __LINE__, g_args.dumpDbCount); - if (0 == g_args.dbCount) { + if (0 == g_args.dumpDbCount) { + errorPrint("%d databases valid to dump\n", g_args.dumpDbCount); fclose(fp); - errorPrint("%d databases valid to dump\n", g_args.dbCount); return -1; } - g_dbInfos = (SDbInfo **)calloc(g_args.dbCount, sizeof(SDbInfo *)); + g_dbInfos = (SDbInfo **)calloc(g_args.dumpDbCount, sizeof(SDbInfo *)); if (g_dbInfos == NULL) { errorPrint("%s() LN%d, failed to allocate memory\n", __func__, __LINE__); goto _exit_failure; } - command = (char *)malloc(COMMAND_SIZE); - if (command == NULL) { - errorPrint("%s() LN%d, failed to allocate memory\n", __func__, __LINE__); - goto _exit_failure; - } + char command[COMMAND_SIZE]; /* Connect to server */ taos = taos_connect(g_args.host, g_args.user, g_args.password, @@ -1367,7 +1423,7 @@ static int taosDumpOut() { int32_t code = taos_errno(result); if (code != 0) { - errorPrint("%s() LN%d, failed to run command: %s, reason: %s\n", + errorPrint("%s() LN%d, failed to run command <%s>, reason: %s\n", __func__, __LINE__, command, taos_errstr(result)); goto _exit_failure; } @@ -1450,10 +1506,11 @@ static int taosDumpOut() { count++; if (g_args.databases) { - if (count > g_args.dbCount) break; - + if (count > g_args.dumpDbCount) + break; } else if (!g_args.all_databases) { - if (count >= 1) break; + if (count >= 1) + break; } } @@ -1462,94 +1519,52 @@ static int taosDumpOut() { goto _exit_failure; } - if (g_args.databases || g_args.all_databases) { // case: taosdump --databases dbx dby ... OR taosdump --all-databases + if (g_args.databases || g_args.all_databases) { // case: taosdump --databases dbx,dby ... OR taosdump --all-databases for (int i = 0; i < count; i++) { - taosDumpDb(g_dbInfos[i], fp, taos); + int64_t records = 0; + records = dumpWholeDatabase(g_dbInfos[i], fp); + if (records >= 0) { + okPrint("Database %s dumped\n", g_dbInfos[i]->name); + g_totalDumpOutRows += records; + } } } else { - if (g_args.dbCount == 1) { // case: taosdump - taosDumpDb(g_dbInfos[0], fp, taos); - } else { // case: taosdump tablex tabley ... + if (1 == g_args.arg_list_len) { + int64_t records = dumpWholeDatabase(g_dbInfos[0], fp); + if (records >= 0) { + okPrint("Database %s dumped\n", g_dbInfos[0]->name); + g_totalDumpOutRows += records; + } + } else { taosDumpCreateDbClause(g_dbInfos[0], g_args.with_property, fp); - fprintf(g_fpOfResult, "\n#### database: %s\n", - g_dbInfos[0]->name); - g_resultStatistics.totalDatabasesOfDumpOut++; - - sprintf(command, "use %s", g_dbInfos[0]->name); + } - result = taos_query(taos, command); - code = taos_errno(result); - if (code != 0) { - errorPrint("invalid database %s\n", g_dbInfos[0]->name); - goto _exit_failure; + int superTblCnt = 0 ; + for (int i = 1; g_args.arg_list[i]; i++) { + if (taosGetTableRecordInfo(g_dbInfos[0]->name, + g_args.arg_list[i], + &tableRecordInfo, taos) < 0) { + errorPrint("input the invalid table %s\n", + g_args.arg_list[i]); + continue; } - fprintf(fp, "USE %s;\n\n", g_dbInfos[0]->name); - - int32_t totalNumOfThread = 1; // 0: all normal table into .tables.tmp.0 - int normalTblFd = -1; - int32_t retCode; - int superTblCnt = 0 ; - for (int i = 1; g_args.arg_list[i]; i++) { - if (taosGetTableRecordInfo(g_args.arg_list[i], - &tableRecordInfo, taos) < 0) { - errorPrint("input the invalid table %s\n", - g_args.arg_list[i]); - continue; - } - - if (tableRecordInfo.isMetric) { // dump all table of this metric - int ret = taosDumpStable( - tableRecordInfo.tableRecord.metric, - fp, taos, g_dbInfos[0]->name); - if (0 == ret) { - superTblCnt++; - } - retCode = taosSaveTableOfMetricToTempFile( - taos, tableRecordInfo.tableRecord.metric, - &totalNumOfThread); - } else { - if (tableRecordInfo.tableRecord.metric[0] != '\0') { // dump this sub table and it's metric - int ret = taosDumpStable( - tableRecordInfo.tableRecord.metric, - fp, taos, g_dbInfos[0]->name); - if (0 == ret) { - superTblCnt++; - } - } - retCode = taosSaveAllNormalTableToTempFile( - taos, tableRecordInfo.tableRecord.name, - tableRecordInfo.tableRecord.metric, &normalTblFd); - } - - if (retCode < 0) { - if (-1 != normalTblFd){ - taosClose(normalTblFd); - } - goto _clean_tmp_file; + int64_t records = 0; + if (tableRecordInfo.isStable) { // dump all table of this stable + int ret = dumpStable( + tableRecordInfo.tableRecord.stable, + fp, taos, g_dbInfos[0]); + if (ret >= 0) { + superTblCnt++; + records = dumpNtbOfStbByThreads(taos, g_dbInfos[0], g_args.arg_list[i]); } + } else { + records = dumpNormalTableWithoutStb(taos, g_dbInfos[0], g_args.arg_list[i]); } - // TODO: save dump super table into result_output.txt - fprintf(g_fpOfResult, "# super table counter: %d\n", - superTblCnt); - g_resultStatistics.totalSuperTblsOfDumpOut += superTblCnt; - - if (-1 != normalTblFd){ - taosClose(normalTblFd); - } - - // start multi threads to dumpout - - taosStartDumpOutWorkThreads(totalNumOfThread, - g_dbInfos[0]->name, - getPrecisionByString(g_dbInfos[0]->precision)); - - char tmpFileName[MAX_FILE_NAME_LEN]; -_clean_tmp_file: - for (int loopCnt = 0; loopCnt < totalNumOfThread; loopCnt++) { - sprintf(tmpFileName, ".tables.tmp.%d", loopCnt); - remove(tmpFileName); + if (records >= 0) { + okPrint("table: %s dumped\n", g_args.arg_list[i]); + g_totalDumpOutRows += records; } } } @@ -1558,7 +1573,6 @@ _clean_tmp_file: fclose(fp); taos_close(taos); taos_free_result(result); - tfree(command); taosFreeDbInfos(); fprintf(stderr, "dump out rows: %" PRId64 "\n", g_totalDumpOutRows); return 0; @@ -1567,7 +1581,6 @@ _exit_failure: fclose(fp); taos_close(taos); taos_free_result(result); - tfree(command); taosFreeDbInfos(); errorPrint("dump out rows: %" PRId64 "\n", g_totalDumpOutRows); return -1; @@ -1575,18 +1588,18 @@ _exit_failure: static int taosGetTableDes( char* dbName, char *table, - STableDef *stableDes, TAOS* taosCon, bool isSuperTable) { + STableDef *stableDes, TAOS* taos, bool isSuperTable) { TAOS_ROW row = NULL; TAOS_RES* res = NULL; - int count = 0; + int colCount = 0; char sqlstr[COMMAND_SIZE]; sprintf(sqlstr, "describe %s.%s;", dbName, table); - res = taos_query(taosCon, sqlstr); + res = taos_query(taos, sqlstr); int32_t code = taos_errno(res); if (code != 0) { - errorPrint("%s() LN%d, failed to run command <%s>, reason:%s\n", + errorPrint("%s() LN%d, failed to run command <%s>, reason: %s\n", __func__, __LINE__, sqlstr, taos_errstr(res)); taos_free_result(res); return -1; @@ -1596,41 +1609,41 @@ static int taosGetTableDes( tstrncpy(stableDes->name, table, TSDB_TABLE_NAME_LEN); while ((row = taos_fetch_row(res)) != NULL) { - tstrncpy(stableDes->cols[count].field, + tstrncpy(stableDes->cols[colCount].field, (char *)row[TSDB_DESCRIBE_METRIC_FIELD_INDEX], min(TSDB_COL_NAME_LEN + 1, fields[TSDB_DESCRIBE_METRIC_FIELD_INDEX].bytes + 1)); - tstrncpy(stableDes->cols[count].type, + tstrncpy(stableDes->cols[colCount].type, (char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], min(16, fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes + 1)); - stableDes->cols[count].length = + stableDes->cols[colCount].length = *((int *)row[TSDB_DESCRIBE_METRIC_LENGTH_INDEX]); - tstrncpy(stableDes->cols[count].note, + tstrncpy(stableDes->cols[colCount].note, (char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX], min(COL_NOTE_LEN, fields[TSDB_DESCRIBE_METRIC_NOTE_INDEX].bytes + 1)); - count++; + colCount++; } taos_free_result(res); res = NULL; if (isSuperTable) { - return count; + return colCount; } // if child-table have tag, using select tagName from table to get tagValue - for (int i = 0 ; i < count; i++) { + for (int i = 0 ; i < colCount; i++) { if (strcmp(stableDes->cols[i].note, "TAG") != 0) continue; sprintf(sqlstr, "select %s from %s.%s", stableDes->cols[i].field, dbName, table); - res = taos_query(taosCon, sqlstr); + res = taos_query(taos, sqlstr); code = taos_errno(res); if (code != 0) { - errorPrint("%s() LN%d, failed to run command <%s>, reason:%s\n", + errorPrint("%s() LN%d, failed to run command <%s>, reason: %s\n", __func__, __LINE__, sqlstr, taos_errstr(res)); taos_free_result(res); return -1; @@ -1646,7 +1659,7 @@ static int taosGetTableDes( return -1; } - if (row[0] == NULL) { + if (row[TSDB_SHOW_TABLES_NAME_INDEX] == NULL) { sprintf(stableDes->cols[i].note, "%s", "NULL"); taos_free_result(res); res = NULL; @@ -1659,32 +1672,33 @@ static int taosGetTableDes( switch (fields[0].type) { case TSDB_DATA_TYPE_BOOL: sprintf(stableDes->cols[i].note, "%d", - ((((int32_t)(*((char *)row[0]))) == 1) ? 1 : 0)); + ((((int32_t)(*((char *)row[TSDB_SHOW_TABLES_NAME_INDEX]))) == 1) ? 1 : 0)); break; case TSDB_DATA_TYPE_TINYINT: - sprintf(stableDes->cols[i].note, "%d", *((int8_t *)row[0])); + sprintf(stableDes->cols[i].note, "%d", + *((int8_t *)row[TSDB_SHOW_TABLES_NAME_INDEX])); break; case TSDB_DATA_TYPE_SMALLINT: - sprintf(stableDes->cols[i].note, "%d", *((int16_t *)row[0])); + sprintf(stableDes->cols[i].note, "%d", *((int16_t *)row[TSDB_SHOW_TABLES_NAME_INDEX])); break; case TSDB_DATA_TYPE_INT: - sprintf(stableDes->cols[i].note, "%d", *((int32_t *)row[0])); + sprintf(stableDes->cols[i].note, "%d", *((int32_t *)row[TSDB_SHOW_TABLES_NAME_INDEX])); break; case TSDB_DATA_TYPE_BIGINT: - sprintf(stableDes->cols[i].note, "%" PRId64 "", *((int64_t *)row[0])); + sprintf(stableDes->cols[i].note, "%" PRId64 "", *((int64_t *)row[TSDB_SHOW_TABLES_NAME_INDEX])); break; case TSDB_DATA_TYPE_FLOAT: - sprintf(stableDes->cols[i].note, "%f", GET_FLOAT_VAL(row[0])); + sprintf(stableDes->cols[i].note, "%f", GET_FLOAT_VAL(row[TSDB_SHOW_TABLES_NAME_INDEX])); break; case TSDB_DATA_TYPE_DOUBLE: - sprintf(stableDes->cols[i].note, "%f", GET_DOUBLE_VAL(row[0])); + sprintf(stableDes->cols[i].note, "%f", GET_DOUBLE_VAL(row[TSDB_SHOW_TABLES_NAME_INDEX])); break; case TSDB_DATA_TYPE_BINARY: { memset(stableDes->cols[i].note, 0, sizeof(stableDes->cols[i].note)); stableDes->cols[i].note[0] = '\''; char tbuf[COL_NOTE_LEN]; - converStringToReadable((char *)row[0], length[0], tbuf, COL_NOTE_LEN); + converStringToReadable((char *)row[TSDB_SHOW_TABLES_NAME_INDEX], length[0], tbuf, COL_NOTE_LEN); char* pstr = stpcpy(&(stableDes->cols[i].note[1]), tbuf); *(pstr++) = '\''; break; @@ -1693,518 +1707,156 @@ static int taosGetTableDes( { memset(stableDes->cols[i].note, 0, sizeof(stableDes->cols[i].note)); char tbuf[COL_NOTE_LEN-2]; // need reserve 2 bytes for ' ' - convertNCharToReadable((char *)row[0], length[0], tbuf, COL_NOTE_LEN); + convertNCharToReadable((char *)row[TSDB_SHOW_TABLES_NAME_INDEX], length[0], tbuf, COL_NOTE_LEN); sprintf(stableDes->cols[i].note, "\'%s\'", tbuf); break; } case TSDB_DATA_TYPE_TIMESTAMP: - sprintf(stableDes->cols[i].note, "%" PRId64 "", *(int64_t *)row[0]); + sprintf(stableDes->cols[i].note, "%" PRId64 "", *(int64_t *)row[TSDB_SHOW_TABLES_NAME_INDEX]); #if 0 if (!g_args.mysqlFlag) { - sprintf(tableDes->cols[i].note, "%" PRId64 "", *(int64_t *)row[0]); + sprintf(tableDes->cols[i].note, "%" PRId64 "", *(int64_t *)row[TSDB_SHOW_TABLES_NAME_INDEX]); } else { char buf[64] = "\0"; - int64_t ts = *((int64_t *)row[0]); + int64_t ts = *((int64_t *)row[TSDB_SHOW_TABLES_NAME_INDEX]); time_t tt = (time_t)(ts / 1000); struct tm *ptm = localtime(&tt); strftime(buf, 64, "%y-%m-%d %H:%M:%S", ptm); sprintf(tableDes->cols[i].note, "\'%s.%03d\'", buf, (int)(ts % 1000)); } #endif - break; - default: - break; - } - - taos_free_result(res); - res = NULL; - } - - return count; -} - -static int convertSchemaToAvroSchema(STableDef *stableDes, char **avroSchema) -{ - errorPrint("%s() LN%d TODO: covert table schema to avro schema\n", - __func__, __LINE__); - return 0; -} - -static int32_t taosDumpTable( - char *tbName, char *metric, - FILE *fp, TAOS* taosCon, char* dbName, int precision) { - int count = 0; - - STableDef *tableDes = (STableDef *)calloc(1, sizeof(STableDef) - + sizeof(SColDes) * TSDB_MAX_COLUMNS); - - if (metric != NULL && metric[0] != '\0') { // dump table schema which is created by using super table - /* - count = taosGetTableDes(metric, tableDes, taosCon); - - if (count < 0) { - free(tableDes); - return -1; - } - - taosDumpCreateTableClause(tableDes, count, fp); - - memset(tableDes, 0, sizeof(STableDef) + sizeof(SColDes) * TSDB_MAX_COLUMNS); - */ - - count = taosGetTableDes(dbName, tbName, tableDes, taosCon, false); - - if (count < 0) { - free(tableDes); - return -1; - } - - // create child-table using super-table - taosDumpCreateMTableClause(tableDes, metric, count, fp, dbName); - - } else { // dump table definition - count = taosGetTableDes(dbName, tbName, tableDes, taosCon, false); - - if (count < 0) { - free(tableDes); - return -1; - } - - // create normal-table or super-table - taosDumpCreateTableClause(tableDes, count, fp, dbName); - } - - char *jsonAvroSchema = NULL; - if (g_args.avro) { - convertSchemaToAvroSchema(tableDes, &jsonAvroSchema); - } - - free(tableDes); - - int32_t ret = 0; - if (!g_args.schemaonly) { - ret = taosDumpTableData(fp, tbName, taosCon, dbName, precision, - jsonAvroSchema); - } - - return ret; -} - -static void taosDumpCreateDbClause( - SDbInfo *dbInfo, bool isDumpProperty, FILE *fp) { - char sqlstr[TSDB_MAX_SQL_LEN] = {0}; - - char *pstr = sqlstr; - pstr += sprintf(pstr, "CREATE DATABASE IF NOT EXISTS %s ", dbInfo->name); - if (isDumpProperty) { - pstr += sprintf(pstr, - "REPLICA %d QUORUM %d DAYS %d KEEP %s CACHE %d BLOCKS %d MINROWS %d MAXROWS %d FSYNC %d CACHELAST %d COMP %d PRECISION '%s' UPDATE %d", - dbInfo->replica, dbInfo->quorum, dbInfo->days, - dbInfo->keeplist, - dbInfo->cache, - dbInfo->blocks, dbInfo->minrows, dbInfo->maxrows, - dbInfo->fsync, - dbInfo->cachelast, - dbInfo->comp, dbInfo->precision, dbInfo->update); - } - - pstr += sprintf(pstr, ";"); - fprintf(fp, "%s\n\n", sqlstr); -} - -static void* taosDumpOutWorkThreadFp(void *arg) -{ - SThreadParaObj *pThread = (SThreadParaObj*)arg; - STableRecord tableRecord; - int fd; - - setThreadName("dumpOutWorkThrd"); - - char tmpBuf[4096] = {0}; - sprintf(tmpBuf, ".tables.tmp.%d", pThread->threadIndex); - fd = open(tmpBuf, O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH); - if (fd == -1) { - errorPrint("%s() LN%d, failed to open temp file: %s\n", - __func__, __LINE__, tmpBuf); - return NULL; - } - - FILE *fp = NULL; - memset(tmpBuf, 0, 4096); - - if (g_args.outpath[0] != 0) { - sprintf(tmpBuf, "%s/%s.tables.%d.sql", - g_args.outpath, pThread->dbName, pThread->threadIndex); - } else { - sprintf(tmpBuf, "%s.tables.%d.sql", - pThread->dbName, pThread->threadIndex); - } - - fp = fopen(tmpBuf, "w"); - if (fp == NULL) { - errorPrint("%s() LN%d, failed to open file %s\n", - __func__, __LINE__, tmpBuf); - close(fd); - return NULL; - } - - memset(tmpBuf, 0, 4096); - sprintf(tmpBuf, "use %s", pThread->dbName); - - TAOS_RES* tmpResult = taos_query(pThread->taosCon, tmpBuf); - int32_t code = taos_errno(tmpResult); - if (code != 0) { - errorPrint("%s() LN%d, invalid database %s. reason: %s\n", - __func__, __LINE__, pThread->dbName, taos_errstr(tmpResult)); - taos_free_result(tmpResult); - fclose(fp); - close(fd); - return NULL; - } - -#if 0 - int fileNameIndex = 1; - int tablesInOneFile = 0; -#endif - int64_t lastRowsPrint = 5000000; - fprintf(fp, "USE %s;\n\n", pThread->dbName); - while (1) { - ssize_t readLen = read(fd, &tableRecord, sizeof(STableRecord)); - if (readLen <= 0) break; - - int ret = taosDumpTable( - tableRecord.name, tableRecord.metric, - fp, pThread->taosCon, pThread->dbName, - pThread->precision); - if (ret >= 0) { - // TODO: sum table count and table rows by self - pThread->tablesOfDumpOut++; - pThread->rowsOfDumpOut += ret; - - if (pThread->rowsOfDumpOut >= lastRowsPrint) { - printf(" %"PRId64 " rows already be dumpout from database %s\n", - pThread->rowsOfDumpOut, pThread->dbName); - lastRowsPrint += 5000000; - } - -#if 0 - tablesInOneFile++; - if (tablesInOneFile >= g_args.table_batch) { - fclose(fp); - tablesInOneFile = 0; - - memset(tmpBuf, 0, 4096); - if (g_args.outpath[0] != 0) { - sprintf(tmpBuf, "%s/%s.tables.%d-%d.sql", - g_args.outpath, pThread->dbName, - pThread->threadIndex, fileNameIndex); - } else { - sprintf(tmpBuf, "%s.tables.%d-%d.sql", - pThread->dbName, pThread->threadIndex, fileNameIndex); - } - fileNameIndex++; - - fp = fopen(tmpBuf, "w"); - if (fp == NULL) { - errorPrint("%s() LN%d, failed to open file %s\n", - __func__, __LINE__, tmpBuf); - close(fd); - taos_free_result(tmpResult); - return NULL; - } - } -#endif - } - } - - taos_free_result(tmpResult); - close(fd); - fclose(fp); - - return NULL; -} - -static void taosStartDumpOutWorkThreads(int32_t numOfThread, char *dbName, int precision) -{ - pthread_attr_t thattr; - SThreadParaObj *threadObj = - (SThreadParaObj *)calloc(numOfThread, sizeof(SThreadParaObj)); - - if (threadObj == NULL) { - errorPrint("%s() LN%d, memory allocation failed!\n", - __func__, __LINE__); - return; - } - - for (int t = 0; t < numOfThread; ++t) { - SThreadParaObj *pThread = threadObj + t; - pThread->rowsOfDumpOut = 0; - pThread->tablesOfDumpOut = 0; - pThread->threadIndex = t; - pThread->totalThreads = numOfThread; - tstrncpy(pThread->dbName, dbName, TSDB_DB_NAME_LEN); - pThread->precision = precision; - pThread->taosCon = taos_connect(g_args.host, g_args.user, g_args.password, - NULL, g_args.port); - if (pThread->taosCon == NULL) { - errorPrint("Failed to connect to TDengine server %s\n", g_args.host); - free(threadObj); - return; - } - pthread_attr_init(&thattr); - pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); - - if (pthread_create(&(pThread->threadID), &thattr, - taosDumpOutWorkThreadFp, - (void*)pThread) != 0) { - errorPrint("%s() LN%d, thread:%d failed to start\n", - __func__, __LINE__, pThread->threadIndex); - exit(-1); - } - } - - for (int32_t t = 0; t < numOfThread; ++t) { - pthread_join(threadObj[t].threadID, NULL); - } - - // TODO: sum all thread dump table count and rows of per table, then save into result_output.txt - int64_t totalRowsOfDumpOut = 0; - int64_t totalChildTblsOfDumpOut = 0; - for (int32_t t = 0; t < numOfThread; ++t) { - totalChildTblsOfDumpOut += threadObj[t].tablesOfDumpOut; - totalRowsOfDumpOut += threadObj[t].rowsOfDumpOut; - } - - fprintf(g_fpOfResult, "# child table counter: %"PRId64"\n", - totalChildTblsOfDumpOut); - fprintf(g_fpOfResult, "# row counter: %"PRId64"\n", - totalRowsOfDumpOut); - g_resultStatistics.totalChildTblsOfDumpOut += totalChildTblsOfDumpOut; - g_resultStatistics.totalRowsOfDumpOut += totalRowsOfDumpOut; - free(threadObj); -} - -static int32_t taosDumpStable(char *table, FILE *fp, - TAOS* taosCon, char* dbName) { - - uint64_t sizeOfTableDes = - (uint64_t)(sizeof(STableDef) + sizeof(SColDes) * TSDB_MAX_COLUMNS); - STableDef *stableDes = (STableDef *)calloc(1, sizeOfTableDes); - if (NULL == stableDes) { - errorPrint("%s() LN%d, failed to allocate %"PRIu64" memory\n", - __func__, __LINE__, sizeOfTableDes); - exit(-1); - } - - int count = taosGetTableDes(dbName, table, stableDes, taosCon, true); - - if (count < 0) { - free(stableDes); - errorPrint("%s() LN%d, failed to get stable[%s] schema\n", - __func__, __LINE__, table); - exit(-1); - } - - taosDumpCreateTableClause(stableDes, count, fp, dbName); - - free(stableDes); - return 0; -} - -static int32_t taosDumpCreateSuperTableClause(TAOS* taosCon, char* dbName, FILE *fp) -{ - TAOS_ROW row; - int fd = -1; - STableRecord tableRecord; - char sqlstr[TSDB_MAX_SQL_LEN] = {0}; - - sprintf(sqlstr, "show %s.stables", dbName); - - TAOS_RES* res = taos_query(taosCon, sqlstr); - int32_t code = taos_errno(res); - if (code != 0) { - errorPrint("%s() LN%d, failed to run command <%s>, reason: %s\n", - __func__, __LINE__, sqlstr, taos_errstr(res)); - taos_free_result(res); - exit(-1); - } - - TAOS_FIELD *fields = taos_fetch_fields(res); - - char tmpFileName[MAX_FILE_NAME_LEN]; - memset(tmpFileName, 0, MAX_FILE_NAME_LEN); - sprintf(tmpFileName, ".stables.tmp"); - fd = open(tmpFileName, O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH); - if (fd == -1) { - errorPrint("%s() LN%d, failed to open temp file: %s\n", - __func__, __LINE__, tmpFileName); - taos_free_result(res); - (void)remove(".stables.tmp"); - exit(-1); - } - - while ((row = taos_fetch_row(res)) != NULL) { - memset(&tableRecord, 0, sizeof(STableRecord)); - tstrncpy(tableRecord.name, (char *)row[TSDB_SHOW_TABLES_NAME_INDEX], - min(TSDB_TABLE_NAME_LEN, - fields[TSDB_SHOW_TABLES_NAME_INDEX].bytes + 1)); - taosWrite(fd, &tableRecord, sizeof(STableRecord)); - } - - taos_free_result(res); - (void)lseek(fd, 0, SEEK_SET); - - int superTblCnt = 0; - while (1) { - ssize_t readLen = read(fd, &tableRecord, sizeof(STableRecord)); - if (readLen <= 0) break; - - int ret = taosDumpStable(tableRecord.name, fp, taosCon, dbName); - if (0 == ret) { - superTblCnt++; + break; + default: + break; } - } - // TODO: save dump super table into result_output.txt - fprintf(g_fpOfResult, "# super table counter: %d\n", superTblCnt); - g_resultStatistics.totalSuperTblsOfDumpOut += superTblCnt; + taos_free_result(res); + res = NULL; + } - close(fd); - (void)remove(".stables.tmp"); + return colCount; +} +static int convertSchemaToAvroSchema(STableDef *stableDes, char **avroSchema) +{ + errorPrint("%s() LN%d TODO: covert table schema to avro schema\n", + __func__, __LINE__); return 0; } +static int64_t taosDumpTable( + char *tbName, char *stable, + FILE *fp, TAOS* taos, char* dbName, int precision) { + int colCount = 0; -static int taosDumpDb(SDbInfo *dbInfo, FILE *fp, TAOS *taosCon) { - TAOS_ROW row; - int fd = -1; - STableRecord tableRecord; + STableDef *tableDes = (STableDef *)calloc(1, sizeof(STableDef) + + sizeof(SColDes) * TSDB_MAX_COLUMNS); - taosDumpCreateDbClause(dbInfo, g_args.with_property, fp); + if (stable != NULL && stable[0] != '\0') { // dump table schema which is created by using super table + /* + colCount = taosGetTableDes(stable, tableDes, taos); - fprintf(g_fpOfResult, "\n#### database: %s\n", - dbInfo->name); - g_resultStatistics.totalDatabasesOfDumpOut++; + if (count < 0) { + free(tableDes); + return -1; + } - char sqlstr[TSDB_MAX_SQL_LEN] = {0}; + taosDumpCreateTableClause(tableDes, count, fp); + + memset(tableDes, 0, sizeof(STableDef) + sizeof(SColDes) * TSDB_MAX_COLUMNS); + */ + + colCount = taosGetTableDes(dbName, tbName, tableDes, taos, false); + + if (colCount < 0) { + free(tableDes); + return -1; + } - fprintf(fp, "USE %s;\n\n", dbInfo->name); + // create child-table using super-table + taosDumpCreateMTableClause(tableDes, stable, colCount, fp, dbName); - (void)taosDumpCreateSuperTableClause(taosCon, dbInfo->name, fp); + } else { // dump table definition + colCount = taosGetTableDes(dbName, tbName, tableDes, taos, false); - sprintf(sqlstr, "show %s.tables", dbInfo->name); + if (colCount < 0) { + free(tableDes); + return -1; + } - TAOS_RES* res = taos_query(taosCon, sqlstr); - int code = taos_errno(res); - if (code != 0) { - errorPrint("%s() LN%d, failed to run command <%s>, reason:%s\n", - __func__, __LINE__, sqlstr, taos_errstr(res)); - taos_free_result(res); - return -1; + // create normal-table or super-table + taosDumpCreateTableClause(tableDes, colCount, fp, dbName); } - char tmpBuf[MAX_FILE_NAME_LEN]; - memset(tmpBuf, 0, MAX_FILE_NAME_LEN); - sprintf(tmpBuf, ".show-tables.tmp"); - fd = open(tmpBuf, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH); - if (fd == -1) { - errorPrint("%s() LN%d, failed to open temp file: %s\n", - __func__, __LINE__, tmpBuf); - taos_free_result(res); - return -1; + char *jsonAvroSchema = NULL; + if (g_args.avro) { + convertSchemaToAvroSchema(tableDes, &jsonAvroSchema); } - TAOS_FIELD *fields = taos_fetch_fields(res); + free(tableDes); - int32_t numOfTable = 0; - while ((row = taos_fetch_row(res)) != NULL) { - memset(&tableRecord, 0, sizeof(STableRecord)); - tstrncpy(tableRecord.name, (char *)row[TSDB_SHOW_TABLES_NAME_INDEX], - min(TSDB_TABLE_NAME_LEN, - fields[TSDB_SHOW_TABLES_NAME_INDEX].bytes + 1)); - tstrncpy(tableRecord.metric, (char *)row[TSDB_SHOW_TABLES_METRIC_INDEX], - min(TSDB_TABLE_NAME_LEN, - fields[TSDB_SHOW_TABLES_METRIC_INDEX].bytes + 1)); + int64_t ret = 0; + if (!g_args.schemaonly) { + ret = taosDumpTableData(fp, tbName, taos, dbName, precision, + jsonAvroSchema); + } - taosWrite(fd, &tableRecord, sizeof(STableRecord)); + return ret; +} - numOfTable++; - } - taos_free_result(res); - lseek(fd, 0, SEEK_SET); +static void taosDumpCreateDbClause( + SDbInfo *dbInfo, bool isDumpProperty, FILE *fp) { + char sqlstr[TSDB_MAX_SQL_LEN] = {0}; - int maxThreads = g_args.thread_num; - int tableOfPerFile ; - if (numOfTable <= g_args.thread_num) { - tableOfPerFile = 1; - maxThreads = numOfTable; - } else { - tableOfPerFile = numOfTable / g_args.thread_num; - if (0 != numOfTable % g_args.thread_num) { - tableOfPerFile += 1; - } + char *pstr = sqlstr; + pstr += sprintf(pstr, "CREATE DATABASE IF NOT EXISTS %s ", dbInfo->name); + if (isDumpProperty) { + pstr += sprintf(pstr, + "REPLICA %d QUORUM %d DAYS %d KEEP %s CACHE %d BLOCKS %d MINROWS %d MAXROWS %d FSYNC %d CACHELAST %d COMP %d PRECISION '%s' UPDATE %d", + dbInfo->replica, dbInfo->quorum, dbInfo->days, + dbInfo->keeplist, + dbInfo->cache, + dbInfo->blocks, dbInfo->minrows, dbInfo->maxrows, + dbInfo->fsync, + dbInfo->cachelast, + dbInfo->comp, dbInfo->precision, dbInfo->update); } - char* tblBuf = (char*)calloc(1, tableOfPerFile * sizeof(STableRecord)); - if (NULL == tblBuf){ - errorPrint("failed to calloc %" PRIzu "\n", - tableOfPerFile * sizeof(STableRecord)); - close(fd); - return -1; - } + pstr += sprintf(pstr, ";"); + fprintf(fp, "%s\n\n", sqlstr); +} - int32_t numOfThread = 0; - int subFd = -1; - for (numOfThread = 0; numOfThread < maxThreads; numOfThread++) { - memset(tmpBuf, 0, MAX_FILE_NAME_LEN); - sprintf(tmpBuf, ".tables.tmp.%d", numOfThread); - subFd = open(tmpBuf, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH); - if (subFd == -1) { - errorPrint("%s() LN%d, failed to open temp file: %s\n", - __func__, __LINE__, tmpBuf); - for (int32_t loopCnt = 0; loopCnt < numOfThread; loopCnt++) { - sprintf(tmpBuf, ".tables.tmp.%d", loopCnt); - (void)remove(tmpBuf); - } - sprintf(tmpBuf, ".show-tables.tmp"); - (void)remove(tmpBuf); - free(tblBuf); - close(fd); - return -1; - } +static int dumpStable(char *stbName, FILE *fp, + TAOS* taos, SDbInfo *dbInfo) +{ - // read tableOfPerFile for fd, write to subFd - ssize_t readLen = read(fd, tblBuf, tableOfPerFile * sizeof(STableRecord)); - if (readLen <= 0) { - close(subFd); - break; - } - taosWrite(subFd, tblBuf, readLen); - close(subFd); + uint64_t sizeOfTableDes = + (uint64_t)(sizeof(STableDef) + sizeof(SColDes) * TSDB_MAX_COLUMNS); + + STableDef *stableDes = (STableDef *)calloc(1, sizeOfTableDes); + if (NULL == stableDes) { + errorPrint("%s() LN%d, failed to allocate %"PRIu64" memory\n", + __func__, __LINE__, sizeOfTableDes); + exit(-1); } - sprintf(tmpBuf, ".show-tables.tmp"); - (void)remove(tmpBuf); + int colCount = taosGetTableDes(dbInfo->name, + stbName, stableDes, taos, true); - if (fd >= 0) { - close(fd); - fd = -1; + if (colCount < 0) { + free(stableDes); + errorPrint("%s() LN%d, failed to get stable[%s] schema\n", + __func__, __LINE__, stbName); + exit(-1); } - // start multi threads to dumpout - taosStartDumpOutWorkThreads(numOfThread, dbInfo->name, - getPrecisionByString(dbInfo->precision)); - for (int loopCnt = 0; loopCnt < numOfThread; loopCnt++) { - sprintf(tmpBuf, ".tables.tmp.%d", loopCnt); - (void)remove(tmpBuf); - } + taosDumpCreateTableClause(stableDes, colCount, fp, dbInfo->name); + free(stableDes); - free(tblBuf); return 0; } -static void taosDumpCreateTableClause(STableDef *tableDes, int numOfCols, +static int taosDumpCreateTableClause(STableDef *tableDes, int numOfCols, FILE *fp, char* dbName) { int counter = 0; int count_temp = 0; @@ -2251,10 +1903,11 @@ static void taosDumpCreateTableClause(STableDef *tableDes, int numOfCols, pstr += sprintf(pstr, ");"); - fprintf(fp, "%s\n\n", sqlstr); + debugPrint("%s() LN%d, write string: %s\n", __func__, __LINE__, sqlstr); + return fprintf(fp, "%s\n\n", sqlstr); } -static void taosDumpCreateMTableClause(STableDef *tableDes, char *metric, +static void taosDumpCreateMTableClause(STableDef *tableDes, char *stable, int numOfCols, FILE *fp, char* dbName) { int counter = 0; int count_temp = 0; @@ -2271,7 +1924,7 @@ static void taosDumpCreateMTableClause(STableDef *tableDes, char *metric, pstr += sprintf(tmpBuf, "CREATE TABLE IF NOT EXISTS %s.%s USING %s.%s TAGS (", - dbName, tableDes->name, dbName, metric); + dbName, tableDes->name, dbName, stable); for (; counter < numOfCols; counter++) { if (tableDes->cols[counter].note[0] != '\0') break; @@ -2471,8 +2124,8 @@ static int64_t writeResultToSql(TAOS_RES *res, FILE *fp, char *dbName, char *tbN return 0; } -static int taosDumpTableData(FILE *fp, char *tbName, - TAOS* taosCon, char* dbName, int precision, +static int64_t taosDumpTableData(FILE *fp, char *tbName, + TAOS* taos, char* dbName, int precision, char *jsonAvroSchema) { int64_t totalRows = 0; @@ -2505,7 +2158,7 @@ static int taosDumpTableData(FILE *fp, char *tbName, "select * from %s.%s where _c0 >= %" PRId64 " and _c0 <= %" PRId64 " order by _c0 asc;", dbName, tbName, start_time, end_time); - TAOS_RES* res = taos_query(taosCon, sqlstr); + TAOS_RES* res = taos_query(taos, sqlstr); int32_t code = taos_errno(res); if (code != 0) { errorPrint("failed to run command %s, reason: %s\n", @@ -2941,7 +2594,7 @@ static int taosDumpInOneFile(TAOS* taos, FILE* fp, char* fcharset, static void* taosDumpInWorkThreadFp(void *arg) { - SThreadParaObj *pThread = (SThreadParaObj*)arg; + threadInfo *pThread = (threadInfo*)arg; setThreadName("dumpInWorkThrd"); for (int32_t f = 0; f < g_tsSqlFileNum; ++f) { @@ -2953,7 +2606,7 @@ static void* taosDumpInWorkThreadFp(void *arg) } fprintf(stderr, ", Success Open input file: %s\n", SQLFileName); - taosDumpInOneFile(pThread->taosCon, fp, g_tsCharset, g_args.encode, SQLFileName); + taosDumpInOneFile(pThread->taos, fp, g_tsCharset, g_args.encode, SQLFileName); } } @@ -2963,15 +2616,15 @@ static void* taosDumpInWorkThreadFp(void *arg) static void taosStartDumpInWorkThreads() { pthread_attr_t thattr; - SThreadParaObj *pThread; + threadInfo *pThread; int32_t totalThreads = g_args.thread_num; if (totalThreads > g_tsSqlFileNum) { totalThreads = g_tsSqlFileNum; } - SThreadParaObj *threadObj = (SThreadParaObj *)calloc( - totalThreads, sizeof(SThreadParaObj)); + threadInfo *threadObj = (threadInfo *)calloc( + totalThreads, sizeof(threadInfo)); if (NULL == threadObj) { errorPrint("%s() LN%d, memory allocation failed\n", __func__, __LINE__); @@ -2981,9 +2634,9 @@ static void taosStartDumpInWorkThreads() pThread = threadObj + t; pThread->threadIndex = t; pThread->totalThreads = totalThreads; - pThread->taosCon = taos_connect(g_args.host, g_args.user, g_args.password, + pThread->taos = taos_connect(g_args.host, g_args.user, g_args.password, NULL, g_args.port); - if (pThread->taosCon == NULL) { + if (pThread->taos == NULL) { errorPrint("Failed to connect to TDengine server %s\n", g_args.host); free(threadObj); return; @@ -3004,7 +2657,7 @@ static void taosStartDumpInWorkThreads() } for (int t = 0; t < totalThreads; ++t) { - taos_close(threadObj[t].taosCon); + taos_close(threadObj[t].taos); } free(threadObj); } @@ -3054,3 +2707,154 @@ static int taosDumpIn() { return 0; } +int main(int argc, char *argv[]) { + static char verType[32] = {0}; + sprintf(verType, "version: %s\n", version); + argp_program_version = verType; + + int ret = 0; + /* Parse our arguments; every option seen by parse_opt will be + reflected in arguments. */ + if (argc > 1) { +// parse_precision_first(argc, argv, &g_args); + parse_timestamp(argc, argv, &g_args); + parse_args(argc, argv, &g_args); + } + + argp_parse(&argp, argc, argv, 0, 0, &g_args); + + if (g_args.abort) { +#ifndef _ALPINE + error(10, 0, "ABORTED"); +#else + abort(); +#endif + } + + printf("====== arguments config ======\n"); + { + printf("host: %s\n", g_args.host); + printf("user: %s\n", g_args.user); + printf("password: %s\n", g_args.password); + printf("port: %u\n", g_args.port); + printf("mysqlFlag: %d\n", g_args.mysqlFlag); + printf("outpath: %s\n", g_args.outpath); + printf("inpath: %s\n", g_args.inpath); + printf("resultFile: %s\n", g_args.resultFile); + printf("encode: %s\n", g_args.encode); + printf("all_databases: %s\n", g_args.all_databases?"true":"false"); + printf("databases: %d\n", g_args.databases); + printf("databasesSeq: %s\n", g_args.databasesSeq); + printf("schemaonly: %s\n", g_args.schemaonly?"true":"false"); + printf("with_property: %s\n", g_args.with_property?"true":"false"); + printf("avro format: %s\n", g_args.avro?"true":"false"); + printf("start_time: %" PRId64 "\n", g_args.start_time); + printf("human readable start time: %s \n", g_args.humanStartTime); + printf("end_time: %" PRId64 "\n", g_args.end_time); + printf("human readable end time: %s \n", g_args.humanEndTime); + printf("precision: %s\n", g_args.precision); + printf("data_batch: %d\n", g_args.data_batch); + printf("max_sql_len: %d\n", g_args.max_sql_len); + printf("table_batch: %d\n", g_args.table_batch); + printf("thread_num: %d\n", g_args.thread_num); + printf("allow_sys: %d\n", g_args.allow_sys); + printf("abort: %d\n", g_args.abort); + printf("isDumpIn: %d\n", g_args.isDumpIn); + printf("arg_list_len: %d\n", g_args.arg_list_len); + printf("debug_print: %d\n", g_args.debug_print); + + for (int32_t i = 0; i < g_args.arg_list_len; i++) { + printf("arg_list[%d]: %s\n", i, g_args.arg_list[i]); + } + } + printf("==============================\n"); + if (taosCheckParam(&g_args) < 0) { + exit(EXIT_FAILURE); + } + + g_fpOfResult = fopen(g_args.resultFile, "a"); + if (NULL == g_fpOfResult) { + errorPrint("Failed to open %s for save result\n", g_args.resultFile); + exit(-1); + }; + + fprintf(g_fpOfResult, "#############################################################################\n"); + fprintf(g_fpOfResult, "============================== arguments config =============================\n"); + { + fprintf(g_fpOfResult, "host: %s\n", g_args.host); + fprintf(g_fpOfResult, "user: %s\n", g_args.user); + fprintf(g_fpOfResult, "password: %s\n", g_args.password); + fprintf(g_fpOfResult, "port: %u\n", g_args.port); + fprintf(g_fpOfResult, "mysqlFlag: %d\n", g_args.mysqlFlag); + fprintf(g_fpOfResult, "outpath: %s\n", g_args.outpath); + fprintf(g_fpOfResult, "inpath: %s\n", g_args.inpath); + fprintf(g_fpOfResult, "resultFile: %s\n", g_args.resultFile); + fprintf(g_fpOfResult, "encode: %s\n", g_args.encode); + fprintf(g_fpOfResult, "all_databases: %s\n", g_args.all_databases?"true":"false"); + fprintf(g_fpOfResult, "databases: %d\n", g_args.databases); + fprintf(g_fpOfResult, "databasesSeq: %s\n", g_args.databasesSeq); + fprintf(g_fpOfResult, "schemaonly: %s\n", g_args.schemaonly?"true":"false"); + fprintf(g_fpOfResult, "with_property: %s\n", g_args.with_property?"true":"false"); + fprintf(g_fpOfResult, "avro format: %s\n", g_args.avro?"true":"false"); + fprintf(g_fpOfResult, "start_time: %" PRId64 "\n", g_args.start_time); + fprintf(g_fpOfResult, "human readable start time: %s \n", g_args.humanStartTime); + fprintf(g_fpOfResult, "end_time: %" PRId64 "\n", g_args.end_time); + fprintf(g_fpOfResult, "human readable end time: %s \n", g_args.humanEndTime); + fprintf(g_fpOfResult, "precision: %s\n", g_args.precision); + fprintf(g_fpOfResult, "data_batch: %d\n", g_args.data_batch); + fprintf(g_fpOfResult, "max_sql_len: %d\n", g_args.max_sql_len); + fprintf(g_fpOfResult, "table_batch: %d\n", g_args.table_batch); + fprintf(g_fpOfResult, "thread_num: %d\n", g_args.thread_num); + fprintf(g_fpOfResult, "allow_sys: %d\n", g_args.allow_sys); + fprintf(g_fpOfResult, "abort: %d\n", g_args.abort); + fprintf(g_fpOfResult, "isDumpIn: %d\n", g_args.isDumpIn); + fprintf(g_fpOfResult, "arg_list_len: %d\n", g_args.arg_list_len); + + for (int32_t i = 0; i < g_args.arg_list_len; i++) { + fprintf(g_fpOfResult, "arg_list[%d]: %s\n", i, g_args.arg_list[i]); + } + } + + g_numOfCores = (int32_t)sysconf(_SC_NPROCESSORS_ONLN); + + time_t tTime = time(NULL); + struct tm tm = *localtime(&tTime); + + if (g_args.isDumpIn) { + fprintf(g_fpOfResult, "============================== DUMP IN ============================== \n"); + fprintf(g_fpOfResult, "# DumpIn start time: %d-%02d-%02d %02d:%02d:%02d\n", + tm.tm_year + 1900, tm.tm_mon + 1, + tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec); + if (taosDumpIn() < 0) { + ret = -1; + } + } else { + fprintf(g_fpOfResult, "============================== DUMP OUT ============================== \n"); + fprintf(g_fpOfResult, "# DumpOut start time: %d-%02d-%02d %02d:%02d:%02d\n", + tm.tm_year + 1900, tm.tm_mon + 1, + tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec); + if (taosDumpOut() < 0) { + ret = -1; + } else { + fprintf(g_fpOfResult, "\n============================== TOTAL STATISTICS ============================== \n"); + fprintf(g_fpOfResult, "# total database count: %d\n", + g_resultStatistics.totalDatabasesOfDumpOut); + fprintf(g_fpOfResult, "# total super table count: %d\n", + g_resultStatistics.totalSuperTblsOfDumpOut); + fprintf(g_fpOfResult, "# total child table count: %"PRId64"\n", + g_resultStatistics.totalChildTblsOfDumpOut); + fprintf(g_fpOfResult, "# total row count: %"PRId64"\n", + g_resultStatistics.totalRowsOfDumpOut); + } + } + + fprintf(g_fpOfResult, "\n"); + fclose(g_fpOfResult); + + if (g_tablesList) { + free(g_tablesList); + } + + return ret; +} +