diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 6ba420514bf62c5c00a83afedc85b57fd87d86b4..317da97fd359a910b3ed4bbd8886f160426c91d4 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -870,6 +870,11 @@ void *readTable(void *sarg) { int64_t sTime = rinfo->start_time; char *tb_prefix = rinfo->tb_prefix; FILE *fp = fopen(rinfo->fp, "a"); + if (NULL == fp) { + printf("fopen %s fail, reason:%s.\n", rinfo->fp, strerror(errno)); + return NULL; + } + int num_of_DPT = rinfo->nrecords_per_table; int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1; int totalData = num_of_DPT * num_of_tables; @@ -925,6 +930,11 @@ void *readMetric(void *sarg) { TAOS *taos = rinfo->taos; char command[BUFFER_SIZE] = "\0"; FILE *fp = fopen(rinfo->fp, "a"); + if (NULL == fp) { + printf("fopen %s fail, reason:%s.\n", rinfo->fp, strerror(errno)); + return NULL; + } + int num_of_DPT = rinfo->nrecords_per_table; int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1; int totalData = num_of_DPT * num_of_tables; diff --git a/src/kit/taosdump/taosdump.c b/src/kit/taosdump/taosdump.c index 0b1890c5abc47159b267c4481a98d25422817d3b..70f18b32bd4b7b36d5c82b9958ab854cf08ff671 100644 --- a/src/kit/taosdump/taosdump.c +++ b/src/kit/taosdump/taosdump.c @@ -27,19 +27,18 @@ #include #include #include +#include #include "taos.h" +#include "taosdef.h" #include "taosmsg.h" +#include "tglobal.h" #include "tsclient.h" -#include "taosdef.h" +#include "tsdb.h" #include "tutil.h" -#include "tglobal.h" - #define COMMAND_SIZE 65536 -#define DEFAULT_DUMP_FILE "taosdump.sql" - -#define MAX_DBS 100 +//#define DEFAULT_DUMP_FILE "taosdump.sql" int converStringToReadable(char *str, int size, char *buf, int bufsize); int convertNCharToReadable(char *str, int size, char *buf, int bufsize); @@ -90,21 +89,21 @@ enum _describe_table_index { }; typedef struct { - char field[TSDB_COL_NAME_LEN]; + char field[TSDB_COL_NAME_LEN + 1]; char type[16]; int length; char note[128]; } SColDes; typedef struct { - char name[TSDB_COL_NAME_LEN]; + char name[TSDB_COL_NAME_LEN + 1]; SColDes cols[]; } STableDef; extern char version[]; typedef struct { - char name[TSDB_DB_NAME_LEN]; + char name[TSDB_DB_NAME_LEN + 1]; int32_t replica; int32_t days; int32_t keep; @@ -119,8 +118,8 @@ typedef struct { } SDbInfo; typedef struct { - char name[TSDB_TABLE_NAME_LEN]; - char metric[TSDB_TABLE_NAME_LEN]; + char name[TSDB_TABLE_NAME_LEN + 1]; + char metric[TSDB_TABLE_NAME_LEN + 1]; } STableRecord; typedef struct { @@ -128,6 +127,16 @@ typedef struct { STableRecord tableRecord; } STableRecordInfo; +typedef struct { + pthread_t threadID; + int32_t threadIndex; + int32_t totalThreads; + char dbName[TSDB_TABLE_NAME_LEN + 1]; + void *taosCon; +} SThreadParaObj; + +static int64_t totalDumpOutRows = 0; + SDbInfo **dbInfos = NULL; const char *argp_program_version = version; @@ -142,7 +151,7 @@ static char doc[] = ""; /* to force a line-break, e.g.\n<-- here."; */ /* A description of the arguments we accept. */ -static char args_doc[] = "dbname [tbname ...]\n--databases dbname ...\n--all-databases\n-i input_file"; +static char args_doc[] = "dbname [tbname ...]\n--databases dbname ...\n--all-databases\n-i inpath\n-o outpath"; /* Keys for options without short-options. */ #define OPT_ABORT 1 /* –abort */ @@ -150,60 +159,68 @@ static char args_doc[] = "dbname [tbname ...]\n--databases dbname ...\n--all-dat /* The options we understand. */ static struct argp_option options[] = { // connection option - {"host", 'h', "HOST", 0, "Server host dumping data from. Default is localhost.", 0}, - {"user", 'u', "USER", 0, "User name used to connect to server. Default is root.", 0}, - {"password", 'p', "PASSWORD", 0, "User password to connect to server. Default is taosdata.", 0}, - {"port", 'P', "PORT", 0, "Port to connect", 0}, + {"host", 'h', "HOST", 0, "Server host dumping data from. Default is localhost.", 0}, + {"user", 'u', "USER", 0, "User name used to connect to server. Default is root.", 0}, + {"password", 'p', "PASSWORD", 0, "User password to connect to server. Default is taosdata.", 0}, + {"port", 'P', "PORT", 0, "Port to connect", 0}, + {"cversion", 'v', "CVERION", 0, "client version", 0}, + {"mysqlFlag", 'q', "MYSQLFLAG", 0, "mysqlFlag, Default is 0", 0}, // input/output file - {"output", 'o', "OUTPUT", 0, "Output file name.", 1}, - {"input", 'i', "INPUT", 0, "Input file name.", 1}, - {"config", 'c', "CONFIG_DIR", 0, "Configure directory. Default is /etc/taos/taos.cfg.", 1}, - {"encode", 'e', "ENCODE", 0, "Input file encoding.", 1}, + {"outpath", 'o', "OUTPATH", 0, "Output file path.", 1}, + {"inpath", 'i', "INPATH", 0, "Input file path.", 1}, + {"config", 'c', "CONFIG_DIR", 0, "Configure directory. Default is /etc/taos/taos.cfg.", 1}, + {"encode", 'e', "ENCODE", 0, "Input file encoding.", 1}, // dump unit options - {"all-databases", 'A', 0, 0, "Dump all databases.", 2}, - {"databases", 'B', 0, 0, "Dump assigned databases", 2}, + {"all-databases", 'A', 0, 0, "Dump all databases.", 2}, + {"databases", 'B', 0, 0, "Dump assigned databases", 2}, // dump format options - {"schemaonly", 's', 0, 0, "Only dump schema.", 3}, - {"with-property", 'M', 0, 0, "Dump schema with properties.", 3}, - {"start-time", 'S', "START_TIME", 0, "Start time to dump.", 3}, - {"end-time", 'E', "END_TIME", 0, "End time to dump.", 3}, - {"data-batch", 'N', "DATA_BATCH", 0, "Number of data point per insert statement. Default is 1.", 3}, - {"allow-sys", 'a', 0, 0, "Allow to dump sys database", 3}, + {"schemaonly", 's', 0, 0, "Only dump schema.", 3}, + {"with-property", 'M', 0, 0, "Dump schema with properties.", 3}, + {"start-time", 'S', "START_TIME", 0, "Start time to dump.", 3}, + {"end-time", 'E', "END_TIME", 0, "End time to dump.", 3}, + {"data-batch", 'N', "DATA_BATCH", 0, "Number of data point per insert statement. Default is 1.", 3}, + {"table-batch", 'T', "TABLE_BATCH", 0, "Number of table dumpout into one output file. Default is 1.", 3}, + {"thread_num", 't', "THREAD_NUM", 0, "Number of thread for dump in file. Default is 5.", 3}, + {"allow-sys", 'a', 0, 0, "Allow to dump sys database", 3}, {0}}; /* Used by main to communicate with parse_opt. */ -typedef struct SDumpArguments { +struct arguments { // connection option - char *host; - char *user; - char *password; - uint16_t port; + char *host; + char *user; + char *password; + uint16_t port; + char cversion[TSDB_FILENAME_LEN+1]; + uint16_t mysqlFlag; // output file - char output[TSDB_FILENAME_LEN]; - char input[TSDB_FILENAME_LEN]; - char *encode; + char outpath[TSDB_FILENAME_LEN+1]; + char inpath[TSDB_FILENAME_LEN+1]; + char *encode; // dump unit option - bool all_databases; - bool databases; + bool all_databases; + bool databases; // dump format option - bool schemaonly; - bool with_property; - int64_t start_time; - int64_t end_time; - int data_batch; - bool allow_sys; + bool schemaonly; + bool with_property; + int64_t start_time; + int64_t end_time; + int32_t data_batch; + int32_t table_batch; // num of table which will be dump into one output file. + bool allow_sys; // other options - int abort; - char **arg_list; - int arg_list_len; - bool isDumpIn; -} SDumpArguments; + int32_t thread_num; + int abort; + char **arg_list; + int arg_list_len; + bool isDumpIn; +}; /* Parse a single option. */ static error_t parse_opt(int key, char *arg, struct argp_state *state) { /* Get the input argument from argp_parse, which we know is a pointer to our arguments structure. */ - SDumpArguments *arguments = state->input; + struct arguments *arguments = state->input; wordexp_t full_path; switch (key) { @@ -223,13 +240,24 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { case 'P': arguments->port = atoi(arg); break; - // output file + case 'q': + arguments->mysqlFlag = atoi(arg); + break; + case 'v': + if (wordexp(arg, &full_path, 0) != 0) { + fprintf(stderr, "Invalid client vesion %s\n", arg); + return -1; + } + strcpy(arguments->cversion, full_path.we_wordv[0]); + wordfree(&full_path); + break; + // output file path case 'o': if (wordexp(arg, &full_path, 0) != 0) { fprintf(stderr, "Invalid path %s\n", arg); return -1; } - tstrncpy(arguments->output, full_path.we_wordv[0], TSDB_FILENAME_LEN); + strcpy(arguments->outpath, full_path.we_wordv[0]); wordfree(&full_path); break; case 'i': @@ -238,7 +266,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { fprintf(stderr, "Invalid path %s\n", arg); return -1; } - tstrncpy(arguments->input, full_path.we_wordv[0], TSDB_FILENAME_LEN); + strcpy(arguments->inpath, full_path.we_wordv[0]); wordfree(&full_path); break; case 'c': @@ -246,7 +274,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { fprintf(stderr, "Invalid path %s\n", arg); return -1; } - tstrncpy(configDir, full_path.we_wordv[0], TSDB_FILENAME_LEN); + strcpy(configDir, full_path.we_wordv[0]); wordfree(&full_path); break; case 'e': @@ -276,13 +304,19 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { case 'N': arguments->data_batch = atoi(arg); break; + case 'T': + arguments->table_batch = atoi(arg); + break; + case 't': + arguments->thread_num = atoi(arg); + break; case OPT_ABORT: arguments->abort = 1; break; case ARGP_KEY_ARG: - arguments->arg_list = &state->argv[state->next - 1]; + arguments->arg_list = &state->argv[state->next - 1]; arguments->arg_list_len = state->argc - state->next + 1; - state->next = state->argc; + state->next = state->argc; break; default: @@ -294,52 +328,70 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { /* Our argp parser. */ static struct argp argp = {options, parse_opt, args_doc, doc}; -TAOS *taos = NULL; -char *command = NULL; -char *lcommand = NULL; -char *buffer = NULL; - -int taosDumpOut(SDumpArguments *arguments); - -int taosDumpIn(SDumpArguments *arguments); - +int taosDumpOut(struct arguments *arguments); +int taosDumpIn(struct arguments *arguments); void taosDumpCreateDbClause(SDbInfo *dbInfo, bool isDumpProperty, FILE *fp); +int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp, TAOS *taosCon); +int32_t taosDumpStable(char *table, FILE *fp, TAOS* taosCon); +void taosDumpCreateTableClause(STableDef *tableDes, int numOfCols, FILE *fp); +void taosDumpCreateMTableClause(STableDef *tableDes, char *metric, int numOfCols, FILE *fp); +int32_t taosDumpTable(char *table, char *metric, struct arguments *arguments, FILE *fp, TAOS* taosCon); +int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments, TAOS* taosCon); +int taosCheckParam(struct arguments *arguments); +void taosFreeDbInfos(); +static void taosStartDumpOutWorkThreads(struct arguments* args, int32_t numOfThread, char *dbName); -int taosDumpDb(SDbInfo *dbInfo, SDumpArguments *arguments, FILE *fp); - -void taosDumpCreateTableClause(STableDef *tableDes, int numOfCols, SDumpArguments *arguments, FILE *fp); - -void taosDumpCreateMTableClause(STableDef *tableDes, char *metric, int numOfCols, SDumpArguments *arguments, - FILE *fp); - -int32_t taosDumpTable(char *table, char *metric, SDumpArguments *arguments, FILE *fp); - -int32_t taosDumpMetric(char *metric, SDumpArguments *arguments, FILE *fp); - -int taosDumpTableData(FILE *fp, char *tbname, SDumpArguments *arguments); - -int taosCheckParam(SDumpArguments *arguments); +struct arguments tsArguments = { + // connection option + NULL, + "root", + "taosdata", + 0, + "", + 0, + // outpath and inpath + "", + "", + NULL, + // dump unit option + false, + false, + // dump format option + false, + false, + 0, + INT64_MAX, + 1, + 1, + false, + // other options + 5, + 0, + NULL, + 0, + false +}; -void taosFreeDbInfos(); +int queryDB(TAOS *taos, char *command) { + TAOS_RES *pSql = NULL; + int32_t code = -1; + + pSql = taos_query(taos, command); + code = taos_errno(pSql); + if (code) { + fprintf(stderr, "sql error: %s, reason:%s\n", command, taos_errstr(pSql)); + } + taos_free_result(pSql); + return code; +} int main(int argc, char *argv[]) { - SDumpArguments arguments = { - // connection option - NULL, TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS, 0, - // output file - DEFAULT_DUMP_FILE, DEFAULT_DUMP_FILE, NULL, - // dump unit option - false, false, - // dump format option - false, false, 0, INT64_MAX, 1, false, - // other options - 0, NULL, 0, false}; /* Parse our arguments; every option seen by parse_opt will be reflected in arguments. */ - argp_parse(&argp, argc, argv, 0, 0, &arguments); + argp_parse(&argp, argc, argv, 0, 0, &tsArguments); - if (arguments.abort) { + if (tsArguments.abort) { #ifndef _ALPINE error(10, 0, "ABORTED"); #else @@ -347,14 +399,48 @@ int main(int argc, char *argv[]) { #endif } - if (taosCheckParam(&arguments) < 0) { + printf("====== arguments config ======\n"); + { + printf("host: %s\n", tsArguments.host); + printf("user: %s\n", tsArguments.user); + printf("password: %s\n", tsArguments.password); + printf("port: %u\n", tsArguments.port); + printf("cversion: %s\n", tsArguments.cversion); + printf("mysqlFlag: %d", tsArguments.mysqlFlag); + printf("outpath: %s\n", tsArguments.outpath); + printf("inpath: %s\n", tsArguments.inpath); + printf("encode: %s\n", tsArguments.encode); + printf("all_databases: %d\n", tsArguments.all_databases); + printf("databases: %d\n", tsArguments.databases); + printf("schemaonly: %d\n", tsArguments.schemaonly); + printf("with_property: %d\n", tsArguments.with_property); + printf("start_time: %" PRId64 "\n", tsArguments.start_time); + printf("end_time: %" PRId64 "\n", tsArguments.end_time); + printf("data_batch: %d\n", tsArguments.data_batch); + printf("table_batch: %d\n", tsArguments.table_batch); + printf("allow_sys: %d\n", tsArguments.allow_sys); + printf("abort: %d\n", tsArguments.abort); + printf("isDumpIn: %d\n", tsArguments.isDumpIn); + printf("arg_list_len: %d\n", tsArguments.arg_list_len); + + for (int32_t i = 0; i < tsArguments.arg_list_len; i++) { + printf("arg_list[%d]: %s\n", i, tsArguments.arg_list[i]); + } + } + printf("==============================\n"); + + if (tsArguments.cversion[0] != 0){ + strcpy(version, tsArguments.cversion); + } + + if (taosCheckParam(&tsArguments) < 0) { exit(EXIT_FAILURE); } - if (arguments.isDumpIn) { - if (taosDumpIn(&arguments) < 0) return -1; + if (tsArguments.isDumpIn) { + if (taosDumpIn(&tsArguments) < 0) return -1; } else { - if (taosDumpOut(&arguments) < 0) return -1; + if (taosDumpOut(&tsArguments) < 0) return -1; } return 0; @@ -362,96 +448,214 @@ int main(int argc, char *argv[]) { void taosFreeDbInfos() { if (dbInfos == NULL) return; - for (int i = 0; i < MAX_DBS; i++) tfree(dbInfos[i]); + for (int i = 0; i < 128; i++) tfree(dbInfos[i]); tfree(dbInfos); } -int taosGetTableRecordInfo(char *table, STableRecordInfo *pTableRecordInfo) { +// check table is normal table or super table +int taosGetTableRecordInfo(char *table, STableRecordInfo *pTableRecordInfo, TAOS *taosCon) { TAOS_ROW row = NULL; bool isSet = false; + TAOS_RES *result = NULL; memset(pTableRecordInfo, 0, sizeof(STableRecordInfo)); - sprintf(command, "show tables like %s", table); - TAOS_RES *result = taos_query(taos, command);\ - int32_t code = taos_errno(result); + char* tempCommand = (char *)malloc(COMMAND_SIZE); + if (tempCommand == NULL) { + fprintf(stderr, "failed to allocate memory\n"); + return -1; + } + sprintf(tempCommand, "show tables like %s", table); + + result = taos_query(taosCon, tempCommand); + int32_t code = taos_errno(result); + if (code != 0) { - fprintf(stderr, "failed to run command %s, error: %s\n", command, taos_errstr(result)); + fprintf(stderr, "failed to run command %s\n", tempCommand); + free(tempCommand); taos_free_result(result); return -1; } TAOS_FIELD *fields = taos_fetch_fields(result); - if ((row = taos_fetch_row(result)) != NULL) { + while ((row = taos_fetch_row(result)) != NULL) { isSet = true; pTableRecordInfo->isMetric = false; strncpy(pTableRecordInfo->tableRecord.name, (char *)row[TSDB_SHOW_TABLES_NAME_INDEX], fields[TSDB_SHOW_TABLES_NAME_INDEX].bytes); strncpy(pTableRecordInfo->tableRecord.metric, (char *)row[TSDB_SHOW_TABLES_METRIC_INDEX], fields[TSDB_SHOW_TABLES_METRIC_INDEX].bytes); + break; } taos_free_result(result); result = NULL; - if (isSet) return 0; - - sprintf(command, "show stables like %s", table); - - result = taos_query(taos, command); + if (isSet) { + free(tempCommand); + return 0; + } + + sprintf(tempCommand, "show stables like %s", table); + + result = taos_query(taosCon, tempCommand); code = taos_errno(result); + if (code != 0) { - fprintf(stderr, "failed to run command %s, error: %s\n", command, taos_errstr(result)); + fprintf(stderr, "failed to run command %s\n", tempCommand); + free(tempCommand); taos_free_result(result); return -1; } - if ((row = taos_fetch_row(result)) != NULL) { + while ((row = taos_fetch_row(result)) != NULL) { isSet = true; pTableRecordInfo->isMetric = true; - tstrncpy(pTableRecordInfo->tableRecord.metric, table, TSDB_TABLE_NAME_LEN); + strcpy(pTableRecordInfo->tableRecord.metric, table); + break; } taos_free_result(result); result = NULL; - if (isSet) return 0; - + if (isSet) { + free(tempCommand); + return 0; + } fprintf(stderr, "invalid table/metric %s\n", table); + free(tempCommand); return -1; } -int taosDumpOut(SDumpArguments *arguments) { + +int32_t taosSaveAllNormalTableToTempFile(TAOS *taosCon, char*meter, char* metric, int* fd) { + STableRecord tableRecord; + + if (-1 == *fd) { + *fd = open(".tables.tmp.0", O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH); + if (*fd == -1) { + fprintf(stderr, "failed to open temp file: .tables.tmp.0\n"); + return -1; + } + } + + memset(tableRecord.name, 0, sizeof(STableRecord)); + strcpy(tableRecord.name, meter); + strcpy(tableRecord.metric, metric); + + twrite(*fd, &tableRecord, sizeof(STableRecord)); + return 0; +} + + +int32_t taosSaveTableOfMetricToTempFile(TAOS *taosCon, char* metric, struct arguments *arguments, int32_t* totalNumOfThread) { + TAOS_ROW row; + int fd = -1; + STableRecord tableRecord; + + char* tmpCommand = (char *)malloc(COMMAND_SIZE); + if (tmpCommand == NULL) { + fprintf(stderr, "failed to allocate memory\n"); + return -1; + } + + sprintf(tmpCommand, "select tbname from %s", metric); + + TAOS_RES *result = taos_query(taosCon, tmpCommand); + int32_t code = taos_errno(result); + if (code != 0) { + fprintf(stderr, "failed to run command %s\n", tmpCommand); + free(tmpCommand); + taos_free_result(result); + return -1; + } + + TAOS_FIELD *fields = taos_fetch_fields(result); + + int32_t numOfTable = 0; + int32_t numOfThread = *totalNumOfThread; + char tmpFileName[TSDB_FILENAME_LEN + 1]; + while ((row = taos_fetch_row(result)) != NULL) { + if (0 == numOfTable) { + memset(tmpFileName, 0, TSDB_FILENAME_LEN); + sprintf(tmpFileName, ".tables.tmp.%d", numOfThread); + fd = open(tmpFileName, O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH); + if (fd == -1) { + fprintf(stderr, "failed to open temp file: %s\n", tmpFileName); + taos_free_result(result); + for (int32_t loopCnt = 0; loopCnt < numOfThread; loopCnt++) { + sprintf(tmpFileName, ".tables.tmp.%d", loopCnt); + remove(tmpFileName); + } + free(tmpCommand); + return -1; + } + + numOfThread++; + } + + memset(tableRecord.name, 0, sizeof(STableRecord)); + strncpy(tableRecord.name, (char *)row[0], fields[0].bytes); + strcpy(tableRecord.metric, metric); + + twrite(fd, &tableRecord, sizeof(STableRecord)); + + numOfTable++; + + if (numOfTable >= arguments->table_batch) { + numOfTable = 0; + tclose(fd); + fd = -1; + } + } + tclose(fd); + fd = -1; + taos_free_result(result); + + *totalNumOfThread = numOfThread; + + free(tmpCommand); + + return 0; +} + +int taosDumpOut(struct arguments *arguments) { + TAOS *taos = NULL; + TAOS_RES *result = NULL; + char *command = NULL; + TAOS_ROW row; - TAOS_RES* result = NULL; - char *temp = NULL; FILE *fp = NULL; - int count = 0; + int32_t count = 0; STableRecordInfo tableRecordInfo; - fp = fopen(arguments->output, "w"); + char tmpBuf[TSDB_FILENAME_LEN+9] = {0}; + if (arguments->outpath[0] != 0) { + sprintf(tmpBuf, "%s/dbs.sql", arguments->outpath); + } else { + sprintf(tmpBuf, "dbs.sql"); + } + + fp = fopen(tmpBuf, "w"); if (fp == NULL) { - fprintf(stderr, "failed to open file %s\n", arguments->output); + fprintf(stderr, "failed to open file %s\n", tmpBuf); return -1; } - dbInfos = (SDbInfo **)calloc(MAX_DBS, sizeof(SDbInfo *)); + dbInfos = (SDbInfo **)calloc(128, sizeof(SDbInfo *)); if (dbInfos == NULL) { fprintf(stderr, "failed to allocate memory\n"); goto _exit_failure; } - temp = (char *)malloc(2 * COMMAND_SIZE); - if (temp == NULL) { + command = (char *)malloc(COMMAND_SIZE); + if (command == NULL) { fprintf(stderr, "failed to allocate memory\n"); goto _exit_failure; } - command = temp; - buffer = command + COMMAND_SIZE; - /* Connect to server */ taos = taos_connect(arguments->host, arguments->user, arguments->password, NULL, arguments->port); if (taos == NULL) { @@ -465,29 +669,30 @@ int taosDumpOut(SDumpArguments *arguments) { taosDumpCharset(fp); sprintf(command, "show databases"); - result = taos_query(taos, command); + result = taos_query(taos, command); int32_t code = taos_errno(result); + if (code != 0) { - fprintf(stderr, "failed to run command: %s, reason: %s\n", command, taos_errstr(result)); - taos_free_result(result); + fprintf(stderr, "failed to run command: %s, reason: %s\n", command, taos_errstr(taos)); goto _exit_failure; } TAOS_FIELD *fields = taos_fetch_fields(result); while ((row = taos_fetch_row(result)) != NULL) { + // sys database name : 'monitor', but subsequent version changed to 'log' if (strncasecmp(row[TSDB_SHOW_DB_NAME_INDEX], "monitor", fields[TSDB_SHOW_DB_NAME_INDEX].bytes) == 0 && (!arguments->allow_sys)) continue; - if (arguments->databases) { + if (arguments->databases) { // input multi dbs for (int i = 0; arguments->arg_list[i]; i++) { if (strncasecmp(arguments->arg_list[i], (char *)row[TSDB_SHOW_DB_NAME_INDEX], fields[TSDB_SHOW_DB_NAME_INDEX].bytes) == 0) goto _dump_db_point; } continue; - } else if (!arguments->all_databases) { + } else if (!arguments->all_databases) { // only input one db if (strncasecmp(arguments->arg_list[0], (char *)row[TSDB_SHOW_DB_NAME_INDEX], fields[TSDB_SHOW_DB_NAME_INDEX].bytes) == 0) goto _dump_db_point; @@ -504,19 +709,19 @@ int taosDumpOut(SDumpArguments *arguments) { } strncpy(dbInfos[count]->name, (char *)row[TSDB_SHOW_DB_NAME_INDEX], fields[TSDB_SHOW_DB_NAME_INDEX].bytes); - if (strcmp(arguments->user, TSDB_DEFAULT_USER) == 0) { - dbInfos[count]->replica = (int)(*((int16_t *)row[TSDB_SHOW_DB_REPLICA_INDEX])); - dbInfos[count]->days = (int)(*((int16_t *)row[TSDB_SHOW_DB_DAYS_INDEX])); - dbInfos[count]->keep = *((int *)row[TSDB_SHOW_DB_KEEP_INDEX]); - dbInfos[count]->tables = *((int *)row[TSDB_SHOW_DB_TABLES_INDEX]); - dbInfos[count]->rows = *((int *)row[TSDB_SHOW_DB_ROWS_INDEX]); - dbInfos[count]->cache = *((int *)row[TSDB_SHOW_DB_CACHE_INDEX]); - dbInfos[count]->ablocks = *((int *)row[TSDB_SHOW_DB_ABLOCKS_INDEX]); - dbInfos[count]->tblocks = (int)(*((int16_t *)row[TSDB_SHOW_DB_TBLOCKS_INDEX])); - dbInfos[count]->ctime = *((int *)row[TSDB_SHOW_DB_CTIME_INDEX]); - dbInfos[count]->clog = (int)(*((int8_t *)row[TSDB_SHOW_DB_CLOG_INDEX])); - dbInfos[count]->comp = (int)(*((int8_t *)row[TSDB_SHOW_DB_COMP_INDEX])); - } + #if 0 + dbInfos[count]->replica = (int)(*((int16_t *)row[TSDB_SHOW_DB_REPLICA_INDEX])); + dbInfos[count]->days = (int)(*((int16_t *)row[TSDB_SHOW_DB_DAYS_INDEX])); + dbInfos[count]->keep = *((int *)row[TSDB_SHOW_DB_KEEP_INDEX]); + dbInfos[count]->tables = *((int *)row[TSDB_SHOW_DB_TABLES_INDEX]); + dbInfos[count]->rows = *((int *)row[TSDB_SHOW_DB_ROWS_INDEX]); + dbInfos[count]->cache = *((int *)row[TSDB_SHOW_DB_CACHE_INDEX]); + dbInfos[count]->ablocks = *((int *)row[TSDB_SHOW_DB_ABLOCKS_INDEX]); + dbInfos[count]->tblocks = (int)(*((int16_t *)row[TSDB_SHOW_DB_TBLOCKS_INDEX])); + dbInfos[count]->ctime = *((int *)row[TSDB_SHOW_DB_CTIME_INDEX]); + dbInfos[count]->clog = (int)(*((int8_t *)row[TSDB_SHOW_DB_CLOG_INDEX])); + dbInfos[count]->comp = (int)(*((int8_t *)row[TSDB_SHOW_DB_COMP_INDEX])); +#endif count++; @@ -528,43 +733,72 @@ int taosDumpOut(SDumpArguments *arguments) { } } - // taos_free_result(result); - if (count == 0) { fprintf(stderr, "No databases valid to dump\n"); goto _exit_failure; } - if (arguments->databases || arguments->all_databases) { + if (arguments->databases || arguments->all_databases) { // case: taosdump --databases dbx dby ... OR taosdump --all-databases for (int i = 0; i < count; i++) { - (void)taosDumpDb(dbInfos[i], arguments, fp); + taosDumpDb(dbInfos[i], arguments, fp, taos); } } else { - if (arguments->arg_list_len == 1) { - (void)taosDumpDb(dbInfos[0], arguments, fp); - } else { + if (arguments->arg_list_len == 1) { // case: taosdump + taosDumpDb(dbInfos[0], arguments, fp, taos); + } else { // case: taosdump tablex tabley ... taosDumpCreateDbClause(dbInfos[0], arguments->with_property, fp); sprintf(command, "use %s", dbInfos[0]->name); - if (taos_query(taos, command) == NULL ) { + + result = taos_query(taos, command); + int32_t code = taos_errno(result); + if (code != 0) { fprintf(stderr, "invalid database %s\n", dbInfos[0]->name); goto _exit_failure; } fprintf(fp, "USE %s;\n\n", dbInfos[0]->name); + int32_t totalNumOfThread = 1; // 0: all normal talbe into .tables.tmp.0 + int normalTblFd = -1; + int32_t retCode; for (int i = 1; arguments->arg_list[i]; i++) { - if (taosGetTableRecordInfo(arguments->arg_list[i], &tableRecordInfo) < 0) { - fprintf(stderr, "invalide table %s\n", arguments->arg_list[i]); + if (taosGetTableRecordInfo(arguments->arg_list[i], &tableRecordInfo, taos) < 0) { + fprintf(stderr, "input the invalide table %s\n", arguments->arg_list[i]); continue; } - if (tableRecordInfo.isMetric) { // dump whole metric - (void)taosDumpMetric(tableRecordInfo.tableRecord.metric, arguments, fp); - } else { // dump MTable and NTable - (void)taosDumpTable(tableRecordInfo.tableRecord.name, tableRecordInfo.tableRecord.metric, arguments, fp); + if (tableRecordInfo.isMetric) { // dump all table of this metric + (void)taosDumpStable(tableRecordInfo.tableRecord.metric, fp, taos); + retCode = taosSaveTableOfMetricToTempFile(taos, tableRecordInfo.tableRecord.metric, arguments, &totalNumOfThread); + } else { + if (tableRecordInfo.tableRecord.metric[0] != '\0') { // dump this sub table and it's metric + (void)taosDumpStable(tableRecordInfo.tableRecord.metric, fp, taos); + } + retCode = taosSaveAllNormalTableToTempFile(taos, tableRecordInfo.tableRecord.name, tableRecordInfo.tableRecord.metric, &normalTblFd); + } + + if (retCode < 0) { + if (-1 != normalTblFd){ + tclose(normalTblFd); + } + goto _clean_tmp_file; } } + + if (-1 != normalTblFd){ + tclose(normalTblFd); + } + + // start multi threads to dumpout + taosStartDumpOutWorkThreads(arguments, totalNumOfThread, dbInfos[0]->name); + + char tmpFileName[TSDB_FILENAME_LEN + 1]; + _clean_tmp_file: + for (int loopCnt = 0; loopCnt < totalNumOfThread; loopCnt++) { + sprintf(tmpFileName, ".tables.tmp.%d", loopCnt); + remove(tmpFileName); + } } } @@ -572,413 +806,600 @@ int taosDumpOut(SDumpArguments *arguments) { fclose(fp); taos_close(taos); taos_free_result(result); - tfree(temp); - taosFreeDbInfos(); + tfree(command); + taosFreeDbInfos(); + fprintf(stderr, "dump out rows: %" PRId64 "\n", totalDumpOutRows); return 0; _exit_failure: fclose(fp); taos_close(taos); taos_free_result(result); - tfree(temp); + tfree(command); taosFreeDbInfos(); + fprintf(stderr, "dump out rows: %" PRId64 "\n", totalDumpOutRows); return -1; } -void taosDumpCreateDbClause(SDbInfo *dbInfo, bool isDumpProperty, FILE *fp) { - char *pstr = buffer; - - pstr += sprintf(pstr, "CREATE DATABASE IF NOT EXISTS %s", dbInfo->name); - if (isDumpProperty) { - pstr += sprintf(pstr, - " REPLICA %d DAYS %d KEEP %d TABLES %d ROWS %d CACHE %d ABLOCKS %d TBLOCKS %d CTIME %d CLOG %d COMP %d", - dbInfo->replica, dbInfo->days, dbInfo->keep, dbInfo->tables, dbInfo->rows, dbInfo->cache, - dbInfo->ablocks, dbInfo->tblocks, dbInfo->ctime, dbInfo->clog, dbInfo->comp); - } - - fprintf(fp, "%s\n\n", buffer); -} - -int taosDumpDb(SDbInfo *dbInfo, SDumpArguments *arguments, FILE *fp) { - TAOS_ROW row; - int fd = -1; - STableRecord tableRecord; - - taosDumpCreateDbClause(dbInfo, arguments->with_property, fp); +int taosGetTableDes(char *table, STableDef *tableDes, TAOS* taosCon) { + TAOS_ROW row = NULL; + TAOS_RES *tmpResult = NULL; + int count = 0; - sprintf(command, "use %s", dbInfo->name); - if (taos_errno(taos_query(taos, command)) != 0) { - fprintf(stderr, "invalid database %s\n", dbInfo->name); + char* tempCommand = (char *)malloc(COMMAND_SIZE); + if (tempCommand == NULL) { + fprintf(stderr, "failed to allocate memory\n"); return -1; } - fprintf(fp, "USE %s\n\n", dbInfo->name); - - sprintf(command, "show tables"); - TAOS_RES* result = taos_query(taos,command); - int32_t code = taos_errno(result); + sprintf(tempCommand, "describe %s", table); + + tmpResult = taos_query(taosCon, tempCommand); + int32_t code = taos_errno(tmpResult); if (code != 0) { - fprintf(stderr, "failed to run command %s, error: %s\n", command, taos_errstr(result)); - taos_free_result(result); + fprintf(stderr, "failed to run command %s\n", tempCommand); + free(tempCommand); + taos_free_result(tmpResult); return -1; } - TAOS_FIELD *fields = taos_fetch_fields(result); + TAOS_FIELD *fields = taos_fetch_fields(tmpResult); - fd = open(".table.tmp", O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH); - if (fd == -1) { - fprintf(stderr, "failed to open temp file\n"); - taos_free_result(result); - return -1; - } + strcpy(tableDes->name, table); - while ((row = taos_fetch_row(result)) != NULL) { - memset(&tableRecord, 0, sizeof(STableRecord)); - strncpy(tableRecord.name, (char *)row[TSDB_SHOW_TABLES_NAME_INDEX], fields[TSDB_SHOW_TABLES_NAME_INDEX].bytes); - strncpy(tableRecord.metric, (char *)row[TSDB_SHOW_TABLES_METRIC_INDEX], fields[TSDB_SHOW_TABLES_METRIC_INDEX].bytes); + while ((row = taos_fetch_row(tmpResult)) != NULL) { + strncpy(tableDes->cols[count].field, (char *)row[TSDB_DESCRIBE_METRIC_FIELD_INDEX], + fields[TSDB_DESCRIBE_METRIC_FIELD_INDEX].bytes); + strncpy(tableDes->cols[count].type, (char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], + fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes); + tableDes->cols[count].length = *((int *)row[TSDB_DESCRIBE_METRIC_LENGTH_INDEX]); + strncpy(tableDes->cols[count].note, (char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX], + fields[TSDB_DESCRIBE_METRIC_NOTE_INDEX].bytes); - twrite(fd, &tableRecord, sizeof(STableRecord)); + count++; } - taos_free_result(result); + taos_free_result(tmpResult); + tmpResult = NULL; - (void)lseek(fd, 0, SEEK_SET); + free(tempCommand); - STableRecord tableInfo; - while (1) { - memset(&tableInfo, 0, sizeof(STableRecord)); - ssize_t ret = read(fd, &tableInfo, sizeof(STableRecord)); - if (ret <= 0) break; - - tableInfo.name[sizeof(tableInfo.name) - 1] = 0; - tableInfo.metric[sizeof(tableInfo.metric) - 1] = 0; - taosDumpTable(tableInfo.name, tableInfo.metric, arguments, fp); - } + return count; +} - close(fd); - (void)remove(".table.tmp"); +int32_t taosDumpTable(char *table, char *metric, struct arguments *arguments, FILE *fp, TAOS* taosCon) { + int count = 0; - return 0; -} + STableDef *tableDes = (STableDef *)calloc(1, sizeof(STableDef) + sizeof(SColDes) * TSDB_MAX_COLUMNS); -void taosDumpCreateTableClause(STableDef *tableDes, int numOfCols, SDumpArguments *arguments, FILE *fp) { - char *pstr = NULL; - pstr = buffer; - int counter = 0; - int count_temp = 0; + if (metric != NULL && metric[0] != '\0') { // dump table schema which is created by using super table + /* + count = taosGetTableDes(metric, tableDes, taosCon); + + if (count < 0) { + free(tableDes); + return -1; + } - pstr += sprintf(buffer, "CREATE TABLE IF NOT EXISTS %s", tableDes->name); + taosDumpCreateTableClause(tableDes, count, fp); - for (; counter < numOfCols; counter++) { - if (tableDes->cols[counter].note[0] != '\0') break; + memset(tableDes, 0, sizeof(STableDef) + sizeof(SColDes) * TSDB_MAX_COLUMNS); + */ - if (counter == 0) { - pstr += sprintf(pstr, " (%s %s", tableDes->cols[counter].field, tableDes->cols[counter].type); - } else { - pstr += sprintf(pstr, ", %s %s", tableDes->cols[counter].field, tableDes->cols[counter].type); + count = taosGetTableDes(table, tableDes, taosCon); + + if (count < 0) { + free(tableDes); + return -1; } - if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 || - strcasecmp(tableDes->cols[counter].type, "nchar") == 0) { - pstr += sprintf(pstr, "(%d)", tableDes->cols[counter].length); + taosDumpCreateMTableClause(tableDes, metric, count, fp); + + } else { // dump table definition + count = taosGetTableDes(table, tableDes, taosCon); + + if (count < 0) { + free(tableDes); + return -1; } + + taosDumpCreateTableClause(tableDes, count, fp); } - count_temp = counter; + free(tableDes); - for (; counter < numOfCols; counter++) { - if (counter == count_temp) { - pstr += sprintf(pstr, ") TAGS (%s %s", tableDes->cols[counter].field, tableDes->cols[counter].type); - } else { - pstr += sprintf(pstr, ", %s %s", tableDes->cols[counter].field, tableDes->cols[counter].type); - } + return taosDumpTableData(fp, table, arguments, taosCon); +} - if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 || - strcasecmp(tableDes->cols[counter].type, "nchar") == 0) { - pstr += sprintf(pstr, "(%d)", tableDes->cols[counter].length); - } +void taosDumpCreateDbClause(SDbInfo *dbInfo, bool isDumpProperty, FILE *fp) { + + char* tmpCommand = (char *)malloc(COMMAND_SIZE); + if (tmpCommand == NULL) { + fprintf(stderr, "failed to allocate memory\n"); + return; + } + + char *pstr = tmpCommand; + + pstr += sprintf(pstr, "CREATE DATABASE IF NOT EXISTS %s", dbInfo->name); + if (isDumpProperty) { + pstr += sprintf(pstr, + " REPLICA %d DAYS %d KEEP %d TABLES %d ROWS %d CACHE %d ABLOCKS %d TBLOCKS %d CTIME %d CLOG %d COMP %d", + dbInfo->replica, dbInfo->days, dbInfo->keep, dbInfo->tables, dbInfo->rows, dbInfo->cache, + dbInfo->ablocks, dbInfo->tblocks, dbInfo->ctime, dbInfo->clog, dbInfo->comp); } - pstr += sprintf(pstr, ")"); + pstr += sprintf(pstr, ";"); - fprintf(fp, "%s\n\n", buffer); + fprintf(fp, "%s\n\n", tmpCommand); + free(tmpCommand); } -void taosDumpCreateMTableClause(STableDef *tableDes, char *metric, int numOfCols, SDumpArguments *arguments, - FILE *fp) { - char *pstr = NULL; - pstr = buffer; - int counter = 0; - int count_temp = 0; +void* taosDumpOutWorkThreadFp(void *arg) +{ + SThreadParaObj *pThread = (SThreadParaObj*)arg; + STableRecord tableRecord; + int fd; + + char tmpFileName[TSDB_FILENAME_LEN*4] = {0}; + sprintf(tmpFileName, ".tables.tmp.%d", pThread->threadIndex); + fd = open(tmpFileName, O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH); + if (fd == -1) { + fprintf(stderr, "taosDumpTableFp() failed to open temp file: %s\n", tmpFileName); + return NULL; + } - pstr += sprintf(buffer, "CREATE TABLE IF NOT EXISTS %s USING %s TAGS (", tableDes->name, metric); + FILE *fp = NULL; + memset(tmpFileName, 0, TSDB_FILENAME_LEN + 128); + + if (tsArguments.outpath[0] != 0) { + sprintf(tmpFileName, "%s/%s.tables.%d.sql", tsArguments.outpath, pThread->dbName, pThread->threadIndex); + } else { + sprintf(tmpFileName, "%s.tables.%d.sql", pThread->dbName, pThread->threadIndex); + } + + fp = fopen(tmpFileName, "w"); + if (fp == NULL) { + fprintf(stderr, "failed to open file %s\n", tmpFileName); + return NULL; + } - for (; counter < numOfCols; counter++) { - if (tableDes->cols[counter].note[0] != '\0') break; + memset(tmpFileName, 0, TSDB_FILENAME_LEN); + sprintf(tmpFileName, "use %s", pThread->dbName); + + TAOS_RES* tmpResult = taos_query(pThread->taosCon, tmpFileName); + int32_t code = taos_errno(tmpResult); + if (code != 0) { + fprintf(stderr, "invalid database %s\n", pThread->dbName); + taos_free_result(tmpResult); + return NULL; } - assert(counter < numOfCols); - count_temp = counter; + fprintf(fp, "USE %s\n\n", pThread->dbName); + while (read(fd, &tableRecord, sizeof(STableRecord)) > 0) { + taosDumpTable(tableRecord.name, tableRecord.metric, &tsArguments, fp, pThread->taosCon); + } - for (; counter < numOfCols; counter++) { - TAOS_ROW row = NULL; + taos_free_result(tmpResult); + tclose(fd); + fclose(fp); - sprintf(command, "select %s from %s limit 1", tableDes->cols[counter].field, tableDes->name); + return NULL; +} - TAOS_RES* result = taos_query(taos, command); - int32_t code = taos_errno(result); - if (code != 0) { - fprintf(stderr, "failed to run command %s, error: %s\n", command, taos_errstr(result)); - return; +static void taosStartDumpOutWorkThreads(struct arguments* args, int32_t numOfThread, char *dbName) +{ + pthread_attr_t thattr; + SThreadParaObj *threadObj = (SThreadParaObj *)calloc(numOfThread, sizeof(SThreadParaObj)); + for (int t = 0; t < numOfThread; ++t) { + SThreadParaObj *pThread = threadObj + t; + pThread->threadIndex = t; + pThread->totalThreads = numOfThread; + strcpy(pThread->dbName, dbName); + pThread->taosCon = taos_connect(args->host, args->user, args->password, NULL, args->port); + + if (pThread->taosCon == NULL) { + fprintf(stderr, "ERROR: thread:%d failed connect to TDengine, error:%s\n", pThread->threadIndex, taos_errstr(pThread->taosCon)); + exit(0); } - TAOS_FIELD *fields = taos_fetch_fields(result); - - row = taos_fetch_row(result); - switch (fields[0].type) { - case TSDB_DATA_TYPE_BOOL: - sprintf(tableDes->cols[counter].note, "%d", ((((int)(*((char *)row[0]))) == 1) ? 1 : 0)); - break; - case TSDB_DATA_TYPE_TINYINT: - sprintf(tableDes->cols[counter].note, "%d", (int)(*((char *)row[0]))); - break; - case TSDB_DATA_TYPE_SMALLINT: - sprintf(tableDes->cols[counter].note, "%d", (int)(*((short *)row[0]))); - break; - case TSDB_DATA_TYPE_INT: - sprintf(tableDes->cols[counter].note, "%d", *((int *)row[0])); - break; - case TSDB_DATA_TYPE_BIGINT: - sprintf(tableDes->cols[counter].note, "%" PRId64 "", *((int64_t *)row[0])); - break; - case TSDB_DATA_TYPE_FLOAT: - sprintf(tableDes->cols[counter].note, "%f", GET_FLOAT_VAL(row[0])); - break; - case TSDB_DATA_TYPE_DOUBLE: - sprintf(tableDes->cols[counter].note, "%f", GET_DOUBLE_VAL(row[0])); - break; - case TSDB_DATA_TYPE_TIMESTAMP: - sprintf(tableDes->cols[counter].note, "%" PRId64 "", *(int64_t *)row[0]); - break; - case TSDB_DATA_TYPE_BINARY: - case TSDB_DATA_TYPE_NCHAR: - default: - strncpy(tableDes->cols[counter].note, (char *)row[0], fields[0].bytes); - break; + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + + if (pthread_create(&(pThread->threadID), &thattr, taosDumpOutWorkThreadFp, (void*)pThread) != 0) { + fprintf(stderr, "ERROR: thread:%d failed to start\n", pThread->threadIndex); + exit(0); } + } - taos_free_result(result); + for (int32_t t = 0; t < numOfThread; ++t) { + pthread_join(threadObj[t].threadID, NULL); + } - if (counter != count_temp) { - if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 || - strcasecmp(tableDes->cols[counter].type, "nchar") == 0) { - pstr += sprintf(pstr, ", \'%s\'", tableDes->cols[counter].note); - } else { - pstr += sprintf(pstr, ", %s", tableDes->cols[counter].note); - } - } else { - if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 || - strcasecmp(tableDes->cols[counter].type, "nchar") == 0) { - pstr += sprintf(pstr, "\'%s\'", tableDes->cols[counter].note); - } else { - pstr += sprintf(pstr, "%s", tableDes->cols[counter].note); - } - /* pstr += sprintf(pstr, "%s", tableDes->cols[counter].note); */ - } + for (int32_t t = 0; t < numOfThread; ++t) { + taos_close(threadObj[t].taosCon); + } + free(threadObj); +} - /* if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 || strcasecmp(tableDes->cols[counter].type, "nchar") - * == 0) { */ - /* pstr += sprintf(pstr, "(%d)", tableDes->cols[counter].length); */ - /* } */ + + +int32_t taosDumpStable(char *table, FILE *fp, TAOS* taosCon) { + int count = 0; + + STableDef *tableDes = (STableDef *)calloc(1, sizeof(STableDef) + sizeof(SColDes) * TSDB_MAX_COLUMNS); + if (NULL == tableDes) { + fprintf(stderr, "failed to allocate memory\n"); + exit(-1); + } + + count = taosGetTableDes(table, tableDes, taosCon); + + if (count < 0) { + free(tableDes); + fprintf(stderr, "failed to get stable schema\n"); + exit(-1); } - pstr += sprintf(pstr, ")"); + taosDumpCreateTableClause(tableDes, count, fp); - fprintf(fp, "%s\n\n", buffer); + free(tableDes); + return 0; } -int taosGetTableDes(char *table, STableDef *tableDes) { - TAOS_ROW row = NULL; - int count = 0; - sprintf(command, "describe %s", table); +int32_t taosDumpCreateSuperTableClause(TAOS* taosCon, char* dbName, FILE *fp) +{ + TAOS_ROW row; + int fd = -1; + STableRecord tableRecord; + + char* tmpCommand = (char *)malloc(COMMAND_SIZE); + if (tmpCommand == NULL) { + fprintf(stderr, "failed to allocate memory\n"); + exit(-1); + } - TAOS_RES* result = taos_query(taos, command); - int32_t code = taos_errno(result); + sprintf(tmpCommand, "use %s", dbName); + + TAOS_RES* tmpResult = taos_query(taosCon, tmpCommand); + int32_t code = taos_errno(tmpResult); if (code != 0) { - fprintf(stderr, "failed to run command %s, error: %s\n", command, taos_errstr(result)); - taos_free_result(result); - return -1; + fprintf(stderr, "invalid database %s, error: %s\n", dbName, taos_errstr(taosCon)); + free(tmpCommand); + taos_free_result(tmpResult); + exit(-1); } + + taos_free_result(tmpResult); - TAOS_FIELD *fields = taos_fetch_fields(result); + sprintf(tmpCommand, "show stables"); + + tmpResult = taos_query(taosCon, tmpCommand); + code = taos_errno(tmpResult); + if (code != 0) { + fprintf(stderr, "failed to run command %s, error: %s\n", tmpCommand, taos_errstr(taosCon)); + free(tmpCommand); + taos_free_result(tmpResult); + exit(-1); + } + taos_free_result(tmpResult); - tstrncpy(tableDes->name, table, TSDB_COL_NAME_LEN); + TAOS_FIELD *fields = taos_fetch_fields(tmpResult); - while ((row = taos_fetch_row(result)) != NULL) { - strncpy(tableDes->cols[count].field, (char *)row[TSDB_DESCRIBE_METRIC_FIELD_INDEX], - fields[TSDB_DESCRIBE_METRIC_FIELD_INDEX].bytes); - strncpy(tableDes->cols[count].type, (char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX], - fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes); - tableDes->cols[count].length = *((int *)row[TSDB_DESCRIBE_METRIC_LENGTH_INDEX]); - strncpy(tableDes->cols[count].note, (char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX], - fields[TSDB_DESCRIBE_METRIC_NOTE_INDEX].bytes); - - count++; + char tmpFileName[TSDB_FILENAME_LEN + 1]; + memset(tmpFileName, 0, TSDB_FILENAME_LEN); + sprintf(tmpFileName, ".stables.tmp"); + fd = open(tmpFileName, O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH); + if (fd == -1) { + fprintf(stderr, "failed to open temp file: %s\n", tmpFileName); + taos_free_result(tmpResult); + free(tmpCommand); + remove(".stables.tmp"); + exit(-1); } + + while ((row = taos_fetch_row(tmpResult)) != NULL) { + memset(&tableRecord, 0, sizeof(STableRecord)); + strncpy(tableRecord.name, (char *)row[TSDB_SHOW_TABLES_NAME_INDEX], fields[TSDB_SHOW_TABLES_NAME_INDEX].bytes); + twrite(fd, &tableRecord, sizeof(STableRecord)); + } + + taos_free_result(tmpResult); + lseek(fd, 0, SEEK_SET); - taos_free_result(result); - result = NULL; + while (read(fd, &tableRecord, sizeof(STableRecord)) > 0) { + (void)taosDumpStable(tableRecord.name, fp, taosCon); + } - return count; + tclose(fd); + remove(".stables.tmp"); + + free(tmpCommand); + return 0; } -int32_t taosDumpTable(char *table, char *metric, SDumpArguments *arguments, FILE *fp) { - int count = 0; - STableDef *tableDes = (STableDef *)calloc(1, sizeof(STableDef) + sizeof(SColDes) * TSDB_MAX_COLUMNS); +int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp, TAOS *taosCon) { + TAOS_ROW row; + int fd = -1; + STableRecord tableRecord; - if (metric != NULL && metric[0] != '\0') { // dump metric definition - count = taosGetTableDes(metric, tableDes); + taosDumpCreateDbClause(dbInfo, arguments->with_property, fp); - if (count < 0) { - free(tableDes); - return -1; - } + char* tmpCommand = (char *)malloc(COMMAND_SIZE); + if (tmpCommand == NULL) { + fprintf(stderr, "failed to allocate memory\n"); + return -1; + } - taosDumpCreateTableClause(tableDes, count, arguments, fp); + sprintf(tmpCommand, "use %s", dbInfo->name); + + TAOS_RES* tmpResult = taos_query(taosCon, tmpCommand); + int32_t code = taos_errno(tmpResult); + if (code != 0) { + fprintf(stderr, "invalid database %s\n", dbInfo->name); + free(tmpCommand); + taos_free_result(tmpResult); + return -1; + } + taos_free_result(tmpResult); - memset(tableDes, 0, sizeof(STableDef) + sizeof(SColDes) * TSDB_MAX_COLUMNS); + fprintf(fp, "USE %s\n\n", dbInfo->name); + + (void)taosDumpCreateSuperTableClause(taosCon, dbInfo->name, fp); - count = taosGetTableDes(table, tableDes); + sprintf(tmpCommand, "show tables"); + + tmpResult = taos_query(taosCon, tmpCommand); + code = taos_errno(tmpResult); + if (code != 0) { + fprintf(stderr, "failed to run command %s\n", tmpCommand); + free(tmpCommand); + taos_free_result(tmpResult); + return -1; + } - if (count < 0) { - free(tableDes); - return -1; + TAOS_FIELD *fields = taos_fetch_fields(tmpResult); + + int32_t numOfTable = 0; + int32_t numOfThread = 0; + char tmpFileName[TSDB_FILENAME_LEN + 1]; + while ((row = taos_fetch_row(tmpResult)) != NULL) { + if (0 == numOfTable) { + memset(tmpFileName, 0, TSDB_FILENAME_LEN); + sprintf(tmpFileName, ".tables.tmp.%d", numOfThread); + fd = open(tmpFileName, O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH); + if (fd == -1) { + fprintf(stderr, "failed to open temp file: %s\n", tmpFileName); + taos_free_result(tmpResult); + for (int32_t loopCnt = 0; loopCnt < numOfThread; loopCnt++) { + sprintf(tmpFileName, ".tables.tmp.%d", loopCnt); + remove(tmpFileName); + } + free(tmpCommand); + return -1; + } + + numOfThread++; } + + memset(&tableRecord, 0, sizeof(STableRecord)); + strncpy(tableRecord.name, (char *)row[TSDB_SHOW_TABLES_NAME_INDEX], fields[TSDB_SHOW_TABLES_NAME_INDEX].bytes); + strncpy(tableRecord.metric, (char *)row[TSDB_SHOW_TABLES_METRIC_INDEX], fields[TSDB_SHOW_TABLES_METRIC_INDEX].bytes); - taosDumpCreateMTableClause(tableDes, metric, count, arguments, fp); + twrite(fd, &tableRecord, sizeof(STableRecord)); - } else { // dump table definition - count = taosGetTableDes(table, tableDes); + numOfTable++; - if (count < 0) { - free(tableDes); - return -1; + if (numOfTable >= arguments->table_batch) { + numOfTable = 0; + tclose(fd); + fd = -1; } + } + tclose(fd); + fd = -1; + taos_free_result(tmpResult); + + // start multi threads to dumpout + taosStartDumpOutWorkThreads(arguments, numOfThread, dbInfo->name); + for (int loopCnt = 0; loopCnt < numOfThread; loopCnt++) { + sprintf(tmpFileName, ".tables.tmp.%d", loopCnt); + remove(tmpFileName); + } + + free(tmpCommand); - taosDumpCreateTableClause(tableDes, count, arguments, fp); + return 0; +} + +void taosDumpCreateTableClause(STableDef *tableDes, int numOfCols, FILE *fp) { + int counter = 0; + int count_temp = 0; + + char* tmpBuf = (char *)malloc(COMMAND_SIZE); + if (tmpBuf == NULL) { + fprintf(stderr, "failed to allocate memory\n"); + return; } - free(tableDes); + char* pstr = tmpBuf; - return taosDumpTableData(fp, table, arguments); -} + pstr += sprintf(tmpBuf, "CREATE TABLE IF NOT EXISTS %s", tableDes->name); -int32_t taosDumpMetric(char *metric, SDumpArguments *arguments, FILE *fp) { - TAOS_ROW row = NULL; - int fd = -1; - STableRecord tableRecord; + for (; counter < numOfCols; counter++) { + if (tableDes->cols[counter].note[0] != '\0') break; - //tstrncpy(tableRecord.metric, metric, TSDB_TABLE_NAME_LEN); + if (counter == 0) { + pstr += sprintf(pstr, " (%s %s", tableDes->cols[counter].field, tableDes->cols[counter].type); + } else { + pstr += sprintf(pstr, ", %s %s", tableDes->cols[counter].field, tableDes->cols[counter].type); + } - sprintf(command, "select tbname from %s", metric); - TAOS_RES* result = taos_query(taos, command); - int32_t code = taos_errno(result); - if (code != 0) { - fprintf(stderr, "failed to run command %s, error: %s\n", command, taos_errstr(result)); - taos_free_result(result); - return -1; + if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 || + strcasecmp(tableDes->cols[counter].type, "nchar") == 0) { + pstr += sprintf(pstr, "(%d)", tableDes->cols[counter].length); + } } - fd = open(".table.tmp", O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH); - if (fd < 0) { - fprintf(stderr, "failed to open temp file"); - return -1; + count_temp = counter; + + for (; counter < numOfCols; counter++) { + if (counter == count_temp) { + pstr += sprintf(pstr, ") TAGS (%s %s", tableDes->cols[counter].field, tableDes->cols[counter].type); + } else { + pstr += sprintf(pstr, ", %s %s", tableDes->cols[counter].field, tableDes->cols[counter].type); + } + + if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 || + strcasecmp(tableDes->cols[counter].type, "nchar") == 0) { + pstr += sprintf(pstr, "(%d)", tableDes->cols[counter].length); + } } - TAOS_FIELD *fields = taos_fetch_fields(result); + pstr += sprintf(pstr, ");"); - while ((row = taos_fetch_row(result)) != NULL) { - memset(&tableRecord, 0, sizeof(STableRecord)); - tstrncpy(tableRecord.name, (char *)row[0], fields[0].bytes); - tstrncpy(tableRecord.metric, metric, TSDB_TABLE_NAME_LEN); - twrite(fd, &tableRecord, sizeof(STableRecord)); + fprintf(fp, "%s\n", tmpBuf); + + free(tmpBuf); +} + +void taosDumpCreateMTableClause(STableDef *tableDes, char *metric, int numOfCols, FILE *fp) { + int counter = 0; + int count_temp = 0; + + char* tmpBuf = (char *)malloc(COMMAND_SIZE); + if (tmpBuf == NULL) { + fprintf(stderr, "failed to allocate memory\n"); + return; } - taos_free_result(result); - result = NULL; + char *pstr = NULL; + pstr = tmpBuf; - (void)lseek(fd, 0, SEEK_SET); + pstr += sprintf(tmpBuf, "CREATE TABLE IF NOT EXISTS %s USING %s TAGS (", tableDes->name, metric); - //STableRecord tableInfo; - char tableName[TSDB_TABLE_NAME_LEN] ; - char metricName[TSDB_TABLE_NAME_LEN]; - ssize_t ret; - while (1) { - //memset(&tableInfo, 0, sizeof(STableRecord)); - memset(tableName, 0, TSDB_TABLE_NAME_LEN); - memset(metricName, 0, TSDB_TABLE_NAME_LEN); - //ssize_t ret = read(fd, &tableInfo, sizeof(STableRecord)); - //if (ret <= 0) break; - ret = read(fd, tableName, TSDB_TABLE_NAME_LEN); - if (ret <= 0) break; - - ret = read(fd, metricName, TSDB_TABLE_NAME_LEN); - if (ret <= 0) break; - - //tableInfo.name[sizeof(tableInfo.name) - 1] = 0; - //tableInfo.metric[sizeof(tableInfo.metric) - 1] = 0; - //taosDumpTable(tableInfo.name, tableInfo.metric, arguments, fp); - //tstrncpy(tableName, tableInfo.name, TSDB_TABLE_NAME_LEN-1); - //tstrncpy(metricName, tableInfo.metric, TSDB_TABLE_NAME_LEN-1); - taosDumpTable(tableName, metricName, arguments, fp); + for (; counter < numOfCols; counter++) { + if (tableDes->cols[counter].note[0] != '\0') break; } - close(fd); - (void)remove(".table.tmp"); + assert(counter < numOfCols); + count_temp = counter; - return 0; + for (; counter < numOfCols; counter++) { + if (counter != count_temp) { + if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 || + strcasecmp(tableDes->cols[counter].type, "nchar") == 0) { + pstr += sprintf(pstr, ", \'%s\'", tableDes->cols[counter].note); + } else { + pstr += sprintf(pstr, ", %s", tableDes->cols[counter].note); + } + } else { + if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 || + strcasecmp(tableDes->cols[counter].type, "nchar") == 0) { + pstr += sprintf(pstr, "\'%s\'", tableDes->cols[counter].note); + } else { + pstr += sprintf(pstr, "%s", tableDes->cols[counter].note); + } + /* pstr += sprintf(pstr, "%s", tableDes->cols[counter].note); */ + } + + /* if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 || strcasecmp(tableDes->cols[counter].type, "nchar") + * == 0) { */ + /* pstr += sprintf(pstr, "(%d)", tableDes->cols[counter].length); */ + /* } */ + } + + pstr += sprintf(pstr, ");"); + + fprintf(fp, "%s\n", tmpBuf); + free(tmpBuf); } -int taosDumpTableData(FILE *fp, char *tbname, SDumpArguments *arguments) { +int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments, TAOS* taosCon) { /* char temp[MAX_COMMAND_SIZE] = "\0"; */ + int64_t totalRows = 0; int count = 0; char *pstr = NULL; TAOS_ROW row = NULL; int numFields = 0; char *tbuf = NULL; - if (arguments->schemaonly) return 0; + char* tmpCommand = (char *)calloc(1, COMMAND_SIZE); + if (tmpCommand == NULL) { + fprintf(stderr, "failed to allocate memory\n"); + return -1; + } + + char* tmpBuffer = (char *)calloc(1, COMMAND_SIZE); + if (tmpBuffer == NULL) { + fprintf(stderr, "failed to allocate memory\n"); + free(tmpCommand); + return -1; + } + + pstr = tmpBuffer; - sprintf(command, "select * from %s where _c0 >= %" PRId64 " and _c0 <= %" PRId64 " order by _c0 asc", tbname, arguments->start_time, + if (arguments->schemaonly) { + free(tmpCommand); + free(tmpBuffer); + return 0; + } + + sprintf(tmpCommand, + "select * from %s where _c0 >= %" PRId64 " and _c0 <= %" PRId64 " order by _c0 asc", + tbname, + arguments->start_time, arguments->end_time); - - TAOS_RES* result = taos_query(taos, command); - int32_t code = taos_errno(result); + + TAOS_RES* tmpResult = taos_query(taosCon, tmpCommand); + int32_t code = taos_errno(tmpResult); if (code != 0) { - fprintf(stderr, "failed to run command %s, reason: %s\n", command, taos_errstr(result)); - taos_free_result(result); + fprintf(stderr, "failed to run command %s, reason: %s\n", tmpCommand, taos_errstr(taosCon)); + free(tmpCommand); + free(tmpBuffer); + taos_free_result(tmpResult); return -1; } - numFields = taos_field_count(result); + numFields = taos_field_count(taosCon); assert(numFields > 0); - TAOS_FIELD *fields = taos_fetch_fields(result); + TAOS_FIELD *fields = taos_fetch_fields(tmpResult); tbuf = (char *)malloc(COMMAND_SIZE); if (tbuf == NULL) { fprintf(stderr, "No enough memory\n"); + free(tmpCommand); + free(tmpBuffer); + taos_free_result(tmpResult); return -1; } + char sqlStr[8] = "\0"; + if (arguments->mysqlFlag) { + sprintf(sqlStr, "INSERT"); + } else { + sprintf(sqlStr, "IMPORT"); + } + + int rowFlag = 0; count = 0; - while ((row = taos_fetch_row(result)) != NULL) { - pstr = buffer; + while ((row = taos_fetch_row(tmpResult)) != NULL) { + pstr = tmpBuffer; if (count == 0) { - pstr += sprintf(pstr, "INSERT INTO %s VALUES (", tbname); - } else { - pstr += sprintf(pstr, "("); + pstr += sprintf(pstr, "%s INTO %s VALUES (", sqlStr, tbname); + } else { + if (arguments->mysqlFlag) { + if (0 == rowFlag) { + pstr += sprintf(pstr, "("); + rowFlag++; + } else { + pstr += sprintf(pstr, ", ("); + } + } else { + pstr += sprintf(pstr, "("); + } } for (int col = 0; col < numFields; col++) { @@ -1003,7 +1424,7 @@ int taosDumpTableData(FILE *fp, char *tbname, SDumpArguments *arguments) { pstr += sprintf(pstr, "%d", *((int *)row[col])); break; case TSDB_DATA_TYPE_BIGINT: - pstr += sprintf(pstr, "%" PRId64, *((int64_t *)row[col])); + pstr += sprintf(pstr, "%" PRId64 "", *((int64_t *)row[col])); break; case TSDB_DATA_TYPE_FLOAT: pstr += sprintf(pstr, "%f", GET_FLOAT_VAL(row[col])); @@ -1022,34 +1443,52 @@ int taosDumpTableData(FILE *fp, char *tbname, SDumpArguments *arguments) { pstr += sprintf(pstr, "\'%s\'", tbuf); break; case TSDB_DATA_TYPE_TIMESTAMP: - pstr += sprintf(pstr, "%" PRId64, *(int64_t *)row[col]); + if (!arguments->mysqlFlag) { + pstr += sprintf(pstr, "%" PRId64 "", *(int64_t *)row[col]); + } else { + char buf[64] = "\0"; + int64_t ts = *((int64_t *)row[col]); + time_t tt = (time_t)(ts / 1000); + struct tm *ptm = localtime(&tt); + strftime(buf, 64, "%y-%m-%d %H:%M:%S", ptm); + pstr += sprintf(pstr, "\'%s.%03d\'", buf, (int)(ts % 1000)); + } break; default: break; } } - sprintf(pstr, ")"); + pstr += sprintf(pstr, ") "); + + totalRows++; count++; - fprintf(fp, "%s", buffer); + fprintf(fp, "%s", tmpBuffer); if (count >= arguments->data_batch) { - fprintf(fp, "\n"); + fprintf(fp, ";\n"); count = 0; - } else { - fprintf(fp, "\\\n"); - } + } //else { + //fprintf(fp, "\\\n"); + //} } + atomic_add_fetch_64(&totalDumpOutRows, totalRows); + fprintf(fp, "\n"); - if (tbuf) free(tbuf); - taos_free_result(result); - result = NULL; + if (tbuf) { + free(tbuf); + } + + taos_free_result(tmpResult); + tmpResult = NULL; + free(tmpCommand); + free(tmpBuffer); return 0; } -int taosCheckParam(SDumpArguments *arguments) { +int taosCheckParam(struct arguments *arguments) { if (arguments->all_databases && arguments->databases) { fprintf(stderr, "conflict option --all-databases and --databases\n"); return -1; @@ -1059,23 +1498,29 @@ int taosCheckParam(SDumpArguments *arguments) { fprintf(stderr, "start time is larger than end time\n"); return -1; } + if (arguments->arg_list_len == 0) { if ((!arguments->all_databases) && (!arguments->isDumpIn)) { fprintf(stderr, "taosdump requires parameters\n"); return -1; } } - - if (arguments->isDumpIn && (strcmp(arguments->output, DEFAULT_DUMP_FILE) != 0)) { - fprintf(stderr, "duplicate parameter input and output file\n"); +/* + if (arguments->isDumpIn && (strcmp(arguments->outpath, DEFAULT_DUMP_FILE) != 0)) { + fprintf(stderr, "duplicate parameter input and output file path\n"); return -1; } - +*/ if (!arguments->isDumpIn && arguments->encode != NULL) { fprintf(stderr, "invalid option in dump out\n"); return -1; } + if (arguments->table_batch <= 0) { + fprintf(stderr, "invalid option in dump out\n"); + return -1; + } + return 0; } @@ -1134,36 +1579,273 @@ void taosReplaceCtrlChar(char *str) { *pstr = '\0'; } -int taosDumpIn(SDumpArguments *arguments) { - assert(arguments->isDumpIn); +char *ascii_literal_list[] = { + "\\x00", "\\x01", "\\x02", "\\x03", "\\x04", "\\x05", "\\x06", "\\x07", "\\x08", "\\t", "\\n", "\\x0b", "\\x0c", + "\\r", "\\x0e", "\\x0f", "\\x10", "\\x11", "\\x12", "\\x13", "\\x14", "\\x15", "\\x16", "\\x17", "\\x18", "\\x19", + "\\x1a", "\\x1b", "\\x1c", "\\x1d", "\\x1e", "\\x1f", " ", "!", "\\\"", "#", "$", "%", "&", + "\\'", "(", ")", "*", "+", ",", "-", ".", "/", "0", "1", "2", "3", + "4", "5", "6", "7", "8", "9", ":", ";", "<", "=", ">", "?", "@", + "A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", + "N", "O", "P", "Q", "R", "S", "T", "U", "V", "W", "X", "Y", "Z", + "[", "\\\\", "]", "^", "_", "`", "a", "b", "c", "d", "e", "f", "g", + "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", + "u", "v", "w", "x", "y", "z", "{", "|", "}", "~", "\\x7f", "\\x80", "\\x81", + "\\x82", "\\x83", "\\x84", "\\x85", "\\x86", "\\x87", "\\x88", "\\x89", "\\x8a", "\\x8b", "\\x8c", "\\x8d", "\\x8e", + "\\x8f", "\\x90", "\\x91", "\\x92", "\\x93", "\\x94", "\\x95", "\\x96", "\\x97", "\\x98", "\\x99", "\\x9a", "\\x9b", + "\\x9c", "\\x9d", "\\x9e", "\\x9f", "\\xa0", "\\xa1", "\\xa2", "\\xa3", "\\xa4", "\\xa5", "\\xa6", "\\xa7", "\\xa8", + "\\xa9", "\\xaa", "\\xab", "\\xac", "\\xad", "\\xae", "\\xaf", "\\xb0", "\\xb1", "\\xb2", "\\xb3", "\\xb4", "\\xb5", + "\\xb6", "\\xb7", "\\xb8", "\\xb9", "\\xba", "\\xbb", "\\xbc", "\\xbd", "\\xbe", "\\xbf", "\\xc0", "\\xc1", "\\xc2", + "\\xc3", "\\xc4", "\\xc5", "\\xc6", "\\xc7", "\\xc8", "\\xc9", "\\xca", "\\xcb", "\\xcc", "\\xcd", "\\xce", "\\xcf", + "\\xd0", "\\xd1", "\\xd2", "\\xd3", "\\xd4", "\\xd5", "\\xd6", "\\xd7", "\\xd8", "\\xd9", "\\xda", "\\xdb", "\\xdc", + "\\xdd", "\\xde", "\\xdf", "\\xe0", "\\xe1", "\\xe2", "\\xe3", "\\xe4", "\\xe5", "\\xe6", "\\xe7", "\\xe8", "\\xe9", + "\\xea", "\\xeb", "\\xec", "\\xed", "\\xee", "\\xef", "\\xf0", "\\xf1", "\\xf2", "\\xf3", "\\xf4", "\\xf5", "\\xf6", + "\\xf7", "\\xf8", "\\xf9", "\\xfa", "\\xfb", "\\xfc", "\\xfd", "\\xfe", "\\xff"}; + +int converStringToReadable(char *str, int size, char *buf, int bufsize) { + char *pstr = str; + char *pbuf = buf; + while (size > 0) { + if (*pstr == '\0') break; + pbuf = stpcpy(pbuf, ascii_literal_list[((uint8_t)(*pstr))]); + pstr++; + size--; + } + *pbuf = '\0'; + return 0; +} + +int convertNCharToReadable(char *str, int size, char *buf, int bufsize) { + char *pstr = str; + char *pbuf = buf; + // TODO + wchar_t wc; + while (size > 0) { + if (*pstr == '\0') break; + int byte_width = mbtowc(&wc, pstr, MB_CUR_MAX); + + if ((int)wc < 256) { + pbuf = stpcpy(pbuf, ascii_literal_list[(int)wc]); + } else { + memcpy(pbuf, pstr, byte_width); + pbuf += byte_width; + } + pstr += byte_width; + } + + *pbuf = '\0'; + + return 0; +} + +void taosDumpCharset(FILE *fp) { + char charsetline[256]; + + fseek(fp, 0, SEEK_SET); + sprintf(charsetline, "#!%s\n", tsCharset); + fwrite(charsetline, strlen(charsetline), 1, fp); +} + +void taosLoadFileCharset(FILE *fp, char *fcharset) { + char * line = NULL; + size_t line_size = 0; + + fseek(fp, 0, SEEK_SET); + ssize_t size = getline(&line, &line_size, fp); + if (size <= 2) { + goto _exit_no_charset; + } + + if (strncmp(line, "#!", 2) != 0) { + goto _exit_no_charset; + } + if (line[size - 1] == '\n') { + line[size - 1] = '\0'; + size--; + } + strcpy(fcharset, line + 2); + + tfree(line); + return; + +_exit_no_charset: + fseek(fp, 0, SEEK_SET); + *fcharset = '\0'; + tfree(line); + return; +} + +// ======== dumpIn support multi threads functions ================================// - int tsize = 0; - FILE * fp = NULL; - char * line = NULL; - _Bool isRun = true; - size_t line_size = 0; - char * pstr = NULL, *lstr = NULL; - iconv_t cd = (iconv_t)-1; - size_t inbytesleft = 0; - size_t outbytesleft = COMMAND_SIZE; - char fcharset[64]; - char * tcommand = NULL; - - fp = fopen(arguments->input, "r"); +static char **tsDumpInSqlFiles = NULL; +static int32_t tsSqlFileNum = 0; +static char tsDbSqlFile[TSDB_FILENAME_LEN] = {0}; +static char tsfCharset[64] = {0}; +static int taosGetFilesNum(const char *directoryName, const char *prefix) +{ + char cmd[1024] = { 0 }; + sprintf(cmd, "ls %s/*.%s | wc -l ", directoryName, prefix); + + FILE *fp = popen(cmd, "r"); if (fp == NULL) { - fprintf(stderr, "failed to open input file %s\n", arguments->input); - return -1; + fprintf(stderr, "ERROR: failed to execute:%s, error:%s\n", cmd, strerror(errno)); + exit(0); } - taosLoadFileCharset(fp, fcharset); + int fileNum = 0; + if (fscanf(fp, "%d", &fileNum) != 1) { + fprintf(stderr, "ERROR: failed to execute:%s, parse result error\n", cmd); + exit(0); + } - taos = taos_connect(arguments->host, arguments->user, arguments->password, NULL, arguments->port); - if (taos == NULL) { - fprintf(stderr, "failed to connect to TDengine server\n"); - goto _dumpin_exit_failure; + if (fileNum <= 0) { + fprintf(stderr, "ERROR: directory:%s is empry\n", directoryName); + exit(0); } - command = (char *)malloc(COMMAND_SIZE); + pclose(fp); + return fileNum; +} + +static void taosParseDirectory(const char *directoryName, const char *prefix, char **fileArray, int totalFiles) +{ + char cmd[1024] = { 0 }; + sprintf(cmd, "ls %s/*.%s | sort", directoryName, prefix); + + FILE *fp = popen(cmd, "r"); + if (fp == NULL) { + fprintf(stderr, "ERROR: failed to execute:%s, error:%s\n", cmd, strerror(errno)); + exit(0); + } + + int fileNum = 0; + while (fscanf(fp, "%s", fileArray[fileNum++])) { + if (strcmp(fileArray[fileNum-1], tsDbSqlFile) == 0) { + fileNum--; + } + if (fileNum >= totalFiles) { + break; + } + } + + if (fileNum != totalFiles) { + fprintf(stderr, "ERROR: directory:%s changed while read\n", directoryName); + exit(0); + } + + pclose(fp); +} + +static void taosCheckTablesSQLFile(const char *directoryName) +{ + char cmd[1024] = { 0 }; + sprintf(cmd, "ls %s/dbs.sql", directoryName); + + FILE *fp = popen(cmd, "r"); + if (fp == NULL) { + fprintf(stderr, "ERROR: failed to execute:%s, error:%s\n", cmd, strerror(errno)); + exit(0); + } + + while (fscanf(fp, "%s", tsDbSqlFile)) { + break; + } + + pclose(fp); +} + +static void taosMallocSQLFiles() +{ + tsDumpInSqlFiles = (char**)calloc(tsSqlFileNum, sizeof(char*)); + for (int i = 0; i < tsSqlFileNum; i++) { + tsDumpInSqlFiles[i] = calloc(1, TSDB_FILENAME_LEN); + } +} + +static void taosFreeSQLFiles() +{ + for (int i = 0; i < tsSqlFileNum; i++) { + tfree(tsDumpInSqlFiles[i]); + } + tfree(tsDumpInSqlFiles); +} + +static void taosGetDirectoryFileList(char *inputDir) +{ + struct stat fileStat; + if (stat(inputDir, &fileStat) < 0) { + fprintf(stderr, "ERROR: %s not exist\n", inputDir); + exit(0); + } + + if (fileStat.st_mode & S_IFDIR) { + taosCheckTablesSQLFile(inputDir); + tsSqlFileNum = taosGetFilesNum(inputDir, "sql"); + int totalSQLFileNum = tsSqlFileNum; + if (tsDbSqlFile[0] != 0) { + tsSqlFileNum--; + } + taosMallocSQLFiles(); + taosParseDirectory(inputDir, "sql", tsDumpInSqlFiles, tsSqlFileNum); + fprintf(stdout, "\nstart to dispose %d files in %s\n", totalSQLFileNum, inputDir); + } + else { + fprintf(stderr, "ERROR: %s is not a directory\n", inputDir); + exit(0); + } +} + +static FILE* taosOpenDumpInFile(char *fptr) { + wordexp_t full_path; + + if (wordexp(fptr, &full_path, 0) != 0) { + fprintf(stderr, "ERROR: illegal file name: %s\n", fptr); + return NULL; + } + + char *fname = full_path.we_wordv[0]; + + if (access(fname, F_OK) != 0) { + fprintf(stderr, "ERROR: file %s is not exist\n", fptr); + + wordfree(&full_path); + return NULL; + } + + if (access(fname, R_OK) != 0) { + fprintf(stderr, "ERROR: file %s is not readable\n", fptr); + + wordfree(&full_path); + return NULL; + } + + FILE *f = fopen(fname, "r"); + if (f == NULL) { + fprintf(stderr, "ERROR: failed to open file %s\n", fname); + wordfree(&full_path); + return NULL; + } + + wordfree(&full_path); + + return f; +} + +int taosDumpInOneFile_old(TAOS * taos, FILE* fp, char* fcharset, char* encode) { + char *command = NULL; + char *lcommand = NULL; + int tsize = 0; + char *line = NULL; + _Bool isRun = true; + size_t line_size = 0; + char *pstr = NULL; + char *lstr = NULL; + size_t inbytesleft = 0; + size_t outbytesleft = COMMAND_SIZE; + char *tcommand = NULL; + char *charsetOfFile = NULL; + iconv_t cd = (iconv_t)(-1); + + command = (char *)malloc(COMMAND_SIZE); lcommand = (char *)malloc(COMMAND_SIZE); if (command == NULL || lcommand == NULL) { fprintf(stderr, "failed to connect to allocate memory\n"); @@ -1172,12 +1854,14 @@ int taosDumpIn(SDumpArguments *arguments) { // Resolve locale if (*fcharset != '\0') { - arguments->encode = fcharset; + charsetOfFile = fcharset; + } else { + charsetOfFile = encode; } - if (arguments->encode != NULL && strcasecmp(tsCharset, arguments->encode) != 0) { - cd = iconv_open(tsCharset, arguments->encode); - if (cd == (iconv_t)-1) { + if (charsetOfFile != NULL && strcasecmp(tsCharset, charsetOfFile) != 0) { + cd = iconv_open(tsCharset, charsetOfFile); + if (cd == ((iconv_t)(-1))) { fprintf(stderr, "Failed to open iconv handle\n"); goto _dumpin_exit_failure; } @@ -1196,21 +1880,20 @@ int taosDumpIn(SDumpArguments *arguments) { pstr = command; lstr = lcommand; outbytesleft = COMMAND_SIZE; - if (cd != (iconv_t)-1) { + if (cd != ((iconv_t)(-1))) { iconv(cd, &pstr, &inbytesleft, &lstr, &outbytesleft); tcommand = lcommand; } else { tcommand = command; } + taosReplaceCtrlChar(tcommand); - - TAOS_RES* result = taos_query(taos, tcommand); - if (taos_errno(result) != 0){ - fprintf(stderr, "linenu: %" PRId64 " failed to run command %s reason:%s \ncontinue...\n", linenu, command, - taos_errstr(result)); - taos_free_result(result); + + if (queryDB(taos, tcommand) != 0) { + fprintf(stderr, "error sql: linenu: %" PRId64 " failed\n", linenu); + exit(0); } - + pstr = command; pstr[0] = '\0'; tsize = 0; @@ -1248,21 +1931,17 @@ int taosDumpIn(SDumpArguments *arguments) { pstr = command; lstr = lcommand; outbytesleft = COMMAND_SIZE; - if (cd != (iconv_t)-1) { + if (cd != ((iconv_t)(-1))) { iconv(cd, &pstr, &inbytesleft, &lstr, &outbytesleft); tcommand = lcommand; } else { tcommand = command; } taosReplaceCtrlChar(tcommand); - TAOS_RES* result = taos_query(taos, tcommand); - int32_t code = taos_errno(result); - if (code != 0) - { - fprintf(stderr, "linenu:%" PRId64 " failed to run command %s reason: %s \ncontinue...\n", linenu, command, - taos_errstr(result)); + if (queryDB(taos, tcommand) != 0) { + fprintf(stderr, "error sql: linenu:%" PRId64 " failed\n", linenu); + exit(0); } - taos_free_result(result); } pstr = command; @@ -1276,19 +1955,18 @@ int taosDumpIn(SDumpArguments *arguments) { pstr = command; lstr = lcommand; outbytesleft = COMMAND_SIZE; - if (cd != (iconv_t)-1) { + if (cd != ((iconv_t)(-1))) { iconv(cd, &pstr, &inbytesleft, &lstr, &outbytesleft); tcommand = lcommand; } else { tcommand = command; } taosReplaceCtrlChar(lcommand); - if (taos_query(taos, tcommand) == NULL) - fprintf(stderr, "linenu:%" PRId64 " failed to run command %s reason:%s \ncontinue...\n", linenu, command, - taos_errstr(taos)); + if (queryDB(taos, tcommand) != 0) + fprintf(stderr, "error sql: linenu:%" PRId64 " failed \n", linenu); } - if (cd != (iconv_t)-1) iconv_close(cd); + if (cd != ((iconv_t)(-1))) iconv_close(cd); tfree(line); tfree(command); tfree(lcommand); @@ -1297,7 +1975,7 @@ int taosDumpIn(SDumpArguments *arguments) { return 0; _dumpin_exit_failure: - if (cd != (iconv_t)-1) iconv_close(cd); + if (cd != ((iconv_t)(-1))) iconv_close(cd); tfree(command); tfree(lcommand); taos_close(taos); @@ -1305,97 +1983,143 @@ _dumpin_exit_failure: return -1; } -char *ascii_literal_list[] = { - "\\x00", "\\x01", "\\x02", "\\x03", "\\x04", "\\x05", "\\x06", "\\x07", "\\x08", "\\t", "\\n", "\\x0b", "\\x0c", - "\\r", "\\x0e", "\\x0f", "\\x10", "\\x11", "\\x12", "\\x13", "\\x14", "\\x15", "\\x16", "\\x17", "\\x18", "\\x19", - "\\x1a", "\\x1b", "\\x1c", "\\x1d", "\\x1e", "\\x1f", " ", "!", "\\\"", "#", "$", "%", "&", - "\\'", "(", ")", "*", "+", ",", "-", ".", "/", "0", "1", "2", "3", - "4", "5", "6", "7", "8", "9", ":", ";", "<", "=", ">", "?", "@", - "A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", - "N", "O", "P", "Q", "R", "S", "T", "U", "V", "W", "X", "Y", "Z", - "[", "\\\\", "]", "^", "_", "`", "a", "b", "c", "d", "e", "f", "g", - "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", - "u", "v", "w", "x", "y", "z", "{", "|", "}", "~", "\\x7f", "\\x80", "\\x81", - "\\x82", "\\x83", "\\x84", "\\x85", "\\x86", "\\x87", "\\x88", "\\x89", "\\x8a", "\\x8b", "\\x8c", "\\x8d", "\\x8e", - "\\x8f", "\\x90", "\\x91", "\\x92", "\\x93", "\\x94", "\\x95", "\\x96", "\\x97", "\\x98", "\\x99", "\\x9a", "\\x9b", - "\\x9c", "\\x9d", "\\x9e", "\\x9f", "\\xa0", "\\xa1", "\\xa2", "\\xa3", "\\xa4", "\\xa5", "\\xa6", "\\xa7", "\\xa8", - "\\xa9", "\\xaa", "\\xab", "\\xac", "\\xad", "\\xae", "\\xaf", "\\xb0", "\\xb1", "\\xb2", "\\xb3", "\\xb4", "\\xb5", - "\\xb6", "\\xb7", "\\xb8", "\\xb9", "\\xba", "\\xbb", "\\xbc", "\\xbd", "\\xbe", "\\xbf", "\\xc0", "\\xc1", "\\xc2", - "\\xc3", "\\xc4", "\\xc5", "\\xc6", "\\xc7", "\\xc8", "\\xc9", "\\xca", "\\xcb", "\\xcc", "\\xcd", "\\xce", "\\xcf", - "\\xd0", "\\xd1", "\\xd2", "\\xd3", "\\xd4", "\\xd5", "\\xd6", "\\xd7", "\\xd8", "\\xd9", "\\xda", "\\xdb", "\\xdc", - "\\xdd", "\\xde", "\\xdf", "\\xe0", "\\xe1", "\\xe2", "\\xe3", "\\xe4", "\\xe5", "\\xe6", "\\xe7", "\\xe8", "\\xe9", - "\\xea", "\\xeb", "\\xec", "\\xed", "\\xee", "\\xef", "\\xf0", "\\xf1", "\\xf2", "\\xf3", "\\xf4", "\\xf5", "\\xf6", - "\\xf7", "\\xf8", "\\xf9", "\\xfa", "\\xfb", "\\xfc", "\\xfd", "\\xfe", "\\xff"}; +int taosDumpInOneFile(TAOS * taos, FILE* fp, char* fcharset, char* encode, char* fileName) { + int read_len = 0; + char * cmd = NULL; + size_t cmd_len = 0; + char * line = NULL; + size_t line_len = 0; -int converStringToReadable(char *str, int size, char *buf, int bufsize) { - char *pstr = str; - char *pbuf = buf; - while (size > 0) { - if (*pstr == '\0') break; - pbuf = stpcpy(pbuf, ascii_literal_list[((uint8_t)(*pstr))]); - pstr++; - size--; + cmd = (char *)malloc(COMMAND_SIZE); + if (cmd == NULL) { + fprintf(stderr, "failed to allocate memory\n"); + return -1; } - *pbuf = '\0'; - return 0; -} -int convertNCharToReadable(char *str, int size, char *buf, int bufsize) { - char *pstr = str; - char *pbuf = buf; - // TODO - wchar_t wc; - while (size > 0) { - if (*pstr == '\0') break; - int byte_width = mbtowc(&wc, pstr, MB_CUR_MAX); + int lineNo = 0; + while ((read_len = getline(&line, &line_len, fp)) != -1) { + ++lineNo; + if (read_len >= COMMAND_SIZE) continue; + line[--read_len] = '\0'; - if ((int)wc < 256) { - pbuf = stpcpy(pbuf, ascii_literal_list[(int)wc]); - } else if (byte_width > 0) { - memcpy(pbuf, pstr, byte_width); - pbuf += byte_width; + //if (read_len == 0 || isCommentLine(line)) { // line starts with # + if (read_len == 0 ) { + continue; } - pstr += byte_width; - } - *pbuf = '\0'; + if (line[read_len - 1] == '\\') { + line[read_len - 1] = ' '; + memcpy(cmd + cmd_len, line, read_len); + cmd_len += read_len; + continue; + } + memcpy(cmd + cmd_len, line, read_len); + if (queryDB(taos, cmd)) { + fprintf(stderr, "error sql: linenu:%d, file:%s\n", lineNo, fileName); + } + + memset(cmd, 0, COMMAND_SIZE); + cmd_len = 0; + } + + tfree(cmd); + tfree(line); + fclose(fp); return 0; } -void taosDumpCharset(FILE *fp) { - char charsetline[256]; +void* taosDumpInWorkThreadFp(void *arg) +{ + SThreadParaObj *pThread = (SThreadParaObj*)arg; + for (int32_t f = 0; f < tsSqlFileNum; ++f) { + if (f % pThread->totalThreads == pThread->threadIndex) { + char *SQLFileName = tsDumpInSqlFiles[f]; + FILE* fp = taosOpenDumpInFile(SQLFileName); + if (NULL == fp) { + continue; + } + fprintf(stderr, "Success Open input file: %s\n", SQLFileName); + taosDumpInOneFile(pThread->taosCon, fp, tsfCharset, tsArguments.encode, SQLFileName); + } + } - fseek(fp, 0, SEEK_SET); - sprintf(charsetline, "#!%s\n", tsCharset); - fwrite(charsetline, strlen(charsetline), 1, fp); + return NULL; } -void taosLoadFileCharset(FILE *fp, char *fcharset) { - char * line = NULL; - size_t line_size = 0; +static void taosStartDumpInWorkThreads(struct arguments *args) +{ + pthread_attr_t thattr; + SThreadParaObj *pThread; + int32_t totalThreads = args->thread_num; - fseek(fp, 0, SEEK_SET); - ssize_t size = getline(&line, &line_size, fp); - if (size <= 2) { - goto _exit_no_charset; + if (totalThreads > tsSqlFileNum) { + totalThreads = tsSqlFileNum; } + + SThreadParaObj *threadObj = (SThreadParaObj *)calloc(totalThreads, sizeof(SThreadParaObj)); + for (int32_t t = 0; t < totalThreads; ++t) { + pThread = threadObj + t; + pThread->threadIndex = t; + pThread->totalThreads = totalThreads; + pThread->taosCon = taos_connect(args->host, args->user, args->password, NULL, args->port); + if (pThread->taosCon == NULL) { + fprintf(stderr, "ERROR: thread:%d failed connect to TDengine, error:%s\n", pThread->threadIndex, taos_errstr(pThread->taosCon)); + exit(0); + } - if (strncmp(line, "#!", 2) != 0) { - goto _exit_no_charset; + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + + if (pthread_create(&(pThread->threadID), &thattr, taosDumpInWorkThreadFp, (void*)pThread) != 0) { + fprintf(stderr, "ERROR: thread:%d failed to start\n", pThread->threadIndex); + exit(0); + } } - if (line[size - 1] == '\n') { - line[size - 1] = '\0'; - size--; + + for (int t = 0; t < totalThreads; ++t) { + pthread_join(threadObj[t].threadID, NULL); } - strcpy(fcharset, line + 2); - tfree(line); - return; + for (int t = 0; t < totalThreads; ++t) { + taos_close(threadObj[t].taosCon); + } + free(threadObj); +} -_exit_no_charset: - fseek(fp, 0, SEEK_SET); - *fcharset = '\0'; - tfree(line); - return; + +int taosDumpIn(struct arguments *arguments) { + assert(arguments->isDumpIn); + + TAOS *taos = NULL; + FILE *fp = NULL; + + taos = taos_connect(arguments->host, arguments->user, arguments->password, NULL, arguments->port); + if (taos == NULL) { + fprintf(stderr, "failed to connect to TDengine server\n"); + return -1; + } + + taosGetDirectoryFileList(arguments->inpath); + + if (tsDbSqlFile[0] != 0) { + fp = taosOpenDumpInFile(tsDbSqlFile); + if (NULL == fp) { + fprintf(stderr, "failed to open input file %s\n", tsDbSqlFile); + return -1; + } + fprintf(stderr, "Success Open input file: %s\n", tsDbSqlFile); + + taosLoadFileCharset(fp, tsfCharset); + + taosDumpInOneFile(taos, fp, tsfCharset, arguments->encode, tsDbSqlFile); + } + + taosStartDumpInWorkThreads(arguments); + + taos_close(taos); + taosFreeSQLFiles(); + return 0; } + + diff --git a/src/kit/taosmigrate/taosmigrate.c b/src/kit/taosmigrate/taosmigrate.c index b7bf6fc1baecc51757a6425371cc171fcca759e4..599cc493343f8f793cbd6b1d1c7c4618afa8174d 100644 --- a/src/kit/taosmigrate/taosmigrate.c +++ b/src/kit/taosmigrate/taosmigrate.c @@ -51,6 +51,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { break; case 'f': arguments->fqdn = arg; + break; case 'g': arguments->dnodeGroups = arg; break; diff --git a/src/kit/taosmigrate/taosmigrateMnodeWal.c b/src/kit/taosmigrate/taosmigrateMnodeWal.c index 6315ff99f79cf3414fc147ac2913cc7733069e4b..28e2b7772b29641c5cb2f736606c6cb03169402e 100644 --- a/src/kit/taosmigrate/taosmigrateMnodeWal.c +++ b/src/kit/taosmigrate/taosmigrateMnodeWal.c @@ -96,6 +96,7 @@ void walModWalFile(char* walfile) { if (wfd < 0) { printf("wal:%s, failed to open(%s)\n", newWalFile, strerror(errno)); free(buffer); + close(rfd); return ; } @@ -116,6 +117,11 @@ void walModWalFile(char* walfile) { break; } + if (pHead->len >= 1024000 - sizeof(SWalHead)) { + printf("wal:%s, SWalHead.len(%d) overflow, skip the rest of file\n", walfile, pHead->len); + break; + } + ret = read(rfd, pHead->cont, pHead->len); if ( ret != pHead->len) { printf("wal:%s, failed to read body, skip, len:%d ret:%d\n", walfile, pHead->len, ret); diff --git a/src/kit/taosmigrate/taosmigrateVnodeCfg.c b/src/kit/taosmigrate/taosmigrateVnodeCfg.c index 1cb2fee353d0dd6f59afccfc69272f5a101493f2..b925fb10aa3bb59951726497b3c77a7a8d5ef142 100644 --- a/src/kit/taosmigrate/taosmigrateVnodeCfg.c +++ b/src/kit/taosmigrate/taosmigrateVnodeCfg.c @@ -99,6 +99,8 @@ static int32_t readVnodeCfg(SVnodeObj *pVnode, char* cfgFile) goto PARSE_OVER; } + content[maxLen] = (char)0; + root = cJSON_Parse(content); if (root == NULL) { printf("failed to json parse %s, invalid json format\n", cfgFile);