提交 5e579398 编写于 作者: H Hongze Cheng

more code

上级 0d7f9ac5
...@@ -857,12 +857,10 @@ static int32_t tRowNoneUpsertColData(SColData *aColData, int32_t nColData, int32 ...@@ -857,12 +857,10 @@ static int32_t tRowNoneUpsertColData(SColData *aColData, int32_t nColData, int32
if (flag) return code; if (flag) return code;
for (int32_t iColData = 0; iColData < nColData; iColData++) { for (int32_t iColData = 0; iColData < nColData; iColData++) {
SColData *pColData = &aColData[iColData]; code = tColDataAppendValueImpl[aColData[iColData].flag][CV_FLAG_NONE](&aColData[iColData], NULL, 0);
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0); if (code) return code;
if (code) goto _exit;
} }
_exit:
return code; return code;
} }
static int32_t tRowNullUpsertColData(SColData *aColData, int32_t nColData, STSchema *pSchema, int32_t flag) { static int32_t tRowNullUpsertColData(SColData *aColData, int32_t nColData, STSchema *pSchema, int32_t flag) {
......
...@@ -163,6 +163,7 @@ void tBlockDataDestroy(SBlockData *pBlockData); ...@@ -163,6 +163,7 @@ void tBlockDataDestroy(SBlockData *pBlockData);
int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, int16_t *aCid, int32_t nCid); int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, int16_t *aCid, int32_t nCid);
void tBlockDataReset(SBlockData *pBlockData); void tBlockDataReset(SBlockData *pBlockData);
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid); int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid);
int32_t tBlockDataTryUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, int64_t uid);
int32_t tBlockDataUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid); int32_t tBlockDataUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid);
void tBlockDataClear(SBlockData *pBlockData); void tBlockDataClear(SBlockData *pBlockData);
void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData); void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData);
......
...@@ -852,13 +852,13 @@ static int32_t tsdbCompactFileSet(STsdbCompactor *pCompactor) { ...@@ -852,13 +852,13 @@ static int32_t tsdbCompactFileSet(STsdbCompactor *pCompactor) {
if (pRowInfo && (code = tBlockDataInit(&pCompactor->bData, (TABLEID *)pRowInfo, pTSchema, NULL, 0))) { if (pRowInfo && (code = tBlockDataInit(&pCompactor->bData, (TABLEID *)pRowInfo, pTSchema, NULL, 0))) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
pCompactor->tableId.suid = pRowInfo->suid;
pCompactor->tableId.uid = pRowInfo->uid;
} }
while (pRowInfo) { while (pRowInfo) {
// write block data according to table id if necessary (TODO) // if suid changed
if ((pCompactor->tableId.suid != pRowInfo->suid) || if (pCompactor->tableId.suid != pRowInfo->suid) {
(pCompactor->tableId.uid != pRowInfo->uid &&
(pRowInfo->suid == 0 || (pCompactor->bData.uid && pCompactor->bData.nRow >= pCompactor->minRows)))) {
code = tsdbCompactWriteBlockData(pCompactor); code = tsdbCompactWriteBlockData(pCompactor);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
...@@ -867,10 +867,37 @@ static int32_t tsdbCompactFileSet(STsdbCompactor *pCompactor) { ...@@ -867,10 +867,37 @@ static int32_t tsdbCompactFileSet(STsdbCompactor *pCompactor) {
code = tBlockDataInit(&pCompactor->bData, (TABLEID *)pRowInfo, pTSchema, NULL, 0); code = tBlockDataInit(&pCompactor->bData, (TABLEID *)pRowInfo, pTSchema, NULL, 0);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
pCompactor->tableId.suid = pRowInfo->suid;
pCompactor->tableId.uid = pRowInfo->uid;
}
// if uid changed
if (pCompactor->tableId.uid != pRowInfo->uid) {
// if need to write the block data
bool init = false;
if (pCompactor->bData.suid == 0) {
code = tsdbCompactWriteBlockData(pCompactor);
TSDB_CHECK_CODE(code, lino, _exit);
init = true;
} else if (pCompactor->bData.uid && pCompactor->bData.nRow >= pCompactor->minRows) {
code = tsdbCompactWriteBlockData(pCompactor);
TSDB_CHECK_CODE(code, lino, _exit);
}
// write SDataBlk
code = tsdbCompactWriteDataBlk(pCompactor);
TSDB_CHECK_CODE(code, lino, _exit);
// init block data if need
if (init && (code = tBlockDataInit(&pCompactor->bData, (TABLEID *)pRowInfo, pTSchema, NULL, 0))) {
TSDB_CHECK_CODE(code, lino, _exit);
pCompactor->tableId.suid = pRowInfo->suid;
pCompactor->tableId.uid = pRowInfo->uid;
}
} }
// check if append/merge the row causes nRow exceed maxRows (TODO) // if append/merge the row causes nRow exceed maxRows
if (0 /* add the row causes row exceed maxRows */) { if (tBlockDataTryUpsertRow(&pCompactor->bData, &pRowInfo->row, pRowInfo->uid) > pCompactor->maxRows) {
code = tsdbCompactWriteBlockData(pCompactor); code = tsdbCompactWriteBlockData(pCompactor);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
......
...@@ -1051,12 +1051,7 @@ static int32_t tBlockDataUpsertBlockRow(SBlockData *pBlockData, SBlockData *pBlo ...@@ -1051,12 +1051,7 @@ static int32_t tBlockDataUpsertBlockRow(SBlockData *pBlockData, SBlockData *pBlo
if (pColDataFrom == NULL || pColDataFrom->cid > pColDataTo->cid) { if (pColDataFrom == NULL || pColDataFrom->cid > pColDataTo->cid) {
cv = COL_VAL_NONE(pColDataTo->cid, pColDataTo->type); cv = COL_VAL_NONE(pColDataTo->cid, pColDataTo->type);
if (flag) { if (flag == 0 && (code = tColDataAppendValue(pColDataTo, &cv))) goto _exit;
code = tColDataUpdateValue(pColDataTo, &cv, flag > 0);
} else {
code = tColDataAppendValue(pColDataTo, &cv);
}
if (code) goto _exit;
} else { } else {
tColDataGetValue(pColDataFrom, iRow, &cv); tColDataGetValue(pColDataFrom, iRow, &cv);
...@@ -1137,6 +1132,19 @@ _exit: ...@@ -1137,6 +1132,19 @@ _exit:
return code; return code;
} }
int32_t tBlockDataTryUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, int64_t uid) {
if (pBlockData->nRow == 0) {
return 1;
}
int64_t luid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[pBlockData->nRow - 1];
if (luid == uid && pBlockData->aTSKEY[pBlockData->nRow - 1] == TSDBROW_TS(pRow)) {
return pBlockData->nRow;
} else {
return pBlockData->nRow + 1;
}
}
int32_t tBlockDataUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid) { int32_t tBlockDataUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid) {
int32_t code = 0; int32_t code = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册