From e7fb3fdc5356f9f2a2f3bfe0cf5268fd6e05b4bd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 17 Oct 2020 13:02:19 +0800 Subject: [PATCH] [td-225] refactor codes. --- src/tsdb/src/tsdbRead.c | 227 ++++++++++++++++++++-------------------- 1 file changed, 113 insertions(+), 114 deletions(-) diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 1c2fe7d872..a95a12052a 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -182,7 +182,72 @@ static SArray* getDefaultLoadColumns(STsdbQueryHandle* pQueryHandle, bool loadTS return pLocalIdList; } -TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, void* qinfo) { +static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STableGroupInfo* pGroupList, STsdbMeta* pMeta) { + size_t sizeOfGroup = taosArrayGetSize(pGroupList->pGroupList); + assert(sizeOfGroup >= 1 && pMeta != NULL); + + // allocate buffer in order to load data blocks from file + SArray* pTableCheckInfo = taosArrayInit(pGroupList->numOfTables, sizeof(STableCheckInfo)); + if (pTableCheckInfo == NULL) { + return NULL; + } + + // todo apply the lastkey of table check to avoid to load header file + for (int32_t i = 0; i < sizeOfGroup; ++i) { + SArray* group = *(SArray**) taosArrayGet(pGroupList->pGroupList, i); + + size_t gsize = taosArrayGetSize(group); + assert(gsize > 0); + + for (int32_t j = 0; j < gsize; ++j) { + STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(group, j); + + STableCheckInfo info = { .lastKey = pKeyInfo->lastKey, .pTableObj = pKeyInfo->pTable }; + info.tableId = ((STable*)(pKeyInfo->pTable))->tableId; + + assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE || + info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE)); + + info.tableId.tid = info.pTableObj->tableId.tid; + info.tableId.uid = info.pTableObj->tableId.uid; + + if (ASCENDING_TRAVERSE(pQueryHandle->order)) { + assert(info.lastKey >= pQueryHandle->window.skey); + } else { + assert(info.lastKey <= pQueryHandle->window.skey); + } + + taosArrayPush(pTableCheckInfo, &info); + tsdbDebug("%p check table uid:%"PRId64", tid:%d from lastKey:%"PRId64" %p", pQueryHandle, info.tableId.uid, + info.tableId.tid, info.lastKey, pQueryHandle->qinfo); + } + } + + taosArraySort(pTableCheckInfo, tsdbCheckInfoCompar); + return pTableCheckInfo; +} + +static SArray* createCheckInfoFromCheckInfo(SArray* pTableCheckInfo, TSKEY skey) { + size_t si = taosArrayGetSize(pTableCheckInfo); + SArray* pNew = taosArrayInit(si, sizeof(STableCheckInfo)); + if (pNew == NULL) { + return NULL; + } + + for (int32_t j = 0; j < si; ++j) { + STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pTableCheckInfo, j); + STableCheckInfo info = { .lastKey = skey, .pTableObj = pCheckInfo->pTableObj}; + + info.tableId = pCheckInfo->tableId; + taosArrayPush(pNew, &info); + } + + // it is ordered already, no need to sort again. + taosArraySort(pNew, tsdbCheckInfoCompar); + return pNew; +} + +static STsdbQueryHandle* tsdbQueryTablesImpl(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, void* qinfo) { STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle)); if (pQueryHandle == NULL) { goto out_of_memory; @@ -206,9 +271,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab } tsdbTakeMemSnapshot(pQueryHandle->pTsdb, &pQueryHandle->mem, &pQueryHandle->imem); - - size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList); - assert(sizeOfGroup >= 1 && pCond != NULL && pCond->numOfCols > 0); + assert(pCond != NULL && pCond->numOfCols > 0); if (ASCENDING_TRAVERSE(pCond->order)) { assert(pQueryHandle->window.skey <= pQueryHandle->window.ekey); @@ -217,21 +280,19 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab } // allocate buffer in order to load data blocks from file - int32_t numOfCols = pCond->numOfCols; - - pQueryHandle->statis = calloc(numOfCols, sizeof(SDataStatis)); + pQueryHandle->statis = calloc(pCond->numOfCols, sizeof(SDataStatis)); if (pQueryHandle->statis == NULL) { goto out_of_memory; } - pQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); // todo: use list instead of array? + pQueryHandle->pColumns = taosArrayInit(pCond->numOfCols, sizeof(SColumnInfoData)); // todo: use list instead of array? if (pQueryHandle->pColumns == NULL) { goto out_of_memory; } - for (int32_t i = 0; i < numOfCols; ++i) { + for (int32_t i = 0; i < pCond->numOfCols; ++i) { SColumnInfoData colInfo = {{0}, 0}; - + colInfo.info = pCond->colList[i]; colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCond->colList[i].bytes); if (colInfo.pData == NULL) { @@ -241,47 +302,10 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab pQueryHandle->statis[i].colId = colInfo.info.colId; } - pQueryHandle->pTableCheckInfo = taosArrayInit(groupList->numOfTables, sizeof(STableCheckInfo)); - if (pQueryHandle->pTableCheckInfo == NULL) { - goto out_of_memory; - } + pQueryHandle->defaultLoadColumn = getDefaultLoadColumns(pQueryHandle, true); STsdbMeta* pMeta = tsdbGetMeta(tsdb); - assert(pMeta != NULL && sizeOfGroup >= 1 && pCond != NULL && pCond->numOfCols > 0); - - // todo apply the lastkey of table check to avoid to load header file - for (int32_t i = 0; i < sizeOfGroup; ++i) { - SArray* group = *(SArray**) taosArrayGet(groupList->pGroupList, i); - - size_t gsize = taosArrayGetSize(group); - assert(gsize > 0); - - for (int32_t j = 0; j < gsize; ++j) { - STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(group, j); - - STableCheckInfo info = { .lastKey = pKeyInfo->lastKey, .pTableObj = pKeyInfo->pTable }; - info.tableId = ((STable*)(pKeyInfo->pTable))->tableId; - - assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE || - info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE)); - - info.tableId.tid = info.pTableObj->tableId.tid; - info.tableId.uid = info.pTableObj->tableId.uid; - - if (ASCENDING_TRAVERSE(pQueryHandle->order)) { - assert(info.lastKey >= pQueryHandle->window.skey); - } else { - assert(info.lastKey <= pQueryHandle->window.skey); - } - - taosArrayPush(pQueryHandle->pTableCheckInfo, &info); - tsdbDebug("%p check table uid:%"PRId64", tid:%d from lastKey:%"PRId64" %p", pQueryHandle, info.tableId.uid, - info.tableId.tid, info.lastKey, qinfo); - } - } - - taosArraySort(pQueryHandle->pTableCheckInfo, tsdbCheckInfoCompar); - pQueryHandle->defaultLoadColumn = getDefaultLoadColumns(pQueryHandle, true); + assert(pMeta != NULL); pQueryHandle->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pQueryHandle->pTsdb->config.maxRowsPerFileBlock); if (pQueryHandle->pDataCols == NULL) { @@ -290,19 +314,35 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab goto out_of_memory; } - tsdbDebug("%p total numOfTable:%" PRIzu " in query, %p", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo), pQueryHandle->qinfo); - tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo); tsdbInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo); return (TsdbQueryHandleT) pQueryHandle; -out_of_memory: + out_of_memory: tsdbCleanupQueryHandle(pQueryHandle); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return NULL; } +TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, void* qinfo) { + STsdbQueryHandle* pQueryHandle = tsdbQueryTablesImpl(tsdb, pCond, qinfo); + + STsdbMeta* pMeta = tsdbGetMeta(tsdb); + assert(pMeta != NULL); + + // todo apply the lastkey of table check to avoid to load header file + pQueryHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pQueryHandle, groupList, pMeta); + if (pQueryHandle->pTableCheckInfo == NULL) { + tsdbCleanupQueryHandle(pQueryHandle); + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return NULL; + } + + tsdbDebug("%p total numOfTable:%" PRIzu " in query, %p", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo), pQueryHandle->qinfo); + return (TsdbQueryHandleT) pQueryHandle; +} + TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, void* qinfo) { pCond->twindow = changeTableGroupByLastrow(groupList); @@ -1920,77 +1960,33 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { pQueryHandle->type = TSDB_QUERY_TYPE_ALL; return true; } else { - STsdbQueryHandle* pSecQueryHandle = calloc(1, sizeof(STsdbQueryHandle)); - if (pSecQueryHandle == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + STimeWindow win = (STimeWindow) {pQueryHandle->window.skey, INT64_MAX}; + STsdbQueryCond cond = { + .order = TSDB_ORDER_ASC, + .numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle)), + .twindow = win}; + + cond.colList = calloc(cond.numOfCols, sizeof(SColumnInfo)); + if (cond.colList == NULL) { + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; return false; } - pSecQueryHandle->order = TSDB_ORDER_ASC; - pSecQueryHandle->window = (STimeWindow) {pQueryHandle->window.skey, INT64_MAX}; - pSecQueryHandle->pTsdb = pQueryHandle->pTsdb; - pSecQueryHandle->type = TSDB_QUERY_TYPE_ALL; - pSecQueryHandle->cur.fid = -1; - pSecQueryHandle->cur.win = TSWINDOW_INITIALIZER; - pSecQueryHandle->checkFiles = true; - pSecQueryHandle->activeIndex = 0; - pSecQueryHandle->outputCapacity = ((STsdbRepo*)pSecQueryHandle->pTsdb)->config.maxRowsPerFileBlock; - - if (tsdbInitReadHelper(&pSecQueryHandle->rhelper, (STsdbRepo*) pSecQueryHandle->pTsdb) != 0) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - free(pSecQueryHandle); - return false; + for(int32_t i = 0; i < cond.numOfCols; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pQueryHandle->pColumns, i); + memcpy(&cond.colList[i], &pColInfoData->info, sizeof(SColumnInfo)); } - tsdbTakeMemSnapshot(pSecQueryHandle->pTsdb, &pSecQueryHandle->mem, &pSecQueryHandle->imem); + STsdbQueryHandle* pSecQueryHandle = tsdbQueryTablesImpl(pQueryHandle->pTsdb, &cond, pQueryHandle->qinfo); - // allocate buffer in order to load data blocks from file - int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle)); + taosTFree(cond.colList); - pSecQueryHandle->statis = calloc(numOfCols, sizeof(SDataStatis)); - pSecQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); - if (pSecQueryHandle->statis == NULL || pSecQueryHandle->pColumns == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + pSecQueryHandle->pTableCheckInfo = createCheckInfoFromCheckInfo(pQueryHandle->pTableCheckInfo, pSecQueryHandle->window.skey); + if (pSecQueryHandle->pTableCheckInfo == NULL) { tsdbCleanupQueryHandle(pSecQueryHandle); return false; } - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData colInfo = {{0}, 0}; - SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i); - - colInfo.info = pCol->info; - colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCol->info.bytes); - if (colInfo.pData == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tsdbCleanupQueryHandle(pSecQueryHandle); - return false; - } - - taosArrayPush(pSecQueryHandle->pColumns, &colInfo); - } - - size_t si = taosArrayGetSize(pQueryHandle->pTableCheckInfo); - pSecQueryHandle->pTableCheckInfo = taosArrayInit(si, sizeof(STableCheckInfo)); - STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb); - assert(pMeta != NULL); - - for (int32_t j = 0; j < si; ++j) { - STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, j); - STableCheckInfo info = { - .lastKey = pSecQueryHandle->window.skey, - .pTableObj = pCheckInfo->pTableObj, - }; - - info.tableId = pCheckInfo->tableId; - - taosArrayPush(pSecQueryHandle->pTableCheckInfo, &info); - } - - tsdbInitDataBlockLoadInfo(&pSecQueryHandle->dataBlockLoadInfo); - tsdbInitCompBlockLoadInfo(&pSecQueryHandle->compBlockLoadInfo); - pSecQueryHandle->defaultLoadColumn = taosArrayClone(pQueryHandle->defaultLoadColumn); - if (!tsdbNextDataBlock((void*) pSecQueryHandle)) { tsdbCleanupQueryHandle(pSecQueryHandle); return false; @@ -1999,6 +1995,9 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { tsdbRetrieveDataBlockInfo((void*) pSecQueryHandle, &blockInfo); tsdbRetrieveDataBlock((void*) pSecQueryHandle, pSecQueryHandle->defaultLoadColumn); + int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pSecQueryHandle)); + size_t si = taosArrayGetSize(pSecQueryHandle->pTableCheckInfo); + for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i); memcpy((char*)pCol->pData, (char*)pCol->pData + pCol->info.bytes * (pQueryHandle->cur.rows - 1), pCol->info.bytes); @@ -2012,11 +2011,11 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { SColumnInfoData* pTSCol = taosArrayGet(pQueryHandle->pColumns, 0); // it is ascending order - pQueryHandle->cur.win = (STimeWindow){((TSKEY*)pTSCol->pData)[0], ((TSKEY*)pTSCol->pData)[1]}; + pQueryHandle->order = TSDB_ORDER_DESC; pQueryHandle->window = pQueryHandle->cur.win; + pQueryHandle->cur.win = (STimeWindow){((TSKEY*)pTSCol->pData)[0], ((TSKEY*)pTSCol->pData)[1]}; pQueryHandle->cur.rows = 2; pQueryHandle->cur.mixBlock = true; - pQueryHandle->order = TSDB_ORDER_DESC; int32_t step = -1;// one step for ascending order traverse for (int32_t j = 0; j < si; ++j) { -- GitLab