提交 f2d1f4c6 编写于 作者: sangshuduo's avatar sangshuduo

merge avro feature code.

上级 a7ccda16
...@@ -28,7 +28,6 @@ ...@@ -28,7 +28,6 @@
#include "tsdb.h" #include "tsdb.h"
#include "tutil.h" #include "tutil.h"
#define AVRO_SUPPORT 0 #define AVRO_SUPPORT 0
#if AVRO_SUPPORT == 1 #if AVRO_SUPPORT == 1
...@@ -47,8 +46,8 @@ ...@@ -47,8 +46,8 @@
static int converStringToReadable(char *str, int size, char *buf, int bufsize); static int converStringToReadable(char *str, int size, char *buf, int bufsize);
static int convertNCharToReadable(char *str, int size, char *buf, int bufsize); static int convertNCharToReadable(char *str, int size, char *buf, int bufsize);
static void taosDumpCharset(FILE *fp); static void dumpCharset(FILE *fp);
static void taosLoadFileCharset(FILE *fp, char *fcharset); static void loadFileCharset(FILE *fp, char *fcharset);
typedef struct { typedef struct {
short bytes; short bytes;
...@@ -172,7 +171,8 @@ typedef struct { ...@@ -172,7 +171,8 @@ typedef struct {
} TableRecord; } TableRecord;
typedef struct { typedef struct {
bool isStable; bool isStb;
bool belongStb;
int64_t dumpNtbCount; int64_t dumpNtbCount;
TableRecord **dumpNtbInfos; TableRecord **dumpNtbInfos;
TableRecord tableRecord; TableRecord tableRecord;
...@@ -342,24 +342,21 @@ static resultStatistics g_resultStatistics = {0}; ...@@ -342,24 +342,21 @@ static resultStatistics g_resultStatistics = {0};
static FILE *g_fpOfResult = NULL; static FILE *g_fpOfResult = NULL;
static int g_numOfCores = 1; static int g_numOfCores = 1;
static int taosDumpOut(); static int dumpOut();
static int taosDumpIn(); static int dumpIn();
static void taosDumpCreateDbClause(SDbInfo *dbInfo, bool isDumpProperty, static void dumpCreateDbClause(SDbInfo *dbInfo, bool isDumpProperty,
FILE *fp); FILE *fp);
//static int taosDumpDb(SDbInfo *dbInfo, FILE *fp, TAOS *taos);
static int dumpStable(char *table, FILE *fp, SDbInfo *dbInfo);
static int dumpCreateTableClause(TableDef *tableDes, int numOfCols, static int dumpCreateTableClause(TableDef *tableDes, int numOfCols,
FILE *fp, char* dbName); FILE *fp, char* dbName);
static void taosDumpCreateMTableClause(TableDef *tableDes, char *stable, static int getTableDes(
int numOfCols, FILE *fp, char* dbName); char* dbName, char *table,
static int64_t taosDumpTable(char *tbName, char *stable, TableDef *stableDes, bool isSuperTable);
FILE *fp, char* dbName, int precision);
static int64_t dumpTableData(FILE *fp, char *tbName, static int64_t dumpTableData(FILE *fp, char *tbName,
char* dbName, char* dbName,
int precision, int precision,
char *jsonAvroSchema); char *jsonAvroSchema);
static int checkParam(); static int checkParam();
static void taosFreeDbInfos(); static void freeDbInfos();
struct arguments g_args = { struct arguments g_args = {
// connection option // connection option
...@@ -433,16 +430,16 @@ static void printVersion() { ...@@ -433,16 +430,16 @@ static void printVersion() {
} }
} }
UNUSED_FUNC void errorWrongValue(char *program, char *wrong_arg, char *wrong_value) void errorWrongValue(char *program, char *wrong_arg, char *wrong_value)
{ {
fprintf(stderr, "%s %s: %s is an invalid value\n", program, wrong_arg, wrong_value); fprintf(stderr, "%s %s: %s is an invalid value\n", program, wrong_arg, wrong_value);
fprintf(stderr, "Try `taosdemo --help' or `taosdemo --usage' for more information.\n"); fprintf(stderr, "Try `taosdump --help' or `taosdump --usage' for more information.\n");
} }
static void errorUnrecognized(char *program, char *wrong_arg) static void errorUnrecognized(char *program, char *wrong_arg)
{ {
fprintf(stderr, "%s: unrecognized options '%s'\n", program, wrong_arg); fprintf(stderr, "%s: unrecognized options '%s'\n", program, wrong_arg);
fprintf(stderr, "Try `taosdemo --help' or `taosdemo --usage' for more information.\n"); fprintf(stderr, "Try `taosdump --help' or `taosdump --usage' for more information.\n");
} }
static void errorPrintReqArg(char *program, char *wrong_arg) static void errorPrintReqArg(char *program, char *wrong_arg)
...@@ -451,7 +448,7 @@ static void errorPrintReqArg(char *program, char *wrong_arg) ...@@ -451,7 +448,7 @@ static void errorPrintReqArg(char *program, char *wrong_arg)
"%s: option requires an argument -- '%s'\n", "%s: option requires an argument -- '%s'\n",
program, wrong_arg); program, wrong_arg);
fprintf(stderr, fprintf(stderr,
"Try `taosdemo --help' or `taosdemo --usage' for more information.\n"); "Try `taosdump --help' or `taosdump --usage' for more information.\n");
} }
static void errorPrintReqArg2(char *program, char *wrong_arg) static void errorPrintReqArg2(char *program, char *wrong_arg)
...@@ -460,7 +457,7 @@ static void errorPrintReqArg2(char *program, char *wrong_arg) ...@@ -460,7 +457,7 @@ static void errorPrintReqArg2(char *program, char *wrong_arg)
"%s: option requires a number argument '-%s'\n", "%s: option requires a number argument '-%s'\n",
program, wrong_arg); program, wrong_arg);
fprintf(stderr, fprintf(stderr,
"Try `taosdemo --help' or `taosdemo --usage' for more information.\n"); "Try `taosdump --help' or `taosdump --usage' for more information.\n");
} }
static void errorPrintReqArg3(char *program, char *wrong_arg) static void errorPrintReqArg3(char *program, char *wrong_arg)
...@@ -469,7 +466,7 @@ static void errorPrintReqArg3(char *program, char *wrong_arg) ...@@ -469,7 +466,7 @@ static void errorPrintReqArg3(char *program, char *wrong_arg)
"%s: option '%s' requires an argument\n", "%s: option '%s' requires an argument\n",
program, wrong_arg); program, wrong_arg);
fprintf(stderr, fprintf(stderr,
"Try `taosdemo --help' or `taosdemo --usage' for more information.\n"); "Try `taosdump --help' or `taosdump --usage' for more information.\n");
} }
/* Parse a single option. */ /* Parse a single option. */
...@@ -496,7 +493,14 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { ...@@ -496,7 +493,14 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
errorPrintReqArg2("taosdump", "P"); errorPrintReqArg2("taosdump", "P");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
g_args.port = atoi(arg);
uint64_t port = atoi(arg);
if (port > 65535) {
errorWrongValue("taosdump", "-P or --port", arg);
exit(EXIT_FAILURE);
}
g_args.port = (uint16_t)port;
break; break;
case 'q': case 'q':
g_args.mysqlFlag = atoi(arg); g_args.mysqlFlag = atoi(arg);
...@@ -709,7 +713,7 @@ static void parse_args( ...@@ -709,7 +713,7 @@ static void parse_args(
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
g_args.databases = true; g_args.databases = true;
} else if (0 == strncmp(argv[i], "--version", strlen("--version")) || } else if (0 == strncmp(argv[i], "--version", strlen("--version")) ||
0 == strncmp(argv[i], "-V", strlen("-V"))) { 0 == strncmp(argv[i], "-V", strlen("-V"))) {
printVersion(); printVersion();
exit(EXIT_SUCCESS); exit(EXIT_SUCCESS);
...@@ -788,7 +792,7 @@ static int getPrecisionByString(char *precision) ...@@ -788,7 +792,7 @@ static int getPrecisionByString(char *precision)
return -1; return -1;
} }
static void taosFreeDbInfos() { static void freeDbInfos() {
if (g_dbInfos == NULL) return; if (g_dbInfos == NULL) return;
for (int i = 0; i < g_args.dumpDbCount; i++) for (int i = 0; i < g_args.dumpDbCount; i++)
tfree(g_dbInfos[i]); tfree(g_dbInfos[i]);
...@@ -839,15 +843,20 @@ static int getTableRecordInfo( ...@@ -839,15 +843,20 @@ static int getTableRecordInfo(
while ((row = taos_fetch_row(result)) != NULL) { while ((row = taos_fetch_row(result)) != NULL) {
isSet = true; isSet = true;
pTableRecordInfo->isStable = false; pTableRecordInfo->isStb = false;
tstrncpy(pTableRecordInfo->tableRecord.name, tstrncpy(pTableRecordInfo->tableRecord.name,
(char *)row[TSDB_SHOW_TABLES_NAME_INDEX], (char *)row[TSDB_SHOW_TABLES_NAME_INDEX],
min(TSDB_TABLE_NAME_LEN, min(TSDB_TABLE_NAME_LEN,
fields[TSDB_SHOW_TABLES_NAME_INDEX].bytes + 1)); fields[TSDB_SHOW_TABLES_NAME_INDEX].bytes + 1));
tstrncpy(pTableRecordInfo->tableRecord.stable, if (strlen((char *)row[TSDB_SHOW_TABLES_METRIC_INDEX]) > 0) {
(char *)row[TSDB_SHOW_TABLES_METRIC_INDEX], pTableRecordInfo->belongStb = true;
min(TSDB_TABLE_NAME_LEN, tstrncpy(pTableRecordInfo->tableRecord.stable,
fields[TSDB_SHOW_TABLES_METRIC_INDEX].bytes + 1)); (char *)row[TSDB_SHOW_TABLES_METRIC_INDEX],
min(TSDB_TABLE_NAME_LEN,
fields[TSDB_SHOW_TABLES_METRIC_INDEX].bytes + 1));
} else {
pTableRecordInfo->belongStb = false;
}
break; break;
} }
...@@ -872,7 +881,7 @@ static int getTableRecordInfo( ...@@ -872,7 +881,7 @@ static int getTableRecordInfo(
while ((row = taos_fetch_row(result)) != NULL) { while ((row = taos_fetch_row(result)) != NULL) {
isSet = true; isSet = true;
pTableRecordInfo->isStable = true; pTableRecordInfo->isStb = true;
tstrncpy(pTableRecordInfo->tableRecord.stable, table, tstrncpy(pTableRecordInfo->tableRecord.stable, table,
TSDB_TABLE_NAME_LEN); TSDB_TABLE_NAME_LEN);
break; break;
...@@ -974,7 +983,213 @@ static int getDumpDbCount() ...@@ -974,7 +983,213 @@ static int getDumpDbCount()
return count; return count;
} }
static int64_t dumpNormalTableWithoutStb(SDbInfo *dbInfo, char *ntbName) static void dumpCreateMTableClause(
char* dbName,
char *stable,
TableDef *tableDes,
int numOfCols,
FILE *fp
) {
int counter = 0;
int count_temp = 0;
char* tmpBuf = (char *)malloc(COMMAND_SIZE);
if (tmpBuf == NULL) {
errorPrint("%s() LN%d, failed to allocate %d memory\n",
__func__, __LINE__, COMMAND_SIZE);
return;
}
char *pstr = NULL;
pstr = tmpBuf;
pstr += sprintf(tmpBuf,
"CREATE TABLE IF NOT EXISTS %s.%s USING %s.%s TAGS (",
dbName, tableDes->name, dbName, stable);
for (; counter < numOfCols; counter++) {
if (tableDes->cols[counter].note[0] != '\0') break;
}
assert(counter < numOfCols);
count_temp = counter;
for (; counter < numOfCols; counter++) {
if (counter != count_temp) {
if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 ||
strcasecmp(tableDes->cols[counter].type, "nchar") == 0) {
//pstr += sprintf(pstr, ", \'%s\'", tableDes->cols[counter].note);
if (tableDes->cols[counter].var_value) {
pstr += sprintf(pstr, ", %s",
tableDes->cols[counter].var_value);
} else {
pstr += sprintf(pstr, ", %s", tableDes->cols[counter].value);
}
} else {
pstr += sprintf(pstr, ", %s", tableDes->cols[counter].value);
}
} else {
if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 ||
strcasecmp(tableDes->cols[counter].type, "nchar") == 0) {
//pstr += sprintf(pstr, "\'%s\'", tableDes->cols[counter].note);
if (tableDes->cols[counter].var_value) {
pstr += sprintf(pstr, "%s", tableDes->cols[counter].var_value);
} else {
pstr += sprintf(pstr, "%s", tableDes->cols[counter].value);
}
} else {
pstr += sprintf(pstr, "%s", tableDes->cols[counter].value);
}
/* pstr += sprintf(pstr, "%s", tableDes->cols[counter].note); */
}
/* if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 || strcasecmp(tableDes->cols[counter].type, "nchar")
* == 0) { */
/* pstr += sprintf(pstr, "(%d)", tableDes->cols[counter].length); */
/* } */
}
pstr += sprintf(pstr, ");");
fprintf(fp, "%s\n", tmpBuf);
free(tmpBuf);
}
static int convertTbDesToAvroSchema(
char *dbName, char *tbName, TableDef *tableDes, int colCount,
char **avroSchema)
{
errorPrint("%s() LN%d TODO: covert table schema to avro schema\n",
__func__, __LINE__);
// {
// "namesapce": "database name",
// "type": "record",
// "name": "table name",
// "fields": [
// {
// "name": "col0 name",
// "type": "long"
// },
// {
// "name": "col1 name",
// "type": ["int", "null"]
// },
// {
// "name": "col2 name",
// "type": ["float", "null"]
// },
// ...
// {
// "name": "coln name",
// "type": ["string", "null"]
// }
// ]
// }
*avroSchema = (char *)calloc(1,
17 + TSDB_DB_NAME_LEN /* dbname section */
+ 17 /* type: record */
+ 11 + TSDB_TABLE_NAME_LEN /* tbname section */
+ 10 /* fields section */
+ (TSDB_COL_NAME_LEN + 11 + 16) * colCount + 4); /* fields section */
if (*avroSchema == NULL) {
errorPrint("%s() LN%d, memory allocation failed!\n", __func__, __LINE__);
return -1;
}
char *pstr = *avroSchema;
pstr += sprintf(pstr,
"{\"namespace\": \"%s\", \"type\": \"record\", \"name\": \"%s\", \"fields\": [",
dbName, tbName);
for (int i = 0; i < colCount; i ++) {
if (0 == i) {
pstr += sprintf(pstr,
"{\"name\": \"%s\", \"type\": \"%s\"",
tableDes->cols[i].field, "long");
} else {
if (strcasecmp(tableDes->cols[i].type, "binary") == 0 ||
strcasecmp(tableDes->cols[i].type, "nchar") == 0) {
pstr += sprintf(pstr,
"{\"name\": \"%s\", \"type\": [\"%s\", \"null\"]",
tableDes->cols[i].field, "string");
} else {
pstr += sprintf(pstr,
"{\"name\": \"%s\", \"type\": [\"%s\", \"null\"]",
tableDes->cols[i].field, tableDes->cols[i].type);
}
}
if ((i != (colCount -1))
&& (strcmp(tableDes->cols[i + 1].note, "TAG") != 0)) {
pstr += sprintf(pstr, "},");
} else {
pstr += sprintf(pstr, "}");
break;
}
}
pstr += sprintf(pstr, "]}");
debugPrint("%s() LN%d, avroSchema: %s\n", __func__, __LINE__, *avroSchema);
return 0;
}
static int64_t dumpNormalTable(
char *dbName,
char *stable,
char *tbName,
int precision,
FILE *fp
) {
int colCount = 0;
TableDef *tableDes = (TableDef *)calloc(1, sizeof(TableDef)
+ sizeof(ColDes) * TSDB_MAX_COLUMNS);
if (stable != NULL && stable[0] != '\0') { // dump table schema which is created by using super table
colCount = getTableDes(dbName, tbName, tableDes, false);
if (colCount < 0) {
free(tableDes);
return -1;
}
// create child-table using super-table
dumpCreateMTableClause(dbName, stable, tableDes, colCount, fp);
} else { // dump table definition
colCount = getTableDes(dbName, tbName, tableDes, false);
if (colCount < 0) {
free(tableDes);
return -1;
}
// create normal-table or super-table
dumpCreateTableClause(tableDes, colCount, fp, dbName);
}
char *jsonAvroSchema = NULL;
if (g_args.avro) {
if (0 != convertTbDesToAvroSchema(
dbName, tbName, tableDes, colCount, &jsonAvroSchema)) {
freeTbDes(tableDes);
return -1;
}
}
free(tableDes);
int64_t ret = 0;
if (!g_args.schemaonly) {
ret = dumpTableData(fp, tbName, dbName, precision,
jsonAvroSchema);
}
return ret;
}
static int64_t dumpNormalTableBelongStb(
SDbInfo *dbInfo, char *stbName, char *ntbName)
{ {
int64_t count = 0; int64_t count = 0;
...@@ -996,21 +1211,47 @@ static int64_t dumpNormalTableWithoutStb(SDbInfo *dbInfo, char *ntbName) ...@@ -996,21 +1211,47 @@ static int64_t dumpNormalTableWithoutStb(SDbInfo *dbInfo, char *ntbName)
return -1; return -1;
} }
count = taosDumpTable(ntbName, NULL, count = dumpNormalTable(
fp, dbInfo->name, getPrecisionByString(dbInfo->precision)); dbInfo->name,
stbName,
ntbName,
getPrecisionByString(dbInfo->precision),
fp);
fclose(fp); fclose(fp);
return count; return count;
} }
static int64_t dumpNormalTable(FILE *fp, TAOS *taos, char *dbName, char *tbName, static int64_t dumpNormalTableWithoutStb(SDbInfo *dbInfo, char *ntbName)
char *stbName,
int precision)
{ {
int64_t count = 0; int64_t count = 0;
count = taosDumpTable(tbName, stbName,
fp, dbName, precision);
char tmpBuf[4096] = {0};
FILE *fp = NULL;
if (g_args.outpath[0] != 0) {
sprintf(tmpBuf, "%s/%s.%s.sql",
g_args.outpath, dbInfo->name, ntbName);
} else {
sprintf(tmpBuf, "%s.%s.sql",
dbInfo->name, ntbName);
}
fp = fopen(tmpBuf, "w");
if (fp == NULL) {
errorPrint("%s() LN%d, failed to open file %s\n",
__func__, __LINE__, tmpBuf);
return -1;
}
count = dumpNormalTable(
dbInfo->name,
NULL,
ntbName,
getPrecisionByString(dbInfo->precision),
fp);
fclose(fp);
return count; return count;
} }
...@@ -1044,12 +1285,12 @@ static void *dumpNtbOfDb(void *arg) { ...@@ -1044,12 +1285,12 @@ static void *dumpNtbOfDb(void *arg) {
debugPrint("[%d] No.\t%"PRId64" table name: %s\n", debugPrint("[%d] No.\t%"PRId64" table name: %s\n",
pThreadInfo->threadIndex, i, pThreadInfo->threadIndex, i,
((TableInfo *)(g_tablesList + pThreadInfo->tableFrom+i))->name); ((TableInfo *)(g_tablesList + pThreadInfo->tableFrom+i))->name);
dumpNormalTable(fp, dumpNormalTable(
pThreadInfo->taos,
pThreadInfo->dbName, pThreadInfo->dbName,
((TableInfo *)(g_tablesList + pThreadInfo->tableFrom+i))->name,
((TableInfo *)(g_tablesList + pThreadInfo->tableFrom+i))->stable, ((TableInfo *)(g_tablesList + pThreadInfo->tableFrom+i))->stable,
pThreadInfo->precision); ((TableInfo *)(g_tablesList + pThreadInfo->tableFrom+i))->name,
pThreadInfo->precision,
fp);
} }
fclose(fp); fclose(fp);
...@@ -1103,12 +1344,12 @@ static void *dumpNormalTablesOfStb(void *arg) { ...@@ -1103,12 +1344,12 @@ static void *dumpNormalTablesOfStb(void *arg) {
debugPrint("[%d] sub table %"PRId64": name: %s\n", debugPrint("[%d] sub table %"PRId64": name: %s\n",
pThreadInfo->threadIndex, i++, (char *)row[TSDB_SHOW_TABLES_NAME_INDEX]); pThreadInfo->threadIndex, i++, (char *)row[TSDB_SHOW_TABLES_NAME_INDEX]);
dumpNormalTable(fp, dumpNormalTable(
pThreadInfo->taos,
pThreadInfo->dbName, pThreadInfo->dbName,
(char *)row[TSDB_SHOW_TABLES_NAME_INDEX],
pThreadInfo->stbName, pThreadInfo->stbName,
pThreadInfo->precision); (char *)row[TSDB_SHOW_TABLES_NAME_INDEX],
pThreadInfo->precision,
fp);
} }
fclose(fp); fclose(fp);
...@@ -1294,6 +1535,34 @@ static int64_t dumpNtbOfStbByThreads( ...@@ -1294,6 +1535,34 @@ static int64_t dumpNtbOfStbByThreads(
return records; return records;
} }
static int dumpStableClasuse(SDbInfo *dbInfo, char *stbName, FILE *fp)
{
uint64_t sizeOfTableDes =
(uint64_t)(sizeof(TableDef) + sizeof(ColDes) * TSDB_MAX_COLUMNS);
TableDef *tableDes = (TableDef *)calloc(1, sizeOfTableDes);
if (NULL == tableDes) {
errorPrint("%s() LN%d, failed to allocate %"PRIu64" memory\n",
__func__, __LINE__, sizeOfTableDes);
exit(-1);
}
int colCount = getTableDes(dbInfo->name,
stbName, tableDes, true);
if (colCount < 0) {
free(tableDes);
errorPrint("%s() LN%d, failed to get stable[%s] schema\n",
__func__, __LINE__, stbName);
exit(-1);
}
dumpCreateTableClause(tableDes, colCount, fp, dbInfo->name);
free(tableDes);
return 0;
}
static int64_t dumpCreateSTableClauseOfDb( static int64_t dumpCreateSTableClauseOfDb(
SDbInfo *dbInfo, FILE *fp) SDbInfo *dbInfo, FILE *fp)
{ {
...@@ -1323,7 +1592,7 @@ static int64_t dumpCreateSTableClauseOfDb( ...@@ -1323,7 +1592,7 @@ static int64_t dumpCreateSTableClauseOfDb(
int64_t superTblCnt = 0; int64_t superTblCnt = 0;
while ((row = taos_fetch_row(res)) != NULL) { while ((row = taos_fetch_row(res)) != NULL) {
if (0 == dumpStable(row[TSDB_SHOW_TABLES_NAME_INDEX], fp, dbInfo)) { if (0 == dumpStableClasuse(dbInfo, row[TSDB_SHOW_TABLES_NAME_INDEX], fp)) {
superTblCnt ++; superTblCnt ++;
} }
} }
...@@ -1405,7 +1674,7 @@ static int64_t dumpNTablesOfDb(SDbInfo *dbInfo) ...@@ -1405,7 +1674,7 @@ static int64_t dumpNTablesOfDb(SDbInfo *dbInfo)
static int64_t dumpWholeDatabase(SDbInfo *dbInfo, FILE *fp) static int64_t dumpWholeDatabase(SDbInfo *dbInfo, FILE *fp)
{ {
taosDumpCreateDbClause(dbInfo, g_args.with_property, fp); dumpCreateDbClause(dbInfo, g_args.with_property, fp);
fprintf(g_fpOfResult, "\n#### database: %s\n", fprintf(g_fpOfResult, "\n#### database: %s\n",
dbInfo->name); dbInfo->name);
...@@ -1416,14 +1685,13 @@ static int64_t dumpWholeDatabase(SDbInfo *dbInfo, FILE *fp) ...@@ -1416,14 +1685,13 @@ static int64_t dumpWholeDatabase(SDbInfo *dbInfo, FILE *fp)
return dumpNTablesOfDb(dbInfo); return dumpNTablesOfDb(dbInfo);
} }
static int taosDumpOut() { static int dumpOut() {
TAOS *taos = NULL; TAOS *taos = NULL;
TAOS_RES *result = NULL; TAOS_RES *result = NULL;
TAOS_ROW row; TAOS_ROW row;
FILE *fp = NULL; FILE *fp = NULL;
int32_t count = 0; int32_t count = 0;
TableRecordInfo tableRecordInfo;
char tmpBuf[4096] = {0}; char tmpBuf[4096] = {0};
if (g_args.outpath[0] != 0) { if (g_args.outpath[0] != 0) {
...@@ -1469,7 +1737,7 @@ static int taosDumpOut() { ...@@ -1469,7 +1737,7 @@ static int taosDumpOut() {
/* --------------------------------- Main Code -------------------------------- */ /* --------------------------------- Main Code -------------------------------- */
/* if (g_args.databases || g_args.all_databases) { // dump part of databases or all databases */ /* if (g_args.databases || g_args.all_databases) { // dump part of databases or all databases */
/* */ /* */
taosDumpCharset(fp); dumpCharset(fp);
sprintf(command, "show databases"); sprintf(command, "show databases");
result = taos_query(taos, command); result = taos_query(taos, command);
...@@ -1591,11 +1859,13 @@ static int taosDumpOut() { ...@@ -1591,11 +1859,13 @@ static int taosDumpOut() {
g_totalDumpOutRows += records; g_totalDumpOutRows += records;
} }
} else { } else {
taosDumpCreateDbClause(g_dbInfos[0], g_args.with_property, fp); dumpCreateDbClause(g_dbInfos[0], g_args.with_property, fp);
} }
int superTblCnt = 0 ; int superTblCnt = 0 ;
for (int i = 1; g_args.arg_list[i]; i++) { for (int i = 1; g_args.arg_list[i]; i++) {
TableRecordInfo tableRecordInfo;
if (getTableRecordInfo(g_dbInfos[0]->name, if (getTableRecordInfo(g_dbInfos[0]->name,
g_args.arg_list[i], g_args.arg_list[i],
&tableRecordInfo) < 0) { &tableRecordInfo) < 0) {
...@@ -1605,14 +1875,24 @@ static int taosDumpOut() { ...@@ -1605,14 +1875,24 @@ static int taosDumpOut() {
} }
int64_t records = 0; int64_t records = 0;
if (tableRecordInfo.isStable) { // dump all table of this stable if (tableRecordInfo.isStb) { // dump all table of this stable
int ret = dumpStable( int ret = dumpStableClasuse(
g_dbInfos[0],
tableRecordInfo.tableRecord.stable, tableRecordInfo.tableRecord.stable,
fp, g_dbInfos[0]); fp);
if (ret >= 0) { if (ret >= 0) {
superTblCnt++; superTblCnt++;
records = dumpNtbOfStbByThreads(g_dbInfos[0], g_args.arg_list[i]); records = dumpNtbOfStbByThreads(g_dbInfos[0], g_args.arg_list[i]);
} }
} else if (tableRecordInfo.belongStb){
dumpStableClasuse(
g_dbInfos[0],
tableRecordInfo.tableRecord.stable,
fp);
records = dumpNormalTableBelongStb(
g_dbInfos[0],
tableRecordInfo.tableRecord.stable,
g_args.arg_list[i]);
} else { } else {
records = dumpNormalTableWithoutStb(g_dbInfos[0], g_args.arg_list[i]); records = dumpNormalTableWithoutStb(g_dbInfos[0], g_args.arg_list[i]);
} }
...@@ -1627,7 +1907,7 @@ static int taosDumpOut() { ...@@ -1627,7 +1907,7 @@ static int taosDumpOut() {
/* Close the handle and return */ /* Close the handle and return */
fclose(fp); fclose(fp);
taos_free_result(result); taos_free_result(result);
taosFreeDbInfos(); freeDbInfos();
fprintf(stderr, "dump out rows: %" PRId64 "\n", g_totalDumpOutRows); fprintf(stderr, "dump out rows: %" PRId64 "\n", g_totalDumpOutRows);
return 0; return 0;
...@@ -1635,89 +1915,11 @@ _exit_failure: ...@@ -1635,89 +1915,11 @@ _exit_failure:
fclose(fp); fclose(fp);
taos_close(taos); taos_close(taos);
taos_free_result(result); taos_free_result(result);
taosFreeDbInfos(); freeDbInfos();
errorPrint("dump out rows: %" PRId64 "\n", g_totalDumpOutRows); errorPrint("dump out rows: %" PRId64 "\n", g_totalDumpOutRows);
return -1; return -1;
} }
static int convertTbDesToAvroSchema(
char *dbName, char *tbName, TableDef *tableDes, int colCount,
char **avroSchema)
{
errorPrint("%s() LN%d TODO: covert table schema to avro schema\n",
__func__, __LINE__);
// {
// "namesapce": "database name",
// "type": "record",
// "name": "table name",
// "fields": [
// {
// "name": "col0 name",
// "type": "long"
// },
// {
// "name": "col1 name",
// "type": ["int", "null"]
// },
// {
// "name": "col2 name",
// "type": ["float", "null"]
// },
// ...
// {
// "name": "coln name",
// "type": ["string", "null"]
// }
// ]
// }
*avroSchema = (char *)calloc(1,
17 + TSDB_DB_NAME_LEN /* dbname section */
+ 17 /* type: record */
+ 11 + TSDB_TABLE_NAME_LEN /* tbname section */
+ 10 /* fields section */
+ (TSDB_COL_NAME_LEN + 11 + 16) * colCount + 4); /* fields section */
if (*avroSchema == NULL) {
errorPrint("%s() LN%d, memory allocation failed!\n", __func__, __LINE__);
return -1;
}
char *pstr = *avroSchema;
pstr += sprintf(pstr,
"{\"namespace\": \"%s\", \"type\": \"record\", \"name\": \"%s\", \"fields\": [",
dbName, tbName);
for (int i = 0; i < colCount; i ++) {
if (0 == i) {
pstr += sprintf(pstr,
"{\"name\": \"%s\", \"type\": \"%s\"",
tableDes->cols[i].field, "long");
} else {
if (strcasecmp(tableDes->cols[i].type, "binary") == 0 ||
strcasecmp(tableDes->cols[i].type, "nchar") == 0) {
pstr += sprintf(pstr,
"{\"name\": \"%s\", \"type\": [\"%s\", \"null\"]",
tableDes->cols[i].field, "string");
} else {
pstr += sprintf(pstr,
"{\"name\": \"%s\", \"type\": [\"%s\", \"null\"]",
tableDes->cols[i].field, tableDes->cols[i].type);
}
}
if ((i != (colCount -1))
&& (strcmp(tableDes->cols[i + 1].note, "TAG") != 0)) {
pstr += sprintf(pstr, "},");
} else {
pstr += sprintf(pstr, "}");
break;
}
}
pstr += sprintf(pstr, "]}");
debugPrint("%s() LN%d, avroSchema: %s\n", __func__, __LINE__, *avroSchema);
return 0;
}
static int getTableDes( static int getTableDes(
char* dbName, char *table, char* dbName, char *table,
TableDef *tableDes, bool isSuperTable) { TableDef *tableDes, bool isSuperTable) {
...@@ -1905,71 +2107,7 @@ static int getTableDes( ...@@ -1905,71 +2107,7 @@ static int getTableDes(
return colCount; return colCount;
} }
static int64_t taosDumpTable( static void dumpCreateDbClause(
char *tbName, char *stable,
FILE *fp, char* dbName, int precision) {
int colCount = 0;
TableDef *tableDes = (TableDef *)calloc(1, sizeof(TableDef)
+ sizeof(ColDes) * TSDB_MAX_COLUMNS);
if (stable != NULL && stable[0] != '\0') { // dump table schema which is created by using super table
/*
colCount = getTableDes(stable, tableDes, taos);
if (count < 0) {
free(tableDes);
return -1;
}
dumpCreateTableClause(tableDes, count, fp);
memset(tableDes, 0, sizeof(TableDef) + sizeof(ColDes) * TSDB_MAX_COLUMNS);
*/
colCount = getTableDes(dbName, tbName, tableDes, false);
if (colCount < 0) {
free(tableDes);
return -1;
}
// create child-table using super-table
taosDumpCreateMTableClause(tableDes, stable, colCount, fp, dbName);
} else { // dump table definition
colCount = getTableDes(dbName, tbName, tableDes, false);
if (colCount < 0) {
free(tableDes);
return -1;
}
// create normal-table or super-table
dumpCreateTableClause(tableDes, colCount, fp, dbName);
}
char *jsonAvroSchema = NULL;
if (g_args.avro) {
if (0 != convertTbDesToAvroSchema(
dbName, tbName, tableDes, colCount, &jsonAvroSchema)) {
freeTbDes(tableDes);
return -1;
}
}
free(tableDes);
int64_t ret = 0;
if (!g_args.schemaonly) {
ret = dumpTableData(fp, tbName, dbName, precision,
jsonAvroSchema);
}
return ret;
}
static void taosDumpCreateDbClause(
SDbInfo *dbInfo, bool isDumpProperty, FILE *fp) { SDbInfo *dbInfo, bool isDumpProperty, FILE *fp) {
char sqlstr[TSDB_MAX_SQL_LEN] = {0}; char sqlstr[TSDB_MAX_SQL_LEN] = {0};
...@@ -1991,34 +2129,6 @@ static void taosDumpCreateDbClause( ...@@ -1991,34 +2129,6 @@ static void taosDumpCreateDbClause(
fprintf(fp, "%s\n\n", sqlstr); fprintf(fp, "%s\n\n", sqlstr);
} }
static int dumpStable(char *stbName, FILE *fp, SDbInfo *dbInfo)
{
uint64_t sizeOfTableDes =
(uint64_t)(sizeof(TableDef) + sizeof(ColDes) * TSDB_MAX_COLUMNS);
TableDef *tableDes = (TableDef *)calloc(1, sizeOfTableDes);
if (NULL == tableDes) {
errorPrint("%s() LN%d, failed to allocate %"PRIu64" memory\n",
__func__, __LINE__, sizeOfTableDes);
exit(-1);
}
int colCount = getTableDes(dbInfo->name,
stbName, tableDes, true);
if (colCount < 0) {
free(tableDes);
errorPrint("%s() LN%d, failed to get stable[%s] schema\n",
__func__, __LINE__, stbName);
exit(-1);
}
dumpCreateTableClause(tableDes, colCount, fp, dbInfo->name);
free(tableDes);
return 0;
}
static int dumpCreateTableClause(TableDef *tableDes, int numOfCols, static int dumpCreateTableClause(TableDef *tableDes, int numOfCols,
FILE *fp, char* dbName) { FILE *fp, char* dbName) {
int counter = 0; int counter = 0;
...@@ -2070,73 +2180,6 @@ static int dumpCreateTableClause(TableDef *tableDes, int numOfCols, ...@@ -2070,73 +2180,6 @@ static int dumpCreateTableClause(TableDef *tableDes, int numOfCols,
return fprintf(fp, "%s\n\n", sqlstr); return fprintf(fp, "%s\n\n", sqlstr);
} }
static void taosDumpCreateMTableClause(TableDef *tableDes, char *stable,
int numOfCols, FILE *fp, char* dbName) {
int counter = 0;
int count_temp = 0;
char* tmpBuf = (char *)malloc(COMMAND_SIZE);
if (tmpBuf == NULL) {
errorPrint("%s() LN%d, failed to allocate %d memory\n",
__func__, __LINE__, COMMAND_SIZE);
return;
}
char *pstr = NULL;
pstr = tmpBuf;
pstr += sprintf(tmpBuf,
"CREATE TABLE IF NOT EXISTS %s.%s USING %s.%s TAGS (",
dbName, tableDes->name, dbName, stable);
for (; counter < numOfCols; counter++) {
if (tableDes->cols[counter].note[0] != '\0') break;
}
assert(counter < numOfCols);
count_temp = counter;
for (; counter < numOfCols; counter++) {
if (counter != count_temp) {
if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 ||
strcasecmp(tableDes->cols[counter].type, "nchar") == 0) {
//pstr += sprintf(pstr, ", \'%s\'", tableDes->cols[counter].note);
if (tableDes->cols[counter].var_value) {
pstr += sprintf(pstr, ", %s",
tableDes->cols[counter].var_value);
} else {
pstr += sprintf(pstr, ", %s", tableDes->cols[counter].value);
}
} else {
pstr += sprintf(pstr, ", %s", tableDes->cols[counter].value);
}
} else {
if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 ||
strcasecmp(tableDes->cols[counter].type, "nchar") == 0) {
//pstr += sprintf(pstr, "\'%s\'", tableDes->cols[counter].note);
if (tableDes->cols[counter].var_value) {
pstr += sprintf(pstr, "%s", tableDes->cols[counter].var_value);
} else {
pstr += sprintf(pstr, "%s", tableDes->cols[counter].value);
}
} else {
pstr += sprintf(pstr, "%s", tableDes->cols[counter].value);
}
/* pstr += sprintf(pstr, "%s", tableDes->cols[counter].note); */
}
/* if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 || strcasecmp(tableDes->cols[counter].type, "nchar")
* == 0) { */
/* pstr += sprintf(pstr, "(%d)", tableDes->cols[counter].length); */
/* } */
}
pstr += sprintf(pstr, ");");
fprintf(fp, "%s\n", tmpBuf);
free(tmpBuf);
}
static int writeSchemaToAvro(char *jsonAvroSchema) static int writeSchemaToAvro(char *jsonAvroSchema)
{ {
errorPrint("%s() LN%d, TODO: implement write schema to avro", errorPrint("%s() LN%d, TODO: implement write schema to avro",
...@@ -2512,7 +2555,7 @@ static int convertNCharToReadable(char *str, int size, char *buf, int bufsize) { ...@@ -2512,7 +2555,7 @@ static int convertNCharToReadable(char *str, int size, char *buf, int bufsize) {
return 0; return 0;
} }
static void taosDumpCharset(FILE *fp) { static void dumpCharset(FILE *fp) {
char charsetline[256]; char charsetline[256];
(void)fseek(fp, 0, SEEK_SET); (void)fseek(fp, 0, SEEK_SET);
...@@ -2520,7 +2563,7 @@ static void taosDumpCharset(FILE *fp) { ...@@ -2520,7 +2563,7 @@ static void taosDumpCharset(FILE *fp) {
(void)fwrite(charsetline, strlen(charsetline), 1, fp); (void)fwrite(charsetline, strlen(charsetline), 1, fp);
} }
static void taosLoadFileCharset(FILE *fp, char *fcharset) { static void loadFileCharset(FILE *fp, char *fcharset) {
char * line = NULL; char * line = NULL;
size_t line_size = 0; size_t line_size = 0;
...@@ -2652,7 +2695,7 @@ static void taosMallocDumpFiles() ...@@ -2652,7 +2695,7 @@ static void taosMallocDumpFiles()
} }
} }
static void taosFreeDumpFiles() static void freeDumpFiles()
{ {
for (int i = 0; i < g_tsSqlFileNum; i++) { for (int i = 0; i < g_tsSqlFileNum; i++) {
tfree(g_tsDumpInSqlFiles[i]); tfree(g_tsDumpInSqlFiles[i]);
...@@ -2720,7 +2763,7 @@ static FILE* taosOpenDumpInFile(char *fptr) { ...@@ -2720,7 +2763,7 @@ static FILE* taosOpenDumpInFile(char *fptr) {
return f; return f;
} }
static int taosDumpInOneFile(TAOS* taos, FILE* fp, char* fcharset, static int dumpInOneFile(TAOS* taos, FILE* fp, char* fcharset,
char* encode, char* fileName) { char* encode, char* fileName) {
int read_len = 0; int read_len = 0;
char * cmd = NULL; char * cmd = NULL;
...@@ -2777,7 +2820,7 @@ static int taosDumpInOneFile(TAOS* taos, FILE* fp, char* fcharset, ...@@ -2777,7 +2820,7 @@ static int taosDumpInOneFile(TAOS* taos, FILE* fp, char* fcharset,
return 0; return 0;
} }
static void* taosDumpInWorkThreadFp(void *arg) static void* dumpInWorkThreadFp(void *arg)
{ {
threadInfo *pThread = (threadInfo*)arg; threadInfo *pThread = (threadInfo*)arg;
setThreadName("dumpInWorkThrd"); setThreadName("dumpInWorkThrd");
...@@ -2791,14 +2834,14 @@ static void* taosDumpInWorkThreadFp(void *arg) ...@@ -2791,14 +2834,14 @@ static void* taosDumpInWorkThreadFp(void *arg)
} }
fprintf(stderr, ", Success Open input file: %s\n", fprintf(stderr, ", Success Open input file: %s\n",
SQLFileName); SQLFileName);
taosDumpInOneFile(pThread->taos, fp, g_tsCharset, g_args.encode, SQLFileName); dumpInOneFile(pThread->taos, fp, g_tsCharset, g_args.encode, SQLFileName);
} }
} }
return NULL; return NULL;
} }
static void taosStartDumpInWorkThreads() static void startDumpInWorkThreads()
{ {
pthread_attr_t thattr; pthread_attr_t thattr;
threadInfo *pThread; threadInfo *pThread;
...@@ -2830,7 +2873,7 @@ static void taosStartDumpInWorkThreads() ...@@ -2830,7 +2873,7 @@ static void taosStartDumpInWorkThreads()
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&(pThread->threadID), &thattr, if (pthread_create(&(pThread->threadID), &thattr,
taosDumpInWorkThreadFp, (void*)pThread) != 0) { dumpInWorkThreadFp, (void*)pThread) != 0) {
errorPrint("%s() LN%d, thread:%d failed to start\n", errorPrint("%s() LN%d, thread:%d failed to start\n",
__func__, __LINE__, pThread->threadIndex); __func__, __LINE__, pThread->threadIndex);
exit(0); exit(0);
...@@ -2847,7 +2890,7 @@ static void taosStartDumpInWorkThreads() ...@@ -2847,7 +2890,7 @@ static void taosStartDumpInWorkThreads()
free(threadObj); free(threadObj);
} }
static int taosDumpIn() { static int dumpIn() {
assert(g_args.isDumpIn); assert(g_args.isDumpIn);
TAOS *taos = NULL; TAOS *taos = NULL;
...@@ -2876,19 +2919,19 @@ static int taosDumpIn() { ...@@ -2876,19 +2919,19 @@ static int taosDumpIn() {
} }
fprintf(stderr, "Success Open input file: %s\n", g_tsDbSqlFile); fprintf(stderr, "Success Open input file: %s\n", g_tsDbSqlFile);
taosLoadFileCharset(fp, g_tsCharset); loadFileCharset(fp, g_tsCharset);
taosDumpInOneFile(taos, fp, g_tsCharset, g_args.encode, dumpInOneFile(taos, fp, g_tsCharset, g_args.encode,
g_tsDbSqlFile); g_tsDbSqlFile);
} }
taos_close(taos); taos_close(taos);
if (0 != tsSqlFileNumOfTbls) { if (0 != tsSqlFileNumOfTbls) {
taosStartDumpInWorkThreads(); startDumpInWorkThreads();
} }
taosFreeDumpFiles(); freeDumpFiles();
return 0; return 0;
} }
...@@ -3009,7 +3052,7 @@ int main(int argc, char *argv[]) { ...@@ -3009,7 +3052,7 @@ int main(int argc, char *argv[]) {
fprintf(g_fpOfResult, "# DumpIn start time: %d-%02d-%02d %02d:%02d:%02d\n", fprintf(g_fpOfResult, "# DumpIn start time: %d-%02d-%02d %02d:%02d:%02d\n",
tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_year + 1900, tm.tm_mon + 1,
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec); tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
if (taosDumpIn() < 0) { if (dumpIn() < 0) {
ret = -1; ret = -1;
} }
} else { } else {
...@@ -3017,7 +3060,7 @@ int main(int argc, char *argv[]) { ...@@ -3017,7 +3060,7 @@ int main(int argc, char *argv[]) {
fprintf(g_fpOfResult, "# DumpOut start time: %d-%02d-%02d %02d:%02d:%02d\n", fprintf(g_fpOfResult, "# DumpOut start time: %d-%02d-%02d %02d:%02d:%02d\n",
tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_year + 1900, tm.tm_mon + 1,
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec); tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
if (taosDumpOut() < 0) { if (dumpOut() < 0) {
ret = -1; ret = -1;
} else { } else {
fprintf(g_fpOfResult, "\n============================== TOTAL STATISTICS ============================== \n"); fprintf(g_fpOfResult, "\n============================== TOTAL STATISTICS ============================== \n");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册