diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 97ebac097a1853e0be407da260dcb2370cd2385e..344152f7d4eba6bcd0613b14a2234fb05f1cbbe0 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -232,7 +232,7 @@ void tscDoQuery(SSqlObj* pSql); * @param pPrevSql * @return */ -SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, SSqlObj* pPrevSql); +SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, int32_t cmd, SSqlObj* pPrevSql); void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClauseIndex, int32_t tableIndex); void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex); diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index b8c12bbbdc79f9a34ed33d3915175c91f0fe7cb0..6f58d3bef328210f67e4aae4b47acd780fbc6cd3 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1070,14 +1070,14 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { * interrupted position. */ if (TSDB_CODE_ACTION_IN_PROGRESS == code) { - tscTrace("async insert and waiting to get meter meta, then continue parse sql from offset: %" PRId64, pos); + tscTrace("%p waiting for get table meta during insert, then resume from offset: %" PRId64 " , %s", pSql, + pos, pSql->asyncTblPos); return code; } // todo add to return - tscError("async insert parse error, code:%d, %s", code, tstrerror(code)); + tscError("%p async insert parse error, code:%d, %s", pSql, code, tstrerror(code)); pSql->asyncTblPos = NULL; - goto _error_clean; // TODO: should _clean or _error_clean to async flow ???? } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index ca6ca369e422b448728d8761d7d5dc17e8377b0c..1c57287b22e0d1333ba41047822e0c67ac76f773 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -279,7 +279,7 @@ int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql) { assert(pSubQueryInfo->exprsInfo.numOfExprs == 1); // ts_comp query only requires one resutl columns taos_free_result(pPrevSub); - SSqlObj *pNew = createSubqueryObj(pSql, (int16_t) i, tscJoinQueryCallback, pSupporter, NULL); + SSqlObj *pNew = createSubqueryObj(pSql, (int16_t) i, tscJoinQueryCallback, TSDB_SQL_SELECT, pSupporter, NULL); if (pNew == NULL) { tscDestroyJoinSupporter(pSupporter); success = false; @@ -834,7 +834,7 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu } } - SSqlObj *pNew = createSubqueryObj(pSql, tableIndex, tscJoinQueryCallback, pSupporter, NULL); + SSqlObj *pNew = createSubqueryObj(pSql, tableIndex, tscJoinQueryCallback, TSDB_SQL_SELECT, pSupporter, NULL); if (pNew == NULL) { return TSDB_CODE_CLI_OUT_OF_MEMORY; } @@ -1386,7 +1386,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsupport, SSqlObj *prevSqlObj) { const int32_t table_index = 0; - SSqlObj *pNew = createSubqueryObj(pSql, table_index, tscRetrieveDataRes, trsupport, prevSqlObj); + SSqlObj *pNew = createSubqueryObj(pSql, table_index, tscRetrieveDataRes, trsupport, TSDB_SQL_SELECT, prevSqlObj); if (pNew != NULL) { // the sub query of two-stage super table query SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); pQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY; @@ -1545,7 +1545,7 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { pSupporter->pSql = pSql; pSupporter->pState = pState; - SSqlObj *pNew = createSubqueryObj(pSql, 0, multiVnodeInsertMerge, pSupporter, NULL); + SSqlObj *pNew = createSubqueryObj(pSql, 0, multiVnodeInsertMerge, pSupporter, TSDB_SQL_INSERT, NULL); if (pNew == NULL) { tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno)); break; @@ -1563,7 +1563,6 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { for (int32_t j = 0; j < pSql->numOfSubs; ++j) { SSqlObj *pSub = pSql->pSubs[j]; - pSub->cmd.command = TSDB_SQL_INSERT; int32_t code = tscCopyDataBlockToPayload(pSub, pDataBlocks->pData[j]); if (code != TSDB_CODE_SUCCESS) { diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 0b60c1d55c8feaa9550af178691dd98eb189a23d..a56d042ece5246cfa0fd057a60fe1f977fe6dd0d 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -192,7 +192,7 @@ SVnodeSidList* tscGetVnodeSidList(SSuperTableMeta* pMetricmeta, int32_t vnodeIdx return (SVnodeSidList*)(pMetricmeta->list[vnodeIdx] + (char*)pMetricmeta); #endif - + return NULL; } STableIdInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx) { @@ -662,7 +662,7 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff strncpy(dataBuf->tableId, name, TSDB_TABLE_ID_LEN); /* - * The metermeta may be released since the metermeta cache are completed clean by other thread + * The table meta may be released since the table meta cache are completed clean by other thread * due to operation such as drop database. So here we add the reference count directly instead of invoke * taosGetDataFromCache, which may return NULL value. */ @@ -1755,29 +1755,8 @@ bool tscShouldFreeAsyncSqlObj(SSqlObj* pSql) { } int32_t command = pSql->cmd.command; - if (command == TSDB_SQL_CONNECT) { + if (command == TSDB_SQL_CONNECT || command == TSDB_SQL_INSERT) { return true; - } - - if (command == TSDB_SQL_INSERT) { - SSqlCmd* pCmd = &pSql->cmd; - - /* - * in case of multi-vnode insertion, the object should not be released until all - * data blocks have been submit to vnode. - */ - SDataBlockList* pDataBlocks = pCmd->pDataBlocks; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - - STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - assert(pQueryInfo->numOfTables == 1 || pQueryInfo->numOfTables == 2); - - if (pDataBlocks == NULL || pTableMetaInfo->vnodeIndex >= pDataBlocks->nSize) { - tscTrace("%p object should be release since all data blocks have been submit", pSql); - return true; - } else { - return false; - } } else { return tscKeepConn[command] == 0 || (pSql->res.code != TSDB_CODE_ACTION_IN_PROGRESS && pSql->res.code != TSDB_CODE_SUCCESS); @@ -1997,7 +1976,7 @@ void tscResetForNextRetrieve(SSqlRes* pRes) { pRes->numOfRows = 0; } -SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, SSqlObj* pPrevSql) { +SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, int32_t cmd, SSqlObj* pPrevSql) { SSqlCmd* pCmd = &pSql->cmd; STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, tableIndex); @@ -2020,7 +1999,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void memcpy(&pNew->cmd, pCmd, sizeof(SSqlCmd)); - pNew->cmd.command = TSDB_SQL_SELECT; + pNew->cmd.command = cmd; pNew->cmd.payload = NULL; pNew->cmd.allocSize = 0; @@ -2105,25 +2084,31 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void pNew->param = param; char key[TSDB_MAX_TAGS_LEN + 1] = {0}; - tscGetMetricMetaCacheKey(pQueryInfo, key, uid); + if (cmd == TSDB_SQL_SELECT) { + tscGetMetricMetaCacheKey(pQueryInfo, key, uid); + } #ifdef _DEBUG_VIEW - printf("the metricmeta key is:%s\n", key); + tscTrace("the metricmeta key is:%s", key); #endif - char* name = pTableMetaInfo->name; + char* name = pTableMetaInfo->name; STableMetaInfo* pFinalInfo = NULL; if (pPrevSql == NULL) { - STableMeta* pTableMeta = taosCacheAcquireByName(tscCacheHandle, name); - SSuperTableMeta* pMetricMeta = taosCacheAcquireByName(tscCacheHandle, key); + STableMeta* pTableMeta = taosCacheAcquireByName(tscCacheHandle, name); + + SSuperTableMeta* pMetricMeta = NULL; + if (cmd == TSDB_SQL_SELECT) { + pMetricMeta = taosCacheAcquireByName(tscCacheHandle, key); + } pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pTableMeta, pMetricMeta, pTableMetaInfo->numOfTags, pTableMetaInfo->tagColumnIndex); } else { // transfer the ownership of pTableMeta/pMetricMeta to the newly create sql object. - STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0); +// STableMetaInfo* pPrevInfo = tscGetTableMetaInfoFromCmd(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0); - STableMeta* pPrevMeterMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pTableMeta); +// STableMeta* pPrevMeterMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pTableMeta); // SSuperTableMeta* pPrevMetricMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pMetricMeta); // pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pPrevMeterMeta, pPrevMetricMeta, pTableMetaInfo->numOfTags, @@ -2135,14 +2120,18 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void // assert(pFinalInfo->pMetricMeta != NULL); } - tscTrace( - "%p new subquery: %p, tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d," - "fieldInfo:%d, name:%s, qrang:%" PRId64 " - %" PRId64 " order:%d, limit:%" PRId64, - pSql, pNew, tableIndex, pTableMetaInfo->vnodeIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs, - pNewQueryInfo->colList.numOfCols, pNewQueryInfo->fieldsInfo.numOfOutputCols, pFinalInfo->name, pNewQueryInfo->stime, - pNewQueryInfo->etime, pNewQueryInfo->order.order, pNewQueryInfo->limit.limit); - - tscPrintSelectClause(pNew, 0); + if (cmd == TSDB_SQL_SELECT) { + tscTrace( + "%p new subquery: %p, tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d," + "fieldInfo:%d, name:%s, qrang:%" PRId64 " - %" PRId64 " order:%d, limit:%" PRId64, + pSql, pNew, tableIndex, pTableMetaInfo->vnodeIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs, + pNewQueryInfo->colList.numOfCols, pNewQueryInfo->fieldsInfo.numOfOutputCols, pFinalInfo->name, pNewQueryInfo->stime, + pNewQueryInfo->etime, pNewQueryInfo->order.order, pNewQueryInfo->limit.limit); + + tscPrintSelectClause(pNew, 0); + } else { + tscTrace("%p new sub insertion: %p, vnodeIdx:%d", pSql, pNew, pTableMetaInfo->vnodeIndex); + } return pNew; } @@ -2226,12 +2215,12 @@ char* tscGetErrorMsgPayload(SSqlCmd* pCmd) { return pCmd->payload; } * in case of multi-vnode super table projection query and the result does not reach the limitation. */ bool hasMoreVnodesToTry(SSqlObj* pSql) { - SSqlCmd* pCmd = &pSql->cmd; - SSqlRes* pRes = &pSql->res; +// SSqlCmd* pCmd = &pSql->cmd; +// SSqlRes* pRes = &pSql->res; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); +// SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); +// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); // if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo) || (pTableMetaInfo->pMetricMeta == NULL)) { return false; // } diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 4632c67c3cd5dc65267a151f63dcad40cea24b1f..d4fd452952a93a6151b6fe6e4888627135f2e180 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -392,9 +392,9 @@ typedef struct SSqlFuncExprMsg { } SSqlFuncExprMsg; typedef struct SSqlBinaryExprInfo { - struct tSQLBinaryExpr *pBinExpr; /* for binary expression */ - int32_t numOfCols; /* binary expression involves the readed number of columns*/ - SColIndexEx * pReqColumns; /* source column list */ + struct tExprNode *pBinExpr; /* for binary expression */ + int32_t numOfCols; /* binary expression involves the readed number of columns*/ + SColIndexEx * pReqColumns; /* source column list */ } SSqlBinaryExprInfo; typedef struct SSqlFunctionExpr { diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 0a6c0109f89bf8ecdab39e7da729269e6b89ca43..6dbf81bad7931323578661a383840e4b1f781a41 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -1148,7 +1148,7 @@ static void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableOb return NULL; } - memcpy(pCreate->tableId, pTable->info.tableId, TSDB_TABLE_ID_LEN + 1); + memcpy(pCreate->tableId, pTable->info.tableId, TSDB_TABLE_ID_LEN); pCreate->contLen = htonl(contLen); pCreate->vgId = htonl(pTable->vgId); pCreate->tableType = pTable->info.type; @@ -1275,7 +1275,7 @@ static SChildTableObj* mgmtDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj return NULL; } - mTrace("table:%s, create ctable in vgroup, uid:%" PRIu64 , pTable->info.tableId, pTable->uid); + mTrace("table:%s, create table in vgroup, id:%d, uid:%" PRIu64 , pTable->info.tableId, pTable->sid, pTable->uid); return pTable; } @@ -1540,7 +1540,7 @@ void mgmtGetChildTableMeta(SQueuedMsg *pMsg) { return; } memcpy(pCreateMsg->schema, pInfo->tags, sizeof(STagData)); - strcpy(pCreateMsg->tableId, pInfo->tableId); + strncpy(pCreateMsg->tableId, pInfo->tableId, tListLen(pInfo->tableId)); SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg)); memcpy(newMsg, pMsg, sizeof(SQueuedMsg)); diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index c8c3879f0a34c5236a7b691bbc08ed05a08502cd..5f04765f010416eb06c0016764af08c4e88496bf 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -450,10 +450,10 @@ void *taosCacheAcquireByName(SCacheObj *pCacheObj, const char *key) { if (ptNode != NULL) { atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1); - pTrace("key:%s is retrieved from cache,refcnt:%d", key, T_REF_VAL_GET(*ptNode)); + pTrace("key:%s is retrieved from cache, %p refcnt:%d", key, (*ptNode), T_REF_VAL_GET(*ptNode)); } else { atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1); - pTrace("key:%s not in cache,retrieved failed", key); + pTrace("key:%s not in cache, retrieved failed", key); } atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1); @@ -472,7 +472,7 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) { } int32_t ref = T_REF_INC(ptNode); - pTrace("%p addedTime ref data in cache, refCnt:%d", data, ref) + pTrace("%p add data ref in cache, refcnt:%d", ptNode, ref) // the data if referenced by at least one object, so the reference count must be greater than the value of 2. assert(ref >= 2); @@ -515,17 +515,15 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { } *data = NULL; + int16_t ref = T_REF_DEC(pNode); + pTrace("%p is released, refcnt:%d", pNode, ref); if (_remove) { __cache_wr_lock(pCacheObj); // pNode may be released immediately by other thread after the reference count of pNode is set to 0, // So we need to lock it in the first place. - T_REF_DEC(pNode); taosCacheMoveToTrash(pCacheObj, pNode); - __cache_unlock(pCacheObj); - } else { - T_REF_DEC(pNode); } }