未验证 提交 0d7fbe1f 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

Hotfix/sangshuduo/td 5053 taosdump support nanosecond for master (#6778)

* cherrypick from develop branch

* fix arm32 compile issue.

* [TD-5053]<feature>: taosdump supports nanosecond.

* fix precision parsing order issue.

* [TD-5053]<fix>: taosdump support nanosecond.

pre-cherry-pick.
上级 d3e3a343
......@@ -27,7 +27,11 @@
#include "tutil.h"
#include <taos.h>
#define TSDB_SUPPORT_NANOSECOND 0
#define MAX_FILE_NAME_LEN 256 // max file name length on linux is 255
#define COMMAND_SIZE 65536
#define MAX_RECORDS_PER_REQ 32766
//#define DEFAULT_DUMP_FILE "taosdump.sql"
// for strncpy buffer overflow
......@@ -172,9 +176,9 @@ typedef struct {
int32_t totalDatabasesOfDumpOut;
} resultStatistics;
static int64_t totalDumpOutRows = 0;
static int64_t g_totalDumpOutRows = 0;
SDbInfo **dbInfos = NULL;
SDbInfo **g_dbInfos = NULL;
const char *argp_program_version = version;
const char *argp_program_bug_address = "<support@taosdata.com>";
......@@ -198,11 +202,11 @@ static struct argp_option options[] = {
// connection option
{"host", 'h', "HOST", 0, "Server host dumping data from. Default is localhost.", 0},
{"user", 'u', "USER", 0, "User name used to connect to server. Default is root.", 0},
#ifdef _TD_POWER_
#ifdef _TD_POWER_
{"password", 'p', "PASSWORD", 0, "User password to connect to server. Default is powerdb.", 0},
#else
#else
{"password", 'p', "PASSWORD", 0, "User password to connect to server. Default is taosdata.", 0},
#endif
#endif
{"port", 'P', "PORT", 0, "Port to connect", 0},
{"cversion", 'v', "CVERION", 0, "client version", 0},
{"mysqlFlag", 'q', "MYSQLFLAG", 0, "mysqlFlag, Default is 0", 0},
......@@ -210,28 +214,36 @@ static struct argp_option options[] = {
{"outpath", 'o', "OUTPATH", 0, "Output file path.", 1},
{"inpath", 'i', "INPATH", 0, "Input file path.", 1},
{"resultFile", 'r', "RESULTFILE", 0, "DumpOut/In Result file path and name.", 1},
#ifdef _TD_POWER_
#ifdef _TD_POWER_
{"config", 'c', "CONFIG_DIR", 0, "Configure directory. Default is /etc/power/taos.cfg.", 1},
#else
#else
{"config", 'c', "CONFIG_DIR", 0, "Configure directory. Default is /etc/taos/taos.cfg.", 1},
#endif
#endif
{"encode", 'e', "ENCODE", 0, "Input file encoding.", 1},
// dump unit options
{"all-databases", 'A', 0, 0, "Dump all databases.", 2},
{"databases", 'D', 0, 0, "Dump assigned databases", 2},
{"allow-sys", 'a', 0, 0, "Allow to dump sys database", 2},
// dump format options
{"schemaonly", 's', 0, 0, "Only dump schema.", 3},
{"without-property", 'N', 0, 0, "Dump schema without properties.", 3},
{"start-time", 'S', "START_TIME", 0, "Start time to dump. Either Epoch or ISO8601/RFC3339 format is acceptable. Epoch precision millisecond. ISO8601 format example: 2017-10-01T18:00:00.000+0800 or 2017-10-0100:00:00.000+0800 or '2017-10-01 00:00:00.000+0800'", 3},
{"end-time", 'E', "END_TIME", 0, "End time to dump. Either Epoch or ISO8601/RFC3339 format is acceptable. Epoch precision millisecond. ISO8601 format example: 2017-10-01T18:00:00.000+0800 or 2017-10-0100:00:00.000+0800 or '2017-10-01 00:00:00.000+0800'", 3},
{"data-batch", 'B', "DATA_BATCH", 0, "Number of data point per insert statement. Default is 1.", 3},
{"schemaonly", 's', 0, 0, "Only dump schema.", 2},
{"without-property", 'N', 0, 0, "Dump schema without properties.", 2},
{"avro", 'V', 0, 0, "Dump apache avro format data file. By default, dump sql command sequence.", 2},
{"start-time", 'S', "START_TIME", 0, "Start time to dump. Either epoch or ISO8601/RFC3339 format is acceptable. ISO8601 format example: 2017-10-01T18:00:00.000+0800 or 2017-10-0100:00:00.000+0800 or '2017-10-01 00:00:00.000+0800'", 4},
{"end-time", 'E', "END_TIME", 0, "End time to dump. Either epoch or ISO8601/RFC3339 format is acceptable. ISO8601 format example: 2017-10-01T18:00:00.000+0800 or 2017-10-0100:00:00.000+0800 or '2017-10-01 00:00:00.000+0800'", 5},
#if TSDB_SUPPORT_NANOSECOND == 1
{"precision", 'C', "PRECISION", 0, "Epoch precision. Valid value is one of ms, us, and ns. Default is ms.", 6},
#else
{"precision", 'C', "PRECISION", 0, "Epoch precision. Valid value is one of ms and us. Default is ms.", 6},
#endif
{"data-batch", 'B', "DATA_BATCH", 0, "Number of data point per insert statement. Max value is 32766. Default is 1.", 3},
{"max-sql-len", 'L', "SQL_LEN", 0, "Max length of one sql. Default is 65480.", 3},
{"table-batch", 't', "TABLE_BATCH", 0, "Number of table dumpout into one output file. Default is 1.", 3},
{"thread_num", 'T', "THREAD_NUM", 0, "Number of thread for dump in file. Default is 5.", 3},
{"allow-sys", 'a', 0, 0, "Allow to dump sys database", 3},
{"debug", 'g', 0, 0, "Print debug info.", 1},
{"verbose", 'v', 0, 0, "Print verbose debug info.", 1},
{0}};
{"debug", 'g', 0, 0, "Print debug info.", 8},
{"verbose", 'b', 0, 0, "Print verbose debug info.", 9},
{"performanceprint", 'm', 0, 0, "Print performance debug info.", 10},
{0}
};
/* Used by main to communicate with parse_opt. */
typedef struct arguments {
......@@ -243,8 +255,8 @@ typedef struct arguments {
char cversion[12];
uint16_t mysqlFlag;
// output file
char outpath[TSDB_FILENAME_LEN+1];
char inpath[TSDB_FILENAME_LEN+1];
char outpath[MAX_FILE_NAME_LEN];
char inpath[MAX_FILE_NAME_LEN];
// result file
char *resultFile;
char *encode;
......@@ -254,8 +266,10 @@ typedef struct arguments {
// dump format option
bool schemaonly;
bool with_property;
bool avro;
int64_t start_time;
int64_t end_time;
char precision[8];
int32_t data_batch;
int32_t max_sql_len;
int32_t table_batch; // num of table which will be dump into one output file.
......@@ -271,101 +285,175 @@ typedef struct arguments {
bool performance_print;
} SArguments;
/* Our argp parser. */
static error_t parse_opt(int key, char *arg, struct argp_state *state);
static struct argp argp = {options, parse_opt, args_doc, doc};
static resultStatistics g_resultStatistics = {0};
static FILE *g_fpOfResult = NULL;
static int g_numOfCores = 1;
static int taosDumpOut();
static int taosDumpIn();
static void taosDumpCreateDbClause(SDbInfo *dbInfo, bool isDumpProperty,
FILE *fp);
static int taosDumpDb(SDbInfo *dbInfo, FILE *fp, TAOS *taosCon);
static int32_t taosDumpStable(char *table, FILE *fp, TAOS* taosCon,
char* dbName);
static void taosDumpCreateTableClause(STableDef *tableDes, int numOfCols,
FILE *fp, char* dbName);
static void taosDumpCreateMTableClause(STableDef *tableDes, char *metric,
int numOfCols, FILE *fp, char* dbName);
static int32_t taosDumpTable(char *table, char *metric,
FILE *fp, TAOS* taosCon, char* dbName);
static int taosDumpTableData(FILE *fp, char *tbName,
TAOS* taosCon, char* dbName,
char *jsonAvroSchema);
static int taosCheckParam(struct arguments *arguments);
static void taosFreeDbInfos();
static void taosStartDumpOutWorkThreads(int32_t numOfThread, char *dbName);
struct arguments g_args = {
// connection option
NULL,
"root",
#ifdef _TD_POWER_
"powerdb",
#else
"taosdata",
#endif
0,
"",
0,
// outpath and inpath
"",
"",
"./dump_result.txt",
NULL,
// dump unit option
false,
false,
// dump format option
false, // schemeonly
true, // with_property
false, // avro format
-INT64_MAX, // start_time
INT64_MAX, // end_time
"ms", // precision
1, // data_batch
TSDB_MAX_SQL_LEN, // max_sql_len
1, // table_batch
false, // allow_sys
// other options
5, // thread_num
0, // abort
NULL, // arg_list
0, // arg_list_len
false, // isDumpIn
false, // debug_print
false, // verbose_print
false // performance_print
};
/* Parse a single option. */
static error_t parse_opt(int key, char *arg, struct argp_state *state) {
/* Get the input argument from argp_parse, which we
know is a pointer to our arguments structure. */
struct arguments *arguments = state->input;
wordexp_t full_path;
switch (key) {
// connection option
case 'a':
arguments->allow_sys = true;
g_args.allow_sys = true;
break;
case 'h':
arguments->host = arg;
g_args.host = arg;
break;
case 'u':
arguments->user = arg;
g_args.user = arg;
break;
case 'p':
arguments->password = arg;
g_args.password = arg;
break;
case 'P':
arguments->port = atoi(arg);
g_args.port = atoi(arg);
break;
case 'q':
arguments->mysqlFlag = atoi(arg);
g_args.mysqlFlag = atoi(arg);
break;
case 'v':
if (wordexp(arg, &full_path, 0) != 0) {
fprintf(stderr, "Invalid client vesion %s\n", arg);
errorPrint("Invalid client vesion %s\n", arg);
return -1;
}
tstrncpy(arguments->cversion, full_path.we_wordv[0], 11);
tstrncpy(g_args.cversion, full_path.we_wordv[0], 11);
wordfree(&full_path);
break;
// output file path
case 'o':
if (wordexp(arg, &full_path, 0) != 0) {
fprintf(stderr, "Invalid path %s\n", arg);
errorPrint("Invalid path %s\n", arg);
return -1;
}
tstrncpy(arguments->outpath, full_path.we_wordv[0], TSDB_FILENAME_LEN);
tstrncpy(g_args.outpath, full_path.we_wordv[0],
MAX_FILE_NAME_LEN);
wordfree(&full_path);
break;
case 'g':
arguments->debug_print = true;
g_args.debug_print = true;
break;
case 'i':
arguments->isDumpIn = true;
g_args.isDumpIn = true;
if (wordexp(arg, &full_path, 0) != 0) {
fprintf(stderr, "Invalid path %s\n", arg);
errorPrint("Invalid path %s\n", arg);
return -1;
}
tstrncpy(arguments->inpath, full_path.we_wordv[0], TSDB_FILENAME_LEN);
tstrncpy(g_args.inpath, full_path.we_wordv[0],
MAX_FILE_NAME_LEN);
wordfree(&full_path);
break;
case 'r':
arguments->resultFile = arg;
g_args.resultFile = arg;
break;
case 'c':
if (wordexp(arg, &full_path, 0) != 0) {
fprintf(stderr, "Invalid path %s\n", arg);
errorPrint("Invalid path %s\n", arg);
return -1;
}
tstrncpy(configDir, full_path.we_wordv[0], TSDB_FILENAME_LEN);
tstrncpy(configDir, full_path.we_wordv[0], MAX_FILE_NAME_LEN);
wordfree(&full_path);
break;
case 'e':
arguments->encode = arg;
g_args.encode = arg;
break;
// dump unit option
case 'A':
arguments->all_databases = true;
g_args.all_databases = true;
break;
case 'D':
arguments->databases = true;
g_args.databases = true;
break;
// dump format option
case 's':
arguments->schemaonly = true;
g_args.schemaonly = true;
break;
case 'N':
arguments->with_property = false;
g_args.with_property = false;
break;
case 'V':
g_args.avro = true;
break;
case 'S':
// parse time here.
arguments->start_time = atol(arg);
g_args.start_time = atol(arg);
break;
case 'E':
arguments->end_time = atol(arg);
g_args.end_time = atol(arg);
break;
case 'B':
arguments->data_batch = atoi(arg);
if (arguments->data_batch >= INT16_MAX) {
arguments->data_batch = INT16_MAX - 1;
g_args.data_batch = atoi(arg);
if (g_args.data_batch > MAX_RECORDS_PER_REQ) {
g_args.data_batch = MAX_RECORDS_PER_REQ;
}
break;
case 'L':
......@@ -376,21 +464,21 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
} else if (len < TSDB_MAX_SQL_LEN) {
len = TSDB_MAX_SQL_LEN;
}
arguments->max_sql_len = len;
g_args.max_sql_len = len;
break;
}
case 't':
arguments->table_batch = atoi(arg);
g_args.table_batch = atoi(arg);
break;
case 'T':
arguments->thread_num = atoi(arg);
g_args.thread_num = atoi(arg);
break;
case OPT_ABORT:
arguments->abort = 1;
g_args.abort = 1;
break;
case ARGP_KEY_ARG:
arguments->arg_list = &state->argv[state->next - 1];
arguments->arg_list_len = state->argc - state->next + 1;
g_args.arg_list = &state->argv[state->next - 1];
g_args.arg_list_len = state->argc - state->next + 1;
state->next = state->argc;
break;
......@@ -400,65 +488,6 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
return 0;
}
/* Our argp parser. */
static struct argp argp = {options, parse_opt, args_doc, doc};
static resultStatistics g_resultStatistics = {0};
static FILE *g_fpOfResult = NULL;
static int g_numOfCores = 1;
static int taosDumpOut(struct arguments *arguments);
static int taosDumpIn(struct arguments *arguments);
static void taosDumpCreateDbClause(SDbInfo *dbInfo, bool isDumpProperty, FILE *fp);
static int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp, TAOS *taosCon);
static int32_t taosDumpStable(char *table, FILE *fp, TAOS* taosCon, char* dbName);
static void taosDumpCreateTableClause(STableDef *tableDes, int numOfCols, FILE *fp, char* dbName);
static void taosDumpCreateMTableClause(STableDef *tableDes, char *metric, int numOfCols, FILE *fp, char* dbName);
static int32_t taosDumpTable(char *table, char *metric, struct arguments *arguments, FILE *fp, TAOS* taosCon, char* dbName);
static int taosDumpTableData(FILE *fp, char *tbname, struct arguments *arguments, TAOS* taosCon, char* dbName);
static int taosCheckParam(struct arguments *arguments);
static void taosFreeDbInfos();
static void taosStartDumpOutWorkThreads(void* taosCon, struct arguments* args, int32_t numOfThread, char *dbName);
struct arguments g_args = {
// connection option
NULL,
"root",
#ifdef _TD_POWER_
"powerdb",
#else
"taosdata",
#endif
0,
"",
0,
// outpath and inpath
"",
"",
"./dump_result.txt",
NULL,
// dump unit option
false,
false,
// dump format option
false, // schemeonly
true, // with_property
0,
INT64_MAX,
1,
TSDB_MAX_SQL_LEN,
1,
false,
// other options
5,
0,
NULL,
0,
false,
false, // debug_print
false, // verbose_print
false // performance_print
};
static int queryDbImpl(TAOS *taos, char *command) {
int i;
TAOS_RES *res = NULL;
......@@ -478,7 +507,7 @@ static int queryDbImpl(TAOS *taos, char *command) {
}
if (code != 0) {
fprintf(stderr, "Failed to run <%s>, reason: %s\n", command, taos_errstr(res));
errorPrint("Failed to run <%s>, reason: %s\n", command, taos_errstr(res));
taos_free_result(res);
//taos_close(taos);
return -1;
......@@ -488,19 +517,78 @@ static int queryDbImpl(TAOS *taos, char *command) {
return 0;
}
static void parse_args(int argc, char *argv[], SArguments *arguments) {
static void parse_precision_first(
int argc, char *argv[], SArguments *arguments) {
for (int i = 1; i < argc; i++) {
if (strcmp(argv[i], "-C") == 0) {
if (NULL == argv[i+1]) {
errorPrint("%s need a valid value following!\n", argv[i]);
exit(-1);
}
char *tmp = strdup(argv[i+1]);
if (tmp == NULL) {
errorPrint("%s() LN%d, strdup() cannot allocate memory\n",
__func__, __LINE__);
exit(-1);
}
if ((0 != strncasecmp(tmp, "ms", strlen("ms")))
&& (0 != strncasecmp(tmp, "us", strlen("us")))
#if TSDB_SUPPORT_NANOSECOND == 1
&& (0 != strncasecmp(tmp, "ns", strlen("ns")))
#endif
) {
//
errorPrint("input precision: %s is invalid value\n", tmp);
free(tmp);
exit(-1);
}
strncpy(g_args.precision, tmp, strlen(tmp));
free(tmp);
}
}
}
static void parse_timestamp(
int argc, char *argv[], SArguments *arguments) {
for (int i = 1; i < argc; i++) {
if ((strcmp(argv[i], "-S") == 0)
|| (strcmp(argv[i], "-E") == 0)) {
if (argv[i+1]) {
char *tmp = strdup(argv[++i]);
if (NULL == argv[i+1]) {
errorPrint("%s need a valid value following!\n", argv[i]);
exit(-1);
}
char *tmp = strdup(argv[i+1]);
if (NULL == tmp) {
errorPrint("%s() LN%d, strdup() cannot allocate memory\n",
__func__, __LINE__);
exit(-1);
}
if (tmp) {
int64_t tmpEpoch;
if (strchr(tmp, ':') && strchr(tmp, '-')) {
int32_t timePrec;
if (0 == strncasecmp(arguments->precision,
"ms", strlen("ms"))) {
timePrec = TSDB_TIME_PRECISION_MILLI;
} else if (0 == strncasecmp(arguments->precision,
"us", strlen("us"))) {
timePrec = TSDB_TIME_PRECISION_MICRO;
#if TSDB_SUPPORT_NANOSECOND == 1
} else if (0 == strncasecmp(arguments->precision,
"ns", strlen("ns"))) {
timePrec = TSDB_TIME_PRECISION_NANO;
#endif
} else {
errorPrint("Invalid time precision: %s",
arguments->precision);
free(tmp);
return;
}
if (TSDB_CODE_SUCCESS != taosParseTime(
tmp, &tmpEpoch, strlen(tmp), TSDB_TIME_PRECISION_MILLI, 0)) {
fprintf(stderr, "Input end time error!\n");
tmp, &tmpEpoch, strlen(tmp),
timePrec, 0)) {
errorPrint("Input %s, end time error!\n", tmp);
free(tmp);
return;
}
......@@ -511,37 +599,29 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
sprintf(argv[i], "%"PRId64"", tmpEpoch);
debugPrint("%s() LN%d, tmp is: %s, argv[%d]: %s\n",
__func__, __LINE__, tmp, i, argv[i]);
free(tmp);
} else {
errorPrint("%s() LN%d, strdup() cannot allocate memory\n", __func__, __LINE__);
exit(-1);
}
} else {
errorPrint("%s need a valid value following!\n", argv[i]);
exit(-1);
}
} else if (strcmp(argv[i], "-g") == 0) {
arguments->debug_print = true;
}
}
}
int main(int argc, char *argv[]) {
int ret = 0;
/* Parse our arguments; every option seen by parse_opt will be
reflected in arguments. */
if (argc > 2)
parse_args(argc, argv, &g_args);
if (argc > 2) {
parse_precision_first(argc, argv, &g_args);
parse_timestamp(argc, argv, &g_args);
}
argp_parse(&argp, argc, argv, 0, 0, &g_args);
if (g_args.abort) {
#ifndef _ALPINE
#ifndef _ALPINE
error(10, 0, "ABORTED");
#else
#else
abort();
#endif
#endif
}
printf("====== arguments config ======\n");
......@@ -556,12 +636,14 @@ int main(int argc, char *argv[]) {
printf("inpath: %s\n", g_args.inpath);
printf("resultFile: %s\n", g_args.resultFile);
printf("encode: %s\n", g_args.encode);
printf("all_databases: %d\n", g_args.all_databases);
printf("all_databases: %s\n", g_args.all_databases?"true":"false");
printf("databases: %d\n", g_args.databases);
printf("schemaonly: %d\n", g_args.schemaonly);
printf("with_property: %d\n", g_args.with_property);
printf("schemaonly: %s\n", g_args.schemaonly?"true":"false");
printf("with_property: %s\n", g_args.with_property?"true":"false");
printf("avro format: %s\n", g_args.avro?"true":"false");
printf("start_time: %" PRId64 "\n", g_args.start_time);
printf("end_time: %" PRId64 "\n", g_args.end_time);
printf("precision: %s\n", g_args.precision);
printf("data_batch: %d\n", g_args.data_batch);
printf("max_sql_len: %d\n", g_args.max_sql_len);
printf("table_batch: %d\n", g_args.table_batch);
......@@ -588,8 +670,8 @@ int main(int argc, char *argv[]) {
g_fpOfResult = fopen(g_args.resultFile, "a");
if (NULL == g_fpOfResult) {
fprintf(stderr, "Failed to open %s for save result\n", g_args.resultFile);
return 1;
errorPrint("Failed to open %s for save result\n", g_args.resultFile);
exit(-1);
};
fprintf(g_fpOfResult, "#############################################################################\n");
......@@ -609,8 +691,10 @@ int main(int argc, char *argv[]) {
fprintf(g_fpOfResult, "databases: %d\n", g_args.databases);
fprintf(g_fpOfResult, "schemaonly: %s\n", g_args.schemaonly?"true":"false");
fprintf(g_fpOfResult, "with_property: %s\n", g_args.with_property?"true":"false");
fprintf(g_fpOfResult, "avro format: %s\n", g_args.avro?"true":"false");
fprintf(g_fpOfResult, "start_time: %" PRId64 "\n", g_args.start_time);
fprintf(g_fpOfResult, "end_time: %" PRId64 "\n", g_args.end_time);
fprintf(g_fpOfResult, "precision: %s\n", g_args.precision);
fprintf(g_fpOfResult, "data_batch: %d\n", g_args.data_batch);
fprintf(g_fpOfResult, "max_sql_len: %d\n", g_args.max_sql_len);
fprintf(g_fpOfResult, "table_batch: %d\n", g_args.table_batch);
......@@ -632,44 +716,47 @@ int main(int argc, char *argv[]) {
if (g_args.isDumpIn) {
fprintf(g_fpOfResult, "============================== DUMP IN ============================== \n");
fprintf(g_fpOfResult, "# DumpIn start time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1,
fprintf(g_fpOfResult, "# DumpIn start time: %d-%02d-%02d %02d:%02d:%02d\n",
tm.tm_year + 1900, tm.tm_mon + 1,
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
if (taosDumpIn(&g_args) < 0) {
fprintf(g_fpOfResult, "\n");
fclose(g_fpOfResult);
return -1;
if (taosDumpIn() < 0) {
ret = -1;
}
} else {
fprintf(g_fpOfResult, "============================== DUMP OUT ============================== \n");
fprintf(g_fpOfResult, "# DumpOut start time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1,
fprintf(g_fpOfResult, "# DumpOut start time: %d-%02d-%02d %02d:%02d:%02d\n",
tm.tm_year + 1900, tm.tm_mon + 1,
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
if (taosDumpOut(&g_args) < 0) {
fprintf(g_fpOfResult, "\n");
fclose(g_fpOfResult);
return -1;
}
if (taosDumpOut() < 0) {
ret = -1;
} else {
fprintf(g_fpOfResult, "\n============================== TOTAL STATISTICS ============================== \n");
fprintf(g_fpOfResult, "# total database count: %d\n", g_resultStatistics.totalDatabasesOfDumpOut);
fprintf(g_fpOfResult, "# total super table count: %d\n", g_resultStatistics.totalSuperTblsOfDumpOut);
fprintf(g_fpOfResult, "# total child table count: %"PRId64"\n", g_resultStatistics.totalChildTblsOfDumpOut);
fprintf(g_fpOfResult, "# total row count: %"PRId64"\n", g_resultStatistics.totalRowsOfDumpOut);
fprintf(g_fpOfResult, "# total database count: %d\n",
g_resultStatistics.totalDatabasesOfDumpOut);
fprintf(g_fpOfResult, "# total super table count: %d\n",
g_resultStatistics.totalSuperTblsOfDumpOut);
fprintf(g_fpOfResult, "# total child table count: %"PRId64"\n",
g_resultStatistics.totalChildTblsOfDumpOut);
fprintf(g_fpOfResult, "# total row count: %"PRId64"\n",
g_resultStatistics.totalRowsOfDumpOut);
}
}
fprintf(g_fpOfResult, "\n");
fclose(g_fpOfResult);
return 0;
return ret;
}
static void taosFreeDbInfos() {
if (dbInfos == NULL) return;
for (int i = 0; i < 128; i++) tfree(dbInfos[i]);
tfree(dbInfos);
if (g_dbInfos == NULL) return;
for (int i = 0; i < 128; i++) tfree(g_dbInfos[i]);
tfree(g_dbInfos);
}
// check table is normal table or super table
static int taosGetTableRecordInfo(char *table, STableRecordInfo *pTableRecordInfo, TAOS *taosCon) {
static int taosGetTableRecordInfo(
char *table, STableRecordInfo *pTableRecordInfo, TAOS *taosCon) {
TAOS_ROW row = NULL;
bool isSet = false;
TAOS_RES *result = NULL;
......@@ -678,7 +765,8 @@ static int taosGetTableRecordInfo(char *table, STableRecordInfo *pTableRecordInf
char* tempCommand = (char *)malloc(COMMAND_SIZE);
if (tempCommand == NULL) {
fprintf(stderr, "failed to allocate memory\n");
errorPrint("%s() LN%d, failed to allocate memory\n",
__func__, __LINE__);
return -1;
}
......@@ -688,7 +776,8 @@ static int taosGetTableRecordInfo(char *table, STableRecordInfo *pTableRecordInf
int32_t code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "failed to run command %s\n", tempCommand);
errorPrint("%s() LN%d, failed to run command %s\n",
__func__, __LINE__, tempCommand);
free(tempCommand);
taos_free_result(result);
return -1;
......@@ -699,9 +788,11 @@ static int taosGetTableRecordInfo(char *table, STableRecordInfo *pTableRecordInf
while ((row = taos_fetch_row(result)) != NULL) {
isSet = true;
pTableRecordInfo->isMetric = false;
strncpy(pTableRecordInfo->tableRecord.name, (char *)row[TSDB_SHOW_TABLES_NAME_INDEX],
strncpy(pTableRecordInfo->tableRecord.name,
(char *)row[TSDB_SHOW_TABLES_NAME_INDEX],
fields[TSDB_SHOW_TABLES_NAME_INDEX].bytes);
strncpy(pTableRecordInfo->tableRecord.metric, (char *)row[TSDB_SHOW_TABLES_METRIC_INDEX],
strncpy(pTableRecordInfo->tableRecord.metric,
(char *)row[TSDB_SHOW_TABLES_METRIC_INDEX],
fields[TSDB_SHOW_TABLES_METRIC_INDEX].bytes);
break;
}
......@@ -730,7 +821,8 @@ static int taosGetTableRecordInfo(char *table, STableRecordInfo *pTableRecordInf
while ((row = taos_fetch_row(result)) != NULL) {
isSet = true;
pTableRecordInfo->isMetric = true;
tstrncpy(pTableRecordInfo->tableRecord.metric, table, TSDB_TABLE_NAME_LEN);
tstrncpy(pTableRecordInfo->tableRecord.metric, table,
TSDB_TABLE_NAME_LEN);
break;
}
......@@ -753,9 +845,11 @@ static int32_t taosSaveAllNormalTableToTempFile(TAOS *taosCon, char*meter,
STableRecord tableRecord;
if (-1 == *fd) {
*fd = open(".tables.tmp.0", O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
*fd = open(".tables.tmp.0",
O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
if (*fd == -1) {
fprintf(stderr, "failed to open temp file: .tables.tmp.0\n");
errorPrint("%s() LN%d, failed to open temp file: .tables.tmp.0\n",
__func__, __LINE__);
return -1;
}
}
......@@ -768,16 +862,16 @@ static int32_t taosSaveAllNormalTableToTempFile(TAOS *taosCon, char*meter,
return 0;
}
static int32_t taosSaveTableOfMetricToTempFile(TAOS *taosCon, char* metric,
struct arguments *arguments, int32_t* totalNumOfThread) {
static int32_t taosSaveTableOfMetricToTempFile(
TAOS *taosCon, char* metric,
int32_t* totalNumOfThread) {
TAOS_ROW row;
int fd = -1;
STableRecord tableRecord;
char* tmpCommand = (char *)malloc(COMMAND_SIZE);
if (tmpCommand == NULL) {
fprintf(stderr, "failed to allocate memory\n");
errorPrint("%s() LN%d, failed to allocate memory\n", __func__, __LINE__);
return -1;
}
......@@ -786,19 +880,21 @@ static int32_t taosSaveTableOfMetricToTempFile(TAOS *taosCon, char* metric,
TAOS_RES *res = taos_query(taosCon, tmpCommand);
int32_t code = taos_errno(res);
if (code != 0) {
fprintf(stderr, "failed to run command %s\n", tmpCommand);
errorPrint("%s() LN%d, failed to run command %s\n",
__func__, __LINE__, tmpCommand);
free(tmpCommand);
taos_free_result(res);
return -1;
}
free(tmpCommand);
char tmpBuf[TSDB_FILENAME_LEN + 1];
memset(tmpBuf, 0, TSDB_FILENAME_LEN);
char tmpBuf[MAX_FILE_NAME_LEN];
memset(tmpBuf, 0, MAX_FILE_NAME_LEN);
sprintf(tmpBuf, ".select-tbname.tmp");
fd = open(tmpBuf, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
if (fd == -1) {
fprintf(stderr, "failed to open temp file: %s\n", tmpBuf);
errorPrint("%s() LN%d, failed to open temp file: %s\n",
__func__, __LINE__, tmpBuf);
taos_free_result(res);
return -1;
}
......@@ -818,21 +914,22 @@ static int32_t taosSaveTableOfMetricToTempFile(TAOS *taosCon, char* metric,
taos_free_result(res);
lseek(fd, 0, SEEK_SET);
int maxThreads = arguments->thread_num;
int maxThreads = g_args.thread_num;
int tableOfPerFile ;
if (numOfTable <= arguments->thread_num) {
if (numOfTable <= g_args.thread_num) {
tableOfPerFile = 1;
maxThreads = numOfTable;
} else {
tableOfPerFile = numOfTable / arguments->thread_num;
if (0 != numOfTable % arguments->thread_num) {
tableOfPerFile = numOfTable / g_args.thread_num;
if (0 != numOfTable % g_args.thread_num) {
tableOfPerFile += 1;
}
}
char* tblBuf = (char*)calloc(1, tableOfPerFile * sizeof(STableRecord));
if (NULL == tblBuf){
fprintf(stderr, "failed to calloc %" PRIzu "\n", tableOfPerFile * sizeof(STableRecord));
errorPrint("%s() LN%d, failed to calloc %" PRIzu "\n",
__func__, __LINE__, tableOfPerFile * sizeof(STableRecord));
close(fd);
return -1;
}
......@@ -840,11 +937,12 @@ static int32_t taosSaveTableOfMetricToTempFile(TAOS *taosCon, char* metric,
int32_t numOfThread = *totalNumOfThread;
int subFd = -1;
for (; numOfThread < maxThreads; numOfThread++) {
memset(tmpBuf, 0, TSDB_FILENAME_LEN);
memset(tmpBuf, 0, MAX_FILE_NAME_LEN);
sprintf(tmpBuf, ".tables.tmp.%d", numOfThread);
subFd = open(tmpBuf, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
if (subFd == -1) {
fprintf(stderr, "failed to open temp file: %s\n", tmpBuf);
errorPrint("%s() LN%d, failed to open temp file: %s\n",
__func__, __LINE__, tmpBuf);
for (int32_t loopCnt = 0; loopCnt < numOfThread; loopCnt++) {
sprintf(tmpBuf, ".tables.tmp.%d", loopCnt);
(void)remove(tmpBuf);
......@@ -880,7 +978,7 @@ static int32_t taosSaveTableOfMetricToTempFile(TAOS *taosCon, char* metric,
return 0;
}
static int taosDumpOut(struct arguments *arguments) {
static int taosDumpOut() {
TAOS *taos = NULL;
TAOS_RES *result = NULL;
char *command = NULL;
......@@ -890,40 +988,43 @@ static int taosDumpOut(struct arguments *arguments) {
int32_t count = 0;
STableRecordInfo tableRecordInfo;
char tmpBuf[TSDB_FILENAME_LEN+9] = {0};
if (arguments->outpath[0] != 0) {
sprintf(tmpBuf, "%s/dbs.sql", arguments->outpath);
char tmpBuf[4096] = {0};
if (g_args.outpath[0] != 0) {
sprintf(tmpBuf, "%s/dbs.sql", g_args.outpath);
} else {
sprintf(tmpBuf, "dbs.sql");
}
fp = fopen(tmpBuf, "w");
if (fp == NULL) {
fprintf(stderr, "failed to open file %s\n", tmpBuf);
errorPrint("%s() LN%d, failed to open file %s\n",
__func__, __LINE__, tmpBuf);
return -1;
}
dbInfos = (SDbInfo **)calloc(128, sizeof(SDbInfo *));
if (dbInfos == NULL) {
fprintf(stderr, "failed to allocate memory\n");
g_dbInfos = (SDbInfo **)calloc(128, sizeof(SDbInfo *));
if (g_dbInfos == NULL) {
errorPrint("%s() LN%d, failed to allocate memory\n",
__func__, __LINE__);
goto _exit_failure;
}
command = (char *)malloc(COMMAND_SIZE);
if (command == NULL) {
fprintf(stderr, "failed to allocate memory\n");
errorPrint("%s() LN%d, failed to allocate memory\n", __func__, __LINE__);
goto _exit_failure;
}
/* Connect to server */
taos = taos_connect(arguments->host, arguments->user, arguments->password, NULL, arguments->port);
taos = taos_connect(g_args.host, g_args.user, g_args.password,
NULL, g_args.port);
if (taos == NULL) {
fprintf(stderr, "failed to connect to TDengine server\n");
errorPrint("Failed to connect to TDengine server %s\n", g_args.host);
goto _exit_failure;
}
/* --------------------------------- Main Code -------------------------------- */
/* if (arguments->databases || arguments->all_databases) { // dump part of databases or all databases */
/* if (g_args.databases || g_args.all_databases) { // dump part of databases or all databases */
/* */
taosDumpCharset(fp);
......@@ -943,19 +1044,21 @@ static int taosDumpOut(struct arguments *arguments) {
// sys database name : 'log', but subsequent version changed to 'log'
if ((strncasecmp(row[TSDB_SHOW_DB_NAME_INDEX], "log",
fields[TSDB_SHOW_DB_NAME_INDEX].bytes) == 0)
&& (!arguments->allow_sys)) {
&& (!g_args.allow_sys)) {
continue;
}
if (arguments->databases) { // input multi dbs
for (int i = 0; arguments->arg_list[i]; i++) {
if (strncasecmp(arguments->arg_list[i], (char *)row[TSDB_SHOW_DB_NAME_INDEX],
if (g_args.databases) { // input multi dbs
for (int i = 0; g_args.arg_list[i]; i++) {
if (strncasecmp(g_args.arg_list[i],
(char *)row[TSDB_SHOW_DB_NAME_INDEX],
fields[TSDB_SHOW_DB_NAME_INDEX].bytes) == 0)
goto _dump_db_point;
}
continue;
} else if (!arguments->all_databases) { // only input one db
if (strncasecmp(arguments->arg_list[0], (char *)row[TSDB_SHOW_DB_NAME_INDEX],
} else if (!g_args.all_databases) { // only input one db
if (strncasecmp(g_args.arg_list[0],
(char *)row[TSDB_SHOW_DB_NAME_INDEX],
fields[TSDB_SHOW_DB_NAME_INDEX].bytes) == 0)
goto _dump_db_point;
else
......@@ -964,103 +1067,107 @@ static int taosDumpOut(struct arguments *arguments) {
_dump_db_point:
dbInfos[count] = (SDbInfo *)calloc(1, sizeof(SDbInfo));
if (dbInfos[count] == NULL) {
g_dbInfos[count] = (SDbInfo *)calloc(1, sizeof(SDbInfo));
if (g_dbInfos[count] == NULL) {
errorPrint("%s() LN%d, failed to allocate %"PRIu64" memory\n",
__func__, __LINE__, (uint64_t)(sizeof(SDbInfo)));
__func__, __LINE__, (uint64_t)sizeof(SDbInfo));
goto _exit_failure;
}
strncpy(dbInfos[count]->name, (char *)row[TSDB_SHOW_DB_NAME_INDEX],
strncpy(g_dbInfos[count]->name, (char *)row[TSDB_SHOW_DB_NAME_INDEX],
fields[TSDB_SHOW_DB_NAME_INDEX].bytes);
if (arguments->with_property) {
dbInfos[count]->ntables = *((int32_t *)row[TSDB_SHOW_DB_NTABLES_INDEX]);
dbInfos[count]->vgroups = *((int32_t *)row[TSDB_SHOW_DB_VGROUPS_INDEX]);
dbInfos[count]->replica = *((int16_t *)row[TSDB_SHOW_DB_REPLICA_INDEX]);
dbInfos[count]->quorum = *((int16_t *)row[TSDB_SHOW_DB_QUORUM_INDEX]);
dbInfos[count]->days = *((int16_t *)row[TSDB_SHOW_DB_DAYS_INDEX]);
strncpy(dbInfos[count]->keeplist, (char *)row[TSDB_SHOW_DB_KEEP_INDEX],
if (g_args.with_property) {
g_dbInfos[count]->ntables = *((int32_t *)row[TSDB_SHOW_DB_NTABLES_INDEX]);
g_dbInfos[count]->vgroups = *((int32_t *)row[TSDB_SHOW_DB_VGROUPS_INDEX]);
g_dbInfos[count]->replica = *((int16_t *)row[TSDB_SHOW_DB_REPLICA_INDEX]);
g_dbInfos[count]->quorum = *((int16_t *)row[TSDB_SHOW_DB_QUORUM_INDEX]);
g_dbInfos[count]->days = *((int16_t *)row[TSDB_SHOW_DB_DAYS_INDEX]);
strncpy(g_dbInfos[count]->keeplist, (char *)row[TSDB_SHOW_DB_KEEP_INDEX],
fields[TSDB_SHOW_DB_KEEP_INDEX].bytes);
//dbInfos[count]->daysToKeep = *((int16_t *)row[TSDB_SHOW_DB_KEEP_INDEX]);
//dbInfos[count]->daysToKeep1;
//dbInfos[count]->daysToKeep2;
dbInfos[count]->cache = *((int32_t *)row[TSDB_SHOW_DB_CACHE_INDEX]);
dbInfos[count]->blocks = *((int32_t *)row[TSDB_SHOW_DB_BLOCKS_INDEX]);
dbInfos[count]->minrows = *((int32_t *)row[TSDB_SHOW_DB_MINROWS_INDEX]);
dbInfos[count]->maxrows = *((int32_t *)row[TSDB_SHOW_DB_MAXROWS_INDEX]);
dbInfos[count]->wallevel = *((int8_t *)row[TSDB_SHOW_DB_WALLEVEL_INDEX]);
dbInfos[count]->fsync = *((int32_t *)row[TSDB_SHOW_DB_FSYNC_INDEX]);
dbInfos[count]->comp = (int8_t)(*((int8_t *)row[TSDB_SHOW_DB_COMP_INDEX]));
dbInfos[count]->cachelast = (int8_t)(*((int8_t *)row[TSDB_SHOW_DB_CACHELAST_INDEX]));
strncpy(dbInfos[count]->precision, (char *)row[TSDB_SHOW_DB_PRECISION_INDEX],
//g_dbInfos[count]->daysToKeep = *((int16_t *)row[TSDB_SHOW_DB_KEEP_INDEX]);
//g_dbInfos[count]->daysToKeep1;
//g_dbInfos[count]->daysToKeep2;
g_dbInfos[count]->cache = *((int32_t *)row[TSDB_SHOW_DB_CACHE_INDEX]);
g_dbInfos[count]->blocks = *((int32_t *)row[TSDB_SHOW_DB_BLOCKS_INDEX]);
g_dbInfos[count]->minrows = *((int32_t *)row[TSDB_SHOW_DB_MINROWS_INDEX]);
g_dbInfos[count]->maxrows = *((int32_t *)row[TSDB_SHOW_DB_MAXROWS_INDEX]);
g_dbInfos[count]->wallevel = *((int8_t *)row[TSDB_SHOW_DB_WALLEVEL_INDEX]);
g_dbInfos[count]->fsync = *((int32_t *)row[TSDB_SHOW_DB_FSYNC_INDEX]);
g_dbInfos[count]->comp = (int8_t)(*((int8_t *)row[TSDB_SHOW_DB_COMP_INDEX]));
g_dbInfos[count]->cachelast = (int8_t)(*((int8_t *)row[TSDB_SHOW_DB_CACHELAST_INDEX]));
strncpy(g_dbInfos[count]->precision, (char *)row[TSDB_SHOW_DB_PRECISION_INDEX],
fields[TSDB_SHOW_DB_PRECISION_INDEX].bytes);
//dbInfos[count]->precision = *((int8_t *)row[TSDB_SHOW_DB_PRECISION_INDEX]);
dbInfos[count]->update = *((int8_t *)row[TSDB_SHOW_DB_UPDATE_INDEX]);
//g_dbInfos[count]->precision = *((int8_t *)row[TSDB_SHOW_DB_PRECISION_INDEX]);
g_dbInfos[count]->update = *((int8_t *)row[TSDB_SHOW_DB_UPDATE_INDEX]);
}
count++;
if (arguments->databases) {
if (count > arguments->arg_list_len) break;
if (g_args.databases) {
if (count > g_args.arg_list_len) break;
} else if (!arguments->all_databases) {
} else if (!g_args.all_databases) {
if (count >= 1) break;
}
}
if (count == 0) {
fprintf(stderr, "No databases valid to dump\n");
errorPrint("%d databases valid to dump\n", count);
goto _exit_failure;
}
if (arguments->databases || arguments->all_databases) { // case: taosdump --databases dbx dby ... OR taosdump --all-databases
if (g_args.databases || g_args.all_databases) { // case: taosdump --databases dbx dby ... OR taosdump --all-databases
for (int i = 0; i < count; i++) {
taosDumpDb(dbInfos[i], arguments, fp, taos);
taosDumpDb(g_dbInfos[i], fp, taos);
}
} else {
if (arguments->arg_list_len == 1) { // case: taosdump <db>
taosDumpDb(dbInfos[0], arguments, fp, taos);
if (g_args.arg_list_len == 1) { // case: taosdump <db>
taosDumpDb(g_dbInfos[0], fp, taos);
} else { // case: taosdump <db> tablex tabley ...
taosDumpCreateDbClause(dbInfos[0], arguments->with_property, fp);
fprintf(g_fpOfResult, "\n#### database: %s\n", dbInfos[0]->name);
taosDumpCreateDbClause(g_dbInfos[0], g_args.with_property, fp);
fprintf(g_fpOfResult, "\n#### database: %s\n",
g_dbInfos[0]->name);
g_resultStatistics.totalDatabasesOfDumpOut++;
sprintf(command, "use %s", dbInfos[0]->name);
sprintf(command, "use %s", g_dbInfos[0]->name);
result = taos_query(taos, command);
int32_t code = taos_errno(result);
code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "invalid database %s\n", dbInfos[0]->name);
errorPrint("invalid database %s\n", g_dbInfos[0]->name);
goto _exit_failure;
}
fprintf(fp, "USE %s;\n\n", dbInfos[0]->name);
fprintf(fp, "USE %s;\n\n", g_dbInfos[0]->name);
int32_t totalNumOfThread = 1; // 0: all normal talbe into .tables.tmp.0
int normalTblFd = -1;
int32_t retCode;
int superTblCnt = 0 ;
for (int i = 1; arguments->arg_list[i]; i++) {
if (taosGetTableRecordInfo(arguments->arg_list[i], &tableRecordInfo, taos) < 0) {
fprintf(stderr, "input the invalide table %s\n", arguments->arg_list[i]);
for (int i = 1; g_args.arg_list[i]; i++) {
if (taosGetTableRecordInfo(g_args.arg_list[i],
&tableRecordInfo, taos) < 0) {
errorPrint("input the invalide table %s\n",
g_args.arg_list[i]);
continue;
}
if (tableRecordInfo.isMetric) { // dump all table of this metric
int ret = taosDumpStable(
tableRecordInfo.tableRecord.metric,
fp, taos, dbInfos[0]->name);
fp, taos, g_dbInfos[0]->name);
if (0 == ret) {
superTblCnt++;
}
retCode = taosSaveTableOfMetricToTempFile(
taos, tableRecordInfo.tableRecord.metric,
arguments, &totalNumOfThread);
&totalNumOfThread);
} else {
if (tableRecordInfo.tableRecord.metric[0] != '\0') { // dump this sub table and it's metric
int ret = taosDumpStable(tableRecordInfo.tableRecord.metric,
fp, taos, dbInfos[0]->name);
int ret = taosDumpStable(
tableRecordInfo.tableRecord.metric,
fp, taos, g_dbInfos[0]->name);
if (0 == ret) {
superTblCnt++;
}
......@@ -1079,7 +1186,8 @@ _dump_db_point:
}
// TODO: save dump super table <superTblCnt> into result_output.txt
fprintf(g_fpOfResult, "# super table counter: %d\n", superTblCnt);
fprintf(g_fpOfResult, "# super table counter: %d\n",
superTblCnt);
g_resultStatistics.totalSuperTblsOfDumpOut += superTblCnt;
if (-1 != normalTblFd){
......@@ -1087,10 +1195,11 @@ _dump_db_point:
}
// start multi threads to dumpout
taosStartDumpOutWorkThreads(taos, arguments, totalNumOfThread, dbInfos[0]->name);
taosStartDumpOutWorkThreads(totalNumOfThread,
g_dbInfos[0]->name);
char tmpFileName[TSDB_FILENAME_LEN + 1];
_clean_tmp_file:
char tmpFileName[MAX_FILE_NAME_LEN];
_clean_tmp_file:
for (int loopCnt = 0; loopCnt < totalNumOfThread; loopCnt++) {
sprintf(tmpFileName, ".tables.tmp.%d", loopCnt);
remove(tmpFileName);
......@@ -1104,7 +1213,7 @@ _dump_db_point:
taos_free_result(result);
tfree(command);
taosFreeDbInfos();
fprintf(stderr, "dump out rows: %" PRId64 "\n", totalDumpOutRows);
fprintf(stderr, "dump out rows: %" PRId64 "\n", g_totalDumpOutRows);
return 0;
_exit_failure:
......@@ -1113,7 +1222,7 @@ _exit_failure:
taos_free_result(result);
tfree(command);
taosFreeDbInfos();
fprintf(stderr, "dump out rows: %" PRId64 "\n", totalDumpOutRows);
errorPrint("dump out rows: %" PRId64 "\n", g_totalDumpOutRows);
return -1;
}
......@@ -1140,12 +1249,16 @@ static int taosGetTableDes(
tstrncpy(tableDes->name, table, TSDB_TABLE_NAME_LEN);
while ((row = taos_fetch_row(res)) != NULL) {
strncpy(tableDes->cols[count].field, (char *)row[TSDB_DESCRIBE_METRIC_FIELD_INDEX],
strncpy(tableDes->cols[count].field,
(char *)row[TSDB_DESCRIBE_METRIC_FIELD_INDEX],
fields[TSDB_DESCRIBE_METRIC_FIELD_INDEX].bytes);
strncpy(tableDes->cols[count].type, (char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
strncpy(tableDes->cols[count].type,
(char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
min(15, fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes));
tableDes->cols[count].length = *((int *)row[TSDB_DESCRIBE_METRIC_LENGTH_INDEX]);
strncpy(tableDes->cols[count].note, (char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX],
tableDes->cols[count].length =
*((int *)row[TSDB_DESCRIBE_METRIC_LENGTH_INDEX]);
strncpy(tableDes->cols[count].note,
(char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX],
fields[TSDB_DESCRIBE_METRIC_NOTE_INDEX].bytes);
count++;
......@@ -1163,7 +1276,8 @@ static int taosGetTableDes(
if (strcmp(tableDes->cols[i].note, "TAG") != 0) continue;
sprintf(sqlstr, "select %s from %s.%s", tableDes->cols[i].field, dbName, table);
sprintf(sqlstr, "select %s from %s.%s",
tableDes->cols[i].field, dbName, table);
res = taos_query(taosCon, sqlstr);
code = taos_errno(res);
......@@ -1196,7 +1310,8 @@ static int taosGetTableDes(
//int32_t* length = taos_fetch_lengths(tmpResult);
switch (fields[0].type) {
case TSDB_DATA_TYPE_BOOL:
sprintf(tableDes->cols[i].note, "%d", ((((int32_t)(*((char *)row[0]))) == 1) ? 1 : 0));
sprintf(tableDes->cols[i].note, "%d",
((((int32_t)(*((char *)row[0]))) == 1) ? 1 : 0));
break;
case TSDB_DATA_TYPE_TINYINT:
sprintf(tableDes->cols[i].note, "%d", *((int8_t *)row[0]));
......@@ -1216,7 +1331,8 @@ static int taosGetTableDes(
case TSDB_DATA_TYPE_DOUBLE:
sprintf(tableDes->cols[i].note, "%f", GET_DOUBLE_VAL(row[0]));
break;
case TSDB_DATA_TYPE_BINARY: {
case TSDB_DATA_TYPE_BINARY:
{
memset(tableDes->cols[i].note, 0, sizeof(tableDes->cols[i].note));
tableDes->cols[i].note[0] = '\'';
char tbuf[COL_NOTE_LEN];
......@@ -1225,7 +1341,8 @@ static int taosGetTableDes(
*(pstr++) = '\'';
break;
}
case TSDB_DATA_TYPE_NCHAR: {
case TSDB_DATA_TYPE_NCHAR:
{
memset(tableDes->cols[i].note, 0, sizeof(tableDes->cols[i].note));
char tbuf[COL_NOTE_LEN-2]; // need reserve 2 bytes for ' '
convertNCharToReadable((char *)row[0], length[0], tbuf, COL_NOTE_LEN);
......@@ -1234,8 +1351,8 @@ static int taosGetTableDes(
}
case TSDB_DATA_TYPE_TIMESTAMP:
sprintf(tableDes->cols[i].note, "%" PRId64 "", *(int64_t *)row[0]);
#if 0
if (!arguments->mysqlFlag) {
#if 0
if (!g_args.mysqlFlag) {
sprintf(tableDes->cols[i].note, "%" PRId64 "", *(int64_t *)row[0]);
} else {
char buf[64] = "\0";
......@@ -1245,7 +1362,7 @@ static int taosGetTableDes(
strftime(buf, 64, "%y-%m-%d %H:%M:%S", ptm);
sprintf(tableDes->cols[i].note, "\'%s.%03d\'", buf, (int)(ts % 1000));
}
#endif
#endif
break;
default:
break;
......@@ -1258,8 +1375,15 @@ static int taosGetTableDes(
return count;
}
static int convertSchemaToAvroSchema(STableDef *tableDes, char **avroSchema)
{
errorPrint("%s() LN%d TODO: covert table schema to avro schema\n",
__func__, __LINE__);
return 0;
}
static int32_t taosDumpTable(
char *table, char *metric, struct arguments *arguments,
char *table, char *metric,
FILE *fp, TAOS* taosCon, char* dbName) {
int count = 0;
......@@ -1302,12 +1426,24 @@ static int32_t taosDumpTable(
taosDumpCreateTableClause(tableDes, count, fp, dbName);
}
char *jsonAvroSchema = NULL;
if (g_args.avro) {
convertSchemaToAvroSchema(tableDes, &jsonAvroSchema);
}
free(tableDes);
return taosDumpTableData(fp, table, arguments, taosCon, dbName);
int32_t ret = 0;
if (!g_args.schemaonly) {
ret = taosDumpTableData(fp, table, taosCon, dbName,
jsonAvroSchema);
}
return ret;
}
static void taosDumpCreateDbClause(SDbInfo *dbInfo, bool isDumpProperty, FILE *fp) {
static void taosDumpCreateDbClause(
SDbInfo *dbInfo, bool isDumpProperty, FILE *fp) {
char sqlstr[TSDB_MAX_SQL_LEN] = {0};
char *pstr = sqlstr;
......@@ -1315,8 +1451,12 @@ static void taosDumpCreateDbClause(SDbInfo *dbInfo, bool isDumpProperty, FILE *f
if (isDumpProperty) {
pstr += sprintf(pstr,
"REPLICA %d QUORUM %d DAYS %d KEEP %s CACHE %d BLOCKS %d MINROWS %d MAXROWS %d FSYNC %d CACHELAST %d COMP %d PRECISION '%s' UPDATE %d",
dbInfo->replica, dbInfo->quorum, dbInfo->days, dbInfo->keeplist, dbInfo->cache,
dbInfo->blocks, dbInfo->minrows, dbInfo->maxrows, dbInfo->fsync, dbInfo->cachelast,
dbInfo->replica, dbInfo->quorum, dbInfo->days,
dbInfo->keeplist,
dbInfo->cache,
dbInfo->blocks, dbInfo->minrows, dbInfo->maxrows,
dbInfo->fsync,
dbInfo->cachelast,
dbInfo->comp, dbInfo->precision, dbInfo->update);
}
......@@ -1330,31 +1470,35 @@ static void* taosDumpOutWorkThreadFp(void *arg)
STableRecord tableRecord;
int fd;
char tmpBuf[TSDB_FILENAME_LEN*4] = {0};
char tmpBuf[4096] = {0};
sprintf(tmpBuf, ".tables.tmp.%d", pThread->threadIndex);
fd = open(tmpBuf, O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
if (fd == -1) {
fprintf(stderr, "taosDumpTableFp() failed to open temp file: %s\n", tmpBuf);
errorPrint("%s() LN%d, failed to open temp file: %s\n",
__func__, __LINE__, tmpBuf);
return NULL;
}
FILE *fp = NULL;
memset(tmpBuf, 0, TSDB_FILENAME_LEN + 128);
memset(tmpBuf, 0, 4096);
if (g_args.outpath[0] != 0) {
sprintf(tmpBuf, "%s/%s.tables.%d.sql", g_args.outpath, pThread->dbName, pThread->threadIndex);
sprintf(tmpBuf, "%s/%s.tables.%d.sql",
g_args.outpath, pThread->dbName, pThread->threadIndex);
} else {
sprintf(tmpBuf, "%s.tables.%d.sql", pThread->dbName, pThread->threadIndex);
sprintf(tmpBuf, "%s.tables.%d.sql",
pThread->dbName, pThread->threadIndex);
}
fp = fopen(tmpBuf, "w");
if (fp == NULL) {
fprintf(stderr, "failed to open file %s\n", tmpBuf);
errorPrint("%s() LN%d, failed to open file %s\n",
__func__, __LINE__, tmpBuf);
close(fd);
return NULL;
}
memset(tmpBuf, 0, TSDB_FILENAME_LEN);
memset(tmpBuf, 0, 4096);
sprintf(tmpBuf, "use %s", pThread->dbName);
TAOS_RES* tmpResult = taos_query(pThread->taosCon, tmpBuf);
......@@ -1368,8 +1512,10 @@ static void* taosDumpOutWorkThreadFp(void *arg)
return NULL;
}
#if 0
int fileNameIndex = 1;
int tablesInOneFile = 0;
#endif
int64_t lastRowsPrint = 5000000;
fprintf(fp, "USE %s;\n\n", pThread->dbName);
while (1) {
......@@ -1377,7 +1523,7 @@ static void* taosDumpOutWorkThreadFp(void *arg)
if (readLen <= 0) break;
int ret = taosDumpTable(
tableRecord.name, tableRecord.metric, &g_args,
tableRecord.name, tableRecord.metric,
fp, pThread->taosCon, pThread->dbName);
if (ret >= 0) {
// TODO: sum table count and table rows by self
......@@ -1390,12 +1536,13 @@ static void* taosDumpOutWorkThreadFp(void *arg)
lastRowsPrint += 5000000;
}
#if 0
tablesInOneFile++;
if (tablesInOneFile >= g_args.table_batch) {
fclose(fp);
tablesInOneFile = 0;
memset(tmpBuf, 0, TSDB_FILENAME_LEN + 128);
memset(tmpBuf, 0, 4096);
if (g_args.outpath[0] != 0) {
sprintf(tmpBuf, "%s/%s.tables.%d-%d.sql",
g_args.outpath, pThread->dbName,
......@@ -1408,12 +1555,14 @@ static void* taosDumpOutWorkThreadFp(void *arg)
fp = fopen(tmpBuf, "w");
if (fp == NULL) {
fprintf(stderr, "failed to open file %s\n", tmpBuf);
errorPrint("%s() LN%d, failed to open file %s\n",
__func__, __LINE__, tmpBuf);
close(fd);
taos_free_result(tmpResult);
return NULL;
}
}
#endif
}
}
......@@ -1424,12 +1573,18 @@ static void* taosDumpOutWorkThreadFp(void *arg)
return NULL;
}
static void taosStartDumpOutWorkThreads(void* taosCon, struct arguments* args,
int32_t numOfThread, char *dbName)
static void taosStartDumpOutWorkThreads(int32_t numOfThread, char *dbName)
{
pthread_attr_t thattr;
SThreadParaObj *threadObj =
(SThreadParaObj *)calloc(numOfThread, sizeof(SThreadParaObj));
if (threadObj == NULL) {
errorPrint("%s() LN%d, memory allocation failed!\n",
__func__, __LINE__);
return;
}
for (int t = 0; t < numOfThread; ++t) {
SThreadParaObj *pThread = threadObj + t;
pThread->rowsOfDumpOut = 0;
......@@ -1437,14 +1592,20 @@ static void taosStartDumpOutWorkThreads(void* taosCon, struct arguments* args,
pThread->threadIndex = t;
pThread->totalThreads = numOfThread;
tstrncpy(pThread->dbName, dbName, TSDB_DB_NAME_LEN);
pThread->taosCon = taosCon;
pThread->taosCon = taos_connect(g_args.host, g_args.user, g_args.password,
NULL, g_args.port);
if (pThread->taosCon == NULL) {
errorPrint("Failed to connect to TDengine server %s\n", g_args.host);
return;
}
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&(pThread->threadID), &thattr, taosDumpOutWorkThreadFp,
if (pthread_create(&(pThread->threadID), &thattr,
taosDumpOutWorkThreadFp,
(void*)pThread) != 0) {
errorPrint("thread:%d failed to start\n", pThread->threadIndex);
errorPrint("%s() LN%d, thread:%d failed to start\n",
__func__, __LINE__, pThread->threadIndex);
exit(-1);
}
}
......@@ -1461,8 +1622,10 @@ static void taosStartDumpOutWorkThreads(void* taosCon, struct arguments* args,
totalRowsOfDumpOut += threadObj[t].rowsOfDumpOut;
}
fprintf(g_fpOfResult, "# child table counter: %"PRId64"\n", totalChildTblsOfDumpOut);
fprintf(g_fpOfResult, "# row counter: %"PRId64"\n", totalRowsOfDumpOut);
fprintf(g_fpOfResult, "# child table counter: %"PRId64"\n",
totalChildTblsOfDumpOut);
fprintf(g_fpOfResult, "# row counter: %"PRId64"\n",
totalRowsOfDumpOut);
g_resultStatistics.totalChildTblsOfDumpOut += totalChildTblsOfDumpOut;
g_resultStatistics.totalRowsOfDumpOut += totalRowsOfDumpOut;
free(threadObj);
......@@ -1506,19 +1669,21 @@ static int32_t taosDumpCreateSuperTableClause(TAOS* taosCon, char* dbName, FILE
TAOS_RES* res = taos_query(taosCon, sqlstr);
int32_t code = taos_errno(res);
if (code != 0) {
fprintf(stderr, "failed to run command <%s>, reason: %s\n", sqlstr, taos_errstr(res));
errorPrint("%s() LN%d, failed to run command <%s>, reason: %s\n",
__func__, __LINE__, sqlstr, taos_errstr(res));
taos_free_result(res);
exit(-1);
}
TAOS_FIELD *fields = taos_fetch_fields(res);
char tmpFileName[TSDB_FILENAME_LEN + 1];
memset(tmpFileName, 0, TSDB_FILENAME_LEN);
char tmpFileName[MAX_FILE_NAME_LEN];
memset(tmpFileName, 0, MAX_FILE_NAME_LEN);
sprintf(tmpFileName, ".stables.tmp");
fd = open(tmpFileName, O_RDWR | O_CREAT, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
if (fd == -1) {
fprintf(stderr, "failed to open temp file: %s\n", tmpFileName);
errorPrint("%s() LN%d, failed to open temp file: %s\n",
__func__, __LINE__, tmpFileName);
taos_free_result(res);
(void)remove(".stables.tmp");
exit(-1);
......@@ -1556,14 +1721,15 @@ static int32_t taosDumpCreateSuperTableClause(TAOS* taosCon, char* dbName, FILE
}
static int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp, TAOS *taosCon) {
static int taosDumpDb(SDbInfo *dbInfo, FILE *fp, TAOS *taosCon) {
TAOS_ROW row;
int fd = -1;
STableRecord tableRecord;
taosDumpCreateDbClause(dbInfo, arguments->with_property, fp);
taosDumpCreateDbClause(dbInfo, g_args.with_property, fp);
fprintf(g_fpOfResult, "\n#### database: %s\n", dbInfo->name);
fprintf(g_fpOfResult, "\n#### database: %s\n",
dbInfo->name);
g_resultStatistics.totalDatabasesOfDumpOut++;
char sqlstr[TSDB_MAX_SQL_LEN] = {0};
......@@ -1577,17 +1743,19 @@ static int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp, TA
TAOS_RES* res = taos_query(taosCon, sqlstr);
int code = taos_errno(res);
if (code != 0) {
fprintf(stderr, "failed to run command <%s>, reason:%s\n", sqlstr, taos_errstr(res));
errorPrint("%s() LN%d, failed to run command <%s>, reason:%s\n",
__func__, __LINE__, sqlstr, taos_errstr(res));
taos_free_result(res);
return -1;
}
char tmpBuf[TSDB_FILENAME_LEN + 1];
memset(tmpBuf, 0, TSDB_FILENAME_LEN);
char tmpBuf[MAX_FILE_NAME_LEN];
memset(tmpBuf, 0, MAX_FILE_NAME_LEN);
sprintf(tmpBuf, ".show-tables.tmp");
fd = open(tmpBuf, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
if (fd == -1) {
fprintf(stderr, "failed to open temp file: %s\n", tmpBuf);
errorPrint("%s() LN%d, failed to open temp file: %s\n",
__func__, __LINE__, tmpBuf);
taos_free_result(res);
return -1;
}
......@@ -1623,7 +1791,8 @@ static int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp, TA
char* tblBuf = (char*)calloc(1, tableOfPerFile * sizeof(STableRecord));
if (NULL == tblBuf){
fprintf(stderr, "failed to calloc %" PRIzu "\n", tableOfPerFile * sizeof(STableRecord));
errorPrint("failed to calloc %" PRIzu "\n",
tableOfPerFile * sizeof(STableRecord));
close(fd);
return -1;
}
......@@ -1631,11 +1800,12 @@ static int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp, TA
int32_t numOfThread = 0;
int subFd = -1;
for (numOfThread = 0; numOfThread < maxThreads; numOfThread++) {
memset(tmpBuf, 0, TSDB_FILENAME_LEN);
memset(tmpBuf, 0, MAX_FILE_NAME_LEN);
sprintf(tmpBuf, ".tables.tmp.%d", numOfThread);
subFd = open(tmpBuf, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH);
if (subFd == -1) {
fprintf(stderr, "failed to open temp file: %s\n", tmpBuf);
errorPrint("%s() LN%d, failed to open temp file: %s\n",
__func__, __LINE__, tmpBuf);
for (int32_t loopCnt = 0; loopCnt < numOfThread; loopCnt++) {
sprintf(tmpBuf, ".tables.tmp.%d", loopCnt);
(void)remove(tmpBuf);
......@@ -1665,10 +1835,8 @@ static int taosDumpDb(SDbInfo *dbInfo, struct arguments *arguments, FILE *fp, TA
fd = -1;
}
taos_free_result(res);
// start multi threads to dumpout
taosStartDumpOutWorkThreads(taosCon, arguments, numOfThread, dbInfo->name);
taosStartDumpOutWorkThreads(numOfThread, dbInfo->name);
for (int loopCnt = 0; loopCnt < numOfThread; loopCnt++) {
sprintf(tmpBuf, ".tables.tmp.%d", loopCnt);
(void)remove(tmpBuf);
......@@ -1735,14 +1903,16 @@ static void taosDumpCreateMTableClause(STableDef *tableDes, char *metric,
char* tmpBuf = (char *)malloc(COMMAND_SIZE);
if (tmpBuf == NULL) {
fprintf(stderr, "failed to allocate memory\n");
errorPrint("%s() LN%d, failed to allocate %d memory\n",
__func__, __LINE__, COMMAND_SIZE);
return;
}
char *pstr = NULL;
pstr = tmpBuf;
pstr += sprintf(tmpBuf, "CREATE TABLE IF NOT EXISTS %s.%s USING %s.%s TAGS (",
pstr += sprintf(tmpBuf,
"CREATE TABLE IF NOT EXISTS %s.%s USING %s.%s TAGS (",
dbName, tableDes->name, dbName, metric);
for (; counter < numOfCols; counter++) {
......@@ -1784,61 +1954,56 @@ static void taosDumpCreateMTableClause(STableDef *tableDes, char *metric,
free(tmpBuf);
}
static int taosDumpTableData(FILE *fp, char *tbname,
struct arguments *arguments, TAOS* taosCon, char* dbName) {
int64_t lastRowsPrint = 5000000;
int64_t totalRows = 0;
int count = 0;
char *pstr = NULL;
TAOS_ROW row = NULL;
int numFields = 0;
static int writeSchemaToAvro(char *jsonAvroSchema)
{
errorPrint("%s() LN%d, TODO: implement write schema to avro",
__func__, __LINE__);
return 0;
}
if (arguments->schemaonly) {
static int64_t writeResultToAvro(TAOS_RES *res)
{
errorPrint("%s() LN%d, TODO: implementation need\n", __func__, __LINE__);
return 0;
}
}
static int64_t writeResultToSql(TAOS_RES *res, FILE *fp, char *dbName, char *tbName)
{
int64_t totalRows = 0;
int32_t sql_buf_len = arguments->max_sql_len;
int32_t sql_buf_len = g_args.max_sql_len;
char* tmpBuffer = (char *)calloc(1, sql_buf_len + 128);
if (tmpBuffer == NULL) {
fprintf(stderr, "failed to allocate memory\n");
errorPrint("failed to allocate %d memory\n", sql_buf_len + 128);
return -1;
}
pstr = tmpBuffer;
char sqlstr[1024] = {0};
sprintf(sqlstr,
"select * from %s.%s where _c0 >= %" PRId64 " and _c0 <= %" PRId64 " order by _c0 asc;",
dbName, tbname, arguments->start_time, arguments->end_time);
char *pstr = tmpBuffer;
TAOS_RES* tmpResult = taos_query(taosCon, sqlstr);
int32_t code = taos_errno(tmpResult);
if (code != 0) {
fprintf(stderr, "failed to run command %s, reason: %s\n", sqlstr, taos_errstr(tmpResult));
free(tmpBuffer);
taos_free_result(tmpResult);
return -1;
}
TAOS_ROW row = NULL;
int numFields = 0;
int rowFlag = 0;
int64_t lastRowsPrint = 5000000;
int count = 0;
numFields = taos_field_count(tmpResult);
numFields = taos_field_count(res);
assert(numFields > 0);
TAOS_FIELD *fields = taos_fetch_fields(tmpResult);
TAOS_FIELD *fields = taos_fetch_fields(res);
int rowFlag = 0;
int32_t curr_sqlstr_len = 0;
int32_t total_sqlstr_len = 0;
count = 0;
while ((row = taos_fetch_row(tmpResult)) != NULL) {
pstr = tmpBuffer;
while ((row = taos_fetch_row(res)) != NULL) {
curr_sqlstr_len = 0;
int32_t* length = taos_fetch_lengths(tmpResult); // act len
int32_t* length = taos_fetch_lengths(res); // act len
if (count == 0) {
total_sqlstr_len = 0;
curr_sqlstr_len += sprintf(pstr + curr_sqlstr_len, "INSERT INTO %s.%s VALUES (", dbName, tbname);
curr_sqlstr_len += sprintf(pstr + curr_sqlstr_len,
"INSERT INTO %s.%s VALUES (", dbName, tbName);
} else {
if (arguments->mysqlFlag) {
if (g_args.mysqlFlag) {
if (0 == rowFlag) {
curr_sqlstr_len += sprintf(pstr + curr_sqlstr_len, "(");
rowFlag++;
......@@ -1882,7 +2047,8 @@ static int taosDumpTableData(FILE *fp, char *tbname,
case TSDB_DATA_TYPE_DOUBLE:
curr_sqlstr_len += sprintf(pstr + curr_sqlstr_len, "%f", GET_DOUBLE_VAL(row[col]));
break;
case TSDB_DATA_TYPE_BINARY: {
case TSDB_DATA_TYPE_BINARY:
{
char tbuf[COMMAND_SIZE] = {0};
//*(pstr++) = '\'';
converStringToReadable((char *)row[col], length[col], tbuf, COMMAND_SIZE);
......@@ -1891,14 +2057,15 @@ static int taosDumpTableData(FILE *fp, char *tbname,
curr_sqlstr_len += sprintf(pstr + curr_sqlstr_len, "\'%s\'", tbuf);
break;
}
case TSDB_DATA_TYPE_NCHAR: {
case TSDB_DATA_TYPE_NCHAR:
{
char tbuf[COMMAND_SIZE] = {0};
convertNCharToReadable((char *)row[col], length[col], tbuf, COMMAND_SIZE);
curr_sqlstr_len += sprintf(pstr + curr_sqlstr_len, "\'%s\'", tbuf);
break;
}
case TSDB_DATA_TYPE_TIMESTAMP:
if (!arguments->mysqlFlag) {
if (!g_args.mysqlFlag) {
curr_sqlstr_len += sprintf(pstr + curr_sqlstr_len, "%" PRId64 "",
*(int64_t *)row[col]);
} else {
......@@ -1916,7 +2083,7 @@ static int taosDumpTableData(FILE *fp, char *tbname,
}
}
curr_sqlstr_len += sprintf(pstr + curr_sqlstr_len, ") ");
curr_sqlstr_len += sprintf(pstr + curr_sqlstr_len, ")");
totalRows++;
count++;
......@@ -1924,60 +2091,87 @@ static int taosDumpTableData(FILE *fp, char *tbname,
if (totalRows >= lastRowsPrint) {
printf(" %"PRId64 " rows already be dumpout from %s.%s\n",
totalRows, dbName, tbname);
totalRows, dbName, tbName);
lastRowsPrint += 5000000;
}
total_sqlstr_len += curr_sqlstr_len;
if ((count >= arguments->data_batch)
if ((count >= g_args.data_batch)
|| (sql_buf_len - total_sqlstr_len < TSDB_MAX_BYTES_PER_ROW)) {
fprintf(fp, ";\n");
count = 0;
} //else {
//fprintf(fp, "\\\n");
//}
}
}
printf("total_sqlstr_len: %d\n", total_sqlstr_len);
debugPrint("total_sqlstr_len: %d\n", total_sqlstr_len);
fprintf(fp, "\n");
atomic_add_fetch_64(&totalDumpOutRows, totalRows);
taos_free_result(tmpResult);
atomic_add_fetch_64(&g_totalDumpOutRows, totalRows);
free(tmpBuffer);
return 0;
}
static int taosDumpTableData(FILE *fp, char *tbName,
TAOS* taosCon, char* dbName,
char *jsonAvroSchema) {
int64_t totalRows = 0;
char sqlstr[1024] = {0};
sprintf(sqlstr,
"select * from %s.%s where _c0 >= %" PRId64 " and _c0 <= %" PRId64 " order by _c0 asc;",
dbName, tbName, g_args.start_time, g_args.end_time);
TAOS_RES* res = taos_query(taosCon, sqlstr);
int32_t code = taos_errno(res);
if (code != 0) {
errorPrint("failed to run command %s, reason: %s\n",
sqlstr, taos_errstr(res));
taos_free_result(res);
return -1;
}
if (g_args.avro) {
writeSchemaToAvro(jsonAvroSchema);
totalRows = writeResultToAvro(res);
} else {
totalRows = writeResultToSql(res, fp, dbName, tbName);
}
taos_free_result(res);
return totalRows;
}
static int taosCheckParam(struct arguments *arguments) {
if (arguments->all_databases && arguments->databases) {
if (g_args.all_databases && g_args.databases) {
fprintf(stderr, "conflict option --all-databases and --databases\n");
return -1;
}
if (arguments->start_time > arguments->end_time) {
if (g_args.start_time > g_args.end_time) {
fprintf(stderr, "start time is larger than end time\n");
return -1;
}
if (arguments->arg_list_len == 0) {
if ((!arguments->all_databases) && (!arguments->isDumpIn)) {
if (g_args.arg_list_len == 0) {
if ((!g_args.all_databases) && (!g_args.isDumpIn)) {
fprintf(stderr, "taosdump requires parameters\n");
return -1;
}
}
/*
if (arguments->isDumpIn && (strcmp(arguments->outpath, DEFAULT_DUMP_FILE) != 0)) {
/*
if (g_args.isDumpIn && (strcmp(g_args.outpath, DEFAULT_DUMP_FILE) != 0)) {
fprintf(stderr, "duplicate parameter input and output file path\n");
return -1;
}
*/
if (!arguments->isDumpIn && arguments->encode != NULL) {
*/
if (!g_args.isDumpIn && g_args.encode != NULL) {
fprintf(stderr, "invalid option in dump out\n");
return -1;
}
if (arguments->table_batch <= 0) {
if (g_args.table_batch <= 0) {
fprintf(stderr, "invalid option in dump out\n");
return -1;
}
......@@ -2143,13 +2337,20 @@ _exit_no_charset:
// ======== dumpIn support multi threads functions ================================//
static char **tsDumpInSqlFiles = NULL;
static int32_t tsSqlFileNum = 0;
static char tsDbSqlFile[TSDB_FILENAME_LEN] = {0};
static char tsfCharset[64] = {0};
static int taosGetFilesNum(const char *directoryName, const char *prefix)
static char **g_tsDumpInSqlFiles = NULL;
static int32_t g_tsSqlFileNum = 0;
static char g_tsDbSqlFile[MAX_FILE_NAME_LEN] = {0};
static char g_tsCharset[64] = {0};
static int taosGetFilesNum(const char *directoryName,
const char *prefix, const char *prefix2)
{
char cmd[1024] = { 0 };
if (prefix2)
sprintf(cmd, "ls %s/*.%s %s/*.%s | wc -l ",
directoryName, prefix, directoryName, prefix2);
else
sprintf(cmd, "ls %s/*.%s | wc -l ", directoryName, prefix);
FILE *fp = popen(cmd, "r");
......@@ -2173,10 +2374,18 @@ static int taosGetFilesNum(const char *directoryName, const char *prefix)
return fileNum;
}
static void taosParseDirectory(const char *directoryName, const char *prefix, char **fileArray, int totalFiles)
static void taosParseDirectory(const char *directoryName,
const char *prefix, const char *prefix2,
char **fileArray, int totalFiles)
{
char cmd[1024] = { 0 };
if (prefix2) {
sprintf(cmd, "ls %s/*.%s %s/*.%s | sort",
directoryName, prefix, directoryName, prefix2);
} else {
sprintf(cmd, "ls %s/*.%s | sort", directoryName, prefix);
}
FILE *fp = popen(cmd, "r");
if (fp == NULL) {
......@@ -2186,7 +2395,7 @@ static void taosParseDirectory(const char *directoryName, const char *prefix, ch
int fileNum = 0;
while (fscanf(fp, "%128s", fileArray[fileNum++])) {
if (strcmp(fileArray[fileNum-1], tsDbSqlFile) == 0) {
if (strcmp(fileArray[fileNum-1], g_tsDbSqlFile) == 0) {
fileNum--;
}
if (fileNum >= totalFiles) {
......@@ -2203,7 +2412,7 @@ static void taosParseDirectory(const char *directoryName, const char *prefix, ch
pclose(fp);
}
static void taosCheckTablesSQLFile(const char *directoryName)
static void taosCheckDatabasesSQLFile(const char *directoryName)
{
char cmd[1024] = { 0 };
sprintf(cmd, "ls %s/dbs.sql", directoryName);
......@@ -2214,27 +2423,27 @@ static void taosCheckTablesSQLFile(const char *directoryName)
exit(-1);
}
while (fscanf(fp, "%128s", tsDbSqlFile)) {
while (fscanf(fp, "%128s", g_tsDbSqlFile)) {
break;
}
pclose(fp);
}
static void taosMallocSQLFiles()
static void taosMallocDumpFiles()
{
tsDumpInSqlFiles = (char**)calloc(tsSqlFileNum, sizeof(char*));
for (int i = 0; i < tsSqlFileNum; i++) {
tsDumpInSqlFiles[i] = calloc(1, TSDB_FILENAME_LEN);
g_tsDumpInSqlFiles = (char**)calloc(g_tsSqlFileNum, sizeof(char*));
for (int i = 0; i < g_tsSqlFileNum; i++) {
g_tsDumpInSqlFiles[i] = calloc(1, MAX_FILE_NAME_LEN);
}
}
static void taosFreeSQLFiles()
static void taosFreeDumpFiles()
{
for (int i = 0; i < tsSqlFileNum; i++) {
tfree(tsDumpInSqlFiles[i]);
for (int i = 0; i < g_tsSqlFileNum; i++) {
tfree(g_tsDumpInSqlFiles[i]);
}
tfree(tsDumpInSqlFiles);
tfree(g_tsDumpInSqlFiles);
}
static void taosGetDirectoryFileList(char *inputDir)
......@@ -2246,19 +2455,29 @@ static void taosGetDirectoryFileList(char *inputDir)
}
if (fileStat.st_mode & S_IFDIR) {
taosCheckTablesSQLFile(inputDir);
tsSqlFileNum = taosGetFilesNum(inputDir, "sql");
int tsSqlFileNumOfTbls = tsSqlFileNum;
if (tsDbSqlFile[0] != 0) {
taosCheckDatabasesSQLFile(inputDir);
if (g_args.avro)
g_tsSqlFileNum = taosGetFilesNum(inputDir, "sql", "avro");
else
g_tsSqlFileNum += taosGetFilesNum(inputDir, "sql", NULL);
int tsSqlFileNumOfTbls = g_tsSqlFileNum;
if (g_tsDbSqlFile[0] != 0) {
tsSqlFileNumOfTbls--;
}
taosMallocSQLFiles();
taosMallocDumpFiles();
if (0 != tsSqlFileNumOfTbls) {
taosParseDirectory(inputDir, "sql", tsDumpInSqlFiles, tsSqlFileNumOfTbls);
if (g_args.avro) {
taosParseDirectory(inputDir, "sql", "avro",
g_tsDumpInSqlFiles, tsSqlFileNumOfTbls);
} else {
taosParseDirectory(inputDir, "sql", NULL,
g_tsDumpInSqlFiles, tsSqlFileNumOfTbls);
}
fprintf(stdout, "\nstart to dispose %d files in %s\n", tsSqlFileNum, inputDir);
}
else {
fprintf(stdout, "\nstart to dispose %d files in %s\n",
g_tsSqlFileNum, inputDir);
} else {
errorPrint("%s is not a directory\n", inputDir);
exit(-1);
}
......@@ -2268,24 +2487,23 @@ static FILE* taosOpenDumpInFile(char *fptr) {
wordexp_t full_path;
if (wordexp(fptr, &full_path, 0) != 0) {
fprintf(stderr, "ERROR: illegal file name: %s\n", fptr);
errorPrint("illegal file name: %s\n", fptr);
return NULL;
}
char *fname = full_path.we_wordv[0];
FILE *f = NULL;
if ((fname) && (strlen(fname) > 0)) {
FILE *f = fopen(fname, "r");
f = fopen(fname, "r");
if (f == NULL) {
errorPrint("%s() LN%d, failed to open file %s\n",
__func__, __LINE__, fname);
}
}
wordfree(&full_path);
return f;
}
return NULL;
}
static int taosDumpInOneFile(TAOS* taos, FILE* fp, char* fcharset,
......@@ -2298,7 +2516,8 @@ static int taosDumpInOneFile(TAOS* taos, FILE* fp, char* fcharset,
cmd = (char *)malloc(TSDB_MAX_ALLOWED_SQL_LEN);
if (cmd == NULL) {
fprintf(stderr, "failed to allocate memory\n");
errorPrint("%s() LN%d, failed to allocate memory\n",
__func__, __LINE__);
return -1;
}
......@@ -2324,7 +2543,8 @@ static int taosDumpInOneFile(TAOS* taos, FILE* fp, char* fcharset,
memcpy(cmd + cmd_len, line, read_len);
cmd[read_len + cmd_len]= '\0';
if (queryDbImpl(taos, cmd)) {
fprintf(stderr, "error sql: linenu:%d, file:%s\n", lineNo, fileName);
errorPrint("%s() LN%d, error sql: linenu:%d, file:%s\n",
__func__, __LINE__, lineNo, fileName);
fprintf(g_fpOfResult, "error sql: linenu:%d, file:%s\n", lineNo, fileName);
}
......@@ -2346,43 +2566,56 @@ static int taosDumpInOneFile(TAOS* taos, FILE* fp, char* fcharset,
static void* taosDumpInWorkThreadFp(void *arg)
{
SThreadParaObj *pThread = (SThreadParaObj*)arg;
for (int32_t f = 0; f < tsSqlFileNum; ++f) {
for (int32_t f = 0; f < g_tsSqlFileNum; ++f) {
if (f % pThread->totalThreads == pThread->threadIndex) {
char *SQLFileName = tsDumpInSqlFiles[f];
char *SQLFileName = g_tsDumpInSqlFiles[f];
FILE* fp = taosOpenDumpInFile(SQLFileName);
if (NULL == fp) {
continue;
}
fprintf(stderr, "Success Open input file: %s\n", SQLFileName);
taosDumpInOneFile(pThread->taosCon, fp, tsfCharset, g_args.encode, SQLFileName);
fprintf(stderr, ", Success Open input file: %s\n",
SQLFileName);
taosDumpInOneFile(pThread->taosCon, fp, g_tsCharset, g_args.encode, SQLFileName);
}
}
return NULL;
}
static void taosStartDumpInWorkThreads(void* taosCon, struct arguments *args)
static void taosStartDumpInWorkThreads()
{
pthread_attr_t thattr;
SThreadParaObj *pThread;
int32_t totalThreads = args->thread_num;
int32_t totalThreads = g_args.thread_num;
if (totalThreads > tsSqlFileNum) {
totalThreads = tsSqlFileNum;
if (totalThreads > g_tsSqlFileNum) {
totalThreads = g_tsSqlFileNum;
}
SThreadParaObj *threadObj = (SThreadParaObj *)calloc(
totalThreads, sizeof(SThreadParaObj));
if (NULL == threadObj) {
errorPrint("%s() LN%d, memory allocation failed\n", __func__, __LINE__);
}
SThreadParaObj *threadObj = (SThreadParaObj *)calloc(totalThreads, sizeof(SThreadParaObj));
for (int32_t t = 0; t < totalThreads; ++t) {
pThread = threadObj + t;
pThread->threadIndex = t;
pThread->totalThreads = totalThreads;
pThread->taosCon = taosCon;
pThread->taosCon = taos_connect(g_args.host, g_args.user, g_args.password,
NULL, g_args.port);
if (pThread->taosCon == NULL) {
errorPrint("Failed to connect to TDengine server %s\n", g_args.host);
return;
}
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&(pThread->threadID), &thattr, taosDumpInWorkThreadFp, (void*)pThread) != 0) {
fprintf(stderr, "ERROR: thread:%d failed to start\n", pThread->threadIndex);
if (pthread_create(&(pThread->threadID), &thattr,
taosDumpInWorkThreadFp, (void*)pThread) != 0) {
errorPrint("%s() LN%d, thread:%d failed to start\n",
__func__, __LINE__, pThread->threadIndex);
exit(0);
}
}
......@@ -2397,42 +2630,48 @@ static void taosStartDumpInWorkThreads(void* taosCon, struct arguments *args)
free(threadObj);
}
static int taosDumpIn(struct arguments *arguments) {
assert(arguments->isDumpIn);
static int taosDumpIn() {
assert(g_args.isDumpIn);
TAOS *taos = NULL;
FILE *fp = NULL;
taos = taos_connect(arguments->host, arguments->user, arguments->password, NULL, arguments->port);
taos = taos_connect(
g_args.host, g_args.user, g_args.password,
NULL, g_args.port);
if (taos == NULL) {
fprintf(stderr, "failed to connect to TDengine server\n");
errorPrint("%s() LN%d, failed to connect to TDengine server\n",
__func__, __LINE__);
return -1;
}
taosGetDirectoryFileList(arguments->inpath);
taosGetDirectoryFileList(g_args.inpath);
int32_t tsSqlFileNumOfTbls = tsSqlFileNum;
if (tsDbSqlFile[0] != 0) {
int32_t tsSqlFileNumOfTbls = g_tsSqlFileNum;
if (g_tsDbSqlFile[0] != 0) {
tsSqlFileNumOfTbls--;
fp = taosOpenDumpInFile(tsDbSqlFile);
fp = taosOpenDumpInFile(g_tsDbSqlFile);
if (NULL == fp) {
fprintf(stderr, "failed to open input file %s\n", tsDbSqlFile);
errorPrint("%s() LN%d, failed to open input file %s\n",
__func__, __LINE__, g_tsDbSqlFile);
return -1;
}
fprintf(stderr, "Success Open input file: %s\n", tsDbSqlFile);
fprintf(stderr, "Success Open input file: %s\n", g_tsDbSqlFile);
taosLoadFileCharset(fp, tsfCharset);
taosLoadFileCharset(fp, g_tsCharset);
taosDumpInOneFile(taos, fp, tsfCharset, arguments->encode, tsDbSqlFile);
taosDumpInOneFile(taos, fp, g_tsCharset, g_args.encode,
g_tsDbSqlFile);
}
taos_close(taos);
if (0 != tsSqlFileNumOfTbls) {
taosStartDumpInWorkThreads(taos, arguments);
taosStartDumpInWorkThreads();
}
taos_close(taos);
taosFreeSQLFiles();
taosFreeDumpFiles();
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册