提交 4a7d25c8 编写于 作者: H Haojun Liao

[td-225] refactor codes.

上级 3662e7c3
......@@ -3945,7 +3945,7 @@ static void ts_comp_finalize(SQLFunctionCtx *pCtx) {
tsBufFlush(pTSbuf);
strcpy(pCtx->aOutputBuf, pTSbuf->path);
tsBufDestory(pTSbuf);
tsBufDestroy(pTSbuf);
doFinalizer(pCtx);
}
......
......@@ -175,7 +175,6 @@ static int32_t handlePassword(SSqlCmd* pCmd, SSQLToken* pPwd) {
return TSDB_CODE_SUCCESS;
}
// todo handle memory leak in error handle function
int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
if (pInfo == NULL || pSql == NULL || pSql->signature != pSql) {
return TSDB_CODE_TSC_APP_ERROR;
......
......@@ -152,8 +152,8 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ
tsBufFlush(output1);
tsBufFlush(output2);
tsBufDestory(pSupporter1->pTSBuf);
tsBufDestory(pSupporter2->pTSBuf);
tsBufDestroy(pSupporter1->pTSBuf);
tsBufDestroy(pSupporter2->pTSBuf);
tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " for secondary query after ts blocks "
"intersecting, skey:%" PRId64 ", ekey:%" PRId64, pSql, numOfInput1, numOfInput2, output1->numOfTotal,
......@@ -762,7 +762,7 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
tsBufMerge(pSupporter->pTSBuf, pBuf, pTableMetaInfo->vgroupIndex);
tsBufDestory(pBuf);
tsBufDestroy(pBuf);
}
// continue to retrieve ts-comp data from vnode
......
......@@ -1544,7 +1544,7 @@ static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) {
pQueryInfo->groupbyExpr.columnInfo = NULL;
}
pQueryInfo->tsBuf = tsBufDestory(pQueryInfo->tsBuf);
pQueryInfo->tsBuf = tsBufDestroy(pQueryInfo->tsBuf);
tfree(pQueryInfo->fillVal);
}
......
......@@ -120,6 +120,7 @@ static UNUSED_FUNC void* u_calloc(size_t num, size_t __size) {
#define GET_TABLEGROUP(q, _index) ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index)))
static void setQueryStatus(SQuery *pQuery, int8_t status);
static void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv);
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->intervalTime > 0)
......@@ -838,6 +839,7 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas
sas->data = calloc(pQuery->numOfCols, POINTER_BYTES);
if (sas->data == NULL) {
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
......@@ -900,6 +902,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport));
if (sasArray == NULL) {
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
......@@ -1146,6 +1149,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport));
if (sasArray == NULL) {
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
......@@ -1597,7 +1601,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf);
pRuntimeEnv->pTSBuf = tsBufDestroy(pRuntimeEnv->pTSBuf);
}
#define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED)
......@@ -2225,8 +2229,10 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
while (tsdbNextDataBlock(pQueryHandle)) {
summary->totalBlocks += 1;
if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) {
return 0;
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo);
......@@ -3281,7 +3287,8 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
// check if query is killed or not
if (IS_QUERY_KILLED(pQInfo)) {
return;
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
}
......@@ -3926,7 +3933,8 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
while (tsdbNextDataBlock(pQueryHandle)) {
if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) {
return;
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo);
......@@ -4133,7 +4141,10 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
pQuery->precision = tsdbGetCfg(tsdb)->precision;
pRuntimeEnv->topBotQuery = isTopBottomQuery(pQuery);
pRuntimeEnv->hasTagResults = hasTagValOutput(pQuery);
setScanLimitationByResultBuffer(pQuery);
changeExecuteScanOrder(pQInfo, false);
......@@ -4211,10 +4222,6 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
pQuery->fillType, pColInfo);
}
// todo refactor
pRuntimeEnv->topBotQuery = isTopBottomQuery(pQuery);
pRuntimeEnv->hasTagResults = hasTagValOutput(pQuery);
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
return TSDB_CODE_SUCCESS;
}
......@@ -4243,7 +4250,8 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
while (tsdbNextDataBlock(pQueryHandle)) {
summary->totalBlocks += 1;
if (IS_QUERY_KILLED(pQInfo)) {
break;
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo);
......@@ -4527,7 +4535,8 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
while (pQInfo->tableIndex < pQInfo->tableqinfoGroupInfo.numOfTables) {
if (IS_QUERY_KILLED(pQInfo)) {
return;
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
pQuery->current = taosArrayGetP(group, pQInfo->tableIndex);
......@@ -4723,7 +4732,8 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
// query error occurred or query is killed, abort current execution
if (pQInfo->code != TSDB_CODE_SUCCESS || IS_QUERY_KILLED(pQInfo)) {
qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code));
return;
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
// close all time window results
......@@ -4745,7 +4755,8 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
if (pQInfo->code != TSDB_CODE_SUCCESS || IS_QUERY_KILLED(pQInfo)) {
qDebug("QInfo:%p query killed or error occurred, code:%s, abort", pQInfo, tstrerror(pQInfo->code));
return;
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
if (QUERY_IS_INTERVAL_QUERY(pQuery) || isSumAvgRateQuery(pQuery)) {
......@@ -4784,7 +4795,8 @@ static void tableFixedOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
finalizeQueryResult(pRuntimeEnv);
if (IS_QUERY_KILLED(pQInfo)) {
return;
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
// since the numOfRows must be identical for all sql functions that are allowed to be executed simutaneously.
......@@ -4816,10 +4828,6 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey);
finalizeQueryResult(pRuntimeEnv);
if (IS_QUERY_KILLED(pQInfo)) {
return;
}
pQuery->rec.rows = getNumOfResult(pRuntimeEnv);
if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols > 0 && pQuery->rec.rows > 0) {
skipResults(pRuntimeEnv);
......@@ -4864,10 +4872,6 @@ static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start)
while (1) {
scanOneTableDataBlocks(pRuntimeEnv, start);
if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) {
return;
}
assert(!Q_STATUS_EQUAL(pQuery->status, QUERY_NOT_COMPLETED));
finalizeQueryResult(pRuntimeEnv);
......@@ -5788,7 +5792,7 @@ static bool isValidQInfo(void *param) {
return (sig == (uint64_t)pQInfo);
}
static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *pQInfo, bool isSTable, void* param, _qinfo_free_fn_t fn) {
static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *pQInfo, bool isSTable, void* param) {
int32_t code = TSDB_CODE_SUCCESS;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
......@@ -5813,7 +5817,6 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ
}
pQInfo->param = param;
pQInfo->freeFn = fn;
if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) {
qDebug("QInfo:%p no table qualified for tag filter, abort query", pQInfo);
......@@ -5995,8 +5998,7 @@ typedef struct SQueryMgmt {
pthread_mutex_t lock;
} SQueryMgmt;
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, void* param, _qinfo_free_fn_t fn,
qinfo_t* pQInfo) {
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, void* param, qinfo_t* pQInfo) {
assert(pQueryMsg != NULL && tsdb != NULL);
int32_t code = TSDB_CODE_SUCCESS;
......@@ -6086,7 +6088,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, vo
goto _over;
}
code = initQInfo(pQueryMsg, tsdb, vgId, *pQInfo, isSTableQuery, param, fn);
code = initQInfo(pQueryMsg, tsdb, vgId, *pQInfo, isSTableQuery, param);
_over:
free(tagCond);
......@@ -6105,43 +6107,25 @@ _over:
if (code != TSDB_CODE_SUCCESS) {
*pQInfo = NULL;
} else {
SQInfo* pq = (SQInfo*) (*pQInfo);
// SQInfo* pq = (SQInfo*) (*pQInfo);
T_REF_INC(pq);
T_REF_INC(pq);
// T_REF_INC(pq);
// T_REF_INC(pq);
}
// if failed to add ref for all meters in this query, abort current query
return code;
}
static void doDestoryQueryInfo(SQInfo* pQInfo) {
assert(pQInfo != NULL);
qDebug("QInfo:%p query completed", pQInfo);
queryCostStatis(pQInfo); // print the query cost summary
freeQInfo(pQInfo);
}
void qDestroyQueryInfo(qinfo_t qHandle) {
SQInfo* pQInfo = (SQInfo*) qHandle;
if (!isValidQInfo(pQInfo)) {
return;
}
int32_t ref = T_REF_DEC(pQInfo);
qDebug("QInfo:%p dec refCount, value:%d", pQInfo, ref);
if (ref == 0) {
_qinfo_free_fn_t freeFp = pQInfo->freeFn;
void* param = pQInfo->param;
doDestoryQueryInfo(pQInfo);
if (freeFp != NULL) {
assert(param != NULL);
freeFp(param);
}
}
qDebug("QInfo:%p query completed", pQInfo);
queryCostStatis(pQInfo); // print the query cost summary
freeQInfo(pQInfo);
}
void qTableQuery(qinfo_t qinfo) {
......@@ -6154,29 +6138,22 @@ void qTableQuery(qinfo_t qinfo) {
if (IS_QUERY_KILLED(pQInfo)) {
qDebug("QInfo:%p it is already killed, abort", pQInfo);
sem_post(&pQInfo->dataReady);
qDestroyQueryInfo(pQInfo);
return;
}
if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) {
qDebug("QInfo:%p no table exists for query, abort", pQInfo);
sem_post(&pQInfo->dataReady);
qDestroyQueryInfo(pQInfo);
return;
}
int32_t ret = setjmp(pQInfo->runtimeEnv.env);
// error occurs, record the error code and return to client
int32_t ret = setjmp(pQInfo->runtimeEnv.env);
if (ret != TSDB_CODE_SUCCESS) {
pQInfo->code = ret;
qDebug("QInfo:%p query abort due to error occurs, code:%s", pQInfo, tstrerror(pQInfo->code));
qDebug("QInfo:%p query abort due to error/cancel occurs, code:%s", pQInfo, tstrerror(pQInfo->code));
sem_post(&pQInfo->dataReady);
qDestroyQueryInfo(pQInfo);
return;
}
......@@ -6203,7 +6180,6 @@ void qTableQuery(qinfo_t qinfo) {
}
sem_post(&pQInfo->dataReady);
qDestroyQueryInfo(pQInfo);
}
int32_t qRetrieveQueryResultInfo(qinfo_t qinfo) {
......@@ -6247,7 +6223,7 @@ bool qHasMoreResultsToRetrieve(qinfo_t qinfo) {
}
if (ret) {
T_REF_INC(pQInfo);
// T_REF_INC(pQInfo);
qDebug("QInfo:%p has more results waits for client retrieve", pQInfo);
}
......@@ -6304,7 +6280,6 @@ int32_t qKillQuery(qinfo_t qinfo) {
}
setQueryKilled(pQInfo);
qDestroyQueryInfo(pQInfo);
return TSDB_CODE_SUCCESS;
}
......@@ -6449,6 +6424,7 @@ void freeqinfoFn(void *qhandle) {
}
qKillQuery(*handle);
qDestroyQueryInfo(*handle);
}
void* qOpenQueryMgmt(int32_t vgId) {
......@@ -6467,7 +6443,11 @@ void* qOpenQueryMgmt(int32_t vgId) {
return pQueryHandle;
}
void qSetQueryMgmtClosed(void* pQMgmt) {
static void queryMgmtKillQueryFn(void* handle) {
qKillQuery(handle);
}
void qQueryMgmtNotifyClosed(void* pQMgmt) {
if (pQMgmt == NULL) {
return;
}
......@@ -6479,7 +6459,7 @@ void qSetQueryMgmtClosed(void* pQMgmt) {
pQueryMgmt->closed = true;
pthread_mutex_unlock(&pQueryMgmt->lock);
taosCacheRefresh(pQueryMgmt->qinfoPool, freeqinfoFn);
taosCacheRefresh(pQueryMgmt->qinfoPool, queryMgmtKillQueryFn);
}
void qCleanupQueryMgmt(void* pQMgmt) {
......
......@@ -509,10 +509,11 @@ uint32_t tSQLGetToken(char* z, uint32_t* tokenType) {
for (i = 1; isdigit(z[i]); i++) {
}
/* here is the 1a/2s/3m/9y */
if ((z[i] == 'a' || z[i] == 's' || z[i] == 'm' || z[i] == 'h' || z[i] == 'd' || z[i] == 'n' || z[i] == 'y' ||
z[i] == 'w' || z[i] == 'A' || z[i] == 'S' || z[i] == 'M' || z[i] == 'H' || z[i] == 'D' || z[i] == 'N' ||
z[i] == 'Y' || z[i] == 'W') &&
/* here is the 1u/1a/2s/3m/9y */
if ((z[i] == 'u' || z[i] == 'a' || z[i] == 's' || z[i] == 'm' || z[i] == 'h' || z[i] == 'd' || z[i] == 'n' ||
z[i] == 'y' || z[i] == 'w' ||
z[i] == 'U' || z[i] == 'A' || z[i] == 'S' || z[i] == 'M' || z[i] == 'H' || z[i] == 'D' || z[i] == 'N' ||
z[i] == 'Y' || z[i] == 'W') &&
(isIdChar[(uint8_t)z[i + 1]] == 0)) {
*tokenType = TK_VARIABLE;
i += 1;
......
......@@ -79,7 +79,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
pTSBuf->numOfAlloc = header.numOfVnode;
STSVnodeBlockInfoEx* tmp = realloc(pTSBuf->pData, sizeof(STSVnodeBlockInfoEx) * pTSBuf->numOfAlloc);
if (tmp == NULL) {
tsBufDestory(pTSBuf);
tsBufDestroy(pTSBuf);
return NULL;
}
......@@ -92,7 +92,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
pTSBuf->tsOrder = header.tsOrder;
if (pTSBuf->tsOrder != TSDB_ORDER_ASC && pTSBuf->tsOrder != TSDB_ORDER_DESC) {
// tscError("invalid order info in buf:%d", pTSBuf->tsOrder);
tsBufDestory(pTSBuf);
tsBufDestroy(pTSBuf);
return NULL;
}
......@@ -100,7 +100,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
STSVnodeBlockInfo* buf = (STSVnodeBlockInfo*)calloc(1, infoSize);
if (buf == NULL) {
tsBufDestory(pTSBuf);
tsBufDestroy(pTSBuf);
return NULL;
}
......@@ -120,7 +120,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
struct stat fileStat;
if (fstat(fileno(pTSBuf->f), &fileStat) != 0) {
tsBufDestory(pTSBuf);
tsBufDestroy(pTSBuf);
return NULL;
}
......@@ -137,7 +137,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
return pTSBuf;
}
void* tsBufDestory(STSBuf* pTSBuf) {
void* tsBufDestroy(STSBuf* pTSBuf) {
if (pTSBuf == NULL) {
return NULL;
}
......@@ -920,13 +920,13 @@ static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) {
pTSBuf->numOfAlloc = INITIAL_VNODEINFO_SIZE;
pTSBuf->pData = calloc(pTSBuf->numOfAlloc, sizeof(STSVnodeBlockInfoEx));
if (pTSBuf->pData == NULL) {
tsBufDestory(pTSBuf);
tsBufDestroy(pTSBuf);
return NULL;
}
pTSBuf->tsData.rawBuf = malloc(MEM_BUF_SIZE);
if (pTSBuf->tsData.rawBuf == NULL) {
tsBufDestory(pTSBuf);
tsBufDestroy(pTSBuf);
return NULL;
}
......@@ -936,13 +936,13 @@ static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) {
pTSBuf->assistBuf = malloc(MEM_BUF_SIZE);
if (pTSBuf->assistBuf == NULL) {
tsBufDestory(pTSBuf);
tsBufDestroy(pTSBuf);
return NULL;
}
pTSBuf->block.payload = malloc(MEM_BUF_SIZE);
if (pTSBuf->block.payload == NULL) {
tsBufDestory(pTSBuf);
tsBufDestroy(pTSBuf);
return NULL;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册