提交 0f5983ad 编写于 作者: C Cary Xu

fix: merge dup rows in client

上级 9ce366f0
......@@ -438,7 +438,7 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapperEx(SDecoder* pDecoder, SSchemaW
return 0;
}
STSchema* tdGetSTSChemaFromSSChema(SSchema** pSchema, int32_t nCols);
STSchema* tdGetSTSChemaFromSSChema(SSchema* pSchema, int32_t nCols, int32_t sver);
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
......
......@@ -4941,14 +4941,14 @@ int tDecodeSVCreateStbReq(SDecoder *pCoder, SVCreateStbReq *pReq) {
return 0;
}
STSchema *tdGetSTSChemaFromSSChema(SSchema **pSchema, int32_t nCols) {
STSchema *tdGetSTSChemaFromSSChema(SSchema *pSchema, int32_t nCols, int32_t sver) {
STSchemaBuilder schemaBuilder = {0};
if (tdInitTSchemaBuilder(&schemaBuilder, 1) < 0) {
if (tdInitTSchemaBuilder(&schemaBuilder, sver) < 0) {
return NULL;
}
for (int i = 0; i < nCols; i++) {
SSchema *schema = *pSchema + i;
SSchema *schema = pSchema + i;
if (tdAddColToSchema(&schemaBuilder, schema->type, schema->flags, schema->colId, schema->bytes) < 0) {
tdDestroyTSchemaBuilder(&schemaBuilder);
return NULL;
......
......@@ -568,6 +568,7 @@ int32_t tdSTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow **ppRow) {
int32_t maxVarDataLen = 0;
int32_t iColVal = 0;
void *varBuf = NULL;
bool isAlloc = false;
ASSERT(nColVal > 1);
......@@ -610,8 +611,11 @@ int32_t tdSTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow **ppRow) {
++iColVal;
}
*ppRow = (STSRow *)taosMemoryCalloc(
1, sizeof(STSRow) + pTSchema->flen + varDataLen + TD_BITMAP_BYTES(pTSchema->numOfCols - 1));
if (!(*ppRow)) {
*ppRow = (STSRow *)taosMemoryCalloc(
1, sizeof(STSRow) + pTSchema->flen + varDataLen + TD_BITMAP_BYTES(pTSchema->numOfCols - 1));
isAlloc = true;
}
if (!(*ppRow)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -621,7 +625,9 @@ int32_t tdSTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow **ppRow) {
if (maxVarDataLen > 0) {
varBuf = taosMemoryMalloc(maxVarDataLen);
if (!varBuf) {
taosMemoryFreeClear(*ppRow);
if(isAlloc) {
taosMemoryFreeClear(*ppRow);
}
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
......@@ -1323,7 +1329,7 @@ void tTSRowGetVal(STSRow *pRow, STSchema *pTSchema, int16_t iCol, SColVal *pColV
SCellVal cv;
SValue value;
ASSERT(iCol > 0);
// ASSERT(iCol > 0);
if (TD_IS_TP_ROW(pRow)) {
tdSTpRowGetVal(pRow, pTColumn->colId, pTColumn->type, pTSchema->flen, pTColumn->offset, iCol - 1, &cv);
......
......@@ -116,7 +116,7 @@ STSchema *genSTSchema(int16_t nCols) {
}
STSchema *pResult = NULL;
pResult = tdGetSTSChemaFromSSChema(&pSchema, nCols);
pResult = tdGetSTSChemaFromSSChema(pSchema, nCols, 1);
taosMemoryFree(pSchema);
return pResult;
......
......@@ -614,7 +614,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
ASSERT(pTask->tbSink.pSchemaWrapper->pSchema);
pTask->tbSink.pTSchema =
tdGetSTSChemaFromSSChema(&pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols);
tdGetSTSChemaFromSSChema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, 1);
ASSERT(pTask->tbSink.pTSchema);
}
......
......@@ -1200,7 +1200,7 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks,
*gotRow = true;
#ifdef TD_DEBUG_PRINT_ROW
STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&schema, spd->numOfCols);
STSchema* pSTSchema = tdGetSTSChemaFromSSChema(schema, spd->numOfCols, 1);
tdSRowPrint(row, pSTSchema, __func__);
taosMemoryFree(pSTSchema);
#endif
......@@ -1970,7 +1970,7 @@ int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, in
}
}
#ifdef TD_DEBUG_PRINT_ROW
STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&pSchema, spd->numOfCols);
STSchema* pSTSchema = tdGetSTSChemaFromSSChema(pSchema, spd->numOfCols, 1);
tdSRowPrint(row, pSTSchema, __func__);
taosMemoryFree(pSTSchema);
#endif
......@@ -2055,7 +2055,7 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu
#ifdef TD_DEBUG_PRINT_ROW
if (rowEnd) {
STSchema* pSTSchema = tdGetSTSChemaFromSSChema(&pSchema, spd->numOfCols);
STSchema* pSTSchema = tdGetSTSChemaFromSSChema(pSchema, spd->numOfCols, 1);
tdSRowPrint(row, pSTSchema, __func__);
taosMemoryFree(pSTSchema);
}
......
......@@ -12,13 +12,14 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// clang-format off
// clang-format on
#include "parInsertData.h"
#include "catalog.h"
#include "parInt.h"
#include "parUtil.h"
#include "querynodes.h"
#include "tRealloc.h"
#define IS_RAW_PAYLOAD(t) \
(((int)(t)) == PAYLOAD_TYPE_RAW) // 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
......@@ -34,6 +35,31 @@ typedef struct SBlockKeyInfo {
SBlockKeyTuple* pKeyTuple;
} SBlockKeyInfo;
typedef struct {
int32_t index;
SArray* rowArray; // array of merged rows(mem allocated by tRealloc)
STSchema* pSchema;
} SBlockRowMerger;
static void tdResetSBlockRowMerger(SBlockRowMerger* pMerger) {
if (pMerger) {
pMerger->index = -1;
taosMemoryFreeClear(pMerger->pSchema);
}
}
static void tdFreeSBlockRowMerger(SBlockRowMerger* pMerger) {
if (pMerger) {
int32_t size = taosArrayGetSize(pMerger->rowArray);
for (int32_t i = 0; i < size; ++i) {
tFree(*(void**)taosArrayGet(pMerger->rowArray, i));
}
taosArrayDestroy(pMerger->rowArray);
taosMemoryFreeClear(pMerger->pSchema);
}
}
static int32_t rowDataCompar(const void* lhs, const void* rhs) {
TSKEY left = *(TSKEY*)lhs;
TSKEY right = *(TSKEY*)rhs;
......@@ -328,7 +354,7 @@ void sortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf) {
}
// data block is disordered, sort it in ascending order
int sortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo) {
static int sortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo) {
SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData;
int16_t nRows = pBlocks->numOfRows;
......@@ -396,6 +422,187 @@ int sortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKey
return 0;
}
static void* tdGetCurRowFromBlockMerger(SBlockRowMerger* pBlkRowMerger) {
if (pBlkRowMerger && (pBlkRowMerger->index >= 0)) {
ASSERT(pBlkRowMerger->index < taosArrayGetSize(pBlkRowMerger->rowArray));
return *(void**)taosArrayGet(pBlkRowMerger->rowArray, pBlkRowMerger->index);
}
return NULL;
}
static int32_t tdBlockRowMerge(STableDataBlocks* dataBuf, SBlockKeyTuple* pEndKeyTp, int32_t nDupRows,
SBlockRowMerger** pBlkRowMerger, int32_t rowSize) {
ASSERT(nDupRows > 1);
SBlockKeyTuple* pStartKeyTp = pEndKeyTp - (nDupRows - 1);
ASSERT(pStartKeyTp->skey == pEndKeyTp->skey);
STSRow* pEndRow = (STSRow*)pEndKeyTp->payloadAddr;
// TODO: optimization if end row is all normal
#if 0
if(isNormal(pEndRow)) { // set the end row if it is normal and return directly
pStartKeyTp->payloadAddr = pEndKeyTp->payloadAddr;
return TSDB_CODE_SUCCESS;
}
#endif
if (!(*pBlkRowMerger)) {
(*pBlkRowMerger) = taosMemoryCalloc(1, sizeof(**pBlkRowMerger));
if (!(*pBlkRowMerger)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
(*pBlkRowMerger)->index = -1;
if (!(*pBlkRowMerger)->rowArray) {
(*pBlkRowMerger)->rowArray = taosArrayInit(1, sizeof(void*));
if (!(*pBlkRowMerger)->rowArray) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
}
}
if (!(*pBlkRowMerger)->pSchema) {
(*pBlkRowMerger)->pSchema = tdGetSTSChemaFromSSChema(
dataBuf->pTableMeta->schema, dataBuf->pTableMeta->tableInfo.numOfColumns, dataBuf->pTableMeta->sversion);
if (!(*pBlkRowMerger)->pSchema) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
}
void* pDestRow = NULL;
++((*pBlkRowMerger)->index);
if ((*pBlkRowMerger)->index < taosArrayGetSize((*pBlkRowMerger)->rowArray)) {
void* pAlloc = *(void**)taosArrayGet((*pBlkRowMerger)->rowArray, (*pBlkRowMerger)->index);
if (tRealloc((uint8_t**)&pAlloc, rowSize) != 0) {
return TSDB_CODE_FAILED;
}
pDestRow = pAlloc;
} else {
if (tRealloc((uint8_t**)&pDestRow, rowSize) != 0) {
return TSDB_CODE_FAILED;
}
taosArrayPush((*pBlkRowMerger)->rowArray, &pDestRow);
}
// merge rows to pDestRow
STSchema* pSchema = (*pBlkRowMerger)->pSchema;
SArray* pArray = taosArrayInit(pSchema->numOfCols, sizeof(SColVal));
for (int32_t i = 0; i < pSchema->numOfCols; ++i) {
SColVal colVal = {0};
for (int32_t j = 0; j < nDupRows; ++i) {
tTSRowGetVal((pEndKeyTp - j)->payloadAddr, pSchema, i, &colVal);
if (!colVal.isNone) {
break;
}
}
taosArrayPush(pArray, &colVal);
}
if (tdSTSRowNew(pArray, pSchema, (STSRow**)&pDestRow) < 0) {
taosArrayDestroy(pArray);
return TSDB_CODE_FAILED;
}
taosArrayDestroy(pArray);
return TSDB_CODE_SUCCESS;
}
// data block is disordered, sort it in ascending order, and merge dup rows if exists
static int sortMergeDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo,
SBlockRowMerger** ppBlkRowMerger) {
SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData;
int16_t nRows = pBlocks->numOfRows;
// size is less than the total size, since duplicated rows may be removed.
// allocate memory
size_t nAlloc = nRows * sizeof(SBlockKeyTuple);
if (pBlkKeyInfo->pKeyTuple == NULL || pBlkKeyInfo->maxBytesAlloc < nAlloc) {
char* tmp = taosMemoryRealloc(pBlkKeyInfo->pKeyTuple, nAlloc);
if (tmp == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
pBlkKeyInfo->pKeyTuple = (SBlockKeyTuple*)tmp;
pBlkKeyInfo->maxBytesAlloc = (int32_t)nAlloc;
}
memset(pBlkKeyInfo->pKeyTuple, 0, nAlloc);
tdResetSBlockRowMerger(*ppBlkRowMerger);
int32_t extendedRowSize = getExtendedRowSize(dataBuf);
SBlockKeyTuple* pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
char* pBlockData = pBlocks->data + pBlocks->schemaLen;
int n = 0;
while (n < nRows) {
pBlkKeyTuple->skey = TD_ROW_KEY((STSRow*)pBlockData);
pBlkKeyTuple->payloadAddr = pBlockData;
pBlkKeyTuple->index = n;
// next loop
pBlockData += extendedRowSize;
++pBlkKeyTuple;
++n;
}
if (!dataBuf->ordered) {
pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
taosSort(pBlkKeyTuple, nRows, sizeof(SBlockKeyTuple), rowDataComparStable);
pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
bool hasDup = false;
int32_t nextPos = 0;
int32_t i = 0;
int32_t j = 1;
while (j < nRows) {
TSKEY ti = (pBlkKeyTuple + i)->skey;
TSKEY tj = (pBlkKeyTuple + j)->skey;
if (ti == tj) {
++j;
continue;
}
if ((j - i) > 1) {
if (tdBlockRowMerge(dataBuf, (pBlkKeyTuple + j - 1), j - i, ppBlkRowMerger, extendedRowSize) < 0) {
return TSDB_CODE_FAILED;
}
(pBlkKeyTuple + nextPos)->payloadAddr = tdGetCurRowFromBlockMerger(*ppBlkRowMerger);
hasDup = true;
i = j;
} else {
if (hasDup) {
memmove(pBlkKeyTuple + nextPos, pBlkKeyTuple + i, sizeof(SBlockKeyTuple));
}
++i;
}
++nextPos;
++j;
}
if ((j - i) > 1) {
ASSERT((pBlkKeyTuple + i)->skey == (pBlkKeyTuple + j - 1)->skey);
if (tdBlockRowMerge(dataBuf, (pBlkKeyTuple + j - 1), j - i, ppBlkRowMerger, extendedRowSize) < 0) {
return TSDB_CODE_FAILED;
}
(pBlkKeyTuple + nextPos)->payloadAddr = tdGetCurRowFromBlockMerger(*ppBlkRowMerger);
} else if (hasDup) {
memmove(pBlkKeyTuple + nextPos, pBlkKeyTuple + i, sizeof(SBlockKeyTuple));
}
dataBuf->ordered = true;
pBlocks->numOfRows = i + 1;
}
dataBuf->size = sizeof(SSubmitBlk) + pBlocks->numOfRows * extendedRowSize;
dataBuf->prevTS = INT64_MIN;
return TSDB_CODE_SUCCESS;
}
// Erase the empty space reserved for binary data
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SBlockKeyTuple* blkKeyTuple,
bool isRawPayload) {
......@@ -464,6 +671,8 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p
STableDataBlocks** p = taosHashIterate(pHashObj, NULL);
STableDataBlocks* pOneTableBlock = *p;
SBlockKeyInfo blkKeyInfo = {0}; // share by pOneTableBlock
SBlockRowMerger *pBlkRowMerger = NULL;
while (pOneTableBlock) {
SSubmitBlk* pBlocks = (SSubmitBlk*)pOneTableBlock->pData;
if (pBlocks->numOfRows > 0) {
......@@ -473,6 +682,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p
getDataBlockFromList(pVnodeDataBlockHashList, &pOneTableBlock->vgId, sizeof(pOneTableBlock->vgId), TSDB_PAYLOAD_SIZE, INSERT_HEAD_SIZE, 0,
pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList, NULL);
if (ret != TSDB_CODE_SUCCESS) {
tdFreeSBlockRowMerger(pBlkRowMerger);
taosHashCleanup(pVnodeDataBlockHashList);
destroyBlockArrayList(pVnodeDataBlockList);
taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
......@@ -490,6 +700,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p
if (tmp != NULL) {
dataBuf->pData = tmp;
} else { // failed to allocate memory, free already allocated memory and return error code
tdFreeSBlockRowMerger(pBlkRowMerger);
taosHashCleanup(pVnodeDataBlockHashList);
destroyBlockArrayList(pVnodeDataBlockList);
taosMemoryFreeClear(dataBuf->pData);
......@@ -501,7 +712,8 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p
if (isRawPayload) {
sortRemoveDataBlockDupRowsRaw(pOneTableBlock);
} else {
if ((code = sortRemoveDataBlockDupRows(pOneTableBlock, &blkKeyInfo)) != 0) {
if ((code = sortMergeDataBlockDupRows(pOneTableBlock, &blkKeyInfo, &pBlkRowMerger)) != 0) {
tdFreeSBlockRowMerger(pBlkRowMerger);
taosHashCleanup(pVnodeDataBlockHashList);
destroyBlockArrayList(pVnodeDataBlockList);
taosMemoryFreeClear(dataBuf->pData);
......@@ -529,6 +741,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** p
}
// free the table data blocks;
tdFreeSBlockRowMerger(pBlkRowMerger);
taosHashCleanup(pVnodeDataBlockHashList);
taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
*pVgDataBlocks = pVnodeDataBlockList;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册