提交 48535f8b 编写于 作者: W wenzhouwww

Merge branch 'master' into test/udf_for_master

......@@ -3011,7 +3011,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg9);
}
if (taosArrayGetSize(pItem->pNode->Expr.paramList) <= 0) {
if (pItem->pNode->Expr.paramList == NULL || taosArrayGetSize(pItem->pNode->Expr.paramList) <= 0) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg13);
}
......
......@@ -586,6 +586,7 @@ static int64_t getTSRandTail(int64_t timeStampStep, int32_t seq,
int disorderRatio, int disorderRange);
static bool getInfoFromJsonFile(char* file);
static void init_rand_data();
static int regexMatch(const char *s, const char *reg, int cflags);
/* ************ Global variables ************ */
......@@ -803,7 +804,7 @@ static void printHelp() {
printf("%s%s%s%s\n", indent, "-q, --query-mode=MODE", "\t\t",
"Query mode -- 0: SYNC, 1: ASYNC. By default use SYNC.");
printf("%s%s%s%s\n", indent, "-b, --data-type=DATATYPE", "\t",
"The data_type of columns, By default use: FLOAT, INT, FLOAT.");
"The data_type of columns, By default use: FLOAT,INT,FLOAT. NCHAR and BINARY can also use custom length. Eg: NCHAR(16),BINARY(8)");
printf("%s%s%s%s%d\n", indent, "-w, --binwidth=WIDTH", "\t\t",
"The width of data_type 'BINARY' or 'NCHAR'. By default use ",
g_args.binwidth);
......@@ -985,36 +986,55 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
arguments->performance_print = true;
} else if ((0 == strncmp(argv[i], "-P", strlen("-P")))
|| (0 == strncmp(argv[i], "--port", strlen("--port")))) {
uint64_t port;
char strPort[BIGINT_BUFF_LEN];
if (2 == strlen(argv[i])) {
if (argc == i+1) {
errorPrintReqArg(argv[0], "P");
exit(EXIT_FAILURE);
} else if (!isStringNumber(argv[i+1])) {
} else if (isStringNumber(argv[i+1])) {
tstrncpy(strPort, argv[++i], BIGINT_BUFF_LEN);
} else {
errorPrintReqArg2(argv[0], "P");
exit(EXIT_FAILURE);
}
arguments->port = atoi(argv[++i]);
} else if (0 == strncmp(argv[i], "--port=", strlen("--port="))) {
if (isStringNumber((char *)(argv[i] + strlen("--port=")))) {
arguments->port = atoi((char *)(argv[i]+strlen("--port=")));
tstrncpy(strPort, (char *)(argv[i]+strlen("--port=")), BIGINT_BUFF_LEN);
} else {
errorPrintReqArg2(argv[0], "--port");
exit(EXIT_FAILURE);
}
} else if (0 == strncmp(argv[i], "-P", strlen("-P"))) {
if (isStringNumber((char *)(argv[i] + strlen("-P")))) {
arguments->port = atoi((char *)(argv[i]+strlen("-P")));
tstrncpy(strPort, (char *)(argv[i]+strlen("-P")), BIGINT_BUFF_LEN);
} else {
errorPrintReqArg2(argv[0], "--port");
exit(EXIT_FAILURE);
}
} else if (strlen("--port") == strlen(argv[i])) {
if (argc == i+1) {
errorPrintReqArg3(argv[0], "--port");
exit(EXIT_FAILURE);
} else if (!isStringNumber(argv[i+1])) {
} else if (isStringNumber(argv[i+1])) {
tstrncpy(strPort, argv[++i], BIGINT_BUFF_LEN);
} else {
errorPrintReqArg2(argv[0], "--port");
exit(EXIT_FAILURE);
}
arguments->port = atoi(argv[++i]);
} else {
errorUnrecognized(argv[0], argv[i]);
exit(EXIT_FAILURE);
}
port = atoi(strPort);
if (port > 65535) {
errorWrongValue("taosdump", "-P or --port", strPort);
exit(EXIT_FAILURE);
}
arguments->port = (uint16_t)port;
} else if ((0 == strncmp(argv[i], "-I", strlen("-I")))
|| (0 == strncmp(argv[i], "--interface", strlen("--interface")))) {
if (2 == strlen(argv[i])) {
......@@ -1579,9 +1599,10 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
&& strcasecmp(dataType, "SMALLINT")
&& strcasecmp(dataType, "BIGINT")
&& strcasecmp(dataType, "DOUBLE")
&& strcasecmp(dataType, "BINARY")
&& strcasecmp(dataType, "TIMESTAMP")
&& strcasecmp(dataType, "NCHAR")
&& !regexMatch(dataType,
"^(NCHAR|BINARY)(\\([1-9][0-9]*\\))?$",
REG_ICASE | REG_EXTENDED)
&& strcasecmp(dataType, "UTINYINT")
&& strcasecmp(dataType, "USMALLINT")
&& strcasecmp(dataType, "UINT")
......@@ -1603,9 +1624,13 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
arguments->data_type[0] = TSDB_DATA_TYPE_FLOAT;
} else if (0 == strcasecmp(dataType, "DOUBLE")) {
arguments->data_type[0] = TSDB_DATA_TYPE_DOUBLE;
} else if (0 == strcasecmp(dataType, "BINARY")) {
} else if (1 == regexMatch(dataType,
"^BINARY(\\([1-9][0-9]*\\))?$",
REG_ICASE | REG_EXTENDED)) {
arguments->data_type[0] = TSDB_DATA_TYPE_BINARY;
} else if (0 == strcasecmp(dataType, "NCHAR")) {
} else if (1 == regexMatch(dataType,
"^NCHAR(\\([1-9][0-9]*\\))?$",
REG_ICASE | REG_EXTENDED)) {
arguments->data_type[0] = TSDB_DATA_TYPE_NCHAR;
} else if (0 == strcasecmp(dataType, "BOOL")) {
arguments->data_type[0] = TSDB_DATA_TYPE_BOOL;
......@@ -1638,9 +1663,8 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
&& strcasecmp(token, "SMALLINT")
&& strcasecmp(token, "BIGINT")
&& strcasecmp(token, "DOUBLE")
&& strcasecmp(token, "BINARY")
&& strcasecmp(token, "TIMESTAMP")
&& strcasecmp(token, "NCHAR")
&& !regexMatch(token, "^(NCHAR|BINARY)(\\([1-9][0-9]*\\))?$", REG_ICASE | REG_EXTENDED)
&& strcasecmp(token, "UTINYINT")
&& strcasecmp(token, "USMALLINT")
&& strcasecmp(token, "UINT")
......@@ -1663,9 +1687,11 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
arguments->data_type[index] = TSDB_DATA_TYPE_DOUBLE;
} else if (0 == strcasecmp(token, "TINYINT")) {
arguments->data_type[index] = TSDB_DATA_TYPE_TINYINT;
} else if (0 == strcasecmp(token, "BINARY")) {
} else if (1 == regexMatch(token, "^BINARY(\\([1-9][0-9]*\\))?$", REG_ICASE |
REG_EXTENDED)) {
arguments->data_type[index] = TSDB_DATA_TYPE_BINARY;
} else if (0 == strcasecmp(token, "NCHAR")) {
} else if (1 == regexMatch(token, "^NCHAR(\\([1-9][0-9]*\\))?$", REG_ICASE |
REG_EXTENDED)) {
arguments->data_type[index] = TSDB_DATA_TYPE_NCHAR;
} else if (0 == strcasecmp(token, "BOOL")) {
arguments->data_type[index] = TSDB_DATA_TYPE_BOOL;
......@@ -2660,6 +2686,8 @@ static int printfInsertMeta() {
}
}
if (g_args.use_metric) {
printf(" super table count: \033[33m%"PRIu64"\033[0m\n",
g_Dbs.db[i].superTblCount);
for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {
......@@ -2735,7 +2763,7 @@ static int printfInsertMeta() {
g_Dbs.db[i].superTbls[j].sampleFile);
printf(" tagsFile: \033[33m%s\033[0m\n",
g_Dbs.db[i].superTbls[j].tagsFile);
printf(" columnCount: \033[33m%d\033[0m\n",
printf(" columnCount: \033[33m%d\033[0m\n ",
g_Dbs.db[i].superTbls[j].columnCount);
for (int k = 0; k < g_Dbs.db[i].superTbls[j].columnCount; k++) {
//printf("dataType:%s, dataLen:%d\t", g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen);
......@@ -2743,7 +2771,7 @@ static int printfInsertMeta() {
"binary", 6))
|| (0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType,
"nchar", 5))) {
printf("column[\033[33m%d\033[0m]:\033[33m%s(%d)\033[0m ", k,
printf("column[%d]:\033[33m%s(%d)\033[0m ", k,
g_Dbs.db[i].superTbls[j].columns[k].dataType,
g_Dbs.db[i].superTbls[j].columns[k].dataLen);
} else {
......@@ -2771,6 +2799,14 @@ static int printfInsertMeta() {
}
printf("\n");
}
} else {
printf(" childTblCount: \033[33m%"PRId64"\033[0m\n",
g_args.ntables);
printf(" insertRows: \033[33m%"PRId64"\033[0m\n",
g_args.insertRows);
}
printf("\n");
}
......@@ -4271,6 +4307,10 @@ static int createSuperTable(
len += snprintf(tags + len, TSDB_MAX_TAGS_LEN - len,
"T%d %s,", tagIndex, "BIGINT UNSIGNED");
lenOfTagOfOneRow += superTbl->tags[tagIndex].dataLen + BIGINT_BUFF_LEN;
} else if (strcasecmp(dataType, "TIMESTAMP") == 0) {
len += snprintf(tags + len, TSDB_MAX_TAGS_LEN - len,
"T%d %s,", tagIndex, "TIMESTAMP");
lenOfTagOfOneRow += superTbl->tags[tagIndex].dataLen + TIMESTAMP_BUFF_LEN;
} else {
taos_close(taos);
free(command);
......@@ -10263,7 +10303,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
&stbInfo->childTblName, &childTblCount,
limit,
offset);
ntables = childTblCount; // CBD
ntables = childTblCount;
} else {
ntables = g_args.ntables;
tableFrom = 0;
......@@ -10282,8 +10322,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
b = ntables % threads;
}
if ((stbInfo)
&& (stbInfo->iface == REST_IFACE)) {
if (g_args.iface == REST_IFACE || ((stbInfo) && (stbInfo->iface == REST_IFACE))) {
if (convertHostToServAddr(
g_Dbs.host, g_Dbs.port, &(g_Dbs.serv_addr)) != 0) {
ERROR_EXIT("convert host to server address");
......@@ -11734,6 +11773,8 @@ static void initOfQueryMeta() {
}
static void setParaFromArg() {
char type[20];
char length[20];
if (g_args.host) {
tstrncpy(g_Dbs.host, g_args.host, MAX_HOSTNAME_SIZE);
} else {
......@@ -11815,7 +11856,17 @@ static void setParaFromArg() {
g_Dbs.db[0].superTbls[0].columns[i].data_type = data_type[i];
tstrncpy(g_Dbs.db[0].superTbls[0].columns[i].dataType,
dataType[i], min(DATATYPE_BUFF_LEN, strlen(dataType[i]) + 1));
if (1 == regexMatch(dataType[i], "^(NCHAR|BINARY)(\\([1-9][0-9]*\\))$", REG_ICASE |
REG_EXTENDED)) {
sscanf(dataType[i], "%[^(](%[^)]", type, length);
g_Dbs.db[0].superTbls[0].columns[i].dataLen = atoi(length);
tstrncpy(g_Dbs.db[0].superTbls[0].columns[i].dataType,
type, min(DATATYPE_BUFF_LEN, strlen(type) + 1));
} else {
g_Dbs.db[0].superTbls[0].columns[i].dataLen = g_args.binwidth;
tstrncpy(g_Dbs.db[0].superTbls[0].columns[i].dataType,
dataType[i], min(DATATYPE_BUFF_LEN, strlen(dataType[i]) + 1));
}
g_Dbs.db[0].superTbls[0].columnCount++;
}
......
......@@ -13,6 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdio.h>
#include <pthread.h>
#include <iconv.h>
#include <sys/stat.h>
#include <sys/syscall.h>
......@@ -26,6 +28,12 @@
#include "tsdb.h"
#include "tutil.h"
#define AVRO_SUPPORT 0
#if AVRO_SUPPORT == 1
#include <avro.h>
#endif
#define TSDB_SUPPORT_NANOSECOND 1
#define MAX_FILE_NAME_LEN 256 // max file name length on linux is 255
......@@ -38,8 +46,8 @@
static int converStringToReadable(char *str, int size, char *buf, int bufsize);
static int convertNCharToReadable(char *str, int size, char *buf, int bufsize);
static void taosDumpCharset(FILE *fp);
static void taosLoadFileCharset(FILE *fp, char *fcharset);
static void dumpCharset(FILE *fp);
static void loadFileCharset(FILE *fp, char *fcharset);
typedef struct {
short bytes;
......@@ -128,19 +136,23 @@ enum _describe_table_index {
TSDB_MAX_DESCRIBE_METRIC
};
#define COL_NOTE_LEN 128
#define COL_NOTE_LEN 4
#define COL_TYPEBUF_LEN 16
#define COL_VALUEBUF_LEN 32
typedef struct {
char field[TSDB_COL_NAME_LEN + 1];
char type[16];
char field[TSDB_COL_NAME_LEN];
char type[COL_TYPEBUF_LEN];
int length;
char note[COL_NOTE_LEN];
} SColDes;
char value[COL_VALUEBUF_LEN];
char *var_value;
} ColDes;
typedef struct {
char name[TSDB_TABLE_NAME_LEN];
SColDes cols[];
} STableDef;
ColDes cols[];
} TableDef;
extern char version[];
......@@ -159,7 +171,8 @@ typedef struct {
} TableRecord;
typedef struct {
bool isStable;
bool isStb;
bool belongStb;
int64_t dumpNtbCount;
TableRecord **dumpNtbInfos;
TableRecord tableRecord;
......@@ -329,24 +342,21 @@ static resultStatistics g_resultStatistics = {0};
static FILE *g_fpOfResult = NULL;
static int g_numOfCores = 1;
static int taosDumpOut();
static int taosDumpIn();
static void taosDumpCreateDbClause(SDbInfo *dbInfo, bool isDumpProperty,
static int dumpOut();
static int dumpIn();
static void dumpCreateDbClause(SDbInfo *dbInfo, bool isDumpProperty,
FILE *fp);
//static int taosDumpDb(SDbInfo *dbInfo, FILE *fp, TAOS *taos);
static int dumpStable(char *table, FILE *fp, SDbInfo *dbInfo);
static int dumpCreateTableClause(STableDef *tableDes, int numOfCols,
static int dumpCreateTableClause(TableDef *tableDes, int numOfCols,
FILE *fp, char* dbName);
static void taosDumpCreateMTableClause(STableDef *tableDes, char *stable,
int numOfCols, FILE *fp, char* dbName);
static int64_t taosDumpTable(char *tbName, char *stable,
FILE *fp, char* dbName, int precision);
static int getTableDes(
char* dbName, char *table,
TableDef *stableDes, bool isSuperTable);
static int64_t dumpTableData(FILE *fp, char *tbName,
char* dbName,
int precision,
char *jsonAvroSchema);
static int checkParam();
static void taosFreeDbInfos();
static void freeDbInfos();
struct arguments g_args = {
// connection option
......@@ -613,6 +623,17 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
return 0;
}
static void freeTbDes(TableDef *tableDes)
{
for (int i = 0; i < TSDB_MAX_COLUMNS; i ++) {
if (tableDes->cols[i].var_value) {
free(tableDes->cols[i].var_value);
}
}
free(tableDes);
}
static int queryDbImpl(TAOS *taos, char *command) {
TAOS_RES *res = NULL;
int32_t code = -1;
......@@ -771,7 +792,7 @@ static int getPrecisionByString(char *precision)
return -1;
}
static void taosFreeDbInfos() {
static void freeDbInfos() {
if (g_dbInfos == NULL) return;
for (int i = 0; i < g_args.dumpDbCount; i++)
tfree(g_dbInfos[i]);
......@@ -822,15 +843,20 @@ static int getTableRecordInfo(
while ((row = taos_fetch_row(result)) != NULL) {
isSet = true;
pTableRecordInfo->isStable = false;
pTableRecordInfo->isStb = false;
tstrncpy(pTableRecordInfo->tableRecord.name,
(char *)row[TSDB_SHOW_TABLES_NAME_INDEX],
min(TSDB_TABLE_NAME_LEN,
fields[TSDB_SHOW_TABLES_NAME_INDEX].bytes + 1));
if (strlen((char *)row[TSDB_SHOW_TABLES_METRIC_INDEX]) > 0) {
pTableRecordInfo->belongStb = true;
tstrncpy(pTableRecordInfo->tableRecord.stable,
(char *)row[TSDB_SHOW_TABLES_METRIC_INDEX],
min(TSDB_TABLE_NAME_LEN,
fields[TSDB_SHOW_TABLES_METRIC_INDEX].bytes + 1));
} else {
pTableRecordInfo->belongStb = false;
}
break;
}
......@@ -855,7 +881,7 @@ static int getTableRecordInfo(
while ((row = taos_fetch_row(result)) != NULL) {
isSet = true;
pTableRecordInfo->isStable = true;
pTableRecordInfo->isStb = true;
tstrncpy(pTableRecordInfo->tableRecord.stable, table,
TSDB_TABLE_NAME_LEN);
break;
......@@ -892,7 +918,6 @@ static int inDatabasesSeq(
dbname = strsep(&running, ",");
}
}
return -1;
......@@ -958,7 +983,213 @@ static int getDumpDbCount()
return count;
}
static int64_t dumpNormalTableWithoutStb(SDbInfo *dbInfo, char *ntbName)
static void dumpCreateMTableClause(
char* dbName,
char *stable,
TableDef *tableDes,
int numOfCols,
FILE *fp
) {
int counter = 0;
int count_temp = 0;
char* tmpBuf = (char *)malloc(COMMAND_SIZE);
if (tmpBuf == NULL) {
errorPrint("%s() LN%d, failed to allocate %d memory\n",
__func__, __LINE__, COMMAND_SIZE);
return;
}
char *pstr = NULL;
pstr = tmpBuf;
pstr += sprintf(tmpBuf,
"CREATE TABLE IF NOT EXISTS %s.%s USING %s.%s TAGS (",
dbName, tableDes->name, dbName, stable);
for (; counter < numOfCols; counter++) {
if (tableDes->cols[counter].note[0] != '\0') break;
}
assert(counter < numOfCols);
count_temp = counter;
for (; counter < numOfCols; counter++) {
if (counter != count_temp) {
if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 ||
strcasecmp(tableDes->cols[counter].type, "nchar") == 0) {
//pstr += sprintf(pstr, ", \'%s\'", tableDes->cols[counter].note);
if (tableDes->cols[counter].var_value) {
pstr += sprintf(pstr, ", %s",
tableDes->cols[counter].var_value);
} else {
pstr += sprintf(pstr, ", %s", tableDes->cols[counter].value);
}
} else {
pstr += sprintf(pstr, ", %s", tableDes->cols[counter].value);
}
} else {
if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 ||
strcasecmp(tableDes->cols[counter].type, "nchar") == 0) {
//pstr += sprintf(pstr, "\'%s\'", tableDes->cols[counter].note);
if (tableDes->cols[counter].var_value) {
pstr += sprintf(pstr, "%s", tableDes->cols[counter].var_value);
} else {
pstr += sprintf(pstr, "%s", tableDes->cols[counter].value);
}
} else {
pstr += sprintf(pstr, "%s", tableDes->cols[counter].value);
}
/* pstr += sprintf(pstr, "%s", tableDes->cols[counter].note); */
}
/* if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 || strcasecmp(tableDes->cols[counter].type, "nchar")
* == 0) { */
/* pstr += sprintf(pstr, "(%d)", tableDes->cols[counter].length); */
/* } */
}
pstr += sprintf(pstr, ");");
fprintf(fp, "%s\n", tmpBuf);
free(tmpBuf);
}
static int convertTbDesToAvroSchema(
char *dbName, char *tbName, TableDef *tableDes, int colCount,
char **avroSchema)
{
errorPrint("%s() LN%d TODO: covert table schema to avro schema\n",
__func__, __LINE__);
// {
// "namesapce": "database name",
// "type": "record",
// "name": "table name",
// "fields": [
// {
// "name": "col0 name",
// "type": "long"
// },
// {
// "name": "col1 name",
// "type": ["int", "null"]
// },
// {
// "name": "col2 name",
// "type": ["float", "null"]
// },
// ...
// {
// "name": "coln name",
// "type": ["string", "null"]
// }
// ]
// }
*avroSchema = (char *)calloc(1,
17 + TSDB_DB_NAME_LEN /* dbname section */
+ 17 /* type: record */
+ 11 + TSDB_TABLE_NAME_LEN /* tbname section */
+ 10 /* fields section */
+ (TSDB_COL_NAME_LEN + 11 + 16) * colCount + 4); /* fields section */
if (*avroSchema == NULL) {
errorPrint("%s() LN%d, memory allocation failed!\n", __func__, __LINE__);
return -1;
}
char *pstr = *avroSchema;
pstr += sprintf(pstr,
"{\"namespace\": \"%s\", \"type\": \"record\", \"name\": \"%s\", \"fields\": [",
dbName, tbName);
for (int i = 0; i < colCount; i ++) {
if (0 == i) {
pstr += sprintf(pstr,
"{\"name\": \"%s\", \"type\": \"%s\"",
tableDes->cols[i].field, "long");
} else {
if (strcasecmp(tableDes->cols[i].type, "binary") == 0 ||
strcasecmp(tableDes->cols[i].type, "nchar") == 0) {
pstr += sprintf(pstr,
"{\"name\": \"%s\", \"type\": [\"%s\", \"null\"]",
tableDes->cols[i].field, "string");
} else {
pstr += sprintf(pstr,
"{\"name\": \"%s\", \"type\": [\"%s\", \"null\"]",
tableDes->cols[i].field, tableDes->cols[i].type);
}
}
if ((i != (colCount -1))
&& (strcmp(tableDes->cols[i + 1].note, "TAG") != 0)) {
pstr += sprintf(pstr, "},");
} else {
pstr += sprintf(pstr, "}");
break;
}
}
pstr += sprintf(pstr, "]}");
debugPrint("%s() LN%d, avroSchema: %s\n", __func__, __LINE__, *avroSchema);
return 0;
}
static int64_t dumpNormalTable(
char *dbName,
char *stable,
char *tbName,
int precision,
FILE *fp
) {
int colCount = 0;
TableDef *tableDes = (TableDef *)calloc(1, sizeof(TableDef)
+ sizeof(ColDes) * TSDB_MAX_COLUMNS);
if (stable != NULL && stable[0] != '\0') { // dump table schema which is created by using super table
colCount = getTableDes(dbName, tbName, tableDes, false);
if (colCount < 0) {
free(tableDes);
return -1;
}
// create child-table using super-table
dumpCreateMTableClause(dbName, stable, tableDes, colCount, fp);
} else { // dump table definition
colCount = getTableDes(dbName, tbName, tableDes, false);
if (colCount < 0) {
free(tableDes);
return -1;
}
// create normal-table or super-table
dumpCreateTableClause(tableDes, colCount, fp, dbName);
}
char *jsonAvroSchema = NULL;
if (g_args.avro) {
if (0 != convertTbDesToAvroSchema(
dbName, tbName, tableDes, colCount, &jsonAvroSchema)) {
freeTbDes(tableDes);
return -1;
}
}
free(tableDes);
int64_t ret = 0;
if (!g_args.schemaonly) {
ret = dumpTableData(fp, tbName, dbName, precision,
jsonAvroSchema);
}
return ret;
}
static int64_t dumpNormalTableBelongStb(
SDbInfo *dbInfo, char *stbName, char *ntbName)
{
int64_t count = 0;
......@@ -980,21 +1211,47 @@ static int64_t dumpNormalTableWithoutStb(SDbInfo *dbInfo, char *ntbName)
return -1;
}
count = taosDumpTable(ntbName, NULL,
fp, dbInfo->name, getPrecisionByString(dbInfo->precision));
count = dumpNormalTable(
dbInfo->name,
stbName,
ntbName,
getPrecisionByString(dbInfo->precision),
fp);
fclose(fp);
return count;
}
static int64_t dumpNormalTable(FILE *fp, TAOS *taos, char *dbName, char *tbName,
char *stbName,
int precision)
static int64_t dumpNormalTableWithoutStb(SDbInfo *dbInfo, char *ntbName)
{
int64_t count = 0;
count = taosDumpTable(tbName, stbName,
fp, dbName, precision);
char tmpBuf[4096] = {0};
FILE *fp = NULL;
if (g_args.outpath[0] != 0) {
sprintf(tmpBuf, "%s/%s.%s.sql",
g_args.outpath, dbInfo->name, ntbName);
} else {
sprintf(tmpBuf, "%s.%s.sql",
dbInfo->name, ntbName);
}
fp = fopen(tmpBuf, "w");
if (fp == NULL) {
errorPrint("%s() LN%d, failed to open file %s\n",
__func__, __LINE__, tmpBuf);
return -1;
}
count = dumpNormalTable(
dbInfo->name,
NULL,
ntbName,
getPrecisionByString(dbInfo->precision),
fp);
fclose(fp);
return count;
}
......@@ -1028,12 +1285,12 @@ static void *dumpNtbOfDb(void *arg) {
debugPrint("[%d] No.\t%"PRId64" table name: %s\n",
pThreadInfo->threadIndex, i,
((TableInfo *)(g_tablesList + pThreadInfo->tableFrom+i))->name);
dumpNormalTable(fp,
pThreadInfo->taos,
dumpNormalTable(
pThreadInfo->dbName,
((TableInfo *)(g_tablesList + pThreadInfo->tableFrom+i))->name,
((TableInfo *)(g_tablesList + pThreadInfo->tableFrom+i))->stable,
pThreadInfo->precision);
((TableInfo *)(g_tablesList + pThreadInfo->tableFrom+i))->name,
pThreadInfo->precision,
fp);
}
fclose(fp);
......@@ -1087,12 +1344,12 @@ static void *dumpNormalTablesOfStb(void *arg) {
debugPrint("[%d] sub table %"PRId64": name: %s\n",
pThreadInfo->threadIndex, i++, (char *)row[TSDB_SHOW_TABLES_NAME_INDEX]);
dumpNormalTable(fp,
pThreadInfo->taos,
dumpNormalTable(
pThreadInfo->dbName,
(char *)row[TSDB_SHOW_TABLES_NAME_INDEX],
pThreadInfo->stbName,
pThreadInfo->precision);
(char *)row[TSDB_SHOW_TABLES_NAME_INDEX],
pThreadInfo->precision,
fp);
}
fclose(fp);
......@@ -1278,6 +1535,34 @@ static int64_t dumpNtbOfStbByThreads(
return records;
}
static int dumpStableClasuse(SDbInfo *dbInfo, char *stbName, FILE *fp)
{
uint64_t sizeOfTableDes =
(uint64_t)(sizeof(TableDef) + sizeof(ColDes) * TSDB_MAX_COLUMNS);
TableDef *tableDes = (TableDef *)calloc(1, sizeOfTableDes);
if (NULL == tableDes) {
errorPrint("%s() LN%d, failed to allocate %"PRIu64" memory\n",
__func__, __LINE__, sizeOfTableDes);
exit(-1);
}
int colCount = getTableDes(dbInfo->name,
stbName, tableDes, true);
if (colCount < 0) {
free(tableDes);
errorPrint("%s() LN%d, failed to get stable[%s] schema\n",
__func__, __LINE__, stbName);
exit(-1);
}
dumpCreateTableClause(tableDes, colCount, fp, dbInfo->name);
free(tableDes);
return 0;
}
static int64_t dumpCreateSTableClauseOfDb(
SDbInfo *dbInfo, FILE *fp)
{
......@@ -1307,7 +1592,7 @@ static int64_t dumpCreateSTableClauseOfDb(
int64_t superTblCnt = 0;
while ((row = taos_fetch_row(res)) != NULL) {
if (0 == dumpStable(row[TSDB_SHOW_TABLES_NAME_INDEX], fp, dbInfo)) {
if (0 == dumpStableClasuse(dbInfo, row[TSDB_SHOW_TABLES_NAME_INDEX], fp)) {
superTblCnt ++;
}
}
......@@ -1389,7 +1674,7 @@ static int64_t dumpNTablesOfDb(SDbInfo *dbInfo)
static int64_t dumpWholeDatabase(SDbInfo *dbInfo, FILE *fp)
{
taosDumpCreateDbClause(dbInfo, g_args.with_property, fp);
dumpCreateDbClause(dbInfo, g_args.with_property, fp);
fprintf(g_fpOfResult, "\n#### database: %s\n",
dbInfo->name);
......@@ -1400,14 +1685,13 @@ static int64_t dumpWholeDatabase(SDbInfo *dbInfo, FILE *fp)
return dumpNTablesOfDb(dbInfo);
}
static int taosDumpOut() {
static int dumpOut() {
TAOS *taos = NULL;
TAOS_RES *result = NULL;
TAOS_ROW row;
FILE *fp = NULL;
int32_t count = 0;
TableRecordInfo tableRecordInfo;
char tmpBuf[4096] = {0};
if (g_args.outpath[0] != 0) {
......@@ -1453,7 +1737,7 @@ static int taosDumpOut() {
/* --------------------------------- Main Code -------------------------------- */
/* if (g_args.databases || g_args.all_databases) { // dump part of databases or all databases */
/* */
taosDumpCharset(fp);
dumpCharset(fp);
sprintf(command, "show databases");
result = taos_query(taos, command);
......@@ -1575,11 +1859,13 @@ static int taosDumpOut() {
g_totalDumpOutRows += records;
}
} else {
taosDumpCreateDbClause(g_dbInfos[0], g_args.with_property, fp);
dumpCreateDbClause(g_dbInfos[0], g_args.with_property, fp);
}
int superTblCnt = 0 ;
for (int i = 1; g_args.arg_list[i]; i++) {
TableRecordInfo tableRecordInfo;
if (getTableRecordInfo(g_dbInfos[0]->name,
g_args.arg_list[i],
&tableRecordInfo) < 0) {
......@@ -1589,14 +1875,24 @@ static int taosDumpOut() {
}
int64_t records = 0;
if (tableRecordInfo.isStable) { // dump all table of this stable
int ret = dumpStable(
if (tableRecordInfo.isStb) { // dump all table of this stable
int ret = dumpStableClasuse(
g_dbInfos[0],
tableRecordInfo.tableRecord.stable,
fp, g_dbInfos[0]);
fp);
if (ret >= 0) {
superTblCnt++;
records = dumpNtbOfStbByThreads(g_dbInfos[0], g_args.arg_list[i]);
}
} else if (tableRecordInfo.belongStb){
dumpStableClasuse(
g_dbInfos[0],
tableRecordInfo.tableRecord.stable,
fp);
records = dumpNormalTableBelongStb(
g_dbInfos[0],
tableRecordInfo.tableRecord.stable,
g_args.arg_list[i]);
} else {
records = dumpNormalTableWithoutStb(g_dbInfos[0], g_args.arg_list[i]);
}
......@@ -1611,7 +1907,7 @@ static int taosDumpOut() {
/* Close the handle and return */
fclose(fp);
taos_free_result(result);
taosFreeDbInfos();
freeDbInfos();
fprintf(stderr, "dump out rows: %" PRId64 "\n", g_totalDumpOutRows);
return 0;
......@@ -1619,14 +1915,14 @@ _exit_failure:
fclose(fp);
taos_close(taos);
taos_free_result(result);
taosFreeDbInfos();
freeDbInfos();
errorPrint("dump out rows: %" PRId64 "\n", g_totalDumpOutRows);
return -1;
}
static int getTableDes(
char* dbName, char *table,
STableDef *stableDes, bool isSuperTable) {
TableDef *tableDes, bool isSuperTable) {
TAOS_ROW row = NULL;
TAOS_RES* res = NULL;
int colCount = 0;
......@@ -1655,22 +1951,21 @@ static int getTableDes(
TAOS_FIELD *fields = taos_fetch_fields(res);
tstrncpy(stableDes->name, table, TSDB_TABLE_NAME_LEN);
tstrncpy(tableDes->name, table, TSDB_TABLE_NAME_LEN);
while ((row = taos_fetch_row(res)) != NULL) {
tstrncpy(stableDes->cols[colCount].field,
tstrncpy(tableDes->cols[colCount].field,
(char *)row[TSDB_DESCRIBE_METRIC_FIELD_INDEX],
min(TSDB_COL_NAME_LEN + 1,
fields[TSDB_DESCRIBE_METRIC_FIELD_INDEX].bytes + 1));
tstrncpy(stableDes->cols[colCount].type,
tstrncpy(tableDes->cols[colCount].type,
(char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
min(16, fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes + 1));
stableDes->cols[colCount].length =
tableDes->cols[colCount].length =
*((int *)row[TSDB_DESCRIBE_METRIC_LENGTH_INDEX]);
tstrncpy(stableDes->cols[colCount].note,
tstrncpy(tableDes->cols[colCount].note,
(char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX],
min(COL_NOTE_LEN,
fields[TSDB_DESCRIBE_METRIC_NOTE_INDEX].bytes + 1));
colCount++;
}
......@@ -1683,10 +1978,10 @@ static int getTableDes(
// if child-table have tag, using select tagName from table to get tagValue
for (int i = 0 ; i < colCount; i++) {
if (strcmp(stableDes->cols[i].note, "TAG") != 0) continue;
if (strcmp(tableDes->cols[i].note, "TAG") != 0) continue;
sprintf(sqlstr, "select %s from %s.%s",
stableDes->cols[i].field, dbName, table);
tableDes->cols[i].field, dbName, table);
res = taos_query(taos, sqlstr);
code = taos_errno(res);
......@@ -1710,7 +2005,7 @@ static int getTableDes(
}
if (row[TSDB_SHOW_TABLES_NAME_INDEX] == NULL) {
sprintf(stableDes->cols[i].note, "%s", "NULL");
sprintf(tableDes->cols[i].note, "%s", "NUL");
taos_free_result(res);
res = NULL;
taos_close(taos);
......@@ -1722,58 +2017,82 @@ static int getTableDes(
//int32_t* length = taos_fetch_lengths(tmpResult);
switch (fields[0].type) {
case TSDB_DATA_TYPE_BOOL:
sprintf(stableDes->cols[i].note, "%d",
sprintf(tableDes->cols[i].value, "%d",
((((int32_t)(*((char *)row[TSDB_SHOW_TABLES_NAME_INDEX]))) == 1) ? 1 : 0));
break;
case TSDB_DATA_TYPE_TINYINT:
sprintf(stableDes->cols[i].note, "%d",
sprintf(tableDes->cols[i].value, "%d",
*((int8_t *)row[TSDB_SHOW_TABLES_NAME_INDEX]));
break;
case TSDB_DATA_TYPE_SMALLINT:
sprintf(stableDes->cols[i].note, "%d", *((int16_t *)row[TSDB_SHOW_TABLES_NAME_INDEX]));
sprintf(tableDes->cols[i].value, "%d",
*((int16_t *)row[TSDB_SHOW_TABLES_NAME_INDEX]));
break;
case TSDB_DATA_TYPE_INT:
sprintf(stableDes->cols[i].note, "%d", *((int32_t *)row[TSDB_SHOW_TABLES_NAME_INDEX]));
sprintf(tableDes->cols[i].value, "%d",
*((int32_t *)row[TSDB_SHOW_TABLES_NAME_INDEX]));
break;
case TSDB_DATA_TYPE_BIGINT:
sprintf(stableDes->cols[i].note, "%" PRId64 "", *((int64_t *)row[TSDB_SHOW_TABLES_NAME_INDEX]));
sprintf(tableDes->cols[i].value, "%" PRId64 "",
*((int64_t *)row[TSDB_SHOW_TABLES_NAME_INDEX]));
break;
case TSDB_DATA_TYPE_FLOAT:
sprintf(stableDes->cols[i].note, "%f", GET_FLOAT_VAL(row[TSDB_SHOW_TABLES_NAME_INDEX]));
sprintf(tableDes->cols[i].value, "%f",
GET_FLOAT_VAL(row[TSDB_SHOW_TABLES_NAME_INDEX]));
break;
case TSDB_DATA_TYPE_DOUBLE:
sprintf(stableDes->cols[i].note, "%f", GET_DOUBLE_VAL(row[TSDB_SHOW_TABLES_NAME_INDEX]));
sprintf(tableDes->cols[i].value, "%f",
GET_DOUBLE_VAL(row[TSDB_SHOW_TABLES_NAME_INDEX]));
break;
case TSDB_DATA_TYPE_BINARY:
{
memset(stableDes->cols[i].note, 0, sizeof(stableDes->cols[i].note));
stableDes->cols[i].note[0] = '\'';
char tbuf[COL_NOTE_LEN];
converStringToReadable((char *)row[TSDB_SHOW_TABLES_NAME_INDEX], length[0], tbuf, COL_NOTE_LEN);
char* pstr = stpcpy(&(stableDes->cols[i].note[1]), tbuf);
*(pstr++) = '\'';
break;
memset(tableDes->cols[i].value, 0,
sizeof(tableDes->cols[i].value));
int len = strlen((char *)row[0]);
// FIXME for long value
if (len < (COL_VALUEBUF_LEN - 2)) {
tableDes->cols[i].value[0] = '\'';
converStringToReadable(
(char *)row[0],
length[0],
tableDes->cols[i].value + 1,
len);
tableDes->cols[i].value[len+1] = '\'';
} else {
tableDes->cols[i].var_value = calloc(1, len + 2);
if (tableDes->cols[i].var_value == NULL) {
errorPrint("%s() LN%d, memory alalocation failed!\n",
__func__, __LINE__);
taos_free_result(res);
return -1;
}
tableDes->cols[i].var_value[0] = '\'';
converStringToReadable((char *)row[0],
length[0],
(char *)(tableDes->cols[i].var_value + 1), len);
tableDes->cols[i].var_value[len+1] = '\'';
}
break;
case TSDB_DATA_TYPE_NCHAR:
{
memset(stableDes->cols[i].note, 0, sizeof(stableDes->cols[i].note));
memset(tableDes->cols[i].value, 0, sizeof(tableDes->cols[i].note));
char tbuf[COL_NOTE_LEN-2]; // need reserve 2 bytes for ' '
convertNCharToReadable((char *)row[TSDB_SHOW_TABLES_NAME_INDEX], length[0], tbuf, COL_NOTE_LEN);
sprintf(stableDes->cols[i].note, "\'%s\'", tbuf);
sprintf(tableDes->cols[i].value, "\'%s\'", tbuf);
break;
}
case TSDB_DATA_TYPE_TIMESTAMP:
sprintf(stableDes->cols[i].note, "%" PRId64 "", *(int64_t *)row[TSDB_SHOW_TABLES_NAME_INDEX]);
sprintf(tableDes->cols[i].value, "%" PRId64 "", *(int64_t *)row[TSDB_SHOW_TABLES_NAME_INDEX]);
#if 0
if (!g_args.mysqlFlag) {
sprintf(tableDes->cols[i].note, "%" PRId64 "", *(int64_t *)row[TSDB_SHOW_TABLES_NAME_INDEX]);
sprintf(tableDes->cols[i].value, "%" PRId64 "", *(int64_t *)row[TSDB_SHOW_TABLES_NAME_INDEX]);
} else {
char buf[64] = "\0";
int64_t ts = *((int64_t *)row[TSDB_SHOW_TABLES_NAME_INDEX]);
time_t tt = (time_t)(ts / 1000);
struct tm *ptm = localtime(&tt);
strftime(buf, 64, "%y-%m-%d %H:%M:%S", ptm);
sprintf(tableDes->cols[i].note, "\'%s.%03d\'", buf, (int)(ts % 1000));
sprintf(tableDes->cols[i].value, "\'%s.%03d\'", buf, (int)(ts % 1000));
}
#endif
break;
......@@ -1788,74 +2107,7 @@ static int getTableDes(
return colCount;
}
static int convertSchemaToAvroSchema(STableDef *stableDes, char **avroSchema)
{
errorPrint("%s() LN%d TODO: covert table schema to avro schema\n",
__func__, __LINE__);
return 0;
}
static int64_t taosDumpTable(
char *tbName, char *stable,
FILE *fp, char* dbName, int precision) {
int colCount = 0;
STableDef *tableDes = (STableDef *)calloc(1, sizeof(STableDef)
+ sizeof(SColDes) * TSDB_MAX_COLUMNS);
if (stable != NULL && stable[0] != '\0') { // dump table schema which is created by using super table
/*
colCount = getTableDes(stable, tableDes, taos);
if (count < 0) {
free(tableDes);
return -1;
}
dumpCreateTableClause(tableDes, count, fp);
memset(tableDes, 0, sizeof(STableDef) + sizeof(SColDes) * TSDB_MAX_COLUMNS);
*/
colCount = getTableDes(dbName, tbName, tableDes, false);
if (colCount < 0) {
free(tableDes);
return -1;
}
// create child-table using super-table
taosDumpCreateMTableClause(tableDes, stable, colCount, fp, dbName);
} else { // dump table definition
colCount = getTableDes(dbName, tbName, tableDes, false);
if (colCount < 0) {
free(tableDes);
return -1;
}
// create normal-table or super-table
dumpCreateTableClause(tableDes, colCount, fp, dbName);
}
char *jsonAvroSchema = NULL;
if (g_args.avro) {
convertSchemaToAvroSchema(tableDes, &jsonAvroSchema);
}
free(tableDes);
int64_t ret = 0;
if (!g_args.schemaonly) {
ret = dumpTableData(fp, tbName, dbName, precision,
jsonAvroSchema);
}
return ret;
}
static void taosDumpCreateDbClause(
static void dumpCreateDbClause(
SDbInfo *dbInfo, bool isDumpProperty, FILE *fp) {
char sqlstr[TSDB_MAX_SQL_LEN] = {0};
......@@ -1877,35 +2129,7 @@ static void taosDumpCreateDbClause(
fprintf(fp, "%s\n\n", sqlstr);
}
static int dumpStable(char *stbName, FILE *fp, SDbInfo *dbInfo)
{
uint64_t sizeOfTableDes =
(uint64_t)(sizeof(STableDef) + sizeof(SColDes) * TSDB_MAX_COLUMNS);
STableDef *stableDes = (STableDef *)calloc(1, sizeOfTableDes);
if (NULL == stableDes) {
errorPrint("%s() LN%d, failed to allocate %"PRIu64" memory\n",
__func__, __LINE__, sizeOfTableDes);
exit(-1);
}
int colCount = getTableDes(dbInfo->name,
stbName, stableDes, true);
if (colCount < 0) {
free(stableDes);
errorPrint("%s() LN%d, failed to get stable[%s] schema\n",
__func__, __LINE__, stbName);
exit(-1);
}
dumpCreateTableClause(stableDes, colCount, fp, dbInfo->name);
free(stableDes);
return 0;
}
static int dumpCreateTableClause(STableDef *tableDes, int numOfCols,
static int dumpCreateTableClause(TableDef *tableDes, int numOfCols,
FILE *fp, char* dbName) {
int counter = 0;
int count_temp = 0;
......@@ -1956,64 +2180,6 @@ static int dumpCreateTableClause(STableDef *tableDes, int numOfCols,
return fprintf(fp, "%s\n\n", sqlstr);
}
static void taosDumpCreateMTableClause(STableDef *tableDes, char *stable,
int numOfCols, FILE *fp, char* dbName) {
int counter = 0;
int count_temp = 0;
char* tmpBuf = (char *)malloc(COMMAND_SIZE);
if (tmpBuf == NULL) {
errorPrint("%s() LN%d, failed to allocate %d memory\n",
__func__, __LINE__, COMMAND_SIZE);
return;
}
char *pstr = NULL;
pstr = tmpBuf;
pstr += sprintf(tmpBuf,
"CREATE TABLE IF NOT EXISTS %s.%s USING %s.%s TAGS (",
dbName, tableDes->name, dbName, stable);
for (; counter < numOfCols; counter++) {
if (tableDes->cols[counter].note[0] != '\0') break;
}
assert(counter < numOfCols);
count_temp = counter;
for (; counter < numOfCols; counter++) {
if (counter != count_temp) {
if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 ||
strcasecmp(tableDes->cols[counter].type, "nchar") == 0) {
//pstr += sprintf(pstr, ", \'%s\'", tableDes->cols[counter].note);
pstr += sprintf(pstr, ", %s", tableDes->cols[counter].note);
} else {
pstr += sprintf(pstr, ", %s", tableDes->cols[counter].note);
}
} else {
if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 ||
strcasecmp(tableDes->cols[counter].type, "nchar") == 0) {
//pstr += sprintf(pstr, "\'%s\'", tableDes->cols[counter].note);
pstr += sprintf(pstr, "%s", tableDes->cols[counter].note);
} else {
pstr += sprintf(pstr, "%s", tableDes->cols[counter].note);
}
/* pstr += sprintf(pstr, "%s", tableDes->cols[counter].note); */
}
/* if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 || strcasecmp(tableDes->cols[counter].type, "nchar")
* == 0) { */
/* pstr += sprintf(pstr, "(%d)", tableDes->cols[counter].length); */
/* } */
}
pstr += sprintf(pstr, ");");
fprintf(fp, "%s\n", tmpBuf);
free(tmpBuf);
}
static int writeSchemaToAvro(char *jsonAvroSchema)
{
errorPrint("%s() LN%d, TODO: implement write schema to avro",
......@@ -2110,10 +2276,7 @@ static int64_t writeResultToSql(TAOS_RES *res, FILE *fp, char *dbName, char *tbN
case TSDB_DATA_TYPE_BINARY:
{
char tbuf[COMMAND_SIZE] = {0};
//*(pstr++) = '\'';
converStringToReadable((char *)row[col], length[col], tbuf, COMMAND_SIZE);
//pstr = stpcpy(pstr, tbuf);
//*(pstr++) = '\'';
curr_sqlstr_len += sprintf(pstr + curr_sqlstr_len, "\'%s\'", tbuf);
break;
}
......@@ -2369,7 +2532,6 @@ static int converStringToReadable(char *str, int size, char *buf, int bufsize) {
static int convertNCharToReadable(char *str, int size, char *buf, int bufsize) {
char *pstr = str;
char *pbuf = buf;
// TODO
wchar_t wc;
while (size > 0) {
if (*pstr == '\0') break;
......@@ -2393,7 +2555,7 @@ static int convertNCharToReadable(char *str, int size, char *buf, int bufsize) {
return 0;
}
static void taosDumpCharset(FILE *fp) {
static void dumpCharset(FILE *fp) {
char charsetline[256];
(void)fseek(fp, 0, SEEK_SET);
......@@ -2401,7 +2563,7 @@ static void taosDumpCharset(FILE *fp) {
(void)fwrite(charsetline, strlen(charsetline), 1, fp);
}
static void taosLoadFileCharset(FILE *fp, char *fcharset) {
static void loadFileCharset(FILE *fp, char *fcharset) {
char * line = NULL;
size_t line_size = 0;
......@@ -2533,7 +2695,7 @@ static void taosMallocDumpFiles()
}
}
static void taosFreeDumpFiles()
static void freeDumpFiles()
{
for (int i = 0; i < g_tsSqlFileNum; i++) {
tfree(g_tsDumpInSqlFiles[i]);
......@@ -2601,7 +2763,7 @@ static FILE* taosOpenDumpInFile(char *fptr) {
return f;
}
static int taosDumpInOneFile(TAOS* taos, FILE* fp, char* fcharset,
static int dumpInOneFile(TAOS* taos, FILE* fp, char* fcharset,
char* encode, char* fileName) {
int read_len = 0;
char * cmd = NULL;
......@@ -2658,7 +2820,7 @@ static int taosDumpInOneFile(TAOS* taos, FILE* fp, char* fcharset,
return 0;
}
static void* taosDumpInWorkThreadFp(void *arg)
static void* dumpInWorkThreadFp(void *arg)
{
threadInfo *pThread = (threadInfo*)arg;
setThreadName("dumpInWorkThrd");
......@@ -2672,14 +2834,14 @@ static void* taosDumpInWorkThreadFp(void *arg)
}
fprintf(stderr, ", Success Open input file: %s\n",
SQLFileName);
taosDumpInOneFile(pThread->taos, fp, g_tsCharset, g_args.encode, SQLFileName);
dumpInOneFile(pThread->taos, fp, g_tsCharset, g_args.encode, SQLFileName);
}
}
return NULL;
}
static void taosStartDumpInWorkThreads()
static void startDumpInWorkThreads()
{
pthread_attr_t thattr;
threadInfo *pThread;
......@@ -2711,7 +2873,7 @@ static void taosStartDumpInWorkThreads()
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&(pThread->threadID), &thattr,
taosDumpInWorkThreadFp, (void*)pThread) != 0) {
dumpInWorkThreadFp, (void*)pThread) != 0) {
errorPrint("%s() LN%d, thread:%d failed to start\n",
__func__, __LINE__, pThread->threadIndex);
exit(0);
......@@ -2728,7 +2890,7 @@ static void taosStartDumpInWorkThreads()
free(threadObj);
}
static int taosDumpIn() {
static int dumpIn() {
assert(g_args.isDumpIn);
TAOS *taos = NULL;
......@@ -2757,19 +2919,19 @@ static int taosDumpIn() {
}
fprintf(stderr, "Success Open input file: %s\n", g_tsDbSqlFile);
taosLoadFileCharset(fp, g_tsCharset);
loadFileCharset(fp, g_tsCharset);
taosDumpInOneFile(taos, fp, g_tsCharset, g_args.encode,
dumpInOneFile(taos, fp, g_tsCharset, g_args.encode,
g_tsDbSqlFile);
}
taos_close(taos);
if (0 != tsSqlFileNumOfTbls) {
taosStartDumpInWorkThreads();
startDumpInWorkThreads();
}
taosFreeDumpFiles();
freeDumpFiles();
return 0;
}
......@@ -2890,7 +3052,7 @@ int main(int argc, char *argv[]) {
fprintf(g_fpOfResult, "# DumpIn start time: %d-%02d-%02d %02d:%02d:%02d\n",
tm.tm_year + 1900, tm.tm_mon + 1,
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
if (taosDumpIn() < 0) {
if (dumpIn() < 0) {
ret = -1;
}
} else {
......@@ -2898,7 +3060,7 @@ int main(int argc, char *argv[]) {
fprintf(g_fpOfResult, "# DumpOut start time: %d-%02d-%02d %02d:%02d:%02d\n",
tm.tm_year + 1900, tm.tm_mon + 1,
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
if (taosDumpOut() < 0) {
if (dumpOut() < 0) {
ret = -1;
} else {
fprintf(g_fpOfResult, "\n============================== TOTAL STATISTICS ============================== \n");
......
@echo off
echo ==== start Go connector test cases test ====
cd /d %~dp0
......@@ -18,3 +19,10 @@ rem case002.bat
:: cd case002
:: case002.bat
rem cd nanosupport
rem nanoCase.bat
:: cd nanosupport
:: nanoCase.bat
......@@ -19,3 +19,4 @@ go env -w GOPROXY=https://goproxy.cn,direct
bash ./case001/case001.sh $severIp $serverPort
bash ./case002/case002.sh $severIp $serverPort
#bash ./case003/case003.sh $severIp $serverPort
bash ./nanosupport/nanoCase.sh $severIp $serverPort
......@@ -15,8 +15,7 @@ script_dir="$(dirname $(readlink -f $0))"
###### step 3: start build
cd $script_dir
rm -f go.*
go mod init demotest > /dev/null 2>&1
go mod tidy > /dev/null 2>&1
go build > /dev/null 2>&1
go mod init demotest
go build
sleep 1s
./demotest -h $1 -p $2
......@@ -44,7 +44,7 @@ func main() {
}
defer db.Close()
db.Exec("drop database if exists test")
db.Exec("create database if not exists test")
db.Exec("create database if not exists test ")
db.Exec("use test")
db.Exec("create table test (ts timestamp ,level int)")
for i := 0; i < 10; i++ {
......
package connector
import (
"context"
"fmt"
"reflect"
"time"
"github.com/taosdata/go-utils/log"
"github.com/taosdata/go-utils/tdengine/config"
"github.com/taosdata/go-utils/tdengine/connector"
tdengineExecutor "github.com/taosdata/go-utils/tdengine/executor"
)
type Executor struct {
executor *tdengineExecutor.Executor
ctx context.Context
}
var Logger = log.NewLogger("taos test")
func NewExecutor(conf *config.TDengineGo, db string, showSql bool) (*Executor, error) {
tdengineConnector, err := connector.NewTDengineConnector("go", conf)
if err != nil {
return nil, err
}
executor := tdengineExecutor.NewExecutor(tdengineConnector, db, showSql, Logger)
return &Executor{
executor: executor,
ctx: context.Background(),
}, nil
}
func (e *Executor) Execute(sql string) (int64, error) {
return e.executor.DoExec(e.ctx, sql)
}
func (e *Executor) Query(sql string) (*connector.Data, error) {
fmt.Println("query :", sql)
return e.executor.DoQuery(e.ctx, sql)
}
func (e *Executor) CheckData(row, col int, value interface{}, data *connector.Data) (bool, error) {
if data == nil {
return false, fmt.Errorf("data is nil")
}
if col >= len(data.Head) {
return false, fmt.Errorf("col out of data")
}
if row >= len(data.Data) {
return false, fmt.Errorf("row out of data")
}
dataValue := data.Data[row][col]
if dataValue == nil && value != nil {
return false, fmt.Errorf("dataValue is nil but value is not nil")
}
if dataValue == nil && value == nil {
return true, nil
}
if reflect.TypeOf(dataValue) != reflect.TypeOf(value) {
return false, fmt.Errorf("type not match expect %s got %s", reflect.TypeOf(value), reflect.TypeOf(dataValue))
}
switch value.(type) {
case time.Time:
t, _ := dataValue.(time.Time)
if value.(time.Time).Nanosecond() != t.Nanosecond() {
return false, fmt.Errorf("value not match expect %d got %d", value.(time.Time).Nanosecond(), t.Nanosecond())
}
case string:
if value.(string) != dataValue.(string) {
return false, fmt.Errorf("value not match expect %s got %s", value.(string), dataValue.(string))
}
case int8:
if value.(int8) != dataValue.(int8) {
return false, fmt.Errorf("value not match expect %d got %d", value.(int8), dataValue.(int8))
}
case int16:
if value.(int16) != dataValue.(int16) {
return false, fmt.Errorf("value not match expect %d got %d", value.(int16), dataValue.(int16))
}
case int32:
if value.(int32) != dataValue.(int32) {
return false, fmt.Errorf("value not match expect %d got %d", value.(int32), dataValue.(int32))
}
case int64:
if value.(int64) != dataValue.(int64) {
return false, fmt.Errorf("value not match expect %d got %d", value.(int64), dataValue.(int64))
}
case float32:
if value.(float32) != dataValue.(float32) {
return false, fmt.Errorf("value not match expect %f got %f", value.(float32), dataValue.(float32))
}
case float64:
if value.(float64) != dataValue.(float64) {
return false, fmt.Errorf("value not match expect %f got %f", value.(float32), dataValue.(float32))
}
case bool:
if value.(bool) != dataValue.(bool) {
return false, fmt.Errorf("value not match expect %t got %t", value.(bool), dataValue.(bool))
}
default:
return false, fmt.Errorf("unsupport type %v", reflect.TypeOf(value))
}
return true, nil
}
func (e *Executor) CheckData2(row, col int, value interface{}, data *connector.Data) {
match, err := e.CheckData(row, col, value, data)
fmt.Println("expect data is :", value)
fmt.Println("go got data is :", data.Data[row][col])
if err != nil {
fmt.Println(err)
}
if !match {
fmt.Println(" data not match")
}
/*
fmt.Println(value)
if data == nil {
// return false, fmt.Errorf("data is nil")
// fmt.Println("check failed")
}
if col >= len(data.Head) {
// return false, fmt.Errorf("col out of data")
// fmt.Println("check failed")
}
if row >= len(data.Data) {
// return false, fmt.Errorf("row out of data")
// fmt.Println("check failed")
}
dataValue := data.Data[row][col]
if dataValue == nil && value != nil {
// return false, fmt.Errorf("dataValue is nil but value is not nil")
// fmt.Println("check failed")
}
if dataValue == nil && value == nil {
// return true, nil
fmt.Println("check pass")
}
if reflect.TypeOf(dataValue) != reflect.TypeOf(value) {
// return false, fmt.Errorf("type not match expect %s got %s", reflect.TypeOf(value), reflect.TypeOf(dataValue))
fmt.Println("check failed")
}
switch value.(type) {
case time.Time:
t, _ := dataValue.(time.Time)
if value.(time.Time).Nanosecond() != t.Nanosecond() {
// return false, fmt.Errorf("value not match expect %d got %d", value.(time.Time).Nanosecond(), t.Nanosecond())
// fmt.Println("check failed")
}
case string:
if value.(string) != dataValue.(string) {
// return false, fmt.Errorf("value not match expect %s got %s", value.(string), dataValue.(string))
// fmt.Println("check failed")
}
case int8:
if value.(int8) != dataValue.(int8) {
// return false, fmt.Errorf("value not match expect %d got %d", value.(int8), dataValue.(int8))
// fmt.Println("check failed")
}
case int16:
if value.(int16) != dataValue.(int16) {
// return false, fmt.Errorf("value not match expect %d got %d", value.(int16), dataValue.(int16))
// fmt.Println("check failed")
}
case int32:
if value.(int32) != dataValue.(int32) {
// return false, fmt.Errorf("value not match expect %d got %d", value.(int32), dataValue.(int32))
// fmt.Println("check failed")
}
case int64:
if value.(int64) != dataValue.(int64) {
// return false, fmt.Errorf("value not match expect %d got %d", value.(int64), dataValue.(int64))
// fmt.Println("check failed")
}
case float32:
if value.(float32) != dataValue.(float32) {
// return false, fmt.Errorf("value not match expect %f got %f", value.(float32), dataValue.(float32))
// fmt.Println("check failed")
}
case float64:
if value.(float64) != dataValue.(float64) {
// return false, fmt.Errorf("value not match expect %f got %f", value.(float32), dataValue.(float32))
// fmt.Println("check failed")
}
case bool:
if value.(bool) != dataValue.(bool) {
// return false, fmt.Errorf("value not match expect %t got %t", value.(bool), dataValue.(bool))
// fmt.Println("check failed")
}
default:
// return false, fmt.Errorf("unsupport type %v", reflect.TypeOf(value))
// fmt.Println("check failed")
}
// return true, nil
// fmt.Println("check pass")
*/
}
func (e *Executor) CheckRow(count int, data *connector.Data) {
if len(data.Data) != count {
fmt.Println("check failed !")
}
}
@echo off
echo ==== start run nanosupport.go
del go.*
go mod init nano
go mod tidy
go build
nano.exe -h %1 -p %2
cd ..
#!/bin/bash
echo "==== start run nanosupport.go "
set +e
#set -x
script_dir="$(dirname $(readlink -f $0))"
#echo "pwd: $script_dir, para0: $0"
#execName=$0
#execName=`echo ${execName##*/}`
#goName=`echo ${execName%.*}`
###### step 3: start build
cd $script_dir
rm -f go.*
go mod init nano
go mod tidy
go build
sleep 10s
./nano -h $1 -p $2
package main
import (
"fmt"
"log"
"nano/connector"
"time"
"github.com/taosdata/go-utils/tdengine/config"
)
func main() {
e, err := connector.NewExecutor(&config.TDengineGo{
Address: "root:taosdata@/tcp(127.0.0.1:6030)/",
MaxIdle: 20,
MaxOpen: 30,
MaxLifetime: 30,
}, "db", false)
if err != nil {
panic(err)
}
prepareData(e)
data, err := e.Query("select * from tb")
if err != nil {
panic(err)
}
layout := "2006-01-02 15:04:05.999999999"
t0, _ := time.Parse(layout, "2021-06-10 00:00:00.100000001")
t1, _ := time.Parse(layout, "2021-06-10 00:00:00.150000000")
t2, _ := time.Parse(layout, "2021-06-10 00:00:00.299999999")
t3, _ := time.Parse(layout, "2021-06-10 00:00:00.300000000")
t4, _ := time.Parse(layout, "2021-06-10 00:00:00.300000001")
t5, _ := time.Parse(layout, "2021-06-10 00:00:00.999999999")
e.CheckData2(0, 0, t0, data)
e.CheckData2(1, 0, t1, data)
e.CheckData2(2, 0, t2, data)
e.CheckData2(3, 0, t3, data)
e.CheckData2(4, 0, t4, data)
e.CheckData2(5, 0, t5, data)
e.CheckData2(3, 1, int32(3), data)
e.CheckData2(4, 1, int32(5), data)
e.CheckData2(5, 1, int32(7), data)
fmt.Println(" start check nano support!")
data, _ = e.Query("select count(*) from tb where ts > 1623254400100000000 and ts < 1623254400100000002;")
e.CheckData2(0, 0, int64(1), data)
data, _ = e.Query("select count(*) from tb where ts > \"2021-06-10 0:00:00.100000001\" and ts < \"2021-06-10 0:00:00.160000000\";")
e.CheckData2(0, 0, int64(1), data)
data, _ = e.Query("select count(*) from tb where ts > 1623254400100000000 and ts < 1623254400150000000;")
e.CheckData2(0, 0, int64(1), data)
data, _ = e.Query("select count(*) from tb where ts > \"2021-06-10 0:00:00.100000000\" and ts < \"2021-06-10 0:00:00.150000000\";")
e.CheckData2(0, 0, int64(1), data)
data, _ = e.Query("select count(*) from tb where ts > 1623254400400000000;")
e.CheckData2(0, 0, int64(1), data)
data, _ = e.Query("select count(*) from tb where ts < \"2021-06-10 00:00:00.400000000\";")
e.CheckData2(0, 0, int64(5), data)
data, _ = e.Query("select count(*) from tb where ts < now + 400000000b;")
e.CheckData2(0, 0, int64(6), data)
data, _ = e.Query("select count(*) from tb where ts >= \"2021-06-10 0:00:00.100000001\";")
e.CheckData2(0, 0, int64(6), data)
data, _ = e.Query("select count(*) from tb where ts <= 1623254400300000000;")
e.CheckData2(0, 0, int64(4), data)
data, _ = e.Query("select count(*) from tb where ts = \"2021-06-10 0:00:00.000000000\";")
data, _ = e.Query("select count(*) from tb where ts = 1623254400150000000;")
e.CheckData2(0, 0, int64(1), data)
data, _ = e.Query("select count(*) from tb where ts = \"2021-06-10 0:00:00.100000001\";")
e.CheckData2(0, 0, int64(1), data)
data, _ = e.Query("select count(*) from tb where ts between 1623254400000000000 and 1623254400400000000;")
e.CheckData2(0, 0, int64(5), data)
data, _ = e.Query("select count(*) from tb where ts between \"2021-06-10 0:00:00.299999999\" and \"2021-06-10 0:00:00.300000001\";")
e.CheckData2(0, 0, int64(3), data)
data, _ = e.Query("select avg(speed) from tb interval(5000000000b);")
e.CheckRow(1, data)
data, _ = e.Query("select avg(speed) from tb interval(100000000b)")
e.CheckRow(4, data)
data, _ = e.Query("select avg(speed) from tb interval(1000b);")
e.CheckRow(5, data)
data, _ = e.Query("select avg(speed) from tb interval(1u);")
e.CheckRow(5, data)
data, _ = e.Query("select avg(speed) from tb interval(100000000b) sliding (100000000b);")
e.CheckRow(4, data)
data, _ = e.Query("select last(*) from tb")
tt, _ := time.Parse(layout, "2021-06-10 0:00:00.999999999")
e.CheckData2(0, 0, tt, data)
data, _ = e.Query("select first(*) from tb")
tt1, _ := time.Parse(layout, "2021-06-10 0:00:00.100000001")
e.CheckData2(0, 0, tt1, data)
e.Execute("insert into tb values(now + 500000000b, 6);")
data, _ = e.Query("select * from tb;")
e.CheckRow(7, data)
e.Execute("create table tb2 (ts timestamp, speed int, ts2 timestamp);")
e.Execute("insert into tb2 values(\"2021-06-10 0:00:00.100000001\", 1, \"2021-06-11 0:00:00.100000001\");")
e.Execute("insert into tb2 values(1623254400150000000, 2, 1623340800150000000);")
e.Execute("import into tb2 values(1623254400300000000, 3, 1623340800300000000);")
e.Execute("import into tb2 values(1623254400299999999, 4, 1623340800299999999);")
e.Execute("insert into tb2 values(1623254400300000001, 5, 1623340800300000001);")
e.Execute("insert into tb2 values(1623254400999999999, 7, 1623513600999999999);")
data, _ = e.Query("select * from tb2;")
tt2, _ := time.Parse(layout, "2021-06-10 0:00:00.100000001")
tt3, _ := time.Parse(layout, "2021-06-10 0:00:00.150000000")
e.CheckData2(0, 0, tt2, data)
e.CheckData2(1, 0, tt3, data)
e.CheckData2(2, 1, int32(4), data)
e.CheckData2(3, 1, int32(3), data)
tt4, _ := time.Parse(layout, "2021-06-11 00:00:00.300000001")
e.CheckData2(4, 2, tt4, data)
e.CheckRow(6, data)
data, _ = e.Query("select count(*) from tb2 where ts2 > 1623340800000000000 and ts2 < 1623340800150000000;")
e.CheckData2(0, 0, int64(1), data)
data, _ = e.Query("select count(*) from tb2 where ts2 > \"2021-06-11 0:00:00.100000000\" and ts2 < \"2021-06-11 0:00:00.100000002\";")
e.CheckData2(0, 0, int64(1), data)
data, _ = e.Query("select count(*) from tb2 where ts2 > 1623340800500000000;")
e.CheckData2(0, 0, int64(1), data)
data, _ = e.Query("select count(*) from tb2 where ts2 < \"2021-06-11 0:00:00.400000000\";")
e.CheckData2(0, 0, int64(5), data)
data, _ = e.Query("select count(*) from tb2 where ts2 < now + 400000000b;")
e.CheckData2(0, 0, int64(6), data)
data, _ = e.Query("select count(*) from tb2 where ts2 >= \"2021-06-11 0:00:00.100000001\";")
e.CheckData2(0, 0, int64(6), data)
data, _ = e.Query("select count(*) from tb2 where ts2 <= 1623340800400000000;")
e.CheckData2(0, 0, int64(5), data)
data, _ = e.Query("select count(*) from tb2 where ts2 = \"2021-06-11 0:00:00.000000000\";")
data, _ = e.Query("select count(*) from tb2 where ts2 = \"2021-06-11 0:00:00.300000001\";")
e.CheckData2(0, 0, int64(1), data)
data, _ = e.Query("select count(*) from tb2 where ts2 = 1623340800300000001;")
e.CheckData2(0, 0, int64(1), data)
data, _ = e.Query("select count(*) from tb2 where ts2 between 1623340800000000000 and 1623340800450000000;")
e.CheckData2(0, 0, int64(5), data)
data, _ = e.Query("select count(*) from tb2 where ts2 between \"2021-06-11 0:00:00.299999999\" and \"2021-06-11 0:00:00.300000001\";")
e.CheckData2(0, 0, int64(3), data)
data, _ = e.Query("select count(*) from tb2 where ts2 <> 1623513600999999999;")
e.CheckData2(0, 0, int64(5), data)
data, _ = e.Query("select count(*) from tb2 where ts2 <> \"2021-06-11 0:00:00.100000001\";")
e.CheckData2(0, 0, int64(5), data)
data, _ = e.Query("select count(*) from tb2 where ts2 <> \"2021-06-11 0:00:00.100000000\";")
e.CheckData2(0, 0, int64(6), data)
data, _ = e.Query("select count(*) from tb2 where ts2 != 1623513600999999999;")
e.CheckData2(0, 0, int64(5), data)
data, _ = e.Query("select count(*) from tb2 where ts2 != \"2021-06-11 0:00:00.100000001\";")
e.CheckData2(0, 0, int64(5), data)
data, _ = e.Query("select count(*) from tb2 where ts2 != \"2021-06-11 0:00:00.100000000\";")
e.CheckData2(0, 0, int64(6), data)
e.Execute("insert into tb2 values(now + 500000000b, 6, now +2d);")
data, _ = e.Query("select * from tb2;")
e.CheckRow(7, data)
e.Execute("create table tb3 (ts timestamp, speed int);")
_, err = e.Execute("insert into tb3 values(16232544001500000, 2);")
if err != nil {
fmt.Println("check pass! ")
}
e.Execute("insert into tb3 values(\"2021-06-10 0:00:00.123456\", 2);")
data, _ = e.Query("select * from tb3 where ts = \"2021-06-10 0:00:00.123456000\";")
e.CheckRow(1, data)
e.Execute("insert into tb3 values(\"2021-06-10 0:00:00.123456789000\", 2);")
data, _ = e.Query("select * from tb3 where ts = \"2021-06-10 0:00:00.123456789\";")
e.CheckRow(1, data)
// check timezone support
e.Execute("drop database if exists nsdb;")
e.Execute("create database nsdb precision 'ns';")
e.Execute("use nsdb;")
e.Execute("create stable st (ts timestamp ,speed float ) tags(time timestamp ,id int);")
e.Execute("insert into tb1 using st tags('2021-06-10 0:00:00.123456789' , 1 ) values('2021-06-10T0:00:00.123456789+07:00' , 1.0);")
data, _ = e.Query("select first(*) from tb1;")
ttt, _ := time.Parse(layout, "2021-06-10 01:00:00.123456789")
e.CheckData2(0, 0, ttt, data)
e.Execute("create database usdb precision 'us';")
e.Execute("use usdb;")
e.Execute("create stable st (ts timestamp ,speed float ) tags(time timestamp ,id int);")
e.Execute("insert into tb1 using st tags('2021-06-10 0:00:00.123456' , 1 ) values('2021-06-10T0:00:00.123456+07:00' , 1.0);")
data, _ = e.Query("select first(*) from tb1;")
ttt2, _ := time.Parse(layout, "2021-06-10 01:00:00.123456")
e.CheckData2(0, 0, ttt2, data)
e.Execute("drop database if exists msdb;")
e.Execute("create database msdb precision 'ms';")
e.Execute("use msdb;")
e.Execute("create stable st (ts timestamp ,speed float ) tags(time timestamp ,id int);")
e.Execute("insert into tb1 using st tags('2021-06-10 0:00:00.123' , 1 ) values('2021-06-10T0:00:00.123+07:00' , 1.0);")
data, _ = e.Query("select first(*) from tb1;")
ttt3, _ := time.Parse(layout, "2021-06-10 01:00:00.123")
e.CheckData2(0, 0, ttt3, data)
fmt.Println("all test done!")
}
func prepareData(e *connector.Executor) {
sqlList := []string{
"reset query cache;",
"drop database if exists db;",
"create database db;",
"use db;",
"reset query cache;",
"drop database if exists db;",
"create database db precision 'ns';",
"show databases;",
"use db;",
"create table tb (ts timestamp, speed int);",
"insert into tb values('2021-06-10 0:00:00.100000001', 1);",
"insert into tb values(1623254400150000000, 2);",
"import into tb values(1623254400300000000, 3);",
"import into tb values(1623254400299999999, 4);",
"insert into tb values(1623254400300000001, 5);",
"insert into tb values(1623254400999999999, 7);",
}
for _, sql := range sqlList {
err := executeSql(e, sql)
if err != nil {
log.Fatalf("prepare data error:%v, sql:%s", err, sql)
}
}
}
func executeSql(e *connector.Executor, sql string) error {
_, err := e.Execute(sql)
if err != nil {
return err
}
return nil
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册