未验证 提交 46f5a490 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

Hotfix/sangshuduo/td 6479 taosdump db seq (#7994)

* [TD-6479]<fix>: taosdump cmdline align for database sequence.

adopt 'taosdump -D db1,db2,...' format.

* fix core dump issue.

* refactor in progress

* heavily refactor in progress.

* change dumped filename back to thread index to avoid exceed file system limit.
上级 34527673
......@@ -25,7 +25,6 @@
#include "tsclient.h"
#include "tsdb.h"
#include "tutil.h"
#include <taos.h>
#define TSDB_SUPPORT_NANOSECOND 1
......@@ -120,7 +119,7 @@ enum _show_tables_index {
TSDB_MAX_SHOW_TABLES
};
// ---------------------------------- DESCRIBE METRIC CONFIGURE ------------------------------
// ---------------------------------- DESCRIBE STABLE CONFIGURE ------------------------------
enum _describe_table_index {
TSDB_DESCRIBE_METRIC_FIELD_INDEX,
TSDB_DESCRIBE_METRIC_TYPE_INDEX,
......@@ -148,10 +147,28 @@ extern char version[];
#define DB_PRECISION_LEN 8
#define DB_STATUS_LEN 16
typedef struct {
char name[TSDB_TABLE_NAME_LEN];
bool belongStb;
char stable[TSDB_TABLE_NAME_LEN];
} TableInfo;
typedef struct {
char name[TSDB_TABLE_NAME_LEN];
char stable[TSDB_TABLE_NAME_LEN];
} TableRecord;
typedef struct {
bool isStable;
int64_t dumpNtbCount;
TableRecord **dumpNtbInfos;
TableRecord tableRecord;
} TableRecordInfo;
typedef struct {
char name[TSDB_DB_NAME_LEN];
char create_time[32];
int32_t ntables;
int64_t ntables;
int32_t vgroups;
int16_t replica;
int16_t quorum;
......@@ -171,28 +188,22 @@ typedef struct {
char precision[DB_PRECISION_LEN]; // time resolution
int8_t update;
char status[DB_STATUS_LEN];
int64_t dumpTbCount;
TableRecordInfo **dumpTbInfos;
} SDbInfo;
typedef struct {
char name[TSDB_TABLE_NAME_LEN];
char metric[TSDB_TABLE_NAME_LEN];
} STableRecord;
typedef struct {
bool isMetric;
STableRecord tableRecord;
} STableRecordInfo;
typedef struct {
pthread_t threadID;
int32_t threadIndex;
int32_t totalThreads;
char dbName[TSDB_DB_NAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN];
int precision;
void *taosCon;
TAOS *taos;
int64_t rowsOfDumpOut;
int64_t tablesOfDumpOut;
} SThreadParaObj;
int64_t tableFrom;
} threadInfo;
typedef struct {
int64_t totalRowsOfDumpOut;
......@@ -204,6 +215,7 @@ typedef struct {
static int64_t g_totalDumpOutRows = 0;
SDbInfo **g_dbInfos = NULL;
TableInfo *g_tablesList = NULL;
const char *argp_program_version = version;
const char *argp_program_bug_address = "<support@taosdata.com>";
......@@ -306,7 +318,7 @@ typedef struct arguments {
bool verbose_print;
bool performance_print;
int dbCount;
int dumpDbCount;
} SArguments;
/* Our argp parser. */
......@@ -321,25 +333,21 @@ static int taosDumpOut();
static int taosDumpIn();
static void taosDumpCreateDbClause(SDbInfo *dbInfo, bool isDumpProperty,
FILE *fp);
static int taosDumpDb(SDbInfo *dbInfo, FILE *fp, TAOS *taosCon);
static int32_t taosDumpStable(char *table, FILE *fp, TAOS* taosCon,
char* dbName);
static void taosDumpCreateTableClause(STableDef *tableDes, int numOfCols,
//static int taosDumpDb(SDbInfo *dbInfo, FILE *fp, TAOS *taos);
static int dumpStable(char *table, FILE *fp, TAOS* taos,
SDbInfo *dbInfo);
static int taosDumpCreateTableClause(STableDef *tableDes, int numOfCols,
FILE *fp, char* dbName);
static void taosDumpCreateMTableClause(STableDef *tableDes, char *metric,
static void taosDumpCreateMTableClause(STableDef *tableDes, char *stable,
int numOfCols, FILE *fp, char* dbName);
static int32_t taosDumpTable(char *tbName, char *metric,
FILE *fp, TAOS* taosCon, char* dbName, int precision);
static int taosDumpTableData(FILE *fp, char *tbName,
TAOS* taosCon, char* dbName,
static int64_t taosDumpTable(char *tbName, char *stable,
FILE *fp, TAOS* taos, char* dbName, int precision);
static int64_t taosDumpTableData(FILE *fp, char *tbName,
TAOS* taos, char* dbName,
int precision,
char *jsonAvroSchema);
static int taosCheckParam(struct arguments *arguments);
static void taosFreeDbInfos();
static void taosStartDumpOutWorkThreads(
int32_t numOfThread,
char *dbName,
int precision);
struct arguments g_args = {
// connection option
......@@ -383,7 +391,7 @@ struct arguments g_args = {
false, // debug_print
false, // verbose_print
false, // performance_print
0, // dbCount
0, // dumpDbCount
};
// get taosdump commit number version
......@@ -585,11 +593,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
}
static int queryDbImpl(TAOS *taos, char *command) {
int i;
TAOS_RES *res = NULL;
int32_t code = -1;
for (i = 0; i < 5; i++) {
if (NULL != res) {
taos_free_result(res);
res = NULL;
......@@ -597,54 +603,19 @@ static int queryDbImpl(TAOS *taos, char *command) {
res = taos_query(taos, command);
code = taos_errno(res);
if (0 == code) {
break;
}
}
if (code != 0) {
errorPrint("Failed to run <%s>, reason: %s\n", command, taos_errstr(res));
errorPrint("Failed to run <%s>, reason: %s\n",
command, taos_errstr(res));
taos_free_result(res);
//taos_close(taos);
return -1;
return code;
}
taos_free_result(res);
return 0;
}
UNUSED_FUNC static void parse_precision_first(
int argc, char *argv[], SArguments *arguments) {
for (int i = 1; i < argc; i++) {
if (strcmp(argv[i], "-C") == 0) {
if (NULL == argv[i+1]) {
errorPrint("%s need a valid value following!\n", argv[i]);
exit(-1);
}
char *tmp = strdup(argv[i+1]);
if (tmp == NULL) {
errorPrint("%s() LN%d, strdup() cannot allocate memory\n",
__func__, __LINE__);
exit(-1);
}
if ((0 != strncasecmp(tmp, "ms", strlen("ms")))
&& (0 != strncasecmp(tmp, "us", strlen("us")))
#if TSDB_SUPPORT_NANOSECOND == 1
&& (0 != strncasecmp(tmp, "ns", strlen("ns")))
#endif
) {
//
errorPrint("input precision: %s is invalid value\n", tmp);
free(tmp);
exit(-1);
}
tstrncpy(g_args.precision, tmp,
min(DB_PRECISION_LEN, strlen(tmp) + 1));
free(tmp);
}
}
}
static void parse_args(
int argc, char *argv[], SArguments *arguments) {
......@@ -779,245 +750,42 @@ static int getPrecisionByString(char *precision)
return -1;
}
/*
static void parse_timestamp(
int argc, char *argv[], SArguments *arguments) {
for (int i = 1; i < argc; i++) {
if ((strcmp(argv[i], "-S") == 0)
|| (strcmp(argv[i], "-E") == 0)) {
if (NULL == argv[i+1]) {
errorPrint("%s need a valid value following!\n", argv[i]);
exit(-1);
}
char *tmp = strdup(argv[i+1]);
if (NULL == tmp) {
errorPrint("%s() LN%d, strdup() cannot allocate memory\n",
__func__, __LINE__);
exit(-1);
}
int64_t tmpEpoch;
if (strchr(tmp, ':') && strchr(tmp, '-')) {
strcpy(g_args.humanStartTime, tmp)
int32_t timePrec;
if (0 == strncasecmp(arguments->precision,
"ms", strlen("ms"))) {
timePrec = TSDB_TIME_PRECISION_MILLI;
} else if (0 == strncasecmp(arguments->precision,
"us", strlen("us"))) {
timePrec = TSDB_TIME_PRECISION_MICRO;
#if TSDB_SUPPORT_NANOSECOND == 1
} else if (0 == strncasecmp(arguments->precision,
"ns", strlen("ns"))) {
timePrec = TSDB_TIME_PRECISION_NANO;
#endif
} else {
errorPrint("Invalid time precision: %s",
arguments->precision);
free(tmp);
return;
}
if (TSDB_CODE_SUCCESS != taosParseTime(
tmp, &tmpEpoch, strlen(tmp),
timePrec, 0)) {
errorPrint("Input %s, end time error!\n", tmp);
free(tmp);
return;
}
} else {
tstrncpy(arguments->precision, "n/a", strlen("n/a") + 1);
tmpEpoch = atoll(tmp);
}
sprintf(argv[i+1], "%"PRId64"", tmpEpoch);
debugPrint("%s() LN%d, tmp is: %s, argv[%d]: %s\n",
__func__, __LINE__, tmp, i, argv[i]);
free(tmp);
}
}
}
*/
int main(int argc, char *argv[]) {
static char verType[32] = {0};
sprintf(verType, "version: %s\n", version);
argp_program_version = verType;
int ret = 0;
/* Parse our arguments; every option seen by parse_opt will be
reflected in arguments. */
if (argc > 1) {
// parse_precision_first(argc, argv, &g_args);
parse_timestamp(argc, argv, &g_args);
parse_args(argc, argv, &g_args);
}
argp_parse(&argp, argc, argv, 0, 0, &g_args);
if (g_args.abort) {
#ifndef _ALPINE
error(10, 0, "ABORTED");
#else
abort();
#endif
}
printf("====== arguments config ======\n");
{
printf("host: %s\n", g_args.host);
printf("user: %s\n", g_args.user);
printf("password: %s\n", g_args.password);
printf("port: %u\n", g_args.port);
printf("mysqlFlag: %d\n", g_args.mysqlFlag);
printf("outpath: %s\n", g_args.outpath);
printf("inpath: %s\n", g_args.inpath);
printf("resultFile: %s\n", g_args.resultFile);
printf("encode: %s\n", g_args.encode);
printf("all_databases: %s\n", g_args.all_databases?"true":"false");
printf("databases: %d\n", g_args.databases);
printf("databasesSeq: %s\n", g_args.databasesSeq);
printf("schemaonly: %s\n", g_args.schemaonly?"true":"false");
printf("with_property: %s\n", g_args.with_property?"true":"false");
printf("avro format: %s\n", g_args.avro?"true":"false");
printf("start_time: %" PRId64 "\n", g_args.start_time);
printf("human readable start time: %s \n", g_args.humanStartTime);
printf("end_time: %" PRId64 "\n", g_args.end_time);
printf("human readable end time: %s \n", g_args.humanEndTime);
printf("precision: %s\n", g_args.precision);
printf("data_batch: %d\n", g_args.data_batch);
printf("max_sql_len: %d\n", g_args.max_sql_len);
printf("table_batch: %d\n", g_args.table_batch);
printf("thread_num: %d\n", g_args.thread_num);
printf("allow_sys: %d\n", g_args.allow_sys);
printf("abort: %d\n", g_args.abort);
printf("isDumpIn: %d\n", g_args.isDumpIn);
printf("arg_list_len: %d\n", g_args.arg_list_len);
printf("debug_print: %d\n", g_args.debug_print);
for (int32_t i = 0; i < g_args.arg_list_len; i++) {
printf("arg_list[%d]: %s\n", i, g_args.arg_list[i]);
}
}
printf("==============================\n");
if (taosCheckParam(&g_args) < 0) {
exit(EXIT_FAILURE);
}
g_fpOfResult = fopen(g_args.resultFile, "a");
if (NULL == g_fpOfResult) {
errorPrint("Failed to open %s for save result\n", g_args.resultFile);
exit(-1);
};
fprintf(g_fpOfResult, "#############################################################################\n");
fprintf(g_fpOfResult, "============================== arguments config =============================\n");
{
fprintf(g_fpOfResult, "host: %s\n", g_args.host);
fprintf(g_fpOfResult, "user: %s\n", g_args.user);
fprintf(g_fpOfResult, "password: %s\n", g_args.password);
fprintf(g_fpOfResult, "port: %u\n", g_args.port);
fprintf(g_fpOfResult, "mysqlFlag: %d\n", g_args.mysqlFlag);
fprintf(g_fpOfResult, "outpath: %s\n", g_args.outpath);
fprintf(g_fpOfResult, "inpath: %s\n", g_args.inpath);
fprintf(g_fpOfResult, "resultFile: %s\n", g_args.resultFile);
fprintf(g_fpOfResult, "encode: %s\n", g_args.encode);
fprintf(g_fpOfResult, "all_databases: %s\n", g_args.all_databases?"true":"false");
fprintf(g_fpOfResult, "databases: %d\n", g_args.databases);
fprintf(g_fpOfResult, "databasesSeq: %s\n", g_args.databasesSeq);
fprintf(g_fpOfResult, "schemaonly: %s\n", g_args.schemaonly?"true":"false");
fprintf(g_fpOfResult, "with_property: %s\n", g_args.with_property?"true":"false");
fprintf(g_fpOfResult, "avro format: %s\n", g_args.avro?"true":"false");
fprintf(g_fpOfResult, "start_time: %" PRId64 "\n", g_args.start_time);
fprintf(g_fpOfResult, "human readable start time: %s \n", g_args.humanStartTime);
fprintf(g_fpOfResult, "end_time: %" PRId64 "\n", g_args.end_time);
fprintf(g_fpOfResult, "human readable end time: %s \n", g_args.humanEndTime);
fprintf(g_fpOfResult, "precision: %s\n", g_args.precision);
fprintf(g_fpOfResult, "data_batch: %d\n", g_args.data_batch);
fprintf(g_fpOfResult, "max_sql_len: %d\n", g_args.max_sql_len);
fprintf(g_fpOfResult, "table_batch: %d\n", g_args.table_batch);
fprintf(g_fpOfResult, "thread_num: %d\n", g_args.thread_num);
fprintf(g_fpOfResult, "allow_sys: %d\n", g_args.allow_sys);
fprintf(g_fpOfResult, "abort: %d\n", g_args.abort);
fprintf(g_fpOfResult, "isDumpIn: %d\n", g_args.isDumpIn);
fprintf(g_fpOfResult, "arg_list_len: %d\n", g_args.arg_list_len);
for (int32_t i = 0; i < g_args.arg_list_len; i++) {
fprintf(g_fpOfResult, "arg_list[%d]: %s\n", i, g_args.arg_list[i]);
}
}
g_numOfCores = (int32_t)sysconf(_SC_NPROCESSORS_ONLN);
time_t tTime = time(NULL);
struct tm tm = *localtime(&tTime);
if (g_args.isDumpIn) {
fprintf(g_fpOfResult, "============================== DUMP IN ============================== \n");
fprintf(g_fpOfResult, "# DumpIn start time: %d-%02d-%02d %02d:%02d:%02d\n",
tm.tm_year + 1900, tm.tm_mon + 1,
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
if (taosDumpIn() < 0) {
ret = -1;
}
} else {
fprintf(g_fpOfResult, "============================== DUMP OUT ============================== \n");
fprintf(g_fpOfResult, "# DumpOut start time: %d-%02d-%02d %02d:%02d:%02d\n",
tm.tm_year + 1900, tm.tm_mon + 1,
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
if (taosDumpOut() < 0) {
ret = -1;
} else {
fprintf(g_fpOfResult, "\n============================== TOTAL STATISTICS ============================== \n");
fprintf(g_fpOfResult, "# total database count: %d\n",
g_resultStatistics.totalDatabasesOfDumpOut);
fprintf(g_fpOfResult, "# total super table count: %d\n",
g_resultStatistics.totalSuperTblsOfDumpOut);
fprintf(g_fpOfResult, "# total child table count: %"PRId64"\n",
g_resultStatistics.totalChildTblsOfDumpOut);
fprintf(g_fpOfResult, "# total row count: %"PRId64"\n",
g_resultStatistics.totalRowsOfDumpOut);
}
}
fprintf(g_fpOfResult, "\n");
fclose(g_fpOfResult);
return ret;
}
static void taosFreeDbInfos() {
if (g_dbInfos == NULL) return;
for (int i = 0; i < g_args.dbCount; i++)
for (int i = 0; i < g_args.dumpDbCount; i++)
tfree(g_dbInfos[i]);
tfree(g_dbInfos);
}
// check table is normal table or super table
static int taosGetTableRecordInfo(
char *table, STableRecordInfo *pTableRecordInfo, TAOS *taosCon) {
char *dbName,
char *table, TableRecordInfo *pTableRecordInfo, TAOS *taos) {
TAOS_ROW row = NULL;
bool isSet = false;
TAOS_RES *result = NULL;
memset(pTableRecordInfo, 0, sizeof(STableRecordInfo));
memset(pTableRecordInfo, 0, sizeof(TableRecordInfo));
char* tempCommand = (char *)malloc(COMMAND_SIZE);
if (tempCommand == NULL) {
errorPrint("%s() LN%d, failed to allocate memory\n",
__func__, __LINE__);
return -1;
char command[COMMAND_SIZE];
sprintf(command, "USE %s", dbName);
result = taos_query(taos, command);
int32_t code = taos_errno(result);
if (code != 0) {
errorPrint("invalid database %s, reason: %s\n",
dbName, taos_errstr(result));
return 0;
}
sprintf(tempCommand, "show tables like %s", table);
sprintf(command, "SHOW TABLES LIKE \'%s\'", table);
result = taos_query(taosCon, tempCommand);
int32_t code = taos_errno(result);
result = taos_query(taos, command);
code = taos_errno(result);
if (code != 0) {
errorPrint("%s() LN%d, failed to run command %s\n",
__func__, __LINE__, tempCommand);
free(tempCommand);
errorPrint("%s() LN%d, failed to run command <%s>. reason: %s\n",
__func__, __LINE__, command, taos_errstr(result));
taos_free_result(result);
return -1;
}
......@@ -1026,12 +794,12 @@ static int taosGetTableRecordInfo(
while ((row = taos_fetch_row(result)) != NULL) {
isSet = true;
pTableRecordInfo->isMetric = false;
pTableRecordInfo->isStable = false;
tstrncpy(pTableRecordInfo->tableRecord.name,
(char *)row[TSDB_SHOW_TABLES_NAME_INDEX],
min(TSDB_TABLE_NAME_LEN,
fields[TSDB_SHOW_TABLES_NAME_INDEX].bytes + 1));
tstrncpy(pTableRecordInfo->tableRecord.metric,
tstrncpy(pTableRecordInfo->tableRecord.stable,
(char *)row[TSDB_SHOW_TABLES_METRIC_INDEX],
min(TSDB_TABLE_NAME_LEN,
fields[TSDB_SHOW_TABLES_METRIC_INDEX].bytes + 1));
......@@ -1042,27 +810,25 @@ static int taosGetTableRecordInfo(
result = NULL;
if (isSet) {
free(tempCommand);
return 0;
}
sprintf(tempCommand, "show stables like %s", table);
sprintf(command, "SHOW STABLES LIKE \'%s\'", table);
result = taos_query(taosCon, tempCommand);
result = taos_query(taos, command);
code = taos_errno(result);
if (code != 0) {
errorPrint("%s() LN%d, failed to run command %s\n",
__func__, __LINE__, tempCommand);
free(tempCommand);
errorPrint("%s() LN%d, failed to run command <%s>. reason: %s\n",
__func__, __LINE__, command, taos_errstr(result));
taos_free_result(result);
return -1;
}
while ((row = taos_fetch_row(result)) != NULL) {
isSet = true;
pTableRecordInfo->isMetric = true;
tstrncpy(pTableRecordInfo->tableRecord.metric, table,
pTableRecordInfo->isStable = true;
tstrncpy(pTableRecordInfo->tableRecord.stable, table,
TSDB_TABLE_NAME_LEN);
break;
}
......@@ -1071,248 +837,540 @@ static int taosGetTableRecordInfo(
result = NULL;
if (isSet) {
free(tempCommand);
return 0;
}
errorPrint("%s() LN%d, invalid table/metric %s\n",
errorPrint("%s() LN%d, invalid table/stable %s\n",
__func__, __LINE__, table);
free(tempCommand);
return -1;
}
static int32_t taosSaveAllNormalTableToTempFile(TAOS *taosCon, char*meter,
char* metric, int* fd) {
STableRecord tableRecord;
if (-1 == *fd) {
*fd = open(".tables.tmp.0",
O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
if (*fd == -1) {
errorPrint("%s() LN%d, failed to open temp file: .tables.tmp.0\n",
__func__, __LINE__);
return -1;
static int inDatabasesSeq(
char *name,
int len)
{
if (strstr(g_args.databasesSeq, ",") == NULL) {
if (0 == strncmp(g_args.databasesSeq, name, len)) {
return 0;
}
} else {
char *dupSeq = strdup(g_args.databasesSeq);
char *running = dupSeq;
char *dbname = strsep(&running, ",");
while (dbname) {
if (0 == strncmp(dbname, name, len)) {
tfree(dupSeq);
return 0;
}
memset(&tableRecord, 0, sizeof(STableRecord));
tstrncpy(tableRecord.name, meter, TSDB_TABLE_NAME_LEN);
tstrncpy(tableRecord.metric, metric, TSDB_TABLE_NAME_LEN);
dbname = strsep(&running, ",");
}
taosWrite(*fd, &tableRecord, sizeof(STableRecord));
return 0;
}
}
static int32_t taosSaveTableOfMetricToTempFile(
TAOS *taosCon, char* metric,
int32_t* totalNumOfThread) {
return -1;
}
static int getDumpDbCount()
{
int count = 0;
TAOS *taos = NULL;
TAOS_RES *result = NULL;
char *command = "show databases";
TAOS_ROW row;
int fd = -1;
STableRecord tableRecord;
char* tmpCommand = (char *)malloc(COMMAND_SIZE);
if (tmpCommand == NULL) {
errorPrint("%s() LN%d, failed to allocate memory\n", __func__, __LINE__);
return -1;
/* Connect to server */
taos = taos_connect(g_args.host, g_args.user, g_args.password,
NULL, g_args.port);
if (NULL == taos) {
errorPrint("Failed to connect to TDengine server %s\n", g_args.host);
return 0;
}
sprintf(tmpCommand, "select tbname from %s", metric);
result = taos_query(taos, command);
int32_t code = taos_errno(result);
TAOS_RES *res = taos_query(taosCon, tmpCommand);
int32_t code = taos_errno(res);
if (code != 0) {
errorPrint("%s() LN%d, failed to run command %s\n",
__func__, __LINE__, tmpCommand);
free(tmpCommand);
taos_free_result(res);
return -1;
if (0 != code) {
errorPrint("%s() LN%d, failed to run command <%s>, reason: %s\n",
__func__, __LINE__, command, taos_errstr(result));
return 0;
}
free(tmpCommand);
char tmpBuf[MAX_FILE_NAME_LEN];
memset(tmpBuf, 0, MAX_FILE_NAME_LEN);
sprintf(tmpBuf, ".select-tbname.tmp");
fd = open(tmpBuf, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
if (fd == -1) {
errorPrint("%s() LN%d, failed to open temp file: %s\n",
TAOS_FIELD *fields = taos_fetch_fields(result);
while ((row = taos_fetch_row(result)) != NULL) {
// sys database name : 'log', but subsequent version changed to 'log'
if ((strncasecmp(row[TSDB_SHOW_DB_NAME_INDEX], "log",
fields[TSDB_SHOW_DB_NAME_INDEX].bytes) == 0)
&& (!g_args.allow_sys)) {
continue;
}
if (g_args.databases) { // input multi dbs
if (inDatabasesSeq(
(char *)row[TSDB_SHOW_DB_NAME_INDEX],
fields[TSDB_SHOW_DB_NAME_INDEX].bytes) != 0)
continue;
} else if (!g_args.all_databases) { // only input one db
if (strncasecmp(g_args.arg_list[0],
(char *)row[TSDB_SHOW_DB_NAME_INDEX],
fields[TSDB_SHOW_DB_NAME_INDEX].bytes) != 0)
continue;
}
count++;
}
if (count == 0) {
errorPrint("%d databases valid to dump\n", count);
}
return count;
}
static int64_t dumpNormalTableWithoutStb(TAOS *taos, SDbInfo *dbInfo, char *ntbName)
{
int64_t count = 0;
char tmpBuf[4096] = {0};
FILE *fp = NULL;
if (g_args.outpath[0] != 0) {
sprintf(tmpBuf, "%s/%s.%s.sql",
g_args.outpath, dbInfo->name, ntbName);
} else {
sprintf(tmpBuf, "%s.%s.sql",
dbInfo->name, ntbName);
}
fp = fopen(tmpBuf, "w");
if (fp == NULL) {
errorPrint("%s() LN%d, failed to open file %s\n",
__func__, __LINE__, tmpBuf);
taos_free_result(res);
return -1;
}
TAOS_FIELD *fields = taos_fetch_fields(res);
count = taosDumpTable(ntbName, NULL,
fp, taos, dbInfo->name, getPrecisionByString(dbInfo->precision));
int32_t numOfTable = 0;
while ((row = taos_fetch_row(res)) != NULL) {
fclose(fp);
return count;
}
static int64_t dumpNormalTable(FILE *fp, TAOS *taos, char *dbName, char *tbName,
char *stbName,
int precision)
{
int64_t count = 0;
count = taosDumpTable(tbName, stbName,
fp, taos, dbName, precision);
return count;
}
static void *dumpNtbOfDb(void *arg) {
threadInfo *pThreadInfo = (threadInfo *)arg;
memset(&tableRecord, 0, sizeof(STableRecord));
tstrncpy(tableRecord.name, (char *)row[0], fields[0].bytes);
tstrncpy(tableRecord.metric, metric, TSDB_TABLE_NAME_LEN);
debugPrint("dump table from = \t%"PRId64"\n", pThreadInfo->tableFrom);
debugPrint("dump table count = \t%"PRId64"\n",
pThreadInfo->tablesOfDumpOut);
taosWrite(fd, &tableRecord, sizeof(STableRecord));
numOfTable++;
FILE *fp = NULL;
char tmpBuf[4096] = {0};
if (g_args.outpath[0] != 0) {
sprintf(tmpBuf, "%s/%s.%d.sql",
g_args.outpath, pThreadInfo->dbName, pThreadInfo->threadIndex);
} else {
sprintf(tmpBuf, "%s.%d.sql",
pThreadInfo->dbName, pThreadInfo->threadIndex);
}
fp = fopen(tmpBuf, "w");
if (fp == NULL) {
errorPrint("%s() LN%d, failed to open file %s\n",
__func__, __LINE__, tmpBuf);
return NULL;
}
for (int64_t i = 0; i < pThreadInfo->tablesOfDumpOut; i++) {
debugPrint("[%d] No.\t%"PRId64" table name: %s\n",
pThreadInfo->threadIndex, i,
((TableInfo *)(g_tablesList + pThreadInfo->tableFrom+i))->name);
dumpNormalTable(fp,
pThreadInfo->taos,
pThreadInfo->dbName,
((TableInfo *)(g_tablesList + pThreadInfo->tableFrom+i))->name,
((TableInfo *)(g_tablesList + pThreadInfo->tableFrom+i))->stable,
pThreadInfo->precision);
}
fclose(fp);
return NULL;
}
static void *dumpNormalTablesOfStb(void *arg) {
threadInfo *pThreadInfo = (threadInfo *)arg;
debugPrint("dump table from = \t%"PRId64"\n", pThreadInfo->tableFrom);
debugPrint("dump table count = \t%"PRId64"\n", pThreadInfo->tablesOfDumpOut);
char command[COMMAND_SIZE];
sprintf(command, "SELECT TBNAME FROM %s.%s LIMIT %"PRId64" OFFSET %"PRId64"",
pThreadInfo->dbName, pThreadInfo->stbName,
pThreadInfo->tablesOfDumpOut, pThreadInfo->tableFrom);
TAOS_RES *res = taos_query(pThreadInfo->taos, command);
int32_t code = taos_errno(res);
if (code) {
errorPrint("%s() LN%d, failed to run command <%s>. reason: %s\n",
__func__, __LINE__, command, taos_errstr(res));
taos_free_result(res);
lseek(fd, 0, SEEK_SET);
return NULL;
}
int maxThreads = g_args.thread_num;
int tableOfPerFile ;
if (numOfTable <= g_args.thread_num) {
tableOfPerFile = 1;
maxThreads = numOfTable;
FILE *fp = NULL;
char tmpBuf[4096] = {0};
if (g_args.outpath[0] != 0) {
sprintf(tmpBuf, "%s/%s.%d.sql",
g_args.outpath, pThreadInfo->dbName, pThreadInfo->threadIndex);
} else {
tableOfPerFile = numOfTable / g_args.thread_num;
if (0 != numOfTable % g_args.thread_num) {
tableOfPerFile += 1;
sprintf(tmpBuf, "%s.%d.sql",
pThreadInfo->dbName, pThreadInfo->threadIndex);
}
fp = fopen(tmpBuf, "w");
if (fp == NULL) {
errorPrint("%s() LN%d, failed to open file %s\n",
__func__, __LINE__, tmpBuf);
return NULL;
}
char* tblBuf = (char*)calloc(1, tableOfPerFile * sizeof(STableRecord));
if (NULL == tblBuf){
errorPrint("%s() LN%d, failed to calloc %" PRIzu "\n",
__func__, __LINE__, tableOfPerFile * sizeof(STableRecord));
close(fd);
return -1;
TAOS_ROW row = NULL;
int64_t i = 0;
while((row = taos_fetch_row(res)) != NULL) {
debugPrint("[%d] sub table %"PRId64": name: %s\n",
pThreadInfo->threadIndex, i++, (char *)row[TSDB_SHOW_TABLES_NAME_INDEX]);
dumpNormalTable(fp,
pThreadInfo->taos,
pThreadInfo->dbName,
(char *)row[TSDB_SHOW_TABLES_NAME_INDEX],
(char *)row[TSDB_SHOW_TABLES_METRIC_INDEX],
pThreadInfo->precision);
}
int32_t numOfThread = *totalNumOfThread;
int subFd = -1;
for (; numOfThread <= maxThreads; numOfThread++) {
memset(tmpBuf, 0, MAX_FILE_NAME_LEN);
sprintf(tmpBuf, ".tables.tmp.%d", numOfThread);
subFd = open(tmpBuf, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
if (subFd == -1) {
errorPrint("%s() LN%d, failed to open temp file: %s\n",
__func__, __LINE__, tmpBuf);
for (int32_t loopCnt = 0; loopCnt < numOfThread; loopCnt++) {
sprintf(tmpBuf, ".tables.tmp.%d", loopCnt);
(void)remove(tmpBuf);
}
sprintf(tmpBuf, ".select-tbname.tmp");
(void)remove(tmpBuf);
free(tblBuf);
close(fd);
return -1;
fclose(fp);
return NULL;
}
static int64_t dumpNtbOfDbByThreads(
SDbInfo *dbInfo,
int64_t ntbCount)
{
if (ntbCount <= 0) {
return 0;
}
// read tableOfPerFile for fd, write to subFd
ssize_t readLen = read(fd, tblBuf, tableOfPerFile * sizeof(STableRecord));
if (readLen <= 0) {
close(subFd);
break;
int threads = g_args.thread_num;
int64_t a = ntbCount / threads;
if (a < 1) {
threads = ntbCount;
a = 1;
}
assert(threads);
int64_t b = ntbCount % threads;
threadInfo *infos = calloc(1, threads * sizeof(threadInfo));
pthread_t *pids = calloc(1, threads * sizeof(pthread_t));
assert(pids);
assert(infos);
for (int64_t i = 0; i < threads; i++) {
threadInfo *pThreadInfo = infos + i;
pThreadInfo->taos = taos_connect(
g_args.host,
g_args.user,
g_args.password,
dbInfo->name,
g_args.port
);
if (NULL == pThreadInfo->taos) {
errorPrint("%s() LN%d, Failed to connect to TDengine, reason: %s\n",
__func__,
__LINE__,
taos_errstr(NULL));
free(pids);
free(infos);
return -1;
}
taosWrite(subFd, tblBuf, readLen);
close(subFd);
pThreadInfo->threadIndex = i;
pThreadInfo->tablesOfDumpOut = (i<b)?a+1:a;
pThreadInfo->tableFrom = (i==0)?0:
((threadInfo *)(infos + i - 1))->tableFrom +
((threadInfo *)(infos + i - 1))->tablesOfDumpOut;
strcpy(pThreadInfo->dbName, dbInfo->name);
pThreadInfo->precision = getPrecisionByString(dbInfo->precision);
pthread_create(pids + i, NULL, dumpNtbOfDb, pThreadInfo);
}
sprintf(tmpBuf, ".select-tbname.tmp");
(void)remove(tmpBuf);
for (int64_t i = 0; i < threads; i++) {
pthread_join(pids[i], NULL);
}
if (fd >= 0) {
close(fd);
fd = -1;
for (int64_t i = 0; i < threads; i++) {
threadInfo *pThreadInfo = infos + i;
taos_close(pThreadInfo->taos);
}
*totalNumOfThread = numOfThread;
free(pids);
free(infos);
free(tblBuf);
return 0;
}
static int inDatabasesSeq(
char *name,
int len)
static int64_t getNtbCountOfStb(TAOS *taos, char *dbName, char *stbName)
{
if (strstr(g_args.databasesSeq, ",") == NULL) {
if (0 == strncmp(g_args.databasesSeq, name, len)) {
return 0;
int64_t count = 0;
char command[COMMAND_SIZE];
sprintf(command, "SELECT COUNT(TBNAME) FROM %s.%s", dbName, stbName);
TAOS_RES *res = taos_query(taos, command);
int32_t code = taos_errno(res);
if (code != 0) {
errorPrint("%s() LN%d, failed to run command <%s>. reason: %s\n",
__func__, __LINE__, command, taos_errstr(res));
taos_free_result(res);
return -1;
}
} else {
char *dupSeq = strdup(g_args.databasesSeq);
char *running = dupSeq;
char *dbname = strsep(&running, ",");
while (dbname) {
if (0 == strncmp(dbname, name, len)) {
tfree(dupSeq);
return 0;
TAOS_ROW row = NULL;
if ((row = taos_fetch_row(res)) != NULL) {
count = *(int64_t*)row[TSDB_SHOW_TABLES_NAME_INDEX];
}
dbname = strsep(&running, ",");
return count;
}
static int64_t dumpNtbOfStbByThreads(
TAOS *taos,
SDbInfo *dbInfo, char *stbName)
{
int64_t ntbCount = getNtbCountOfStb(taos, dbInfo->name, stbName);
if (ntbCount <= 0) {
return 0;
}
free(dupSeq);
int threads = g_args.thread_num;
int64_t a = ntbCount / threads;
if (a < 1) {
threads = ntbCount;
a = 1;
}
assert(threads);
int64_t b = ntbCount % threads;
pthread_t *pids = calloc(1, threads * sizeof(pthread_t));
threadInfo *infos = calloc(1, threads * sizeof(threadInfo));
assert(pids);
assert(infos);
for (int64_t i = 0; i < threads; i++) {
threadInfo *pThreadInfo = infos + i;
pThreadInfo->taos = taos_connect(
g_args.host,
g_args.user,
g_args.password,
dbInfo->name,
g_args.port
);
if (NULL == pThreadInfo->taos) {
errorPrint("%s() LN%d, Failed to connect to TDengine, reason: %s\n",
__func__,
__LINE__,
taos_errstr(NULL));
free(pids);
free(infos);
return -1;
}
pThreadInfo->threadIndex = i;
pThreadInfo->tablesOfDumpOut = (i<b)?a+1:a;
pThreadInfo->tableFrom = (i==0)?0:
((threadInfo *)(infos + i - 1))->tableFrom +
((threadInfo *)(infos + i - 1))->tablesOfDumpOut;
strcpy(pThreadInfo->dbName, dbInfo->name);
pThreadInfo->precision = getPrecisionByString(dbInfo->precision);
strcpy(pThreadInfo->stbName, stbName);
pthread_create(pids + i, NULL, dumpNormalTablesOfStb, pThreadInfo);
}
for (int64_t i = 0; i < threads; i++) {
pthread_join(pids[i], NULL);
}
int64_t records = 0;
for (int64_t i = 0; i < threads; i++) {
threadInfo *pThreadInfo = infos + i;
records += pThreadInfo->rowsOfDumpOut;
taos_close(pThreadInfo->taos);
}
free(pids);
free(infos);
return records;
}
static int getDbCount()
static int64_t dumpCreateSTableClauseOfDb(
SDbInfo *dbInfo, FILE *fp)
{
int count = 0;
TAOS *taos = taos_connect(g_args.host,
g_args.user, g_args.password, dbInfo->name, g_args.port);
if (NULL == taos) {
errorPrint(
"Failed to connect to TDengine server %s by specified database %s\n",
g_args.host, dbInfo->name);
return 0;
}
TAOS *taos = NULL;
TAOS_RES *result = NULL;
char *command = "show databases";
TAOS_ROW row;
char command[COMMAND_SIZE] = {0};
/* Connect to server */
taos = taos_connect(g_args.host, g_args.user, g_args.password,
NULL, g_args.port);
sprintf(command, "SHOW %s.STABLES", dbInfo->name);
TAOS_RES* res = taos_query(taos, command);
int32_t code = taos_errno(res);
if (code != 0) {
errorPrint("%s() LN%d, failed to run command <%s>, reason: %s\n",
__func__, __LINE__, command, taos_errstr(res));
taos_free_result(res);
taos_close(taos);
exit(-1);
}
int64_t superTblCnt = 0;
while ((row = taos_fetch_row(res)) != NULL) {
if (0 == dumpStable(row[TSDB_SHOW_TABLES_NAME_INDEX], fp, taos, dbInfo)) {
superTblCnt ++;
}
}
taos_free_result(res);
fprintf(g_fpOfResult,
"# super table counter: %"PRId64"\n",
superTblCnt);
g_resultStatistics.totalSuperTblsOfDumpOut += superTblCnt;
taos_close(taos);
return superTblCnt;
}
static int64_t dumpNTablesOfDb(SDbInfo *dbInfo)
{
TAOS *taos = taos_connect(g_args.host,
g_args.user, g_args.password, dbInfo->name, g_args.port);
if (NULL == taos) {
errorPrint("Failed to connect to TDengine server %s\n", g_args.host);
errorPrint(
"Failed to connect to TDengine server %s by specified database %s\n",
g_args.host, dbInfo->name);
return 0;
}
char command[COMMAND_SIZE];
TAOS_RES *result;
int32_t code;
sprintf(command, "USE %s", dbInfo->name);
result = taos_query(taos, command);
int32_t code = taos_errno(result);
code = taos_errno(result);
if (code != 0) {
errorPrint("invalid database %s, reason: %s\n",
dbInfo->name, taos_errstr(result));
taos_close(taos);
return 0;
}
if (0 != code) {
errorPrint("%s() LN%d, failed to run command: %s, reason: %s\n",
__func__, __LINE__, command, taos_errstr(result));
sprintf(command, "SHOW TABLES");
result = taos_query(taos, command);
code = taos_errno(result);
if (code != 0) {
errorPrint("Failed to show %s\'s tables, reason: %s\n",
dbInfo->name, taos_errstr(result));
taos_close(taos);
return 0;
}
TAOS_FIELD *fields = taos_fetch_fields(result);
g_tablesList = calloc(1, dbInfo->ntables * sizeof(TableInfo));
while ((row = taos_fetch_row(result)) != NULL) {
// sys database name : 'log', but subsequent version changed to 'log'
if ((strncasecmp(row[TSDB_SHOW_DB_NAME_INDEX], "log",
fields[TSDB_SHOW_DB_NAME_INDEX].bytes) == 0)
&& (!g_args.allow_sys)) {
continue;
TAOS_ROW row;
int64_t count = 0;
while(NULL != (row = taos_fetch_row(result))) {
debugPrint("%s() LN%d, No.\t%"PRId64" table name: %s\n",
__func__, __LINE__,
count, (char *)row[TSDB_SHOW_TABLES_NAME_INDEX]);
tstrncpy(((TableInfo *)(g_tablesList + count))->name,
(char *)row[TSDB_SHOW_TABLES_NAME_INDEX], TSDB_TABLE_NAME_LEN);
char *stbName = (char *) row[TSDB_SHOW_TABLES_METRIC_INDEX];
if (stbName) {
tstrncpy(((TableInfo *)(g_tablesList + count))->stable,
(char *)row[TSDB_SHOW_TABLES_METRIC_INDEX], TSDB_TABLE_NAME_LEN);
((TableInfo *)(g_tablesList + count))->belongStb = true;
}
count ++;
}
taos_close(taos);
if (g_args.databases) { // input multi dbs
if (inDatabasesSeq(
(char *)row[TSDB_SHOW_DB_NAME_INDEX],
fields[TSDB_SHOW_DB_NAME_INDEX].bytes) != 0)
continue;
} else if (!g_args.all_databases) { // only input one db
if (strncasecmp(g_args.arg_list[0],
(char *)row[TSDB_SHOW_DB_NAME_INDEX],
fields[TSDB_SHOW_DB_NAME_INDEX].bytes) != 0)
continue;
}
int64_t records = dumpNtbOfDbByThreads(dbInfo, count);
count++;
}
free(g_tablesList);
g_tablesList = NULL;
if (count == 0) {
errorPrint("%d databases valid to dump\n", count);
}
return records;
}
return count;
static int64_t dumpWholeDatabase(SDbInfo *dbInfo, FILE *fp)
{
taosDumpCreateDbClause(dbInfo, g_args.with_property, fp);
fprintf(g_fpOfResult, "\n#### database: %s\n",
dbInfo->name);
g_resultStatistics.totalDatabasesOfDumpOut++;
dumpCreateSTableClauseOfDb(dbInfo, fp);
return dumpNTablesOfDb(dbInfo);
}
static int taosDumpOut() {
TAOS *taos = NULL;
TAOS_RES *result = NULL;
char *command = NULL;
TAOS_ROW row;
FILE *fp = NULL;
int32_t count = 0;
STableRecordInfo tableRecordInfo;
TableRecordInfo tableRecordInfo;
char tmpBuf[4096] = {0};
if (g_args.outpath[0] != 0) {
......@@ -1328,26 +1386,24 @@ static int taosDumpOut() {
return -1;
}
g_args.dbCount = getDbCount();
g_args.dumpDbCount = getDumpDbCount();
debugPrint("%s() LN%d, dump db count: %d\n",
__func__, __LINE__, g_args.dumpDbCount);
if (0 == g_args.dbCount) {
if (0 == g_args.dumpDbCount) {
errorPrint("%d databases valid to dump\n", g_args.dumpDbCount);
fclose(fp);
errorPrint("%d databases valid to dump\n", g_args.dbCount);
return -1;
}
g_dbInfos = (SDbInfo **)calloc(g_args.dbCount, sizeof(SDbInfo *));
g_dbInfos = (SDbInfo **)calloc(g_args.dumpDbCount, sizeof(SDbInfo *));
if (g_dbInfos == NULL) {
errorPrint("%s() LN%d, failed to allocate memory\n",
__func__, __LINE__);
goto _exit_failure;
}
command = (char *)malloc(COMMAND_SIZE);
if (command == NULL) {
errorPrint("%s() LN%d, failed to allocate memory\n", __func__, __LINE__);
goto _exit_failure;
}
char command[COMMAND_SIZE];
/* Connect to server */
taos = taos_connect(g_args.host, g_args.user, g_args.password,
......@@ -1367,7 +1423,7 @@ static int taosDumpOut() {
int32_t code = taos_errno(result);
if (code != 0) {
errorPrint("%s() LN%d, failed to run command: %s, reason: %s\n",
errorPrint("%s() LN%d, failed to run command <%s>, reason: %s\n",
__func__, __LINE__, command, taos_errstr(result));
goto _exit_failure;
}
......@@ -1450,10 +1506,11 @@ static int taosDumpOut() {
count++;
if (g_args.databases) {
if (count > g_args.dbCount) break;
if (count > g_args.dumpDbCount)
break;
} else if (!g_args.all_databases) {
if (count >= 1) break;
if (count >= 1)
break;
}
}
......@@ -1462,94 +1519,52 @@ static int taosDumpOut() {
goto _exit_failure;
}
if (g_args.databases || g_args.all_databases) { // case: taosdump --databases dbx dby ... OR taosdump --all-databases
if (g_args.databases || g_args.all_databases) { // case: taosdump --databases dbx,dby ... OR taosdump --all-databases
for (int i = 0; i < count; i++) {
taosDumpDb(g_dbInfos[i], fp, taos);
int64_t records = 0;
records = dumpWholeDatabase(g_dbInfos[i], fp);
if (records >= 0) {
okPrint("Database %s dumped\n", g_dbInfos[i]->name);
g_totalDumpOutRows += records;
}
}
} else {
if (1 == g_args.arg_list_len) {
int64_t records = dumpWholeDatabase(g_dbInfos[0], fp);
if (records >= 0) {
okPrint("Database %s dumped\n", g_dbInfos[0]->name);
g_totalDumpOutRows += records;
}
} else {
if (g_args.dbCount == 1) { // case: taosdump <db>
taosDumpDb(g_dbInfos[0], fp, taos);
} else { // case: taosdump <db> tablex tabley ...
taosDumpCreateDbClause(g_dbInfos[0], g_args.with_property, fp);
fprintf(g_fpOfResult, "\n#### database: %s\n",
g_dbInfos[0]->name);
g_resultStatistics.totalDatabasesOfDumpOut++;
sprintf(command, "use %s", g_dbInfos[0]->name);
result = taos_query(taos, command);
code = taos_errno(result);
if (code != 0) {
errorPrint("invalid database %s\n", g_dbInfos[0]->name);
goto _exit_failure;
}
fprintf(fp, "USE %s;\n\n", g_dbInfos[0]->name);
int32_t totalNumOfThread = 1; // 0: all normal table into .tables.tmp.0
int normalTblFd = -1;
int32_t retCode;
int superTblCnt = 0 ;
for (int i = 1; g_args.arg_list[i]; i++) {
if (taosGetTableRecordInfo(g_args.arg_list[i],
if (taosGetTableRecordInfo(g_dbInfos[0]->name,
g_args.arg_list[i],
&tableRecordInfo, taos) < 0) {
errorPrint("input the invalid table %s\n",
g_args.arg_list[i]);
continue;
}
if (tableRecordInfo.isMetric) { // dump all table of this metric
int ret = taosDumpStable(
tableRecordInfo.tableRecord.metric,
fp, taos, g_dbInfos[0]->name);
if (0 == ret) {
int64_t records = 0;
if (tableRecordInfo.isStable) { // dump all table of this stable
int ret = dumpStable(
tableRecordInfo.tableRecord.stable,
fp, taos, g_dbInfos[0]);
if (ret >= 0) {
superTblCnt++;
records = dumpNtbOfStbByThreads(taos, g_dbInfos[0], g_args.arg_list[i]);
}
retCode = taosSaveTableOfMetricToTempFile(
taos, tableRecordInfo.tableRecord.metric,
&totalNumOfThread);
} else {
if (tableRecordInfo.tableRecord.metric[0] != '\0') { // dump this sub table and it's metric
int ret = taosDumpStable(
tableRecordInfo.tableRecord.metric,
fp, taos, g_dbInfos[0]->name);
if (0 == ret) {
superTblCnt++;
}
}
retCode = taosSaveAllNormalTableToTempFile(
taos, tableRecordInfo.tableRecord.name,
tableRecordInfo.tableRecord.metric, &normalTblFd);
}
if (retCode < 0) {
if (-1 != normalTblFd){
taosClose(normalTblFd);
}
goto _clean_tmp_file;
records = dumpNormalTableWithoutStb(taos, g_dbInfos[0], g_args.arg_list[i]);
}
}
// TODO: save dump super table <superTblCnt> into result_output.txt
fprintf(g_fpOfResult, "# super table counter: %d\n",
superTblCnt);
g_resultStatistics.totalSuperTblsOfDumpOut += superTblCnt;
if (-1 != normalTblFd){
taosClose(normalTblFd);
}
// start multi threads to dumpout
taosStartDumpOutWorkThreads(totalNumOfThread,
g_dbInfos[0]->name,
getPrecisionByString(g_dbInfos[0]->precision));
char tmpFileName[MAX_FILE_NAME_LEN];
_clean_tmp_file:
for (int loopCnt = 0; loopCnt < totalNumOfThread; loopCnt++) {
sprintf(tmpFileName, ".tables.tmp.%d", loopCnt);
remove(tmpFileName);
if (records >= 0) {
okPrint("table: %s dumped\n", g_args.arg_list[i]);
g_totalDumpOutRows += records;
}
}
}
......@@ -1558,7 +1573,6 @@ _clean_tmp_file:
fclose(fp);
taos_close(taos);
taos_free_result(result);
tfree(command);
taosFreeDbInfos();
fprintf(stderr, "dump out rows: %" PRId64 "\n", g_totalDumpOutRows);
return 0;
......@@ -1567,7 +1581,6 @@ _exit_failure:
fclose(fp);
taos_close(taos);
taos_free_result(result);
tfree(command);
taosFreeDbInfos();
errorPrint("dump out rows: %" PRId64 "\n", g_totalDumpOutRows);
return -1;
......@@ -1575,18 +1588,18 @@ _exit_failure:
static int taosGetTableDes(
char* dbName, char *table,
STableDef *stableDes, TAOS* taosCon, bool isSuperTable) {
STableDef *stableDes, TAOS* taos, bool isSuperTable) {
TAOS_ROW row = NULL;
TAOS_RES* res = NULL;
int count = 0;
int colCount = 0;
char sqlstr[COMMAND_SIZE];
sprintf(sqlstr, "describe %s.%s;", dbName, table);
res = taos_query(taosCon, sqlstr);
res = taos_query(taos, sqlstr);
int32_t code = taos_errno(res);
if (code != 0) {
errorPrint("%s() LN%d, failed to run command <%s>, reason:%s\n",
errorPrint("%s() LN%d, failed to run command <%s>, reason: %s\n",
__func__, __LINE__, sqlstr, taos_errstr(res));
taos_free_result(res);
return -1;
......@@ -1596,41 +1609,41 @@ static int taosGetTableDes(
tstrncpy(stableDes->name, table, TSDB_TABLE_NAME_LEN);
while ((row = taos_fetch_row(res)) != NULL) {
tstrncpy(stableDes->cols[count].field,
tstrncpy(stableDes->cols[colCount].field,
(char *)row[TSDB_DESCRIBE_METRIC_FIELD_INDEX],
min(TSDB_COL_NAME_LEN + 1,
fields[TSDB_DESCRIBE_METRIC_FIELD_INDEX].bytes + 1));
tstrncpy(stableDes->cols[count].type,
tstrncpy(stableDes->cols[colCount].type,
(char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
min(16, fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes + 1));
stableDes->cols[count].length =
stableDes->cols[colCount].length =
*((int *)row[TSDB_DESCRIBE_METRIC_LENGTH_INDEX]);
tstrncpy(stableDes->cols[count].note,
tstrncpy(stableDes->cols[colCount].note,
(char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX],
min(COL_NOTE_LEN,
fields[TSDB_DESCRIBE_METRIC_NOTE_INDEX].bytes + 1));
count++;
colCount++;
}
taos_free_result(res);
res = NULL;
if (isSuperTable) {
return count;
return colCount;
}
// if child-table have tag, using select tagName from table to get tagValue
for (int i = 0 ; i < count; i++) {
for (int i = 0 ; i < colCount; i++) {
if (strcmp(stableDes->cols[i].note, "TAG") != 0) continue;
sprintf(sqlstr, "select %s from %s.%s",
stableDes->cols[i].field, dbName, table);
res = taos_query(taosCon, sqlstr);
res = taos_query(taos, sqlstr);
code = taos_errno(res);
if (code != 0) {
errorPrint("%s() LN%d, failed to run command <%s>, reason:%s\n",
errorPrint("%s() LN%d, failed to run command <%s>, reason: %s\n",
__func__, __LINE__, sqlstr, taos_errstr(res));
taos_free_result(res);
return -1;
......@@ -1646,7 +1659,7 @@ static int taosGetTableDes(
return -1;
}
if (row[0] == NULL) {
if (row[TSDB_SHOW_TABLES_NAME_INDEX] == NULL) {
sprintf(stableDes->cols[i].note, "%s", "NULL");
taos_free_result(res);
res = NULL;
......@@ -1659,32 +1672,33 @@ static int taosGetTableDes(
switch (fields[0].type) {
case TSDB_DATA_TYPE_BOOL:
sprintf(stableDes->cols[i].note, "%d",
((((int32_t)(*((char *)row[0]))) == 1) ? 1 : 0));
((((int32_t)(*((char *)row[TSDB_SHOW_TABLES_NAME_INDEX]))) == 1) ? 1 : 0));
break;
case TSDB_DATA_TYPE_TINYINT:
sprintf(stableDes->cols[i].note, "%d", *((int8_t *)row[0]));
sprintf(stableDes->cols[i].note, "%d",
*((int8_t *)row[TSDB_SHOW_TABLES_NAME_INDEX]));
break;
case TSDB_DATA_TYPE_SMALLINT:
sprintf(stableDes->cols[i].note, "%d", *((int16_t *)row[0]));
sprintf(stableDes->cols[i].note, "%d", *((int16_t *)row[TSDB_SHOW_TABLES_NAME_INDEX]));
break;
case TSDB_DATA_TYPE_INT:
sprintf(stableDes->cols[i].note, "%d", *((int32_t *)row[0]));
sprintf(stableDes->cols[i].note, "%d", *((int32_t *)row[TSDB_SHOW_TABLES_NAME_INDEX]));
break;
case TSDB_DATA_TYPE_BIGINT:
sprintf(stableDes->cols[i].note, "%" PRId64 "", *((int64_t *)row[0]));
sprintf(stableDes->cols[i].note, "%" PRId64 "", *((int64_t *)row[TSDB_SHOW_TABLES_NAME_INDEX]));
break;
case TSDB_DATA_TYPE_FLOAT:
sprintf(stableDes->cols[i].note, "%f", GET_FLOAT_VAL(row[0]));
sprintf(stableDes->cols[i].note, "%f", GET_FLOAT_VAL(row[TSDB_SHOW_TABLES_NAME_INDEX]));
break;
case TSDB_DATA_TYPE_DOUBLE:
sprintf(stableDes->cols[i].note, "%f", GET_DOUBLE_VAL(row[0]));
sprintf(stableDes->cols[i].note, "%f", GET_DOUBLE_VAL(row[TSDB_SHOW_TABLES_NAME_INDEX]));
break;
case TSDB_DATA_TYPE_BINARY:
{
memset(stableDes->cols[i].note, 0, sizeof(stableDes->cols[i].note));
stableDes->cols[i].note[0] = '\'';
char tbuf[COL_NOTE_LEN];
converStringToReadable((char *)row[0], length[0], tbuf, COL_NOTE_LEN);
converStringToReadable((char *)row[TSDB_SHOW_TABLES_NAME_INDEX], length[0], tbuf, COL_NOTE_LEN);
char* pstr = stpcpy(&(stableDes->cols[i].note[1]), tbuf);
*(pstr++) = '\'';
break;
......@@ -1693,518 +1707,156 @@ static int taosGetTableDes(
{
memset(stableDes->cols[i].note, 0, sizeof(stableDes->cols[i].note));
char tbuf[COL_NOTE_LEN-2]; // need reserve 2 bytes for ' '
convertNCharToReadable((char *)row[0], length[0], tbuf, COL_NOTE_LEN);
convertNCharToReadable((char *)row[TSDB_SHOW_TABLES_NAME_INDEX], length[0], tbuf, COL_NOTE_LEN);
sprintf(stableDes->cols[i].note, "\'%s\'", tbuf);
break;
}
case TSDB_DATA_TYPE_TIMESTAMP:
sprintf(stableDes->cols[i].note, "%" PRId64 "", *(int64_t *)row[0]);
sprintf(stableDes->cols[i].note, "%" PRId64 "", *(int64_t *)row[TSDB_SHOW_TABLES_NAME_INDEX]);
#if 0
if (!g_args.mysqlFlag) {
sprintf(tableDes->cols[i].note, "%" PRId64 "", *(int64_t *)row[0]);
sprintf(tableDes->cols[i].note, "%" PRId64 "", *(int64_t *)row[TSDB_SHOW_TABLES_NAME_INDEX]);
} else {
char buf[64] = "\0";
int64_t ts = *((int64_t *)row[0]);
int64_t ts = *((int64_t *)row[TSDB_SHOW_TABLES_NAME_INDEX]);
time_t tt = (time_t)(ts / 1000);
struct tm *ptm = localtime(&tt);
strftime(buf, 64, "%y-%m-%d %H:%M:%S", ptm);
sprintf(tableDes->cols[i].note, "\'%s.%03d\'", buf, (int)(ts % 1000));
}
#endif
break;
default:
break;
}
taos_free_result(res);
res = NULL;
}
return count;
}
static int convertSchemaToAvroSchema(STableDef *stableDes, char **avroSchema)
{
errorPrint("%s() LN%d TODO: covert table schema to avro schema\n",
__func__, __LINE__);
return 0;
}
static int32_t taosDumpTable(
char *tbName, char *metric,
FILE *fp, TAOS* taosCon, char* dbName, int precision) {
int count = 0;
STableDef *tableDes = (STableDef *)calloc(1, sizeof(STableDef)
+ sizeof(SColDes) * TSDB_MAX_COLUMNS);
if (metric != NULL && metric[0] != '\0') { // dump table schema which is created by using super table
/*
count = taosGetTableDes(metric, tableDes, taosCon);
if (count < 0) {
free(tableDes);
return -1;
}
taosDumpCreateTableClause(tableDes, count, fp);
memset(tableDes, 0, sizeof(STableDef) + sizeof(SColDes) * TSDB_MAX_COLUMNS);
*/
count = taosGetTableDes(dbName, tbName, tableDes, taosCon, false);
if (count < 0) {
free(tableDes);
return -1;
}
// create child-table using super-table
taosDumpCreateMTableClause(tableDes, metric, count, fp, dbName);
} else { // dump table definition
count = taosGetTableDes(dbName, tbName, tableDes, taosCon, false);
if (count < 0) {
free(tableDes);
return -1;
}
// create normal-table or super-table
taosDumpCreateTableClause(tableDes, count, fp, dbName);
}
char *jsonAvroSchema = NULL;
if (g_args.avro) {
convertSchemaToAvroSchema(tableDes, &jsonAvroSchema);
}
free(tableDes);
int32_t ret = 0;
if (!g_args.schemaonly) {
ret = taosDumpTableData(fp, tbName, taosCon, dbName, precision,
jsonAvroSchema);
}
return ret;
}
static void taosDumpCreateDbClause(
SDbInfo *dbInfo, bool isDumpProperty, FILE *fp) {
char sqlstr[TSDB_MAX_SQL_LEN] = {0};
char *pstr = sqlstr;
pstr += sprintf(pstr, "CREATE DATABASE IF NOT EXISTS %s ", dbInfo->name);
if (isDumpProperty) {
pstr += sprintf(pstr,
"REPLICA %d QUORUM %d DAYS %d KEEP %s CACHE %d BLOCKS %d MINROWS %d MAXROWS %d FSYNC %d CACHELAST %d COMP %d PRECISION '%s' UPDATE %d",
dbInfo->replica, dbInfo->quorum, dbInfo->days,
dbInfo->keeplist,
dbInfo->cache,
dbInfo->blocks, dbInfo->minrows, dbInfo->maxrows,
dbInfo->fsync,
dbInfo->cachelast,
dbInfo->comp, dbInfo->precision, dbInfo->update);
}
pstr += sprintf(pstr, ";");
fprintf(fp, "%s\n\n", sqlstr);
}
static void* taosDumpOutWorkThreadFp(void *arg)
{
SThreadParaObj *pThread = (SThreadParaObj*)arg;
STableRecord tableRecord;
int fd;
setThreadName("dumpOutWorkThrd");
char tmpBuf[4096] = {0};
sprintf(tmpBuf, ".tables.tmp.%d", pThread->threadIndex);
fd = open(tmpBuf, O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
if (fd == -1) {
errorPrint("%s() LN%d, failed to open temp file: %s\n",
__func__, __LINE__, tmpBuf);
return NULL;
}
FILE *fp = NULL;
memset(tmpBuf, 0, 4096);
if (g_args.outpath[0] != 0) {
sprintf(tmpBuf, "%s/%s.tables.%d.sql",
g_args.outpath, pThread->dbName, pThread->threadIndex);
} else {
sprintf(tmpBuf, "%s.tables.%d.sql",
pThread->dbName, pThread->threadIndex);
}
fp = fopen(tmpBuf, "w");
if (fp == NULL) {
errorPrint("%s() LN%d, failed to open file %s\n",
__func__, __LINE__, tmpBuf);
close(fd);
return NULL;
}
memset(tmpBuf, 0, 4096);
sprintf(tmpBuf, "use %s", pThread->dbName);
TAOS_RES* tmpResult = taos_query(pThread->taosCon, tmpBuf);
int32_t code = taos_errno(tmpResult);
if (code != 0) {
errorPrint("%s() LN%d, invalid database %s. reason: %s\n",
__func__, __LINE__, pThread->dbName, taos_errstr(tmpResult));
taos_free_result(tmpResult);
fclose(fp);
close(fd);
return NULL;
}
#if 0
int fileNameIndex = 1;
int tablesInOneFile = 0;
#endif
int64_t lastRowsPrint = 5000000;
fprintf(fp, "USE %s;\n\n", pThread->dbName);
while (1) {
ssize_t readLen = read(fd, &tableRecord, sizeof(STableRecord));
if (readLen <= 0) break;
int ret = taosDumpTable(
tableRecord.name, tableRecord.metric,
fp, pThread->taosCon, pThread->dbName,
pThread->precision);
if (ret >= 0) {
// TODO: sum table count and table rows by self
pThread->tablesOfDumpOut++;
pThread->rowsOfDumpOut += ret;
if (pThread->rowsOfDumpOut >= lastRowsPrint) {
printf(" %"PRId64 " rows already be dumpout from database %s\n",
pThread->rowsOfDumpOut, pThread->dbName);
lastRowsPrint += 5000000;
}
#if 0
tablesInOneFile++;
if (tablesInOneFile >= g_args.table_batch) {
fclose(fp);
tablesInOneFile = 0;
memset(tmpBuf, 0, 4096);
if (g_args.outpath[0] != 0) {
sprintf(tmpBuf, "%s/%s.tables.%d-%d.sql",
g_args.outpath, pThread->dbName,
pThread->threadIndex, fileNameIndex);
} else {
sprintf(tmpBuf, "%s.tables.%d-%d.sql",
pThread->dbName, pThread->threadIndex, fileNameIndex);
}
fileNameIndex++;
fp = fopen(tmpBuf, "w");
if (fp == NULL) {
errorPrint("%s() LN%d, failed to open file %s\n",
__func__, __LINE__, tmpBuf);
close(fd);
taos_free_result(tmpResult);
return NULL;
}
}
#endif
}
}
taos_free_result(tmpResult);
close(fd);
fclose(fp);
return NULL;
}
static void taosStartDumpOutWorkThreads(int32_t numOfThread, char *dbName, int precision)
{
pthread_attr_t thattr;
SThreadParaObj *threadObj =
(SThreadParaObj *)calloc(numOfThread, sizeof(SThreadParaObj));
if (threadObj == NULL) {
errorPrint("%s() LN%d, memory allocation failed!\n",
__func__, __LINE__);
return;
}
for (int t = 0; t < numOfThread; ++t) {
SThreadParaObj *pThread = threadObj + t;
pThread->rowsOfDumpOut = 0;
pThread->tablesOfDumpOut = 0;
pThread->threadIndex = t;
pThread->totalThreads = numOfThread;
tstrncpy(pThread->dbName, dbName, TSDB_DB_NAME_LEN);
pThread->precision = precision;
pThread->taosCon = taos_connect(g_args.host, g_args.user, g_args.password,
NULL, g_args.port);
if (pThread->taosCon == NULL) {
errorPrint("Failed to connect to TDengine server %s\n", g_args.host);
free(threadObj);
return;
}
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&(pThread->threadID), &thattr,
taosDumpOutWorkThreadFp,
(void*)pThread) != 0) {
errorPrint("%s() LN%d, thread:%d failed to start\n",
__func__, __LINE__, pThread->threadIndex);
exit(-1);
}
}
for (int32_t t = 0; t < numOfThread; ++t) {
pthread_join(threadObj[t].threadID, NULL);
}
// TODO: sum all thread dump table count and rows of per table, then save into result_output.txt
int64_t totalRowsOfDumpOut = 0;
int64_t totalChildTblsOfDumpOut = 0;
for (int32_t t = 0; t < numOfThread; ++t) {
totalChildTblsOfDumpOut += threadObj[t].tablesOfDumpOut;
totalRowsOfDumpOut += threadObj[t].rowsOfDumpOut;
}
fprintf(g_fpOfResult, "# child table counter: %"PRId64"\n",
totalChildTblsOfDumpOut);
fprintf(g_fpOfResult, "# row counter: %"PRId64"\n",
totalRowsOfDumpOut);
g_resultStatistics.totalChildTblsOfDumpOut += totalChildTblsOfDumpOut;
g_resultStatistics.totalRowsOfDumpOut += totalRowsOfDumpOut;
free(threadObj);
}
static int32_t taosDumpStable(char *table, FILE *fp,
TAOS* taosCon, char* dbName) {
uint64_t sizeOfTableDes =
(uint64_t)(sizeof(STableDef) + sizeof(SColDes) * TSDB_MAX_COLUMNS);
STableDef *stableDes = (STableDef *)calloc(1, sizeOfTableDes);
if (NULL == stableDes) {
errorPrint("%s() LN%d, failed to allocate %"PRIu64" memory\n",
__func__, __LINE__, sizeOfTableDes);
exit(-1);
}
int count = taosGetTableDes(dbName, table, stableDes, taosCon, true);
if (count < 0) {
free(stableDes);
errorPrint("%s() LN%d, failed to get stable[%s] schema\n",
__func__, __LINE__, table);
exit(-1);
}
taosDumpCreateTableClause(stableDes, count, fp, dbName);
free(stableDes);
return 0;
}
static int32_t taosDumpCreateSuperTableClause(TAOS* taosCon, char* dbName, FILE *fp)
{
TAOS_ROW row;
int fd = -1;
STableRecord tableRecord;
char sqlstr[TSDB_MAX_SQL_LEN] = {0};
sprintf(sqlstr, "show %s.stables", dbName);
TAOS_RES* res = taos_query(taosCon, sqlstr);
int32_t code = taos_errno(res);
if (code != 0) {
errorPrint("%s() LN%d, failed to run command <%s>, reason: %s\n",
__func__, __LINE__, sqlstr, taos_errstr(res));
taos_free_result(res);
exit(-1);
}
TAOS_FIELD *fields = taos_fetch_fields(res);
char tmpFileName[MAX_FILE_NAME_LEN];
memset(tmpFileName, 0, MAX_FILE_NAME_LEN);
sprintf(tmpFileName, ".stables.tmp");
fd = open(tmpFileName, O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
if (fd == -1) {
errorPrint("%s() LN%d, failed to open temp file: %s\n",
__func__, __LINE__, tmpFileName);
taos_free_result(res);
(void)remove(".stables.tmp");
exit(-1);
}
while ((row = taos_fetch_row(res)) != NULL) {
memset(&tableRecord, 0, sizeof(STableRecord));
tstrncpy(tableRecord.name, (char *)row[TSDB_SHOW_TABLES_NAME_INDEX],
min(TSDB_TABLE_NAME_LEN,
fields[TSDB_SHOW_TABLES_NAME_INDEX].bytes + 1));
taosWrite(fd, &tableRecord, sizeof(STableRecord));
}
#endif
break;
default:
break;
}
taos_free_result(res);
(void)lseek(fd, 0, SEEK_SET);
int superTblCnt = 0;
while (1) {
ssize_t readLen = read(fd, &tableRecord, sizeof(STableRecord));
if (readLen <= 0) break;
int ret = taosDumpStable(tableRecord.name, fp, taosCon, dbName);
if (0 == ret) {
superTblCnt++;
}
res = NULL;
}
// TODO: save dump super table <superTblCnt> into result_output.txt
fprintf(g_fpOfResult, "# super table counter: %d\n", superTblCnt);
g_resultStatistics.totalSuperTblsOfDumpOut += superTblCnt;
close(fd);
(void)remove(".stables.tmp");
return colCount;
}
static int convertSchemaToAvroSchema(STableDef *stableDes, char **avroSchema)
{
errorPrint("%s() LN%d TODO: covert table schema to avro schema\n",
__func__, __LINE__);
return 0;
}
static int64_t taosDumpTable(
char *tbName, char *stable,
FILE *fp, TAOS* taos, char* dbName, int precision) {
int colCount = 0;
static int taosDumpDb(SDbInfo *dbInfo, FILE *fp, TAOS *taosCon) {
TAOS_ROW row;
int fd = -1;
STableRecord tableRecord;
taosDumpCreateDbClause(dbInfo, g_args.with_property, fp);
STableDef *tableDes = (STableDef *)calloc(1, sizeof(STableDef)
+ sizeof(SColDes) * TSDB_MAX_COLUMNS);
fprintf(g_fpOfResult, "\n#### database: %s\n",
dbInfo->name);
g_resultStatistics.totalDatabasesOfDumpOut++;
if (stable != NULL && stable[0] != '\0') { // dump table schema which is created by using super table
/*
colCount = taosGetTableDes(stable, tableDes, taos);
char sqlstr[TSDB_MAX_SQL_LEN] = {0};
if (count < 0) {
free(tableDes);
return -1;
}
fprintf(fp, "USE %s;\n\n", dbInfo->name);
taosDumpCreateTableClause(tableDes, count, fp);
(void)taosDumpCreateSuperTableClause(taosCon, dbInfo->name, fp);
memset(tableDes, 0, sizeof(STableDef) + sizeof(SColDes) * TSDB_MAX_COLUMNS);
*/
sprintf(sqlstr, "show %s.tables", dbInfo->name);
colCount = taosGetTableDes(dbName, tbName, tableDes, taos, false);
TAOS_RES* res = taos_query(taosCon, sqlstr);
int code = taos_errno(res);
if (code != 0) {
errorPrint("%s() LN%d, failed to run command <%s>, reason:%s\n",
__func__, __LINE__, sqlstr, taos_errstr(res));
taos_free_result(res);
if (colCount < 0) {
free(tableDes);
return -1;
}
char tmpBuf[MAX_FILE_NAME_LEN];
memset(tmpBuf, 0, MAX_FILE_NAME_LEN);
sprintf(tmpBuf, ".show-tables.tmp");
fd = open(tmpBuf, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
if (fd == -1) {
errorPrint("%s() LN%d, failed to open temp file: %s\n",
__func__, __LINE__, tmpBuf);
taos_free_result(res);
// create child-table using super-table
taosDumpCreateMTableClause(tableDes, stable, colCount, fp, dbName);
} else { // dump table definition
colCount = taosGetTableDes(dbName, tbName, tableDes, taos, false);
if (colCount < 0) {
free(tableDes);
return -1;
}
TAOS_FIELD *fields = taos_fetch_fields(res);
// create normal-table or super-table
taosDumpCreateTableClause(tableDes, colCount, fp, dbName);
}
int32_t numOfTable = 0;
while ((row = taos_fetch_row(res)) != NULL) {
memset(&tableRecord, 0, sizeof(STableRecord));
tstrncpy(tableRecord.name, (char *)row[TSDB_SHOW_TABLES_NAME_INDEX],
min(TSDB_TABLE_NAME_LEN,
fields[TSDB_SHOW_TABLES_NAME_INDEX].bytes + 1));
tstrncpy(tableRecord.metric, (char *)row[TSDB_SHOW_TABLES_METRIC_INDEX],
min(TSDB_TABLE_NAME_LEN,
fields[TSDB_SHOW_TABLES_METRIC_INDEX].bytes + 1));
char *jsonAvroSchema = NULL;
if (g_args.avro) {
convertSchemaToAvroSchema(tableDes, &jsonAvroSchema);
}
taosWrite(fd, &tableRecord, sizeof(STableRecord));
free(tableDes);
numOfTable++;
int64_t ret = 0;
if (!g_args.schemaonly) {
ret = taosDumpTableData(fp, tbName, taos, dbName, precision,
jsonAvroSchema);
}
taos_free_result(res);
lseek(fd, 0, SEEK_SET);
int maxThreads = g_args.thread_num;
int tableOfPerFile ;
if (numOfTable <= g_args.thread_num) {
tableOfPerFile = 1;
maxThreads = numOfTable;
} else {
tableOfPerFile = numOfTable / g_args.thread_num;
if (0 != numOfTable % g_args.thread_num) {
tableOfPerFile += 1;
}
}
return ret;
}
char* tblBuf = (char*)calloc(1, tableOfPerFile * sizeof(STableRecord));
if (NULL == tblBuf){
errorPrint("failed to calloc %" PRIzu "\n",
tableOfPerFile * sizeof(STableRecord));
close(fd);
return -1;
}
static void taosDumpCreateDbClause(
SDbInfo *dbInfo, bool isDumpProperty, FILE *fp) {
char sqlstr[TSDB_MAX_SQL_LEN] = {0};
int32_t numOfThread = 0;
int subFd = -1;
for (numOfThread = 0; numOfThread < maxThreads; numOfThread++) {
memset(tmpBuf, 0, MAX_FILE_NAME_LEN);
sprintf(tmpBuf, ".tables.tmp.%d", numOfThread);
subFd = open(tmpBuf, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
if (subFd == -1) {
errorPrint("%s() LN%d, failed to open temp file: %s\n",
__func__, __LINE__, tmpBuf);
for (int32_t loopCnt = 0; loopCnt < numOfThread; loopCnt++) {
sprintf(tmpBuf, ".tables.tmp.%d", loopCnt);
(void)remove(tmpBuf);
}
sprintf(tmpBuf, ".show-tables.tmp");
(void)remove(tmpBuf);
free(tblBuf);
close(fd);
return -1;
char *pstr = sqlstr;
pstr += sprintf(pstr, "CREATE DATABASE IF NOT EXISTS %s ", dbInfo->name);
if (isDumpProperty) {
pstr += sprintf(pstr,
"REPLICA %d QUORUM %d DAYS %d KEEP %s CACHE %d BLOCKS %d MINROWS %d MAXROWS %d FSYNC %d CACHELAST %d COMP %d PRECISION '%s' UPDATE %d",
dbInfo->replica, dbInfo->quorum, dbInfo->days,
dbInfo->keeplist,
dbInfo->cache,
dbInfo->blocks, dbInfo->minrows, dbInfo->maxrows,
dbInfo->fsync,
dbInfo->cachelast,
dbInfo->comp, dbInfo->precision, dbInfo->update);
}
// read tableOfPerFile for fd, write to subFd
ssize_t readLen = read(fd, tblBuf, tableOfPerFile * sizeof(STableRecord));
if (readLen <= 0) {
close(subFd);
break;
}
taosWrite(subFd, tblBuf, readLen);
close(subFd);
}
pstr += sprintf(pstr, ";");
fprintf(fp, "%s\n\n", sqlstr);
}
static int dumpStable(char *stbName, FILE *fp,
TAOS* taos, SDbInfo *dbInfo)
{
sprintf(tmpBuf, ".show-tables.tmp");
(void)remove(tmpBuf);
uint64_t sizeOfTableDes =
(uint64_t)(sizeof(STableDef) + sizeof(SColDes) * TSDB_MAX_COLUMNS);
if (fd >= 0) {
close(fd);
fd = -1;
STableDef *stableDes = (STableDef *)calloc(1, sizeOfTableDes);
if (NULL == stableDes) {
errorPrint("%s() LN%d, failed to allocate %"PRIu64" memory\n",
__func__, __LINE__, sizeOfTableDes);
exit(-1);
}
// start multi threads to dumpout
taosStartDumpOutWorkThreads(numOfThread, dbInfo->name,
getPrecisionByString(dbInfo->precision));
for (int loopCnt = 0; loopCnt < numOfThread; loopCnt++) {
sprintf(tmpBuf, ".tables.tmp.%d", loopCnt);
(void)remove(tmpBuf);
int colCount = taosGetTableDes(dbInfo->name,
stbName, stableDes, taos, true);
if (colCount < 0) {
free(stableDes);
errorPrint("%s() LN%d, failed to get stable[%s] schema\n",
__func__, __LINE__, stbName);
exit(-1);
}
free(tblBuf);
taosDumpCreateTableClause(stableDes, colCount, fp, dbInfo->name);
free(stableDes);
return 0;
}
static void taosDumpCreateTableClause(STableDef *tableDes, int numOfCols,
static int taosDumpCreateTableClause(STableDef *tableDes, int numOfCols,
FILE *fp, char* dbName) {
int counter = 0;
int count_temp = 0;
......@@ -2251,10 +1903,11 @@ static void taosDumpCreateTableClause(STableDef *tableDes, int numOfCols,
pstr += sprintf(pstr, ");");
fprintf(fp, "%s\n\n", sqlstr);
debugPrint("%s() LN%d, write string: %s\n", __func__, __LINE__, sqlstr);
return fprintf(fp, "%s\n\n", sqlstr);
}
static void taosDumpCreateMTableClause(STableDef *tableDes, char *metric,
static void taosDumpCreateMTableClause(STableDef *tableDes, char *stable,
int numOfCols, FILE *fp, char* dbName) {
int counter = 0;
int count_temp = 0;
......@@ -2271,7 +1924,7 @@ static void taosDumpCreateMTableClause(STableDef *tableDes, char *metric,
pstr += sprintf(tmpBuf,
"CREATE TABLE IF NOT EXISTS %s.%s USING %s.%s TAGS (",
dbName, tableDes->name, dbName, metric);
dbName, tableDes->name, dbName, stable);
for (; counter < numOfCols; counter++) {
if (tableDes->cols[counter].note[0] != '\0') break;
......@@ -2471,8 +2124,8 @@ static int64_t writeResultToSql(TAOS_RES *res, FILE *fp, char *dbName, char *tbN
return 0;
}
static int taosDumpTableData(FILE *fp, char *tbName,
TAOS* taosCon, char* dbName, int precision,
static int64_t taosDumpTableData(FILE *fp, char *tbName,
TAOS* taos, char* dbName, int precision,
char *jsonAvroSchema) {
int64_t totalRows = 0;
......@@ -2505,7 +2158,7 @@ static int taosDumpTableData(FILE *fp, char *tbName,
"select * from %s.%s where _c0 >= %" PRId64 " and _c0 <= %" PRId64 " order by _c0 asc;",
dbName, tbName, start_time, end_time);
TAOS_RES* res = taos_query(taosCon, sqlstr);
TAOS_RES* res = taos_query(taos, sqlstr);
int32_t code = taos_errno(res);
if (code != 0) {
errorPrint("failed to run command %s, reason: %s\n",
......@@ -2941,7 +2594,7 @@ static int taosDumpInOneFile(TAOS* taos, FILE* fp, char* fcharset,
static void* taosDumpInWorkThreadFp(void *arg)
{
SThreadParaObj *pThread = (SThreadParaObj*)arg;
threadInfo *pThread = (threadInfo*)arg;
setThreadName("dumpInWorkThrd");
for (int32_t f = 0; f < g_tsSqlFileNum; ++f) {
......@@ -2953,7 +2606,7 @@ static void* taosDumpInWorkThreadFp(void *arg)
}
fprintf(stderr, ", Success Open input file: %s\n",
SQLFileName);
taosDumpInOneFile(pThread->taosCon, fp, g_tsCharset, g_args.encode, SQLFileName);
taosDumpInOneFile(pThread->taos, fp, g_tsCharset, g_args.encode, SQLFileName);
}
}
......@@ -2963,15 +2616,15 @@ static void* taosDumpInWorkThreadFp(void *arg)
static void taosStartDumpInWorkThreads()
{
pthread_attr_t thattr;
SThreadParaObj *pThread;
threadInfo *pThread;
int32_t totalThreads = g_args.thread_num;
if (totalThreads > g_tsSqlFileNum) {
totalThreads = g_tsSqlFileNum;
}
SThreadParaObj *threadObj = (SThreadParaObj *)calloc(
totalThreads, sizeof(SThreadParaObj));
threadInfo *threadObj = (threadInfo *)calloc(
totalThreads, sizeof(threadInfo));
if (NULL == threadObj) {
errorPrint("%s() LN%d, memory allocation failed\n", __func__, __LINE__);
......@@ -2981,9 +2634,9 @@ static void taosStartDumpInWorkThreads()
pThread = threadObj + t;
pThread->threadIndex = t;
pThread->totalThreads = totalThreads;
pThread->taosCon = taos_connect(g_args.host, g_args.user, g_args.password,
pThread->taos = taos_connect(g_args.host, g_args.user, g_args.password,
NULL, g_args.port);
if (pThread->taosCon == NULL) {
if (pThread->taos == NULL) {
errorPrint("Failed to connect to TDengine server %s\n", g_args.host);
free(threadObj);
return;
......@@ -3004,7 +2657,7 @@ static void taosStartDumpInWorkThreads()
}
for (int t = 0; t < totalThreads; ++t) {
taos_close(threadObj[t].taosCon);
taos_close(threadObj[t].taos);
}
free(threadObj);
}
......@@ -3054,3 +2707,154 @@ static int taosDumpIn() {
return 0;
}
int main(int argc, char *argv[]) {
static char verType[32] = {0};
sprintf(verType, "version: %s\n", version);
argp_program_version = verType;
int ret = 0;
/* Parse our arguments; every option seen by parse_opt will be
reflected in arguments. */
if (argc > 1) {
// parse_precision_first(argc, argv, &g_args);
parse_timestamp(argc, argv, &g_args);
parse_args(argc, argv, &g_args);
}
argp_parse(&argp, argc, argv, 0, 0, &g_args);
if (g_args.abort) {
#ifndef _ALPINE
error(10, 0, "ABORTED");
#else
abort();
#endif
}
printf("====== arguments config ======\n");
{
printf("host: %s\n", g_args.host);
printf("user: %s\n", g_args.user);
printf("password: %s\n", g_args.password);
printf("port: %u\n", g_args.port);
printf("mysqlFlag: %d\n", g_args.mysqlFlag);
printf("outpath: %s\n", g_args.outpath);
printf("inpath: %s\n", g_args.inpath);
printf("resultFile: %s\n", g_args.resultFile);
printf("encode: %s\n", g_args.encode);
printf("all_databases: %s\n", g_args.all_databases?"true":"false");
printf("databases: %d\n", g_args.databases);
printf("databasesSeq: %s\n", g_args.databasesSeq);
printf("schemaonly: %s\n", g_args.schemaonly?"true":"false");
printf("with_property: %s\n", g_args.with_property?"true":"false");
printf("avro format: %s\n", g_args.avro?"true":"false");
printf("start_time: %" PRId64 "\n", g_args.start_time);
printf("human readable start time: %s \n", g_args.humanStartTime);
printf("end_time: %" PRId64 "\n", g_args.end_time);
printf("human readable end time: %s \n", g_args.humanEndTime);
printf("precision: %s\n", g_args.precision);
printf("data_batch: %d\n", g_args.data_batch);
printf("max_sql_len: %d\n", g_args.max_sql_len);
printf("table_batch: %d\n", g_args.table_batch);
printf("thread_num: %d\n", g_args.thread_num);
printf("allow_sys: %d\n", g_args.allow_sys);
printf("abort: %d\n", g_args.abort);
printf("isDumpIn: %d\n", g_args.isDumpIn);
printf("arg_list_len: %d\n", g_args.arg_list_len);
printf("debug_print: %d\n", g_args.debug_print);
for (int32_t i = 0; i < g_args.arg_list_len; i++) {
printf("arg_list[%d]: %s\n", i, g_args.arg_list[i]);
}
}
printf("==============================\n");
if (taosCheckParam(&g_args) < 0) {
exit(EXIT_FAILURE);
}
g_fpOfResult = fopen(g_args.resultFile, "a");
if (NULL == g_fpOfResult) {
errorPrint("Failed to open %s for save result\n", g_args.resultFile);
exit(-1);
};
fprintf(g_fpOfResult, "#############################################################################\n");
fprintf(g_fpOfResult, "============================== arguments config =============================\n");
{
fprintf(g_fpOfResult, "host: %s\n", g_args.host);
fprintf(g_fpOfResult, "user: %s\n", g_args.user);
fprintf(g_fpOfResult, "password: %s\n", g_args.password);
fprintf(g_fpOfResult, "port: %u\n", g_args.port);
fprintf(g_fpOfResult, "mysqlFlag: %d\n", g_args.mysqlFlag);
fprintf(g_fpOfResult, "outpath: %s\n", g_args.outpath);
fprintf(g_fpOfResult, "inpath: %s\n", g_args.inpath);
fprintf(g_fpOfResult, "resultFile: %s\n", g_args.resultFile);
fprintf(g_fpOfResult, "encode: %s\n", g_args.encode);
fprintf(g_fpOfResult, "all_databases: %s\n", g_args.all_databases?"true":"false");
fprintf(g_fpOfResult, "databases: %d\n", g_args.databases);
fprintf(g_fpOfResult, "databasesSeq: %s\n", g_args.databasesSeq);
fprintf(g_fpOfResult, "schemaonly: %s\n", g_args.schemaonly?"true":"false");
fprintf(g_fpOfResult, "with_property: %s\n", g_args.with_property?"true":"false");
fprintf(g_fpOfResult, "avro format: %s\n", g_args.avro?"true":"false");
fprintf(g_fpOfResult, "start_time: %" PRId64 "\n", g_args.start_time);
fprintf(g_fpOfResult, "human readable start time: %s \n", g_args.humanStartTime);
fprintf(g_fpOfResult, "end_time: %" PRId64 "\n", g_args.end_time);
fprintf(g_fpOfResult, "human readable end time: %s \n", g_args.humanEndTime);
fprintf(g_fpOfResult, "precision: %s\n", g_args.precision);
fprintf(g_fpOfResult, "data_batch: %d\n", g_args.data_batch);
fprintf(g_fpOfResult, "max_sql_len: %d\n", g_args.max_sql_len);
fprintf(g_fpOfResult, "table_batch: %d\n", g_args.table_batch);
fprintf(g_fpOfResult, "thread_num: %d\n", g_args.thread_num);
fprintf(g_fpOfResult, "allow_sys: %d\n", g_args.allow_sys);
fprintf(g_fpOfResult, "abort: %d\n", g_args.abort);
fprintf(g_fpOfResult, "isDumpIn: %d\n", g_args.isDumpIn);
fprintf(g_fpOfResult, "arg_list_len: %d\n", g_args.arg_list_len);
for (int32_t i = 0; i < g_args.arg_list_len; i++) {
fprintf(g_fpOfResult, "arg_list[%d]: %s\n", i, g_args.arg_list[i]);
}
}
g_numOfCores = (int32_t)sysconf(_SC_NPROCESSORS_ONLN);
time_t tTime = time(NULL);
struct tm tm = *localtime(&tTime);
if (g_args.isDumpIn) {
fprintf(g_fpOfResult, "============================== DUMP IN ============================== \n");
fprintf(g_fpOfResult, "# DumpIn start time: %d-%02d-%02d %02d:%02d:%02d\n",
tm.tm_year + 1900, tm.tm_mon + 1,
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
if (taosDumpIn() < 0) {
ret = -1;
}
} else {
fprintf(g_fpOfResult, "============================== DUMP OUT ============================== \n");
fprintf(g_fpOfResult, "# DumpOut start time: %d-%02d-%02d %02d:%02d:%02d\n",
tm.tm_year + 1900, tm.tm_mon + 1,
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
if (taosDumpOut() < 0) {
ret = -1;
} else {
fprintf(g_fpOfResult, "\n============================== TOTAL STATISTICS ============================== \n");
fprintf(g_fpOfResult, "# total database count: %d\n",
g_resultStatistics.totalDatabasesOfDumpOut);
fprintf(g_fpOfResult, "# total super table count: %d\n",
g_resultStatistics.totalSuperTblsOfDumpOut);
fprintf(g_fpOfResult, "# total child table count: %"PRId64"\n",
g_resultStatistics.totalChildTblsOfDumpOut);
fprintf(g_fpOfResult, "# total row count: %"PRId64"\n",
g_resultStatistics.totalRowsOfDumpOut);
}
}
fprintf(g_fpOfResult, "\n");
fclose(g_fpOfResult);
if (g_tablesList) {
free(g_tablesList);
}
return ret;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册