From 690faf1c30ce2dcdb5c5785d3aaa1d27221412dd Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Mon, 9 Aug 2021 15:14:26 +0800 Subject: [PATCH] cherry pick from develop branch. --- src/kit/taosdemo/taosdemo.c | 152 +++++++++++++++++++++++++----------- 1 file changed, 108 insertions(+), 44 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index bd0feaeb92..0bd10f9b9e 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -5067,13 +5067,6 @@ static int getRowDataFromSample( SSuperTable* superTblInfo, int64_t* sampleUsePos) { if ((*sampleUsePos) == MAX_SAMPLES_ONCE_FROM_FILE) { - /* int ret = readSampleFromCsvFileToMem(superTblInfo); - if (0 != ret) { - tmfree(superTblInfo->sampleDataBuf); - superTblInfo->sampleDataBuf = NULL; - return -1; - } - */ *sampleUsePos = 0; } @@ -5725,7 +5718,9 @@ static int64_t generateInterlaceDataWithoutStb( #if STMT_IFACE_ENABLED == 1 static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, - char *dataType, int32_t dataLen, char **ptr, char *value) + char *dataType, int32_t dataLen, char **ptr, + int32_t timePrec, + char *value) { if (0 == strncasecmp(dataType, "BINARY", strlen("BINARY"))) { @@ -5884,7 +5879,25 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, int64_t *bind_ts2 = (int64_t *) *ptr; if (value) { - *bind_ts2 = atoll(value); + if (strchr(value, ':') && strchr(value, '-')) { + int i = 0; + while(value[i] != '\0') { + if (value[i] == '\"' || value[i] == '\'') { + value[i] = ' '; + } + i++; + } + int64_t tmpEpoch; + if (TSDB_CODE_SUCCESS != taosParseTime( + value, &tmpEpoch, strlen(value), + timePrec, 0)) { + errorPrint("Input %s, time format error!\n", value); + return -1; + } + *bind_ts2 = tmpEpoch; + } else { + *bind_ts2 = atoll(value); + } } else { *bind_ts2 = rand_bigint(); } @@ -5909,6 +5922,7 @@ static int32_t prepareStmtWithoutStb( uint32_t batch, int64_t insertRows, int64_t recordFrom, + int32_t timePrec, int64_t startTime) { int ret = taos_stmt_set_tbname(stmt, tableName); @@ -5963,7 +5977,9 @@ static int32_t prepareStmtWithoutStb( bind, data_type[i], g_args.len_of_binary, - &ptr, NULL)) { + &ptr, + timePrec, + NULL)) { return -1; } } @@ -5993,6 +6009,8 @@ static int32_t prepareStmtWithoutStb( static int32_t prepareStbStmtBind( char *bindArray, SSuperTable *stbInfo, bool sourceRand, int64_t startTime, int32_t recSeq, + int32_t timePrec, + int64_t samplePos, bool isColumn) { char *bindBuffer = calloc(1, DOUBLE_BUFF_LEN); // g_args.len_of_binary); @@ -6041,12 +6059,14 @@ static int32_t prepareStbStmtBind( stbInfo->columns[i-1].dataType, stbInfo->columns[i-1].dataLen, &ptr, + timePrec, NULL)) { free(bindBuffer); return -1; } } else { - char *restStr = stbInfo->sampleDataBuf + cursor; + char *restStr = stbInfo->sampleDataBuf + + stbInfo->lenOfOneRow * samplePos + cursor; int lengthOfRest = strlen(restStr); int index = 0; @@ -6056,7 +6076,7 @@ static int32_t prepareStbStmtBind( } } - memset(bindBuffer, 0, g_args.len_of_binary); + memset(bindBuffer, 0, DOUBLE_BUFF_LEN); strncpy(bindBuffer, restStr, index); cursor += index + 1; // skip ',' too @@ -6065,6 +6085,7 @@ static int32_t prepareStbStmtBind( stbInfo->columns[i-1].dataType, stbInfo->columns[i-1].dataLen, &ptr, + timePrec, bindBuffer)) { free(bindBuffer); return -1; @@ -6082,6 +6103,7 @@ static int32_t prepareStbStmtBind( stbInfo->tags[t].dataType, stbInfo->tags[t].dataLen, &ptr, + timePrec, NULL)) { free(bindBuffer); return -1; @@ -6103,6 +6125,7 @@ static int32_t prepareStbStmt( uint64_t insertRows, uint64_t recordFrom, int64_t startTime, + int32_t timePrec, int64_t *pSamplePos) { int ret; @@ -6143,7 +6166,10 @@ static int32_t prepareStbStmt( } if (-1 == prepareStbStmtBind( - tagsArray, stbInfo, tagRand, -1, -1, false /* is tag */)) { + tagsArray, stbInfo, tagRand, -1, -1, + timePrec, + *pSamplePos, + false /* is tag */)) { tmfree(tagsValBuf); tmfree(tagsArray); return -1; @@ -6179,7 +6205,10 @@ static int32_t prepareStbStmt( for (k = 0; k < batch;) { /* columnCount + 1 (ts) */ if (-1 == prepareStbStmtBind(bindArray, stbInfo, sourceRand, - startTime, k, true /* is column */)) { + startTime, k, + timePrec, + *pSamplePos, + true /* is column */)) { free(bindArray); return -1; } @@ -6204,6 +6233,9 @@ static int32_t prepareStbStmt( if (!sourceRand) { (*pSamplePos) ++; + if ((*pSamplePos) == MAX_SAMPLES_ONCE_FROM_FILE) { + *pSamplePos = 0; + } } if (recordFrom >= insertRows) { @@ -6224,6 +6256,7 @@ static int32_t prepareStbStmtInterlace( uint64_t insertRows, uint64_t recordFrom, int64_t startTime, + int32_t timePrec, int64_t *pSamplePos) { return prepareStbStmt( @@ -6233,6 +6266,7 @@ static int32_t prepareStbStmtInterlace( tableSeq, batch, insertRows, 0, startTime, + timePrec, pSamplePos); } @@ -6245,6 +6279,7 @@ static int32_t prepareStbStmtProgressive( uint64_t insertRows, uint64_t recordFrom, int64_t startTime, + int32_t timePrec, int64_t *pSamplePos) { return prepareStbStmt( @@ -6254,6 +6289,7 @@ static int32_t prepareStbStmtProgressive( tableSeq, g_args.num_of_RPR, insertRows, recordFrom, startTime, + timePrec, pSamplePos); } @@ -6459,6 +6495,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { batchPerTbl, insertRows, i, startTime, + pThreadInfo->time_precision, &(pThreadInfo->samplePos)); #else generated = -1; @@ -6485,6 +6522,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { pThreadInfo->stmt, tableName, batchPerTbl, insertRows, i, + pThreadInfo->time_precision, startTime); #else generated = -1; @@ -6698,6 +6736,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { tableSeq, g_args.num_of_RPR, insertRows, i, start_time, + pThreadInfo->time_precision, &(pThreadInfo->samplePos)); #else generated = -1; @@ -6718,6 +6757,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { tableName, g_args.num_of_RPR, insertRows, i, + pThreadInfo->time_precision, start_time); #else generated = -1; @@ -6941,6 +6981,14 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in * return 0; } +#if STMT_IFACE_ENABLED == 1 +static void parseSampleFileToStmt() +{ + // TODO: + +} +#endif + static void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSuperTable* superTblInfo) { @@ -7096,6 +7144,46 @@ static void startMultiThreadInsertData(int threads, char* db_name, memset(pids, 0, threads * sizeof(pthread_t)); memset(infos, 0, threads * sizeof(threadInfo)); +#if STMT_IFACE_ENABLED == 1 + char *stmtBuffer = calloc(1, BUFFER_SIZE); + assert(stmtBuffer); + if ((g_args.iface == STMT_IFACE) + || ((superTblInfo) + && (superTblInfo->iface == STMT_IFACE))) { + char *pstr = stmtBuffer; + + if ((superTblInfo) + && (AUTO_CREATE_SUBTBL + == superTblInfo->autoCreateTable)) { + pstr += sprintf(pstr, "INSERT INTO ? USING %s TAGS(?", + superTblInfo->sTblName); + for (int tag = 0; tag < (superTblInfo->tagCount - 1); + tag ++ ) { + pstr += sprintf(pstr, ",?"); + } + pstr += sprintf(pstr, ") VALUES(?"); + } else { + pstr += sprintf(pstr, "INSERT INTO ? VALUES(?"); + } + + int columnCount; + if (superTblInfo) { + columnCount = superTblInfo->columnCount; + } else { + columnCount = g_args.num_of_CPR; + } + + for (int col = 0; col < columnCount; col ++) { + pstr += sprintf(pstr, ",?"); + } + pstr += sprintf(pstr, ")"); + + debugPrint("%s() LN%d, stmtBuffer: %s", __func__, __LINE__, stmtBuffer); + + parseSampleFileToStmt(); + } +#endif + for (int i = 0; i < threads; i++) { threadInfo *pThreadInfo = infos + i; pThreadInfo->threadID = i; @@ -7127,12 +7215,6 @@ static void startMultiThreadInsertData(int threads, char* db_name, || ((superTblInfo) && (superTblInfo->iface == STMT_IFACE))) { - int columnCount; - if (superTblInfo) { - columnCount = superTblInfo->columnCount; - } else { - columnCount = g_args.num_of_CPR; - } pThreadInfo->stmt = taos_stmt_init(pThreadInfo->taos); if (NULL == pThreadInfo->stmt) { @@ -7145,35 +7227,13 @@ static void startMultiThreadInsertData(int threads, char* db_name, exit(-1); } - char buffer[BUFFER_SIZE]; - char *pstr = buffer; - - if ((superTblInfo) - && (AUTO_CREATE_SUBTBL - == superTblInfo->autoCreateTable)) { - pstr += sprintf(pstr, "INSERT INTO ? USING %s TAGS(?", - superTblInfo->sTblName); - for (int tag = 0; tag < (superTblInfo->tagCount - 1); - tag ++ ) { - pstr += sprintf(pstr, ",?"); - } - pstr += sprintf(pstr, ") VALUES(?"); - } else { - pstr += sprintf(pstr, "INSERT INTO ? VALUES(?"); - } - - for (int col = 0; col < columnCount; col ++) { - pstr += sprintf(pstr, ",?"); - } - pstr += sprintf(pstr, ")"); - - debugPrint("%s() LN%d, buffer: %s", __func__, __LINE__, buffer); - int ret = taos_stmt_prepare(pThreadInfo->stmt, buffer, 0); + int ret = taos_stmt_prepare(pThreadInfo->stmt, stmtBuffer, 0); if (ret != 0){ errorPrint("failed to execute taos_stmt_prepare. return 0x%x. reason: %s\n", ret, taos_stmt_errstr(pThreadInfo->stmt)); free(pids); free(infos); + free(stmtBuffer); exit(-1); } } @@ -7203,6 +7263,10 @@ static void startMultiThreadInsertData(int threads, char* db_name, } } +#if STMT_IFACE_ENABLED == 1 + free(stmtBuffer); +#endif + for (int i = 0; i < threads; i++) { pthread_join(pids[i], NULL); } -- GitLab