From 850f9d628cb26765b0a6b9a40f9d3bb26f6d5bc4 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 10 Aug 2021 18:04:20 +0800 Subject: [PATCH] [TD-5872]: taosdemo stmt csv perf improve. --- src/kit/taosdemo/taosdemo.c | 227 ++++++++++++------------------------ 1 file changed, 75 insertions(+), 152 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index f5c1bb2051..846bf677d8 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -444,6 +444,7 @@ typedef struct SQueryMetaInfo_S { typedef struct SThreadInfo_S { TAOS * taos; TAOS_STMT *stmt; + int64_t *bind_ts; int threadID; char db_name[TSDB_DB_NAME_LEN]; uint32_t time_precision; @@ -5058,10 +5059,15 @@ static void postFreeResource() { } #if STMT_IFACE_ENABLED == 1 if (g_Dbs.db[i].superTbls[j].sampleBindArray) { - for (int c = 0; c < MAX_SAMPLES_ONCE_FROM_FILE; c++) { + for (int k = 0; k < MAX_SAMPLES_ONCE_FROM_FILE; k++) { uintptr_t *tmp = (uintptr_t *)(*(uintptr_t *)( g_Dbs.db[i].superTbls[j].sampleBindArray - + sizeof(uintptr_t *) * c)); + + sizeof(uintptr_t *) * k)); + for (int c = 1; c < g_Dbs.db[i].superTbls[j].columnCount + 1; c++) { + TAOS_BIND *bind = (TAOS_BIND *)((char *)tmp + (sizeof(TAOS_BIND) * c)); + if (bind) + tmfree(bind->buffer); + } tmfree((char *)tmp); } } @@ -5747,8 +5753,9 @@ static int64_t generateInterlaceDataWithoutStb( } #if STMT_IFACE_ENABLED == 1 -static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, - char *dataType, int32_t dataLen, char **ptr, +static int32_t prepareStmtBindArrayByType( + TAOS_BIND *bind, + char *dataType, int32_t dataLen, int32_t timePrec, char *value) { @@ -5759,13 +5766,15 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, (uint32_t)TSDB_MAX_BINARY_LEN); return -1; } - char *bind_binary = (char *)*ptr; + char *bind_binary; bind->buffer_type = TSDB_DATA_TYPE_BINARY; if (value) { + bind_binary = calloc(1, strlen(value) + 1); strncpy(bind_binary, value, strlen(value)); bind->buffer_length = strlen(bind_binary); } else { + bind_binary = calloc(1, dataLen + 1); rand_string(bind_binary, dataLen); bind->buffer_length = dataLen; } @@ -5773,8 +5782,6 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, bind->length = &bind->buffer_length; bind->buffer = bind_binary; bind->is_null = NULL; - - *ptr += bind->buffer_length; } else if (0 == strncasecmp(dataType, "NCHAR", strlen("NCHAR"))) { if (dataLen > TSDB_MAX_BINARY_LEN) { @@ -5782,12 +5789,14 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, (uint32_t)TSDB_MAX_BINARY_LEN); return -1; } - char *bind_nchar = (char *)*ptr; + char *bind_nchar; bind->buffer_type = TSDB_DATA_TYPE_NCHAR; if (value) { + bind_nchar = calloc(1, strlen(value) + 1); strncpy(bind_nchar, value, strlen(value)); } else { + bind_nchar = calloc(1, dataLen + 1); rand_string(bind_nchar, dataLen); } @@ -5795,11 +5804,9 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, bind->buffer = bind_nchar; bind->length = &bind->buffer_length; bind->is_null = NULL; - - *ptr += bind->buffer_length; } else if (0 == strncasecmp(dataType, "INT", strlen("INT"))) { - int32_t *bind_int = (int32_t *)*ptr; + int32_t *bind_int = malloc(sizeof(int32_t)); if (value) { *bind_int = atoi(value); @@ -5811,11 +5818,9 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, bind->buffer = bind_int; bind->length = &bind->buffer_length; bind->is_null = NULL; - - *ptr += bind->buffer_length; } else if (0 == strncasecmp(dataType, "BIGINT", strlen("BIGINT"))) { - int64_t *bind_bigint = (int64_t *)*ptr; + int64_t *bind_bigint = malloc(sizeof(int64_t)); if (value) { *bind_bigint = atoll(value); @@ -5827,11 +5832,9 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, bind->buffer = bind_bigint; bind->length = &bind->buffer_length; bind->is_null = NULL; - - *ptr += bind->buffer_length; } else if (0 == strncasecmp(dataType, "FLOAT", strlen("FLOAT"))) { - float *bind_float = (float *) *ptr; + float *bind_float = malloc(sizeof(float)); if (value) { *bind_float = (float)atof(value); @@ -5843,11 +5846,9 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, bind->buffer = bind_float; bind->length = &bind->buffer_length; bind->is_null = NULL; - - *ptr += bind->buffer_length; } else if (0 == strncasecmp(dataType, "DOUBLE", strlen("DOUBLE"))) { - double *bind_double = (double *)*ptr; + double *bind_double = malloc(sizeof(double)); if (value) { *bind_double = atof(value); @@ -5859,11 +5860,9 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, bind->buffer = bind_double; bind->length = &bind->buffer_length; bind->is_null = NULL; - - *ptr += bind->buffer_length; } else if (0 == strncasecmp(dataType, "SMALLINT", strlen("SMALLINT"))) { - int16_t *bind_smallint = (int16_t *)*ptr; + int16_t *bind_smallint = malloc(sizeof(int16_t)); if (value) { *bind_smallint = (int16_t)atoi(value); @@ -5875,11 +5874,9 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, bind->buffer = bind_smallint; bind->length = &bind->buffer_length; bind->is_null = NULL; - - *ptr += bind->buffer_length; } else if (0 == strncasecmp(dataType, "TINYINT", strlen("TINYINT"))) { - int8_t *bind_tinyint = (int8_t *)*ptr; + int8_t *bind_tinyint = malloc(sizeof(int8_t)); if (value) { *bind_tinyint = (int8_t)atoi(value); @@ -5891,22 +5888,28 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, bind->buffer = bind_tinyint; bind->length = &bind->buffer_length; bind->is_null = NULL; - *ptr += bind->buffer_length; } else if (0 == strncasecmp(dataType, "BOOL", strlen("BOOL"))) { - int8_t *bind_bool = (int8_t *)*ptr; + int8_t *bind_bool = malloc(sizeof(int8_t)); - *bind_bool = rand_bool(); + if (value) { + if (strncasecmp(value, "true", 4)) { + *bind_bool = true; + } else { + *bind_bool = false; + } + } else { + *bind_bool = rand_bool(); + } bind->buffer_type = TSDB_DATA_TYPE_BOOL; bind->buffer_length = sizeof(int8_t); bind->buffer = bind_bool; bind->length = &bind->buffer_length; bind->is_null = NULL; - *ptr += bind->buffer_length; } else if (0 == strncasecmp(dataType, "TIMESTAMP", strlen("TIMESTAMP"))) { - int64_t *bind_ts2 = (int64_t *) *ptr; + int64_t *bind_ts2 = malloc(sizeof(int64_t)); if (value) { if (strchr(value, ':') && strchr(value, '-')) { @@ -5936,8 +5939,6 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, bind->buffer = bind_ts2; bind->length = &bind->buffer_length; bind->is_null = NULL; - - *ptr += bind->buffer_length; } else { errorPrint( "No support data type: %s\n", dataType); return -1; @@ -5974,15 +5975,11 @@ static int32_t prepareStmtWithoutStb( int32_t k = 0; for (k = 0; k < batch;) { /* columnCount + 1 (ts) */ - char data[MAX_DATA_SIZE]; - memset(data, 0, MAX_DATA_SIZE); - char *ptr = data; TAOS_BIND *bind = (TAOS_BIND *)(bindArray + 0); - int64_t *bind_ts; + int64_t *bind_ts = pThreadInfo->bind_ts; - bind_ts = (int64_t *)ptr; bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP; if (g_args.disorderRatio) { @@ -5998,8 +5995,6 @@ static int32_t prepareStmtWithoutStb( bind->length = &bind->buffer_length; bind->is_null = NULL; - ptr += bind->buffer_length; - for (int i = 0; i < g_args.num_of_CPR; i ++) { bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * (i + 1))); @@ -6007,7 +6002,6 @@ static int32_t prepareStmtWithoutStb( bind, data_type[i], g_args.len_of_binary, - &ptr, pThreadInfo->time_precision, NULL)) { return -1; @@ -6048,10 +6042,6 @@ static int32_t prepareStbStmtBindTag( return -1; } - char data[MAX_DATA_SIZE]; - memset(data, 0, MAX_DATA_SIZE); - char *ptr = data; - TAOS_BIND *tag; for (int t = 0; t < stbInfo->tagCount; t ++) { @@ -6060,7 +6050,6 @@ static int32_t prepareStbStmtBindTag( tag, stbInfo->tags[t].dataType, stbInfo->tags[t].dataLen, - &ptr, timePrec, NULL)) { free(bindBuffer); @@ -6073,6 +6062,7 @@ static int32_t prepareStbStmtBindTag( } static int32_t prepareStbStmtBindRand( + int64_t *ts, char *bindArray, SSuperTable *stbInfo, int64_t startTime, int32_t recSeq, int32_t timePrec) @@ -6084,19 +6074,14 @@ static int32_t prepareStbStmtBindRand( return -1; } - char data[MAX_DATA_SIZE]; - memset(data, 0, MAX_DATA_SIZE); - char *ptr = data; - TAOS_BIND *bind; for (int i = 0; i < stbInfo->columnCount + 1; i ++) { bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * i)); if (i == 0) { - int64_t *bind_ts; + int64_t *bind_ts = ts; - bind_ts = (int64_t *)ptr; bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP; if (stbInfo->disorderRatio) { *bind_ts = startTime + getTSRandTail( @@ -6111,12 +6096,10 @@ static int32_t prepareStbStmtBindRand( bind->length = &bind->buffer_length; bind->is_null = NULL; - ptr += bind->buffer_length; } else if ( -1 == prepareStmtBindArrayByType( bind, stbInfo->columns[i-1].dataType, stbInfo->columns[i-1].dataLen, - &ptr, timePrec, NULL)) { tmfree(bindBuffer); @@ -6129,78 +6112,32 @@ static int32_t prepareStbStmtBindRand( } static int32_t prepareStbStmtBindWithSample( + int64_t *ts, char *bindArray, SSuperTable *stbInfo, int64_t startTime, int32_t recSeq, int32_t timePrec, int64_t samplePos) { - char *bindBuffer = calloc(1, DOUBLE_BUFF_LEN); // g_args.len_of_binary); - if (bindBuffer == NULL) { - errorPrint("%s() LN%d, Failed to allocate %d bind buffer\n", - __func__, __LINE__, g_args.len_of_binary); - return -1; - } - - char data[MAX_DATA_SIZE]; - memset(data, 0, MAX_DATA_SIZE); - char *ptr = data; - TAOS_BIND *bind; - int cursor = 0; - - for (int i = 0; i < stbInfo->columnCount + 1; i ++) { - bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * i)); - - if (i == 0) { - int64_t *bind_ts; - - bind_ts = (int64_t *)ptr; - bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP; - if (stbInfo->disorderRatio) { - *bind_ts = startTime + getTSRandTail( - stbInfo->timeStampStep, recSeq, - stbInfo->disorderRatio, - stbInfo->disorderRange); - } else { - *bind_ts = startTime + stbInfo->timeStampStep * recSeq; - } - bind->buffer_length = sizeof(int64_t); - bind->buffer = bind_ts; - bind->length = &bind->buffer_length; - bind->is_null = NULL; - - ptr += bind->buffer_length; - } else { - char *restStr = stbInfo->sampleDataBuf - + stbInfo->lenOfOneRow * samplePos + cursor; - int lengthOfRest = strlen(restStr); - - int index = 0; - for (index = 0; index < lengthOfRest; index ++) { - if (restStr[index] == ',') { - break; - } - } + bind = (TAOS_BIND *)bindArray; - memset(bindBuffer, 0, DOUBLE_BUFF_LEN); - strncpy(bindBuffer, restStr, index); - cursor += index + 1; // skip ',' too + int64_t *bind_ts = ts; - if ( -1 == prepareStmtBindArrayByType( - bind, - stbInfo->columns[i-1].dataType, - stbInfo->columns[i-1].dataLen, - &ptr, - timePrec, - bindBuffer)) { - free(bindBuffer); - return -1; - } - } + bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP; + if (stbInfo->disorderRatio) { + *bind_ts = startTime + getTSRandTail( + stbInfo->timeStampStep, recSeq, + stbInfo->disorderRatio, + stbInfo->disorderRange); + } else { + *bind_ts = startTime + stbInfo->timeStampStep * recSeq; } + bind->buffer_length = sizeof(int64_t); + bind->buffer = bind_ts; + bind->length = &bind->buffer_length; + bind->is_null = NULL; - free(bindBuffer); return 0; } @@ -6279,7 +6216,9 @@ static int32_t prepareStbStmtRand( uint32_t k; for (k = 0; k < batch;) { /* columnCount + 1 (ts) */ - if (-1 == prepareStbStmtBindRand(bindArray, stbInfo, + if (-1 == prepareStbStmtBindRand( + pThreadInfo->bind_ts, + bindArray, stbInfo, startTime, k, pThreadInfo->time_precision /* is column */)) { @@ -6380,30 +6319,24 @@ static int32_t prepareStbStmtWithSample( } } - char *bindArray = calloc(1, sizeof(TAOS_BIND) * (stbInfo->columnCount + 1)); - if (bindArray == NULL) { - errorPrint("%s() LN%d, Failed to allocate %d bind params\n", - __func__, __LINE__, (stbInfo->columnCount + 1)); - return -1; - } - uint32_t k; for (k = 0; k < batch;) { + char *bindArray = (char *)(*((uintptr_t *) + (stbInfo->sampleBindArray + (sizeof(char *)) * (*pSamplePos)))); /* columnCount + 1 (ts) */ if (-1 == prepareStbStmtBindWithSample( + pThreadInfo->bind_ts, bindArray, stbInfo, startTime, k, pThreadInfo->time_precision, *pSamplePos /* is column */)) { - free(bindArray); return -1; } ret = taos_stmt_bind_param(stmt, (TAOS_BIND *)bindArray); if (0 != ret) { errorPrint("%s() LN%d, stmt_bind_param() failed! reason: %s\n", __func__, __LINE__, taos_stmt_errstr(stmt)); - free(bindArray); return -1; } // if msg > 3MB, break @@ -6411,7 +6344,6 @@ static int32_t prepareStbStmtWithSample( if (0 != ret) { errorPrint("%s() LN%d, stmt_add_batch() failed! reason: %s\n", __func__, __LINE__, taos_stmt_errstr(stmt)); - free(bindArray); return -1; } @@ -6428,11 +6360,8 @@ static int32_t prepareStbStmtWithSample( } } - free(bindArray); return k; } - - #endif static int32_t generateStbProgressiveData( @@ -7161,17 +7090,14 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in * #if STMT_IFACE_ENABLED == 1 static int parseSampleFileToStmt(SSuperTable *stbInfo, uint32_t timePrec) { - // TODO: - stbInfo->sampleBindArray = calloc(sizeof(char *), MAX_SAMPLES_ONCE_FROM_FILE); - assert(stbInfo->sampleBindArray); - - char *bindBuffer = calloc(1, DOUBLE_BUFF_LEN); // g_args.len_of_binary); - if (bindBuffer == NULL) { - errorPrint("%s() LN%d, Failed to allocate %d bind buffer\n", - __func__, __LINE__, g_args.len_of_binary); + stbInfo->sampleBindArray = calloc(1, sizeof(char *) * MAX_SAMPLES_ONCE_FROM_FILE); + if (stbInfo->sampleBindArray == NULL) { + errorPrint("%s() LN%d, Failed to allocate %"PRIu64" bind array buffer\n", + __func__, __LINE__, sizeof(char *) * MAX_SAMPLES_ONCE_FROM_FILE); return -1; } + for (int i=0; i < MAX_SAMPLES_ONCE_FROM_FILE; i++) { char *bindArray = calloc(1, sizeof(TAOS_BIND) * (stbInfo->columnCount + 1)); if (bindArray == NULL) { @@ -7180,9 +7106,6 @@ static int parseSampleFileToStmt(SSuperTable *stbInfo, uint32_t timePrec) return -1; } - char data[MAX_DATA_SIZE]; - memset(data, 0, MAX_DATA_SIZE); - char *ptr = data; TAOS_BIND *bind; int cursor = 0; @@ -7191,16 +7114,11 @@ static int parseSampleFileToStmt(SSuperTable *stbInfo, uint32_t timePrec) bind = (TAOS_BIND *)((char *)bindArray + (sizeof(TAOS_BIND) * c)); if (c == 0) { - int64_t *bind_ts; - - bind_ts = (int64_t *)ptr; bind->buffer_type = TSDB_DATA_TYPE_TIMESTAMP; bind->buffer_length = sizeof(int64_t); - bind->buffer = bind_ts; + bind->buffer = NULL; //bind_ts; bind->length = &bind->buffer_length; bind->is_null = NULL; - - ptr += bind->buffer_length; } else { char *restStr = stbInfo->sampleDataBuf + stbInfo->lenOfOneRow * i + cursor; @@ -7213,26 +7131,31 @@ static int parseSampleFileToStmt(SSuperTable *stbInfo, uint32_t timePrec) } } - memset(bindBuffer, 0, DOUBLE_BUFF_LEN); + char *bindBuffer = calloc(1, index + 1); + if (bindBuffer == NULL) { + errorPrint("%s() LN%d, Failed to allocate %d bind buffer\n", + __func__, __LINE__, DOUBLE_BUFF_LEN); + return -1; + } + strncpy(bindBuffer, restStr, index); cursor += index + 1; // skip ',' too - if ( -1 == prepareStmtBindArrayByType( + if (-1 == prepareStmtBindArrayByType( bind, stbInfo->columns[c-1].dataType, stbInfo->columns[c-1].dataLen, - &ptr, timePrec, bindBuffer)) { free(bindBuffer); return -1; } + free(bindBuffer); } } *((uintptr_t *)(stbInfo->sampleBindArray + (sizeof(char *)) * i)) = (uintptr_t)bindArray; } - free(bindBuffer); return 0; } #endif @@ -7487,6 +7410,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, free(stmtBuffer); exit(-1); } + pThreadInfo->bind_ts = malloc(sizeof(int64_t)); } #endif } else { @@ -7531,11 +7455,10 @@ static void startMultiThreadInsertData(int threads, char* db_name, for (int i = 0; i < threads; i++) { threadInfo *pThreadInfo = infos + i; - tsem_destroy(&(pThreadInfo->lock_sem)); - #if STMT_IFACE_ENABLED == 1 if (pThreadInfo->stmt) { taos_stmt_close(pThreadInfo->stmt); + tmfree((char *)pThreadInfo->bind_ts); } #endif tsem_destroy(&(pThreadInfo->lock_sem)); -- GitLab