未验证 提交 57d5f22a 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

Hotfix/sangshuduo/td 5872 taosdemo stmt improve for master (#7338)

* cherry pick from develop branch.

* cherry pick 54813175

* [TD-5872]<fix>: taosdemo stmt csv perf improve.

* rand func back to early impl.

* fix windows/mac compile error.

* cherry pick from develop branch.
Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 23e27ebb
......@@ -295,6 +295,9 @@ typedef struct SSuperTable_S {
uint64_t lenOfTagOfOneRow;
char* sampleDataBuf;
#if STMT_IFACE_ENABLED == 1
char* sampleBindArray;
#endif
//int sampleRowCount;
//int sampleUsePos;
......@@ -441,6 +444,7 @@ typedef struct SQueryMetaInfo_S {
typedef struct SThreadInfo_S {
TAOS * taos;
TAOS_STMT *stmt;
int64_t *bind_ts;
int threadID;
char db_name[TSDB_DB_NAME_LEN];
uint32_t time_precision;
......@@ -454,7 +458,7 @@ typedef struct SThreadInfo_S {
int64_t start_time;
char* cols;
bool use_metric;
SSuperTable* superTblInfo;
SSuperTable* stbInfo;
char *buffer; // sql cmd buffer
// for async insert
......@@ -674,7 +678,7 @@ static FILE * g_fpOfInsertResult = NULL;
///////////////////////////////////////////////////
static void ERROR_EXIT(const char *msg) { perror(msg); exit(-1); }
static void ERROR_EXIT(const char *msg) { errorPrint("%s", msg); exit(-1); }
#ifndef TAOSDEMO_COMMIT_SHA1
#define TAOSDEMO_COMMIT_SHA1 "unknown"
......@@ -1136,8 +1140,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
}
if (0 == columnCount) {
perror("data type error!");
exit(-1);
ERROR_EXIT("data type error!");
}
g_args.num_of_CPR = columnCount;
......@@ -2425,14 +2428,14 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port
#endif
debugPrint("%s() LN%d, sockfd=%d\n", __func__, __LINE__, sockfd);
free(request_buf);
ERROR_EXIT("ERROR opening socket");
ERROR_EXIT("opening socket");
}
int retConn = connect(sockfd, (struct sockaddr *)pServAddr, sizeof(struct sockaddr));
debugPrint("%s() LN%d connect() return %d\n", __func__, __LINE__, retConn);
if (retConn < 0) {
free(request_buf);
ERROR_EXIT("ERROR connecting");
ERROR_EXIT("connecting");
}
memset(base64_buf, 0, INPUT_BUF_LEN);
......@@ -2465,7 +2468,7 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port
auth, strlen(sqlstr), sqlstr);
if (r >= req_buf_len) {
free(request_buf);
ERROR_EXIT("ERROR too long request");
ERROR_EXIT("too long request");
}
verbosePrint("%s() LN%d: Request:\n%s\n", __func__, __LINE__, request_buf);
......@@ -2478,7 +2481,7 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port
bytes = write(sockfd, request_buf + sent, req_str_len - sent);
#endif
if (bytes < 0)
ERROR_EXIT("ERROR writing message to socket");
ERROR_EXIT("writing message to socket");
if (bytes == 0)
break;
sent+=bytes;
......@@ -2495,7 +2498,7 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port
#endif
if (bytes < 0) {
free(request_buf);
ERROR_EXIT("ERROR reading response from socket");
ERROR_EXIT("reading response from socket");
}
if (bytes == 0)
break;
......@@ -2504,7 +2507,7 @@ static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port
if (received == resp_len) {
free(request_buf);
ERROR_EXIT("ERROR storing complete response from socket");
ERROR_EXIT("storing complete response from socket");
}
response_buf[RESP_BUF_LEN - 1] = '\0';
......@@ -2667,8 +2670,8 @@ static int calcRowLen(SSuperTable* superTbls) {
} else if (strcasecmp(dataType, "TIMESTAMP") == 0) {
lenOfOneRow += TIMESTAMP_BUFF_LEN;
} else {
printf("get error data type : %s\n", dataType);
exit(-1);
errorPrint("get error data type : %s\n", dataType);
exit(EXIT_FAILURE);
}
}
......@@ -2698,8 +2701,8 @@ static int calcRowLen(SSuperTable* superTbls) {
} else if (strcasecmp(dataType, "DOUBLE") == 0) {
lenOfTagOfOneRow += superTbls->tags[tagIndex].dataLen + DOUBLE_BUFF_LEN;
} else {
printf("get error tag type : %s\n", dataType);
exit(-1);
errorPrint("get error tag type : %s\n", dataType);
exit(EXIT_FAILURE);
}
}
......@@ -2737,7 +2740,7 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos,
taos_close(taos);
errorPrint("%s() LN%d, failed to run command %s\n",
__func__, __LINE__, command);
exit(-1);
exit(EXIT_FAILURE);
}
int64_t childTblCount = (limit < 0)?10000:limit;
......@@ -2748,7 +2751,7 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos,
taos_free_result(res);
taos_close(taos);
errorPrint("%s() LN%d, failed to allocate memory!\n", __func__, __LINE__);
exit(-1);
exit(EXIT_FAILURE);
}
}
......@@ -2759,7 +2762,7 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos,
if (0 == strlen((char *)row[0])) {
errorPrint("%s() LN%d, No.%"PRId64" table return empty name\n",
__func__, __LINE__, count);
exit(-1);
exit(EXIT_FAILURE);
}
tstrncpy(pTblName, (char *)row[0], len[0]+1);
......@@ -2775,12 +2778,12 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos,
(size_t)((childTblCount-count)*TSDB_TABLE_NAME_LEN));
} else {
// exit, if allocate more memory failed
errorPrint("%s() LN%d, realloc fail for save child table name of %s.%s\n",
__func__, __LINE__, dbName, sTblName);
tmfree(childTblName);
taos_free_result(res);
taos_close(taos);
exit(-1);
errorPrint("%s() LN%d, realloc fail for save child table name of %s.%s\n",
__func__, __LINE__, dbName, sTblName);
exit(EXIT_FAILURE);
}
}
pTblName = childTblName + count * TSDB_TABLE_NAME_LEN;
......@@ -2964,10 +2967,10 @@ static int createSuperTable(
lenOfOneRow += TIMESTAMP_BUFF_LEN;
} else {
taos_close(taos);
free(command);
errorPrint("%s() LN%d, config error data type : %s\n",
__func__, __LINE__, dataType);
free(command);
exit(-1);
exit(EXIT_FAILURE);
}
}
......@@ -2976,11 +2979,11 @@ static int createSuperTable(
// save for creating child table
superTbl->colsOfCreateChildTable = (char*)calloc(len+20, 1);
if (NULL == superTbl->colsOfCreateChildTable) {
errorPrint("%s() LN%d, Failed when calloc, size:%d",
__func__, __LINE__, len+1);
taos_close(taos);
free(command);
exit(-1);
errorPrint("%s() LN%d, Failed when calloc, size:%d",
__func__, __LINE__, len+1);
exit(EXIT_FAILURE);
}
snprintf(superTbl->colsOfCreateChildTable, len+20, "(ts timestamp%s)", cols);
......@@ -3054,10 +3057,10 @@ static int createSuperTable(
lenOfTagOfOneRow += superTbl->tags[tagIndex].dataLen + DOUBLE_BUFF_LEN;
} else {
taos_close(taos);
free(command);
errorPrint("%s() LN%d, config error tag type : %s\n",
__func__, __LINE__, dataType);
free(command);
exit(-1);
exit(EXIT_FAILURE);
}
}
......@@ -3089,7 +3092,7 @@ int createDatabasesAndStables(char *command) {
errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL));
return -1;
}
for (int i = 0; i < g_Dbs.dbCount; i++) {
if (g_Dbs.db[i].drop) {
sprintf(command, "drop database if exists %s;", g_Dbs.db[i].dbName);
......@@ -3100,35 +3103,43 @@ int createDatabasesAndStables(char *command) {
int dataLen = 0;
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, "create database if not exists %s", g_Dbs.db[i].dbName);
BUFFER_SIZE - dataLen, "create database if not exists %s",
g_Dbs.db[i].dbName);
if (g_Dbs.db[i].dbCfg.blocks > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " blocks %d", g_Dbs.db[i].dbCfg.blocks);
BUFFER_SIZE - dataLen, " blocks %d",
g_Dbs.db[i].dbCfg.blocks);
}
if (g_Dbs.db[i].dbCfg.cache > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " cache %d", g_Dbs.db[i].dbCfg.cache);
BUFFER_SIZE - dataLen, " cache %d",
g_Dbs.db[i].dbCfg.cache);
}
if (g_Dbs.db[i].dbCfg.days > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " days %d", g_Dbs.db[i].dbCfg.days);
BUFFER_SIZE - dataLen, " days %d",
g_Dbs.db[i].dbCfg.days);
}
if (g_Dbs.db[i].dbCfg.keep > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " keep %d", g_Dbs.db[i].dbCfg.keep);
BUFFER_SIZE - dataLen, " keep %d",
g_Dbs.db[i].dbCfg.keep);
}
if (g_Dbs.db[i].dbCfg.quorum > 1) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " quorum %d", g_Dbs.db[i].dbCfg.quorum);
BUFFER_SIZE - dataLen, " quorum %d",
g_Dbs.db[i].dbCfg.quorum);
}
if (g_Dbs.db[i].dbCfg.replica > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " replica %d", g_Dbs.db[i].dbCfg.replica);
BUFFER_SIZE - dataLen, " replica %d",
g_Dbs.db[i].dbCfg.replica);
}
if (g_Dbs.db[i].dbCfg.update > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " update %d", g_Dbs.db[i].dbCfg.update);
BUFFER_SIZE - dataLen, " update %d",
g_Dbs.db[i].dbCfg.update);
}
//if (g_Dbs.db[i].dbCfg.maxtablesPerVnode > 0) {
// dataLen += snprintf(command + dataLen,
......@@ -3136,42 +3147,48 @@ int createDatabasesAndStables(char *command) {
//}
if (g_Dbs.db[i].dbCfg.minRows > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " minrows %d", g_Dbs.db[i].dbCfg.minRows);
BUFFER_SIZE - dataLen, " minrows %d",
g_Dbs.db[i].dbCfg.minRows);
}
if (g_Dbs.db[i].dbCfg.maxRows > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " maxrows %d", g_Dbs.db[i].dbCfg.maxRows);
BUFFER_SIZE - dataLen, " maxrows %d",
g_Dbs.db[i].dbCfg.maxRows);
}
if (g_Dbs.db[i].dbCfg.comp > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " comp %d", g_Dbs.db[i].dbCfg.comp);
BUFFER_SIZE - dataLen, " comp %d",
g_Dbs.db[i].dbCfg.comp);
}
if (g_Dbs.db[i].dbCfg.walLevel > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " wal %d", g_Dbs.db[i].dbCfg.walLevel);
BUFFER_SIZE - dataLen, " wal %d",
g_Dbs.db[i].dbCfg.walLevel);
}
if (g_Dbs.db[i].dbCfg.cacheLast > 0) {
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, " cachelast %d", g_Dbs.db[i].dbCfg.cacheLast);
BUFFER_SIZE - dataLen, " cachelast %d",
g_Dbs.db[i].dbCfg.cacheLast);
}
if (g_Dbs.db[i].dbCfg.fsync > 0) {
dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen,
" fsync %d", g_Dbs.db[i].dbCfg.fsync);
}
if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", strlen("ms")))
if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2))
#if NANO_SECOND_ENABLED == 1
|| (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision,
"ns", strlen("ns")))
"ns", 2))
#endif
|| (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision,
"us", strlen("us")))) {
"us", 2))) {
dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen,
" precision \'%s\';", g_Dbs.db[i].dbCfg.precision);
}
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE, false)) {
taos_close(taos);
errorPrint( "\ncreate database %s failed!\n\n", g_Dbs.db[i].dbName);
errorPrint( "\ncreate database %s failed!\n\n",
g_Dbs.db[i].dbName);
return -1;
}
printf("\ncreate database %s success!\n\n", g_Dbs.db[i].dbName);
......@@ -3218,7 +3235,7 @@ int createDatabasesAndStables(char *command) {
static void* createTable(void *sarg)
{
threadInfo *pThreadInfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
SSuperTable* stbInfo = pThreadInfo->stbInfo;
setThreadName("createTable");
......@@ -3229,7 +3246,7 @@ static void* createTable(void *sarg)
pThreadInfo->buffer = calloc(buff_len, 1);
if (pThreadInfo->buffer == NULL) {
errorPrint("%s() LN%d, Memory allocated failed!\n", __func__, __LINE__);
exit(-1);
exit(EXIT_FAILURE);
}
int len = 0;
......@@ -3248,11 +3265,11 @@ static void* createTable(void *sarg)
g_args.tb_prefix, i,
pThreadInfo->cols);
} else {
if (superTblInfo == NULL) {
if (stbInfo == NULL) {
free(pThreadInfo->buffer);
errorPrint("%s() LN%d, use metric, but super table info is NULL\n",
__func__, __LINE__);
free(pThreadInfo->buffer);
exit(-1);
exit(EXIT_FAILURE);
} else {
if (0 == len) {
batchNum = 0;
......@@ -3260,29 +3277,35 @@ static void* createTable(void *sarg)
len += snprintf(pThreadInfo->buffer + len,
buff_len - len, "create table ");
}
char* tagsValBuf = NULL;
if (0 == superTblInfo->tagSource) {
tagsValBuf = generateTagValuesForStb(superTblInfo, i);
if (0 == stbInfo->tagSource) {
tagsValBuf = generateTagValuesForStb(stbInfo, i);
} else {
if (0 == stbInfo->tagSampleCount) {
free(pThreadInfo->buffer);
ERROR_EXIT("use sample file for tag, but has no content!\n");
}
tagsValBuf = getTagValueFromTagSample(
superTblInfo,
i % superTblInfo->tagSampleCount);
stbInfo,
i % stbInfo->tagSampleCount);
}
if (NULL == tagsValBuf) {
free(pThreadInfo->buffer);
return NULL;
ERROR_EXIT("use metric, but tag buffer is NULL\n");
}
len += snprintf(pThreadInfo->buffer + len,
buff_len - len,
"if not exists %s.%s%"PRIu64" using %s.%s tags %s ",
pThreadInfo->db_name, superTblInfo->childTblPrefix,
pThreadInfo->db_name, stbInfo->childTblPrefix,
i, pThreadInfo->db_name,
superTblInfo->sTblName, tagsValBuf);
stbInfo->sTblName, tagsValBuf);
free(tagsValBuf);
batchNum++;
if ((batchNum < superTblInfo->batchCreateTableNum)
if ((batchNum < stbInfo->batchCreateTableNum)
&& ((buff_len - len)
>= (superTblInfo->lenOfTagOfOneRow + 256))) {
>= (stbInfo->lenOfTagOfOneRow + 256))) {
continue;
}
}
......@@ -3317,14 +3340,13 @@ static void* createTable(void *sarg)
static int startMultiThreadCreateChildTable(
char* cols, int threads, uint64_t tableFrom, int64_t ntables,
char* db_name, SSuperTable* superTblInfo) {
char* db_name, SSuperTable* stbInfo) {
pthread_t *pids = calloc(1, threads * sizeof(pthread_t));
threadInfo *infos = calloc(1, threads * sizeof(threadInfo));
if ((NULL == pids) || (NULL == infos)) {
printf("malloc failed\n");
exit(-1);
ERROR_EXIT("createChildTable malloc failed\n");
}
if (threads < 1) {
......@@ -3344,7 +3366,7 @@ static int startMultiThreadCreateChildTable(
threadInfo *pThreadInfo = infos + i;
pThreadInfo->threadID = i;
tstrncpy(pThreadInfo->db_name, db_name, TSDB_DB_NAME_LEN);
pThreadInfo->superTblInfo = superTblInfo;
pThreadInfo->stbInfo = stbInfo;
verbosePrint("%s() %d db_name: %s\n", __func__, __LINE__, db_name);
pThreadInfo->taos = taos_connect(
g_Dbs.host,
......@@ -3451,26 +3473,26 @@ static void createChildTables() {
/*
Read 10000 lines at most. If more than 10000 lines, continue to read after using
*/
static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) {
static int readTagFromCsvFileToMem(SSuperTable * stbInfo) {
size_t n = 0;
ssize_t readLen = 0;
char * line = NULL;
FILE *fp = fopen(superTblInfo->tagsFile, "r");
FILE *fp = fopen(stbInfo->tagsFile, "r");
if (fp == NULL) {
printf("Failed to open tags file: %s, reason:%s\n",
superTblInfo->tagsFile, strerror(errno));
stbInfo->tagsFile, strerror(errno));
return -1;
}
if (superTblInfo->tagDataBuf) {
free(superTblInfo->tagDataBuf);
superTblInfo->tagDataBuf = NULL;
if (stbInfo->tagDataBuf) {
free(stbInfo->tagDataBuf);
stbInfo->tagDataBuf = NULL;
}
int tagCount = 10000;
int count = 0;
char* tagDataBuf = calloc(1, superTblInfo->lenOfTagOfOneRow * tagCount);
char* tagDataBuf = calloc(1, stbInfo->lenOfTagOfOneRow * tagCount);
if (tagDataBuf == NULL) {
printf("Failed to calloc, reason:%s\n", strerror(errno));
fclose(fp);
......@@ -3486,20 +3508,20 @@ static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) {
continue;
}
memcpy(tagDataBuf + count * superTblInfo->lenOfTagOfOneRow, line, readLen);
memcpy(tagDataBuf + count * stbInfo->lenOfTagOfOneRow, line, readLen);
count++;
if (count >= tagCount - 1) {
char *tmp = realloc(tagDataBuf,
(size_t)tagCount*1.5*superTblInfo->lenOfTagOfOneRow);
(size_t)tagCount*1.5*stbInfo->lenOfTagOfOneRow);
if (tmp != NULL) {
tagDataBuf = tmp;
tagCount = (int)(tagCount*1.5);
memset(tagDataBuf + count*superTblInfo->lenOfTagOfOneRow,
0, (size_t)((tagCount-count)*superTblInfo->lenOfTagOfOneRow));
memset(tagDataBuf + count*stbInfo->lenOfTagOfOneRow,
0, (size_t)((tagCount-count)*stbInfo->lenOfTagOfOneRow));
} else {
// exit, if allocate more memory failed
printf("realloc fail for save tag val from %s\n", superTblInfo->tagsFile);
printf("realloc fail for save tag val from %s\n", stbInfo->tagsFile);
tmfree(tagDataBuf);
free(line);
fclose(fp);
......@@ -3508,8 +3530,8 @@ static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) {
}
}
superTblInfo->tagDataBuf = tagDataBuf;
superTblInfo->tagSampleCount = count;
stbInfo->tagDataBuf = tagDataBuf;
stbInfo->tagSampleCount = count;
free(line);
fclose(fp);
......@@ -3520,28 +3542,28 @@ static int readTagFromCsvFileToMem(SSuperTable * superTblInfo) {
Read 10000 lines at most. If more than 10000 lines, continue to read after using
*/
static int readSampleFromCsvFileToMem(
SSuperTable* superTblInfo) {
SSuperTable* stbInfo) {
size_t n = 0;
ssize_t readLen = 0;
char * line = NULL;
int getRows = 0;
FILE* fp = fopen(superTblInfo->sampleFile, "r");
FILE* fp = fopen(stbInfo->sampleFile, "r");
if (fp == NULL) {
errorPrint( "Failed to open sample file: %s, reason:%s\n",
superTblInfo->sampleFile, strerror(errno));
stbInfo->sampleFile, strerror(errno));
return -1;
}
assert(superTblInfo->sampleDataBuf);
memset(superTblInfo->sampleDataBuf, 0,
MAX_SAMPLES_ONCE_FROM_FILE * superTblInfo->lenOfOneRow);
assert(stbInfo->sampleDataBuf);
memset(stbInfo->sampleDataBuf, 0,
MAX_SAMPLES_ONCE_FROM_FILE * stbInfo->lenOfOneRow);
while(1) {
readLen = tgetline(&line, &n, fp);
if (-1 == readLen) {
if(0 != fseek(fp, 0, SEEK_SET)) {
errorPrint( "Failed to fseek file: %s, reason:%s\n",
superTblInfo->sampleFile, strerror(errno));
stbInfo->sampleFile, strerror(errno));
fclose(fp);
return -1;
}
......@@ -3556,13 +3578,13 @@ static int readSampleFromCsvFileToMem(
continue;
}
if (readLen > superTblInfo->lenOfOneRow) {
if (readLen > stbInfo->lenOfOneRow) {
printf("sample row len[%d] overflow define schema len[%"PRIu64"], so discard this row\n",
(int32_t)readLen, superTblInfo->lenOfOneRow);
(int32_t)readLen, stbInfo->lenOfOneRow);
continue;
}
memcpy(superTblInfo->sampleDataBuf + getRows * superTblInfo->lenOfOneRow,
memcpy(stbInfo->sampleDataBuf + getRows * stbInfo->lenOfOneRow,
line, readLen);
getRows++;
......@@ -5047,6 +5069,23 @@ static void postFreeResource() {
free(g_Dbs.db[i].superTbls[j].sampleDataBuf);
g_Dbs.db[i].superTbls[j].sampleDataBuf = NULL;
}
#if STMT_IFACE_ENABLED == 1
if (g_Dbs.db[i].superTbls[j].sampleBindArray) {
for (int k = 0; k < MAX_SAMPLES_ONCE_FROM_FILE; k++) {
uintptr_t *tmp = (uintptr_t *)(*(uintptr_t *)(
g_Dbs.db[i].superTbls[j].sampleBindArray
+ sizeof(uintptr_t *) * k));
for (int c = 1; c < g_Dbs.db[i].superTbls[j].columnCount + 1; c++) {
TAOS_BIND *bind = (TAOS_BIND *)((char *)tmp + (sizeof(TAOS_BIND) * c));
if (bind)
tmfree(bind->buffer);
}
tmfree((char *)tmp);
}
}
tmfree((char *)g_Dbs.db[i].superTbls[j].sampleBindArray);
#endif
if (0 != g_Dbs.db[i].superTbls[j].tagDataBuf) {
free(g_Dbs.db[i].superTbls[j].tagDataBuf);
g_Dbs.db[i].superTbls[j].tagDataBuf = NULL;
......@@ -5067,21 +5106,14 @@ static void postFreeResource() {
tmfree(g_randfloat_buff);
tmfree(g_rand_current_buff);
tmfree(g_rand_phase_buff);
tmfree(g_randdouble_buff);
}
static int getRowDataFromSample(
char* dataBuf, int64_t maxLen, int64_t timestamp,
SSuperTable* superTblInfo, int64_t* sampleUsePos)
SSuperTable* stbInfo, int64_t* sampleUsePos)
{
if ((*sampleUsePos) == MAX_SAMPLES_ONCE_FROM_FILE) {
/* int ret = readSampleFromCsvFileToMem(superTblInfo);
if (0 != ret) {
tmfree(superTblInfo->sampleDataBuf);
superTblInfo->sampleDataBuf = NULL;
return -1;
}
*/
*sampleUsePos = 0;
}
......@@ -5091,8 +5123,8 @@ static int getRowDataFromSample(
"(%" PRId64 ", ", timestamp);
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen,
"%s",
superTblInfo->sampleDataBuf
+ superTblInfo->lenOfOneRow * (*sampleUsePos));
stbInfo->sampleDataBuf
+ stbInfo->lenOfOneRow * (*sampleUsePos));
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")");
(*sampleUsePos)++;
......@@ -5248,7 +5280,7 @@ static int64_t generateData(char *recBuf, char **data_type,
if (s == NULL) {
errorPrint("%s() LN%d, memory allocation %d bytes failed\n",
__func__, __LINE__, lenOfBinary + 1);
exit(-1);
exit(EXIT_FAILURE);
}
rand_string(s, lenOfBinary);
pstr += sprintf(pstr, ",\"%s\"", s);
......@@ -5258,7 +5290,7 @@ static int64_t generateData(char *recBuf, char **data_type,
if (s == NULL) {
errorPrint("%s() LN%d, memory allocation %d bytes failed\n",
__func__, __LINE__, lenOfBinary + 1);
exit(-1);
exit(EXIT_FAILURE);
}
rand_string(s, lenOfBinary);
pstr += sprintf(pstr, ",\"%s\"", s);
......@@ -5266,8 +5298,7 @@ static int64_t generateData(char *recBuf, char **data_type,
}
if (strlen(recBuf) > MAX_DATA_SIZE) {
perror("column length too long, abort");
exit(-1);
ERROR_EXIT("column length too long, abort");
}
}
......@@ -5278,27 +5309,27 @@ static int64_t generateData(char *recBuf, char **data_type,
return (int32_t)strlen(recBuf);
}
static int prepareSampleDataForSTable(SSuperTable *superTblInfo) {
static int prepareSampleDataForSTable(SSuperTable *stbInfo) {
char* sampleDataBuf = NULL;
sampleDataBuf = calloc(
superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1);
stbInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1);
if (sampleDataBuf == NULL) {
errorPrint("%s() LN%d, Failed to calloc %"PRIu64" Bytes, reason:%s\n",
__func__, __LINE__,
superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE,
stbInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE,
strerror(errno));
return -1;
}
superTblInfo->sampleDataBuf = sampleDataBuf;
int ret = readSampleFromCsvFileToMem(superTblInfo);
stbInfo->sampleDataBuf = sampleDataBuf;
int ret = readSampleFromCsvFileToMem(stbInfo);
if (0 != ret) {
errorPrint("%s() LN%d, read sample from csv file failed.\n",
__func__, __LINE__);
tmfree(sampleDataBuf);
superTblInfo->sampleDataBuf = NULL;
stbInfo->sampleDataBuf = NULL;
return -1;
}
......@@ -5308,14 +5339,14 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) {
static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k)
{
int32_t affectedRows;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
SSuperTable* stbInfo = pThreadInfo->stbInfo;
verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID,
__func__, __LINE__, pThreadInfo->buffer);
uint16_t iface;
if (superTblInfo)
iface = superTblInfo->iface;
if (stbInfo)
iface = stbInfo->iface;
else {
if (g_args.iface == INTERFACE_BUT)
iface = TAOSC_IFACE;
......@@ -5355,7 +5386,7 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k)
__func__, __LINE__, taos_stmt_errstr(pThreadInfo->stmt));
fprintf(stderr, "\n\033[31m === Please reduce batch number if WAL size exceeds limit. ===\033[0m\n\n");
exit(-1);
exit(EXIT_FAILURE);
}
affectedRows = k;
break;
......@@ -5363,7 +5394,7 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k)
default:
errorPrint("%s() LN%d: unknown insert mode: %d\n",
__func__, __LINE__, superTblInfo->iface);
__func__, __LINE__, stbInfo->iface);
affectedRows = 0;
}
......@@ -5373,24 +5404,24 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k)
static void getTableName(char *pTblName,
threadInfo* pThreadInfo, uint64_t tableSeq)
{
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
if (superTblInfo) {
if (AUTO_CREATE_SUBTBL != superTblInfo->autoCreateTable) {
if (superTblInfo->childTblLimit > 0) {
SSuperTable* stbInfo = pThreadInfo->stbInfo;
if (stbInfo) {
if (AUTO_CREATE_SUBTBL != stbInfo->autoCreateTable) {
if (stbInfo->childTblLimit > 0) {
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s",
superTblInfo->childTblName +
(tableSeq - superTblInfo->childTblOffset) * TSDB_TABLE_NAME_LEN);
stbInfo->childTblName +
(tableSeq - stbInfo->childTblOffset) * TSDB_TABLE_NAME_LEN);
} else {
verbosePrint("[%d] %s() LN%d: from=%"PRIu64" count=%"PRId64" seq=%"PRIu64"\n",
pThreadInfo->threadID, __func__, __LINE__,
pThreadInfo->start_table_from,
pThreadInfo->ntables, tableSeq);
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s",
superTblInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN);
stbInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN);
}
} else {
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"",
superTblInfo->childTblPrefix, tableSeq);
stbInfo->childTblPrefix, tableSeq);
}
} else {
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"",
......@@ -5471,7 +5502,7 @@ static int64_t getTSRandTail(int64_t timeStampStep, int32_t seq,
}
static int32_t generateStbDataTail(
SSuperTable* superTblInfo,
SSuperTable* stbInfo,
uint32_t batch, char* buffer,
int64_t remainderBufLen, int64_t insertRows,
uint64_t recordFrom, int64_t startTime,
......@@ -5481,7 +5512,7 @@ static int32_t generateStbDataTail(
char *pstr = buffer;
bool tsRand;
if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) {
if (0 == strncasecmp(stbInfo->dataSource, "rand", strlen("rand"))) {
tsRand = true;
} else {
tsRand = false;
......@@ -5496,26 +5527,26 @@ static int32_t generateStbDataTail(
int64_t lenOfRow = 0;
if (tsRand) {
if (superTblInfo->disorderRatio > 0) {
lenOfRow = generateStbRowData(superTblInfo, data,
if (stbInfo->disorderRatio > 0) {
lenOfRow = generateStbRowData(stbInfo, data,
remainderBufLen,
startTime + getTSRandTail(
superTblInfo->timeStampStep, k,
superTblInfo->disorderRatio,
superTblInfo->disorderRange)
stbInfo->timeStampStep, k,
stbInfo->disorderRatio,
stbInfo->disorderRange)
);
} else {
lenOfRow = generateStbRowData(superTblInfo, data,
lenOfRow = generateStbRowData(stbInfo, data,
remainderBufLen,
startTime + superTblInfo->timeStampStep * k
startTime + stbInfo->timeStampStep * k
);
}
} else {
lenOfRow = getRowDataFromSample(
data,
(remainderBufLen < MAX_DATA_SIZE)?remainderBufLen:MAX_DATA_SIZE,
startTime + superTblInfo->timeStampStep * k,
superTblInfo,
startTime + stbInfo->timeStampStep * k,
stbInfo,
pSamplePos);
}
......@@ -5571,7 +5602,7 @@ static int generateSQLHeadWithoutStb(char *tableName,
}
static int generateStbSQLHead(
SSuperTable* superTblInfo,
SSuperTable* stbInfo,
char *tableName, int64_t tableSeq,
char *dbName,
char *buffer, int remainderBufLen)
......@@ -5580,14 +5611,14 @@ static int generateStbSQLHead(
char headBuf[HEAD_BUFF_LEN];
if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) {
if (AUTO_CREATE_SUBTBL == stbInfo->autoCreateTable) {
char* tagsValBuf = NULL;
if (0 == superTblInfo->tagSource) {
tagsValBuf = generateTagValuesForStb(superTblInfo, tableSeq);
if (0 == stbInfo->tagSource) {
tagsValBuf = generateTagValuesForStb(stbInfo, tableSeq);
} else {
tagsValBuf = getTagValueFromTagSample(
superTblInfo,
tableSeq % superTblInfo->tagSampleCount);
stbInfo,
tableSeq % stbInfo->tagSampleCount);
}
if (NULL == tagsValBuf) {
errorPrint("%s() LN%d, tag buf failed to allocate memory\n",
......@@ -5602,10 +5633,10 @@ static int generateStbSQLHead(
dbName,
tableName,
dbName,
superTblInfo->sTblName,
stbInfo->sTblName,
tagsValBuf);
tmfree(tagsValBuf);
} else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) {
} else if (TBL_ALREADY_EXISTS == stbInfo->childTblExists) {
len = snprintf(
headBuf,
HEAD_BUFF_LEN,
......@@ -5630,12 +5661,12 @@ static int generateStbSQLHead(
}
static int32_t generateStbInterlaceData(
SSuperTable *superTblInfo,
threadInfo *pThreadInfo,
char *tableName, uint32_t batchPerTbl,
uint64_t i,
uint32_t batchPerTblTimes,
uint64_t tableSeq,
threadInfo *pThreadInfo, char *buffer,
char *buffer,
int64_t insertRows,
int64_t startTime,
uint64_t *pRemainderBufLen)
......@@ -5643,8 +5674,9 @@ static int32_t generateStbInterlaceData(
assert(buffer);
char *pstr = buffer;
SSuperTable *stbInfo = pThreadInfo->stbInfo;
int headLen = generateStbSQLHead(
superTblInfo,
stbInfo,
tableName, tableSeq, pThreadInfo->db_name,
pstr, *pRemainderBufLen);
......@@ -5664,12 +5696,12 @@ static int32_t generateStbInterlaceData(
pThreadInfo->threadID, __func__, __LINE__,
i, batchPerTblTimes, batchPerTbl);
if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) {
if (0 == strncasecmp(stbInfo->startTimestamp, "now", 3)) {
startTime = taosGetTimestamp(pThreadInfo->time_precision);
}
int32_t k = generateStbDataTail(
superTblInfo,
stbInfo,
batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0,
startTime,
&(pThreadInfo->samplePos), &dataLen);
......@@ -5732,8 +5764,206 @@ static int64_t generateInterlaceDataWithoutStb(
}
#if STMT_IFACE_ENABLED == 1
static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
char *dataType, int32_t dataLen, char **ptr, char *value)
static int32_t prepareStmtBindArrayByType(
TAOS_BIND *bind,
char *dataType, int32_t dataLen,
int32_t timePrec,
char *value)
{
if (0 == strncasecmp(dataType,
"BINARY", strlen("BINARY"))) {
if (dataLen > TSDB_MAX_BINARY_LEN) {
errorPrint( "binary length overflow, max size:%u\n",
(uint32_t)TSDB_MAX_BINARY_LEN);
return -1;
}
char *bind_binary;
bind->buffer_type = TSDB_DATA_TYPE_BINARY;
if (value) {
bind_binary = calloc(1, strlen(value) + 1);
strncpy(bind_binary, value, strlen(value));
bind->buffer_length = strlen(bind_binary);
} else {
bind_binary = calloc(1, dataLen + 1);
rand_string(bind_binary, dataLen);
bind->buffer_length = dataLen;
}
bind->length = &bind->buffer_length;
bind->buffer = bind_binary;
bind->is_null = NULL;
} else if (0 == strncasecmp(dataType,
"NCHAR", strlen("NCHAR"))) {
if (dataLen > TSDB_MAX_BINARY_LEN) {
errorPrint( "nchar length overflow, max size:%u\n",
(uint32_t)TSDB_MAX_BINARY_LEN);
return -1;
}
char *bind_nchar;
bind->buffer_type = TSDB_DATA_TYPE_NCHAR;
if (value) {
bind_nchar = calloc(1, strlen(value) + 1);
strncpy(bind_nchar, value, strlen(value));
} else {
bind_nchar = calloc(1, dataLen + 1);
rand_string(bind_nchar, dataLen);
}
bind->buffer_length = strlen(bind_nchar);
bind->buffer = bind_nchar;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
} else if (0 == strncasecmp(dataType,
"INT", strlen("INT"))) {
int32_t *bind_int = malloc(sizeof(int32_t));
if (value) {
*bind_int = atoi(value);
} else {
*bind_int = rand_int();
}
bind->buffer_type = TSDB_DATA_TYPE_INT;
bind->buffer_length = sizeof(int32_t);
bind->buffer = bind_int;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
} else if (0 == strncasecmp(dataType,
"BIGINT", strlen("BIGINT"))) {
int64_t *bind_bigint = malloc(sizeof(int64_t));
if (value) {
*bind_bigint = atoll(value);
} else {
*bind_bigint = rand_bigint();
}
bind->buffer_type = TSDB_DATA_TYPE_BIGINT;
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_bigint;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
} else if (0 == strncasecmp(dataType,
"FLOAT", strlen("FLOAT"))) {
float *bind_float = malloc(sizeof(float));
if (value) {
*bind_float = (float)atof(value);
} else {
*bind_float = rand_float();
}
bind->buffer_type = TSDB_DATA_TYPE_FLOAT;
bind->buffer_length = sizeof(float);
bind->buffer = bind_float;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
} else if (0 == strncasecmp(dataType,
"DOUBLE", strlen("DOUBLE"))) {
double *bind_double = malloc(sizeof(double));
if (value) {
*bind_double = atof(value);
} else {
*bind_double = rand_double();
}
bind->buffer_type = TSDB_DATA_TYPE_DOUBLE;
bind->buffer_length = sizeof(double);
bind->buffer = bind_double;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
} else if (0 == strncasecmp(dataType,
"SMALLINT", strlen("SMALLINT"))) {
int16_t *bind_smallint = malloc(sizeof(int16_t));
if (value) {
*bind_smallint = (int16_t)atoi(value);
} else {
*bind_smallint = rand_smallint();
}
bind->buffer_type = TSDB_DATA_TYPE_SMALLINT;
bind->buffer_length = sizeof(int16_t);
bind->buffer = bind_smallint;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
} else if (0 == strncasecmp(dataType,
"TINYINT", strlen("TINYINT"))) {
int8_t *bind_tinyint = malloc(sizeof(int8_t));
if (value) {
*bind_tinyint = (int8_t)atoi(value);
} else {
*bind_tinyint = rand_tinyint();
}
bind->buffer_type = TSDB_DATA_TYPE_TINYINT;
bind->buffer_length = sizeof(int8_t);
bind->buffer = bind_tinyint;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
} else if (0 == strncasecmp(dataType,
"BOOL", strlen("BOOL"))) {
int8_t *bind_bool = malloc(sizeof(int8_t));
if (value) {
if (strncasecmp(value, "true", 4)) {
*bind_bool = true;
} else {
*bind_bool = false;
}
} else {
*bind_bool = rand_bool();
}
bind->buffer_type = TSDB_DATA_TYPE_BOOL;
bind->buffer_length = sizeof(int8_t);
bind->buffer = bind_bool;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
} else if (0 == strncasecmp(dataType,
"TIMESTAMP", strlen("TIMESTAMP"))) {
int64_t *bind_ts2 = malloc(sizeof(int64_t));
if (value) {
if (strchr(value, ':') && strchr(value, '-')) {
int i = 0;
while(value[i] != '\0') {
if (value[i] == '\"' || value[i] == '\'') {
value[i] = ' ';
}
i++;
}
int64_t tmpEpoch;
if (TSDB_CODE_SUCCESS != taosParseTime(
value, &tmpEpoch, strlen(value),
timePrec, 0)) {
errorPrint("Input %s, time format error!\n", value);
return -1;
}
*bind_ts2 = tmpEpoch;
} else {
*bind_ts2 = atoll(value);
}
} else {
*bind_ts2 = rand_bigint();
}
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_ts2;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
} else {
errorPrint( "No support data type: %s\n", dataType);
return -1;
}
return 0;
}
static int32_t prepareStmtBindArrayByTypeForRand(
TAOS_BIND *bind,
char *dataType, int32_t dataLen,
int32_t timePrec,
char **ptr,
char *value)
{
if (0 == strncasecmp(dataType,
"BINARY", strlen("BINARY"))) {
......@@ -5814,7 +6044,7 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"FLOAT", strlen("FLOAT"))) {
float *bind_float = (float *) *ptr;
float *bind_float = (float *)*ptr;
if (value) {
*bind_float = (float)atof(value);
......@@ -5830,7 +6060,7 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"DOUBLE", strlen("DOUBLE"))) {
double *bind_double = (double *)*ptr;
double *bind_double = (double *)*ptr;
if (value) {
*bind_double = atof(value);
......@@ -5862,7 +6092,7 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"TINYINT", strlen("TINYINT"))) {
int8_t *bind_tinyint = (int8_t *)*ptr;
int8_t *bind_tinyint = (int8_t *)*ptr;
if (value) {
*bind_tinyint = (int8_t)atoi(value);
......@@ -5874,12 +6104,21 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
bind->buffer = bind_tinyint;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"BOOL", strlen("BOOL"))) {
int8_t *bind_bool = (int8_t *)*ptr;
int8_t *bind_bool = (int8_t *)*ptr;
*bind_bool = rand_bool();
if (value) {
if (strncasecmp(value, "true", 4)) {
*bind_bool = true;
} else {
*bind_bool = false;
}
} else {
*bind_bool = rand_bool();
}
bind->buffer_type = TSDB_DATA_TYPE_BOOL;
bind->buffer_length = sizeof(int8_t);
bind->buffer = bind_bool;
......@@ -5889,10 +6128,28 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
*ptr += bind->buffer_length;
} else if (0 == strncasecmp(dataType,
"TIMESTAMP", strlen("TIMESTAMP"))) {
int64_t *bind_ts2 = (int64_t *) *ptr;
int64_t *bind_ts2 = (int64_t *)*ptr;
if (value) {
*bind_ts2 = atoll(value);
if (strchr(value, ':') && strchr(value, '-')) {
int i = 0;
while(value[i] != '\0') {
if (value[i] == '\"' || value[i] == '\'') {
value[i] = ' ';
}
i++;
}
int64_t tmpEpoch;
if (TSDB_CODE_SUCCESS != taosParseTime(
value, &tmpEpoch, strlen(value),
timePrec, 0)) {
errorPrint("Input %s, time format error!\n", value);
return -1;
}
*bind_ts2 = tmpEpoch;
} else {
*bind_ts2 = atoll(value);
}
} else {
*bind_ts2 = rand_bigint();
}
......@@ -5912,13 +6169,14 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
}
static int32_t prepareStmtWithoutStb(
TAOS_STMT *stmt,
threadInfo *pThreadInfo,
char *tableName,
uint32_t batch,
int64_t insertRows,
int64_t recordFrom,
int64_t startTime)
{
TAOS_STMT *stmt = pThreadInfo->stmt;
int ret = taos_stmt_set_tbname(stmt, tableName);
if (ret != 0) {
errorPrint("failed to execute taos_stmt_set_tbname(%s). return 0x%x. reason: %s\n",
......@@ -5938,15 +6196,11 @@ static int32_t prepareStmtWithoutStb(
int32_t k = 0;
for (k = 0; k < batch;) {
/* columnCount + 1 (ts) */
char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE);
char *ptr = data;
TAOS_BIND *bind = (TAOS_BIND *)(bindArray + 0);
int64_t *bind_ts;
int64_t *bind_ts = pThreadInfo->bind_ts;
bind_ts = (int64_t *)ptr;
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
if (g_args.disorderRatio) {
......@@ -5962,8 +6216,6 @@ static int32_t prepareStmtWithoutStb(
bind->length = &bind->buffer_length;
bind->is_null = NULL;
ptr += bind->buffer_length;
for (int i = 0; i < g_args.num_of_CPR; i ++) {
bind = (TAOS_BIND *)((char *)bindArray
+ (sizeof(TAOS_BIND) * (i + 1)));
......@@ -5971,7 +6223,8 @@ static int32_t prepareStmtWithoutStb(
bind,
data_type[i],
g_args.len_of_binary,
&ptr, NULL)) {
pThreadInfo->time_precision,
NULL)) {
return -1;
}
}
......@@ -5998,10 +6251,42 @@ static int32_t prepareStmtWithoutStb(
return k;
}
static int32_t prepareStbStmtBind(
char *bindArray, SSuperTable *stbInfo, bool sourceRand,
static int32_t prepareStbStmtBindTag(
char *bindArray, SSuperTable *stbInfo,
char *tagsVal,
int32_t timePrec)
{
char *bindBuffer = calloc(1, DOUBLE_BUFF_LEN); // g_args.len_of_binary);
if (bindBuffer == NULL) {
errorPrint("%s() LN%d, Failed to allocate %d bind buffer\n",
__func__, __LINE__, g_args.len_of_binary);
return -1;
}
TAOS_BIND *tag;
for (int t = 0; t < stbInfo->tagCount; t ++) {
tag = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * t));
if ( -1 == prepareStmtBindArrayByType(
tag,
stbInfo->tags[t].dataType,
stbInfo->tags[t].dataLen,
timePrec,
NULL)) {
free(bindBuffer);
return -1;
}
}
free(bindBuffer);
return 0;
}
static int32_t prepareStbStmtBindRand(
int64_t *ts,
char *bindArray, SSuperTable *stbInfo,
int64_t startTime, int32_t recSeq,
bool isColumn)
int32_t timePrec)
{
char *bindBuffer = calloc(1, DOUBLE_BUFF_LEN); // g_args.len_of_binary);
if (bindBuffer == NULL) {
......@@ -6016,121 +6301,92 @@ static int32_t prepareStbStmtBind(
TAOS_BIND *bind;
if (isColumn) {
int cursor = 0;
for (int i = 0; i < stbInfo->columnCount + 1; i ++) {
bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * i));
for (int i = 0; i < stbInfo->columnCount + 1; i ++) {
bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * i));
if (i == 0) {
int64_t *bind_ts = ts;
if (i == 0) {
int64_t *bind_ts;
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
if (stbInfo->disorderRatio) {
*bind_ts = startTime + getTSRandTail(
stbInfo->timeStampStep, recSeq,
stbInfo->disorderRatio,
stbInfo->disorderRange);
} else {
*bind_ts = startTime + stbInfo->timeStampStep * recSeq;
}
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_ts;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
ptr += bind->buffer_length;
} else if ( -1 == prepareStmtBindArrayByTypeForRand(
bind,
stbInfo->columns[i-1].dataType,
stbInfo->columns[i-1].dataLen,
timePrec,
&ptr,
NULL)) {
tmfree(bindBuffer);
return -1;
}
}
bind_ts = (int64_t *)ptr;
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
if (stbInfo->disorderRatio) {
*bind_ts = startTime + getTSRandTail(
stbInfo->timeStampStep, recSeq,
stbInfo->disorderRatio,
stbInfo->disorderRange);
} else {
*bind_ts = startTime + stbInfo->timeStampStep * recSeq;
}
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_ts;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
ptr += bind->buffer_length;
} else {
tmfree(bindBuffer);
return 0;
}
if (sourceRand) {
if ( -1 == prepareStmtBindArrayByType(
bind,
stbInfo->columns[i-1].dataType,
stbInfo->columns[i-1].dataLen,
&ptr,
NULL)) {
free(bindBuffer);
return -1;
}
} else {
char *restStr = stbInfo->sampleDataBuf + cursor;
int lengthOfRest = strlen(restStr);
int index = 0;
for (index = 0; index < lengthOfRest; index ++) {
if (restStr[index] == ',') {
break;
}
}
static int32_t prepareStbStmtBindWithSample(
int64_t *ts,
char *bindArray, SSuperTable *stbInfo,
int64_t startTime, int32_t recSeq,
int32_t timePrec,
int64_t samplePos)
{
TAOS_BIND *bind;
memset(bindBuffer, 0, g_args.len_of_binary);
strncpy(bindBuffer, restStr, index);
cursor += index + 1; // skip ',' too
if ( -1 == prepareStmtBindArrayByType(
bind,
stbInfo->columns[i-1].dataType,
stbInfo->columns[i-1].dataLen,
&ptr,
bindBuffer)) {
free(bindBuffer);
return -1;
}
}
}
}
} else {
TAOS_BIND *tag;
bind = (TAOS_BIND *)bindArray;
for (int t = 0; t < stbInfo->tagCount; t ++) {
tag = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * t));
if ( -1 == prepareStmtBindArrayByType(
tag,
stbInfo->tags[t].dataType,
stbInfo->tags[t].dataLen,
&ptr,
NULL)) {
free(bindBuffer);
return -1;
}
}
int64_t *bind_ts = ts;
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
if (stbInfo->disorderRatio) {
*bind_ts = startTime + getTSRandTail(
stbInfo->timeStampStep, recSeq,
stbInfo->disorderRatio,
stbInfo->disorderRange);
} else {
*bind_ts = startTime + stbInfo->timeStampStep * recSeq;
}
bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_ts;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
free(bindBuffer);
return 0;
}
static int32_t prepareStbStmt(
SSuperTable *stbInfo,
TAOS_STMT *stmt,
static int32_t prepareStbStmtRand(
threadInfo *pThreadInfo,
char *tableName,
int64_t tableSeq,
uint32_t batch,
uint64_t insertRows,
uint64_t recordFrom,
int64_t startTime,
int64_t *pSamplePos)
int64_t startTime)
{
int ret;
bool sourceRand;
if (0 == strncasecmp(stbInfo->dataSource, "rand", strlen("rand"))) {
sourceRand = true;
} else {
sourceRand = false; // from sample data file
}
SSuperTable *stbInfo = pThreadInfo->stbInfo;
TAOS_STMT *stmt = pThreadInfo->stmt;
if (AUTO_CREATE_SUBTBL == stbInfo->autoCreateTable) {
char* tagsValBuf = NULL;
bool tagRand;
if (0 == stbInfo->tagSource) {
tagRand = true;
tagsValBuf = generateTagValuesForStb(stbInfo, tableSeq);
} else {
tagRand = false;
tagsValBuf = getTagValueFromTagSample(
stbInfo,
tableSeq % stbInfo->tagSampleCount);
......@@ -6150,8 +6406,9 @@ static int32_t prepareStbStmt(
return -1;
}
if (-1 == prepareStbStmtBind(
tagsArray, stbInfo, tagRand, -1, -1, false /* is tag */)) {
if (-1 == prepareStbStmtBindTag(
tagsArray, stbInfo, tagsValBuf, pThreadInfo->time_precision
/* is tag */)) {
tmfree(tagsValBuf);
tmfree(tagsArray);
return -1;
......@@ -6186,8 +6443,12 @@ static int32_t prepareStbStmt(
uint32_t k;
for (k = 0; k < batch;) {
/* columnCount + 1 (ts) */
if (-1 == prepareStbStmtBind(bindArray, stbInfo, sourceRand,
startTime, k, true /* is column */)) {
if (-1 == prepareStbStmtBindRand(
pThreadInfo->bind_ts,
bindArray, stbInfo,
startTime, k,
pThreadInfo->time_precision
/* is column */)) {
free(bindArray);
return -1;
}
......@@ -6210,10 +6471,6 @@ static int32_t prepareStbStmt(
k++;
recordFrom ++;
if (!sourceRand) {
(*pSamplePos) ++;
}
if (recordFrom >= insertRows) {
break;
}
......@@ -6223,9 +6480,8 @@ static int32_t prepareStbStmt(
return k;
}
static int32_t prepareStbStmtInterlace(
SSuperTable *stbInfo,
TAOS_STMT *stmt,
static int32_t prepareStbStmtWithSample(
threadInfo *pThreadInfo,
char *tableName,
int64_t tableSeq,
uint32_t batch,
......@@ -6234,41 +6490,109 @@ static int32_t prepareStbStmtInterlace(
int64_t startTime,
int64_t *pSamplePos)
{
return prepareStbStmt(
stbInfo,
stmt,
tableName,
tableSeq,
batch,
insertRows, 0, startTime,
pSamplePos);
}
int ret;
SSuperTable *stbInfo = pThreadInfo->stbInfo;
TAOS_STMT *stmt = pThreadInfo->stmt;
static int32_t prepareStbStmtProgressive(
SSuperTable *stbInfo,
TAOS_STMT *stmt,
char *tableName,
int64_t tableSeq,
uint32_t batch,
uint64_t insertRows,
uint64_t recordFrom,
int64_t startTime,
int64_t *pSamplePos)
{
return prepareStbStmt(
stbInfo,
stmt,
tableName,
tableSeq,
g_args.num_of_RPR,
insertRows, recordFrom, startTime,
pSamplePos);
}
if (AUTO_CREATE_SUBTBL == stbInfo->autoCreateTable) {
char* tagsValBuf = NULL;
if (0 == stbInfo->tagSource) {
tagsValBuf = generateTagValuesForStb(stbInfo, tableSeq);
} else {
tagsValBuf = getTagValueFromTagSample(
stbInfo,
tableSeq % stbInfo->tagSampleCount);
}
if (NULL == tagsValBuf) {
errorPrint("%s() LN%d, tag buf failed to allocate memory\n",
__func__, __LINE__);
return -1;
}
char *tagsArray = calloc(1, sizeof(TAOS_BIND) * stbInfo->tagCount);
if (NULL == tagsArray) {
tmfree(tagsValBuf);
errorPrint("%s() LN%d, tag buf failed to allocate memory\n",
__func__, __LINE__);
return -1;
}
if (-1 == prepareStbStmtBindTag(
tagsArray, stbInfo, tagsValBuf, pThreadInfo->time_precision
/* is tag */)) {
tmfree(tagsValBuf);
tmfree(tagsArray);
return -1;
}
ret = taos_stmt_set_tbname_tags(stmt, tableName, (TAOS_BIND *)tagsArray);
tmfree(tagsValBuf);
tmfree(tagsArray);
if (0 != ret) {
errorPrint("%s() LN%d, stmt_set_tbname_tags() failed! reason: %s\n",
__func__, __LINE__, taos_stmt_errstr(stmt));
return -1;
}
} else {
ret = taos_stmt_set_tbname(stmt, tableName);
if (0 != ret) {
errorPrint("%s() LN%d, stmt_set_tbname() failed! reason: %s\n",
__func__, __LINE__, taos_stmt_errstr(stmt));
return -1;
}
}
uint32_t k;
for (k = 0; k < batch;) {
char *bindArray = (char *)(*((uintptr_t *)
(stbInfo->sampleBindArray + (sizeof(char *)) * (*pSamplePos))));
/* columnCount + 1 (ts) */
if (-1 == prepareStbStmtBindWithSample(
pThreadInfo->bind_ts,
bindArray, stbInfo,
startTime, k,
pThreadInfo->time_precision,
*pSamplePos
/* is column */)) {
return -1;
}
ret = taos_stmt_bind_param(stmt, (TAOS_BIND *)bindArray);
if (0 != ret) {
errorPrint("%s() LN%d, stmt_bind_param() failed! reason: %s\n",
__func__, __LINE__, taos_stmt_errstr(stmt));
return -1;
}
// if msg > 3MB, break
ret = taos_stmt_add_batch(stmt);
if (0 != ret) {
errorPrint("%s() LN%d, stmt_add_batch() failed! reason: %s\n",
__func__, __LINE__, taos_stmt_errstr(stmt));
return -1;
}
k++;
recordFrom ++;
(*pSamplePos) ++;
if ((*pSamplePos) == MAX_SAMPLES_ONCE_FROM_FILE) {
*pSamplePos = 0;
}
if (recordFrom >= insertRows) {
break;
}
}
return k;
}
#endif
static int32_t generateStbProgressiveData(
SSuperTable *superTblInfo,
SSuperTable *stbInfo,
char *tableName,
int64_t tableSeq,
char *dbName, char *buffer,
......@@ -6282,7 +6606,7 @@ static int32_t generateStbProgressiveData(
memset(pstr, 0, *pRemainderBufLen);
int64_t headLen = generateStbSQLHead(
superTblInfo,
stbInfo,
tableName, tableSeq, dbName,
buffer, *pRemainderBufLen);
......@@ -6294,7 +6618,7 @@ static int32_t generateStbProgressiveData(
int64_t dataLen;
return generateStbDataTail(superTblInfo,
return generateStbDataTail(stbInfo,
g_args.num_of_RPR, pstr, *pRemainderBufLen,
insertRows, recordFrom,
startTime,
......@@ -6354,26 +6678,34 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
int64_t nTimeStampStep;
uint64_t insert_interval;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
bool sourceRand;
SSuperTable* stbInfo = pThreadInfo->stbInfo;
if (superTblInfo) {
insertRows = superTblInfo->insertRows;
if (stbInfo) {
insertRows = stbInfo->insertRows;
if ((superTblInfo->interlaceRows == 0)
if ((stbInfo->interlaceRows == 0)
&& (g_args.interlace_rows > 0)) {
interlaceRows = g_args.interlace_rows;
} else {
interlaceRows = superTblInfo->interlaceRows;
interlaceRows = stbInfo->interlaceRows;
}
maxSqlLen = stbInfo->maxSqlLen;
nTimeStampStep = stbInfo->timeStampStep;
insert_interval = stbInfo->insertInterval;
if (0 == strncasecmp(stbInfo->dataSource, "rand", 4)) {
sourceRand = true;
} else {
sourceRand = false; // from sample data file
}
maxSqlLen = superTblInfo->maxSqlLen;
nTimeStampStep = superTblInfo->timeStampStep;
insert_interval = superTblInfo->insertInterval;
} else {
insertRows = g_args.num_of_DPT;
interlaceRows = g_args.interlace_rows;
maxSqlLen = g_args.max_sql_len;
nTimeStampStep = g_args.timestamp_step;
insert_interval = g_args.insert_interval;
sourceRand = true;
}
debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n",
......@@ -6456,28 +6788,38 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
uint64_t oldRemainderLen = remainderBufLen;
int32_t generated;
if (superTblInfo) {
if (superTblInfo->iface == STMT_IFACE) {
if (stbInfo) {
if (stbInfo->iface == STMT_IFACE) {
#if STMT_IFACE_ENABLED == 1
generated = prepareStbStmtInterlace(
superTblInfo,
pThreadInfo->stmt,
tableName,
tableSeq,
batchPerTbl,
insertRows, i,
startTime,
&(pThreadInfo->samplePos));
if (sourceRand) {
generated = prepareStbStmtRand(
pThreadInfo,
tableName,
tableSeq,
batchPerTbl,
insertRows, 0,
startTime
);
} else {
generated = prepareStbStmtWithSample(
pThreadInfo,
tableName,
tableSeq,
batchPerTbl,
insertRows, 0,
startTime,
&(pThreadInfo->samplePos));
}
#else
generated = -1;
#endif
} else {
generated = generateStbInterlaceData(
superTblInfo,
pThreadInfo,
tableName, batchPerTbl, i,
batchPerTblTimes,
tableSeq,
pThreadInfo, pstr,
pstr,
insertRows,
startTime,
&remainderBufLen);
......@@ -6490,7 +6832,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
tableName, batchPerTbl, startTime);
#if STMT_IFACE_ENABLED == 1
generated = prepareStmtWithoutStb(
pThreadInfo->stmt, tableName,
pThreadInfo,
tableName,
batchPerTbl,
insertRows, i,
startTime);
......@@ -6639,12 +6982,12 @@ free_of_interlace:
static void* syncWriteProgressive(threadInfo *pThreadInfo) {
debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__);
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len;
SSuperTable* stbInfo = pThreadInfo->stbInfo;
uint64_t maxSqlLen = stbInfo?stbInfo->maxSqlLen:g_args.max_sql_len;
int64_t timeStampStep =
superTblInfo?superTblInfo->timeStampStep:g_args.timestamp_step;
stbInfo?stbInfo->timeStampStep:g_args.timestamp_step;
int64_t insertRows =
(superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
(stbInfo)?stbInfo->insertRows:g_args.num_of_DPT;
verbosePrint("%s() LN%d insertRows=%"PRId64"\n",
__func__, __LINE__, insertRows);
......@@ -6663,6 +7006,17 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
pThreadInfo->totalInsertRows = 0;
pThreadInfo->totalAffectedRows = 0;
bool sourceRand;
if (stbInfo) {
if (0 == strncasecmp(stbInfo->dataSource, "rand", 4)) {
sourceRand = true;
} else {
sourceRand = false; // from sample data file
}
} else {
sourceRand = true;
}
pThreadInfo->samplePos = 0;
int percentComplete = 0;
......@@ -6696,24 +7050,35 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
remainderBufLen -= len;
int32_t generated;
if (superTblInfo) {
if (superTblInfo->iface == STMT_IFACE) {
if (stbInfo) {
if (stbInfo->iface == STMT_IFACE) {
#if STMT_IFACE_ENABLED == 1
generated = prepareStbStmtProgressive(
superTblInfo,
pThreadInfo->stmt,
tableName,
tableSeq,
g_args.num_of_RPR,
insertRows, i, start_time,
&(pThreadInfo->samplePos));
if (sourceRand) {
generated = prepareStbStmtRand(
pThreadInfo,
tableName,
tableSeq,
g_args.num_of_RPR,
insertRows,
i, start_time
);
} else {
generated = prepareStbStmtWithSample(
pThreadInfo,
tableName,
tableSeq,
g_args.num_of_RPR,
insertRows, i, start_time,
&(pThreadInfo->samplePos));
}
#else
generated = -1;
#endif
} else {
generated = generateStbProgressiveData(
superTblInfo,
tableName, tableSeq, pThreadInfo->db_name, pstr,
stbInfo,
tableName, tableSeq,
pThreadInfo->db_name, pstr,
insertRows, i, start_time,
&(pThreadInfo->samplePos),
&remainderBufLen);
......@@ -6722,7 +7087,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
if (g_args.iface == STMT_IFACE) {
#if STMT_IFACE_ENABLED == 1
generated = prepareStmtWithoutStb(
pThreadInfo->stmt,
pThreadInfo,
tableName,
g_args.num_of_RPR,
insertRows, i,
......@@ -6792,9 +7157,9 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
} // num_of_DPT
if ((g_args.verbose_print) &&
(tableSeq == pThreadInfo->ntables - 1) && (superTblInfo)
(tableSeq == pThreadInfo->ntables - 1) && (stbInfo)
&& (0 == strncasecmp(
superTblInfo->dataSource,
stbInfo->dataSource,
"sample", strlen("sample")))) {
verbosePrint("%s() LN%d samplePos=%"PRId64"\n",
__func__, __LINE__, pThreadInfo->samplePos);
......@@ -6812,18 +7177,18 @@ free_of_progressive:
static void* syncWrite(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
SSuperTable* stbInfo = pThreadInfo->stbInfo;
setThreadName("syncWrite");
uint32_t interlaceRows;
if (superTblInfo) {
if ((superTblInfo->interlaceRows == 0)
if (stbInfo) {
if ((stbInfo->interlaceRows == 0)
&& (g_args.interlace_rows > 0)) {
interlaceRows = g_args.interlace_rows;
} else {
interlaceRows = superTblInfo->interlaceRows;
interlaceRows = stbInfo->interlaceRows;
}
} else {
interlaceRows = g_args.interlace_rows;
......@@ -6840,10 +7205,10 @@ static void* syncWrite(void *sarg) {
static void callBack(void *param, TAOS_RES *res, int code) {
threadInfo* pThreadInfo = (threadInfo*)param;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
SSuperTable* stbInfo = pThreadInfo->stbInfo;
int insert_interval =
superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
stbInfo?stbInfo->insertInterval:g_args.insert_interval;
if (insert_interval) {
pThreadInfo->et = taosGetTimestampMs();
if ((pThreadInfo->et - pThreadInfo->st) < insert_interval) {
......@@ -6851,13 +7216,13 @@ static void callBack(void *param, TAOS_RES *res, int code) {
}
}
char *buffer = calloc(1, pThreadInfo->superTblInfo->maxSqlLen);
char *buffer = calloc(1, pThreadInfo->stbInfo->maxSqlLen);
char data[MAX_DATA_SIZE];
char *pstr = buffer;
pstr += sprintf(pstr, "insert into %s.%s%"PRId64" values",
pThreadInfo->db_name, pThreadInfo->tb_prefix,
pThreadInfo->start_table_from);
// if (pThreadInfo->counter >= pThreadInfo->superTblInfo->insertRows) {
// if (pThreadInfo->counter >= pThreadInfo->stbInfo->insertRows) {
if (pThreadInfo->counter >= g_args.num_of_RPR) {
pThreadInfo->start_table_from++;
pThreadInfo->counter = 0;
......@@ -6871,15 +7236,15 @@ static void callBack(void *param, TAOS_RES *res, int code) {
for (int i = 0; i < g_args.num_of_RPR; i++) {
int rand_num = taosRandom() % 100;
if (0 != pThreadInfo->superTblInfo->disorderRatio
&& rand_num < pThreadInfo->superTblInfo->disorderRatio) {
if (0 != pThreadInfo->stbInfo->disorderRatio
&& rand_num < pThreadInfo->stbInfo->disorderRatio) {
int64_t d = pThreadInfo->lastTs
- (taosRandom() % pThreadInfo->superTblInfo->disorderRange + 1);
generateStbRowData(pThreadInfo->superTblInfo, data,
- (taosRandom() % pThreadInfo->stbInfo->disorderRange + 1);
generateStbRowData(pThreadInfo->stbInfo, data,
MAX_DATA_SIZE,
d);
} else {
generateStbRowData(pThreadInfo->superTblInfo,
generateStbRowData(pThreadInfo->stbInfo,
data,
MAX_DATA_SIZE,
pThreadInfo->lastTs += 1000);
......@@ -6887,7 +7252,7 @@ static void callBack(void *param, TAOS_RES *res, int code) {
pstr += sprintf(pstr, "%s", data);
pThreadInfo->counter++;
if (pThreadInfo->counter >= pThreadInfo->superTblInfo->insertRows) {
if (pThreadInfo->counter >= pThreadInfo->stbInfo->insertRows) {
break;
}
}
......@@ -6903,7 +7268,7 @@ static void callBack(void *param, TAOS_RES *res, int code) {
static void *asyncWrite(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
SSuperTable* stbInfo = pThreadInfo->stbInfo;
setThreadName("asyncWrite");
......@@ -6912,7 +7277,7 @@ static void *asyncWrite(void *sarg) {
pThreadInfo->lastTs = pThreadInfo->start_time;
int insert_interval =
superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
stbInfo?stbInfo->insertInterval:g_args.insert_interval;
if (insert_interval) {
pThreadInfo->st = taosGetTimestampMs();
}
......@@ -6949,8 +7314,81 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in *
return 0;
}
#if STMT_IFACE_ENABLED == 1
static int parseSampleFileToStmt(SSuperTable *stbInfo, uint32_t timePrec)
{
stbInfo->sampleBindArray = calloc(1, sizeof(char *) * MAX_SAMPLES_ONCE_FROM_FILE);
if (stbInfo->sampleBindArray == NULL) {
errorPrint("%s() LN%d, Failed to allocate %"PRIu64" bind array buffer\n",
__func__, __LINE__, (uint64_t)sizeof(char *) * MAX_SAMPLES_ONCE_FROM_FILE);
return -1;
}
for (int i=0; i < MAX_SAMPLES_ONCE_FROM_FILE; i++) {
char *bindArray = calloc(1, sizeof(TAOS_BIND) * (stbInfo->columnCount + 1));
if (bindArray == NULL) {
errorPrint("%s() LN%d, Failed to allocate %d bind params\n",
__func__, __LINE__, (stbInfo->columnCount + 1));
return -1;
}
TAOS_BIND *bind;
int cursor = 0;
for (int c = 0; c < stbInfo->columnCount + 1; c++) {
bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * c));
if (c == 0) {
bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
bind->buffer_length = sizeof(int64_t);
bind->buffer = NULL; //bind_ts;
bind->length = &bind->buffer_length;
bind->is_null = NULL;
} else {
char *restStr = stbInfo->sampleDataBuf
+ stbInfo->lenOfOneRow * i + cursor;
int lengthOfRest = strlen(restStr);
int index = 0;
for (index = 0; index < lengthOfRest; index ++) {
if (restStr[index] == ',') {
break;
}
}
char *bindBuffer = calloc(1, index + 1);
if (bindBuffer == NULL) {
errorPrint("%s() LN%d, Failed to allocate %d bind buffer\n",
__func__, __LINE__, DOUBLE_BUFF_LEN);
return -1;
}
strncpy(bindBuffer, restStr, index);
cursor += index + 1; // skip ',' too
if (-1 == prepareStmtBindArrayByType(
bind,
stbInfo->columns[c-1].dataType,
stbInfo->columns[c-1].dataLen,
timePrec,
bindBuffer)) {
free(bindBuffer);
return -1;
}
free(bindBuffer);
}
}
*((uintptr_t *)(stbInfo->sampleBindArray + (sizeof(char *)) * i)) = (uintptr_t)bindArray;
}
return 0;
}
#endif
static void startMultiThreadInsertData(int threads, char* db_name,
char* precision, SSuperTable* superTblInfo) {
char* precision, SSuperTable* stbInfo) {
int32_t timePrec = TSDB_TIME_PRECISION_MILLI;
if (0 != precision[0]) {
......@@ -6964,19 +7402,19 @@ static void startMultiThreadInsertData(int threads, char* db_name,
#endif
} else {
errorPrint("Not support precision: %s\n", precision);
exit(-1);
exit(EXIT_FAILURE);
}
}
int64_t start_time;
if (superTblInfo) {
if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) {
if (stbInfo) {
if (0 == strncasecmp(stbInfo->startTimestamp, "now", 3)) {
start_time = taosGetTimestamp(timePrec);
} else {
if (TSDB_CODE_SUCCESS != taosParseTime(
superTblInfo->startTimestamp,
stbInfo->startTimestamp,
&start_time,
strlen(superTblInfo->startTimestamp),
strlen(stbInfo->startTimestamp),
timePrec, 0)) {
ERROR_EXIT("failed to parse time!\n");
}
......@@ -6990,12 +7428,12 @@ static void startMultiThreadInsertData(int threads, char* db_name,
int64_t start = taosGetTimestampMs();
// read sample data from file first
if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource,
if ((stbInfo) && (0 == strncasecmp(stbInfo->dataSource,
"sample", strlen("sample")))) {
if (0 != prepareSampleDataForSTable(superTblInfo)) {
if (0 != prepareSampleDataForSTable(stbInfo)) {
errorPrint("%s() LN%d, prepare sample data for stable failed!\n",
__func__, __LINE__);
exit(-1);
exit(EXIT_FAILURE);
}
}
......@@ -7005,68 +7443,68 @@ static void startMultiThreadInsertData(int threads, char* db_name,
if (NULL == taos0) {
errorPrint("%s() LN%d, connect to server fail , reason: %s\n",
__func__, __LINE__, taos_errstr(NULL));
exit(-1);
exit(EXIT_FAILURE);
}
int64_t ntables = 0;
uint64_t tableFrom;
if (superTblInfo) {
if (stbInfo) {
int64_t limit;
uint64_t offset;
if ((NULL != g_args.sqlFile)
&& (superTblInfo->childTblExists == TBL_NO_EXISTS)
&& ((superTblInfo->childTblOffset != 0)
|| (superTblInfo->childTblLimit >= 0))) {
&& (stbInfo->childTblExists == TBL_NO_EXISTS)
&& ((stbInfo->childTblOffset != 0)
|| (stbInfo->childTblLimit >= 0))) {
printf("WARNING: offset and limit will not be used since the child tables not exists!\n");
}
if (superTblInfo->childTblExists == TBL_ALREADY_EXISTS) {
if ((superTblInfo->childTblLimit < 0)
|| ((superTblInfo->childTblOffset
+ superTblInfo->childTblLimit)
> (superTblInfo->childTblCount))) {
superTblInfo->childTblLimit =
superTblInfo->childTblCount - superTblInfo->childTblOffset;
if (stbInfo->childTblExists == TBL_ALREADY_EXISTS) {
if ((stbInfo->childTblLimit < 0)
|| ((stbInfo->childTblOffset
+ stbInfo->childTblLimit)
> (stbInfo->childTblCount))) {
stbInfo->childTblLimit =
stbInfo->childTblCount - stbInfo->childTblOffset;
}
offset = superTblInfo->childTblOffset;
limit = superTblInfo->childTblLimit;
offset = stbInfo->childTblOffset;
limit = stbInfo->childTblLimit;
} else {
limit = superTblInfo->childTblCount;
limit = stbInfo->childTblCount;
offset = 0;
}
ntables = limit;
tableFrom = offset;
if ((superTblInfo->childTblExists != TBL_NO_EXISTS)
&& ((superTblInfo->childTblOffset + superTblInfo->childTblLimit )
> superTblInfo->childTblCount)) {
if ((stbInfo->childTblExists != TBL_NO_EXISTS)
&& ((stbInfo->childTblOffset + stbInfo->childTblLimit)
> stbInfo->childTblCount)) {
printf("WARNING: specified offset + limit > child table count!\n");
prompt();
}
if ((superTblInfo->childTblExists != TBL_NO_EXISTS)
&& (0 == superTblInfo->childTblLimit)) {
if ((stbInfo->childTblExists != TBL_NO_EXISTS)
&& (0 == stbInfo->childTblLimit)) {
printf("WARNING: specified limit = 0, which cannot find table name to insert or query! \n");
prompt();
}
superTblInfo->childTblName = (char*)calloc(1,
stbInfo->childTblName = (char*)calloc(1,
limit * TSDB_TABLE_NAME_LEN);
if (superTblInfo->childTblName == NULL) {
errorPrint("%s() LN%d, alloc memory failed!\n", __func__, __LINE__);
if (stbInfo->childTblName == NULL) {
taos_close(taos0);
exit(-1);
errorPrint("%s() LN%d, alloc memory failed!\n", __func__, __LINE__);
exit(EXIT_FAILURE);
}
int64_t childTblCount;
getChildNameOfSuperTableWithLimitAndOffset(
taos0,
db_name, superTblInfo->sTblName,
&superTblInfo->childTblName, &childTblCount,
db_name, stbInfo->sTblName,
&stbInfo->childTblName, &childTblCount,
limit,
offset);
} else {
......@@ -7087,11 +7525,11 @@ static void startMultiThreadInsertData(int threads, char* db_name,
b = ntables % threads;
}
if ((superTblInfo)
&& (superTblInfo->iface == REST_IFACE)) {
if ((stbInfo)
&& (stbInfo->iface == REST_IFACE)) {
if (convertHostToServAddr(
g_Dbs.host, g_Dbs.port, &(g_Dbs.serv_addr)) != 0) {
exit(-1);
ERROR_EXIT("convert host to server address");
}
}
......@@ -7104,98 +7542,110 @@ static void startMultiThreadInsertData(int threads, char* db_name,
memset(pids, 0, threads * sizeof(pthread_t));
memset(infos, 0, threads * sizeof(threadInfo));
#if STMT_IFACE_ENABLED == 1
char *stmtBuffer = calloc(1, BUFFER_SIZE);
assert(stmtBuffer);
if ((g_args.iface == STMT_IFACE)
|| ((stbInfo)
&& (stbInfo->iface == STMT_IFACE))) {
char *pstr = stmtBuffer;
if ((stbInfo)
&& (AUTO_CREATE_SUBTBL
== stbInfo->autoCreateTable)) {
pstr += sprintf(pstr, "INSERT INTO ? USING %s TAGS(?",
stbInfo->sTblName);
for (int tag = 0; tag < (stbInfo->tagCount - 1);
tag ++ ) {
pstr += sprintf(pstr, ",?");
}
pstr += sprintf(pstr, ") VALUES(?");
} else {
pstr += sprintf(pstr, "INSERT INTO ? VALUES(?");
}
int columnCount;
if (stbInfo) {
columnCount = stbInfo->columnCount;
} else {
columnCount = g_args.num_of_CPR;
}
for (int col = 0; col < columnCount; col ++) {
pstr += sprintf(pstr, ",?");
}
pstr += sprintf(pstr, ")");
debugPrint("%s() LN%d, stmtBuffer: %s", __func__, __LINE__, stmtBuffer);
if ((stbInfo) && (0 == strncasecmp(stbInfo->dataSource,
"sample", strlen("sample")))) {
parseSampleFileToStmt(stbInfo, timePrec);
}
}
#endif
for (int i = 0; i < threads; i++) {
threadInfo *pThreadInfo = infos + i;
pThreadInfo->threadID = i;
tstrncpy(pThreadInfo->db_name, db_name, TSDB_DB_NAME_LEN);
pThreadInfo->time_precision = timePrec;
pThreadInfo->superTblInfo = superTblInfo;
pThreadInfo->stbInfo = stbInfo;
pThreadInfo->start_time = start_time;
pThreadInfo->minDelay = UINT64_MAX;
if ((NULL == superTblInfo) ||
(superTblInfo->iface != REST_IFACE)) {
if ((NULL == stbInfo) ||
(stbInfo->iface != REST_IFACE)) {
//t_info->taos = taos;
pThreadInfo->taos = taos_connect(
g_Dbs.host, g_Dbs.user,
g_Dbs.password, db_name, g_Dbs.port);
if (NULL == pThreadInfo->taos) {
free(infos);
errorPrint(
"%s() LN%d, connect to server fail from insert sub thread, reason: %s\n",
__func__, __LINE__,
taos_errstr(NULL));
free(infos);
exit(-1);
exit(EXIT_FAILURE);
}
#if STMT_IFACE_ENABLED == 1
if ((g_args.iface == STMT_IFACE)
|| ((superTblInfo)
&& (superTblInfo->iface == STMT_IFACE))) {
|| ((stbInfo)
&& (stbInfo->iface == STMT_IFACE))) {
int columnCount;
if (superTblInfo) {
columnCount = superTblInfo->columnCount;
} else {
columnCount = g_args.num_of_CPR;
}
pThreadInfo->stmt = taos_stmt_init(pThreadInfo->taos);
if (NULL == pThreadInfo->stmt) {
free(pids);
free(infos);
errorPrint(
"%s() LN%d, failed init stmt, reason: %s\n",
__func__, __LINE__,
taos_errstr(NULL));
free(pids);
free(infos);
exit(-1);
}
char *buffer = calloc(1, BUFFER_SIZE);
assert(buffer);
char *pstr = buffer;
if ((superTblInfo)
&& (AUTO_CREATE_SUBTBL
== superTblInfo->autoCreateTable)) {
pstr += sprintf(pstr, "INSERT INTO ? USING %s TAGS(?",
superTblInfo->sTblName);
for (int tag = 0; tag < (superTblInfo->tagCount - 1);
tag ++ ) {
pstr += sprintf(pstr, ",?");
}
pstr += sprintf(pstr, ") VALUES(?");
} else {
pstr += sprintf(pstr, "INSERT INTO ? VALUES(?");
}
for (int col = 0; col < columnCount; col ++) {
pstr += sprintf(pstr, ",?");
exit(EXIT_FAILURE);
}
pstr += sprintf(pstr, ")");
debugPrint("%s() LN%d, buffer: %s", __func__, __LINE__, buffer);
int ret = taos_stmt_prepare(pThreadInfo->stmt, buffer, 0);
int ret = taos_stmt_prepare(pThreadInfo->stmt, stmtBuffer, 0);
if (ret != 0){
errorPrint("failed to execute taos_stmt_prepare. return 0x%x. reason: %s\n",
ret, taos_stmt_errstr(pThreadInfo->stmt));
free(pids);
free(infos);
free(buffer);
exit(-1);
free(stmtBuffer);
errorPrint("failed to execute taos_stmt_prepare. return 0x%x. reason: %s\n",
ret, taos_stmt_errstr(pThreadInfo->stmt));
exit(EXIT_FAILURE);
}
free(buffer);
pThreadInfo->bind_ts = malloc(sizeof(int64_t));
}
#endif
} else {
pThreadInfo->taos = NULL;
}
/* if ((NULL == superTblInfo)
|| (0 == superTblInfo->multiThreadWriteOneTbl)) {
/* if ((NULL == stbInfo)
|| (0 == stbInfo->multiThreadWriteOneTbl)) {
*/
pThreadInfo->start_table_from = tableFrom;
pThreadInfo->ntables = i<b?a+1:a;
......@@ -7203,7 +7653,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
tableFrom = pThreadInfo->end_table_to + 1;
/* } else {
pThreadInfo->start_table_from = 0;
pThreadInfo->ntables = superTblInfo->childTblCount;
pThreadInfo->ntables = stbInfo->childTblCount;
pThreadInfo->start_time = pThreadInfo->start_time + rand_int() % 10000 - rand_tinyint();
}
*/
......@@ -7215,6 +7665,10 @@ static void startMultiThreadInsertData(int threads, char* db_name,
}
}
#if STMT_IFACE_ENABLED == 1
free(stmtBuffer);
#endif
for (int i = 0; i < threads; i++) {
pthread_join(pids[i], NULL);
}
......@@ -7228,11 +7682,10 @@ static void startMultiThreadInsertData(int threads, char* db_name,
for (int i = 0; i < threads; i++) {
threadInfo *pThreadInfo = infos + i;
tsem_destroy(&(pThreadInfo->lock_sem));
#if STMT_IFACE_ENABLED == 1
if (pThreadInfo->stmt) {
taos_stmt_close(pThreadInfo->stmt);
tmfree((char *)pThreadInfo->bind_ts);
}
#endif
tsem_destroy(&(pThreadInfo->lock_sem));
......@@ -7242,9 +7695,9 @@ static void startMultiThreadInsertData(int threads, char* db_name,
__func__, __LINE__,
pThreadInfo->threadID, pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows);
if (superTblInfo) {
superTblInfo->totalAffectedRows += pThreadInfo->totalAffectedRows;
superTblInfo->totalInsertRows += pThreadInfo->totalInsertRows;
if (stbInfo) {
stbInfo->totalAffectedRows += pThreadInfo->totalAffectedRows;
stbInfo->totalInsertRows += pThreadInfo->totalInsertRows;
} else {
g_args.totalAffectedRows += pThreadInfo->totalAffectedRows;
g_args.totalInsertRows += pThreadInfo->totalInsertRows;
......@@ -7265,22 +7718,22 @@ static void startMultiThreadInsertData(int threads, char* db_name,
double tInMs = t/1000.0;
if (superTblInfo) {
if (stbInfo) {
fprintf(stderr, "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n",
tInMs, superTblInfo->totalInsertRows,
superTblInfo->totalAffectedRows,
threads, db_name, superTblInfo->sTblName,
tInMs, stbInfo->totalInsertRows,
stbInfo->totalAffectedRows,
threads, db_name, stbInfo->sTblName,
(tInMs)?
(double)(superTblInfo->totalInsertRows/tInMs):FLT_MAX);
(double)(stbInfo->totalInsertRows/tInMs):FLT_MAX);
if (g_fpOfInsertResult) {
fprintf(g_fpOfInsertResult,
"Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n",
tInMs, superTblInfo->totalInsertRows,
superTblInfo->totalAffectedRows,
threads, db_name, superTblInfo->sTblName,
tInMs, stbInfo->totalInsertRows,
stbInfo->totalAffectedRows,
threads, db_name, stbInfo->sTblName,
(tInMs)?
(double)(superTblInfo->totalInsertRows/tInMs):FLT_MAX);
(double)(stbInfo->totalInsertRows/tInMs):FLT_MAX);
}
} else {
fprintf(stderr, "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s %.2f records/second\n\n",
......@@ -7335,8 +7788,8 @@ static void *readTable(void *sarg) {
}
int64_t num_of_DPT;
/* if (pThreadInfo->superTblInfo) {
num_of_DPT = pThreadInfo->superTblInfo->insertRows; // nrecords_per_table;
/* if (pThreadInfo->stbInfo) {
num_of_DPT = pThreadInfo->stbInfo->insertRows; // nrecords_per_table;
} else {
*/
num_of_DPT = g_args.num_of_DPT;
......@@ -7410,7 +7863,7 @@ static void *readMetric(void *sarg) {
return NULL;
}
int64_t num_of_DPT = pThreadInfo->superTblInfo->insertRows;
int64_t num_of_DPT = pThreadInfo->stbInfo->insertRows;
int64_t num_of_tables = pThreadInfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1;
int64_t totalData = num_of_DPT * num_of_tables;
bool do_aggreFunc = g_Dbs.do_aggreFunc;
......@@ -7549,14 +8002,14 @@ static int insertTestProcess() {
if (g_Dbs.db[i].superTblCount > 0) {
for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {
SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j];
SSuperTable* stbInfo = &g_Dbs.db[i].superTbls[j];
if (superTblInfo && (superTblInfo->insertRows > 0)) {
if (stbInfo && (stbInfo->insertRows > 0)) {
startMultiThreadInsertData(
g_Dbs.threadCount,
g_Dbs.db[i].dbName,
g_Dbs.db[i].dbCfg.precision,
superTblInfo);
stbInfo);
}
}
}
......@@ -7776,7 +8229,7 @@ static int queryTestProcess() {
if (taos == NULL) {
errorPrint( "Failed to connect to TDengine, reason:%s\n",
taos_errstr(NULL));
exit(-1);
exit(EXIT_FAILURE);
}
if (0 != g_queryInfo.superQueryInfo.sqlCount) {
......@@ -7796,7 +8249,7 @@ static int queryTestProcess() {
if (0 == strncasecmp(g_queryInfo.queryMode, "rest", strlen("rest"))) {
if (convertHostToServAddr(
g_queryInfo.host, g_queryInfo.port, &g_queryInfo.serv_addr) != 0)
exit(-1);
ERROR_EXIT("convert host to server address");
}
pthread_t *pids = NULL;
......@@ -8000,10 +8453,10 @@ static void *superSubscribe(void *sarg) {
setThreadName("superSub");
if (pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) {
free(subSqlStr);
errorPrint("The table number(%"PRId64") of the thread is more than max query sql count: %d\n",
pThreadInfo->ntables, MAX_QUERY_SQL_COUNT);
free(subSqlStr);
exit(-1);
exit(EXIT_FAILURE);
}
if (pThreadInfo->taos == NULL) {
......@@ -8269,7 +8722,7 @@ static int subscribeTestProcess() {
if (taos == NULL) {
errorPrint( "Failed to connect to TDengine, reason:%s\n",
taos_errstr(NULL));
exit(-1);
exit(EXIT_FAILURE);
}
if (0 != g_queryInfo.superQueryInfo.sqlCount) {
......@@ -8298,7 +8751,7 @@ static int subscribeTestProcess() {
errorPrint("%s() LN%d, sepcified query sqlCount %d.\n",
__func__, __LINE__,
g_queryInfo.specifiedQueryInfo.sqlCount);
exit(-1);
exit(EXIT_FAILURE);
}
pids = calloc(
......@@ -8313,7 +8766,7 @@ static int subscribeTestProcess() {
sizeof(threadInfo));
if ((NULL == pids) || (NULL == infos)) {
errorPrint("%s() LN%d, malloc failed for create threads\n", __func__, __LINE__);
exit(-1);
exit(EXIT_FAILURE);
}
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
......@@ -8350,7 +8803,7 @@ static int subscribeTestProcess() {
errorPrint("%s() LN%d, malloc failed for create threads\n",
__func__, __LINE__);
// taos_close(taos);
exit(-1);
exit(EXIT_FAILURE);
}
int64_t ntables = g_queryInfo.superQueryInfo.childTblCount;
......@@ -8553,8 +9006,7 @@ static int regexMatch(const char *s, const char *reg, int cflags) {
/* Compile regular expression */
if (regcomp(&regex, reg, cflags) != 0) {
printf("Fail to compile regex\n");
exit(-1);
ERROR_EXIT("Fail to compile regex\n");
}
/* Execute regular expression */
......@@ -8567,9 +9019,9 @@ static int regexMatch(const char *s, const char *reg, int cflags) {
return 0;
} else {
regerror(reti, &regex, msgbuf, sizeof(msgbuf));
printf("Regex match failed: %s\n", msgbuf);
regfree(&regex);
exit(-1);
printf("Regex match failed: %s\n", msgbuf);
exit(EXIT_FAILURE);
}
return 0;
......@@ -8671,7 +9123,7 @@ static void queryResult() {
if (g_args.use_metric) {
pThreadInfo->ntables = g_Dbs.db[0].superTbls[0].childTblCount;
pThreadInfo->end_table_to = g_Dbs.db[0].superTbls[0].childTblCount - 1;
pThreadInfo->superTblInfo = &g_Dbs.db[0].superTbls[0];
pThreadInfo->stbInfo = &g_Dbs.db[0].superTbls[0];
tstrncpy(pThreadInfo->tb_prefix,
g_Dbs.db[0].superTbls[0].childTblPrefix, TBNAME_PREFIX_LEN);
} else {
......@@ -8687,10 +9139,10 @@ static void queryResult() {
g_Dbs.db[0].dbName,
g_Dbs.port);
if (pThreadInfo->taos == NULL) {
free(pThreadInfo);
errorPrint( "Failed to connect to TDengine, reason:%s\n",
taos_errstr(NULL));
free(pThreadInfo);
exit(-1);
exit(EXIT_FAILURE);
}
tstrncpy(pThreadInfo->filePath, g_Dbs.resultFile, MAX_FILE_NAME_LEN);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册