/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #ifndef _TD_COMMON_TAOS_MSG_H_ #define _TD_COMMON_TAOS_MSG_H_ #include "taosdef.h" #include "taoserror.h" #include "tarray.h" #include "tcoding.h" #include "tencode.h" #include "thash.h" #include "tlist.h" #include "tname.h" #include "trow.h" #include "tuuid.h" #ifdef __cplusplus extern "C" { #endif /* ------------------------ MESSAGE DEFINITIONS ------------------------ */ #define TD_MSG_NUMBER_ #undef TD_MSG_DICT_ #undef TD_MSG_INFO_ #undef TD_MSG_SEG_CODE_ #include "tmsgdef.h" #undef TD_MSG_NUMBER_ #undef TD_MSG_DICT_ #undef TD_MSG_INFO_ #define TD_MSG_SEG_CODE_ #include "tmsgdef.h" #undef TD_MSG_NUMBER_ #undef TD_MSG_DICT_ #undef TD_MSG_INFO_ #undef TD_MSG_SEG_CODE_ #include "tmsgdef.h" extern char* tMsgInfo[]; extern int32_t tMsgDict[]; #define TMSG_SEG_CODE(TYPE) (((TYPE)&0xff00) >> 8) #define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff) #define TMSG_INFO(TYPE) tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] #define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)) typedef uint16_t tmsg_t; /* ------------------------ OTHER DEFINITIONS ------------------------ */ // IE type #define TSDB_IE_TYPE_SEC 1 #define TSDB_IE_TYPE_META 2 #define TSDB_IE_TYPE_MGMT_IP 3 #define TSDB_IE_TYPE_DNODE_CFG 4 #define TSDB_IE_TYPE_NEW_VERSION 5 #define TSDB_IE_TYPE_DNODE_EXT 6 #define TSDB_IE_TYPE_DNODE_STATE 7 enum { CONN_TYPE__QUERY = 1, CONN_TYPE__TMQ, CONN_TYPE__MAX }; enum { HEARTBEAT_KEY_DBINFO = 1, HEARTBEAT_KEY_STBINFO, HEARTBEAT_KEY_MQ_TMP, }; typedef enum _mgmt_table { TSDB_MGMT_TABLE_START, TSDB_MGMT_TABLE_DNODE, TSDB_MGMT_TABLE_MNODE, TSDB_MGMT_TABLE_MODULE, TSDB_MGMT_TABLE_QNODE, TSDB_MGMT_TABLE_SNODE, TSDB_MGMT_TABLE_BNODE, TSDB_MGMT_TABLE_CLUSTER, TSDB_MGMT_TABLE_DB, TSDB_MGMT_TABLE_FUNC, TSDB_MGMT_TABLE_INDEX, TSDB_MGMT_TABLE_STB, TSDB_MGMT_TABLE_STREAMS, TSDB_MGMT_TABLE_TABLE, TSDB_MGMT_TABLE_USER, TSDB_MGMT_TABLE_GRANTS, TSDB_MGMT_TABLE_VGROUP, TSDB_MGMT_TABLE_TOPICS, TSDB_MGMT_TABLE_CONSUMERS, TSDB_MGMT_TABLE_SUBSCRIBES, TSDB_MGMT_TABLE_TRANS, TSDB_MGMT_TABLE_SMAS, TSDB_MGMT_TABLE_CONFIGS, TSDB_MGMT_TABLE_CONNS, TSDB_MGMT_TABLE_QUERIES, TSDB_MGMT_TABLE_VNODES, TSDB_MGMT_TABLE_MAX, } EShowType; #define TSDB_ALTER_TABLE_ADD_TAG 1 #define TSDB_ALTER_TABLE_DROP_TAG 2 #define TSDB_ALTER_TABLE_UPDATE_TAG_NAME 3 #define TSDB_ALTER_TABLE_UPDATE_TAG_VAL 4 #define TSDB_ALTER_TABLE_ADD_COLUMN 5 #define TSDB_ALTER_TABLE_DROP_COLUMN 6 #define TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES 7 #define TSDB_ALTER_TABLE_UPDATE_TAG_BYTES 8 #define TSDB_ALTER_TABLE_UPDATE_OPTIONS 9 #define TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME 10 #define TSDB_FILL_NONE 0 #define TSDB_FILL_NULL 1 #define TSDB_FILL_SET_VALUE 2 #define TSDB_FILL_LINEAR 3 #define TSDB_FILL_PREV 4 #define TSDB_FILL_NEXT 5 #define TSDB_ALTER_USER_PASSWD 0x1 #define TSDB_ALTER_USER_SUPERUSER 0x2 #define TSDB_ALTER_USER_ADD_READ_DB 0x3 #define TSDB_ALTER_USER_REMOVE_READ_DB 0x4 #define TSDB_ALTER_USER_CLEAR_READ_DB 0x5 #define TSDB_ALTER_USER_ADD_WRITE_DB 0x6 #define TSDB_ALTER_USER_REMOVE_WRITE_DB 0x7 #define TSDB_ALTER_USER_CLEAR_WRITE_DB 0x8 #define TSDB_ALTER_USER_PRIVILEGES 0x2 #define TSDB_KILL_MSG_LEN 30 #define TSDB_TABLE_NUM_UNIT 100000 #define TSDB_VN_READ_ACCCESS ((char)0x1) #define TSDB_VN_WRITE_ACCCESS ((char)0x2) #define TSDB_VN_ALL_ACCCESS (TSDB_VN_READ_ACCCESS | TSDB_VN_WRITE_ACCCESS) #define TSDB_COL_NORMAL 0x0u // the normal column of the table #define TSDB_COL_TAG 0x1u // the tag column type #define TSDB_COL_UDC 0x2u // the user specified normal string column, it is a dummy column #define TSDB_COL_TMP 0x4u // internal column generated by the previous operators #define TSDB_COL_NULL 0x8u // the column filter NULL or not #define TSDB_COL_IS_TAG(f) (((f & (~(TSDB_COL_NULL))) & TSDB_COL_TAG) != 0) #define TSDB_COL_IS_NORMAL_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_NORMAL) #define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC) #define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0) #define TD_SUPER_TABLE TSDB_SUPER_TABLE #define TD_CHILD_TABLE TSDB_CHILD_TABLE #define TD_NORMAL_TABLE TSDB_NORMAL_TABLE typedef struct { int32_t vgId; char* dbFName; char* tbName; } SBuildTableMetaInput; typedef struct { char db[TSDB_DB_FNAME_LEN]; int64_t dbId; int32_t vgVersion; int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT } SBuildUseDBInput; typedef struct SField { char name[TSDB_COL_NAME_LEN]; uint8_t type; int32_t bytes; } SField; typedef struct SRetention { int32_t freq; int32_t keep; int8_t freqUnit; int8_t keepUnit; } SRetention; #pragma pack(push, 1) // null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta typedef struct SEp { char fqdn[TSDB_FQDN_LEN]; uint16_t port; } SEp; typedef struct { int32_t contLen; int32_t vgId; } SMsgHead; // Submit message for one table typedef struct SSubmitBlk { int64_t uid; // table unique id int64_t suid; // stable id int32_t sversion; // data schema version int32_t dataLen; // data part length, not including the SSubmitBlk head int32_t schemaLen; // schema length, if length is 0, no schema exists int16_t numOfRows; // total number of rows in current submit block int16_t padding; // TODO just for padding here char data[]; } SSubmitBlk; // Submit message for this TSDB typedef struct { SMsgHead header; int64_t version; int32_t length; int32_t numOfBlocks; char blocks[]; } SSubmitReq; typedef struct { int32_t totalLen; int32_t len; STSRow* row; } SSubmitBlkIter; typedef struct { int32_t totalLen; int32_t len; // head of SSubmitBlk int64_t uid; // table unique id int64_t suid; // stable id int32_t sversion; // data schema version int32_t dataLen; // data part length, not including the SSubmitBlk head int32_t schemaLen; // schema length, if length is 0, no schema exists int16_t numOfRows; // total number of rows in current submit block // head of SSubmitBlk const void* pMsg; } SSubmitMsgIter; int32_t tInitSubmitMsgIter(const SSubmitReq* pMsg, SSubmitMsgIter* pIter); int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); int32_t tInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter); STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter); // TODO: KEEP one suite of iterator API finally. // 1) use tInitSubmitMsgIterEx firstly as not decrease the merge conflicts // 2) replace tInitSubmitMsgIterEx with tInitSubmitMsgIter later // 3) finally, rename tInitSubmitMsgIterEx to tInitSubmitMsgIter int32_t tInitSubmitMsgIterEx(const SSubmitReq* pMsg, SSubmitMsgIter* pIter); int32_t tGetSubmitMsgNextEx(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); int32_t tInitSubmitBlkIterEx(SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkIter* pIter); STSRow* tGetSubmitBlkNextEx(SSubmitBlkIter* pIter); typedef struct { int32_t index; // index of failed block in submit blocks int32_t vnode; // vnode index of failed block int32_t sid; // table index of failed block int32_t code; // errorcode while write data to vnode, such as not created, dropped, no space, invalid table } SSubmitRspBlock; typedef struct { int32_t code; // 0-success, > 0 error code int32_t numOfRows; // number of records the client is trying to write int32_t affectedRows; // number of records actually written int32_t failedRows; // number of failed records (exclude duplicate records) int32_t numOfFailedBlocks; SSubmitRspBlock failedBlocks[]; } SSubmitRsp; #define SCHEMA_SMA_ON 0x1 #define SCHEMA_IDX_ON 0x2 typedef struct SSchema { int8_t type; int8_t flags; col_id_t colId; int32_t bytes; char name[TSDB_COL_NAME_LEN]; } SSchema; #define SSCHMEA_TYPE(s) ((s)->type) #define SSCHMEA_SMA(s) ((s)->sma) #define SSCHMEA_COLID(s) ((s)->colId) #define SSCHMEA_BYTES(s) ((s)->bytes) #define SSCHMEA_NAME(s) ((s)->name) STSchema* tdGetSTSChemaFromSSChema(SSchema** pSchema, int32_t nCols); typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t igExists; float xFilesFactor; int32_t delay; int32_t ttl; int32_t numOfColumns; int32_t numOfTags; int32_t numOfSmas; int32_t commentLen; int32_t ast1Len; int32_t ast2Len; SArray* pColumns; // array of SField SArray* pTags; // array of SField SArray* pSmas; // array of SField char* comment; char* pAst1; char* pAst2; } SMCreateStbReq; int32_t tSerializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq); int32_t tDeserializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq); void tFreeSMCreateStbReq(SMCreateStbReq* pReq); typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t igNotExists; } SMDropStbReq; int32_t tSerializeSMDropStbReq(void* buf, int32_t bufLen, SMDropStbReq* pReq); int32_t tDeserializeSMDropStbReq(void* buf, int32_t bufLen, SMDropStbReq* pReq); typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t alterType; int32_t numOfFields; SArray* pFields; } SMAltertbReq; int32_t tSerializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq); int32_t tDeserializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq); void tFreeSMAltertbReq(SMAltertbReq* pReq); typedef struct SEpSet { int8_t inUse; int8_t numOfEps; SEp eps[TSDB_MAX_REPLICA]; } SEpSet; int32_t tEncodeSEpSet(SCoder* pEncoder, const SEpSet* pEp); int32_t tDecodeSEpSet(SCoder* pDecoder, SEpSet* pEp); int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp); void* taosDecodeSEpSet(const void* buf, SEpSet* pEp); typedef struct { SEpSet epSet; } SMEpSet; int32_t tSerializeSMEpSet(void* buf, int32_t bufLen, SMEpSet* pReq); int32_t tDeserializeSMEpSet(void* buf, int32_t buflen, SMEpSet* pReq); typedef struct { int8_t connType; int32_t pid; char app[TSDB_APP_NAME_LEN]; char db[TSDB_DB_NAME_LEN]; char user[TSDB_USER_LEN]; char passwd[TSDB_PASSWORD_LEN]; int64_t startTime; } SConnectReq; int32_t tSerializeSConnectReq(void* buf, int32_t bufLen, SConnectReq* pReq); int32_t tDeserializeSConnectReq(void* buf, int32_t bufLen, SConnectReq* pReq); typedef struct { int32_t acctId; int64_t clusterId; uint32_t connId; int8_t superUser; int8_t connType; SEpSet epSet; char sVersion[128]; } SConnectRsp; int32_t tSerializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp); int32_t tDeserializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp); typedef struct { char user[TSDB_USER_LEN]; char pass[TSDB_PASSWORD_LEN]; int32_t maxUsers; int32_t maxDbs; int32_t maxTimeSeries; int32_t maxStreams; int32_t accessState; // Configured only by command int64_t maxStorage; } SCreateAcctReq, SAlterAcctReq; int32_t tSerializeSCreateAcctReq(void* buf, int32_t bufLen, SCreateAcctReq* pReq); int32_t tDeserializeSCreateAcctReq(void* buf, int32_t bufLen, SCreateAcctReq* pReq); typedef struct { char user[TSDB_USER_LEN]; } SDropUserReq, SDropAcctReq; int32_t tSerializeSDropUserReq(void* buf, int32_t bufLen, SDropUserReq* pReq); int32_t tDeserializeSDropUserReq(void* buf, int32_t bufLen, SDropUserReq* pReq); typedef struct { int8_t createType; int8_t superUser; // denote if it is a super user or not char user[TSDB_USER_LEN]; char pass[TSDB_USET_PASSWORD_LEN]; } SCreateUserReq; int32_t tSerializeSCreateUserReq(void* buf, int32_t bufLen, SCreateUserReq* pReq); int32_t tDeserializeSCreateUserReq(void* buf, int32_t bufLen, SCreateUserReq* pReq); typedef struct { int8_t alterType; int8_t superUser; char user[TSDB_USER_LEN]; char pass[TSDB_USET_PASSWORD_LEN]; char dbname[TSDB_DB_FNAME_LEN]; } SAlterUserReq; int32_t tSerializeSAlterUserReq(void* buf, int32_t bufLen, SAlterUserReq* pReq); int32_t tDeserializeSAlterUserReq(void* buf, int32_t bufLen, SAlterUserReq* pReq); typedef struct { char user[TSDB_USER_LEN]; } SGetUserAuthReq; int32_t tSerializeSGetUserAuthReq(void* buf, int32_t bufLen, SGetUserAuthReq* pReq); int32_t tDeserializeSGetUserAuthReq(void* buf, int32_t bufLen, SGetUserAuthReq* pReq); typedef struct { char user[TSDB_USER_LEN]; int8_t superAuth; SHashObj* readDbs; SHashObj* writeDbs; } SGetUserAuthRsp; int32_t tSerializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp); int32_t tDeserializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp); typedef struct { int16_t colId; // column id int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag int16_t flag; // denote if it is a tag or a normal column char name[TSDB_DB_FNAME_LEN]; } SColIndex; typedef struct { int16_t lowerRelOptr; int16_t upperRelOptr; int16_t filterstr; // denote if current column is char(binary/nchar) union { struct { int64_t lowerBndi; int64_t upperBndi; }; struct { double lowerBndd; double upperBndd; }; struct { int64_t pz; int64_t len; }; }; } SColumnFilterInfo; typedef struct { int16_t numOfFilters; union { int64_t placeholder; SColumnFilterInfo* filterInfo; }; } SColumnFilterList; /* * for client side struct, only column id, type, bytes are necessary * But for data in vnode side, we need all the following information. */ typedef struct { union { col_id_t colId; int16_t slotId; }; int16_t type; int32_t bytes; uint8_t precision; uint8_t scale; } SColumnInfo; typedef struct { int64_t uid; TSKEY key; // last accessed ts, for subscription } STableIdInfo; typedef struct STimeWindow { TSKEY skey; TSKEY ekey; } STimeWindow; typedef struct { int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed int32_t tsLen; // total length of ts comp block int32_t tsNumOfBlocks; // ts comp block numbers int32_t tsOrder; // ts comp block order } STsBufInfo; typedef struct { int32_t tz; // query client timezone char intervalUnit; char slidingUnit; char offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration. int8_t precision; int64_t interval; int64_t sliding; int64_t offset; } SInterval; typedef struct { int32_t code; } SQueryTableRsp; int32_t tSerializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp); int32_t tDeserializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp); typedef struct { char db[TSDB_DB_FNAME_LEN]; int32_t numOfVgroups; int32_t cacheBlockSize; // MB int32_t totalBlocks; int32_t daysPerFile; int32_t daysToKeep0; int32_t daysToKeep1; int32_t daysToKeep2; int32_t minRows; int32_t maxRows; int32_t commitTime; int32_t fsyncPeriod; int32_t ttl; int8_t walLevel; int8_t precision; // time resolution int8_t compression; int8_t replications; int8_t strict; int8_t update; int8_t cacheLastRow; int8_t ignoreExist; int8_t streamMode; int8_t singleSTable; int32_t numOfRetensions; SArray* pRetensions; // SRetention } SCreateDbReq; int32_t tSerializeSCreateDbReq(void* buf, int32_t bufLen, SCreateDbReq* pReq); int32_t tDeserializeSCreateDbReq(void* buf, int32_t bufLen, SCreateDbReq* pReq); void tFreeSCreateDbReq(SCreateDbReq* pReq); typedef struct { char db[TSDB_DB_FNAME_LEN]; int32_t totalBlocks; int32_t daysToKeep0; int32_t daysToKeep1; int32_t daysToKeep2; int32_t fsyncPeriod; int8_t walLevel; int8_t strict; int8_t cacheLastRow; int8_t replications; } SAlterDbReq; int32_t tSerializeSAlterDbReq(void* buf, int32_t bufLen, SAlterDbReq* pReq); int32_t tDeserializeSAlterDbReq(void* buf, int32_t bufLen, SAlterDbReq* pReq); typedef struct { char db[TSDB_DB_FNAME_LEN]; int8_t ignoreNotExists; } SDropDbReq; int32_t tSerializeSDropDbReq(void* buf, int32_t bufLen, SDropDbReq* pReq); int32_t tDeserializeSDropDbReq(void* buf, int32_t bufLen, SDropDbReq* pReq); typedef struct { char db[TSDB_DB_FNAME_LEN]; int64_t uid; } SDropDbRsp; int32_t tSerializeSDropDbRsp(void* buf, int32_t bufLen, SDropDbRsp* pRsp); int32_t tDeserializeSDropDbRsp(void* buf, int32_t bufLen, SDropDbRsp* pRsp); typedef struct { char db[TSDB_DB_FNAME_LEN]; int64_t dbId; int32_t vgVersion; int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT } SUseDbReq; int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq); int32_t tDeserializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq); typedef struct { char db[TSDB_DB_FNAME_LEN]; int64_t uid; int32_t vgVersion; int32_t vgNum; int8_t hashMethod; SArray* pVgroupInfos; // Array of SVgroupInfo } SUseDbRsp; int32_t tSerializeSUseDbRsp(void* buf, int32_t bufLen, const SUseDbRsp* pRsp); int32_t tDeserializeSUseDbRsp(void* buf, int32_t bufLen, SUseDbRsp* pRsp); int32_t tSerializeSUseDbRspImp(SCoder* pEncoder, const SUseDbRsp* pRsp); int32_t tDeserializeSUseDbRspImp(SCoder* pDecoder, SUseDbRsp* pRsp); void tFreeSUsedbRsp(SUseDbRsp* pRsp); typedef struct { char db[TSDB_DB_FNAME_LEN]; } SDbCfgReq; int32_t tSerializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq); int32_t tDeserializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq); typedef struct { int32_t numOfVgroups; int32_t cacheBlockSize; int32_t totalBlocks; int32_t daysPerFile; int32_t daysToKeep0; int32_t daysToKeep1; int32_t daysToKeep2; int32_t minRows; int32_t maxRows; int32_t commitTime; int32_t fsyncPeriod; int32_t ttl; int8_t walLevel; int8_t precision; int8_t compression; int8_t replications; int8_t strict; int8_t update; int8_t cacheLastRow; int8_t streamMode; int8_t singleSTable; int32_t numOfRetensions; SArray* pRetensions; } SDbCfgRsp; int32_t tSerializeSDbCfgRsp(void* buf, int32_t bufLen, const SDbCfgRsp* pRsp); int32_t tDeserializeSDbCfgRsp(void* buf, int32_t bufLen, SDbCfgRsp* pRsp); typedef struct { int32_t rowNum; } SQnodeListReq; int32_t tSerializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq); int32_t tDeserializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq); typedef struct { SArray* addrsList; // SArray } SQnodeListRsp; int32_t tSerializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp); int32_t tDeserializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp); void tFreeSQnodeListRsp(SQnodeListRsp* pRsp); typedef struct SQueryNodeAddr { int32_t nodeId; // vgId or qnodeId SEpSet epSet; } SQueryNodeAddr; typedef struct { SArray* pArray; // Array of SUseDbRsp } SUseDbBatchRsp; int32_t tSerializeSUseDbBatchRsp(void* buf, int32_t bufLen, SUseDbBatchRsp* pRsp); int32_t tDeserializeSUseDbBatchRsp(void* buf, int32_t bufLen, SUseDbBatchRsp* pRsp); void tFreeSUseDbBatchRsp(SUseDbBatchRsp* pRsp); typedef struct { char db[TSDB_DB_FNAME_LEN]; } SCompactDbReq; int32_t tSerializeSCompactDbReq(void* buf, int32_t bufLen, SCompactDbReq* pReq); int32_t tDeserializeSCompactDbReq(void* buf, int32_t bufLen, SCompactDbReq* pReq); typedef struct { char name[TSDB_FUNC_NAME_LEN]; int8_t igExists; int8_t funcType; int8_t scriptType; int8_t outputType; int32_t outputLen; int32_t bufSize; int32_t codeLen; int64_t signature; char* pComment; char* pCode; } SCreateFuncReq; int32_t tSerializeSCreateFuncReq(void* buf, int32_t bufLen, SCreateFuncReq* pReq); int32_t tDeserializeSCreateFuncReq(void* buf, int32_t bufLen, SCreateFuncReq* pReq); void tFreeSCreateFuncReq(SCreateFuncReq* pReq); typedef struct { char name[TSDB_FUNC_NAME_LEN]; int8_t igNotExists; } SDropFuncReq; int32_t tSerializeSDropFuncReq(void* buf, int32_t bufLen, SDropFuncReq* pReq); int32_t tDeserializeSDropFuncReq(void* buf, int32_t bufLen, SDropFuncReq* pReq); typedef struct { int32_t numOfFuncs; bool ignoreCodeComment; SArray* pFuncNames; } SRetrieveFuncReq; int32_t tSerializeSRetrieveFuncReq(void* buf, int32_t bufLen, SRetrieveFuncReq* pReq); int32_t tDeserializeSRetrieveFuncReq(void* buf, int32_t bufLen, SRetrieveFuncReq* pReq); void tFreeSRetrieveFuncReq(SRetrieveFuncReq* pReq); typedef struct { char name[TSDB_FUNC_NAME_LEN]; int8_t funcType; int8_t scriptType; int8_t outputType; int32_t outputLen; int32_t bufSize; int64_t signature; int32_t commentSize; int32_t codeSize; char* pComment; char* pCode; } SFuncInfo; typedef struct { int32_t numOfFuncs; SArray* pFuncInfos; } SRetrieveFuncRsp; int32_t tSerializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp); int32_t tDeserializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp); void tFreeSFuncInfo(SFuncInfo* pInfo); void tFreeSRetrieveFuncRsp(SRetrieveFuncRsp* pRsp); typedef struct { int32_t statusInterval; int64_t checkTime; // 1970-01-01 00:00:00.000 char timezone[TD_TIMEZONE_LEN]; // tsTimezone char locale[TD_LOCALE_LEN]; // tsLocale char charset[TD_LOCALE_LEN]; // tsCharset } SClusterCfg; typedef struct { int32_t openVnodes; int32_t totalVnodes; int32_t masterNum; int64_t numOfSelectReqs; int64_t numOfInsertReqs; int64_t numOfInsertSuccessReqs; int64_t numOfBatchInsertReqs; int64_t numOfBatchInsertSuccessReqs; int64_t errors; } SVnodesStat; typedef struct { int32_t vgId; int32_t syncState; int64_t numOfTables; int64_t numOfTimeSeries; int64_t totalStorage; int64_t compStorage; int64_t pointsWritten; int64_t numOfSelectReqs; int64_t numOfInsertReqs; int64_t numOfInsertSuccessReqs; int64_t numOfBatchInsertReqs; int64_t numOfBatchInsertSuccessReqs; } SVnodeLoad; typedef struct { int32_t syncState; } SMnodeLoad; typedef struct { int32_t sver; // software version int64_t dnodeVer; // dnode table version in sdb int32_t dnodeId; int64_t clusterId; int64_t rebootTime; int64_t updateTime; int32_t numOfCores; int32_t numOfSupportVnodes; char dnodeEp[TSDB_EP_LEN]; SClusterCfg clusterCfg; SArray* pVloads; // array of SVnodeLoad } SStatusReq; int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq); int32_t tDeserializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq); void tFreeSStatusReq(SStatusReq* pReq); typedef struct { int32_t dnodeId; int64_t clusterId; } SDnodeCfg; typedef struct { int32_t id; int8_t isMnode; SEp ep; } SDnodeEp; typedef struct { int64_t dnodeVer; SDnodeCfg dnodeCfg; SArray* pDnodeEps; // Array of SDnodeEp } SStatusRsp; int32_t tSerializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp); int32_t tDeserializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp); void tFreeSStatusRsp(SStatusRsp* pRsp); typedef struct { int32_t reserved; } SMTimerReq; int32_t tSerializeSMTimerMsg(void* buf, int32_t bufLen, SMTimerReq* pReq); int32_t tDeserializeSMTimerMsg(void* buf, int32_t bufLen, SMTimerReq* pReq); typedef struct { int32_t id; uint16_t port; // node sync Port char fqdn[TSDB_FQDN_LEN]; // node FQDN } SReplica; typedef struct { int32_t vgId; int32_t dnodeId; char db[TSDB_DB_FNAME_LEN]; int64_t dbUid; int32_t vgVersion; int32_t cacheBlockSize; int32_t totalBlocks; int32_t daysPerFile; int32_t daysToKeep0; int32_t daysToKeep1; int32_t daysToKeep2; int32_t minRows; int32_t maxRows; int32_t commitTime; int32_t fsyncPeriod; uint32_t hashBegin; uint32_t hashEnd; int8_t hashMethod; int8_t walLevel; int8_t precision; int8_t compression; int8_t strict; int8_t update; int8_t cacheLastRow; int8_t replica; int8_t selfIndex; int8_t streamMode; SReplica replicas[TSDB_MAX_REPLICA]; int32_t numOfRetensions; SArray* pRetensions; // SRetention } SCreateVnodeReq; int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq); int32_t tDeserializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq); int32_t tFreeSCreateVnodeReq(SCreateVnodeReq* pReq); typedef struct { int32_t vgId; int32_t dnodeId; int64_t dbUid; char db[TSDB_DB_FNAME_LEN]; } SDropVnodeReq; int32_t tSerializeSDropVnodeReq(void* buf, int32_t bufLen, SDropVnodeReq* pReq); int32_t tDeserializeSDropVnodeReq(void* buf, int32_t bufLen, SDropVnodeReq* pReq); typedef struct { int64_t dbUid; char db[TSDB_DB_FNAME_LEN]; } SCompactVnodeReq; int32_t tSerializeSCompactVnodeReq(void* buf, int32_t bufLen, SCompactVnodeReq* pReq); int32_t tDeserializeSCompactVnodeReq(void* buf, int32_t bufLen, SCompactVnodeReq* pReq); typedef struct { int32_t vgVersion; int32_t totalBlocks; int32_t daysToKeep0; int32_t daysToKeep1; int32_t daysToKeep2; int8_t walLevel; int8_t strict; int8_t cacheLastRow; int8_t replica; int8_t selfIndex; SReplica replicas[TSDB_MAX_REPLICA]; } SAlterVnodeReq; int32_t tSerializeSAlterVnodeReq(void* buf, int32_t bufLen, SAlterVnodeReq* pReq); int32_t tDeserializeSAlterVnodeReq(void* buf, int32_t bufLen, SAlterVnodeReq* pReq); typedef struct { SMsgHead header; char dbFName[TSDB_DB_FNAME_LEN]; char tbName[TSDB_TABLE_NAME_LEN]; } STableInfoReq; int32_t tSerializeSTableInfoReq(void* buf, int32_t bufLen, STableInfoReq* pReq); int32_t tDeserializeSTableInfoReq(void* buf, int32_t bufLen, STableInfoReq* pReq); typedef struct { int8_t metaClone; // create local clone of the cached table meta int32_t numOfVgroups; int32_t numOfTables; int32_t numOfUdfs; char tableNames[]; } SMultiTableInfoReq; // todo refactor typedef struct SVgroupInfo { int32_t vgId; uint32_t hashBegin; uint32_t hashEnd; SEpSet epSet; union { int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT int32_t taskId; // used in stream }; } SVgroupInfo; typedef struct { int32_t numOfVgroups; SVgroupInfo vgroups[]; } SVgroupsInfo; typedef struct { char tbName[TSDB_TABLE_NAME_LEN]; char stbName[TSDB_TABLE_NAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN]; int64_t dbId; int32_t numOfTags; int32_t numOfColumns; int8_t precision; int8_t tableType; int8_t update; int32_t sversion; int32_t tversion; uint64_t suid; uint64_t tuid; int32_t vgId; SSchema* pSchemas; } STableMetaRsp; int32_t tSerializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp); int32_t tDeserializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp); void tFreeSTableMetaRsp(STableMetaRsp* pRsp); typedef struct { SArray* pArray; // Array of STableMetaRsp } STableMetaBatchRsp; int32_t tSerializeSTableMetaBatchRsp(void* buf, int32_t bufLen, STableMetaBatchRsp* pRsp); int32_t tDeserializeSTableMetaBatchRsp(void* buf, int32_t bufLen, STableMetaBatchRsp* pRsp); void tFreeSTableMetaBatchRsp(STableMetaBatchRsp* pRsp); typedef struct { int32_t numOfTables; int32_t numOfVgroup; int32_t numOfUdf; int32_t contLen; int8_t compressed; // denote if compressed or not int32_t rawLen; // size before compress uint8_t metaClone; // make meta clone after retrieve meta from mnode char meta[]; } SMultiTableMeta; typedef struct { int32_t dataLen; char name[TSDB_TABLE_FNAME_LEN]; char* data; } STagData; /* * sql: show tables like '%a_%' * payload is the query condition, e.g., '%a_%' * payloadLen is the length of payload */ typedef struct { int32_t type; char db[TSDB_DB_FNAME_LEN]; int32_t payloadLen; char* payload; } SShowReq; int32_t tSerializeSShowReq(void* buf, int32_t bufLen, SShowReq* pReq); int32_t tDeserializeSShowReq(void* buf, int32_t bufLen, SShowReq* pReq); void tFreeSShowReq(SShowReq* pReq); typedef struct { int64_t showId; STableMetaRsp tableMeta; } SShowRsp, SVShowTablesRsp; int32_t tSerializeSShowRsp(void* buf, int32_t bufLen, SShowRsp* pRsp); int32_t tDeserializeSShowRsp(void* buf, int32_t bufLen, SShowRsp* pRsp); void tFreeSShowRsp(SShowRsp* pRsp); typedef struct { char db[TSDB_DB_FNAME_LEN]; char tb[TSDB_TABLE_NAME_LEN]; int64_t showId; } SRetrieveTableReq; typedef struct SSysTableSchema { int8_t type; col_id_t colId; int32_t bytes; } SSysTableSchema; int32_t tSerializeSRetrieveTableReq(void* buf, int32_t bufLen, SRetrieveTableReq* pReq); int32_t tDeserializeSRetrieveTableReq(void* buf, int32_t bufLen, SRetrieveTableReq* pReq); typedef struct { int64_t useconds; int8_t completed; // all results are returned to client int8_t precision; int8_t compressed; int32_t compLen; int32_t numOfRows; char data[]; } SRetrieveTableRsp; typedef struct { int64_t handle; int64_t useconds; int8_t completed; // all results are returned to client int8_t precision; int8_t compressed; int32_t compLen; int32_t numOfRows; char data[]; } SRetrieveMetaTableRsp; typedef struct SExplainExecInfo { uint64_t startupCost; uint64_t totalCost; uint64_t numOfRows; void* verboseInfo; } SExplainExecInfo; typedef struct { int32_t numOfPlans; SExplainExecInfo* subplanInfo; } SExplainRsp; int32_t tSerializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp); int32_t tDeserializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp); typedef struct { char fqdn[TSDB_FQDN_LEN]; // end point, hostname:port int32_t port; } SCreateDnodeReq; int32_t tSerializeSCreateDnodeReq(void* buf, int32_t bufLen, SCreateDnodeReq* pReq); int32_t tDeserializeSCreateDnodeReq(void* buf, int32_t bufLen, SCreateDnodeReq* pReq); typedef struct { int32_t dnodeId; char fqdn[TSDB_FQDN_LEN]; int32_t port; } SDropDnodeReq; int32_t tSerializeSDropDnodeReq(void* buf, int32_t bufLen, SDropDnodeReq* pReq); int32_t tDeserializeSDropDnodeReq(void* buf, int32_t bufLen, SDropDnodeReq* pReq); typedef struct { int32_t dnodeId; char config[TSDB_DNODE_CONFIG_LEN]; char value[TSDB_DNODE_VALUE_LEN]; } SMCfgDnodeReq, SDCfgDnodeReq; int32_t tSerializeSMCfgDnodeReq(void* buf, int32_t bufLen, SMCfgDnodeReq* pReq); int32_t tDeserializeSMCfgDnodeReq(void* buf, int32_t bufLen, SMCfgDnodeReq* pReq); typedef struct { int32_t dnodeId; } SMCreateMnodeReq, SMDropMnodeReq, SDDropMnodeReq, SMCreateQnodeReq, SMDropQnodeReq, SDCreateQnodeReq, SDDropQnodeReq, SMCreateSnodeReq, SMDropSnodeReq, SDCreateSnodeReq, SDDropSnodeReq, SMCreateBnodeReq, SMDropBnodeReq, SDCreateBnodeReq, SDDropBnodeReq; int32_t tSerializeSCreateDropMQSBNodeReq(void* buf, int32_t bufLen, SMCreateQnodeReq* pReq); int32_t tDeserializeSCreateDropMQSBNodeReq(void* buf, int32_t bufLen, SMCreateQnodeReq* pReq); typedef struct { int32_t dnodeId; int8_t replica; SReplica replicas[TSDB_MAX_REPLICA]; } SDCreateMnodeReq, SDAlterMnodeReq; int32_t tSerializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq); int32_t tDeserializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq); typedef struct { int32_t connId; int32_t queryId; } SKillQueryReq; int32_t tSerializeSKillQueryReq(void* buf, int32_t bufLen, SKillQueryReq* pReq); int32_t tDeserializeSKillQueryReq(void* buf, int32_t bufLen, SKillQueryReq* pReq); typedef struct { int32_t connId; } SKillConnReq; int32_t tSerializeSKillConnReq(void* buf, int32_t bufLen, SKillConnReq* pReq); int32_t tDeserializeSKillConnReq(void* buf, int32_t bufLen, SKillConnReq* pReq); typedef struct { int32_t transId; } SKillTransReq; int32_t tSerializeSKillTransReq(void* buf, int32_t bufLen, SKillTransReq* pReq); int32_t tDeserializeSKillTransReq(void* buf, int32_t bufLen, SKillTransReq* pReq); typedef struct { char user[TSDB_USER_LEN]; char spi; char encrypt; char secret[TSDB_PASSWORD_LEN]; char ckey[TSDB_PASSWORD_LEN]; } SAuthReq, SAuthRsp; int32_t tSerializeSAuthReq(void* buf, int32_t bufLen, SAuthReq* pReq); int32_t tDeserializeSAuthReq(void* buf, int32_t bufLen, SAuthReq* pReq); typedef struct { int32_t statusCode; char details[1024]; } SServerStatusRsp; int32_t tSerializeSServerStatusRsp(void* buf, int32_t bufLen, SServerStatusRsp* pRsp); int32_t tDeserializeSServerStatusRsp(void* buf, int32_t bufLen, SServerStatusRsp* pRsp); /** * The layout of the query message payload is as following: * +--------------------+---------------------------------+ * |Sql statement | Physical plan | * |(denoted by sqlLen) |(In JSON, denoted by contentLen) | * +--------------------+---------------------------------+ */ typedef struct SSubQueryMsg { SMsgHead header; uint64_t sId; uint64_t queryId; uint64_t taskId; int64_t refId; int8_t taskType; int8_t explain; uint32_t sqlLen; // the query sql, uint32_t phyLen; char msg[]; } SSubQueryMsg; typedef struct { SMsgHead header; uint64_t sId; uint64_t queryId; uint64_t taskId; } SSinkDataReq; typedef struct { SMsgHead header; uint64_t sId; uint64_t queryId; uint64_t taskId; } SQueryContinueReq; typedef struct { SMsgHead header; uint64_t sId; uint64_t queryId; uint64_t taskId; } SResReadyReq; typedef struct { int32_t code; } SResReadyRsp; typedef struct { SMsgHead header; uint64_t sId; uint64_t queryId; uint64_t taskId; } SResFetchReq; typedef struct { SMsgHead header; uint64_t sId; } SSchTasksStatusReq; typedef struct { uint64_t queryId; uint64_t taskId; int64_t refId; int8_t status; } STaskStatus; typedef struct { int64_t refId; SArray* taskStatus; // SArray } SSchedulerStatusRsp; typedef struct { uint64_t queryId; uint64_t taskId; int8_t action; } STaskAction; typedef struct SQueryNodeEpId { int32_t nodeId; // vgId or qnodeId SEp ep; } SQueryNodeEpId; typedef struct { SMsgHead header; uint64_t sId; SQueryNodeEpId epId; SArray* taskAction; // SArray } SSchedulerHbReq; int32_t tSerializeSSchedulerHbReq(void* buf, int32_t bufLen, SSchedulerHbReq* pReq); int32_t tDeserializeSSchedulerHbReq(void* buf, int32_t bufLen, SSchedulerHbReq* pReq); void tFreeSSchedulerHbReq(SSchedulerHbReq* pReq); typedef struct { SQueryNodeEpId epId; SArray* taskStatus; // SArray } SSchedulerHbRsp; int32_t tSerializeSSchedulerHbRsp(void* buf, int32_t bufLen, SSchedulerHbRsp* pRsp); int32_t tDeserializeSSchedulerHbRsp(void* buf, int32_t bufLen, SSchedulerHbRsp* pRsp); void tFreeSSchedulerHbRsp(SSchedulerHbRsp* pRsp); typedef struct { SMsgHead header; uint64_t sId; uint64_t queryId; uint64_t taskId; int64_t refId; } STaskCancelReq; typedef struct { int32_t code; } STaskCancelRsp; typedef struct { SMsgHead header; uint64_t sId; uint64_t queryId; uint64_t taskId; int64_t refId; } STaskDropReq; typedef struct { int32_t code; } STaskDropRsp; #define STREAM_TRIGGER_AT_ONCE 1 #define STREAM_TRIGGER_WINDOW_CLOSE 2 typedef struct { char name[TSDB_TOPIC_FNAME_LEN]; char outputSTbName[TSDB_TABLE_FNAME_LEN]; int8_t igExists; char* sql; char* ast; int8_t triggerType; int64_t watermark; } SCMCreateStreamReq; typedef struct { int64_t streamId; } SCMCreateStreamRsp; int32_t tSerializeSCMCreateStreamReq(void* buf, int32_t bufLen, const SCMCreateStreamReq* pReq); int32_t tDeserializeSCMCreateStreamReq(void* buf, int32_t bufLen, SCMCreateStreamReq* pReq); void tFreeSCMCreateStreamReq(SCMCreateStreamReq* pReq); typedef struct { char name[TSDB_TOPIC_FNAME_LEN]; int64_t streamId; char* sql; char* executorMsg; } SMVCreateStreamReq, SMSCreateStreamReq; typedef struct { int64_t streamId; } SMVCreateStreamRsp, SMSCreateStreamRsp; typedef struct { char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic int8_t igExists; int8_t withTbName; int8_t withSchema; int8_t withTag; char* sql; char* ast; char subscribeDbName[TSDB_DB_NAME_LEN]; } SCMCreateTopicReq; int32_t tSerializeSCMCreateTopicReq(void* buf, int32_t bufLen, const SCMCreateTopicReq* pReq); int32_t tDeserializeSCMCreateTopicReq(void* buf, int32_t bufLen, SCMCreateTopicReq* pReq); void tFreeSCMCreateTopicReq(SCMCreateTopicReq* pReq); typedef struct { int64_t topicId; } SCMCreateTopicRsp; int32_t tSerializeSCMCreateTopicRsp(void* buf, int32_t bufLen, const SCMCreateTopicRsp* pRsp); int32_t tDeserializeSCMCreateTopicRsp(void* buf, int32_t bufLen, SCMCreateTopicRsp* pRsp); typedef struct { int64_t consumerId; } SMqConsumerLostMsg; typedef struct { int64_t consumerId; char cgroup[TSDB_CGROUP_LEN]; SArray* topicNames; // SArray } SCMSubscribeReq; static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) { int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pReq->consumerId); tlen += taosEncodeString(buf, pReq->cgroup); int32_t topicNum = taosArrayGetSize(pReq->topicNames); tlen += taosEncodeFixedI32(buf, topicNum); for (int32_t i = 0; i < topicNum; i++) { tlen += taosEncodeString(buf, (char*)taosArrayGetP(pReq->topicNames, i)); } return tlen; } static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) { buf = taosDecodeFixedI64(buf, &pReq->consumerId); buf = taosDecodeStringTo(buf, pReq->cgroup); int32_t topicNum; buf = taosDecodeFixedI32(buf, &topicNum); pReq->topicNames = taosArrayInit(topicNum, sizeof(void*)); for (int32_t i = 0; i < topicNum; i++) { char* name; buf = taosDecodeString(buf, &name); taosArrayPush(pReq->topicNames, &name); } return buf; } typedef struct SMqSubTopic { int32_t vgId; int64_t topicId; SEpSet epSet; } SMqSubTopic; typedef struct { int32_t topicNum; SMqSubTopic topics[]; } SCMSubscribeRsp; static FORCE_INLINE int32_t tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribeRsp* pRsp) { int32_t tlen = 0; tlen += taosEncodeFixedI32(buf, pRsp->topicNum); for (int32_t i = 0; i < pRsp->topicNum; i++) { tlen += taosEncodeFixedI32(buf, pRsp->topics[i].vgId); tlen += taosEncodeFixedI64(buf, pRsp->topics[i].topicId); tlen += taosEncodeSEpSet(buf, &pRsp->topics[i].epSet); } return tlen; } static FORCE_INLINE void* tDeserializeSCMSubscribeRsp(void* buf, SCMSubscribeRsp* pRsp) { buf = taosDecodeFixedI32(buf, &pRsp->topicNum); for (int32_t i = 0; i < pRsp->topicNum; i++) { buf = taosDecodeFixedI32(buf, &pRsp->topics[i].vgId); buf = taosDecodeFixedI64(buf, &pRsp->topics[i].topicId); buf = taosDecodeSEpSet(buf, &pRsp->topics[i].epSet); } return buf; } typedef struct { int64_t topicId; int64_t consumerId; int64_t consumerGroupId; int64_t offset; char* sql; char* logicalPlan; char* physicalPlan; } SMVSubscribeReq; static FORCE_INLINE int32_t tSerializeSMVSubscribeReq(void** buf, SMVSubscribeReq* pReq) { int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pReq->topicId); tlen += taosEncodeFixedI64(buf, pReq->consumerId); tlen += taosEncodeFixedI64(buf, pReq->consumerGroupId); tlen += taosEncodeFixedI64(buf, pReq->offset); tlen += taosEncodeString(buf, pReq->sql); tlen += taosEncodeString(buf, pReq->logicalPlan); tlen += taosEncodeString(buf, pReq->physicalPlan); return tlen; } static FORCE_INLINE void* tDeserializeSMVSubscribeReq(void* buf, SMVSubscribeReq* pReq) { buf = taosDecodeFixedI64(buf, &pReq->topicId); buf = taosDecodeFixedI64(buf, &pReq->consumerId); buf = taosDecodeFixedI64(buf, &pReq->consumerGroupId); buf = taosDecodeFixedI64(buf, &pReq->offset); buf = taosDecodeString(buf, &pReq->sql); buf = taosDecodeString(buf, &pReq->logicalPlan); buf = taosDecodeString(buf, &pReq->physicalPlan); return buf; } typedef struct { char key[TSDB_SUBSCRIBE_KEY_LEN]; SArray* lostConsumers; // SArray SArray* removedConsumers; // SArray SArray* newConsumers; // SArray } SMqRebSubscribe; static FORCE_INLINE SMqRebSubscribe* tNewSMqRebSubscribe(const char* key) { SMqRebSubscribe* pRebSub = (SMqRebSubscribe*)taosMemoryCalloc(1, sizeof(SMqRebSubscribe)); if (pRebSub == NULL) { goto _err; } strcpy(pRebSub->key, key); pRebSub->lostConsumers = taosArrayInit(0, sizeof(int64_t)); if (pRebSub->lostConsumers == NULL) { goto _err; } pRebSub->removedConsumers = taosArrayInit(0, sizeof(int64_t)); if (pRebSub->removedConsumers == NULL) { goto _err; } pRebSub->newConsumers = taosArrayInit(0, sizeof(int64_t)); if (pRebSub->newConsumers == NULL) { goto _err; } return pRebSub; _err: taosArrayDestroy(pRebSub->lostConsumers); taosArrayDestroy(pRebSub->removedConsumers); taosArrayDestroy(pRebSub->newConsumers); taosMemoryFreeClear(pRebSub); return NULL; } // this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization or // deserialization typedef struct { int8_t* mqInReb; SHashObj* rebSubHash; // SHashObj } SMqDoRebalanceMsg; typedef struct { int64_t status; } SMVSubscribeRsp; typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t igNotExists; } SMDropTopicReq; int32_t tSerializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq); int32_t tDeserializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq); typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t alterType; SSchema schema; } SAlterTopicReq; typedef struct { SMsgHead head; char name[TSDB_TABLE_FNAME_LEN]; int64_t tuid; int32_t sverson; int32_t execLen; char* executor; int32_t sqlLen; char* sql; } SDCreateTopicReq; typedef struct { SMsgHead head; char name[TSDB_TABLE_FNAME_LEN]; int64_t tuid; } SDDropTopicReq; typedef struct { float xFilesFactor; int32_t delay; int32_t qmsg1Len; int32_t qmsg2Len; char* qmsg1; // pAst1:qmsg1:SRetention1 => trigger aggr task1 char* qmsg2; // pAst2:qmsg2:SRetention2 => trigger aggr task2 } SRSmaParam; typedef struct SVCreateTbReq { char* name; uint32_t ttl; uint32_t keep; union { uint8_t info; struct { uint8_t rollup : 1; // 1 means rollup sma uint8_t type : 7; }; }; union { struct { tb_uid_t suid; col_id_t nCols; col_id_t nBSmaCols; SSchema* pSchema; col_id_t nTagCols; SSchema* pTagSchema; SRSmaParam* pRSmaParam; } stbCfg; struct { tb_uid_t suid; SKVRow pTag; } ctbCfg; struct { col_id_t nCols; col_id_t nBSmaCols; SSchema* pSchema; SRSmaParam* pRSmaParam; } ntbCfg; }; } SVCreateTbReq, SVUpdateTbReq; typedef struct { int32_t code; } SVCreateTbRsp, SVUpdateTbRsp; int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq); void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq); typedef struct { int64_t ver; // use a general definition SArray* pArray; } SVCreateTbBatchReq; int32_t tSerializeSVCreateTbBatchReq(void** buf, SVCreateTbBatchReq* pReq); void* tDeserializeSVCreateTbBatchReq(void* buf, SVCreateTbBatchReq* pReq); typedef struct { SArray* rspList; // SArray } SVCreateTbBatchRsp; int32_t tSerializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp); int32_t tDeserializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp); typedef struct { int64_t ver; char* name; uint8_t type; tb_uid_t suid; } SVDropTbReq; typedef struct { int tmp; // TODO: to avoid compile error } SVDropTbRsp; int32_t tSerializeSVDropTbReq(void** buf, SVDropTbReq* pReq); void* tDeserializeSVDropTbReq(void* buf, SVDropTbReq* pReq); typedef struct { SMsgHead head; int64_t uid; int32_t tid; int16_t tversion; int16_t colId; int8_t type; int16_t bytes; int32_t tagValLen; int16_t numOfTags; int32_t schemaLen; char data[]; } SUpdateTagValReq; typedef struct { SMsgHead head; } SUpdateTagValRsp; typedef struct { SMsgHead head; } SVShowTablesReq; typedef struct { SMsgHead head; int32_t id; } SVShowTablesFetchReq; typedef struct { int64_t useconds; int8_t completed; // all results are returned to client int8_t precision; int8_t compressed; int32_t compLen; int32_t numOfRows; char data[]; } SVShowTablesFetchRsp; typedef struct SMqCMGetSubEpReq { int64_t consumerId; int32_t epoch; char cgroup[TSDB_CGROUP_LEN]; } SMqCMGetSubEpReq; static FORCE_INLINE int32_t tEncodeSMsgHead(void** buf, const SMsgHead* pMsg) { int32_t tlen = 0; tlen += taosEncodeFixedI32(buf, pMsg->contLen); tlen += taosEncodeFixedI32(buf, pMsg->vgId); return tlen; } typedef struct SMqHbRsp { int8_t status; // idle or not int8_t vnodeChanged; int8_t epChanged; // should use new epset int8_t reserved; SEpSet epSet; } SMqHbRsp; static FORCE_INLINE int32_t taosEncodeSMqHbRsp(void** buf, const SMqHbRsp* pRsp) { int32_t tlen = 0; tlen += taosEncodeFixedI8(buf, pRsp->status); tlen += taosEncodeFixedI8(buf, pRsp->vnodeChanged); tlen += taosEncodeFixedI8(buf, pRsp->epChanged); tlen += taosEncodeSEpSet(buf, &pRsp->epSet); return tlen; } static FORCE_INLINE void* taosDecodeSMqHbRsp(void* buf, SMqHbRsp* pRsp) { buf = taosDecodeFixedI8(buf, &pRsp->status); buf = taosDecodeFixedI8(buf, &pRsp->vnodeChanged); buf = taosDecodeFixedI8(buf, &pRsp->epChanged); buf = taosDecodeSEpSet(buf, &pRsp->epSet); return buf; } typedef struct SMqHbOneTopicBatchRsp { char topicName[TSDB_TOPIC_FNAME_LEN]; SArray* rsps; // SArray } SMqHbOneTopicBatchRsp; static FORCE_INLINE int32_t taosEncodeSMqHbOneTopicBatchRsp(void** buf, const SMqHbOneTopicBatchRsp* pBatchRsp) { int32_t tlen = 0; tlen += taosEncodeString(buf, pBatchRsp->topicName); int32_t sz = taosArrayGetSize(pBatchRsp->rsps); tlen += taosEncodeFixedI32(buf, sz); for (int32_t i = 0; i < sz; i++) { SMqHbRsp* pRsp = (SMqHbRsp*)taosArrayGet(pBatchRsp->rsps, i); tlen += taosEncodeSMqHbRsp(buf, pRsp); } return tlen; } static FORCE_INLINE void* taosDecodeSMqHbOneTopicBatchRsp(void* buf, SMqHbOneTopicBatchRsp* pBatchRsp) { int32_t sz; buf = taosDecodeStringTo(buf, pBatchRsp->topicName); buf = taosDecodeFixedI32(buf, &sz); pBatchRsp->rsps = taosArrayInit(sz, sizeof(SMqHbRsp)); for (int32_t i = 0; i < sz; i++) { SMqHbRsp rsp; buf = taosDecodeSMqHbRsp(buf, &rsp); buf = taosArrayPush(pBatchRsp->rsps, &rsp); } return buf; } typedef struct SMqHbBatchRsp { int64_t consumerId; SArray* batchRsps; // SArray } SMqHbBatchRsp; static FORCE_INLINE int32_t taosEncodeSMqHbBatchRsp(void** buf, const SMqHbBatchRsp* pBatchRsp) { int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pBatchRsp->consumerId); int32_t sz; tlen += taosEncodeFixedI32(buf, sz); for (int32_t i = 0; i < sz; i++) { SMqHbOneTopicBatchRsp* pRsp = (SMqHbOneTopicBatchRsp*)taosArrayGet(pBatchRsp->batchRsps, i); tlen += taosEncodeSMqHbOneTopicBatchRsp(buf, pRsp); } return tlen; } static FORCE_INLINE void* taosDecodeSMqHbBatchRsp(void* buf, SMqHbBatchRsp* pBatchRsp) { buf = taosDecodeFixedI64(buf, &pBatchRsp->consumerId); int32_t sz; buf = taosDecodeFixedI32(buf, &sz); pBatchRsp->batchRsps = taosArrayInit(sz, sizeof(SMqHbOneTopicBatchRsp)); for (int32_t i = 0; i < sz; i++) { SMqHbOneTopicBatchRsp rsp; buf = taosDecodeSMqHbOneTopicBatchRsp(buf, &rsp); buf = taosArrayPush(pBatchRsp->batchRsps, &rsp); } return buf; } typedef struct { int32_t key; int32_t valueLen; void* value; } SKv; typedef struct { int64_t tscRid; int8_t connType; } SClientHbKey; typedef struct { int64_t tid; int32_t status; } SQuerySubDesc; typedef struct { char sql[TSDB_SHOW_SQL_LEN]; uint64_t queryId; int64_t useconds; int64_t stime; int64_t reqRid; int32_t pid; char fqdn[TSDB_FQDN_LEN]; int32_t subPlanNum; SArray* subDesc; // SArray } SQueryDesc; typedef struct { uint32_t connId; int32_t pid; char app[TSDB_APP_NAME_LEN]; SArray* queryDesc; // SArray } SQueryHbReqBasic; typedef struct { uint32_t connId; uint64_t killRid; int32_t totalDnodes; int32_t onlineDnodes; int8_t killConnection; int8_t align[3]; SEpSet epSet; } SQueryHbRspBasic; typedef struct { SClientHbKey connKey; SQueryHbReqBasic* query; SHashObj* info; // hash } SClientHbReq; typedef struct { int64_t reqId; SArray* reqs; // SArray } SClientHbBatchReq; typedef struct { SClientHbKey connKey; int32_t status; SQueryHbRspBasic* query; SArray* info; // Array } SClientHbRsp; typedef struct { int64_t reqId; int64_t rspId; SArray* rsps; // SArray } SClientHbBatchRsp; static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) { return taosIntHash_64(key, keyLen); } static FORCE_INLINE void tFreeReqKvHash(SHashObj* info) { void* pIter = taosHashIterate(info, NULL); while (pIter != NULL) { SKv* kv = (SKv*)pIter; taosMemoryFreeClear(kv->value); pIter = taosHashIterate(info, pIter); } } static FORCE_INLINE void tFreeClientHbQueryDesc(void* pDesc) { SQueryDesc* desc = (SQueryDesc*)pDesc; if (desc->subDesc) { taosArrayDestroy(desc->subDesc); desc->subDesc = NULL; } } static FORCE_INLINE void tFreeClientHbReq(void* pReq) { SClientHbReq* req = (SClientHbReq*)pReq; if (req->query) { if (req->query->queryDesc) { taosArrayDestroyEx(req->query->queryDesc, tFreeClientHbQueryDesc); } taosMemoryFreeClear(req->query); } if (req->info) { tFreeReqKvHash(req->info); taosHashCleanup(req->info); } } int32_t tSerializeSClientHbBatchReq(void* buf, int32_t bufLen, const SClientHbBatchReq* pReq); int32_t tDeserializeSClientHbBatchReq(void* buf, int32_t bufLen, SClientHbBatchReq* pReq); static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) { SClientHbBatchReq* req = (SClientHbBatchReq*)pReq; if (deep) { taosArrayDestroyEx(req->reqs, tFreeClientHbReq); } else { taosArrayDestroy(req->reqs); } taosMemoryFree(pReq); } static FORCE_INLINE void tFreeClientKv(void* pKv) { SKv* kv = (SKv*)pKv; if (kv) { taosMemoryFreeClear(kv->value); } } static FORCE_INLINE void tFreeClientHbRsp(void* pRsp) { SClientHbRsp* rsp = (SClientHbRsp*)pRsp; taosMemoryFreeClear(rsp->query); if (rsp->info) taosArrayDestroyEx(rsp->info, tFreeClientKv); } static FORCE_INLINE void tFreeClientHbBatchRsp(void* pRsp) { SClientHbBatchRsp* rsp = (SClientHbBatchRsp*)pRsp; taosArrayDestroyEx(rsp->rsps, tFreeClientHbRsp); } int32_t tSerializeSClientHbBatchRsp(void* buf, int32_t bufLen, const SClientHbBatchRsp* pBatchRsp); int32_t tDeserializeSClientHbBatchRsp(void* buf, int32_t bufLen, SClientHbBatchRsp* pBatchRsp); static FORCE_INLINE int32_t tEncodeSKv(SCoder* pEncoder, const SKv* pKv) { if (tEncodeI32(pEncoder, pKv->key) < 0) return -1; if (tEncodeI32(pEncoder, pKv->valueLen) < 0) return -1; if (tEncodeBinary(pEncoder, (const char*)pKv->value, pKv->valueLen) < 0) return -1; return 0; } static FORCE_INLINE int32_t tDecodeSKv(SCoder* pDecoder, SKv* pKv) { if (tDecodeI32(pDecoder, &pKv->key) < 0) return -1; if (tDecodeI32(pDecoder, &pKv->valueLen) < 0) return -1; pKv->value = taosMemoryMalloc(pKv->valueLen + 1); if (pKv->value == NULL) return -1; if (tDecodeCStrTo(pDecoder, (char*)pKv->value) < 0) return -1; return 0; } static FORCE_INLINE int32_t tEncodeSClientHbKey(SCoder* pEncoder, const SClientHbKey* pKey) { if (tEncodeI64(pEncoder, pKey->tscRid) < 0) return -1; if (tEncodeI8(pEncoder, pKey->connType) < 0) return -1; return 0; } static FORCE_INLINE int32_t tDecodeSClientHbKey(SCoder* pDecoder, SClientHbKey* pKey) { if (tDecodeI64(pDecoder, &pKey->tscRid) < 0) return -1; if (tDecodeI8(pDecoder, &pKey->connType) < 0) return -1; return 0; } typedef struct SMqHbVgInfo { int32_t vgId; } SMqHbVgInfo; static FORCE_INLINE int32_t taosEncodeSMqVgInfo(void** buf, const SMqHbVgInfo* pVgInfo) { int32_t tlen = 0; tlen += taosEncodeFixedI32(buf, pVgInfo->vgId); return tlen; } static FORCE_INLINE void* taosDecodeSMqVgInfo(void* buf, SMqHbVgInfo* pVgInfo) { buf = taosDecodeFixedI32(buf, &pVgInfo->vgId); return buf; } typedef struct SMqHbTopicInfo { int32_t epoch; int64_t topicUid; char name[TSDB_TOPIC_FNAME_LEN]; SArray* pVgInfo; } SMqHbTopicInfo; static FORCE_INLINE int32_t taosEncodeSMqHbTopicInfoMsg(void** buf, const SMqHbTopicInfo* pTopicInfo) { int32_t tlen = 0; tlen += taosEncodeFixedI32(buf, pTopicInfo->epoch); tlen += taosEncodeFixedI64(buf, pTopicInfo->topicUid); tlen += taosEncodeString(buf, pTopicInfo->name); int32_t sz = taosArrayGetSize(pTopicInfo->pVgInfo); tlen += taosEncodeFixedI32(buf, sz); for (int32_t i = 0; i < sz; i++) { SMqHbVgInfo* pVgInfo = (SMqHbVgInfo*)taosArrayGet(pTopicInfo->pVgInfo, i); tlen += taosEncodeSMqVgInfo(buf, pVgInfo); } return tlen; } static FORCE_INLINE void* taosDecodeSMqHbTopicInfoMsg(void* buf, SMqHbTopicInfo* pTopicInfo) { buf = taosDecodeFixedI32(buf, &pTopicInfo->epoch); buf = taosDecodeFixedI64(buf, &pTopicInfo->topicUid); buf = taosDecodeStringTo(buf, pTopicInfo->name); int32_t sz; buf = taosDecodeFixedI32(buf, &sz); pTopicInfo->pVgInfo = taosArrayInit(sz, sizeof(SMqHbVgInfo)); for (int32_t i = 0; i < sz; i++) { SMqHbVgInfo vgInfo; buf = taosDecodeSMqVgInfo(buf, &vgInfo); taosArrayPush(pTopicInfo->pVgInfo, &vgInfo); } return buf; } typedef struct SMqHbMsg { int32_t status; // ask hb endpoint int32_t epoch; int64_t consumerId; SArray* pTopics; // SArray } SMqHbMsg; static FORCE_INLINE int32_t taosEncodeSMqMsg(void** buf, const SMqHbMsg* pMsg) { int32_t tlen = 0; tlen += taosEncodeFixedI32(buf, pMsg->status); tlen += taosEncodeFixedI32(buf, pMsg->epoch); tlen += taosEncodeFixedI64(buf, pMsg->consumerId); int32_t sz = taosArrayGetSize(pMsg->pTopics); tlen += taosEncodeFixedI32(buf, sz); for (int32_t i = 0; i < sz; i++) { SMqHbTopicInfo* topicInfo = (SMqHbTopicInfo*)taosArrayGet(pMsg->pTopics, i); tlen += taosEncodeSMqHbTopicInfoMsg(buf, topicInfo); } return tlen; } static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) { buf = taosDecodeFixedI32(buf, &pMsg->status); buf = taosDecodeFixedI32(buf, &pMsg->epoch); buf = taosDecodeFixedI64(buf, &pMsg->consumerId); int32_t sz; buf = taosDecodeFixedI32(buf, &sz); pMsg->pTopics = taosArrayInit(sz, sizeof(SMqHbTopicInfo)); for (int32_t i = 0; i < sz; i++) { SMqHbTopicInfo topicInfo; buf = taosDecodeSMqHbTopicInfoMsg(buf, &topicInfo); taosArrayPush(pMsg->pTopics, &topicInfo); } return buf; } enum { TOPIC_SUB_TYPE__DB = 1, TOPIC_SUB_TYPE__TABLE, }; typedef struct { int64_t leftForVer; int32_t vgId; int64_t oldConsumerId; int64_t newConsumerId; char subKey[TSDB_SUBSCRIBE_KEY_LEN]; int8_t subType; int8_t withTbName; int8_t withSchema; int8_t withTag; char* qmsg; } SMqRebVgReq; static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pReq) { int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pReq->leftForVer); tlen += taosEncodeFixedI32(buf, pReq->vgId); tlen += taosEncodeFixedI64(buf, pReq->oldConsumerId); tlen += taosEncodeFixedI64(buf, pReq->newConsumerId); tlen += taosEncodeString(buf, pReq->subKey); tlen += taosEncodeFixedI8(buf, pReq->subType); tlen += taosEncodeFixedI8(buf, pReq->withTbName); tlen += taosEncodeFixedI8(buf, pReq->withSchema); tlen += taosEncodeFixedI8(buf, pReq->withTag); if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { tlen += taosEncodeString(buf, pReq->qmsg); } return tlen; } static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq) { buf = taosDecodeFixedI64(buf, &pReq->leftForVer); buf = taosDecodeFixedI32(buf, &pReq->vgId); buf = taosDecodeFixedI64(buf, &pReq->oldConsumerId); buf = taosDecodeFixedI64(buf, &pReq->newConsumerId); buf = taosDecodeStringTo(buf, pReq->subKey); buf = taosDecodeFixedI8(buf, &pReq->subType); buf = taosDecodeFixedI8(buf, &pReq->withTbName); buf = taosDecodeFixedI8(buf, &pReq->withSchema); buf = taosDecodeFixedI8(buf, &pReq->withTag); if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { buf = taosDecodeString(buf, &pReq->qmsg); } return (void*)buf; } typedef struct { int8_t reserved; } SMqRebVgRsp; typedef struct { int64_t leftForVer; int32_t vgId; int32_t epoch; int64_t consumerId; char topicName[TSDB_TOPIC_FNAME_LEN]; char cgroup[TSDB_CGROUP_LEN]; char* sql; char* physicalPlan; char* qmsg; } SMqSetCVgReq; static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) { int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pReq->leftForVer); tlen += taosEncodeFixedI32(buf, pReq->vgId); tlen += taosEncodeFixedI32(buf, pReq->epoch); tlen += taosEncodeFixedI64(buf, pReq->consumerId); tlen += taosEncodeString(buf, pReq->topicName); tlen += taosEncodeString(buf, pReq->cgroup); tlen += taosEncodeString(buf, pReq->sql); tlen += taosEncodeString(buf, pReq->physicalPlan); tlen += taosEncodeString(buf, pReq->qmsg); return tlen; } static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { buf = taosDecodeFixedI64(buf, &pReq->leftForVer); buf = taosDecodeFixedI32(buf, &pReq->vgId); buf = taosDecodeFixedI32(buf, &pReq->epoch); buf = taosDecodeFixedI64(buf, &pReq->consumerId); buf = taosDecodeStringTo(buf, pReq->topicName); buf = taosDecodeStringTo(buf, pReq->cgroup); buf = taosDecodeString(buf, &pReq->sql); buf = taosDecodeString(buf, &pReq->physicalPlan); buf = taosDecodeString(buf, &pReq->qmsg); return buf; } typedef struct { int32_t vgId; int64_t offset; char topicName[TSDB_TOPIC_FNAME_LEN]; char cgroup[TSDB_CGROUP_LEN]; } SMqOffset; typedef struct { int32_t num; SMqOffset* offsets; } SMqCMCommitOffsetReq; typedef struct { int32_t reserved; } SMqCMCommitOffsetRsp; int32_t tEncodeSMqOffset(SCoder* encoder, const SMqOffset* pOffset); int32_t tDecodeSMqOffset(SCoder* decoder, SMqOffset* pOffset); int32_t tEncodeSMqCMCommitOffsetReq(SCoder* encoder, const SMqCMCommitOffsetReq* pReq); int32_t tDecodeSMqCMCommitOffsetReq(SCoder* decoder, SMqCMCommitOffsetReq* pReq); typedef struct { uint32_t nCols; SSchema* pSchema; } SSchemaWrapper; static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* pSchemaWrapper) { SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper)); if (pSW == NULL) return pSW; pSW->nCols = pSchemaWrapper->nCols; pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); if (pSW->pSchema == NULL) { taosMemoryFree(pSW); return NULL; } memcpy(pSW->pSchema, pSchemaWrapper->pSchema, pSW->nCols * sizeof(SSchema)); return pSW; } static FORCE_INLINE void tDeleteSSchemaWrapper(SSchemaWrapper* pSchemaWrapper) { taosMemoryFree(pSchemaWrapper->pSchema); taosMemoryFree(pSchemaWrapper); } static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema) { int32_t tlen = 0; tlen += taosEncodeFixedI8(buf, pSchema->type); tlen += taosEncodeFixedI8(buf, pSchema->flags); tlen += taosEncodeFixedI32(buf, pSchema->bytes); tlen += taosEncodeFixedI16(buf, pSchema->colId); tlen += taosEncodeString(buf, pSchema->name); return tlen; } static FORCE_INLINE void* taosDecodeSSchema(const void* buf, SSchema* pSchema) { buf = taosDecodeFixedI8(buf, &pSchema->type); buf = taosDecodeFixedI8(buf, &pSchema->flags); buf = taosDecodeFixedI32(buf, &pSchema->bytes); buf = taosDecodeFixedI16(buf, &pSchema->colId); buf = taosDecodeStringTo(buf, pSchema->name); return (void*)buf; } static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSchema) { if (tEncodeI8(pEncoder, pSchema->type) < 0) return -1; if (tEncodeI8(pEncoder, pSchema->flags) < 0) return -1; if (tEncodeI32(pEncoder, pSchema->bytes) < 0) return -1; if (tEncodeI16(pEncoder, pSchema->colId) < 0) return -1; if (tEncodeCStr(pEncoder, pSchema->name) < 0) return -1; return 0; } static FORCE_INLINE int32_t tDecodeSSchema(SCoder* pDecoder, SSchema* pSchema) { if (tDecodeI8(pDecoder, &pSchema->type) < 0) return -1; if (tDecodeI8(pDecoder, &pSchema->flags) < 0) return -1; if (tDecodeI32(pDecoder, &pSchema->bytes) < 0) return -1; if (tDecodeI16(pDecoder, &pSchema->colId) < 0) return -1; if (tDecodeCStrTo(pDecoder, pSchema->name) < 0) return -1; return 0; } static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) { int32_t tlen = 0; tlen += taosEncodeFixedU32(buf, pSW->nCols); for (int32_t i = 0; i < pSW->nCols; i++) { tlen += taosEncodeSSchema(buf, &pSW->pSchema[i]); } return tlen; } static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapper* pSW) { buf = taosDecodeFixedU32(buf, &pSW->nCols); pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); if (pSW->pSchema == NULL) { return NULL; } for (int32_t i = 0; i < pSW->nCols; i++) { buf = taosDecodeSSchema(buf, &pSW->pSchema[i]); } return (void*)buf; } static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SCoder* pEncoder, const SSchemaWrapper* pSW) { if (tEncodeU32(pEncoder, pSW->nCols) < 0) return -1; for (int32_t i = 0; i < pSW->nCols; i++) { if (tEncodeSSchema(pEncoder, &pSW->pSchema[i]) < 0) return -1; } return pEncoder->pos; } static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SCoder* pDecoder, SSchemaWrapper* pSW) { if (tDecodeU32(pDecoder, &pSW->nCols) < 0) return -1; void* ptr = taosMemoryRealloc(pSW->pSchema, pSW->nCols * sizeof(SSchema)); if (ptr == NULL) { return -1; } pSW->pSchema = (SSchema*)ptr; for (int32_t i = 0; i < pSW->nCols; i++) { if (tDecodeSSchema(pDecoder, &pSW->pSchema[i]) < 0) return -1; } return 0; } typedef struct { char name[TSDB_TABLE_FNAME_LEN]; char stb[TSDB_TABLE_FNAME_LEN]; int8_t igExists; int8_t intervalUnit; int8_t slidingUnit; int8_t timezone; int32_t dstVgId; // for stream int64_t interval; int64_t offset; int64_t sliding; int32_t exprLen; // strlen + 1 int32_t tagsFilterLen; // strlen + 1 int32_t sqlLen; // strlen + 1 int32_t astLen; // strlen + 1 char* expr; char* tagsFilter; char* sql; char* ast; } SMCreateSmaReq; int32_t tSerializeSMCreateSmaReq(void* buf, int32_t bufLen, SMCreateSmaReq* pReq); int32_t tDeserializeSMCreateSmaReq(void* buf, int32_t bufLen, SMCreateSmaReq* pReq); void tFreeSMCreateSmaReq(SMCreateSmaReq* pReq); typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t igNotExists; } SMDropSmaReq; int32_t tSerializeSMDropSmaReq(void* buf, int32_t bufLen, SMDropSmaReq* pReq); int32_t tDeserializeSMDropSmaReq(void* buf, int32_t bufLen, SMDropSmaReq* pReq); typedef struct { int8_t version; // for compatibility(default 0) int8_t intervalUnit; // MACRO: TIME_UNIT_XXX int8_t slidingUnit; // MACRO: TIME_UNIT_XXX int8_t timezoneInt; // sma data expired if timezone changes. char indexName[TSDB_INDEX_NAME_LEN]; int32_t exprLen; int32_t tagsFilterLen; int64_t indexUid; tb_uid_t tableUid; // super/child/common table uid int64_t interval; int64_t offset; // use unit by precision of DB int64_t sliding; char* expr; // sma expression char* tagsFilter; } STSma; // Time-range-wise SMA typedef struct { int64_t ver; // use a general definition STSma tSma; } SVCreateTSmaReq; typedef struct { int8_t type; // 0 status report, 1 update data int64_t indexUid; int64_t skey; // start TS key of interval/sliding window } STSmaMsg; typedef struct { int64_t ver; // use a general definition int64_t indexUid; char indexName[TSDB_INDEX_NAME_LEN]; } SVDropTSmaReq; typedef struct { int tmp; // TODO: to avoid compile error } SVCreateTSmaRsp, SVDropTSmaRsp; int32_t tSerializeSVCreateTSmaReq(void** buf, SVCreateTSmaReq* pReq); void* tDeserializeSVCreateTSmaReq(void* buf, SVCreateTSmaReq* pReq); int32_t tSerializeSVDropTSmaReq(void** buf, SVDropTSmaReq* pReq); void* tDeserializeSVDropTSmaReq(void* buf, SVDropTSmaReq* pReq); // RSma: Rollup SMA typedef struct { int64_t interval; int32_t retention; // unit: day uint16_t days; // unit: day int8_t intervalUnit; } SSmaParams; typedef struct { STSma tsma; float xFilesFactor; SArray* smaParams; // SSmaParams } SRSma; typedef struct { uint32_t number; STSma* tSma; } STSmaWrapper; static FORCE_INLINE void tdDestroyTSma(STSma* pSma) { if (pSma) { taosMemoryFreeClear(pSma->expr); taosMemoryFreeClear(pSma->tagsFilter); } } static FORCE_INLINE void tdDestroyTSmaWrapper(STSmaWrapper* pSW) { if (pSW) { if (pSW->tSma) { for (uint32_t i = 0; i < pSW->number; ++i) { tdDestroyTSma(pSW->tSma + i); } taosMemoryFreeClear(pSW->tSma); } } } static FORCE_INLINE void* tdFreeTSmaWrapper(STSmaWrapper* pSW) { tdDestroyTSmaWrapper(pSW); taosMemoryFree(pSW); return NULL; } static FORCE_INLINE int32_t tEncodeTSma(void** buf, const STSma* pSma) { int32_t tlen = 0; tlen += taosEncodeFixedI8(buf, pSma->version); tlen += taosEncodeFixedI8(buf, pSma->intervalUnit); tlen += taosEncodeFixedI8(buf, pSma->slidingUnit); tlen += taosEncodeFixedI8(buf, pSma->timezoneInt); tlen += taosEncodeString(buf, pSma->indexName); tlen += taosEncodeFixedI32(buf, pSma->exprLen); tlen += taosEncodeFixedI32(buf, pSma->tagsFilterLen); tlen += taosEncodeFixedI64(buf, pSma->indexUid); tlen += taosEncodeFixedI64(buf, pSma->tableUid); tlen += taosEncodeFixedI64(buf, pSma->interval); tlen += taosEncodeFixedI64(buf, pSma->offset); tlen += taosEncodeFixedI64(buf, pSma->sliding); if (pSma->exprLen > 0) { tlen += taosEncodeString(buf, pSma->expr); } if (pSma->tagsFilterLen > 0) { tlen += taosEncodeString(buf, pSma->tagsFilter); } return tlen; } static FORCE_INLINE int32_t tEncodeTSmaWrapper(void** buf, const STSmaWrapper* pSW) { int32_t tlen = 0; tlen += taosEncodeFixedU32(buf, pSW->number); for (uint32_t i = 0; i < pSW->number; ++i) { tlen += tEncodeTSma(buf, pSW->tSma + i); } return tlen; } static FORCE_INLINE void* tDecodeTSma(void* buf, STSma* pSma) { buf = taosDecodeFixedI8(buf, &pSma->version); buf = taosDecodeFixedI8(buf, &pSma->intervalUnit); buf = taosDecodeFixedI8(buf, &pSma->slidingUnit); buf = taosDecodeFixedI8(buf, &pSma->timezoneInt); buf = taosDecodeStringTo(buf, pSma->indexName); buf = taosDecodeFixedI32(buf, &pSma->exprLen); buf = taosDecodeFixedI32(buf, &pSma->tagsFilterLen); buf = taosDecodeFixedI64(buf, &pSma->indexUid); buf = taosDecodeFixedI64(buf, &pSma->tableUid); buf = taosDecodeFixedI64(buf, &pSma->interval); buf = taosDecodeFixedI64(buf, &pSma->offset); buf = taosDecodeFixedI64(buf, &pSma->sliding); if (pSma->exprLen > 0) { if ((buf = taosDecodeString(buf, &pSma->expr)) == NULL) { tdDestroyTSma(pSma); return NULL; } } else { pSma->expr = NULL; } if (pSma->tagsFilterLen > 0) { if ((buf = taosDecodeString(buf, &pSma->tagsFilter)) == NULL) { tdDestroyTSma(pSma); return NULL; } } else { pSma->tagsFilter = NULL; } return buf; } static FORCE_INLINE void* tDecodeTSmaWrapper(void* buf, STSmaWrapper* pSW) { buf = taosDecodeFixedU32(buf, &pSW->number); pSW->tSma = (STSma*)taosMemoryCalloc(pSW->number, sizeof(STSma)); if (pSW->tSma == NULL) { return NULL; } for (uint32_t i = 0; i < pSW->number; ++i) { if ((buf = tDecodeTSma(buf, pSW->tSma + i)) == NULL) { for (uint32_t j = i; j >= 0; --i) { tdDestroyTSma(pSW->tSma + j); } taosMemoryFree(pSW->tSma); return NULL; } } return buf; } typedef struct { int idx; } SMCreateFullTextReq; int32_t tSerializeSMCreateFullTextReq(void* buf, int32_t bufLen, SMCreateFullTextReq* pReq); int32_t tDeserializeSMCreateFullTextReq(void* buf, int32_t bufLen, SMCreateFullTextReq* pReq); void tFreeSMCreateFullTextReq(SMCreateFullTextReq* pReq); typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t igNotExists; } SMDropFullTextReq; int32_t tSerializeSMDropFullTextReq(void* buf, int32_t bufLen, SMDropFullTextReq* pReq); int32_t tDeserializeSMDropFullTextReq(void* buf, int32_t bufLen, SMDropFullTextReq* pReq); typedef struct { char indexFName[TSDB_INDEX_FNAME_LEN]; } SUserIndexReq; int32_t tSerializeSUserIndexReq(void* buf, int32_t bufLen, SUserIndexReq* pReq); int32_t tDeserializeSUserIndexReq(void* buf, int32_t bufLen, SUserIndexReq* pReq); typedef struct { char dbFName[TSDB_DB_FNAME_LEN]; char tblFName[TSDB_TABLE_FNAME_LEN]; char colName[TSDB_COL_NAME_LEN]; char indexType[TSDB_INDEX_TYPE_LEN]; char indexExts[TSDB_INDEX_EXTS_LEN]; } SUserIndexRsp; int32_t tSerializeSUserIndexRsp(void* buf, int32_t bufLen, const SUserIndexRsp* pRsp); int32_t tDeserializeSUserIndexRsp(void* buf, int32_t bufLen, SUserIndexRsp* pRsp); typedef struct { int8_t mqMsgType; int32_t code; int32_t epoch; int64_t consumerId; } SMqRspHead; #if 0 typedef struct { SMsgHead head; int64_t consumerId; int64_t blockingTime; int32_t epoch; int8_t withSchema; char cgroup[TSDB_CGROUP_LEN]; int64_t currentOffset; uint64_t reqId; char topic[TSDB_TOPIC_FNAME_LEN]; } SMqPollReq; #endif typedef struct { SMsgHead head; char subKey[TSDB_SUBSCRIBE_KEY_LEN]; int32_t epoch; uint64_t reqId; int64_t consumerId; int64_t blockingTime; int64_t currentOffset; } SMqPollReqV2; typedef struct { int32_t vgId; int64_t offset; SEpSet epSet; } SMqSubVgEp; typedef struct { char topic[TSDB_TOPIC_FNAME_LEN]; int8_t isSchemaAdaptive; SArray* vgs; // SArray SSchemaWrapper schema; } SMqSubTopicEp; typedef struct { SMqRspHead head; int64_t reqOffset; int64_t rspOffset; int32_t skipLogNum; // TODO: replace with topic name int32_t numOfTopics; // TODO: remove from msg SSchemaWrapper* schema; SArray* pBlockData; // SArray } SMqPollRsp; typedef struct { SMqRspHead head; int64_t reqOffset; int64_t rspOffset; int32_t skipLogNum; int32_t dataLen; SArray* blockPos; // beginning pos for each SRetrieveTableRsp void* blockData; // serialized batched SRetrieveTableRsp } SMqPollRspV2; static FORCE_INLINE int32_t tEncodeSMqPollRspV2(void** buf, const SMqPollRspV2* pRsp) { int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pRsp->reqOffset); tlen += taosEncodeFixedI64(buf, pRsp->rspOffset); tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum); tlen += taosEncodeFixedI32(buf, pRsp->dataLen); if (pRsp->dataLen != 0) { int32_t sz = taosArrayGetSize(pRsp->blockPos); tlen += taosEncodeFixedI32(buf, sz); for (int32_t i = 0; i < sz; i++) { int32_t blockPos = *(int32_t*)taosArrayGet(pRsp->blockPos, i); tlen += taosEncodeFixedI32(buf, blockPos); } tlen += taosEncodeBinary(buf, pRsp->blockData, pRsp->dataLen); } return tlen; } static FORCE_INLINE void* tDecodeSMqPollRspV2(const void* buf, SMqPollRspV2* pRsp) { buf = taosDecodeFixedI64(buf, &pRsp->reqOffset); buf = taosDecodeFixedI64(buf, &pRsp->rspOffset); buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum); buf = taosDecodeFixedI32(buf, &pRsp->dataLen); if (pRsp->dataLen != 0) { int32_t sz; buf = taosDecodeFixedI32(buf, &sz); pRsp->blockPos = taosArrayInit(sz, sizeof(int32_t)); for (int32_t i = 0; i < sz; i++) { int32_t blockPos; buf = taosDecodeFixedI32(buf, &blockPos); taosArrayPush(pRsp->blockPos, &blockPos); } buf = taosDecodeBinary(buf, &pRsp->blockData, pRsp->dataLen); } return (void*)buf; } typedef struct { SMqRspHead head; int64_t reqOffset; int64_t rspOffset; int32_t skipLogNum; int32_t blockNum; int8_t withTbName; int8_t withSchema; int8_t withTag; SArray* blockDataLen; // SArray SArray* blockData; // SArray SArray* blockTbName; // SArray SArray* blockSchema; // SArray SArray* blockTags; // SArray SArray* blockTagSchema; // SArray } SMqDataBlkRsp; static FORCE_INLINE int32_t tEncodeSMqDataBlkRsp(void** buf, const SMqDataBlkRsp* pRsp) { int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pRsp->reqOffset); tlen += taosEncodeFixedI64(buf, pRsp->rspOffset); tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum); tlen += taosEncodeFixedI32(buf, pRsp->blockNum); if (pRsp->blockNum != 0) { tlen += taosEncodeFixedI8(buf, pRsp->withTbName); tlen += taosEncodeFixedI8(buf, pRsp->withSchema); tlen += taosEncodeFixedI8(buf, pRsp->withTag); for (int32_t i = 0; i < pRsp->blockNum; i++) { int32_t bLen = *(int32_t*)taosArrayGet(pRsp->blockDataLen, i); void* data = taosArrayGetP(pRsp->blockData, i); tlen += taosEncodeFixedI32(buf, bLen); tlen += taosEncodeBinary(buf, data, bLen); if (pRsp->withSchema) { SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRsp->blockSchema, i); tlen += taosEncodeSSchemaWrapper(buf, pSW); } } } return tlen; } static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* pRsp) { buf = taosDecodeFixedI64(buf, &pRsp->reqOffset); buf = taosDecodeFixedI64(buf, &pRsp->rspOffset); buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum); buf = taosDecodeFixedI32(buf, &pRsp->blockNum); pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void*)); pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(void*)); pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*)); if (pRsp->blockNum != 0) { buf = taosDecodeFixedI8(buf, &pRsp->withTbName); buf = taosDecodeFixedI8(buf, &pRsp->withSchema); buf = taosDecodeFixedI8(buf, &pRsp->withTag); for (int32_t i = 0; i < pRsp->blockNum; i++) { int32_t bLen = 0; void* data = NULL; buf = taosDecodeFixedI32(buf, &bLen); buf = taosDecodeBinary(buf, &data, bLen); taosArrayPush(pRsp->blockDataLen, &bLen); taosArrayPush(pRsp->blockData, &data); if (pRsp->withSchema) { SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper)); buf = taosDecodeSSchemaWrapper(buf, pSW); taosArrayPush(pRsp->blockSchema, &pSW); } } } return (void*)buf; } typedef struct { SMqRspHead head; char cgroup[TSDB_CGROUP_LEN]; SArray* topics; // SArray } SMqCMGetSubEpRsp; static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { // taosMemoryFree(pSubTopicEp->schema.pSchema); taosArrayDestroy(pSubTopicEp->vgs); } static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) { int32_t tlen = 0; tlen += taosEncodeFixedI32(buf, pVgEp->vgId); tlen += taosEncodeFixedI64(buf, pVgEp->offset); tlen += taosEncodeSEpSet(buf, &pVgEp->epSet); return tlen; } static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) { buf = taosDecodeFixedI32(buf, &pVgEp->vgId); buf = taosDecodeFixedI64(buf, &pVgEp->offset); buf = taosDecodeSEpSet(buf, &pVgEp->epSet); return buf; } static FORCE_INLINE void tDeleteSMqCMGetSubEpRsp(SMqCMGetSubEpRsp* pRsp) { taosArrayDestroyEx(pRsp->topics, (void (*)(void*))tDeleteSMqSubTopicEp); } static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) { int32_t tlen = 0; tlen += taosEncodeString(buf, pTopicEp->topic); tlen += taosEncodeFixedI8(buf, pTopicEp->isSchemaAdaptive); int32_t sz = taosArrayGetSize(pTopicEp->vgs); tlen += taosEncodeFixedI32(buf, sz); for (int32_t i = 0; i < sz; i++) { SMqSubVgEp* pVgEp = (SMqSubVgEp*)taosArrayGet(pTopicEp->vgs, i); tlen += tEncodeSMqSubVgEp(buf, pVgEp); } tlen += taosEncodeSSchemaWrapper(buf, &pTopicEp->schema); return tlen; } static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicEp) { buf = taosDecodeStringTo(buf, pTopicEp->topic); buf = taosDecodeFixedI8(buf, &pTopicEp->isSchemaAdaptive); int32_t sz; buf = taosDecodeFixedI32(buf, &sz); pTopicEp->vgs = taosArrayInit(sz, sizeof(SMqSubVgEp)); if (pTopicEp->vgs == NULL) { return NULL; } for (int32_t i = 0; i < sz; i++) { SMqSubVgEp vgEp; buf = tDecodeSMqSubVgEp(buf, &vgEp); taosArrayPush(pTopicEp->vgs, &vgEp); } buf = taosDecodeSSchemaWrapper(buf, &pTopicEp->schema); return buf; } static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSubEpRsp* pRsp) { int32_t tlen = 0; // tlen += taosEncodeString(buf, pRsp->cgroup); int32_t sz = taosArrayGetSize(pRsp->topics); tlen += taosEncodeFixedI32(buf, sz); for (int32_t i = 0; i < sz; i++) { SMqSubTopicEp* pVgEp = (SMqSubTopicEp*)taosArrayGet(pRsp->topics, i); tlen += tEncodeSMqSubTopicEp(buf, pVgEp); } return tlen; } static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* pRsp) { // buf = taosDecodeStringTo(buf, pRsp->cgroup); int32_t sz; buf = taosDecodeFixedI32(buf, &sz); pRsp->topics = taosArrayInit(sz, sizeof(SMqSubTopicEp)); if (pRsp->topics == NULL) { return NULL; } for (int32_t i = 0; i < sz; i++) { SMqSubTopicEp topicEp; buf = tDecodeSMqSubTopicEp(buf, &topicEp); taosArrayPush(pRsp->topics, &topicEp); } return buf; } #pragma pack(pop) #ifdef __cplusplus } #endif #endif /*_TD_COMMON_TAOS_MSG_H_*/