提交 2566bd4a 编写于 作者: M Minglei Jin

row/iter: fix null column iter

上级 33fba6f9
...@@ -755,7 +755,7 @@ SColVal *tRowIterNext(SRowIter *pIter) { ...@@ -755,7 +755,7 @@ SColVal *tRowIterNext(SRowIter *pIter) {
} }
if (pIter->pRow->flag == HAS_NULL) { if (pIter->pRow->flag == HAS_NULL) {
pIter->cv = COL_VAL_NULL(pTColumn->type, pTColumn->colId); pIter->cv = COL_VAL_NULL(pTColumn->colId, pTColumn->type);
goto _exit; goto _exit;
} }
...@@ -2439,7 +2439,7 @@ _exit: ...@@ -2439,7 +2439,7 @@ _exit:
int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t bytes, int32_t nRows, char *lengthOrbitmap, int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t bytes, int32_t nRows, char *lengthOrbitmap,
char *data) { char *data) {
int32_t code = 0; int32_t code = 0;
if(data == NULL){ if (data == NULL) {
for (int32_t i = 0; i < nRows; ++i) { for (int32_t i = 0; i < nRows; ++i) {
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0); code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0);
} }
...@@ -2453,8 +2453,9 @@ int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t byt ...@@ -2453,8 +2453,9 @@ int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t byt
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0); code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0);
if (code) goto _exit; if (code) goto _exit;
} else { } else {
if(ASSERT(varDataTLen(data + offset) <= bytes)){ if (ASSERT(varDataTLen(data + offset) <= bytes)) {
uError("var data length invalid, varDataTLen(data + offset):%d <= bytes:%d", (int)varDataTLen(data + offset), bytes); uError("var data length invalid, varDataTLen(data + offset):%d <= bytes:%d", (int)varDataTLen(data + offset),
bytes);
code = TSDB_CODE_INVALID_PARA; code = TSDB_CODE_INVALID_PARA;
goto _exit; goto _exit;
} }
......
...@@ -195,7 +195,7 @@ void *tsdbGetIvtIdx(SMeta *pMeta); ...@@ -195,7 +195,7 @@ void *tsdbGetIvtIdx(SMeta *pMeta);
uint64_t getReaderMaxVersion(STsdbReader *pReader); uint64_t getReaderMaxVersion(STsdbReader *pReader);
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
SArray *pCidList, uint64_t suid, void **pReader, const char *idstr); SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr);
int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds, int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds,
SArray *pTableUids); SArray *pTableUids);
void *tsdbCacherowsReaderClose(void *pReader); void *tsdbCacherowsReaderClose(void *pReader);
......
...@@ -788,6 +788,7 @@ typedef struct SCacheRowsReader { ...@@ -788,6 +788,7 @@ typedef struct SCacheRowsReader {
char **transferBuf; // todo remove it soon char **transferBuf; // todo remove it soon
int32_t numOfCols; int32_t numOfCols;
SArray *pCidList; SArray *pCidList;
int32_t *pSlotIds;
int32_t type; int32_t type;
int32_t tableIndex; // currently returned result tables int32_t tableIndex; // currently returned result tables
STableKeyInfo *pTableList; // table id list STableKeyInfo *pTableList; // table id list
......
...@@ -259,10 +259,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow ...@@ -259,10 +259,7 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
} }
if (!COL_VAL_IS_NONE(pColVal)) { if (!COL_VAL_IS_NONE(pColVal)) {
SLastCol *pLastCol = NULL; SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]);
if (NULL != values_list[i + num_keys]) {
pLastCol = tsdbCacheDeserialize(values_list[i + num_keys]);
}
if (NULL == pLastCol || pLastCol->ts <= keyTs) { if (NULL == pLastCol || pLastCol->ts <= keyTs) {
char *value = NULL; char *value = NULL;
...@@ -330,7 +327,24 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR ...@@ -330,7 +327,24 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR
for (int i = 0; i < num_keys; ++i) { for (int i = 0; i < num_keys; ++i) {
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
if (pLastCol) {
SColVal *pColVal = &pLastCol->colVal;
if (IS_VAR_DATA_TYPE(pColVal->type)) {
uint8_t *pVal = pColVal->value.pData;
if (pColVal->value.nData) {
pColVal->value.pData = taosMemoryMalloc(pColVal->value.nData);
memcpy(pColVal->value.pData, pVal, pColVal->value.nData);
}
}
} else {
// recalc: load from tsdb
// still null, then make up a null col value
int16_t cid = *(int16_t *)taosArrayGet(pCidList, i);
pLastCol = &(SLastCol){.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};
// maybe store it back to rocks cache
}
taosArrayPush(pLastArray, pLastCol); taosArrayPush(pLastArray, pLastCol);
taosMemoryFree(values_list[i]); taosMemoryFree(values_list[i]);
......
...@@ -23,10 +23,9 @@ ...@@ -23,10 +23,9 @@
static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds, static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
const int32_t* dstSlotIds, void** pRes, const char* idStr) { const int32_t* dstSlotIds, void** pRes, const char* idStr) {
int32_t numOfRows = pBlock->info.rows; int32_t numOfRows = pBlock->info.rows;
bool allNullRow = true;
if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) { if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) {
bool allNullRow = true;
for (int32_t i = 0; i < pReader->numOfCols; ++i) { for (int32_t i = 0; i < pReader->numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]);
SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[i]); SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[i]);
...@@ -42,6 +41,7 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p ...@@ -42,6 +41,7 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
varDataSetLen(p->buf, pColVal->colVal.value.nData); varDataSetLen(p->buf, pColVal->colVal.value.nData);
memcpy(varDataVal(p->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData); memcpy(varDataVal(p->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData);
p->bytes = pColVal->colVal.value.nData + VARSTR_HEADER_SIZE; // binary needs to plus the header size p->bytes = pColVal->colVal.value.nData + VARSTR_HEADER_SIZE; // binary needs to plus the header size
taosMemoryFree(pColVal->colVal.value.pData);
} else { } else {
memcpy(p->buf, &pColVal->colVal.value, pReader->pSchema->columns[slotId].bytes); memcpy(p->buf, &pColVal->colVal.value, pReader->pSchema->columns[slotId].bytes);
p->bytes = pReader->pSchema->columns[slotId].bytes; p->bytes = pReader->pSchema->columns[slotId].bytes;
...@@ -63,6 +63,10 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p ...@@ -63,6 +63,10 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i); SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i);
SColVal* pVal = &pColVal->colVal; SColVal* pVal = &pColVal->colVal;
if (COL_VAL_IS_NONE(&pColVal->colVal)) {
continue;
}
allNullRow = false;
if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) { if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
if (!COL_VAL_IS_VALUE(&pColVal->colVal)) { if (!COL_VAL_IS_VALUE(&pColVal->colVal)) {
colDataSetNULL(pColInfoData, numOfRows); colDataSetNULL(pColInfoData, numOfRows);
...@@ -71,13 +75,15 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p ...@@ -71,13 +75,15 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
// tsdbError("buf:%p len:%d index:%d", pReader->transferBuf[slotId], pVal->value.nData, slotId); // tsdbError("buf:%p len:%d index:%d", pReader->transferBuf[slotId], pVal->value.nData, slotId);
memcpy(varDataVal(pReader->transferBuf[slotId]), pVal->value.pData, pVal->value.nData); memcpy(varDataVal(pReader->transferBuf[slotId]), pVal->value.pData, pVal->value.nData);
colDataSetVal(pColInfoData, numOfRows, pReader->transferBuf[slotId], false); colDataSetVal(pColInfoData, numOfRows, pReader->transferBuf[slotId], false);
taosMemoryFree(pVal->value.pData);
} }
} else { } else {
colDataSetVal(pColInfoData, numOfRows, (const char*)&pVal->value.val, !COL_VAL_IS_VALUE(pVal)); colDataSetVal(pColInfoData, numOfRows, (const char*)&pVal->value.val, !COL_VAL_IS_VALUE(pVal));
} }
} }
pBlock->info.rows += 1; pBlock->info.rows += allNullRow ? 0 : 1;
// pBlock->info.rows += 1;
} else { } else {
tsdbError("invalid retrieve type:%d, %s", pReader->type, idStr); tsdbError("invalid retrieve type:%d, %s", pReader->type, idStr);
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
...@@ -117,7 +123,7 @@ static int32_t setTableSchema(SCacheRowsReader* p, uint64_t suid, const char* id ...@@ -117,7 +123,7 @@ static int32_t setTableSchema(SCacheRowsReader* p, uint64_t suid, const char* id
} }
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols, int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
SArray* pCidList, uint64_t suid, void** pReader, const char* idstr) { SArray* pCidList, int32_t* pSlotIds, uint64_t suid, void** pReader, const char* idstr) {
*pReader = NULL; *pReader = NULL;
SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader)); SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader));
if (p == NULL) { if (p == NULL) {
...@@ -130,6 +136,7 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, ...@@ -130,6 +136,7 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
p->verRange = (SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}; p->verRange = (SVersionRange){.minVer = 0, .maxVer = UINT64_MAX};
p->numOfCols = numOfCols; p->numOfCols = numOfCols;
p->pCidList = pCidList; p->pCidList = pCidList;
p->pSlotIds = pSlotIds;
p->suid = suid; p->suid = suid;
if (numOfTables == 0) { if (numOfTables == 0) {
...@@ -294,6 +301,11 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -294,6 +301,11 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
taosArrayClear(pRow); taosArrayClear(pRow);
continue; continue;
} }
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
if (COL_VAL_IS_NONE(&pColVal->colVal)) {
taosArrayClear(pRow);
continue;
}
{ {
bool hasNotNullRow = true; bool hasNotNullRow = true;
...@@ -363,6 +375,11 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 ...@@ -363,6 +375,11 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
taosArrayClear(pRow); taosArrayClear(pRow);
continue; continue;
} }
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
if (COL_VAL_IS_NONE(&pColVal->colVal)) {
taosArrayClear(pRow);
continue;
}
saveOneRow(pRow, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr); saveOneRow(pRow, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr);
taosArrayClear(pRow); taosArrayClear(pRow);
......
...@@ -101,8 +101,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe ...@@ -101,8 +101,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
uint64_t suid = tableListGetSuid(pTableListInfo); uint64_t suid = tableListGetSuid(pTableListInfo);
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables, code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables,
taosArrayGetSize(pInfo->matchInfo.pList), pCidList, suid, &pInfo->pLastrowReader, taosArrayGetSize(pInfo->matchInfo.pList), pCidList, pInfo->pSlotIds, suid,
pTaskInfo->id.str); &pInfo->pLastrowReader, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -237,7 +237,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ...@@ -237,7 +237,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
} }
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num, code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
taosArrayGetSize(pInfo->matchInfo.pList), pInfo->pCidList, suid, taosArrayGetSize(pInfo->matchInfo.pList), pInfo->pCidList, pInfo->pSlotIds, suid,
&pInfo->pLastrowReader, pTaskInfo->id.str); &pInfo->pLastrowReader, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pInfo->currentGroupIndex += 1; pInfo->currentGroupIndex += 1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册