提交 08db90ba 编写于 作者: S Shuduo Sang

merge with master branch.

...@@ -730,8 +730,16 @@ static int32_t quitAllSubquery(SSqlObj* pSqlSub, SSqlObj* pSqlObj, SJoinSupporte ...@@ -730,8 +730,16 @@ static int32_t quitAllSubquery(SSqlObj* pSqlSub, SSqlObj* pSqlObj, SJoinSupporte
static void updateQueryTimeRange(SQueryInfo* pQueryInfo, STimeWindow* win) { static void updateQueryTimeRange(SQueryInfo* pQueryInfo, STimeWindow* win) {
assert(pQueryInfo->window.skey <= win->skey && pQueryInfo->window.ekey >= win->ekey); assert(pQueryInfo->window.skey <= win->skey && pQueryInfo->window.ekey >= win->ekey);
pQueryInfo->window = *win; pQueryInfo->window = *win;
}
int32_t tagValCompar(const void* p1, const void* p2) {
const STidTags* t1 = (const STidTags*) varDataVal(p1);
const STidTags* t2 = (const STidTags*) varDataVal(p2);
__compar_fn_t func = getComparFunc(t1->padding, 0);
return func(t1->tag, t2->tag);
} }
int32_t tidTagsCompar(const void* p1, const void* p2) { int32_t tidTagsCompar(const void* p1, const void* p2) {
...@@ -742,28 +750,7 @@ int32_t tidTagsCompar(const void* p1, const void* p2) { ...@@ -742,28 +750,7 @@ int32_t tidTagsCompar(const void* p1, const void* p2) {
return (t1->vgId > t2->vgId) ? 1 : -1; return (t1->vgId > t2->vgId) ? 1 : -1;
} }
tstr* tag1 = (tstr*) t1->tag; return tagValCompar(p1, p2);
tstr* tag2 = (tstr*) t2->tag;
if (tag1->len != tag2->len) {
return (tag1->len > tag2->len)? 1: -1;
}
return strncmp(tag1->data, tag2->data, tag1->len);
}
int32_t tagValCompar(const void* p1, const void* p2) {
const STidTags* t1 = (const STidTags*) varDataVal(p1);
const STidTags* t2 = (const STidTags*) varDataVal(p2);
tstr* tag1 = (tstr*) t1->tag;
tstr* tag2 = (tstr*) t2->tag;
if (tag1->len != tag2->len) {
return (tag1->len > tag2->len)? 1: -1;
}
return memcmp(tag1->data, tag2->data, tag1->len);
} }
void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables) { void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables) {
...@@ -889,6 +876,12 @@ static bool checkForDuplicateTagVal(SSchema* pColSchema, SJoinSupporter* p1, SSq ...@@ -889,6 +876,12 @@ static bool checkForDuplicateTagVal(SSchema* pColSchema, SJoinSupporter* p1, SSq
return true; return true;
} }
static void setTidTagType(SJoinSupporter* p, uint8_t type) {
for (int32_t i = 0; i < p->num; ++i) {
STidTags * tag = (STidTags*) varDataVal(p->pIdTagList + i * p->tagSize);
tag->padding = type;
}
}
static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray* resList) { static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pParentSql, SArray* resList) {
int16_t joinNum = pParentSql->subState.numOfSub; int16_t joinNum = pParentSql->subState.numOfSub;
...@@ -908,6 +901,8 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar ...@@ -908,6 +901,8 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar
for (int32_t i = 0; i < joinNum; i++) { for (int32_t i = 0; i < joinNum; i++) {
SJoinSupporter* p = pParentSql->pSubs[i]->param; SJoinSupporter* p = pParentSql->pSubs[i]->param;
setTidTagType(p, pColSchema->type);
ctxlist[i].p = p; ctxlist[i].p = p;
ctxlist[i].res = taosArrayInit(p->num, size); ctxlist[i].res = taosArrayInit(p->num, size);
......
...@@ -54,6 +54,7 @@ ...@@ -54,6 +54,7 @@
#include "tutil.h" #include "tutil.h"
#define STMT_IFACE_ENABLED 0 #define STMT_IFACE_ENABLED 0
#define NANO_SECOND_ENABLED 0
#define REQ_EXTRA_BUF_LEN 1024 #define REQ_EXTRA_BUF_LEN 1024
#define RESP_BUF_LEN 4096 #define RESP_BUF_LEN 4096
...@@ -66,16 +67,9 @@ extern char configDir[]; ...@@ -66,16 +67,9 @@ extern char configDir[];
#define STR_INSERT_INTO "INSERT INTO " #define STR_INSERT_INTO "INSERT INTO "
enum TEST_MODE {
INSERT_TEST, // 0
QUERY_TEST, // 1
SUBSCRIBE_TEST, // 2
INVAID_TEST
};
#define MAX_RECORDS_PER_REQ 32766 #define MAX_RECORDS_PER_REQ 32766
#define HEAD_BUFF_LEN 1024*24 // 16*1024 + (192+32)*2 + insert into .. #define HEAD_BUFF_LEN TSDB_MAX_COLUMNS*24 // 16*MAX_COLUMNS + (192+32)*2 + insert into ..
#define MAX_SQL_SIZE 65536 #define MAX_SQL_SIZE 65536
#define BUFFER_SIZE (65536*2) #define BUFFER_SIZE (65536*2)
...@@ -84,20 +78,17 @@ enum TEST_MODE { ...@@ -84,20 +78,17 @@ enum TEST_MODE {
#define MAX_PASSWORD_SIZE 64 #define MAX_PASSWORD_SIZE 64
#define MAX_HOSTNAME_SIZE 64 #define MAX_HOSTNAME_SIZE 64
#define MAX_TB_NAME_SIZE 64 #define MAX_TB_NAME_SIZE 64
#define MAX_DATA_SIZE (16*1024)+20 // max record len: 16*1024, timestamp string and ,('') need extra space #define MAX_DATA_SIZE (16*TSDB_MAX_COLUMNS)+20 // max record len: 16*MAX_COLUMNS, timestamp string and ,('') need extra space
#define MAX_NUM_DATATYPE 10
#define OPT_ABORT 1 /* –abort */ #define OPT_ABORT 1 /* –abort */
#define STRING_LEN 60000 #define STRING_LEN 60000
#define MAX_PREPARED_RAND 1000000 #define MAX_PREPARED_RAND 1000000
#define MAX_FILE_NAME_LEN 256 // max file name length on linux is 255. #define MAX_FILE_NAME_LEN 256 // max file name length on linux is 255.
#define MAX_SAMPLES_ONCE_FROM_FILE 10000 #define MAX_SAMPLES_ONCE_FROM_FILE 10000
#define MAX_NUM_DATATYPE 10 #define MAX_NUM_COLUMNS (TSDB_MAX_COLUMNS - 1) // exclude first column timestamp
#define MAX_DB_COUNT 8 #define MAX_DB_COUNT 8
#define MAX_SUPER_TABLE_COUNT 200 #define MAX_SUPER_TABLE_COUNT 200
#define MAX_COLUMN_COUNT 1024
#define MAX_TAG_COUNT 128
#define MAX_QUERY_SQL_COUNT 100 #define MAX_QUERY_SQL_COUNT 100
#define MAX_QUERY_SQL_LENGTH 1024 #define MAX_QUERY_SQL_LENGTH 1024
...@@ -108,6 +99,13 @@ enum TEST_MODE { ...@@ -108,6 +99,13 @@ enum TEST_MODE {
#define DEFAULT_TIMESTAMP_STEP 1 #define DEFAULT_TIMESTAMP_STEP 1
enum TEST_MODE {
INSERT_TEST, // 0
QUERY_TEST, // 1
SUBSCRIBE_TEST, // 2
INVAID_TEST
};
typedef enum CREATE_SUB_TALBE_MOD_EN { typedef enum CREATE_SUB_TALBE_MOD_EN {
PRE_CREATE_SUBTBL, PRE_CREATE_SUBTBL,
AUTO_CREATE_SUBTBL, AUTO_CREATE_SUBTBL,
...@@ -218,7 +216,7 @@ typedef struct SArguments_S { ...@@ -218,7 +216,7 @@ typedef struct SArguments_S {
bool performance_print; bool performance_print;
char * output_file; char * output_file;
bool async_mode; bool async_mode;
char * datatype[MAX_NUM_DATATYPE + 1]; char * datatype[MAX_NUM_COLUMNS + 1];
uint32_t len_of_binary; uint32_t len_of_binary;
uint32_t num_of_CPR; uint32_t num_of_CPR;
uint32_t num_of_threads; uint32_t num_of_threads;
...@@ -231,7 +229,7 @@ typedef struct SArguments_S { ...@@ -231,7 +229,7 @@ typedef struct SArguments_S {
int64_t num_of_DPT; int64_t num_of_DPT;
int abort; int abort;
uint32_t disorderRatio; // 0: no disorder, >0: x% uint32_t disorderRatio; // 0: no disorder, >0: x%
int disorderRange; // ms or us by database precision int disorderRange; // ms, us or ns. accordig to database precision
uint32_t method_of_delete; uint32_t method_of_delete;
char ** arg_list; char ** arg_list;
uint64_t totalInsertRows; uint64_t totalInsertRows;
...@@ -259,10 +257,10 @@ typedef struct SSuperTable_S { ...@@ -259,10 +257,10 @@ typedef struct SSuperTable_S {
int64_t childTblLimit; int64_t childTblLimit;
uint64_t childTblOffset; uint64_t childTblOffset;
// int multiThreadWriteOneTbl; // 0: no, 1: yes // int multiThreadWriteOneTbl; // 0: no, 1: yes
uint32_t interlaceRows; // uint32_t interlaceRows; //
int disorderRatio; // 0: no disorder, >0: x% int disorderRatio; // 0: no disorder, >0: x%
int disorderRange; // ms or us by database precision int disorderRange; // ms, us or ns. according to database precision
uint64_t maxSqlLen; // uint64_t maxSqlLen; //
uint64_t insertInterval; // insert interval, will override global insert interval uint64_t insertInterval; // insert interval, will override global insert interval
...@@ -274,9 +272,9 @@ typedef struct SSuperTable_S { ...@@ -274,9 +272,9 @@ typedef struct SSuperTable_S {
char tagsFile[MAX_FILE_NAME_LEN]; char tagsFile[MAX_FILE_NAME_LEN];
uint32_t columnCount; uint32_t columnCount;
StrColumn columns[MAX_COLUMN_COUNT]; StrColumn columns[TSDB_MAX_COLUMNS];
uint32_t tagCount; uint32_t tagCount;
StrColumn tags[MAX_TAG_COUNT]; StrColumn tags[TSDB_MAX_TAGS];
char* childTblName; char* childTblName;
char* colsOfCreateChildTable; char* colsOfCreateChildTable;
...@@ -320,7 +318,7 @@ typedef struct { ...@@ -320,7 +318,7 @@ typedef struct {
} SDbInfo; } SDbInfo;
typedef struct SDbCfg_S { typedef struct SDbCfg_S {
// int maxtablesPerVnode; // int maxtablesPerVnode;
uint32_t minRows; // 0 means default uint32_t minRows; // 0 means default
uint32_t maxRows; // 0 means default uint32_t maxRows; // 0 means default
int comp; int comp;
...@@ -572,7 +570,7 @@ SArguments g_args = { ...@@ -572,7 +570,7 @@ SArguments g_args = {
0, // test_mode 0, // test_mode
"127.0.0.1", // host "127.0.0.1", // host
6030, // port 6030, // port
TAOSC_IFACE, // iface INTERFACE_BUT, // iface
"root", // user "root", // user
#ifdef _TD_POWER_ #ifdef _TD_POWER_
"powerdb", // password "powerdb", // password
...@@ -640,7 +638,7 @@ static FILE * g_fpOfInsertResult = NULL; ...@@ -640,7 +638,7 @@ static FILE * g_fpOfInsertResult = NULL;
#define performancePrint(fmt, ...) \ #define performancePrint(fmt, ...) \
do { if (g_args.performance_print) \ do { if (g_args.performance_print) \
fprintf(stderr, "VERB: "fmt, __VA_ARGS__); } while(0) fprintf(stderr, "PERF: "fmt, __VA_ARGS__); } while(0)
#define errorPrint(fmt, ...) \ #define errorPrint(fmt, ...) \
do { fprintf(stderr, "ERROR: "fmt, __VA_ARGS__); } while(0) do { fprintf(stderr, "ERROR: "fmt, __VA_ARGS__); } while(0)
...@@ -731,7 +729,7 @@ static void printHelp() { ...@@ -731,7 +729,7 @@ static void printHelp() {
"The number of columns per record. Default is ", "The number of columns per record. Default is ",
DEFAULT_DATATYPE_NUM, DEFAULT_DATATYPE_NUM,
". Max values is ", ". Max values is ",
MAX_NUM_DATATYPE); MAX_NUM_COLUMNS);
printf("%s%s%s%s\n", indent, indent, indent, printf("%s%s%s%s\n", indent, indent, indent,
"All of the new column(s) type is INT. If use -b to specify column type, -l will be ignored."); "All of the new column(s) type is INT. If use -b to specify column type, -l will be ignored.");
printf("%s%s%s%s\n", indent, "-T", indent, printf("%s%s%s%s\n", indent, "-T", indent,
...@@ -937,16 +935,16 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { ...@@ -937,16 +935,16 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
} }
arguments->num_of_CPR = atoi(argv[++i]); arguments->num_of_CPR = atoi(argv[++i]);
if (arguments->num_of_CPR > MAX_NUM_DATATYPE) { if (arguments->num_of_CPR > MAX_NUM_COLUMNS) {
printf("WARNING: max acceptible columns count is %d\n", MAX_NUM_DATATYPE); printf("WARNING: max acceptible columns count is %d\n", MAX_NUM_COLUMNS);
prompt(); prompt();
arguments->num_of_CPR = MAX_NUM_DATATYPE; arguments->num_of_CPR = MAX_NUM_COLUMNS;
} }
for (int col = DEFAULT_DATATYPE_NUM; col < arguments->num_of_CPR; col ++) { for (int col = DEFAULT_DATATYPE_NUM; col < arguments->num_of_CPR; col ++) {
arguments->datatype[col] = "INT"; arguments->datatype[col] = "INT";
} }
for (int col = arguments->num_of_CPR; col < MAX_NUM_DATATYPE; col++) { for (int col = arguments->num_of_CPR; col < MAX_NUM_COLUMNS; col++) {
arguments->datatype[col] = NULL; arguments->datatype[col] = NULL;
} }
...@@ -999,7 +997,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { ...@@ -999,7 +997,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
} }
arguments->datatype[index++] = token; arguments->datatype[index++] = token;
token = strsep(&running, ","); token = strsep(&running, ",");
if (index >= MAX_NUM_DATATYPE) break; if (index >= MAX_NUM_COLUMNS) break;
} }
arguments->datatype[index] = NULL; arguments->datatype[index] = NULL;
} }
...@@ -1095,7 +1093,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { ...@@ -1095,7 +1093,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
} }
int columnCount; int columnCount;
for (columnCount = 0; columnCount < MAX_NUM_DATATYPE; columnCount ++) { for (columnCount = 0; columnCount < MAX_NUM_COLUMNS; columnCount ++) {
if (g_args.datatype[columnCount] == NULL) { if (g_args.datatype[columnCount] == NULL) {
break; break;
} }
...@@ -1120,7 +1118,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { ...@@ -1120,7 +1118,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
arguments->use_metric ? "true" : "false"); arguments->use_metric ? "true" : "false");
if (*(arguments->datatype)) { if (*(arguments->datatype)) {
printf("# Specified data type: "); printf("# Specified data type: ");
for (int i = 0; i < MAX_NUM_DATATYPE; i++) for (int i = 0; i < MAX_NUM_COLUMNS; i++)
if (arguments->datatype[i]) if (arguments->datatype[i])
printf("%s,", arguments->datatype[i]); printf("%s,", arguments->datatype[i]);
else else
...@@ -1433,8 +1431,13 @@ static int printfInsertMeta() { ...@@ -1433,8 +1431,13 @@ static int printfInsertMeta() {
else else
printf("\ntaosdemo is simulating random data as you request..\n\n"); printf("\ntaosdemo is simulating random data as you request..\n\n");
if (g_args.iface != INTERFACE_BUT) {
// first time if no iface specified
printf("interface: \033[33m%s\033[0m\n", printf("interface: \033[33m%s\033[0m\n",
(g_args.iface==TAOSC_IFACE)?"taosc":(g_args.iface==REST_IFACE)?"rest":"stmt"); (g_args.iface==TAOSC_IFACE)?"taosc":
(g_args.iface==REST_IFACE)?"rest":"stmt");
}
printf("host: \033[33m%s:%u\033[0m\n", printf("host: \033[33m%s:%u\033[0m\n",
g_Dbs.host, g_Dbs.port); g_Dbs.host, g_Dbs.port);
printf("user: \033[33m%s\033[0m\n", g_Dbs.user); printf("user: \033[33m%s\033[0m\n", g_Dbs.user);
...@@ -1512,7 +1515,10 @@ static int printfInsertMeta() { ...@@ -1512,7 +1515,10 @@ static int printfInsertMeta() {
} }
if (g_Dbs.db[i].dbCfg.precision[0] != 0) { if (g_Dbs.db[i].dbCfg.precision[0] != 0) {
if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2)) if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2))
|| (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", 2))) { #if NANO_SECOND_ENABLED == 1
|| (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", 2))
#endif
|| (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ns", 2))) {
printf(" precision: \033[33m%s\033[0m\n", printf(" precision: \033[33m%s\033[0m\n",
g_Dbs.db[i].dbCfg.precision); g_Dbs.db[i].dbCfg.precision);
} else { } else {
...@@ -1702,6 +1708,9 @@ static void printfInsertMetaToFile(FILE* fp) { ...@@ -1702,6 +1708,9 @@ static void printfInsertMetaToFile(FILE* fp) {
} }
if (g_Dbs.db[i].dbCfg.precision[0] != 0) { if (g_Dbs.db[i].dbCfg.precision[0] != 0) {
if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2)) if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2))
#if NANO_SECOND_ENABLED == 1
|| (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ns", 2))
#endif
|| (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", 2))) { || (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", 2))) {
fprintf(fp, " precision: %s\n", fprintf(fp, " precision: %s\n",
g_Dbs.db[i].dbCfg.precision); g_Dbs.db[i].dbCfg.precision);
...@@ -1902,10 +1911,12 @@ static void printfQueryMeta() { ...@@ -1902,10 +1911,12 @@ static void printfQueryMeta() {
static char* formatTimestamp(char* buf, int64_t val, int precision) { static char* formatTimestamp(char* buf, int64_t val, int precision) {
time_t tt; time_t tt;
if (precision == TSDB_TIME_PRECISION_NANO) { if (precision == TSDB_TIME_PRECISION_MICRO) {
tt = (time_t)(val / 1000000000);
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
tt = (time_t)(val / 1000000); tt = (time_t)(val / 1000000);
#if NANO_SECOND_ENABLED == 1
} if (precision == TSDB_TIME_PRECISION_NANO) {
tt = (time_t)(val / 1000000000);
#endif
} else { } else {
tt = (time_t)(val / 1000); tt = (time_t)(val / 1000);
} }
...@@ -1925,10 +1936,12 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { ...@@ -1925,10 +1936,12 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
struct tm* ptm = localtime(&tt); struct tm* ptm = localtime(&tt);
size_t pos = strftime(buf, 32, "%Y-%m-%d %H:%M:%S", ptm); size_t pos = strftime(buf, 32, "%Y-%m-%d %H:%M:%S", ptm);
if (precision == TSDB_TIME_PRECISION_NANO) { if (precision == TSDB_TIME_PRECISION_MICRO) {
sprintf(buf + pos, ".%09d", (int)(val % 1000000000));
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
sprintf(buf + pos, ".%06d", (int)(val % 1000000)); sprintf(buf + pos, ".%06d", (int)(val % 1000000));
#if NANO_SECOND_ENABLED == 1
} else if (precision == TSDB_TIME_PRECISION_NANO) {
sprintf(buf + pos, ".%09d", (int)(val % 1000000000));
#endif
} else { } else {
sprintf(buf + pos, ".%03d", (int)(val % 1000)); sprintf(buf + pos, ".%03d", (int)(val % 1000));
} }
...@@ -2952,6 +2965,10 @@ static int createDatabasesAndStables() { ...@@ -2952,6 +2965,10 @@ static int createDatabasesAndStables() {
" fsync %d", g_Dbs.db[i].dbCfg.fsync); " fsync %d", g_Dbs.db[i].dbCfg.fsync);
} }
if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", strlen("ms"))) if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", strlen("ms")))
#if NANO_SECOND_ENABLED == 1
|| (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision,
"ns", strlen("ns")))
#endif
|| (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, || (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision,
"us", strlen("us")))) { "us", strlen("us")))) {
dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen,
...@@ -3182,8 +3199,10 @@ static void createChildTables() { ...@@ -3182,8 +3199,10 @@ static void createChildTables() {
if (g_Dbs.db[i].superTblCount > 0) { if (g_Dbs.db[i].superTblCount > 0) {
// with super table // with super table
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
if ((AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) if ((AUTO_CREATE_SUBTBL
|| (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) { == g_Dbs.db[i].superTbls[j].autoCreateTable)
|| (TBL_ALREADY_EXISTS
== g_Dbs.db[i].superTbls[j].childTblExists)) {
continue; continue;
} }
verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, verbosePrint("%s() LN%d: %s\n", __func__, __LINE__,
...@@ -3378,9 +3397,9 @@ static bool getColumnAndTagTypeFromInsertJsonFile( ...@@ -3378,9 +3397,9 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
} }
int columnSize = cJSON_GetArraySize(columns); int columnSize = cJSON_GetArraySize(columns);
if ((columnSize + 1/* ts */) > MAX_COLUMN_COUNT) { if ((columnSize + 1/* ts */) > TSDB_MAX_COLUMNS) {
errorPrint("%s() LN%d, failed to read json, column size overflow, max column size is %d\n", errorPrint("%s() LN%d, failed to read json, column size overflow, max column size is %d\n",
__func__, __LINE__, MAX_COLUMN_COUNT); __func__, __LINE__, TSDB_MAX_COLUMNS);
goto PARSE_OVER; goto PARSE_OVER;
} }
...@@ -3436,9 +3455,9 @@ static bool getColumnAndTagTypeFromInsertJsonFile( ...@@ -3436,9 +3455,9 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
} }
} }
if ((index + 1 /* ts */) > MAX_COLUMN_COUNT) { if ((index + 1 /* ts */) > MAX_NUM_COLUMNS) {
errorPrint("%s() LN%d, failed to read json, column size overflow, allowed max column size is %d\n", errorPrint("%s() LN%d, failed to read json, column size overflow, allowed max column size is %d\n",
__func__, __LINE__, MAX_COLUMN_COUNT); __func__, __LINE__, MAX_NUM_COLUMNS);
goto PARSE_OVER; goto PARSE_OVER;
} }
...@@ -3455,9 +3474,9 @@ static bool getColumnAndTagTypeFromInsertJsonFile( ...@@ -3455,9 +3474,9 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
} }
int tagSize = cJSON_GetArraySize(tags); int tagSize = cJSON_GetArraySize(tags);
if (tagSize > MAX_TAG_COUNT) { if (tagSize > TSDB_MAX_TAGS) {
errorPrint("%s() LN%d, failed to read json, tags size overflow, max tag size is %d\n", errorPrint("%s() LN%d, failed to read json, tags size overflow, max tag size is %d\n",
__func__, __LINE__, MAX_TAG_COUNT); __func__, __LINE__, TSDB_MAX_TAGS);
goto PARSE_OVER; goto PARSE_OVER;
} }
...@@ -3507,17 +3526,17 @@ static bool getColumnAndTagTypeFromInsertJsonFile( ...@@ -3507,17 +3526,17 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
} }
} }
if (index > MAX_TAG_COUNT) { if (index > TSDB_MAX_TAGS) {
errorPrint("%s() LN%d, failed to read json, tags size overflow, allowed max tag count is %d\n", errorPrint("%s() LN%d, failed to read json, tags size overflow, allowed max tag count is %d\n",
__func__, __LINE__, MAX_TAG_COUNT); __func__, __LINE__, TSDB_MAX_TAGS);
goto PARSE_OVER; goto PARSE_OVER;
} }
superTbls->tagCount = index; superTbls->tagCount = index;
if ((superTbls->columnCount + superTbls->tagCount + 1 /* ts */) > MAX_COLUMN_COUNT) { if ((superTbls->columnCount + superTbls->tagCount + 1 /* ts */) > TSDB_MAX_COLUMNS) {
errorPrint("%s() LN%d, columns + tags is more than allowed max columns count: %d\n", errorPrint("%s() LN%d, columns + tags is more than allowed max columns count: %d\n",
__func__, __LINE__, MAX_COLUMN_COUNT); __func__, __LINE__, TSDB_MAX_COLUMNS);
goto PARSE_OVER; goto PARSE_OVER;
} }
ret = true; ret = true;
...@@ -4137,7 +4156,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -4137,7 +4156,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
__func__, __LINE__); __func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
/* /*
cJSON *multiThreadWriteOneTbl = cJSON *multiThreadWriteOneTbl =
cJSON_GetObjectItem(stbInfo, "multi_thread_write_one_tbl"); // no , yes cJSON_GetObjectItem(stbInfo, "multi_thread_write_one_tbl"); // no , yes
if (multiThreadWriteOneTbl if (multiThreadWriteOneTbl
...@@ -4154,7 +4173,23 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -4154,7 +4173,23 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
printf("ERROR: failed to read json, multiThreadWriteOneTbl not found\n"); printf("ERROR: failed to read json, multiThreadWriteOneTbl not found\n");
goto PARSE_OVER; goto PARSE_OVER;
} }
*/ */
cJSON* insertRows = cJSON_GetObjectItem(stbInfo, "insert_rows");
if (insertRows && insertRows->type == cJSON_Number) {
if (insertRows->valueint < 0) {
errorPrint("%s() LN%d, failed to read json, insert_rows input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
g_Dbs.db[i].superTbls[j].insertRows = insertRows->valueint;
} else if (!insertRows) {
g_Dbs.db[i].superTbls[j].insertRows = 0x7FFFFFFFFFFFFFFF;
} else {
errorPrint("%s() LN%d, failed to read json, insert_rows input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
cJSON* stbInterlaceRows = cJSON_GetObjectItem(stbInfo, "interlace_rows"); cJSON* stbInterlaceRows = cJSON_GetObjectItem(stbInfo, "interlace_rows");
if (stbInterlaceRows && stbInterlaceRows->type == cJSON_Number) { if (stbInterlaceRows && stbInterlaceRows->type == cJSON_Number) {
if (stbInterlaceRows->valueint < 0) { if (stbInterlaceRows->valueint < 0) {
...@@ -4163,15 +4198,15 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -4163,15 +4198,15 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER; goto PARSE_OVER;
} }
g_Dbs.db[i].superTbls[j].interlaceRows = stbInterlaceRows->valueint; g_Dbs.db[i].superTbls[j].interlaceRows = stbInterlaceRows->valueint;
// rows per table need be less than insert batch
if (g_Dbs.db[i].superTbls[j].interlaceRows > g_args.num_of_RPR) { if (g_Dbs.db[i].superTbls[j].interlaceRows > g_Dbs.db[i].superTbls[j].insertRows) {
printf("NOTICE: db[%d].superTbl[%d]'s interlace rows value %u > num_of_records_per_req %u\n\n", printf("NOTICE: db[%d].superTbl[%d]'s interlace rows value %u > insert_rows %"PRId64"\n\n",
i, j, g_Dbs.db[i].superTbls[j].interlaceRows, i, j, g_Dbs.db[i].superTbls[j].interlaceRows,
g_args.num_of_RPR); g_Dbs.db[i].superTbls[j].insertRows);
printf(" interlace rows value will be set to num_of_records_per_req %u\n\n", printf(" interlace rows value will be set to insert_rows %"PRId64"\n\n",
g_args.num_of_RPR); g_Dbs.db[i].superTbls[j].insertRows);
prompt(); prompt();
g_Dbs.db[i].superTbls[j].interlaceRows = g_args.num_of_RPR; g_Dbs.db[i].superTbls[j].interlaceRows = g_Dbs.db[i].superTbls[j].insertRows;
} }
} else if (!stbInterlaceRows) { } else if (!stbInterlaceRows) {
g_Dbs.db[i].superTbls[j].interlaceRows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req g_Dbs.db[i].superTbls[j].interlaceRows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req
...@@ -4208,22 +4243,6 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -4208,22 +4243,6 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER; goto PARSE_OVER;
} }
cJSON* insertRows = cJSON_GetObjectItem(stbInfo, "insert_rows");
if (insertRows && insertRows->type == cJSON_Number) {
if (insertRows->valueint < 0) {
errorPrint("%s() LN%d, failed to read json, insert_rows input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
g_Dbs.db[i].superTbls[j].insertRows = insertRows->valueint;
} else if (!insertRows) {
g_Dbs.db[i].superTbls[j].insertRows = 0x7FFFFFFFFFFFFFFF;
} else {
errorPrint("%s() LN%d, failed to read json, insert_rows input mistake\n",
__func__, __LINE__);
goto PARSE_OVER;
}
cJSON* insertInterval = cJSON_GetObjectItem(stbInfo, "insert_interval"); cJSON* insertInterval = cJSON_GetObjectItem(stbInfo, "insert_interval");
if (insertInterval && insertInterval->type == cJSON_Number) { if (insertInterval && insertInterval->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].insertInterval = insertInterval->valueint; g_Dbs.db[i].superTbls[j].insertInterval = insertInterval->valueint;
...@@ -4980,12 +4999,22 @@ static int64_t generateData(char *recBuf, char **data_type, ...@@ -4980,12 +4999,22 @@ static int64_t generateData(char *recBuf, char **data_type,
bool b = rand_bool() & 1; bool b = rand_bool() & 1;
pstr += sprintf(pstr, ",%s", b ? "true" : "false"); pstr += sprintf(pstr, ",%s", b ? "true" : "false");
} else if (strcasecmp(data_type[i % columnCount], "BINARY") == 0) { } else if (strcasecmp(data_type[i % columnCount], "BINARY") == 0) {
char *s = malloc(lenOfBinary); char *s = malloc(lenOfBinary + 1);
if (s == NULL) {
errorPrint("%s() LN%d, memory allocation %d bytes failed\n",
__func__, __LINE__, lenOfBinary + 1);
exit(-1);
}
rand_string(s, lenOfBinary); rand_string(s, lenOfBinary);
pstr += sprintf(pstr, ",\"%s\"", s); pstr += sprintf(pstr, ",\"%s\"", s);
free(s); free(s);
} else if (strcasecmp(data_type[i % columnCount], "NCHAR") == 0) { } else if (strcasecmp(data_type[i % columnCount], "NCHAR") == 0) {
char *s = malloc(lenOfBinary); char *s = malloc(lenOfBinary + 1);
if (s == NULL) {
errorPrint("%s() LN%d, memory allocation %d bytes failed\n",
__func__, __LINE__, lenOfBinary + 1);
exit(-1);
}
rand_string(s, lenOfBinary); rand_string(s, lenOfBinary);
pstr += sprintf(pstr, ",\"%s\"", s); pstr += sprintf(pstr, ",\"%s\"", s);
free(s); free(s);
...@@ -5042,13 +5071,17 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) ...@@ -5042,13 +5071,17 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k)
uint16_t iface; uint16_t iface;
if (superTblInfo) if (superTblInfo)
iface = superTblInfo->iface; iface = superTblInfo->iface;
else {
if (g_args.iface == INTERFACE_BUT)
iface = TAOSC_IFACE;
else else
iface = g_args.iface; iface = g_args.iface;
}
debugPrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID, debugPrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID,
__func__, __LINE__, __func__, __LINE__,
(g_args.iface==TAOSC_IFACE)? (iface==TAOSC_IFACE)?
"taosc":(g_args.iface==REST_IFACE)?"rest":"stmt"); "taosc":(iface==REST_IFACE)?"rest":"stmt");
switch(iface) { switch(iface) {
case TAOSC_IFACE: case TAOSC_IFACE:
...@@ -5138,12 +5171,18 @@ static int32_t generateDataTailWithoutStb( ...@@ -5138,12 +5171,18 @@ static int32_t generateDataTailWithoutStb(
char **data_type = g_args.datatype; char **data_type = g_args.datatype;
int lenOfBinary = g_args.len_of_binary; int lenOfBinary = g_args.len_of_binary;
if (g_args.disorderRatio) {
retLen = generateData(data, data_type, retLen = generateData(data, data_type,
startTime + getTSRandTail( startTime + getTSRandTail(
(int64_t) DEFAULT_TIMESTAMP_STEP, k, (int64_t) DEFAULT_TIMESTAMP_STEP, k,
g_args.disorderRatio, g_args.disorderRatio,
g_args.disorderRange), g_args.disorderRange),
lenOfBinary); lenOfBinary);
} else {
retLen = generateData(data, data_type,
startTime + (int64_t) (DEFAULT_TIMESTAMP_STEP* k),
lenOfBinary);
}
if (len > remainderBufLen) if (len > remainderBufLen)
break; break;
...@@ -5202,7 +5241,7 @@ static int32_t generateStbDataTail( ...@@ -5202,7 +5241,7 @@ static int32_t generateStbDataTail(
verbosePrint("%s() LN%d batch=%u buflen=%"PRId64"\n", verbosePrint("%s() LN%d batch=%u buflen=%"PRId64"\n",
__func__, __LINE__, batch, remainderBufLen); __func__, __LINE__, batch, remainderBufLen);
int32_t k = 0; int32_t k;
for (k = 0; k < batch;) { for (k = 0; k < batch;) {
char data[MAX_DATA_SIZE]; char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE); memset(data, 0, MAX_DATA_SIZE);
...@@ -5651,10 +5690,15 @@ static int32_t prepareStmtWithoutStb( ...@@ -5651,10 +5690,15 @@ static int32_t prepareStmtWithoutStb(
bind_ts = (int64_t *)ptr; bind_ts = (int64_t *)ptr;
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP; bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
if (g_args.disorderRatio) {
*bind_ts = startTime + getTSRandTail( *bind_ts = startTime + getTSRandTail(
(int64_t)DEFAULT_TIMESTAMP_STEP, k, (int64_t)DEFAULT_TIMESTAMP_STEP, k,
g_args.disorderRatio, g_args.disorderRatio,
g_args.disorderRange); g_args.disorderRange);
} else {
*bind_ts = startTime + (int64_t)(DEFAULT_TIMESTAMP_STEP * k);
}
bind->buffer_length = sizeof(int64_t); bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_ts; bind->buffer = bind_ts;
bind->length = &bind->buffer_length; bind->length = &bind->buffer_length;
...@@ -5739,7 +5783,7 @@ static int32_t prepareStbStmt( ...@@ -5739,7 +5783,7 @@ static int32_t prepareStbStmt(
bind_ts = (int64_t *)ptr; bind_ts = (int64_t *)ptr;
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP; bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
if (sourceRand) { if (stbInfo->disorderRatio) {
*bind_ts = startTime + getTSRandTail( *bind_ts = startTime + getTSRandTail(
stbInfo->timeStampStep, k, stbInfo->timeStampStep, k,
stbInfo->disorderRatio, stbInfo->disorderRatio,
...@@ -5806,6 +5850,7 @@ static int32_t prepareStbStmt( ...@@ -5806,6 +5850,7 @@ static int32_t prepareStbStmt(
if (!sourceRand) { if (!sourceRand) {
(*pSamplePos) ++; (*pSamplePos) ++;
} }
if (recordFrom >= insertRows) { if (recordFrom >= insertRows) {
break; break;
} }
...@@ -5815,6 +5860,43 @@ static int32_t prepareStbStmt( ...@@ -5815,6 +5860,43 @@ static int32_t prepareStbStmt(
free(bindArray); free(bindArray);
return k; return k;
} }
static int32_t prepareStbStmtInterlace(
SSuperTable *stbInfo,
TAOS_STMT *stmt,
char *tableName, uint32_t batch,
uint64_t insertRows,
uint64_t recordFrom,
int64_t startTime,
int64_t *pSamplePos)
{
return prepareStbStmt(
stbInfo,
stmt,
tableName,
g_args.num_of_RPR,
insertRows, 0, startTime,
pSamplePos);
}
static int32_t prepareStbStmtProgressive(
SSuperTable *stbInfo,
TAOS_STMT *stmt,
char *tableName, uint32_t batch,
uint64_t insertRows,
uint64_t recordFrom,
int64_t startTime,
int64_t *pSamplePos)
{
return prepareStbStmt(
stbInfo,
stmt,
tableName,
g_args.num_of_RPR,
insertRows, recordFrom, startTime,
pSamplePos);
}
#endif #endif
static int32_t generateStbProgressiveData( static int32_t generateStbProgressiveData(
...@@ -5888,7 +5970,9 @@ static void printStatPerThread(threadInfo *pThreadInfo) ...@@ -5888,7 +5970,9 @@ static void printStatPerThread(threadInfo *pThreadInfo)
pThreadInfo->threadID, pThreadInfo->threadID,
pThreadInfo->totalInsertRows, pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows, pThreadInfo->totalAffectedRows,
(double)(pThreadInfo->totalAffectedRows / (pThreadInfo->totalDelay/1000.0))); (pThreadInfo->totalDelay/1000.0)?
(double)(pThreadInfo->totalAffectedRows/(pThreadInfo->totalDelay/1000.0)):
FLT_MAX);
} }
// sync write interlace data // sync write interlace data
...@@ -5987,7 +6071,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5987,7 +6071,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
uint32_t recOfBatch = 0; uint32_t recOfBatch = 0;
for (uint32_t i = 0; i < batchPerTblTimes; i ++) { for (uint64_t i = 0; i < batchPerTblTimes; i ++) {
char tableName[TSDB_TABLE_NAME_LEN]; char tableName[TSDB_TABLE_NAME_LEN];
getTableName(tableName, pThreadInfo, tableSeq); getTableName(tableName, pThreadInfo, tableSeq);
...@@ -6004,7 +6088,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -6004,7 +6088,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
if (superTblInfo) { if (superTblInfo) {
if (superTblInfo->iface == STMT_IFACE) { if (superTblInfo->iface == STMT_IFACE) {
#if STMT_IFACE_ENABLED == 1 #if STMT_IFACE_ENABLED == 1
generated = prepareStbStmt( generated = prepareStbStmtInterlace(
superTblInfo, superTblInfo,
pThreadInfo->stmt, pThreadInfo->stmt,
tableName, tableName,
...@@ -6233,7 +6317,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -6233,7 +6317,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
if (superTblInfo) { if (superTblInfo) {
if (superTblInfo->iface == STMT_IFACE) { if (superTblInfo->iface == STMT_IFACE) {
#if STMT_IFACE_ENABLED == 1 #if STMT_IFACE_ENABLED == 1
generated = prepareStbStmt( generated = prepareStbStmtProgressive(
superTblInfo, superTblInfo,
pThreadInfo->stmt, pThreadInfo->stmt,
tableName, tableName,
...@@ -6467,7 +6551,7 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in * ...@@ -6467,7 +6551,7 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in *
} }
static void startMultiThreadInsertData(int threads, char* db_name, static void startMultiThreadInsertData(int threads, char* db_name,
char* precision,SSuperTable* superTblInfo) { char* precision, SSuperTable* superTblInfo) {
int32_t timePrec = TSDB_TIME_PRECISION_MILLI; int32_t timePrec = TSDB_TIME_PRECISION_MILLI;
if (0 != precision[0]) { if (0 != precision[0]) {
...@@ -6475,6 +6559,10 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -6475,6 +6559,10 @@ static void startMultiThreadInsertData(int threads, char* db_name,
timePrec = TSDB_TIME_PRECISION_MILLI; timePrec = TSDB_TIME_PRECISION_MILLI;
} else if (0 == strncasecmp(precision, "us", 2)) { } else if (0 == strncasecmp(precision, "us", 2)) {
timePrec = TSDB_TIME_PRECISION_MICRO; timePrec = TSDB_TIME_PRECISION_MICRO;
#if NANO_SECOND_ENABLED == 1
} else if (0 == strncasecmp(precision, "ns", 2)) {
timePrec = TSDB_TIME_PRECISION_NANO;
#endif
} else { } else {
errorPrint("Not support precision: %s\n", precision); errorPrint("Not support precision: %s\n", precision);
exit(-1); exit(-1);
...@@ -6661,13 +6749,26 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -6661,13 +6749,26 @@ static void startMultiThreadInsertData(int threads, char* db_name,
char buffer[3000]; char buffer[3000];
char *pstr = buffer; char *pstr = buffer;
pstr += sprintf(pstr, "INSERT INTO ? values(?");
if ((superTblInfo)
&& (AUTO_CREATE_SUBTBL
== superTblInfo->autoCreateTable)) {
pstr += sprintf(pstr, "INSERT INTO ? USING %s TAGS(?",
superTblInfo->sTblName);
for (int tag = 0; tag < (superTblInfo->tagCount - 1); tag ++ ) {
pstr += sprintf(pstr, ",?");
}
pstr += sprintf(pstr, ") VALUES(?");
} else {
pstr += sprintf(pstr, "INSERT INTO ? VALUES(?");
}
for (int col = 0; col < columnCount; col ++) { for (int col = 0; col < columnCount; col ++) {
pstr += sprintf(pstr, ",?"); pstr += sprintf(pstr, ",?");
} }
pstr += sprintf(pstr, ")"); pstr += sprintf(pstr, ")");
debugPrint("%s() LN%d, buffer: %s", __func__, __LINE__, buffer);
int ret = taos_stmt_prepare(pThreadInfo->stmt, buffer, 0); int ret = taos_stmt_prepare(pThreadInfo->stmt, buffer, 0);
if (ret != 0){ if (ret != 0){
errorPrint("failed to execute taos_stmt_prepare. return 0x%x. reason: %s\n", errorPrint("failed to execute taos_stmt_prepare. return 0x%x. reason: %s\n",
...@@ -6682,19 +6783,19 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -6682,19 +6783,19 @@ static void startMultiThreadInsertData(int threads, char* db_name,
pThreadInfo->taos = NULL; pThreadInfo->taos = NULL;
} }
/* if ((NULL == superTblInfo) /* if ((NULL == superTblInfo)
|| (0 == superTblInfo->multiThreadWriteOneTbl)) { || (0 == superTblInfo->multiThreadWriteOneTbl)) {
*/ */
pThreadInfo->start_table_from = tableFrom; pThreadInfo->start_table_from = tableFrom;
pThreadInfo->ntables = i<b?a+1:a; pThreadInfo->ntables = i<b?a+1:a;
pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1; pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1;
tableFrom = pThreadInfo->end_table_to + 1; tableFrom = pThreadInfo->end_table_to + 1;
/* } else { /* } else {
pThreadInfo->start_table_from = 0; pThreadInfo->start_table_from = 0;
pThreadInfo->ntables = superTblInfo->childTblCount; pThreadInfo->ntables = superTblInfo->childTblCount;
pThreadInfo->start_time = pThreadInfo->start_time + rand_int() % 10000 - rand_tinyint(); pThreadInfo->start_time = pThreadInfo->start_time + rand_int() % 10000 - rand_tinyint();
} }
*/ */
tsem_init(&(pThreadInfo->lock_sem), 0, 0); tsem_init(&(pThreadInfo->lock_sem), 0, 0);
if (ASYNC_MODE == g_Dbs.asyncMode) { if (ASYNC_MODE == g_Dbs.asyncMode) {
pthread_create(pids + i, NULL, asyncWrite, pThreadInfo); pthread_create(pids + i, NULL, asyncWrite, pThreadInfo);
...@@ -6751,34 +6852,40 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -6751,34 +6852,40 @@ static void startMultiThreadInsertData(int threads, char* db_name,
int64_t end = taosGetTimestampMs(); int64_t end = taosGetTimestampMs();
int64_t t = end - start; int64_t t = end - start;
double tInMs = t/1000.0;
if (superTblInfo) { if (superTblInfo) {
fprintf(stderr, "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n", fprintf(stderr, "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n",
t / 1000.0, superTblInfo->totalInsertRows, tInMs, superTblInfo->totalInsertRows,
superTblInfo->totalAffectedRows, superTblInfo->totalAffectedRows,
threads, db_name, superTblInfo->sTblName, threads, db_name, superTblInfo->sTblName,
(double)superTblInfo->totalInsertRows / (t / 1000.0)); (tInMs)?
(double)(superTblInfo->totalInsertRows/tInMs):FLT_MAX);
if (g_fpOfInsertResult) { if (g_fpOfInsertResult) {
fprintf(g_fpOfInsertResult, fprintf(g_fpOfInsertResult,
"Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n", "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n",
t / 1000.0, superTblInfo->totalInsertRows, tInMs, superTblInfo->totalInsertRows,
superTblInfo->totalAffectedRows, superTblInfo->totalAffectedRows,
threads, db_name, superTblInfo->sTblName, threads, db_name, superTblInfo->sTblName,
(double)superTblInfo->totalInsertRows / (t / 1000.0)); (tInMs)?
(double)(superTblInfo->totalInsertRows/tInMs):FLT_MAX);
} }
} else { } else {
fprintf(stderr, "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s %.2f records/second\n\n", fprintf(stderr, "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s %.2f records/second\n\n",
t / 1000.0, g_args.totalInsertRows, tInMs, g_args.totalInsertRows,
g_args.totalAffectedRows, g_args.totalAffectedRows,
threads, db_name, threads, db_name,
(double)g_args.totalInsertRows / (t / 1000.0)); (tInMs)?
(double)(g_args.totalInsertRows/tInMs):FLT_MAX);
if (g_fpOfInsertResult) { if (g_fpOfInsertResult) {
fprintf(g_fpOfInsertResult, fprintf(g_fpOfInsertResult,
"Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s %.2f records/second\n\n", "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s %.2f records/second\n\n",
t * 1000.0, g_args.totalInsertRows, tInMs, g_args.totalInsertRows,
g_args.totalAffectedRows, g_args.totalAffectedRows,
threads, db_name, threads, db_name,
(double)g_args.totalInsertRows / (t / 1000.0)); (tInMs)?
(double)(g_args.totalInsertRows/tInMs):FLT_MAX);
} }
} }
...@@ -7938,7 +8045,12 @@ static void setParaFromArg(){ ...@@ -7938,7 +8045,12 @@ static void setParaFromArg(){
tstrncpy(g_Dbs.db[0].superTbls[0].childTblPrefix, tstrncpy(g_Dbs.db[0].superTbls[0].childTblPrefix,
g_args.tb_prefix, TSDB_TABLE_NAME_LEN - 20); g_args.tb_prefix, TSDB_TABLE_NAME_LEN - 20);
tstrncpy(g_Dbs.db[0].superTbls[0].dataSource, "rand", MAX_TB_NAME_SIZE); tstrncpy(g_Dbs.db[0].superTbls[0].dataSource, "rand", MAX_TB_NAME_SIZE);
if (g_args.iface == INTERFACE_BUT) {
g_Dbs.db[0].superTbls[0].iface = TAOSC_IFACE;
} else {
g_Dbs.db[0].superTbls[0].iface = g_args.iface; g_Dbs.db[0].superTbls[0].iface = g_args.iface;
}
tstrncpy(g_Dbs.db[0].superTbls[0].startTimestamp, tstrncpy(g_Dbs.db[0].superTbls[0].startTimestamp,
"2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE); "2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE);
g_Dbs.db[0].superTbls[0].timeStampStep = DEFAULT_TIMESTAMP_STEP; g_Dbs.db[0].superTbls[0].timeStampStep = DEFAULT_TIMESTAMP_STEP;
...@@ -7947,7 +8059,7 @@ static void setParaFromArg(){ ...@@ -7947,7 +8059,7 @@ static void setParaFromArg(){
g_Dbs.db[0].superTbls[0].maxSqlLen = g_args.max_sql_len; g_Dbs.db[0].superTbls[0].maxSqlLen = g_args.max_sql_len;
g_Dbs.db[0].superTbls[0].columnCount = 0; g_Dbs.db[0].superTbls[0].columnCount = 0;
for (int i = 0; i < MAX_NUM_DATATYPE; i++) { for (int i = 0; i < MAX_NUM_COLUMNS; i++) {
if (data_type[i] == NULL) { if (data_type[i] == NULL) {
break; break;
} }
......
...@@ -267,6 +267,10 @@ static void writeDataToDisk(STSBuf* pTSBuf) { ...@@ -267,6 +267,10 @@ static void writeDataToDisk(STSBuf* pTSBuf) {
if (pBlock->tag.nType == TSDB_DATA_TYPE_BINARY || pBlock->tag.nType == TSDB_DATA_TYPE_NCHAR) { if (pBlock->tag.nType == TSDB_DATA_TYPE_BINARY || pBlock->tag.nType == TSDB_DATA_TYPE_NCHAR) {
metaLen += (int32_t)fwrite(&pBlock->tag.nLen, 1, sizeof(pBlock->tag.nLen), pTSBuf->f); metaLen += (int32_t)fwrite(&pBlock->tag.nLen, 1, sizeof(pBlock->tag.nLen), pTSBuf->f);
metaLen += (int32_t)fwrite(pBlock->tag.pz, 1, (size_t)pBlock->tag.nLen, pTSBuf->f); metaLen += (int32_t)fwrite(pBlock->tag.pz, 1, (size_t)pBlock->tag.nLen, pTSBuf->f);
} else if (pBlock->tag.nType == TSDB_DATA_TYPE_FLOAT) {
metaLen += (int32_t)fwrite(&pBlock->tag.nLen, 1, sizeof(pBlock->tag.nLen), pTSBuf->f);
float tfloat = (float)pBlock->tag.dKey;
metaLen += (int32_t)fwrite(&tfloat, 1, (size_t) pBlock->tag.nLen, pTSBuf->f);
} else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) { } else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) {
metaLen += (int32_t)fwrite(&pBlock->tag.nLen, 1, sizeof(pBlock->tag.nLen), pTSBuf->f); metaLen += (int32_t)fwrite(&pBlock->tag.nLen, 1, sizeof(pBlock->tag.nLen), pTSBuf->f);
metaLen += (int32_t)fwrite(&pBlock->tag.i64, 1, (size_t) pBlock->tag.nLen, pTSBuf->f); metaLen += (int32_t)fwrite(&pBlock->tag.i64, 1, (size_t) pBlock->tag.nLen, pTSBuf->f);
...@@ -351,6 +355,11 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) { ...@@ -351,6 +355,11 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
sz = fread(pBlock->tag.pz, (size_t)pBlock->tag.nLen, 1, pTSBuf->f); sz = fread(pBlock->tag.pz, (size_t)pBlock->tag.nLen, 1, pTSBuf->f);
UNUSED(sz); UNUSED(sz);
} else if (pBlock->tag.nType == TSDB_DATA_TYPE_FLOAT) {
float tfloat = 0;
sz = fread(&tfloat, (size_t) pBlock->tag.nLen, 1, pTSBuf->f);
pBlock->tag.dKey = (double)tfloat;
UNUSED(sz);
} else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) { //TODO check the return value } else if (pBlock->tag.nType != TSDB_DATA_TYPE_NULL) { //TODO check the return value
sz = fread(&pBlock->tag.i64, (size_t) pBlock->tag.nLen, 1, pTSBuf->f); sz = fread(&pBlock->tag.i64, (size_t) pBlock->tag.nLen, 1, pTSBuf->f);
UNUSED(sz); UNUSED(sz);
......
...@@ -3361,6 +3361,10 @@ static void* doFreeColumnInfoData(SArray* pColumnInfoData) { ...@@ -3361,6 +3361,10 @@ static void* doFreeColumnInfoData(SArray* pColumnInfoData) {
} }
static void* destroyTableCheckInfo(SArray* pTableCheckInfo) { static void* destroyTableCheckInfo(SArray* pTableCheckInfo) {
if (pTableCheckInfo == NULL) {
return NULL;
}
size_t size = taosArrayGetSize(pTableCheckInfo); size_t size = taosArrayGetSize(pTableCheckInfo);
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
STableCheckInfo* p = taosArrayGet(pTableCheckInfo, i); STableCheckInfo* p = taosArrayGet(pTableCheckInfo, i);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册