diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 3838344a8b59beafabeecbf4556123ffc5763eb0..8ca220893d71e91444a11cc1a8ae3f61f23a6c9c 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -5074,13 +5074,6 @@ static int getRowDataFromSample( SSuperTable* superTblInfo, int64_t* sampleUsePos) { if ((*sampleUsePos) == MAX_SAMPLES_ONCE_FROM_FILE) { - /* int ret = readSampleFromCsvFileToMem(superTblInfo); - if (0 != ret) { - tmfree(superTblInfo->sampleDataBuf); - superTblInfo->sampleDataBuf = NULL; - return -1; - } - */ *sampleUsePos = 0; } @@ -5719,7 +5712,9 @@ static int64_t generateInterlaceDataWithoutStb( #if STMT_IFACE_ENABLED == 1 static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, - char *dataType, int32_t dataLen, char **ptr, char *value) + char *dataType, int32_t dataLen, char **ptr, + int32_t timePrec, + char *value) { if (0 == strncasecmp(dataType, "BINARY", strlen("BINARY"))) { @@ -5878,7 +5873,25 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, int64_t *bind_ts2 = (int64_t *) *ptr; if (value) { - *bind_ts2 = atoll(value); + if (strchr(value, ':') && strchr(value, '-')) { + int i = 0; + while(value[i] != '\0') { + if (value[i] == '\"' || value[i] == '\'') { + value[i] = ' '; + } + i++; + } + int64_t tmpEpoch; + if (TSDB_CODE_SUCCESS != taosParseTime( + value, &tmpEpoch, strlen(value), + timePrec, 0)) { + errorPrint("Input %s, time format error!\n", value); + return -1; + } + *bind_ts2 = tmpEpoch; + } else { + *bind_ts2 = atoll(value); + } } else { *bind_ts2 = rand_bigint(); } @@ -5903,6 +5916,7 @@ static int32_t prepareStmtWithoutStb( uint32_t batch, int64_t insertRows, int64_t recordFrom, + int32_t timePrec, int64_t startTime) { int ret = taos_stmt_set_tbname(stmt, tableName); @@ -5957,7 +5971,9 @@ static int32_t prepareStmtWithoutStb( bind, data_type[i], g_args.len_of_binary, - &ptr, NULL)) { + &ptr, + timePrec, + NULL)) { return -1; } } @@ -5987,6 +6003,8 @@ static int32_t prepareStmtWithoutStb( static int32_t prepareStbStmtBind( char *bindArray, SSuperTable *stbInfo, bool sourceRand, int64_t startTime, int32_t recSeq, + int32_t timePrec, + int64_t samplePos, bool isColumn) { char *bindBuffer = calloc(1, DOUBLE_BUFF_LEN); // g_args.len_of_binary); @@ -6035,12 +6053,14 @@ static int32_t prepareStbStmtBind( stbInfo->columns[i-1].dataType, stbInfo->columns[i-1].dataLen, &ptr, + timePrec, NULL)) { free(bindBuffer); return -1; } } else { - char *restStr = stbInfo->sampleDataBuf + cursor; + char *restStr = stbInfo->sampleDataBuf + + stbInfo->lenOfOneRow * samplePos + cursor; int lengthOfRest = strlen(restStr); int index = 0; @@ -6050,7 +6070,7 @@ static int32_t prepareStbStmtBind( } } - memset(bindBuffer, 0, g_args.len_of_binary); + memset(bindBuffer, 0, DOUBLE_BUFF_LEN); strncpy(bindBuffer, restStr, index); cursor += index + 1; // skip ',' too @@ -6059,6 +6079,7 @@ static int32_t prepareStbStmtBind( stbInfo->columns[i-1].dataType, stbInfo->columns[i-1].dataLen, &ptr, + timePrec, bindBuffer)) { free(bindBuffer); return -1; @@ -6076,6 +6097,7 @@ static int32_t prepareStbStmtBind( stbInfo->tags[t].dataType, stbInfo->tags[t].dataLen, &ptr, + timePrec, NULL)) { free(bindBuffer); return -1; @@ -6097,6 +6119,7 @@ static int32_t prepareStbStmt( uint64_t insertRows, uint64_t recordFrom, int64_t startTime, + int32_t timePrec, int64_t *pSamplePos) { int ret; @@ -6137,7 +6160,10 @@ static int32_t prepareStbStmt( } if (-1 == prepareStbStmtBind( - tagsArray, stbInfo, tagRand, -1, -1, false /* is tag */)) { + tagsArray, stbInfo, tagRand, -1, -1, + timePrec, + *pSamplePos, + false /* is tag */)) { tmfree(tagsValBuf); tmfree(tagsArray); return -1; @@ -6173,7 +6199,10 @@ static int32_t prepareStbStmt( for (k = 0; k < batch;) { /* columnCount + 1 (ts) */ if (-1 == prepareStbStmtBind(bindArray, stbInfo, sourceRand, - startTime, k, true /* is column */)) { + startTime, k, + timePrec, + *pSamplePos, + true /* is column */)) { free(bindArray); return -1; } @@ -6198,6 +6227,9 @@ static int32_t prepareStbStmt( if (!sourceRand) { (*pSamplePos) ++; + if ((*pSamplePos) == MAX_SAMPLES_ONCE_FROM_FILE) { + *pSamplePos = 0; + } } if (recordFrom >= insertRows) { @@ -6218,6 +6250,7 @@ static int32_t prepareStbStmtInterlace( uint64_t insertRows, uint64_t recordFrom, int64_t startTime, + int32_t timePrec, int64_t *pSamplePos) { return prepareStbStmt( @@ -6227,6 +6260,7 @@ static int32_t prepareStbStmtInterlace( tableSeq, batch, insertRows, 0, startTime, + timePrec, pSamplePos); } @@ -6239,6 +6273,7 @@ static int32_t prepareStbStmtProgressive( uint64_t insertRows, uint64_t recordFrom, int64_t startTime, + int32_t timePrec, int64_t *pSamplePos) { return prepareStbStmt( @@ -6248,6 +6283,7 @@ static int32_t prepareStbStmtProgressive( tableSeq, g_args.num_of_RPR, insertRows, recordFrom, startTime, + timePrec, pSamplePos); } @@ -6450,6 +6486,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { batchPerTbl, insertRows, i, startTime, + pThreadInfo->time_precision, &(pThreadInfo->samplePos)); #else generated = -1; @@ -6476,6 +6513,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { pThreadInfo->stmt, tableName, batchPerTbl, insertRows, i, + pThreadInfo->time_precision, startTime); #else generated = -1; @@ -6679,6 +6717,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { tableSeq, g_args.num_of_RPR, insertRows, i, start_time, + pThreadInfo->time_precision, &(pThreadInfo->samplePos)); #else generated = -1; @@ -6699,6 +6738,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { tableName, g_args.num_of_RPR, insertRows, i, + pThreadInfo->time_precision, start_time); #else generated = -1; @@ -6915,6 +6955,14 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in * return 0; } +#if STMT_IFACE_ENABLED == 1 +static void parseSampleFileToStmt() +{ + // TODO: + +} +#endif + static void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSuperTable* superTblInfo) { @@ -7070,6 +7118,46 @@ static void startMultiThreadInsertData(int threads, char* db_name, memset(pids, 0, threads * sizeof(pthread_t)); memset(infos, 0, threads * sizeof(threadInfo)); +#if STMT_IFACE_ENABLED == 1 + char *stmtBuffer = calloc(1, BUFFER_SIZE); + assert(stmtBuffer); + if ((g_args.iface == STMT_IFACE) + || ((superTblInfo) + && (superTblInfo->iface == STMT_IFACE))) { + char *pstr = stmtBuffer; + + if ((superTblInfo) + && (AUTO_CREATE_SUBTBL + == superTblInfo->autoCreateTable)) { + pstr += sprintf(pstr, "INSERT INTO ? USING %s TAGS(?", + superTblInfo->sTblName); + for (int tag = 0; tag < (superTblInfo->tagCount - 1); + tag ++ ) { + pstr += sprintf(pstr, ",?"); + } + pstr += sprintf(pstr, ") VALUES(?"); + } else { + pstr += sprintf(pstr, "INSERT INTO ? VALUES(?"); + } + + int columnCount; + if (superTblInfo) { + columnCount = superTblInfo->columnCount; + } else { + columnCount = g_args.num_of_CPR; + } + + for (int col = 0; col < columnCount; col ++) { + pstr += sprintf(pstr, ",?"); + } + pstr += sprintf(pstr, ")"); + + debugPrint("%s() LN%d, stmtBuffer: %s", __func__, __LINE__, stmtBuffer); + + parseSampleFileToStmt(); + } +#endif + for (int i = 0; i < threads; i++) { threadInfo *pThreadInfo = infos + i; pThreadInfo->threadID = i; @@ -7101,12 +7189,6 @@ static void startMultiThreadInsertData(int threads, char* db_name, || ((superTblInfo) && (superTblInfo->iface == STMT_IFACE))) { - int columnCount; - if (superTblInfo) { - columnCount = superTblInfo->columnCount; - } else { - columnCount = g_args.num_of_CPR; - } pThreadInfo->stmt = taos_stmt_init(pThreadInfo->taos); if (NULL == pThreadInfo->stmt) { @@ -7119,41 +7201,15 @@ static void startMultiThreadInsertData(int threads, char* db_name, exit(-1); } - char *buffer = calloc(1, BUFFER_SIZE); - assert(buffer); - char *pstr = buffer; - - if ((superTblInfo) - && (AUTO_CREATE_SUBTBL - == superTblInfo->autoCreateTable)) { - pstr += sprintf(pstr, "INSERT INTO ? USING %s TAGS(?", - superTblInfo->sTblName); - for (int tag = 0; tag < (superTblInfo->tagCount - 1); - tag ++ ) { - pstr += sprintf(pstr, ",?"); - } - pstr += sprintf(pstr, ") VALUES(?"); - } else { - pstr += sprintf(pstr, "INSERT INTO ? VALUES(?"); - } - - for (int col = 0; col < columnCount; col ++) { - pstr += sprintf(pstr, ",?"); - } - pstr += sprintf(pstr, ")"); - - debugPrint("%s() LN%d, buffer: %s", __func__, __LINE__, buffer); - int ret = taos_stmt_prepare(pThreadInfo->stmt, buffer, 0); + int ret = taos_stmt_prepare(pThreadInfo->stmt, stmtBuffer, 0); if (ret != 0){ errorPrint("failed to execute taos_stmt_prepare. return 0x%x. reason: %s\n", ret, taos_stmt_errstr(pThreadInfo->stmt)); free(pids); free(infos); - free(buffer); + free(stmtBuffer); exit(-1); } - - free(buffer); } #endif } else { @@ -7181,6 +7237,10 @@ static void startMultiThreadInsertData(int threads, char* db_name, } } +#if STMT_IFACE_ENABLED == 1 + free(stmtBuffer); +#endif + for (int i = 0; i < threads; i++) { pthread_join(pids[i], NULL); }