未验证 提交 b18d392c 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #21855 from taosdata/fix/TD-24968

fix coverity san issues
...@@ -337,6 +337,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d ...@@ -337,6 +337,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
tagArray = taosArrayInit(1, sizeof(STagVal)); tagArray = taosArrayInit(1, sizeof(STagVal));
if (!tagArray) { if (!tagArray) {
tdDestroySVCreateTbReq(pCreateTbReq); tdDestroySVCreateTbReq(pCreateTbReq);
taosMemoryFreeClear(pCreateTbReq);
goto _end; goto _end;
} }
STagVal tagVal = { STagVal tagVal = {
...@@ -352,6 +353,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d ...@@ -352,6 +353,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
tagArray = taosArrayDestroy(tagArray); tagArray = taosArrayDestroy(tagArray);
if (pTag == NULL) { if (pTag == NULL) {
tdDestroySVCreateTbReq(pCreateTbReq); tdDestroySVCreateTbReq(pCreateTbReq);
taosMemoryFreeClear(pCreateTbReq);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end; goto _end;
} }
......
...@@ -74,7 +74,6 @@ void streamSchedByTimer(void* param, void* tmrId) { ...@@ -74,7 +74,6 @@ void streamSchedByTimer(void* param, void* tmrId) {
atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE); atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE);
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)trigger) < 0) { if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)trigger) < 0) {
taosFreeQitem(trigger);
taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer); taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer);
return; return;
} }
......
...@@ -355,7 +355,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, ...@@ -355,7 +355,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data; SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
ASSERT(pPos->pRowBuff && pFileState->rowSize > 0); ASSERT(pPos->pRowBuff && pFileState->rowSize > 0);
if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) { if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) {
code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch); streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
streamStateClearBatch(batch); streamStateClearBatch(batch);
} }
...@@ -364,7 +364,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, ...@@ -364,7 +364,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
qDebug("===stream===put %" PRId64 " to disc, res %d", sKey.key.ts, code); qDebug("===stream===put %" PRId64 " to disc, res %d", sKey.key.ts, code);
} }
if (streamStateGetBatchSize(batch) > 0) { if (streamStateGetBatchSize(batch) > 0) {
code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch); streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
} }
streamStateClearBatch(batch); streamStateClearBatch(batch);
...@@ -376,7 +376,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, ...@@ -376,7 +376,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
int32_t len = 0; int32_t len = 0;
sprintf(keyBuf, "%s:%" PRId64 "", taskKey, ((SStreamState*)pFileState->pFileStore)->checkPointId); sprintf(keyBuf, "%s:%" PRId64 "", taskKey, ((SStreamState*)pFileState->pFileStore)->checkPointId);
streamFileStateEncode(&pFileState->flushMark, &valBuf, &len); streamFileStateEncode(&pFileState->flushMark, &valBuf, &len);
code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0); streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0);
taosMemoryFree(valBuf); taosMemoryFree(valBuf);
} }
{ {
...@@ -480,7 +480,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) { ...@@ -480,7 +480,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) {
break; break;
} }
memcpy(pNewPos->pRowBuff, pVal, pVLen); memcpy(pNewPos->pRowBuff, pVal, pVLen);
code = tSimpleHashPut(pFileState->rowBuffMap, pNewPos->pKey, pFileState->rowSize, &pNewPos, POINTER_BYTES); code = tSimpleHashPut(pFileState->rowBuffMap, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
destroyRowBuffPos(pNewPos); destroyRowBuffPos(pNewPos);
break; break;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册