未验证 提交 6ecd5b29 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #20019 from taosdata/enh/3.0_planner_optimize

feat: compatible with older versions of wal
...@@ -3307,6 +3307,12 @@ typedef struct { ...@@ -3307,6 +3307,12 @@ typedef struct {
SArray* aSubmitTbData; // SArray<SSubmitTbData> SArray* aSubmitTbData; // SArray<SSubmitTbData>
} SSubmitReq2; } SSubmitReq2;
typedef struct {
SMsgHead header;
int64_t version;
char data[]; // SSubmitReq2
} SSubmitReq2Msg;
int32_t tEncodeSSubmitReq2(SEncoder* pCoder, const SSubmitReq2* pReq); int32_t tEncodeSSubmitReq2(SEncoder* pCoder, const SSubmitReq2* pReq);
int32_t tDecodeSSubmitReq2(SDecoder* pCoder, SSubmitReq2* pReq); int32_t tDecodeSSubmitReq2(SDecoder* pCoder, SSubmitReq2* pReq);
void tDestroySSubmitTbData(SSubmitTbData* pTbData, int32_t flag); void tDestroySSubmitTbData(SSubmitTbData* pTbData, int32_t flag);
...@@ -3323,6 +3329,7 @@ void tDestroySSubmitRsp2(SSubmitRsp2* pRsp, int32_t flag); ...@@ -3323,6 +3329,7 @@ void tDestroySSubmitRsp2(SSubmitRsp2* pRsp, int32_t flag);
#define TSDB_MSG_FLG_ENCODE 0x1 #define TSDB_MSG_FLG_ENCODE 0x1
#define TSDB_MSG_FLG_DECODE 0x2 #define TSDB_MSG_FLG_DECODE 0x2
#define TSDB_MSG_FLG_CMPT 0x3
typedef struct { typedef struct {
union { union {
......
...@@ -6995,9 +6995,13 @@ void tDestroySSubmitTbData(SSubmitTbData *pTbData, int32_t flag) { ...@@ -6995,9 +6995,13 @@ void tDestroySSubmitTbData(SSubmitTbData *pTbData, int32_t flag) {
return; return;
} }
if (flag == TSDB_MSG_FLG_ENCODE) { if (flag == TSDB_MSG_FLG_ENCODE || flag == TSDB_MSG_FLG_CMPT) {
if (pTbData->pCreateTbReq) { if (pTbData->pCreateTbReq) {
tdDestroySVCreateTbReq(pTbData->pCreateTbReq); if (flag == TSDB_MSG_FLG_ENCODE) {
tdDestroySVCreateTbReq(pTbData->pCreateTbReq);
} else {
tDestroySVCreateTbReq(pTbData->pCreateTbReq, TSDB_MSG_FLG_DECODE);
}
taosMemoryFree(pTbData->pCreateTbReq); taosMemoryFree(pTbData->pCreateTbReq);
} }
......
...@@ -257,7 +257,6 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * ...@@ -257,7 +257,6 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
int32_t rows = pDataBlock->info.rows; int32_t rows = pDataBlock->info.rows;
SSubmitTbData tbData = {0}; SSubmitTbData tbData = {0};
if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow *)))) { if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow *)))) {
goto _end; goto _end;
...@@ -313,14 +312,15 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * ...@@ -313,14 +312,15 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
tEncodeSize(tEncodeSSubmitReq2, pReq, len, terrno); tEncodeSize(tEncodeSSubmitReq2, pReq, len, terrno);
if (TSDB_CODE_SUCCESS == terrno) { if (TSDB_CODE_SUCCESS == terrno) {
SEncoder encoder; SEncoder encoder;
len += sizeof(SMsgHead); len += sizeof(SSubmitReq2Msg);
pBuf = rpcMallocCont(len); pBuf = rpcMallocCont(len);
if (NULL == pBuf) { if (NULL == pBuf) {
goto _end; goto _end;
} }
((SMsgHead *)pBuf)->vgId = TD_VID(pVnode); ((SSubmitReq2Msg *)pBuf)->header.vgId = TD_VID(pVnode);
((SMsgHead *)pBuf)->contLen = htonl(len); ((SSubmitReq2Msg *)pBuf)->header.contLen = htonl(len);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead)); ((SSubmitReq2Msg *)pBuf)->version = htobe64(1);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
if (tEncodeSSubmitReq2(&encoder, pReq) < 0) { if (tEncodeSSubmitReq2(&encoder, pReq) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
/*vError("failed to encode submit req since %s", terrstr());*/ /*vError("failed to encode submit req since %s", terrstr());*/
......
...@@ -683,13 +683,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -683,13 +683,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
if (pHead->msgType == TDMT_VND_SUBMIT) { if (pHead->msgType == TDMT_VND_SUBMIT) {
SPackedData submit = { SPackedData submit = {
.msgStr = POINTER_SHIFT(pHead->body, sizeof(SMsgHead)), .msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)),
.msgLen = pHead->bodyLen - sizeof(SMsgHead), .msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
.ver = pHead->version, .ver = pHead->version,
}; };
if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) { if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) {
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, TD_VID(pTq->pVnode),
TD_VID(pTq->pVnode), req.subKey); req.subKey);
return -1; return -1;
} }
if (taosxRsp.blockNum > 0 /* threshold */) { if (taosxRsp.blockNum > 0 /* threshold */) {
......
...@@ -207,8 +207,8 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ ...@@ -207,8 +207,8 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
#endif #endif
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
void* pReq = POINTER_SHIFT(msg, sizeof(SMsgHead)); void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg));
int32_t len = msgLen - sizeof(SMsgHead); int32_t len = msgLen - sizeof(SSubmitReq2Msg);
tqDebug("vgId:%d, tq push msg ver %" PRId64 ", type: %s, p head %p, p body %p, len %d", pTq->pVnode->config.vgId, ver, tqDebug("vgId:%d, tq push msg ver %" PRId64 ", type: %s, p head %p, p body %p, len %d", pTq->pVnode->config.vgId, ver,
TMSG_INFO(msgType), msg, pReq, len); TMSG_INFO(msgType), msg, pReq, len);
......
...@@ -311,8 +311,8 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { ...@@ -311,8 +311,8 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
tqDebug("return offset %" PRId64 ", no more valid", ret->offset.version); tqDebug("return offset %" PRId64 ", no more valid", ret->offset.version);
return -1; return -1;
} }
void* body = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SMsgHead)); void* body = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
int32_t bodyLen = pReader->pWalReader->pHead->head.bodyLen - sizeof(SMsgHead); int32_t bodyLen = pReader->pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
int64_t ver = pReader->pWalReader->pHead->head.version; int64_t ver = pReader->pWalReader->pHead->head.version;
#if 0 #if 0
if (pReader->pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) { if (pReader->pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
...@@ -560,7 +560,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbD ...@@ -560,7 +560,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbD
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
pReader->nextBlk++; pReader->nextBlk++;
if(pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData; if (pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData;
int32_t sversion = pSubmitTbData->sver; int32_t sversion = pSubmitTbData->sver;
int64_t suid = pSubmitTbData->suid; int64_t suid = pSubmitTbData->suid;
int64_t uid = pSubmitTbData->uid; int64_t uid = pSubmitTbData->uid;
...@@ -1012,7 +1012,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema ...@@ -1012,7 +1012,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
pReader->nextBlk++; pReader->nextBlk++;
if(pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData; if (pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData;
int32_t sversion = pSubmitTbData->sver; int32_t sversion = pSubmitTbData->sver;
int64_t suid = pSubmitTbData->suid; int64_t suid = pSubmitTbData->suid;
int64_t uid = pSubmitTbData->uid; int64_t uid = pSubmitTbData->uid;
...@@ -1022,7 +1022,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema ...@@ -1022,7 +1022,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema
pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, uid, sversion, 1); pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, uid, sversion, 1);
if (pReader->pSchema == NULL) { if (pReader->pSchema == NULL) {
tqWarn("vgId:%d, cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64 tqWarn("vgId:%d, cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64
"), version %d, possibly dropped table", "), version %d, possibly dropped table",
pReader->pWalReader->pWal->cfg.vgId, uid, suid, sversion); pReader->pWalReader->pWal->cfg.vgId, uid, suid, sversion);
pReader->cachedSchemaSuid = 0; pReader->cachedSchemaSuid = 0;
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
...@@ -1041,7 +1041,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema ...@@ -1041,7 +1041,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema
STSchema* pTschema = pReader->pSchema; STSchema* pTschema = pReader->pSchema;
SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper; SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
int32_t numOfRows = 0; int32_t numOfRows = 0;
if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
SArray* pCols = pSubmitTbData->aCol; SArray* pCols = pSubmitTbData->aCol;
...@@ -1054,7 +1054,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema ...@@ -1054,7 +1054,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema
int32_t curRow = 0; int32_t curRow = 0;
int32_t lastRow = 0; int32_t lastRow = 0;
char* assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols); char* assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
if (assigned == NULL) return -1; if (assigned == NULL) return -1;
// convert and scan one block // convert and scan one block
...@@ -1064,9 +1064,9 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema ...@@ -1064,9 +1064,9 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema
for (int32_t i = 0; i < numOfRows; i++) { for (int32_t i = 0; i < numOfRows; i++) {
bool buildNew = false; bool buildNew = false;
for (int32_t j = 0; j < numOfCols; j++){ for (int32_t j = 0; j < numOfCols; j++) {
SColData* pCol = taosArrayGet(pCols, j); SColData* pCol = taosArrayGet(pCols, j);
SColVal colVal; SColVal colVal;
tColDataGetValue(pCol, i, &colVal); tColDataGetValue(pCol, i, &colVal);
if (curRow == 0) { if (curRow == 0) {
assigned[j] = !COL_VAL_IS_NONE(&colVal); assigned[j] = !COL_VAL_IS_NONE(&colVal);
...@@ -1087,9 +1087,9 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema ...@@ -1087,9 +1087,9 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema
lastRow = curRow; lastRow = curRow;
} }
SSDataBlock block = {0}; SSDataBlock block = {0};
SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if(pSW == NULL){ if (pSW == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL; goto FAIL;
} }
...@@ -1158,10 +1158,10 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema ...@@ -1158,10 +1158,10 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema
} else { } else {
SArray* pRows = pSubmitTbData->aRowP; SArray* pRows = pSubmitTbData->aRowP;
for (int32_t i = 0; i < numOfRows; i++) { for (int32_t i = 0; i < numOfRows; i++) {
SRow* pRow = taosArrayGetP(pRows, i); SRow* pRow = taosArrayGetP(pRows, i);
bool buildNew = false; bool buildNew = false;
for (int32_t j = 0; j < pTschema->numOfCols; j++){ for (int32_t j = 0; j < pTschema->numOfCols; j++) {
SColVal colVal; SColVal colVal;
tRowGet(pRow, pTschema, j, &colVal); tRowGet(pRow, pTschema, j, &colVal);
if (curRow == 0) { if (curRow == 0) {
...@@ -1183,9 +1183,9 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema ...@@ -1183,9 +1183,9 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema
lastRow = curRow; lastRow = curRow;
} }
SSDataBlock block = {0}; SSDataBlock block = {0};
SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if(pSW == NULL){ if (pSW == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL; goto FAIL;
} }
...@@ -1220,7 +1220,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema ...@@ -1220,7 +1220,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema
int32_t colActual = blockDataGetNumOfCols(pBlock); int32_t colActual = blockDataGetNumOfCols(pBlock);
while (targetIdx < colActual) { while (targetIdx < colActual) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx); SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
SColVal colVal; SColVal colVal;
tRowGet(pRow, pTschema, sourceIdx, &colVal); tRowGet(pRow, pTschema, sourceIdx, &colVal);
if (colVal.cid < pColData->info.colId) { if (colVal.cid < pColData->info.colId) {
...@@ -1256,7 +1256,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema ...@@ -1256,7 +1256,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema
taosMemoryFree(assigned); taosMemoryFree(assigned);
return 0; return 0;
FAIL: FAIL:
taosMemoryFree(assigned); taosMemoryFree(assigned);
return -1; return -1;
} }
......
...@@ -71,7 +71,6 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl ...@@ -71,7 +71,6 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
return 0; return 0;
} }
void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
const SArray* pBlocks = (const SArray*)data; const SArray* pBlocks = (const SArray*)data;
SVnode* pVnode = (SVnode*)vnode; SVnode* pVnode = (SVnode*)vnode;
...@@ -324,7 +323,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d ...@@ -324,7 +323,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
} }
static int32_t encodeCreateChildTableForRPC(SVCreateTbBatchReq* pReqs, int32_t vgId, void** pBuf, int32_t* contLen) { static int32_t encodeCreateChildTableForRPC(SVCreateTbBatchReq* pReqs, int32_t vgId, void** pBuf, int32_t* contLen) {
int32_t ret = 0; int32_t ret = 0;
tEncodeSize(tEncodeSVCreateTbBatchReq, pReqs, *contLen, ret); tEncodeSize(tEncodeSVCreateTbBatchReq, pReqs, *contLen, ret);
if (ret < 0) { if (ret < 0) {
...@@ -340,7 +339,7 @@ static int32_t encodeCreateChildTableForRPC(SVCreateTbBatchReq* pReqs, int32_t v ...@@ -340,7 +339,7 @@ static int32_t encodeCreateChildTableForRPC(SVCreateTbBatchReq* pReqs, int32_t v
((SMsgHead*)(*pBuf))->vgId = vgId; ((SMsgHead*)(*pBuf))->vgId = vgId;
((SMsgHead*)(*pBuf))->contLen = htonl(*contLen); ((SMsgHead*)(*pBuf))->contLen = htonl(*contLen);
SEncoder coder = {0}; SEncoder coder = {0};
tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead) ); tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead));
if (tEncodeSVCreateTbBatchReq(&coder, pReqs) < 0) { if (tEncodeSVCreateTbBatchReq(&coder, pReqs) < 0) {
rpcFreeCont(*pBuf); rpcFreeCont(*pBuf);
*pBuf = NULL; *pBuf = NULL;
...@@ -440,7 +439,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* ...@@ -440,7 +439,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
goto _end; goto _end;
} }
for (int32_t rowId = 0; rowId < rows; rowId++) { for (int32_t rowId = 0; rowId < rows; rowId++) {
SVCreateTbReq createTbReq = {0}; SVCreateTbReq createTbReq = {0};
SVCreateTbReq* pCreateTbReq = &createTbReq; SVCreateTbReq* pCreateTbReq = &createTbReq;
if (!pCreateTbReq) { if (!pCreateTbReq) {
goto _end; goto _end;
...@@ -482,9 +481,9 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* ...@@ -482,9 +481,9 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
} }
for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) { for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) {
SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId); SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId);
STagVal tagVal = { STagVal tagVal = {
.cid = pTSchema->numOfCols + step, .cid = pTSchema->numOfCols + step,
.type = pTagData->info.type, .type = pTagData->info.type,
}; };
void* pData = colDataGetData(pTagData, rowId); void* pData = colDataGetData(pTagData, rowId);
if (colDataIsNull_s(pTagData, rowId)) { if (colDataIsNull_s(pTagData, rowId)) {
...@@ -514,7 +513,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* ...@@ -514,7 +513,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
SColumnInfoData* pTbColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX); SColumnInfoData* pTbColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
if (colDataIsNull_s(pTbColInfo, rowId)) { if (colDataIsNull_s(pTbColInfo, rowId)) {
SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX); SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX);
void* pGpIdData = colDataGetData(pGpIdColInfo, rowId); void* pGpIdData = colDataGetData(pGpIdColInfo, rowId);
pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, *(uint64_t*)pGpIdData); pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, *(uint64_t*)pGpIdData);
} else { } else {
void* pTbData = colDataGetData(pTbColInfo, rowId); void* pTbData = colDataGetData(pTbColInfo, rowId);
...@@ -639,16 +638,16 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* ...@@ -639,16 +638,16 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
taosArrayClear(pVals); taosArrayClear(pVals);
int32_t dataIndex = 0; int32_t dataIndex = 0;
for (int32_t k = 0; k < pTSchema->numOfCols; k++) { for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
const STColumn* pCol = &pTSchema->columns[k]; const STColumn* pCol = &pTSchema->columns[k];
if (k == 0) { if (k == 0) {
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex); SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
void* colData = colDataGetData(pColData, j); void* colData = colDataGetData(pColData, j);
tqDebug("tq sink pipe2, row %d, col %d ts %" PRId64, j, k, *(int64_t*)colData); tqDebug("tq sink pipe2, row %d, col %d ts %" PRId64, j, k, *(int64_t*)colData);
} }
if (IS_SET_NULL(pCol)) { if (IS_SET_NULL(pCol)) {
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
taosArrayPush(pVals, &cv); taosArrayPush(pVals, &cv);
} else{ } else {
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex); SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
if (colDataIsNull_s(pColData, j)) { if (colDataIsNull_s(pColData, j)) {
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
...@@ -692,14 +691,15 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* ...@@ -692,14 +691,15 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void*
int32_t code; int32_t code;
tEncodeSize(tEncodeSSubmitReq2, &submitReq, len, code); tEncodeSize(tEncodeSSubmitReq2, &submitReq, len, code);
SEncoder encoder; SEncoder encoder;
len += sizeof(SMsgHead); len += sizeof(SSubmitReq2Msg);
pBuf = rpcMallocCont(len); pBuf = rpcMallocCont(len);
if (NULL == pBuf) { if (NULL == pBuf) {
goto _end; goto _end;
} }
((SMsgHead*)pBuf)->vgId = TD_VID(pVnode); ((SSubmitReq2Msg*)pBuf)->header.vgId = TD_VID(pVnode);
((SMsgHead*)pBuf)->contLen = htonl(len); ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead)); ((SSubmitReq2Msg*)pBuf)->version = htobe64(1);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
if (tEncodeSSubmitReq2(&encoder, &submitReq) < 0) { if (tEncodeSSubmitReq2(&encoder, &submitReq) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to encode submit req since %s", terrstr()); tqError("failed to encode submit req since %s", terrstr());
......
...@@ -201,7 +201,7 @@ static int32_t vnodePreProcessSubmitMsg(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -201,7 +201,7 @@ static int32_t vnodePreProcessSubmitMsg(SVnode *pVnode, SRpcMsg *pMsg) {
SDecoder *pCoder = &(SDecoder){0}; SDecoder *pCoder = &(SDecoder){0};
tDecoderInit(pCoder, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead)); tDecoderInit(pCoder, (uint8_t *)pMsg->pCont + sizeof(SSubmitReq2Msg), pMsg->contLen - sizeof(SSubmitReq2Msg));
if (tStartDecode(pCoder) < 0) { if (tStartDecode(pCoder) < 0) {
code = TSDB_CODE_INVALID_MSG; code = TSDB_CODE_INVALID_MSG;
...@@ -356,7 +356,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp ...@@ -356,7 +356,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
break; break;
/* TSDB */ /* TSDB */
case TDMT_VND_SUBMIT: case TDMT_VND_SUBMIT:
if (vnodeProcessSubmitReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err;
break; break;
case TDMT_VND_DELETE: case TDMT_VND_DELETE:
if (vnodeProcessDeleteReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; if (vnodeProcessDeleteReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
...@@ -981,6 +981,186 @@ static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, ...@@ -981,6 +981,186 @@ static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
typedef struct SSubmitReqConvertCxt {
SSubmitMsgIter msgIter;
SSubmitBlk *pBlock;
SSubmitBlkIter blkIter;
STSRow *pRow;
STSRowIter rowIter;
SSubmitTbData *pTbData;
STSchema *pTbSchema;
SArray *pColValues;
} SSubmitReqConvertCxt;
static int32_t vnodeResetTableCxt(SMeta *pMeta, SSubmitReqConvertCxt *pCxt) {
taosMemoryFreeClear(pCxt->pTbSchema);
pCxt->pTbSchema = metaGetTbTSchema(pMeta, pCxt->msgIter.suid, pCxt->msgIter.sversion, 1);
if (NULL == pCxt->pTbSchema) {
return TSDB_CODE_INVALID_MSG;
}
tdSTSRowIterInit(&pCxt->rowIter, pCxt->pTbSchema);
tDestroySSubmitTbData(pCxt->pTbData, TSDB_MSG_FLG_ENCODE);
if (NULL == pCxt->pTbData) {
pCxt->pTbData = taosMemoryCalloc(1, sizeof(SSubmitTbData));
if (NULL == pCxt->pTbData) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
pCxt->pTbData->flags = 0;
pCxt->pTbData->suid = pCxt->msgIter.suid;
pCxt->pTbData->uid = pCxt->msgIter.uid;
pCxt->pTbData->sver = pCxt->msgIter.sversion;
pCxt->pTbData->pCreateTbReq = NULL;
pCxt->pTbData->aRowP = taosArrayInit(128, POINTER_BYTES);
if (NULL == pCxt->pTbData->aRowP) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayDestroy(pCxt->pColValues);
pCxt->pColValues = taosArrayInit(pCxt->pTbSchema->numOfCols, sizeof(SColVal));
if (NULL == pCxt->pColValues) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < pCxt->pTbSchema->numOfCols; ++i) {
SColVal val = COL_VAL_NONE(pCxt->pTbSchema->columns[i].colId, pCxt->pTbSchema->columns[i].type);
taosArrayPush(pCxt->pColValues, &val);
}
return TSDB_CODE_SUCCESS;
}
static void vnodeDestroySubmitReqConvertCxt(SSubmitReqConvertCxt *pCxt) {
taosMemoryFreeClear(pCxt->pTbSchema);
tDestroySSubmitTbData(pCxt->pTbData, TSDB_MSG_FLG_ENCODE);
taosMemoryFreeClear(pCxt->pTbData);
taosArrayDestroy(pCxt->pColValues);
}
static int32_t vnodeCellValConvertToColVal(STColumn *pCol, SCellVal *pCellVal, SColVal *pColVal) {
if (tdValTypeIsNone(pCellVal->valType)) {
pColVal->flag = CV_FLAG_NONE;
return TSDB_CODE_SUCCESS;
}
if (tdValTypeIsNull(pCellVal->valType)) {
pColVal->flag = CV_FLAG_NULL;
return TSDB_CODE_SUCCESS;
}
if (IS_VAR_DATA_TYPE(pCol->type)) {
pColVal->value.nData = varDataLen(pCellVal->val);
pColVal->value.pData = varDataVal(pCellVal->val);
} else if (TSDB_DATA_TYPE_FLOAT == pCol->type) {
float f = GET_FLOAT_VAL(pCellVal->val);
memcpy(&pColVal->value.val, &f, sizeof(f));
} else if (TSDB_DATA_TYPE_DOUBLE == pCol->type) {
pColVal->value.val = *(int64_t *)pCellVal->val;
} else {
GET_TYPED_DATA(pColVal->value.val, int64_t, pCol->type, pCellVal->val);
}
pColVal->flag = CV_FLAG_VALUE;
return TSDB_CODE_SUCCESS;
}
static int32_t vnodeTSRowConvertToColValArray(SSubmitReqConvertCxt *pCxt) {
int32_t code = TSDB_CODE_SUCCESS;
tdSTSRowIterReset(&pCxt->rowIter, pCxt->pRow);
for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < pCxt->pTbSchema->numOfCols; ++i) {
STColumn *pCol = pCxt->pTbSchema->columns + i;
SCellVal cellVal = {0};
if (!tdSTSRowIterFetch(&pCxt->rowIter, pCol->colId, pCol->type, &cellVal)) {
break;
}
code = vnodeCellValConvertToColVal(pCol, &cellVal, (SColVal *)taosArrayGet(pCxt->pColValues, i));
}
return code;
}
static int32_t vnodeDecodeCreateTbReq(SSubmitReqConvertCxt *pCxt) {
if (pCxt->msgIter.schemaLen <= 0) {
return TSDB_CODE_SUCCESS;
}
pCxt->pTbData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
if (NULL == pCxt->pTbData->pCreateTbReq) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SDecoder decoder = {0};
tDecoderInit(&decoder, pCxt->pBlock->data, pCxt->msgIter.schemaLen);
int32_t code = tDecodeSVCreateTbReq(&decoder, pCxt->pTbData->pCreateTbReq);
tDecoderClear(&decoder);
return code;
}
static int32_t vnodeSubmitReqConvertToSubmitReq2(SVnode *pVnode, SSubmitReq *pReq, SSubmitReq2 *pReq2) {
pReq2->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData));
if (NULL == pReq2->aSubmitTbData) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SSubmitReqConvertCxt cxt = {0};
int32_t code = tInitSubmitMsgIter(pReq, &cxt.msgIter);
while (TSDB_CODE_SUCCESS == code) {
code = tGetSubmitMsgNext(&cxt.msgIter, &cxt.pBlock);
if (TSDB_CODE_SUCCESS == code) {
if (NULL == cxt.pBlock) {
break;
}
code = vnodeResetTableCxt(pVnode->pMeta, &cxt);
}
if (TSDB_CODE_SUCCESS == code) {
code = tInitSubmitBlkIter(&cxt.msgIter, cxt.pBlock, &cxt.blkIter);
}
if (TSDB_CODE_SUCCESS == code) {
code = vnodeDecodeCreateTbReq(&cxt);
}
while (TSDB_CODE_SUCCESS == code && (cxt.pRow = tGetSubmitBlkNext(&cxt.blkIter)) != NULL) {
code = vnodeTSRowConvertToColValArray(&cxt);
if (TSDB_CODE_SUCCESS == code) {
SRow **pNewRow = taosArrayReserve(cxt.pTbData->aRowP, 1);
code = tRowBuild(cxt.pColValues, cxt.pTbSchema, pNewRow);
}
}
if (TSDB_CODE_SUCCESS == code) {
code = (NULL == taosArrayPush(pReq2->aSubmitTbData, cxt.pTbData) ? TSDB_CODE_OUT_OF_MEMORY : TSDB_CODE_SUCCESS);
}
if (TSDB_CODE_SUCCESS == code) {
taosMemoryFreeClear(cxt.pTbData);
}
}
vnodeDestroySubmitReqConvertCxt(&cxt);
return code;
}
static int32_t vnodeRebuildSubmitReqMsg(SSubmitReq2 *pSubmitReq, void **ppMsg) {
int32_t code = TSDB_CODE_SUCCESS;
char *pMsg = NULL;
uint32_t msglen = 0;
tEncodeSize(tEncodeSSubmitReq2, pSubmitReq, msglen, code);
if (TSDB_CODE_SUCCESS == code) {
pMsg = taosMemoryMalloc(msglen);
if (NULL == pMsg) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
SEncoder encoder;
tEncoderInit(&encoder, pMsg, msglen);
code = tEncodeSSubmitReq2(&encoder, pSubmitReq);
tEncoderClear(&encoder);
}
if (TSDB_CODE_SUCCESS == code) {
*ppMsg = pMsg;
}
return code;
}
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
int32_t code = 0; int32_t code = 0;
terrno = 0; terrno = 0;
...@@ -993,14 +1173,27 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq ...@@ -993,14 +1173,27 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
pRsp->code = TSDB_CODE_SUCCESS; pRsp->code = TSDB_CODE_SUCCESS;
// decode SSubmitReq2Msg *pMsg = (SSubmitReq2Msg *)pReq;
SDecoder dc = {0}; if (0 == pMsg->version) {
tDecoderInit(&dc, pReq, len); code = vnodeSubmitReqConvertToSubmitReq2(pVnode, (SSubmitReq *)pMsg, pSubmitReq);
if (tDecodeSSubmitReq2(&dc, pSubmitReq) < 0) { if (TSDB_CODE_SUCCESS == code) {
code = TSDB_CODE_INVALID_MSG; code = vnodeRebuildSubmitReqMsg(pSubmitReq, &pReq);
goto _exit; }
if (TSDB_CODE_SUCCESS != code) {
goto _exit;
}
} else {
// decode
pReq = POINTER_SHIFT(pReq, sizeof(SSubmitReq2Msg));
len -= sizeof(SSubmitReq2Msg);
SDecoder dc = {0};
tDecoderInit(&dc, pReq, len);
if (tDecodeSSubmitReq2(&dc, pSubmitReq) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
tDecoderClear(&dc);
} }
tDecoderClear(&dc);
for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) { for (int32_t i = 0; i < TARRAY_SIZE(pSubmitReq->aSubmitTbData); ++i) {
SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i); SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i);
...@@ -1140,11 +1333,15 @@ _exit: ...@@ -1140,11 +1333,15 @@ _exit:
// clear // clear
taosArrayDestroy(newTbUids); taosArrayDestroy(newTbUids);
tDestroySSubmitReq2(pSubmitReq, TSDB_MSG_FLG_DECODE); tDestroySSubmitReq2(pSubmitReq, 0 == pMsg->version ? TSDB_MSG_FLG_CMPT : TSDB_MSG_FLG_DECODE);
tDestroySSubmitRsp2(pSubmitRsp, TSDB_MSG_FLG_ENCODE); tDestroySSubmitRsp2(pSubmitRsp, TSDB_MSG_FLG_ENCODE);
if (code) terrno = code; if (code) terrno = code;
if (0 == pMsg->version) {
taosMemoryFree(pReq);
}
return code; return code;
} }
......
...@@ -129,14 +129,15 @@ static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int ...@@ -129,14 +129,15 @@ static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int
tEncodeSize(tEncodeSSubmitReq2, pReq, len, code); tEncodeSize(tEncodeSSubmitReq2, pReq, len, code);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
SEncoder encoder; SEncoder encoder;
len += sizeof(SMsgHead); len += sizeof(SSubmitReq2Msg);
pBuf = taosMemoryMalloc(len); pBuf = taosMemoryMalloc(len);
if (NULL == pBuf) { if (NULL == pBuf) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
((SMsgHead*)pBuf)->vgId = htonl(vgId); ((SSubmitReq2Msg*)pBuf)->header.vgId = htonl(vgId);
((SMsgHead*)pBuf)->contLen = htonl(len); ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead)); ((SSubmitReq2Msg*)pBuf)->version = htobe64(1);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
code = tEncodeSSubmitReq2(&encoder, pReq); code = tEncodeSSubmitReq2(&encoder, pReq);
tEncoderClear(&encoder); tEncoderClear(&encoder);
} }
......
...@@ -290,7 +290,7 @@ int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta* ...@@ -290,7 +290,7 @@ int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta*
} }
int32_t code = createTableDataCxt(pTableMeta, pCreateTbReq, pTableCxt, colMode); int32_t code = createTableDataCxt(pTableMeta, pCreateTbReq, pTableCxt, colMode);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
void* pData = *pTableCxt; // deal scan coverity void* pData = *pTableCxt; // deal scan coverity
code = taosHashPut(pHash, id, idLen, &pData, POINTER_BYTES); code = taosHashPut(pHash, id, idLen, &pData, POINTER_BYTES);
} }
return code; return code;
...@@ -501,14 +501,15 @@ static int32_t buildSubmitReq(int32_t vgId, SSubmitReq2* pReq, void** pData, uin ...@@ -501,14 +501,15 @@ static int32_t buildSubmitReq(int32_t vgId, SSubmitReq2* pReq, void** pData, uin
tEncodeSize(tEncodeSSubmitReq2, pReq, len, code); tEncodeSize(tEncodeSSubmitReq2, pReq, len, code);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
SEncoder encoder; SEncoder encoder;
len += sizeof(SMsgHead); len += sizeof(SSubmitReq2Msg);
pBuf = taosMemoryMalloc(len); pBuf = taosMemoryMalloc(len);
if (NULL == pBuf) { if (NULL == pBuf) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
((SMsgHead*)pBuf)->vgId = htonl(vgId); ((SSubmitReq2Msg*)pBuf)->header.vgId = htonl(vgId);
((SMsgHead*)pBuf)->contLen = htonl(len); ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead)); ((SSubmitReq2Msg*)pBuf)->version = htobe64(1);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
code = tEncodeSSubmitReq2(&encoder, pReq); code = tEncodeSSubmitReq2(&encoder, pReq);
tEncoderClear(&encoder); tEncoderClear(&encoder);
} }
...@@ -679,13 +680,12 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate ...@@ -679,13 +680,12 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
pStart += BitmapLen(numOfRows); pStart += BitmapLen(numOfRows);
} }
char* pData = pStart; char* pData = pStart;
// uError("rawBlockBindData col bytes:%d, type:%d, size:%d, htonl size:%d", pColSchema->bytes, pColSchema->type, colLength[c], htonl(colLength[c]));
tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData); tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData);
fields += sizeof(int8_t) + sizeof(int32_t); fields += sizeof(int8_t) + sizeof(int32_t);
if(needChangeLength) { if (needChangeLength) {
pStart += htonl(colLength[c]); pStart += htonl(colLength[c]);
}else{ } else {
pStart += colLength[c]; pStart += colLength[c];
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册