提交 1c5c4a1d 编写于 作者: C Cary Xu

fix: merge dup rows in client

上级 dd6aa40f
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
// clang-format on // clang-format off
#include "parInsertData.h" #include "parInsertData.h"
#include "catalog.h" #include "catalog.h"
...@@ -37,14 +37,14 @@ typedef struct SBlockKeyInfo { ...@@ -37,14 +37,14 @@ typedef struct SBlockKeyInfo {
typedef struct { typedef struct {
int32_t index; int32_t index;
SArray* rowArray; // array of merged rows(mem allocated by tRealloc) SArray* rowArray; // array of merged rows(mem allocated by tRealloc/free by tFree)
STSchema* pSchema; STSchema* pSchema;
int64_t tbUid; // suid for child table, uid for normal table
} SBlockRowMerger; } SBlockRowMerger;
static void tdResetSBlockRowMerger(SBlockRowMerger* pMerger) { static FORCE_INLINE void tdResetSBlockRowMerger(SBlockRowMerger* pMerger) {
if (pMerger) { if (pMerger) {
pMerger->index = -1; pMerger->index = -1;
taosMemoryFreeClear(pMerger->pSchema);
} }
} }
...@@ -57,6 +57,7 @@ static void tdFreeSBlockRowMerger(SBlockRowMerger* pMerger) { ...@@ -57,6 +57,7 @@ static void tdFreeSBlockRowMerger(SBlockRowMerger* pMerger) {
taosArrayDestroy(pMerger->rowArray); taosArrayDestroy(pMerger->rowArray);
taosMemoryFreeClear(pMerger->pSchema); taosMemoryFreeClear(pMerger->pSchema);
taosMemoryFree(pMerger);
} }
} }
...@@ -430,15 +431,15 @@ static void* tdGetCurRowFromBlockMerger(SBlockRowMerger* pBlkRowMerger) { ...@@ -430,15 +431,15 @@ static void* tdGetCurRowFromBlockMerger(SBlockRowMerger* pBlkRowMerger) {
return NULL; return NULL;
} }
static int32_t tdBlockRowMerge(STableDataBlocks* dataBuf, SBlockKeyTuple* pEndKeyTp, int32_t nDupRows, static int32_t tdBlockRowMerge(STableMeta* pTableMeta, SBlockKeyTuple* pEndKeyTp, int32_t nDupRows,
SBlockRowMerger** pBlkRowMerger, int32_t rowSize) { SBlockRowMerger** pBlkRowMerger, int32_t rowSize) {
ASSERT(nDupRows > 1); ASSERT(nDupRows > 1);
SBlockKeyTuple* pStartKeyTp = pEndKeyTp - (nDupRows - 1); SBlockKeyTuple* pStartKeyTp = pEndKeyTp - (nDupRows - 1);
ASSERT(pStartKeyTp->skey == pEndKeyTp->skey); ASSERT(pStartKeyTp->skey == pEndKeyTp->skey);
STSRow* pEndRow = (STSRow*)pEndKeyTp->payloadAddr;
// TODO: optimization if end row is all normal // TODO: optimization if end row is all normal
#if 0 #if 0
STSRow* pEndRow = (STSRow*)pEndKeyTp->payloadAddr;
if(isNormal(pEndRow)) { // set the end row if it is normal and return directly if(isNormal(pEndRow)) { // set the end row if it is normal and return directly
pStartKeyTp->payloadAddr = pEndKeyTp->payloadAddr; pStartKeyTp->payloadAddr = pEndKeyTp->payloadAddr;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -461,14 +462,25 @@ static int32_t tdBlockRowMerge(STableDataBlocks* dataBuf, SBlockKeyTuple* pEndKe ...@@ -461,14 +462,25 @@ static int32_t tdBlockRowMerge(STableDataBlocks* dataBuf, SBlockKeyTuple* pEndKe
} }
} }
if ((*pBlkRowMerger)->pSchema) {
if ((*pBlkRowMerger)->pSchema->version != pTableMeta->sversion) {
taosMemoryFreeClear((*pBlkRowMerger)->pSchema);
} else {
if ((*pBlkRowMerger)->tbUid != (pTableMeta->suid > 0 ? pTableMeta->suid : pTableMeta->uid)) {
taosMemoryFreeClear((*pBlkRowMerger)->pSchema);
}
}
}
if (!(*pBlkRowMerger)->pSchema) { if (!(*pBlkRowMerger)->pSchema) {
(*pBlkRowMerger)->pSchema = tdGetSTSChemaFromSSChema( (*pBlkRowMerger)->pSchema =
dataBuf->pTableMeta->schema, dataBuf->pTableMeta->tableInfo.numOfColumns, dataBuf->pTableMeta->sversion); tdGetSTSChemaFromSSChema(pTableMeta->schema, pTableMeta->tableInfo.numOfColumns, pTableMeta->sversion);
if (!(*pBlkRowMerger)->pSchema) { if (!(*pBlkRowMerger)->pSchema) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
(*pBlkRowMerger)->tbUid = pTableMeta->suid > 0 ? pTableMeta->suid : pTableMeta->uid;
} }
void* pDestRow = NULL; void* pDestRow = NULL;
...@@ -491,7 +503,7 @@ static int32_t tdBlockRowMerge(STableDataBlocks* dataBuf, SBlockKeyTuple* pEndKe ...@@ -491,7 +503,7 @@ static int32_t tdBlockRowMerge(STableDataBlocks* dataBuf, SBlockKeyTuple* pEndKe
SArray* pArray = taosArrayInit(pSchema->numOfCols, sizeof(SColVal)); SArray* pArray = taosArrayInit(pSchema->numOfCols, sizeof(SColVal));
for (int32_t i = 0; i < pSchema->numOfCols; ++i) { for (int32_t i = 0; i < pSchema->numOfCols; ++i) {
SColVal colVal = {0}; SColVal colVal = {0};
for (int32_t j = 0; j < nDupRows; ++i) { for (int32_t j = 0; j < nDupRows; ++j) {
tTSRowGetVal((pEndKeyTp - j)->payloadAddr, pSchema, i, &colVal); tTSRowGetVal((pEndKeyTp - j)->payloadAddr, pSchema, i, &colVal);
if (!colVal.isNone) { if (!colVal.isNone) {
break; break;
...@@ -512,6 +524,7 @@ static int32_t tdBlockRowMerge(STableDataBlocks* dataBuf, SBlockKeyTuple* pEndKe ...@@ -512,6 +524,7 @@ static int32_t tdBlockRowMerge(STableDataBlocks* dataBuf, SBlockKeyTuple* pEndKe
static int sortMergeDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo, static int sortMergeDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo,
SBlockRowMerger** ppBlkRowMerger) { SBlockRowMerger** ppBlkRowMerger) {
SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData; SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData;
STableMeta* pTableMeta = dataBuf->pTableMeta;
int16_t nRows = pBlocks->numOfRows; int16_t nRows = pBlocks->numOfRows;
// size is less than the total size, since duplicated rows may be removed. // size is less than the total size, since duplicated rows may be removed.
...@@ -566,11 +579,13 @@ static int sortMergeDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* p ...@@ -566,11 +579,13 @@ static int sortMergeDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* p
} }
if ((j - i) > 1) { if ((j - i) > 1) {
if (tdBlockRowMerge(dataBuf, (pBlkKeyTuple + j - 1), j - i, ppBlkRowMerger, extendedRowSize) < 0) { if (tdBlockRowMerge(pTableMeta, (pBlkKeyTuple + j - 1), j - i, ppBlkRowMerger, extendedRowSize) < 0) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
(pBlkKeyTuple + nextPos)->payloadAddr = tdGetCurRowFromBlockMerger(*ppBlkRowMerger); (pBlkKeyTuple + nextPos)->payloadAddr = tdGetCurRowFromBlockMerger(*ppBlkRowMerger);
hasDup = true; if (!hasDup) {
hasDup = true;
}
i = j; i = j;
} else { } else {
if (hasDup) { if (hasDup) {
...@@ -585,7 +600,7 @@ static int sortMergeDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* p ...@@ -585,7 +600,7 @@ static int sortMergeDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* p
if ((j - i) > 1) { if ((j - i) > 1) {
ASSERT((pBlkKeyTuple + i)->skey == (pBlkKeyTuple + j - 1)->skey); ASSERT((pBlkKeyTuple + i)->skey == (pBlkKeyTuple + j - 1)->skey);
if (tdBlockRowMerge(dataBuf, (pBlkKeyTuple + j - 1), j - i, ppBlkRowMerger, extendedRowSize) < 0) { if (tdBlockRowMerge(pTableMeta, (pBlkKeyTuple + j - 1), j - i, ppBlkRowMerger, extendedRowSize) < 0) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
(pBlkKeyTuple + nextPos)->payloadAddr = tdGetCurRowFromBlockMerger(*ppBlkRowMerger); (pBlkKeyTuple + nextPos)->payloadAddr = tdGetCurRowFromBlockMerger(*ppBlkRowMerger);
...@@ -594,7 +609,7 @@ static int sortMergeDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* p ...@@ -594,7 +609,7 @@ static int sortMergeDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* p
} }
dataBuf->ordered = true; dataBuf->ordered = true;
pBlocks->numOfRows = i + 1; pBlocks->numOfRows = nextPos + 1;
} }
dataBuf->size = sizeof(SSubmitBlk) + pBlocks->numOfRows * extendedRowSize; dataBuf->size = sizeof(SSubmitBlk) + pBlocks->numOfRows * extendedRowSize;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册