diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 808c7c8fa648dc00bf18f88ad11f8f869df57b7c..fd00ba6a823316c1fb8b644de97df99a4ab5a92b 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -163,6 +163,12 @@ typedef struct { int32_t vgVersion; } SBuildUseDBInput; +typedef struct SField { + char name[TSDB_COL_NAME_LEN]; + uint8_t type; + int32_t bytes; +} SField; + #pragma pack(push, 1) // null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta @@ -252,16 +258,23 @@ typedef struct SSchema { typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t igExists; - int32_t numOfTags; int32_t numOfColumns; - SSchema pSchemas[]; + int32_t numOfTags; + SArray* pColumns; + SArray* pTags; } SMCreateStbReq; +int32_t tSerializeSMCreateStbReq(void** buf, SMCreateStbReq* pReq); +void* tDeserializeSMCreateStbReq(void* buf, SMCreateStbReq* pReq); + typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t igNotExists; } SMDropStbReq; +int32_t tSerializeSMDropStbReq(void** buf, SMDropStbReq* pReq); +void* tDeserializeSMDropStbReq(void* buf, SMDropStbReq* pReq); + typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t alterType; diff --git a/include/libs/parser/parsenodes.h b/include/libs/parser/parsenodes.h index d245d04166d90f7015239e5eec0459fb812c9838..292b4ab2c913b9967743531a10841268dd1c5e3c 100644 --- a/include/libs/parser/parsenodes.h +++ b/include/libs/parser/parsenodes.h @@ -37,12 +37,6 @@ typedef struct SQueryNode { #define queryNodeType(nodeptr) (((const SQueryNode*)(nodeptr))->type) -typedef struct SField { - char name[TSDB_COL_NAME_LEN]; - uint8_t type; - int32_t bytes; -} SField; - typedef struct SFieldInfo { int16_t numOfOutput; // number of column in result SField *final; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 6f094b20f805664d965a6e9b0cd3e4c429c573ce..6e3bd5fb669cca384e8c163720e60f873a02a713 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -336,3 +336,82 @@ void *tDeserializeSVDropTbReq(void *buf, SVDropTbReq *pReq) { buf = taosDecodeFixedU8(buf, &pReq->type); return buf; } + +int32_t tSerializeSMCreateStbReq(void **buf, SMCreateStbReq *pReq) { + int32_t tlen = 0; + + tlen += taosEncodeString(buf, pReq->name); + tlen += taosEncodeFixedI8(buf, pReq->igExists); + tlen += taosEncodeFixedI32(buf, pReq->numOfColumns); + tlen += taosEncodeFixedI32(buf, pReq->numOfTags); + + for (int32_t i = 0; i < pReq->numOfColumns; ++i) { + SField *pField = taosArrayGet(pReq->pColumns, i); + tlen += taosEncodeFixedI8(buf, pField->type); + tlen += taosEncodeFixedI32(buf, pField->bytes); + tlen += taosEncodeString(buf, pField->name); + } + + for (int32_t i = 0; i < pReq->numOfTags; ++i) { + SField *pField = taosArrayGet(pReq->pTags, i); + tlen += taosEncodeFixedI8(buf, pField->type); + tlen += taosEncodeFixedI32(buf, pField->bytes); + tlen += taosEncodeString(buf, pField->name); + } + + return tlen; +} + +void *tDeserializeSMCreateStbReq(void *buf, SMCreateStbReq *pReq) { + buf = taosDecodeStringTo(buf, pReq->name); + buf = taosDecodeFixedI8(buf, &pReq->igExists); + buf = taosDecodeFixedI32(buf, &pReq->numOfColumns); + buf = taosDecodeFixedI32(buf, &pReq->numOfTags); + + pReq->pColumns = taosArrayInit(pReq->numOfColumns, sizeof(SField)); + pReq->pTags = taosArrayInit(pReq->numOfTags, sizeof(SField)); + if (pReq->pColumns == NULL || pReq->pTags == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + for (int32_t i = 0; i < pReq->numOfColumns; ++i) { + SField field = {0}; + buf = taosDecodeFixedI8(buf, &field.type); + buf = taosDecodeFixedI32(buf, &field.bytes); + buf = taosDecodeStringTo(buf, field.name); + if (taosArrayPush(pReq->pColumns, &field) == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + } + + for (int32_t i = 0; i < pReq->numOfTags; ++i) { + SField field = {0}; + buf = taosDecodeFixedI8(buf, &field.type); + buf = taosDecodeFixedI32(buf, &field.bytes); + buf = taosDecodeStringTo(buf, field.name); + if (taosArrayPush(pReq->pTags, &field) == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + } + + return buf; +} + +int32_t tSerializeSMDropStbReq(void **buf, SMDropStbReq *pReq) { + int32_t tlen = 0; + + tlen += taosEncodeString(buf, pReq->name); + tlen += taosEncodeFixedI8(buf, pReq->igNotExists); + + return tlen; +} + +void *tDeserializeSMDropStbReq(void *buf, SMDropStbReq *pReq) { + buf = taosDecodeStringTo(buf, pReq->name); + buf = taosDecodeFixedI8(buf, &pReq->igNotExists); + + return buf; +} diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index c9228ae78552b3c8ebd663629fe8ee981bf4c1e8..b91dc03e48bc1a278b5d330422d872d473284bf6 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -304,8 +304,8 @@ typedef struct { int32_t nextColId; int32_t numOfColumns; int32_t numOfTags; - SSchema* pTags; SSchema* pColumns; + SSchema* pTags; SRWLatch lock; } SStbObj; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 7b21f33702889d96986f83a507d9c26c5b17ebe8..30f3f550fbbfb36069c441d663bae288503fb349 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -262,7 +262,7 @@ static SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName) { return mndAcquireDb(pMnode, db); } -static void *mndBuildCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) { +static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) { SName name = {0}; tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); @@ -295,7 +295,7 @@ static void *mndBuildCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb return pHead; } -static void *mndBuildDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) { +static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen) { SName name = {0}; tNameFromString(&name, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); @@ -323,14 +323,6 @@ static void *mndBuildDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, } static int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) { - pCreate->numOfColumns = htonl(pCreate->numOfColumns); - pCreate->numOfTags = htonl(pCreate->numOfTags); - int32_t totalCols = pCreate->numOfColumns + pCreate->numOfTags; - for (int32_t i = 0; i < totalCols; ++i) { - SSchema *pSchema = &pCreate->pSchemas[i]; - pSchema->bytes = htonl(pSchema->bytes); - } - if (pCreate->igExists < 0 || pCreate->igExists > 1) { terrno = TSDB_CODE_MND_INVALID_STB_OPTION; return -1; @@ -346,18 +338,39 @@ static int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) { return -1; } - int32_t maxColId = (TSDB_MAX_COLUMNS + TSDB_MAX_TAGS); - for (int32_t i = 0; i < totalCols; ++i) { - SSchema *pSchema = &pCreate->pSchemas[i]; - if (pSchema->type < 0) { + SField *pField = taosArrayGet(pCreate->pColumns, 0) ; + if (pField->type != TSDB_DATA_TYPE_TIMESTAMP) { + terrno = TSDB_CODE_MND_INVALID_STB_OPTION; + return -1; + } + + for (int32_t i = 0; i < pCreate->numOfColumns; ++i) { + SField *pField = taosArrayGet(pCreate->pColumns, i); + if (pField->type < 0) { terrno = TSDB_CODE_MND_INVALID_STB_OPTION; return -1; } - if (pSchema->bytes <= 0) { + if (pField->bytes <= 0) { terrno = TSDB_CODE_MND_INVALID_STB_OPTION; return -1; } - if (pSchema->name[0] == 0) { + if (pField->name[0] == 0) { + terrno = TSDB_CODE_MND_INVALID_STB_OPTION; + return -1; + } + } + + for (int32_t i = 0; i < pCreate->numOfTags; ++i) { + SField *pField = taosArrayGet(pCreate->pTags, i); + if (pField->type < 0) { + terrno = TSDB_CODE_MND_INVALID_STB_OPTION; + return -1; + } + if (pField->bytes <= 0) { + terrno = TSDB_CODE_MND_INVALID_STB_OPTION; + return -1; + } + if (pField->name[0] == 0) { terrno = TSDB_CODE_MND_INVALID_STB_OPTION; return -1; } @@ -404,7 +417,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj if (pIter == NULL) break; if (pVgroup->dbUid != pDb->uid) continue; - void *pReq = mndBuildCreateStbReq(pMnode, pVgroup, pStb, &contLen); + void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen); if (pReq == NULL) { sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pVgroup); @@ -440,7 +453,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj if (pVgroup->dbUid != pDb->uid) continue; int32_t contLen = 0; - void *pReq = mndBuildDropStbReq(pMnode, pVgroup, pStb, &contLen); + void *pReq = mndBuildVDropStbReq(pMnode, pVgroup, pStb, &contLen); if (pReq == NULL) { sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pVgroup); @@ -485,16 +498,23 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pReq, SMCreateStbReq *pCr return -1; } - memcpy(stbObj.pColumns, pCreate->pSchemas, stbObj.numOfColumns * sizeof(SSchema)); - memcpy(stbObj.pTags, pCreate->pSchemas + stbObj.numOfColumns, stbObj.numOfTags * sizeof(SSchema)); - for (int32_t i = 0; i < stbObj.numOfColumns; ++i) { - stbObj.pColumns[i].colId = stbObj.nextColId; + SField *pField = taosArrayGet(pCreate->pColumns, i); + SSchema *pSchema = &stbObj.pColumns[i]; + pSchema->type = pField->type; + pSchema->bytes = pField->bytes; + memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN); + pSchema->colId = stbObj.nextColId; stbObj.nextColId++; } for (int32_t i = 0; i < stbObj.numOfTags; ++i) { - stbObj.pTags[i].colId = stbObj.nextColId; + SField *pField = taosArrayGet(pCreate->pTags, i); + SSchema *pSchema = &stbObj.pTags[i]; + pSchema->type = pField->type; + pSchema->bytes = pField->bytes; + memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN); + pSchema->colId = stbObj.nextColId; stbObj.nextColId++; } @@ -519,57 +539,60 @@ CREATE_STB_OVER: } static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq) { - SMnode *pMnode = pReq->pMnode; - SMCreateStbReq *pCreate = pReq->rpcMsg.pCont; + SMnode *pMnode = pReq->pMnode; + int32_t code = -1; + SStbObj *pTopicStb = NULL; + SStbObj *pStb = NULL; + SDbObj *pDb = NULL; + SMCreateStbReq createReq = {0}; - mDebug("stb:%s, start to create", pCreate->name); + if (tDeserializeSMCreateStbReq(pReq->rpcMsg.pCont, &createReq) == NULL) goto CREATE_STB_OVER; - if (mndCheckCreateStbReq(pCreate) != 0) { - mError("stb:%s, failed to create since %s", pCreate->name, terrstr()); - return -1; - } + mDebug("stb:%s, start to create", createReq.name); + if (mndCheckCreateStbReq(&createReq) != 0) goto CREATE_STB_OVER; - SStbObj *pStb = mndAcquireStb(pMnode, pCreate->name); + pStb = mndAcquireStb(pMnode, createReq.name); if (pStb != NULL) { - mndReleaseStb(pMnode, pStb); - if (pCreate->igExists) { - mDebug("stb:%s, already exist, ignore exist is set", pCreate->name); - return 0; + if (createReq.igExists) { + mDebug("stb:%s, already exist, ignore exist is set", createReq.name); + code = 0; + goto CREATE_STB_OVER; } else { terrno = TSDB_CODE_MND_STB_ALREADY_EXIST; - mError("stb:%s, failed to create since %s", pCreate->name, terrstr()); - return -1; + goto CREATE_STB_OVER; } } else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) { - mError("stb:%s, failed to create since %s", pCreate->name, terrstr()); - return -1; + goto CREATE_STB_OVER; } - // topic should have different name with stb - SStbObj *pTopicStb = mndAcquireStb(pMnode, pCreate->name); + pTopicStb = mndAcquireStb(pMnode, createReq.name); if (pTopicStb != NULL) { - mndReleaseStb(pMnode, pTopicStb); terrno = TSDB_CODE_MND_NAME_CONFLICT_WITH_TOPIC; - mError("stb:%s, failed to create since %s", pCreate->name, terrstr()); - return -1; + goto CREATE_STB_OVER; } - SDbObj *pDb = mndAcquireDbByStb(pMnode, pCreate->name); + pDb = mndAcquireDbByStb(pMnode, createReq.name); if (pDb == NULL) { terrno = TSDB_CODE_MND_DB_NOT_SELECTED; - mError("stb:%s, failed to create since %s", pCreate->name, terrstr()); - return -1; + goto CREATE_STB_OVER; } - int32_t code = mndCreateStb(pMnode, pReq, pCreate, pDb); - mndReleaseDb(pMnode, pDb); + code = mndCreateStb(pMnode, pReq, &createReq, pDb); +CREATE_STB_OVER: if (code != 0) { - mError("stb:%s, failed to create since %s", pCreate->name, terrstr()); - return -1; + mError("stb:%s, failed to create since %s", createReq.name, terrstr()); + } else { + code = TSDB_CODE_MND_ACTION_IN_PROGRESS; } - return TSDB_CODE_MND_ACTION_IN_PROGRESS; + mndReleaseStb(pMnode, pStb); + mndReleaseStb(pMnode, pTopicStb); + mndReleaseDb(pMnode, pDb); + taosArrayClear(createReq.pColumns); + taosArrayClear(createReq.pTags); + + return code; } static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pRsp) { @@ -891,7 +914,7 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj if (pIter == NULL) break; if (pVgroup->dbUid != pDb->uid) continue; - void *pReq = mndBuildCreateStbReq(pMnode, pVgroup, pStb, &contLen); + void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen); if (pReq == NULL) { sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pVgroup); @@ -1049,7 +1072,7 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj * if (pVgroup->dbUid != pDb->uid) continue; int32_t contLen = 0; - void *pReq = mndBuildDropStbReq(pMnode, pVgroup, pStb, &contLen); + void *pReq = mndBuildVDropStbReq(pMnode, pVgroup, pStb, &contLen); if (pReq == NULL) { sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pVgroup); @@ -1095,28 +1118,30 @@ DROP_STB_OVER: } static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq) { - SMnode *pMnode = pReq->pMnode; - SMDropStbReq *pDrop = pReq->rpcMsg.pCont; + SMnode *pMnode = pReq->pMnode; + + SMDropStbReq dropReq = {0}; + tDeserializeSMDropStbReq(pReq->rpcMsg.pCont, &dropReq); - mDebug("stb:%s, start to drop", pDrop->name); + mDebug("stb:%s, start to drop", dropReq.name); - SStbObj *pStb = mndAcquireStb(pMnode, pDrop->name); + SStbObj *pStb = mndAcquireStb(pMnode, dropReq.name); if (pStb == NULL) { - if (pDrop->igNotExists) { - mDebug("stb:%s, not exist, ignore not exist is set", pDrop->name); + if (dropReq.igNotExists) { + mDebug("stb:%s, not exist, ignore not exist is set", dropReq.name); return 0; } else { terrno = TSDB_CODE_MND_STB_NOT_EXIST; - mError("stb:%s, failed to drop since %s", pDrop->name, terrstr()); + mError("stb:%s, failed to drop since %s", dropReq.name, terrstr()); return -1; } } - SDbObj *pDb = mndAcquireDbByStb(pMnode, pDrop->name); + SDbObj *pDb = mndAcquireDbByStb(pMnode, dropReq.name); if (pDb == NULL) { mndReleaseStb(pMnode, pStb); terrno = TSDB_CODE_MND_DB_NOT_SELECTED; - mError("stb:%s, failed to drop since %s", pDrop->name, terrstr()); + mError("stb:%s, failed to drop since %s", dropReq.name, terrstr()); return -1; } @@ -1125,7 +1150,7 @@ static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq) { mndReleaseStb(pMnode, pStb); if (code != 0) { - mError("stb:%s, failed to drop since %s", pDrop->name, terrstr()); + mError("stb:%s, failed to drop since %s", dropReq.name, terrstr()); return -1; } diff --git a/source/dnode/mnode/impl/test/stb/stb.cpp b/source/dnode/mnode/impl/test/stb/stb.cpp index 402795eb039bd8992545f690318ca9aabbf02acb..951964a8e1a1a44b17e2093d90dafa6b8f8fd2a9 100644 --- a/source/dnode/mnode/impl/test/stb/stb.cpp +++ b/source/dnode/mnode/impl/test/stb/stb.cpp @@ -22,19 +22,19 @@ class MndTestStb : public ::testing::Test { void SetUp() override {} void TearDown() override {} - SCreateDbReq* BuildCreateDbReq(const char* dbname, int32_t* pContLen); - SDropDbReq* BuildDropDbReq(const char* dbname, int32_t* pContLen); - SMCreateStbReq* BuildCreateStbReq(const char* stbname, int32_t* pContLen); - SMAltertbReq* BuildAlterStbAddTagReq(const char* stbname, const char* tagname, int32_t* pContLen); - SMAltertbReq* BuildAlterStbDropTagReq(const char* stbname, const char* tagname, int32_t* pContLen); - SMAltertbReq* BuildAlterStbUpdateTagNameReq(const char* stbname, const char* tagname, const char* newtagname, - int32_t* pContLen); - SMAltertbReq* BuildAlterStbUpdateTagBytesReq(const char* stbname, const char* tagname, int32_t bytes, - int32_t* pContLen); - SMAltertbReq* BuildAlterStbAddColumnReq(const char* stbname, const char* colname, int32_t* pContLen); - SMAltertbReq* BuildAlterStbDropColumnReq(const char* stbname, const char* colname, int32_t* pContLen); - SMAltertbReq* BuildAlterStbUpdateColumnBytesReq(const char* stbname, const char* colname, int32_t bytes, - int32_t* pContLen); + SCreateDbReq* BuildCreateDbReq(const char* dbname, int32_t* pContLen); + SDropDbReq* BuildDropDbReq(const char* dbname, int32_t* pContLen); + void* BuildCreateStbReq(const char* stbname, int32_t* pContLen); + SMAltertbReq* BuildAlterStbAddTagReq(const char* stbname, const char* tagname, int32_t* pContLen); + SMAltertbReq* BuildAlterStbDropTagReq(const char* stbname, const char* tagname, int32_t* pContLen); + SMAltertbReq* BuildAlterStbUpdateTagNameReq(const char* stbname, const char* tagname, const char* newtagname, + int32_t* pContLen); + SMAltertbReq* BuildAlterStbUpdateTagBytesReq(const char* stbname, const char* tagname, int32_t bytes, + int32_t* pContLen); + SMAltertbReq* BuildAlterStbAddColumnReq(const char* stbname, const char* colname, int32_t* pContLen); + SMAltertbReq* BuildAlterStbDropColumnReq(const char* stbname, const char* colname, int32_t* pContLen); + SMAltertbReq* BuildAlterStbUpdateColumnBytesReq(const char* stbname, const char* colname, int32_t bytes, + int32_t* pContLen); }; Testbase MndTestStb::test; @@ -78,53 +78,62 @@ SDropDbReq* MndTestStb::BuildDropDbReq(const char* dbname, int32_t* pContLen) { return pReq; } -SMCreateStbReq* MndTestStb::BuildCreateStbReq(const char* stbname, int32_t* pContLen) { - int32_t cols = 2; - int32_t tags = 3; - int32_t contLen = (tags + cols) * sizeof(SSchema) + sizeof(SMCreateStbReq); - - SMCreateStbReq* pReq = (SMCreateStbReq*)rpcMallocCont(contLen); - strcpy(pReq->name, stbname); - pReq->numOfTags = htonl(tags); - pReq->numOfColumns = htonl(cols); +void* MndTestStb::BuildCreateStbReq(const char* stbname, int32_t* pContLen) { + SMCreateStbReq createReq = {0}; + createReq.numOfColumns = 2; + createReq.numOfTags = 3; + createReq.igExists = 0; + createReq.pColumns = taosArrayInit(createReq.numOfColumns, sizeof(SField)); + createReq.pTags = taosArrayInit(createReq.numOfTags, sizeof(SField)); + strcpy(createReq.name, stbname); { - SSchema* pSchema = &pReq->pSchemas[0]; - pSchema->bytes = htonl(8); - pSchema->type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema->name, "ts"); + SField field = {0}; + field.bytes = 8; + field.type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(field.name, "ts"); + taosArrayPush(createReq.pColumns, &field); } { - SSchema* pSchema = &pReq->pSchemas[1]; - pSchema->bytes = htonl(12); - pSchema->type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema->name, "col1"); + SField field = {0}; + field.bytes = 12; + field.type = TSDB_DATA_TYPE_BINARY; + strcpy(field.name, "col1"); + taosArrayPush(createReq.pColumns, &field); } { - SSchema* pSchema = &pReq->pSchemas[2]; - pSchema->bytes = htonl(2); - pSchema->type = TSDB_DATA_TYPE_TINYINT; - strcpy(pSchema->name, "tag1"); + SField field = {0}; + field.bytes = 2; + field.type = TSDB_DATA_TYPE_TINYINT; + strcpy(field.name, "tag1"); + taosArrayPush(createReq.pTags, &field); } { - SSchema* pSchema = &pReq->pSchemas[3]; - pSchema->bytes = htonl(8); - pSchema->type = TSDB_DATA_TYPE_BIGINT; - strcpy(pSchema->name, "tag2"); + SField field = {0}; + field.bytes = 8; + field.type = TSDB_DATA_TYPE_BIGINT; + strcpy(field.name, "tag2"); + taosArrayPush(createReq.pTags, &field); } { - SSchema* pSchema = &pReq->pSchemas[4]; - pSchema->bytes = htonl(16); - pSchema->type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema->name, "tag3"); + SField field = {0}; + field.bytes = 16; + field.type = TSDB_DATA_TYPE_BINARY; + strcpy(field.name, "tag3"); + taosArrayPush(createReq.pTags, &field); } - *pContLen = contLen; - return pReq; + int32_t tlen = tSerializeSMCreateStbReq(NULL, &createReq); + void* pHead = rpcMallocCont(tlen); + + void* pBuf = pHead; + tSerializeSMCreateStbReq(&pBuf, &createReq); + *pContLen = tlen; + return pHead; } SMAltertbReq* MndTestStb::BuildAlterStbAddTagReq(const char* stbname, const char* tagname, int32_t* pContLen) { @@ -260,9 +269,9 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) { } { - int32_t contLen = 0; - SMCreateStbReq* pReq = BuildCreateStbReq(stbname, &contLen); - SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen); + int32_t contLen = 0; + void* pReq = BuildCreateStbReq(stbname, &contLen); + SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen); ASSERT_NE(pRsp, nullptr); ASSERT_EQ(pRsp->code, 0); } @@ -379,12 +388,15 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) { } { - int32_t contLen = sizeof(SMDropStbReq); + SMDropStbReq dropReq = {0}; + strcpy(dropReq.name, stbname); - SMDropStbReq* pReq = (SMDropStbReq*)rpcMallocCont(contLen); - strcpy(pReq->name, stbname); + int32_t contLen = tSerializeSMDropStbReq(NULL, &dropReq); + void* pHead = rpcMallocCont(contLen); + void* pBuf = pHead; + tSerializeSMDropStbReq(&pBuf, &dropReq); - SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_STB, pReq, contLen); + SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_STB, pHead, contLen); ASSERT_NE(pRsp, nullptr); ASSERT_EQ(pRsp->code, 0); } @@ -418,8 +430,8 @@ TEST_F(MndTestStb, 02_Alter_Stb_AddTag) { } { - SMCreateStbReq* pReq = BuildCreateStbReq(stbname, &contLen); - SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen); + void* pReq = BuildCreateStbReq(stbname, &contLen); + SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen); ASSERT_NE(pRsp, nullptr); ASSERT_EQ(pRsp->code, 0); } @@ -483,8 +495,8 @@ TEST_F(MndTestStb, 03_Alter_Stb_DropTag) { } { - SMCreateStbReq* pReq = BuildCreateStbReq(stbname, &contLen); - SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen); + void* pReq = BuildCreateStbReq(stbname, &contLen); + SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen); ASSERT_EQ(pRsp->code, 0); } @@ -529,8 +541,8 @@ TEST_F(MndTestStb, 04_Alter_Stb_AlterTagName) { } { - SMCreateStbReq* pReq = BuildCreateStbReq(stbname, &contLen); - SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen); + void* pReq = BuildCreateStbReq(stbname, &contLen); + SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen); ASSERT_EQ(pRsp->code, 0); } @@ -598,8 +610,8 @@ TEST_F(MndTestStb, 05_Alter_Stb_AlterTagBytes) { } { - SMCreateStbReq* pReq = BuildCreateStbReq(stbname, &contLen); - SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen); + void* pReq = BuildCreateStbReq(stbname, &contLen); + SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen); ASSERT_EQ(pRsp->code, 0); } @@ -656,8 +668,8 @@ TEST_F(MndTestStb, 06_Alter_Stb_AddColumn) { } { - SMCreateStbReq* pReq = BuildCreateStbReq(stbname, &contLen); - SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen); + void* pReq = BuildCreateStbReq(stbname, &contLen); + SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen); ASSERT_NE(pRsp, nullptr); ASSERT_EQ(pRsp->code, 0); } @@ -721,8 +733,8 @@ TEST_F(MndTestStb, 07_Alter_Stb_DropColumn) { } { - SMCreateStbReq* pReq = BuildCreateStbReq(stbname, &contLen); - SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen); + void* pReq = BuildCreateStbReq(stbname, &contLen); + SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen); ASSERT_EQ(pRsp->code, 0); } @@ -786,8 +798,8 @@ TEST_F(MndTestStb, 08_Alter_Stb_AlterTagBytes) { } { - SMCreateStbReq* pReq = BuildCreateStbReq(stbname, &contLen); - SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen); + void* pReq = BuildCreateStbReq(stbname, &contLen); + SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen); ASSERT_EQ(pRsp->code, 0); } diff --git a/source/libs/parser/inc/astToMsg.h b/source/libs/parser/inc/astToMsg.h index 8f2c2ad4b34399da900e2f3960459eb0e1e94b58..82648e0bfa56d6c40705d30eb05d9f578133ffe3 100644 --- a/source/libs/parser/inc/astToMsg.h +++ b/source/libs/parser/inc/astToMsg.h @@ -10,8 +10,8 @@ SCreateAcctReq* buildAcctManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, in SDropUserReq* buildDropUserMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen); SShowReq* buildShowMsg(SShowInfo* pShowInfo, SParseContext* pParseCtx, SMsgBuf* pMsgBuf); SCreateDbReq* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseContext *pCtx, SMsgBuf* pMsgBuf); -SMCreateStbReq* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf); -SMDropStbReq* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf); +char* buildCreateStbReq(SCreateTableSql* pCreateTableSql, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf); +char* buildDropStableReq(SSqlInfo* pInfo, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf); SCreateDnodeReq *buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf); SDropDnodeReq *buildDropDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf); diff --git a/source/libs/parser/src/astToMsg.c b/source/libs/parser/src/astToMsg.c index c7a1fd26a05741c7a4195703df11ab911cc2e02e..e1828b6a26d44b5aef648723b1aeab605ff94d5d 100644 --- a/source/libs/parser/src/astToMsg.c +++ b/source/libs/parser/src/astToMsg.c @@ -249,112 +249,64 @@ SCreateDbReq* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseContext *pCtx return pCreateMsg; } -SMCreateStbReq* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf) { - SSchema* pSchema; - - int32_t numOfTags = 0; - int32_t numOfCols = (int32_t) taosArrayGetSize(pCreateTableSql->colInfo.pColumns); - if (pCreateTableSql->colInfo.pTagColumns != NULL) { - numOfTags = (int32_t) taosArrayGetSize(pCreateTableSql->colInfo.pTagColumns); +char* buildCreateStbReq(SCreateTableSql* pCreateTableSql, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf) { + SMCreateStbReq createReq = {0}; + createReq.igExists = pCreateTableSql->existCheck ? 1 : 0; + createReq.pColumns = pCreateTableSql->colInfo.pColumns; + createReq.pTags = pCreateTableSql->colInfo.pTagColumns; + createReq.numOfColumns = (int32_t)taosArrayGetSize(pCreateTableSql->colInfo.pColumns); + createReq.numOfTags = (int32_t)taosArrayGetSize(pCreateTableSql->colInfo.pTagColumns); + + SName n = {0}; + if (createSName(&n, &pCreateTableSql->name, pParseCtx, pMsgBuf) != 0) { + return NULL; } - SMCreateStbReq* pCreateStbMsg = (SMCreateStbReq*)calloc(1, sizeof(SMCreateStbReq) + (numOfCols + numOfTags) * sizeof(SSchema)); - if (pCreateStbMsg == NULL) { + if (tNameExtractFullName(&n, createReq.name) != 0) { + buildInvalidOperationMsg(pMsgBuf, "invalid table name or database not specified"); return NULL; } - char* pMsg = NULL; -#if 0 - int32_t tableType = pCreateTableSql->type; - if (tableType != TSQL_CREATE_TABLE && tableType != TSQL_CREATE_STABLE) { // create by using super table, tags value - SArray* list = pInfo->pCreateTableInfo->childTableInfo; - - int32_t numOfTables = (int32_t)taosArrayGetSize(list); - pCreateStbMsg->numOfTables = htonl(numOfTables); - - pMsg = (char*)pCreateMsg; - for (int32_t i = 0; i < numOfTables; ++i) { - SCreateTableMsg* pCreate = (SCreateTableMsg*)pMsg; - - pCreate->numOfColumns = htons(pCmd->numOfCols); - pCreate->numOfTags = htons(pCmd->count); - pMsg += sizeof(SCreateTableMsg); - - SCreatedTableInfo* p = taosArrayGet(list, i); - strcpy(pCreate->tableName, p->fullname); - pCreate->igExists = (p->igExist) ? 1 : 0; - - // use dbinfo from table id without modifying current db info - pMsg = serializeTagData(&p->tagdata, pMsg); - - int32_t len = (int32_t)(pMsg - (char*)pCreate); - pCreate->len = htonl(len); - } - - } else { -#endif - // create (super) table - SName n = {0}; - int32_t code = createSName(&n, &pCreateTableSql->name, pParseCtx, pMsgBuf); - if (code != 0) { - return NULL; - } - - code = tNameExtractFullName(&n, pCreateStbMsg->name); - if (code != 0) { - buildInvalidOperationMsg(pMsgBuf, "invalid table name or database not specified"); - return NULL; - } - - pCreateStbMsg->igExists = pCreateTableSql->existCheck ? 1 : 0; - pCreateStbMsg->numOfColumns = htonl(numOfCols); - pCreateStbMsg->numOfTags = htonl(numOfTags); - - pSchema = (SSchema*)pCreateStbMsg->pSchemas; - for (int i = 0; i < numOfCols; ++i) { - SField* pField = taosArrayGet(pCreateTableSql->colInfo.pColumns, i); - pSchema->type = pField->type; - pSchema->bytes = htonl(pField->bytes); - strcpy(pSchema->name, pField->name); - - pSchema++; - } - - for(int32_t i = 0; i < numOfTags; ++i) { - SField* pField = taosArrayGet(pCreateTableSql->colInfo.pTagColumns, i); - pSchema->type = pField->type; - pSchema->bytes = htonl(pField->bytes); - strcpy(pSchema->name, pField->name); - - pSchema++; - } - - pMsg = (char*)pSchema; - - int32_t msgLen = (int32_t)(pMsg - (char*)pCreateStbMsg); - *len = msgLen; + int32_t tlen = tSerializeSMCreateStbReq(NULL, &createReq); + void* req = malloc(tlen); + if (req == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } - return pCreateStbMsg; + void* buf = req; + tSerializeSMCreateStbReq(&buf, &createReq); + *len = tlen; + return req; } -SMDropStbReq* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf) { +char* buildDropStableReq(SSqlInfo* pInfo, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf) { SToken* tableName = taosArrayGet(pInfo->pMiscInfo->a, 0); - SName name = {0}; + SName name = {0}; int32_t code = createSName(&name, tableName, pParseCtx, pMsgBuf); if (code != TSDB_CODE_SUCCESS) { terrno = buildInvalidOperationMsg(pMsgBuf, "invalid table name"); return NULL; } - SMDropStbReq *pDropTableMsg = (SMDropStbReq*) calloc(1, sizeof(SMDropStbReq)); + SMDropStbReq dropReq = {0}; + code = tNameExtractFullName(&name, dropReq.name); - code = tNameExtractFullName(&name, pDropTableMsg->name); assert(code == TSDB_CODE_SUCCESS && name.type == TSDB_TABLE_NAME_T); + dropReq.igNotExists = pInfo->pMiscInfo->existsCheck ? 1 : 0; + + int32_t tlen = tSerializeSMDropStbReq(NULL, &dropReq); + void* req = malloc(tlen); + if (req == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } - pDropTableMsg->igNotExists = pInfo->pMiscInfo->existsCheck ? 1 : 0; - *len = sizeof(SMDropStbReq); - return pDropTableMsg; + void* buf = req; + tSerializeSMDropStbReq(&buf, &dropReq); + *len = tlen; + return req; } SCreateDnodeReq *buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf) { diff --git a/source/libs/parser/src/dCDAstProcess.c b/source/libs/parser/src/dCDAstProcess.c index 50ae3bfd262812be8cec8e2d33d467994875a0f1..4b70d2c6c381d777c7475c4f638d60db383482f2 100644 --- a/source/libs/parser/src/dCDAstProcess.c +++ b/source/libs/parser/src/dCDAstProcess.c @@ -924,13 +924,13 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch goto _error; } - pDcl->pMsg = (char*)buildCreateStbMsg(pCreateTable, &pDcl->msgLen, pCtx, pMsgBuf); + pDcl->pMsg = buildCreateStbReq(pCreateTable, &pDcl->msgLen, pCtx, pMsgBuf); pDcl->msgType = TDMT_MND_CREATE_STB; break; } case TSDB_SQL_DROP_TABLE: { - pDcl->pMsg = (char*)buildDropStableMsg(pInfo, &pDcl->msgLen, pCtx, pMsgBuf); + pDcl->pMsg = buildDropStableReq(pInfo, &pDcl->msgLen, pCtx, pMsgBuf); if (pDcl->pMsg == NULL) { goto _error; } diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index a6040a3873cc0a4cb65c9407c6bb3ac595dcb3f7..5fc937bccdc5307bcc70bdb5a9dc6ef15a687426 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -35,6 +35,7 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->connType = pInit->connType; pRpc->idleTime = pInit->idleTime; pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); + pRpc->parent = pInit->parent; return pRpc; } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 3d93049c6a7eb96d0527502fda56b85d03ace10c..f5eeae26e68bf462f8aef9d5affb1096cd6bc945 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -124,10 +124,10 @@ static void clientHandleResp(SCliConn* conn) { rpcMsg.msgType = pHead->msgType; rpcMsg.ahandle = pCtx->ahandle; if (pCtx->pSem == NULL) { - tDebug("conn %p handle resp", conn); - (pRpc->cfp)(NULL, &rpcMsg, NULL); + tDebug("client conn %p handle resp, ", conn); + (pRpc->cfp)(pRpc->parent, &rpcMsg, NULL); } else { - tDebug("conn %p handle resp", conn); + tDebug("client conn(sync) %p handle resp", conn); memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg)); tsem_post(pCtx->pSem); } @@ -154,7 +154,7 @@ static void clientHandleExcept(SCliConn* pConn) { clientConnDestroy(pConn, true); return; } - tDebug("conn %p start to destroy", pConn); + tDebug("client conn %p start to destroy", pConn); SCliMsg* pMsg = pConn->data; destroyUserdata(&pMsg->msg); @@ -166,7 +166,7 @@ static void clientHandleExcept(SCliConn* pConn) { rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; if (pCtx->pSem == NULL) { // SRpcInfo* pRpc = pMsg->ctx->pRpc; - (pCtx->pTransInst->cfp)(NULL, &rpcMsg, NULL); + (pCtx->pTransInst->cfp)(pCtx->pTransInst->parent, &rpcMsg, NULL); } else { memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg)); // SRpcMsg rpcMsg @@ -184,7 +184,7 @@ static void clientTimeoutCb(uv_timer_t* handle) { SCliThrdObj* pThrd = handle->data; SRpcInfo* pRpc = pThrd->pTransInst; int64_t currentTime = pThrd->nextTimeout; - tDebug("timeout, try to remove expire conn from conn pool"); + tDebug("client conn timeout, try to remove expire conn from conn pool"); SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL); while (p != NULL) { @@ -253,7 +253,7 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) { tstrncpy(key, ip, strlen(ip)); tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); - tDebug("conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap); + tDebug("client conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap); SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst; @@ -294,10 +294,10 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf pBuf->len += nread; if (clientReadComplete(pBuf)) { uv_read_stop((uv_stream_t*)conn->stream); - tDebug("conn %p read complete", conn); + tDebug("client conn %p read complete", conn); clientHandleResp(conn); } else { - tDebug("conn %p read partial packet, continue to read", conn); + tDebug("client conn %p read partial packet, continue to read", conn); } return; } @@ -309,7 +309,7 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf return; } if (nread < 0 || nread == UV_EOF) { - tError("conn %p read error: %s", conn, uv_err_name(nread)); + tError("client conn %p read error: %s", conn, uv_err_name(nread)); clientHandleExcept(conn); } // tDebug("Read error %s\n", uv_err_name(nread)); @@ -320,9 +320,9 @@ static void clientConnDestroy(SCliConn* conn, bool clear) { // conn->ref--; if (conn->ref == 0) { - tDebug("conn %p remove from conn pool", conn); + tDebug("client conn %p remove from conn pool", conn); QUEUE_REMOVE(&conn->conn); - tDebug("conn %p remove from conn pool successfully", conn); + tDebug("client conn %p remove from conn pool successfully", conn); if (clear) { uv_close((uv_handle_t*)conn->stream, clientDestroy); } @@ -334,7 +334,7 @@ static void clientDestroy(uv_handle_t* handle) { free(conn->stream); free(conn->writeReq); - tDebug("conn %p destroy successfully", conn); + tDebug("client conn %p destroy successfully", conn); free(conn); // clientConnDestroy(conn, false); @@ -343,7 +343,7 @@ static void clientDestroy(uv_handle_t* handle) { static void clientWriteCb(uv_write_t* req, int status) { SCliConn* pConn = req->data; if (status == 0) { - tDebug("conn %p data already was written out", pConn); + tDebug("client conn %p data already was written out", pConn); SCliMsg* pMsg = pConn->data; if (pMsg == NULL) { // handle @@ -351,7 +351,7 @@ static void clientWriteCb(uv_write_t* req, int status) { } destroyUserdata(&pMsg->msg); } else { - tError("conn %p failed to write: %s", pConn, uv_err_name(status)); + tError("client conn %p failed to write: %s", pConn, uv_err_name(status)); clientHandleExcept(pConn); return; } @@ -370,7 +370,7 @@ static void clientWrite(SCliConn* pConn) { pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); - tDebug("conn %p data write out, msgType : %d, len: %d", pConn, pHead->msgType, msgLen); + tDebug("client conn %p data write out, msgType : %s, len: %d", pConn, TMSG_INFO(pHead->msgType), msgLen); uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb); } static void clientConnCb(uv_connect_t* req, int status) { @@ -378,11 +378,11 @@ static void clientConnCb(uv_connect_t* req, int status) { SCliConn* pConn = req->data; if (status != 0) { // tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status)); - tError("conn %p failed to connect server: %s", pConn, uv_strerror(status)); + tError("client conn %p failed to connect server: %s", pConn, uv_strerror(status)); clientHandleExcept(pConn); return; } - tDebug("conn %p create", pConn); + tDebug("client conn %p create", pConn); assert(pConn->stream == req->handle); clientWrite(pConn); @@ -400,14 +400,14 @@ static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { uint64_t et = taosGetTimestampUs(); uint64_t el = et - pMsg->st; - tDebug("msg tran time cost: %" PRIu64 "", el); + tDebug("client msg tran time cost: %" PRIu64 "", el); et = taosGetTimestampUs(); STransConnCtx* pCtx = pMsg->ctx; SCliConn* conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); if (conn != NULL) { // impl later - tDebug("conn %p get from conn pool", conn); + tDebug("client get conn %p from pool", conn); conn->data = pMsg; conn->writeReq->data = conn; transDestroyBuffer(&conn->readBuf); diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 4d2ac434dde79a1749770e79fa3f20533090ab9b..f3cfd254089d33460287194be7d5c2f56d620711 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -266,6 +266,7 @@ static void uvHandleReq(SSrvConn* pConn) { transClearBuffer(&pConn->readBuf); pConn->ref++; + tDebug("%s received on %p", TMSG_INFO(rpcMsg.msgType), pConn); (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL); // uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); // auth @@ -278,7 +279,7 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { SConnBuffer* pBuf = &conn->readBuf; if (nread > 0) { pBuf->len += nread; - tDebug("conn %p read summroy, total read: %d, current read: %d", conn, pBuf->len, (int)nread); + tDebug("conn %p read summary, total read: %d, current read: %d", conn, pBuf->len, (int)nread); if (readComplete(pBuf)) { tDebug("conn %p alread read complete packet", conn); uvHandleReq(conn); @@ -717,6 +718,9 @@ void taosCloseServer(void* arg) { } void rpcSendResponse(const SRpcMsg* pMsg) { + if (pMsg->handle == NULL) { + return; + } SSrvConn* pConn = pMsg->handle; SWorkThrdObj* pThrd = pConn->hostThrd; diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 09aeff5ff3f749ab7292700ccd42f9be7c6e4dc8..3e6f97c0c3bcdc30d39f08dd469babd3199a1fa9 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -204,7 +204,12 @@ void* taosArrayGetLast(const SArray* pArray) { return TARRAY_GET_ELEM(pArray, pArray->size - 1); } -size_t taosArrayGetSize(const SArray* pArray) { return pArray->size; } +size_t taosArrayGetSize(const SArray* pArray) { + if (pArray == NULL) { + return 0; + } + return pArray->size; +} void taosArraySetSize(SArray* pArray, size_t size) { assert(size <= pArray->capacity); @@ -296,7 +301,7 @@ SArray* taosArrayDup(const SArray* pSrc) { } void taosArrayClear(SArray* pArray) { - assert( pArray != NULL ); + if (pArray == NULL) return; pArray->size = 0; }