未验证 提交 5ce5dd2d 编写于 作者: C Cary Xu 提交者: GitHub

Merge pull request #15723 from taosdata/feature/TD-11274-3.0

enh: adjust numOfRows in SubmitBlk from int16_t to int32_t
...@@ -227,8 +227,7 @@ typedef struct SSubmitBlk { ...@@ -227,8 +227,7 @@ typedef struct SSubmitBlk {
int32_t sversion; // data schema version int32_t sversion; // data schema version
int32_t dataLen; // data part length, not including the SSubmitBlk head int32_t dataLen; // data part length, not including the SSubmitBlk head
int32_t schemaLen; // schema length, if length is 0, no schema exists int32_t schemaLen; // schema length, if length is 0, no schema exists
int16_t numOfRows; // total number of rows in current submit block int32_t numOfRows; // total number of rows in current submit block
int16_t padding; // TODO just for padding here
char data[]; char data[];
} SSubmitBlk; } SSubmitBlk;
...@@ -256,7 +255,7 @@ typedef struct { ...@@ -256,7 +255,7 @@ typedef struct {
int32_t sversion; // data schema version int32_t sversion; // data schema version
int32_t dataLen; // data part length, not including the SSubmitBlk head int32_t dataLen; // data part length, not including the SSubmitBlk head
int32_t schemaLen; // schema length, if length is 0, no schema exists int32_t schemaLen; // schema length, if length is 0, no schema exists
int16_t numOfRows; // total number of rows in current submit block int32_t numOfRows; // total number of rows in current submit block
// head of SSubmitBlk // head of SSubmitBlk
int32_t numOfBlocks; int32_t numOfBlocks;
const void* pMsg; const void* pMsg;
......
...@@ -3144,10 +3144,9 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) ...@@ -3144,10 +3144,9 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
blk->uid = htobe64(uid); blk->uid = htobe64(uid);
blk->suid = htobe64(suid); blk->suid = htobe64(suid);
blk->padding = htonl(blk->padding);
blk->sversion = htonl(pTableMeta->sversion); blk->sversion = htonl(pTableMeta->sversion);
blk->schemaLen = htonl(schemaLen); blk->schemaLen = htonl(schemaLen);
blk->numOfRows = htons(rows); blk->numOfRows = htonl(rows);
blk->dataLen = htonl(dataLen); blk->dataLen = htonl(dataLen);
subReq->length = sizeof(SSubmitReq) + sizeof(SSubmitBlk) + schemaLen + dataLen; subReq->length = sizeof(SSubmitReq) + sizeof(SSubmitBlk) + schemaLen + dataLen;
subReq->numOfBlocks = 1; subReq->numOfBlocks = 1;
...@@ -3373,10 +3372,9 @@ static int32_t tmqWriteRaw(TAOS* taos, void* data, int32_t dataLen) { ...@@ -3373,10 +3372,9 @@ static int32_t tmqWriteRaw(TAOS* taos, void* data, int32_t dataLen) {
blk->uid = htobe64(uid); blk->uid = htobe64(uid);
blk->suid = htobe64(suid); blk->suid = htobe64(suid);
blk->padding = htonl(blk->padding);
blk->sversion = htonl(pSW->version); blk->sversion = htonl(pSW->version);
blk->schemaLen = htonl(schemaLen); blk->schemaLen = htonl(schemaLen);
blk->numOfRows = htons(rows); blk->numOfRows = htonl(rows);
blk->dataLen = htonl(dataLen); blk->dataLen = htonl(dataLen);
subReq->length += sizeof(SSubmitBlk) + schemaLen + dataLen; subReq->length += sizeof(SSubmitBlk) + schemaLen + dataLen;
subReq->numOfBlocks++; subReq->numOfBlocks++;
......
...@@ -2028,11 +2028,10 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks ...@@ -2028,11 +2028,10 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
int32_t dataLen = blk->dataLen; int32_t dataLen = blk->dataLen;
blk->uid = htobe64(blk->uid); blk->uid = htobe64(blk->uid);
blk->suid = htobe64(blk->suid); blk->suid = htobe64(blk->suid);
blk->padding = htonl(blk->padding);
blk->sversion = htonl(blk->sversion); blk->sversion = htonl(blk->sversion);
blk->dataLen = htonl(blk->dataLen); blk->dataLen = htonl(blk->dataLen);
blk->schemaLen = htonl(blk->schemaLen); blk->schemaLen = htonl(blk->schemaLen);
blk->numOfRows = htons(blk->numOfRows); blk->numOfRows = htonl(blk->numOfRows);
blk = (SSubmitBlk*)(blk->data + dataLen); blk = (SSubmitBlk*)(blk->data + dataLen);
} }
} else { } else {
......
...@@ -76,7 +76,7 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { ...@@ -76,7 +76,7 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
pIter->sversion = htonl((*pPBlock)->sversion); pIter->sversion = htonl((*pPBlock)->sversion);
pIter->dataLen = htonl((*pPBlock)->dataLen); pIter->dataLen = htonl((*pPBlock)->dataLen);
pIter->schemaLen = htonl((*pPBlock)->schemaLen); pIter->schemaLen = htonl((*pPBlock)->schemaLen);
pIter->numOfRows = htons((*pPBlock)->numOfRows); pIter->numOfRows = htonl((*pPBlock)->numOfRows);
} }
return 0; return 0;
} }
......
...@@ -117,7 +117,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo ...@@ -117,7 +117,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
blkHead->numOfRows = htons(pDataBlock->info.rows); blkHead->numOfRows = htonl(pDataBlock->info.rows);
blkHead->sversion = htonl(pTSchema->version); blkHead->sversion = htonl(pTSchema->version);
// TODO // TODO
blkHead->suid = htobe64(suid); blkHead->suid = htobe64(suid);
......
...@@ -111,7 +111,7 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) { ...@@ -111,7 +111,7 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
// pBlock->sversion = htonl(pBlock->sversion); // pBlock->sversion = htonl(pBlock->sversion);
// pBlock->dataLen = htonl(pBlock->dataLen); // pBlock->dataLen = htonl(pBlock->dataLen);
// pBlock->schemaLen = htonl(pBlock->schemaLen); // pBlock->schemaLen = htonl(pBlock->schemaLen);
// pBlock->numOfRows = htons(pBlock->numOfRows); // pBlock->numOfRows = htonl(pBlock->numOfRows);
#if 0 #if 0
if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) { if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) {
......
...@@ -395,9 +395,8 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { ...@@ -395,9 +395,8 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
pBlk->uid = htobe64(tbUid); pBlk->uid = htobe64(tbUid);
pBlk->suid = htobe64(tbUid); pBlk->suid = htobe64(tbUid);
pBlk->sversion = htonl(schemaVer); pBlk->sversion = htonl(schemaVer);
pBlk->padding = htonl(0);
pBlk->schemaLen = htonl(0); pBlk->schemaLen = htonl(0);
pBlk->numOfRows = htons(mockRowNum); pBlk->numOfRows = htonl(mockRowNum);
pBlk->dataLen = htonl(mockRowNum * mockRowLen); pBlk->dataLen = htonl(mockRowNum * mockRowLen);
for (uint32_t r = 0; r < mockRowNum; ++r) { for (uint32_t r = 0; r < mockRowNum; ++r) {
pRow = (STSRow *)POINTER_SHIFT(pBlk, sizeof(SSubmitBlk) + r * mockRowLen); pRow = (STSRow *)POINTER_SHIFT(pBlk, sizeof(SSubmitBlk) + r * mockRowLen);
......
...@@ -225,7 +225,7 @@ int32_t dataBlockToSubmit(SDataInserterHandle* pInserter, SSubmitReq** pReq) { ...@@ -225,7 +225,7 @@ int32_t dataBlockToSubmit(SDataInserterHandle* pInserter, SSubmitReq** pReq) {
} }
blkHead->dataLen = htonl(dataLen); blkHead->dataLen = htonl(dataLen);
blkHead->numOfRows = htons(rows); blkHead->numOfRows = htonl(rows);
ret->length += sizeof(SSubmitBlk) + dataLen; ret->length += sizeof(SSubmitBlk) + dataLen;
blkHead = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk) + dataLen); blkHead = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk) + dataLen);
......
...@@ -122,7 +122,7 @@ static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, STableDataBlocks * ...@@ -122,7 +122,7 @@ static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, STableDataBlocks *
pBlocks->sversion = dataBuf->pTableMeta->sversion; pBlocks->sversion = dataBuf->pTableMeta->sversion;
pBlocks->schemaLen = dataBuf->createTbReqLen; pBlocks->schemaLen = dataBuf->createTbReqLen;
if (pBlocks->numOfRows + numOfRows >= INT16_MAX) { if (pBlocks->numOfRows + numOfRows >= INT32_MAX) {
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} else { } else {
pBlocks->numOfRows += numOfRows; pBlocks->numOfRows += numOfRows;
......
...@@ -292,11 +292,10 @@ static void buildMsgHeader(STableDataBlocks* src, SVgDataBlocks* blocks) { ...@@ -292,11 +292,10 @@ static void buildMsgHeader(STableDataBlocks* src, SVgDataBlocks* blocks) {
int32_t schemaLen = blk->schemaLen; int32_t schemaLen = blk->schemaLen;
blk->uid = htobe64(blk->uid); blk->uid = htobe64(blk->uid);
blk->suid = htobe64(blk->suid); blk->suid = htobe64(blk->suid);
blk->padding = htonl(blk->padding);
blk->sversion = htonl(blk->sversion); blk->sversion = htonl(blk->sversion);
blk->dataLen = htonl(blk->dataLen); blk->dataLen = htonl(blk->dataLen);
blk->schemaLen = htonl(blk->schemaLen); blk->schemaLen = htonl(blk->schemaLen);
blk->numOfRows = htons(blk->numOfRows); blk->numOfRows = htonl(blk->numOfRows);
blk = (SSubmitBlk*)(blk->data + schemaLen + dataLen); blk = (SSubmitBlk*)(blk->data + schemaLen + dataLen);
} }
} }
...@@ -1267,7 +1266,7 @@ static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* da ...@@ -1267,7 +1266,7 @@ static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* da
SSubmitBlk* pBlocks = (SSubmitBlk*)(dataBuf->pData); SSubmitBlk* pBlocks = (SSubmitBlk*)(dataBuf->pData);
if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, dataBuf, numOfRows)) { if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, dataBuf, numOfRows)) {
return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than 32767"); return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than INT32_MAX");
} }
dataBuf->numOfTables = 1; dataBuf->numOfTables = 1;
...@@ -1339,7 +1338,7 @@ static int32_t parseDataFromFile(SInsertParseContext* pCxt, SToken filePath, STa ...@@ -1339,7 +1338,7 @@ static int32_t parseDataFromFile(SInsertParseContext* pCxt, SToken filePath, STa
SSubmitBlk* pBlocks = (SSubmitBlk*)(dataBuf->pData); SSubmitBlk* pBlocks = (SSubmitBlk*)(dataBuf->pData);
if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, dataBuf, numOfRows)) { if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, dataBuf, numOfRows)) {
return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than 32767"); return buildInvalidOperationMsg(&pCxt->msg, "too many rows in sql, total number of rows should be less than INT32_MAX");
} }
dataBuf->numOfTables = 1; dataBuf->numOfTables = 1;
...@@ -1986,7 +1985,7 @@ int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, in ...@@ -1986,7 +1985,7 @@ int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, in
SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData); SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) { if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) {
return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767"); return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than INT32_MAX");
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -2074,7 +2073,7 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu ...@@ -2074,7 +2073,7 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu
SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData); SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) { if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, bind->num)) {
return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767"); return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than INT32_MAX");
} }
} }
...@@ -2444,7 +2443,7 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols ...@@ -2444,7 +2443,7 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols
SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData); SSubmitBlk* pBlocks = (SSubmitBlk*)(pDataBlock->pData);
if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, rowNum)) { if (TSDB_CODE_SUCCESS != setBlockInfo(pBlocks, pDataBlock, rowNum)) {
return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than 32767"); return buildInvalidOperationMsg(&pBuf, "too many rows in sql, total number of rows should be less than INT32_MAX");
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -110,15 +110,15 @@ class InsertTest : public Test { ...@@ -110,15 +110,15 @@ class InsertTest : public Test {
SSubmitBlk* blk = (SSubmitBlk*)(submit + 1); SSubmitBlk* blk = (SSubmitBlk*)(submit + 1);
for (int32_t i = 0; i < numOfBlocks; ++i) { for (int32_t i = 0; i < numOfBlocks; ++i) {
cout << "Block:" << i << endl; cout << "Block:" << i << endl;
cout << "\tuid:" << be64toh(blk->uid) << ", tid:" << be64toh(blk->suid) << ", padding:" << ntohl(blk->padding) cout << "\tuid:" << be64toh(blk->uid) << ", tid:" << be64toh(blk->suid) << ", sversion:" << ntohl(blk->sversion)
<< ", sversion:" << ntohl(blk->sversion) << ", dataLen:" << ntohl(blk->dataLen) << ", dataLen:" << ntohl(blk->dataLen) << ", schemaLen:" << ntohl(blk->schemaLen)
<< ", schemaLen:" << ntohl(blk->schemaLen) << ", numOfRows:" << ntohs(blk->numOfRows) << endl; << ", numOfRows:" << ntohl(blk->numOfRows) << endl;
blk = (SSubmitBlk*)(blk->data + ntohl(blk->dataLen)); blk = (SSubmitBlk*)(blk->data + ntohl(blk->dataLen));
} }
} }
} }
void checkReslut(int32_t numOfTables, int16_t numOfRows1, int16_t numOfRows2 = -1) { void checkReslut(int32_t numOfTables, int32_t numOfRows1, int32_t numOfRows2 = -1) {
SVnodeModifOpStmt* pStmt = getVnodeModifStmt(res_); SVnodeModifOpStmt* pStmt = getVnodeModifStmt(res_);
ASSERT_EQ(pStmt->payloadType, PAYLOAD_TYPE_KV); ASSERT_EQ(pStmt->payloadType, PAYLOAD_TYPE_KV);
ASSERT_EQ(pStmt->insertType, TSDB_QUERY_TYPE_INSERT); ASSERT_EQ(pStmt->insertType, TSDB_QUERY_TYPE_INSERT);
...@@ -134,7 +134,7 @@ class InsertTest : public Test { ...@@ -134,7 +134,7 @@ class InsertTest : public Test {
int32_t numOfBlocks = ntohl(submit->numOfBlocks); int32_t numOfBlocks = ntohl(submit->numOfBlocks);
SSubmitBlk* blk = (SSubmitBlk*)(submit + 1); SSubmitBlk* blk = (SSubmitBlk*)(submit + 1);
for (int32_t i = 0; i < numOfBlocks; ++i) { for (int32_t i = 0; i < numOfBlocks; ++i) {
ASSERT_EQ(ntohs(blk->numOfRows), (0 == i ? numOfRows1 : (numOfRows2 > 0 ? numOfRows2 : numOfRows1))); ASSERT_EQ(ntohl(blk->numOfRows), (0 == i ? numOfRows1 : (numOfRows2 > 0 ? numOfRows2 : numOfRows1)));
blk = (SSubmitBlk*)(blk->data + ntohl(blk->dataLen)); blk = (SSubmitBlk*)(blk->data + ntohl(blk->dataLen));
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册