未验证 提交 90645bd4 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #17622 from taosdata/enh/TD-19783-D

enh: remove obsolete codes for raw data row
...@@ -290,6 +290,7 @@ typedef struct { ...@@ -290,6 +290,7 @@ typedef struct {
(IS_SIGNED_NUMERIC_TYPE(_t) || (_t) == (TSDB_DATA_TYPE_BOOL) || (_t) == (TSDB_DATA_TYPE_TIMESTAMP)) (IS_SIGNED_NUMERIC_TYPE(_t) || (_t) == (TSDB_DATA_TYPE_BOOL) || (_t) == (TSDB_DATA_TYPE_TIMESTAMP))
#define IS_CONVERT_AS_UNSIGNED(_t) (IS_UNSIGNED_NUMERIC_TYPE(_t) || (_t) == (TSDB_DATA_TYPE_BOOL)) #define IS_CONVERT_AS_UNSIGNED(_t) (IS_UNSIGNED_NUMERIC_TYPE(_t) || (_t) == (TSDB_DATA_TYPE_BOOL))
#if 0
// TODO remove this function // TODO remove this function
static FORCE_INLINE bool isNull(const void *val, int32_t type) { static FORCE_INLINE bool isNull(const void *val, int32_t type) {
switch (type) { switch (type) {
...@@ -325,6 +326,7 @@ static FORCE_INLINE bool isNull(const void *val, int32_t type) { ...@@ -325,6 +326,7 @@ static FORCE_INLINE bool isNull(const void *val, int32_t type) {
return false; return false;
}; };
} }
#endif
typedef struct tDataTypeDescriptor { typedef struct tDataTypeDescriptor {
int16_t type; int16_t type;
......
...@@ -347,11 +347,6 @@ typedef struct SInsertStmt { ...@@ -347,11 +347,6 @@ typedef struct SInsertStmt {
uint8_t precision; uint8_t precision;
} SInsertStmt; } SInsertStmt;
typedef enum {
PAYLOAD_TYPE_KV = 0,
PAYLOAD_TYPE_RAW = 1,
} EPayloadType;
typedef struct SVgDataBlocks { typedef struct SVgDataBlocks {
SVgroupInfo vg; SVgroupInfo vg;
int32_t numOfTables; // number of tables in current submit block int32_t numOfTables; // number of tables in current submit block
...@@ -363,7 +358,6 @@ typedef struct SVnodeModifOpStmt { ...@@ -363,7 +358,6 @@ typedef struct SVnodeModifOpStmt {
ENodeType nodeType; ENodeType nodeType;
ENodeType sqlNodeType; ENodeType sqlNodeType;
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>. SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
uint32_t insertType; // insert data from [file|sql statement| bound statement] uint32_t insertType; // insert data from [file|sql statement| bound statement]
const char* sql; // current sql statement position const char* sql; // current sql statement position
} SVnodeModifOpStmt; } SVnodeModifOpStmt;
......
...@@ -1369,7 +1369,6 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) ...@@ -1369,7 +1369,6 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
goto end; goto end;
} }
SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot); SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
nodeStmt->payloadType = PAYLOAD_TYPE_KV;
nodeStmt->pDataBlocks = taosArrayInit(1, POINTER_BYTES); nodeStmt->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks)); SVgDataBlocks* dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks));
...@@ -1625,7 +1624,6 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { ...@@ -1625,7 +1624,6 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
goto end; goto end;
} }
SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot); SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
nodeStmt->payloadType = PAYLOAD_TYPE_KV;
int32_t numOfVg = taosHashGetSize(pVgHash); int32_t numOfVg = taosHashGetSize(pVgHash);
nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES); nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
...@@ -1929,7 +1927,6 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) ...@@ -1929,7 +1927,6 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
goto end; goto end;
} }
SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot); SVnodeModifOpStmt* nodeStmt = (SVnodeModifOpStmt*)(pQuery->pRoot);
nodeStmt->payloadType = PAYLOAD_TYPE_KV;
int32_t numOfVg = taosHashGetSize(pVgHash); int32_t numOfVg = taosHashGetSize(pVgHash);
nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES); nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES);
......
...@@ -1531,7 +1531,6 @@ static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLPr ...@@ -1531,7 +1531,6 @@ static SSmlHandle *smlBuildSmlInfo(STscObj *pTscObj, SRequestObj *request, SMLPr
uError("SML:0x%" PRIx64 " create info->pQuery->pRoot error", info->id); uError("SML:0x%" PRIx64 " create info->pQuery->pRoot error", info->id);
goto cleanup; goto cleanup;
} }
((SVnodeModifOpStmt *)(info->pQuery->pRoot))->payloadType = PAYLOAD_TYPE_KV;
if (pTscObj) { if (pTscObj) {
info->taos = pTscObj; info->taos = pTscObj;
......
...@@ -152,7 +152,7 @@ int32_t insInitRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataC ...@@ -152,7 +152,7 @@ int32_t insInitRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataC
int32_t insGetDataBlockFromList(SHashObj *pHashList, void *id, int32_t idLen, int32_t size, int32_t startOffset, int32_t insGetDataBlockFromList(SHashObj *pHashList, void *id, int32_t idLen, int32_t size, int32_t startOffset,
int32_t rowSize, STableMeta *pTableMeta, STableDataBlocks **dataBlocks, int32_t rowSize, STableMeta *pTableMeta, STableDataBlocks **dataBlocks,
SArray *pBlockList, SVCreateTbReq *pCreateTbReq); SArray *pBlockList, SVCreateTbReq *pCreateTbReq);
int32_t insMergeTableDataBlocks(SHashObj *pHashObj, uint8_t payloadType, SArray **pVgDataBlocks); int32_t insMergeTableDataBlocks(SHashObj *pHashObj, SArray **pVgDataBlocks);
int32_t insBuildCreateTbMsg(STableDataBlocks *pBlocks, SVCreateTbReq *pCreateTbReq); int32_t insBuildCreateTbMsg(STableDataBlocks *pBlocks, SVCreateTbReq *pCreateTbReq);
int32_t insAllocateMemForSize(STableDataBlocks *pDataBlock, int32_t allSize); int32_t insAllocateMemForSize(STableDataBlocks *pDataBlock, int32_t allSize);
int32_t insCreateSName(SName *pName, struct SToken *pTableName, int32_t acctId, const char *dbName, SMsgBuf *pMsgBuf); int32_t insCreateSName(SName *pName, struct SToken *pTableName, int32_t acctId, const char *dbName, SMsgBuf *pMsgBuf);
......
...@@ -1369,7 +1369,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { ...@@ -1369,7 +1369,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
// merge according to vgId // merge according to vgId
if (taosHashGetSize(pCxt->pTableBlockHashObj) > 0) { if (taosHashGetSize(pCxt->pTableBlockHashObj) > 0) {
CHECK_CODE(insMergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks)); CHECK_CODE(insMergeTableDataBlocks(pCxt->pTableBlockHashObj, &pCxt->pVgDataBlocks));
} }
return insBuildOutput(pCxt); return insBuildOutput(pCxt);
} }
...@@ -1390,7 +1390,7 @@ static int32_t parseInsertBodyAgain(SInsertParseContext* pCxt) { ...@@ -1390,7 +1390,7 @@ static int32_t parseInsertBodyAgain(SInsertParseContext* pCxt) {
parserDebug("0x%" PRIx64 " insert again input rows: %d", pCxt->pComCxt->requestId, pCxt->totalNum); parserDebug("0x%" PRIx64 " insert again input rows: %d", pCxt->pComCxt->requestId, pCxt->totalNum);
// merge according to vgId // merge according to vgId
if (taosHashGetSize(pCxt->pTableBlockHashObj) > 0) { if (taosHashGetSize(pCxt->pTableBlockHashObj) > 0) {
CHECK_CODE(insMergeTableDataBlocks(pCxt->pTableBlockHashObj, pCxt->pOutput->payloadType, &pCxt->pVgDataBlocks)); CHECK_CODE(insMergeTableDataBlocks(pCxt->pTableBlockHashObj, &pCxt->pVgDataBlocks));
} }
return insBuildOutput(pCxt); return insBuildOutput(pCxt);
} }
...@@ -1472,8 +1472,6 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache ...@@ -1472,8 +1472,6 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery, SParseMetaCache
} }
} }
context.pOutput->payloadType = PAYLOAD_TYPE_KV;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (!context.pComCxt->needMultiParse) { if (!context.pComCxt->needMultiParse) {
code = skipInsertInto(&context.pSql, &context.msg); code = skipInsertInto(&context.pSql, &context.msg);
......
...@@ -40,8 +40,7 @@ int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash ...@@ -40,8 +40,7 @@ int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash
// merge according to vgId // merge according to vgId
if (taosHashGetSize(insertCtx.pTableBlockHashObj) > 0) { if (taosHashGetSize(insertCtx.pTableBlockHashObj) > 0) {
CHECK_CODE( CHECK_CODE(insMergeTableDataBlocks(insertCtx.pTableBlockHashObj, &insertCtx.pVgDataBlocks));
insMergeTableDataBlocks(insertCtx.pTableBlockHashObj, modifyNode->payloadType, &insertCtx.pVgDataBlocks));
} }
CHECK_CODE(insBuildOutput(&insertCtx)); CHECK_CODE(insBuildOutput(&insertCtx));
......
...@@ -21,9 +21,6 @@ ...@@ -21,9 +21,6 @@
#include "querynodes.h" #include "querynodes.h"
#include "tRealloc.h" #include "tRealloc.h"
#define IS_RAW_PAYLOAD(t) \
(((int)(t)) == PAYLOAD_TYPE_RAW) // 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
typedef struct SBlockKeyTuple { typedef struct SBlockKeyTuple {
TSKEY skey; TSKEY skey;
void* payloadAddr; void* payloadAddr;
...@@ -315,7 +312,7 @@ int32_t insGetDataBlockFromList(SHashObj* pHashList, void* id, int32_t idLen, in ...@@ -315,7 +312,7 @@ int32_t insGetDataBlockFromList(SHashObj* pHashList, void* id, int32_t idLen, in
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
#if 0
static int32_t getRowExpandSize(STableMeta* pTableMeta) { static int32_t getRowExpandSize(STableMeta* pTableMeta) {
int32_t result = TD_ROW_HEAD_LEN - sizeof(TSKEY); int32_t result = TD_ROW_HEAD_LEN - sizeof(TSKEY);
int32_t columns = getNumOfColumns(pTableMeta); int32_t columns = getNumOfColumns(pTableMeta);
...@@ -328,6 +325,7 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) { ...@@ -328,6 +325,7 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
result += (int32_t)TD_BITMAP_BYTES(columns - 1); result += (int32_t)TD_BITMAP_BYTES(columns - 1);
return result; return result;
} }
#endif
void insDestroyBlockArrayList(SArray* pDataBlockList) { void insDestroyBlockArrayList(SArray* pDataBlockList) {
if (pDataBlockList == NULL) { if (pDataBlockList == NULL) {
...@@ -359,6 +357,7 @@ void insDestroyBlockHashmap(SHashObj* pDataBlockHash) { ...@@ -359,6 +357,7 @@ void insDestroyBlockHashmap(SHashObj* pDataBlockHash) {
taosHashCleanup(pDataBlockHash); taosHashCleanup(pDataBlockHash);
} }
#if 0
// data block is disordered, sort it in ascending order // data block is disordered, sort it in ascending order
void sortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf) { void sortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf) {
SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData; SSubmitBlk* pBlocks = (SSubmitBlk*)dataBuf->pData;
...@@ -401,6 +400,7 @@ void sortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf) { ...@@ -401,6 +400,7 @@ void sortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf) {
dataBuf->prevTS = INT64_MIN; dataBuf->prevTS = INT64_MIN;
} }
#endif
// data block is disordered, sort it in ascending order // data block is disordered, sort it in ascending order
static int sortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo) { static int sortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo) {
...@@ -667,68 +667,31 @@ static int sortMergeDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* p ...@@ -667,68 +667,31 @@ static int sortMergeDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* p
} }
// Erase the empty space reserved for binary data // Erase the empty space reserved for binary data
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SBlockKeyTuple* blkKeyTuple, static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SBlockKeyTuple* blkKeyTuple) {
bool isRawPayload) {
// TODO: optimize this function, handle the case while binary is not presented // TODO: optimize this function, handle the case while binary is not presented
STableMeta* pTableMeta = pTableDataBlock->pTableMeta;
STableComInfo tinfo = getTableInfo(pTableMeta);
SSchema* pSchema = getTableColumnSchema(pTableMeta);
int32_t nonDataLen = sizeof(SSubmitBlk) + pTableDataBlock->createTbReqLen; int32_t nonDataLen = sizeof(SSubmitBlk) + pTableDataBlock->createTbReqLen;
SSubmitBlk* pBlock = pDataBlock; SSubmitBlk* pBlock = pDataBlock;
memcpy(pDataBlock, pTableDataBlock->pData, nonDataLen); memcpy(pDataBlock, pTableDataBlock->pData, nonDataLen);
pDataBlock = (char*)pDataBlock + nonDataLen; pDataBlock = (char*)pDataBlock + nonDataLen;
int32_t flen = 0; // original total length of row
if (isRawPayload) {
for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
flen += TYPE_BYTES[pSchema[j].type];
}
}
pBlock->schemaLen = pTableDataBlock->createTbReqLen; pBlock->schemaLen = pTableDataBlock->createTbReqLen;
char* p = pTableDataBlock->pData + nonDataLen;
pBlock->dataLen = 0; pBlock->dataLen = 0;
int32_t numOfRows = pBlock->numOfRows;
if (isRawPayload) { int32_t numOfRows = pBlock->numOfRows;
SRowBuilder builder = {0}; for (int32_t i = 0; i < numOfRows; ++i) {
void* payload = (blkKeyTuple + i)->payloadAddr;
tdSRowInit(&builder, pTableMeta->sversion); TDRowLenT rowTLen = TD_ROW_LEN((STSRow*)payload);
tdSRowSetInfo(&builder, getNumOfColumns(pTableMeta), -1, flen); memcpy(pDataBlock, payload, rowTLen);
pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
for (int32_t i = 0; i < numOfRows; ++i) { pBlock->dataLen += rowTLen;
tdSRowResetBuf(&builder, pDataBlock);
int toffset = 0;
for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
int8_t colType = pSchema[j].type;
uint8_t valType = isNull(p, colType) ? TD_VTYPE_NULL : TD_VTYPE_NORM;
tdAppendColValToRow(&builder, pSchema[j].colId, colType, valType, p, true, toffset, j);
toffset += TYPE_BYTES[colType];
p += pSchema[j].bytes;
}
tdSRowEnd(&builder);
int32_t rowLen = TD_ROW_LEN((STSRow*)pDataBlock);
pDataBlock = (char*)pDataBlock + rowLen;
pBlock->dataLen += rowLen;
}
} else {
for (int32_t i = 0; i < numOfRows; ++i) {
void* payload = (blkKeyTuple + i)->payloadAddr;
TDRowLenT rowTLen = TD_ROW_LEN((STSRow*)payload);
memcpy(pDataBlock, payload, rowTLen);
pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
pBlock->dataLen += rowTLen;
}
} }
return pBlock->dataLen + pBlock->schemaLen; return pBlock->dataLen + pBlock->schemaLen;
} }
int32_t insMergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray** pVgDataBlocks) { int32_t insMergeTableDataBlocks(SHashObj* pHashObj, SArray** pVgDataBlocks) {
const int INSERT_HEAD_SIZE = sizeof(SSubmitReq); const int INSERT_HEAD_SIZE = sizeof(SSubmitReq);
int code = 0; int code = 0;
bool isRawPayload = IS_RAW_PAYLOAD(payloadType);
SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); SHashObj* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES); SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
...@@ -754,8 +717,7 @@ int32_t insMergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray* ...@@ -754,8 +717,7 @@ int32_t insMergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray*
} }
ASSERT(pOneTableBlock->pTableMeta->tableInfo.rowSize > 0); ASSERT(pOneTableBlock->pTableMeta->tableInfo.rowSize > 0);
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format // the maximum expanded size in byte when a row-wise data is converted to SDataRow format
int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0; int64_t destSize = dataBuf->size + pOneTableBlock->size +
int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize +
sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta) + sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta) +
pOneTableBlock->createTbReqLen; pOneTableBlock->createTbReqLen;
...@@ -774,23 +736,18 @@ int32_t insMergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray* ...@@ -774,23 +736,18 @@ int32_t insMergeTableDataBlocks(SHashObj* pHashObj, uint8_t payloadType, SArray*
} }
} }
if (isRawPayload) { if ((code = sortMergeDataBlockDupRows(pOneTableBlock, &blkKeyInfo, &pBlkRowMerger)) != 0) {
sortRemoveDataBlockDupRowsRaw(pOneTableBlock); tdFreeSBlockRowMerger(pBlkRowMerger);
} else { taosHashCleanup(pVnodeDataBlockHashList);
if ((code = sortMergeDataBlockDupRows(pOneTableBlock, &blkKeyInfo, &pBlkRowMerger)) != 0) { insDestroyBlockArrayList(pVnodeDataBlockList);
tdFreeSBlockRowMerger(pBlkRowMerger); taosMemoryFreeClear(dataBuf->pData);
taosHashCleanup(pVnodeDataBlockHashList); taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
insDestroyBlockArrayList(pVnodeDataBlockList); return code;
taosMemoryFreeClear(dataBuf->pData);
taosMemoryFreeClear(blkKeyInfo.pKeyTuple);
return code;
}
ASSERT(blkKeyInfo.pKeyTuple != NULL && pBlocks->numOfRows > 0);
} }
ASSERT(blkKeyInfo.pKeyTuple != NULL && pBlocks->numOfRows > 0);
// erase the empty space reserved for binary data // erase the empty space reserved for binary data
int32_t finalLen = int32_t finalLen = trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, blkKeyInfo.pKeyTuple);
trimDataBlock(dataBuf->pData + dataBuf->size, pOneTableBlock, blkKeyInfo.pKeyTuple, isRawPayload);
dataBuf->size += (finalLen + sizeof(SSubmitBlk)); dataBuf->size += (finalLen + sizeof(SSubmitBlk));
assert(dataBuf->size <= dataBuf->nAllocSize); assert(dataBuf->size <= dataBuf->nAllocSize);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册