diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index 7ec75e718813b98822773cedbeec24e614b7fe33..9679e909225cf97c39eaff2787eac95135d711bb 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -109,7 +109,7 @@ void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal); void tRowDestroy(SRow *pRow); void tRowSort(SArray *aRowP); int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag); -int32_t tRowAppendToColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData); +int32_t tRowUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData, int32_t flag); // SRowIter ================================ int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter); @@ -137,7 +137,7 @@ void tColDataInit(SColData *pColData, int16_t cid, int8_t type, int8_t smaOn) void tColDataClear(SColData *pColData); void tColDataDeepClear(SColData *pColData); int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal); -int32_t tColDataUpdateValue(SColData *pColData, SColVal *pColVal, int32_t flag); +int32_t tColDataUpdateValue(SColData *pColData, SColVal *pColVal, bool forward); void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal); uint8_t tColDataGetBitValue(const SColData *pColData, int32_t iVal); int32_t tColDataCopy(SColData *pColDataFrom, SColData *pColData, xMallocFn xMalloc, void *arg); diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 72755dae3cc811a26ef7b2aaeb6a95cfa211aa29..4752eb4261d6a0c4220cef64a81573b073738ee8 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -862,7 +862,7 @@ static int32_t tRowAppendNoneToColData(SColData *aColData, int32_t nColData) { _exit: return code; } -static int32_t tRowAppendNullToColData(SColData *aColData, int32_t nColData, STSchema *pSchema) { +static int32_t tRowNullUpsertColData(SColData *aColData, int32_t nColData, STSchema *pSchema) { int32_t code = 0; int32_t iColData = 0; @@ -895,7 +895,7 @@ static int32_t tRowAppendNullToColData(SColData *aColData, int32_t nColData, STS _exit: return code; } -static int32_t tRowAppendTupleToColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData) { +static int32_t tRowTupleUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData) { int32_t code = 0; int32_t iColData = 0; @@ -1001,7 +1001,7 @@ static int32_t tRowAppendTupleToColData(SRow *pRow, STSchema *pTSchema, SColData _exit: return code; } -static int32_t tRowAppendKVToColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData) { +static int32_t tRowKVUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData) { int32_t code = 0; SKVIdx *pKVIdx = (SKVIdx *)pRow->data; @@ -1087,30 +1087,27 @@ static int32_t tRowAppendKVToColData(SRow *pRow, STSchema *pTSchema, SColData *a _exit: return code; } -int32_t tRowAppendToColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData) { +/* flag > 0: forward update + * flag == 0: append + * flag < 0: backward update + */ +int32_t tRowUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData, int32_t flag) { ASSERT(pRow->sver == pTSchema->version); ASSERT(nColData > 0); - int32_t code = 0; - if (pRow->flag == HAS_NONE) { - code = tRowAppendNoneToColData(aColData, nColData); - goto _exit; + if (flag) { + return TSDB_CODE_SUCCESS; + } else { + return tRowAppendNoneToColData(aColData, nColData); + } } else if (pRow->flag == HAS_NULL) { - code = tRowAppendNullToColData(aColData, nColData, pTSchema); - goto _exit; + return tRowNullUpsertColData(aColData, nColData, pTSchema); + } else if (pRow->flag >> 4) { // KV row + return tRowKVUpsertColData(pRow, pTSchema, aColData, nColData); + } else { // TUPLE row + return tRowTupleUpsertColData(pRow, pTSchema, aColData, nColData); } - - if (pRow->flag >> 4) { // KV row - code = tRowAppendKVToColData(pRow, pTSchema, aColData, nColData); - if (code) goto _exit; - } else { - code = tRowAppendTupleToColData(pRow, pTSchema, aColData, nColData); - if (code) goto _exit; - } - -_exit: - return code; } // STag ======================================== @@ -1910,91 +1907,91 @@ int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal) { pColVal->value.nData); } -static int32_t tColDataUpdateValue10(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) { +static int32_t tColDataUpdateValue10(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) { ASSERT(0); return 0; } -static int32_t tColDataUpdateValue11(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) { +static int32_t tColDataUpdateValue11(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) { ASSERT(0); return 0; } -static int32_t tColDataUpdateValue12(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) { +static int32_t tColDataUpdateValue12(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) { ASSERT(0); return 0; } -static int32_t tColDataUpdateValue20(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) { +static int32_t tColDataUpdateValue20(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) { ASSERT(0); return 0; } -static int32_t tColDataUpdateValue21(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) { +static int32_t tColDataUpdateValue21(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) { ASSERT(0); return 0; } -static int32_t tColDataUpdateValue22(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) { +static int32_t tColDataUpdateValue22(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) { ASSERT(0); return 0; } -static int32_t tColDataUpdateValue30(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) { +static int32_t tColDataUpdateValue30(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) { ASSERT(0); return 0; } -static int32_t tColDataUpdateValue31(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) { +static int32_t tColDataUpdateValue31(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) { ASSERT(0); return 0; } -static int32_t tColDataUpdateValue32(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) { +static int32_t tColDataUpdateValue32(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) { ASSERT(0); return 0; } -static int32_t tColDataUpdateValue40(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) { +static int32_t tColDataUpdateValue40(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) { ASSERT(0); return 0; } -static int32_t tColDataUpdateValue41(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) { +static int32_t tColDataUpdateValue41(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) { ASSERT(0); return 0; } -static int32_t tColDataUpdateValue42(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) { +static int32_t tColDataUpdateValue42(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) { ASSERT(0); return 0; } -static int32_t tColDataUpdateValue50(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) { +static int32_t tColDataUpdateValue50(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) { ASSERT(0); return 0; } -static int32_t tColDataUpdateValue51(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) { +static int32_t tColDataUpdateValue51(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) { ASSERT(0); return 0; } -static int32_t tColDataUpdateValue52(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) { +static int32_t tColDataUpdateValue52(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) { ASSERT(0); return 0; } -static int32_t tColDataUpdateValue60(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) { +static int32_t tColDataUpdateValue60(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) { ASSERT(0); return 0; } -static int32_t tColDataUpdateValue61(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) { +static int32_t tColDataUpdateValue61(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) { ASSERT(0); return 0; } -static int32_t tColDataUpdateValue62(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) { +static int32_t tColDataUpdateValue62(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) { ASSERT(0); return 0; } -static int32_t tColDataUpdateValue70(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) { +static int32_t tColDataUpdateValue70(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) { ASSERT(0); return 0; } -static int32_t tColDataUpdateValue71(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) { +static int32_t tColDataUpdateValue71(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) { ASSERT(0); return 0; } -static int32_t tColDataUpdateValue72(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) { +static int32_t tColDataUpdateValue72(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) { ASSERT(0); return 0; } -static int32_t (*tColDataUpdateValueImpl[8][3])(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) = { +static int32_t (*tColDataUpdateValueImpl[8][3])(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) = { {NULL, NULL, NULL}, // 0 {tColDataUpdateValue10, tColDataUpdateValue11, tColDataUpdateValue12}, // HAS_NONE {tColDataUpdateValue20, tColDataUpdateValue21, tColDataUpdateValue22}, // HAS_NULL @@ -2004,11 +2001,11 @@ static int32_t (*tColDataUpdateValueImpl[8][3])(SColData *pColData, uint8_t *pDa {tColDataUpdateValue60, tColDataUpdateValue61, tColDataUpdateValue62}, // HAS_VALUE|HAS_NULL {tColDataUpdateValue70, tColDataUpdateValue71, tColDataUpdateValue72}, // HAS_VALUE|HAS_NULL|HAS_NONE }; -int32_t tColDataUpdateValue(SColData *pColData, SColVal *pColVal, int32_t flag) { +int32_t tColDataUpdateValue(SColData *pColData, SColVal *pColVal, bool forward) { ASSERT(pColData->cid == pColVal->cid && pColData->type == pColVal->type); return tColDataUpdateValueImpl[pColData->flag][pColVal->flag]( pColData, IS_VAR_DATA_TYPE(pColData->type) ? pColVal->value.pData : (uint8_t *)&pColVal->value.val, - pColVal->value.nData, flag); + pColVal->value.nData, forward); } static FORCE_INLINE void tColDataGetValue1(SColData *pColData, int32_t iVal, SColVal *pColVal) { // HAS_NONE diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index de764045b22a0aa1e2326ca4b8e895cd8bf5b3a8..34dfe43b3e91e4f864090adad06ada61c3df722c 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -163,7 +163,7 @@ void tBlockDataDestroy(SBlockData *pBlockData); int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, int16_t *aCid, int32_t nCid); void tBlockDataReset(SBlockData *pBlockData); int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid); -int32_t tBlockDataAppendRowEx(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid); +int32_t tBlockDataUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid); void tBlockDataClear(SBlockData *pBlockData); void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData); int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut, int32_t *szOut, uint8_t *aBuf[], diff --git a/source/dnode/vnode/src/tsdb/tsdbCompact.c b/source/dnode/vnode/src/tsdb/tsdbCompact.c index 85d0b8223cdf7057c36a013b1edca5765bf07f61..16a255a1e0ca2243668a0b1cea0660703174dbcb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCompact.c +++ b/source/dnode/vnode/src/tsdb/tsdbCompact.c @@ -855,7 +855,7 @@ static int32_t tsdbCompactFileSet(STsdbCompactor *pCompactor) { } while (pRowInfo) { - // write block data according to table id if necessary + // write block data according to table id if necessary (TODO) if ((pCompactor->tableId.suid != pRowInfo->suid) || (pCompactor->tableId.uid != pRowInfo->uid && (pRowInfo->suid == 0 || (pCompactor->bData.uid && pCompactor->bData.nRow >= pCompactor->minRows)))) { @@ -869,18 +869,17 @@ static int32_t tsdbCompactFileSet(STsdbCompactor *pCompactor) { TSDB_CHECK_CODE(code, lino, _exit); } - // check if append/merge the row causes nRow exceed maxRows + // check if append/merge the row causes nRow exceed maxRows (TODO) if (0 /* add the row causes row exceed maxRows */) { code = tsdbCompactWriteBlockData(pCompactor); TSDB_CHECK_CODE(code, lino, _exit); } // append/merge the row - // code = tBlockDataAppendRowEx(&pCompactor->bData, &pRowInfo->row, pTSchema, pRowInfo->uid); - // TSDB_CHECK_CODE(code, lino, _exit); - pCompactor->tableId.suid = pRowInfo->suid; pCompactor->tableId.uid = pRowInfo->uid; + code = tBlockDataUpsertRow(&pCompactor->bData, &pRowInfo->row, pTSchema, pRowInfo->uid); + TSDB_CHECK_CODE(code, lino, _exit); // iter to the next row code = tsdbCompactNextRow(pCompactor); diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index b47d49dfb7966f9b3173ebbd87d5410e4deab272..5c0424de34ce7e35f2ac189741662b321ceb1d8a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -1030,7 +1030,12 @@ void tBlockDataClear(SBlockData *pBlockData) { } } -static int32_t tBlockDataAppendBlockRow(SBlockData *pBlockData, SBlockData *pBlockDataFrom, int32_t iRow) { +/* flag > 0: forward update + * flag == 0: insert + * flag < 0: backward update + */ +static int32_t tBlockDataUpsertBlockRow(SBlockData *pBlockData, SBlockData *pBlockDataFrom, int32_t iRow, + int32_t flag) { int32_t code = 0; SColVal cv = {0}; @@ -1045,12 +1050,21 @@ static int32_t tBlockDataAppendBlockRow(SBlockData *pBlockData, SBlockData *pBlo } if (pColDataFrom == NULL || pColDataFrom->cid > pColDataTo->cid) { - code = tColDataAppendValue(pColDataTo, &COL_VAL_NONE(pColDataTo->cid, pColDataTo->type)); + cv = COL_VAL_NONE(pColDataTo->cid, pColDataTo->type); + if (flag) { + code = tColDataUpdateValue(pColDataTo, &cv, flag > 0); + } else { + code = tColDataAppendValue(pColDataTo, &cv); + } if (code) goto _exit; } else { tColDataGetValue(pColDataFrom, iRow, &cv); - code = tColDataAppendValue(pColDataTo, &cv); + if (flag) { + code = tColDataUpdateValue(pColDataTo, &cv, flag > 0); + } else { + code = tColDataAppendValue(pColDataTo, &cv); + } if (code) goto _exit; pColDataFrom = (++iColDataFrom < pBlockDataFrom->nColData) ? &pBlockDataFrom->aColData[iColDataFrom] : NULL; @@ -1082,12 +1096,11 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS if (code) goto _exit; pBlockData->aTSKEY[pBlockData->nRow] = TSDBROW_TS(pRow); - SColVal cv = {0}; if (pRow->type == TSDBROW_ROW_FMT) { - code = tRowAppendToColData(pRow->pTSRow, pTSchema, pBlockData->aColData, pBlockData->nColData); + code = tRowUpsertColData(pRow->pTSRow, pTSchema, pBlockData->aColData, pBlockData->nColData, 0 /* append */); if (code) goto _exit; } else if (pRow->type == TSDBROW_COL_FMT) { - code = tBlockDataAppendBlockRow(pBlockData, pRow->pBlockData, pRow->iRow); + code = tBlockDataUpsertBlockRow(pBlockData, pRow->pBlockData, pRow->iRow, 0 /* append */); if (code) goto _exit; } else { ASSERT(0); @@ -1097,15 +1110,44 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS _exit: return code; } +static int32_t tBlockDataUpdateRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) { + int32_t code = 0; + + // version + int64_t lversion = pBlockData->aVersion[pBlockData->nRow - 1]; + int64_t rversion = TSDBROW_VERSION(pRow); + ASSERT(lversion != rversion); + if (rversion > lversion) { + pBlockData->aVersion[pBlockData->nRow - 1] = rversion; + } -int32_t tBlockDataAppendRowEx(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid) { + // update other rows + if (pRow->type == TSDBROW_ROW_FMT) { + code = tRowUpsertColData(pRow->pTSRow, pTSchema, pBlockData->aColData, pBlockData->nColData, + (rversion > lversion) ? 1 : -1 /* update */); + if (code) goto _exit; + } else if (pRow->type == TSDBROW_COL_FMT) { + code = tBlockDataUpsertBlockRow(pBlockData, pRow->pBlockData, pRow->iRow, (rversion > lversion) ? 1 : -1); + if (code) goto _exit; + } else { + ASSERT(0); + } + +_exit: + return code; +} + +int32_t tBlockDataUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid) { int32_t code = 0; ASSERT(pBlockData->suid || pBlockData->uid); if (pBlockData->nRow == 0) { pBlockData->uid = uid; - } else if (pBlockData->uid && pBlockData->uid != uid) { + return tBlockDataAppendRow(pBlockData, pRow, pTSchema, uid); + } + + if (pBlockData->uid && pBlockData->uid != uid) { ASSERT(pBlockData->suid); code = tRealloc((uint8_t **)&pBlockData->aUid, sizeof(int64_t) * (pBlockData->nRow + 1)); @@ -1118,7 +1160,13 @@ int32_t tBlockDataAppendRowEx(SBlockData *pBlockData, TSDBROW *pRow, STSchema *p pBlockData->uid = 0; } - return tBlockDataAppendRow(pBlockData, pRow, pTSchema, uid); + // decide append/update row + int64_t luid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[pBlockData->nRow - 1]; + if (luid == uid && pBlockData->aTSKEY[pBlockData->nRow - 1] == TSDBROW_TS(pRow)) { + return tBlockDataUpdateRow(pBlockData, pRow, pTSchema); + } else { + return tBlockDataAppendRow(pBlockData, pRow, pTSchema, uid); + } } void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData) {