未验证 提交 f9876dd7 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1521 from taosdata/feature/query

[TD-98] fix memory leaks and invalid write
......@@ -232,7 +232,7 @@ void tscDoQuery(SSqlObj* pSql);
* @param pPrevSql
* @return
*/
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, SSqlObj* pPrevSql);
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, int32_t cmd, SSqlObj* pPrevSql);
void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClauseIndex, int32_t tableIndex);
void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex);
......
......@@ -1070,14 +1070,14 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
* interrupted position.
*/
if (TSDB_CODE_ACTION_IN_PROGRESS == code) {
tscTrace("async insert and waiting to get meter meta, then continue parse sql from offset: %" PRId64, pos);
tscTrace("%p waiting for get table meta during insert, then resume from offset: %" PRId64 " , %s", pSql,
pos, pSql->asyncTblPos);
return code;
}
// todo add to return
tscError("async insert parse error, code:%d, %s", code, tstrerror(code));
tscError("%p async insert parse error, code:%d, %s", pSql, code, tstrerror(code));
pSql->asyncTblPos = NULL;
goto _error_clean; // TODO: should _clean or _error_clean to async flow ????
}
......
......@@ -279,7 +279,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) {
assert(pSubQueryInfo->exprsInfo.numOfExprs == 1); // ts_comp query only requires one resutl columns
taos_free_result(pPrevSub);
SSqlObj *pNew = createSubqueryObj(pSql, (int16_t) i, tscJoinQueryCallback, pSupporter, NULL);
SSqlObj *pNew = createSubqueryObj(pSql, (int16_t) i, tscJoinQueryCallback, TSDB_SQL_SELECT, pSupporter, NULL);
if (pNew == NULL) {
tscDestroyJoinSupporter(pSupporter);
success = false;
......@@ -834,7 +834,7 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu
}
}
SSqlObj *pNew = createSubqueryObj(pSql, tableIndex, tscJoinQueryCallback, pSupporter, NULL);
SSqlObj *pNew = createSubqueryObj(pSql, tableIndex, tscJoinQueryCallback, TSDB_SQL_SELECT, pSupporter, NULL);
if (pNew == NULL) {
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
......@@ -1386,7 +1386,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) {
const int32_t table_index = 0;
SSqlObj *pNew = createSubqueryObj(pSql, table_index, tscRetrieveDataRes, trsupport, prevSqlObj);
SSqlObj *pNew = createSubqueryObj(pSql, table_index, tscRetrieveDataRes, trsupport, TSDB_SQL_SELECT, prevSqlObj);
if (pNew != NULL) { // the sub query of two-stage super table query
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0);
pQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY;
......@@ -1545,7 +1545,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
pSupporter->pSql = pSql;
pSupporter->pState = pState;
SSqlObj *pNew = createSubqueryObj(pSql, 0, multiVnodeInsertMerge, pSupporter, NULL);
SSqlObj *pNew = createSubqueryObj(pSql, 0, multiVnodeInsertMerge, pSupporter, TSDB_SQL_INSERT, NULL);
if (pNew == NULL) {
tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
break;
......@@ -1563,7 +1563,6 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
for (int32_t j = 0; j < pSql->numOfSubs; ++j) {
SSqlObj *pSub = pSql->pSubs[j];
pSub->cmd.command = TSDB_SQL_INSERT;
int32_t code = tscCopyDataBlockToPayload(pSub, pDataBlocks->pData[j]);
if (code != TSDB_CODE_SUCCESS) {
......
......@@ -192,7 +192,7 @@ SVnodeSidList* tscGetVnodeSidList(SSuperTableMeta* pMetricmeta, int32_t vnodeIdx
return (SVnodeSidList*)(pMetricmeta->list[vnodeIdx] + (char*)pMetricmeta);
#endif
return NULL;
}
STableIdInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx) {
......@@ -662,7 +662,7 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
strncpy(dataBuf->tableId, name, TSDB_TABLE_ID_LEN);
/*
* The metermeta may be released since the metermeta cache are completed clean by other thread
* The table meta may be released since the table meta cache are completed clean by other thread
* due to operation such as drop database. So here we add the reference count directly instead of invoke
* taosGetDataFromCache, which may return NULL value.
*/
......@@ -1755,29 +1755,8 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) {
}
int32_t command = pSql->cmd.command;
if (command == TSDB_SQL_CONNECT) {
if (command == TSDB_SQL_CONNECT || command == TSDB_SQL_INSERT) {
return true;
}
if (command == TSDB_SQL_INSERT) {
SSqlCmd* pCmd = &pSql->cmd;
/*
* in case of multi-vnode insertion, the object should not be released until all
* data blocks have been submit to vnode.
*/
SDataBlockList* pDataBlocks = pCmd->pDataBlocks;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
assert(pQueryInfo->numOfTables == 1 || pQueryInfo->numOfTables == 2);
if (pDataBlocks == NULL || pTableMetaInfo->vnodeIndex >= pDataBlocks->nSize) {
tscTrace("%p object should be release since all data blocks have been submit", pSql);
return true;
} else {
return false;
}
} else {
return tscKeepConn[command] == 0 ||
(pSql->res.code != TSDB_CODE_ACTION_IN_PROGRESS && pSql->res.code != TSDB_CODE_SUCCESS);
......@@ -1997,7 +1976,7 @@ void tscResetForNextRetrieve(SSqlRes* pRes) {
pRes->numOfRows = 0;
}
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, SSqlObj* pPrevSql) {
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, int32_t cmd, SSqlObj* pPrevSql) {
SSqlCmd* pCmd = &pSql->cmd;
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, tableIndex);
......@@ -2020,7 +1999,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
memcpy(&pNew->cmd, pCmd, sizeof(SSqlCmd));
pNew->cmd.command = TSDB_SQL_SELECT;
pNew->cmd.command = cmd;
pNew->cmd.payload = NULL;
pNew->cmd.allocSize = 0;
......@@ -2105,25 +2084,31 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
pNew->param = param;
char key[TSDB_MAX_TAGS_LEN + 1] = {0};
tscGetMetricMetaCacheKey(pQueryInfo, key, uid);
if (cmd == TSDB_SQL_SELECT) {
tscGetMetricMetaCacheKey(pQueryInfo, key, uid);
}
#ifdef _DEBUG_VIEW
printf("the metricmeta key is:%s\n", key);
tscTrace("the metricmeta key is:%s", key);
#endif
char* name = pTableMetaInfo->name;
char* name = pTableMetaInfo->name;
STableMetaInfo* pFinalInfo = NULL;
if (pPrevSql == NULL) {
STableMeta* pTableMeta = taosCacheAcquireByName(tscCacheHandle, name);
SSuperTableMeta* pMetricMeta = taosCacheAcquireByName(tscCacheHandle, key);
STableMeta* pTableMeta = taosCacheAcquireByName(tscCacheHandle, name);
SSuperTableMeta* pMetricMeta = NULL;
if (cmd == TSDB_SQL_SELECT) {
pMetricMeta = taosCacheAcquireByName(tscCacheHandle, key);
}
pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pTableMeta, pMetricMeta, pTableMetaInfo->numOfTags,
pTableMetaInfo->tagColumnIndex);
} else { // transfer the ownership of pTableMeta/pMetricMeta to the newly create sql object.
STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0);
// STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0);
STableMeta* pPrevMeterMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pTableMeta);
// STableMeta* pPrevMeterMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pTableMeta);
// SSuperTableMeta* pPrevMetricMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pMetricMeta);
// pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pPrevMeterMeta, pPrevMetricMeta, pTableMetaInfo->numOfTags,
......@@ -2135,14 +2120,18 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
// assert(pFinalInfo->pMetricMeta != NULL);
}
tscTrace(
"%p new subquery: %p, tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d,"
"fieldInfo:%d, name:%s, qrang:%" PRId64 " - %" PRId64 " order:%d, limit:%" PRId64,
pSql, pNew, tableIndex, pTableMetaInfo->vnodeIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs,
pNewQueryInfo->colList.numOfCols, pNewQueryInfo->fieldsInfo.numOfOutputCols, pFinalInfo->name, pNewQueryInfo->stime,
pNewQueryInfo->etime, pNewQueryInfo->order.order, pNewQueryInfo->limit.limit);
tscPrintSelectClause(pNew, 0);
if (cmd == TSDB_SQL_SELECT) {
tscTrace(
"%p new subquery: %p, tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d,"
"fieldInfo:%d, name:%s, qrang:%" PRId64 " - %" PRId64 " order:%d, limit:%" PRId64,
pSql, pNew, tableIndex, pTableMetaInfo->vnodeIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs,
pNewQueryInfo->colList.numOfCols, pNewQueryInfo->fieldsInfo.numOfOutputCols, pFinalInfo->name, pNewQueryInfo->stime,
pNewQueryInfo->etime, pNewQueryInfo->order.order, pNewQueryInfo->limit.limit);
tscPrintSelectClause(pNew, 0);
} else {
tscTrace("%p new sub insertion: %p, vnodeIdx:%d", pSql, pNew, pTableMetaInfo->vnodeIndex);
}
return pNew;
}
......@@ -2226,12 +2215,12 @@ char* tscGetErrorMsgPayload(SSqlCmd* pCmd) { return pCmd->payload; }
* in case of multi-vnode super table projection query and the result does not reach the limitation.
*/
bool hasMoreVnodesToTry(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
// SSqlCmd* pCmd = &pSql->cmd;
// SSqlRes* pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
// SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
// if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo) || (pTableMetaInfo->pMetricMeta == NULL)) {
return false;
// }
......
......@@ -392,9 +392,9 @@ typedef struct SSqlFuncExprMsg {
} SSqlFuncExprMsg;
typedef struct SSqlBinaryExprInfo {
struct tSQLBinaryExpr *pBinExpr; /* for binary expression */
int32_t numOfCols; /* binary expression involves the readed number of columns*/
SColIndexEx * pReqColumns; /* source column list */
struct tExprNode *pBinExpr; /* for binary expression */
int32_t numOfCols; /* binary expression involves the readed number of columns*/
SColIndexEx * pReqColumns; /* source column list */
} SSqlBinaryExprInfo;
typedef struct SSqlFunctionExpr {
......
......@@ -1148,7 +1148,7 @@ static void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableOb
return NULL;
}
memcpy(pCreate->tableId, pTable->info.tableId, TSDB_TABLE_ID_LEN + 1);
memcpy(pCreate->tableId, pTable->info.tableId, TSDB_TABLE_ID_LEN);
pCreate->contLen = htonl(contLen);
pCreate->vgId = htonl(pTable->vgId);
pCreate->tableType = pTable->info.type;
......@@ -1275,7 +1275,7 @@ static SChildTableObj* mgmtDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj
return NULL;
}
mTrace("table:%s, create ctable in vgroup, uid:%" PRIu64 , pTable->info.tableId, pTable->uid);
mTrace("table:%s, create table in vgroup, id:%d, uid:%" PRIu64 , pTable->info.tableId, pTable->sid, pTable->uid);
return pTable;
}
......@@ -1540,7 +1540,7 @@ void mgmtGetChildTableMeta(SQueuedMsg *pMsg) {
return;
}
memcpy(pCreateMsg->schema, pInfo->tags, sizeof(STagData));
strcpy(pCreateMsg->tableId, pInfo->tableId);
strncpy(pCreateMsg->tableId, pInfo->tableId, tListLen(pInfo->tableId));
SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg));
memcpy(newMsg, pMsg, sizeof(SQueuedMsg));
......
......@@ -450,10 +450,10 @@ void *taosCacheAcquireByName(SCacheObj *pCacheObj, const char *key) {
if (ptNode != NULL) {
atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
pTrace("key:%s is retrieved from cache,refcnt:%d", key, T_REF_VAL_GET(*ptNode));
pTrace("key:%s is retrieved from cache, %p refcnt:%d", key, (*ptNode), T_REF_VAL_GET(*ptNode));
} else {
atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
pTrace("key:%s not in cache,retrieved failed", key);
pTrace("key:%s not in cache, retrieved failed", key);
}
atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
......@@ -472,7 +472,7 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
}
int32_t ref = T_REF_INC(ptNode);
pTrace("%p addedTime ref data in cache, refCnt:%d", data, ref)
pTrace("%p add data ref in cache, refcnt:%d", ptNode, ref)
// the data if referenced by at least one object, so the reference count must be greater than the value of 2.
assert(ref >= 2);
......@@ -515,17 +515,15 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
}
*data = NULL;
int16_t ref = T_REF_DEC(pNode);
pTrace("%p is released, refcnt:%d", pNode, ref);
if (_remove) {
__cache_wr_lock(pCacheObj);
// pNode may be released immediately by other thread after the reference count of pNode is set to 0,
// So we need to lock it in the first place.
T_REF_DEC(pNode);
taosCacheMoveToTrash(pCacheObj, pNode);
__cache_unlock(pCacheObj);
} else {
T_REF_DEC(pNode);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册