diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 3ba8c0bc9e830a099eaa299549b8fda3f3ee189f..be894df89aa4b72ec9ba93cb7c142a8c7fd25524 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -857,12 +857,10 @@ static int32_t tRowNoneUpsertColData(SColData *aColData, int32_t nColData, int32 if (flag) return code; for (int32_t iColData = 0; iColData < nColData; iColData++) { - SColData *pColData = &aColData[iColData]; - code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0); - if (code) goto _exit; + code = tColDataAppendValueImpl[aColData[iColData].flag][CV_FLAG_NONE](&aColData[iColData], NULL, 0); + if (code) return code; } -_exit: return code; } static int32_t tRowNullUpsertColData(SColData *aColData, int32_t nColData, STSchema *pSchema, int32_t flag) { diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 34dfe43b3e91e4f864090adad06ada61c3df722c..905bf6c411fd2758116a5fea94d8b898647dd3cc 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -163,6 +163,7 @@ void tBlockDataDestroy(SBlockData *pBlockData); int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, int16_t *aCid, int32_t nCid); void tBlockDataReset(SBlockData *pBlockData); int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid); +int32_t tBlockDataTryUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, int64_t uid); int32_t tBlockDataUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid); void tBlockDataClear(SBlockData *pBlockData); void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData); diff --git a/source/dnode/vnode/src/tsdb/tsdbCompact.c b/source/dnode/vnode/src/tsdb/tsdbCompact.c index 16a255a1e0ca2243668a0b1cea0660703174dbcb..21caab8fee4fa67559554fbd02df83cc42f7f18e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCompact.c +++ b/source/dnode/vnode/src/tsdb/tsdbCompact.c @@ -852,13 +852,13 @@ static int32_t tsdbCompactFileSet(STsdbCompactor *pCompactor) { if (pRowInfo && (code = tBlockDataInit(&pCompactor->bData, (TABLEID *)pRowInfo, pTSchema, NULL, 0))) { TSDB_CHECK_CODE(code, lino, _exit); + pCompactor->tableId.suid = pRowInfo->suid; + pCompactor->tableId.uid = pRowInfo->uid; } while (pRowInfo) { - // write block data according to table id if necessary (TODO) - if ((pCompactor->tableId.suid != pRowInfo->suid) || - (pCompactor->tableId.uid != pRowInfo->uid && - (pRowInfo->suid == 0 || (pCompactor->bData.uid && pCompactor->bData.nRow >= pCompactor->minRows)))) { + // if suid changed + if (pCompactor->tableId.suid != pRowInfo->suid) { code = tsdbCompactWriteBlockData(pCompactor); TSDB_CHECK_CODE(code, lino, _exit); @@ -867,10 +867,37 @@ static int32_t tsdbCompactFileSet(STsdbCompactor *pCompactor) { code = tBlockDataInit(&pCompactor->bData, (TABLEID *)pRowInfo, pTSchema, NULL, 0); TSDB_CHECK_CODE(code, lino, _exit); + pCompactor->tableId.suid = pRowInfo->suid; + pCompactor->tableId.uid = pRowInfo->uid; + } + + // if uid changed + if (pCompactor->tableId.uid != pRowInfo->uid) { + // if need to write the block data + bool init = false; + if (pCompactor->bData.suid == 0) { + code = tsdbCompactWriteBlockData(pCompactor); + TSDB_CHECK_CODE(code, lino, _exit); + init = true; + } else if (pCompactor->bData.uid && pCompactor->bData.nRow >= pCompactor->minRows) { + code = tsdbCompactWriteBlockData(pCompactor); + TSDB_CHECK_CODE(code, lino, _exit); + } + + // write SDataBlk + code = tsdbCompactWriteDataBlk(pCompactor); + TSDB_CHECK_CODE(code, lino, _exit); + + // init block data if need + if (init && (code = tBlockDataInit(&pCompactor->bData, (TABLEID *)pRowInfo, pTSchema, NULL, 0))) { + TSDB_CHECK_CODE(code, lino, _exit); + pCompactor->tableId.suid = pRowInfo->suid; + pCompactor->tableId.uid = pRowInfo->uid; + } } - // check if append/merge the row causes nRow exceed maxRows (TODO) - if (0 /* add the row causes row exceed maxRows */) { + // if append/merge the row causes nRow exceed maxRows + if (tBlockDataTryUpsertRow(&pCompactor->bData, &pRowInfo->row, pRowInfo->uid) > pCompactor->maxRows) { code = tsdbCompactWriteBlockData(pCompactor); TSDB_CHECK_CODE(code, lino, _exit); } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 5c0424de34ce7e35f2ac189741662b321ceb1d8a..2afa44374e775e9076fe6ecdd31477700b6a1bbf 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -1051,12 +1051,7 @@ static int32_t tBlockDataUpsertBlockRow(SBlockData *pBlockData, SBlockData *pBlo if (pColDataFrom == NULL || pColDataFrom->cid > pColDataTo->cid) { cv = COL_VAL_NONE(pColDataTo->cid, pColDataTo->type); - if (flag) { - code = tColDataUpdateValue(pColDataTo, &cv, flag > 0); - } else { - code = tColDataAppendValue(pColDataTo, &cv); - } - if (code) goto _exit; + if (flag == 0 && (code = tColDataAppendValue(pColDataTo, &cv))) goto _exit; } else { tColDataGetValue(pColDataFrom, iRow, &cv); @@ -1137,6 +1132,19 @@ _exit: return code; } +int32_t tBlockDataTryUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, int64_t uid) { + if (pBlockData->nRow == 0) { + return 1; + } + + int64_t luid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[pBlockData->nRow - 1]; + if (luid == uid && pBlockData->aTSKEY[pBlockData->nRow - 1] == TSDBROW_TS(pRow)) { + return pBlockData->nRow; + } else { + return pBlockData->nRow + 1; + } +} + int32_t tBlockDataUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid) { int32_t code = 0;