未验证 提交 0fc2985b 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

[TD-5757]<fix>: taosdump precision. (#7896)

上级 257f17f5
......@@ -181,6 +181,7 @@ typedef struct {
int32_t threadIndex;
int32_t totalThreads;
char dbName[TSDB_DB_NAME_LEN];
int precision;
void *taosCon;
int64_t rowsOfDumpOut;
int64_t tablesOfDumpOut;
......@@ -246,11 +247,6 @@ static struct argp_option options[] = {
{"avro", 'v', 0, 0, "Dump apache avro format data file. By default, dump sql command sequence.", 2},
{"start-time", 'S', "START_TIME", 0, "Start time to dump. Either epoch or ISO8601/RFC3339 format is acceptable. ISO8601 format example: 2017-10-01T00:00:00.000+0800 or 2017-10-0100:00:00:000+0800 or '2017-10-01 00:00:00.000+0800'", 4},
{"end-time", 'E', "END_TIME", 0, "End time to dump. Either epoch or ISO8601/RFC3339 format is acceptable. ISO8601 format example: 2017-10-01T00:00:00.000+0800 or 2017-10-0100:00:00.000+0800 or '2017-10-01 00:00:00.000+0800'", 5},
#if TSDB_SUPPORT_NANOSECOND == 1
{"precision", 'C', "PRECISION", 0, "Specify precision for converting human-readable time to epoch. Valid value is one of ms, us, and ns. Default is ms.", 6},
#else
{"precision", 'C', "PRECISION", 0, "Use specified precision to convert human-readable time. Valid value is one of ms and us. Default is ms.", 6},
#endif
{"data-batch", 'B', "DATA_BATCH", 0, "Number of data point per insert statement. Max value is 32766. Default is 1.", 3},
{"max-sql-len", 'L', "SQL_LEN", 0, "Max length of one sql. Default is 65480.", 3},
{"table-batch", 't', "TABLE_BATCH", 0, "Number of table dumpout into one output file. Default is 1.", 3},
......@@ -281,8 +277,11 @@ typedef struct arguments {
bool with_property;
bool avro;
int64_t start_time;
char humanStartTime[28];
int64_t end_time;
char humanEndTime[28];
char precision[8];
int32_t data_batch;
int32_t max_sql_len;
int32_t table_batch; // num of table which will be dump into one output file.
......@@ -296,6 +295,8 @@ typedef struct arguments {
bool debug_print;
bool verbose_print;
bool performance_print;
int dbCount;
} SArguments;
/* Our argp parser. */
......@@ -318,13 +319,17 @@ static void taosDumpCreateTableClause(STableDef *tableDes, int numOfCols,
static void taosDumpCreateMTableClause(STableDef *tableDes, char *metric,
int numOfCols, FILE *fp, char* dbName);
static int32_t taosDumpTable(char *tbName, char *metric,
FILE *fp, TAOS* taosCon, char* dbName);
FILE *fp, TAOS* taosCon, char* dbName, int precision);
static int taosDumpTableData(FILE *fp, char *tbName,
TAOS* taosCon, char* dbName,
int precision,
char *jsonAvroSchema);
static int taosCheckParam(struct arguments *arguments);
static void taosFreeDbInfos();
static void taosStartDumpOutWorkThreads(int32_t numOfThread, char *dbName);
static void taosStartDumpOutWorkThreads(
int32_t numOfThread,
char *dbName,
int precision);
struct arguments g_args = {
// connection option
......@@ -349,8 +354,10 @@ struct arguments g_args = {
false, // schemeonly
true, // with_property
false, // avro format
-INT64_MAX, // start_time
-INT64_MAX + 1, // start_time
{0}, // humanStartTime
INT64_MAX, // end_time
{0}, // humanEndTime
"ms", // precision
1, // data_batch
TSDB_MAX_SQL_LEN, // max_sql_len
......@@ -364,7 +371,8 @@ struct arguments g_args = {
false, // isDumpIn
false, // debug_print
false, // verbose_print
false // performance_print
false, // performance_print
0, // dbCount
};
static void errorPrintReqArg2(char *program, char *wrong_arg)
......@@ -472,12 +480,8 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
break;
case 'S':
// parse time here.
g_args.start_time = atol(arg);
break;
case 'E':
g_args.end_time = atol(arg);
break;
case 'C':
break;
case 'B':
g_args.data_batch = atoi(arg);
......@@ -550,7 +554,7 @@ static int queryDbImpl(TAOS *taos, char *command) {
return 0;
}
static void parse_precision_first(
UNUSED_FUNC static void parse_precision_first(
int argc, char *argv[], SArguments *arguments) {
for (int i = 1; i < argc; i++) {
if (strcmp(argv[i], "-C") == 0) {
......@@ -616,6 +620,73 @@ static void parse_args(
}
}
static void copyHumanTimeToArg(char *timeStr, bool isStartTime)
{
if (isStartTime)
strcpy(g_args.humanStartTime, timeStr);
else
strcpy(g_args.humanEndTime, timeStr);
}
static void copyTimestampToArg(char *timeStr, bool isStartTime)
{
if (isStartTime)
g_args.start_time = atol(timeStr);
else
g_args.end_time = atol(timeStr);
}
static void parse_timestamp(
int argc, char *argv[], SArguments *arguments) {
for (int i = 1; i < argc; i++) {
char *tmp;
bool isStartTime = false;
bool isEndTime = false;
if (strcmp(argv[i], "-S") == 0) {
isStartTime = true;
} else if (strcmp(argv[i], "-E") == 0) {
isEndTime = true;
}
if (isStartTime || isEndTime) {
if (NULL == argv[i+1]) {
errorPrint("%s need a valid value following!\n", argv[i]);
exit(-1);
}
tmp = strdup(argv[i+1]);
if (strchr(tmp, ':') && strchr(tmp, '-')) {
copyHumanTimeToArg(tmp, isStartTime);
} else {
copyTimestampToArg(tmp, isStartTime);
}
}
}
}
static int getPrecisionByString(char *precision)
{
if (0 == strncasecmp(precision,
"ms", 2)) {
return TSDB_TIME_PRECISION_MILLI;
} else if (0 == strncasecmp(precision,
"us", 2)) {
return TSDB_TIME_PRECISION_MICRO;
#if TSDB_SUPPORT_NANOSECOND == 1
} else if (0 == strncasecmp(precision,
"ns", 2)) {
return TSDB_TIME_PRECISION_NANO;
#endif
} else {
errorPrint("Invalid time precision: %s",
precision);
}
return -1;
}
/*
static void parse_timestamp(
int argc, char *argv[], SArguments *arguments) {
for (int i = 1; i < argc; i++) {
......@@ -634,6 +705,7 @@ static void parse_timestamp(
int64_t tmpEpoch;
if (strchr(tmp, ':') && strchr(tmp, '-')) {
strcpy(g_args.humanStartTime, tmp)
int32_t timePrec;
if (0 == strncasecmp(arguments->precision,
"ms", strlen("ms"))) {
......@@ -672,6 +744,7 @@ static void parse_timestamp(
}
}
}
*/
int main(int argc, char *argv[]) {
static char verType[32] = {0};
......@@ -682,7 +755,7 @@ int main(int argc, char *argv[]) {
/* Parse our arguments; every option seen by parse_opt will be
reflected in arguments. */
if (argc > 1) {
parse_precision_first(argc, argv, &g_args);
// parse_precision_first(argc, argv, &g_args);
parse_timestamp(argc, argv, &g_args);
parse_args(argc, argv, &g_args);
}
......@@ -714,7 +787,9 @@ int main(int argc, char *argv[]) {
printf("with_property: %s\n", g_args.with_property?"true":"false");
printf("avro format: %s\n", g_args.avro?"true":"false");
printf("start_time: %" PRId64 "\n", g_args.start_time);
printf("human readable start time: %s \n", g_args.humanStartTime);
printf("end_time: %" PRId64 "\n", g_args.end_time);
printf("human readable end time: %s \n", g_args.humanEndTime);
printf("precision: %s\n", g_args.precision);
printf("data_batch: %d\n", g_args.data_batch);
printf("max_sql_len: %d\n", g_args.max_sql_len);
......@@ -759,7 +834,9 @@ int main(int argc, char *argv[]) {
fprintf(g_fpOfResult, "with_property: %s\n", g_args.with_property?"true":"false");
fprintf(g_fpOfResult, "avro format: %s\n", g_args.avro?"true":"false");
fprintf(g_fpOfResult, "start_time: %" PRId64 "\n", g_args.start_time);
fprintf(g_fpOfResult, "human readable start time: %s \n", g_args.humanStartTime);
fprintf(g_fpOfResult, "end_time: %" PRId64 "\n", g_args.end_time);
fprintf(g_fpOfResult, "human readable end time: %s \n", g_args.humanEndTime);
fprintf(g_fpOfResult, "precision: %s\n", g_args.precision);
fprintf(g_fpOfResult, "data_batch: %d\n", g_args.data_batch);
fprintf(g_fpOfResult, "max_sql_len: %d\n", g_args.max_sql_len);
......@@ -816,7 +893,8 @@ int main(int argc, char *argv[]) {
static void taosFreeDbInfos() {
if (g_dbInfos == NULL) return;
for (int i = 0; i < 128; i++) tfree(g_dbInfos[i]);
for (int i = 0; i < g_args.dbCount; i++)
tfree(g_dbInfos[i]);
tfree(g_dbInfos);
}
......@@ -1046,6 +1124,88 @@ static int32_t taosSaveTableOfMetricToTempFile(
return 0;
}
static int getDbCount()
{
int count;
TAOS *taos = NULL;
TAOS_RES *result = NULL;
char *command = NULL;
TAOS_ROW row;
command = (char *)malloc(COMMAND_SIZE);
if (command == NULL) {
errorPrint("%s() LN%d, failed to allocate command buffer\n", __func__, __LINE__);
return 0;
}
/* Connect to server */
taos = taos_connect(g_args.host, g_args.user, g_args.password,
NULL, g_args.port);
if (NULL == taos) {
errorPrint("Failed to connect to TDengine server %s\n", g_args.host);
free(command);
return 0;
}
sprintf(command, "show databases");
result = taos_query(taos, command);
int32_t code = taos_errno(result);
if (0 != code) {
errorPrint("%s() LN%d, failed to run command: %s, reason: %s\n",
__func__, __LINE__, command, taos_errstr(result));
free(command);
return 0;
}
TAOS_FIELD *fields = taos_fetch_fields(result);
while ((row = taos_fetch_row(result)) != NULL) {
// sys database name : 'log', but subsequent version changed to 'log'
if ((strncasecmp(row[TSDB_SHOW_DB_NAME_INDEX], "log",
fields[TSDB_SHOW_DB_NAME_INDEX].bytes) == 0)
&& (!g_args.allow_sys)) {
continue;
}
if (g_args.databases) { // input multi dbs
for (int i = 0; g_args.arg_list[i]; i++) {
if (strncasecmp(g_args.arg_list[i],
(char *)row[TSDB_SHOW_DB_NAME_INDEX],
fields[TSDB_SHOW_DB_NAME_INDEX].bytes) == 0)
goto _dump_db_point;
}
continue;
} else if (!g_args.all_databases) { // only input one db
if (strncasecmp(g_args.arg_list[0],
(char *)row[TSDB_SHOW_DB_NAME_INDEX],
fields[TSDB_SHOW_DB_NAME_INDEX].bytes) == 0)
goto _dump_db_point;
else
continue;
}
_dump_db_point:
count++;
if (g_args.databases) {
if (count > g_args.arg_list_len) break;
} else if (!g_args.all_databases) {
if (count >= 1) break;
}
}
if (count == 0) {
errorPrint("%d databases valid to dump\n", count);
}
free(command);
return count;
}
static int taosDumpOut() {
TAOS *taos = NULL;
TAOS_RES *result = NULL;
......@@ -1070,7 +1230,14 @@ static int taosDumpOut() {
return -1;
}
g_dbInfos = (SDbInfo **)calloc(128, sizeof(SDbInfo *));
g_args.dbCount = getDbCount();
if (0 == g_args.dbCount) {
errorPrint("%d databases valid to dump\n", g_args.dbCount);
return -1;
}
g_dbInfos = (SDbInfo **)calloc(g_args.dbCount, sizeof(SDbInfo *));
if (g_dbInfos == NULL) {
errorPrint("%s() LN%d, failed to allocate memory\n",
__func__, __LINE__);
......@@ -1165,9 +1332,9 @@ _dump_db_point:
g_dbInfos[count]->comp = (int8_t)(*((int8_t *)row[TSDB_SHOW_DB_COMP_INDEX]));
g_dbInfos[count]->cachelast = (int8_t)(*((int8_t *)row[TSDB_SHOW_DB_CACHELAST_INDEX]));
tstrncpy(g_dbInfos[count]->precision, (char *)row[TSDB_SHOW_DB_PRECISION_INDEX],
min(8, fields[TSDB_SHOW_DB_PRECISION_INDEX].bytes + 1));
//g_dbInfos[count]->precision = *((int8_t *)row[TSDB_SHOW_DB_PRECISION_INDEX]);
tstrncpy(g_dbInfos[count]->precision,
(char *)row[TSDB_SHOW_DB_PRECISION_INDEX],
DB_PRECISION_LEN);
g_dbInfos[count]->update = *((int8_t *)row[TSDB_SHOW_DB_UPDATE_INDEX]);
}
count++;
......@@ -1263,8 +1430,10 @@ _dump_db_point:
}
// start multi threads to dumpout
taosStartDumpOutWorkThreads(totalNumOfThread,
g_dbInfos[0]->name);
g_dbInfos[0]->name,
getPrecisionByString(g_dbInfos[0]->precision));
char tmpFileName[MAX_FILE_NAME_LEN];
_clean_tmp_file:
......@@ -1453,7 +1622,7 @@ static int convertSchemaToAvroSchema(STableDef *stableDes, char **avroSchema)
static int32_t taosDumpTable(
char *tbName, char *metric,
FILE *fp, TAOS* taosCon, char* dbName) {
FILE *fp, TAOS* taosCon, char* dbName, int precision) {
int count = 0;
STableDef *tableDes = (STableDef *)calloc(1, sizeof(STableDef)
......@@ -1504,7 +1673,7 @@ static int32_t taosDumpTable(
int32_t ret = 0;
if (!g_args.schemaonly) {
ret = taosDumpTableData(fp, tbName, taosCon, dbName,
ret = taosDumpTableData(fp, tbName, taosCon, dbName, precision,
jsonAvroSchema);
}
......@@ -1595,7 +1764,8 @@ static void* taosDumpOutWorkThreadFp(void *arg)
int ret = taosDumpTable(
tableRecord.name, tableRecord.metric,
fp, pThread->taosCon, pThread->dbName);
fp, pThread->taosCon, pThread->dbName,
pThread->precision);
if (ret >= 0) {
// TODO: sum table count and table rows by self
pThread->tablesOfDumpOut++;
......@@ -1644,7 +1814,7 @@ static void* taosDumpOutWorkThreadFp(void *arg)
return NULL;
}
static void taosStartDumpOutWorkThreads(int32_t numOfThread, char *dbName)
static void taosStartDumpOutWorkThreads(int32_t numOfThread, char *dbName, int precision)
{
pthread_attr_t thattr;
SThreadParaObj *threadObj =
......@@ -1663,6 +1833,7 @@ static void taosStartDumpOutWorkThreads(int32_t numOfThread, char *dbName)
pThread->threadIndex = t;
pThread->totalThreads = numOfThread;
tstrncpy(pThread->dbName, dbName, TSDB_DB_NAME_LEN);
pThread->precision = precision;
pThread->taosCon = taos_connect(g_args.host, g_args.user, g_args.password,
NULL, g_args.port);
if (pThread->taosCon == NULL) {
......@@ -1912,7 +2083,8 @@ static int taosDumpDb(SDbInfo *dbInfo, FILE *fp, TAOS *taosCon) {
}
// start multi threads to dumpout
taosStartDumpOutWorkThreads(numOfThread, dbInfo->name);
taosStartDumpOutWorkThreads(numOfThread, dbInfo->name,
getPrecisionByString(dbInfo->precision));
for (int loopCnt = 0; loopCnt < numOfThread; loopCnt++) {
sprintf(tmpBuf, ".tables.tmp.%d", loopCnt);
(void)remove(tmpBuf);
......@@ -2190,14 +2362,38 @@ static int64_t writeResultToSql(TAOS_RES *res, FILE *fp, char *dbName, char *tbN
}
static int taosDumpTableData(FILE *fp, char *tbName,
TAOS* taosCon, char* dbName,
TAOS* taosCon, char* dbName, int precision,
char *jsonAvroSchema) {
int64_t totalRows = 0;
char sqlstr[1024] = {0};
int64_t start_time, end_time;
if (strlen(g_args.humanStartTime)) {
if (TSDB_CODE_SUCCESS != taosParseTime(
g_args.humanStartTime, &start_time, strlen(g_args.humanStartTime),
precision, 0)) {
errorPrint("Input %s, time format error!\n", g_args.humanStartTime);
return -1;
}
} else {
start_time = g_args.start_time;
}
if (strlen(g_args.humanEndTime)) {
if (TSDB_CODE_SUCCESS != taosParseTime(
g_args.humanEndTime, &end_time, strlen(g_args.humanEndTime),
precision, 0)) {
errorPrint("Input %s, time format error!\n", g_args.humanEndTime);
return -1;
}
} else {
end_time = g_args.end_time;
}
sprintf(sqlstr,
"select * from %s.%s where _c0 >= %" PRId64 " and _c0 <= %" PRId64 " order by _c0 asc;",
dbName, tbName, g_args.start_time, g_args.end_time);
dbName, tbName, start_time, end_time);
TAOS_RES* res = taos_query(taosCon, sqlstr);
int32_t code = taos_errno(res);
......
......@@ -181,7 +181,7 @@ python3 test.py -f tools/taosdemoAllTest/NanoTestCase/taosdemoTestSupportNanoIns
python3 test.py -f tools/taosdemoAllTest/NanoTestCase/taosdemoTestSupportNanoQuery.py
python3 test.py -f tools/taosdemoAllTest/NanoTestCase/taosdemoTestSupportNanosubscribe.py
python3 test.py -f tools/taosdemoAllTest/NanoTestCase/taosdemoTestInsertTime_step.py
python3 test.py -f tools/taosdemoAllTest/NanoTestCase/taosdumpTestNanoSupport.py
python3 test.py -f tools/taosdumpTestNanoSupport.py
#
python3 ./test.py -f tsdb/tsdbComp.py
......
......@@ -136,7 +136,7 @@ class TDTestCase:
# dump part data with -S -E
os.system(
'%staosdump --databases timedb1 -S 1625068810000000000 -E 1625068860000000000 -C ns -o ./taosdumptest/dumptmp2 ' %
'%staosdump --databases timedb1 -S 1625068810000000000 -E 1625068860000000000 -o ./taosdumptest/dumptmp2 ' %
binPath)
os.system(
'%staosdump --databases timedb1 -S 1625068810000000000 -o ./taosdumptest/dumptmp3 ' %
......@@ -218,7 +218,7 @@ class TDTestCase:
"%staosdump --databases timedb1 -o ./taosdumptest/dumptmp1" % binPath)
os.system(
'%staosdump --databases timedb1 -S 1625068810000000 -E 1625068860000000 -C us -o ./taosdumptest/dumptmp2 ' %
'%staosdump --databases timedb1 -S 1625068810000000 -E 1625068860000000 -o ./taosdumptest/dumptmp2 ' %
binPath)
os.system(
'%staosdump --databases timedb1 -S 1625068810000000 -o ./taosdumptest/dumptmp3 ' %
......@@ -299,7 +299,7 @@ class TDTestCase:
"%staosdump --databases timedb1 -o ./taosdumptest/dumptmp1" % binPath)
os.system(
'%staosdump --databases timedb1 -S 1625068810000 -E 1625068860000 -C ms -o ./taosdumptest/dumptmp2 ' %
'%staosdump --databases timedb1 -S 1625068810000 -E 1625068860000 -o ./taosdumptest/dumptmp2 ' %
binPath)
os.system(
'%staosdump --databases timedb1 -S 1625068810000 -o ./taosdumptest/dumptmp3 ' %
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册