/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #ifndef _TD_COMMON_TAOS_MSG_H_ #define _TD_COMMON_TAOS_MSG_H_ #ifdef __cplusplus extern "C" { #endif #include "encode.h" #include "taosdef.h" #include "taoserror.h" #include "tarray.h" #include "tcoding.h" #include "tdataformat.h" #include "thash.h" #include "tlist.h" /* ------------------------ MESSAGE DEFINITIONS ------------------------ */ #define TD_MSG_NUMBER_ #undef TD_MSG_DICT_ #undef TD_MSG_INFO_ #undef TD_MSG_SEG_CODE_ #include "tmsgdef.h" #undef TD_MSG_NUMBER_ #undef TD_MSG_DICT_ #undef TD_MSG_INFO_ #define TD_MSG_SEG_CODE_ #include "tmsgdef.h" #undef TD_MSG_NUMBER_ #undef TD_MSG_DICT_ #undef TD_MSG_INFO_ #undef TD_MSG_SEG_CODE_ #include "tmsgdef.h" extern char* tMsgInfo[]; extern int tMsgDict[]; #define TMSG_SEG_CODE(TYPE) (((TYPE)&0xff00) >> 8) #define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff) #define TMSG_INFO(TYPE) tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] #define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)) typedef uint16_t tmsg_t; /* ------------------------ OTHER DEFINITIONS ------------------------ */ // IE type #define TSDB_IE_TYPE_SEC 1 #define TSDB_IE_TYPE_META 2 #define TSDB_IE_TYPE_MGMT_IP 3 #define TSDB_IE_TYPE_DNODE_CFG 4 #define TSDB_IE_TYPE_NEW_VERSION 5 #define TSDB_IE_TYPE_DNODE_EXT 6 #define TSDB_IE_TYPE_DNODE_STATE 7 typedef enum { HEARTBEAT_TYPE_MQ = 0, HEARTBEAT_TYPE_QUERY = 1, // types can be added here // HEARTBEAT_TYPE_MAX } EHbType; enum { HEARTBEAT_KEY_DBINFO = 1, HEARTBEAT_KEY_STBINFO, HEARTBEAT_KEY_MQ_TMP, }; typedef enum _mgmt_table { TSDB_MGMT_TABLE_START, TSDB_MGMT_TABLE_ACCT, TSDB_MGMT_TABLE_USER, TSDB_MGMT_TABLE_DB, TSDB_MGMT_TABLE_TABLE, TSDB_MGMT_TABLE_DNODE, TSDB_MGMT_TABLE_MNODE, TSDB_MGMT_TABLE_QNODE, TSDB_MGMT_TABLE_SNODE, TSDB_MGMT_TABLE_BNODE, TSDB_MGMT_TABLE_VGROUP, TSDB_MGMT_TABLE_STB, TSDB_MGMT_TABLE_MODULE, TSDB_MGMT_TABLE_QUERIES, TSDB_MGMT_TABLE_STREAMS, TSDB_MGMT_TABLE_VARIABLES, TSDB_MGMT_TABLE_CONNS, TSDB_MGMT_TABLE_SCORES, TSDB_MGMT_TABLE_GRANTS, TSDB_MGMT_TABLE_VNODES, TSDB_MGMT_TABLE_CLUSTER, TSDB_MGMT_TABLE_STREAMTABLES, TSDB_MGMT_TABLE_TP, TSDB_MGMT_TABLE_FUNC, TSDB_MGMT_TABLE_MAX, } EShowType; #define TSDB_ALTER_TABLE_ADD_TAG_COLUMN 1 #define TSDB_ALTER_TABLE_DROP_TAG_COLUMN 2 #define TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN 3 #define TSDB_ALTER_TABLE_UPDATE_TAG_VAL 4 #define TSDB_ALTER_TABLE_ADD_COLUMN 5 #define TSDB_ALTER_TABLE_DROP_COLUMN 6 #define TSDB_ALTER_TABLE_CHANGE_COLUMN 7 #define TSDB_ALTER_TABLE_MODIFY_TAG_COLUMN 8 #define TSDB_FILL_NONE 0 #define TSDB_FILL_NULL 1 #define TSDB_FILL_SET_VALUE 2 #define TSDB_FILL_LINEAR 3 #define TSDB_FILL_PREV 4 #define TSDB_FILL_NEXT 5 #define TSDB_ALTER_USER_PASSWD 0x1 #define TSDB_ALTER_USER_PRIVILEGES 0x2 #define TSDB_KILL_MSG_LEN 30 #define TSDB_VN_READ_ACCCESS ((char)0x1) #define TSDB_VN_WRITE_ACCCESS ((char)0x2) #define TSDB_VN_ALL_ACCCESS (TSDB_VN_READ_ACCCESS | TSDB_VN_WRITE_ACCCESS) #define TSDB_COL_NORMAL 0x0u // the normal column of the table #define TSDB_COL_TAG 0x1u // the tag column type #define TSDB_COL_UDC 0x2u // the user specified normal string column, it is a dummy column #define TSDB_COL_TMP 0x4u // internal column generated by the previous operators #define TSDB_COL_NULL 0x8u // the column filter NULL or not #define TSDB_COL_IS_TAG(f) (((f & (~(TSDB_COL_NULL))) & TSDB_COL_TAG) != 0) #define TSDB_COL_IS_NORMAL_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_NORMAL) #define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC) #define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0) #define TD_SUPER_TABLE TSDB_SUPER_TABLE #define TD_CHILD_TABLE TSDB_CHILD_TABLE #define TD_NORMAL_TABLE TSDB_NORMAL_TABLE typedef struct { int32_t vgId; char* dbFName; char* tbName; } SBuildTableMetaInput; typedef struct { char db[TSDB_DB_FNAME_LEN]; int32_t vgVersion; } SBuildUseDBInput; #pragma pack(push, 1) // null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta typedef struct SEp { char fqdn[TSDB_FQDN_LEN]; uint16_t port; } SEp; typedef struct { int32_t contLen; int32_t vgId; } SMsgHead; // Submit message for one table typedef struct SSubmitBlk { uint64_t uid; // table unique id int32_t tid; // table id int32_t padding; // TODO just for padding here int32_t sversion; // data schema version int32_t dataLen; // data part length, not including the SSubmitBlk head int32_t schemaLen; // schema length, if length is 0, no schema exists int16_t numOfRows; // total number of rows in current submit block char data[]; } SSubmitBlk; typedef struct { /* data */ } SSubmitReq; typedef struct { /* data */ } SSubmitRsp; typedef struct { /* data */ } SSubmitReqReader; // Submit message for this TSDB typedef struct { SMsgHead header; int64_t version; int32_t length; int32_t numOfBlocks; char blocks[]; } SSubmitMsg; typedef struct { int32_t totalLen; int32_t len; SMemRow row; } SSubmitBlkIter; typedef struct { int32_t totalLen; int32_t len; void* pMsg; } SSubmitMsgIter; int tInitSubmitMsgIter(SSubmitMsg* pMsg, SSubmitMsgIter* pIter); int tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); int tInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter); SMemRow tGetSubmitBlkNext(SSubmitBlkIter* pIter); typedef struct { int32_t index; // index of failed block in submit blocks int32_t vnode; // vnode index of failed block int32_t sid; // table index of failed block int32_t code; // errorcode while write data to vnode, such as not created, dropped, no space, invalid table } SShellSubmitRspBlock; typedef struct { int32_t code; // 0-success, > 0 error code int32_t numOfRows; // number of records the client is trying to write int32_t affectedRows; // number of records actually written int32_t failedRows; // number of failed records (exclude duplicate records) int32_t numOfFailedBlocks; SShellSubmitRspBlock failedBlocks[]; } SShellSubmitRsp; typedef struct SSchema { int8_t type; int32_t colId; int32_t bytes; char name[TSDB_COL_NAME_LEN]; } SSchema; typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t igExists; int32_t numOfTags; int32_t numOfColumns; SSchema pSchema[]; } SMCreateStbReq; typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t igNotExists; } SMDropStbReq; typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t alterType; int32_t numOfColumns; SSchema pSchema[]; } SMAlterStbReq; typedef struct { int32_t pid; char app[TSDB_APP_NAME_LEN]; char db[TSDB_DB_NAME_LEN]; int64_t startTime; } SConnectReq; typedef struct SEpSet { int8_t inUse; int8_t numOfEps; SEp eps[TSDB_MAX_REPLICA]; } SEpSet; static FORCE_INLINE int taosEncodeSEpSet(void** buf, const SEpSet* pEp) { int tlen = 0; tlen += taosEncodeFixedI8(buf, pEp->inUse); tlen += taosEncodeFixedI8(buf, pEp->numOfEps); for (int i = 0; i < TSDB_MAX_REPLICA; i++) { tlen += taosEncodeFixedU16(buf, pEp->eps[i].port); tlen += taosEncodeString(buf, pEp->eps[i].fqdn); } return tlen; } static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEp) { buf = taosDecodeFixedI8(buf, &pEp->inUse); buf = taosDecodeFixedI8(buf, &pEp->numOfEps); for (int i = 0; i < TSDB_MAX_REPLICA; i++) { buf = taosDecodeFixedU16(buf, &pEp->eps[i].port); buf = taosDecodeStringTo(buf, pEp->eps[i].fqdn); } return buf; } typedef struct { int32_t acctId; int64_t clusterId; int32_t connId; int8_t superUser; int8_t align[3]; SEpSet epSet; char sVersion[128]; } SConnectRsp; typedef struct { char user[TSDB_USER_LEN]; char pass[TSDB_PASSWORD_LEN]; int32_t maxUsers; int32_t maxDbs; int32_t maxTimeSeries; int32_t maxStreams; int32_t accessState; // Configured only by command int64_t maxStorage; // In unit of GB } SCreateAcctReq, SAlterAcctReq; typedef struct { char user[TSDB_USER_LEN]; } SDropUserReq, SDropAcctReq; typedef struct { int8_t type; char user[TSDB_USER_LEN]; char pass[TSDB_PASSWORD_LEN]; int8_t superUser; // denote if it is a super user or not } SCreateUserReq, SAlterUserReq; typedef struct { int16_t colId; // column id int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag int16_t flag; // denote if it is a tag or a normal column char name[TSDB_DB_FNAME_LEN]; } SColIndex; typedef struct { int16_t lowerRelOptr; int16_t upperRelOptr; int16_t filterstr; // denote if current column is char(binary/nchar) union { struct { int64_t lowerBndi; int64_t upperBndi; }; struct { double lowerBndd; double upperBndd; }; struct { int64_t pz; int64_t len; }; }; } SColumnFilterInfo; typedef struct { int16_t numOfFilters; union { int64_t placeholder; SColumnFilterInfo* filterInfo; }; } SColumnFilterList; /* * for client side struct, we only need the column id, type, bytes are not necessary * But for data in vnode side, we need all the following information. */ typedef struct { int16_t colId; int16_t type; int16_t bytes; SColumnFilterList flist; } SColumnInfo; typedef struct { uint64_t uid; TSKEY key; // last accessed ts, for subscription } STableIdInfo; typedef struct STimeWindow { TSKEY skey; TSKEY ekey; } STimeWindow; typedef struct { int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed int32_t tsLen; // total length of ts comp block int32_t tsNumOfBlocks; // ts comp block numbers int32_t tsOrder; // ts comp block order } STsBufInfo; typedef struct { int32_t tz; // query client timezone char intervalUnit; char slidingUnit; char offsetUnit; int64_t interval; int64_t sliding; int64_t offset; } SInterval; typedef struct { SMsgHead head; char version[TSDB_VERSION_LEN]; bool stableQuery; // super table query or not bool topBotQuery; // TODO used bitwise flag bool interpQuery; // interp query or not bool groupbyColumn; // denote if this is a groupby normal column query bool hasTagResults; // if there are tag values in final result or not bool timeWindowInterpo; // if the time window start/end required interpolation bool queryBlockDist; // if query data block distribution bool stabledev; // super table stddev query bool tsCompQuery; // is tscomp query bool simpleAgg; bool pointInterpQuery; // point interpolation query bool needReverseScan; // need reverse scan bool stateWindow; // state window flag STimeWindow window; int32_t numOfTables; int16_t order; int16_t orderColId; int16_t numOfCols; // the number of columns will be load from vnode SInterval interval; // SSessionWindow sw; // session window int16_t tagCondLen; // tag length in current query int16_t colCondLen; // column length in current query int16_t numOfGroupCols; // num of group by columns int16_t orderByIdx; int16_t orderType; // used in group by xx order by xxx int64_t vgroupLimit; // limit the number of rows for each table, used in order by + limit in stable projection query. int16_t prjOrder; // global order in super table projection query. int64_t limit; int64_t offset; int32_t queryType; // denote another query process int16_t numOfOutput; // final output columns numbers int16_t fillType; // interpolate type int64_t fillVal; // default value array list int32_t secondStageOutput; STsBufInfo tsBuf; // tsBuf info int32_t numOfTags; // number of tags columns involved int32_t sqlstrLen; // sql query string int32_t prevResultLen; // previous result length int32_t numOfOperator; int32_t tableScanOperator; // table scan operator. -1 means no scan operator int32_t udfNum; // number of udf function int32_t udfContentOffset; int32_t udfContentLen; SColumnInfo tableCols[]; } SQueryTableReq; typedef struct { int32_t code; } SQueryTableRsp; // todo: the show handle should be replaced with id typedef struct { SMsgHead header; union { int64_t showId; int64_t qhandle; int64_t qId; }; // query handle int8_t free; } SRetrieveTableReq; typedef struct { int64_t useconds; int8_t completed; // all results are returned to client int8_t precision; int8_t compressed; int32_t compLen; int32_t numOfRows; char data[]; } SRetrieveTableRsp; typedef struct { char db[TSDB_DB_FNAME_LEN]; int32_t numOfVgroups; int32_t cacheBlockSize; // MB int32_t totalBlocks; int32_t daysPerFile; int32_t daysToKeep0; int32_t daysToKeep1; int32_t daysToKeep2; int32_t minRows; int32_t maxRows; int32_t commitTime; int32_t fsyncPeriod; int8_t walLevel; int8_t precision; // time resolution int8_t compression; int8_t replications; int8_t quorum; int8_t update; int8_t cacheLastRow; int8_t ignoreExist; } SCreateDbReq; typedef struct { char db[TSDB_DB_FNAME_LEN]; int32_t totalBlocks; int32_t daysToKeep0; int32_t daysToKeep1; int32_t daysToKeep2; int32_t fsyncPeriod; int8_t walLevel; int8_t quorum; int8_t cacheLastRow; } SAlterDbReq; typedef struct { char db[TSDB_DB_FNAME_LEN]; int8_t ignoreNotExists; } SDropDbReq; typedef struct { char db[TSDB_DB_FNAME_LEN]; uint64_t uid; } SDropDbRsp; typedef struct { char db[TSDB_DB_FNAME_LEN]; int32_t vgVersion; } SUseDbReq; typedef struct { char db[TSDB_DB_FNAME_LEN]; } SSyncDbReq; typedef struct { char db[TSDB_DB_FNAME_LEN]; } SCompactDbReq; typedef struct { char name[TSDB_FUNC_NAME_LEN]; int8_t igExists; int8_t funcType; int8_t scriptType; int8_t outputType; int32_t outputLen; int32_t bufSize; int64_t signature; int32_t commentSize; int32_t codeSize; char pCont[]; } SCreateFuncReq; typedef struct { char name[TSDB_FUNC_NAME_LEN]; int8_t igNotExists; } SDropFuncReq; typedef struct { int32_t numOfFuncs; char pFuncNames[]; } SRetrieveFuncReq; typedef struct { char name[TSDB_FUNC_NAME_LEN]; int8_t align; int8_t funcType; int8_t scriptType; int8_t outputType; int32_t outputLen; int32_t bufSize; int64_t signature; int32_t commentSize; int32_t codeSize; char pCont[]; } SFuncInfo; typedef struct { int32_t numOfFuncs; char pFuncInfos[]; } SRetrieveFuncRsp; typedef struct { int32_t statusInterval; int64_t checkTime; // 1970-01-01 00:00:00.000 char timezone[TSDB_TIMEZONE_LEN]; // tsTimezone char locale[TSDB_LOCALE_LEN]; // tsLocale char charset[TSDB_LOCALE_LEN]; // tsCharset } SClusterCfg; typedef struct { int32_t vgId; int8_t role; int8_t align[3]; int64_t totalStorage; int64_t compStorage; int64_t pointsWritten; int64_t tablesNum; } SVnodeLoad; typedef struct { int32_t num; SVnodeLoad data[]; } SVnodeLoads; typedef struct { int32_t sver; int32_t dnodeId; int64_t clusterId; int64_t dver; int64_t rebootTime; int64_t updateTime; int32_t numOfCores; int32_t numOfSupportVnodes; char dnodeEp[TSDB_EP_LEN]; SClusterCfg clusterCfg; SVnodeLoads vnodeLoads; } SStatusReq; typedef struct { int32_t reserved; } STransReq; typedef struct { int32_t dnodeId; int64_t clusterId; } SDnodeCfg; typedef struct { int32_t id; int8_t isMnode; int8_t align; SEp ep; } SDnodeEp; typedef struct { int32_t num; SDnodeEp eps[]; } SDnodeEps; typedef struct { int64_t dver; SDnodeCfg dnodeCfg; SDnodeEps dnodeEps; } SStatusRsp; typedef struct { int32_t id; uint16_t port; // node sync Port char fqdn[TSDB_FQDN_LEN]; // node FQDN } SReplica; typedef struct { int32_t vgId; int32_t dnodeId; char db[TSDB_DB_FNAME_LEN]; uint64_t dbUid; int32_t vgVersion; int32_t cacheBlockSize; int32_t totalBlocks; int32_t daysPerFile; int32_t daysToKeep0; int32_t daysToKeep1; int32_t daysToKeep2; int32_t minRows; int32_t maxRows; int32_t commitTime; int32_t fsyncPeriod; int8_t walLevel; int8_t precision; int8_t compression; int8_t quorum; int8_t update; int8_t cacheLastRow; int8_t replica; int8_t selfIndex; SReplica replicas[TSDB_MAX_REPLICA]; } SCreateVnodeReq, SAlterVnodeReq; typedef struct { int32_t vgId; int32_t dnodeId; uint64_t dbUid; char db[TSDB_DB_FNAME_LEN]; } SDropVnodeReq, SSyncVnodeReq, SCompactVnodeReq; typedef struct { int32_t vgId; int8_t accessState; } SAuthVnodeReq; typedef struct { SMsgHead header; char dbFName[TSDB_DB_FNAME_LEN]; char tbName[TSDB_TABLE_NAME_LEN]; } STableInfoReq; typedef struct { int8_t metaClone; // create local clone of the cached table meta int32_t numOfVgroups; int32_t numOfTables; int32_t numOfUdfs; char tableNames[]; } SMultiTableInfoReq; // todo refactor typedef struct SVgroupInfo { int32_t vgId; uint32_t hashBegin; uint32_t hashEnd; SEpSet epset; } SVgroupInfo; typedef struct { int32_t numOfVgroups; SVgroupInfo vgroups[]; } SVgroupsInfo; typedef struct { char tbName[TSDB_TABLE_NAME_LEN]; char stbName[TSDB_TABLE_NAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN]; int32_t numOfTags; int32_t numOfColumns; int8_t precision; int8_t tableType; int8_t update; int32_t sversion; int32_t tversion; uint64_t suid; uint64_t tuid; int32_t vgId; SSchema pSchema[]; } STableMetaRsp; typedef struct { int32_t numOfTables; int32_t numOfVgroup; int32_t numOfUdf; int32_t contLen; int8_t compressed; // denote if compressed or not int32_t rawLen; // size before compress uint8_t metaClone; // make meta clone after retrieve meta from mnode char meta[]; } SMultiTableMeta; typedef struct { int32_t dataLen; char name[TSDB_TABLE_FNAME_LEN]; char* data; } STagData; typedef struct { char db[TSDB_DB_FNAME_LEN]; uint64_t uid; int32_t vgVersion; int32_t vgNum; int8_t hashMethod; SVgroupInfo vgroupInfo[]; } SUseDbRsp; /* * sql: show tables like '%a_%' * payload is the query condition, e.g., '%a_%' * payloadLen is the length of payload */ typedef struct { int8_t type; char db[TSDB_DB_FNAME_LEN]; int16_t payloadLen; char payload[]; } SShowReq; typedef struct { char db[TSDB_DB_FNAME_LEN]; int32_t numOfVgroup; int32_t vgid[]; } SCompactReq; typedef struct { int64_t showId; STableMetaRsp tableMeta; } SShowRsp; typedef struct { char fqdn[TSDB_FQDN_LEN]; // end point, hostname:port int32_t port; } SCreateDnodeReq; typedef struct { int32_t dnodeId; } SDropDnodeReq; typedef struct { int32_t dnodeId; char config[TSDB_DNODE_CONFIG_LEN]; } SMCfgDnodeReq, SDCfgDnodeReq; typedef struct { int32_t dnodeId; } SMCreateMnodeReq, SMDropMnodeReq, SDDropMnodeReq; typedef struct { int32_t dnodeId; int8_t replica; SReplica replicas[TSDB_MAX_REPLICA]; } SDCreateMnodeReq, SDAlterMnodeReq; typedef struct { int32_t dnodeId; } SMCreateQnodeReq, SMDropQnodeReq, SDCreateQnodeReq, SDDropQnodeReq; typedef struct { int32_t dnodeId; } SMCreateSnodeReq, SMDropSnodeReq, SDCreateSnodeReq, SDDropSnodeReq; typedef struct { int32_t dnodeId; } SMCreateBnodeReq, SMDropBnodeReq, SDCreateBnodeReq, SDDropBnodeReq; typedef struct { char sql[TSDB_SHOW_SQL_LEN]; int32_t queryId; int64_t useconds; int64_t stime; int64_t qId; int64_t sqlObjId; int32_t pid; char fqdn[TSDB_FQDN_LEN]; int8_t stableQuery; int32_t numOfSub; char subSqlInfo[TSDB_SHOW_SUBQUERY_LEN]; // include subqueries' index, Obj IDs and states(C-complete/I-imcomplete) } SQueryDesc; typedef struct { int32_t connId; int32_t pid; int32_t numOfQueries; int32_t numOfStreams; char app[TSDB_APP_NAME_LEN]; char pData[]; } SHeartBeatReq; typedef struct { int32_t connId; int32_t queryId; int32_t streamId; int32_t totalDnodes; int32_t onlineDnodes; int8_t killConnection; int8_t align[3]; SEpSet epSet; } SHeartBeatRsp; typedef struct { int32_t connId; int32_t queryId; } SKillQueryReq; typedef struct { int32_t connId; } SKillConnReq; typedef struct { char user[TSDB_USER_LEN]; char spi; char encrypt; char secret[TSDB_PASSWORD_LEN]; char ckey[TSDB_PASSWORD_LEN]; } SAuthReq, SAuthRsp; typedef struct { int8_t finished; int8_t align[7]; char name[TSDB_STEP_NAME_LEN]; char desc[TSDB_STEP_DESC_LEN]; } SStartupReq; /** * The layout of the query message payload is as following: * +--------------------+---------------------------------+ * |Sql statement | Physical plan | * |(denoted by sqlLen) |(In JSON, denoted by contentLen) | * +--------------------+---------------------------------+ */ typedef struct SSubQueryMsg { SMsgHead header; uint64_t sId; uint64_t queryId; uint64_t taskId; int8_t taskType; uint32_t sqlLen; // the query sql, uint32_t phyLen; char msg[]; } SSubQueryMsg; typedef struct { SMsgHead header; uint64_t sId; uint64_t queryId; uint64_t taskId; } SSinkDataReq; typedef struct { SMsgHead header; uint64_t sId; uint64_t queryId; uint64_t taskId; } SQueryContinueReq; typedef struct { SMsgHead header; uint64_t sId; uint64_t queryId; uint64_t taskId; } SResReadyReq; typedef struct { int32_t code; } SResReadyRsp; typedef struct { SMsgHead header; uint64_t sId; uint64_t queryId; uint64_t taskId; } SResFetchReq; typedef struct { SMsgHead header; uint64_t sId; } SSchTasksStatusReq; typedef struct { uint64_t queryId; uint64_t taskId; int8_t status; } STaskStatus; typedef struct { uint32_t num; STaskStatus status[]; } SSchedulerStatusRsp; typedef struct { SMsgHead header; uint64_t sId; uint64_t queryId; uint64_t taskId; } STaskCancelReq; typedef struct { int32_t code; } STaskCancelRsp; typedef struct { SMsgHead header; uint64_t sId; uint64_t queryId; uint64_t taskId; } STaskDropReq; typedef struct { int32_t code; } STaskDropRsp; typedef struct { int8_t igExists; char* name; char* sql; char* physicalPlan; char* logicalPlan; } SCMCreateTopicReq; static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateTopicReq* pReq) { int tlen = 0; tlen += taosEncodeFixedI8(buf, pReq->igExists); tlen += taosEncodeString(buf, pReq->name); tlen += taosEncodeString(buf, pReq->sql); tlen += taosEncodeString(buf, pReq->physicalPlan); tlen += taosEncodeString(buf, pReq->logicalPlan); return tlen; } static FORCE_INLINE void* tDeserializeSCMCreateTopicReq(void* buf, SCMCreateTopicReq* pReq) { buf = taosDecodeFixedI8(buf, &(pReq->igExists)); buf = taosDecodeString(buf, &(pReq->name)); buf = taosDecodeString(buf, &(pReq->sql)); buf = taosDecodeString(buf, &(pReq->physicalPlan)); buf = taosDecodeString(buf, &(pReq->logicalPlan)); return buf; } typedef struct { int64_t topicId; } SCMCreateTopicRsp; static FORCE_INLINE int tSerializeSCMCreateTopicRsp(void** buf, const SCMCreateTopicRsp* pRsp) { int tlen = 0; tlen += taosEncodeFixedI64(buf, pRsp->topicId); return tlen; } static FORCE_INLINE void* tDeserializeSCMCreateTopicRsp(void* buf, SCMCreateTopicRsp* pRsp) { buf = taosDecodeFixedI64(buf, &pRsp->topicId); return buf; } typedef struct { int32_t topicNum; int64_t consumerId; char* consumerGroup; SArray* topicNames; // SArray } SCMSubscribeReq; static FORCE_INLINE int tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) { int tlen = 0; tlen += taosEncodeFixedI32(buf, pReq->topicNum); tlen += taosEncodeFixedI64(buf, pReq->consumerId); tlen += taosEncodeString(buf, pReq->consumerGroup); for (int i = 0; i < pReq->topicNum; i++) { tlen += taosEncodeString(buf, (char*)taosArrayGetP(pReq->topicNames, i)); } return tlen; } static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) { buf = taosDecodeFixedI32(buf, &pReq->topicNum); buf = taosDecodeFixedI64(buf, &pReq->consumerId); buf = taosDecodeString(buf, &pReq->consumerGroup); pReq->topicNames = taosArrayInit(pReq->topicNum, sizeof(void*)); for (int i = 0; i < pReq->topicNum; i++) { char* name = NULL; buf = taosDecodeString(buf, &name); taosArrayPush(pReq->topicNames, &name); } return buf; } typedef struct SMqSubTopic { int32_t vgId; int64_t topicId; SEpSet epSet; } SMqSubTopic; typedef struct { int32_t topicNum; SMqSubTopic topics[]; } SCMSubscribeRsp; static FORCE_INLINE int tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribeRsp* pRsp) { int tlen = 0; tlen += taosEncodeFixedI32(buf, pRsp->topicNum); for (int i = 0; i < pRsp->topicNum; i++) { tlen += taosEncodeFixedI32(buf, pRsp->topics[i].vgId); tlen += taosEncodeFixedI64(buf, pRsp->topics[i].topicId); tlen += taosEncodeSEpSet(buf, &pRsp->topics[i].epSet); } return tlen; } static FORCE_INLINE void* tDeserializeSCMSubscribeRsp(void* buf, SCMSubscribeRsp* pRsp) { buf = taosDecodeFixedI32(buf, &pRsp->topicNum); for (int i = 0; i < pRsp->topicNum; i++) { buf = taosDecodeFixedI32(buf, &pRsp->topics[i].vgId); buf = taosDecodeFixedI64(buf, &pRsp->topics[i].topicId); buf = taosDecodeSEpSet(buf, &pRsp->topics[i].epSet); } return buf; } typedef struct { int64_t topicId; int64_t consumerId; int64_t consumerGroupId; int64_t offset; char* sql; char* logicalPlan; char* physicalPlan; } SMVSubscribeReq; static FORCE_INLINE int tSerializeSMVSubscribeReq(void** buf, SMVSubscribeReq* pReq) { int tlen = 0; tlen += taosEncodeFixedI64(buf, pReq->topicId); tlen += taosEncodeFixedI64(buf, pReq->consumerId); tlen += taosEncodeFixedI64(buf, pReq->consumerGroupId); tlen += taosEncodeFixedI64(buf, pReq->offset); tlen += taosEncodeString(buf, pReq->sql); tlen += taosEncodeString(buf, pReq->logicalPlan); tlen += taosEncodeString(buf, pReq->physicalPlan); return tlen; } static FORCE_INLINE void* tDeserializeSMVSubscribeReq(void* buf, SMVSubscribeReq* pReq) { buf = taosDecodeFixedI64(buf, &pReq->topicId); buf = taosDecodeFixedI64(buf, &pReq->consumerId); buf = taosDecodeFixedI64(buf, &pReq->consumerGroupId); buf = taosDecodeFixedI64(buf, &pReq->offset); buf = taosDecodeString(buf, &pReq->sql); buf = taosDecodeString(buf, &pReq->logicalPlan); buf = taosDecodeString(buf, &pReq->physicalPlan); return buf; } typedef struct SMqTmrMsg { int32_t reserved; } SMqTmrMsg; typedef struct { int64_t status; } SMVSubscribeRsp; typedef struct { char name[TSDB_TOPIC_NAME_LEN]; int8_t igExists; int32_t execLen; void* executor; int32_t sqlLen; char* sql; } SCreateTopicReq; typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t igNotExists; } SDropTopicReq; typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t alterType; SSchema schema; } SAlterTopicReq; typedef struct { SMsgHead head; char name[TSDB_TABLE_FNAME_LEN]; uint64_t tuid; int32_t sverson; int32_t execLen; char* executor; int32_t sqlLen; char* sql; } SDCreateTopicReq; typedef struct { SMsgHead head; char name[TSDB_TABLE_FNAME_LEN]; uint64_t tuid; } SDDropTopicReq; typedef struct SVCreateTbReq { uint64_t ver; // use a general definition char* name; uint32_t ttl; uint32_t keep; uint8_t type; union { struct { tb_uid_t suid; uint32_t nCols; SSchema* pSchema; uint32_t nTagCols; SSchema* pTagSchema; } stbCfg; struct { tb_uid_t suid; SKVRow pTag; } ctbCfg; struct { uint32_t nCols; SSchema* pSchema; } ntbCfg; }; } SVCreateTbReq; typedef struct { uint64_t ver; // use a general definition SArray* pArray; } SVCreateTbBatchReq; int tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq); void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq); int tSVCreateTbBatchReqSerialize(void** buf, SVCreateTbBatchReq* pReq); void* tSVCreateTbBatchReqDeserialize(void* buf, SVCreateTbBatchReq* pReq); typedef struct { SMsgHead head; } SVCreateTbRsp; typedef struct { SMsgHead head; char name[TSDB_TABLE_FNAME_LEN]; int8_t ignoreNotExists; } SVAlterTbReq; typedef struct { SMsgHead head; } SVAlterTbRsp; typedef struct { uint64_t ver; char* name; uint8_t type; tb_uid_t suid; } SVDropTbReq; typedef struct { uint64_t ver; } SVDropTbRsp; int32_t tSerializeSVDropTbReq(void** buf, SVDropTbReq* pReq); void* tDeserializeSVDropTbReq(void* buf, SVDropTbReq* pReq); int32_t tSerializeSVDropTbRsp(void** buf, SVDropTbRsp* pRsp); void* tDeserializeSVDropTbRsp(void* buf, SVDropTbRsp* pRsp); typedef struct { SMsgHead head; int64_t uid; int32_t tid; int16_t tversion; int16_t colId; int8_t type; int16_t bytes; int32_t tagValLen; int16_t numOfTags; int32_t schemaLen; char data[]; } SUpdateTagValReq; typedef struct { SMsgHead head; } SUpdateTagValRsp; typedef struct { SMsgHead head; } SVShowTablesReq; typedef struct { int64_t id; STableMetaRsp metaInfo; } SVShowTablesRsp; typedef struct { SMsgHead head; int32_t id; } SVShowTablesFetchReq; typedef struct { int64_t useconds; int8_t completed; // all results are returned to client int8_t precision; int8_t compressed; int32_t compLen; int32_t numOfRows; char data[]; } SVShowTablesFetchRsp; typedef struct SMqCMGetSubEpReq { int64_t consumerId; int32_t epoch; char cgroup[TSDB_CONSUMER_GROUP_LEN]; } SMqCMGetSubEpReq; #pragma pack(pop) static FORCE_INLINE int32_t tEncodeSMsgHead(void** buf, const SMsgHead* pMsg) { int32_t tlen = 0; tlen += taosEncodeFixedI32(buf, pMsg->contLen); tlen += taosEncodeFixedI32(buf, pMsg->vgId); return tlen; } typedef struct SMqHbRsp { int8_t status; //idle or not int8_t vnodeChanged; int8_t epChanged; // should use new epset int8_t reserved; SEpSet epSet; } SMqHbRsp; static FORCE_INLINE int taosEncodeSMqHbRsp(void** buf, const SMqHbRsp* pRsp) { int tlen = 0; tlen += taosEncodeFixedI8(buf, pRsp->status); tlen += taosEncodeFixedI8(buf, pRsp->vnodeChanged); tlen += taosEncodeFixedI8(buf, pRsp->epChanged); tlen += taosEncodeSEpSet(buf, &pRsp->epSet); return tlen; } static FORCE_INLINE void* taosDecodeSMqHbRsp(void* buf, SMqHbRsp* pRsp) { buf = taosDecodeFixedI8(buf, &pRsp->status); buf = taosDecodeFixedI8(buf, &pRsp->vnodeChanged); buf = taosDecodeFixedI8(buf, &pRsp->epChanged); buf = taosDecodeSEpSet(buf, &pRsp->epSet); return buf; } typedef struct SMqHbOneTopicBatchRsp { char topicName[TSDB_TOPIC_FNAME_LEN]; SArray* rsps; // SArray } SMqHbOneTopicBatchRsp; static FORCE_INLINE int taosEncodeSMqHbOneTopicBatchRsp(void** buf, const SMqHbOneTopicBatchRsp* pBatchRsp) { int tlen = 0; tlen += taosEncodeString(buf, pBatchRsp->topicName); int32_t sz = taosArrayGetSize(pBatchRsp->rsps); tlen += taosEncodeFixedI32(buf, sz); for (int32_t i = 0; i < sz; i++) { SMqHbRsp* pRsp = (SMqHbRsp*)taosArrayGet(pBatchRsp->rsps, i); tlen += taosEncodeSMqHbRsp(buf, pRsp); } return tlen; } static FORCE_INLINE void* taosDecodeSMqHbOneTopicBatchRsp(void* buf, SMqHbOneTopicBatchRsp* pBatchRsp) { int32_t sz; buf = taosDecodeStringTo(buf, pBatchRsp->topicName); buf = taosDecodeFixedI32(buf, &sz); pBatchRsp->rsps = taosArrayInit(sz, sizeof(SMqHbRsp)); for (int32_t i = 0; i < sz; i++) { SMqHbRsp rsp; buf = taosDecodeSMqHbRsp(buf, &rsp); buf = taosArrayPush(pBatchRsp->rsps, &rsp); } return buf; } typedef struct SMqHbBatchRsp { int64_t consumerId; SArray* batchRsps; // SArray } SMqHbBatchRsp; static FORCE_INLINE int taosEncodeSMqHbBatchRsp(void** buf, const SMqHbBatchRsp* pBatchRsp) { int tlen = 0; tlen += taosEncodeFixedI64(buf, pBatchRsp->consumerId); int32_t sz; tlen += taosEncodeFixedI32(buf, sz); for (int32_t i = 0; i < sz; i++) { SMqHbOneTopicBatchRsp* pRsp = (SMqHbOneTopicBatchRsp*) taosArrayGet(pBatchRsp->batchRsps, i); tlen += taosEncodeSMqHbOneTopicBatchRsp(buf, pRsp); } return tlen; } static FORCE_INLINE void* taosDecodeSMqHbBatchRsp(void* buf, SMqHbBatchRsp* pBatchRsp) { buf = taosDecodeFixedI64(buf, &pBatchRsp->consumerId); int32_t sz; buf = taosDecodeFixedI32(buf, &sz); pBatchRsp->batchRsps = taosArrayInit(sz, sizeof(SMqHbOneTopicBatchRsp)); for (int32_t i = 0; i < sz; i++) { SMqHbOneTopicBatchRsp rsp; buf = taosDecodeSMqHbOneTopicBatchRsp(buf, &rsp); buf = taosArrayPush(pBatchRsp->batchRsps, &rsp); } return buf; } typedef struct { int32_t key; int32_t valueLen; void* value; } SKv; typedef struct { int32_t connId; int32_t hbType; } SClientHbKey; typedef struct { SClientHbKey connKey; SHashObj* info; // hash } SClientHbReq; typedef struct { int64_t reqId; SArray* reqs; // SArray } SClientHbBatchReq; typedef struct { SClientHbKey connKey; int32_t status; SArray* info; // Array } SClientHbRsp; typedef struct { int64_t reqId; int64_t rspId; SArray* rsps; // SArray } SClientHbBatchRsp; static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) { return taosIntHash_64(key, keyLen); } int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq); void* tDeserializeSClientHbReq(void* buf, SClientHbReq* pReq); int tSerializeSClientHbRsp(void** buf, const SClientHbRsp* pRsp); void* tDeserializeSClientHbRsp(void* buf, SClientHbRsp* pRsp); static FORCE_INLINE void tFreeReqKvHash(SHashObj* info) { void *pIter = taosHashIterate(info, NULL); while (pIter != NULL) { SKv* kv = (SKv*)pIter; tfree(kv->value); pIter = taosHashIterate(info, pIter); } } static FORCE_INLINE void tFreeClientHbReq(void *pReq) { SClientHbReq* req = (SClientHbReq*)pReq; if (req->info) { tFreeReqKvHash(req->info); taosHashCleanup(req->info); } } int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq); void* tDeserializeSClientHbBatchReq(void* buf, SClientHbBatchReq* pReq); static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) { SClientHbBatchReq *req = (SClientHbBatchReq*)pReq; if (deep) { taosArrayDestroyEx(req->reqs, tFreeClientHbReq); } else { taosArrayDestroy(req->reqs); } free(pReq); } static FORCE_INLINE void tFreeClientKv(void *pKv) { SKv *kv = (SKv *)pKv; if (kv) { tfree(kv->value); } } static FORCE_INLINE void tFreeClientHbRsp(void *pRsp) { SClientHbRsp* rsp = (SClientHbRsp*)pRsp; if (rsp->info) taosArrayDestroyEx(rsp->info, tFreeClientKv); } static FORCE_INLINE void tFreeClientHbBatchRsp(void* pRsp) { SClientHbBatchRsp *rsp = (SClientHbBatchRsp*)pRsp; taosArrayDestroyEx(rsp->rsps, tFreeClientHbRsp); } int tSerializeSClientHbBatchRsp(void** buf, const SClientHbBatchRsp* pBatchRsp); void* tDeserializeSClientHbBatchRsp(void* buf, SClientHbBatchRsp* pBatchRsp); static FORCE_INLINE int taosEncodeSKv(void** buf, const SKv* pKv) { int tlen = 0; tlen += taosEncodeFixedI32(buf, pKv->key); tlen += taosEncodeFixedI32(buf, pKv->valueLen); tlen += taosEncodeBinary(buf, pKv->value, pKv->valueLen); return tlen; } static FORCE_INLINE void* taosDecodeSKv(void* buf, SKv* pKv) { buf = taosDecodeFixedI32(buf, &pKv->key); buf = taosDecodeFixedI32(buf, &pKv->valueLen); buf = taosDecodeBinary(buf, &pKv->value, pKv->valueLen); return buf; } static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) { int tlen = 0; tlen += taosEncodeFixedI32(buf, pKey->connId); tlen += taosEncodeFixedI32(buf, pKey->hbType); return tlen; } static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) { buf = taosDecodeFixedI32(buf, &pKey->connId); buf = taosDecodeFixedI32(buf, &pKey->hbType); return buf; } typedef struct SMqHbVgInfo { int32_t vgId; } SMqHbVgInfo; static FORCE_INLINE int taosEncodeSMqVgInfo(void** buf, const SMqHbVgInfo* pVgInfo) { int tlen = 0; tlen += taosEncodeFixedI32(buf, pVgInfo->vgId); return tlen; } static FORCE_INLINE void* taosDecodeSMqVgInfo(void* buf, SMqHbVgInfo* pVgInfo) { buf = taosDecodeFixedI32(buf, &pVgInfo->vgId); return buf; } typedef struct SMqHbTopicInfo { int32_t epoch; int64_t topicUid; char name[TSDB_TOPIC_FNAME_LEN]; SArray* pVgInfo; } SMqHbTopicInfo; static FORCE_INLINE int taosEncodeSMqHbTopicInfoMsg(void** buf, const SMqHbTopicInfo* pTopicInfo) { int tlen = 0; tlen += taosEncodeFixedI32(buf, pTopicInfo->epoch); tlen += taosEncodeFixedI64(buf, pTopicInfo->topicUid); tlen += taosEncodeString(buf, pTopicInfo->name); int32_t sz = taosArrayGetSize(pTopicInfo->pVgInfo); tlen += taosEncodeFixedI32(buf, sz); for (int32_t i = 0; i < sz; i++) { SMqHbVgInfo* pVgInfo = (SMqHbVgInfo*)taosArrayGet(pTopicInfo->pVgInfo, i); tlen += taosEncodeSMqVgInfo(buf, pVgInfo); } return tlen; } static FORCE_INLINE void* taosDecodeSMqHbTopicInfoMsg(void* buf, SMqHbTopicInfo* pTopicInfo) { buf = taosDecodeFixedI32(buf, &pTopicInfo->epoch); buf = taosDecodeFixedI64(buf, &pTopicInfo->topicUid); buf = taosDecodeStringTo(buf, pTopicInfo->name); int32_t sz; buf = taosDecodeFixedI32(buf, &sz); pTopicInfo->pVgInfo = taosArrayInit(sz, sizeof(SMqHbVgInfo)); for (int32_t i = 0; i < sz; i++) { SMqHbVgInfo vgInfo; buf = taosDecodeSMqVgInfo(buf, &vgInfo); taosArrayPush(pTopicInfo->pVgInfo, &vgInfo); } return buf; } typedef struct SMqHbMsg { int32_t status; // ask hb endpoint int32_t epoch; int64_t consumerId; SArray* pTopics; // SArray } SMqHbMsg; static FORCE_INLINE int taosEncodeSMqMsg(void** buf, const SMqHbMsg* pMsg) { int tlen = 0; tlen += taosEncodeFixedI32(buf, pMsg->status); tlen += taosEncodeFixedI32(buf, pMsg->epoch); tlen += taosEncodeFixedI64(buf, pMsg->consumerId); int32_t sz = taosArrayGetSize(pMsg->pTopics); tlen += taosEncodeFixedI32(buf, sz); for (int i = 0; i < sz; i++) { SMqHbTopicInfo* topicInfo = (SMqHbTopicInfo*)taosArrayGet(pMsg->pTopics, i); tlen += taosEncodeSMqHbTopicInfoMsg(buf, topicInfo); } return tlen; } static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) { buf = taosDecodeFixedI32(buf, &pMsg->status); buf = taosDecodeFixedI32(buf, &pMsg->epoch); buf = taosDecodeFixedI64(buf, &pMsg->consumerId); int32_t sz; buf = taosDecodeFixedI32(buf, &sz); pMsg->pTopics = taosArrayInit(sz, sizeof(SMqHbTopicInfo)); for (int i = 0; i < sz; i++) { SMqHbTopicInfo topicInfo; buf = taosDecodeSMqHbTopicInfoMsg(buf, &topicInfo); taosArrayPush(pMsg->pTopics, &topicInfo); } return buf; } typedef struct SMqSetCVgReq { int64_t leftForVer; int32_t vgId; int64_t oldConsumerId; int64_t newConsumerId; char topicName[TSDB_TOPIC_FNAME_LEN]; char cgroup[TSDB_CONSUMER_GROUP_LEN]; char* sql; char* logicalPlan; char* physicalPlan; char* qmsg; } SMqSetCVgReq; static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg* pMsg) { int32_t tlen = 0; tlen += taosEncodeFixedU64(buf, pMsg->sId); tlen += taosEncodeFixedU64(buf, pMsg->queryId); tlen += taosEncodeFixedU64(buf, pMsg->taskId); tlen += taosEncodeFixedU32(buf, pMsg->sqlLen); tlen += taosEncodeFixedU32(buf, pMsg->phyLen); //tlen += taosEncodeBinary(buf, pMsg->msg, pMsg->contentLen); return tlen; } static FORCE_INLINE void* tDecodeSSubQueryMsg(void* buf, SSubQueryMsg* pMsg) { buf = taosDecodeFixedU64(buf, &pMsg->sId); buf = taosDecodeFixedU64(buf, &pMsg->queryId); buf = taosDecodeFixedU64(buf, &pMsg->taskId); buf = taosDecodeFixedU32(buf, &pMsg->sqlLen); buf = taosDecodeFixedU32(buf, &pMsg->phyLen); //buf = taosDecodeBinaryTo(buf, pMsg->msg, pMsg->contentLen); return buf; } static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) { int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pReq->leftForVer); tlen += taosEncodeFixedI32(buf, pReq->vgId); tlen += taosEncodeFixedI64(buf, pReq->oldConsumerId); tlen += taosEncodeFixedI64(buf, pReq->newConsumerId); tlen += taosEncodeString(buf, pReq->topicName); tlen += taosEncodeString(buf, pReq->cgroup); tlen += taosEncodeString(buf, pReq->sql); tlen += taosEncodeString(buf, pReq->logicalPlan); tlen += taosEncodeString(buf, pReq->physicalPlan); tlen += taosEncodeString(buf, pReq->qmsg); //tlen += tEncodeSSubQueryMsg(buf, &pReq->msg); return tlen; } static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { buf = taosDecodeFixedI64(buf, &pReq->leftForVer); buf = taosDecodeFixedI32(buf, &pReq->vgId); buf = taosDecodeFixedI64(buf, &pReq->oldConsumerId); buf = taosDecodeFixedI64(buf, &pReq->newConsumerId); buf = taosDecodeStringTo(buf, pReq->topicName); buf = taosDecodeStringTo(buf, pReq->cgroup); buf = taosDecodeString(buf, &pReq->sql); buf = taosDecodeString(buf, &pReq->logicalPlan); buf = taosDecodeString(buf, &pReq->physicalPlan); buf = taosDecodeString(buf, &pReq->qmsg); //buf = tDecodeSSubQueryMsg(buf, &pReq->msg); return buf; } typedef struct SMqSetCVgRsp { SMsgHead header; int32_t vgId; int64_t consumerId; char topicName[TSDB_TOPIC_FNAME_LEN]; char cGroup[TSDB_CONSUMER_GROUP_LEN]; } SMqSetCVgRsp; typedef struct { uint32_t nCols; SSchema *pSchema; } SSchemaWrapper; static FORCE_INLINE int32_t tEncodeSSchema(void** buf, const SSchema* pSchema) { int32_t tlen = 0; tlen += taosEncodeFixedI8(buf, pSchema->type); tlen += taosEncodeFixedI32(buf, pSchema->bytes); tlen += taosEncodeFixedI32(buf, pSchema->colId); tlen += taosEncodeString(buf, pSchema->name); return tlen; } static FORCE_INLINE void* tDecodeSSchema(void* buf, SSchema* pSchema) { buf = taosDecodeFixedI8(buf, &pSchema->type); buf = taosDecodeFixedI32(buf, &pSchema->bytes); buf = taosDecodeFixedI32(buf, &pSchema->colId); buf = taosDecodeStringTo(buf, pSchema->name); return buf; } static FORCE_INLINE int32_t tEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) { int32_t tlen = 0; tlen += taosEncodeFixedU32(buf, pSW->nCols); for (int32_t i = 0; i < pSW->nCols; i ++) { tlen += tEncodeSSchema(buf, &pSW->pSchema[i]); } return tlen; } static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) { buf = taosDecodeFixedU32(buf, &pSW->nCols); pSW->pSchema = (SSchema*) calloc(pSW->nCols, sizeof(SSchema)); if (pSW->pSchema == NULL) { return NULL; } for (int32_t i = 0; i < pSW->nCols; i ++) { buf = tDecodeSSchema(buf, &pSW->pSchema[i]); } return buf; } typedef struct SMqTbData { int64_t uid; int32_t numOfRows; char* colData; } SMqTbData; typedef struct SMqTopicBlk { char topicName[TSDB_TOPIC_FNAME_LEN]; int64_t committedOffset; int64_t reqOffset; int64_t rspOffset; int32_t skipLogNum; int32_t bodyLen; int32_t numOfTb; SMqTbData* tbData; } SMqTopicData; typedef struct SMqConsumeRsp { int64_t consumerId; SSchemaWrapper* schemas; int64_t committedOffset; int64_t reqOffset; int64_t rspOffset; int32_t skipLogNum; int32_t numOfTopics; SArray* pBlockData; //SArray } SMqConsumeRsp; // one req for one vg+topic typedef struct SMqConsumeReq { SMsgHead head; //0: commit only, current offset //1: consume only, poll next offset //2: commit current and consume next offset int32_t reqType; int64_t reqId; int64_t consumerId; int64_t blockingTime; char cgroup[TSDB_CONSUMER_GROUP_LEN]; int64_t offset; char topic[TSDB_TOPIC_FNAME_LEN]; } SMqConsumeReq; typedef struct SMqSubVgEp { int32_t vgId; SEpSet epSet; } SMqSubVgEp; typedef struct SMqSubTopicEp { char topic[TSDB_TOPIC_FNAME_LEN]; SArray* vgs; // SArray } SMqSubTopicEp; typedef struct SMqCMGetSubEpRsp { int64_t consumerId; int64_t epoch; char cgroup[TSDB_CONSUMER_GROUP_LEN]; SArray* topics; // SArray } SMqCMGetSubEpRsp; static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taosArrayDestroy(pSubTopicEp->vgs); } static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) { int32_t tlen = 0; tlen += taosEncodeFixedI32(buf, pVgEp->vgId); tlen += taosEncodeSEpSet(buf, &pVgEp->epSet); return tlen; } static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) { buf = taosDecodeFixedI32(buf, &pVgEp->vgId); buf = taosDecodeSEpSet(buf, &pVgEp->epSet); return buf; } static FORCE_INLINE void tDeleteSMqCMGetSubEpRsp(SMqCMGetSubEpRsp* pRsp) { taosArrayDestroyEx(pRsp->topics, (void (*)(void*)) tDeleteSMqSubTopicEp); } static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) { int32_t tlen = 0; tlen += taosEncodeString(buf, pTopicEp->topic); int32_t sz = taosArrayGetSize(pTopicEp->vgs); tlen += taosEncodeFixedI32(buf, sz); for (int32_t i = 0; i < sz; i++) { SMqSubVgEp* pVgEp = (SMqSubVgEp*)taosArrayGet(pTopicEp->vgs, i); tlen += tEncodeSMqSubVgEp(buf, pVgEp); } return tlen; } static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicEp) { buf = taosDecodeStringTo(buf, pTopicEp->topic); int32_t sz; buf = taosDecodeFixedI32(buf, &sz); pTopicEp->vgs = taosArrayInit(sz, sizeof(SMqSubVgEp)); if (pTopicEp->vgs == NULL) { return NULL; } for (int32_t i = 0; i < sz; i++) { SMqSubVgEp vgEp; buf = tDecodeSMqSubVgEp(buf, &vgEp); taosArrayPush(pTopicEp->vgs, &vgEp); } return buf; } static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSubEpRsp* pRsp) { int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pRsp->consumerId); tlen += taosEncodeFixedI64(buf, pRsp->epoch); tlen += taosEncodeString(buf, pRsp->cgroup); int32_t sz = taosArrayGetSize(pRsp->topics); tlen += taosEncodeFixedI32(buf, sz); for (int32_t i = 0; i < sz; i++) { SMqSubTopicEp* pVgEp = (SMqSubTopicEp*)taosArrayGet(pRsp->topics, i); tlen += tEncodeSMqSubTopicEp(buf, pVgEp); } return tlen; } static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* pRsp) { buf = taosDecodeFixedI64(buf, &pRsp->consumerId); buf = taosDecodeFixedI64(buf, &pRsp->epoch); buf = taosDecodeStringTo(buf, pRsp->cgroup); int32_t sz; buf = taosDecodeFixedI32(buf, &sz); pRsp->topics = taosArrayInit(sz, sizeof(SMqSubTopicEp)); if (pRsp->topics == NULL) { return NULL; } for (int32_t i = 0; i < sz; i++) { SMqSubTopicEp topicEp; buf = tDecodeSMqSubTopicEp(buf, &topicEp); taosArrayPush(pRsp->topics, &topicEp); } return buf; } #ifdef __cplusplus } #endif #endif /*_TD_COMMON_TAOS_MSG_H_*/