未验证 提交 483574a0 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

[TD-3733]<fix>: taosdemo buffer processing refactor. (#5766)

Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 ef2246da
......@@ -2949,9 +2949,6 @@ static int readSampleFromCsvFileToMem(
continue;
}
verbosePrint("readLen=%ld stb->lenOfOneRow=%d getRows=%d\n", (long)readLen,
superTblInfo->lenOfOneRow, getRows);
memcpy(superTblInfo->sampleDataBuf + getRows * superTblInfo->lenOfOneRow,
line, readLen);
getRows++;
......@@ -3527,6 +3524,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER;
}
/*
cJSON* batchCreateTbl = cJSON_GetObjectItem(stbInfo, "batch_create_tbl_num");
if (batchCreateTbl && batchCreateTbl->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].batchCreateTableNum = batchCreateTbl->valueint;
......@@ -3536,6 +3534,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
printf("ERROR: failed to read json, batch_create_tbl_num not found\n");
goto PARSE_OVER;
}
*/
cJSON *childTblExists = cJSON_GetObjectItem(stbInfo, "child_table_exists"); // yes, no
if (childTblExists
......@@ -4619,7 +4618,8 @@ static int generateDataTail(char *tableName, int32_t tableSeq,
}
static int generateSQLHead(char *tableName, int32_t tableSeq,
threadInfo* pThreadInfo, SSuperTable* superTblInfo, char *buffer)
threadInfo* pThreadInfo, SSuperTable* superTblInfo,
char *buffer, int remainderBufLen)
{
int len;
if (superTblInfo) {
......@@ -4671,7 +4671,62 @@ static int generateSQLHead(char *tableName, int32_t tableSeq,
return len;
}
static int generateProgressiveDataBuffer(char *pTblName,
static int generateInterlaceDataBuffer(
char *tableName, int batchPerTbl, int i, int batchPerTblTimes,
int32_t tableSeq,
threadInfo *pThreadInfo, char *buffer,
int64_t insertRows,
int64_t startTime,
int *pRemainderBufLen)
{
char *pstr = buffer;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
int headLen = generateSQLHead(tableName, tableSeq, pThreadInfo,
superTblInfo, pstr, *pRemainderBufLen);
if (headLen <= 0) {
return 0;
}
// generate data buffer
verbosePrint("[%d] %s() LN%d i=%d buffer:\n%s\n",
pThreadInfo->threadID, __func__, __LINE__, i, buffer);
pstr += headLen;
*pRemainderBufLen -= headLen;
int dataLen = 0;
verbosePrint("[%d] %s() LN%d i=%d batchPerTblTimes=%d batchPerTbl = %d\n",
pThreadInfo->threadID, __func__, __LINE__,
i, batchPerTblTimes, batchPerTbl);
if (superTblInfo) {
if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) {
startTime = taosGetTimestamp(pThreadInfo->time_precision);
}
} else {
startTime = 1500000000000;
}
int k = generateDataTail(
tableName, tableSeq, pThreadInfo, superTblInfo,
batchPerTbl, pstr, *pRemainderBufLen, insertRows, 0,
startTime,
&(pThreadInfo->samplePos), &dataLen);
if (k > 0) {
pstr += dataLen;
*pRemainderBufLen -= dataLen;
} else {
pstr -= headLen;
pstr[0] = '\0';
}
return k;
}
static int generateProgressiveDataBuffer(
char *tableName,
int32_t tableSeq,
threadInfo *pThreadInfo, char *buffer,
int64_t insertRows,
......@@ -4691,6 +4746,7 @@ static int generateProgressiveDataBuffer(char *pTblName,
assert(buffer != NULL);
int k = 0;
int maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len;
int remainderBufLen = maxSqlLen;
......@@ -4698,14 +4754,17 @@ static int generateProgressiveDataBuffer(char *pTblName,
char *pstr = buffer;
int headLen = generateSQLHead(pTblName, tableSeq, pThreadInfo, superTblInfo,
buffer);
int headLen = generateSQLHead(tableName, tableSeq, pThreadInfo, superTblInfo,
buffer, remainderBufLen);
if (headLen <= 0) {
return 0;
}
pstr += headLen;
remainderBufLen -= headLen;
int k;
int dataLen;
k = generateDataTail(pTblName, tableSeq, pThreadInfo, superTblInfo,
k = generateDataTail(tableName, tableSeq, pThreadInfo, superTblInfo,
g_args.num_of_RPR, pstr, remainderBufLen, insertRows, startFrom,
startTime,
pSamplePos, &dataLen);
......@@ -4811,50 +4870,23 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
return NULL;
}
int headLen;
if (i == 0) {
headLen = generateSQLHead(tableName, tableSeq, pThreadInfo,
superTblInfo, pstr);
} else {
headLen = snprintf(pstr, TSDB_TABLE_NAME_LEN, "%s.%s values",
pThreadInfo->db_name,
tableName);
}
// generate data buffer
verbosePrint("[%d] %s() LN%d i=%d buffer:\n%s\n",
pThreadInfo->threadID, __func__, __LINE__, i, buffer);
pstr += headLen;
remainderBufLen -= headLen;
int dataLen = 0;
verbosePrint("[%d] %s() LN%d i=%d batchPerTblTimes=%d batchPerTbl = %d\n",
pThreadInfo->threadID, __func__, __LINE__,
i, batchPerTblTimes, batchPerTbl);
if (superTblInfo) {
if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) {
startTime = taosGetTimestamp(pThreadInfo->time_precision);
}
} else {
startTime = 1500000000000;
}
int generated = generateDataTail(
tableName, tableSeq, pThreadInfo, superTblInfo,
batchPerTbl, pstr, remainderBufLen, insertRows, 0,
int generated = generateInterlaceDataBuffer(
tableName, batchPerTbl, i, batchPerTblTimes,
tableSeq,
pThreadInfo, pstr,
insertRows,
startTime,
&(pThreadInfo->samplePos), &dataLen);
&remainderBufLen);
if (generated < 0) {
debugPrint("[%d] %s() LN%d, generated data is %d\n",
pThreadInfo->threadID, __func__, __LINE__, generated);
goto free_and_statistics_interlace;
} else if (generated == 0) {
break;
}
pstr += dataLen;
remainderBufLen -= dataLen;
tableSeq ++;
recOfBatch += batchPerTbl;
// startTime += batchPerTbl * superTblInfo->timeStampStep;
pThreadInfo->totalInsertRows += batchPerTbl;
......@@ -4862,7 +4894,6 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
pThreadInfo->threadID, __func__, __LINE__,
batchPerTbl, recOfBatch);
tableSeq ++;
if (insertMode == INTERLACE_INSERT_MODE) {
if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) {
// turn to first table
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册