diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index d6ab974c6ced8305ddad0ebc86f072a08bf9c978..66f539332aeca0df8ae690a30da93d6f63b98647 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -755,7 +755,7 @@ SColVal *tRowIterNext(SRowIter *pIter) { } if (pIter->pRow->flag == HAS_NULL) { - pIter->cv = COL_VAL_NULL(pTColumn->type, pTColumn->colId); + pIter->cv = COL_VAL_NULL(pTColumn->colId, pTColumn->type); goto _exit; } @@ -2439,7 +2439,7 @@ _exit: int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t bytes, int32_t nRows, char *lengthOrbitmap, char *data) { int32_t code = 0; - if(data == NULL){ + if (data == NULL) { for (int32_t i = 0; i < nRows; ++i) { code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0); } @@ -2453,8 +2453,9 @@ int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t byt code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0); if (code) goto _exit; } else { - if(ASSERT(varDataTLen(data + offset) <= bytes)){ - uError("var data length invalid, varDataTLen(data + offset):%d <= bytes:%d", (int)varDataTLen(data + offset), bytes); + if (ASSERT(varDataTLen(data + offset) <= bytes)) { + uError("var data length invalid, varDataTLen(data + offset):%d <= bytes:%d", (int)varDataTLen(data + offset), + bytes); code = TSDB_CODE_INVALID_PARA; goto _exit; } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index cadf6d1db7d4181729b195bf38ffca01d98fb95a..ebb18ecc765772a4f4a8ae6098d00f5294454bdb 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -195,7 +195,7 @@ void *tsdbGetIvtIdx(SMeta *pMeta); uint64_t getReaderMaxVersion(STsdbReader *pReader); int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, - SArray *pCidList, uint64_t suid, void **pReader, const char *idstr); + SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr); int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds, SArray *pTableUids); void *tsdbCacherowsReaderClose(void *pReader); diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index ee816bb1226c76ae3e4d4ebc85fc82c52d6e4152..08d4cbad1ac82cc324ae9e10a836ff344522fa03 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -788,6 +788,7 @@ typedef struct SCacheRowsReader { char **transferBuf; // todo remove it soon int32_t numOfCols; SArray *pCidList; + int32_t *pSlotIds; int32_t type; int32_t tableIndex; // currently returned result tables STableKeyInfo *pTableList; // table id list diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 04564f72c2d06ffab7ac5cb3a58185e961ff24d8..63dbb9a4d34353adc03d3d4ff0c61bbe9a12b876 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -259,10 +259,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow } if (!COL_VAL_IS_NONE(pColVal)) { - SLastCol *pLastCol = NULL; - if (NULL != values_list[i + num_keys]) { - pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]); - } + SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]); if (NULL == pLastCol || pLastCol->ts <= keyTs) { char *value = NULL; @@ -330,7 +327,24 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR for (int i = 0; i < num_keys; ++i) { SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); + if (pLastCol) { + SColVal *pColVal = &pLastCol->colVal; + if (IS_VAR_DATA_TYPE(pColVal->type)) { + uint8_t *pVal = pColVal->value.pData; + if (pColVal->value.nData) { + pColVal->value.pData = taosMemoryMalloc(pColVal->value.nData); + memcpy(pColVal->value.pData, pVal, pColVal->value.nData); + } + } + } else { + // recalc: load from tsdb + + // still null, then make up a null col value + int16_t cid = *(int16_t *)taosArrayGet(pCidList, i); + pLastCol = &(SLastCol){.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)}; + // maybe store it back to rocks cache + } taosArrayPush(pLastArray, pLastCol); taosMemoryFree(values_list[i]); diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 3d6bcd42e7c0c373421bf13b09fabc0dd7ea2f2c..48c72ca68546752c91c882d96790061a67e527c0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -23,10 +23,9 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds, const int32_t* dstSlotIds, void** pRes, const char* idStr) { int32_t numOfRows = pBlock->info.rows; + bool allNullRow = true; if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) { - bool allNullRow = true; - for (int32_t i = 0; i < pReader->numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]); SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[i]); @@ -42,6 +41,7 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p varDataSetLen(p->buf, pColVal->colVal.value.nData); memcpy(varDataVal(p->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData); p->bytes = pColVal->colVal.value.nData + VARSTR_HEADER_SIZE; // binary needs to plus the header size + taosMemoryFree(pColVal->colVal.value.pData); } else { memcpy(p->buf, &pColVal->colVal.value, pReader->pSchema->columns[slotId].bytes); p->bytes = pReader->pSchema->columns[slotId].bytes; @@ -63,6 +63,10 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i); SColVal* pVal = &pColVal->colVal; + if (COL_VAL_IS_NONE(&pColVal->colVal)) { + continue; + } + allNullRow = false; if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) { if (!COL_VAL_IS_VALUE(&pColVal->colVal)) { colDataSetNULL(pColInfoData, numOfRows); @@ -71,13 +75,15 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p // tsdbError("buf:%p len:%d index:%d", pReader->transferBuf[slotId], pVal->value.nData, slotId); memcpy(varDataVal(pReader->transferBuf[slotId]), pVal->value.pData, pVal->value.nData); colDataSetVal(pColInfoData, numOfRows, pReader->transferBuf[slotId], false); + taosMemoryFree(pVal->value.pData); } } else { colDataSetVal(pColInfoData, numOfRows, (const char*)&pVal->value.val, !COL_VAL_IS_VALUE(pVal)); } } - pBlock->info.rows += 1; + pBlock->info.rows += allNullRow ? 0 : 1; + // pBlock->info.rows += 1; } else { tsdbError("invalid retrieve type:%d, %s", pReader->type, idStr); return TSDB_CODE_INVALID_PARA; @@ -117,7 +123,7 @@ static int32_t setTableSchema(SCacheRowsReader* p, uint64_t suid, const char* id } int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols, - SArray* pCidList, uint64_t suid, void** pReader, const char* idstr) { + SArray* pCidList, int32_t* pSlotIds, uint64_t suid, void** pReader, const char* idstr) { *pReader = NULL; SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader)); if (p == NULL) { @@ -130,6 +136,7 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, p->verRange = (SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}; p->numOfCols = numOfCols; p->pCidList = pCidList; + p->pSlotIds = pSlotIds; p->suid = suid; if (numOfTables == 0) { @@ -294,6 +301,11 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 taosArrayClear(pRow); continue; } + SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0); + if (COL_VAL_IS_NONE(&pColVal->colVal)) { + taosArrayClear(pRow); + continue; + } { bool hasNotNullRow = true; @@ -363,6 +375,11 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 taosArrayClear(pRow); continue; } + SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0); + if (COL_VAL_IS_NONE(&pColVal->colVal)) { + taosArrayClear(pRow); + continue; + } saveOneRow(pRow, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr); taosArrayClear(pRow); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 2df24463b7609ac37fcb426190f1c24a06b6d67f..925e3054505af7039a6c3d00672880aebca0ebce 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -101,8 +101,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe uint64_t suid = tableListGetSuid(pTableListInfo); code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables, - taosArrayGetSize(pInfo->matchInfo.pList), pCidList, suid, &pInfo->pLastrowReader, - pTaskInfo->id.str); + taosArrayGetSize(pInfo->matchInfo.pList), pCidList, pInfo->pSlotIds, suid, + &pInfo->pLastrowReader, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -237,7 +237,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { } code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num, - taosArrayGetSize(pInfo->matchInfo.pList), pInfo->pCidList, suid, + taosArrayGetSize(pInfo->matchInfo.pList), pInfo->pCidList, pInfo->pSlotIds, suid, &pInfo->pLastrowReader, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { pInfo->currentGroupIndex += 1;