提交 0fc1c8cd 编写于 作者: H Haojun Liao

[td-837] check mem after malloc

上级 8ba7387e
...@@ -32,7 +32,7 @@ ENDIF () ...@@ -32,7 +32,7 @@ ENDIF ()
IF (TD_LINUX_64) IF (TD_LINUX_64)
ADD_DEFINITIONS(-D_M_X64) ADD_DEFINITIONS(-D_M_X64)
ADD_DEFINITIONS(-D_TD_LINUX_64) ADD_DEFINITIONS(-D_TD_LINUX_64)
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -pg -g3 -gdwarf-2 -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE") SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -g3 -gdwarf-2 -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ADD_DEFINITIONS(-DUSE_LIBICONV) ADD_DEFINITIONS(-DUSE_LIBICONV)
ENDIF () ENDIF ()
......
...@@ -1502,7 +1502,7 @@ static int32_t tscReissueSubquery(SRetrieveSupport *trsupport, SSqlObj *pSql, in ...@@ -1502,7 +1502,7 @@ static int32_t tscReissueSubquery(SRetrieveSupport *trsupport, SSqlObj *pSql, in
// clear local saved number of results // clear local saved number of results
trsupport->localBuffer->num = 0; trsupport->localBuffer->num = 0;
tscTrace("%p sub:%p retrieve failed, code:%s, orderOfSub:%d, retry:%d", trsupport->pParentSql, pSql, tscDebug("%p sub:%p retrieve/query failed, code:%s, orderOfSub:%d, retry:%d", trsupport->pParentSql, pSql,
tstrerror(code), subqueryIndex, trsupport->numOfRetry); tstrerror(code), subqueryIndex, trsupport->numOfRetry);
SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSql, trsupport, pSql); SSqlObj *pNew = tscCreateSqlObjForSubquery(trsupport->pParentSql, trsupport, pSql);
...@@ -1521,9 +1521,10 @@ static int32_t tscReissueSubquery(SRetrieveSupport *trsupport, SSqlObj *pSql, in ...@@ -1521,9 +1521,10 @@ static int32_t tscReissueSubquery(SRetrieveSupport *trsupport, SSqlObj *pSql, in
// if failed to process sql, let following code handle the pSql // if failed to process sql, let following code handle the pSql
if (ret == TSDB_CODE_SUCCESS) { if (ret == TSDB_CODE_SUCCESS) {
taos_free_result(pSql); taos_free_result(pSql);
return ret;
} else {
return ret;
} }
return code;
} }
void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) { void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numOfRows) {
...@@ -1681,7 +1682,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR ...@@ -1681,7 +1682,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
if (pParentSql->res.code != TSDB_CODE_SUCCESS) { if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
tscTrace("%p query cancelled or failed, sub:%p, vgId:%d, orderOfSub:%d, code:%s, global code:%s", tscDebug("%p query cancelled or failed, sub:%p, vgId:%d, orderOfSub:%d, code:%s, global code:%s",
pParentSql, pSql, pVgroup->vgId, trsupport->subqueryIndex, tstrerror(numOfRows), tstrerror(pParentSql->res.code)); pParentSql, pSql, pVgroup->vgId, trsupport->subqueryIndex, tstrerror(numOfRows), tstrerror(pParentSql->res.code));
tscHandleSubqueryError(param, tres, numOfRows); tscHandleSubqueryError(param, tres, numOfRows);
...@@ -1692,13 +1693,13 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR ...@@ -1692,13 +1693,13 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
assert(numOfRows == taos_errno(pSql)); assert(numOfRows == taos_errno(pSql));
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) { if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) {
tscTrace("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(numOfRows), trsupport->numOfRetry); tscDebug("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(numOfRows), trsupport->numOfRetry);
if (tscReissueSubquery(trsupport, pSql, numOfRows) == TSDB_CODE_SUCCESS) { if (tscReissueSubquery(trsupport, pSql, numOfRows) == TSDB_CODE_SUCCESS) {
return; return;
} }
} else { } else {
tscTrace("%p sub:%p reach the max retry times, set global code:%s", pParentSql, pSql, tstrerror(numOfRows)); tscDebug("%p sub:%p reach the max retry times, set global code:%s", pParentSql, pSql, tstrerror(numOfRows));
atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, numOfRows); // set global code and abort atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, numOfRows); // set global code and abort
} }
...@@ -1792,7 +1793,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { ...@@ -1792,7 +1793,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
// stable query killed or other subquery failed, all query stopped // stable query killed or other subquery failed, all query stopped
if (pParentSql->res.code != TSDB_CODE_SUCCESS) { if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY; trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
tscTrace("%p query cancelled or failed, sub:%p, vgId:%d, orderOfSub:%d, code:%s, global code:%s", tscError("%p query cancelled or failed, sub:%p, vgId:%d, orderOfSub:%d, code:%s, global code:%s",
pParentSql, pSql, pVgroup->vgId, trsupport->subqueryIndex, tstrerror(code), tstrerror(pParentSql->res.code)); pParentSql, pSql, pVgroup->vgId, trsupport->subqueryIndex, tstrerror(code), tstrerror(pParentSql->res.code));
tscHandleSubqueryError(param, tres, code); tscHandleSubqueryError(param, tres, code);
...@@ -1810,12 +1811,12 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { ...@@ -1810,12 +1811,12 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
assert(code == taos_errno(pSql)); assert(code == taos_errno(pSql));
if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) { if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY) {
tscTrace("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(code), trsupport->numOfRetry); tscWarn("%p sub:%p failed code:%s, retry:%d", pParentSql, pSql, tstrerror(code), trsupport->numOfRetry);
if (tscReissueSubquery(trsupport, pSql, code) == TSDB_CODE_SUCCESS) { if (tscReissueSubquery(trsupport, pSql, code) == TSDB_CODE_SUCCESS) {
return; return;
} }
} else { } else {
tscTrace("%p sub:%p reach the max retry times, set global code:%s", pParentSql, pSql, tstrerror(code)); tscError("%p sub:%p reach the max retry times, set global code:%s", pParentSql, pSql, tstrerror(code));
atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, code); // set global code and abort atomic_val_compare_exchange_32(&pParentSql->res.code, TSDB_CODE_SUCCESS, code); // set global code and abort
} }
...@@ -1823,7 +1824,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { ...@@ -1823,7 +1824,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
return; return;
} }
tscTrace("%p sub:%p query complete, ep:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSql, pSql, tscDebug("%p sub:%p query complete, ep:%s, vgId:%d, orderOfSub:%d, retrieve data", trsupport->pParentSql, pSql,
pVgroup->epAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex); pVgroup->epAddr[0].fqdn, pVgroup->vgId, trsupport->subqueryIndex);
if (pSql->res.qhandle == 0) { // qhandle is NULL, code is TSDB_CODE_SUCCESS means no results generated from this vnode if (pSql->res.qhandle == 0) { // qhandle is NULL, code is TSDB_CODE_SUCCESS means no results generated from this vnode
......
...@@ -113,6 +113,15 @@ static UNUSED_FUNC void* u_calloc(size_t num, size_t __size) { ...@@ -113,6 +113,15 @@ static UNUSED_FUNC void* u_calloc(size_t num, size_t __size) {
} }
} }
static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
uint32_t v = rand();
if (v % 5 <= 1) {
return NULL;
} else {
return realloc(p, __size);
}
}
#define calloc u_calloc #define calloc u_calloc
#define malloc u_malloc #define malloc u_malloc
#endif #endif
...@@ -430,7 +439,10 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin ...@@ -430,7 +439,10 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin
pRuntimeEnv->summary.internalSupSize += (pQuery->numOfOutput * sizeof(SResultInfo) + pRuntimeEnv->interBufSize) * inc; pRuntimeEnv->summary.internalSupSize += (pQuery->numOfOutput * sizeof(SResultInfo) + pRuntimeEnv->interBufSize) * inc;
for (int32_t i = pWindowResInfo->capacity; i < newCap; ++i) { for (int32_t i = pWindowResInfo->capacity; i < newCap; ++i) {
createQueryResultInfo(pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, pRuntimeEnv->interBufSize); int32_t ret = createQueryResultInfo(pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, pRuntimeEnv->interBufSize);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
} }
pWindowResInfo->capacity = (int32_t)newCap; pWindowResInfo->capacity = (int32_t)newCap;
...@@ -1465,7 +1477,7 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY ...@@ -1465,7 +1477,7 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY
} }
// set the output buffer for the selectivity + tag query // set the output buffer for the selectivity + tag query
static void setCtxTagColumnInfo(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx) { static int32_t setCtxTagColumnInfo(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx) {
SQuery* pQuery = pRuntimeEnv->pQuery; SQuery* pQuery = pRuntimeEnv->pQuery;
if (isSelectivityWithTagsQuery(pQuery)) { if (isSelectivityWithTagsQuery(pQuery)) {
...@@ -1474,6 +1486,9 @@ static void setCtxTagColumnInfo(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *p ...@@ -1474,6 +1486,9 @@ static void setCtxTagColumnInfo(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *p
SQLFunctionCtx *p = NULL; SQLFunctionCtx *p = NULL;
SQLFunctionCtx **pTagCtx = calloc(pQuery->numOfOutput, POINTER_BYTES); SQLFunctionCtx **pTagCtx = calloc(pQuery->numOfOutput, POINTER_BYTES);
if (pTagCtx == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SSqlFuncMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].base; SSqlFuncMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].base;
...@@ -1499,6 +1514,8 @@ static void setCtxTagColumnInfo(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *p ...@@ -1499,6 +1514,8 @@ static void setCtxTagColumnInfo(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *p
taosTFree(pTagCtx); taosTFree(pTagCtx);
} }
} }
return TSDB_CODE_SUCCESS;
} }
static FORCE_INLINE void setWindowResultInfo(SResultInfo *pResultInfo, SQuery *pQuery, bool isStableQuery, char* buf) { static FORCE_INLINE void setWindowResultInfo(SResultInfo *pResultInfo, SQuery *pQuery, bool isStableQuery, char* buf) {
...@@ -1600,7 +1617,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order ...@@ -1600,7 +1617,9 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
resetCtxOutputBuf(pRuntimeEnv); resetCtxOutputBuf(pRuntimeEnv);
} }
setCtxTagColumnInfo(pRuntimeEnv, pRuntimeEnv->pCtx); if (setCtxTagColumnInfo(pRuntimeEnv, pRuntimeEnv->pCtx) != TSDB_CODE_SUCCESS) {
goto _clean;
}
qDebug("QInfo:%p init runtime completed", GET_QINFO_ADDR(pRuntimeEnv)); qDebug("QInfo:%p init runtime completed", GET_QINFO_ADDR(pRuntimeEnv));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -2232,7 +2251,7 @@ static void ensureOutputBufferSimple(SQueryRuntimeEnv* pRuntimeEnv, int32_t capa ...@@ -2232,7 +2251,7 @@ static void ensureOutputBufferSimple(SQueryRuntimeEnv* pRuntimeEnv, int32_t capa
char *tmp = realloc(pQuery->sdata[i], bytes * capacity + sizeof(tFilePage)); char *tmp = realloc(pQuery->sdata[i], bytes * capacity + sizeof(tFilePage));
if (tmp == NULL) { // todo handle the oom if (tmp == NULL) { // todo handle the oom
assert(0); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} else { } else {
pQuery->sdata[i] = (tFilePage *)tmp; pQuery->sdata[i] = (tFilePage *)tmp;
} }
...@@ -2263,7 +2282,7 @@ static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pB ...@@ -2263,7 +2282,7 @@ static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pB
char *tmp = realloc(pQuery->sdata[i], bytes * newSize + sizeof(tFilePage)); char *tmp = realloc(pQuery->sdata[i], bytes * newSize + sizeof(tFilePage));
if (tmp == NULL) { // todo handle the oom if (tmp == NULL) { // todo handle the oom
assert(0); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} else { } else {
memset(tmp + sizeof(tFilePage) + bytes * pRec->rows, 0, (size_t)((newSize - pRec->rows) * bytes)); memset(tmp + sizeof(tFilePage) + bytes * pRec->rows, 0, (size_t)((newSize - pRec->rows) * bytes));
pQuery->sdata[i] = (tFilePage *)tmp; pQuery->sdata[i] = (tFilePage *)tmp;
...@@ -2803,7 +2822,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { ...@@ -2803,7 +2822,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
size_t size = taosArrayGetSize(pGroup); size_t size = taosArrayGetSize(pGroup);
tFilePage **buffer = pQuery->sdata; tFilePage **buffer = pQuery->sdata;
int32_t* posList = calloc(size, sizeof(int32_t)); int32_t *posList = calloc(size, sizeof(int32_t));
STableQueryInfo **pTableList = malloc(POINTER_BYTES * size); STableQueryInfo **pTableList = malloc(POINTER_BYTES * size);
if (pTableList == NULL || posList == NULL) { if (pTableList == NULL || posList == NULL) {
...@@ -2861,6 +2880,10 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { ...@@ -2861,6 +2880,10 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
} }
char* buf = calloc(1, pRuntimeEnv->interBufSize); char* buf = calloc(1, pRuntimeEnv->interBufSize);
if (buf == NULL) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
setWindowResultInfo(pResultInfo, pQuery, pRuntimeEnv->stableQuery, buf); setWindowResultInfo(pResultInfo, pQuery, pRuntimeEnv->stableQuery, buf);
resetMergeResultBuf(pQuery, pRuntimeEnv->pCtx, pResultInfo); resetMergeResultBuf(pQuery, pRuntimeEnv->pCtx, pResultInfo);
...@@ -4307,6 +4330,10 @@ static SFillColInfo* taosCreateFillColInfo(SQuery* pQuery) { ...@@ -4307,6 +4330,10 @@ static SFillColInfo* taosCreateFillColInfo(SQuery* pQuery) {
int32_t offset = 0; int32_t offset = 0;
SFillColInfo* pFillCol = calloc(numOfCols, sizeof(SFillColInfo)); SFillColInfo* pFillCol = calloc(numOfCols, sizeof(SFillColInfo));
if (pFillCol == NULL) {
return NULL;
}
for(int32_t i = 0; i < numOfCols; ++i) { for(int32_t i = 0; i < numOfCols; ++i) {
SExprInfo* pExprInfo = &pQuery->pSelectExpr[i]; SExprInfo* pExprInfo = &pQuery->pSelectExpr[i];
...@@ -5433,6 +5460,10 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, ...@@ -5433,6 +5460,10 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
int32_t numOfFilters = pColInfo->numOfFilters; int32_t numOfFilters = pColInfo->numOfFilters;
if (numOfFilters > 0) { if (numOfFilters > 0) {
pColInfo->filters = calloc(numOfFilters, sizeof(SColumnFilterInfo)); pColInfo->filters = calloc(numOfFilters, sizeof(SColumnFilterInfo));
if (pColInfo->filters == NULL) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _cleanup;
}
} }
for (int32_t f = 0; f < numOfFilters; ++f) { for (int32_t f = 0; f < numOfFilters; ++f) {
...@@ -5447,6 +5478,11 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, ...@@ -5447,6 +5478,11 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
pColFilter->len = htobe64(pFilterMsg->len); pColFilter->len = htobe64(pFilterMsg->len);
pColFilter->pz = (int64_t)calloc(1, (size_t)(pColFilter->len + 1 * TSDB_NCHAR_SIZE)); // note: null-terminator pColFilter->pz = (int64_t)calloc(1, (size_t)(pColFilter->len + 1 * TSDB_NCHAR_SIZE)); // note: null-terminator
if (pColFilter->pz == 0) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _cleanup;
}
memcpy((void *)pColFilter->pz, pMsg, (size_t)pColFilter->len); memcpy((void *)pColFilter->pz, pMsg, (size_t)pColFilter->len);
pMsg += (pColFilter->len + 1); pMsg += (pColFilter->len + 1);
} else { } else {
...@@ -5460,6 +5496,11 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, ...@@ -5460,6 +5496,11 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
} }
*pExpr = calloc(pQueryMsg->numOfOutput, POINTER_BYTES); *pExpr = calloc(pQueryMsg->numOfOutput, POINTER_BYTES);
if (*pExpr == NULL) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _cleanup;
}
SSqlFuncMsg *pExprMsg = (SSqlFuncMsg *)pMsg; SSqlFuncMsg *pExprMsg = (SSqlFuncMsg *)pMsg;
for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) { for (int32_t i = 0; i < pQueryMsg->numOfOutput; ++i) {
...@@ -5546,6 +5587,11 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, ...@@ -5546,6 +5587,11 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
if (pQueryMsg->numOfTags > 0) { if (pQueryMsg->numOfTags > 0) {
(*tagCols) = calloc(1, sizeof(SColumnInfo) * pQueryMsg->numOfTags); (*tagCols) = calloc(1, sizeof(SColumnInfo) * pQueryMsg->numOfTags);
if (*tagCols == NULL) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _cleanup;
}
for (int32_t i = 0; i < pQueryMsg->numOfTags; ++i) { for (int32_t i = 0; i < pQueryMsg->numOfTags; ++i) {
SColumnInfo* pTagCol = (SColumnInfo*) pMsg; SColumnInfo* pTagCol = (SColumnInfo*) pMsg;
...@@ -5562,6 +5608,12 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList, ...@@ -5562,6 +5608,12 @@ static int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SArray **pTableIdList,
// the tag query condition expression string is located at the end of query msg // the tag query condition expression string is located at the end of query msg
if (pQueryMsg->tagCondLen > 0) { if (pQueryMsg->tagCondLen > 0) {
*tagCond = calloc(1, pQueryMsg->tagCondLen); *tagCond = calloc(1, pQueryMsg->tagCondLen);
if (*tagCond == NULL) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _cleanup;
}
memcpy(*tagCond, pMsg, pQueryMsg->tagCondLen); memcpy(*tagCond, pMsg, pQueryMsg->tagCondLen);
pMsg += pQueryMsg->tagCondLen; pMsg += pQueryMsg->tagCondLen;
} }
...@@ -5752,6 +5804,9 @@ static int32_t createFilterInfo(void *pQInfo, SQuery *pQuery) { ...@@ -5752,6 +5804,9 @@ static int32_t createFilterInfo(void *pQInfo, SQuery *pQuery) {
} }
pQuery->pFilterInfo = calloc(1, sizeof(SSingleColumnFilterInfo) * pQuery->numOfFilterCols); pQuery->pFilterInfo = calloc(1, sizeof(SSingleColumnFilterInfo) * pQuery->numOfFilterCols);
if (pQuery->pFilterInfo == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
for (int32_t i = 0, j = 0; i < pQuery->numOfCols; ++i) { for (int32_t i = 0, j = 0; i < pQuery->numOfCols; ++i) {
if (pQuery->colList[i].numOfFilters > 0) { if (pQuery->colList[i].numOfFilters > 0) {
...@@ -5762,6 +5817,9 @@ static int32_t createFilterInfo(void *pQInfo, SQuery *pQuery) { ...@@ -5762,6 +5817,9 @@ static int32_t createFilterInfo(void *pQInfo, SQuery *pQuery) {
pFilterInfo->numOfFilters = pQuery->colList[i].numOfFilters; pFilterInfo->numOfFilters = pQuery->colList[i].numOfFilters;
pFilterInfo->pFilters = calloc(pFilterInfo->numOfFilters, sizeof(SColumnFilterElem)); pFilterInfo->pFilters = calloc(pFilterInfo->numOfFilters, sizeof(SColumnFilterElem));
if (pFilterInfo->pFilters == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
for (int32_t f = 0; f < pFilterInfo->numOfFilters; ++f) { for (int32_t f = 0; f < pFilterInfo->numOfFilters; ++f) {
SColumnFilterElem *pSingleColFilter = &pFilterInfo->pFilters[f]; SColumnFilterElem *pSingleColFilter = &pFilterInfo->pFilters[f];
...@@ -5911,6 +5969,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, ...@@ -5911,6 +5969,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
if (pQuery == NULL) { if (pQuery == NULL) {
goto _cleanup_query; goto _cleanup_query;
} }
pQInfo->runtimeEnv.pQuery = pQuery; pQInfo->runtimeEnv.pQuery = pQuery;
pQuery->numOfCols = numOfCols; pQuery->numOfCols = numOfCols;
...@@ -5996,6 +6055,10 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, ...@@ -5996,6 +6055,10 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
pQInfo->runtimeEnv.interBufSize = getOutputInterResultBufSize(pQuery); pQInfo->runtimeEnv.interBufSize = getOutputInterResultBufSize(pQuery);
pQInfo->pBuf = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo)); pQInfo->pBuf = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo));
if (pQInfo->pBuf == NULL) {
goto _cleanup;
}
int32_t index = 0; int32_t index = 0;
for(int32_t i = 0; i < numOfGroups; ++i) { for(int32_t i = 0; i < numOfGroups; ++i) {
...@@ -6010,8 +6073,8 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, ...@@ -6010,8 +6073,8 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
for(int32_t j = 0; j < s; ++j) { for(int32_t j = 0; j < s; ++j) {
STableKeyInfo* info = taosArrayGet(pa, j); STableKeyInfo* info = taosArrayGet(pa, j);
STableId* id = TSDB_TABLEID(info->pTable);
STableId* id = TSDB_TABLEID(info->pTable);
STableIdInfo* pTableId = taosArraySearch(pTableIdList, id, compareTableIdInfo); STableIdInfo* pTableId = taosArraySearch(pTableIdList, id, compareTableIdInfo);
if (pTableId != NULL ) { if (pTableId != NULL ) {
window.skey = pTableId->key; window.skey = pTableId->key;
...@@ -6140,35 +6203,59 @@ static void freeQInfo(SQInfo *pQInfo) { ...@@ -6140,35 +6203,59 @@ static void freeQInfo(SQInfo *pQInfo) {
return; return;
} }
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
qDebug("QInfo:%p start to free QInfo", pQInfo); qDebug("QInfo:%p start to free QInfo", pQInfo);
for (int32_t col = 0; col < pQuery->numOfOutput; ++col) {
taosTFree(pQuery->sdata[col]);
}
teardownQueryRuntimeEnv(&pQInfo->runtimeEnv); teardownQueryRuntimeEnv(&pQInfo->runtimeEnv);
for (int32_t i = 0; i < pQuery->numOfFilterCols; ++i) { SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
SSingleColumnFilterInfo *pColFilter = &pQuery->pFilterInfo[i]; if (pQuery != NULL) {
if (pColFilter->numOfFilters > 0) { if (pQuery->sdata != NULL) {
taosTFree(pColFilter->pFilters); for (int32_t col = 0; col < pQuery->numOfOutput; ++col) {
taosTFree(pQuery->sdata[col]);
}
taosTFree(pQuery->sdata);
} }
}
if (pQuery->pSelectExpr != NULL) { if (pQuery->fillVal != NULL) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { taosTFree(pQuery->fillVal);
SExprInfo* pExprInfo = &pQuery->pSelectExpr[i]; }
if (pExprInfo->pExpr != NULL) { for (int32_t i = 0; i < pQuery->numOfFilterCols; ++i) {
tExprTreeDestroy(&pExprInfo->pExpr, NULL); SSingleColumnFilterInfo *pColFilter = &pQuery->pFilterInfo[i];
if (pColFilter->numOfFilters > 0) {
taosTFree(pColFilter->pFilters);
} }
} }
taosTFree(pQuery->pSelectExpr); if (pQuery->pSelectExpr != NULL) {
} for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SExprInfo *pExprInfo = &pQuery->pSelectExpr[i];
if (pExprInfo->pExpr != NULL) {
tExprTreeDestroy(&pExprInfo->pExpr, NULL);
}
}
taosTFree(pQuery->pSelectExpr);
}
if (pQuery->pGroupbyExpr != NULL) {
taosArrayDestroy(pQuery->pGroupbyExpr->columnInfo);
taosTFree(pQuery->pGroupbyExpr);
}
if (pQuery->fillVal != NULL) { taosTFree(pQuery->tagColList);
taosTFree(pQuery->fillVal); taosTFree(pQuery->pFilterInfo);
if (pQuery->colList != NULL) {
for (int32_t i = 0; i < pQuery->numOfCols; i++) {
SColumnInfo *column = pQuery->colList + i;
freeColumnFilterInfo(column->filters, column->numOfFilters);
}
taosTFree(pQuery->colList);
}
taosTFree(pQuery);
} }
// todo refactor, extract method to destroytableDataInfo // todo refactor, extract method to destroytableDataInfo
...@@ -6193,24 +6280,7 @@ static void freeQInfo(SQInfo *pQInfo) { ...@@ -6193,24 +6280,7 @@ static void freeQInfo(SQInfo *pQInfo) {
tsdbDestroyTableGroup(&pQInfo->tableGroupInfo); tsdbDestroyTableGroup(&pQInfo->tableGroupInfo);
taosArrayDestroy(pQInfo->arrTableIdInfo); taosArrayDestroy(pQInfo->arrTableIdInfo);
if (pQuery->pGroupbyExpr != NULL) {
taosArrayDestroy(pQuery->pGroupbyExpr->columnInfo);
taosTFree(pQuery->pGroupbyExpr);
}
taosTFree(pQuery->tagColList);
taosTFree(pQuery->pFilterInfo);
if (pQuery->colList != NULL) {
for (int32_t i = 0; i < pQuery->numOfCols; i++) {
SColumnInfo* column = pQuery->colList + i;
freeColumnFilterInfo(column->filters, column->numOfFilters);
}
taosTFree(pQuery->colList);
}
taosTFree(pQuery->sdata);
taosTFree(pQuery);
pQInfo->signature = 0; pQInfo->signature = 0;
qDebug("QInfo:%p QInfo is freed", pQInfo); qDebug("QInfo:%p QInfo is freed", pQInfo);
...@@ -6786,12 +6856,16 @@ void freeqinfoFn(void *qhandle) { ...@@ -6786,12 +6856,16 @@ void freeqinfoFn(void *qhandle) {
} }
void* qOpenQueryMgmt(int32_t vgId) { void* qOpenQueryMgmt(int32_t vgId) {
const int32_t REFRESH_HANDLE_INTERVAL = 60; // every 30 seconds, refresh handle pool const int32_t REFRESH_HANDLE_INTERVAL = 30; // every 30 seconds, refresh handle pool
char cacheName[128] = {0}; char cacheName[128] = {0};
sprintf(cacheName, "qhandle_%d", vgId); sprintf(cacheName, "qhandle_%d", vgId);
SQueryMgmt* pQueryMgmt = calloc(1, sizeof(SQueryMgmt)); SQueryMgmt* pQueryMgmt = calloc(1, sizeof(SQueryMgmt));
if (pQueryMgmt == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
pQueryMgmt->qinfoPool = taosCacheInit(TSDB_DATA_TYPE_BIGINT, REFRESH_HANDLE_INTERVAL, true, freeqinfoFn, cacheName); pQueryMgmt->qinfoPool = taosCacheInit(TSDB_DATA_TYPE_BIGINT, REFRESH_HANDLE_INTERVAL, true, freeqinfoFn, cacheName);
pQueryMgmt->closed = false; pQueryMgmt->closed = false;
......
...@@ -1840,6 +1840,11 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { ...@@ -1840,6 +1840,11 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
return true; return true;
} else { } else {
STsdbQueryHandle* pSecQueryHandle = calloc(1, sizeof(STsdbQueryHandle)); STsdbQueryHandle* pSecQueryHandle = calloc(1, sizeof(STsdbQueryHandle));
if (pSecQueryHandle == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return false;
}
pSecQueryHandle->order = TSDB_ORDER_ASC; pSecQueryHandle->order = TSDB_ORDER_ASC;
pSecQueryHandle->window = (STimeWindow) {pQueryHandle->window.skey, INT64_MAX}; pSecQueryHandle->window = (STimeWindow) {pQueryHandle->window.skey, INT64_MAX};
pSecQueryHandle->pTsdb = pQueryHandle->pTsdb; pSecQueryHandle->pTsdb = pQueryHandle->pTsdb;
...@@ -1851,6 +1856,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { ...@@ -1851,6 +1856,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
pSecQueryHandle->outputCapacity = ((STsdbRepo*)pSecQueryHandle->pTsdb)->config.maxRowsPerFileBlock; pSecQueryHandle->outputCapacity = ((STsdbRepo*)pSecQueryHandle->pTsdb)->config.maxRowsPerFileBlock;
if (tsdbInitReadHelper(&pSecQueryHandle->rhelper, (STsdbRepo*) pSecQueryHandle->pTsdb) != 0) { if (tsdbInitReadHelper(&pSecQueryHandle->rhelper, (STsdbRepo*) pSecQueryHandle->pTsdb) != 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
free(pSecQueryHandle); free(pSecQueryHandle);
return false; return false;
} }
...@@ -1862,6 +1868,11 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { ...@@ -1862,6 +1868,11 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
pSecQueryHandle->statis = calloc(numOfCols, sizeof(SDataStatis)); pSecQueryHandle->statis = calloc(numOfCols, sizeof(SDataStatis));
pSecQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); pSecQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
if (pSecQueryHandle->statis == NULL || pSecQueryHandle->pColumns == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbCleanupQueryHandle(pSecQueryHandle);
return false;
}
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData colInfo = {{0}, 0}; SColumnInfoData colInfo = {{0}, 0};
...@@ -1869,6 +1880,12 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { ...@@ -1869,6 +1880,12 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
colInfo.info = pCol->info; colInfo.info = pCol->info;
colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCol->info.bytes); colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCol->info.bytes);
if (colInfo.pData == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbCleanupQueryHandle(pSecQueryHandle);
return false;
}
taosArrayPush(pSecQueryHandle->pColumns, &colInfo); taosArrayPush(pSecQueryHandle->pColumns, &colInfo);
} }
...@@ -2280,6 +2297,10 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC ...@@ -2280,6 +2297,10 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table
SArray* sa = taosArrayInit(size, sizeof(STableKeyInfo)); SArray* sa = taosArrayInit(size, sizeof(STableKeyInfo));
if (sa == NULL) {
taosArrayDestroy(pTableGroup);
return NULL;
}
for(int32_t i = 0; i < size; ++i) { for(int32_t i = 0; i < size; ++i) {
STableKeyInfo *pKeyInfo = taosArrayGet(pTableList, i); STableKeyInfo *pKeyInfo = taosArrayGet(pTableList, i);
...@@ -2294,14 +2315,13 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC ...@@ -2294,14 +2315,13 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
taosArrayPush(pTableGroup, &sa); taosArrayPush(pTableGroup, &sa);
tsdbDebug("all %" PRIzu " tables belong to one group", size); tsdbDebug("all %" PRIzu " tables belong to one group", size);
} else { } else {
STableGroupSupporter *pSupp = (STableGroupSupporter *) calloc(1, sizeof(STableGroupSupporter)); STableGroupSupporter sup = {0};
pSupp->numOfCols = numOfOrderCols; sup.numOfCols = numOfOrderCols;
pSupp->pTagSchema = pTagSchema; sup.pTagSchema = pTagSchema;
pSupp->pCols = pCols; sup.pCols = pCols;
taosqsort(pTableList->pData, size, sizeof(STableKeyInfo), pSupp, tableGroupComparFn); taosqsort(pTableList->pData, size, sizeof(STableKeyInfo), &sup, tableGroupComparFn);
createTableGroupImpl(pTableGroup, pTableList, size, skey, pSupp, tableGroupComparFn); createTableGroupImpl(pTableGroup, pTableList, size, skey, &sup, tableGroupComparFn);
taosTFree(pSupp);
} }
return pTableGroup; return pTableGroup;
......
...@@ -295,6 +295,11 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -295,6 +295,11 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
} }
pVnode->qMgmt = qOpenQueryMgmt(pVnode->vgId); pVnode->qMgmt = qOpenQueryMgmt(pVnode->vgId);
if (pVnode->qMgmt == NULL) {
vnodeCleanUp(pVnode);
return terrno;
}
pVnode->events = NULL; pVnode->events = NULL;
pVnode->status = TAOS_VN_STATUS_READY; pVnode->status = TAOS_VN_STATUS_READY;
vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode); vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册