From 83e2e11e6e965fb321fd86df83550f2bb82806a4 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 31 Aug 2021 10:30:57 +0800 Subject: [PATCH] cherry pick from develop branch. (#7726) --- src/kit/taosdemo/taosdemo.c | 173 +++++++++++++++--------------------- 1 file changed, 71 insertions(+), 102 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 6199005a62..411d6e48e4 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -291,7 +291,6 @@ typedef struct SSuperTable_S { uint64_t lenOfTagOfOneRow; char* sampleDataBuf; - char* sampleBindArray; //int sampleRowCount; //int sampleUsePos; @@ -438,7 +437,8 @@ typedef struct SQueryMetaInfo_S { typedef struct SThreadInfo_S { TAOS * taos; TAOS_STMT *stmt; - int64_t *bind_ts; + char* sampleBindArray; + int64_t *bind_ts; int threadID; char db_name[TSDB_DB_NAME_LEN]; uint32_t time_precision; @@ -5737,20 +5737,6 @@ static void postFreeResource() { free(g_Dbs.db[i].superTbls[j].sampleDataBuf); g_Dbs.db[i].superTbls[j].sampleDataBuf = NULL; } - if (g_Dbs.db[i].superTbls[j].sampleBindArray) { - for (int k = 0; k < MAX_SAMPLES_ONCE_FROM_FILE; k++) { - uintptr_t *tmp = (uintptr_t *)(*(uintptr_t *)( - g_Dbs.db[i].superTbls[j].sampleBindArray - + sizeof(uintptr_t *) * k)); - for (int c = 1; c < g_Dbs.db[i].superTbls[j].columnCount + 1; c++) { - TAOS_BIND *bind = (TAOS_BIND *)((char *)tmp + (sizeof(TAOS_BIND) * c)); - if (bind) - tmfree(bind->buffer); - } - tmfree((char *)tmp); - } - } - tmfree((char *)g_Dbs.db[i].superTbls[j].sampleBindArray); if (0 != g_Dbs.db[i].superTbls[j].tagDataBuf) { free(g_Dbs.db[i].superTbls[j].tagDataBuf); @@ -6084,9 +6070,6 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) int32_t affectedRows; SSuperTable* stbInfo = pThreadInfo->stbInfo; - verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID, - __func__, __LINE__, pThreadInfo->buffer); - uint16_t iface; if (stbInfo) iface = stbInfo->iface; @@ -6104,12 +6087,18 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) switch(iface) { case TAOSC_IFACE: + verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID, + __func__, __LINE__, pThreadInfo->buffer); + affectedRows = queryDbExec( pThreadInfo->taos, pThreadInfo->buffer, INSERT_TYPE, false); break; case REST_IFACE: + verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID, + __func__, __LINE__, pThreadInfo->buffer); + if (0 != postProceSql(g_Dbs.host, &g_Dbs.serv_addr, g_Dbs.port, pThreadInfo->buffer, pThreadInfo)) { affectedRows = -1; @@ -7087,12 +7076,12 @@ static int32_t prepareStbStmtBindRand( return 0; } -static int32_t prepareStbStmtBindWithSample( +static int32_t prepareStbStmtBindStartTime( + char *tableName, int64_t *ts, char *bindArray, SSuperTable *stbInfo, int64_t startTime, int32_t recSeq, - int32_t timePrec, - int64_t samplePos) + int32_t timePrec) { TAOS_BIND *bind; @@ -7109,6 +7098,10 @@ static int32_t prepareStbStmtBindWithSample( } else { *bind_ts = startTime + stbInfo->timeStampStep * recSeq; } + + verbosePrint("%s() LN%d, tableName: %s, bind_ts=%"PRId64"\n", + __func__, __LINE__, tableName, *bind_ts); + bind->buffer_length = sizeof(int64_t); bind->buffer = bind_ts; bind->length = &bind->buffer_length; @@ -7117,7 +7110,7 @@ static int32_t prepareStbStmtBindWithSample( return 0; } -static int32_t prepareStbStmtRand( +UNUSED_FUNC static int32_t prepareStbStmtRand( threadInfo *pThreadInfo, char *tableName, int64_t tableSeq, @@ -7298,14 +7291,14 @@ static int32_t prepareStbStmtWithSample( uint32_t k; for (k = 0; k < batch;) { char *bindArray = (char *)(*((uintptr_t *) - (stbInfo->sampleBindArray + (sizeof(char *)) * (*pSamplePos)))); + (pThreadInfo->sampleBindArray + (sizeof(char *)) * (*pSamplePos)))); /* columnCount + 1 (ts) */ - if (-1 == prepareStbStmtBindWithSample( + if (-1 == prepareStbStmtBindStartTime( + tableName, pThreadInfo->bind_ts, bindArray, stbInfo, startTime, k, - pThreadInfo->time_precision, - *pSamplePos + pThreadInfo->time_precision /* is column */)) { return -1; } @@ -7426,8 +7419,6 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { int64_t nTimeStampStep; uint64_t insert_interval; - bool sourceRand; - SSuperTable* stbInfo = pThreadInfo->stbInfo; if (stbInfo) { @@ -7442,18 +7433,12 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { maxSqlLen = stbInfo->maxSqlLen; nTimeStampStep = stbInfo->timeStampStep; insert_interval = stbInfo->insertInterval; - if (0 == strncasecmp(stbInfo->dataSource, "rand", 4)) { - sourceRand = true; - } else { - sourceRand = false; // from sample data file - } } else { insertRows = g_args.num_of_DPT; interlaceRows = g_args.interlace_rows; maxSqlLen = g_args.max_sql_len; nTimeStampStep = g_args.timestamp_step; insert_interval = g_args.insert_interval; - sourceRand = true; } debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n", @@ -7538,25 +7523,14 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { int32_t generated; if (stbInfo) { if (stbInfo->iface == STMT_IFACE) { - if (sourceRand) { - generated = prepareStbStmtRand( - pThreadInfo, - tableName, - tableSeq, - batchPerTbl, - insertRows, 0, - startTime - ); - } else { - generated = prepareStbStmtWithSample( - pThreadInfo, - tableName, - tableSeq, - batchPerTbl, - insertRows, 0, - startTime, - &(pThreadInfo->samplePos)); - } + generated = prepareStbStmtWithSample( + pThreadInfo, + tableName, + tableSeq, + batchPerTbl, + insertRows, 0, + startTime, + &(pThreadInfo->samplePos)); } else { generated = generateStbInterlaceData( pThreadInfo, @@ -7746,17 +7720,6 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { pThreadInfo->totalInsertRows = 0; pThreadInfo->totalAffectedRows = 0; - bool sourceRand; - if (stbInfo) { - if (0 == strncasecmp(stbInfo->dataSource, "rand", 4)) { - sourceRand = true; - } else { - sourceRand = false; // from sample data file - } - } else { - sourceRand = true; - } - pThreadInfo->samplePos = 0; int percentComplete = 0; @@ -7795,32 +7758,13 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { int32_t generated; if (stbInfo) { if (stbInfo->iface == STMT_IFACE) { - if (sourceRand) { -/* generated = prepareStbStmtRand( - pThreadInfo, - tableName, - tableSeq, - g_args.num_of_RPR, - insertRows, - i, start_time - ); - */ - generated = prepareStbStmtWithSample( - pThreadInfo, - tableName, - tableSeq, - g_args.num_of_RPR, - insertRows, i, start_time, - &(pThreadInfo->samplePos)); - } else { - generated = prepareStbStmtWithSample( - pThreadInfo, - tableName, - tableSeq, - g_args.num_of_RPR, - insertRows, i, start_time, - &(pThreadInfo->samplePos)); - } + generated = prepareStbStmtWithSample( + pThreadInfo, + tableName, + tableSeq, + g_args.num_of_RPR, + insertRows, i, start_time, + &(pThreadInfo->samplePos)); } else { generated = generateStbProgressiveData( stbInfo, @@ -7848,6 +7792,11 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { &remainderBufLen); } } + + verbosePrint("[%d] %s() LN%d generated=%d\n", + pThreadInfo->threadID, + __func__, __LINE__, generated); + if (generated > 0) i += generated; else @@ -8058,17 +8007,22 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in * return 0; } -static int parseSampleFileToStmt(SSuperTable *stbInfo, uint32_t timePrec) +static int parseSampleFileToStmt( + threadInfo *pThreadInfo, + SSuperTable *stbInfo, uint32_t timePrec) { - stbInfo->sampleBindArray = calloc(1, sizeof(char *) * MAX_SAMPLES_ONCE_FROM_FILE); - if (stbInfo->sampleBindArray == NULL) { + pThreadInfo->sampleBindArray = + calloc(1, sizeof(char *) * MAX_SAMPLES_ONCE_FROM_FILE); + if (pThreadInfo->sampleBindArray == NULL) { errorPrint2("%s() LN%d, Failed to allocate %"PRIu64" bind array buffer\n", - __func__, __LINE__, (uint64_t)sizeof(char *) * MAX_SAMPLES_ONCE_FROM_FILE); + __func__, __LINE__, + (uint64_t)sizeof(char *) * MAX_SAMPLES_ONCE_FROM_FILE); return -1; } for (int i=0; i < MAX_SAMPLES_ONCE_FROM_FILE; i++) { - char *bindArray = calloc(1, sizeof(TAOS_BIND) * (stbInfo->columnCount + 1)); + char *bindArray = + calloc(1, sizeof(TAOS_BIND) * (stbInfo->columnCount + 1)); if (bindArray == NULL) { errorPrint2("%s() LN%d, Failed to allocate %d bind params\n", __func__, __LINE__, (stbInfo->columnCount + 1)); @@ -8121,7 +8075,8 @@ static int parseSampleFileToStmt(SSuperTable *stbInfo, uint32_t timePrec) free(bindBuffer); } } - *((uintptr_t *)(stbInfo->sampleBindArray + (sizeof(char *)) * i)) = (uintptr_t)bindArray; + *((uintptr_t *)(pThreadInfo->sampleBindArray + (sizeof(char *)) * i)) = + (uintptr_t)bindArray; } return 0; @@ -8311,10 +8266,6 @@ static void startMultiThreadInsertData(int threads, char* db_name, pstr += sprintf(pstr, ")"); debugPrint("%s() LN%d, stmtBuffer: %s", __func__, __LINE__, stmtBuffer); - - if (stbInfo) { - parseSampleFileToStmt(stbInfo, timePrec); - } } for (int i = 0; i < threads; i++) { @@ -8347,7 +8298,6 @@ static void startMultiThreadInsertData(int threads, char* db_name, || ((stbInfo) && (stbInfo->iface == STMT_IFACE))) { - pThreadInfo->stmt = taos_stmt_init(pThreadInfo->taos); if (NULL == pThreadInfo->stmt) { free(pids); @@ -8369,6 +8319,10 @@ static void startMultiThreadInsertData(int threads, char* db_name, exit(EXIT_FAILURE); } pThreadInfo->bind_ts = malloc(sizeof(int64_t)); + + if (stbInfo) { + parseSampleFileToStmt(pThreadInfo, stbInfo, timePrec); + } } } else { pThreadInfo->taos = NULL; @@ -8419,6 +8373,21 @@ static void startMultiThreadInsertData(int threads, char* db_name, tsem_destroy(&(pThreadInfo->lock_sem)); taos_close(pThreadInfo->taos); + if (pThreadInfo->sampleBindArray) { + for (int k = 0; k < MAX_SAMPLES_ONCE_FROM_FILE; k++) { + uintptr_t *tmp = (uintptr_t *)(*(uintptr_t *)( + pThreadInfo->sampleBindArray + + sizeof(uintptr_t *) * k)); + for (int c = 1; c < pThreadInfo->stbInfo->columnCount + 1; c++) { + TAOS_BIND *bind = (TAOS_BIND *)((char *)tmp + (sizeof(TAOS_BIND) * c)); + if (bind) + tmfree(bind->buffer); + } + tmfree((char *)tmp); + } + tmfree(pThreadInfo->sampleBindArray); + } + debugPrint("%s() LN%d, [%d] totalInsert=%"PRIu64" totalAffected=%"PRIu64"\n", __func__, __LINE__, pThreadInfo->threadID, pThreadInfo->totalInsertRows, -- GitLab