提交 86f48515 编写于 作者: S Shuduo Sang

[TD-3192] <feature>: support child table limit and offset.

resolve memory alloc issue.
上级 86105e72
...@@ -1997,7 +1997,7 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos, ...@@ -1997,7 +1997,7 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos,
int childTblCount = (limit < 0)?10000:limit; int childTblCount = (limit < 0)?10000:limit;
int count = 0; int count = 0;
childTblName = (char*)calloc(1, childTblCount * TSDB_TABLE_NAME_LEN); // childTblName = (char*)calloc(1, childTblCount * TSDB_TABLE_NAME_LEN);
char* pTblName = childTblName; char* pTblName = childTblName;
while ((row = taos_fetch_row(res)) != NULL) { while ((row = taos_fetch_row(res)) != NULL) {
int32_t* len = taos_fetch_lengths(res); int32_t* len = taos_fetch_lengths(res);
...@@ -3955,7 +3955,7 @@ static int getRowDataFromSample(char* dataBuf, int maxLen, int64_t timestamp, ...@@ -3955,7 +3955,7 @@ static int getRowDataFromSample(char* dataBuf, int maxLen, int64_t timestamp,
return dataLen; return dataLen;
} }
int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuperTable* stbInfo) { static int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuperTable* stbInfo) {
int dataLen = 0; int dataLen = 0;
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "(%" PRId64 ", ", timestamp); dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "(%" PRId64 ", ", timestamp);
for (int i = 0; i < stbInfo->columnCount; i++) { for (int i = 0; i < stbInfo->columnCount; i++) {
...@@ -4343,26 +4343,22 @@ static int execInsert(threadInfo *winfo, char *buffer, int k) ...@@ -4343,26 +4343,22 @@ static int execInsert(threadInfo *winfo, char *buffer, int k)
if (superTblInfo) { if (superTblInfo) {
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) {
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE); affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE);
} else { } else {
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
int retCode = postProceSql(g_Dbs.host, g_Dbs.port, buffer); int retCode = postProceSql(g_Dbs.host, g_Dbs.port, buffer);
if (0 != retCode) { if (0 != retCode) {
affectedRows = -1; affectedRows = -1;
printf("========restful return fail, threadID[%d]\n", winfo->threadID); printf("========restful return fail, threadID[%d]\n", winfo->threadID);
} else { } else {
affectedRows = k; affectedRows = k;
} }
} }
} else { } else {
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
affectedRows = queryDbExec(winfo->taos, buffer, 1); affectedRows = queryDbExec(winfo->taos, buffer, 1);
}
if (0 > affectedRows){
return affectedRows;
} }
return affectedRows; return affectedRows;
...@@ -4390,6 +4386,12 @@ static int generateDataBuffer(int32_t tableSeq, ...@@ -4390,6 +4386,12 @@ static int generateDataBuffer(int32_t tableSeq,
char *pChildTblName; char *pChildTblName;
int childTblCount; int childTblCount;
pChildTblName = calloc(TSDB_TABLE_NAME_LEN, 1);
if (NULL == pChildTblName) {
fprintf(stderr, "failed to alloc memory %d\n", TSDB_TABLE_NAME_LEN);
return -1;
}
if (superTblInfo && (superTblInfo->childTblOffset > 0)) { if (superTblInfo && (superTblInfo->childTblOffset > 0)) {
// select tbname from stb limit 1 offset tableSeq // select tbname from stb limit 1 offset tableSeq
getChildNameOfSuperTableWithLimitAndOffset(pThreadInfo->taos, getChildNameOfSuperTableWithLimitAndOffset(pThreadInfo->taos,
...@@ -4397,11 +4399,6 @@ static int generateDataBuffer(int32_t tableSeq, ...@@ -4397,11 +4399,6 @@ static int generateDataBuffer(int32_t tableSeq,
&pChildTblName, &childTblCount, &pChildTblName, &childTblCount,
1, tableSeq); 1, tableSeq);
} else { } else {
pChildTblName = calloc(TSDB_TABLE_NAME_LEN, 1);
if (NULL == pChildTblName) {
fprintf(stderr, "failed to alloc memory %d\n", TSDB_TABLE_NAME_LEN);
return -1;
}
snprintf(pChildTblName, TSDB_TABLE_NAME_LEN, "%s%d", snprintf(pChildTblName, TSDB_TABLE_NAME_LEN, "%s%d",
superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix, tableSeq); superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix, tableSeq);
} }
...@@ -4421,9 +4418,9 @@ static int generateDataBuffer(int32_t tableSeq, ...@@ -4421,9 +4418,9 @@ static int generateDataBuffer(int32_t tableSeq,
tableSeq % superTblInfo->tagSampleCount); tableSeq % superTblInfo->tagSampleCount);
} }
if (NULL == tagsValBuf) { if (NULL == tagsValBuf) {
fprintf(stderr, "tag buf failed to allocate memory\n"); free(pChildTblName);
free(pChildTblName); fprintf(stderr, "tag buf failed to allocate memory\n");
return -1; return -1;
} }
pstr += snprintf(pstr, pstr += snprintf(pstr,
...@@ -4498,6 +4495,11 @@ static int generateDataBuffer(int32_t tableSeq, ...@@ -4498,6 +4495,11 @@ static int generateDataBuffer(int32_t tableSeq,
} }
len += retLen; len += retLen;
if (len >= (superTblInfo->maxSqlLen - 256)) { // reserve for overwrite
k++;
break;
}
} }
} else { } else {
int rand_num = taosRandom() % 100; int rand_num = taosRandom() % 100;
...@@ -4518,15 +4520,15 @@ static int generateDataBuffer(int32_t tableSeq, ...@@ -4518,15 +4520,15 @@ static int generateDataBuffer(int32_t tableSeq,
lenOfBinary); lenOfBinary);
} }
pstr += sprintf(pstr, " %s", data);
//assert(len + pstr - buffer < BUFFER_SIZE); //assert(len + pstr - buffer < BUFFER_SIZE);
if (len + pstr - buffer >= g_args.max_sql_len) { // too long if (len + pstr - buffer >= (g_args.max_sql_len - 256)) { // too long
break; k++;
break;
} }
pstr += sprintf(pstr, " %s", data);
} }
verbosePrint("%s() LN%d len=%d k=%d \nbuffer=%s\n", __func__, __LINE__, len, k, buffer); verbosePrint("%s() LN%d len=%d k=%d \nbuffer=%p\n", __func__, __LINE__, len, k, buffer);
k++; k++;
startFrom ++; startFrom ++;
...@@ -4535,7 +4537,8 @@ static int generateDataBuffer(int32_t tableSeq, ...@@ -4535,7 +4537,8 @@ static int generateDataBuffer(int32_t tableSeq,
break; break;
} }
free(pChildTblName); if (pChildTblName)
free(pChildTblName);
return k; return k;
} }
...@@ -4613,6 +4616,7 @@ static void* syncWrite(void *sarg) { ...@@ -4613,6 +4616,7 @@ static void* syncWrite(void *sarg) {
if (affectedRows < 0) if (affectedRows < 0)
goto free_and_statistics_2; goto free_and_statistics_2;
winfo->totalInsertRows += generated; winfo->totalInsertRows += generated;
winfo->totalAffectedRows += affectedRows; winfo->totalAffectedRows += affectedRows;
...@@ -4712,7 +4716,7 @@ void callBack(void *param, TAOS_RES *res, int code) { ...@@ -4712,7 +4716,7 @@ void callBack(void *param, TAOS_RES *res, int code) {
break; break;
} }
} }
if (insert_interval) { if (insert_interval) {
winfo->st = taosGetTimestampUs(); winfo->st = taosGetTimestampUs();
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册