diff --git a/include/common/trow.h b/include/common/trow.h index c596134fcdc37c09ef150e3edb1b91ac876fc610..b8b2637bbcf58161f9f27b3808dd1caa07a0fd8c 100644 --- a/include/common/trow.h +++ b/include/common/trow.h @@ -313,6 +313,7 @@ typedef struct { void tdSTSRowIterReset(STSRowIter *pIter, STSRow *pRow); void tdSTSRowIterInit(STSRowIter *pIter, STSchema *pSchema); +int32_t tdSTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow **ppRow); bool tdSTSRowGetVal(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCellVal *pVal); bool tdGetTpRowDataOfCol(STSRowIter *pIter, col_type_t colType, int32_t offset, SCellVal *pVal); bool tdGetKvRowValOfColEx(STSRowIter *pIter, col_id_t colId, col_type_t colType, col_id_t *nIdx, SCellVal *pVal); diff --git a/source/common/src/trow.c b/source/common/src/trow.c index deb99271c1bc2cc8357670d2d5fda50260beb755..167aa2da5f868e93789540426a4ba0d3edfced2d 100644 --- a/source/common/src/trow.c +++ b/source/common/src/trow.c @@ -1204,6 +1204,112 @@ static FORCE_INLINE int32_t compareKvRowColId(const void *key1, const void *key2 } } +int32_t tdSTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow **ppRow) { + STColumn *pTColumn; + SColVal *pColVal; + int32_t nColVal = taosArrayGetSize(pArray); + int32_t varDataLen = 0; + int32_t maxVarDataLen = 0; + int32_t iColVal = 0; + void *varBuf = NULL; + + ASSERT(nColVal > 1); + + for (int32_t iColumn = 0; iColumn < pTSchema->numOfCols; ++iColumn) { + pTColumn = &pTSchema->columns[iColumn]; + if (iColVal < nColVal) { + pColVal = (SColVal *)taosArrayGet(pArray, iColVal); + } else { + pColVal = NULL; + } + + if (iColumn == 0) { + ASSERT(pColVal->cid == pTColumn->colId); + ASSERT(pTColumn->type == TSDB_DATA_TYPE_TIMESTAMP); + ASSERT(pTColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID); + } else { + if (IS_VAR_DATA_TYPE(pTColumn->type)) { + if (pColVal) { + varDataLen += (pColVal->value.nData + sizeof(VarDataLenT)); + if (maxVarDataLen < pColVal->value.nData) { + maxVarDataLen = pColVal->value.nData; + } + } else { + varDataLen += sizeof(VarDataLenT); + if (pTColumn->type == TSDB_DATA_TYPE_VARCHAR) { + varDataLen += CHAR_BYTES; + if (maxVarDataLen < CHAR_BYTES) { + maxVarDataLen = CHAR_BYTES; + } + } else { + varDataLen += INT_BYTES; + if (maxVarDataLen < INT_BYTES) { + maxVarDataLen = INT_BYTES; + } + } + } + } + } + + ++iColVal; + } + + *ppRow = (STSRow *)taosMemoryCalloc( + 1, sizeof(STSRow) + pTSchema->flen + varDataLen + TD_BITMAP_BYTES(pTSchema->numOfCols - 1)); + + if (!(*ppRow)) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + if (maxVarDataLen > 0) { + varBuf = taosMemoryMalloc(maxVarDataLen + sizeof(VarDataLenT)); + if (!varBuf) { + taosMemoryFreeClear(*ppRow); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + } + + SRowBuilder rb = {0}; + tdSRowInit(&rb, pTSchema->version); + tdSRowSetInfo(&rb, pTSchema->numOfCols, pTSchema->numOfCols, pTSchema->flen); + tdSRowResetBuf(&rb, *ppRow); + + iColVal = 0; + for (int32_t iColumn = 0; iColumn < pTSchema->numOfCols; ++iColumn) { + pTColumn = &pTSchema->columns[iColumn]; + + TDRowValT valType = TD_VTYPE_NORM; + const void *val = NULL; + if (iColVal < nColVal) { + pColVal = (SColVal *)taosArrayGet(pArray, iColVal); + if (pColVal->isNone) { + valType = TD_VTYPE_NONE; + } else if (pColVal->isNull) { + valType = TD_VTYPE_NULL; + } else if (IS_VAR_DATA_TYPE(pTColumn->type)) { + varDataSetLen(varBuf, pColVal->value.nData); + memcpy(varDataVal(varBuf), pColVal->value.pData, pColVal->value.nData); + val = varBuf; + } else { + val = (const void *)&pColVal->value.i64; + } + } else { + pColVal = NULL; + valType = TD_VTYPE_NONE; + } + + tdAppendColValToRow(&rb, pTColumn->colId, pTColumn->type, valType, val, true, pTColumn->offset, iColVal); + + ++iColVal; + } + + taosMemoryFreeClear(varBuf); + + return 0; +} + bool tdSTSRowGetVal(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCellVal *pVal) { if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) { pVal->val = &pIter->pRow->ts; @@ -1888,4 +1994,4 @@ void tTSRowGetVal(STSRow *pRow, STSchema *pTSchema, int16_t iCol, SColVal *pColV *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, value); } -} \ No newline at end of file +} diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 6132a47ac393b3b3f73aebe2a9abc1be9337ca8e..fe9d320f6f5467a43a4076c22421d186cd9adc1a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -295,7 +295,7 @@ static int32_t mergeLastRowFileSet(STbDataIter *iter, STbDataIter *iiter, SDFile } else if (pIMemRow != NULL) { } else { if (!tsdbKeyDeleted(key, pSkyline)) { - *ppLastRow = buildTsrowFromTsdbrow(&row); + code = buildTsrowFromTsdbrow(&row, ppLastRow); goto _done; } else { continue; @@ -475,15 +475,34 @@ _err: return code; } -static STSRow *tsRowFromTsdbRow(TSDBROW *pRow) { - // TODO: new tsrow from tsdbrow - STSRow *ret = NULL; +static int32_t tsRowFromTsdbRow(STSchema *pTSchema, TSDBROW *pRow, STSRow **ppRow) { + int32_t code = 0; + + SColVal *pColVal = &(SColVal){0}; + if (pRow->type == 0) { - return tdRowDup(pRow->pTSRow); + *ppRow = tdRowDup(pRow->pTSRow); } else { + SArray *pArray = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)); + if (pArray == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + for (int16_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) { + tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal); + if (taosArrayPush(pArray, pColVal) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + } + + code = tdSTSRowNew(pArray, pTSchema, ppRow); + if (code) goto _exit; } - return ret; +_exit: + return code; } static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int *iSkyline) { @@ -584,6 +603,8 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { input[1].next = true; } + STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1); + do { for (int i = 0; i < 3; ++i) { if (input[i].next && !input[i].stop) { @@ -640,13 +661,12 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { // merge if nMerge > 1 if (nMerge > 0) { if (nMerge == 1) { - *ppRow = tsRowFromTsdbRow(merge[nMerge]); + code = tsRowFromTsdbRow(pTSchema, merge[nMerge - 1], ppRow); + if (code) goto _err; } else { // merge 2 or 3 rows SRowMerger merger = {0}; - STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1); - tRowMergerInit(&merger, merge[0], pTSchema); for (int i = 1; i < nMerge; ++i) { tRowMerge(&merger, merge[i]); @@ -657,9 +677,11 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { } } while (*ppRow == NULL); - return code; + taosMemoryFreeClear(pTSchema); + return code; _err: + taosMemoryFreeClear(pTSchema); tsdbError("vgId:%d merge last_row failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index e90338538d47fc3520eab414849c70b44e0aed9a..c529a042805d96e91cc009362f86c1316821af57 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -749,8 +749,9 @@ _exit: int32_t tRowMergerGetRow(SRowMerger *pMerger, STSRow **ppRow) { int32_t code = 0; - // TODO - ASSERT(0); + + code = tdSTSRowNew(pMerger->pArray, pMerger->pTSchema, ppRow); + return code; } @@ -1278,4 +1279,4 @@ int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest) { _exit: return code; -} \ No newline at end of file +}