提交 83fa9e63 编写于 作者: H Hongze Cheng

Merge branch 'feat/tsdb_refact' of https://github.com/taosdata/TDengine into feat/tsdb_refact

...@@ -313,6 +313,7 @@ typedef struct { ...@@ -313,6 +313,7 @@ typedef struct {
void tdSTSRowIterReset(STSRowIter *pIter, STSRow *pRow); void tdSTSRowIterReset(STSRowIter *pIter, STSRow *pRow);
void tdSTSRowIterInit(STSRowIter *pIter, STSchema *pSchema); void tdSTSRowIterInit(STSRowIter *pIter, STSchema *pSchema);
int32_t tdSTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow **ppRow);
bool tdSTSRowGetVal(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCellVal *pVal); bool tdSTSRowGetVal(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCellVal *pVal);
bool tdGetTpRowDataOfCol(STSRowIter *pIter, col_type_t colType, int32_t offset, SCellVal *pVal); bool tdGetTpRowDataOfCol(STSRowIter *pIter, col_type_t colType, int32_t offset, SCellVal *pVal);
bool tdGetKvRowValOfColEx(STSRowIter *pIter, col_id_t colId, col_type_t colType, col_id_t *nIdx, SCellVal *pVal); bool tdGetKvRowValOfColEx(STSRowIter *pIter, col_id_t colId, col_type_t colType, col_id_t *nIdx, SCellVal *pVal);
......
...@@ -1204,6 +1204,112 @@ static FORCE_INLINE int32_t compareKvRowColId(const void *key1, const void *key2 ...@@ -1204,6 +1204,112 @@ static FORCE_INLINE int32_t compareKvRowColId(const void *key1, const void *key2
} }
} }
int32_t tdSTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow **ppRow) {
STColumn *pTColumn;
SColVal *pColVal;
int32_t nColVal = taosArrayGetSize(pArray);
int32_t varDataLen = 0;
int32_t maxVarDataLen = 0;
int32_t iColVal = 0;
void *varBuf = NULL;
ASSERT(nColVal > 1);
for (int32_t iColumn = 0; iColumn < pTSchema->numOfCols; ++iColumn) {
pTColumn = &pTSchema->columns[iColumn];
if (iColVal < nColVal) {
pColVal = (SColVal *)taosArrayGet(pArray, iColVal);
} else {
pColVal = NULL;
}
if (iColumn == 0) {
ASSERT(pColVal->cid == pTColumn->colId);
ASSERT(pTColumn->type == TSDB_DATA_TYPE_TIMESTAMP);
ASSERT(pTColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID);
} else {
if (IS_VAR_DATA_TYPE(pTColumn->type)) {
if (pColVal) {
varDataLen += (pColVal->value.nData + sizeof(VarDataLenT));
if (maxVarDataLen < pColVal->value.nData) {
maxVarDataLen = pColVal->value.nData;
}
} else {
varDataLen += sizeof(VarDataLenT);
if (pTColumn->type == TSDB_DATA_TYPE_VARCHAR) {
varDataLen += CHAR_BYTES;
if (maxVarDataLen < CHAR_BYTES) {
maxVarDataLen = CHAR_BYTES;
}
} else {
varDataLen += INT_BYTES;
if (maxVarDataLen < INT_BYTES) {
maxVarDataLen = INT_BYTES;
}
}
}
}
}
++iColVal;
}
*ppRow = (STSRow *)taosMemoryCalloc(
1, sizeof(STSRow) + pTSchema->flen + varDataLen + TD_BITMAP_BYTES(pTSchema->numOfCols - 1));
if (!(*ppRow)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (maxVarDataLen > 0) {
varBuf = taosMemoryMalloc(maxVarDataLen + sizeof(VarDataLenT));
if (!varBuf) {
taosMemoryFreeClear(*ppRow);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
}
SRowBuilder rb = {0};
tdSRowInit(&rb, pTSchema->version);
tdSRowSetInfo(&rb, pTSchema->numOfCols, pTSchema->numOfCols, pTSchema->flen);
tdSRowResetBuf(&rb, *ppRow);
iColVal = 0;
for (int32_t iColumn = 0; iColumn < pTSchema->numOfCols; ++iColumn) {
pTColumn = &pTSchema->columns[iColumn];
TDRowValT valType = TD_VTYPE_NORM;
const void *val = NULL;
if (iColVal < nColVal) {
pColVal = (SColVal *)taosArrayGet(pArray, iColVal);
if (pColVal->isNone) {
valType = TD_VTYPE_NONE;
} else if (pColVal->isNull) {
valType = TD_VTYPE_NULL;
} else if (IS_VAR_DATA_TYPE(pTColumn->type)) {
varDataSetLen(varBuf, pColVal->value.nData);
memcpy(varDataVal(varBuf), pColVal->value.pData, pColVal->value.nData);
val = varBuf;
} else {
val = (const void *)&pColVal->value.i64;
}
} else {
pColVal = NULL;
valType = TD_VTYPE_NONE;
}
tdAppendColValToRow(&rb, pTColumn->colId, pTColumn->type, valType, val, true, pTColumn->offset, iColVal);
++iColVal;
}
taosMemoryFreeClear(varBuf);
return 0;
}
bool tdSTSRowGetVal(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCellVal *pVal) { bool tdSTSRowGetVal(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCellVal *pVal) {
if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) { if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
pVal->val = &pIter->pRow->ts; pVal->val = &pIter->pRow->ts;
...@@ -1888,4 +1994,4 @@ void tTSRowGetVal(STSRow *pRow, STSchema *pTSchema, int16_t iCol, SColVal *pColV ...@@ -1888,4 +1994,4 @@ void tTSRowGetVal(STSRow *pRow, STSchema *pTSchema, int16_t iCol, SColVal *pColV
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, value); *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, value);
} }
} }
\ No newline at end of file
...@@ -295,7 +295,7 @@ static int32_t mergeLastRowFileSet(STbDataIter *iter, STbDataIter *iiter, SDFile ...@@ -295,7 +295,7 @@ static int32_t mergeLastRowFileSet(STbDataIter *iter, STbDataIter *iiter, SDFile
} else if (pIMemRow != NULL) { } else if (pIMemRow != NULL) {
} else { } else {
if (!tsdbKeyDeleted(key, pSkyline)) { if (!tsdbKeyDeleted(key, pSkyline)) {
*ppLastRow = buildTsrowFromTsdbrow(&row); code = buildTsrowFromTsdbrow(&row, ppLastRow);
goto _done; goto _done;
} else { } else {
continue; continue;
...@@ -475,15 +475,34 @@ _err: ...@@ -475,15 +475,34 @@ _err:
return code; return code;
} }
static STSRow *tsRowFromTsdbRow(TSDBROW *pRow) { static int32_t tsRowFromTsdbRow(STSchema *pTSchema, TSDBROW *pRow, STSRow **ppRow) {
// TODO: new tsrow from tsdbrow int32_t code = 0;
STSRow *ret = NULL;
SColVal *pColVal = &(SColVal){0};
if (pRow->type == 0) { if (pRow->type == 0) {
return tdRowDup(pRow->pTSRow); *ppRow = tdRowDup(pRow->pTSRow);
} else { } else {
SArray *pArray = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal));
if (pArray == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
for (int16_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) {
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
if (taosArrayPush(pArray, pColVal) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
}
code = tdSTSRowNew(pArray, pTSchema, ppRow);
if (code) goto _exit;
} }
return ret; _exit:
return code;
} }
static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int *iSkyline) { static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int *iSkyline) {
...@@ -584,6 +603,8 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { ...@@ -584,6 +603,8 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
input[1].next = true; input[1].next = true;
} }
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
do { do {
for (int i = 0; i < 3; ++i) { for (int i = 0; i < 3; ++i) {
if (input[i].next && !input[i].stop) { if (input[i].next && !input[i].stop) {
...@@ -640,13 +661,12 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { ...@@ -640,13 +661,12 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
// merge if nMerge > 1 // merge if nMerge > 1
if (nMerge > 0) { if (nMerge > 0) {
if (nMerge == 1) { if (nMerge == 1) {
*ppRow = tsRowFromTsdbRow(merge[nMerge]); code = tsRowFromTsdbRow(pTSchema, merge[nMerge - 1], ppRow);
if (code) goto _err;
} else { } else {
// merge 2 or 3 rows // merge 2 or 3 rows
SRowMerger merger = {0}; SRowMerger merger = {0};
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1);
tRowMergerInit(&merger, merge[0], pTSchema); tRowMergerInit(&merger, merge[0], pTSchema);
for (int i = 1; i < nMerge; ++i) { for (int i = 1; i < nMerge; ++i) {
tRowMerge(&merger, merge[i]); tRowMerge(&merger, merge[i]);
...@@ -657,9 +677,11 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { ...@@ -657,9 +677,11 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
} }
} while (*ppRow == NULL); } while (*ppRow == NULL);
return code; taosMemoryFreeClear(pTSchema);
return code;
_err: _err:
taosMemoryFreeClear(pTSchema);
tsdbError("vgId:%d merge last_row failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); tsdbError("vgId:%d merge last_row failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
return code; return code;
} }
......
...@@ -749,8 +749,9 @@ _exit: ...@@ -749,8 +749,9 @@ _exit:
int32_t tRowMergerGetRow(SRowMerger *pMerger, STSRow **ppRow) { int32_t tRowMergerGetRow(SRowMerger *pMerger, STSRow **ppRow) {
int32_t code = 0; int32_t code = 0;
// TODO
ASSERT(0); code = tdSTSRowNew(pMerger->pArray, pMerger->pTSchema, ppRow);
return code; return code;
} }
...@@ -1278,4 +1279,4 @@ int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest) { ...@@ -1278,4 +1279,4 @@ int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest) {
_exit: _exit:
return code; return code;
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册