提交 71cd6058 编写于 作者: H Hui Li

[TD-2410]

上级 5658ac88
...@@ -14,6 +14,9 @@ ...@@ -14,6 +14,9 @@
*/ */
#include <iconv.h> #include <iconv.h>
#include <sys/stat.h>
#include <sys/syscall.h>
#include "os.h" #include "os.h"
#include "taos.h" #include "taos.h"
#include "taosdef.h" #include "taosdef.h"
...@@ -366,6 +369,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { ...@@ -366,6 +369,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
static struct argp argp = {options, parse_opt, args_doc, doc}; static struct argp argp = {options, parse_opt, args_doc, doc};
static resultStatistics g_resultStatistics = {0}; static resultStatistics g_resultStatistics = {0};
static FILE *g_fpOfResult = NULL; static FILE *g_fpOfResult = NULL;
static int g_numOfCores = 1;
int taosDumpOut(struct arguments *arguments); int taosDumpOut(struct arguments *arguments);
int taosDumpIn(struct arguments *arguments); int taosDumpIn(struct arguments *arguments);
...@@ -378,7 +382,7 @@ int32_t taosDumpTable(char *table, char *metric, struct arguments *arguments, FI ...@@ -378,7 +382,7 @@ int32_t taosDumpTable(char *table, char *metric, struct arguments *arguments, FI
int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments, TAOS* taosCon, char* dbName); int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments, TAOS* taosCon, char* dbName);
int taosCheckParam(struct arguments *arguments); int taosCheckParam(struct arguments *arguments);
void taosFreeDbInfos(); void taosFreeDbInfos();
static void taosStartDumpOutWorkThreads(struct arguments* args, int32_t numOfThread, char *dbName); static void taosStartDumpOutWorkThreads(void* taosCon, struct arguments* args, int32_t numOfThread, char *dbName);
struct arguments tsArguments = { struct arguments tsArguments = {
// connection option // connection option
...@@ -540,6 +544,8 @@ int main(int argc, char *argv[]) { ...@@ -540,6 +544,8 @@ int main(int argc, char *argv[]) {
} }
} }
g_numOfCores = (int32_t)sysconf(_SC_NPROCESSORS_ONLN);
time_t tTime = time(NULL); time_t tTime = time(NULL);
struct tm tm = *localtime(&tTime); struct tm tm = *localtime(&tTime);
...@@ -692,7 +698,7 @@ int32_t taosSaveTableOfMetricToTempFile(TAOS *taosCon, char* metric, struct argu ...@@ -692,7 +698,7 @@ int32_t taosSaveTableOfMetricToTempFile(TAOS *taosCon, char* metric, struct argu
sprintf(tmpCommand, "select tbname from %s", metric); sprintf(tmpCommand, "select tbname from %s", metric);
TAOS_RES *result = taos_query(taosCon, tmpCommand); TAOS_RES *result = taos_query(taosCon, tmpCommand);
int32_t code = taos_errno(result); int32_t code = taos_errno(result);
if (code != 0) { if (code != 0) {
fprintf(stderr, "failed to run command %s\n", tmpCommand); fprintf(stderr, "failed to run command %s\n", tmpCommand);
...@@ -701,6 +707,21 @@ int32_t taosSaveTableOfMetricToTempFile(TAOS *taosCon, char* metric, struct argu ...@@ -701,6 +707,21 @@ int32_t taosSaveTableOfMetricToTempFile(TAOS *taosCon, char* metric, struct argu
return -1; return -1;
} }
int table_batch = arguments->table_batch;
int affectdRows = taos_affected_rows(result);
if (affectdRows <= 0) {
free(tmpCommand);
taos_free_result(result);
return -1;
}
int maxNumOfThread = affectdRows / table_batch + 1;
if (maxNumOfThread > 2 * g_numOfCores) {
maxNumOfThread = 2 * g_numOfCores;
}
table_batch = affectdRows / maxNumOfThread + 1;
TAOS_FIELD *fields = taos_fetch_fields(result); TAOS_FIELD *fields = taos_fetch_fields(result);
int32_t numOfTable = 0; int32_t numOfTable = 0;
...@@ -733,7 +754,7 @@ int32_t taosSaveTableOfMetricToTempFile(TAOS *taosCon, char* metric, struct argu ...@@ -733,7 +754,7 @@ int32_t taosSaveTableOfMetricToTempFile(TAOS *taosCon, char* metric, struct argu
numOfTable++; numOfTable++;
if (numOfTable >= arguments->table_batch) { if (numOfTable >= table_batch) {
numOfTable = 0; numOfTable = 0;
close(fd); close(fd);
fd = -1; fd = -1;
...@@ -946,7 +967,7 @@ int taosDumpOut(struct arguments *arguments) { ...@@ -946,7 +967,7 @@ int taosDumpOut(struct arguments *arguments) {
} }
// start multi threads to dumpout // start multi threads to dumpout
taosStartDumpOutWorkThreads(arguments, totalNumOfThread, dbInfos[0]->name); taosStartDumpOutWorkThreads(taos, arguments, totalNumOfThread, dbInfos[0]->name);
char tmpFileName[TSDB_FILENAME_LEN + 1]; char tmpFileName[TSDB_FILENAME_LEN + 1];
_clean_tmp_file: _clean_tmp_file:
...@@ -1181,34 +1202,34 @@ void* taosDumpOutWorkThreadFp(void *arg) ...@@ -1181,34 +1202,34 @@ void* taosDumpOutWorkThreadFp(void *arg)
STableRecord tableRecord; STableRecord tableRecord;
int fd; int fd;
char tmpFileName[TSDB_FILENAME_LEN*4] = {0}; char tmpBuf[TSDB_FILENAME_LEN*4] = {0};
sprintf(tmpFileName, ".tables.tmp.%d", pThread->threadIndex); sprintf(tmpBuf, ".tables.tmp.%d", pThread->threadIndex);
fd = open(tmpFileName, O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH); fd = open(tmpBuf, O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
if (fd == -1) { if (fd == -1) {
fprintf(stderr, "taosDumpTableFp() failed to open temp file: %s\n", tmpFileName); fprintf(stderr, "taosDumpTableFp() failed to open temp file: %s\n", tmpBuf);
return NULL; return NULL;
} }
FILE *fp = NULL; FILE *fp = NULL;
memset(tmpFileName, 0, TSDB_FILENAME_LEN + 128); memset(tmpBuf, 0, TSDB_FILENAME_LEN + 128);
if (tsArguments.outpath[0] != 0) { if (tsArguments.outpath[0] != 0) {
sprintf(tmpFileName, "%s/%s.tables.%d.sql", tsArguments.outpath, pThread->dbName, pThread->threadIndex); sprintf(tmpBuf, "%s/%s.tables.%d.sql", tsArguments.outpath, pThread->dbName, pThread->threadIndex);
} else { } else {
sprintf(tmpFileName, "%s.tables.%d.sql", pThread->dbName, pThread->threadIndex); sprintf(tmpBuf, "%s.tables.%d.sql", pThread->dbName, pThread->threadIndex);
} }
fp = fopen(tmpFileName, "w"); fp = fopen(tmpBuf, "w");
if (fp == NULL) { if (fp == NULL) {
fprintf(stderr, "failed to open file %s\n", tmpFileName); fprintf(stderr, "failed to open file %s\n", tmpBuf);
close(fd); close(fd);
return NULL; return NULL;
} }
memset(tmpFileName, 0, TSDB_FILENAME_LEN); memset(tmpBuf, 0, TSDB_FILENAME_LEN);
sprintf(tmpFileName, "use %s", pThread->dbName); sprintf(tmpBuf, "use %s", pThread->dbName);
TAOS_RES* tmpResult = taos_query(pThread->taosCon, tmpFileName); TAOS_RES* tmpResult = taos_query(pThread->taosCon, tmpBuf);
int32_t code = taos_errno(tmpResult); int32_t code = taos_errno(tmpResult);
if (code != 0) { if (code != 0) {
fprintf(stderr, "invalid database %s\n", pThread->dbName); fprintf(stderr, "invalid database %s\n", pThread->dbName);
...@@ -1218,6 +1239,8 @@ void* taosDumpOutWorkThreadFp(void *arg) ...@@ -1218,6 +1239,8 @@ void* taosDumpOutWorkThreadFp(void *arg)
return NULL; return NULL;
} }
int fileNameIndex = 1;
int tablesInOneFile = 0;
int64_t lastRowsPrint = 5000000; int64_t lastRowsPrint = 5000000;
fprintf(fp, "USE %s;\n\n", pThread->dbName); fprintf(fp, "USE %s;\n\n", pThread->dbName);
while (1) { while (1) {
...@@ -1234,6 +1257,28 @@ void* taosDumpOutWorkThreadFp(void *arg) ...@@ -1234,6 +1257,28 @@ void* taosDumpOutWorkThreadFp(void *arg)
printf(" %"PRId64 " rows already be dumpout from database %s\n", pThread->rowsOfDumpOut, pThread->dbName); printf(" %"PRId64 " rows already be dumpout from database %s\n", pThread->rowsOfDumpOut, pThread->dbName);
lastRowsPrint += 5000000; lastRowsPrint += 5000000;
} }
tablesInOneFile++;
if (tablesInOneFile >= tsArguments.table_batch) {
fclose(fp);
tablesInOneFile = 0;
memset(tmpBuf, 0, TSDB_FILENAME_LEN + 128);
if (tsArguments.outpath[0] != 0) {
sprintf(tmpBuf, "%s/%s.tables.%d-%d.sql", tsArguments.outpath, pThread->dbName, pThread->threadIndex, fileNameIndex);
} else {
sprintf(tmpBuf, "%s.tables.%d-%d.sql", pThread->dbName, pThread->threadIndex, fileNameIndex);
}
fileNameIndex++;
fp = fopen(tmpBuf, "w");
if (fp == NULL) {
fprintf(stderr, "failed to open file %s\n", tmpBuf);
close(fd);
taos_free_result(tmpResult);
return NULL;
}
}
} }
} }
...@@ -1244,7 +1289,7 @@ void* taosDumpOutWorkThreadFp(void *arg) ...@@ -1244,7 +1289,7 @@ void* taosDumpOutWorkThreadFp(void *arg)
return NULL; return NULL;
} }
static void taosStartDumpOutWorkThreads(struct arguments* args, int32_t numOfThread, char *dbName) static void taosStartDumpOutWorkThreads(void* taosCon, struct arguments* args, int32_t numOfThread, char *dbName)
{ {
pthread_attr_t thattr; pthread_attr_t thattr;
SThreadParaObj *threadObj = (SThreadParaObj *)calloc(numOfThread, sizeof(SThreadParaObj)); SThreadParaObj *threadObj = (SThreadParaObj *)calloc(numOfThread, sizeof(SThreadParaObj));
...@@ -1255,12 +1300,7 @@ static void taosStartDumpOutWorkThreads(struct arguments* args, int32_t numOfTh ...@@ -1255,12 +1300,7 @@ static void taosStartDumpOutWorkThreads(struct arguments* args, int32_t numOfTh
pThread->threadIndex = t; pThread->threadIndex = t;
pThread->totalThreads = numOfThread; pThread->totalThreads = numOfThread;
tstrncpy(pThread->dbName, dbName, TSDB_TABLE_NAME_LEN); tstrncpy(pThread->dbName, dbName, TSDB_TABLE_NAME_LEN);
pThread->taosCon = taos_connect(args->host, args->user, args->password, NULL, args->port); pThread->taosCon = taosCon;
if (pThread->taosCon == NULL) {
fprintf(stderr, "ERROR: thread:%d failed connect to TDengine, reason:%s\n", pThread->threadIndex, taos_errstr(NULL));
exit(0);
}
pthread_attr_init(&thattr); pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
...@@ -1279,7 +1319,6 @@ static void taosStartDumpOutWorkThreads(struct arguments* args, int32_t numOfTh ...@@ -1279,7 +1319,6 @@ static void taosStartDumpOutWorkThreads(struct arguments* args, int32_t numOfTh
int64_t totalRowsOfDumpOut = 0; int64_t totalRowsOfDumpOut = 0;
int64_t totalChildTblsOfDumpOut = 0; int64_t totalChildTblsOfDumpOut = 0;
for (int32_t t = 0; t < numOfThread; ++t) { for (int32_t t = 0; t < numOfThread; ++t) {
taos_close(threadObj[t].taosCon);
totalChildTblsOfDumpOut += threadObj[t].tablesOfDumpOut; totalChildTblsOfDumpOut += threadObj[t].tablesOfDumpOut;
totalRowsOfDumpOut += threadObj[t].rowsOfDumpOut; totalRowsOfDumpOut += threadObj[t].rowsOfDumpOut;
} }
...@@ -1404,22 +1443,36 @@ int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp, TAOS *tao ...@@ -1404,22 +1443,36 @@ int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp, TAOS *tao
return -1; return -1;
} }
int table_batch = arguments->table_batch;
int affectdRows = taos_affected_rows(res);
if (affectdRows <= 0) {
taos_free_result(res);
return -1;
}
int maxNumOfThread = affectdRows / table_batch + 1;
if (maxNumOfThread > 2 * g_numOfCores) {
maxNumOfThread = 2 * g_numOfCores;
}
table_batch = affectdRows / maxNumOfThread + 1;
TAOS_FIELD *fields = taos_fetch_fields(res); TAOS_FIELD *fields = taos_fetch_fields(res);
int32_t numOfTable = 0; int32_t numOfTable = 0;
int32_t numOfThread = 0; int32_t numOfThread = 0;
char tmpFileName[TSDB_FILENAME_LEN + 1]; char tmpBuf[TSDB_FILENAME_LEN + 1];
while ((row = taos_fetch_row(res)) != NULL) { while ((row = taos_fetch_row(res)) != NULL) {
if (0 == numOfTable) { if (0 == numOfTable) {
memset(tmpFileName, 0, TSDB_FILENAME_LEN); memset(tmpBuf, 0, TSDB_FILENAME_LEN);
sprintf(tmpFileName, ".tables.tmp.%d", numOfThread); sprintf(tmpBuf, ".tables.tmp.%d", numOfThread);
fd = open(tmpFileName, O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH); fd = open(tmpBuf, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
if (fd == -1) { if (fd == -1) {
fprintf(stderr, "failed to open temp file: %s\n", tmpFileName); fprintf(stderr, "failed to open temp file: %s\n", tmpBuf);
taos_free_result(res); taos_free_result(res);
for (int32_t loopCnt = 0; loopCnt < numOfThread; loopCnt++) { for (int32_t loopCnt = 0; loopCnt < numOfThread; loopCnt++) {
sprintf(tmpFileName, ".tables.tmp.%d", loopCnt); sprintf(tmpBuf, ".tables.tmp.%d", loopCnt);
(void)remove(tmpFileName); (void)remove(tmpBuf);
} }
return -1; return -1;
} }
...@@ -1435,7 +1488,7 @@ int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp, TAOS *tao ...@@ -1435,7 +1488,7 @@ int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp, TAOS *tao
numOfTable++; numOfTable++;
if (numOfTable >= arguments->table_batch) { if (numOfTable >= table_batch) {
numOfTable = 0; numOfTable = 0;
close(fd); close(fd);
fd = -1; fd = -1;
...@@ -1450,10 +1503,10 @@ int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp, TAOS *tao ...@@ -1450,10 +1503,10 @@ int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp, TAOS *tao
taos_free_result(res); taos_free_result(res);
// start multi threads to dumpout // start multi threads to dumpout
taosStartDumpOutWorkThreads(arguments, numOfThread, dbInfo->name); taosStartDumpOutWorkThreads(taosCon, arguments, numOfThread, dbInfo->name);
for (int loopCnt = 0; loopCnt < numOfThread; loopCnt++) { for (int loopCnt = 0; loopCnt < numOfThread; loopCnt++) {
sprintf(tmpFileName, ".tables.tmp.%d", loopCnt); sprintf(tmpBuf, ".tables.tmp.%d", loopCnt);
(void)remove(tmpFileName); (void)remove(tmpBuf);
} }
return 0; return 0;
...@@ -2118,7 +2171,7 @@ void* taosDumpInWorkThreadFp(void *arg) ...@@ -2118,7 +2171,7 @@ void* taosDumpInWorkThreadFp(void *arg)
return NULL; return NULL;
} }
static void taosStartDumpInWorkThreads(struct arguments *args) static void taosStartDumpInWorkThreads(void* taosCon, struct arguments *args)
{ {
pthread_attr_t thattr; pthread_attr_t thattr;
SThreadParaObj *pThread; SThreadParaObj *pThread;
...@@ -2133,11 +2186,7 @@ static void taosStartDumpInWorkThreads(struct arguments *args) ...@@ -2133,11 +2186,7 @@ static void taosStartDumpInWorkThreads(struct arguments *args)
pThread = threadObj + t; pThread = threadObj + t;
pThread->threadIndex = t; pThread->threadIndex = t;
pThread->totalThreads = totalThreads; pThread->totalThreads = totalThreads;
pThread->taosCon = taos_connect(args->host, args->user, args->password, NULL, args->port); pThread->taosCon = taosCon;
if (pThread->taosCon == NULL) {
fprintf(stderr, "ERROR: thread:%d failed connect to TDengine, reason:%s\n", pThread->threadIndex, taos_errstr(NULL));
exit(0);
}
pthread_attr_init(&thattr); pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
...@@ -2186,7 +2235,7 @@ int taosDumpIn(struct arguments *arguments) { ...@@ -2186,7 +2235,7 @@ int taosDumpIn(struct arguments *arguments) {
taosDumpInOneFile(taos, fp, tsfCharset, arguments->encode, tsDbSqlFile); taosDumpInOneFile(taos, fp, tsfCharset, arguments->encode, tsDbSqlFile);
} }
taosStartDumpInWorkThreads(arguments); taosStartDumpInWorkThreads(taos, arguments);
taos_close(taos); taos_close(taos);
taosFreeSQLFiles(); taosFreeSQLFiles();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册