提交 a326b290 编写于 作者: S Shengliang Guan

minor changes

上级 8cb02046
......@@ -133,36 +133,36 @@ typedef enum _mgmt_table {
#define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC)
#define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0)
typedef struct SKv {
typedef struct {
int32_t keyLen;
int32_t valueLen;
void* key;
void* value;
} SKv;
typedef struct SClientHbKey {
typedef struct {
int32_t connId;
int32_t hbType;
} SClientHbKey;
typedef struct SClientHbReq {
typedef struct {
SClientHbKey connKey;
SHashObj* info; // hash<Slv.key, Sklv>
} SClientHbReq;
typedef struct SClientHbBatchReq {
typedef struct {
int64_t reqId;
SArray* reqs; // SArray<SClientHbReq>
} SClientHbBatchReq;
typedef struct SClientHbRsp {
typedef struct {
SClientHbKey connKey;
int32_t status;
int32_t bodyLen;
void* body;
} SClientHbRsp;
typedef struct SClientHbBatchRsp {
typedef struct {
int64_t reqId;
int64_t rspId;
SArray* rsps; // SArray<SClientHbRsp>
......@@ -220,13 +220,13 @@ static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey)
return buf;
}
typedef struct SBuildTableMetaInput {
typedef struct {
int32_t vgId;
char* dbName;
char* tableFullName;
} SBuildTableMetaInput;
typedef struct SBuildUseDBInput {
typedef struct {
char db[TSDB_TABLE_FNAME_LEN];
int32_t vgVersion;
} SBuildUseDBInput;
......@@ -234,17 +234,12 @@ typedef struct SBuildUseDBInput {
#pragma pack(push, 1)
// null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta
typedef struct {
char fqdn[TSDB_FQDN_LEN];
uint16_t port;
} SEpAddrMsg;
typedef struct {
char fqdn[TSDB_FQDN_LEN];
uint16_t port;
} SEpAddr;
typedef struct SMsgHead {
typedef struct {
int32_t contLen;
int32_t vgId;
} SMsgHead;
......@@ -262,7 +257,7 @@ typedef struct SSubmitBlk {
} SSubmitBlk;
// Submit message for this TSDB
typedef struct SSubmitMsg {
typedef struct {
SMsgHead header;
int64_t version;
int32_t length;
......@@ -301,7 +296,7 @@ typedef struct {
int32_t failedRows; // number of failed records (exclude duplicate records)
int32_t numOfFailedBlocks;
SShellSubmitRspBlock failedBlocks[];
} SShellSubmitRspMsg;
} SShellSubmitRsp;
typedef struct SSchema {
int8_t type;
......@@ -310,98 +305,24 @@ typedef struct SSchema {
char name[TSDB_COL_NAME_LEN];
} SSchema;
typedef struct {
int32_t contLen;
int32_t vgId;
int8_t tableType;
int16_t numOfColumns;
int16_t numOfTags;
int32_t tid;
int32_t sversion;
int32_t tversion;
int32_t tagDataLen;
int32_t sqlDataLen;
uint64_t uid;
uint64_t superTableUid;
uint64_t createdTime;
char tableFname[TSDB_TABLE_FNAME_LEN];
char stbFname[TSDB_TABLE_FNAME_LEN];
char data[];
} SMDCreateTableMsg;
// typedef struct {
// int32_t len; // one create table message
// char tableName[TSDB_TABLE_FNAME_LEN];
// int16_t numOfColumns;
// int16_t sqlLen; // the length of SQL, it starts after schema , sql is a null-terminated string
// int8_t igExists;
// int8_t rspMeta;
// int8_t reserved[16];
// char schema[];
//} SCreateTableMsg;
typedef struct {
char tableName[TSDB_TABLE_FNAME_LEN];
int16_t numOfColumns;
int16_t numOfTags;
int8_t igExists;
int8_t rspMeta;
char schema[];
} SCreateCTableMsg;
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t igExists;
int32_t numOfTags;
int32_t numOfColumns;
SSchema pSchema[];
} SCreateStbMsg, SCreateTableMsg;
} SMCreateStbReq;
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t igNotExists;
} SDropStbMsg;
} SMDropStbReq;
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t alterType;
SSchema schema;
} SAlterStbMsg;
typedef struct {
SMsgHead head;
char name[TSDB_TABLE_FNAME_LEN];
uint64_t suid;
} SVDropStbReq;
typedef struct {
SMsgHead head;
char name[TSDB_TABLE_FNAME_LEN];
int8_t type; /* operation type */
int32_t numOfCols; /* number of schema */
int32_t numOfTags;
char data[];
} SAlterTableMsg;
typedef struct {
SMsgHead head;
char name[TSDB_TABLE_FNAME_LEN];
int8_t ignoreNotExists;
} SDropTableMsg;
typedef struct {
SMsgHead head;
int64_t uid;
int32_t tid;
int16_t tversion;
int16_t colId;
int8_t type;
int16_t bytes;
int32_t tagValLen;
int16_t numOfTags;
int32_t schemaLen;
char data[];
} SUpdateTableTagValMsg;
} SMAlterStbReq;
typedef struct {
int32_t pid;
......@@ -470,28 +391,13 @@ typedef struct {
} SCreateUserReq, SAlterUserReq;
typedef struct {
int32_t contLen;
int32_t vgId;
int32_t tid;
uint64_t uid;
char tableFname[TSDB_TABLE_FNAME_LEN];
} SMDDropTableMsg;
typedef struct {
int32_t contLen;
int32_t vgId;
uint64_t uid;
char tableFname[TSDB_TABLE_FNAME_LEN];
} SDropSTableMsg;
typedef struct SColIndex {
int16_t colId; // column id
int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag
int16_t flag; // denote if it is a tag or a normal column
char name[TSDB_DB_FNAME_LEN];
} SColIndex;
typedef struct SColumnFilterInfo {
typedef struct {
int16_t lowerRelOptr;
int16_t upperRelOptr;
int16_t filterstr; // denote if current column is char(binary/nchar)
......@@ -512,7 +418,7 @@ typedef struct SColumnFilterInfo {
};
} SColumnFilterInfo;
typedef struct SColumnFilterList {
typedef struct {
int16_t numOfFilters;
union {
int64_t placeholder;
......@@ -523,14 +429,14 @@ typedef struct SColumnFilterList {
* for client side struct, we only need the column id, type, bytes are not necessary
* But for data in vnode side, we need all the following information.
*/
typedef struct SColumnInfo {
typedef struct {
int16_t colId;
int16_t type;
int16_t bytes;
SColumnFilterList flist;
} SColumnInfo;
typedef struct STableIdInfo {
typedef struct {
uint64_t uid;
TSKEY key; // last accessed ts, for subscription
} STableIdInfo;
......@@ -547,7 +453,7 @@ typedef struct {
int32_t tsOrder; // ts comp block order
} STsBufInfo;
typedef struct SInterval {
typedef struct {
int32_t tz; // query client timezone
char intervalUnit;
char slidingUnit;
......@@ -606,7 +512,7 @@ typedef struct {
int32_t udfContentOffset;
int32_t udfContentLen;
SColumnInfo tableCols[];
} SQueryTableMsg;
} SQueryTableReq;
typedef struct {
int32_t code;
......@@ -623,7 +529,7 @@ typedef struct {
int8_t free;
} SRetrieveTableReq;
typedef struct SRetrieveTableRsp {
typedef struct {
int64_t useconds;
int8_t completed; // all results are returned to client
int8_t precision;
......@@ -655,7 +561,7 @@ typedef struct {
int8_t update;
int8_t cacheLastRow;
int8_t ignoreExist;
} SCreateDbMsg;
} SCreateDbReq;
typedef struct {
char db[TSDB_DB_FNAME_LEN];
......@@ -768,7 +674,7 @@ typedef struct {
typedef struct {
int32_t reserved;
} STransMsg;
} STransReq;
typedef struct {
int32_t dnodeId;
......@@ -843,7 +749,7 @@ typedef struct {
SMsgHead header;
char dbFname[TSDB_DB_FNAME_LEN];
char tableFname[TSDB_TABLE_FNAME_LEN];
} STableInfoMsg;
} STableInfoReq;
typedef struct {
int8_t metaClone; // create local clone of the cached table meta
......@@ -851,7 +757,7 @@ typedef struct {
int32_t numOfTables;
int32_t numOfUdfs;
char tableNames[];
} SMultiTableInfoMsg;
} SMultiTableInfoReq;
typedef struct SVgroupInfo {
int32_t vgId;
......@@ -859,19 +765,19 @@ typedef struct SVgroupInfo {
uint32_t hashEnd;
int8_t inUse;
int8_t numOfEps;
SEpAddrMsg epAddr[TSDB_MAX_REPLICA];
SEpAddr epAddr[TSDB_MAX_REPLICA];
} SVgroupInfo;
typedef struct {
int32_t vgId;
int8_t numOfEps;
SEpAddrMsg epAddr[TSDB_MAX_REPLICA];
SEpAddr epAddr[TSDB_MAX_REPLICA];
} SVgroupMsg;
typedef struct {
int32_t numOfVgroups;
SVgroupMsg vgroups[];
} SVgroupsMsg, SVgroupsInfo;
} SVgroupsInfo;
typedef struct {
char tbFname[TSDB_TABLE_FNAME_LEN]; // table full name
......@@ -888,9 +794,9 @@ typedef struct {
uint64_t tuid;
int32_t vgId;
SSchema pSchema[];
} STableMetaMsg;
} STableMetaRsp;
typedef struct SMultiTableMeta {
typedef struct {
int32_t numOfTables;
int32_t numOfVgroup;
int32_t numOfUdf;
......@@ -934,9 +840,9 @@ typedef struct {
int32_t vgid[];
} SCompactReq;
typedef struct SShowRsp {
typedef struct {
int64_t showId;
STableMetaMsg tableMeta;
STableMetaRsp tableMeta;
} SShowRsp;
typedef struct {
......@@ -975,17 +881,6 @@ typedef struct {
int32_t dnodeId;
} SMCreateBnodeReq, SMDropBnodeReq, SDCreateBnodeReq, SDDropBnodeReq;
typedef struct {
int32_t dnodeId;
int32_t vgId;
int32_t tid;
} SConfigTableMsg;
typedef struct {
int32_t dnodeId;
int32_t vgId;
} SConfigVnodeMsg;
typedef struct {
char sql[TSDB_SHOW_SQL_LEN];
int32_t queryId;
......@@ -1083,46 +978,6 @@ typedef struct {
} SSubmitReqReader;
typedef struct {
/* data */
} SCreateTableReq;
typedef struct {
/* data */
} SCreateTableRsp;
typedef struct {
/* data */
} SDropTableReq;
typedef struct {
/* data */
} SDropTableRsp;
typedef struct {
/* data */
} SAlterTableReq;
typedef struct {
/* data */
} SAlterTableRsp;
typedef struct {
/* data */
} SDropStableReq;
typedef struct {
/* data */
} SDropStableRsp;
typedef struct {
/* data */
} SUpdateTagValReq;
typedef struct {
/* data */
} SUpdateTagValRsp;
typedef struct SSubQueryMsg {
SMsgHead header;
uint64_t sId;
uint64_t queryId;
......@@ -1131,59 +986,59 @@ typedef struct SSubQueryMsg {
char msg[];
} SSubQueryMsg;
typedef struct SResReadyMsg {
typedef struct {
SMsgHead header;
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
} SResReadyMsg;
} SResReadyReq;
typedef struct SResReadyRsp {
typedef struct {
int32_t code;
} SResReadyRsp;
typedef struct SResFetchMsg {
typedef struct {
SMsgHead header;
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
} SResFetchMsg;
} SResFetchReq;
typedef struct SSchTasksStatusMsg {
typedef struct {
SMsgHead header;
uint64_t sId;
} SSchTasksStatusMsg;
} SSchTasksStatusReq;
typedef struct STaskStatus {
typedef struct {
uint64_t queryId;
uint64_t taskId;
int8_t status;
} STaskStatus;
typedef struct SSchedulerStatusRsp {
typedef struct {
uint32_t num;
STaskStatus status[];
} SSchedulerStatusRsp;
typedef struct STaskCancelMsg {
typedef struct {
SMsgHead header;
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
} STaskCancelMsg;
} STaskCancelReq;
typedef struct STaskCancelRsp {
typedef struct {
int32_t code;
} STaskCancelRsp;
typedef struct STaskDropMsg {
typedef struct {
SMsgHead header;
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
} STaskDropMsg;
} STaskDropReq;
typedef struct STaskDropRsp {
typedef struct {
int32_t code;
} STaskDropRsp;
......@@ -1334,18 +1189,18 @@ typedef struct {
void* executor;
int32_t sqlLen;
char* sql;
} SCreateTopicMsg;
} SCreateTopicReq;
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t igNotExists;
} SDropTopicMsg;
} SDropTopicReq;
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t alterType;
SSchema schema;
} SAlterTopicMsg;
} SAlterTopicReq;
typedef struct {
SMsgHead head;
......@@ -1356,13 +1211,13 @@ typedef struct {
char* executor;
int32_t sqlLen;
char* sql;
} SDCreateTopicMsg;
} SDCreateTopicReq;
typedef struct {
SMsgHead head;
char name[TSDB_TABLE_FNAME_LEN];
uint64_t tuid;
} SDDropTopicMsg;
} SDDropTopicReq;
typedef struct SVCreateTbReq {
uint64_t ver; // use a general definition
......@@ -1402,24 +1257,63 @@ void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq);
int tSVCreateTbBatchReqSerialize(void** buf, SVCreateTbBatchReq* pReq);
void* tSVCreateTbBatchReqDeserialize(void* buf, SVCreateTbBatchReq* pReq);
typedef struct SVCreateTbRsp {
typedef struct {
SMsgHead head;
} SVCreateTbRsp;
typedef struct SVShowTablesReq {
typedef struct {
SMsgHead head;
char name[TSDB_TABLE_FNAME_LEN];
int8_t ignoreNotExists;
} SVAlterTbReq;
typedef struct {
SMsgHead head;
} SVAlterTbRsp;
typedef struct {
SMsgHead head;
char name[TSDB_TABLE_FNAME_LEN];
int8_t ignoreNotExists;
} SVDropTbReq;
typedef struct {
SMsgHead head;
} SVDropTbRsp;
typedef struct {
SMsgHead head;
int64_t uid;
int32_t tid;
int16_t tversion;
int16_t colId;
int8_t type;
int16_t bytes;
int32_t tagValLen;
int16_t numOfTags;
int32_t schemaLen;
char data[];
} SUpdateTagValReq;
typedef struct {
SMsgHead head;
} SUpdateTagValRsp;
typedef struct {
SMsgHead head;
} SVShowTablesReq;
typedef struct SVShowTablesRsp {
typedef struct {
int64_t id;
STableMetaMsg metaInfo;
STableMetaRsp metaInfo;
} SVShowTablesRsp;
typedef struct SVShowTablesFetchReq {
typedef struct {
SMsgHead head;
int32_t id;
} SVShowTablesFetchReq;
typedef struct SVShowTablesFetchRsp {
typedef struct {
int64_t useconds;
int8_t completed; // all results are returned to client
int8_t precision;
......
......@@ -114,7 +114,7 @@ typedef struct SProjectPhyNode {
typedef struct SExchangePhyNode {
SPhyNode node;
uint64_t srcTemplateId; // template id of datasource suplans
SArray *pSrcEndPoints; // SEpAddrMsg, scheduler fill by calling qSetSuplanExecutionNode
SArray *pSrcEndPoints; // SEpAddr, scheduler fill by calling qSetSuplanExecutionNode
} SExchangePhyNode;
typedef struct SSubplanId {
......
......@@ -101,11 +101,11 @@ typedef struct SMsgSendInfo {
SDataBuf msgInfo;
} SMsgSendInfo;
typedef struct SQueryNodeAddr{
int32_t nodeId; //vgId or qnodeId
typedef struct SQueryNodeAddr {
int32_t nodeId; // vgId or qnodeId
int8_t inUse;
int8_t numOfEps;
SEpAddrMsg epAddr[TSDB_MAX_REPLICA];
SEpAddr epAddr[TSDB_MAX_REPLICA];
} SQueryNodeAddr;
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
......
......@@ -130,7 +130,7 @@ int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) {
SShowRsp* pShow = (SShowRsp *)pMsg->pData;
pShow->showId = htobe64(pShow->showId);
STableMetaMsg *pMetaMsg = &(pShow->tableMeta);
STableMetaRsp *pMetaMsg = &(pShow->tableMeta);
pMetaMsg->numOfColumns = htonl(pMetaMsg->numOfColumns);
SSchema* pSchema = pMetaMsg->pSchema;
......
......@@ -53,7 +53,7 @@ class Testbase {
void SendShowMetaReq(int8_t showType, const char* db);
void SendShowRetrieveReq();
STableMetaMsg* GetShowMeta();
STableMetaRsp* GetShowMeta();
SRetrieveTableRsp* GetRetrieveRsp();
int32_t GetMetaNum();
......@@ -74,7 +74,7 @@ class Testbase {
private:
int64_t showId;
STableMetaMsg* pMeta;
STableMetaRsp* pMeta;
SRetrieveTableRsp* pRetrieveRsp;
char* pData;
int32_t pos;
......
......@@ -179,6 +179,6 @@ const char* Testbase::GetShowBinary(int32_t len) {
int32_t Testbase::GetShowRows() { return pRetrieveRsp->numOfRows; }
STableMetaMsg* Testbase::GetShowMeta() { return pMeta; }
STableMetaRsp* Testbase::GetShowMeta() { return pMeta; }
SRetrieveTableRsp* Testbase::GetRetrieveRsp() { return pRetrieveRsp; }
\ No newline at end of file
......@@ -32,7 +32,7 @@ extern "C" {
typedef int32_t (*MndMsgFp)(SMnodeMsg *pMsg);
typedef int32_t (*MndInitFp)(SMnode *pMnode);
typedef void (*MndCleanupFp)(SMnode *pMnode);
typedef int32_t (*ShowMetaFp)(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
typedef int32_t (*ShowMetaFp)(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta);
typedef int32_t (*ShowRetrieveFp)(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
typedef void (*ShowFreeIterFp)(SMnode *pMnode, void *pIter);
......
......@@ -31,7 +31,7 @@ static int32_t mndProcessCreateBnodeReq(SMnodeMsg *pReq);
static int32_t mndProcessDropBnodeReq(SMnodeMsg *pReq);
static int32_t mndProcessCreateBnodeRsp(SMnodeMsg *pRsp);
static int32_t mndProcessDropBnodeRsp(SMnodeMsg *pRsp);
static int32_t mndGetBnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndGetBnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveBnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextBnode(SMnode *pMnode, void *pIter);
......@@ -391,7 +391,7 @@ static int32_t mndProcessDropBnodeRsp(SMnodeMsg *pRsp) {
return 0;
}
static int32_t mndGetBnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) {
static int32_t mndGetBnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;
......
......@@ -26,7 +26,7 @@ static int32_t mndClusterActionInsert(SSdb *pSdb, SClusterObj *pCluster);
static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster);
static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOldCluster, SClusterObj *pNewCluster);
static int32_t mndCreateDefaultCluster(SMnode *pMnode);
static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveClusters(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextCluster(SMnode *pMnode, void *pIter);
......@@ -163,7 +163,7 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
return sdbWrite(pMnode->pSdb, pRaw);
}
static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) {
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchema;
......
......@@ -39,7 +39,7 @@ static int32_t mndProcessCreateConsumerMsg(SMnodeMsg *pMsg);
static int32_t mndProcessDropConsumerMsg(SMnodeMsg *pMsg);
static int32_t mndProcessDropConsumerInRsp(SMnodeMsg *pMsg);
static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg);
static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveConsumer(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
......@@ -395,7 +395,7 @@ static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
STableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
STableInfoReq *pInfo = pMsg->rpcMsg.pCont;
mDebug("consumer:%s, start to retrieve meta", pInfo->tableFname);
......@@ -417,9 +417,9 @@ static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) {
taosRLockLatch(&pConsumer->lock);
int32_t totalCols = pConsumer->numOfColumns + pConsumer->numOfTags;
int32_t contLen = sizeof(STableMetaMsg) + totalCols * sizeof(SSchema);
int32_t contLen = sizeof(STableMetaRsp) + totalCols * sizeof(SSchema);
STableMetaMsg *pMeta = rpcMallocCont(contLen);
STableMetaRsp *pMeta = rpcMallocCont(contLen);
if (pMeta == NULL) {
taosRUnLockLatch(&pConsumer->lock);
mndReleaseDb(pMnode, pDb);
......@@ -483,7 +483,7 @@ static int32_t mndGetNumOfConsumers(SMnode *pMnode, char *dbName, int32_t *pNumO
return 0;
}
static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pMsg->pMnode;
SSdb *pSdb = pMnode->pSdb;
......
......@@ -35,7 +35,7 @@ static int32_t mndProcessDropDbReq(SMnodeMsg *pReq);
static int32_t mndProcessUseDbReq(SMnodeMsg *pReq);
static int32_t mndProcessSyncDbReq(SMnodeMsg *pReq);
static int32_t mndProcessCompactDbReq(SMnodeMsg *pReq);
static int32_t mndGetDbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndGetDbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveDbs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextDb(SMnode *pMnode, void *pIter);
......@@ -378,7 +378,7 @@ static int32_t mndSetCreateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
return 0;
}
static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pReq, SCreateDbMsg *pCreate, SUserObj *pUser) {
static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pReq, SCreateDbReq *pCreate, SUserObj *pUser) {
SDbObj dbObj = {0};
memcpy(dbObj.name, pCreate->db, TSDB_DB_FNAME_LEN);
memcpy(dbObj.acct, pUser->acct, TSDB_USER_LEN);
......@@ -449,7 +449,7 @@ CREATE_DB_OVER:
static int32_t mndProcessCreateDbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
SCreateDbMsg *pCreate = pReq->rpcMsg.pCont;
SCreateDbReq *pCreate = pReq->rpcMsg.pCont;
pCreate->numOfVgroups = htonl(pCreate->numOfVgroups);
pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize);
......@@ -858,7 +858,7 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
pInfo->numOfEps = pVgroup->replica;
for (int32_t gid = 0; gid < pVgroup->replica; ++gid) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[gid];
SEpAddrMsg *pEpArrr = &pInfo->epAddr[gid];
SEpAddr *pEpArrr = &pInfo->epAddr[gid];
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pDnode != NULL) {
memcpy(pEpArrr->fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
......@@ -915,7 +915,7 @@ static int32_t mndProcessCompactDbReq(SMnodeMsg *pReq) {
return 0;
}
static int32_t mndGetDbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) {
static int32_t mndGetDbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;
......
......@@ -53,10 +53,10 @@ static int32_t mndProcessConfigDnodeReq(SMnodeMsg *pReq);
static int32_t mndProcessConfigDnodeRsp(SMnodeMsg *pRsp);
static int32_t mndProcessStatusReq(SMnodeMsg *pReq);
static int32_t mndGetConfigMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndGetConfigMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveConfigs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
static int32_t mndGetDnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndGetDnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveDnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter);
......@@ -582,7 +582,7 @@ static int32_t mndProcessConfigDnodeRsp(SMnodeMsg *pRsp) {
mInfo("app:%p config rsp from dnode", pRsp->rpcMsg.ahandle);
}
static int32_t mndGetConfigMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) {
static int32_t mndGetConfigMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchema;
......@@ -656,7 +656,7 @@ static int32_t mndRetrieveConfigs(SMnodeMsg *pReq, SShowObj *pShow, char *data,
static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {}
static int32_t mndGetDnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) {
static int32_t mndGetDnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;
......
......@@ -31,7 +31,7 @@ static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pMsg, SFuncObj *pFunc);
static int32_t mndProcessCreateFuncMsg(SMnodeMsg *pMsg);
static int32_t mndProcessDropFuncMsg(SMnodeMsg *pMsg);
static int32_t mndProcessRetrieveFuncMsg(SMnodeMsg *pMsg);
static int32_t mndGetFuncMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndGetFuncMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveFuncs(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextFunc(SMnode *pMnode, void *pIter);
......@@ -395,7 +395,7 @@ static int32_t mndProcessRetrieveFuncMsg(SMnodeMsg *pMsg) {
return 0;
}
static int32_t mndGetFuncMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
static int32_t mndGetFuncMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pMsg->pMnode;
SSdb *pSdb = pMnode->pSdb;
......
......@@ -33,7 +33,7 @@ static int32_t mndProcessDropMnodeReq(SMnodeMsg *pReq);
static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pRsp);
static int32_t mndProcessAlterMnodeRsp(SMnodeMsg *pRsp);
static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pRsp);
static int32_t mndGetMnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndGetMnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveMnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter);
......@@ -579,7 +579,7 @@ static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pRsp) {
return 0;
}
static int32_t mndGetMnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) {
static int32_t mndGetMnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;
......
......@@ -51,9 +51,9 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq);
static int32_t mndProcessConnectReq(SMnodeMsg *pReq);
static int32_t mndProcessKillQueryReq(SMnodeMsg *pReq);
static int32_t mndProcessKillConnReq(SMnodeMsg *pReq);
static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveConns(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static int32_t mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveQueries(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter);
......@@ -389,7 +389,7 @@ static int32_t mndProcessKillConnReq(SMnodeMsg *pReq) {
}
}
static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) {
static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pMnode;
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
......@@ -518,7 +518,7 @@ static int32_t mndRetrieveConns(SMnodeMsg *pReq, SShowObj *pShow, char *data, in
return numOfRows;
}
static int32_t mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) {
static int32_t mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pMnode;
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
......
......@@ -31,7 +31,7 @@ static int32_t mndProcessCreateQnodeReq(SMnodeMsg *pReq);
static int32_t mndProcessDropQnodeReq(SMnodeMsg *pReq);
static int32_t mndProcessCreateQnodeRsp(SMnodeMsg *pRsp);
static int32_t mndProcessDropQnodeRsp(SMnodeMsg *pRsp);
static int32_t mndGetQnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndGetQnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveQnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextQnode(SMnode *pMnode, void *pIter);
......@@ -391,7 +391,7 @@ static int32_t mndProcessDropQnodeRsp(SMnodeMsg *pRsp) {
return 0;
}
static int32_t mndGetQnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) {
static int32_t mndGetQnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;
......
......@@ -31,7 +31,7 @@ static int32_t mndProcessCreateSnodeReq(SMnodeMsg *pReq);
static int32_t mndProcessDropSnodeReq(SMnodeMsg *pReq);
static int32_t mndProcessCreateSnodeRsp(SMnodeMsg *pRsp);
static int32_t mndProcessDropSnodeRsp(SMnodeMsg *pRsp);
static int32_t mndGetSnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndGetSnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveSnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextSnode(SMnode *pMnode, void *pIter);
......@@ -393,7 +393,7 @@ static int32_t mndProcessDropSnodeRsp(SMnodeMsg *pRsp) {
return 0;
}
static int32_t mndGetSnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) {
static int32_t mndGetSnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;
......
......@@ -32,14 +32,14 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw);
static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb);
static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb);
static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOldStb, SStbObj *pNewStb);
static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg);
static int32_t mndProcessAlterStbMsg(SMnodeMsg *pMsg);
static int32_t mndProcessDropStbMsg(SMnodeMsg *pMsg);
static int32_t mndProcesSMCreateStbReq(SMnodeMsg *pMsg);
static int32_t mndProcesSMAlterStbReq(SMnodeMsg *pMsg);
static int32_t mndProcesSMDropStbReq(SMnodeMsg *pMsg);
static int32_t mndProcessCreateStbInRsp(SMnodeMsg *pMsg);
static int32_t mndProcessAlterStbInRsp(SMnodeMsg *pMsg);
static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg);
static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg);
static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter);
......@@ -52,9 +52,9 @@ int32_t mndInitStb(SMnode *pMnode) {
.updateFp = (SdbUpdateFp)mndStbActionUpdate,
.deleteFp = (SdbDeleteFp)mndStbActionDelete};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcessCreateStbMsg);
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessAlterStbMsg);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessDropStbMsg);
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcesSMCreateStbReq);
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcesSMAlterStbReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcesSMDropStbReq);
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndProcessCreateStbInRsp);
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndProcessAlterStbInRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndProcessDropStbInRsp);
......@@ -264,10 +264,10 @@ static void *mndBuildCreateStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb
return buf;
}
static SVDropStbReq *mndBuildDropStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) {
int32_t contLen = sizeof(SVDropStbReq);
static SVDropTbReq *mndBuildDropStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb) {
int32_t contLen = sizeof(SVDropTbReq);
SVDropStbReq *pDrop = calloc(1, contLen);
SVDropTbReq *pDrop = calloc(1, contLen);
if (pDrop == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
......@@ -276,12 +276,12 @@ static SVDropStbReq *mndBuildDropStbMsg(SMnode *pMnode, SVgObj *pVgroup, SStbObj
pDrop->head.contLen = htonl(contLen);
pDrop->head.vgId = htonl(pVgroup->vgId);
memcpy(pDrop->name, pStb->name, TSDB_TABLE_FNAME_LEN);
pDrop->suid = htobe64(pStb->uid);
// pDrop->suid = htobe64(pStb->uid);
return pDrop;
}
static int32_t mndCheckCreateStbMsg(SCreateStbMsg *pCreate) {
static int32_t mndCheckCreateStbMsg(SMCreateStbReq *pCreate) {
pCreate->numOfColumns = htonl(pCreate->numOfColumns);
pCreate->numOfTags = htonl(pCreate->numOfTags);
int32_t totalCols = pCreate->numOfColumns + pCreate->numOfTags;
......@@ -398,7 +398,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
if (pIter == NULL) break;
if (pVgroup->dbUid != pDb->uid) continue;
SVDropStbReq *pMsg = mndBuildDropStbMsg(pMnode, pVgroup, pStb);
SVDropTbReq *pMsg = mndBuildDropStbMsg(pMnode, pVgroup, pStb);
if (pMsg == NULL) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
......@@ -409,7 +409,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
action.pCont = pMsg;
action.contLen = sizeof(SVDropStbReq);
action.contLen = sizeof(SVDropTbReq);
action.msgType = TDMT_VND_DROP_STB;
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
free(pMsg);
......@@ -423,7 +423,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
return 0;
}
static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCreate, SDbObj *pDb) {
static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SMCreateStbReq *pCreate, SDbObj *pDb) {
SStbObj stbObj = {0};
tstrncpy(stbObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
tstrncpy(stbObj.db, pDb->name, TSDB_DB_FNAME_LEN);
......@@ -494,9 +494,9 @@ CREATE_STB_OVER:
return code;
}
static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg) {
static int32_t mndProcesSMCreateStbReq(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SCreateStbMsg *pCreate = pMsg->rpcMsg.pCont;
SMCreateStbReq *pCreate = pMsg->rpcMsg.pCont;
mDebug("stb:%s, start to create", pCreate->name);
......@@ -551,7 +551,7 @@ static int32_t mndProcessCreateStbInRsp(SMnodeMsg *pMsg) {
return 0;
}
static int32_t mndCheckAlterStbMsg(SAlterStbMsg *pAlter) {
static int32_t mndCheckAlterStbMsg(SMAlterStbReq *pAlter) {
SSchema *pSchema = &pAlter->schema;
pSchema->colId = htonl(pSchema->colId);
pSchema->bytes = htonl(pSchema->bytes);
......@@ -578,9 +578,9 @@ static int32_t mndCheckAlterStbMsg(SAlterStbMsg *pAlter) {
static int32_t mndUpdateStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pOldStb, SStbObj *pNewStb) { return 0; }
static int32_t mndProcessAlterStbMsg(SMnodeMsg *pMsg) {
static int32_t mndProcesSMAlterStbReq(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SAlterStbMsg *pAlter = pMsg->rpcMsg.pCont;
SMAlterStbReq *pAlter = pMsg->rpcMsg.pCont;
mDebug("stb:%s, start to alter", pAlter->name);
......@@ -692,9 +692,9 @@ DROP_STB_OVER:
return 0;
}
static int32_t mndProcessDropStbMsg(SMnodeMsg *pMsg) {
static int32_t mndProcesSMDropStbReq(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SDropStbMsg *pDrop = pMsg->rpcMsg.pCont;
SMDropStbReq *pDrop = pMsg->rpcMsg.pCont;
mDebug("stb:%s, start to drop", pDrop->name);
......@@ -729,7 +729,7 @@ static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg) {
static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
STableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
STableInfoReq *pInfo = pMsg->rpcMsg.pCont;
mDebug("stb:%s, start to retrieve meta", pInfo->tableFname);
......@@ -750,9 +750,9 @@ static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) {
taosRLockLatch(&pStb->lock);
int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
int32_t contLen = sizeof(STableMetaMsg) + totalCols * sizeof(SSchema);
int32_t contLen = sizeof(STableMetaRsp) + totalCols * sizeof(SSchema);
STableMetaMsg *pMeta = rpcMallocCont(contLen);
STableMetaRsp *pMeta = rpcMallocCont(contLen);
if (pMeta == NULL) {
taosRUnLockLatch(&pStb->lock);
mndReleaseDb(pMnode, pDb);
......@@ -818,7 +818,7 @@ static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs
return 0;
}
static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pMsg->pMnode;
SSdb *pSdb = pMnode->pSdb;
......
......@@ -35,7 +35,7 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg);
static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg);
static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pMsg);
static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg);
static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveTopic(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
......@@ -186,10 +186,10 @@ static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) {
return mndAcquireDb(pMnode, db);
}
static SDDropTopicMsg *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMqTopicObj *pTopic) {
int32_t contLen = sizeof(SDDropTopicMsg);
static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMqTopicObj *pTopic) {
int32_t contLen = sizeof(SDDropTopicReq);
SDDropTopicMsg *pDrop = calloc(1, contLen);
SDDropTopicReq *pDrop = calloc(1, contLen);
if (pDrop == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
......@@ -274,7 +274,7 @@ static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pMsg, SMqTopicObj *pTopic
static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SDropTopicMsg *pDrop = pMsg->rpcMsg.pCont;
SDropTopicReq *pDrop = pMsg->rpcMsg.pCont;
mDebug("topic:%s, start to drop", pDrop->name);
......@@ -309,7 +309,7 @@ static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pMsg) {
static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
STableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
STableInfoReq *pInfo = pMsg->rpcMsg.pCont;
mDebug("topic:%s, start to retrieve meta", pInfo->tableFname);
......@@ -331,9 +331,9 @@ static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg) {
taosRLockLatch(&pTopic->lock);
int32_t totalCols = pTopic->numOfColumns + pTopic->numOfTags;
int32_t contLen = sizeof(STableMetaMsg) + totalCols * sizeof(SSchema);
int32_t contLen = sizeof(STableMetaRsp) + totalCols * sizeof(SSchema);
STableMetaMsg *pMeta = rpcMallocCont(contLen);
STableMetaRsp *pMeta = rpcMallocCont(contLen);
if (pMeta == NULL) {
taosRUnLockLatch(&pTopic->lock);
mndReleaseDb(pMnode, pDb);
......@@ -397,7 +397,7 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo
return 0;
}
static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pMsg->pMnode;
SSdb *pSdb = pMnode->pSdb;
......
......@@ -32,7 +32,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass
static int32_t mndProcessCreateUserReq(SMnodeMsg *pReq);
static int32_t mndProcessAlterUserReq(SMnodeMsg *pReq);
static int32_t mndProcessDropUserReq(SMnodeMsg *pReq);
static int32_t mndGetUserMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndGetUserMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveUsers(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextUser(SMnode *pMnode, void *pIter);
......@@ -432,7 +432,7 @@ static int32_t mndProcessDropUserReq(SMnodeMsg *pReq) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
static int32_t mndGetUserMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) {
static int32_t mndGetUserMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;
......
......@@ -27,19 +27,19 @@
static SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw);
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOldVgroup, SVgObj *pNewVgroup);
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew);
static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg);
static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pMsg);
static int32_t mndProcessDropVnodeRsp(SMnodeMsg *pMsg);
static int32_t mndProcessSyncVnodeRsp(SMnodeMsg *pMsg);
static int32_t mndProcessCompactVnodeRsp(SMnodeMsg *pMsg);
static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pRsp);
static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pRsp);
static int32_t mndProcessDropVnodeRsp(SMnodeMsg *pRsp);
static int32_t mndProcessSyncVnodeRsp(SMnodeMsg *pRsp);
static int32_t mndProcessCompactVnodeRsp(SMnodeMsg *pRsp);
static int32_t mndGetVgroupMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndRetrieveVgroups(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static int32_t mndGetVgroupMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveVgroups(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
static int32_t mndGetVnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndRetrieveVnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static int32_t mndGetVnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveVnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter);
int32_t mndInitVgroup(SMnode *pMnode) {
......@@ -164,14 +164,14 @@ static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup) {
return 0;
}
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOldVgroup, SVgObj *pNewVgroup) {
mTrace("vgId:%d, perform update action, old row:%p new row:%p", pOldVgroup->vgId, pOldVgroup, pNewVgroup);
pOldVgroup->updateTime = pNewVgroup->updateTime;
pOldVgroup->version = pNewVgroup->version;
pOldVgroup->hashBegin = pNewVgroup->hashBegin;
pOldVgroup->hashEnd = pNewVgroup->hashEnd;
pOldVgroup->replica = pNewVgroup->replica;
memcpy(pOldVgroup->vnodeGid, pNewVgroup->vnodeGid, TSDB_MAX_REPLICA * sizeof(SVnodeGid));
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew) {
mTrace("vgId:%d, perform update action, old row:%p new row:%p", pOld->vgId, pOld, pNew);
pOld->updateTime = pNew->updateTime;
pOld->version = pNew->version;
pOld->hashBegin = pNew->hashBegin;
pOld->hashEnd = pNew->hashEnd;
pOld->replica = pNew->replica;
memcpy(pOld->vnodeGid, pNew->vnodeGid, TSDB_MAX_REPLICA * sizeof(SVnodeGid));
return 0;
}
......@@ -427,24 +427,24 @@ SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup) {
return epset;
}
static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg) {
mndTransProcessRsp(pMsg);
static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pMsg) {
mndTransProcessRsp(pMsg);
static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndProcessDropVnodeRsp(SMnodeMsg *pMsg) {
mndTransProcessRsp(pMsg);
static int32_t mndProcessDropVnodeRsp(SMnodeMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndProcessSyncVnodeRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessSyncVnodeRsp(SMnodeMsg *pRsp) { return 0; }
static int32_t mndProcessCompactVnodeRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessCompactVnodeRsp(SMnodeMsg *pRsp) { return 0; }
static bool mndGetVgroupMaxReplicaFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
SVgObj *pVgroup = pObj;
......@@ -475,8 +475,8 @@ static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pRep
return 0;
}
static int32_t mndGetVgroupMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
SMnode *pMnode = pMsg->pMnode;
static int32_t mndGetVgroupMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;
if (mndGetVgroupMaxReplica(pMnode, pShow->db, &pShow->replica, &pShow->numOfRows) != 0) {
......@@ -526,8 +526,8 @@ static int32_t mndGetVgroupMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg
return 0;
}
static int32_t mndRetrieveVgroups(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pMsg->pMnode;
static int32_t mndRetrieveVgroups(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SVgObj *pVgroup = NULL;
......@@ -593,8 +593,8 @@ int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) {
return numOfVnodes;
}
static int32_t mndGetVnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
SMnode *pMnode = pMsg->pMnode;
static int32_t mndGetVnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;
int32_t cols = 0;
......@@ -633,8 +633,8 @@ static int32_t mndGetVnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
return 0;
}
static int32_t mndRetrieveVnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pMsg->pMnode;
static int32_t mndRetrieveVnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pReq->pMnode;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SVgObj *pVgroup = NULL;
......
......@@ -61,8 +61,8 @@ void mndSendRedirectRsp(SMnode *pMnode, SRpcMsg *pMsg) {
static void mndTransReExecute(void *param, void *tmrId) {
SMnode *pMnode = param;
if (mndIsMaster(pMnode)) {
STransMsg *pMsg = rpcMallocCont(sizeof(STransMsg));
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS, .pCont = pMsg, .contLen = sizeof(STransMsg)};
STransReq *pMsg = rpcMallocCont(sizeof(STransReq));
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS, .pCont = pMsg, .contLen = sizeof(STransReq)};
pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg);
}
......
......@@ -64,9 +64,9 @@ TEST_F(MndTestDb, 01_ShowDb) {
TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) {
{
int32_t contLen = sizeof(SCreateDbMsg);
int32_t contLen = sizeof(SCreateDbReq);
SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(contLen);
SCreateDbReq* pReq = (SCreateDbReq*)rpcMallocCont(contLen);
strcpy(pReq->db, "1.d1");
pReq->numOfVgroups = htonl(2);
pReq->cacheBlockSize = htonl(16);
......@@ -224,9 +224,9 @@ TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) {
TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) {
{
int32_t contLen = sizeof(SCreateDbMsg);
int32_t contLen = sizeof(SCreateDbReq);
SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(contLen);
SCreateDbReq* pReq = (SCreateDbReq*)rpcMallocCont(contLen);
strcpy(pReq->db, "1.d2");
pReq->numOfVgroups = htonl(2);
pReq->cacheBlockSize = htonl(16);
......@@ -290,7 +290,7 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) {
EXPECT_EQ(pInfo->hashEnd, UINT32_MAX / 2 - 1);
EXPECT_EQ(pInfo->inUse, 0);
EXPECT_EQ(pInfo->numOfEps, 1);
SEpAddrMsg* pAddr = &pInfo->epAddr[0];
SEpAddr* pAddr = &pInfo->epAddr[0];
pAddr->port = htons(pAddr->port);
EXPECT_EQ(pAddr->port, 9030);
EXPECT_STREQ(pAddr->fqdn, "localhost");
......@@ -306,7 +306,7 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) {
EXPECT_EQ(pInfo->hashEnd, UINT32_MAX);
EXPECT_EQ(pInfo->inUse, 0);
EXPECT_EQ(pInfo->numOfEps, 1);
SEpAddrMsg* pAddr = &pInfo->epAddr[0];
SEpAddr* pAddr = &pInfo->epAddr[0];
pAddr->port = htons(pAddr->port);
EXPECT_EQ(pAddr->port, 9030);
EXPECT_STREQ(pAddr->fqdn, "localhost");
......
......@@ -63,7 +63,7 @@ TEST_F(MndTestShow, 03_ShowMsg_Conn) {
test.SendShowMetaReq(TSDB_MGMT_TABLE_CONNS, "");
STableMetaMsg* pMeta = test.GetShowMeta();
STableMetaRsp* pMeta = test.GetShowMeta();
EXPECT_STREQ(pMeta->tbFname, "show connections");
EXPECT_EQ(pMeta->numOfTags, 0);
EXPECT_EQ(pMeta->numOfColumns, 7);
......
......@@ -27,9 +27,9 @@ Testbase MndTestStb::test;
TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
{
int32_t contLen = sizeof(SCreateDbMsg);
int32_t contLen = sizeof(SCreateDbReq);
SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(contLen);
SCreateDbReq* pReq = (SCreateDbReq*)rpcMallocCont(contLen);
strcpy(pReq->db, "1.d1");
pReq->numOfVgroups = htonl(2);
pReq->cacheBlockSize = htonl(16);
......@@ -59,9 +59,9 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
{
int32_t cols = 2;
int32_t tags = 3;
int32_t contLen = (tags + cols) * sizeof(SSchema) + sizeof(SCreateStbMsg);
int32_t contLen = (tags + cols) * sizeof(SSchema) + sizeof(SMCreateStbReq);
SCreateStbMsg* pReq = (SCreateStbMsg*)rpcMallocCont(contLen);
SMCreateStbReq* pReq = (SMCreateStbReq*)rpcMallocCont(contLen);
strcpy(pReq->name, "1.d1.stb");
pReq->numOfTags = htonl(tags);
pReq->numOfColumns = htonl(cols);
......@@ -123,16 +123,16 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
// ----- meta ------
{
int32_t contLen = sizeof(STableInfoMsg);
int32_t contLen = sizeof(STableInfoReq);
STableInfoMsg* pReq = (STableInfoMsg*)rpcMallocCont(contLen);
STableInfoReq* pReq = (STableInfoReq*)rpcMallocCont(contLen);
strcpy(pReq->tableFname, "1.d1.stb");
SRpcMsg* pMsg = test.SendReq(TDMT_MND_STB_META, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
STableMetaMsg* pRsp = (STableMetaMsg*)pMsg->pCont;
STableMetaRsp* pRsp = (STableMetaRsp*)pMsg->pCont;
pRsp->numOfTags = htonl(pRsp->numOfTags);
pRsp->numOfColumns = htonl(pRsp->numOfColumns);
pRsp->sversion = htonl(pRsp->sversion);
......@@ -214,9 +214,9 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
CheckInt32(3);
{
int32_t contLen = sizeof(SDropStbMsg);
int32_t contLen = sizeof(SMDropStbReq);
SDropStbMsg* pReq = (SDropStbMsg*)rpcMallocCont(contLen);
SMDropStbReq* pReq = (SMDropStbReq*)rpcMallocCont(contLen);
strcpy(pReq->name, "1.d1.stb");
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_STB, pReq, contLen);
......
......@@ -22,9 +22,9 @@
extern "C" {
#endif
// SVDropTableReq
// int vnodeBuildDropTableReq(void **buf, const SVDropTableReq *pReq);
// void *vnodeParseDropTableReq(void *buf, SVDropTableReq *pReq);
// SVDropTbReq
// int vnodeBuildDropTableReq(void **buf, const SVDropTbReq *pReq);
// void *vnodeParseDropTableReq(void *buf, SVDropTbReq *pReq);
#ifdef __cplusplus
}
......
......@@ -53,14 +53,14 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
}
static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
STableInfoMsg * pReq = (STableInfoMsg *)(pMsg->pCont);
STableInfoReq * pReq = (STableInfoReq *)(pMsg->pCont);
STbCfg * pTbCfg = NULL;
STbCfg * pStbCfg = NULL;
tb_uid_t uid;
int32_t nCols;
int32_t nTagCols;
SSchemaWrapper *pSW;
STableMetaMsg * pTbMetaMsg = NULL;
STableMetaRsp * pTbMetaMsg = NULL;
SSchema * pTagSchema;
SRpcMsg rpcMsg;
int msgLen = 0;
......@@ -94,8 +94,8 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
pTagSchema = NULL;
}
msgLen = sizeof(STableMetaMsg) + sizeof(SSchema) * (nCols + nTagCols);
pTbMetaMsg = (STableMetaMsg *)rpcMallocCont(msgLen);
msgLen = sizeof(STableMetaRsp) + sizeof(SSchema) * (nCols + nTagCols);
pTbMetaMsg = (STableMetaRsp *)rpcMallocCont(msgLen);
if (pTbMetaMsg == NULL) {
goto _exit;
}
......@@ -167,7 +167,7 @@ static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) {
// SVShowTablesFetchReq *pFetchReq = pMsg->pCont;
SVShowTablesFetchRsp *pFetchRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp) + payloadLen);
memset(pFetchRsp, 0, sizeof(struct SVShowTablesFetchRsp) + payloadLen);
memset(pFetchRsp, 0, sizeof(SVShowTablesFetchRsp) + payloadLen);
char *p = pFetchRsp->data;
for (int32_t i = 0; i < numOfTables; ++i) {
......
......@@ -108,12 +108,12 @@ static void *vnodeParseCreateTableReq(void *buf, SVCreateTableReq *pReq) {
return buf;
}
int vnodeBuildDropTableReq(void **buf, const SVDropTableReq *pReq) {
int vnodeBuildDropTableReq(void **buf, const SVDropTbReq *pReq) {
// TODO
return 0;
}
void *vnodeParseDropTableReq(void *buf, SVDropTableReq *pReq) {
void *vnodeParseDropTableReq(void *buf, SVDropTbReq *pReq) {
// TODO
}
#endif
\ No newline at end of file
......@@ -54,7 +54,7 @@ typedef struct STsdbMemTable {
STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb);
void tsdbFreeMemTable(STsdb *pTsdb, STsdbMemTable *pMemTable);
int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp);
int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitMsg *pMsg, SShellSubmitRsp *pRsp);
int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols,
TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo);
......
......@@ -73,7 +73,7 @@ void tsdbFreeMemTable(STsdb *pTsdb, STsdbMemTable *pMemTable) {
}
}
int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp) {
int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitMsg *pMsg, SShellSubmitRsp *pRsp) {
SSubmitBlk * pBlock = NULL;
SSubmitMsgIter msgIter = {0};
int32_t affectedrows = 0, numOfRows = 0;
......
......@@ -68,7 +68,7 @@ char *ctgTestSTablename = "stable1";
void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) {
SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(sizeof(SCreateDbMsg));
SCreateDbReq* pReq = (SCreateDbReq*)rpcMallocCont(sizeof(SCreateDbReq));
strcpy(pReq->db, "1.db1");
pReq->numOfVgroups = htonl(2);
pReq->cacheBlockSize = htonl(16);
......@@ -92,7 +92,7 @@ void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) {
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SCreateDbMsg);
rpcMsg.contLen = sizeof(SCreateDbReq);
rpcMsg.msgType = TDMT_MND_CREATE_DB;
SRpcMsg rpcRsp = {0};
......@@ -200,7 +200,7 @@ void ctgTestBuildDBVgroup(SDBVgroupInfo *dbVgroup) {
vgInfo.numOfEps = i % TSDB_MAX_REPLICA + 1;
vgInfo.inUse = i % vgInfo.numOfEps;
for (int32_t n = 0; n < vgInfo.numOfEps; ++n) {
SEpAddrMsg *addr = &vgInfo.epAddr[n];
SEpAddr *addr = &vgInfo.epAddr[n];
strcpy(addr->fqdn, "a0");
addr->port = htons(n + 22);
}
......@@ -234,7 +234,7 @@ void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM
vg->numOfEps = i % TSDB_MAX_REPLICA + 1;
vg->inUse = i % vg->numOfEps;
for (int32_t n = 0; n < vg->numOfEps; ++n) {
SEpAddrMsg *addr = &vg->epAddr[n];
SEpAddr *addr = &vg->epAddr[n];
strcpy(addr->fqdn, "a0");
addr->port = htons(n + 22);
}
......@@ -249,12 +249,12 @@ void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM
void ctgTestPrepareTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
STableMetaMsg *rspMsg = NULL; //todo
STableMetaRsp *rspMsg = NULL; //todo
pRsp->code =0;
pRsp->contLen = sizeof(STableMetaMsg) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
pRsp->pCont = calloc(1, pRsp->contLen);
rspMsg = (STableMetaMsg *)pRsp->pCont;
rspMsg = (STableMetaRsp *)pRsp->pCont;
sprintf(rspMsg->tbFname, "%s.%s", ctgTestDbname, ctgTestTablename);
rspMsg->numOfTags = 0;
rspMsg->numOfColumns = htonl(ctgTestColNum);
......@@ -285,12 +285,12 @@ void ctgTestPrepareTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM
void ctgTestPrepareCTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
STableMetaMsg *rspMsg = NULL; //todo
STableMetaRsp *rspMsg = NULL; //todo
pRsp->code =0;
pRsp->contLen = sizeof(STableMetaMsg) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
pRsp->pCont = calloc(1, pRsp->contLen);
rspMsg = (STableMetaMsg *)pRsp->pCont;
rspMsg = (STableMetaRsp *)pRsp->pCont;
sprintf(rspMsg->tbFname, "%s.%s", ctgTestDbname, ctgTestCTablename);
sprintf(rspMsg->stbFname, "%s.%s", ctgTestDbname, ctgTestSTablename);
rspMsg->numOfTags = htonl(ctgTestTagNum);
......@@ -329,12 +329,12 @@ void ctgTestPrepareCTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc
void ctgTestPrepareSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
STableMetaMsg *rspMsg = NULL; //todo
STableMetaRsp *rspMsg = NULL; //todo
pRsp->code =0;
pRsp->contLen = sizeof(STableMetaMsg) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
pRsp->pCont = calloc(1, pRsp->contLen);
rspMsg = (STableMetaMsg *)pRsp->pCont;
rspMsg = (STableMetaRsp *)pRsp->pCont;
sprintf(rspMsg->tbFname, "%s.%s", ctgTestDbname, ctgTestSTablename);
sprintf(rspMsg->stbFname, "%s.%s", ctgTestDbname, ctgTestSTablename);
rspMsg->numOfTags = htonl(ctgTestTagNum);
......@@ -372,13 +372,13 @@ void ctgTestPrepareSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc
}
void ctgTestPrepareMultiSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
STableMetaMsg *rspMsg = NULL; //todo
STableMetaRsp *rspMsg = NULL; //todo
static int32_t idx = 1;
pRsp->code =0;
pRsp->contLen = sizeof(STableMetaMsg) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
pRsp->contLen = sizeof(STableMetaRsp) + (ctgTestColNum + ctgTestTagNum) * sizeof(SSchema);
pRsp->pCont = calloc(1, pRsp->contLen);
rspMsg = (STableMetaMsg *)pRsp->pCont;
rspMsg = (STableMetaRsp *)pRsp->pCont;
sprintf(rspMsg->tbFname, "%s.%s_%d", ctgTestDbname, ctgTestSTablename, idx);
sprintf(rspMsg->stbFname, "%s.%s_%d", ctgTestDbname, ctgTestSTablename, idx);
rspMsg->numOfTags = htonl(ctgTestTagNum);
......
......@@ -593,17 +593,17 @@ void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity);
void copyTsColoum(SSDataBlock* pRes, SQLFunctionCtx* pCtx, int32_t numOfOutput);
void freeParam(STaskParam *param);
int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, STaskParam* param);
int32_t convertQueryMsg(SQueryTableReq *pQueryMsg, STaskParam* param);
int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExprInfo** pExprInfo,
SSqlExpr** pExprMsg, SColumnInfo* pTagCols, int32_t queryType, void* pMsg, struct SUdfInfo* pUdfInfo);
int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo,
int32_t createIndirectQueryFuncExprFromMsg(SQueryTableReq *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo,
SSqlExpr **pExpr, SExprInfo *prevExpr, struct SUdfInfo *pUdfInfo);
int32_t createQueryFilter(char *data, uint16_t len, SFilterInfo** pFilters);
SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code);
SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableReq *pQueryMsg, SColIndex *pColIndex, int32_t *code);
SQInfo *createQInfoImpl(SQueryTableReq *pQueryMsg, SGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs,
SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, SFilterInfo* pFilters, int32_t vgId, char* sql, uint64_t qId, struct SUdfInfo* pUdfInfo);
int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, STaskParam* param, char* start,
......
......@@ -86,7 +86,7 @@ int32_t qCreateTask(void* tsdb, int32_t vgId, void* pQueryMsg, qTaskInfo_t* pTas
}
if (param.pTableIdList == NULL || taosArrayGetSize(param.pTableIdList) == 0) {
qError("qmsg:%p, SQueryTableMsg wrong format", pQueryMsg);
qError("qmsg:%p, SQueryTableReq wrong format", pQueryMsg);
code = TSDB_CODE_QRY_INVALID_MSG;
goto _over;
}
......
......@@ -2458,7 +2458,7 @@ static void doUpdateLastKey(STaskAttr* pQueryAttr) {
}
}
static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bool stableQuery) {
static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableReq* pQueryMsg, bool stableQuery) {
STaskAttr* pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
// in case of point-interpolation query, use asc order scan
......@@ -7060,7 +7060,7 @@ bool validateExprColumnInfo(SQueriedTableInfo *pTableInfo, SSqlExpr *pExpr, SCol
return j != INT32_MIN;
}
static bool validateQueryMsg(SQueryTableMsg *pQueryMsg) {
static bool validateQueryMsg(SQueryTableReq *pQueryMsg) {
if (pQueryMsg->interval.interval < 0) {
//qError("qmsg:%p illegal value of interval time %" PRId64, pQueryMsg, pQueryMsg->interval.interval);
return false;
......@@ -7126,7 +7126,7 @@ static bool validateQueryTableCols(SQueriedTableInfo* pTableInfo, SSqlExpr** pEx
return true;
}
static char *createTableIdList(SQueryTableMsg *pQueryMsg, char *pMsg, SArray **pTableIdList) {
static char *createTableIdList(SQueryTableReq *pQueryMsg, char *pMsg, SArray **pTableIdList) {
assert(pQueryMsg->numOfTables > 0);
*pTableIdList = taosArrayInit(pQueryMsg->numOfTables, sizeof(STableIdInfo));
......@@ -7268,7 +7268,7 @@ int32_t parseTaskInfo(const char* msg, int32_t len) {
* @param pExpr
* @return
*/
int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, STaskParam* param) {
int32_t convertQueryMsg(SQueryTableReq *pQueryMsg, STaskParam* param) {
int32_t code = TSDB_CODE_SUCCESS;
// if (taosCheckVersion(pQueryMsg->version, version, 3) != 0) {
......@@ -7843,7 +7843,7 @@ int32_t createQueryFilter(char *data, uint16_t len, SFilterInfo** pFilters) {
// todo refactor
int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg* pQueryMsg, int32_t numOfOutput, SExprInfo** pExprInfo,
int32_t createIndirectQueryFuncExprFromMsg(SQueryTableReq* pQueryMsg, int32_t numOfOutput, SExprInfo** pExprInfo,
SSqlExpr** pExpr, SExprInfo* prevExpr, struct SUdfInfo *pUdfInfo) {
// *pExprInfo = NULL;
// int32_t code = TSDB_CODE_SUCCESS;
......@@ -7901,7 +7901,7 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg* pQueryMsg, int32_t nu
return TSDB_CODE_SUCCESS;
}
SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code) {
SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableReq *pQueryMsg, SColIndex *pColIndex, int32_t *code) {
if (pQueryMsg->numOfGroupCols == 0) {
return NULL;
}
......@@ -8069,7 +8069,7 @@ FORCE_INLINE bool checkQIdEqual(void *qHandle, uint64_t qId) {
return ((SQInfo *)qHandle)->qId == qId;
}
SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, SExprInfo* pExprs,
SQInfo* createQInfoImpl(SQueryTableReq* pQueryMsg, SGroupbyExpr* pGroupbyExpr, SExprInfo* pExprs,
SExprInfo* pSecExprs, STableGroupInfo* pTableGroupInfo, SColumnInfo* pTagCols, SFilterInfo* pFilters, int32_t vgId,
char* sql, uint64_t qId, struct SUdfInfo* pUdfInfo) {
int16_t numOfCols = pQueryMsg->numOfCols;
......
......@@ -9,9 +9,9 @@ SCreateUserReq* buildUserManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, in
SCreateAcctReq* buildAcctManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen);
SDropUserReq* buildDropUserMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen);
SShowReq* buildShowMsg(SShowInfo* pShowInfo, SParseBasicCtx* pParseCtx, char* msgBuf, int32_t msgLen);
SCreateDbMsg* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf);
SCreateStbMsg* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf);
SDropStbMsg* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf);
SCreateDbReq* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf);
SMCreateStbReq* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf);
SMDropStbReq* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf);
SCreateDnodeReq *buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf);
SDropDnodeReq *buildDropDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf);
......
......@@ -112,7 +112,7 @@ SShowReq* buildShowMsg(SShowInfo* pShowInfo, SParseBasicCtx *pCtx, char* msgBuf,
return pShowMsg;
}
static int32_t setKeepOption(SCreateDbMsg* pMsg, const SCreateDbInfo* pCreateDb, SMsgBuf* pMsgBuf) {
static int32_t setKeepOption(SCreateDbReq* pMsg, const SCreateDbInfo* pCreateDb, SMsgBuf* pMsgBuf) {
const char* msg1 = "invalid number of keep options";
const char* msg2 = "invalid keep value";
const char* msg3 = "invalid keep value, should be keep0 <= keep1 <= keep2";
......@@ -151,7 +151,7 @@ static int32_t setKeepOption(SCreateDbMsg* pMsg, const SCreateDbInfo* pCreateDb,
return TSDB_CODE_SUCCESS;
}
static int32_t setTimePrecision(SCreateDbMsg* pMsg, const SCreateDbInfo* pCreateDbInfo, SMsgBuf* pMsgBuf) {
static int32_t setTimePrecision(SCreateDbReq* pMsg, const SCreateDbInfo* pCreateDbInfo, SMsgBuf* pMsgBuf) {
const char* msg = "invalid time precision";
pMsg->precision = TSDB_TIME_PRECISION_MILLI; // millisecond by default
......@@ -178,7 +178,7 @@ static int32_t setTimePrecision(SCreateDbMsg* pMsg, const SCreateDbInfo* pCreate
return TSDB_CODE_SUCCESS;
}
static void doSetDbOptions(SCreateDbMsg* pMsg, const SCreateDbInfo* pCreateDb) {
static void doSetDbOptions(SCreateDbReq* pMsg, const SCreateDbInfo* pCreateDb) {
pMsg->cacheBlockSize = htonl(pCreateDb->cacheBlockSize);
pMsg->totalBlocks = htonl(pCreateDb->numOfBlocks);
pMsg->daysPerFile = htonl(pCreateDb->daysPerFile);
......@@ -196,7 +196,7 @@ static void doSetDbOptions(SCreateDbMsg* pMsg, const SCreateDbInfo* pCreateDb) {
pMsg->numOfVgroups = htonl(pCreateDb->numOfVgroups);
}
int32_t setDbOptions(SCreateDbMsg* pCreateDbMsg, const SCreateDbInfo* pCreateDbSql, SMsgBuf* pMsgBuf) {
int32_t setDbOptions(SCreateDbReq* pCreateDbMsg, const SCreateDbInfo* pCreateDbSql, SMsgBuf* pMsgBuf) {
doSetDbOptions(pCreateDbMsg, pCreateDbSql);
if (setKeepOption(pCreateDbMsg, pCreateDbSql, pMsgBuf) != TSDB_CODE_SUCCESS) {
......@@ -210,8 +210,8 @@ int32_t setDbOptions(SCreateDbMsg* pCreateDbMsg, const SCreateDbInfo* pCreateDbS
return TSDB_CODE_SUCCESS;
}
SCreateDbMsg* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf) {
SCreateDbMsg* pCreateMsg = calloc(1, sizeof(SCreateDbMsg));
SCreateDbReq* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseBasicCtx *pCtx, SMsgBuf* pMsgBuf) {
SCreateDbReq* pCreateMsg = calloc(1, sizeof(SCreateDbReq));
if (setDbOptions(pCreateMsg, pCreateDbInfo, pMsgBuf) != TSDB_CODE_SUCCESS) {
tfree(pCreateMsg);
terrno = TSDB_CODE_TSC_INVALID_OPERATION;
......@@ -230,7 +230,7 @@ SCreateDbMsg* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseBasicCtx *pCt
return pCreateMsg;
}
SCreateStbMsg* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf) {
SMCreateStbReq* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf) {
SSchema* pSchema;
int32_t numOfTags = 0;
......@@ -239,7 +239,7 @@ SCreateStbMsg* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len,
numOfTags = (int32_t) taosArrayGetSize(pCreateTableSql->colInfo.pTagColumns);
}
SCreateStbMsg* pCreateStbMsg = (SCreateStbMsg*)calloc(1, sizeof(SCreateStbMsg) + (numOfCols + numOfTags) * sizeof(SSchema));
SMCreateStbReq* pCreateStbMsg = (SMCreateStbReq*)calloc(1, sizeof(SMCreateStbReq) + (numOfCols + numOfTags) * sizeof(SSchema));
char* pMsg = NULL;
#if 0
......@@ -315,7 +315,7 @@ SCreateStbMsg* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len,
return pCreateStbMsg;
}
SDropStbMsg* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf) {
SMDropStbReq* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* pParseCtx, SMsgBuf* pMsgBuf) {
SToken* tableName = taosArrayGet(pInfo->pMiscInfo->a, 0);
SName name = {0};
......@@ -325,13 +325,13 @@ SDropStbMsg* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseBasicCtx* p
return NULL;
}
SDropStbMsg *pDropTableMsg = (SDropStbMsg*) calloc(1, sizeof(SDropStbMsg));
SMDropStbReq *pDropTableMsg = (SMDropStbReq*) calloc(1, sizeof(SMDropStbReq));
code = tNameExtractFullName(&name, pDropTableMsg->name);
assert(code == TSDB_CODE_SUCCESS && name.type == TSDB_TABLE_NAME_T);
pDropTableMsg->igNotExists = pInfo->pMiscInfo->existsCheck ? 1 : 0;
*len = sizeof(SDropStbMsg);
*len = sizeof(SMDropStbReq);
return pDropTableMsg;
}
......
......@@ -117,7 +117,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx* pCtx, void** ou
}
// can only perform the parameters based on the macro definitation
static int32_t doCheckDbOptions(SCreateDbMsg* pCreate, SMsgBuf* pMsgBuf) {
static int32_t doCheckDbOptions(SCreateDbReq* pCreate, SMsgBuf* pMsgBuf) {
char msg[512] = {0};
if (pCreate->walLevel != -1 && (pCreate->walLevel < TSDB_MIN_WAL_LEVEL || pCreate->walLevel > TSDB_MAX_WAL_LEVEL)) {
......@@ -870,14 +870,14 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, c
goto _error;
}
SCreateDbMsg* pCreateMsg = buildCreateDbMsg(pCreateDB, pCtx, pMsgBuf);
SCreateDbReq* pCreateMsg = buildCreateDbMsg(pCreateDB, pCtx, pMsgBuf);
if (doCheckDbOptions(pCreateMsg, pMsgBuf) != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_TSC_INVALID_OPERATION;
goto _error;
}
pDcl->pMsg = (char*)pCreateMsg;
pDcl->msgLen = sizeof(SCreateDbMsg);
pDcl->msgLen = sizeof(SCreateDbReq);
pDcl->msgType = (pInfo->type == TSDB_SQL_CREATE_DB) ? TDMT_MND_CREATE_DB : TDMT_MND_ALTER_DB;
break;
}
......
......@@ -557,7 +557,7 @@ static const char* jkEpAddrFqdn = "Fqdn";
static const char* jkEpAddrPort = "Port";
static bool epAddrToJson(const void* obj, cJSON* json) {
const SEpAddrMsg* ep = (const SEpAddrMsg*)obj;
const SEpAddr* ep = (const SEpAddr*)obj;
bool res = cJSON_AddStringToObject(json, jkEpAddrFqdn, ep->fqdn);
if (res) {
res = cJSON_AddNumberToObject(json, jkEpAddrPort, ep->port);
......@@ -566,7 +566,7 @@ static bool epAddrToJson(const void* obj, cJSON* json) {
}
static bool epAddrFromJson(const cJSON* json, void* obj) {
SEpAddrMsg* ep = (SEpAddrMsg*)obj;
SEpAddr* ep = (SEpAddr*)obj;
copyString(json, jkEpAddrFqdn, ep->fqdn);
ep->port = getNumber(json, jkEpAddrPort);
return true;
......@@ -583,7 +583,7 @@ static bool nodeAddrToJson(const void* obj, cJSON* json) {
res = cJSON_AddNumberToObject(json, jkNodeAddrInUse, ep->inUse);
}
if (res) {
res = addRawArray(json, jkNodeAddrEpAddrs, epAddrToJson, ep->epAddr, ep->numOfEps, sizeof(SEpAddrMsg));
res = addRawArray(json, jkNodeAddrEpAddrs, epAddrToJson, ep->epAddr, ep->numOfEps, sizeof(SEpAddr));
}
return res;
}
......@@ -593,7 +593,7 @@ static bool nodeAddrFromJson(const cJSON* json, void* obj) {
ep->nodeId = getNumber(json, jkNodeAddrId);
ep->inUse = getNumber(json, jkNodeAddrInUse);
int32_t numOfEps = 0;
bool res = fromRawArray(json, jkNodeAddrEpAddrs, nodeAddrFromJson, &ep->epAddr, sizeof(SEpAddrMsg), &numOfEps);
bool res = fromRawArray(json, jkNodeAddrEpAddrs, nodeAddrFromJson, &ep->epAddr, sizeof(SEpAddr), &numOfEps);
ep->numOfEps = numOfEps;
return res;
}
......
......@@ -29,7 +29,7 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3
SBuildTableMetaInput* bInput = (SBuildTableMetaInput *)input;
int32_t estimateSize = sizeof(STableInfoMsg);
int32_t estimateSize = sizeof(STableInfoReq);
if (NULL == *msg || msgSize < estimateSize) {
tfree(*msg);
*msg = rpcMallocCont(estimateSize);
......@@ -38,7 +38,7 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3
}
}
STableInfoMsg *bMsg = (STableInfoMsg *)*msg;
STableInfoReq *bMsg = (STableInfoReq *)*msg;
bMsg->header.vgId = htonl(bInput->vgId);
......@@ -146,7 +146,7 @@ _return:
return code;
}
static int32_t queryConvertTableMetaMsg(STableMetaMsg* pMetaMsg) {
static int32_t queryConvertTableMetaMsg(STableMetaRsp* pMetaMsg) {
pMetaMsg->numOfTags = ntohl(pMetaMsg->numOfTags);
pMetaMsg->numOfColumns = ntohl(pMetaMsg->numOfColumns);
pMetaMsg->sversion = ntohl(pMetaMsg->sversion);
......@@ -198,7 +198,7 @@ static int32_t queryConvertTableMetaMsg(STableMetaMsg* pMetaMsg) {
return TSDB_CODE_SUCCESS;
}
int32_t queryCreateTableMetaFromMsg(STableMetaMsg* msg, bool isSuperTable, STableMeta **pMeta) {
int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta **pMeta) {
int32_t total = msg->numOfColumns + msg->numOfTags;
int32_t metaSize = sizeof(STableMeta) + sizeof(SSchema) * total;
......@@ -232,7 +232,7 @@ int32_t queryCreateTableMetaFromMsg(STableMetaMsg* msg, bool isSuperTable, STabl
int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
STableMetaMsg *pMetaMsg = (STableMetaMsg *)msg;
STableMetaRsp *pMetaMsg = (STableMetaRsp *)msg;
int32_t code = queryConvertTableMetaMsg(pMetaMsg);
if (code != TSDB_CODE_SUCCESS) {
return code;
......
......@@ -1080,7 +1080,7 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
return TSDB_CODE_QRY_INVALID_INPUT;
}
SResReadyMsg *msg = pMsg->pCont;
SResReadyReq *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid task status msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
......@@ -1101,7 +1101,7 @@ int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
}
int32_t code = 0;
SSchTasksStatusMsg *msg = pMsg->pCont;
SSchTasksStatusReq *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid task status msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
......@@ -1125,7 +1125,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
SResFetchMsg *msg = pMsg->pCont;
SResFetchReq *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
......@@ -1157,7 +1157,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
}
int32_t code = 0;
STaskCancelMsg *msg = pMsg->pCont;
STaskCancelReq *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid task cancel msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
......@@ -1182,7 +1182,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
}
int32_t code = 0;
STaskDropMsg *msg = pMsg->pCont;
STaskDropReq *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid task drop msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
......
......@@ -113,12 +113,12 @@ void *readyThread(void *param) {
uint32_t n = 0;
void *mockPointer = (void *)0x1;
void *mgmt = param;
SResReadyMsg readyMsg = {0};
SResReadyReq readyMsg = {0};
readyMsg.sId = htobe64(1);
readyMsg.queryId = htobe64(1);
readyMsg.taskId = htobe64(1);
readyRpc.pCont = &readyMsg;
readyRpc.contLen = sizeof(SResReadyMsg);
readyRpc.contLen = sizeof(SResReadyReq);
while (!testStop) {
code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
......@@ -137,12 +137,12 @@ void *fetchThread(void *param) {
uint32_t n = 0;
void *mockPointer = (void *)0x1;
void *mgmt = param;
SResFetchMsg fetchMsg = {0};
SResFetchReq fetchMsg = {0};
fetchMsg.sId = htobe64(1);
fetchMsg.queryId = htobe64(1);
fetchMsg.taskId = htobe64(1);
fetchRpc.pCont = &fetchMsg;
fetchRpc.contLen = sizeof(SResFetchMsg);
fetchRpc.contLen = sizeof(SResFetchReq);
while (!testStop) {
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc);
......@@ -161,12 +161,12 @@ void *dropThread(void *param) {
uint32_t n = 0;
void *mockPointer = (void *)0x1;
void *mgmt = param;
STaskDropMsg dropMsg = {0};
STaskDropReq dropMsg = {0};
dropMsg.sId = htobe64(1);
dropMsg.queryId = htobe64(1);
dropMsg.taskId = htobe64(1);
dropRpc.pCont = &dropMsg;
dropRpc.contLen = sizeof(STaskDropMsg);
dropRpc.contLen = sizeof(STaskDropReq);
while (!testStop) {
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
......@@ -185,10 +185,10 @@ void *statusThread(void *param) {
uint32_t n = 0;
void *mockPointer = (void *)0x1;
void *mgmt = param;
SSchTasksStatusMsg statusMsg = {0};
SSchTasksStatusReq statusMsg = {0};
statusMsg.sId = htobe64(1);
statusRpc.pCont = &statusMsg;
statusRpc.contLen = sizeof(SSchTasksStatusMsg);
statusRpc.contLen = sizeof(SSchTasksStatusReq);
statusRpc.msgType = TDMT_VND_TASKS_STATUS;
while (!testStop) {
......@@ -228,31 +228,31 @@ TEST(seqTest, normalCase) {
queryRpc.pCont = queryMsg;
queryRpc.contLen = sizeof(SSubQueryMsg) + 100;
SResReadyMsg readyMsg = {0};
SResReadyReq readyMsg = {0};
readyMsg.sId = htobe64(1);
readyMsg.queryId = htobe64(1);
readyMsg.taskId = htobe64(1);
readyRpc.pCont = &readyMsg;
readyRpc.contLen = sizeof(SResReadyMsg);
readyRpc.contLen = sizeof(SResReadyReq);
SResFetchMsg fetchMsg = {0};
SResFetchReq fetchMsg = {0};
fetchMsg.sId = htobe64(1);
fetchMsg.queryId = htobe64(1);
fetchMsg.taskId = htobe64(1);
fetchRpc.pCont = &fetchMsg;
fetchRpc.contLen = sizeof(SResFetchMsg);
fetchRpc.contLen = sizeof(SResFetchReq);
STaskDropMsg dropMsg = {0};
STaskDropReq dropMsg = {0};
dropMsg.sId = htobe64(1);
dropMsg.queryId = htobe64(1);
dropMsg.taskId = htobe64(1);
dropRpc.pCont = &dropMsg;
dropRpc.contLen = sizeof(STaskDropMsg);
dropRpc.contLen = sizeof(STaskDropReq);
SSchTasksStatusMsg statusMsg = {0};
SSchTasksStatusReq statusMsg = {0};
statusMsg.sId = htobe64(1);
statusRpc.pCont = &statusMsg;
statusRpc.contLen = sizeof(SSchTasksStatusMsg);
statusRpc.contLen = sizeof(SSchTasksStatusReq);
statusRpc.msgType = TDMT_VND_TASKS_STATUS;
stubSetStringToPlan();
......@@ -312,17 +312,17 @@ TEST(seqTest, cancelFirst) {
queryRpc.pCont = queryMsg;
queryRpc.contLen = sizeof(SSubQueryMsg) + 100;
STaskDropMsg dropMsg = {0};
STaskDropReq dropMsg = {0};
dropMsg.sId = htobe64(1);
dropMsg.queryId = htobe64(1);
dropMsg.taskId = htobe64(1);
dropRpc.pCont = &dropMsg;
dropRpc.contLen = sizeof(STaskDropMsg);
dropRpc.contLen = sizeof(STaskDropReq);
SSchTasksStatusMsg statusMsg = {0};
SSchTasksStatusReq statusMsg = {0};
statusMsg.sId = htobe64(1);
statusRpc.pCont = &statusMsg;
statusRpc.contLen = sizeof(SSchTasksStatusMsg);
statusRpc.contLen = sizeof(SSchTasksStatusReq);
statusRpc.msgType = TDMT_VND_TASKS_STATUS;
stubSetStringToPlan();
......@@ -370,31 +370,31 @@ TEST(seqTest, randCase) {
queryRpc.pCont = queryMsg;
queryRpc.contLen = sizeof(SSubQueryMsg) + 100;
SResReadyMsg readyMsg = {0};
SResReadyReq readyMsg = {0};
readyMsg.sId = htobe64(1);
readyMsg.queryId = htobe64(1);
readyMsg.taskId = htobe64(1);
readyRpc.pCont = &readyMsg;
readyRpc.contLen = sizeof(SResReadyMsg);
readyRpc.contLen = sizeof(SResReadyReq);
SResFetchMsg fetchMsg = {0};
SResFetchReq fetchMsg = {0};
fetchMsg.sId = htobe64(1);
fetchMsg.queryId = htobe64(1);
fetchMsg.taskId = htobe64(1);
fetchRpc.pCont = &fetchMsg;
fetchRpc.contLen = sizeof(SResFetchMsg);
fetchRpc.contLen = sizeof(SResFetchReq);
STaskDropMsg dropMsg = {0};
STaskDropReq dropMsg = {0};
dropMsg.sId = htobe64(1);
dropMsg.queryId = htobe64(1);
dropMsg.taskId = htobe64(1);
dropRpc.pCont = &dropMsg;
dropRpc.contLen = sizeof(STaskDropMsg);
dropRpc.contLen = sizeof(STaskDropReq);
SSchTasksStatusMsg statusMsg = {0};
SSchTasksStatusReq statusMsg = {0};
statusMsg.sId = htobe64(1);
statusRpc.pCont = &statusMsg;
statusRpc.contLen = sizeof(SSchTasksStatusMsg);
statusRpc.contLen = sizeof(SSchTasksStatusReq);
statusRpc.msgType = TDMT_VND_TASKS_STATUS;
stubSetStringToPlan();
......
......@@ -595,7 +595,7 @@ int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *ms
if (rspCode != TSDB_CODE_SUCCESS || NULL == msg) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode));
} else {
SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg;
SShellSubmitRsp *rsp = (SShellSubmitRsp *)msg;
job->resNumOfRows += rsp->affectedRows;
code = schProcessOnTaskSuccess(job, task);
......@@ -832,14 +832,14 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
break;
}
case TDMT_VND_RES_READY: {
msgSize = sizeof(SResReadyMsg);
msgSize = sizeof(SResReadyReq);
msg = calloc(1, msgSize);
if (NULL == msg) {
SCH_TASK_ELOG("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SResReadyMsg *pMsg = msg;
SResReadyReq *pMsg = msg;
pMsg->header.vgId = htonl(addr->nodeId);
......@@ -849,14 +849,14 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
break;
}
case TDMT_VND_FETCH: {
msgSize = sizeof(SResFetchMsg);
msgSize = sizeof(SResFetchReq);
msg = calloc(1, msgSize);
if (NULL == msg) {
SCH_TASK_ELOG("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SResFetchMsg *pMsg = msg;
SResFetchReq *pMsg = msg;
pMsg->header.vgId = htonl(addr->nodeId);
......@@ -866,14 +866,14 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
break;
}
case TDMT_VND_DROP_TASK:{
msgSize = sizeof(STaskDropMsg);
msgSize = sizeof(STaskDropReq);
msg = calloc(1, msgSize);
if (NULL == msg) {
SCH_TASK_ELOG("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
STaskDropMsg *pMsg = msg;
STaskDropReq *pMsg = msg;
pMsg->header.vgId = htonl(addr->nodeId);
......
......@@ -186,7 +186,7 @@ void *schtSendRsp(void *param) {
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
SShellSubmitRspMsg rsp = {0};
SShellSubmitRsp rsp = {0};
rsp.affectedRows = 10;
schProcessRspMsg(job, task, TDMT_VND_SUBMIT, (char *)&rsp, sizeof(rsp), 0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册