未验证 提交 40910f6b 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

Feature/sangshuduo/td 4068 taosdemo stmt (#6237)

* merge with develop branch.

change query/tests/CMakeLists.txt to allow unused function and variable.

* refactor data generating.

* refactor.

* refactor.

* refactor.

* refactor.

* refactor

* add prepare stmt function.

* refactor get rand timestamp.

* fix windows compile error.

* copy logic of generate data for stmt.

* insert data basically works now.

* fix windows compile issue.
Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 cadf32d3
......@@ -929,6 +929,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
&& strcasecmp(argv[i], "BIGINT")
&& strcasecmp(argv[i], "DOUBLE")
&& strcasecmp(argv[i], "BINARY")
&& strcasecmp(argv[i], "TIMESTAMP")
&& strcasecmp(argv[i], "NCHAR")) {
printHelp();
errorPrint("%s", "-b: Invalid data_type!\n");
......@@ -950,6 +951,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
&& strcasecmp(token, "BIGINT")
&& strcasecmp(token, "DOUBLE")
&& strcasecmp(token, "BINARY")
&& strcasecmp(token, "TIMESTAMP")
&& strcasecmp(token, "NCHAR")) {
printHelp();
free(g_dupstr);
......@@ -2970,7 +2972,7 @@ static void* createTable(void *sarg)
}
static int startMultiThreadCreateChildTable(
char* cols, int threads, uint64_t startFrom, int64_t ntables,
char* cols, int threads, uint64_t tableFrom, int64_t ntables,
char* db_name, SSuperTable* superTblInfo) {
pthread_t *pids = malloc(threads * sizeof(pthread_t));
......@@ -3014,10 +3016,10 @@ static int startMultiThreadCreateChildTable(
return -1;
}
pThreadInfo->start_table_from = startFrom;
pThreadInfo->start_table_from = tableFrom;
pThreadInfo->ntables = i<b?a+1:a;
pThreadInfo->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
startFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1;
tableFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->use_metric = true;
pThreadInfo->cols = cols;
pThreadInfo->minDelay = UINT64_MAX;
......@@ -3055,15 +3057,15 @@ static void createChildTables() {
verbosePrint("%s() LN%d: %s\n", __func__, __LINE__,
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable);
uint64_t startFrom = 0;
uint64_t tableFrom = 0;
g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount;
verbosePrint("%s() LN%d: create %"PRId64" child tables from %"PRIu64"\n",
__func__, __LINE__, g_totalChildTables, startFrom);
__func__, __LINE__, g_totalChildTables, tableFrom);
startMultiThreadCreateChildTable(
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable,
g_Dbs.threadCountByCreateTbl,
startFrom,
tableFrom,
g_Dbs.db[i].superTbls[j].childTblCount,
g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j]));
}
......@@ -4691,8 +4693,8 @@ static int getRowDataFromSample(
static int64_t generateStbRowData(
SSuperTable* stbInfo,
char* recBuf, int64_t timestamp
) {
char* recBuf, int64_t timestamp)
{
int64_t dataLen = 0;
char *pstr = recBuf;
int64_t maxLen = MAX_DATA_SIZE;
......@@ -4720,23 +4722,23 @@ static int64_t generateStbRowData(
dataLen += snprintf(pstr + dataLen, maxLen - dataLen, "\'%s\',", buf);
tmfree(buf);
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"INT", 3)) {
"INT", strlen("INT"))) {
dataLen += snprintf(pstr + dataLen, maxLen - dataLen,
"%d,", rand_int());
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"BIGINT", 6)) {
"BIGINT", strlen("BIGINT"))) {
dataLen += snprintf(pstr + dataLen, maxLen - dataLen,
"%"PRId64",", rand_bigint());
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"FLOAT", 5)) {
"FLOAT", strlen("FLOAT"))) {
dataLen += snprintf(pstr + dataLen, maxLen - dataLen,
"%f,", rand_float());
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"DOUBLE", 6)) {
"DOUBLE", strlen("DOUBLE"))) {
dataLen += snprintf(pstr + dataLen, maxLen - dataLen,
"%f,", rand_double());
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"SMALLINT", 8)) {
"SMALLINT", strlen("SMALLINT"))) {
dataLen += snprintf(pstr + dataLen, maxLen - dataLen,
"%d,", rand_smallint());
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
......@@ -4792,6 +4794,8 @@ static int64_t generateData(char *recBuf, char **data_type,
pstr += sprintf(pstr, ",%d", rand_int());
} else if (strcasecmp(data_type[i % c], "BIGINT") == 0) {
pstr += sprintf(pstr, ",%" PRId64, rand_bigint());
} else if (strcasecmp(data_type[i % c], "TIMESTAMP") == 0) {
pstr += sprintf(pstr, ",%" PRId64, rand_bigint());
} else if (strcasecmp(data_type[i % c], "FLOAT") == 0) {
pstr += sprintf(pstr, ",%10.4f", rand_float());
} else if (strcasecmp(data_type[i % c], "DOUBLE") == 0) {
......@@ -4922,7 +4926,7 @@ static void getTableName(char *pTblName,
static int64_t generateDataTailWithoutStb(
uint32_t batch, char* buffer,
int64_t remainderBufLen, int64_t insertRows,
uint64_t startFrom, int64_t startTime,
uint64_t recordFrom, int64_t startTime,
/* int64_t *pSamplePos, */int64_t *dataLen) {
uint64_t len = 0;
......@@ -4958,9 +4962,9 @@ static int64_t generateDataTailWithoutStb(
verbosePrint("%s() LN%d len=%"PRIu64" k=%"PRIu64" \nbuffer=%s\n",
__func__, __LINE__, len, k, buffer);
startFrom ++;
recordFrom ++;
if (startFrom >= insertRows) {
if (recordFrom >= insertRows) {
break;
}
}
......@@ -4989,7 +4993,7 @@ static int32_t generateStbDataTail(
SSuperTable* superTblInfo,
uint32_t batch, char* buffer,
int64_t remainderBufLen, int64_t insertRows,
uint64_t startFrom, int64_t startTime,
uint64_t recordFrom, int64_t startTime,
int64_t *pSamplePos, int64_t *dataLen) {
uint64_t len = 0;
......@@ -5038,9 +5042,9 @@ static int32_t generateStbDataTail(
verbosePrint("%s() LN%d len=%"PRIu64" k=%ud \nbuffer=%s\n",
__func__, __LINE__, len, k, buffer);
startFrom ++;
recordFrom ++;
if (startFrom >= insertRows) {
if (recordFrom >= insertRows) {
break;
}
}
......@@ -5237,6 +5241,7 @@ static int64_t generateInterlaceDataWithoutStb(
static int32_t prepareStbStmt(SSuperTable *stbInfo,
TAOS_STMT *stmt,
char *tableName, uint32_t batch, uint64_t insertRows,
uint64_t recordFrom,
int64_t startTime, char *buffer)
{
uint32_t k;
......@@ -5256,7 +5261,7 @@ static int32_t prepareStbStmt(SSuperTable *stbInfo,
return ret;
}
void *bindArray = malloc(sizeof(TAOS_BIND) * (stbInfo->columnCount + 1));
char *bindArray = malloc(sizeof(TAOS_BIND) * (stbInfo->columnCount + 1));
if (bindArray == NULL) {
errorPrint("Failed to allocate %d bind params\n", batch);
return -1;
......@@ -5268,32 +5273,176 @@ static int32_t prepareStbStmt(SSuperTable *stbInfo,
} else {
tsRand = false;
}
for (k = 0; k < batch; k++) {
for (k = 0; k < batch;) {
/* columnCount + 1 (ts) */
for (int i = 0; i <= stbInfo->columnCount; i ++) {
TAOS_BIND *bind = (TAOS_BIND *)bindArray + (sizeof(TAOS_BIND) * i);
if (i == 0) {
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
int64_t ts;
if (tsRand) {
ts = startTime + getTSRandTail(
stbInfo->timeStampStep, k,
stbInfo->disorderRatio,
stbInfo->disorderRange);
} else {
ts = startTime + stbInfo->timeStampStep * k;
}
bind->buffer = &ts;
} else {
char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE);
char *ptr = data;
TAOS_BIND *bind = (TAOS_BIND *)(bindArray + 0);
int64_t *bind_ts;
bind_ts = (int64_t *)ptr;
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
if (tsRand) {
*bind_ts = startTime + getTSRandTail(
stbInfo->timeStampStep, k,
stbInfo->disorderRatio,
stbInfo->disorderRange);
} else {
*bind_ts = startTime + stbInfo->timeStampStep * k;
}
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_ts;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
ptr += bind->buffer_length;
for (int i = 0; i < stbInfo->columnCount; i ++) {
bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * (i + 1)));
if (0 == strncasecmp(stbInfo->columns[i].dataType,
"BINARY", strlen("BINARY"))) {
if (stbInfo->columns[i].dataLen > TSDB_MAX_BINARY_LEN) {
errorPrint( "binary length overflow, max size:%u\n",
(uint32_t)TSDB_MAX_BINARY_LEN);
return -1;
}
char *bind_binary = (char *)ptr;
rand_string(bind_binary, stbInfo->columns[i].dataLen);
bind->buffer_type = TSDB_DATA_TYPE_BINARY;
bind->buffer_length = stbInfo->columns[i].dataLen;
bind->buffer = bind_binary;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
ptr += bind->buffer_length;
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"NCHAR", strlen("NCHAR"))) {
if (stbInfo->columns[i].dataLen > TSDB_MAX_BINARY_LEN) {
errorPrint( "nchar length overflow, max size:%u\n",
(uint32_t)TSDB_MAX_BINARY_LEN);
return -1;
}
char *bind_nchar = (char *)ptr;
rand_string(bind_nchar, stbInfo->columns[i].dataLen);
bind->buffer_type = TSDB_DATA_TYPE_NCHAR;
bind->buffer_length = strlen(bind_nchar);
bind->buffer = bind_nchar;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
ptr += bind->buffer_length;
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"INT", strlen("INT"))) {
int32_t *bind_int = (int32_t *)ptr;
*bind_int = rand_int();
bind->buffer_type = TSDB_DATA_TYPE_INT;
bind->buffer_length = sizeof(int32_t);
bind->buffer = bind_int;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
ptr += bind->buffer_length;
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"BIGINT", strlen("BIGINT"))) {
int64_t *bind_bigint = (int64_t *)ptr;
*bind_bigint = rand_bigint();
bind->buffer_type = TSDB_DATA_TYPE_BIGINT;
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_bigint;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
ptr += bind->buffer_length;
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"FLOAT", strlen("FLOAT"))) {
float *bind_float = (float *)ptr;
*bind_float = rand_float();
bind->buffer_type = TSDB_DATA_TYPE_FLOAT;
bind->buffer_length = sizeof(float);
bind->buffer = bind_float;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
ptr += bind->buffer_length;
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"DOUBLE", strlen("DOUBLE"))) {
double *bind_double = (double *)ptr;
*bind_double = rand_double();
bind->buffer_type = TSDB_DATA_TYPE_DOUBLE;
bind->buffer_length = sizeof(double);
bind->buffer = bind_double;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
ptr += bind->buffer_length;
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"SMALLINT", strlen("SMALLINT"))) {
int16_t *bind_smallint = (int16_t *)ptr;
*bind_smallint = rand_smallint();
bind->buffer_type = TSDB_DATA_TYPE_SMALLINT;
bind->buffer_length = sizeof(int16_t);
bind->buffer = bind_smallint;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
ptr += bind->buffer_length;
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"TINYINT", strlen("TINYINT"))) {
int8_t *bind_tinyint = (int8_t *)ptr;
*bind_tinyint = rand_tinyint();
bind->buffer_type = TSDB_DATA_TYPE_TINYINT;
bind->buffer_length = sizeof(int8_t);
bind->buffer = bind_tinyint;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
ptr += bind->buffer_length;
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"BOOL", strlen("BOOL"))) {
int8_t *bind_bool = (int8_t *)ptr;
*bind_bool = rand_bool();
bind->buffer_type = TSDB_DATA_TYPE_BOOL;
bind->buffer_length = sizeof(int8_t);
bind->buffer = bind_bool;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
ptr += bind->buffer_length;
} else if (0 == strncasecmp(stbInfo->columns[i].dataType,
"TIMESTAMP", strlen("TIMESTAMP"))) {
int64_t *bind_ts2 = (int64_t *)ptr;
*bind_ts2 = rand_bigint();
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_ts2;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
ptr += bind->buffer_length;
} else {
errorPrint( "No support data type: %s\n",
stbInfo->columns[i].dataType);
return -1;
}
}
taos_stmt_bind_param(stmt, (TAOS_BIND *)bindArray);
// if msg > 3MB, break
}
taos_stmt_add_batch(stmt);
taos_stmt_bind_param(stmt, bindArray);
taos_stmt_add_batch(stmt);
k++;
recordFrom ++;
if (recordFrom >= insertRows) {
break;
}
}
return k;
}
......@@ -5304,7 +5453,7 @@ static int32_t generateStbProgressiveData(
int64_t tableSeq,
char *dbName, char *buffer,
int64_t insertRows,
uint64_t startFrom, int64_t startTime, int64_t *pSamplePos,
uint64_t recordFrom, int64_t startTime, int64_t *pSamplePos,
int64_t *pRemainderBufLen)
{
assert(buffer != NULL);
......@@ -5327,7 +5476,7 @@ static int32_t generateStbProgressiveData(
return generateStbDataTail(superTblInfo,
g_args.num_of_RPR, pstr, *pRemainderBufLen,
insertRows, startFrom,
insertRows, recordFrom,
startTime,
pSamplePos, &dataLen);
}
......@@ -5342,7 +5491,7 @@ static int64_t generateProgressiveDataWithoutStb(
/* int64_t tableSeq, */
threadInfo *pThreadInfo, char *buffer,
int64_t insertRows,
uint64_t startFrom, int64_t startTime, /*int64_t *pSamplePos, */
uint64_t recordFrom, int64_t startTime, /*int64_t *pSamplePos, */
int64_t *pRemainderBufLen)
{
assert(buffer != NULL);
......@@ -5363,7 +5512,7 @@ static int64_t generateProgressiveDataWithoutStb(
int64_t dataLen;
return generateDataTailWithoutStb(
g_args.num_of_RPR, pstr, *pRemainderBufLen, insertRows, startFrom,
g_args.num_of_RPR, pstr, *pRemainderBufLen, insertRows, recordFrom,
startTime,
/*pSamplePos, */&dataLen);
}
......@@ -5677,7 +5826,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
generated = prepareStbStmt(superTblInfo,
pThreadInfo->stmt,
tableName, g_args.num_of_RPR,
insertRows, start_time, pstr);
insertRows, i, start_time, pstr);
} else {
generated = generateStbProgressiveData(
superTblInfo,
......@@ -5965,7 +6114,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
}
int64_t ntables = 0;
uint64_t startFrom;
uint64_t tableFrom;
if (superTblInfo) {
int64_t limit;
......@@ -5992,7 +6141,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
}
ntables = limit;
startFrom = offset;
tableFrom = offset;
if ((superTblInfo->childTblExists != TBL_NO_EXISTS)
&& ((superTblInfo->childTblOffset + superTblInfo->childTblLimit )
......@@ -6024,7 +6173,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
offset);
} else {
ntables = g_args.num_of_tables;
startFrom = 0;
tableFrom = 0;
}
taos_close(taos0);
......@@ -6101,10 +6250,10 @@ static void startMultiThreadInsertData(int threads, char* db_name,
/* if ((NULL == superTblInfo)
|| (0 == superTblInfo->multiThreadWriteOneTbl)) {
*/
pThreadInfo->start_table_from = startFrom;
pThreadInfo->start_table_from = tableFrom;
pThreadInfo->ntables = i<b?a+1:a;
pThreadInfo->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
startFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1;
tableFrom = pThreadInfo->end_table_to + 1;
/* } else {
pThreadInfo->start_table_from = 0;
pThreadInfo->ntables = superTblInfo->childTblCount;
......@@ -6743,15 +6892,15 @@ static int queryTestProcess() {
b = ntables % threads;
}
uint64_t startFrom = 0;
uint64_t tableFrom = 0;
for (int i = 0; i < threads; i++) {
threadInfo *pThreadInfo = infosOfSub + i;
pThreadInfo->threadID = i;
pThreadInfo->start_table_from = startFrom;
pThreadInfo->start_table_from = tableFrom;
pThreadInfo->ntables = i<b?a+1:a;
pThreadInfo->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
startFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->end_table_to = i < b ? tableFrom + a : tableFrom + a - 1;
tableFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pidsOfSub + i, NULL, superTableQuery, pThreadInfo);
}
......@@ -7200,17 +7349,17 @@ static int subscribeTestProcess() {
}
for (uint64_t i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
uint64_t startFrom = 0;
uint64_t tableFrom = 0;
for (int j = 0; j < threads; j++) {
uint64_t seq = i * threads + j;
threadInfo *pThreadInfo = infosOfStable + seq;
pThreadInfo->threadID = seq;
pThreadInfo->querySeq = i;
pThreadInfo->start_table_from = startFrom;
pThreadInfo->start_table_from = tableFrom;
pThreadInfo->ntables = j<b?a+1:a;
pThreadInfo->end_table_to = j<b?startFrom+a:startFrom+a-1;
startFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->end_table_to = j<b?tableFrom+a:tableFrom+a-1;
tableFrom = pThreadInfo->end_table_to + 1;
pThreadInfo->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pidsOfStable + seq,
NULL, superSubscribe, pThreadInfo);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册