未验证 提交 625b2dc9 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #20513 from taosdata/fix/TD-23136

fix:converity scan defects
......@@ -192,7 +192,7 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
(*pRequest)->sqlLen = sqlLen;
(*pRequest)->validateOnly = validateSql;
SSyncQueryParam* newpParam;
SSyncQueryParam* newpParam = NULL;
if (param == NULL) {
newpParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
if (newpParam == NULL) {
......
......@@ -506,6 +506,9 @@ int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) {
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, true);
}
if(code != 0){
taosMemoryFree(pRes);
}
tFreeSShowVariablesRsp(&rsp);
}
......
......@@ -117,7 +117,7 @@ int64_t smlGetTimeValue(const char *value, int32_t len, uint8_t fromPrecision, u
if (unlikely(fromPrecision >= TSDB_TIME_PRECISION_HOURS)) {
int64_t unit = smlToMilli[fromPrecision - TSDB_TIME_PRECISION_HOURS];
if (unit > INT64_MAX / tsInt64) {
if (tsInt64 != 0 && unit > INT64_MAX / tsInt64) {
return -1;
}
tsInt64 *= unit;
......@@ -637,7 +637,10 @@ static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashO
for (int j = 0; j < taosArrayGetSize(cols); ++j) {
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, j);
ESchemaAction action = SCHEMA_ACTION_NULL;
smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, &action, info);
int code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, &action, info);
if(code != 0){
return code;
}
if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_ADD_TAG) {
SField field = {0};
field.type = kv->type;
......@@ -646,6 +649,10 @@ static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashO
taosArrayPush(results, &field);
} else if (action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) {
uint16_t *index = (uint16_t *)taosHashGet(schemaHash, kv->key, kv->keyLen);
if(index == NULL){
uError("smlBuildFieldsList get error, key:%s", kv->key);
return TSDB_CODE_SML_INVALID_DATA;
}
uint16_t newIndex = *index;
if (isTag) newIndex -= numOfCols;
SField *field = (SField *)taosArrayGet(results, newIndex);
......@@ -774,9 +781,16 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) {
SArray *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField));
SArray *pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField));
smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true);
smlBuildFieldsList(info, NULL, NULL, sTableData->cols, pColumns, 0, false);
code = smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlBuildFieldsList tag1 failed. %s", info->id, pName.tname);
goto end;
}
code = smlBuildFieldsList(info, NULL, NULL, sTableData->cols, pColumns, 0, false);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlBuildFieldsList col1 failed. %s", info->id, pName.tname);
goto end;
}
code = smlSendMetaMsg(info, &pName, pColumns, pTags, NULL, SCHEMA_ACTION_CREATE_STABLE);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
......@@ -820,8 +834,12 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
taosArrayPush(pTags, &field);
}
}
smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->tags, pTags,
code = smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->tags, pTags,
pTableMeta->tableInfo.numOfColumns, true);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlBuildFieldsList tag2 failed. %s", info->id, pName.tname);
goto end;
}
code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
if (code != TSDB_CODE_SUCCESS) {
......@@ -868,8 +886,12 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
}
}
smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->cols, pColumns,
code = smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->cols, pColumns,
pTableMeta->tableInfo.numOfColumns, false);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlBuildFieldsList col2 failed. %s", info->id, pName.tname);
goto end;
}
code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
if (code != TSDB_CODE_SUCCESS) {
......@@ -1097,6 +1119,9 @@ SSmlHandle *smlBuildSmlInfo(TAOS *taos) {
}
if (taos != NULL) {
info->taos = acquireTscObj(*(int64_t *)taos);
if(info->taos == NULL){
goto cleanup;
}
code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog);
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " get catalog error %d", info->id, code);
......@@ -1151,13 +1176,16 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
SSmlLineInfo *elements = info->lines + i;
SSmlTableInfo *tinfo = NULL;
if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
tinfo = *(SSmlTableInfo **)taosHashGet(info->childTables, elements->measure, elements->measureTagsLen);
SSmlTableInfo** tmp = (SSmlTableInfo **)taosHashGet(info->childTables, elements->measure, elements->measureTagsLen);
if(tmp) tinfo = *tmp;
} else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
tinfo = *(SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag,
elements->measureLen + elements->tagsLen);
SSmlTableInfo** tmp = (SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag,
elements->measureLen + elements->tagsLen);
if(tmp) tinfo = *tmp;
} else {
tinfo = *(SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag,
elements->measureLen + elements->tagsLen);
SSmlTableInfo** tmp = (SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag,
elements->measureLen + elements->tagsLen);
if(tmp) tinfo = *tmp;
}
if (tinfo == NULL) {
......
......@@ -1237,10 +1237,12 @@ int32_t smlParseJSON(SSmlHandle *info, char *payload) {
if (cnt >= payloadNum) {
payloadNum = payloadNum << 1;
void *tmp = taosMemoryRealloc(info->lines, payloadNum * sizeof(SSmlLineInfo));
if (tmp != NULL) {
info->lines = (SSmlLineInfo *)tmp;
memset(info->lines + cnt, 0, (payloadNum - cnt) * sizeof(SSmlLineInfo));
if (tmp == NULL) {
ret = TSDB_CODE_OUT_OF_MEMORY;
return ret;
}
info->lines = (SSmlLineInfo *)tmp;
memset(info->lines + cnt, 0, (payloadNum - cnt) * sizeof(SSmlLineInfo));
}
ret = smlParseJSONString(info, &dataPointStart, info->lines + cnt);
if ((info->lines + cnt)->measure == NULL) break;
......
......@@ -292,6 +292,7 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
info->currSTableMeta->uid = tinfo->uid;
tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);
if (tinfo->tableDataCtx == NULL) {
smlDestroyTableInfo(info, tinfo);
smlBuildInvalidDataMsg(&info->msgBuf, "smlInitTableDataCtx error", NULL);
return TSDB_CODE_SML_INVALID_DATA;
}
......
......@@ -292,7 +292,7 @@ int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
return TSDB_CODE_SUCCESS;
}
if (info->dataFormat) {
if (info->dataFormat && info->currSTableMeta != NULL) {
if (needConverTime) {
kvTs.i = convertTimePrecision(kvTs.i, TSDB_TIME_PRECISION_NANO, info->currSTableMeta->tableInfo.precision);
}
......
......@@ -115,7 +115,7 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream
if (pStream->fixedSinkVgId == 0) {
SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
if (pDb->cfg.numOfVgroups > 1) {
if (pDb != NULL && pDb->cfg.numOfVgroups > 1) {
isShuffle = true;
pTask->outputType = TASK_OUTPUT__SHUFFLE_DISPATCH;
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
......
......@@ -134,7 +134,7 @@ static SShowObj *mndCreateShowObj(SMnode *pMnode, SRetrieveTableReq *pReq) {
showObj.pMnode = pMnode;
showObj.type = convertToRetrieveType(pReq->tb, tListLen(pReq->tb));
memcpy(showObj.db, pReq->db, TSDB_DB_FNAME_LEN);
strncpy(showObj.filterTb, pReq->filterTb, TSDB_TABLE_NAME_LEN);
tstrncpy(showObj.filterTb, pReq->filterTb, TSDB_TABLE_NAME_LEN);
int32_t keepTime = tsShellActivityTimer * 6 * 1000;
SShowObj *pShow = taosCachePut(pMgmt->cache, &showId, sizeof(int64_t), &showObj, size, keepTime);
......
......@@ -269,6 +269,7 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) {
}
int32_t tqMetaRestoreHandle(STQ* pTq) {
int code = 0;
TBC* pCur = NULL;
if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) {
return -1;
......@@ -290,7 +291,8 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
handle.pRef = walOpenRef(pTq->pVnode->pWal);
if (handle.pRef == NULL) {
return -1;
code = -1;
goto end;
}
walRefVer(handle.pRef, handle.snapshotVer);
......@@ -307,16 +309,21 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, NULL);
if (handle.execHandle.task == NULL) {
tqError("cannot create exec task for %s", handle.subKey);
return -1;
code = -1;
goto end;
}
void* scanner = NULL;
qExtractStreamScanner(handle.execHandle.task, &scanner);
if (scanner == NULL) {
tqError("cannot extract stream scanner for %s", handle.subKey);
code = -1;
goto end;
}
handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
if (handle.execHandle.pExecReader == NULL) {
tqError("cannot extract exec reader for %s", handle.subKey);
code = -1;
goto end;
}
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__DB) {
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
......@@ -347,8 +354,9 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle));
}
end:
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);
return 0;
return code;
}
......@@ -370,11 +370,6 @@ int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) {
}
return TSDB_CODE_SUCCESS;
_error:
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to encode submit req since %s", terrstr());
return TSDB_CODE_OUT_OF_MEMORY;
}
void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
......@@ -441,9 +436,6 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
for (int32_t rowId = 0; rowId < rows; rowId++) {
SVCreateTbReq createTbReq = {0};
SVCreateTbReq* pCreateTbReq = &createTbReq;
if (!pCreateTbReq) {
goto _end;
}
// set const
pCreateTbReq->flags = 0;
......@@ -460,6 +452,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
if (size == 2) {
tagArray = taosArrayInit(1, sizeof(STagVal));
if (!tagArray) {
tdDestroySVCreateTbReq(pCreateTbReq);
goto _end;
}
STagVal tagVal = {
......@@ -477,6 +470,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
} else {
tagArray = taosArrayInit(size - 1, sizeof(STagVal));
if (!tagArray) {
tdDestroySVCreateTbReq(pCreateTbReq);
goto _end;
}
for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) {
......@@ -503,6 +497,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
tTagNew(tagArray, 1, false, &pTag);
tagArray = taosArrayDestroy(tagArray);
if (pTag == NULL) {
tdDestroySVCreateTbReq(pCreateTbReq);
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
......@@ -556,6 +551,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
SVCreateTbReq* pCreateTbReq = NULL;
if (!(pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateStbReq)))) {
taosMemoryFree(ctbName);
goto _end;
};
......@@ -572,6 +568,8 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
// set tag content
tagArray = taosArrayInit(1, sizeof(STagVal));
if (!tagArray) {
taosMemoryFree(ctbName);
tdDestroySVCreateTbReq(pCreateTbReq);
goto _end;
}
STagVal tagVal = {
......@@ -586,6 +584,8 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
tTagNew(tagArray, 1, false, &pTag);
tagArray = taosArrayDestroy(tagArray);
if (pTag == NULL) {
taosMemoryFree(ctbName);
tdDestroySVCreateTbReq(pCreateTbReq);
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
......@@ -630,6 +630,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
// rows
if (!pVals && !(pVals = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)))) {
taosArrayDestroy(tbData.aRowP);
tdDestroySVCreateTbReq(tbData.pCreateTbReq);
goto _end;
}
......@@ -680,6 +681,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
SSubmitReq2 submitReq = {0};
if (!(submitReq.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) {
tDestroySSubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE);
goto _end;
}
......@@ -693,6 +695,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
len += sizeof(SSubmitReq2Msg);
pBuf = rpcMallocCont(len);
if (NULL == pBuf) {
tDestroySSubmitReq2(&submitReq, TSDB_MSG_FLG_ENCODE);
goto _end;
}
((SSubmitReq2Msg*)pBuf)->header.vgId = TD_VID(pVnode);
......@@ -704,6 +707,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
tqError("failed to encode submit req since %s", terrstr());
tEncoderClear(&encoder);
rpcFreeCont(pBuf);
tDestroySSubmitReq2(&submitReq, TSDB_MSG_FLG_ENCODE);
continue;
}
tEncoderClear(&encoder);
......
......@@ -245,8 +245,7 @@ int smlProcess_json3_Test() {
taos_free_result(pRes);
const char *sql[] = {
"[{\"metric\":\"sys.cpu.nice3\",\"timestamp\":0,\"value\":\"18\",\"tags\":{\"host\":\"web01\",\"id\":\"t1\","
"\"dc\":\"lga\"}}]"};
"[{\"metric\":\"sys.cpu.nice3\",\"timestamp\":0,\"value\":\"18\",\"tags\":{\"host\":\"web01\",\"id\":\"t1\",\"dc\":\"lga\"}}]"};
char *sql1[1] = {0};
for (int i = 0; i < 1; i++) {
sql1[i] = taosMemoryCalloc(1, 1024);
......
......@@ -858,7 +858,9 @@ void loop_consume(SThreadInfo* pInfo) {
taosFprintfFile(g_fp, "==== consumerId: %d, consumeMsgCnt: %" PRId64 ", consumeRowCnt: %" PRId64 "\n",
pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt);
taosFsyncFile(pInfo->pConsumeRowsFile);
if(taosFsyncFile(pInfo->pConsumeRowsFile) < 0){
printf("taosFsyncFile error:%s", strerror(errno));
}
taosCloseFile(&pInfo->pConsumeRowsFile);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册