提交 4936eb5b 编写于 作者: H Hongze Cheng

more code

上级 835c42e5
......@@ -109,7 +109,7 @@ void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
void tRowDestroy(SRow *pRow);
void tRowSort(SArray *aRowP);
int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag);
int32_t tRowAppendToColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData);
int32_t tRowUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData, int32_t flag);
// SRowIter ================================
int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter);
......@@ -137,7 +137,7 @@ void tColDataInit(SColData *pColData, int16_t cid, int8_t type, int8_t smaOn)
void tColDataClear(SColData *pColData);
void tColDataDeepClear(SColData *pColData);
int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal);
int32_t tColDataUpdateValue(SColData *pColData, SColVal *pColVal, int32_t flag);
int32_t tColDataUpdateValue(SColData *pColData, SColVal *pColVal, bool forward);
void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal);
uint8_t tColDataGetBitValue(const SColData *pColData, int32_t iVal);
int32_t tColDataCopy(SColData *pColDataFrom, SColData *pColData, xMallocFn xMalloc, void *arg);
......
......@@ -862,7 +862,7 @@ static int32_t tRowAppendNoneToColData(SColData *aColData, int32_t nColData) {
_exit:
return code;
}
static int32_t tRowAppendNullToColData(SColData *aColData, int32_t nColData, STSchema *pSchema) {
static int32_t tRowNullUpsertColData(SColData *aColData, int32_t nColData, STSchema *pSchema) {
int32_t code = 0;
int32_t iColData = 0;
......@@ -895,7 +895,7 @@ static int32_t tRowAppendNullToColData(SColData *aColData, int32_t nColData, STS
_exit:
return code;
}
static int32_t tRowAppendTupleToColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData) {
static int32_t tRowTupleUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData) {
int32_t code = 0;
int32_t iColData = 0;
......@@ -1001,7 +1001,7 @@ static int32_t tRowAppendTupleToColData(SRow *pRow, STSchema *pTSchema, SColData
_exit:
return code;
}
static int32_t tRowAppendKVToColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData) {
static int32_t tRowKVUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData) {
int32_t code = 0;
SKVIdx *pKVIdx = (SKVIdx *)pRow->data;
......@@ -1087,30 +1087,27 @@ static int32_t tRowAppendKVToColData(SRow *pRow, STSchema *pTSchema, SColData *a
_exit:
return code;
}
int32_t tRowAppendToColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData) {
/* flag > 0: forward update
* flag == 0: append
* flag < 0: backward update
*/
int32_t tRowUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData, int32_t flag) {
ASSERT(pRow->sver == pTSchema->version);
ASSERT(nColData > 0);
int32_t code = 0;
if (pRow->flag == HAS_NONE) {
code = tRowAppendNoneToColData(aColData, nColData);
goto _exit;
if (flag) {
return TSDB_CODE_SUCCESS;
} else {
return tRowAppendNoneToColData(aColData, nColData);
}
} else if (pRow->flag == HAS_NULL) {
code = tRowAppendNullToColData(aColData, nColData, pTSchema);
goto _exit;
return tRowNullUpsertColData(aColData, nColData, pTSchema);
} else if (pRow->flag >> 4) { // KV row
return tRowKVUpsertColData(pRow, pTSchema, aColData, nColData);
} else { // TUPLE row
return tRowTupleUpsertColData(pRow, pTSchema, aColData, nColData);
}
if (pRow->flag >> 4) { // KV row
code = tRowAppendKVToColData(pRow, pTSchema, aColData, nColData);
if (code) goto _exit;
} else {
code = tRowAppendTupleToColData(pRow, pTSchema, aColData, nColData);
if (code) goto _exit;
}
_exit:
return code;
}
// STag ========================================
......@@ -1910,91 +1907,91 @@ int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal) {
pColVal->value.nData);
}
static int32_t tColDataUpdateValue10(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) {
static int32_t tColDataUpdateValue10(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) {
ASSERT(0);
return 0;
}
static int32_t tColDataUpdateValue11(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) {
static int32_t tColDataUpdateValue11(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) {
ASSERT(0);
return 0;
}
static int32_t tColDataUpdateValue12(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) {
static int32_t tColDataUpdateValue12(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) {
ASSERT(0);
return 0;
}
static int32_t tColDataUpdateValue20(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) {
static int32_t tColDataUpdateValue20(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) {
ASSERT(0);
return 0;
}
static int32_t tColDataUpdateValue21(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) {
static int32_t tColDataUpdateValue21(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) {
ASSERT(0);
return 0;
}
static int32_t tColDataUpdateValue22(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) {
static int32_t tColDataUpdateValue22(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) {
ASSERT(0);
return 0;
}
static int32_t tColDataUpdateValue30(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) {
static int32_t tColDataUpdateValue30(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) {
ASSERT(0);
return 0;
}
static int32_t tColDataUpdateValue31(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) {
static int32_t tColDataUpdateValue31(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) {
ASSERT(0);
return 0;
}
static int32_t tColDataUpdateValue32(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) {
static int32_t tColDataUpdateValue32(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) {
ASSERT(0);
return 0;
}
static int32_t tColDataUpdateValue40(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) {
static int32_t tColDataUpdateValue40(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) {
ASSERT(0);
return 0;
}
static int32_t tColDataUpdateValue41(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) {
static int32_t tColDataUpdateValue41(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) {
ASSERT(0);
return 0;
}
static int32_t tColDataUpdateValue42(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) {
static int32_t tColDataUpdateValue42(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) {
ASSERT(0);
return 0;
}
static int32_t tColDataUpdateValue50(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) {
static int32_t tColDataUpdateValue50(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) {
ASSERT(0);
return 0;
}
static int32_t tColDataUpdateValue51(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) {
static int32_t tColDataUpdateValue51(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) {
ASSERT(0);
return 0;
}
static int32_t tColDataUpdateValue52(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) {
static int32_t tColDataUpdateValue52(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) {
ASSERT(0);
return 0;
}
static int32_t tColDataUpdateValue60(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) {
static int32_t tColDataUpdateValue60(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) {
ASSERT(0);
return 0;
}
static int32_t tColDataUpdateValue61(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) {
static int32_t tColDataUpdateValue61(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) {
ASSERT(0);
return 0;
}
static int32_t tColDataUpdateValue62(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) {
static int32_t tColDataUpdateValue62(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) {
ASSERT(0);
return 0;
}
static int32_t tColDataUpdateValue70(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) {
static int32_t tColDataUpdateValue70(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) {
ASSERT(0);
return 0;
}
static int32_t tColDataUpdateValue71(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) {
static int32_t tColDataUpdateValue71(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) {
ASSERT(0);
return 0;
}
static int32_t tColDataUpdateValue72(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) {
static int32_t tColDataUpdateValue72(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) {
ASSERT(0);
return 0;
}
static int32_t (*tColDataUpdateValueImpl[8][3])(SColData *pColData, uint8_t *pData, uint32_t nData, int32_t flag) = {
static int32_t (*tColDataUpdateValueImpl[8][3])(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward) = {
{NULL, NULL, NULL}, // 0
{tColDataUpdateValue10, tColDataUpdateValue11, tColDataUpdateValue12}, // HAS_NONE
{tColDataUpdateValue20, tColDataUpdateValue21, tColDataUpdateValue22}, // HAS_NULL
......@@ -2004,11 +2001,11 @@ static int32_t (*tColDataUpdateValueImpl[8][3])(SColData *pColData, uint8_t *pDa
{tColDataUpdateValue60, tColDataUpdateValue61, tColDataUpdateValue62}, // HAS_VALUE|HAS_NULL
{tColDataUpdateValue70, tColDataUpdateValue71, tColDataUpdateValue72}, // HAS_VALUE|HAS_NULL|HAS_NONE
};
int32_t tColDataUpdateValue(SColData *pColData, SColVal *pColVal, int32_t flag) {
int32_t tColDataUpdateValue(SColData *pColData, SColVal *pColVal, bool forward) {
ASSERT(pColData->cid == pColVal->cid && pColData->type == pColVal->type);
return tColDataUpdateValueImpl[pColData->flag][pColVal->flag](
pColData, IS_VAR_DATA_TYPE(pColData->type) ? pColVal->value.pData : (uint8_t *)&pColVal->value.val,
pColVal->value.nData, flag);
pColVal->value.nData, forward);
}
static FORCE_INLINE void tColDataGetValue1(SColData *pColData, int32_t iVal, SColVal *pColVal) { // HAS_NONE
......
......@@ -163,7 +163,7 @@ void tBlockDataDestroy(SBlockData *pBlockData);
int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, int16_t *aCid, int32_t nCid);
void tBlockDataReset(SBlockData *pBlockData);
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid);
int32_t tBlockDataAppendRowEx(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid);
int32_t tBlockDataUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid);
void tBlockDataClear(SBlockData *pBlockData);
void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData);
int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut, int32_t *szOut, uint8_t *aBuf[],
......
......@@ -855,7 +855,7 @@ static int32_t tsdbCompactFileSet(STsdbCompactor *pCompactor) {
}
while (pRowInfo) {
// write block data according to table id if necessary
// write block data according to table id if necessary (TODO)
if ((pCompactor->tableId.suid != pRowInfo->suid) ||
(pCompactor->tableId.uid != pRowInfo->uid &&
(pRowInfo->suid == 0 || (pCompactor->bData.uid && pCompactor->bData.nRow >= pCompactor->minRows)))) {
......@@ -869,18 +869,17 @@ static int32_t tsdbCompactFileSet(STsdbCompactor *pCompactor) {
TSDB_CHECK_CODE(code, lino, _exit);
}
// check if append/merge the row causes nRow exceed maxRows
// check if append/merge the row causes nRow exceed maxRows (TODO)
if (0 /* add the row causes row exceed maxRows */) {
code = tsdbCompactWriteBlockData(pCompactor);
TSDB_CHECK_CODE(code, lino, _exit);
}
// append/merge the row
// code = tBlockDataAppendRowEx(&pCompactor->bData, &pRowInfo->row, pTSchema, pRowInfo->uid);
// TSDB_CHECK_CODE(code, lino, _exit);
pCompactor->tableId.suid = pRowInfo->suid;
pCompactor->tableId.uid = pRowInfo->uid;
code = tBlockDataUpsertRow(&pCompactor->bData, &pRowInfo->row, pTSchema, pRowInfo->uid);
TSDB_CHECK_CODE(code, lino, _exit);
// iter to the next row
code = tsdbCompactNextRow(pCompactor);
......
......@@ -1030,7 +1030,12 @@ void tBlockDataClear(SBlockData *pBlockData) {
}
}
static int32_t tBlockDataAppendBlockRow(SBlockData *pBlockData, SBlockData *pBlockDataFrom, int32_t iRow) {
/* flag > 0: forward update
* flag == 0: insert
* flag < 0: backward update
*/
static int32_t tBlockDataUpsertBlockRow(SBlockData *pBlockData, SBlockData *pBlockDataFrom, int32_t iRow,
int32_t flag) {
int32_t code = 0;
SColVal cv = {0};
......@@ -1045,12 +1050,21 @@ static int32_t tBlockDataAppendBlockRow(SBlockData *pBlockData, SBlockData *pBlo
}
if (pColDataFrom == NULL || pColDataFrom->cid > pColDataTo->cid) {
code = tColDataAppendValue(pColDataTo, &COL_VAL_NONE(pColDataTo->cid, pColDataTo->type));
cv = COL_VAL_NONE(pColDataTo->cid, pColDataTo->type);
if (flag) {
code = tColDataUpdateValue(pColDataTo, &cv, flag > 0);
} else {
code = tColDataAppendValue(pColDataTo, &cv);
}
if (code) goto _exit;
} else {
tColDataGetValue(pColDataFrom, iRow, &cv);
code = tColDataAppendValue(pColDataTo, &cv);
if (flag) {
code = tColDataUpdateValue(pColDataTo, &cv, flag > 0);
} else {
code = tColDataAppendValue(pColDataTo, &cv);
}
if (code) goto _exit;
pColDataFrom = (++iColDataFrom < pBlockDataFrom->nColData) ? &pBlockDataFrom->aColData[iColDataFrom] : NULL;
......@@ -1082,12 +1096,11 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
if (code) goto _exit;
pBlockData->aTSKEY[pBlockData->nRow] = TSDBROW_TS(pRow);
SColVal cv = {0};
if (pRow->type == TSDBROW_ROW_FMT) {
code = tRowAppendToColData(pRow->pTSRow, pTSchema, pBlockData->aColData, pBlockData->nColData);
code = tRowUpsertColData(pRow->pTSRow, pTSchema, pBlockData->aColData, pBlockData->nColData, 0 /* append */);
if (code) goto _exit;
} else if (pRow->type == TSDBROW_COL_FMT) {
code = tBlockDataAppendBlockRow(pBlockData, pRow->pBlockData, pRow->iRow);
code = tBlockDataUpsertBlockRow(pBlockData, pRow->pBlockData, pRow->iRow, 0 /* append */);
if (code) goto _exit;
} else {
ASSERT(0);
......@@ -1097,15 +1110,44 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
_exit:
return code;
}
static int32_t tBlockDataUpdateRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) {
int32_t code = 0;
// version
int64_t lversion = pBlockData->aVersion[pBlockData->nRow - 1];
int64_t rversion = TSDBROW_VERSION(pRow);
ASSERT(lversion != rversion);
if (rversion > lversion) {
pBlockData->aVersion[pBlockData->nRow - 1] = rversion;
}
int32_t tBlockDataAppendRowEx(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid) {
// update other rows
if (pRow->type == TSDBROW_ROW_FMT) {
code = tRowUpsertColData(pRow->pTSRow, pTSchema, pBlockData->aColData, pBlockData->nColData,
(rversion > lversion) ? 1 : -1 /* update */);
if (code) goto _exit;
} else if (pRow->type == TSDBROW_COL_FMT) {
code = tBlockDataUpsertBlockRow(pBlockData, pRow->pBlockData, pRow->iRow, (rversion > lversion) ? 1 : -1);
if (code) goto _exit;
} else {
ASSERT(0);
}
_exit:
return code;
}
int32_t tBlockDataUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid) {
int32_t code = 0;
ASSERT(pBlockData->suid || pBlockData->uid);
if (pBlockData->nRow == 0) {
pBlockData->uid = uid;
} else if (pBlockData->uid && pBlockData->uid != uid) {
return tBlockDataAppendRow(pBlockData, pRow, pTSchema, uid);
}
if (pBlockData->uid && pBlockData->uid != uid) {
ASSERT(pBlockData->suid);
code = tRealloc((uint8_t **)&pBlockData->aUid, sizeof(int64_t) * (pBlockData->nRow + 1));
......@@ -1118,7 +1160,13 @@ int32_t tBlockDataAppendRowEx(SBlockData *pBlockData, TSDBROW *pRow, STSchema *p
pBlockData->uid = 0;
}
return tBlockDataAppendRow(pBlockData, pRow, pTSchema, uid);
// decide append/update row
int64_t luid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[pBlockData->nRow - 1];
if (luid == uid && pBlockData->aTSKEY[pBlockData->nRow - 1] == TSDBROW_TS(pRow)) {
return tBlockDataUpdateRow(pBlockData, pRow, pTSchema);
} else {
return tBlockDataAppendRow(pBlockData, pRow, pTSchema, uid);
}
}
void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册