提交 99f92792 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/hotfix/coverity_query' into hotfix/crash

......@@ -5153,9 +5153,7 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
}
if (!validateQuerySourceCols(pQueryMsg, *pExpr)) {
tfree(*pExpr);
return TSDB_CODE_QRY_INVALID_MSG;
goto _cleanup;
}
pMsg = createTableIdList(pQueryMsg, pMsg, pTableIdList);
......@@ -5227,8 +5225,17 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pQueryMsg, pQueryMsg->numOfTables, pQueryMsg->queryType, pQueryMsg->window.skey, pQueryMsg->window.ekey, pQueryMsg->numOfGroupCols,
pQueryMsg->order, pQueryMsg->numOfOutput, pQueryMsg->numOfCols, pQueryMsg->intervalTime,
pQueryMsg->fillType, pQueryMsg->tsLen, pQueryMsg->tsNumOfBlocks, pQueryMsg->limit, pQueryMsg->offset);
return 0;
_cleanup:
tfree(*pExpr);
taosArrayDestroy(*pTableIdList);
*pTableIdList = NULL;
tfree(*tbnameCond);
tfree(*groupbyCols);
tfree(*tagCols);
tfree(*tagCond);
return TSDB_CODE_QRY_INVALID_MSG;
}
static int32_t buildAirthmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTableMsg *pQueryMsg) {
......@@ -5494,6 +5501,8 @@ static int compareTableIdInfo(const void* a, const void* b) {
return 0;
}
static void freeQInfo(SQInfo *pQInfo);
static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols) {
SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo));
......@@ -5634,22 +5643,27 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
return pQInfo;
_cleanup:
tfree(pQuery->fillVal);
//tfree(pQuery->fillVal);
if (pQuery->sdata != NULL) {
for (int16_t col = 0; col < pQuery->numOfOutput; ++col) {
tfree(pQuery->sdata[col]);
}
}
//if (pQuery->sdata != NULL) {
// for (int16_t col = 0; col < pQuery->numOfOutput; ++col) {
// tfree(pQuery->sdata[col]);
// }
//}
tfree(pQuery->sdata);
tfree(pQuery->pFilterInfo);
tfree(pQuery->colList);
//
//tfree(pQuery->sdata);
//tfree(pQuery->pFilterInfo);
//tfree(pQuery->colList);
tfree(pExprs);
tfree(pGroupbyExpr);
//tfree(pExprs);
//tfree(pGroupbyExpr);
tfree(pQInfo);
//taosArrayDestroy(pQInfo->arrTableIdInfo);
//tsdbDestoryTableGroup(&pQInfo->tableGroupInfo);
//
//tfree(pQInfo);
freeQInfo(pQInfo);
return NULL;
}
......@@ -5668,7 +5682,6 @@ static bool isValidQInfo(void *param) {
return (sig == (uint64_t)pQInfo);
}
static void freeQInfo(SQInfo *pQInfo);
static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *pQInfo, bool isSTable) {
int32_t code = TSDB_CODE_SUCCESS;
......@@ -5869,6 +5882,8 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi
SSqlFuncMsg **pExprMsg = NULL;
SColIndex * pGroupColIndex = NULL;
SColumnInfo* pTagColumnInfo = NULL;
SExprInfo *pExprs = NULL;
SSqlGroupbyExpr *pGroupbyExpr = NULL;
if ((code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &tagCond, &tbnameCond, &pGroupColIndex, &pTagColumnInfo)) !=
TSDB_CODE_SUCCESS) {
......@@ -5887,12 +5902,12 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi
goto _over;
}
SExprInfo *pExprs = NULL;
if ((code = createQFunctionExprFromMsg(pQueryMsg, &pExprs, pExprMsg, pTagColumnInfo)) != TSDB_CODE_SUCCESS) {
free(pExprMsg);
goto _over;
}
SSqlGroupbyExpr *pGroupbyExpr = createGroupbyExprFromMsg(pQueryMsg, pGroupColIndex, &code);
pGroupbyExpr = createGroupbyExprFromMsg(pQueryMsg, pGroupColIndex, &code);
if ((pGroupbyExpr == NULL && pQueryMsg->numOfGroupCols != 0) || code != TSDB_CODE_SUCCESS) {
goto _over;
}
......@@ -5939,6 +5954,10 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi
}
(*pQInfo) = createQInfoImpl(pQueryMsg, pTableIdList, pGroupbyExpr, pExprs, &tableGroupInfo, pTagColumnInfo);
pExprs = NULL;
pGroupbyExpr = NULL;
pTagColumnInfo = NULL;
if ((*pQInfo) == NULL) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _over;
......@@ -5947,9 +5966,15 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi
code = initQInfo(pQueryMsg, tsdb, vgId, *pQInfo, isSTableQuery);
_over:
tfree(tagCond);
tfree(tbnameCond);
tfree(pGroupColIndex);
free(tagCond);
free(tbnameCond);
free(pGroupColIndex);
if (pGroupbyExpr != NULL) {
taosArrayDestroy(pGroupbyExpr->columnInfo);
free(pGroupbyExpr);
}
free(pTagColumnInfo);
free(pExprs);
taosArrayDestroy(pTableIdList);
//pQInfo already freed in initQInfo, but *pQInfo may not pointer to null;
......
......@@ -138,7 +138,7 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
SWindowResult *pResult = &pWindowResInfo->pResult[k];
int32_t *p = (int32_t *)taosHashGet(pWindowResInfo->hashList, (const char *)&pResult->window.skey,
tDataTypeDesc[pWindowResInfo->type].nSize);
assert(p != NULL);
int32_t v = (*p - num);
assert(v >= 0 && v <= pWindowResInfo->size);
taosHashPut(pWindowResInfo->hashList, (char *)&pResult->window.skey, tDataTypeDesc[pWindowResInfo->type].nSize,
......
......@@ -879,8 +879,8 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction)
UNUSED(ret);
for (uint32_t jx = 0; jx < pFlushInfo->numOfPages; ++jx) {
ret = fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file);
UNUSED(ret);
size_t sz = fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file);
UNUSED(sz);
tMemBucketPut(pMemBucket, pPage->data, pPage->num);
}
......@@ -965,10 +965,11 @@ char *getFirstElemOfMemBuffer(tMemBucketSegment *pSeg, int32_t slotIdx, tFilePag
*/
tFlushoutInfo *pFlushInfo = &pMemBuffer->fileMeta.flushoutData.pFlushoutInfo[0];
assert(pFlushInfo->numOfPages == pMemBuffer->fileMeta.nFileSize);
fseek(pMemBuffer->file, pFlushInfo->startPageId * pMemBuffer->pageSize, SEEK_SET);
size_t ret = fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file);
int32_t ret;
ret = fseek(pMemBuffer->file, pFlushInfo->startPageId * pMemBuffer->pageSize, SEEK_SET);
UNUSED(ret);
size_t sz = fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file);
UNUSED(sz);
thisVal = pPage->data;
}
return thisVal;
......
......@@ -65,8 +65,10 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
// validate the file magic number
STSBufFileHeader header = {0};
fseek(pTSBuf->f, 0, SEEK_SET);
fread(&header, 1, sizeof(STSBufFileHeader), pTSBuf->f);
int32_t ret = fseek(pTSBuf->f, 0, SEEK_SET);
UNUSED(ret);
size_t sz = fread(&header, 1, sizeof(STSBufFileHeader), pTSBuf->f);
UNUSED(sz);
// invalid file
if (header.magic != TS_COMP_FILE_MAGIC) {
......@@ -97,22 +99,30 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
size_t infoSize = sizeof(STSVnodeBlockInfo) * pTSBuf->numOfVnodes;
STSVnodeBlockInfo* buf = (STSVnodeBlockInfo*)calloc(1, infoSize);
if (buf == NULL) {
tsBufDestory(pTSBuf);
return NULL;
}
//int64_t pos = ftell(pTSBuf->f); //pos not used
fread(buf, infoSize, 1, pTSBuf->f);
sz = fread(buf, infoSize, 1, pTSBuf->f);
UNUSED(sz);
// the length value for each vnode is not kept in file, so does not set the length value
for (int32_t i = 0; i < pTSBuf->numOfVnodes; ++i) {
STSVnodeBlockInfoEx* pBlockList = &pTSBuf->pData[i];
memcpy(&pBlockList->info, &buf[i], sizeof(STSVnodeBlockInfo));
}
free(buf);
fseek(pTSBuf->f, 0, SEEK_END);
ret = fseek(pTSBuf->f, 0, SEEK_END);
UNUSED(ret);
struct stat fileStat;
fstat(fileno(pTSBuf->f), &fileStat);
if (fstat(fileno(pTSBuf->f), &fileStat) != 0) {
tsBufDestory(pTSBuf);
return NULL;
}
pTSBuf->fileSize = (uint32_t)fileStat.st_size;
tsBufResetPos(pTSBuf);
......@@ -278,19 +288,24 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
* set the right position for the reversed traverse, the reversed traverse is started from
* the end of each comp data block
*/
fseek(pTSBuf->f, -sizeof(pBlock->padding), SEEK_CUR);
fread(&pBlock->padding, sizeof(pBlock->padding), 1, pTSBuf->f);
int32_t ret = fseek(pTSBuf->f, -sizeof(pBlock->padding), SEEK_CUR);
size_t sz = fread(&pBlock->padding, sizeof(pBlock->padding), 1, pTSBuf->f);
UNUSED(sz);
pBlock->compLen = pBlock->padding;
int32_t offset = pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + sizeof(pBlock->tag);
fseek(pTSBuf->f, -offset, SEEK_CUR);
ret = fseek(pTSBuf->f, -offset, SEEK_CUR);
UNUSED(ret);
}
fread(&pBlock->tag, sizeof(pBlock->tag), 1, pTSBuf->f);
fread(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f);
fread(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f);
fread(pBlock->payload, (size_t)pBlock->compLen, 1, pTSBuf->f);
size_t sz = fread(&pBlock->tag, sizeof(pBlock->tag), 1, pTSBuf->f);
UNUSED(sz);
sz = fread(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f);
UNUSED(sz);
sz = fread(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f);
UNUSED(sz);
sz = fread(pBlock->payload, (size_t)pBlock->compLen, 1, pTSBuf->f);
UNUSED(sz);
if (decomp) {
pTSBuf->tsData.len =
......@@ -299,12 +314,13 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
}
// read the comp length at the length of comp block
fread(&pBlock->padding, sizeof(pBlock->padding), 1, pTSBuf->f);
sz = fread(&pBlock->padding, sizeof(pBlock->padding), 1, pTSBuf->f);
UNUSED(sz);
// for backwards traverse, set the start position at the end of previous block
if (order == TSDB_ORDER_DESC) {
int32_t offset = pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + sizeof(pBlock->tag);
int64_t r = fseek(pTSBuf->f, -offset, SEEK_CUR);
int32_t r = fseek(pTSBuf->f, -offset, SEEK_CUR);
UNUSED(r);
}
......@@ -441,7 +457,8 @@ static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, int
STSBlock* pBlock = &pTSBuf->block;
int32_t compBlockSize =
pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + sizeof(pBlock->tag);
fseek(pTSBuf->f, -compBlockSize, SEEK_CUR);
int32_t ret = fseek(pTSBuf->f, -compBlockSize, SEEK_CUR);
UNUSED(ret);
}
return 0;
......@@ -538,7 +555,7 @@ int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader) {
assert(pHeader->tsOrder == TSDB_ORDER_ASC || pHeader->tsOrder == TSDB_ORDER_DESC);
int64_t r = fseek(pTSBuf->f, 0, SEEK_SET);
int32_t r = fseek(pTSBuf->f, 0, SEEK_SET);
if (r != 0) {
return -1;
}
......@@ -743,7 +760,9 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) {
int32_t oldSize = pDestBuf->fileSize;
struct stat fileStat;
fstat(fileno(pDestBuf->f), &fileStat);
if (fstat(fileno(pDestBuf->f), &fileStat) != 0) {
return -1;
}
pDestBuf->fileSize = (uint32_t)fileStat.st_size;
assert(pDestBuf->fileSize == oldSize + size);
......@@ -766,8 +785,10 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_
// update prev vnode length info in file
TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, pBlockInfo);
fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET);
fwrite((void*)pData, 1, len, pTSBuf->f);
int32_t ret = fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET);
UNUSED(ret);
size_t sz = fwrite((void*)pData, 1, len, pTSBuf->f);
UNUSED(sz);
pTSBuf->fileSize += len;
pTSBuf->tsOrder = order;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册