diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 8009f8999e52029ab92883f754a0fa3b53efcf08..fdf64b7af2bcaf19d8c08c4e700c13beef10078d 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -133,36 +133,36 @@ typedef enum _mgmt_table { #define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC) #define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0) -typedef struct SKv { +typedef struct { int32_t keyLen; int32_t valueLen; void* key; void* value; } SKv; -typedef struct SClientHbKey { +typedef struct { int32_t connId; int32_t hbType; } SClientHbKey; -typedef struct SClientHbReq { +typedef struct { SClientHbKey connKey; SHashObj* info; // hash } SClientHbReq; -typedef struct SClientHbBatchReq { +typedef struct { int64_t reqId; SArray* reqs; // SArray } SClientHbBatchReq; -typedef struct SClientHbRsp { +typedef struct { SClientHbKey connKey; int32_t status; int32_t bodyLen; void* body; } SClientHbRsp; -typedef struct SClientHbBatchRsp { +typedef struct { int64_t reqId; int64_t rspId; SArray* rsps; // SArray @@ -220,13 +220,13 @@ static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) return buf; } -typedef struct SBuildTableMetaInput { +typedef struct { int32_t vgId; char* dbName; char* tableFullName; } SBuildTableMetaInput; -typedef struct SBuildUseDBInput { +typedef struct { char db[TSDB_TABLE_FNAME_LEN]; int32_t vgVersion; } SBuildUseDBInput; @@ -234,17 +234,12 @@ typedef struct SBuildUseDBInput { #pragma pack(push, 1) // null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta -typedef struct { - char fqdn[TSDB_FQDN_LEN]; - uint16_t port; -} SEpAddrMsg; - typedef struct { char fqdn[TSDB_FQDN_LEN]; uint16_t port; } SEpAddr; -typedef struct SMsgHead { +typedef struct { int32_t contLen; int32_t vgId; } SMsgHead; @@ -262,7 +257,7 @@ typedef struct SSubmitBlk { } SSubmitBlk; // Submit message for this TSDB -typedef struct SSubmitMsg { +typedef struct { SMsgHead header; int64_t version; int32_t length; @@ -301,7 +296,7 @@ typedef struct { int32_t failedRows; // number of failed records (exclude duplicate records) int32_t numOfFailedBlocks; SShellSubmitRspBlock failedBlocks[]; -} SShellSubmitRspMsg; +} SShellSubmitRsp; typedef struct SSchema { int8_t type; @@ -310,98 +305,24 @@ typedef struct SSchema { char name[TSDB_COL_NAME_LEN]; } SSchema; -typedef struct { - int32_t contLen; - int32_t vgId; - int8_t tableType; - int16_t numOfColumns; - int16_t numOfTags; - int32_t tid; - int32_t sversion; - int32_t tversion; - int32_t tagDataLen; - int32_t sqlDataLen; - uint64_t uid; - uint64_t superTableUid; - uint64_t createdTime; - char tableFname[TSDB_TABLE_FNAME_LEN]; - char stbFname[TSDB_TABLE_FNAME_LEN]; - char data[]; -} SMDCreateTableMsg; - -// typedef struct { -// int32_t len; // one create table message -// char tableName[TSDB_TABLE_FNAME_LEN]; -// int16_t numOfColumns; -// int16_t sqlLen; // the length of SQL, it starts after schema , sql is a null-terminated string -// int8_t igExists; -// int8_t rspMeta; -// int8_t reserved[16]; -// char schema[]; -//} SCreateTableMsg; - -typedef struct { - char tableName[TSDB_TABLE_FNAME_LEN]; - int16_t numOfColumns; - int16_t numOfTags; - int8_t igExists; - int8_t rspMeta; - char schema[]; -} SCreateCTableMsg; - typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t igExists; int32_t numOfTags; int32_t numOfColumns; SSchema pSchema[]; -} SCreateStbMsg, SCreateTableMsg; +} SMCreateStbReq; typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t igNotExists; -} SDropStbMsg; +} SMDropStbReq; typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t alterType; SSchema schema; -} SAlterStbMsg; - -typedef struct { - SMsgHead head; - char name[TSDB_TABLE_FNAME_LEN]; - uint64_t suid; -} SVDropStbReq; - -typedef struct { - SMsgHead head; - char name[TSDB_TABLE_FNAME_LEN]; - int8_t type; /* operation type */ - int32_t numOfCols; /* number of schema */ - int32_t numOfTags; - char data[]; -} SAlterTableMsg; - -typedef struct { - SMsgHead head; - char name[TSDB_TABLE_FNAME_LEN]; - int8_t ignoreNotExists; -} SDropTableMsg; - -typedef struct { - SMsgHead head; - int64_t uid; - int32_t tid; - int16_t tversion; - int16_t colId; - int8_t type; - int16_t bytes; - int32_t tagValLen; - int16_t numOfTags; - int32_t schemaLen; - char data[]; -} SUpdateTableTagValMsg; +} SMAlterStbReq; typedef struct { int32_t pid; @@ -470,28 +391,13 @@ typedef struct { } SCreateUserReq, SAlterUserReq; typedef struct { - int32_t contLen; - int32_t vgId; - int32_t tid; - uint64_t uid; - char tableFname[TSDB_TABLE_FNAME_LEN]; -} SMDDropTableMsg; - -typedef struct { - int32_t contLen; - int32_t vgId; - uint64_t uid; - char tableFname[TSDB_TABLE_FNAME_LEN]; -} SDropSTableMsg; - -typedef struct SColIndex { int16_t colId; // column id int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag int16_t flag; // denote if it is a tag or a normal column char name[TSDB_DB_FNAME_LEN]; } SColIndex; -typedef struct SColumnFilterInfo { +typedef struct { int16_t lowerRelOptr; int16_t upperRelOptr; int16_t filterstr; // denote if current column is char(binary/nchar) @@ -512,7 +418,7 @@ typedef struct SColumnFilterInfo { }; } SColumnFilterInfo; -typedef struct SColumnFilterList { +typedef struct { int16_t numOfFilters; union { int64_t placeholder; @@ -523,14 +429,14 @@ typedef struct SColumnFilterList { * for client side struct, we only need the column id, type, bytes are not necessary * But for data in vnode side, we need all the following information. */ -typedef struct SColumnInfo { +typedef struct { int16_t colId; int16_t type; int16_t bytes; SColumnFilterList flist; } SColumnInfo; -typedef struct STableIdInfo { +typedef struct { uint64_t uid; TSKEY key; // last accessed ts, for subscription } STableIdInfo; @@ -547,7 +453,7 @@ typedef struct { int32_t tsOrder; // ts comp block order } STsBufInfo; -typedef struct SInterval { +typedef struct { int32_t tz; // query client timezone char intervalUnit; char slidingUnit; @@ -606,7 +512,7 @@ typedef struct { int32_t udfContentOffset; int32_t udfContentLen; SColumnInfo tableCols[]; -} SQueryTableMsg; +} SQueryTableReq; typedef struct { int32_t code; @@ -623,7 +529,7 @@ typedef struct { int8_t free; } SRetrieveTableReq; -typedef struct SRetrieveTableRsp { +typedef struct { int64_t useconds; int8_t completed; // all results are returned to client int8_t precision; @@ -655,7 +561,7 @@ typedef struct { int8_t update; int8_t cacheLastRow; int8_t ignoreExist; -} SCreateDbMsg; +} SCreateDbReq; typedef struct { char db[TSDB_DB_FNAME_LEN]; @@ -768,7 +674,7 @@ typedef struct { typedef struct { int32_t reserved; -} STransMsg; +} STransReq; typedef struct { int32_t dnodeId; @@ -843,7 +749,7 @@ typedef struct { SMsgHead header; char dbFname[TSDB_DB_FNAME_LEN]; char tableFname[TSDB_TABLE_FNAME_LEN]; -} STableInfoMsg; +} STableInfoReq; typedef struct { int8_t metaClone; // create local clone of the cached table meta @@ -851,7 +757,7 @@ typedef struct { int32_t numOfTables; int32_t numOfUdfs; char tableNames[]; -} SMultiTableInfoMsg; +} SMultiTableInfoReq; typedef struct SVgroupInfo { int32_t vgId; @@ -859,19 +765,19 @@ typedef struct SVgroupInfo { uint32_t hashEnd; int8_t inUse; int8_t numOfEps; - SEpAddrMsg epAddr[TSDB_MAX_REPLICA]; + SEpAddr epAddr[TSDB_MAX_REPLICA]; } SVgroupInfo; typedef struct { int32_t vgId; int8_t numOfEps; - SEpAddrMsg epAddr[TSDB_MAX_REPLICA]; + SEpAddr epAddr[TSDB_MAX_REPLICA]; } SVgroupMsg; typedef struct { int32_t numOfVgroups; SVgroupMsg vgroups[]; -} SVgroupsMsg, SVgroupsInfo; +} SVgroupsInfo; typedef struct { char tbFname[TSDB_TABLE_FNAME_LEN]; // table full name @@ -888,9 +794,9 @@ typedef struct { uint64_t tuid; int32_t vgId; SSchema pSchema[]; -} STableMetaMsg; +} STableMetaRsp; -typedef struct SMultiTableMeta { +typedef struct { int32_t numOfTables; int32_t numOfVgroup; int32_t numOfUdf; @@ -934,9 +840,9 @@ typedef struct { int32_t vgid[]; } SCompactReq; -typedef struct SShowRsp { +typedef struct { int64_t showId; - STableMetaMsg tableMeta; + STableMetaRsp tableMeta; } SShowRsp; typedef struct { @@ -975,17 +881,6 @@ typedef struct { int32_t dnodeId; } SMCreateBnodeReq, SMDropBnodeReq, SDCreateBnodeReq, SDDropBnodeReq; -typedef struct { - int32_t dnodeId; - int32_t vgId; - int32_t tid; -} SConfigTableMsg; - -typedef struct { - int32_t dnodeId; - int32_t vgId; -} SConfigVnodeMsg; - typedef struct { char sql[TSDB_SHOW_SQL_LEN]; int32_t queryId; @@ -1083,46 +978,6 @@ typedef struct { } SSubmitReqReader; typedef struct { - /* data */ -} SCreateTableReq; - -typedef struct { - /* data */ -} SCreateTableRsp; - -typedef struct { - /* data */ -} SDropTableReq; - -typedef struct { - /* data */ -} SDropTableRsp; - -typedef struct { - /* data */ -} SAlterTableReq; - -typedef struct { - /* data */ -} SAlterTableRsp; - -typedef struct { - /* data */ -} SDropStableReq; - -typedef struct { - /* data */ -} SDropStableRsp; - -typedef struct { - /* data */ -} SUpdateTagValReq; - -typedef struct { - /* data */ -} SUpdateTagValRsp; - -typedef struct SSubQueryMsg { SMsgHead header; uint64_t sId; uint64_t queryId; @@ -1131,59 +986,59 @@ typedef struct SSubQueryMsg { char msg[]; } SSubQueryMsg; -typedef struct SResReadyMsg { +typedef struct { SMsgHead header; uint64_t sId; uint64_t queryId; uint64_t taskId; -} SResReadyMsg; +} SResReadyReq; -typedef struct SResReadyRsp { +typedef struct { int32_t code; } SResReadyRsp; -typedef struct SResFetchMsg { +typedef struct { SMsgHead header; uint64_t sId; uint64_t queryId; uint64_t taskId; -} SResFetchMsg; +} SResFetchReq; -typedef struct SSchTasksStatusMsg { +typedef struct { SMsgHead header; uint64_t sId; -} SSchTasksStatusMsg; +} SSchTasksStatusReq; -typedef struct STaskStatus { +typedef struct { uint64_t queryId; uint64_t taskId; int8_t status; } STaskStatus; -typedef struct SSchedulerStatusRsp { +typedef struct { uint32_t num; STaskStatus status[]; } SSchedulerStatusRsp; -typedef struct STaskCancelMsg { +typedef struct { SMsgHead header; uint64_t sId; uint64_t queryId; uint64_t taskId; -} STaskCancelMsg; +} STaskCancelReq; -typedef struct STaskCancelRsp { +typedef struct { int32_t code; } STaskCancelRsp; -typedef struct STaskDropMsg { +typedef struct { SMsgHead header; uint64_t sId; uint64_t queryId; uint64_t taskId; -} STaskDropMsg; +} STaskDropReq; -typedef struct STaskDropRsp { +typedef struct { int32_t code; } STaskDropRsp; @@ -1334,18 +1189,18 @@ typedef struct { void* executor; int32_t sqlLen; char* sql; -} SCreateTopicMsg; +} SCreateTopicReq; typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t igNotExists; -} SDropTopicMsg; +} SDropTopicReq; typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t alterType; SSchema schema; -} SAlterTopicMsg; +} SAlterTopicReq; typedef struct { SMsgHead head; @@ -1356,13 +1211,13 @@ typedef struct { char* executor; int32_t sqlLen; char* sql; -} SDCreateTopicMsg; +} SDCreateTopicReq; typedef struct { SMsgHead head; char name[TSDB_TABLE_FNAME_LEN]; uint64_t tuid; -} SDDropTopicMsg; +} SDDropTopicReq; typedef struct SVCreateTbReq { uint64_t ver; // use a general definition @@ -1402,24 +1257,63 @@ void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq); int tSVCreateTbBatchReqSerialize(void** buf, SVCreateTbBatchReq* pReq); void* tSVCreateTbBatchReqDeserialize(void* buf, SVCreateTbBatchReq* pReq); -typedef struct SVCreateTbRsp { +typedef struct { + SMsgHead head; } SVCreateTbRsp; -typedef struct SVShowTablesReq { +typedef struct { + SMsgHead head; + char name[TSDB_TABLE_FNAME_LEN]; + int8_t ignoreNotExists; +} SVAlterTbReq; + +typedef struct { + SMsgHead head; +} SVAlterTbRsp; + +typedef struct { + SMsgHead head; + char name[TSDB_TABLE_FNAME_LEN]; + int8_t ignoreNotExists; +} SVDropTbReq; + +typedef struct { + SMsgHead head; +} SVDropTbRsp; + +typedef struct { + SMsgHead head; + int64_t uid; + int32_t tid; + int16_t tversion; + int16_t colId; + int8_t type; + int16_t bytes; + int32_t tagValLen; + int16_t numOfTags; + int32_t schemaLen; + char data[]; +} SUpdateTagValReq; + +typedef struct { + SMsgHead head; +} SUpdateTagValRsp; + +typedef struct { SMsgHead head; } SVShowTablesReq; -typedef struct SVShowTablesRsp { +typedef struct { int64_t id; - STableMetaMsg metaInfo; + STableMetaRsp metaInfo; } SVShowTablesRsp; -typedef struct SVShowTablesFetchReq { +typedef struct { SMsgHead head; int32_t id; } SVShowTablesFetchReq; -typedef struct SVShowTablesFetchRsp { +typedef struct { int64_t useconds; int8_t completed; // all results are returned to client int8_t precision; diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index c5da68f0a63eba0314a887767a917b02b4a3f390..da70f2149896e68310b0417ed1fbf9facc57e163 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -114,7 +114,7 @@ typedef struct SProjectPhyNode { typedef struct SExchangePhyNode { SPhyNode node; uint64_t srcTemplateId; // template id of datasource suplans - SArray *pSrcEndPoints; // SEpAddrMsg, scheduler fill by calling qSetSuplanExecutionNode + SArray *pSrcEndPoints; // SEpAddr, scheduler fill by calling qSetSuplanExecutionNode } SExchangePhyNode; typedef struct SSubplanId { diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index e28098dfbf08124ff58f81ee6c30843e739bca69..25e295f980e3de14cfedff8c480fdfb5be62cf41 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -101,11 +101,11 @@ typedef struct SMsgSendInfo { SDataBuf msgInfo; } SMsgSendInfo; -typedef struct SQueryNodeAddr{ - int32_t nodeId; //vgId or qnodeId - int8_t inUse; - int8_t numOfEps; - SEpAddrMsg epAddr[TSDB_MAX_REPLICA]; +typedef struct SQueryNodeAddr { + int32_t nodeId; // vgId or qnodeId + int8_t inUse; + int8_t numOfEps; + SEpAddr epAddr[TSDB_MAX_REPLICA]; } SQueryNodeAddr; bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags); diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index fb1b97104956090de4cece3fd2c53a25c8dcf281..85f3fb06a786244f10dd0a4231e3198fd87f27a5 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -130,7 +130,7 @@ int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) { SShowRsp* pShow = (SShowRsp *)pMsg->pData; pShow->showId = htobe64(pShow->showId); - STableMetaMsg *pMetaMsg = &(pShow->tableMeta); + STableMetaRsp *pMetaMsg = &(pShow->tableMeta); pMetaMsg->numOfColumns = htonl(pMetaMsg->numOfColumns); SSchema* pSchema = pMetaMsg->pSchema; diff --git a/source/dnode/mgmt/impl/test/sut/inc/sut.h b/source/dnode/mgmt/impl/test/sut/inc/sut.h index fd9c1cd93ee609bc7f495b8f67408b0580bdefec..9f724faeb9a8d985a6c52ce1ba396634927b0ed9 100644 --- a/source/dnode/mgmt/impl/test/sut/inc/sut.h +++ b/source/dnode/mgmt/impl/test/sut/inc/sut.h @@ -53,7 +53,7 @@ class Testbase { void SendShowMetaReq(int8_t showType, const char* db); void SendShowRetrieveReq(); - STableMetaMsg* GetShowMeta(); + STableMetaRsp* GetShowMeta(); SRetrieveTableRsp* GetRetrieveRsp(); int32_t GetMetaNum(); @@ -74,7 +74,7 @@ class Testbase { private: int64_t showId; - STableMetaMsg* pMeta; + STableMetaRsp* pMeta; SRetrieveTableRsp* pRetrieveRsp; char* pData; int32_t pos; diff --git a/source/dnode/mgmt/impl/test/sut/src/sut.cpp b/source/dnode/mgmt/impl/test/sut/src/sut.cpp index 65c7d6725449b765ab746373474dc711660c1955..ba530dbdd00f17ed47526c48ee5dfdffbcd1ead8 100644 --- a/source/dnode/mgmt/impl/test/sut/src/sut.cpp +++ b/source/dnode/mgmt/impl/test/sut/src/sut.cpp @@ -179,6 +179,6 @@ const char* Testbase::GetShowBinary(int32_t len) { int32_t Testbase::GetShowRows() { return pRetrieveRsp->numOfRows; } -STableMetaMsg* Testbase::GetShowMeta() { return pMeta; } +STableMetaRsp* Testbase::GetShowMeta() { return pMeta; } SRetrieveTableRsp* Testbase::GetRetrieveRsp() { return pRetrieveRsp; } \ No newline at end of file diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 89fb2e318929d050cd3c58346edd058211798e68..d2107b9d07cdff201c4a0f39cb97d25430206205 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -32,7 +32,7 @@ extern "C" { typedef int32_t (*MndMsgFp)(SMnodeMsg *pMsg); typedef int32_t (*MndInitFp)(SMnode *pMnode); typedef void (*MndCleanupFp)(SMnode *pMnode); -typedef int32_t (*ShowMetaFp)(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); +typedef int32_t (*ShowMetaFp)(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta); typedef int32_t (*ShowRetrieveFp)(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); typedef void (*ShowFreeIterFp)(SMnode *pMnode, void *pIter); diff --git a/source/dnode/mnode/impl/src/mndBnode.c b/source/dnode/mnode/impl/src/mndBnode.c index dbe925992defd419178f1d0628e78671e6a89c45..189cbfea6d725a207a790ef0f29805bf09e10873 100644 --- a/source/dnode/mnode/impl/src/mndBnode.c +++ b/source/dnode/mnode/impl/src/mndBnode.c @@ -31,7 +31,7 @@ static int32_t mndProcessCreateBnodeReq(SMnodeMsg *pReq); static int32_t mndProcessDropBnodeReq(SMnodeMsg *pReq); static int32_t mndProcessCreateBnodeRsp(SMnodeMsg *pRsp); static int32_t mndProcessDropBnodeRsp(SMnodeMsg *pRsp); -static int32_t mndGetBnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta); +static int32_t mndGetBnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveBnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextBnode(SMnode *pMnode, void *pIter); @@ -391,7 +391,7 @@ static int32_t mndProcessDropBnodeRsp(SMnodeMsg *pRsp) { return 0; } -static int32_t mndGetBnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) { +static int32_t mndGetBnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; diff --git a/source/dnode/mnode/impl/src/mndCluster.c b/source/dnode/mnode/impl/src/mndCluster.c index 92b9017627689d8167888018f90115a74241a722..5f806723696fb752542d91d6d19ab852e83b8295 100644 --- a/source/dnode/mnode/impl/src/mndCluster.c +++ b/source/dnode/mnode/impl/src/mndCluster.c @@ -26,7 +26,7 @@ static int32_t mndClusterActionInsert(SSdb *pSdb, SClusterObj *pCluster); static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster); static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOldCluster, SClusterObj *pNewCluster); static int32_t mndCreateDefaultCluster(SMnode *pMnode); -static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); +static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveClusters(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextCluster(SMnode *pMnode, void *pIter); @@ -163,7 +163,7 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) { return sdbWrite(pMnode->pSdb, pRaw); } -static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { +static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) { int32_t cols = 0; SSchema *pSchema = pMeta->pSchema; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 828eb4a5b714dcd14de3045d09e85c630da86cc6..54e640d8b791bea1a64c253049c4b17bc3d913f5 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -39,7 +39,7 @@ static int32_t mndProcessCreateConsumerMsg(SMnodeMsg *pMsg); static int32_t mndProcessDropConsumerMsg(SMnodeMsg *pMsg); static int32_t mndProcessDropConsumerInRsp(SMnodeMsg *pMsg); static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg); -static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); +static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveConsumer(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter); @@ -395,7 +395,7 @@ static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; - STableInfoMsg *pInfo = pMsg->rpcMsg.pCont; + STableInfoReq *pInfo = pMsg->rpcMsg.pCont; mDebug("consumer:%s, start to retrieve meta", pInfo->tableFname); @@ -417,9 +417,9 @@ static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) { taosRLockLatch(&pConsumer->lock); int32_t totalCols = pConsumer->numOfColumns + pConsumer->numOfTags; - int32_t contLen = sizeof(STableMetaMsg) + totalCols * sizeof(SSchema); + int32_t contLen = sizeof(STableMetaRsp) + totalCols * sizeof(SSchema); - STableMetaMsg *pMeta = rpcMallocCont(contLen); + STableMetaRsp *pMeta = rpcMallocCont(contLen); if (pMeta == NULL) { taosRUnLockLatch(&pConsumer->lock); mndReleaseDb(pMnode, pDb); @@ -483,7 +483,7 @@ static int32_t mndGetNumOfConsumers(SMnode *pMnode, char *dbName, int32_t *pNumO return 0; } -static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { +static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) { SMnode *pMnode = pMsg->pMnode; SSdb *pSdb = pMnode->pSdb; diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index c7360471e02a028d61eaecb5054ce0dbb8b472f6..f2f8931aa1dc534cbae5e92714be8f146de583b9 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -35,7 +35,7 @@ static int32_t mndProcessDropDbReq(SMnodeMsg *pReq); static int32_t mndProcessUseDbReq(SMnodeMsg *pReq); static int32_t mndProcessSyncDbReq(SMnodeMsg *pReq); static int32_t mndProcessCompactDbReq(SMnodeMsg *pReq); -static int32_t mndGetDbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta); +static int32_t mndGetDbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveDbs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextDb(SMnode *pMnode, void *pIter); @@ -378,7 +378,7 @@ static int32_t mndSetCreateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj return 0; } -static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pReq, SCreateDbMsg *pCreate, SUserObj *pUser) { +static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pReq, SCreateDbReq *pCreate, SUserObj *pUser) { SDbObj dbObj = {0}; memcpy(dbObj.name, pCreate->db, TSDB_DB_FNAME_LEN); memcpy(dbObj.acct, pUser->acct, TSDB_USER_LEN); @@ -449,7 +449,7 @@ CREATE_DB_OVER: static int32_t mndProcessCreateDbReq(SMnodeMsg *pReq) { SMnode *pMnode = pReq->pMnode; - SCreateDbMsg *pCreate = pReq->rpcMsg.pCont; + SCreateDbReq *pCreate = pReq->rpcMsg.pCont; pCreate->numOfVgroups = htonl(pCreate->numOfVgroups); pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize); @@ -858,7 +858,7 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) { pInfo->numOfEps = pVgroup->replica; for (int32_t gid = 0; gid < pVgroup->replica; ++gid) { SVnodeGid *pVgid = &pVgroup->vnodeGid[gid]; - SEpAddrMsg *pEpArrr = &pInfo->epAddr[gid]; + SEpAddr *pEpArrr = &pInfo->epAddr[gid]; SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); if (pDnode != NULL) { memcpy(pEpArrr->fqdn, pDnode->fqdn, TSDB_FQDN_LEN); @@ -915,7 +915,7 @@ static int32_t mndProcessCompactDbReq(SMnodeMsg *pReq) { return 0; } -static int32_t mndGetDbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) { +static int32_t mndGetDbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 1b62a47c917f77014e342b27360390f1b6f66f85..4bc570c11d5fb2d46c8f18c79eb8b513e3a5c230 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -53,10 +53,10 @@ static int32_t mndProcessConfigDnodeReq(SMnodeMsg *pReq); static int32_t mndProcessConfigDnodeRsp(SMnodeMsg *pRsp); static int32_t mndProcessStatusReq(SMnodeMsg *pReq); -static int32_t mndGetConfigMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta); +static int32_t mndGetConfigMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveConfigs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter); -static int32_t mndGetDnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta); +static int32_t mndGetDnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveDnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter); @@ -582,7 +582,7 @@ static int32_t mndProcessConfigDnodeRsp(SMnodeMsg *pRsp) { mInfo("app:%p config rsp from dnode", pRsp->rpcMsg.ahandle); } -static int32_t mndGetConfigMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) { +static int32_t mndGetConfigMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { int32_t cols = 0; SSchema *pSchema = pMeta->pSchema; @@ -656,7 +656,7 @@ static int32_t mndRetrieveConfigs(SMnodeMsg *pReq, SShowObj *pShow, char *data, static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {} -static int32_t mndGetDnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) { +static int32_t mndGetDnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; diff --git a/source/dnode/mnode/impl/src/mndFunc.c b/source/dnode/mnode/impl/src/mndFunc.c index c22c7b5885c6a0153e9994c2fd037cc5ea78d7b2..d406247bc1e345a5f767dd7d0651b6523749aad5 100644 --- a/source/dnode/mnode/impl/src/mndFunc.c +++ b/source/dnode/mnode/impl/src/mndFunc.c @@ -31,7 +31,7 @@ static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pMsg, SFuncObj *pFunc); static int32_t mndProcessCreateFuncMsg(SMnodeMsg *pMsg); static int32_t mndProcessDropFuncMsg(SMnodeMsg *pMsg); static int32_t mndProcessRetrieveFuncMsg(SMnodeMsg *pMsg); -static int32_t mndGetFuncMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); +static int32_t mndGetFuncMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveFuncs(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextFunc(SMnode *pMnode, void *pIter); @@ -395,7 +395,7 @@ static int32_t mndProcessRetrieveFuncMsg(SMnodeMsg *pMsg) { return 0; } -static int32_t mndGetFuncMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { +static int32_t mndGetFuncMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) { SMnode *pMnode = pMsg->pMnode; SSdb *pSdb = pMnode->pSdb; diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 2c09b298725ea673c24615109ac462186bb10569..75ed5b0a1ea2fda3ed49d91e8ced97985216880e 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -33,7 +33,7 @@ static int32_t mndProcessDropMnodeReq(SMnodeMsg *pReq); static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pRsp); static int32_t mndProcessAlterMnodeRsp(SMnodeMsg *pRsp); static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pRsp); -static int32_t mndGetMnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta); +static int32_t mndGetMnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveMnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter); @@ -579,7 +579,7 @@ static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pRsp) { return 0; } -static int32_t mndGetMnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) { +static int32_t mndGetMnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index fcc2eec0288e77bbc097665437aea45057d460e9..fced3facbec8443552b3ec5f90803f23a0da7f7b 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -51,9 +51,9 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq); static int32_t mndProcessConnectReq(SMnodeMsg *pReq); static int32_t mndProcessKillQueryReq(SMnodeMsg *pReq); static int32_t mndProcessKillConnReq(SMnodeMsg *pReq); -static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta); +static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveConns(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); -static int32_t mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta); +static int32_t mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveQueries(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter); @@ -389,7 +389,7 @@ static int32_t mndProcessKillConnReq(SMnodeMsg *pReq) { } } -static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) { +static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { SMnode *pMnode = pReq->pMnode; SProfileMgmt *pMgmt = &pMnode->profileMgmt; @@ -518,7 +518,7 @@ static int32_t mndRetrieveConns(SMnodeMsg *pReq, SShowObj *pShow, char *data, in return numOfRows; } -static int32_t mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) { +static int32_t mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { SMnode *pMnode = pReq->pMnode; SProfileMgmt *pMgmt = &pMnode->profileMgmt; diff --git a/source/dnode/mnode/impl/src/mndQnode.c b/source/dnode/mnode/impl/src/mndQnode.c index 88cade08eddd6153ca8d904d83ff6ca5d9a06c8e..6077a95a7b21f88972c9869c9f2cf3043d8b00ba 100644 --- a/source/dnode/mnode/impl/src/mndQnode.c +++ b/source/dnode/mnode/impl/src/mndQnode.c @@ -31,7 +31,7 @@ static int32_t mndProcessCreateQnodeReq(SMnodeMsg *pReq); static int32_t mndProcessDropQnodeReq(SMnodeMsg *pReq); static int32_t mndProcessCreateQnodeRsp(SMnodeMsg *pRsp); static int32_t mndProcessDropQnodeRsp(SMnodeMsg *pRsp); -static int32_t mndGetQnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta); +static int32_t mndGetQnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveQnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextQnode(SMnode *pMnode, void *pIter); @@ -391,7 +391,7 @@ static int32_t mndProcessDropQnodeRsp(SMnodeMsg *pRsp) { return 0; } -static int32_t mndGetQnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) { +static int32_t mndGetQnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; diff --git a/source/dnode/mnode/impl/src/mndSnode.c b/source/dnode/mnode/impl/src/mndSnode.c index dabd1f0142716691e9f8ed1f02b74164708b6018..7688ea76abd2f3cec2ec975cb5727391ef93e848 100644 --- a/source/dnode/mnode/impl/src/mndSnode.c +++ b/source/dnode/mnode/impl/src/mndSnode.c @@ -31,7 +31,7 @@ static int32_t mndProcessCreateSnodeReq(SMnodeMsg *pReq); static int32_t mndProcessDropSnodeReq(SMnodeMsg *pReq); static int32_t mndProcessCreateSnodeRsp(SMnodeMsg *pRsp); static int32_t mndProcessDropSnodeRsp(SMnodeMsg *pRsp); -static int32_t mndGetSnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta); +static int32_t mndGetSnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveSnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextSnode(SMnode *pMnode, void *pIter); @@ -393,7 +393,7 @@ static int32_t mndProcessDropSnodeRsp(SMnodeMsg *pRsp) { return 0; } -static int32_t mndGetSnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) { +static int32_t mndGetSnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index b01c291ce5b64699e4ba1229bd7becce6cb0755a..7d77e29d7472b26ca1976a3f074041d1e8acd607 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -32,14 +32,14 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw); static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb); static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb); static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOldStb, SStbObj *pNewStb); -static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg); -static int32_t mndProcessAlterStbMsg(SMnodeMsg *pMsg); -static int32_t mndProcessDropStbMsg(SMnodeMsg *pMsg); +static int32_t mndProcesSMCreateStbReq(SMnodeMsg *pMsg); +static int32_t mndProcesSMAlterStbReq(SMnodeMsg *pMsg); +static int32_t mndProcesSMDropStbReq(SMnodeMsg *pMsg); static int32_t mndProcessCreateStbInRsp(SMnodeMsg *pMsg); static int32_t mndProcessAlterStbInRsp(SMnodeMsg *pMsg); static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg); static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg); -static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); +static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextStb(SMnode *pMnode, void *pIter); @@ -52,9 +52,9 @@ int32_t mndInitStb(SMnode *pMnode) { .updateFp = (SdbUpdateFp)mndStbActionUpdate, .deleteFp = (SdbDeleteFp)mndStbActionDelete}; - mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcessCreateStbMsg); - mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessAlterStbMsg); - mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessDropStbMsg); + mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcesSMCreateStbReq); + mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcesSMAlterStbReq); + mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcesSMDropStbReq); mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndProcessCreateStbInRsp); mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndProcessAlterStbInRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndProcessDropStbInRsp); @@ -264,10 +264,10 @@ static void *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb return buf; } -static SVDropStbReq *mndBuildDropStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) { - int32_t contLen = sizeof(SVDropStbReq); +static SVDropTbReq *mndBuildDropStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) { + int32_t contLen = sizeof(SVDropTbReq); - SVDropStbReq *pDrop = calloc(1, contLen); + SVDropTbReq *pDrop = calloc(1, contLen); if (pDrop == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -276,12 +276,12 @@ static SVDropStbReq *mndBuildDropStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj pDrop->head.contLen = htonl(contLen); pDrop->head.vgId = htonl(pVgroup->vgId); memcpy(pDrop->name, pStb->name, TSDB_TABLE_FNAME_LEN); - pDrop->suid = htobe64(pStb->uid); + // pDrop->suid = htobe64(pStb->uid); return pDrop; } -static int32_t mndCheckCreateStbMsg(SCreateStbMsg *pCreate) { +static int32_t mndCheckCreateStbMsg(SMCreateStbReq *pCreate) { pCreate->numOfColumns = htonl(pCreate->numOfColumns); pCreate->numOfTags = htonl(pCreate->numOfTags); int32_t totalCols = pCreate->numOfColumns + pCreate->numOfTags; @@ -398,7 +398,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj if (pIter == NULL) break; if (pVgroup->dbUid != pDb->uid) continue; - SVDropStbReq *pMsg = mndBuildDropStbMsg(pMnode, pVgroup, pStb); + SVDropTbReq *pMsg = mndBuildDropStbMsg(pMnode, pVgroup, pStb); if (pMsg == NULL) { sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pVgroup); @@ -409,7 +409,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgroup); action.pCont = pMsg; - action.contLen = sizeof(SVDropStbReq); + action.contLen = sizeof(SVDropTbReq); action.msgType = TDMT_VND_DROP_STB; if (mndTransAppendUndoAction(pTrans, &action) != 0) { free(pMsg); @@ -423,7 +423,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj return 0; } -static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCreate, SDbObj *pDb) { +static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SMCreateStbReq *pCreate, SDbObj *pDb) { SStbObj stbObj = {0}; tstrncpy(stbObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN); tstrncpy(stbObj.db, pDb->name, TSDB_DB_FNAME_LEN); @@ -494,9 +494,9 @@ CREATE_STB_OVER: return code; } -static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg) { +static int32_t mndProcesSMCreateStbReq(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; - SCreateStbMsg *pCreate = pMsg->rpcMsg.pCont; + SMCreateStbReq *pCreate = pMsg->rpcMsg.pCont; mDebug("stb:%s, start to create", pCreate->name); @@ -551,7 +551,7 @@ static int32_t mndProcessCreateStbInRsp(SMnodeMsg *pMsg) { return 0; } -static int32_t mndCheckAlterStbMsg(SAlterStbMsg *pAlter) { +static int32_t mndCheckAlterStbMsg(SMAlterStbReq *pAlter) { SSchema *pSchema = &pAlter->schema; pSchema->colId = htonl(pSchema->colId); pSchema->bytes = htonl(pSchema->bytes); @@ -578,9 +578,9 @@ static int32_t mndCheckAlterStbMsg(SAlterStbMsg *pAlter) { static int32_t mndUpdateStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pOldStb, SStbObj *pNewStb) { return 0; } -static int32_t mndProcessAlterStbMsg(SMnodeMsg *pMsg) { +static int32_t mndProcesSMAlterStbReq(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; - SAlterStbMsg *pAlter = pMsg->rpcMsg.pCont; + SMAlterStbReq *pAlter = pMsg->rpcMsg.pCont; mDebug("stb:%s, start to alter", pAlter->name); @@ -692,9 +692,9 @@ DROP_STB_OVER: return 0; } -static int32_t mndProcessDropStbMsg(SMnodeMsg *pMsg) { +static int32_t mndProcesSMDropStbReq(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; - SDropStbMsg *pDrop = pMsg->rpcMsg.pCont; + SMDropStbReq *pDrop = pMsg->rpcMsg.pCont; mDebug("stb:%s, start to drop", pDrop->name); @@ -729,7 +729,7 @@ static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg) { static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; - STableInfoMsg *pInfo = pMsg->rpcMsg.pCont; + STableInfoReq *pInfo = pMsg->rpcMsg.pCont; mDebug("stb:%s, start to retrieve meta", pInfo->tableFname); @@ -750,9 +750,9 @@ static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) { taosRLockLatch(&pStb->lock); int32_t totalCols = pStb->numOfColumns + pStb->numOfTags; - int32_t contLen = sizeof(STableMetaMsg) + totalCols * sizeof(SSchema); + int32_t contLen = sizeof(STableMetaRsp) + totalCols * sizeof(SSchema); - STableMetaMsg *pMeta = rpcMallocCont(contLen); + STableMetaRsp *pMeta = rpcMallocCont(contLen); if (pMeta == NULL) { taosRUnLockLatch(&pStb->lock); mndReleaseDb(pMnode, pDb); @@ -818,7 +818,7 @@ static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs return 0; } -static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { +static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) { SMnode *pMnode = pMsg->pMnode; SSdb *pSdb = pMnode->pSdb; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 57b7c16c39fd968f4e713926a2dfd4c032b41f10..acdc718f20c3176313bbed172daffc55f7892bba 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -35,7 +35,7 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg); static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg); static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pMsg); static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg); -static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); +static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveTopic(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter); @@ -186,10 +186,10 @@ static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) { return mndAcquireDb(pMnode, db); } -static SDDropTopicMsg *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMqTopicObj *pTopic) { - int32_t contLen = sizeof(SDDropTopicMsg); +static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMqTopicObj *pTopic) { + int32_t contLen = sizeof(SDDropTopicReq); - SDDropTopicMsg *pDrop = calloc(1, contLen); + SDDropTopicReq *pDrop = calloc(1, contLen); if (pDrop == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -274,7 +274,7 @@ static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pMsg, SMqTopicObj *pTopic static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; - SDropTopicMsg *pDrop = pMsg->rpcMsg.pCont; + SDropTopicReq *pDrop = pMsg->rpcMsg.pCont; mDebug("topic:%s, start to drop", pDrop->name); @@ -309,7 +309,7 @@ static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pMsg) { static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; - STableInfoMsg *pInfo = pMsg->rpcMsg.pCont; + STableInfoReq *pInfo = pMsg->rpcMsg.pCont; mDebug("topic:%s, start to retrieve meta", pInfo->tableFname); @@ -331,9 +331,9 @@ static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg) { taosRLockLatch(&pTopic->lock); int32_t totalCols = pTopic->numOfColumns + pTopic->numOfTags; - int32_t contLen = sizeof(STableMetaMsg) + totalCols * sizeof(SSchema); + int32_t contLen = sizeof(STableMetaRsp) + totalCols * sizeof(SSchema); - STableMetaMsg *pMeta = rpcMallocCont(contLen); + STableMetaRsp *pMeta = rpcMallocCont(contLen); if (pMeta == NULL) { taosRUnLockLatch(&pTopic->lock); mndReleaseDb(pMnode, pDb); @@ -397,7 +397,7 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo return 0; } -static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { +static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) { SMnode *pMnode = pMsg->pMnode; SSdb *pSdb = pMnode->pSdb; diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 8c153982157d3130780708425cf766b1a2c026bc..ad378953eb669a583c6bdd1aea3ad317211f2267 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -32,7 +32,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass static int32_t mndProcessCreateUserReq(SMnodeMsg *pReq); static int32_t mndProcessAlterUserReq(SMnodeMsg *pReq); static int32_t mndProcessDropUserReq(SMnodeMsg *pReq); -static int32_t mndGetUserMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta); +static int32_t mndGetUserMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveUsers(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextUser(SMnode *pMnode, void *pIter); @@ -432,7 +432,7 @@ static int32_t mndProcessDropUserReq(SMnodeMsg *pReq) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } -static int32_t mndGetUserMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) { +static int32_t mndGetUserMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index c5c58a075dfc829597ed9a9f7fe1e2e44e92da3f..4b7b37037192d31c75aed47edc33ccd5e36ca3b4 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -27,19 +27,19 @@ static SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw); static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup); static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup); -static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOldVgroup, SVgObj *pNewVgroup); +static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew); -static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg); -static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pMsg); -static int32_t mndProcessDropVnodeRsp(SMnodeMsg *pMsg); -static int32_t mndProcessSyncVnodeRsp(SMnodeMsg *pMsg); -static int32_t mndProcessCompactVnodeRsp(SMnodeMsg *pMsg); +static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pRsp); +static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pRsp); +static int32_t mndProcessDropVnodeRsp(SMnodeMsg *pRsp); +static int32_t mndProcessSyncVnodeRsp(SMnodeMsg *pRsp); +static int32_t mndProcessCompactVnodeRsp(SMnodeMsg *pRsp); -static int32_t mndGetVgroupMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); -static int32_t mndRetrieveVgroups(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndGetVgroupMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveVgroups(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter); -static int32_t mndGetVnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); -static int32_t mndRetrieveVnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndGetVnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveVnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter); int32_t mndInitVgroup(SMnode *pMnode) { @@ -164,14 +164,14 @@ static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup) { return 0; } -static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOldVgroup, SVgObj *pNewVgroup) { - mTrace("vgId:%d, perform update action, old row:%p new row:%p", pOldVgroup->vgId, pOldVgroup, pNewVgroup); - pOldVgroup->updateTime = pNewVgroup->updateTime; - pOldVgroup->version = pNewVgroup->version; - pOldVgroup->hashBegin = pNewVgroup->hashBegin; - pOldVgroup->hashEnd = pNewVgroup->hashEnd; - pOldVgroup->replica = pNewVgroup->replica; - memcpy(pOldVgroup->vnodeGid, pNewVgroup->vnodeGid, TSDB_MAX_REPLICA * sizeof(SVnodeGid)); +static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew) { + mTrace("vgId:%d, perform update action, old row:%p new row:%p", pOld->vgId, pOld, pNew); + pOld->updateTime = pNew->updateTime; + pOld->version = pNew->version; + pOld->hashBegin = pNew->hashBegin; + pOld->hashEnd = pNew->hashEnd; + pOld->replica = pNew->replica; + memcpy(pOld->vnodeGid, pNew->vnodeGid, TSDB_MAX_REPLICA * sizeof(SVnodeGid)); return 0; } @@ -427,24 +427,24 @@ SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup) { return epset; } -static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg) { - mndTransProcessRsp(pMsg); +static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pRsp) { + mndTransProcessRsp(pRsp); return 0; } -static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pMsg) { - mndTransProcessRsp(pMsg); +static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pRsp) { + mndTransProcessRsp(pRsp); return 0; } -static int32_t mndProcessDropVnodeRsp(SMnodeMsg *pMsg) { - mndTransProcessRsp(pMsg); +static int32_t mndProcessDropVnodeRsp(SMnodeMsg *pRsp) { + mndTransProcessRsp(pRsp); return 0; } -static int32_t mndProcessSyncVnodeRsp(SMnodeMsg *pMsg) { return 0; } +static int32_t mndProcessSyncVnodeRsp(SMnodeMsg *pRsp) { return 0; } -static int32_t mndProcessCompactVnodeRsp(SMnodeMsg *pMsg) { return 0; } +static int32_t mndProcessCompactVnodeRsp(SMnodeMsg *pRsp) { return 0; } static bool mndGetVgroupMaxReplicaFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) { SVgObj *pVgroup = pObj; @@ -475,8 +475,8 @@ static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pRep return 0; } -static int32_t mndGetVgroupMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { - SMnode *pMnode = pMsg->pMnode; +static int32_t mndGetVgroupMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { + SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; if (mndGetVgroupMaxReplica(pMnode, pShow->db, &pShow->replica, &pShow->numOfRows) != 0) { @@ -526,8 +526,8 @@ static int32_t mndGetVgroupMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg return 0; } -static int32_t mndRetrieveVgroups(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { - SMnode *pMnode = pMsg->pMnode; +static int32_t mndRetrieveVgroups(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; SVgObj *pVgroup = NULL; @@ -593,8 +593,8 @@ int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) { return numOfVnodes; } -static int32_t mndGetVnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { - SMnode *pMnode = pMsg->pMnode; +static int32_t mndGetVnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { + SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; int32_t cols = 0; @@ -633,8 +633,8 @@ static int32_t mndGetVnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg * return 0; } -static int32_t mndRetrieveVnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { - SMnode *pMnode = pMsg->pMnode; +static int32_t mndRetrieveVnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; SVgObj *pVgroup = NULL; diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 7c740df06dfe1d1fd1a1f4a8d42df438dbe4fa4d..d70c93e7585431924345dd29ac2aea59fd254bc2 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -61,8 +61,8 @@ void mndSendRedirectRsp(SMnode *pMnode, SRpcMsg *pMsg) { static void mndTransReExecute(void *param, void *tmrId) { SMnode *pMnode = param; if (mndIsMaster(pMnode)) { - STransMsg *pMsg = rpcMallocCont(sizeof(STransMsg)); - SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS, .pCont = pMsg, .contLen = sizeof(STransMsg)}; + STransReq *pMsg = rpcMallocCont(sizeof(STransReq)); + SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS, .pCont = pMsg, .contLen = sizeof(STransReq)}; pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg); } diff --git a/source/dnode/mnode/impl/test/db/db.cpp b/source/dnode/mnode/impl/test/db/db.cpp index 1a089e29321e2c415485a93caa7a3b5a3a57b9f5..4f0ba9b0e7d278fc1562ae2c5d9e2cfa14118bb2 100644 --- a/source/dnode/mnode/impl/test/db/db.cpp +++ b/source/dnode/mnode/impl/test/db/db.cpp @@ -64,9 +64,9 @@ TEST_F(MndTestDb, 01_ShowDb) { TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) { { - int32_t contLen = sizeof(SCreateDbMsg); + int32_t contLen = sizeof(SCreateDbReq); - SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(contLen); + SCreateDbReq* pReq = (SCreateDbReq*)rpcMallocCont(contLen); strcpy(pReq->db, "1.d1"); pReq->numOfVgroups = htonl(2); pReq->cacheBlockSize = htonl(16); @@ -224,9 +224,9 @@ TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) { TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) { { - int32_t contLen = sizeof(SCreateDbMsg); + int32_t contLen = sizeof(SCreateDbReq); - SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(contLen); + SCreateDbReq* pReq = (SCreateDbReq*)rpcMallocCont(contLen); strcpy(pReq->db, "1.d2"); pReq->numOfVgroups = htonl(2); pReq->cacheBlockSize = htonl(16); @@ -290,7 +290,7 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) { EXPECT_EQ(pInfo->hashEnd, UINT32_MAX / 2 - 1); EXPECT_EQ(pInfo->inUse, 0); EXPECT_EQ(pInfo->numOfEps, 1); - SEpAddrMsg* pAddr = &pInfo->epAddr[0]; + SEpAddr* pAddr = &pInfo->epAddr[0]; pAddr->port = htons(pAddr->port); EXPECT_EQ(pAddr->port, 9030); EXPECT_STREQ(pAddr->fqdn, "localhost"); @@ -306,7 +306,7 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) { EXPECT_EQ(pInfo->hashEnd, UINT32_MAX); EXPECT_EQ(pInfo->inUse, 0); EXPECT_EQ(pInfo->numOfEps, 1); - SEpAddrMsg* pAddr = &pInfo->epAddr[0]; + SEpAddr* pAddr = &pInfo->epAddr[0]; pAddr->port = htons(pAddr->port); EXPECT_EQ(pAddr->port, 9030); EXPECT_STREQ(pAddr->fqdn, "localhost"); diff --git a/source/dnode/mnode/impl/test/show/show.cpp b/source/dnode/mnode/impl/test/show/show.cpp index eabcbc7eb449d76133890812c4095430961879d5..bfdc0f42b638fe1b2ead423d0c922b4c222c609d 100644 --- a/source/dnode/mnode/impl/test/show/show.cpp +++ b/source/dnode/mnode/impl/test/show/show.cpp @@ -63,7 +63,7 @@ TEST_F(MndTestShow, 03_ShowMsg_Conn) { test.SendShowMetaReq(TSDB_MGMT_TABLE_CONNS, ""); - STableMetaMsg* pMeta = test.GetShowMeta(); + STableMetaRsp* pMeta = test.GetShowMeta(); EXPECT_STREQ(pMeta->tbFname, "show connections"); EXPECT_EQ(pMeta->numOfTags, 0); EXPECT_EQ(pMeta->numOfColumns, 7); diff --git a/source/dnode/mnode/impl/test/stb/stb.cpp b/source/dnode/mnode/impl/test/stb/stb.cpp index 55cc0301229465b64282feba41643f13b90b398d..beb52d68c1349a2a987805ddf2a87e8fa1eac4ea 100644 --- a/source/dnode/mnode/impl/test/stb/stb.cpp +++ b/source/dnode/mnode/impl/test/stb/stb.cpp @@ -27,9 +27,9 @@ Testbase MndTestStb::test; TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) { { - int32_t contLen = sizeof(SCreateDbMsg); + int32_t contLen = sizeof(SCreateDbReq); - SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(contLen); + SCreateDbReq* pReq = (SCreateDbReq*)rpcMallocCont(contLen); strcpy(pReq->db, "1.d1"); pReq->numOfVgroups = htonl(2); pReq->cacheBlockSize = htonl(16); @@ -59,9 +59,9 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) { { int32_t cols = 2; int32_t tags = 3; - int32_t contLen = (tags + cols) * sizeof(SSchema) + sizeof(SCreateStbMsg); + int32_t contLen = (tags + cols) * sizeof(SSchema) + sizeof(SMCreateStbReq); - SCreateStbMsg* pReq = (SCreateStbMsg*)rpcMallocCont(contLen); + SMCreateStbReq* pReq = (SMCreateStbReq*)rpcMallocCont(contLen); strcpy(pReq->name, "1.d1.stb"); pReq->numOfTags = htonl(tags); pReq->numOfColumns = htonl(cols); @@ -123,16 +123,16 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) { // ----- meta ------ { - int32_t contLen = sizeof(STableInfoMsg); + int32_t contLen = sizeof(STableInfoReq); - STableInfoMsg* pReq = (STableInfoMsg*)rpcMallocCont(contLen); + STableInfoReq* pReq = (STableInfoReq*)rpcMallocCont(contLen); strcpy(pReq->tableFname, "1.d1.stb"); SRpcMsg* pMsg = test.SendReq(TDMT_MND_STB_META, pReq, contLen); ASSERT_NE(pMsg, nullptr); ASSERT_EQ(pMsg->code, 0); - STableMetaMsg* pRsp = (STableMetaMsg*)pMsg->pCont; + STableMetaRsp* pRsp = (STableMetaRsp*)pMsg->pCont; pRsp->numOfTags = htonl(pRsp->numOfTags); pRsp->numOfColumns = htonl(pRsp->numOfColumns); pRsp->sversion = htonl(pRsp->sversion); @@ -214,9 +214,9 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) { CheckInt32(3); { - int32_t contLen = sizeof(SDropStbMsg); + int32_t contLen = sizeof(SMDropStbReq); - SDropStbMsg* pReq = (SDropStbMsg*)rpcMallocCont(contLen); + SMDropStbReq* pReq = (SMDropStbReq*)rpcMallocCont(contLen); strcpy(pReq->name, "1.d1.stb"); SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_STB, pReq, contLen); diff --git a/source/dnode/vnode/impl/inc/vnodeRequest.h b/source/dnode/vnode/impl/inc/vnodeRequest.h index 93b4589bad78153a3853a5d6c759a639d5f5540e..52f4281eeac470a923a4bc75293dfe1d35ab7d46 100644 --- a/source/dnode/vnode/impl/inc/vnodeRequest.h +++ b/source/dnode/vnode/impl/inc/vnodeRequest.h @@ -22,9 +22,9 @@ extern "C" { #endif -// SVDropTableReq -// int vnodeBuildDropTableReq(void **buf, const SVDropTableReq *pReq); -// void *vnodeParseDropTableReq(void *buf, SVDropTableReq *pReq); +// SVDropTbReq +// int vnodeBuildDropTableReq(void **buf, const SVDropTbReq *pReq); +// void *vnodeParseDropTableReq(void *buf, SVDropTbReq *pReq); #ifdef __cplusplus } diff --git a/source/dnode/vnode/impl/src/vnodeQuery.c b/source/dnode/vnode/impl/src/vnodeQuery.c index a32ee50df530940383649cb35e0c2345f8c06f5a..29b6984937e3171c2b6447d63e4ff3017139bca6 100644 --- a/source/dnode/vnode/impl/src/vnodeQuery.c +++ b/source/dnode/vnode/impl/src/vnodeQuery.c @@ -53,14 +53,14 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { - STableInfoMsg * pReq = (STableInfoMsg *)(pMsg->pCont); + STableInfoReq * pReq = (STableInfoReq *)(pMsg->pCont); STbCfg * pTbCfg = NULL; STbCfg * pStbCfg = NULL; tb_uid_t uid; int32_t nCols; int32_t nTagCols; SSchemaWrapper *pSW; - STableMetaMsg * pTbMetaMsg = NULL; + STableMetaRsp * pTbMetaMsg = NULL; SSchema * pTagSchema; SRpcMsg rpcMsg; int msgLen = 0; @@ -94,8 +94,8 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { pTagSchema = NULL; } - msgLen = sizeof(STableMetaMsg) + sizeof(SSchema) * (nCols + nTagCols); - pTbMetaMsg = (STableMetaMsg *)rpcMallocCont(msgLen); + msgLen = sizeof(STableMetaRsp) + sizeof(SSchema) * (nCols + nTagCols); + pTbMetaMsg = (STableMetaRsp *)rpcMallocCont(msgLen); if (pTbMetaMsg == NULL) { goto _exit; } @@ -167,7 +167,7 @@ static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) { // SVShowTablesFetchReq *pFetchReq = pMsg->pCont; SVShowTablesFetchRsp *pFetchRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp) + payloadLen); - memset(pFetchRsp, 0, sizeof(struct SVShowTablesFetchRsp) + payloadLen); + memset(pFetchRsp, 0, sizeof(SVShowTablesFetchRsp) + payloadLen); char *p = pFetchRsp->data; for (int32_t i = 0; i < numOfTables; ++i) { diff --git a/source/dnode/vnode/impl/src/vnodeRequest.c b/source/dnode/vnode/impl/src/vnodeRequest.c index 4b481bf3995af03e272253213b214ae12476fe30..5367c9e0910ec8dc0842955c6ab751195edf3868 100644 --- a/source/dnode/vnode/impl/src/vnodeRequest.c +++ b/source/dnode/vnode/impl/src/vnodeRequest.c @@ -108,12 +108,12 @@ static void *vnodeParseCreateTableReq(void *buf, SVCreateTableReq *pReq) { return buf; } -int vnodeBuildDropTableReq(void **buf, const SVDropTableReq *pReq) { +int vnodeBuildDropTableReq(void **buf, const SVDropTbReq *pReq) { // TODO return 0; } -void *vnodeParseDropTableReq(void *buf, SVDropTableReq *pReq) { +void *vnodeParseDropTableReq(void *buf, SVDropTbReq *pReq) { // TODO } #endif \ No newline at end of file diff --git a/source/dnode/vnode/tsdb/inc/tsdbMemTable.h b/source/dnode/vnode/tsdb/inc/tsdbMemTable.h index 00ae8ab9581327a5312e7856c4dc9d7a43a19a74..d2e3d488d00905f49e56999254414c20975c0d2a 100644 --- a/source/dnode/vnode/tsdb/inc/tsdbMemTable.h +++ b/source/dnode/vnode/tsdb/inc/tsdbMemTable.h @@ -54,7 +54,7 @@ typedef struct STsdbMemTable { STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb); void tsdbFreeMemTable(STsdb *pTsdb, STsdbMemTable *pMemTable); -int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp); +int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitMsg *pMsg, SShellSubmitRsp *pRsp); int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo); diff --git a/source/dnode/vnode/tsdb/src/tsdbMemTable.c b/source/dnode/vnode/tsdb/src/tsdbMemTable.c index 4d0f436f49570915bc0cf6dd6c808f3688861b57..b16b3581df448a93427b3092946a54d84d762a53 100644 --- a/source/dnode/vnode/tsdb/src/tsdbMemTable.c +++ b/source/dnode/vnode/tsdb/src/tsdbMemTable.c @@ -73,7 +73,7 @@ void tsdbFreeMemTable(STsdb *pTsdb, STsdbMemTable *pMemTable) { } } -int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp) { +int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitMsg *pMsg, SShellSubmitRsp *pRsp) { SSubmitBlk * pBlock = NULL; SSubmitMsgIter msgIter = {0}; int32_t affectedrows = 0, numOfRows = 0; diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index 49e3ef532fb193166bc3886ad252661393eeb314..87a3000d093cecaf97731198bc0df1f3539bae5b 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -68,7 +68,7 @@ char *ctgTestSTablename = "stable1"; void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) { - SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(sizeof(SCreateDbMsg)); + SCreateDbReq* pReq = (SCreateDbReq*)rpcMallocCont(sizeof(SCreateDbReq)); strcpy(pReq->db, "1.db1"); pReq->numOfVgroups = htonl(2); pReq->cacheBlockSize = htonl(16); @@ -92,7 +92,7 @@ void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) { SRpcMsg rpcMsg = {0}; rpcMsg.pCont = pReq; - rpcMsg.contLen = sizeof(SCreateDbMsg); + rpcMsg.contLen = sizeof(SCreateDbReq); rpcMsg.msgType = TDMT_MND_CREATE_DB; SRpcMsg rpcRsp = {0}; @@ -200,7 +200,7 @@ void ctgTestBuildDBVgroup(SDBVgroupInfo *dbVgroup) { vgInfo.numOfEps = i % TSDB_MAX_REPLICA + 1; vgInfo.inUse = i % vgInfo.numOfEps; for (int32_t n = 0; n < vgInfo.numOfEps; ++n) { - SEpAddrMsg *addr = &vgInfo.epAddr[n]; + SEpAddr *addr = &vgInfo.epAddr[n]; strcpy(addr->fqdn, "a0"); addr->port = htons(n + 22); } @@ -234,7 +234,7 @@ void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM vg->numOfEps = i % TSDB_MAX_REPLICA + 1; vg->inUse = i % vg->numOfEps; for (int32_t n = 0; n < vg->numOfEps; ++n) { - SEpAddrMsg *addr = &vg->epAddr[n]; + SEpAddr *addr = &vg->epAddr[n]; strcpy(addr->fqdn, "a0"); addr->port = htons(n + 22); } @@ -249,12 +249,12 @@ void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM void ctgTestPrepareTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { - STableMetaMsg *rspMsg = NULL; //todo + STableMetaRsp *rspMsg = NULL; //todo pRsp->code =0; - pRsp->contLen = sizeof(STableMetaMsg) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema); + pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema); pRsp->pCont = calloc(1, pRsp->contLen); - rspMsg = (STableMetaMsg *)pRsp->pCont; + rspMsg = (STableMetaRsp *)pRsp->pCont; sprintf(rspMsg->tbFname, "%s.%s", ctgTestDbname, ctgTestTablename); rspMsg->numOfTags = 0; rspMsg->numOfColumns = htonl(ctgTestColNum); @@ -285,12 +285,12 @@ void ctgTestPrepareTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM void ctgTestPrepareCTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { - STableMetaMsg *rspMsg = NULL; //todo + STableMetaRsp *rspMsg = NULL; //todo pRsp->code =0; - pRsp->contLen = sizeof(STableMetaMsg) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema); + pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema); pRsp->pCont = calloc(1, pRsp->contLen); - rspMsg = (STableMetaMsg *)pRsp->pCont; + rspMsg = (STableMetaRsp *)pRsp->pCont; sprintf(rspMsg->tbFname, "%s.%s", ctgTestDbname, ctgTestCTablename); sprintf(rspMsg->stbFname, "%s.%s", ctgTestDbname, ctgTestSTablename); rspMsg->numOfTags = htonl(ctgTestTagNum); @@ -329,12 +329,12 @@ void ctgTestPrepareCTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc void ctgTestPrepareSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { - STableMetaMsg *rspMsg = NULL; //todo + STableMetaRsp *rspMsg = NULL; //todo pRsp->code =0; - pRsp->contLen = sizeof(STableMetaMsg) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema); + pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema); pRsp->pCont = calloc(1, pRsp->contLen); - rspMsg = (STableMetaMsg *)pRsp->pCont; + rspMsg = (STableMetaRsp *)pRsp->pCont; sprintf(rspMsg->tbFname, "%s.%s", ctgTestDbname, ctgTestSTablename); sprintf(rspMsg->stbFname, "%s.%s", ctgTestDbname, ctgTestSTablename); rspMsg->numOfTags = htonl(ctgTestTagNum); @@ -372,13 +372,13 @@ void ctgTestPrepareSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc } void ctgTestPrepareMultiSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { - STableMetaMsg *rspMsg = NULL; //todo + STableMetaRsp *rspMsg = NULL; //todo static int32_t idx = 1; pRsp->code =0; - pRsp->contLen = sizeof(STableMetaMsg) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema); + pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema); pRsp->pCont = calloc(1, pRsp->contLen); - rspMsg = (STableMetaMsg *)pRsp->pCont; + rspMsg = (STableMetaRsp *)pRsp->pCont; sprintf(rspMsg->tbFname, "%s.%s_%d", ctgTestDbname, ctgTestSTablename, idx); sprintf(rspMsg->stbFname, "%s.%s_%d", ctgTestDbname, ctgTestSTablename, idx); rspMsg->numOfTags = htonl(ctgTestTagNum); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index a268215d3dae476750b494904004fac2900bb9c8..74ee4637c10eda303c26025cc5da7e88f6e21b7f 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -593,17 +593,17 @@ void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity); void copyTsColoum(SSDataBlock* pRes, SQLFunctionCtx* pCtx, int32_t numOfOutput); void freeParam(STaskParam *param); -int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, STaskParam* param); +int32_t convertQueryMsg(SQueryTableReq *pQueryMsg, STaskParam* param); int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExprInfo** pExprInfo, SSqlExpr** pExprMsg, SColumnInfo* pTagCols, int32_t queryType, void* pMsg, struct SUdfInfo* pUdfInfo); -int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, +int32_t createIndirectQueryFuncExprFromMsg(SQueryTableReq *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, SSqlExpr **pExpr, SExprInfo *prevExpr, struct SUdfInfo *pUdfInfo); int32_t createQueryFilter(char *data, uint16_t len, SFilterInfo** pFilters); -SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code); -SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs, +SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableReq *pQueryMsg, SColIndex *pColIndex, int32_t *code); +SQInfo *createQInfoImpl(SQueryTableReq *pQueryMsg, SGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs, SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, SFilterInfo* pFilters, int32_t vgId, char* sql, uint64_t qId, struct SUdfInfo* pUdfInfo); int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, STaskParam* param, char* start, diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index b98b7fef5ce84b94be20e7b8518b86d725de3d1e..968380ea01435f53aaf3e89c949698042478c322 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -86,7 +86,7 @@ int32_t qCreateTask(void* tsdb, int32_t vgId, void* pQueryMsg, qTaskInfo_t* pTas } if (param.pTableIdList == NULL || taosArrayGetSize(param.pTableIdList) == 0) { - qError("qmsg:%p, SQueryTableMsg wrong format", pQueryMsg); + qError("qmsg:%p, SQueryTableReq wrong format", pQueryMsg); code = TSDB_CODE_QRY_INVALID_MSG; goto _over; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 1d2740f0e02e36c42749c83afaa8829b0c6b0b88..b00d37f828161486ba0074e023823dbe1401f220 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2458,7 +2458,7 @@ static void doUpdateLastKey(STaskAttr* pQueryAttr) { } } -static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bool stableQuery) { +static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableReq* pQueryMsg, bool stableQuery) { STaskAttr* pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; // in case of point-interpolation query, use asc order scan @@ -7060,7 +7060,7 @@ bool validateExprColumnInfo(SQueriedTableInfo *pTableInfo, SSqlExpr *pExpr, SCol return j != INT32_MIN; } -static bool validateQueryMsg(SQueryTableMsg *pQueryMsg) { +static bool validateQueryMsg(SQueryTableReq *pQueryMsg) { if (pQueryMsg->interval.interval < 0) { //qError("qmsg:%p illegal value of interval time %" PRId64, pQueryMsg, pQueryMsg->interval.interval); return false; @@ -7126,7 +7126,7 @@ static bool validateQueryTableCols(SQueriedTableInfo* pTableInfo, SSqlExpr** pEx return true; } -static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **pTableIdList) { +static char *createTableIdList(SQueryTableReq *pQueryMsg, char *pMsg, SArray **pTableIdList) { assert(pQueryMsg->numOfTables > 0); *pTableIdList = taosArrayInit(pQueryMsg->numOfTables, sizeof(STableIdInfo)); @@ -7268,7 +7268,7 @@ int32_t parseTaskInfo(const char* msg, int32_t len) { * @param pExpr * @return */ -int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, STaskParam* param) { +int32_t convertQueryMsg(SQueryTableReq *pQueryMsg, STaskParam* param) { int32_t code = TSDB_CODE_SUCCESS; // if (taosCheckVersion(pQueryMsg->version, version, 3) != 0) { @@ -7843,7 +7843,7 @@ int32_t createQueryFilter(char *data, uint16_t len, SFilterInfo** pFilters) { // todo refactor -int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg* pQueryMsg, int32_t numOfOutput, SExprInfo** pExprInfo, +int32_t createIndirectQueryFuncExprFromMsg(SQueryTableReq* pQueryMsg, int32_t numOfOutput, SExprInfo** pExprInfo, SSqlExpr** pExpr, SExprInfo* prevExpr, struct SUdfInfo *pUdfInfo) { // *pExprInfo = NULL; // int32_t code = TSDB_CODE_SUCCESS; @@ -7901,7 +7901,7 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg* pQueryMsg, int32_t nu return TSDB_CODE_SUCCESS; } -SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code) { +SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableReq *pQueryMsg, SColIndex *pColIndex, int32_t *code) { if (pQueryMsg->numOfGroupCols == 0) { return NULL; } @@ -8069,7 +8069,7 @@ FORCE_INLINE bool checkQIdEqual(void *qHandle, uint64_t qId) { return ((SQInfo *)qHandle)->qId == qId; } -SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, SExprInfo* pExprs, +SQInfo* createQInfoImpl(SQueryTableReq* pQueryMsg, SGroupbyExpr* pGroupbyExpr, SExprInfo* pExprs, SExprInfo* pSecExprs, STableGroupInfo* pTableGroupInfo, SColumnInfo* pTagCols, SFilterInfo* pFilters, int32_t vgId, char* sql, uint64_t qId, struct SUdfInfo* pUdfInfo) { int16_t numOfCols = pQueryMsg->numOfCols; diff --git a/source/libs/parser/inc/astToMsg.h b/source/libs/parser/inc/astToMsg.h index 1b7fe5ebc5aadd52a67c769241f272cff74ebcad..83683c2bfcb745acd5ca64faa613559378f1bed7 100644 --- a/source/libs/parser/inc/astToMsg.h +++ b/source/libs/parser/inc/astToMsg.h @@ -9,9 +9,9 @@ SCreateUserReq* buildUserManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, in SCreateAcctReq* buildAcctManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen); SDropUserReq* buildDropUserMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen); SShowReq* buildShowMsg(SShowInfo* pShowInfo, SParseBasicCtx* pParseCtx, char* msgBuf, int32_t msgLen); -SCreateDbMsg* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf); -SCreateStbMsg* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf); -SDropStbMsg* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf); +SCreateDbReq* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf); +SMCreateStbReq* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf); +SMDropStbReq* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf); SCreateDnodeReq *buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf); SDropDnodeReq *buildDropDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf); diff --git a/source/libs/parser/src/astToMsg.c b/source/libs/parser/src/astToMsg.c index 1ae45556b456decc14e92a8840728efdb1f0d6ac..5f45ce824e55bc23d8169eba2624e22e29827b66 100644 --- a/source/libs/parser/src/astToMsg.c +++ b/source/libs/parser/src/astToMsg.c @@ -112,7 +112,7 @@ SShowReq* buildShowMsg(SShowInfo* pShowInfo, SParseBasicCtx *pCtx, char* msgBuf, return pShowMsg; } -static int32_t setKeepOption(SCreateDbMsg* pMsg, const SCreateDbInfo* pCreateDb, SMsgBuf* pMsgBuf) { +static int32_t setKeepOption(SCreateDbReq* pMsg, const SCreateDbInfo* pCreateDb, SMsgBuf* pMsgBuf) { const char* msg1 = "invalid number of keep options"; const char* msg2 = "invalid keep value"; const char* msg3 = "invalid keep value, should be keep0 <= keep1 <= keep2"; @@ -151,7 +151,7 @@ static int32_t setKeepOption(SCreateDbMsg* pMsg, const SCreateDbInfo* pCreateDb, return TSDB_CODE_SUCCESS; } -static int32_t setTimePrecision(SCreateDbMsg* pMsg, const SCreateDbInfo* pCreateDbInfo, SMsgBuf* pMsgBuf) { +static int32_t setTimePrecision(SCreateDbReq* pMsg, const SCreateDbInfo* pCreateDbInfo, SMsgBuf* pMsgBuf) { const char* msg = "invalid time precision"; pMsg->precision = TSDB_TIME_PRECISION_MILLI; // millisecond by default @@ -178,7 +178,7 @@ static int32_t setTimePrecision(SCreateDbMsg* pMsg, const SCreateDbInfo* pCreate return TSDB_CODE_SUCCESS; } -static void doSetDbOptions(SCreateDbMsg* pMsg, const SCreateDbInfo* pCreateDb) { +static void doSetDbOptions(SCreateDbReq* pMsg, const SCreateDbInfo* pCreateDb) { pMsg->cacheBlockSize = htonl(pCreateDb->cacheBlockSize); pMsg->totalBlocks = htonl(pCreateDb->numOfBlocks); pMsg->daysPerFile = htonl(pCreateDb->daysPerFile); @@ -196,7 +196,7 @@ static void doSetDbOptions(SCreateDbMsg* pMsg, const SCreateDbInfo* pCreateDb) { pMsg->numOfVgroups = htonl(pCreateDb->numOfVgroups); } -int32_t setDbOptions(SCreateDbMsg* pCreateDbMsg, const SCreateDbInfo* pCreateDbSql, SMsgBuf* pMsgBuf) { +int32_t setDbOptions(SCreateDbReq* pCreateDbMsg, const SCreateDbInfo* pCreateDbSql, SMsgBuf* pMsgBuf) { doSetDbOptions(pCreateDbMsg, pCreateDbSql); if (setKeepOption(pCreateDbMsg, pCreateDbSql, pMsgBuf) != TSDB_CODE_SUCCESS) { @@ -210,8 +210,8 @@ int32_t setDbOptions(SCreateDbMsg* pCreateDbMsg, const SCreateDbInfo* pCreateDbS return TSDB_CODE_SUCCESS; } -SCreateDbMsg* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf) { - SCreateDbMsg* pCreateMsg = calloc(1, sizeof(SCreateDbMsg)); +SCreateDbReq* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf) { + SCreateDbReq* pCreateMsg = calloc(1, sizeof(SCreateDbReq)); if (setDbOptions(pCreateMsg, pCreateDbInfo, pMsgBuf) != TSDB_CODE_SUCCESS) { tfree(pCreateMsg); terrno = TSDB_CODE_TSC_INVALID_OPERATION; @@ -230,7 +230,7 @@ SCreateDbMsg* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseBasicCtx *pCt return pCreateMsg; } -SCreateStbMsg* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf) { +SMCreateStbReq* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf) { SSchema* pSchema; int32_t numOfTags = 0; @@ -239,7 +239,7 @@ SCreateStbMsg* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len, numOfTags = (int32_t) taosArrayGetSize(pCreateTableSql->colInfo.pTagColumns); } - SCreateStbMsg* pCreateStbMsg = (SCreateStbMsg*)calloc(1, sizeof(SCreateStbMsg) + (numOfCols + numOfTags) * sizeof(SSchema)); + SMCreateStbReq* pCreateStbMsg = (SMCreateStbReq*)calloc(1, sizeof(SMCreateStbReq) + (numOfCols + numOfTags) * sizeof(SSchema)); char* pMsg = NULL; #if 0 @@ -315,7 +315,7 @@ SCreateStbMsg* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len, return pCreateStbMsg; } -SDropStbMsg* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf) { +SMDropStbReq* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf) { SToken* tableName = taosArrayGet(pInfo->pMiscInfo->a, 0); SName name = {0}; @@ -325,13 +325,13 @@ SDropStbMsg* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* p return NULL; } - SDropStbMsg *pDropTableMsg = (SDropStbMsg*) calloc(1, sizeof(SDropStbMsg)); + SMDropStbReq *pDropTableMsg = (SMDropStbReq*) calloc(1, sizeof(SMDropStbReq)); code = tNameExtractFullName(&name, pDropTableMsg->name); assert(code == TSDB_CODE_SUCCESS && name.type == TSDB_TABLE_NAME_T); pDropTableMsg->igNotExists = pInfo->pMiscInfo->existsCheck ? 1 : 0; - *len = sizeof(SDropStbMsg); + *len = sizeof(SMDropStbReq); return pDropTableMsg; } diff --git a/source/libs/parser/src/dCDAstProcess.c b/source/libs/parser/src/dCDAstProcess.c index 947941b2cfaccf45bb85313a3549d3e812e20f85..6676e1ebf6e54e6651274f614c215ff517929d3f 100644 --- a/source/libs/parser/src/dCDAstProcess.c +++ b/source/libs/parser/src/dCDAstProcess.c @@ -117,7 +117,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx* pCtx, void** ou } // can only perform the parameters based on the macro definitation -static int32_t doCheckDbOptions(SCreateDbMsg* pCreate, SMsgBuf* pMsgBuf) { +static int32_t doCheckDbOptions(SCreateDbReq* pCreate, SMsgBuf* pMsgBuf) { char msg[512] = {0}; if (pCreate->walLevel != -1 && (pCreate->walLevel < TSDB_MIN_WAL_LEVEL || pCreate->walLevel > TSDB_MAX_WAL_LEVEL)) { @@ -870,14 +870,14 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, c goto _error; } - SCreateDbMsg* pCreateMsg = buildCreateDbMsg(pCreateDB, pCtx, pMsgBuf); + SCreateDbReq* pCreateMsg = buildCreateDbMsg(pCreateDB, pCtx, pMsgBuf); if (doCheckDbOptions(pCreateMsg, pMsgBuf) != TSDB_CODE_SUCCESS) { code = TSDB_CODE_TSC_INVALID_OPERATION; goto _error; } pDcl->pMsg = (char*)pCreateMsg; - pDcl->msgLen = sizeof(SCreateDbMsg); + pDcl->msgLen = sizeof(SCreateDbReq); pDcl->msgType = (pInfo->type == TSDB_SQL_CREATE_DB) ? TDMT_MND_CREATE_DB : TDMT_MND_ALTER_DB; break; } diff --git a/source/libs/planner/src/physicalPlanJson.c b/source/libs/planner/src/physicalPlanJson.c index d67ff956b9a3b46fdfce21cff72235105f3edd73..0e9b2dd76fe8377e22ec82d34ac0cc50b0784b09 100644 --- a/source/libs/planner/src/physicalPlanJson.c +++ b/source/libs/planner/src/physicalPlanJson.c @@ -557,7 +557,7 @@ static const char* jkEpAddrFqdn = "Fqdn"; static const char* jkEpAddrPort = "Port"; static bool epAddrToJson(const void* obj, cJSON* json) { - const SEpAddrMsg* ep = (const SEpAddrMsg*)obj; + const SEpAddr* ep = (const SEpAddr*)obj; bool res = cJSON_AddStringToObject(json, jkEpAddrFqdn, ep->fqdn); if (res) { res = cJSON_AddNumberToObject(json, jkEpAddrPort, ep->port); @@ -566,7 +566,7 @@ static bool epAddrToJson(const void* obj, cJSON* json) { } static bool epAddrFromJson(const cJSON* json, void* obj) { - SEpAddrMsg* ep = (SEpAddrMsg*)obj; + SEpAddr* ep = (SEpAddr*)obj; copyString(json, jkEpAddrFqdn, ep->fqdn); ep->port = getNumber(json, jkEpAddrPort); return true; @@ -583,7 +583,7 @@ static bool nodeAddrToJson(const void* obj, cJSON* json) { res = cJSON_AddNumberToObject(json, jkNodeAddrInUse, ep->inUse); } if (res) { - res = addRawArray(json, jkNodeAddrEpAddrs, epAddrToJson, ep->epAddr, ep->numOfEps, sizeof(SEpAddrMsg)); + res = addRawArray(json, jkNodeAddrEpAddrs, epAddrToJson, ep->epAddr, ep->numOfEps, sizeof(SEpAddr)); } return res; } @@ -593,7 +593,7 @@ static bool nodeAddrFromJson(const cJSON* json, void* obj) { ep->nodeId = getNumber(json, jkNodeAddrId); ep->inUse = getNumber(json, jkNodeAddrInUse); int32_t numOfEps = 0; - bool res = fromRawArray(json, jkNodeAddrEpAddrs, nodeAddrFromJson, &ep->epAddr, sizeof(SEpAddrMsg), &numOfEps); + bool res = fromRawArray(json, jkNodeAddrEpAddrs, nodeAddrFromJson, &ep->epAddr, sizeof(SEpAddr), &numOfEps); ep->numOfEps = numOfEps; return res; } diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 16cabd648308924377db88a52b9f417cc15314c0..2cde28baf91b7f8408282fa4d8eea5a48e0880bb 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -29,7 +29,7 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3 SBuildTableMetaInput* bInput = (SBuildTableMetaInput *)input; - int32_t estimateSize = sizeof(STableInfoMsg); + int32_t estimateSize = sizeof(STableInfoReq); if (NULL == *msg || msgSize < estimateSize) { tfree(*msg); *msg = rpcMallocCont(estimateSize); @@ -38,7 +38,7 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3 } } - STableInfoMsg *bMsg = (STableInfoMsg *)*msg; + STableInfoReq *bMsg = (STableInfoReq *)*msg; bMsg->header.vgId = htonl(bInput->vgId); @@ -146,7 +146,7 @@ _return: return code; } -static int32_t queryConvertTableMetaMsg(STableMetaMsg* pMetaMsg) { +static int32_t queryConvertTableMetaMsg(STableMetaRsp* pMetaMsg) { pMetaMsg->numOfTags = ntohl(pMetaMsg->numOfTags); pMetaMsg->numOfColumns = ntohl(pMetaMsg->numOfColumns); pMetaMsg->sversion = ntohl(pMetaMsg->sversion); @@ -198,7 +198,7 @@ static int32_t queryConvertTableMetaMsg(STableMetaMsg* pMetaMsg) { return TSDB_CODE_SUCCESS; } -int32_t queryCreateTableMetaFromMsg(STableMetaMsg* msg, bool isSuperTable, STableMeta **pMeta) { +int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta **pMeta) { int32_t total = msg->numOfColumns + msg->numOfTags; int32_t metaSize = sizeof(STableMeta) + sizeof(SSchema) * total; @@ -232,7 +232,7 @@ int32_t queryCreateTableMetaFromMsg(STableMetaMsg* msg, bool isSuperTable, STabl int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) { - STableMetaMsg *pMetaMsg = (STableMetaMsg *)msg; + STableMetaRsp *pMetaMsg = (STableMetaRsp *)msg; int32_t code = queryConvertTableMetaMsg(pMetaMsg); if (code != TSDB_CODE_SUCCESS) { return code; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 808c1e19f9d34f116336c246b7baadc26384821f..15e894fd612e421bd131409c572d7a750723d065 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -1080,7 +1080,7 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){ return TSDB_CODE_QRY_INVALID_INPUT; } - SResReadyMsg *msg = pMsg->pCont; + SResReadyReq *msg = pMsg->pCont; if (NULL == msg || pMsg->contLen < sizeof(*msg)) { qError("invalid task status msg"); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); @@ -1101,7 +1101,7 @@ int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { } int32_t code = 0; - SSchTasksStatusMsg *msg = pMsg->pCont; + SSchTasksStatusReq *msg = pMsg->pCont; if (NULL == msg || pMsg->contLen < sizeof(*msg)) { qError("invalid task status msg"); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); @@ -1125,7 +1125,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; } - SResFetchMsg *msg = pMsg->pCont; + SResFetchReq *msg = pMsg->pCont; if (NULL == msg || pMsg->contLen < sizeof(*msg)) { QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -1157,7 +1157,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { } int32_t code = 0; - STaskCancelMsg *msg = pMsg->pCont; + STaskCancelReq *msg = pMsg->pCont; if (NULL == msg || pMsg->contLen < sizeof(*msg)) { qError("invalid task cancel msg"); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); @@ -1182,7 +1182,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { } int32_t code = 0; - STaskDropMsg *msg = pMsg->pCont; + STaskDropReq *msg = pMsg->pCont; if (NULL == msg || pMsg->contLen < sizeof(*msg)) { qError("invalid task drop msg"); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index 7bc1c4ff40f49024d3c5b69470f66740bf25c1ab..eaa79fd39ad7f8c69f5341bdf616e692f6d2a669 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -113,12 +113,12 @@ void *readyThread(void *param) { uint32_t n = 0; void *mockPointer = (void *)0x1; void *mgmt = param; - SResReadyMsg readyMsg = {0}; + SResReadyReq readyMsg = {0}; readyMsg.sId = htobe64(1); readyMsg.queryId = htobe64(1); readyMsg.taskId = htobe64(1); readyRpc.pCont = &readyMsg; - readyRpc.contLen = sizeof(SResReadyMsg); + readyRpc.contLen = sizeof(SResReadyReq); while (!testStop) { code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc); @@ -137,12 +137,12 @@ void *fetchThread(void *param) { uint32_t n = 0; void *mockPointer = (void *)0x1; void *mgmt = param; - SResFetchMsg fetchMsg = {0}; + SResFetchReq fetchMsg = {0}; fetchMsg.sId = htobe64(1); fetchMsg.queryId = htobe64(1); fetchMsg.taskId = htobe64(1); fetchRpc.pCont = &fetchMsg; - fetchRpc.contLen = sizeof(SResFetchMsg); + fetchRpc.contLen = sizeof(SResFetchReq); while (!testStop) { code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc); @@ -161,12 +161,12 @@ void *dropThread(void *param) { uint32_t n = 0; void *mockPointer = (void *)0x1; void *mgmt = param; - STaskDropMsg dropMsg = {0}; + STaskDropReq dropMsg = {0}; dropMsg.sId = htobe64(1); dropMsg.queryId = htobe64(1); dropMsg.taskId = htobe64(1); dropRpc.pCont = &dropMsg; - dropRpc.contLen = sizeof(STaskDropMsg); + dropRpc.contLen = sizeof(STaskDropReq); while (!testStop) { code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc); @@ -185,10 +185,10 @@ void *statusThread(void *param) { uint32_t n = 0; void *mockPointer = (void *)0x1; void *mgmt = param; - SSchTasksStatusMsg statusMsg = {0}; + SSchTasksStatusReq statusMsg = {0}; statusMsg.sId = htobe64(1); statusRpc.pCont = &statusMsg; - statusRpc.contLen = sizeof(SSchTasksStatusMsg); + statusRpc.contLen = sizeof(SSchTasksStatusReq); statusRpc.msgType = TDMT_VND_TASKS_STATUS; while (!testStop) { @@ -228,31 +228,31 @@ TEST(seqTest, normalCase) { queryRpc.pCont = queryMsg; queryRpc.contLen = sizeof(SSubQueryMsg) + 100; - SResReadyMsg readyMsg = {0}; + SResReadyReq readyMsg = {0}; readyMsg.sId = htobe64(1); readyMsg.queryId = htobe64(1); readyMsg.taskId = htobe64(1); readyRpc.pCont = &readyMsg; - readyRpc.contLen = sizeof(SResReadyMsg); + readyRpc.contLen = sizeof(SResReadyReq); - SResFetchMsg fetchMsg = {0}; + SResFetchReq fetchMsg = {0}; fetchMsg.sId = htobe64(1); fetchMsg.queryId = htobe64(1); fetchMsg.taskId = htobe64(1); fetchRpc.pCont = &fetchMsg; - fetchRpc.contLen = sizeof(SResFetchMsg); + fetchRpc.contLen = sizeof(SResFetchReq); - STaskDropMsg dropMsg = {0}; + STaskDropReq dropMsg = {0}; dropMsg.sId = htobe64(1); dropMsg.queryId = htobe64(1); dropMsg.taskId = htobe64(1); dropRpc.pCont = &dropMsg; - dropRpc.contLen = sizeof(STaskDropMsg); + dropRpc.contLen = sizeof(STaskDropReq); - SSchTasksStatusMsg statusMsg = {0}; + SSchTasksStatusReq statusMsg = {0}; statusMsg.sId = htobe64(1); statusRpc.pCont = &statusMsg; - statusRpc.contLen = sizeof(SSchTasksStatusMsg); + statusRpc.contLen = sizeof(SSchTasksStatusReq); statusRpc.msgType = TDMT_VND_TASKS_STATUS; stubSetStringToPlan(); @@ -312,17 +312,17 @@ TEST(seqTest, cancelFirst) { queryRpc.pCont = queryMsg; queryRpc.contLen = sizeof(SSubQueryMsg) + 100; - STaskDropMsg dropMsg = {0}; + STaskDropReq dropMsg = {0}; dropMsg.sId = htobe64(1); dropMsg.queryId = htobe64(1); dropMsg.taskId = htobe64(1); dropRpc.pCont = &dropMsg; - dropRpc.contLen = sizeof(STaskDropMsg); + dropRpc.contLen = sizeof(STaskDropReq); - SSchTasksStatusMsg statusMsg = {0}; + SSchTasksStatusReq statusMsg = {0}; statusMsg.sId = htobe64(1); statusRpc.pCont = &statusMsg; - statusRpc.contLen = sizeof(SSchTasksStatusMsg); + statusRpc.contLen = sizeof(SSchTasksStatusReq); statusRpc.msgType = TDMT_VND_TASKS_STATUS; stubSetStringToPlan(); @@ -370,31 +370,31 @@ TEST(seqTest, randCase) { queryRpc.pCont = queryMsg; queryRpc.contLen = sizeof(SSubQueryMsg) + 100; - SResReadyMsg readyMsg = {0}; + SResReadyReq readyMsg = {0}; readyMsg.sId = htobe64(1); readyMsg.queryId = htobe64(1); readyMsg.taskId = htobe64(1); readyRpc.pCont = &readyMsg; - readyRpc.contLen = sizeof(SResReadyMsg); + readyRpc.contLen = sizeof(SResReadyReq); - SResFetchMsg fetchMsg = {0}; + SResFetchReq fetchMsg = {0}; fetchMsg.sId = htobe64(1); fetchMsg.queryId = htobe64(1); fetchMsg.taskId = htobe64(1); fetchRpc.pCont = &fetchMsg; - fetchRpc.contLen = sizeof(SResFetchMsg); + fetchRpc.contLen = sizeof(SResFetchReq); - STaskDropMsg dropMsg = {0}; + STaskDropReq dropMsg = {0}; dropMsg.sId = htobe64(1); dropMsg.queryId = htobe64(1); dropMsg.taskId = htobe64(1); dropRpc.pCont = &dropMsg; - dropRpc.contLen = sizeof(STaskDropMsg); + dropRpc.contLen = sizeof(STaskDropReq); - SSchTasksStatusMsg statusMsg = {0}; + SSchTasksStatusReq statusMsg = {0}; statusMsg.sId = htobe64(1); statusRpc.pCont = &statusMsg; - statusRpc.contLen = sizeof(SSchTasksStatusMsg); + statusRpc.contLen = sizeof(SSchTasksStatusReq); statusRpc.msgType = TDMT_VND_TASKS_STATUS; stubSetStringToPlan(); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 3ac08b5c42918ff20d25e438a7f0779a65b2947d..a72e97e13f3f3c5e99b2c3b394855b1e148c7bee 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -595,7 +595,7 @@ int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *ms if (rspCode != TSDB_CODE_SUCCESS || NULL == msg) { SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); } else { - SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg; + SShellSubmitRsp *rsp = (SShellSubmitRsp *)msg; job->resNumOfRows += rsp->affectedRows; code = schProcessOnTaskSuccess(job, task); @@ -832,14 +832,14 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { break; } case TDMT_VND_RES_READY: { - msgSize = sizeof(SResReadyMsg); + msgSize = sizeof(SResReadyReq); msg = calloc(1, msgSize); if (NULL == msg) { SCH_TASK_ELOG("calloc %d failed", msgSize); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SResReadyMsg *pMsg = msg; + SResReadyReq *pMsg = msg; pMsg->header.vgId = htonl(addr->nodeId); @@ -849,14 +849,14 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { break; } case TDMT_VND_FETCH: { - msgSize = sizeof(SResFetchMsg); + msgSize = sizeof(SResFetchReq); msg = calloc(1, msgSize); if (NULL == msg) { SCH_TASK_ELOG("calloc %d failed", msgSize); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SResFetchMsg *pMsg = msg; + SResFetchReq *pMsg = msg; pMsg->header.vgId = htonl(addr->nodeId); @@ -866,14 +866,14 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { break; } case TDMT_VND_DROP_TASK:{ - msgSize = sizeof(STaskDropMsg); + msgSize = sizeof(STaskDropReq); msg = calloc(1, msgSize); if (NULL == msg) { SCH_TASK_ELOG("calloc %d failed", msgSize); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - STaskDropMsg *pMsg = msg; + STaskDropReq *pMsg = msg; pMsg->header.vgId = htonl(addr->nodeId); diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 790778b7368e6d2fcfeeb818acd498a6d28ff263..ac37b9585c9dfd2e0765b5d79c92493b87e7e59c 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -186,7 +186,7 @@ void *schtSendRsp(void *param) { while (pIter) { SSchTask *task = *(SSchTask **)pIter; - SShellSubmitRspMsg rsp = {0}; + SShellSubmitRsp rsp = {0}; rsp.affectedRows = 10; schProcessRspMsg(job, task, TDMT_VND_SUBMIT, (char *)&rsp, sizeof(rsp), 0);