diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index f954667287c02c951c71e7fd3d11162209fa9890..b99a8a46d0874ee57d338389bde864fc1c9ae514 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -89,27 +89,21 @@ typedef struct SVgroupTableInfo { int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, const char* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks); -void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks); void tscDestroyDataBlock(STableDataBlocks* pDataBlock); void tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf); SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, short bytes, uint32_t offset); -SDataBlockList* tscCreateBlockArrayList(); - -void* tscDestroyBlockArrayList(SDataBlockList* pList); +void* tscDestroyBlockArrayList(SArray* pDataBlockList); int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock); -void tscFreeUnusedDataBlocks(SDataBlockList* pList); -int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pDataList); -int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size, +void tscFreeUnusedDataBlocks(SArray* pDataBlockList); +int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pDataList); +int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta, STableDataBlocks** dataBlocks); -//UNUSED_FUNC STableIdInfo* tscGetMeterSidInfo(SVnodeSidList* pSidList, int32_t idx); - /** - * * for the projection query on metric or point interpolation query on metric, * we iterate all the meters, instead of invoke query on all qualified meters simultaneously. * diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index a0eb1bd63c7767b774e58301625611e58817ec55..29abff7685e2a03a48a6859f7f41a369052dd936 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -183,11 +183,11 @@ typedef struct STableDataBlocks { SParamInfo *params; } STableDataBlocks; -typedef struct SDataBlockList { // todo remove - uint32_t nSize; - uint32_t nAlloc; - STableDataBlocks **pData; -} SDataBlockList; +//typedef struct SDataBlockList { // todo remove +// uint32_t nSize; +// uint32_t nAlloc; +// STableDataBlocks **pData; +//} SDataBlockList; typedef struct SQueryInfo { int16_t command; // the command may be different for each subclause, so keep it seperately. @@ -238,8 +238,7 @@ typedef struct { void * pTableList; // referred table involved in sql int32_t batchSize; // for parameter ('?') binding and batch processing int32_t numOfParams; - - SDataBlockList *pDataBlocks; // submit data blocks after parsing sql + SArray *pDataBlocks; // SArray submit data blocks after parsing sql } SSqlCmd; typedef struct SResRec { diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index e65acc2483c641de4a4318e8f792ba78b1d3f177..dd33ae0897f3d8e59d5a8c43ec89048ab874b701 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1038,8 +1038,7 @@ int tsParseInsertSql(SSqlObj *pSql) { if (NULL == pCmd->pTableList) { pCmd->pTableList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false); - - pSql->cmd.pDataBlocks = tscCreateBlockArrayList(); + pCmd->pDataBlocks = taosArrayInit(4, POINTER_BYTES); if (NULL == pCmd->pTableList || NULL == pSql->cmd.pDataBlocks) { code = TSDB_CODE_TSC_OUT_OF_MEMORY; goto _error; @@ -1170,7 +1169,7 @@ int tsParseInsertSql(SSqlObj *pSql) { goto _error; } - tscAppendDataBlock(pCmd->pDataBlocks, pDataBlock); + taosArrayPush(pCmd->pDataBlocks, &pDataBlock); strcpy(pDataBlock->filename, fname); } else if (sToken.type == TK_LP) { /* insert into tablename(col1, col2,..., coln) values(v1, v2,... vn); */ @@ -1258,7 +1257,7 @@ int tsParseInsertSql(SSqlObj *pSql) { goto _clean; } - if (pCmd->pDataBlocks->nSize > 0) { // merge according to vgId + if (taosArrayGetSize(pCmd->pDataBlocks) > 0) { // merge according to vgId if ((code = tscMergeTableDataBlocks(pSql, pCmd->pDataBlocks)) != TSDB_CODE_SUCCESS) { goto _error; } @@ -1368,8 +1367,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock return code; } - // the pDataBlock is different from the pTableDataBlocks - STableDataBlocks *pDataBlock = pCmd->pDataBlocks->pData[0]; + STableDataBlocks *pDataBlock = taosArrayGetP(pCmd->pDataBlocks, 0); if ((code = tscCopyDataBlockToPayload(pSql, pDataBlock)) != TSDB_CODE_SUCCESS) { return code; } @@ -1400,15 +1398,15 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) { int32_t rowSize = tinfo.rowSize; - pCmd->pDataBlocks = tscCreateBlockArrayList(); + pCmd->pDataBlocks = taosArrayInit(4, POINTER_BYTES); STableDataBlocks *pTableDataBlock = NULL; - int32_t ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, rowSize, sizeof(SSubmitBlk), - pTableMetaInfo->name, pTableMeta, &pTableDataBlock); + + int32_t ret = tscCreateDataBlock(TSDB_PAYLOAD_SIZE, rowSize, sizeof(SSubmitBlk), pTableMetaInfo->name, pTableMeta, &pTableDataBlock); if (ret != TSDB_CODE_SUCCESS) { - return -1; + return ret; } - tscAppendDataBlock(pCmd->pDataBlocks, pTableDataBlock); + taosArrayPush(pCmd->pDataBlocks, &pTableDataBlock); code = tscAllocateMemIfNeed(pTableDataBlock, rowSize, &maxRows); if (TSDB_CODE_SUCCESS != code) return -1; @@ -1442,7 +1440,7 @@ static int tscInsertDataFromFile(SSqlObj *pSql, FILE *fp, char *tmpTokenBuf) { return -code; } - pTableDataBlock = pCmd->pDataBlocks->pData[0]; + pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, 0); pTableDataBlock->size = sizeof(SSubmitBlk); pTableDataBlock->rowSize = tinfo.rowSize; @@ -1479,13 +1477,14 @@ void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql) { int32_t affected_rows = 0; assert(pCmd->dataSourceType == DATA_FROM_DATA_FILE && pCmd->pDataBlocks != NULL); - SDataBlockList *pDataBlockList = pCmd->pDataBlocks; + SArray *pDataBlockList = pCmd->pDataBlocks; pCmd->pDataBlocks = NULL; char path[PATH_MAX] = {0}; - for (int32_t i = 0; i < pDataBlockList->nSize; ++i) { - pDataBlock = pDataBlockList->pData[i]; + size_t size = taosArrayGetSize(pDataBlockList); + for (int32_t i = 0; i < size; ++i) { + pDataBlock = taosArrayGetP(pDataBlockList, i ); if (pDataBlock == NULL) { continue; } diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index 3c9e0cdd3bcf9991c2d497dd7234f5be3cc6d925..6a0ba02ad40dbe328f689fb0b5af21fcec52020a 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -331,8 +331,9 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) { binded = pCmd->batchSize / 2; } - for (int32_t i = 0; i < pCmd->pDataBlocks->nSize; ++i) { - STableDataBlocks* pBlock = pCmd->pDataBlocks->pData[i]; + size_t size = taosArrayGetSize(pCmd->pDataBlocks); + for (int32_t i = 0; i < size; ++i) { + STableDataBlocks* pBlock = taosArrayGetP(pCmd->pDataBlocks, i); uint32_t totalDataSize = pBlock->size - sizeof(SSubmitBlk); uint32_t dataSize = totalDataSize / alloced; assert(dataSize * alloced == totalDataSize); @@ -370,8 +371,9 @@ static int insertStmtBindParam(STscStmt* stmt, TAOS_BIND* bind) { return TSDB_CODE_SUCCESS; } - for (int32_t i = 0; i < pCmd->pDataBlocks->nSize; ++i) { - STableDataBlocks* pBlock = pCmd->pDataBlocks->pData[i]; + size_t total = taosArrayGetSize(pCmd->pDataBlocks); + for (int32_t i = 0; i < total; ++i) { + STableDataBlocks* pBlock = taosArrayGetP(pCmd->pDataBlocks, i); uint32_t totalDataSize = pBlock->size - sizeof(SSubmitBlk); pBlock->size += totalDataSize / alloced; @@ -395,8 +397,10 @@ static int insertStmtReset(STscStmt* pStmt) { SSqlCmd* pCmd = &pStmt->pSql->cmd; if (pCmd->batchSize > 2) { int32_t alloced = (pCmd->batchSize + 1) / 2; - for (int32_t i = 0; i < pCmd->pDataBlocks->nSize; ++i) { - STableDataBlocks* pBlock = pCmd->pDataBlocks->pData[i]; + + size_t size = taosArrayGetSize(pCmd->pDataBlocks); + for (int32_t i = 0; i < size; ++i) { + STableDataBlocks* pBlock = taosArrayGetP(pCmd->pDataBlocks, i); uint32_t totalDataSize = pBlock->size - sizeof(SSubmitBlk); pBlock->size = sizeof(SSubmitBlk) + totalDataSize / alloced; @@ -423,15 +427,15 @@ static int insertStmtExecute(STscStmt* stmt) { STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); assert(pCmd->numOfClause == 1); - - if (pCmd->pDataBlocks->nSize > 0) { + + if (taosArrayGetSize(pCmd->pDataBlocks) > 0) { // merge according to vgid int code = tscMergeTableDataBlocks(stmt->pSql, pCmd->pDataBlocks); if (code != TSDB_CODE_SUCCESS) { return code; } - STableDataBlocks *pDataBlock = pCmd->pDataBlocks->pData[0]; + STableDataBlocks *pDataBlock = taosArrayGetP(pCmd->pDataBlocks, 0); code = tscCopyDataBlockToPayload(stmt->pSql, pDataBlock); if (code != TSDB_CODE_SUCCESS) { return code; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index f241e317cdc83b432a1a1d964dff80ea84b663bb..8b0244d5e62c5b7436d10747ee0f2c29648a30b1 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -4463,7 +4463,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } - SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) (pCmd->payload + tsRpcHeadSize); + SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload; pUpdateMsg->head.vgId = htonl(pTableMeta->vgroupInfo.vgId); pUpdateMsg->tid = htonl(pTableMeta->sid); pUpdateMsg->uid = htobe64(pTableMeta->uid); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index b2e0c0107ea89a800550839055bb0b338fbf8160..58865d3eacf24b6b94e2ec3bf76385ab37a93ead 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -176,18 +176,16 @@ int tscSendMsgToServer(SSqlObj *pSql) { char *pMsg = rpcMallocCont(pCmd->payloadLen); if (NULL == pMsg) { - tscError("%p msg:%s malloc fail", pSql, taosMsg[pSql->cmd.msgType]); + tscError("%p msg:%s malloc failed", pSql, taosMsg[pSql->cmd.msgType]); return TSDB_CODE_TSC_OUT_OF_MEMORY; } - if (pSql->cmd.command < TSDB_SQL_MGMT) { - memcpy(pMsg, pSql->cmd.payload + tsRpcHeadSize, pSql->cmd.payloadLen); - } else { + // set the mgmt ip list + if (pSql->cmd.command >= TSDB_SQL_MGMT) { pSql->ipList = tscMgmtIpSet; - memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); } - // tscTrace("%p msg:%s is sent to server", pSql, taosMsg[pSql->cmd.msgType]); + memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); SRpcMsg rpcMsg = { .msgType = pSql->cmd.msgType, @@ -222,8 +220,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcIpSet *pIpSet) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) { - tscTrace("%p sqlObj needs to be released or DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, - pObj, pObj->signature); + tscTrace("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p", + pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature); tscFreeSqlObj(pSql); rpcFreeCont(rpcMsg->pCont); @@ -449,18 +447,11 @@ void tscKillSTableQuery(SSqlObj *pSql) { } int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { - char *pMsg, *pStart; - - pStart = pSql->cmd.payload + tsRpcHeadSize; - pMsg = pStart; - - SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *)pMsg; + SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg *) pSql->cmd.payload; pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle); - pMsg += sizeof(pSql->res.qhandle); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); pRetrieveMsg->free = htons(pQueryInfo->type); - pMsg += sizeof(pQueryInfo->type); // todo valid the vgroupId at the client side STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); @@ -474,12 +465,12 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; pRetrieveMsg->header.vgId = htonl(pTableMeta->vgroupInfo.vgId); } - - pMsg += sizeof(SRetrieveTableMsg); - - pRetrieveMsg->header.contLen = htonl(pSql->cmd.payloadLen); - + + pSql->cmd.payloadLen = sizeof(SRetrieveTableMsg); pSql->cmd.msgType = TSDB_MSG_TYPE_FETCH; + + pRetrieveMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg)); + return TSDB_CODE_SUCCESS; } @@ -487,19 +478,18 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta; - char* pMsg = pSql->cmd.payload + tsRpcHeadSize; + char* pMsg = pSql->cmd.payload; // NOTE: shell message size should not include SMsgDesc int32_t size = pSql->cmd.payloadLen - sizeof(SMsgDesc); - + int32_t vgId = pTableMeta->vgroupInfo.vgId; + SMsgDesc* pMsgDesc = (SMsgDesc*) pMsg; - - pMsgDesc->numOfVnodes = htonl(1); //todo set the right number of vnodes + pMsgDesc->numOfVnodes = htonl(1); // always one vnode + pMsg += sizeof(SMsgDesc); - SSubmitMsg *pShellMsg = (SSubmitMsg *)pMsg; - int32_t vgId = pTableMeta->vgroupInfo.vgId; - + pShellMsg->header.vgId = htonl(vgId); pShellMsg->header.contLen = htonl(size); pShellMsg->length = pShellMsg->header.contLen; @@ -510,7 +500,8 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; tscSetDnodeIpList(pSql, &pTableMeta->vgroupInfo); - tscTrace("%p build submit msg, vgId:%d numOfVgroup:%d numberOfIP:%d", pSql, vgId, htonl(pMsgDesc->numOfVnodes), pSql->ipList.numOfIps); + tscTrace("%p build submit msg, vgId:%d numOfVgroup:%d numberOfIP:%d", pSql, vgId, htonl(pMsgDesc->numOfVnodes), + pSql->ipList.numOfIps); return TSDB_CODE_SUCCESS; } @@ -620,9 +611,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { return -1; } - char *pStart = pCmd->payload + tsRpcHeadSize; - - SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pStart; + SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload; int32_t numOfTags = taosArrayGetSize(pTableMetaInfo->tagColList); @@ -821,7 +810,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } // compressed ts block - pQueryMsg->tsOffset = htonl(pMsg - pStart); + pQueryMsg->tsOffset = htonl(pMsg - pCmd->payload); int32_t tsLen = 0; int32_t numOfBlocks = 0; @@ -844,7 +833,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->tsOrder = htonl(pQueryInfo->tsBuf->tsOrder); } - int32_t msgLen = pMsg - pStart; + int32_t msgLen = pMsg - pCmd->payload; tscTrace("%p msg built success,len:%d bytes", pSql, msgLen); pCmd->payloadLen = msgLen; @@ -1286,10 +1275,12 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) { SSqlCmd* pCmd = &pSql->cmd; pCmd->msgType = TSDB_MSG_TYPE_UPDATE_TAG_VAL; - SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) (pCmd->payload + tsRpcHeadSize); + SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload; pCmd->payloadLen = htonl(pUpdateMsg->head.contLen); + SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + tscSetDnodeIpList(pSql, &pTableMetaInfo->pTableMeta->vgroupInfo); return TSDB_CODE_SUCCESS; @@ -1552,150 +1543,6 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) { //} int tscBuildSTableVgroupMsg(SSqlObj *pSql, SSqlInfo *pInfo) { - -#if 0 - SSuperTableMetaMsg *pMetaMsg; - char * pMsg, *pStart; - int msgLen = 0; - int tableIndex = 0; - - SSqlCmd * pCmd = &pSql->cmd; - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - - STagCond *pTagCond = &pQueryInfo->tagCond; - - STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex); - - int32_t size = tscEstimateMetricMetaMsgSize(pCmd); - if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) { - tscError("%p failed to malloc for metric meter msg", pSql); - return -1; - } - - pStart = pCmd->payload + tsRpcHeadSize; - pMsg = pStart; - - SMgmtHead *pMgmt = (SMgmtHead *)pMsg; - tscGetDBInfoFromTableFullName(pTableMetaInfo->name, pMgmt->db); - - pMsg += sizeof(SMgmtHead); - - pMetaMsg = (SSuperTableMetaMsg *)pMsg; - pMetaMsg->numOfTables = htonl(pQueryInfo->numOfTables); - - pMsg += sizeof(SSuperTableMetaMsg); - - int32_t offset = pMsg - (char *)pMetaMsg; - pMetaMsg->join = htonl(offset); - - // todo refactor - pMetaMsg->joinCondLen = htonl((TSDB_TABLE_ID_LEN + sizeof(int16_t)) * 2); - - memcpy(pMsg, pTagCond->joinInfo.left.tableId, TSDB_TABLE_ID_LEN); - pMsg += TSDB_TABLE_ID_LEN; - - *(int16_t *)pMsg = pTagCond->joinInfo.left.tagCol; - pMsg += sizeof(int16_t); - - memcpy(pMsg, pTagCond->joinInfo.right.tableId, TSDB_TABLE_ID_LEN); - pMsg += TSDB_TABLE_ID_LEN; - - *(int16_t *)pMsg = pTagCond->joinInfo.right.tagCol; - pMsg += sizeof(int16_t); - - for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { - pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, i); - uint64_t uid = pTableMetaInfo->pTableMeta->uid; - - offset = pMsg - (char *)pMetaMsg; - pMetaMsg->metaElem[i] = htonl(offset); - - SSuperTableMetaElemMsg *pElem = (SSuperTableMetaElemMsg *)pMsg; - pMsg += sizeof(SSuperTableMetaElemMsg); - - // convert to unicode before sending to mnode for metric query - int32_t condLen = 0; - if (pTagCond->numOfTagCond > 0) { - SCond *pCond = tsGetSTableQueryCond(pTagCond, uid); - if (pCond != NULL && pCond->cond != NULL) { - condLen = strlen(pCond->cond) + 1; - - bool ret = taosMbsToUcs4(pCond->cond, condLen, pMsg, condLen * TSDB_NCHAR_SIZE); - if (!ret) { - tscError("%p mbs to ucs4 failed:%s", pSql, tsGetSTableQueryCond(pTagCond, uid)); - return 0; - } - } - } - - pElem->condLen = htonl(condLen); - - offset = pMsg - (char *)pMetaMsg; - pElem->cond = htonl(offset); - pMsg += condLen * TSDB_NCHAR_SIZE; - - pElem->rel = htons(pTagCond->relType); - if (pTagCond->tbnameCond.uid == uid) { - offset = pMsg - (char *)pMetaMsg; - - pElem->tableCond = htonl(offset); - - uint32_t len = 0; - if (pTagCond->tbnameCond.cond != NULL) { - len = strlen(pTagCond->tbnameCond.cond); - memcpy(pMsg, pTagCond->tbnameCond.cond, len); - } - - pElem->tableCondLen = htonl(len); - pMsg += len; - } - - SSqlGroupbyExpr *pGroupby = &pQueryInfo->groupbyExpr; - - if (pGroupby->tableIndex != i && pGroupby->numOfGroupCols > 0) { - pElem->orderType = 0; - pElem->orderIndex = 0; - pElem->numOfGroupCols = 0; - } else { - pElem->numOfGroupCols = htons(pGroupby->numOfGroupCols); - for (int32_t j = 0; j < pTableMetaInfo->numOfTags; ++j) { - pElem->tagCols[j] = htons(pTableMetaInfo->tagColumnIndex[j]); - } - - if (pGroupby->numOfGroupCols != 0) { - pElem->orderIndex = htons(pGroupby->orderIndex); - pElem->orderType = htons(pGroupby->orderType); - offset = pMsg - (char *)pMetaMsg; - - pElem->groupbyTagColumnList = htonl(offset); - for (int32_t j = 0; j < pQueryInfo->groupbyExpr.numOfGroupCols; ++j) { - SColIndex *pCol = &pQueryInfo->groupbyExpr.columnInfo[j]; - SColIndex *pDestCol = (SColIndex *)pMsg; - - pDestCol->colIdxInBuf = 0; - pDestCol->colIndex = htons(pCol->colIndex); - pDestCol->colId = htons(pDestCol->colId); - pDestCol->flag = htons(pDestCol->flag); - strncpy(pDestCol->name, pCol->name, tListLen(pCol->name)); - - pMsg += sizeof(SColIndex); - } - } - } - - strcpy(pElem->tableId, pTableMetaInfo->name); - pElem->numOfTags = htons(pTableMetaInfo->numOfTags); - - int16_t len = pMsg - (char *)pElem; - pElem->elemLen = htons(len); // redundant data for integrate check - } - - msgLen = pMsg - pStart; - pCmd->payloadLen = msgLen; - pCmd->msgType = TSDB_MSG_TYPE_CM_STABLE_VGROUP; - assert(msgLen + minMsgSize() <= size); -#endif - SSqlCmd *pCmd = &pSql->cmd; char* pMsg = pCmd->payload; @@ -1795,7 +1642,11 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) { for (int i = 0; i < numOfTotalCols; ++i) { pSchema->bytes = htons(pSchema->bytes); pSchema->colId = htons(pSchema->colId); - + + if (pSchema->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { + assert(i == 0); + } + assert(pSchema->type >= TSDB_DATA_TYPE_BOOL && pSchema->type <= TSDB_DATA_TYPE_NCHAR); pSchema++; } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 51b49304a8e72adb980ccbcf9368f68051f02745..8668c31cf4f27cd6d1feb085cf1e91e00a7f99b5 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -134,7 +134,7 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con } // tsRpcHeaderSize will be updated during RPC initialization, so only after it initialization, this value is valid - tsInsertHeadSize = tsRpcHeadSize + sizeof(SMsgDesc) + sizeof(SSubmitMsg); + tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg); return pSql; } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 87c75bd7e684ae942a4d7b49dd33efedf0c0d388..7a64e7f496819fd7d1138ea624e315e32ca76389 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1864,7 +1864,6 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) pParentObj->res.code = pSql->res.code; } - // it is not the initial sqlObj, free it taos_free_result(tres); tfree(pSupporter); @@ -1889,12 +1888,14 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; - SDataBlockList *pDataBlocks = pCmd->pDataBlocks; - pSql->pSubs = calloc(pDataBlocks->nSize, POINTER_BYTES); - pSql->numOfSubs = pDataBlocks->nSize; - assert(pDataBlocks->nSize > 0); - - tscTrace("%p submit data to %d vnode(s)", pSql, pDataBlocks->nSize); + size_t size = taosArrayGetSize(pCmd->pDataBlocks); + assert(size > 0); + + pSql->pSubs = calloc(size, POINTER_BYTES); + pSql->numOfSubs = size; + + tscTrace("%p submit data to %zu vnode(s)", pSql, size); + SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); pState->numOfTotal = pSql->numOfSubs; pState->numOfRemain = pSql->numOfSubs; @@ -1920,12 +1921,14 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) { pNew->fetchFp = pNew->fp; pSql->pSubs[numOfSub] = pNew; - pRes->code = tscCopyDataBlockToPayload(pNew, pDataBlocks->pData[numOfSub++]); + STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, numOfSub); + pRes->code = tscCopyDataBlockToPayload(pNew, pTableDataBlock); if (pRes->code == TSDB_CODE_SUCCESS) { tscTrace("%p sub:%p create subObj success. orderOfSub:%d", pSql, pNew, numOfSub); + numOfSub++; } else { - tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%s", pSql, numOfSub, - pDataBlocks->nSize, tstrerror(pRes->code)); + tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%zu, code:%s", pSql, numOfSub, + size, tstrerror(pRes->code)); goto _error; } } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index b56470e8da56d19becac72674b6ed852fa08dde5..324c042554c8b86daa77ab087ae91699af0d2b11 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -13,8 +13,6 @@ * along with this program. If not, see . */ -#include "tscUtil.h" -#include "hash.h" #include "os.h" #include "qast.h" #include "taosmsg.h" @@ -29,6 +27,8 @@ #include "ttimer.h" #include "ttokendef.h" #include "tscLog.h" +#include "tscUtil.h" +#include "hash.h" static void freeQueryInfoImpl(SQueryInfo* pQueryInfo); static void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache); @@ -428,48 +428,18 @@ SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint return param; } -SDataBlockList* tscCreateBlockArrayList() { - const int32_t DEFAULT_INITIAL_NUM_OF_BLOCK = 16; - - SDataBlockList* pDataBlockArrayList = calloc(1, sizeof(SDataBlockList)); - if (pDataBlockArrayList == NULL) { +void* tscDestroyBlockArrayList(SArray* pDataBlockList) { + if (pDataBlockList == NULL) { return NULL; } - - pDataBlockArrayList->nAlloc = DEFAULT_INITIAL_NUM_OF_BLOCK; - pDataBlockArrayList->pData = calloc(1, POINTER_BYTES * pDataBlockArrayList->nAlloc); - if (pDataBlockArrayList->pData == NULL) { - free(pDataBlockArrayList); - return NULL; - } - - return pDataBlockArrayList; -} - -void tscAppendDataBlock(SDataBlockList* pList, STableDataBlocks* pBlocks) { - if (pList->nSize >= pList->nAlloc) { - pList->nAlloc = (pList->nAlloc) << 1U; - pList->pData = realloc(pList->pData, POINTER_BYTES * (size_t)pList->nAlloc); - // reset allocated memory - memset(pList->pData + pList->nSize, 0, POINTER_BYTES * (pList->nAlloc - pList->nSize)); + size_t size = taosArrayGetSize(pDataBlockList); + for (int32_t i = 0; i < size; i++) { + void* d = taosArrayGetP(pDataBlockList, i); + tscDestroyDataBlock(d); } - pList->pData[pList->nSize++] = pBlocks; -} - -void* tscDestroyBlockArrayList(SDataBlockList* pList) { - if (pList == NULL) { - return NULL; - } - - for (int32_t i = 0; i < pList->nSize; i++) { - tscDestroyDataBlock(pList->pData[i]); - } - - tfree(pList->pData); - tfree(pList); - + taosArrayDestroy(pDataBlockList); return NULL; } @@ -484,7 +454,7 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { // set the correct table meta object, the table meta has been locked in pDataBlocks, so it must be in the cache if (pTableMetaInfo->pTableMeta != pDataBlock->pTableMeta) { - strcpy(pTableMetaInfo->name, pDataBlock->tableId); + tstrncpy(pTableMetaInfo->name, pDataBlock->tableId, sizeof(pTableMetaInfo->name)); taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), false); pTableMetaInfo->pTableMeta = taosCacheTransfer(tscCacheHandle, (void**)&pDataBlock->pTableMeta); @@ -497,31 +467,32 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { * the dataBlock only includes the RPC Header buffer and actual submit message body, space for digest needs * additional space. */ - int ret = tscAllocPayload(pCmd, pDataBlock->nAllocSize + 100); + int ret = tscAllocPayload(pCmd, pDataBlock->size + 100); if (TSDB_CODE_SUCCESS != ret) { return ret; } - memcpy(pCmd->payload, pDataBlock->pData, pDataBlock->nAllocSize); + assert(pDataBlock->size <= pDataBlock->nAllocSize); + memcpy(pCmd->payload, pDataBlock->pData, pDataBlock->size); /* * the payloadLen should be actual message body size * the old value of payloadLen is the allocated payload size */ - pCmd->payloadLen = pDataBlock->nAllocSize - tsRpcHeadSize; + pCmd->payloadLen = pDataBlock->size; - assert(pCmd->allocSize >= pCmd->payloadLen + tsRpcHeadSize + 100 && pCmd->payloadLen > 0); + assert(pCmd->allocSize >= pCmd->payloadLen + 100 && pCmd->payloadLen > 0); return TSDB_CODE_SUCCESS; } -void tscFreeUnusedDataBlocks(SDataBlockList* pList) { - /* release additional memory consumption */ - for (int32_t i = 0; i < pList->nSize; ++i) { - STableDataBlocks* pDataBlock = pList->pData[i]; - pDataBlock->pData = realloc(pDataBlock->pData, pDataBlock->size); - pDataBlock->nAllocSize = (uint32_t)pDataBlock->size; - } -} +//void tscFreeUnusedDataBlocks(SDataBlockList* pList) { +// /* release additional memory consumption */ +// for (int32_t i = 0; i < pList->nSize; ++i) { +// STableDataBlocks* pDataBlock = pList->pData[i]; +// pDataBlock->pData = realloc(pDataBlock->pData, pDataBlock->size); +// pDataBlock->nAllocSize = (uint32_t)pDataBlock->size; +// } +//} /** * create the in-memory buffer for each table to keep the submitted data block @@ -568,7 +539,7 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff return TSDB_CODE_SUCCESS; } -int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, int64_t id, int32_t size, +int32_t tscGetDataBlockFromList(void* pHashList, SArray* pDataBlockList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, const char* tableId, STableMeta* pTableMeta, STableDataBlocks** dataBlocks) { *dataBlocks = NULL; @@ -585,7 +556,7 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList, } taosHashPut(pHashList, (const char*)&id, sizeof(int64_t), (char*)dataBlocks, POINTER_BYTES); - tscAppendDataBlock(pDataBlockList, *dataBlocks); + taosArrayPush(pDataBlockList, dataBlocks); } return TSDB_CODE_SUCCESS; @@ -634,14 +605,15 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock) { return len; } -int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockList) { +int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) { SSqlCmd* pCmd = &pSql->cmd; void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false); - SDataBlockList* pVnodeDataBlockList = tscCreateBlockArrayList(); + SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES); - for (int32_t i = 0; i < pTableDataBlockList->nSize; ++i) { - STableDataBlocks* pOneTableBlock = pTableDataBlockList->pData[i]; + size_t total = taosArrayGetSize(pTableDataBlockList); + for (int32_t i = 0; i < total; ++i) { + STableDataBlocks* pOneTableBlock = taosArrayGetP(pTableDataBlockList, i); STableDataBlocks* dataBuf = NULL; @@ -679,10 +651,10 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData; tscSortRemoveDataBlockDupRows(pOneTableBlock); - char* e = (char*)pBlocks->data + pOneTableBlock->rowSize*(pBlocks->numOfRows-1); + char* ekey = (char*)pBlocks->data + pOneTableBlock->rowSize*(pBlocks->numOfRows-1); tscTrace("%p tableId:%s, sid:%d rows:%d sversion:%d skey:%" PRId64 ", ekey:%" PRId64, pSql, pOneTableBlock->tableId, - pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(e)); + pBlocks->tid, pBlocks->numOfRows, pBlocks->sversion, GET_INT64_VAL(pBlocks->data), GET_INT64_VAL(ekey)); int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + sizeof(int32_t) * 2); @@ -704,7 +676,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi // free the table data blocks; pCmd->pDataBlocks = pVnodeDataBlockList; - tscFreeUnusedDataBlocks(pCmd->pDataBlocks); +// tscFreeUnusedDataBlocks(pCmd->pDataBlocks); taosHashCleanup(pVnodeDataBlockHashList); return TSDB_CODE_SUCCESS; diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index d8e21c159b08130dc8a4d3e35c0a438a72997391..8cc5e8259075f525eb760a49f6377c5622d2d58e 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -244,7 +244,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); #define TSDB_DEFAULT_PKT_SIZE 65480 //same as RPC_MAX_UDP_SIZE -#define TSDB_PAYLOAD_SIZE (TSDB_DEFAULT_PKT_SIZE - 100) +#define TSDB_PAYLOAD_SIZE TSDB_DEFAULT_PKT_SIZE #define TSDB_DEFAULT_PAYLOAD_SIZE 2048 // default payload size #define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth #define TSDB_CQ_SQL_SIZE 1024 diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index ee18396b61e1e56985a829827f961c95931e7e42..5b4c54f306e77a7b4fe1ff48182555c9e2694189 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -105,8 +105,9 @@ typedef struct { void tsdbClearTableCfg(STableCfg *config); -void * tsdbGetTableTagVal(TSDB_REPO_T *repo, const STableId *id, int32_t colId, int16_t type, int16_t bytes); -char * tsdbGetTableName(TSDB_REPO_T *repo, const STableId *id); +void* tsdbGetTableTagVal(const void* pTable, int32_t colId, int16_t type, int16_t bytes); +char* tsdbGetTableName(void *pTable); +STableId tsdbGetTableId(void *pTable); STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg); int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg); @@ -176,18 +177,16 @@ typedef struct SQueryRowCond { TSKEY ts; } SQueryRowCond; -typedef void *TsdbPosT; - /** * Get the data block iterator, starting from position according to the query condition * * @param tsdb tsdb handle * @param pCond query condition, including time window, result set order, and basic required columns for each block - * @param groupInfo tableId list in the form of set, seperated into different groups according to group by condition + * @param tableqinfoGroupInfo tableId list in the form of set, seperated into different groups according to group by condition * @param qinfo query info handle from query processor * @return */ -TsdbQueryHandleT *tsdbQueryTables(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo, void *qinfo); +TsdbQueryHandleT *tsdbQueryTables(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableqinfoGroupInfo, void *qinfo); /** * Get the last row of the given query time window for all the tables in STableGroupInfo object. @@ -197,12 +196,17 @@ TsdbQueryHandleT *tsdbQueryTables(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STab * @param tsdb tsdb handle * @param pCond query condition, including time window, result set order, and basic required columns for each * block - * @param groupInfo tableId list. + * @param tableqinfoGroupInfo tableId list. * @return */ -TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo, void *qinfo); +TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableqinfoGroupInfo, void *qinfo); -SArray *tsdbGetQueriedTableIdList(TsdbQueryHandleT *pHandle); +/** + * get the queried table object list + * @param pHandle + * @return + */ +SArray* tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle); TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void *qinfo); @@ -247,37 +251,6 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT *pQueryHandle, SDataSta */ SArray *tsdbRetrieveDataBlock(TsdbQueryHandleT *pQueryHandle, SArray *pColumnIdList); -/** - * todo remove this function later - * @param pQueryHandle - * @param pIdList - * @return - */ -SArray *tsdbRetrieveDataRow(TsdbQueryHandleT *pQueryHandle, SArray *pIdList, SQueryRowCond *pCond); - -/** - * Get iterator for super tables, of which tags values satisfy the tag filter info - * - * NOTE: the tagFilterStr is an bin-expression for tag filter, such as ((tag_col = 5) and (tag_col2 > 7)) - * The filter string is sent from client directly. - * The build of the tags filter expression from string is done in the iterator generating function. - * - * @param pCond query condition - * @param pTagFilterStr tag filter info - * @return - */ -TsdbQueryHandleT *tsdbQueryFromTagConds(STsdbQueryCond *pCond, int16_t stableId, const char *pTagFilterStr); - -/** - * Get the qualified tables for (super) table query. - * Used to handle the super table projection queries, the last_row query, the group by on normal columns query, - * the interpolation query, and timestamp-comp query for join processing. - * - * @param pQueryHandle - * @return table sid list. the invoker is responsible for the release of this the sid list. - */ -SArray *tsdbGetTableList(TsdbQueryHandleT *pQueryHandle); - /** * Get the qualified table id for a super table according to the tag query expression. * @param stableid. super table sid @@ -287,6 +260,12 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T *tsdb, uint64_t uid, const char *pT int16_t tagNameRelType, const char *tbnameCond, STableGroupInfo *pGroupList, SColIndex *pColIndex, int32_t numOfCols); +/** + * destory the created table group list, which is generated by tag query + * @param pGroupList + */ +void tsdbDestoryTableGroup(STableGroupInfo *pGroupList); + /** * create the table group result including only one table, used to handle the normal table query * diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index c96477249621beb649aeca07b1ebf48ab56d5bf0..ee61f4f702fd1cf9860588cc8284abb26a857251 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -102,7 +102,7 @@ typedef struct STableQueryInfo { // todo merge with the STableQueryInfo struct int64_t tag; STimeWindow win; STSCursor cur; - STableId id; // for retrieve the page id list + void* pTable; // for retrieve the page id list SWindowResInfo windowResInfo; } STableQueryInfo; @@ -126,10 +126,10 @@ typedef struct SQueryCostInfo { uint64_t computTime; } SQueryCostInfo; -typedef struct SGroupItem { - STableId id; - STableQueryInfo* info; -} SGroupItem; +//typedef struct SGroupItem { +// void *pTable; +// STableQueryInfo *info; +//} SGroupItem; typedef struct SQuery { int16_t numOfCols; @@ -187,8 +187,8 @@ typedef struct SQInfo { void* tsdb; int32_t vgId; - STableGroupInfo tableIdGroupInfo; // table id list < only includes the STableId list> - STableGroupInfo groupInfo; // + STableGroupInfo tableGroupInfo; // table id list < only includes the STable list> + STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure SQueryRuntimeEnv runtimeEnv; int32_t groupIndex; int32_t offset; // offset in group result set of subgroup, todo refactor diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 9f7a9dace4ce7279704a26fd8e37e56c597c8724..5a865667d0d3ff65f572da0ef432feca493225d1 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -113,7 +113,7 @@ static void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); static bool hasMainOutput(SQuery *pQuery); static void buildTagQueryResult(SQInfo *pQInfo); -static int32_t setAdditionalInfo(SQInfo *pQInfo, STableId *pTableId, STableQueryInfo *pTableQueryInfo); +static int32_t setAdditionalInfo(SQInfo *pQInfo, void *pTable, STableQueryInfo *pTableQueryInfo); static int32_t flushFromResultBuf(SQInfo *pQInfo); bool doFilterData(SQuery *pQuery, int32_t elemPos) { @@ -1495,6 +1495,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { } static bool isQueryKilled(SQInfo *pQInfo) { + return false; return (pQInfo->code == TSDB_CODE_TSC_QUERY_CANCELLED); } @@ -1781,7 +1782,7 @@ static int32_t getInitialPageNum(SQInfo *pQInfo) { if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { num = 128; } else if (isIntervalQuery(pQuery)) { // time window query, allocate one page for each table - size_t s = pQInfo->groupInfo.numOfTables; + size_t s = pQInfo->tableqinfoGroupInfo.numOfTables; num = MAX(s, INITIAL_RESULT_ROWS_VALUE); } else { // for super table query, one page for each subset num = 1; // pQInfo->pSidSet->numOfSubSet; @@ -1815,18 +1816,18 @@ char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWi */ UNUSED_FUNC void vnodeDecMeterRefcnt(SQInfo *pQInfo) { if (pQInfo != NULL) { - // assert(taosHashGetSize(pQInfo->groupInfo) >= 1); + // assert(taosHashGetSize(pQInfo->tableqinfoGroupInfo) >= 1); } #if 0 - if (pQInfo == NULL || pQInfo->groupInfo.numOfTables == 1) { + if (pQInfo == NULL || pQInfo->tableqinfoGroupInfo.numOfTables == 1) { atomic_fetch_sub_32(&pQInfo->pObj->numOfQueries, 1); qTrace("QInfo:%p vid:%d sid:%d meterId:%s, query is over, numOfQueries:%d", pQInfo, pQInfo->pObj->vnode, pQInfo->pObj->sid, pQInfo->pObj->meterId, pQInfo->pObj->numOfQueries); } else { int32_t num = 0; - for (int32_t i = 0; i < pQInfo->groupInfo.numOfTables; ++i) { - SMeterObj *pMeter = getMeterObj(pQInfo->groupInfo, pQInfo->pSidSet->pTableIdList[i]->sid); + for (int32_t i = 0; i < pQInfo->tableqinfoGroupInfo.numOfTables; ++i) { + SMeterObj *pMeter = getMeterObj(pQInfo->tableqinfoGroupInfo, pQInfo->pSidSet->pTableIdList[i]->sid); atomic_fetch_sub_32(&(pMeter->numOfQueries), 1); if (pMeter->numOfQueries > 0) { @@ -1840,9 +1841,9 @@ UNUSED_FUNC void vnodeDecMeterRefcnt(SQInfo *pQInfo) { * in order to reduce log output, for all meters of which numOfQueries count are 0, * we do not output corresponding information */ - num = pQInfo->groupInfo.numOfTables - num; + num = pQInfo->tableqinfoGroupInfo.numOfTables - num; qTrace("QInfo:%p metric query is over, dec query ref for %d meters, numOfQueries on %d meters are 0", pQInfo, - pQInfo->groupInfo.numOfTables, num); + pQInfo->tableqinfoGroupInfo.numOfTables, num); } #endif } @@ -2188,16 +2189,16 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { * set tag value in SQLFunctionCtx * e.g.,tag information into input buffer */ -static void doSetTagValueInParam(void *tsdb, STableId* pTableId, int32_t tagColId, tVariant *tag, int16_t type, int16_t bytes) { +static void doSetTagValueInParam(void *tsdb, void* pTable, int32_t tagColId, tVariant *tag, int16_t type, int16_t bytes) { tVariantDestroy(tag); if (tagColId == TSDB_TBNAME_COLUMN_INDEX) { - char* val = tsdbGetTableName(tsdb, pTableId); + char* val = tsdbGetTableName(pTable); assert(val != NULL); tVariantCreateFromBinary(tag, varDataVal(val), varDataLen(val), TSDB_DATA_TYPE_BINARY); } else { - char* val = tsdbGetTableTagVal(tsdb, pTableId, tagColId, type, bytes); + char* val = tsdbGetTableTagVal(pTable, tagColId, type, bytes); if (val == NULL) { tag->nType = TSDB_DATA_TYPE_NULL; return; @@ -2221,7 +2222,7 @@ static void doSetTagValueInParam(void *tsdb, STableId* pTableId, int32_t tagColI } } -void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId* pTableId, void *tsdb) { +void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, void *tsdb) { SQuery *pQuery = pRuntimeEnv->pQuery; SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); @@ -2238,7 +2239,7 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId* pTableId, void *tsdb) { } } - doSetTagValueInParam(tsdb, pTableId, pExprInfo->base.arg->argValue.i64, &pRuntimeEnv->pCtx[0].tag, type, bytes); + doSetTagValueInParam(tsdb, pTable, pExprInfo->base.arg->argValue.i64, &pRuntimeEnv->pCtx[0].tag, type, bytes); } else { // set tag value, by which the results are aggregated. for (int32_t idx = 0; idx < pQuery->numOfOutput; ++idx) { @@ -2250,7 +2251,7 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId* pTableId, void *tsdb) { } // todo use tag column index to optimize performance - doSetTagValueInParam(tsdb, pTableId, pLocalExprInfo->base.colInfo.colId, &pRuntimeEnv->pCtx[idx].tag, + doSetTagValueInParam(tsdb, pTable, pLocalExprInfo->base.colInfo.colId, &pRuntimeEnv->pCtx[idx].tag, pLocalExprInfo->type, pLocalExprInfo->bytes); } @@ -2269,7 +2270,7 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, STableId* pTableId, void *tsdb) { } } - doSetTagValueInParam(tsdb, pTableId, pExprInfo->base.arg->argValue.i64, &pRuntimeEnv->pCtx[0].tag, type, bytes); + doSetTagValueInParam(tsdb, pTable, pExprInfo->base.arg->argValue.i64, &pRuntimeEnv->pCtx[0].tag, type, bytes); qTrace("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%"PRId64, pQInfo, pExprInfo->base.arg->argValue.i64, pRuntimeEnv->pCtx[0].tag.i64Key) } @@ -2474,10 +2475,10 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) { int64_t st = taosGetTimestampMs(); int32_t ret = TSDB_CODE_SUCCESS; - int32_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList); + int32_t numOfGroups = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList); while (pQInfo->groupIndex < numOfGroups) { - SArray *group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->groupIndex); + SArray *group = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, pQInfo->groupIndex); ret = mergeIntoGroupResultImpl(pQInfo, group); if (ret < 0) { // not enough disk space to save the data into disk return -1; @@ -2510,9 +2511,9 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { } // check if all results has been sent to client - int32_t numOfGroup = taosArrayGetSize(pQInfo->groupInfo.pGroupList); + int32_t numOfGroup = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList); if (pQInfo->numOfGroupResultPages == 0 && pQInfo->groupIndex == numOfGroup) { - pQInfo->tableIndex = pQInfo->groupInfo.numOfTables; // set query completed + pQInfo->tableIndex = pQInfo->tableqinfoGroupInfo.numOfTables; // set query completed return; } } @@ -2599,12 +2600,11 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) { // todo opt for the case of one table per group int32_t numOfTables = 0; for (int32_t i = 0; i < size; ++i) { - SGroupItem *item = taosArrayGet(pGroup, i); - STableQueryInfo *pInfo = item->info; + STableQueryInfo *item = taosArrayGetP(pGroup, i); - SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, pInfo->id.tid); - if (list.size > 0 && pInfo->windowResInfo.size > 0) { - pTableList[numOfTables] = pInfo; + SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, tsdbGetTableId(item->pTable).tid); + if (list.size > 0 && item->windowResInfo.size > 0) { + pTableList[numOfTables] = item; numOfTables += 1; } } @@ -2845,15 +2845,15 @@ void disableFuncInReverseScan(SQInfo *pQInfo) { } } - int32_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList); + int32_t numOfGroups = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList); for(int32_t i = 0; i < numOfGroups; ++i) { - SArray *group = taosArrayGetP(pQInfo->groupInfo.pGroupList, i); + SArray *group = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, i); size_t t = taosArrayGetSize(group); for (int32_t j = 0; j < t; ++j) { - SGroupItem *item = taosArrayGet(group, j); - updateTableQueryInfoForReverseScan(pQuery, item->info); + STableQueryInfo *pCheckInfo = taosArrayGetP(group, j); + updateTableQueryInfoForReverseScan(pQuery, pCheckInfo); } } } @@ -3088,7 +3088,8 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); } - pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableIdGroupInfo, pQInfo); + // add ref for table + pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo); setQueryStatus(pQuery, QUERY_NOT_COMPLETED); switchCtxOrder(pRuntimeEnv); @@ -3160,7 +3161,7 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) { tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); } - pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableIdGroupInfo, pQInfo); + pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo); pRuntimeEnv->windowResInfo.curIndex = qstatus.windowIndex; setQueryStatus(pQuery, QUERY_NOT_COMPLETED); @@ -3236,17 +3237,13 @@ static bool hasMainOutput(SQuery *pQuery) { return false; } -static STableQueryInfo *createTableQueryInfo( - SQueryRuntimeEnv *pRuntimeEnv, - STableId tableId, - STimeWindow win -) { +static STableQueryInfo *createTableQueryInfo( SQueryRuntimeEnv *pRuntimeEnv, void* pTable, STimeWindow win) { STableQueryInfo *pTableQueryInfo = calloc(1, sizeof(STableQueryInfo)); pTableQueryInfo->win = win; pTableQueryInfo->lastKey = win.skey; - pTableQueryInfo->id = tableId; + pTableQueryInfo->pTable = pTable; pTableQueryInfo->cur.vgroupIndex = -1; initWindowResInfo(&pTableQueryInfo->windowResInfo, pRuntimeEnv, 100, 100, TSDB_DATA_TYPE_INT); @@ -3275,7 +3272,7 @@ void setCurrentQueryTable(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *pTable * @param pRuntimeEnv * @param pDataBlockInfo */ -void setExecutionContext(SQInfo *pQInfo, STableId* pTableId, int32_t groupIndex, TSKEY nextKey) { +void setExecutionContext(SQInfo *pQInfo, void* pTable, int32_t groupIndex, TSKEY nextKey) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current; @@ -3302,7 +3299,7 @@ void setExecutionContext(SQInfo *pQInfo, STableId* pTableId, int32_t groupIndex, initCtxOutputBuf(pRuntimeEnv); pTableQueryInfo->lastKey = nextKey; - setAdditionalInfo(pQInfo, pTableId, pTableQueryInfo); + setAdditionalInfo(pQInfo, pTableQueryInfo->pTable, pTableQueryInfo); } void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult) { @@ -3362,11 +3359,10 @@ void setWindowResOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult * } } -int32_t setAdditionalInfo(SQInfo *pQInfo, STableId* pTableId, STableQueryInfo *pTableQueryInfo) { +int32_t setAdditionalInfo(SQInfo *pQInfo, void* pTable, STableQueryInfo *pTableQueryInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - //assert(pTableQueryInfo->lastKey >= TSKEY_INITIAL_VAL); - setTagVal(pRuntimeEnv, pTableId, pQInfo->tsdb); + setTagVal(pRuntimeEnv, pTable, pQInfo->tsdb); // both the master and supplement scan needs to set the correct ts comp start position if (pRuntimeEnv->pTSBuf != NULL) { @@ -3469,7 +3465,7 @@ static int32_t getNumOfSubset(SQInfo *pQInfo) { if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery))) { totalSubset = numOfClosedTimeWindow(&pQInfo->runtimeEnv.windowResInfo); } else { - totalSubset = taosArrayGetSize(pQInfo->groupInfo.pGroupList); + totalSubset = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList); } return totalSubset; @@ -3669,7 +3665,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data // all data returned, set query over if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { if (pQInfo->runtimeEnv.stableQuery) { - if (pQInfo->tableIndex >= pQInfo->groupInfo.numOfTables) { + if (pQInfo->tableIndex >= pQInfo->tableqinfoGroupInfo.numOfTables) { setQueryStatus(pQuery, QUERY_OVER); } } else { @@ -3979,23 +3975,23 @@ static void setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) { }; if (!isSTableQuery - && (pQInfo->groupInfo.numOfTables == 1) + && (pQInfo->tableqinfoGroupInfo.numOfTables == 1) && (cond.order == TSDB_ORDER_ASC) && (!isIntervalQuery(pQuery)) && (!isGroupbyNormalCol(pQuery->pGroupbyExpr)) && (!isFixedOutputQuery(pQuery)) ) { - SArray* pa = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0); - SGroupItem* pItem = taosArrayGet(pa, 0); - cond.twindow = pItem->info->win; + SArray* pa = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, 0); + STableQueryInfo* pCheckInfo = taosArrayGetP(pa, 0); + cond.twindow = pCheckInfo->win; } if (isFirstLastRowQuery(pQuery)) { - pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableIdGroupInfo, pQInfo); + pRuntimeEnv->pQueryHandle = tsdbQueryLastRow(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo); } else if (isPointInterpoQuery(pQuery)) { - pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(tsdb, &cond, &pQInfo->tableIdGroupInfo, pQInfo); + pRuntimeEnv->pQueryHandle = tsdbQueryRowsInExternalWindow(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo); } else { - pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableIdGroupInfo, pQInfo); + pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo); } } @@ -4132,18 +4128,18 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { STableQueryInfo *pTableQueryInfo = NULL; // todo opt performance using hash table - size_t numOfGroup = taosArrayGetSize(pQInfo->groupInfo.pGroupList); + size_t numOfGroup = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList); for (int32_t i = 0; i < numOfGroup; ++i) { - SArray *group = taosArrayGetP(pQInfo->groupInfo.pGroupList, i); + SArray *group = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, i); size_t num = taosArrayGetSize(group); for (int32_t j = 0; j < num; ++j) { - SGroupItem *item = taosArrayGet(group, j); - STableQueryInfo *pInfo = item->info; + STableQueryInfo *p = taosArrayGetP(group, j); - if (pInfo->id.tid == blockInfo.tid) { - assert(pInfo->id.uid == blockInfo.uid); - pTableQueryInfo = item->info; + STableId id = tsdbGetTableId(p->pTable); + if (id.tid == blockInfo.tid) { + assert(id.uid == blockInfo.uid); + pTableQueryInfo = p; break; } @@ -4163,11 +4159,11 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { if (!isGroupbyNormalCol(pQuery->pGroupbyExpr)) { if (!isIntervalQuery(pQuery)) { int32_t step = QUERY_IS_ASC_QUERY(pQuery)? 1:-1; - setExecutionContext(pQInfo, &pTableQueryInfo->id, pTableQueryInfo->groupIndex, blockInfo.window.ekey + step); + setExecutionContext(pQInfo, pTableQueryInfo->pTable, pTableQueryInfo->groupIndex, blockInfo.window.ekey + step); } else { // interval query TSKEY nextKey = blockInfo.window.skey; setIntervalQueryRange(pQInfo, nextKey); - /*int32_t ret = */setAdditionalInfo(pQInfo, &pTableQueryInfo->id, pTableQueryInfo); + /*int32_t ret = */setAdditionalInfo(pQInfo, pTableQueryInfo->pTable, pTableQueryInfo); } } @@ -4187,16 +4183,17 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { SQuery * pQuery = pRuntimeEnv->pQuery; setQueryStatus(pQuery, QUERY_NOT_COMPLETED); - SArray *group = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0); - SGroupItem* item = taosArrayGet(group, index); + SArray *group = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, 0); + STableQueryInfo* pCheckInfo = taosArrayGetP(group, index); - setTagVal(pRuntimeEnv, &item->id, pQInfo->tsdb); + setTagVal(pRuntimeEnv, pCheckInfo->pTable, pQInfo->tsdb); + STableId id = tsdbGetTableId(pCheckInfo->pTable); qTrace("QInfo:%p query on (%d): uid:%" PRIu64 ", tid:%d, qrange:%" PRId64 "-%" PRId64, pQInfo, index, - item->id.uid, item->id.tid, item->info->lastKey, item->info->win.ekey); + id.uid, id.tid, pCheckInfo->lastKey, pCheckInfo->win.ekey); STsdbQueryCond cond = { - .twindow = {item->info->lastKey, item->info->win.ekey}, + .twindow = {pCheckInfo->lastKey, pCheckInfo->win.ekey}, .order = pQuery->order.order, .colList = pQuery->colList, .numOfCols = pQuery->numOfCols, @@ -4204,9 +4201,9 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { // todo refactor SArray *g1 = taosArrayInit(1, POINTER_BYTES); - SArray *tx = taosArrayInit(1, sizeof(STableId)); + SArray *tx = taosArrayInit(1, POINTER_BYTES); - taosArrayPush(tx, &item->info->id); + taosArrayPush(tx, &pCheckInfo->pTable); taosArrayPush(g1, &tx); STableGroupInfo gp = {.numOfTables = 1, .pGroupList = g1}; @@ -4250,14 +4247,14 @@ static void sequentialTableProcess(SQInfo *pQInfo) { SQuery * pQuery = pRuntimeEnv->pQuery; setQueryStatus(pQuery, QUERY_COMPLETED); - size_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList); + size_t numOfGroups = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList); if (isPointInterpoQuery(pQuery) || isFirstLastRowQuery(pQuery)) { resetCtxOutputBuf(pRuntimeEnv); assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0); while (pQInfo->groupIndex < numOfGroups) { - SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->groupIndex); + SArray* group = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, pQInfo->groupIndex); qTrace("QInfo:%p last_row query on group:%d, total group:%zu, current group:%p", pQInfo, pQInfo->groupIndex, numOfGroups, group); @@ -4289,10 +4286,10 @@ static void sequentialTableProcess(SQInfo *pQInfo) { initCtxOutputBuf(pRuntimeEnv); - SArray* s = tsdbGetQueriedTableIdList(pRuntimeEnv->pQueryHandle); + SArray* s = tsdbGetQueriedTableList(pRuntimeEnv->pQueryHandle); assert(taosArrayGetSize(s) >= 1); - setTagVal(pRuntimeEnv, (STableId*) taosArrayGet(s, 0), pQInfo->tsdb); + setTagVal(pRuntimeEnv, taosArrayGetP(s, 0), pQInfo->tsdb); if (isFirstLastRowQuery(pQuery)) { assert(taosArrayGetSize(s) == 1); } @@ -4300,7 +4297,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { taosArrayDestroy(s); // here we simply set the first table as current table - pQuery->current = ((SGroupItem*) taosArrayGet(group, 0))->info; + pQuery->current = (STableQueryInfo*) taosArrayGet(group, 0); scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey); int64_t numOfRes = getNumOfResult(pRuntimeEnv); @@ -4322,7 +4319,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } } else if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group-by on normal columns query while (pQInfo->groupIndex < numOfGroups) { - SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->groupIndex); + SArray* group = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, pQInfo->groupIndex); qTrace("QInfo:%p group by normal columns group:%d, total group:%zu", pQInfo, pQInfo->groupIndex, numOfGroups); @@ -4347,10 +4344,10 @@ static void sequentialTableProcess(SQInfo *pQInfo) { pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp, pQInfo); - SArray* s = tsdbGetQueriedTableIdList(pRuntimeEnv->pQueryHandle); + SArray* s = tsdbGetQueriedTableList(pRuntimeEnv->pQueryHandle); assert(taosArrayGetSize(s) >= 1); - setTagVal(pRuntimeEnv, (STableId*) taosArrayGet(s, 0), pQInfo->tsdb); + setTagVal(pRuntimeEnv, taosArrayGetP(s, 0), pQInfo->tsdb); // here we simply set the first table as current table scanMultiTableDataBlocks(pQInfo); @@ -4406,25 +4403,23 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } // all data have returned already - if (pQInfo->tableIndex >= pQInfo->groupInfo.numOfTables) { + if (pQInfo->tableIndex >= pQInfo->tableqinfoGroupInfo.numOfTables) { return; } resetCtxOutputBuf(pRuntimeEnv); resetTimeWindowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo); - SArray *group = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0); - assert(taosArrayGetSize(group) == pQInfo->groupInfo.numOfTables && - 1 == taosArrayGetSize(pQInfo->groupInfo.pGroupList)); + SArray *group = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, 0); + assert(taosArrayGetSize(group) == pQInfo->tableqinfoGroupInfo.numOfTables && + 1 == taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList)); - while (pQInfo->tableIndex < pQInfo->groupInfo.numOfTables) { + while (pQInfo->tableIndex < pQInfo->tableqinfoGroupInfo.numOfTables) { if (isQueryKilled(pQInfo)) { return; } - SGroupItem *item = taosArrayGet(group, pQInfo->tableIndex); - pQuery->current = item->info; - + pQuery->current = taosArrayGetP(group, pQInfo->tableIndex); if (!multiTableMultioutputHelper(pQInfo, pQInfo->tableIndex)) { pQInfo->tableIndex++; continue; @@ -4444,7 +4439,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { // the limitation of output result is reached, set the query completed if (limitResults(pRuntimeEnv)) { - pQInfo->tableIndex = pQInfo->groupInfo.numOfTables; + pQInfo->tableIndex = pQInfo->tableqinfoGroupInfo.numOfTables; break; } @@ -4461,8 +4456,10 @@ static void sequentialTableProcess(SQInfo *pQInfo) { pQInfo->tableIndex++; STableIdInfo tidInfo; - tidInfo.uid = item->id.uid; - tidInfo.tid = item->id.tid; + STableId id = tsdbGetTableId(pQuery->current->pTable); + + tidInfo.uid = id.uid; + tidInfo.tid = id.tid; tidInfo.key = pQuery->current->lastKey; taosArrayPush(pQInfo->arrTableIdInfo, &tidInfo); @@ -4485,7 +4482,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } } - if (pQInfo->tableIndex >= pQInfo->groupInfo.numOfTables) { + if (pQInfo->tableIndex >= pQInfo->tableqinfoGroupInfo.numOfTables) { setQueryStatus(pQuery, QUERY_COMPLETED); } } @@ -4511,7 +4508,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { qTrace( "QInfo %p numOfTables:%"PRIu64", index:%d, numOfGroups:%zu, %"PRId64" points returned, total:%"PRId64", offset:%" PRId64, - pQInfo, pQInfo->groupInfo.numOfTables, pQInfo->tableIndex, numOfGroups, pQuery->rec.rows, pQuery->rec.total, + pQInfo, pQInfo->tableqinfoGroupInfo.numOfTables, pQInfo->tableIndex, numOfGroups, pQuery->rec.rows, pQuery->rec.total, pQuery->limit.offset); } @@ -4539,7 +4536,7 @@ static void doSaveContext(SQInfo *pQInfo) { tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); } - pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableIdGroupInfo, pQInfo); + pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo); setQueryStatus(pQuery, QUERY_NOT_COMPLETED); switchCtxOrder(pRuntimeEnv); @@ -4564,14 +4561,14 @@ static void doCloseAllTimeWindowAfterScan(SQInfo *pQInfo) { SQuery *pQuery = pQInfo->runtimeEnv.pQuery; if (isIntervalQuery(pQuery)) { - size_t numOfGroup = taosArrayGetSize(pQInfo->groupInfo.pGroupList); + size_t numOfGroup = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList); for (int32_t i = 0; i < numOfGroup; ++i) { - SArray *group = taosArrayGetP(pQInfo->groupInfo.pGroupList, i); + SArray *group = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, i); size_t num = taosArrayGetSize(group); for (int32_t j = 0; j < num; ++j) { - SGroupItem* item = taosArrayGet(group, j); - closeAllTimeWindow(&item->info->windowResInfo); + STableQueryInfo* item = taosArrayGetP(group, j); + closeAllTimeWindow(&item->windowResInfo); } } } else { // close results for group result @@ -4732,8 +4729,10 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) pQuery->current->lastKey, pQuery->window.ekey); } else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { STableIdInfo tidInfo; - tidInfo.uid = pQuery->current->id.uid; - tidInfo.tid = pQuery->current->id.tid; + STableId id = tsdbGetTableId(pQuery->current); + + tidInfo.uid = id.uid; + tidInfo.tid = id.tid; tidInfo.key = pQuery->current->lastKey; taosArrayPush(pQInfo->arrTableIdInfo, &tidInfo); } @@ -4883,23 +4882,23 @@ static void tableQueryImpl(SQInfo *pQInfo) { pQuery->rec.rows = 0; int64_t st = taosGetTimestampUs(); - assert(pQInfo->groupInfo.numOfTables == 1); - SArray* g = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0); - SGroupItem* item = taosArrayGet(g, 0); + assert(pQInfo->tableqinfoGroupInfo.numOfTables == 1); + SArray* g = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, 0); + STableQueryInfo* item = taosArrayGetP(g, 0); // group by normal column, sliding window query, interval query are handled by interval query processor if (isIntervalQuery(pQuery) || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // interval (down sampling operation) - tableIntervalProcess(pQInfo, item->info); + tableIntervalProcess(pQInfo, item); } else if (isFixedOutputQuery(pQuery)) { - tableFixedOutputProcess(pQInfo, item->info); + tableFixedOutputProcess(pQInfo, item); } else { // diff/add/multiply/subtract/division assert(pQuery->checkBuffer == 1); - tableMultiOutputProcess(pQInfo, item->info); + tableMultiOutputProcess(pQInfo, item); } // record the total elapsed time pRuntimeEnv->summary.elapsedTime += (taosGetTimestampUs() - st); - assert(pQInfo->groupInfo.numOfTables == 1); + assert(pQInfo->tableqinfoGroupInfo.numOfTables == 1); /* check if query is killed or not */ if (isQueryKilled(pQInfo)) { @@ -4931,7 +4930,7 @@ static void stableQueryImpl(SQInfo *pQInfo) { pQInfo->runtimeEnv.summary.elapsedTime += (taosGetTimestampUs() - st); if (pQuery->rec.rows == 0) { - qTrace("QInfo:%p over, %zu tables queried, %"PRId64" rows are returned", pQInfo, pQInfo->groupInfo.numOfTables, pQuery->rec.total); + qTrace("QInfo:%p over, %zu tables queried, %"PRId64" rows are returned", pQInfo, pQInfo->tableqinfoGroupInfo.numOfTables, pQuery->rec.total); } } @@ -5487,7 +5486,7 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) { } -static int compareTableIdInfo( const void* a, const void* b ) { +static int compareTableIdInfo(const void* a, const void* b) { const STableIdInfo* x = (const STableIdInfo*)a; const STableIdInfo* y = (const STableIdInfo*)b; if (x->uid > y->uid) return 1; @@ -5496,7 +5495,7 @@ static int compareTableIdInfo( const void* a, const void* b ) { } static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs, - STableGroupInfo *groupInfo, SColumnInfo* pTagCols) { + STableGroupInfo *tableqinfoGroupInfo, SColumnInfo* pTagCols) { SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo)); if (pQInfo == NULL) { return NULL; @@ -5582,39 +5581,41 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, // to make sure third party won't overwrite this structure pQInfo->signature = pQInfo; - pQInfo->tableIdGroupInfo = *groupInfo; - size_t numOfGroups = taosArrayGetSize(groupInfo->pGroupList); + pQInfo->tableGroupInfo = *tableqinfoGroupInfo; + size_t numOfGroups = taosArrayGetSize(tableqinfoGroupInfo->pGroupList); - pQInfo->groupInfo.pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES); - pQInfo->groupInfo.numOfTables = groupInfo->numOfTables; + pQInfo->tableqinfoGroupInfo.pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES); + pQInfo->tableqinfoGroupInfo.numOfTables = tableqinfoGroupInfo->numOfTables; int tableIndex = 0; STimeWindow window = pQueryMsg->window; taosArraySort(pTableIdList, compareTableIdInfo); for(int32_t i = 0; i < numOfGroups; ++i) { - SArray* pa = taosArrayGetP(groupInfo->pGroupList, i); + SArray* pa = taosArrayGetP(tableqinfoGroupInfo->pGroupList, i); size_t s = taosArrayGetSize(pa); - SArray* p1 = taosArrayInit(s, sizeof(SGroupItem)); + SArray* p1 = taosArrayInit(s, POINTER_BYTES); for(int32_t j = 0; j < s; ++j) { - STableId id = *(STableId*) taosArrayGet(pa, j); - SGroupItem item = { .id = id }; + void* pTable = taosArrayGetP(pa, j); + // NOTE: compare STableIdInfo with STableId - STableIdInfo* pTableId = taosArraySearch( pTableIdList, &id, compareTableIdInfo); + STableId id = tsdbGetTableId(pTable); + STableIdInfo* pTableId = taosArraySearch(pTableIdList, &id, compareTableIdInfo); if (pTableId != NULL ) { window.skey = pTableId->key; } else { window.skey = pQueryMsg->window.skey; } - item.info = createTableQueryInfo(&pQInfo->runtimeEnv, item.id, window); - item.info->groupIndex = i; - item.info->tableIndex = tableIndex++; + + STableQueryInfo* item = createTableQueryInfo(&pQInfo->runtimeEnv, pTable, window); + item->groupIndex = i; + item->tableIndex = tableIndex++; taosArrayPush(p1, &item); } - taosArrayPush(pQInfo->groupInfo.pGroupList, &p1); + taosArrayPush(pQInfo->tableqinfoGroupInfo.pGroupList, &p1); } pQInfo->arrTableIdInfo = taosArrayInit(tableIndex, sizeof(STableIdInfo)); @@ -5693,7 +5694,7 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ return TSDB_CODE_SUCCESS; } - if (pQInfo->groupInfo.numOfTables == 0) { + if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) { qTrace("QInfo:%p no table qualified for tag filter, abort query", pQInfo); setQueryStatus(pQuery, QUERY_COMPLETED); @@ -5754,29 +5755,24 @@ static void freeQInfo(SQInfo *pQInfo) { } // todo refactor, extract method to destroytableDataInfo - int32_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList); + int32_t numOfGroups = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList); for (int32_t i = 0; i < numOfGroups; ++i) { - SArray *p = taosArrayGetP(pQInfo->groupInfo.pGroupList, i); + SArray *p = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, i); size_t num = taosArrayGetSize(p); for(int32_t j = 0; j < num; ++j) { - SGroupItem* item = taosArrayGet(p, j); - if (item->info != NULL) { - destroyTableQueryInfo(item->info, pQuery->numOfOutput); + STableQueryInfo* item = taosArrayGetP(p, j); + if (item != NULL) { + destroyTableQueryInfo(item, pQuery->numOfOutput); } } taosArrayDestroy(p); } - taosArrayDestroy(pQInfo->groupInfo.pGroupList); + taosArrayDestroy(pQInfo->tableqinfoGroupInfo.pGroupList); - for(int32_t i = 0; i < numOfGroups; ++i) { - SArray* p = taosArrayGetP(pQInfo->tableIdGroupInfo.pGroupList, i); - taosArrayDestroy(p); - } - - taosArrayDestroy(pQInfo->tableIdGroupInfo.pGroupList); + tsdbDestoryTableGroup(&pQInfo->tableGroupInfo); taosArrayDestroy(pQInfo->arrTableIdInfo); if (pQuery->pGroupbyExpr != NULL) { @@ -5861,8 +5857,6 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { } return TSDB_CODE_SUCCESS; - - // todo if interpolation exists, the result may be dump to client by several rounds } int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) { @@ -5904,13 +5898,13 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi } bool isSTableQuery = false; - STableGroupInfo groupInfo = {0}; + STableGroupInfo tableqinfoGroupInfo = {0}; if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_TABLE_QUERY)) { STableIdInfo *id = taosArrayGet(pTableIdList, 0); qTrace("qmsg:%p query normal table, uid:%"PRId64", tid:%d", pQueryMsg, id->uid, id->tid); - if ((code = tsdbGetOneTableGroup(tsdb, id->uid, &groupInfo)) != TSDB_CODE_SUCCESS) { + if ((code = tsdbGetOneTableGroup(tsdb, id->uid, &tableqinfoGroupInfo)) != TSDB_CODE_SUCCESS) { goto _over; } } else if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY|TSDB_QUERY_TYPE_STABLE_QUERY)) { @@ -5927,25 +5921,25 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi numOfGroupByCols = 0; } - code = tsdbQuerySTableByTagCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, pQueryMsg->tagNameRelType, tbnameCond, &groupInfo, pGroupColIndex, + code = tsdbQuerySTableByTagCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, pQueryMsg->tagNameRelType, tbnameCond, &tableqinfoGroupInfo, pGroupColIndex, numOfGroupByCols); if (code != TSDB_CODE_SUCCESS) { goto _over; } } else { - groupInfo.pGroupList = taosArrayInit(1, POINTER_BYTES); - groupInfo.numOfTables = taosArrayGetSize(pTableIdList); + tableqinfoGroupInfo.pGroupList = taosArrayInit(1, POINTER_BYTES); + tableqinfoGroupInfo.numOfTables = taosArrayGetSize(pTableIdList); SArray* p = taosArrayClone(pTableIdList); - taosArrayPush(groupInfo.pGroupList, &p); + taosArrayPush(tableqinfoGroupInfo.pGroupList, &p); - qTrace("qmsg:%p query on %zu tables in one group from client", pQueryMsg, groupInfo.numOfTables); + qTrace("qmsg:%p query on %zu tables in one group from client", pQueryMsg, tableqinfoGroupInfo.numOfTables); } } else { assert(0); } - (*pQInfo) = createQInfoImpl(pQueryMsg, pTableIdList, pGroupbyExpr, pExprs, &groupInfo, pTagColumnInfo); + (*pQInfo) = createQInfoImpl(pQueryMsg, pTableIdList, pGroupbyExpr, pExprs, &tableqinfoGroupInfo, pTagColumnInfo); if ((*pQInfo) == NULL) { code = TSDB_CODE_QRY_OUT_OF_MEMORY; goto _over; @@ -6134,17 +6128,17 @@ static void buildTagQueryResult(SQInfo* pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; - size_t numOfGroup = taosArrayGetSize(pQInfo->groupInfo.pGroupList); + size_t numOfGroup = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList); assert(numOfGroup == 0 || numOfGroup == 1); if (numOfGroup == 0) { return; } - SArray* pa = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0); + SArray* pa = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, 0); size_t num = taosArrayGetSize(pa); - assert(num == pQInfo->groupInfo.numOfTables); + assert(num == pQInfo->tableqinfoGroupInfo.numOfTables); int32_t count = 0; int32_t functionId = pQuery->pSelectExpr[0].base.functionId; @@ -6168,26 +6162,28 @@ static void buildTagQueryResult(SQInfo* pQInfo) { while(pQInfo->tableIndex < num && count < pQuery->rec.capacity) { int32_t i = pQInfo->tableIndex++; - SGroupItem *item = taosArrayGet(pa, i); + STableQueryInfo *item = taosArrayGet(pa, i); char *output = pQuery->sdata[0]->data + i * rsize; varDataSetLen(output, rsize - VARSTR_HEADER_SIZE); output = varDataVal(output); - *(int64_t *)output = item->id.uid; // memory align problem, todo serialize - output += sizeof(item->id.uid); + STableId id = tsdbGetTableId(item->pTable); + + *(int64_t *)output = id.uid; // memory align problem, todo serialize + output += sizeof(id.uid); - *(int32_t *)output = item->id.tid; - output += sizeof(item->id.tid); + *(int32_t *)output = id.tid; + output += sizeof(id.tid); *(int32_t *)output = pQInfo->vgId; output += sizeof(pQInfo->vgId); if (pExprInfo->base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { - char *data = tsdbGetTableName(pQInfo->tsdb, &item->id); + char *data = tsdbGetTableName(item->pTable); memcpy(output, data, varDataTLen(data)); } else { - char *val = tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo->base.colInfo.colId, type, bytes); + char *val = tsdbGetTableTagVal(item->pTable, pExprInfo->base.colInfo.colId, type, bytes); // todo refactor if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { @@ -6223,18 +6219,18 @@ static void buildTagQueryResult(SQInfo* pQInfo) { int32_t i = pQInfo->tableIndex++; SExprInfo* pExprInfo = pQuery->pSelectExpr; - SGroupItem* item = taosArrayGet(pa, i); + STableQueryInfo* item = taosArrayGetP(pa, i); for(int32_t j = 0; j < pQuery->numOfOutput; ++j) { if (pExprInfo[j].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { - char* data = tsdbGetTableName(pQInfo->tsdb, &item->id); + char* data = tsdbGetTableName(item->pTable); char* dst = pQuery->sdata[j]->data + count * tbnameSchema.bytes; memcpy(dst, data, varDataTLen(data)); } else {// todo refactor int16_t type = pExprInfo[j].type; int16_t bytes = pExprInfo[j].bytes; - char* data = tsdbGetTableTagVal(pQInfo->tsdb, &item->id, pExprInfo[j].base.colInfo.colId, type, bytes); + char* data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.colInfo.colId, type, bytes); char* dst = pQuery->sdata[j]->data + count * pExprInfo[j].bytes; if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { diff --git a/src/query/src/qresultBuf.c b/src/query/src/qresultBuf.c index 19353c9c5a6ea88fa536e9bb4e38c89c3fefd4f7..7841c0bbbe742a555e726d54dbe87e7cb137118a 100644 --- a/src/query/src/qresultBuf.c +++ b/src/query/src/qresultBuf.c @@ -72,7 +72,7 @@ static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t numOf if (ret != 0) { // dError("QInfo:%p failed to create intermediate result output file:%s. %s", pQInfo, pSupporter->extBufFile, // strerror(errno)); - return -TSDB_CODE_QRY_NO_DISKSPACE; + return TSDB_CODE_QRY_NO_DISKSPACE; } pResultBuf->totalBufSize = pResultBuf->numOfPages * DEFAULT_INTERN_BUF_PAGE_SIZE; @@ -80,7 +80,7 @@ static int32_t extendDiskFileSize(SDiskbasedResultBuf* pResultBuf, int32_t numOf if (pResultBuf->pBuf == MAP_FAILED) { // dError("QInfo:%p failed to map temp file: %s. %s", pQInfo, pSupporter->extBufFile, strerror(errno)); - return -TSDB_CODE_QRY_OUT_OF_MEMORY; + return TSDB_CODE_QRY_OUT_OF_MEMORY; } return TSDB_CODE_SUCCESS; diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index beb8f33052d66c46727208e876ca75293acdc565..0e9d793739abca90842554319537f59fe07b2dcd 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -157,18 +157,16 @@ int tsdbDropTable(TSDB_REPO_T *repo, STableId tableId) { return 0; } -void *tsdbGetTableTagVal(TSDB_REPO_T *repo, const STableId *id, int32_t colId, int16_t type, int16_t bytes) { +void *tsdbGetTableTagVal(const void* pTable, int32_t colId, int16_t type, int16_t bytes) { // TODO: this function should be changed also - STsdbMeta *pMeta = tsdbGetMeta(repo); - STable * pTable = tsdbGetTableByUid(pMeta, id->uid); - STSchema *pSchema = tsdbGetTableTagSchema(pTable); + STSchema *pSchema = tsdbGetTableTagSchema((STable*) pTable); STColumn *pCol = tdGetColOfID(pSchema, colId); if (pCol == NULL) { return NULL; // No matched tag volumn } - char *val = tdGetKVRowValOfCol(pTable->tagVal, colId); + char *val = tdGetKVRowValOfCol(((STable*)pTable)->tagVal, colId); assert(type == pCol->type && bytes == pCol->bytes); if (val != NULL && IS_VAR_DATA_TYPE(type)) { @@ -178,20 +176,21 @@ void *tsdbGetTableTagVal(TSDB_REPO_T *repo, const STableId *id, int32_t colId, i return val; } -char *tsdbGetTableName(TSDB_REPO_T *repo, const STableId *id) { +char *tsdbGetTableName(void* pTable) { // TODO: need to change as thread-safe - STsdbRepo *pRepo = (STsdbRepo *)repo; - STsdbMeta *pMeta = pRepo->tsdbMeta; - - STable *pTable = tsdbGetTableByUid(pMeta, id->uid); if (pTable == NULL) { return NULL; } else { - return (char *)pTable->name; + return (char*) (((STable *)pTable)->name); } } +STableId tsdbGetTableId(void *pTable) { + assert(pTable); + return ((STable*)pTable)->tableId; +} + STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) { if (pMsg == NULL) return NULL; diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index d1981943e6359334cd53c0d17176cc6785e3a617..8812dac45d055155e9699b7a82d96f01fb589a51 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -184,15 +184,17 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab assert(gsize > 0); for (int32_t j = 0; j < gsize; ++j) { - STableId* id = (STableId*) taosArrayGet(group, j); + STable* pTable = (STable*) taosArrayGetP(group, j); STableCheckInfo info = { .lastKey = pQueryHandle->window.skey, - .tableId = *id, - .pTableObj = tsdbGetTableByUid(pMeta, id->uid), + .tableId = pTable->tableId, + .pTableObj = pTable, }; - assert(info.pTableObj != NULL && info.pTableObj->tableId.tid == id->tid); + assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE || + info.pTableObj->type == TSDB_CHILD_TABLE)); + taosArrayPush(pQueryHandle->pTableCheckInfo, &info); } } @@ -215,17 +217,17 @@ TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STab return pQueryHandle; } -SArray* tsdbGetQueriedTableIdList(TsdbQueryHandleT *pHandle) { +SArray* tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle) { assert(pHandle != NULL); STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) pHandle; size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo); - SArray* res = taosArrayInit(size, sizeof(STableId)); + SArray* res = taosArrayInit(size, POINTER_BYTES); for(int32_t i = 0; i < size; ++i) { STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); - taosArrayPush(res, &pCheckInfo->tableId); + taosArrayPush(res, &pCheckInfo->pTableObj); } return res; @@ -1607,6 +1609,7 @@ void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle) { } if (index == -1) { + // todo add failure test cases return; } @@ -1856,42 +1859,19 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) { SArray* tsdbRetrieveDataRow(TsdbQueryHandleT* pQueryHandle, SArray* pIdList, SQueryRowCond* pCond) { return NULL; } -TsdbQueryHandleT* tsdbQueryFromTagConds(STsdbQueryCond* pCond, int16_t stableId, const char* pTagFilterStr) { - return NULL; -} - -SArray* tsdbGetTableList(TsdbQueryHandleT* pQueryHandle) { return NULL; } - -static int32_t getAllTableIdList(STable* pSuperTable, SArray* list) { +static int32_t getAllTableList(STable* pSuperTable, SArray* list) { SSkipListIterator* iter = tSkipListCreateIter(pSuperTable->pIndex); while (tSkipListIterNext(iter)) { SSkipListNode* pNode = tSkipListIterGet(iter); STable** pTable = (STable**) SL_GET_NODE_DATA((SSkipListNode*) pNode); - taosArrayPush(list, &(*pTable)->tableId); + taosArrayPush(list, pTable); } tSkipListDestroyIter(iter); return TSDB_CODE_SUCCESS; } -/** - * convert the result pointer to table id instead of table object pointer - * todo remove it by using callback function to change the final result in-time. - * @param pRes - */ -static void convertQueryResult(SArray* pRes, SArray* pTableList) { - if (pTableList == NULL || taosArrayGetSize(pTableList) == 0) { - return; - } - - size_t size = taosArrayGetSize(pTableList); - for (int32_t i = 0; i < size; ++i) { // todo speedup by using reserve space. - STable* pTable = taosArrayGetP(pTableList, i); - taosArrayPush(pRes, &pTable->tableId); - } -} - static void destroyHelper(void* param) { if (param == NULL) { return; @@ -1960,16 +1940,13 @@ typedef struct STableGroupSupporter { int32_t numOfCols; SColIndex* pCols; STSchema* pTagSchema; - void* tsdbMeta; +// void* tsdbMeta; } STableGroupSupporter; int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) { STableGroupSupporter* pTableGroupSupp = (STableGroupSupporter*) param; - STableId* id1 = (STableId*) p1; - STableId* id2 = (STableId*) p2; - - STable *pTable1 = tsdbGetTableByUid(pTableGroupSupp->tsdbMeta, id1->uid); - STable *pTable2 = tsdbGetTableByUid(pTableGroupSupp->tsdbMeta, id2->uid); + STable* pTable1 = *(STable**) p1; + STable* pTable2 = *(STable**) p2; for (int32_t i = 0; i < pTableGroupSupp->numOfCols; ++i) { SColIndex* pColIndex = &pTableGroupSupp->pCols[i]; @@ -2019,26 +1996,29 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) { return 0; } -void createTableGroupImpl(SArray* pGroups, SArray* pTableIdList, size_t numOfTables, STableGroupSupporter* pSupp, +void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTables, STableGroupSupporter* pSupp, __ext_compar_fn_t compareFn) { - STableId* pId = taosArrayGet(pTableIdList, 0); - - SArray* g = taosArrayInit(16, sizeof(STableId)); - taosArrayPush(g, pId); + STable* pTable = taosArrayGetP(pTableList, 0); + SArray* g = taosArrayInit(16, POINTER_BYTES); + taosArrayPush(g, &pTable); + tsdbRefTable(pTable); + for (int32_t i = 1; i < numOfTables; ++i) { - STableId* prev = taosArrayGet(pTableIdList, i - 1); - STableId* p = taosArrayGet(pTableIdList, i); + STable** prev = taosArrayGet(pTableList, i - 1); + STable** p = taosArrayGet(pTableList, i); int32_t ret = compareFn(prev, p, pSupp); assert(ret == 0 || ret == -1); + tsdbRefTable(*p); + assert((*p)->type == TSDB_CHILD_TABLE); + if (ret == 0) { taosArrayPush(g, p); } else { taosArrayPush(pGroups, &g); // current group is ended, start a new group - g = taosArrayInit(16, sizeof(STableId)); - + g = taosArrayInit(16, POINTER_BYTES); taosArrayPush(g, p); } } @@ -2046,8 +2026,7 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableIdList, size_t numOfTab taosArrayPush(pGroups, &g); } -SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols, - TSDB_REPO_T* tsdb) { +SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols) { assert(pTableList != NULL); SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES); @@ -2058,22 +2037,24 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC } if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table - SArray* sa = taosArrayInit(size, sizeof(STableId)); + SArray* sa = taosArrayInit(size, POINTER_BYTES); for(int32_t i = 0; i < size; ++i) { - STableId* tableId = taosArrayGet(pTableList, i); - taosArrayPush(sa, tableId); + STable** pTable = taosArrayGet(pTableList, i); + assert((*pTable)->type == TSDB_CHILD_TABLE); + + tsdbRefTable(*pTable); + taosArrayPush(sa, pTable); } taosArrayPush(pTableGroup, &sa); tsdbTrace("all %zu tables belong to one group", size); } else { STableGroupSupporter *pSupp = (STableGroupSupporter *) calloc(1, sizeof(STableGroupSupporter)); - pSupp->tsdbMeta = tsdbGetMeta(tsdb); pSupp->numOfCols = numOfOrderCols; pSupp->pTagSchema = pTagSchema; pSupp->pCols = pCols; - taosqsort(pTableList->pData, size, sizeof(STableId), pSupp, tableGroupComparFn); + taosqsort(pTableList->pData, size, POINTER_BYTES, pSupp, tableGroupComparFn); createTableGroupImpl(pTableGroup, pTableList, size, pSupp, tableGroupComparFn); tfree(pSupp); } @@ -2149,48 +2130,53 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) .pExtInfo = pSTable->tagSchema, }; - SArray* pTableList = taosArrayInit(8, POINTER_BYTES); - - tExprTreeTraverse(pExpr, pSTable->pIndex, pTableList, &supp); + tExprTreeTraverse(pExpr, pSTable->pIndex, pRes, &supp); tExprTreeDestroy(&pExpr, destroyHelper); - - convertQueryResult(pRes, pTableList); - taosArrayDestroy(pTableList); return TSDB_CODE_SUCCESS; } int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pTagCond, size_t len, int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo, SColIndex* pColIndex, int32_t numOfCols) { + if (tsdbRLockRepoMeta(tsdb) < 0) goto _error; + STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid); if (pTable == NULL) { tsdbError("%p failed to get stable, uid:%" PRIu64, tsdb, uid); - return TSDB_CODE_TDB_INVALID_TABLE_ID; + terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; + tsdbUnlockRepoMeta(tsdb); + + goto _error; } if (pTable->type != TSDB_SUPER_TABLE) { tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", tid:%d, name:%s", tsdb, uid, pTable->tableId.tid, pTable->name->data); - - return TSDB_CODE_COM_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client + terrno = TSDB_CODE_COM_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client + + tsdbUnlockRepoMeta(tsdb); + goto _error; } - - SArray* res = taosArrayInit(8, sizeof(STableId)); + + //NOTE: not add ref count for super table + SArray* res = taosArrayInit(8, POINTER_BYTES); STSchema* pTagSchema = tsdbGetTableTagSchema(pTable); // no tags and tbname condition, all child tables of this stable are involved if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) { - int32_t ret = getAllTableIdList(pTable, res); - if (ret == TSDB_CODE_SUCCESS) { - pGroupInfo->numOfTables = taosArrayGetSize(res); - pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, tsdb); - - tsdbTrace("%p no table name/tag condition, all tables belong to one group, numOfTables:%zu", tsdb, pGroupInfo->numOfTables); - } else { - // todo add error + int32_t ret = getAllTableList(pTable, res); + if (ret != TSDB_CODE_SUCCESS) { + tsdbUnlockRepoMeta(tsdb); + goto _error; } - + + pGroupInfo->numOfTables = taosArrayGetSize(res); + pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols); + + tsdbTrace("%p no table name/tag condition, all tables belong to one group, numOfTables:%zu", tsdb, pGroupInfo->numOfTables); taosArrayDestroy(res); + + if (tsdbUnlockRepoMeta(tsdb) < 0) goto _error; return ret; } @@ -2227,31 +2213,45 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pT doQueryTableList(pTable, res, expr); pGroupInfo->numOfTables = taosArrayGetSize(res); - pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, tsdb); + pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols); tsdbTrace("%p stable tid:%d, uid:%"PRIu64" query, numOfTables:%zu, belong to %zu groups", tsdb, pTable->tableId.tid, pTable->tableId.uid, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList)); taosArrayDestroy(res); + + if (tsdbUnlockRepoMeta(tsdb) < 0) goto _error; return ret; + + _error: + return terrno; } int32_t tsdbGetOneTableGroup(TSDB_REPO_T* tsdb, uint64_t uid, STableGroupInfo* pGroupInfo) { + if (tsdbRLockRepoMeta(tsdb) < 0) goto _error; + STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid); if (pTable == NULL) { - return TSDB_CODE_TDB_INVALID_TABLE_ID; + terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; + goto _error; } - - //todo assert table type, add the table ref count + + assert(pTable->type == TSDB_CHILD_TABLE || pTable->type == TSDB_NORMAL_TABLE); + tsdbRefTable(pTable); + if (tsdbUnlockRepoMeta(tsdb) < 0) goto _error; + pGroupInfo->numOfTables = 1; pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES); - SArray* group = taosArrayInit(1, sizeof(STableId)); + SArray* group = taosArrayInit(1, POINTER_BYTES); - taosArrayPush(group, &pTable->tableId); + taosArrayPush(group, &pTable); taosArrayPush(pGroupInfo->pGroupList, &group); return TSDB_CODE_SUCCESS; + + _error: + return terrno; } void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { @@ -2263,12 +2263,11 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo); for (int32_t i = 0; i < size; ++i) { STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i); + tSkipListDestroyIter(pTableCheckInfo->iter); tsdbUnRefMemTable(pQueryHandle->pTsdb, pTableCheckInfo->mem); tsdbUnRefMemTable(pQueryHandle->pTsdb, pTableCheckInfo->imem); - tSkipListDestroyIter(pTableCheckInfo->iter); - if (pTableCheckInfo->pDataCols != NULL) { tfree(pTableCheckInfo->pDataCols->buf); } @@ -2293,3 +2292,26 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) { tfree(pQueryHandle); } + +void tsdbDestoryTableGroup(STableGroupInfo *pGroupList) { + assert(pGroupList != NULL); + + size_t numOfGroup = taosArrayGetSize(pGroupList->pGroupList); + + for(int32_t i = 0; i < numOfGroup; ++i) { + SArray* p = taosArrayGetP(pGroupList->pGroupList, i); + + size_t numOfTables = taosArrayGetSize(p); + for(int32_t j = 0; j < numOfTables; ++j) { + STable* pTable = taosArrayGetP(p, j); + assert(pTable != NULL); + + tsdbUnRefTable(pTable); + } + + taosArrayDestroy(p); + } + + taosArrayDestroy(pGroupList->pGroupList); +} + diff --git a/tests/script/general/parser/commit.sim b/tests/script/general/parser/commit.sim index ecf7113926049de29a27761dc3c317d701a5cebd..31f457cfae87d3670bc436dedcd36964e108d6c6 100644 --- a/tests/script/general/parser/commit.sim +++ b/tests/script/general/parser/commit.sim @@ -68,6 +68,7 @@ while $loop <= $loops while $i < 10 sql select count(*) from $stb where t1 = $i if $data00 != $rowNum then + print expect $rowNum, actual: $data00 return -1 endi $i = $i + 1 diff --git a/tests/script/general/parser/selectResNum.sim b/tests/script/general/parser/selectResNum.sim index ef9116cb3f8b3d25f2921dc3e70724ab3afa2e23..319e034c0c12edc74ec8a016dc5efcb99591ca4f 100644 --- a/tests/script/general/parser/selectResNum.sim +++ b/tests/script/general/parser/selectResNum.sim @@ -172,6 +172,7 @@ while $loop <= $loops endi sql select c8 from $stb where t1 = $i if $rows != $rowNum then + print expect $rowNum, actual: $rows return -1 endi sql select c9 from $stb where t1 = $i