diff --git a/src/kit/taosdump/taosdump.c b/src/kit/taosdump/taosdump.c index bdfea26294201e4ee8a96acc3ec0a8c17c524f13..f89dc6cb78a219b3ec57431d9779906e2cf11dff 100644 --- a/src/kit/taosdump/taosdump.c +++ b/src/kit/taosdump/taosdump.c @@ -14,6 +14,9 @@ */ #include +#include +#include + #include "os.h" #include "taos.h" #include "taosdef.h" @@ -366,6 +369,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { static struct argp argp = {options, parse_opt, args_doc, doc}; static resultStatistics g_resultStatistics = {0}; static FILE *g_fpOfResult = NULL; +static int g_numOfCores = 1; int taosDumpOut(struct arguments *arguments); int taosDumpIn(struct arguments *arguments); @@ -378,7 +382,7 @@ int32_t taosDumpTable(char *table, char *metric, struct arguments *arguments, FI int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments, TAOS* taosCon, char* dbName); int taosCheckParam(struct arguments *arguments); void taosFreeDbInfos(); -static void taosStartDumpOutWorkThreads(struct arguments* args, int32_t numOfThread, char *dbName); +static void taosStartDumpOutWorkThreads(void* taosCon, struct arguments* args, int32_t numOfThread, char *dbName); struct arguments tsArguments = { // connection option @@ -540,6 +544,8 @@ int main(int argc, char *argv[]) { } } + g_numOfCores = (int32_t)sysconf(_SC_NPROCESSORS_ONLN); + time_t tTime = time(NULL); struct tm tm = *localtime(&tTime); @@ -692,7 +698,7 @@ int32_t taosSaveTableOfMetricToTempFile(TAOS *taosCon, char* metric, struct argu sprintf(tmpCommand, "select tbname from %s", metric); - TAOS_RES *result = taos_query(taosCon, tmpCommand); + TAOS_RES *result = taos_query(taosCon, tmpCommand); int32_t code = taos_errno(result); if (code != 0) { fprintf(stderr, "failed to run command %s\n", tmpCommand); @@ -701,6 +707,21 @@ int32_t taosSaveTableOfMetricToTempFile(TAOS *taosCon, char* metric, struct argu return -1; } + int table_batch = arguments->table_batch; + int affectdRows = taos_affected_rows(result); + if (affectdRows <= 0) { + free(tmpCommand); + taos_free_result(result); + return -1; + } + + int maxNumOfThread = affectdRows / table_batch + 1; + if (maxNumOfThread > 2 * g_numOfCores) { + maxNumOfThread = 2 * g_numOfCores; + } + + table_batch = affectdRows / maxNumOfThread + 1; + TAOS_FIELD *fields = taos_fetch_fields(result); int32_t numOfTable = 0; @@ -733,7 +754,7 @@ int32_t taosSaveTableOfMetricToTempFile(TAOS *taosCon, char* metric, struct argu numOfTable++; - if (numOfTable >= arguments->table_batch) { + if (numOfTable >= table_batch) { numOfTable = 0; close(fd); fd = -1; @@ -946,7 +967,7 @@ int taosDumpOut(struct arguments *arguments) { } // start multi threads to dumpout - taosStartDumpOutWorkThreads(arguments, totalNumOfThread, dbInfos[0]->name); + taosStartDumpOutWorkThreads(taos, arguments, totalNumOfThread, dbInfos[0]->name); char tmpFileName[TSDB_FILENAME_LEN + 1]; _clean_tmp_file: @@ -1181,34 +1202,34 @@ void* taosDumpOutWorkThreadFp(void *arg) STableRecord tableRecord; int fd; - char tmpFileName[TSDB_FILENAME_LEN*4] = {0}; - sprintf(tmpFileName, ".tables.tmp.%d", pThread->threadIndex); - fd = open(tmpFileName, O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH); + char tmpBuf[TSDB_FILENAME_LEN*4] = {0}; + sprintf(tmpBuf, ".tables.tmp.%d", pThread->threadIndex); + fd = open(tmpBuf, O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH); if (fd == -1) { - fprintf(stderr, "taosDumpTableFp() failed to open temp file: %s\n", tmpFileName); + fprintf(stderr, "taosDumpTableFp() failed to open temp file: %s\n", tmpBuf); return NULL; } FILE *fp = NULL; - memset(tmpFileName, 0, TSDB_FILENAME_LEN + 128); + memset(tmpBuf, 0, TSDB_FILENAME_LEN + 128); if (tsArguments.outpath[0] != 0) { - sprintf(tmpFileName, "%s/%s.tables.%d.sql", tsArguments.outpath, pThread->dbName, pThread->threadIndex); + sprintf(tmpBuf, "%s/%s.tables.%d.sql", tsArguments.outpath, pThread->dbName, pThread->threadIndex); } else { - sprintf(tmpFileName, "%s.tables.%d.sql", pThread->dbName, pThread->threadIndex); + sprintf(tmpBuf, "%s.tables.%d.sql", pThread->dbName, pThread->threadIndex); } - fp = fopen(tmpFileName, "w"); + fp = fopen(tmpBuf, "w"); if (fp == NULL) { - fprintf(stderr, "failed to open file %s\n", tmpFileName); + fprintf(stderr, "failed to open file %s\n", tmpBuf); close(fd); return NULL; } - memset(tmpFileName, 0, TSDB_FILENAME_LEN); - sprintf(tmpFileName, "use %s", pThread->dbName); + memset(tmpBuf, 0, TSDB_FILENAME_LEN); + sprintf(tmpBuf, "use %s", pThread->dbName); - TAOS_RES* tmpResult = taos_query(pThread->taosCon, tmpFileName); + TAOS_RES* tmpResult = taos_query(pThread->taosCon, tmpBuf); int32_t code = taos_errno(tmpResult); if (code != 0) { fprintf(stderr, "invalid database %s\n", pThread->dbName); @@ -1218,6 +1239,9 @@ void* taosDumpOutWorkThreadFp(void *arg) return NULL; } + int fileNameIndex = 1; + int tablesInOneFile = 0; + int64_t lastRowsPrint = 5000000; fprintf(fp, "USE %s;\n\n", pThread->dbName); while (1) { ssize_t readLen = read(fd, &tableRecord, sizeof(STableRecord)); @@ -1228,6 +1252,33 @@ void* taosDumpOutWorkThreadFp(void *arg) // TODO: sum table count and table rows by self pThread->tablesOfDumpOut++; pThread->rowsOfDumpOut += ret; + + if (pThread->rowsOfDumpOut >= lastRowsPrint) { + printf(" %"PRId64 " rows already be dumpout from database %s\n", pThread->rowsOfDumpOut, pThread->dbName); + lastRowsPrint += 5000000; + } + + tablesInOneFile++; + if (tablesInOneFile >= tsArguments.table_batch) { + fclose(fp); + tablesInOneFile = 0; + + memset(tmpBuf, 0, TSDB_FILENAME_LEN + 128); + if (tsArguments.outpath[0] != 0) { + sprintf(tmpBuf, "%s/%s.tables.%d-%d.sql", tsArguments.outpath, pThread->dbName, pThread->threadIndex, fileNameIndex); + } else { + sprintf(tmpBuf, "%s.tables.%d-%d.sql", pThread->dbName, pThread->threadIndex, fileNameIndex); + } + fileNameIndex++; + + fp = fopen(tmpBuf, "w"); + if (fp == NULL) { + fprintf(stderr, "failed to open file %s\n", tmpBuf); + close(fd); + taos_free_result(tmpResult); + return NULL; + } + } } } @@ -1238,7 +1289,7 @@ void* taosDumpOutWorkThreadFp(void *arg) return NULL; } -static void taosStartDumpOutWorkThreads(struct arguments* args, int32_t numOfThread, char *dbName) +static void taosStartDumpOutWorkThreads(void* taosCon, struct arguments* args, int32_t numOfThread, char *dbName) { pthread_attr_t thattr; SThreadParaObj *threadObj = (SThreadParaObj *)calloc(numOfThread, sizeof(SThreadParaObj)); @@ -1249,12 +1300,7 @@ static void taosStartDumpOutWorkThreads(struct arguments* args, int32_t numOfTh pThread->threadIndex = t; pThread->totalThreads = numOfThread; tstrncpy(pThread->dbName, dbName, TSDB_TABLE_NAME_LEN); - pThread->taosCon = taos_connect(args->host, args->user, args->password, NULL, args->port); - - if (pThread->taosCon == NULL) { - fprintf(stderr, "ERROR: thread:%d failed connect to TDengine, reason:%s\n", pThread->threadIndex, taos_errstr(NULL)); - exit(0); - } + pThread->taosCon = taosCon; pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); @@ -1273,7 +1319,6 @@ static void taosStartDumpOutWorkThreads(struct arguments* args, int32_t numOfTh int64_t totalRowsOfDumpOut = 0; int64_t totalChildTblsOfDumpOut = 0; for (int32_t t = 0; t < numOfThread; ++t) { - taos_close(threadObj[t].taosCon); totalChildTblsOfDumpOut += threadObj[t].tablesOfDumpOut; totalRowsOfDumpOut += threadObj[t].rowsOfDumpOut; } @@ -1398,22 +1443,36 @@ int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp, TAOS *tao return -1; } + int table_batch = arguments->table_batch; + int affectdRows = taos_affected_rows(res); + if (affectdRows <= 0) { + taos_free_result(res); + return -1; + } + + int maxNumOfThread = affectdRows / table_batch + 1; + if (maxNumOfThread > 2 * g_numOfCores) { + maxNumOfThread = 2 * g_numOfCores; + } + + table_batch = affectdRows / maxNumOfThread + 1; + TAOS_FIELD *fields = taos_fetch_fields(res); int32_t numOfTable = 0; int32_t numOfThread = 0; - char tmpFileName[TSDB_FILENAME_LEN + 1]; + char tmpBuf[TSDB_FILENAME_LEN + 1]; while ((row = taos_fetch_row(res)) != NULL) { if (0 == numOfTable) { - memset(tmpFileName, 0, TSDB_FILENAME_LEN); - sprintf(tmpFileName, ".tables.tmp.%d", numOfThread); - fd = open(tmpFileName, O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH); + memset(tmpBuf, 0, TSDB_FILENAME_LEN); + sprintf(tmpBuf, ".tables.tmp.%d", numOfThread); + fd = open(tmpBuf, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH); if (fd == -1) { - fprintf(stderr, "failed to open temp file: %s\n", tmpFileName); + fprintf(stderr, "failed to open temp file: %s\n", tmpBuf); taos_free_result(res); for (int32_t loopCnt = 0; loopCnt < numOfThread; loopCnt++) { - sprintf(tmpFileName, ".tables.tmp.%d", loopCnt); - (void)remove(tmpFileName); + sprintf(tmpBuf, ".tables.tmp.%d", loopCnt); + (void)remove(tmpBuf); } return -1; } @@ -1429,7 +1488,7 @@ int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp, TAOS *tao numOfTable++; - if (numOfTable >= arguments->table_batch) { + if (numOfTable >= table_batch) { numOfTable = 0; close(fd); fd = -1; @@ -1444,10 +1503,10 @@ int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp, TAOS *tao taos_free_result(res); // start multi threads to dumpout - taosStartDumpOutWorkThreads(arguments, numOfThread, dbInfo->name); + taosStartDumpOutWorkThreads(taosCon, arguments, numOfThread, dbInfo->name); for (int loopCnt = 0; loopCnt < numOfThread; loopCnt++) { - sprintf(tmpFileName, ".tables.tmp.%d", loopCnt); - (void)remove(tmpFileName); + sprintf(tmpBuf, ".tables.tmp.%d", loopCnt); + (void)remove(tmpBuf); } return 0; @@ -1552,8 +1611,8 @@ void taosDumpCreateMTableClause(STableDef *tableDes, char *metric, int numOfCols } int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments, TAOS* taosCon, char* dbName) { - /* char temp[MAX_COMMAND_SIZE] = "\0"; */ - int64_t totalRows = 0; + int64_t lastRowsPrint = 5000000; + int64_t totalRows = 0; int count = 0; char *pstr = NULL; TAOS_ROW row = NULL; @@ -1680,9 +1739,14 @@ int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments, TAOS* curr_sqlstr_len += sprintf(pstr + curr_sqlstr_len, ") "); - totalRows++; + totalRows++; count++; fprintf(fp, "%s", tmpBuffer); + + if (totalRows >= lastRowsPrint) { + printf(" %"PRId64 " rows already be dumpout from %s.%s\n", totalRows, dbName, tbname); + lastRowsPrint += 5000000; + } total_sqlstr_len += curr_sqlstr_len; @@ -2048,6 +2112,7 @@ int taosDumpInOneFile(TAOS * taos, FILE* fp, char* fcharset, char* encode, c return -1; } + int lastRowsPrint = 5000000; int lineNo = 0; while ((read_len = getline(&line, &line_len, fp)) != -1) { ++lineNo; @@ -2074,7 +2139,12 @@ int taosDumpInOneFile(TAOS * taos, FILE* fp, char* fcharset, char* encode, c } memset(cmd, 0, TSDB_MAX_ALLOWED_SQL_LEN); - cmd_len = 0; + cmd_len = 0; + + if (lineNo >= lastRowsPrint) { + printf(" %d lines already be executed from file %s\n", lineNo, fileName); + lastRowsPrint += 5000000; + } } tfree(cmd); @@ -2101,7 +2171,7 @@ void* taosDumpInWorkThreadFp(void *arg) return NULL; } -static void taosStartDumpInWorkThreads(struct arguments *args) +static void taosStartDumpInWorkThreads(void* taosCon, struct arguments *args) { pthread_attr_t thattr; SThreadParaObj *pThread; @@ -2116,11 +2186,7 @@ static void taosStartDumpInWorkThreads(struct arguments *args) pThread = threadObj + t; pThread->threadIndex = t; pThread->totalThreads = totalThreads; - pThread->taosCon = taos_connect(args->host, args->user, args->password, NULL, args->port); - if (pThread->taosCon == NULL) { - fprintf(stderr, "ERROR: thread:%d failed connect to TDengine, reason:%s\n", pThread->threadIndex, taos_errstr(NULL)); - exit(0); - } + pThread->taosCon = taosCon; pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); @@ -2169,7 +2235,7 @@ int taosDumpIn(struct arguments *arguments) { taosDumpInOneFile(taos, fp, tsfCharset, arguments->encode, tsDbSqlFile); } - taosStartDumpInWorkThreads(arguments); + taosStartDumpInWorkThreads(taos, arguments); taos_close(taos); taosFreeSQLFiles();