未验证 提交 56776b62 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

[TD-6448]<fix>: taosdemo stmt rand race. (#7719)

Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 2483985d
...@@ -291,7 +291,6 @@ typedef struct SSuperTable_S { ...@@ -291,7 +291,6 @@ typedef struct SSuperTable_S {
uint64_t lenOfTagOfOneRow; uint64_t lenOfTagOfOneRow;
char* sampleDataBuf; char* sampleDataBuf;
char* sampleBindArray;
//int sampleRowCount; //int sampleRowCount;
//int sampleUsePos; //int sampleUsePos;
...@@ -438,7 +437,8 @@ typedef struct SQueryMetaInfo_S { ...@@ -438,7 +437,8 @@ typedef struct SQueryMetaInfo_S {
typedef struct SThreadInfo_S { typedef struct SThreadInfo_S {
TAOS * taos; TAOS * taos;
TAOS_STMT *stmt; TAOS_STMT *stmt;
int64_t *bind_ts; char* sampleBindArray;
int64_t *bind_ts;
int threadID; int threadID;
char db_name[TSDB_DB_NAME_LEN]; char db_name[TSDB_DB_NAME_LEN];
uint32_t time_precision; uint32_t time_precision;
...@@ -5738,20 +5738,6 @@ static void postFreeResource() { ...@@ -5738,20 +5738,6 @@ static void postFreeResource() {
free(g_Dbs.db[i].superTbls[j].sampleDataBuf); free(g_Dbs.db[i].superTbls[j].sampleDataBuf);
g_Dbs.db[i].superTbls[j].sampleDataBuf = NULL; g_Dbs.db[i].superTbls[j].sampleDataBuf = NULL;
} }
if (g_Dbs.db[i].superTbls[j].sampleBindArray) {
for (int k = 0; k < MAX_SAMPLES_ONCE_FROM_FILE; k++) {
uintptr_t *tmp = (uintptr_t *)(*(uintptr_t *)(
g_Dbs.db[i].superTbls[j].sampleBindArray
+ sizeof(uintptr_t *) * k));
for (int c = 1; c < g_Dbs.db[i].superTbls[j].columnCount + 1; c++) {
TAOS_BIND *bind = (TAOS_BIND *)((char *)tmp + (sizeof(TAOS_BIND) * c));
if (bind)
tmfree(bind->buffer);
}
tmfree((char *)tmp);
}
}
tmfree((char *)g_Dbs.db[i].superTbls[j].sampleBindArray);
if (0 != g_Dbs.db[i].superTbls[j].tagDataBuf) { if (0 != g_Dbs.db[i].superTbls[j].tagDataBuf) {
free(g_Dbs.db[i].superTbls[j].tagDataBuf); free(g_Dbs.db[i].superTbls[j].tagDataBuf);
...@@ -6085,9 +6071,6 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) ...@@ -6085,9 +6071,6 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k)
int32_t affectedRows; int32_t affectedRows;
SSuperTable* stbInfo = pThreadInfo->stbInfo; SSuperTable* stbInfo = pThreadInfo->stbInfo;
verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID,
__func__, __LINE__, pThreadInfo->buffer);
uint16_t iface; uint16_t iface;
if (stbInfo) if (stbInfo)
iface = stbInfo->iface; iface = stbInfo->iface;
...@@ -6105,12 +6088,18 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) ...@@ -6105,12 +6088,18 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k)
switch(iface) { switch(iface) {
case TAOSC_IFACE: case TAOSC_IFACE:
verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID,
__func__, __LINE__, pThreadInfo->buffer);
affectedRows = queryDbExec( affectedRows = queryDbExec(
pThreadInfo->taos, pThreadInfo->taos,
pThreadInfo->buffer, INSERT_TYPE, false); pThreadInfo->buffer, INSERT_TYPE, false);
break; break;
case REST_IFACE: case REST_IFACE:
verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID,
__func__, __LINE__, pThreadInfo->buffer);
if (0 != postProceSql(g_Dbs.host, &g_Dbs.serv_addr, g_Dbs.port, if (0 != postProceSql(g_Dbs.host, &g_Dbs.serv_addr, g_Dbs.port,
pThreadInfo->buffer, pThreadInfo)) { pThreadInfo->buffer, pThreadInfo)) {
affectedRows = -1; affectedRows = -1;
...@@ -7088,12 +7077,12 @@ static int32_t prepareStbStmtBindRand( ...@@ -7088,12 +7077,12 @@ static int32_t prepareStbStmtBindRand(
return 0; return 0;
} }
static int32_t prepareStbStmtBindWithSample( static int32_t prepareStbStmtBindStartTime(
char *tableName,
int64_t *ts, int64_t *ts,
char *bindArray, SSuperTable *stbInfo, char *bindArray, SSuperTable *stbInfo,
int64_t startTime, int32_t recSeq, int64_t startTime, int32_t recSeq,
int32_t timePrec, int32_t timePrec)
int64_t samplePos)
{ {
TAOS_BIND *bind; TAOS_BIND *bind;
...@@ -7110,6 +7099,10 @@ static int32_t prepareStbStmtBindWithSample( ...@@ -7110,6 +7099,10 @@ static int32_t prepareStbStmtBindWithSample(
} else { } else {
*bind_ts = startTime + stbInfo->timeStampStep * recSeq; *bind_ts = startTime + stbInfo->timeStampStep * recSeq;
} }
verbosePrint("%s() LN%d, tableName: %s, bind_ts=%"PRId64"\n",
__func__, __LINE__, tableName, *bind_ts);
bind->buffer_length = sizeof(int64_t); bind->buffer_length = sizeof(int64_t);
bind->buffer = bind_ts; bind->buffer = bind_ts;
bind->length = &bind->buffer_length; bind->length = &bind->buffer_length;
...@@ -7118,7 +7111,7 @@ static int32_t prepareStbStmtBindWithSample( ...@@ -7118,7 +7111,7 @@ static int32_t prepareStbStmtBindWithSample(
return 0; return 0;
} }
static int32_t prepareStbStmtRand( UNUSED_FUNC static int32_t prepareStbStmtRand(
threadInfo *pThreadInfo, threadInfo *pThreadInfo,
char *tableName, char *tableName,
int64_t tableSeq, int64_t tableSeq,
...@@ -7299,14 +7292,14 @@ static int32_t prepareStbStmtWithSample( ...@@ -7299,14 +7292,14 @@ static int32_t prepareStbStmtWithSample(
uint32_t k; uint32_t k;
for (k = 0; k < batch;) { for (k = 0; k < batch;) {
char *bindArray = (char *)(*((uintptr_t *) char *bindArray = (char *)(*((uintptr_t *)
(stbInfo->sampleBindArray + (sizeof(char *)) * (*pSamplePos)))); (pThreadInfo->sampleBindArray + (sizeof(char *)) * (*pSamplePos))));
/* columnCount + 1 (ts) */ /* columnCount + 1 (ts) */
if (-1 == prepareStbStmtBindWithSample( if (-1 == prepareStbStmtBindStartTime(
tableName,
pThreadInfo->bind_ts, pThreadInfo->bind_ts,
bindArray, stbInfo, bindArray, stbInfo,
startTime, k, startTime, k,
pThreadInfo->time_precision, pThreadInfo->time_precision
*pSamplePos
/* is column */)) { /* is column */)) {
return -1; return -1;
} }
...@@ -7427,8 +7420,6 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -7427,8 +7420,6 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
int64_t nTimeStampStep; int64_t nTimeStampStep;
uint64_t insert_interval; uint64_t insert_interval;
bool sourceRand;
SSuperTable* stbInfo = pThreadInfo->stbInfo; SSuperTable* stbInfo = pThreadInfo->stbInfo;
if (stbInfo) { if (stbInfo) {
...@@ -7443,18 +7434,12 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -7443,18 +7434,12 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
maxSqlLen = stbInfo->maxSqlLen; maxSqlLen = stbInfo->maxSqlLen;
nTimeStampStep = stbInfo->timeStampStep; nTimeStampStep = stbInfo->timeStampStep;
insert_interval = stbInfo->insertInterval; insert_interval = stbInfo->insertInterval;
if (0 == strncasecmp(stbInfo->dataSource, "rand", 4)) {
sourceRand = true;
} else {
sourceRand = false; // from sample data file
}
} else { } else {
insertRows = g_args.num_of_DPT; insertRows = g_args.num_of_DPT;
interlaceRows = g_args.interlace_rows; interlaceRows = g_args.interlace_rows;
maxSqlLen = g_args.max_sql_len; maxSqlLen = g_args.max_sql_len;
nTimeStampStep = g_args.timestamp_step; nTimeStampStep = g_args.timestamp_step;
insert_interval = g_args.insert_interval; insert_interval = g_args.insert_interval;
sourceRand = true;
} }
debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n", debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n",
...@@ -7539,25 +7524,14 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -7539,25 +7524,14 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
int32_t generated; int32_t generated;
if (stbInfo) { if (stbInfo) {
if (stbInfo->iface == STMT_IFACE) { if (stbInfo->iface == STMT_IFACE) {
if (sourceRand) { generated = prepareStbStmtWithSample(
generated = prepareStbStmtRand( pThreadInfo,
pThreadInfo, tableName,
tableName, tableSeq,
tableSeq, batchPerTbl,
batchPerTbl, insertRows, 0,
insertRows, 0, startTime,
startTime &(pThreadInfo->samplePos));
);
} else {
generated = prepareStbStmtWithSample(
pThreadInfo,
tableName,
tableSeq,
batchPerTbl,
insertRows, 0,
startTime,
&(pThreadInfo->samplePos));
}
} else { } else {
generated = generateStbInterlaceData( generated = generateStbInterlaceData(
pThreadInfo, pThreadInfo,
...@@ -7747,17 +7721,6 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -7747,17 +7721,6 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
pThreadInfo->totalInsertRows = 0; pThreadInfo->totalInsertRows = 0;
pThreadInfo->totalAffectedRows = 0; pThreadInfo->totalAffectedRows = 0;
bool sourceRand;
if (stbInfo) {
if (0 == strncasecmp(stbInfo->dataSource, "rand", 4)) {
sourceRand = true;
} else {
sourceRand = false; // from sample data file
}
} else {
sourceRand = true;
}
pThreadInfo->samplePos = 0; pThreadInfo->samplePos = 0;
int percentComplete = 0; int percentComplete = 0;
...@@ -7796,32 +7759,13 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -7796,32 +7759,13 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
int32_t generated; int32_t generated;
if (stbInfo) { if (stbInfo) {
if (stbInfo->iface == STMT_IFACE) { if (stbInfo->iface == STMT_IFACE) {
if (sourceRand) { generated = prepareStbStmtWithSample(
/* generated = prepareStbStmtRand( pThreadInfo,
pThreadInfo, tableName,
tableName, tableSeq,
tableSeq, g_args.num_of_RPR,
g_args.num_of_RPR, insertRows, i, start_time,
insertRows, &(pThreadInfo->samplePos));
i, start_time
);
*/
generated = prepareStbStmtWithSample(
pThreadInfo,
tableName,
tableSeq,
g_args.num_of_RPR,
insertRows, i, start_time,
&(pThreadInfo->samplePos));
} else {
generated = prepareStbStmtWithSample(
pThreadInfo,
tableName,
tableSeq,
g_args.num_of_RPR,
insertRows, i, start_time,
&(pThreadInfo->samplePos));
}
} else { } else {
generated = generateStbProgressiveData( generated = generateStbProgressiveData(
stbInfo, stbInfo,
...@@ -7849,6 +7793,11 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -7849,6 +7793,11 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
&remainderBufLen); &remainderBufLen);
} }
} }
verbosePrint("[%d] %s() LN%d generated=%d\n",
pThreadInfo->threadID,
__func__, __LINE__, generated);
if (generated > 0) if (generated > 0)
i += generated; i += generated;
else else
...@@ -8059,17 +8008,22 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in * ...@@ -8059,17 +8008,22 @@ static int convertHostToServAddr(char *host, uint16_t port, struct sockaddr_in *
return 0; return 0;
} }
static int parseSampleFileToStmt(SSuperTable *stbInfo, uint32_t timePrec) static int parseSampleFileToStmt(
threadInfo *pThreadInfo,
SSuperTable *stbInfo, uint32_t timePrec)
{ {
stbInfo->sampleBindArray = calloc(1, sizeof(char *) * MAX_SAMPLES_ONCE_FROM_FILE); pThreadInfo->sampleBindArray =
if (stbInfo->sampleBindArray == NULL) { calloc(1, sizeof(char *) * MAX_SAMPLES_ONCE_FROM_FILE);
if (pThreadInfo->sampleBindArray == NULL) {
errorPrint2("%s() LN%d, Failed to allocate %"PRIu64" bind array buffer\n", errorPrint2("%s() LN%d, Failed to allocate %"PRIu64" bind array buffer\n",
__func__, __LINE__, (uint64_t)sizeof(char *) * MAX_SAMPLES_ONCE_FROM_FILE); __func__, __LINE__,
(uint64_t)sizeof(char *) * MAX_SAMPLES_ONCE_FROM_FILE);
return -1; return -1;
} }
for (int i=0; i < MAX_SAMPLES_ONCE_FROM_FILE; i++) { for (int i=0; i < MAX_SAMPLES_ONCE_FROM_FILE; i++) {
char *bindArray = calloc(1, sizeof(TAOS_BIND) * (stbInfo->columnCount + 1)); char *bindArray =
calloc(1, sizeof(TAOS_BIND) * (stbInfo->columnCount + 1));
if (bindArray == NULL) { if (bindArray == NULL) {
errorPrint2("%s() LN%d, Failed to allocate %d bind params\n", errorPrint2("%s() LN%d, Failed to allocate %d bind params\n",
__func__, __LINE__, (stbInfo->columnCount + 1)); __func__, __LINE__, (stbInfo->columnCount + 1));
...@@ -8122,7 +8076,8 @@ static int parseSampleFileToStmt(SSuperTable *stbInfo, uint32_t timePrec) ...@@ -8122,7 +8076,8 @@ static int parseSampleFileToStmt(SSuperTable *stbInfo, uint32_t timePrec)
free(bindBuffer); free(bindBuffer);
} }
} }
*((uintptr_t *)(stbInfo->sampleBindArray + (sizeof(char *)) * i)) = (uintptr_t)bindArray; *((uintptr_t *)(pThreadInfo->sampleBindArray + (sizeof(char *)) * i)) =
(uintptr_t)bindArray;
} }
return 0; return 0;
...@@ -8312,10 +8267,6 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -8312,10 +8267,6 @@ static void startMultiThreadInsertData(int threads, char* db_name,
pstr += sprintf(pstr, ")"); pstr += sprintf(pstr, ")");
debugPrint("%s() LN%d, stmtBuffer: %s", __func__, __LINE__, stmtBuffer); debugPrint("%s() LN%d, stmtBuffer: %s", __func__, __LINE__, stmtBuffer);
if (stbInfo) {
parseSampleFileToStmt(stbInfo, timePrec);
}
} }
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
...@@ -8348,7 +8299,6 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -8348,7 +8299,6 @@ static void startMultiThreadInsertData(int threads, char* db_name,
|| ((stbInfo) || ((stbInfo)
&& (stbInfo->iface == STMT_IFACE))) { && (stbInfo->iface == STMT_IFACE))) {
pThreadInfo->stmt = taos_stmt_init(pThreadInfo->taos); pThreadInfo->stmt = taos_stmt_init(pThreadInfo->taos);
if (NULL == pThreadInfo->stmt) { if (NULL == pThreadInfo->stmt) {
free(pids); free(pids);
...@@ -8370,6 +8320,10 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -8370,6 +8320,10 @@ static void startMultiThreadInsertData(int threads, char* db_name,
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
pThreadInfo->bind_ts = malloc(sizeof(int64_t)); pThreadInfo->bind_ts = malloc(sizeof(int64_t));
if (stbInfo) {
parseSampleFileToStmt(pThreadInfo, stbInfo, timePrec);
}
} }
} else { } else {
pThreadInfo->taos = NULL; pThreadInfo->taos = NULL;
...@@ -8420,6 +8374,21 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -8420,6 +8374,21 @@ static void startMultiThreadInsertData(int threads, char* db_name,
tsem_destroy(&(pThreadInfo->lock_sem)); tsem_destroy(&(pThreadInfo->lock_sem));
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
if (pThreadInfo->sampleBindArray) {
for (int k = 0; k < MAX_SAMPLES_ONCE_FROM_FILE; k++) {
uintptr_t *tmp = (uintptr_t *)(*(uintptr_t *)(
pThreadInfo->sampleBindArray
+ sizeof(uintptr_t *) * k));
for (int c = 1; c < pThreadInfo->stbInfo->columnCount + 1; c++) {
TAOS_BIND *bind = (TAOS_BIND *)((char *)tmp + (sizeof(TAOS_BIND) * c));
if (bind)
tmfree(bind->buffer);
}
tmfree((char *)tmp);
}
tmfree(pThreadInfo->sampleBindArray);
}
debugPrint("%s() LN%d, [%d] totalInsert=%"PRIu64" totalAffected=%"PRIu64"\n", debugPrint("%s() LN%d, [%d] totalInsert=%"PRIu64" totalAffected=%"PRIu64"\n",
__func__, __LINE__, __func__, __LINE__,
pThreadInfo->threadID, pThreadInfo->totalInsertRows, pThreadInfo->threadID, pThreadInfo->totalInsertRows,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册