提交 07603323 编写于 作者: sangshuduo's avatar sangshuduo

[TD-5872]<fix>: taosdemo stmt improve.

上级 fd9c06c0
...@@ -5074,13 +5074,6 @@ static int getRowDataFromSample( ...@@ -5074,13 +5074,6 @@ static int getRowDataFromSample(
SSuperTable* superTblInfo, int64_t* sampleUsePos) SSuperTable* superTblInfo, int64_t* sampleUsePos)
{ {
if ((*sampleUsePos) == MAX_SAMPLES_ONCE_FROM_FILE) { if ((*sampleUsePos) == MAX_SAMPLES_ONCE_FROM_FILE) {
/* int ret = readSampleFromCsvFileToMem(superTblInfo);
if (0 != ret) {
tmfree(superTblInfo->sampleDataBuf);
superTblInfo->sampleDataBuf = NULL;
return -1;
}
*/
*sampleUsePos = 0; *sampleUsePos = 0;
} }
...@@ -5719,7 +5712,9 @@ static int64_t generateInterlaceDataWithoutStb( ...@@ -5719,7 +5712,9 @@ static int64_t generateInterlaceDataWithoutStb(
#if STMT_IFACE_ENABLED == 1 #if STMT_IFACE_ENABLED == 1
static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
char *dataType, int32_t dataLen, char **ptr, char *value) char *dataType, int32_t dataLen, char **ptr,
int32_t timePrec,
char *value)
{ {
if (0 == strncasecmp(dataType, if (0 == strncasecmp(dataType,
"BINARY", strlen("BINARY"))) { "BINARY", strlen("BINARY"))) {
...@@ -5878,7 +5873,25 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind, ...@@ -5878,7 +5873,25 @@ static int32_t prepareStmtBindArrayByType(TAOS_BIND *bind,
int64_t *bind_ts2 = (int64_t *) *ptr; int64_t *bind_ts2 = (int64_t *) *ptr;
if (value) { if (value) {
*bind_ts2 = atoll(value); if (strchr(value, ':') && strchr(value, '-')) {
int i = 0;
while(value[i] != '\0') {
if (value[i] == '\"' || value[i] == '\'') {
value[i] = ' ';
}
i++;
}
int64_t tmpEpoch;
if (TSDB_CODE_SUCCESS != taosParseTime(
value, &tmpEpoch, strlen(value),
timePrec, 0)) {
errorPrint("Input %s, time format error!\n", value);
return -1;
}
*bind_ts2 = tmpEpoch;
} else {
*bind_ts2 = atoll(value);
}
} else { } else {
*bind_ts2 = rand_bigint(); *bind_ts2 = rand_bigint();
} }
...@@ -5903,6 +5916,7 @@ static int32_t prepareStmtWithoutStb( ...@@ -5903,6 +5916,7 @@ static int32_t prepareStmtWithoutStb(
uint32_t batch, uint32_t batch,
int64_t insertRows, int64_t insertRows,
int64_t recordFrom, int64_t recordFrom,
int32_t timePrec,
int64_t startTime) int64_t startTime)
{ {
int ret = taos_stmt_set_tbname(stmt, tableName); int ret = taos_stmt_set_tbname(stmt, tableName);
...@@ -5957,7 +5971,9 @@ static int32_t prepareStmtWithoutStb( ...@@ -5957,7 +5971,9 @@ static int32_t prepareStmtWithoutStb(
bind, bind,
data_type[i], data_type[i],
g_args.len_of_binary, g_args.len_of_binary,
&ptr, NULL)) { &ptr,
timePrec,
NULL)) {
return -1; return -1;
} }
} }
...@@ -5987,6 +6003,8 @@ static int32_t prepareStmtWithoutStb( ...@@ -5987,6 +6003,8 @@ static int32_t prepareStmtWithoutStb(
static int32_t prepareStbStmtBind( static int32_t prepareStbStmtBind(
char *bindArray, SSuperTable *stbInfo, bool sourceRand, char *bindArray, SSuperTable *stbInfo, bool sourceRand,
int64_t startTime, int32_t recSeq, int64_t startTime, int32_t recSeq,
int32_t timePrec,
int64_t samplePos,
bool isColumn) bool isColumn)
{ {
char *bindBuffer = calloc(1, DOUBLE_BUFF_LEN); // g_args.len_of_binary); char *bindBuffer = calloc(1, DOUBLE_BUFF_LEN); // g_args.len_of_binary);
...@@ -6035,12 +6053,14 @@ static int32_t prepareStbStmtBind( ...@@ -6035,12 +6053,14 @@ static int32_t prepareStbStmtBind(
stbInfo->columns[i-1].dataType, stbInfo->columns[i-1].dataType,
stbInfo->columns[i-1].dataLen, stbInfo->columns[i-1].dataLen,
&ptr, &ptr,
timePrec,
NULL)) { NULL)) {
free(bindBuffer); free(bindBuffer);
return -1; return -1;
} }
} else { } else {
char *restStr = stbInfo->sampleDataBuf + cursor; char *restStr = stbInfo->sampleDataBuf
+ stbInfo->lenOfOneRow * samplePos + cursor;
int lengthOfRest = strlen(restStr); int lengthOfRest = strlen(restStr);
int index = 0; int index = 0;
...@@ -6050,7 +6070,7 @@ static int32_t prepareStbStmtBind( ...@@ -6050,7 +6070,7 @@ static int32_t prepareStbStmtBind(
} }
} }
memset(bindBuffer, 0, g_args.len_of_binary); memset(bindBuffer, 0, DOUBLE_BUFF_LEN);
strncpy(bindBuffer, restStr, index); strncpy(bindBuffer, restStr, index);
cursor += index + 1; // skip ',' too cursor += index + 1; // skip ',' too
...@@ -6059,6 +6079,7 @@ static int32_t prepareStbStmtBind( ...@@ -6059,6 +6079,7 @@ static int32_t prepareStbStmtBind(
stbInfo->columns[i-1].dataType, stbInfo->columns[i-1].dataType,
stbInfo->columns[i-1].dataLen, stbInfo->columns[i-1].dataLen,
&ptr, &ptr,
timePrec,
bindBuffer)) { bindBuffer)) {
free(bindBuffer); free(bindBuffer);
return -1; return -1;
...@@ -6076,6 +6097,7 @@ static int32_t prepareStbStmtBind( ...@@ -6076,6 +6097,7 @@ static int32_t prepareStbStmtBind(
stbInfo->tags[t].dataType, stbInfo->tags[t].dataType,
stbInfo->tags[t].dataLen, stbInfo->tags[t].dataLen,
&ptr, &ptr,
timePrec,
NULL)) { NULL)) {
free(bindBuffer); free(bindBuffer);
return -1; return -1;
...@@ -6097,6 +6119,7 @@ static int32_t prepareStbStmt( ...@@ -6097,6 +6119,7 @@ static int32_t prepareStbStmt(
uint64_t insertRows, uint64_t insertRows,
uint64_t recordFrom, uint64_t recordFrom,
int64_t startTime, int64_t startTime,
int32_t timePrec,
int64_t *pSamplePos) int64_t *pSamplePos)
{ {
int ret; int ret;
...@@ -6137,7 +6160,10 @@ static int32_t prepareStbStmt( ...@@ -6137,7 +6160,10 @@ static int32_t prepareStbStmt(
} }
if (-1 == prepareStbStmtBind( if (-1 == prepareStbStmtBind(
tagsArray, stbInfo, tagRand, -1, -1, false /* is tag */)) { tagsArray, stbInfo, tagRand, -1, -1,
timePrec,
*pSamplePos,
false /* is tag */)) {
tmfree(tagsValBuf); tmfree(tagsValBuf);
tmfree(tagsArray); tmfree(tagsArray);
return -1; return -1;
...@@ -6173,7 +6199,10 @@ static int32_t prepareStbStmt( ...@@ -6173,7 +6199,10 @@ static int32_t prepareStbStmt(
for (k = 0; k < batch;) { for (k = 0; k < batch;) {
/* columnCount + 1 (ts) */ /* columnCount + 1 (ts) */
if (-1 == prepareStbStmtBind(bindArray, stbInfo, sourceRand, if (-1 == prepareStbStmtBind(bindArray, stbInfo, sourceRand,
startTime, k, true /* is column */)) { startTime, k,
timePrec,
*pSamplePos,
true /* is column */)) {
free(bindArray); free(bindArray);
return -1; return -1;
} }
...@@ -6198,6 +6227,9 @@ static int32_t prepareStbStmt( ...@@ -6198,6 +6227,9 @@ static int32_t prepareStbStmt(
if (!sourceRand) { if (!sourceRand) {
(*pSamplePos) ++; (*pSamplePos) ++;
if ((*pSamplePos) == MAX_SAMPLES_ONCE_FROM_FILE) {
*pSamplePos = 0;
}
} }
if (recordFrom >= insertRows) { if (recordFrom >= insertRows) {
...@@ -6218,6 +6250,7 @@ static int32_t prepareStbStmtInterlace( ...@@ -6218,6 +6250,7 @@ static int32_t prepareStbStmtInterlace(
uint64_t insertRows, uint64_t insertRows,
uint64_t recordFrom, uint64_t recordFrom,
int64_t startTime, int64_t startTime,
int32_t timePrec,
int64_t *pSamplePos) int64_t *pSamplePos)
{ {
return prepareStbStmt( return prepareStbStmt(
...@@ -6227,6 +6260,7 @@ static int32_t prepareStbStmtInterlace( ...@@ -6227,6 +6260,7 @@ static int32_t prepareStbStmtInterlace(
tableSeq, tableSeq,
batch, batch,
insertRows, 0, startTime, insertRows, 0, startTime,
timePrec,
pSamplePos); pSamplePos);
} }
...@@ -6239,6 +6273,7 @@ static int32_t prepareStbStmtProgressive( ...@@ -6239,6 +6273,7 @@ static int32_t prepareStbStmtProgressive(
uint64_t insertRows, uint64_t insertRows,
uint64_t recordFrom, uint64_t recordFrom,
int64_t startTime, int64_t startTime,
int32_t timePrec,
int64_t *pSamplePos) int64_t *pSamplePos)
{ {
return prepareStbStmt( return prepareStbStmt(
...@@ -6248,6 +6283,7 @@ static int32_t prepareStbStmtProgressive( ...@@ -6248,6 +6283,7 @@ static int32_t prepareStbStmtProgressive(
tableSeq, tableSeq,
g_args.num_of_RPR, g_args.num_of_RPR,
insertRows, recordFrom, startTime, insertRows, recordFrom, startTime,
timePrec,
pSamplePos); pSamplePos);
} }
...@@ -6450,6 +6486,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -6450,6 +6486,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
batchPerTbl, batchPerTbl,
insertRows, i, insertRows, i,
startTime, startTime,
pThreadInfo->time_precision,
&(pThreadInfo->samplePos)); &(pThreadInfo->samplePos));
#else #else
generated = -1; generated = -1;
...@@ -6476,6 +6513,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -6476,6 +6513,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
pThreadInfo->stmt, tableName, pThreadInfo->stmt, tableName,
batchPerTbl, batchPerTbl,
insertRows, i, insertRows, i,
pThreadInfo->time_precision,
startTime); startTime);
#else #else
generated = -1; generated = -1;
...@@ -6679,6 +6717,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -6679,6 +6717,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
tableSeq, tableSeq,
g_args.num_of_RPR, g_args.num_of_RPR,
insertRows, i, start_time, insertRows, i, start_time,
pThreadInfo->time_precision,
&(pThreadInfo->samplePos)); &(pThreadInfo->samplePos));
#else #else
generated = -1; generated = -1;
...@@ -6699,6 +6738,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -6699,6 +6738,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
tableName, tableName,
g_args.num_of_RPR, g_args.num_of_RPR,
insertRows, i, insertRows, i,
pThreadInfo->time_precision,
start_time); start_time);
#else #else
generated = -1; generated = -1;
...@@ -6915,6 +6955,14 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in * ...@@ -6915,6 +6955,14 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in *
return 0; return 0;
} }
#if STMT_IFACE_ENABLED == 1
static void parseSampleFileToStmt()
{
// TODO:
}
#endif
static void startMultiThreadInsertData(int threads, char* db_name, static void startMultiThreadInsertData(int threads, char* db_name,
char* precision, SSuperTable* superTblInfo) { char* precision, SSuperTable* superTblInfo) {
...@@ -7070,6 +7118,46 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -7070,6 +7118,46 @@ static void startMultiThreadInsertData(int threads, char* db_name,
memset(pids, 0, threads * sizeof(pthread_t)); memset(pids, 0, threads * sizeof(pthread_t));
memset(infos, 0, threads * sizeof(threadInfo)); memset(infos, 0, threads * sizeof(threadInfo));
#if STMT_IFACE_ENABLED == 1
char *stmtBuffer = calloc(1, BUFFER_SIZE);
assert(stmtBuffer);
if ((g_args.iface == STMT_IFACE)
|| ((superTblInfo)
&& (superTblInfo->iface == STMT_IFACE))) {
char *pstr = stmtBuffer;
if ((superTblInfo)
&& (AUTO_CREATE_SUBTBL
== superTblInfo->autoCreateTable)) {
pstr += sprintf(pstr, "INSERT INTO ? USING %s TAGS(?",
superTblInfo->sTblName);
for (int tag = 0; tag < (superTblInfo->tagCount - 1);
tag ++ ) {
pstr += sprintf(pstr, ",?");
}
pstr += sprintf(pstr, ") VALUES(?");
} else {
pstr += sprintf(pstr, "INSERT INTO ? VALUES(?");
}
int columnCount;
if (superTblInfo) {
columnCount = superTblInfo->columnCount;
} else {
columnCount = g_args.num_of_CPR;
}
for (int col = 0; col < columnCount; col ++) {
pstr += sprintf(pstr, ",?");
}
pstr += sprintf(pstr, ")");
debugPrint("%s() LN%d, stmtBuffer: %s", __func__, __LINE__, stmtBuffer);
parseSampleFileToStmt();
}
#endif
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
threadInfo *pThreadInfo = infos + i; threadInfo *pThreadInfo = infos + i;
pThreadInfo->threadID = i; pThreadInfo->threadID = i;
...@@ -7101,12 +7189,6 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -7101,12 +7189,6 @@ static void startMultiThreadInsertData(int threads, char* db_name,
|| ((superTblInfo) || ((superTblInfo)
&& (superTblInfo->iface == STMT_IFACE))) { && (superTblInfo->iface == STMT_IFACE))) {
int columnCount;
if (superTblInfo) {
columnCount = superTblInfo->columnCount;
} else {
columnCount = g_args.num_of_CPR;
}
pThreadInfo->stmt = taos_stmt_init(pThreadInfo->taos); pThreadInfo->stmt = taos_stmt_init(pThreadInfo->taos);
if (NULL == pThreadInfo->stmt) { if (NULL == pThreadInfo->stmt) {
...@@ -7119,41 +7201,15 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -7119,41 +7201,15 @@ static void startMultiThreadInsertData(int threads, char* db_name,
exit(-1); exit(-1);
} }
char *buffer = calloc(1, BUFFER_SIZE); int ret = taos_stmt_prepare(pThreadInfo->stmt, stmtBuffer, 0);
assert(buffer);
char *pstr = buffer;
if ((superTblInfo)
&& (AUTO_CREATE_SUBTBL
== superTblInfo->autoCreateTable)) {
pstr += sprintf(pstr, "INSERT INTO ? USING %s TAGS(?",
superTblInfo->sTblName);
for (int tag = 0; tag < (superTblInfo->tagCount - 1);
tag ++ ) {
pstr += sprintf(pstr, ",?");
}
pstr += sprintf(pstr, ") VALUES(?");
} else {
pstr += sprintf(pstr, "INSERT INTO ? VALUES(?");
}
for (int col = 0; col < columnCount; col ++) {
pstr += sprintf(pstr, ",?");
}
pstr += sprintf(pstr, ")");
debugPrint("%s() LN%d, buffer: %s", __func__, __LINE__, buffer);
int ret = taos_stmt_prepare(pThreadInfo->stmt, buffer, 0);
if (ret != 0){ if (ret != 0){
errorPrint("failed to execute taos_stmt_prepare. return 0x%x. reason: %s\n", errorPrint("failed to execute taos_stmt_prepare. return 0x%x. reason: %s\n",
ret, taos_stmt_errstr(pThreadInfo->stmt)); ret, taos_stmt_errstr(pThreadInfo->stmt));
free(pids); free(pids);
free(infos); free(infos);
free(buffer); free(stmtBuffer);
exit(-1); exit(-1);
} }
free(buffer);
} }
#endif #endif
} else { } else {
...@@ -7181,6 +7237,10 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -7181,6 +7237,10 @@ static void startMultiThreadInsertData(int threads, char* db_name,
} }
} }
#if STMT_IFACE_ENABLED == 1
free(stmtBuffer);
#endif
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
pthread_join(pids[i], NULL); pthread_join(pids[i], NULL);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册