tmsg.h 62.8 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef _TD_COMMON_TAOS_MSG_H_
#define _TD_COMMON_TAOS_MSG_H_

#include "taosdef.h"
#include "taoserror.h"
H
more  
Hongze Cheng 已提交
21
#include "tarray.h"
H
more  
Hongze Cheng 已提交
22
#include "tcoding.h"
S
Shengliang Guan 已提交
23
#include "tencode.h"
L
Liu Jicong 已提交
24
#include "thash.h"
H
Hongze Cheng 已提交
25
#include "tlist.h"
26
#include "trow.h"
D
dapan1121 已提交
27
#include "tname.h"
D
dapan1121 已提交
28

S
Shengliang Guan 已提交
29 30 31 32
#ifdef __cplusplus
extern "C" {
#endif

H
Hongze Cheng 已提交
33
/* ------------------------ MESSAGE DEFINITIONS ------------------------ */
H
Hongze Cheng 已提交
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
#define TD_MSG_NUMBER_
#undef TD_MSG_DICT_
#undef TD_MSG_INFO_
#undef TD_MSG_SEG_CODE_
#include "tmsgdef.h"

#undef TD_MSG_NUMBER_
#undef TD_MSG_DICT_
#undef TD_MSG_INFO_
#define TD_MSG_SEG_CODE_
#include "tmsgdef.h"

#undef TD_MSG_NUMBER_
#undef TD_MSG_DICT_
#undef TD_MSG_INFO_
#undef TD_MSG_SEG_CODE_
#include "tmsgdef.h"

S
Shengliang Guan 已提交
52 53
extern char*   tMsgInfo[];
extern int32_t tMsgDict[];
H
Hongze Cheng 已提交
54 55

/*
 * Helpers for taking a message type value apart.
 *
 * Bits 8..15 of TYPE are the segment code, bits 0..7 are the sequence number
 * inside that segment.
 */
#define TMSG_SEG_CODE(TYPE) (((TYPE)&0xff00) >> 8)
#define TMSG_SEG_SEQ(TYPE)  ((TYPE)&0xff)
// Flat index for TYPE: the tMsgDict entry of its segment plus its sequence number.
#define TMSG_INDEX(TYPE)    (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE))
// Entry of tMsgInfo at TMSG_INDEX(TYPE). No bounds checking is done here.
#define TMSG_INFO(TYPE)     tMsgInfo[TMSG_INDEX(TYPE)]
D
dapan1121 已提交
59

H
Hongze Cheng 已提交
60 61
typedef uint16_t tmsg_t;

H
Hongze Cheng 已提交
62
/* ------------------------ OTHER DEFINITIONS ------------------------ */
D
dapan1121 已提交
63
// IE type
L
Liu Jicong 已提交
64 65 66 67
#define TSDB_IE_TYPE_SEC         1
#define TSDB_IE_TYPE_META        2
#define TSDB_IE_TYPE_MGMT_IP     3
#define TSDB_IE_TYPE_DNODE_CFG   4
D
dapan1121 已提交
68
#define TSDB_IE_TYPE_NEW_VERSION 5
L
Liu Jicong 已提交
69
#define TSDB_IE_TYPE_DNODE_EXT   6
D
dapan1121 已提交
70 71
#define TSDB_IE_TYPE_DNODE_STATE 7

L
Liu Jicong 已提交
72
typedef enum {
73
  HEARTBEAT_TYPE_MQ = 0,
L
Liu Jicong 已提交
74
  HEARTBEAT_TYPE_QUERY,
L
Liu Jicong 已提交
75 76 77 78 79
  // types can be added here
  //
  HEARTBEAT_TYPE_MAX
} EHbType;

D
dapan1121 已提交
80
enum {
D
dapan1121 已提交
81 82
  HEARTBEAT_KEY_DBINFO = 1,
  HEARTBEAT_KEY_STBINFO,
D
dapan1121 已提交
83
  HEARTBEAT_KEY_MQ_TMP,
D
dapan1121 已提交
84 85
};

S
Shengliang Guan 已提交
86 87
typedef enum _mgmt_table {
  TSDB_MGMT_TABLE_START,
D
dapan1121 已提交
88 89 90 91 92 93
  TSDB_MGMT_TABLE_ACCT,
  TSDB_MGMT_TABLE_USER,
  TSDB_MGMT_TABLE_DB,
  TSDB_MGMT_TABLE_TABLE,
  TSDB_MGMT_TABLE_DNODE,
  TSDB_MGMT_TABLE_MNODE,
S
Shengliang Guan 已提交
94 95 96
  TSDB_MGMT_TABLE_QNODE,
  TSDB_MGMT_TABLE_SNODE,
  TSDB_MGMT_TABLE_BNODE,
D
dapan1121 已提交
97
  TSDB_MGMT_TABLE_VGROUP,
S
Shengliang Guan 已提交
98
  TSDB_MGMT_TABLE_STB,
D
dapan1121 已提交
99 100 101 102 103
  TSDB_MGMT_TABLE_MODULE,
  TSDB_MGMT_TABLE_QUERIES,
  TSDB_MGMT_TABLE_STREAMS,
  TSDB_MGMT_TABLE_VARIABLES,
  TSDB_MGMT_TABLE_CONNS,
S
Shengliang Guan 已提交
104
  TSDB_MGMT_TABLE_TRANS,
D
dapan1121 已提交
105 106 107
  TSDB_MGMT_TABLE_GRANTS,
  TSDB_MGMT_TABLE_VNODES,
  TSDB_MGMT_TABLE_CLUSTER,
S
Shengliang Guan 已提交
108
  TSDB_MGMT_TABLE_STREAMTABLES,
D
dapan1121 已提交
109
  TSDB_MGMT_TABLE_TP,
S
Shengliang 已提交
110
  TSDB_MGMT_TABLE_FUNC,
D
dapan1121 已提交
111
  TSDB_MGMT_TABLE_MAX,
S
Shengliang Guan 已提交
112
} EShowType;
D
dapan1121 已提交
113

L
Liu Jicong 已提交
114 115
#define TSDB_ALTER_TABLE_ADD_TAG         1
#define TSDB_ALTER_TABLE_DROP_TAG        2
S
Shengliang Guan 已提交
116
#define TSDB_ALTER_TABLE_UPDATE_TAG_NAME 3
L
Liu Jicong 已提交
117
#define TSDB_ALTER_TABLE_UPDATE_TAG_VAL  4
D
dapan1121 已提交
118

L
Liu Jicong 已提交
119 120
#define TSDB_ALTER_TABLE_ADD_COLUMN          5
#define TSDB_ALTER_TABLE_DROP_COLUMN         6
S
Shengliang Guan 已提交
121
#define TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES 7
L
Liu Jicong 已提交
122
#define TSDB_ALTER_TABLE_UPDATE_TAG_BYTES    8
D
dapan1121 已提交
123

L
Liu Jicong 已提交
124 125
#define TSDB_FILL_NONE      0
#define TSDB_FILL_NULL      1
H
Hongze Cheng 已提交
126
#define TSDB_FILL_SET_VALUE 2
L
Liu Jicong 已提交
127 128 129 130 131 132 133 134 135 136
#define TSDB_FILL_LINEAR    3
#define TSDB_FILL_PREV      4
#define TSDB_FILL_NEXT      5

#define TSDB_ALTER_USER_PASSWD          0x1
#define TSDB_ALTER_USER_SUPERUSER       0x2
#define TSDB_ALTER_USER_ADD_READ_DB     0x3
#define TSDB_ALTER_USER_REMOVE_READ_DB  0x4
#define TSDB_ALTER_USER_CLEAR_READ_DB   0x5
#define TSDB_ALTER_USER_ADD_WRITE_DB    0x6
S
Shengliang Guan 已提交
137
#define TSDB_ALTER_USER_REMOVE_WRITE_DB 0x7
L
Liu Jicong 已提交
138
#define TSDB_ALTER_USER_CLEAR_WRITE_DB  0x8
S
Shengliang Guan 已提交
139

D
dapan1121 已提交
140 141
// NOTE(review): 0x2 is also the value of TSDB_ALTER_USER_SUPERUSER above — confirm the overlap is intended.
#define TSDB_ALTER_USER_PRIVILEGES 0x2

H
Hongze Cheng 已提交
142
#define TSDB_KILL_MSG_LEN 30
D
dapan1121 已提交
143

D
dapan1121 已提交
144 145
#define TSDB_TABLE_NUM_UNIT 100000

L
Liu Jicong 已提交
146
#define TSDB_VN_READ_ACCCESS  ((char)0x1)
H
Hongze Cheng 已提交
147
#define TSDB_VN_WRITE_ACCCESS ((char)0x2)
L
Liu Jicong 已提交
148
#define TSDB_VN_ALL_ACCCESS   (TSDB_VN_READ_ACCCESS | TSDB_VN_WRITE_ACCCESS)
D
dapan1121 已提交
149

H
Hongze Cheng 已提交
150
/*
 * Column flag values and the predicates that test them.
 *
 * TSDB_COL_NULL is masked out by the IS_TAG / IS_NORMAL_COL / IS_UD_COL
 * predicates before they inspect the remaining bits; TSDB_COL_REQ_NULL tests
 * that bit on its own.
 */
#define TSDB_COL_NORMAL 0x0u  // the normal column of the table
#define TSDB_COL_TAG    0x1u  // the tag column type
#define TSDB_COL_UDC    0x2u  // the user specified normal string column, it is a dummy column
#define TSDB_COL_TMP    0x4u  // internal column generated by the previous operators
#define TSDB_COL_NULL   0x8u  // the column filter NULL or not

// The argument is parenthesized in every predicate so that an expression such
// as `a | b` is evaluated as a whole before masking.
#define TSDB_COL_IS_TAG(f)        ((((f) & (~(TSDB_COL_NULL))) & TSDB_COL_TAG) != 0)
#define TSDB_COL_IS_NORMAL_COL(f) (((f) & (~(TSDB_COL_NULL))) == TSDB_COL_NORMAL)
#define TSDB_COL_IS_UD_COL(f)     (((f) & (~(TSDB_COL_NULL))) == TSDB_COL_UDC)
#define TSDB_COL_REQ_NULL(f)      (((f)&TSDB_COL_NULL) != 0)
S
Shengliang Guan 已提交
160

L
Liu Jicong 已提交
161 162
#define TD_SUPER_TABLE  TSDB_SUPER_TABLE
#define TD_CHILD_TABLE  TSDB_CHILD_TABLE
S
Shengliang Guan 已提交
163 164
#define TD_NORMAL_TABLE TSDB_NORMAL_TABLE

S
Shengliang Guan 已提交
165
typedef struct {
S
Shengliang Guan 已提交
166
  int32_t vgId;
D
dapan1121 已提交
167 168
  char*   dbFName;
  char*   tbName;
D
dapan1121 已提交
169 170
} SBuildTableMetaInput;

S
Shengliang Guan 已提交
171
typedef struct {
D
dapan1121 已提交
172
  char    db[TSDB_DB_FNAME_LEN];
D
dapan1121 已提交
173
  int64_t dbId;
174
  int32_t vgVersion;
D
dapan 已提交
175
  int32_t numOfTable;      // unit is TSDB_TABLE_NUM_UNIT
D
dapan1121 已提交
176 177
} SBuildUseDBInput;

S
Shengliang Guan 已提交
178 179 180 181 182 183
// One field descriptor: a fixed-size name buffer plus a type code and a byte count.
// NOTE(review): `type` presumably holds a TSDB data type code and `bytes` the
// width of the value — confirm against the users of SField.
typedef struct SField {
  char    name[TSDB_COL_NAME_LEN];
  uint8_t type;
  int32_t bytes;
} SField;

D
dapan1121 已提交
184 185 186
#pragma pack(push, 1)

// null-terminated string instead of char array, to avoid excessive memory consumption when there are more than 1M tableMeta entries
H
Haojun Liao 已提交
187
typedef struct SEp {
D
dapan1121 已提交
188
  char     fqdn[TSDB_FQDN_LEN];
D
dapan1121 已提交
189
  uint16_t port;
H
Haojun Liao 已提交
190
} SEp;
D
dapan1121 已提交
191

S
Shengliang Guan 已提交
192
typedef struct {
D
dapan1121 已提交
193
  char    dbFName[TSDB_DB_FNAME_LEN];
D
dapan1121 已提交
194 195 196 197
  int32_t contLen;
  int32_t vgId;
} SMsgHead;

D
dapan1121 已提交
198 199 200 201
typedef struct {
  char    dbFName[TSDB_DB_FNAME_LEN];
} SRspHead;

D
dapan1121 已提交
202 203
// Submit message for one table
typedef struct SSubmitBlk {
204 205
  int64_t uid;        // table unique id
  int32_t tid;        // table id
D
dapan1121 已提交
206
  char    tableName[TSDB_TABLE_NAME_LEN];
207 208 209 210 211 212
  int32_t padding;    // TODO just for padding here
  int32_t sversion;   // data schema version
  int32_t dataLen;    // data part length, not including the SSubmitBlk head
  int32_t schemaLen;  // schema length, if length is 0, no schema exists
  int16_t numOfRows;  // total number of rows in current submit block
  char    data[];
D
dapan1121 已提交
213 214 215
} SSubmitBlk;

// Submit message for this TSDB
S
Shengliang Guan 已提交
216
typedef struct {
S
Shengliang Guan 已提交
217
  SMsgHead header;
H
Hongze Cheng 已提交
218
  int64_t  version;
S
Shengliang Guan 已提交
219 220 221
  int32_t  length;
  int32_t  numOfBlocks;
  char     blocks[];
S
Shengliang Guan 已提交
222
} SSubmitReq;
D
dapan1121 已提交
223

H
Hongze Cheng 已提交
224 225 226
typedef struct {
  int32_t totalLen;
  int32_t len;
227
  STSRow* row;
H
Hongze Cheng 已提交
228 229 230 231 232 233 234 235
} SSubmitBlkIter;

// Cursor state handed to tInitSubmitMsgIter() and tGetSubmitMsgNext().
// NOTE(review): totalLen/len look like "total bytes" and "bytes consumed so far",
// and pMsg like the message being walked — confirm in the implementation.
typedef struct {
  int32_t totalLen;
  int32_t len;
  void*   pMsg;
} SSubmitMsgIter;

S
Shengliang Guan 已提交
236
int32_t tInitSubmitMsgIter(SSubmitReq* pMsg, SSubmitMsgIter* pIter);
S
Shengliang Guan 已提交
237 238
int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
int32_t tInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
C
Cary Xu 已提交
239
STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter);
H
Hongze Cheng 已提交
240

D
dapan1121 已提交
241 242 243 244 245
// Identifies one block of a submit request that could not be written.
typedef struct {
  int32_t index;  // index of failed block in submit blocks
  int32_t vnode;  // vnode index of failed block
  int32_t sid;    // table index of failed block
  int32_t code;   // errorcode while write data to vnode, such as not created, dropped, no space, invalid table
} SSubmitRspBlock;

// Submit response: a fixed header followed by a flexible array of failed blocks.
// NOTE(review): numOfFailedBlocks is presumably the element count of failedBlocks[] — confirm.
typedef struct {
  int32_t         code;          // 0-success, > 0 error code
  int32_t         numOfRows;     // number of records the client is trying to write
  int32_t         affectedRows;  // number of records actually written
  int32_t         failedRows;    // number of failed records (exclude duplicate records)
  int32_t         numOfFailedBlocks;
  SSubmitRspBlock failedBlocks[];
} SSubmitRsp;
D
dapan1121 已提交
256 257

typedef struct SSchema {
S
Shengliang Guan 已提交
258
  int8_t  type;
S
Shengliang Guan 已提交
259
  int32_t colId;
260
  int32_t bytes;
S
Shengliang Guan 已提交
261
  char    name[TSDB_COL_NAME_LEN];
D
dapan1121 已提交
262 263 264
} SSchema;

typedef struct {
S
Shengliang Guan 已提交
265 266 267
  char    name[TSDB_TABLE_FNAME_LEN];
  int8_t  igExists;
  int32_t numOfColumns;
S
Shengliang Guan 已提交
268 269 270
  int32_t numOfTags;
  SArray* pColumns;
  SArray* pTags;
S
Shengliang Guan 已提交
271
  char    comment[TSDB_STB_COMMENT_LEN];
S
Shengliang Guan 已提交
272
} SMCreateStbReq;
D
dapan1121 已提交
273

S
Shengliang Guan 已提交
274 275
int32_t tSerializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq);
int32_t tDeserializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq);
S
Shengliang Guan 已提交
276
void    tFreeSMCreateStbReq(SMCreateStbReq* pReq);
S
Shengliang Guan 已提交
277

D
dapan1121 已提交
278 279 280
typedef struct {
  char   name[TSDB_TABLE_FNAME_LEN];
  int8_t igNotExists;
S
Shengliang Guan 已提交
281
} SMDropStbReq;
S
Shengliang Guan 已提交
282

S
Shengliang Guan 已提交
283 284
int32_t tSerializeSMDropStbReq(void* buf, int32_t bufLen, SMDropStbReq* pReq);
int32_t tDeserializeSMDropStbReq(void* buf, int32_t bufLen, SMDropStbReq* pReq);
S
Shengliang Guan 已提交
285

S
Shengliang Guan 已提交
286 287 288
typedef struct {
  char    name[TSDB_TABLE_FNAME_LEN];
  int8_t  alterType;
S
Shengliang Guan 已提交
289 290
  int32_t numOfFields;
  SArray* pFields;
S
Shengliang Guan 已提交
291
} SMAltertbReq;
D
dapan1121 已提交
292

S
Shengliang Guan 已提交
293 294 295
int32_t tSerializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq);
int32_t tDeserializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq);
void    tFreeSMAltertbReq(SMAltertbReq* pReq);
D
dapan1121 已提交
296 297

typedef struct SEpSet {
298 299 300
  int8_t inUse;
  int8_t numOfEps;
  SEp    eps[TSDB_MAX_REPLICA];
D
dapan1121 已提交
301 302
} SEpSet;

S
Shengliang Guan 已提交
303 304 305 306
int32_t tEncodeSEpSet(SCoder* pEncoder, const SEpSet* pEp);
int32_t tDecodeSEpSet(SCoder* pDecoder, SEpSet* pEp);
int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp);
void*   taosDecodeSEpSet(void* buf, SEpSet* pEp);
L
Liu Jicong 已提交
307

D
dapan1121 已提交
308 309
typedef struct {
  int32_t pid;
S
Shengliang Guan 已提交
310 311
  char    app[TSDB_APP_NAME_LEN];
  char    db[TSDB_DB_NAME_LEN];
312
  int64_t startTime;
S
Shengliang Guan 已提交
313
} SConnectReq;
L
Liu Jicong 已提交
314

S
Shengliang Guan 已提交
315 316
int32_t tSerializeSConnectReq(void* buf, int32_t bufLen, SConnectReq* pReq);
int32_t tDeserializeSConnectReq(void* buf, int32_t bufLen, SConnectReq* pReq);
317

L
Liu Jicong 已提交
318 319 320 321 322 323
typedef struct {
  int32_t acctId;
  int64_t clusterId;
  int32_t connId;
  int8_t  superUser;
  SEpSet  epSet;
324
  char    sVersion[128];
L
Liu Jicong 已提交
325
} SConnectRsp;
L
Liu Jicong 已提交
326

S
Shengliang Guan 已提交
327 328 329
int32_t tSerializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp);
int32_t tDeserializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp);

L
Liu Jicong 已提交
330 331 332 333 334 335 336 337 338 339
// Account request body. SCreateAcctReq and SAlterAcctReq are two names for the
// same struct, so create and alter requests share one layout.
typedef struct {
  char    user[TSDB_USER_LEN];
  char    pass[TSDB_PASSWORD_LEN];
  int32_t maxUsers;
  int32_t maxDbs;
  int32_t maxTimeSeries;
  int32_t maxStreams;
  int32_t accessState;  // Configured only by command
  int64_t maxStorage;   // In unit of GB
} SCreateAcctReq, SAlterAcctReq;
L
Liu Jicong 已提交
340

S
Shengliang Guan 已提交
341 342
int32_t tSerializeSCreateAcctReq(void* buf, int32_t bufLen, SCreateAcctReq* pReq);
int32_t tDeserializeSCreateAcctReq(void* buf, int32_t bufLen, SCreateAcctReq* pReq);
S
Shengliang Guan 已提交
343

L
Liu Jicong 已提交
344 345 346
typedef struct {
  char user[TSDB_USER_LEN];
} SDropUserReq, SDropAcctReq;
D
dapan1121 已提交
347

S
Shengliang Guan 已提交
348 349
int32_t tSerializeSDropUserReq(void* buf, int32_t bufLen, SDropUserReq* pReq);
int32_t tDeserializeSDropUserReq(void* buf, int32_t bufLen, SDropUserReq* pReq);
S
Shengliang Guan 已提交
350

D
dapan1121 已提交
351
typedef struct {
S
Shengliang Guan 已提交
352 353
  int8_t createType;
  int8_t superUser;  // denote if it is a super user or not
S
Shengliang Guan 已提交
354 355
  char   user[TSDB_USER_LEN];
  char   pass[TSDB_PASSWORD_LEN];
S
Shengliang Guan 已提交
356 357
} SCreateUserReq;

S
Shengliang Guan 已提交
358 359
int32_t tSerializeSCreateUserReq(void* buf, int32_t bufLen, SCreateUserReq* pReq);
int32_t tDeserializeSCreateUserReq(void* buf, int32_t bufLen, SCreateUserReq* pReq);
S
Shengliang Guan 已提交
360 361 362

typedef struct {
  int8_t alterType;
S
Shengliang Guan 已提交
363
  int8_t superUser;
S
Shengliang Guan 已提交
364 365 366 367 368
  char   user[TSDB_USER_LEN];
  char   pass[TSDB_PASSWORD_LEN];
  char   dbname[TSDB_DB_FNAME_LEN];
} SAlterUserReq;

S
Shengliang Guan 已提交
369 370
int32_t tSerializeSAlterUserReq(void* buf, int32_t bufLen, SAlterUserReq* pReq);
int32_t tDeserializeSAlterUserReq(void* buf, int32_t bufLen, SAlterUserReq* pReq);
D
dapan1121 已提交
371

S
Shengliang Guan 已提交
372 373 374 375
typedef struct {
  char user[TSDB_USER_LEN];
} SGetUserAuthReq;

S
Shengliang Guan 已提交
376 377
int32_t tSerializeSGetUserAuthReq(void* buf, int32_t bufLen, SGetUserAuthReq* pReq);
int32_t tDeserializeSGetUserAuthReq(void* buf, int32_t bufLen, SGetUserAuthReq* pReq);
S
Shengliang Guan 已提交
378 379 380 381 382 383 384 385

// Response carrying a user's authorization data.
// NOTE(review): readDbs / writeDbs are hash objects; their key and value types
// are not visible in this header — presumably keyed by database name, confirm.
typedef struct {
  char      user[TSDB_USER_LEN];
  int8_t    superAuth;
  SHashObj* readDbs;
  SHashObj* writeDbs;
} SGetUserAuthRsp;

S
Shengliang Guan 已提交
386 387
int32_t tSerializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp);
int32_t tDeserializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp);
S
Shengliang Guan 已提交
388

D
dapan1121 已提交
389
typedef struct {
S
Shengliang Guan 已提交
390 391 392
  int16_t colId;     // column id
  int16_t colIndex;  // column index in colList if it is a normal column or index in tagColList if a tag
  int16_t flag;      // denote if it is a tag or a normal column
393
  char    name[TSDB_DB_FNAME_LEN];
D
dapan1121 已提交
394 395
} SColIndex;

S
Shengliang Guan 已提交
396
typedef struct {
D
dapan1121 已提交
397 398
  int16_t lowerRelOptr;
  int16_t upperRelOptr;
S
Shengliang Guan 已提交
399
  int16_t filterstr;  // denote if current column is char(binary/nchar)
D
dapan1121 已提交
400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416

  union {
    struct {
      int64_t lowerBndi;
      int64_t upperBndi;
    };
    struct {
      double lowerBndd;
      double upperBndd;
    };
    struct {
      int64_t pz;
      int64_t len;
    };
  };
} SColumnFilterInfo;

S
Shengliang Guan 已提交
417
typedef struct {
S
Shengliang Guan 已提交
418 419 420
  int16_t numOfFilters;
  union {
    int64_t            placeholder;
H
Hongze Cheng 已提交
421
    SColumnFilterInfo* filterInfo;
D
dapan1121 已提交
422 423 424 425 426 427
  };
} SColumnFilterList;
/*
 * for client side struct, we only need the column id, type, bytes are not necessary
 * But for data in vnode side, we need all the following information.
 */
S
Shengliang Guan 已提交
428
typedef struct {
H
Haojun Liao 已提交
429 430 431 432 433
  union {
    int16_t colId;
    int16_t slotId;
  };

H
Haojun Liao 已提交
434 435 436 437
  int16_t   type;
  int32_t   bytes;
  uint8_t   precision;
  uint8_t   scale;
D
dapan1121 已提交
438 439
} SColumnInfo;

S
Shengliang Guan 已提交
440
typedef struct {
L
Liu Jicong 已提交
441 442
  int64_t uid;
  TSKEY   key;  // last accessed ts, for subscription
D
dapan1121 已提交
443 444 445 446 447 448 449 450
} STableIdInfo;

// A pair of timestamps delimiting a window.
// NOTE(review): skey / ekey are presumably the start and end keys; whether the
// bounds are inclusive is not stated here — confirm.
typedef struct STimeWindow {
  TSKEY skey;
  TSKEY ekey;
} STimeWindow;

// Describes the compressed timestamp list carried in a message body.
typedef struct {
  int32_t tsOffset;       // offset value in current msg body, NOTE: ts list is compressed
  int32_t tsLen;          // total length of ts comp block
  int32_t tsNumOfBlocks;  // ts comp block numbers
  int32_t tsOrder;        // ts comp block order
} STsBufInfo;

S
Shengliang Guan 已提交
457
// Interval settings of a query: three magnitudes (interval, sliding, offset),
// each paired with a one-character unit field.
// NOTE(review): the set of valid unit characters is not visible here — confirm.
typedef struct {
  int32_t tz;  // query client timezone
  char    intervalUnit;
  char    slidingUnit;
  char    offsetUnit;
  int64_t interval;
  int64_t sliding;
  int64_t offset;
} SInterval;

typedef struct {
S
Shengliang Guan 已提交
468
  int32_t code;
D
dapan1121 已提交
469
  SName   tableName;
D
dapan1121 已提交
470 471
} SQueryTableRsp;

D
dapan1121 已提交
472 473 474 475 476
// Serialization pair for SQueryTableRsp; both take a caller-provided buffer and
// its length. The implementations and the return-value convention live outside
// this header.
int32_t tSerializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp);
int32_t tDeserializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp);


D
dapan1121 已提交
477
typedef struct {
478
  char    db[TSDB_DB_FNAME_LEN];
479
  int32_t numOfVgroups;
S
Shengliang Guan 已提交
480 481 482 483 484 485
  int32_t cacheBlockSize;  // MB
  int32_t totalBlocks;
  int32_t daysPerFile;
  int32_t daysToKeep0;
  int32_t daysToKeep1;
  int32_t daysToKeep2;
S
Shengliang Guan 已提交
486 487
  int32_t minRows;
  int32_t maxRows;
S
Shengliang Guan 已提交
488 489
  int32_t commitTime;
  int32_t fsyncPeriod;
S
Shengliang Guan 已提交
490
  int8_t  walLevel;
S
Shengliang Guan 已提交
491 492 493 494 495 496
  int8_t  precision;  // time resolution
  int8_t  compression;
  int8_t  replications;
  int8_t  quorum;
  int8_t  update;
  int8_t  cacheLastRow;
S
Shengliang Guan 已提交
497
  int8_t  ignoreExist;
X
Xiaoyu Wang 已提交
498
  int8_t  streamMode;
S
Shengliang Guan 已提交
499
} SCreateDbReq;
S
Shengliang Guan 已提交
500

S
Shengliang Guan 已提交
501 502 503
int32_t tSerializeSCreateDbReq(void* buf, int32_t bufLen, SCreateDbReq* pReq);
int32_t tDeserializeSCreateDbReq(void* buf, int32_t bufLen, SCreateDbReq* pReq);

S
Shengliang Guan 已提交
504
typedef struct {
505
  char    db[TSDB_DB_FNAME_LEN];
S
Shengliang Guan 已提交
506 507 508 509 510 511 512 513
  int32_t totalBlocks;
  int32_t daysToKeep0;
  int32_t daysToKeep1;
  int32_t daysToKeep2;
  int32_t fsyncPeriod;
  int8_t  walLevel;
  int8_t  quorum;
  int8_t  cacheLastRow;
S
Shengliang Guan 已提交
514
} SAlterDbReq;
S
Shengliang Guan 已提交
515

S
Shengliang Guan 已提交
516 517 518
int32_t tSerializeSAlterDbReq(void* buf, int32_t bufLen, SAlterDbReq* pReq);
int32_t tDeserializeSAlterDbReq(void* buf, int32_t bufLen, SAlterDbReq* pReq);

S
Shengliang Guan 已提交
519
typedef struct {
S
Shengliang Guan 已提交
520
  char   db[TSDB_DB_FNAME_LEN];
S
Shengliang Guan 已提交
521
  int8_t ignoreNotExists;
S
Shengliang Guan 已提交
522
} SDropDbReq;
S
Shengliang Guan 已提交
523

S
Shengliang Guan 已提交
524 525 526
int32_t tSerializeSDropDbReq(void* buf, int32_t bufLen, SDropDbReq* pReq);
int32_t tDeserializeSDropDbReq(void* buf, int32_t bufLen, SDropDbReq* pReq);

S
Shengliang Guan 已提交
527
typedef struct {
L
Liu Jicong 已提交
528 529
  char    db[TSDB_DB_FNAME_LEN];
  int64_t uid;
S
Shengliang Guan 已提交
530 531
} SDropDbRsp;

S
Shengliang Guan 已提交
532 533 534
int32_t tSerializeSDropDbRsp(void* buf, int32_t bufLen, SDropDbRsp* pRsp);
int32_t tDeserializeSDropDbRsp(void* buf, int32_t bufLen, SDropDbRsp* pRsp);

S
Shengliang Guan 已提交
535 536
typedef struct {
  char    db[TSDB_DB_FNAME_LEN];
D
dapan1121 已提交
537
  int64_t dbId;
538
  int32_t vgVersion;
D
dapan 已提交
539
  int32_t numOfTable;    // unit is TSDB_TABLE_NUM_UNIT
S
Shengliang Guan 已提交
540
} SUseDbReq;
S
Shengliang Guan 已提交
541

S
Shengliang Guan 已提交
542 543
int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq);
int32_t tDeserializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq);
S
Shengliang Guan 已提交
544 545

typedef struct {
L
Liu Jicong 已提交
546 547 548 549 550 551
  char    db[TSDB_DB_FNAME_LEN];
  int64_t uid;
  int32_t vgVersion;
  int32_t vgNum;
  int8_t  hashMethod;
  SArray* pVgroupInfos;  // Array of SVgroupInfo
S
Shengliang Guan 已提交
552 553 554 555 556 557
} SUseDbRsp;

int32_t tSerializeSUseDbRsp(void* buf, int32_t bufLen, SUseDbRsp* pRsp);
int32_t tDeserializeSUseDbRsp(void* buf, int32_t bufLen, SUseDbRsp* pRsp);
void    tFreeSUsedbRsp(SUseDbRsp* pRsp);

D
dapan1121 已提交
558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574
typedef struct {
  int32_t rowNum;
} SQnodeListReq;

int32_t tSerializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq);
int32_t tDeserializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq);

// Response to a qnode list request: a single array of endpoint sets.
typedef struct {
  SArray *epSetList; // SArray<SEpSet>
} SQnodeListRsp;

int32_t tSerializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp);
int32_t tDeserializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp);
void    tFreeSQnodeListRsp(SQnodeListRsp* pRsp);



S
Shengliang Guan 已提交
575 576 577 578 579 580 581
// Batch wrapper that groups several SUseDbRsp entries in one array.
typedef struct {
  SArray* pArray;  // Array of SUseDbRsp
} SUseDbBatchRsp;

int32_t tSerializeSUseDbBatchRsp(void* buf, int32_t bufLen, SUseDbBatchRsp* pRsp);
int32_t tDeserializeSUseDbBatchRsp(void* buf, int32_t bufLen, SUseDbBatchRsp* pRsp);
void    tFreeSUseDbBatchRsp(SUseDbBatchRsp* pRsp);
S
Shengliang Guan 已提交
582 583

typedef struct {
S
Shengliang Guan 已提交
584
  char db[TSDB_DB_FNAME_LEN];
S
Shengliang Guan 已提交
585 586 587 588
} SSyncDbReq, SCompactDbReq;

int32_t tSerializeSSyncDbReq(void* buf, int32_t bufLen, SSyncDbReq* pReq);
int32_t tDeserializeSSyncDbReq(void* buf, int32_t bufLen, SSyncDbReq* pReq);
D
dapan1121 已提交
589 590

typedef struct {
S
Shengliang Guan 已提交
591
  char    name[TSDB_FUNC_NAME_LEN];
S
Shengliang 已提交
592
  int8_t  igExists;
S
Shengliang Guan 已提交
593 594
  int8_t  funcType;
  int8_t  scriptType;
S
Shengliang Guan 已提交
595
  int8_t  outputType;
S
Shengliang Guan 已提交
596
  int32_t outputLen;
S
Shengliang Guan 已提交
597
  int32_t bufSize;
S
Shengliang 已提交
598
  int64_t signature;
S
Shengliang Guan 已提交
599 600
  int32_t commentSize;
  int32_t codeSize;
S
Shengliang Guan 已提交
601 602
  char    pComment[TSDB_FUNC_COMMENT_LEN];
  char    pCode[TSDB_FUNC_CODE_LEN];
S
Shengliang Guan 已提交
603
} SCreateFuncReq;
D
dapan1121 已提交
604

S
Shengliang Guan 已提交
605 606 607
int32_t tSerializeSCreateFuncReq(void* buf, int32_t bufLen, SCreateFuncReq* pReq);
int32_t tDeserializeSCreateFuncReq(void* buf, int32_t bufLen, SCreateFuncReq* pReq);

D
dapan1121 已提交
608
typedef struct {
S
Shengliang 已提交
609 610
  char   name[TSDB_FUNC_NAME_LEN];
  int8_t igNotExists;
S
Shengliang Guan 已提交
611
} SDropFuncReq;
S
Shengliang Guan 已提交
612

S
Shengliang Guan 已提交
613 614 615
int32_t tSerializeSDropFuncReq(void* buf, int32_t bufLen, SDropFuncReq* pReq);
int32_t tDeserializeSDropFuncReq(void* buf, int32_t bufLen, SDropFuncReq* pReq);

S
Shengliang Guan 已提交
616 617
typedef struct {
  int32_t numOfFuncs;
S
Shengliang Guan 已提交
618
  SArray* pFuncNames;
S
Shengliang Guan 已提交
619
} SRetrieveFuncReq;
D
dapan1121 已提交
620

S
Shengliang Guan 已提交
621 622 623
int32_t tSerializeSRetrieveFuncReq(void* buf, int32_t bufLen, SRetrieveFuncReq* pReq);
int32_t tDeserializeSRetrieveFuncReq(void* buf, int32_t bufLen, SRetrieveFuncReq* pReq);

D
dapan1121 已提交
624 625
typedef struct {
  char    name[TSDB_FUNC_NAME_LEN];
S
Shengliang Guan 已提交
626 627 628 629
  int8_t  funcType;
  int8_t  scriptType;
  int8_t  outputType;
  int32_t outputLen;
D
dapan1121 已提交
630
  int32_t bufSize;
S
Shengliang 已提交
631
  int64_t signature;
S
Shengliang Guan 已提交
632 633
  int32_t commentSize;
  int32_t codeSize;
S
Shengliang Guan 已提交
634 635
  char    pComment[TSDB_FUNC_COMMENT_LEN];
  char    pCode[TSDB_FUNC_CODE_LEN];
S
Shengliang Guan 已提交
636
} SFuncInfo;
D
dapan1121 已提交
637 638

typedef struct {
S
Shengliang Guan 已提交
639
  int32_t numOfFuncs;
S
Shengliang Guan 已提交
640
  SArray* pFuncInfos;
S
Shengliang Guan 已提交
641
} SRetrieveFuncRsp;
D
dapan1121 已提交
642

S
Shengliang Guan 已提交
643 644 645
int32_t tSerializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp);
int32_t tDeserializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp);

D
dapan1121 已提交
646
typedef struct {
S
Shengliang Guan 已提交
647
  int32_t statusInterval;
S
Shengliang Guan 已提交
648
  int64_t checkTime;                  // 1970-01-01 00:00:00.000
S
Shengliang Guan 已提交
649 650 651
  char    timezone[TD_TIMEZONE_LEN];  // tsTimezone
  char    locale[TD_LOCALE_LEN];      // tsLocale
  char    charset[TD_LOCALE_LEN];     // tsCharset
S
Shengliang Guan 已提交
652
} SClusterCfg;
D
dapan1121 已提交
653

S
Shengliang Guan 已提交
654 655 656
typedef struct {
  int32_t vgId;
  int8_t  role;
S
Shengliang Guan 已提交
657 658
  int64_t numOfTables;
  int64_t numOfTimeSeries;
S
Shengliang Guan 已提交
659 660 661
  int64_t totalStorage;
  int64_t compStorage;
  int64_t pointsWritten;
S
Shengliang Guan 已提交
662 663 664 665 666
  int64_t numOfSelectReqs;
  int64_t numOfInsertReqs;
  int64_t numOfInsertSuccessReqs;
  int64_t numOfBatchInsertReqs;
  int64_t numOfBatchInsertSuccessReqs;
S
Shengliang Guan 已提交
667 668 669
} SVnodeLoad;

typedef struct {
S
Shengliang Guan 已提交
670 671
  int32_t     sver;  // software version
  int64_t     dver;  // dnode table version in sdb
S
Shengliang Guan 已提交
672
  int32_t     dnodeId;
673
  int64_t     clusterId;
S
Shengliang Guan 已提交
674 675
  int64_t     rebootTime;
  int64_t     updateTime;
S
Shengliang Guan 已提交
676 677
  int32_t     numOfCores;
  int32_t     numOfSupportVnodes;
S
Shengliang Guan 已提交
678 679
  char        dnodeEp[TSDB_EP_LEN];
  SClusterCfg clusterCfg;
S
Shengliang Guan 已提交
680
  SArray*     pVloads;  // array of SVnodeLoad
S
Shengliang Guan 已提交
681
} SStatusReq;
D
dapan1121 已提交
682

S
Shengliang Guan 已提交
683 684
int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);
int32_t tDeserializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);
S
Shengliang Guan 已提交
685

D
dapan1121 已提交
686
typedef struct {
S
Shengliang Guan 已提交
687
  int32_t dnodeId;
688
  int64_t clusterId;
D
dapan1121 已提交
689 690 691
} SDnodeCfg;

typedef struct {
692 693 694
  int32_t id;
  int8_t  isMnode;
  SEp     ep;
D
dapan1121 已提交
695 696 697
} SDnodeEp;

typedef struct {
698
  int64_t   dver;
S
Shengliang Guan 已提交
699
  SDnodeCfg dnodeCfg;
S
Shengliang Guan 已提交
700
  SArray*   pDnodeEps;  // Array of SDnodeEp
D
dapan1121 已提交
701 702
} SStatusRsp;

S
Shengliang Guan 已提交
703 704
int32_t tSerializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp);
int32_t tDeserializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp);
S
Shengliang Guan 已提交
705 706

typedef struct {
S
Shengliang Guan 已提交
707 708 709 710 711
  int32_t reserved;
} SMTimerReq;

int32_t tSerializeSMTimerMsg(void* buf, int32_t bufLen, SMTimerReq* pReq);
int32_t tDeserializeSMTimerMsg(void* buf, int32_t bufLen, SMTimerReq* pReq);
S
Shengliang Guan 已提交
712

D
dapan1121 已提交
713
typedef struct {
S
Shengliang Guan 已提交
714 715 716
  int32_t  id;
  uint16_t port;                 // node sync Port
  char     fqdn[TSDB_FQDN_LEN];  // node FQDN
S
Shengliang Guan 已提交
717 718 719
} SReplica;

typedef struct {
S
Shengliang Guan 已提交
720
  int32_t  vgId;
S
Shengliang Guan 已提交
721
  int32_t  dnodeId;
722
  char     db[TSDB_DB_FNAME_LEN];
L
Liu Jicong 已提交
723
  int64_t  dbUid;
724
  int32_t  vgVersion;
S
Shengliang Guan 已提交
725 726 727 728 729 730
  int32_t  cacheBlockSize;
  int32_t  totalBlocks;
  int32_t  daysPerFile;
  int32_t  daysToKeep0;
  int32_t  daysToKeep1;
  int32_t  daysToKeep2;
S
Shengliang Guan 已提交
731 732 733
  int32_t  minRows;
  int32_t  maxRows;
  int32_t  commitTime;
S
Shengliang Guan 已提交
734
  int32_t  fsyncPeriod;
D
dapan1121 已提交
735 736
  uint32_t hashBegin;
  uint32_t hashEnd;
L
Liu Jicong 已提交
737
  int8_t   hashMethod;
S
Shengliang Guan 已提交
738
  int8_t   walLevel;
S
Shengliang Guan 已提交
739 740 741
  int8_t   precision;
  int8_t   compression;
  int8_t   quorum;
S
Shengliang Guan 已提交
742 743
  int8_t   update;
  int8_t   cacheLastRow;
S
Shengliang Guan 已提交
744
  int8_t   replica;
S
Shengliang Guan 已提交
745
  int8_t   selfIndex;
L
Liu Jicong 已提交
746
  int8_t   streamMode;
S
Shengliang Guan 已提交
747
  SReplica replicas[TSDB_MAX_REPLICA];
L
Liu Jicong 已提交
748

S
Shengliang Guan 已提交
749
} SCreateVnodeReq, SAlterVnodeReq;
D
dapan1121 已提交
750

S
Shengliang Guan 已提交
751 752 753
int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
int32_t tDeserializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);

S
Shengliang Guan 已提交
754
typedef struct {
L
Liu Jicong 已提交
755 756 757 758
  int32_t vgId;
  int32_t dnodeId;
  int64_t dbUid;
  char    db[TSDB_DB_FNAME_LEN];
S
Shengliang Guan 已提交
759
} SDropVnodeReq, SSyncVnodeReq, SCompactVnodeReq;
S
Shengliang Guan 已提交
760

S
Shengliang Guan 已提交
761 762
int32_t tSerializeSDropVnodeReq(void* buf, int32_t bufLen, SDropVnodeReq* pReq);
int32_t tDeserializeSDropVnodeReq(void* buf, int32_t bufLen, SDropVnodeReq* pReq);
S
Shengliang Guan 已提交
763

D
dapan1121 已提交
764
typedef struct {
D
dapan1121 已提交
765
  SMsgHead header;
D
dapan1121 已提交
766 767
  char     dbFName[TSDB_DB_FNAME_LEN];
  char     tbName[TSDB_TABLE_NAME_LEN];
S
Shengliang Guan 已提交
768
} STableInfoReq;
D
dapan1121 已提交
769

S
Shengliang Guan 已提交
770 771 772
int32_t tSerializeSTableInfoReq(void* buf, int32_t bufLen, STableInfoReq* pReq);
int32_t tDeserializeSTableInfoReq(void* buf, int32_t bufLen, STableInfoReq* pReq);

D
dapan1121 已提交
773
typedef struct {
S
Shengliang Guan 已提交
774
  int8_t  metaClone;  // create local clone of the cached table meta
D
dapan1121 已提交
775 776 777 778
  int32_t numOfVgroups;
  int32_t numOfTables;
  int32_t numOfUdfs;
  char    tableNames[];
S
Shengliang Guan 已提交
779
} SMultiTableInfoReq;
D
dapan1121 已提交
780

H
Haojun Liao 已提交
781
// todo refactor
D
dapan1121 已提交
782
typedef struct SVgroupInfo {
783 784 785
  int32_t  vgId;
  uint32_t hashBegin;
  uint32_t hashEnd;
L
Liu Jicong 已提交
786
  SEpSet   epSet;
D
dapan1121 已提交
787
  int32_t  numOfTable;  // unit is TSDB_TABLE_NUM_UNIT
D
dapan1121 已提交
788 789
} SVgroupInfo;

D
dapan1121 已提交
790

D
dapan1121 已提交
791
typedef struct {
H
Haojun Liao 已提交
792 793
  int32_t     numOfVgroups;
  SVgroupInfo vgroups[];
S
Shengliang Guan 已提交
794
} SVgroupsInfo;
D
dapan1121 已提交
795

S
Shengliang Guan 已提交
796
typedef struct {
D
dapan1121 已提交
797 798 799
  char     tbName[TSDB_TABLE_NAME_LEN];
  char     stbName[TSDB_TABLE_NAME_LEN];
  char     dbFName[TSDB_DB_FNAME_LEN];
L
Liu Jicong 已提交
800
  int64_t  dbId;
S
Shengliang Guan 已提交
801 802 803 804 805 806 807
  int32_t  numOfTags;
  int32_t  numOfColumns;
  int8_t   precision;
  int8_t   tableType;
  int8_t   update;
  int32_t  sversion;
  int32_t  tversion;
L
Liu Jicong 已提交
808 809
  uint64_t suid;
  uint64_t tuid;
S
Shengliang Guan 已提交
810
  int32_t  vgId;
S
Shengliang Guan 已提交
811
  SSchema* pSchemas;
S
Shengliang Guan 已提交
812
} STableMetaRsp;
D
dapan1121 已提交
813

S
Shengliang Guan 已提交
814 815 816 817 818 819 820 821 822 823 824 825
int32_t tSerializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp);
int32_t tDeserializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp);
void    tFreeSTableMetaRsp(STableMetaRsp* pRsp);

typedef struct {
  SArray* pArray;  // Array of STableMetaRsp
} STableMetaBatchRsp;

int32_t tSerializeSTableMetaBatchRsp(void* buf, int32_t bufLen, STableMetaBatchRsp* pRsp);
int32_t tDeserializeSTableMetaBatchRsp(void* buf, int32_t bufLen, STableMetaBatchRsp* pRsp);
void    tFreeSTableMetaBatchRsp(STableMetaBatchRsp* pRsp);

S
Shengliang Guan 已提交
826
typedef struct {
S
Shengliang Guan 已提交
827 828 829 830 831 832 833 834
  int32_t numOfTables;
  int32_t numOfVgroup;
  int32_t numOfUdf;
  int32_t contLen;
  int8_t  compressed;  // denote if compressed or not
  int32_t rawLen;      // size before compress
  uint8_t metaClone;   // make meta clone after retrieve meta from mnode
  char    meta[];
D
dapan1121 已提交
835 836 837 838 839
} SMultiTableMeta;

typedef struct {
  int32_t dataLen;
  char    name[TSDB_TABLE_FNAME_LEN];
H
Hongze Cheng 已提交
840
  char*   data;
D
dapan1121 已提交
841 842 843 844 845 846 847 848
} STagData;

/*
 * sql: show tables like '%a_%'
 * payload is the query condition, e.g., '%a_%'
 * payloadLen is the length of payload
 */
typedef struct {
L
Liu Jicong 已提交
849
  int32_t type;
850
  char    db[TSDB_DB_FNAME_LEN];
S
Shengliang Guan 已提交
851 852
  int32_t payloadLen;
  char*   payload;
S
Shengliang Guan 已提交
853
} SShowReq;
D
dapan1121 已提交
854

S
Shengliang Guan 已提交
855 856 857 858
int32_t tSerializeSShowReq(void* buf, int32_t bufLen, SShowReq* pReq);
int32_t tDeserializeSShowReq(void* buf, int32_t bufLen, SShowReq* pReq);
void    tFreeSShowReq(SShowReq* pReq);

S
Shengliang Guan 已提交
859
typedef struct {
860
  int64_t       showId;
S
Shengliang Guan 已提交
861
  STableMetaRsp tableMeta;
S
Shengliang Guan 已提交
862
} SShowRsp, SVShowTablesRsp;
D
dapan1121 已提交
863

S
Shengliang Guan 已提交
864 865 866
int32_t tSerializeSShowRsp(void* buf, int32_t bufLen, SShowRsp* pRsp);
int32_t tDeserializeSShowRsp(void* buf, int32_t bufLen, SShowRsp* pRsp);
void    tFreeSShowRsp(SShowRsp* pRsp);
D
dapan1121 已提交
867

868
typedef struct {
869 870
  int32_t type;
  char    db[TSDB_DB_FNAME_LEN];
871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887
  int64_t showId;
  int8_t  free;
} SRetrieveTableReq;

int32_t tSerializeSRetrieveTableReq(void* buf, int32_t bufLen, SRetrieveTableReq* pReq);
int32_t tDeserializeSRetrieveTableReq(void* buf, int32_t bufLen, SRetrieveTableReq* pReq);

typedef struct {
  int64_t useconds;
  int8_t  completed;  // all results are returned to client
  int8_t  precision;
  int8_t  compressed;
  int32_t compLen;
  int32_t numOfRows;
  char    data[];
} SRetrieveTableRsp;

H
Haojun Liao 已提交
888 889 890 891 892 893 894 895 896 897 898
typedef struct {
  int64_t  handle;
  int64_t  useconds;
  int8_t   completed;  // all results are returned to client
  int8_t   precision;
  int8_t   compressed;
  int32_t  compLen;
  int32_t  numOfRows;
  char     data[];
} SRetrieveMetaTableRsp;

D
dapan1121 已提交
899
// Request body for creating a dnode: the node's FQDN plus its port.
typedef struct {
S
Shengliang Guan 已提交
900
  char    fqdn[TSDB_FQDN_LEN];  // end point host name. NOTE(review): this comment used to say "hostname:port", but the port has its own field below -- confirm which form is expected
901
  int32_t port;
S
Shengliang Guan 已提交
902
} SCreateDnodeReq;
S
Shengliang Guan 已提交
903

S
Shengliang Guan 已提交
904 905 906
int32_t tSerializeSCreateDnodeReq(void* buf, int32_t bufLen, SCreateDnodeReq* pReq);
int32_t tDeserializeSCreateDnodeReq(void* buf, int32_t bufLen, SCreateDnodeReq* pReq);

S
Shengliang Guan 已提交
907 908
typedef struct {
  int32_t dnodeId;
909 910
  char    fqdn[TSDB_FQDN_LEN];
  int32_t port;
S
Shengliang Guan 已提交
911
} SDropDnodeReq;
S
Shengliang Guan 已提交
912

S
Shengliang Guan 已提交
913 914 915
int32_t tSerializeSDropDnodeReq(void* buf, int32_t bufLen, SDropDnodeReq* pReq);
int32_t tDeserializeSDropDnodeReq(void* buf, int32_t bufLen, SDropDnodeReq* pReq);

S
Shengliang Guan 已提交
916 917
typedef struct {
  int32_t dnodeId;
918
  char    config[TSDB_DNODE_CONFIG_LEN];
S
Shengliang Guan 已提交
919
  char    value[TSDB_DNODE_VALUE_LEN];
S
Shengliang Guan 已提交
920
} SMCfgDnodeReq, SDCfgDnodeReq;
D
dapan1121 已提交
921

S
Shengliang Guan 已提交
922 923 924
int32_t tSerializeSMCfgDnodeReq(void* buf, int32_t bufLen, SMCfgDnodeReq* pReq);
int32_t tDeserializeSMCfgDnodeReq(void* buf, int32_t bufLen, SMCfgDnodeReq* pReq);

S
Shengliang Guan 已提交
925 926
typedef struct {
  int32_t dnodeId;
S
Shengliang Guan 已提交
927
} SMCreateMnodeReq, SMDropMnodeReq, SDDropMnodeReq;
S
Shengliang Guan 已提交
928

S
Shengliang Guan 已提交
929 930 931
int32_t tSerializeSMCreateDropMnodeReq(void* buf, int32_t bufLen, SMCreateMnodeReq* pReq);
int32_t tDeserializeSMCreateDropMnodeReq(void* buf, int32_t bufLen, SMCreateMnodeReq* pReq);

D
dapan1121 已提交
932
typedef struct {
S
Shengliang Guan 已提交
933 934
  int32_t  dnodeId;
  int8_t   replica;
S
Shengliang Guan 已提交
935
  SReplica replicas[TSDB_MAX_REPLICA];
S
Shengliang Guan 已提交
936
} SDCreateMnodeReq, SDAlterMnodeReq;
D
dapan1121 已提交
937

S
Shengliang Guan 已提交
938 939 940
int32_t tSerializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq);
int32_t tDeserializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq);

S
Shengliang Guan 已提交
941 942
typedef struct {
  int32_t dnodeId;
S
Shengliang Guan 已提交
943 944
} SMCreateQnodeReq, SMDropQnodeReq, SDCreateQnodeReq, SDDropQnodeReq, SMCreateSnodeReq, SMDropSnodeReq,
    SDCreateSnodeReq, SDDropSnodeReq, SMCreateBnodeReq, SMDropBnodeReq, SDCreateBnodeReq, SDDropBnodeReq;
S
Shengliang Guan 已提交
945

S
Shengliang Guan 已提交
946 947
int32_t tSerializeSMCreateDropQSBNodeReq(void* buf, int32_t bufLen, SMCreateQnodeReq* pReq);
int32_t tDeserializeSMCreateDropQSBNodeReq(void* buf, int32_t bufLen, SMCreateQnodeReq* pReq);
S
Shengliang Guan 已提交
948

D
dapan1121 已提交
949
// Descriptor of one query: SQL text, identifiers, timing fields, originating
// pid/fqdn and a textual summary of its sub-queries.
typedef struct {
S
Shengliang Guan 已提交
950 951 952 953 954 955 956 957 958 959 960
  char    sql[TSDB_SHOW_SQL_LEN];
  int32_t queryId;
  int64_t useconds;
  int64_t stime;
  int64_t qId;
  int64_t sqlObjId;
  int32_t pid;
  char    fqdn[TSDB_FQDN_LEN];
  int8_t  stableQuery;
  int32_t numOfSub;
  char    subSqlInfo[TSDB_SHOW_SUBQUERY_LEN];  // includes the subqueries' indexes, Obj IDs and states (C-complete/I-incomplete)
D
dapan1121 已提交
961 962 963
} SQueryDesc;

typedef struct {
S
Shengliang Guan 已提交
964 965 966 967 968 969
  int32_t connId;
  int32_t pid;
  int32_t numOfQueries;
  int32_t numOfStreams;
  char    app[TSDB_APP_NAME_LEN];
  char    pData[];
S
Shengliang Guan 已提交
970
} SHeartBeatReq;
D
dapan1121 已提交
971 972

typedef struct {
S
Shengliang Guan 已提交
973 974 975 976 977 978
  int32_t connId;
  int32_t queryId;
  int32_t streamId;
  int32_t totalDnodes;
  int32_t onlineDnodes;
  int8_t  killConnection;
S
Shengliang Guan 已提交
979
  int8_t  align[3];
S
Shengliang Guan 已提交
980
  SEpSet  epSet;
D
dapan1121 已提交
981 982
} SHeartBeatRsp;

S
Shengliang Guan 已提交
983 984 985
typedef struct {
  int32_t connId;
  int32_t queryId;
S
Shengliang Guan 已提交
986
} SKillQueryReq;
S
Shengliang Guan 已提交
987

S
Shengliang Guan 已提交
988 989 990
int32_t tSerializeSKillQueryReq(void* buf, int32_t bufLen, SKillQueryReq* pReq);
int32_t tDeserializeSKillQueryReq(void* buf, int32_t bufLen, SKillQueryReq* pReq);

S
Shengliang Guan 已提交
991 992
typedef struct {
  int32_t connId;
S
Shengliang Guan 已提交
993
} SKillConnReq;
D
dapan1121 已提交
994

S
Shengliang Guan 已提交
995 996 997
int32_t tSerializeSKillConnReq(void* buf, int32_t bufLen, SKillConnReq* pReq);
int32_t tDeserializeSKillConnReq(void* buf, int32_t bufLen, SKillConnReq* pReq);

S
Shengliang Guan 已提交
998 999 1000 1001 1002 1003 1004
typedef struct {
  int32_t transId;
} SKillTransReq;

int32_t tSerializeSKillTransReq(void* buf, int32_t bufLen, SKillTransReq* pReq);
int32_t tDeserializeSKillTransReq(void* buf, int32_t bufLen, SKillTransReq* pReq);

D
dapan1121 已提交
1005 1006 1007 1008
typedef struct {
  char user[TSDB_USER_LEN];
  char spi;
  char encrypt;
1009 1010
  char secret[TSDB_PASSWORD_LEN];
  char ckey[TSDB_PASSWORD_LEN];
S
Shengliang Guan 已提交
1011
} SAuthReq, SAuthRsp;
D
dapan1121 已提交
1012

S
Shengliang Guan 已提交
1013 1014 1015
int32_t tSerializeSAuthReq(void* buf, int32_t bufLen, SAuthReq* pReq);
int32_t tDeserializeSAuthReq(void* buf, int32_t bufLen, SAuthReq* pReq);

D
dapan1121 已提交
1016
typedef struct {
S
Shengliang Guan 已提交
1017 1018 1019
  int8_t finished;
  char   name[TSDB_STEP_NAME_LEN];
  char   desc[TSDB_STEP_DESC_LEN];
S
Shengliang Guan 已提交
1020
} SStartupReq;
D
dapan1121 已提交
1021

1022 1023 1024 1025 1026 1027 1028
/**
 * The layout of the query message payload is as follows:
 * +--------------------+---------------------------------+
 * |Sql statement       | Physical plan                   |
 * |(denoted by sqlLen) |(In JSON, denoted by phyLen)     |
 * +--------------------+---------------------------------+
 */
L
Liu Jicong 已提交
1029
typedef struct SSubQueryMsg {
D
dapan1121 已提交
1030
  SMsgHead header;
D
dapan1121 已提交
1031
  uint64_t sId;
H
Hongze Cheng 已提交
1032 1033
  uint64_t queryId;
  uint64_t taskId;
D
dapan1121 已提交
1034
  int64_t  refId;
D
dapan1121 已提交
1035
  int8_t   taskType;
1036
  uint32_t sqlLen;  // the query sql,
1037
  uint32_t phyLen;
H
Hongze Cheng 已提交
1038
  char     msg[];
D
dapan1121 已提交
1039
} SSubQueryMsg;
D
dapan 已提交
1040

1041 1042 1043 1044 1045 1046 1047
typedef struct {
  SMsgHead header;
  uint64_t sId;
  uint64_t queryId;
  uint64_t taskId;
} SSinkDataReq;

D
dapan1121 已提交
1048 1049 1050 1051 1052 1053 1054
typedef struct {
  SMsgHead header;
  uint64_t sId;
  uint64_t queryId;
  uint64_t taskId;
} SQueryContinueReq;

S
Shengliang Guan 已提交
1055
typedef struct {
D
dapan1121 已提交
1056
  SMsgHead header;
D
dapan1121 已提交
1057
  uint64_t sId;
H
Hongze Cheng 已提交
1058 1059
  uint64_t queryId;
  uint64_t taskId;
S
Shengliang Guan 已提交
1060
} SResReadyReq;
D
dapan 已提交
1061

S
Shengliang Guan 已提交
1062
typedef struct {
D
dapan1121 已提交
1063 1064 1065
  int32_t code;
} SResReadyRsp;

S
Shengliang Guan 已提交
1066
typedef struct {
D
dapan1121 已提交
1067
  SMsgHead header;
D
dapan1121 已提交
1068
  uint64_t sId;
H
Hongze Cheng 已提交
1069 1070
  uint64_t queryId;
  uint64_t taskId;
S
Shengliang Guan 已提交
1071
} SResFetchReq;
D
dapan 已提交
1072

S
Shengliang Guan 已提交
1073
typedef struct {
D
dapan1121 已提交
1074
  SMsgHead header;
D
dapan1121 已提交
1075
  uint64_t sId;
S
Shengliang Guan 已提交
1076
} SSchTasksStatusReq;
D
dapan1121 已提交
1077

S
Shengliang Guan 已提交
1078
typedef struct {
H
Hongze Cheng 已提交
1079 1080
  uint64_t queryId;
  uint64_t taskId;
D
dapan1121 已提交
1081
  int64_t  refId;
H
Hongze Cheng 已提交
1082
  int8_t   status;
D
dapan1121 已提交
1083 1084
} STaskStatus;

S
Shengliang Guan 已提交
1085
typedef struct {
D
dapan1121 已提交
1086 1087
  int64_t  refId;
  SArray  *taskStatus;  //SArray<STaskStatus>
D
dapan1121 已提交
1088 1089
} SSchedulerStatusRsp;

D
dapan1121 已提交
1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125
typedef struct {
  uint64_t queryId;
  uint64_t taskId;
  int8_t   action;
} STaskAction;


typedef struct SQueryNodeEpId {
  int32_t nodeId;  // vgId or qnodeId
  SEp     ep;
} SQueryNodeEpId;


typedef struct {
  SMsgHead       header;
  uint64_t       sId;
  SQueryNodeEpId epId;
  SArray        *taskAction;  //SArray<STaskAction>
} SSchedulerHbReq;

int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq);
int32_t tDeserializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq);
void tFreeSSchedulerHbReq(SSchedulerHbReq *pReq);


typedef struct {
  uint64_t       seqId;
  SQueryNodeEpId epId;
  SArray        *taskStatus;  //SArray<STaskStatus>
} SSchedulerHbRsp;

int32_t tSerializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *pRsp);
int32_t tDeserializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *pRsp);
void tFreeSSchedulerHbRsp(SSchedulerHbRsp *pRsp);


S
Shengliang Guan 已提交
1126
typedef struct {
D
dapan1121 已提交
1127
  SMsgHead header;
D
dapan1121 已提交
1128
  uint64_t sId;
H
Hongze Cheng 已提交
1129 1130
  uint64_t queryId;
  uint64_t taskId;
D
dapan1121 已提交
1131
  int64_t  refId;
S
Shengliang Guan 已提交
1132
} STaskCancelReq;
D
dapan1121 已提交
1133

S
Shengliang Guan 已提交
1134
typedef struct {
D
dapan1121 已提交
1135 1136 1137
  int32_t code;
} STaskCancelRsp;

S
Shengliang Guan 已提交
1138
typedef struct {
D
dapan1121 已提交
1139
  SMsgHead header;
D
dapan1121 已提交
1140
  uint64_t sId;
H
Hongze Cheng 已提交
1141 1142
  uint64_t queryId;
  uint64_t taskId;
D
dapan1121 已提交
1143
  int64_t  refId;
S
Shengliang Guan 已提交
1144
} STaskDropReq;
D
dapan1121 已提交
1145

S
Shengliang Guan 已提交
1146
typedef struct {
D
dapan1121 已提交
1147 1148 1149
  int32_t code;
} STaskDropRsp;

L
Liu Jicong 已提交
1150
typedef struct {
L
Liu Jicong 已提交
1151 1152 1153 1154 1155
  char   name[TSDB_TOPIC_FNAME_LEN];
  int8_t igExists;
  char*  sql;
  char*  physicalPlan;
  char*  logicalPlan;
L
Liu Jicong 已提交
1156
} SCMCreateStreamReq;
L
Liu Jicong 已提交
1157

L
Liu Jicong 已提交
1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176
typedef struct {
  int64_t streamId;
} SCMCreateStreamRsp;

int32_t tSerializeSCMCreateStreamReq(void* buf, int32_t bufLen, const SCMCreateStreamReq* pReq);
int32_t tDeserializeSCMCreateStreamReq(void* buf, int32_t bufLen, SCMCreateStreamReq* pReq);
void    tFreeSCMCreateStreamReq(SCMCreateStreamReq* pReq);

typedef struct {
  char   name[TSDB_TOPIC_FNAME_LEN];
  int8_t igExists;
  char*  sql;
  char*  physicalPlan;
  char*  logicalPlan;
} SCMCreateTopicReq;

int32_t tSerializeSCMCreateTopicReq(void* buf, int32_t bufLen, const SCMCreateTopicReq* pReq);
int32_t tDeserializeSCMCreateTopicReq(void* buf, int32_t bufLen, SCMCreateTopicReq* pReq);
void    tFreeSCMCreateTopicReq(SCMCreateTopicReq* pReq);
L
Liu Jicong 已提交
1177 1178 1179

typedef struct {
  int64_t topicId;
L
Liu Jicong 已提交
1180
} SCMCreateTopicRsp;
L
Liu Jicong 已提交
1181

L
Liu Jicong 已提交
1182 1183
int32_t tSerializeSCMCreateTopicRsp(void* buf, int32_t bufLen, const SCMCreateTopicRsp* pRsp);
int32_t tDeserializeSCMCreateTopicRsp(void* buf, int32_t bufLen, SCMCreateTopicRsp* pRsp);
L
Liu Jicong 已提交
1184 1185

typedef struct {
L
Liu Jicong 已提交
1186
  int32_t topicNum;
L
Liu Jicong 已提交
1187
  int64_t consumerId;
L
Liu Jicong 已提交
1188
  char*   consumerGroup;
L
Liu Jicong 已提交
1189
  SArray* topicNames;  // SArray<char*>
L
Liu Jicong 已提交
1190 1191
} SCMSubscribeReq;

S
Shengliang Guan 已提交
1192 1193
// Encode a SCMSubscribeReq through the taosEncode* helpers.
// Field order: topicNum, consumerId, consumerGroup, followed by topicNum
// topic names taken from pReq->topicNames.
// Returns the sum of the values returned by the individual encode calls.
static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) {
  int32_t total = taosEncodeFixedI32(buf, pReq->topicNum);
  total += taosEncodeFixedI64(buf, pReq->consumerId);
  total += taosEncodeString(buf, pReq->consumerGroup);

  int32_t idx = 0;
  while (idx < pReq->topicNum) {
    char* topic = (char*)taosArrayGetP(pReq->topicNames, idx);
    total += taosEncodeString(buf, topic);
    idx++;
  }
  return total;
}

// Decode a SCMSubscribeReq from `buf`, mirroring tSerializeSCMSubscribeReq:
// topicNum, consumerId, consumerGroup, then topicNum topic names.
//
// A new array is created for pReq->topicNames and every decoded name pointer
// is pushed into it.
// NOTE(review): consumerGroup and the names are presumably allocated by
// taosDecodeString and owned by the caller afterwards -- confirm.
//
// Returns the value produced by the last decode call, or NULL when the
// topic-name array cannot be created.
static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) {
  buf = taosDecodeFixedI32(buf, &pReq->topicNum);
  buf = taosDecodeFixedI64(buf, &pReq->consumerId);
  buf = taosDecodeString(buf, &pReq->consumerGroup);

  pReq->topicNames = taosArrayInit(pReq->topicNum, sizeof(void*));
  if (pReq->topicNames == NULL) {
    // Without this check the loop below would push into a NULL array.
    return NULL;
  }

  for (int32_t i = 0; i < pReq->topicNum; i++) {
    char* name = NULL;  // defined value even if the decode call leaves it untouched
    buf = taosDecodeString(buf, &name);
    taosArrayPush(pReq->topicNames, &name);
  }
  return buf;
}

L
Liu Jicong 已提交
1217
typedef struct SMqSubTopic {
L
Liu Jicong 已提交
1218
  int32_t vgId;
L
Liu Jicong 已提交
1219 1220 1221 1222 1223
  int64_t topicId;
  SEpSet  epSet;
} SMqSubTopic;

typedef struct {
L
Liu Jicong 已提交
1224
  int32_t     topicNum;
L
Liu Jicong 已提交
1225
  SMqSubTopic topics[];
L
Liu Jicong 已提交
1226 1227
} SCMSubscribeRsp;

S
Shengliang Guan 已提交
1228 1229
// Encode a SCMSubscribeRsp: topicNum first, then for each of the topicNum
// entries in pRsp->topics its vgId, topicId and epSet.
// Returns the sum of the values returned by the individual encode calls.
static FORCE_INLINE int32_t tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribeRsp* pRsp) {
  int32_t total = taosEncodeFixedI32(buf, pRsp->topicNum);

  int32_t idx = 0;
  while (idx < pRsp->topicNum) {
    const SMqSubTopic* pTopic = &pRsp->topics[idx];
    total += taosEncodeFixedI32(buf, pTopic->vgId);
    total += taosEncodeFixedI64(buf, pTopic->topicId);
    total += taosEncodeSEpSet(buf, &pTopic->epSet);
    idx++;
  }
  return total;
}

// Decode a SCMSubscribeRsp: topicNum first, then vgId, topicId and epSet for
// each of the topicNum entries, written directly into pRsp->topics[].
// NOTE(review): topics[] is a flexible array member and nothing here sizes
// it, so pRsp must already have room for topicNum entries -- confirm callers.
// Returns the value produced by the last decode call.
static FORCE_INLINE void* tDeserializeSCMSubscribeRsp(void* buf, SCMSubscribeRsp* pRsp) {
  buf = taosDecodeFixedI32(buf, &pRsp->topicNum);

  int32_t idx = 0;
  while (idx < pRsp->topicNum) {
    SMqSubTopic* pTopic = &pRsp->topics[idx];
    buf = taosDecodeFixedI32(buf, &pTopic->vgId);
    buf = taosDecodeFixedI64(buf, &pTopic->topicId);
    buf = taosDecodeSEpSet(buf, &pTopic->epSet);
    idx++;
  }
  return buf;
}

typedef struct {
  int64_t topicId;
  int64_t consumerId;
  int64_t consumerGroupId;
  int64_t offset;
L
Liu Jicong 已提交
1254 1255 1256
  char*   sql;
  char*   logicalPlan;
  char*   physicalPlan;
L
Liu Jicong 已提交
1257 1258
} SMVSubscribeReq;

S
Shengliang Guan 已提交
1259 1260
// Encode a SMVSubscribeReq: four fixed 64-bit fields (topicId, consumerId,
// consumerGroupId, offset) followed by three strings (sql, logicalPlan,
// physicalPlan).
// Returns the sum of the values returned by the individual encode calls.
static FORCE_INLINE int32_t tSerializeSMVSubscribeReq(void** buf, SMVSubscribeReq* pReq) {
  // Fixed-width identifiers.
  int32_t total = taosEncodeFixedI64(buf, pReq->topicId);
  total += taosEncodeFixedI64(buf, pReq->consumerId);
  total += taosEncodeFixedI64(buf, pReq->consumerGroupId);
  total += taosEncodeFixedI64(buf, pReq->offset);

  // Variable-length text fields.
  total += taosEncodeString(buf, pReq->sql);
  total += taosEncodeString(buf, pReq->logicalPlan);
  total += taosEncodeString(buf, pReq->physicalPlan);

  return total;
}

// Decode a SMVSubscribeReq in the order used by tSerializeSMVSubscribeReq:
// topicId, consumerId, consumerGroupId, offset, sql, logicalPlan,
// physicalPlan.
// Returns the value produced by the last decode call.
static FORCE_INLINE void* tDeserializeSMVSubscribeReq(void* buf, SMVSubscribeReq* pReq) {
  void* cursor = buf;

  // Fixed-width identifiers.
  cursor = taosDecodeFixedI64(cursor, &pReq->topicId);
  cursor = taosDecodeFixedI64(cursor, &pReq->consumerId);
  cursor = taosDecodeFixedI64(cursor, &pReq->consumerGroupId);
  cursor = taosDecodeFixedI64(cursor, &pReq->offset);

  // Variable-length text fields.
  cursor = taosDecodeString(cursor, &pReq->sql);
  cursor = taosDecodeString(cursor, &pReq->logicalPlan);
  cursor = taosDecodeString(cursor, &pReq->physicalPlan);

  return cursor;
}

L
Liu Jicong 已提交
1282
typedef struct {
1283 1284 1285 1286
  const char* key;
  SArray*     lostConsumers;     // SArray<int64_t>
  SArray*     removedConsumers;  // SArray<int64_t>
  SArray*     newConsumers;      // SArray<int64_t>
L
Liu Jicong 已提交
1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315
} SMqRebSubscribe;

// Allocate a zeroed SMqRebSubscribe, store `key` in it (the pointer is kept,
// not copied) and create its three empty int64_t arrays: lostConsumers,
// removedConsumers and newConsumers.
//
// Returns the new object, or NULL if any allocation fails; in that case
// everything allocated so far is released.
static FORCE_INLINE SMqRebSubscribe* tNewSMqRebSubscribe(const char* key) {
  SMqRebSubscribe* pRebSub = (SMqRebSubscribe*)calloc(1, sizeof(SMqRebSubscribe));
  if (pRebSub == NULL) {
    // Nothing to clean up yet; jumping to _err would dereference NULL.
    return NULL;
  }
  pRebSub->key = key;
  pRebSub->lostConsumers = taosArrayInit(0, sizeof(int64_t));
  if (pRebSub->lostConsumers == NULL) {
    goto _err;
  }
  pRebSub->removedConsumers = taosArrayInit(0, sizeof(int64_t));
  if (pRebSub->removedConsumers == NULL) {
    goto _err;
  }
  pRebSub->newConsumers = taosArrayInit(0, sizeof(int64_t));
  if (pRebSub->newConsumers == NULL) {
    goto _err;
  }
  return pRebSub;
_err:
  // Arrays not created yet are still NULL thanks to calloc.
  taosArrayDestroy(pRebSub->lostConsumers);
  taosArrayDestroy(pRebSub->removedConsumers);
  taosArrayDestroy(pRebSub->newConsumers);
  tfree(pRebSub);
  return NULL;
}

L
Liu Jicong 已提交
1316
// this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization or
1317
// deserialization
L
Liu Jicong 已提交
1318
typedef struct {
1319
  SHashObj* rebSubHash;  // SHashObj<key, SMqRebSubscribe>
L
Liu Jicong 已提交
1320 1321
} SMqDoRebalanceMsg;

L
Liu Jicong 已提交
1322
typedef struct {
L
Liu Jicong 已提交
1323
  int64_t status;
L
Liu Jicong 已提交
1324 1325
} SMVSubscribeRsp;

L
Liu Jicong 已提交
1326 1327 1328
typedef struct {
  char   name[TSDB_TABLE_FNAME_LEN];
  int8_t igNotExists;
S
Shengliang Guan 已提交
1329 1330
} SMDropTopicReq;

S
Shengliang Guan 已提交
1331
int32_t tSerializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq);
S
Shengliang Guan 已提交
1332
int32_t tDeserializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq);
L
Liu Jicong 已提交
1333 1334 1335 1336 1337

typedef struct {
  char    name[TSDB_TABLE_FNAME_LEN];
  int8_t  alterType;
  SSchema schema;
S
Shengliang Guan 已提交
1338
} SAlterTopicReq;
L
Liu Jicong 已提交
1339 1340 1341 1342

typedef struct {
  SMsgHead head;
  char     name[TSDB_TABLE_FNAME_LEN];
L
Liu Jicong 已提交
1343
  int64_t  tuid;
L
Liu Jicong 已提交
1344 1345 1346 1347 1348
  int32_t  sverson;
  int32_t  execLen;
  char*    executor;
  int32_t  sqlLen;
  char*    sql;
S
Shengliang Guan 已提交
1349
} SDCreateTopicReq;
L
Liu Jicong 已提交
1350 1351 1352 1353

typedef struct {
  SMsgHead head;
  char     name[TSDB_TABLE_FNAME_LEN];
L
Liu Jicong 已提交
1354
  int64_t  tuid;
S
Shengliang Guan 已提交
1355
} SDDropTopicReq;
S
Shengliang Guan 已提交
1356

H
more  
Hongze Cheng 已提交
1357
typedef struct SVCreateTbReq {
L
Liu Jicong 已提交
1358
  int64_t  ver;  // use a general definition
H
more  
Hongze Cheng 已提交
1359 1360 1361
  char*    name;
  uint32_t ttl;
  uint32_t keep;
S
Shengliang Guan 已提交
1362
  uint8_t  type;
H
more  
Hongze Cheng 已提交
1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379
  union {
    struct {
      tb_uid_t suid;
      uint32_t nCols;
      SSchema* pSchema;
      uint32_t nTagCols;
      SSchema* pTagSchema;
    } stbCfg;
    struct {
      tb_uid_t suid;
      SKVRow   pTag;
    } ctbCfg;
    struct {
      uint32_t nCols;
      SSchema* pSchema;
    } ntbCfg;
  };
S
Shengliang Guan 已提交
1380
} SVCreateTbReq, SVUpdateTbReq;
H
more  
Hongze Cheng 已提交
1381

H
more  
Hongze Cheng 已提交
1382
typedef struct {
D
dapan1121 已提交
1383 1384
  int32_t code;
  SName   tableName;
1385
  int tmp; // TODO: to avoid compile error
S
Shengliang Guan 已提交
1386
} SVCreateTbRsp, SVUpdateTbRsp;
H
more  
Hongze Cheng 已提交
1387

S
Shengliang Guan 已提交
1388 1389
int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
void*   tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq);
H
more  
Hongze Cheng 已提交
1390

H
more  
Hongze Cheng 已提交
1391
typedef struct {
L
Liu Jicong 已提交
1392 1393
  int64_t ver;  // use a general definition
  SArray* pArray;
H
more  
Hongze Cheng 已提交
1394
} SVCreateTbBatchReq;
H
more  
Hongze Cheng 已提交
1395

D
dapan 已提交
1396 1397
int32_t tSerializeSVCreateTbBatchReq(void** buf, SVCreateTbBatchReq* pReq);
void*   tDeserializeSVCreateTbBatchReq(void* buf, SVCreateTbBatchReq* pReq);
D
dapan1121 已提交
1398

S
Shengliang Guan 已提交
1399
typedef struct {
D
dapan1121 已提交
1400
  SArray* rspList; // SArray<SVCreateTbRsp>
1401
  int tmp; // TODO: to avoid compile error
S
Shengliang Guan 已提交
1402
} SVCreateTbBatchRsp;
H
more  
Hongze Cheng 已提交
1403

D
dapan 已提交
1404 1405 1406
int32_t tSerializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatchRsp *pRsp);
int32_t tDeserializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatchRsp *pRsp);

S
Shengliang Guan 已提交
1407 1408

typedef struct {
L
Liu Jicong 已提交
1409
  int64_t  ver;
S
Shengliang Guan 已提交
1410 1411 1412
  char*    name;
  uint8_t  type;
  tb_uid_t suid;
S
Shengliang Guan 已提交
1413 1414 1415
} SVDropTbReq;

typedef struct {
1416
  int tmp; // TODO: to avoid compile error
S
Shengliang Guan 已提交
1417 1418
} SVDropTbRsp;

S
Shengliang Guan 已提交
1419 1420 1421
int32_t tSerializeSVDropTbReq(void** buf, SVDropTbReq* pReq);
void*   tDeserializeSVDropTbReq(void* buf, SVDropTbReq* pReq);

S
Shengliang Guan 已提交
1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440
typedef struct {
  SMsgHead head;
  int64_t  uid;
  int32_t  tid;
  int16_t  tversion;
  int16_t  colId;
  int8_t   type;
  int16_t  bytes;
  int32_t  tagValLen;
  int16_t  numOfTags;
  int32_t  schemaLen;
  char     data[];
} SUpdateTagValReq;

typedef struct {
  SMsgHead head;
} SUpdateTagValRsp;

typedef struct {
H
Hongze Cheng 已提交
1441 1442 1443
  SMsgHead head;
} SVShowTablesReq;

S
Shengliang Guan 已提交
1444
typedef struct {
H
Hongze Cheng 已提交
1445
  SMsgHead head;
H
Haojun Liao 已提交
1446
  int32_t  id;
H
Hongze Cheng 已提交
1447 1448
} SVShowTablesFetchReq;

S
Shengliang Guan 已提交
1449
typedef struct {
H
Hongze Cheng 已提交
1450 1451 1452 1453 1454 1455 1456 1457 1458
  int64_t useconds;
  int8_t  completed;  // all results are returned to client
  int8_t  precision;
  int8_t  compressed;
  int32_t compLen;
  int32_t numOfRows;
  char    data[];
} SVShowTablesFetchRsp;

L
Liu Jicong 已提交
1459 1460 1461
typedef struct SMqCMGetSubEpReq {
  int64_t consumerId;
  int32_t epoch;
L
Liu Jicong 已提交
1462
  char    cgroup[TSDB_CGROUP_LEN];
L
Liu Jicong 已提交
1463 1464
} SMqCMGetSubEpReq;

L
Liu Jicong 已提交
1465 1466 1467 1468 1469 1470 1471 1472
// Encode the two SMsgHead fields, contLen then vgId, as fixed 32-bit values.
// Returns the sum of the values returned by the two encode calls.
static FORCE_INLINE int32_t tEncodeSMsgHead(void** buf, const SMsgHead* pMsg) {
  int32_t written = taosEncodeFixedI32(buf, pMsg->contLen);
  written += taosEncodeFixedI32(buf, pMsg->vgId);
  return written;
}

// Heartbeat response for one entry: three int8_t flags, a reserved byte and
// an endpoint set.
typedef struct SMqHbRsp {
1473
  int8_t status;  // idle or not
L
Liu Jicong 已提交
1474
  int8_t vnodeChanged;
1475
  int8_t epChanged;  // should use new epset
L
Liu Jicong 已提交
1476 1477 1478 1479
  int8_t reserved;  // not written by taosEncodeSMqHbRsp nor read by taosDecodeSMqHbRsp
  SEpSet epSet;
} SMqHbRsp;

S
Shengliang Guan 已提交
1480 1481
// Encode a SMqHbRsp: status, vnodeChanged and epChanged as fixed 8-bit
// values, then epSet. The `reserved` field is not encoded.
// Returns the sum of the values returned by the individual encode calls.
static FORCE_INLINE int32_t taosEncodeSMqHbRsp(void** buf, const SMqHbRsp* pRsp) {
  int32_t total = taosEncodeFixedI8(buf, pRsp->status);
  total += taosEncodeFixedI8(buf, pRsp->vnodeChanged);
  total += taosEncodeFixedI8(buf, pRsp->epChanged);
  total += taosEncodeSEpSet(buf, &pRsp->epSet);
  return total;
}

// Decode a SMqHbRsp in the order used by taosEncodeSMqHbRsp: status,
// vnodeChanged, epChanged, epSet. The `reserved` field is left untouched.
// Returns the value produced by the last decode call.
static FORCE_INLINE void* taosDecodeSMqHbRsp(void* buf, SMqHbRsp* pRsp) {
  void* cursor = buf;

  // The three one-byte flags.
  cursor = taosDecodeFixedI8(cursor, &pRsp->status);
  cursor = taosDecodeFixedI8(cursor, &pRsp->vnodeChanged);
  cursor = taosDecodeFixedI8(cursor, &pRsp->epChanged);

  // Endpoint set.
  cursor = taosDecodeSEpSet(cursor, &pRsp->epSet);

  return cursor;
}

typedef struct SMqHbOneTopicBatchRsp {
1498
  char    topicName[TSDB_TOPIC_FNAME_LEN];
L
Liu Jicong 已提交
1499 1500 1501
  SArray* rsps;  // SArray<SMqHbRsp>
} SMqHbOneTopicBatchRsp;

S
Shengliang Guan 已提交
1502 1503
// Encode a SMqHbOneTopicBatchRsp: the topic name, the number of elements in
// pBatchRsp->rsps, then each SMqHbRsp element in array order.
// Returns the sum of the values returned by the individual encode calls.
static FORCE_INLINE int32_t taosEncodeSMqHbOneTopicBatchRsp(void** buf, const SMqHbOneTopicBatchRsp* pBatchRsp) {
  int32_t total = taosEncodeString(buf, pBatchRsp->topicName);

  int32_t count = taosArrayGetSize(pBatchRsp->rsps);
  total += taosEncodeFixedI32(buf, count);

  int32_t idx = 0;
  while (idx < count) {
    SMqHbRsp* pOne = (SMqHbRsp*)taosArrayGet(pBatchRsp->rsps, idx);
    total += taosEncodeSMqHbRsp(buf, pOne);
    idx++;
  }
  return total;
}

// Decode a SMqHbOneTopicBatchRsp: the topic name (into the fixed-size
// topicName buffer), the element count, then that many SMqHbRsp entries,
// which are appended to a newly created pBatchRsp->rsps array.
//
// Returns the value produced by the last decode call, or NULL when the
// array cannot be created.
static FORCE_INLINE void* taosDecodeSMqHbOneTopicBatchRsp(void* buf, SMqHbOneTopicBatchRsp* pBatchRsp) {
  int32_t sz = 0;
  buf = taosDecodeStringTo(buf, pBatchRsp->topicName);
  buf = taosDecodeFixedI32(buf, &sz);

  pBatchRsp->rsps = taosArrayInit(sz, sizeof(SMqHbRsp));
  if (pBatchRsp->rsps == NULL) {
    return NULL;
  }

  for (int32_t i = 0; i < sz; i++) {
    SMqHbRsp rsp = {0};  // `reserved` is never decoded; keep the stored copy fully defined
    buf = taosDecodeSMqHbRsp(buf, &rsp);
    // The push result must not be stored in buf: it describes the array, not
    // the input stream, and would derail decoding of the next element.
    taosArrayPush(pBatchRsp->rsps, &rsp);
  }
  return buf;
}

typedef struct SMqHbBatchRsp {
1528 1529
  int64_t consumerId;
  SArray* batchRsps;  // SArray<SMqHbOneTopicBatchRsp>
L
Liu Jicong 已提交
1530 1531
} SMqHbBatchRsp;

S
Shengliang Guan 已提交
1532 1533
// Encode a SMqHbBatchRsp: consumerId, the number of elements in
// pBatchRsp->batchRsps, then each SMqHbOneTopicBatchRsp element in array
// order.
// Returns the sum of the values returned by the individual encode calls.
static FORCE_INLINE int32_t taosEncodeSMqHbBatchRsp(void** buf, const SMqHbBatchRsp* pBatchRsp) {
  int32_t tlen = 0;
  tlen += taosEncodeFixedI64(buf, pBatchRsp->consumerId);
  // The count used to be read from an uninitialized local; take it from the
  // array, as taosEncodeSMqHbOneTopicBatchRsp does.
  int32_t sz = taosArrayGetSize(pBatchRsp->batchRsps);
  tlen += taosEncodeFixedI32(buf, sz);
  for (int32_t i = 0; i < sz; i++) {
    SMqHbOneTopicBatchRsp* pRsp = (SMqHbOneTopicBatchRsp*)taosArrayGet(pBatchRsp->batchRsps, i);
    tlen += taosEncodeSMqHbOneTopicBatchRsp(buf, pRsp);
  }
  return tlen;
}

// Decode a SMqHbBatchRsp: consumerId, the element count, then that many
// SMqHbOneTopicBatchRsp entries, which are appended to a newly created
// pBatchRsp->batchRsps array.
//
// Returns the value produced by the last decode call, or NULL when the
// array cannot be created.
static FORCE_INLINE void* taosDecodeSMqHbBatchRsp(void* buf, SMqHbBatchRsp* pBatchRsp) {
  buf = taosDecodeFixedI64(buf, &pBatchRsp->consumerId);
  int32_t sz = 0;
  buf = taosDecodeFixedI32(buf, &sz);

  pBatchRsp->batchRsps = taosArrayInit(sz, sizeof(SMqHbOneTopicBatchRsp));
  if (pBatchRsp->batchRsps == NULL) {
    return NULL;
  }

  for (int32_t i = 0; i < sz; i++) {
    SMqHbOneTopicBatchRsp rsp = {0};
    buf = taosDecodeSMqHbOneTopicBatchRsp(buf, &rsp);
    // The push result must not be stored in buf: it describes the array, not
    // the input stream, and would derail decoding of the next element.
    taosArrayPush(pBatchRsp->batchRsps, &rsp);
  }
  return buf;
}

typedef struct {
D
dapan1121 已提交
1558
  int32_t key;
L
Liu Jicong 已提交
1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580
  int32_t valueLen;
  void*   value;
} SKv;

typedef struct {
  int32_t connId;
  int32_t hbType;
} SClientHbKey;

typedef struct {
  SClientHbKey connKey;
  SHashObj*    info;  // hash<Skv.key, Skv>
} SClientHbReq;

typedef struct {
  int64_t reqId;
  SArray* reqs;  // SArray<SClientHbReq>
} SClientHbBatchReq;

typedef struct {
  SClientHbKey connKey;
  int32_t      status;
D
dapan1121 已提交
1581
  SArray*      info;  // Array<Skv>
L
Liu Jicong 已提交
1582 1583 1584 1585 1586 1587 1588 1589
} SClientHbRsp;

typedef struct {
  int64_t reqId;
  int64_t rspId;
  SArray* rsps;  // SArray<SClientHbRsp>
} SClientHbBatchRsp;

1590
// Hash function for heartbeat keys; forwards key and keyLen to
// taosIntHash_64 and returns its result.
static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) {
  uint32_t hashVal = taosIntHash_64(key, keyLen);
  return hashVal;
}
L
Liu Jicong 已提交
1591

1592 1593
// Walk every entry of `info`, treat it as a SKv and release its `value`
// buffer. The hash table itself is not destroyed here.
static FORCE_INLINE void tFreeReqKvHash(SHashObj* info) {
  for (void* pEntry = taosHashIterate(info, NULL); pEntry != NULL; pEntry = taosHashIterate(info, pEntry)) {
    SKv* pKv = (SKv*)pEntry;
    tfree(pKv->value);
  }
}

1601
static FORCE_INLINE void tFreeClientHbReq(void* pReq) {
L
Liu Jicong 已提交
1602
  SClientHbReq* req = (SClientHbReq*)pReq;
D
dapan1121 已提交
1603 1604 1605 1606
  if (req->info) {
    tFreeReqKvHash(req->info);
    taosHashCleanup(req->info);
  }
L
Liu Jicong 已提交
1607 1608
}

S
Shengliang Guan 已提交
1609 1610
int32_t tSerializeSClientHbBatchReq(void* buf, int32_t bufLen, const SClientHbBatchReq* pReq);
int32_t tDeserializeSClientHbBatchReq(void* buf, int32_t bufLen, SClientHbBatchReq* pReq);
L
Liu Jicong 已提交
1611 1612

static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) {
1613
  SClientHbBatchReq* req = (SClientHbBatchReq*)pReq;
L
Liu Jicong 已提交
1614 1615 1616 1617 1618 1619 1620 1621
  if (deep) {
    taosArrayDestroyEx(req->reqs, tFreeClientHbReq);
  } else {
    taosArrayDestroy(req->reqs);
  }
  free(pReq);
}

1622 1623
static FORCE_INLINE void tFreeClientKv(void* pKv) {
  SKv* kv = (SKv*)pKv;
D
dapan1121 已提交
1624 1625 1626 1627 1628
  if (kv) {
    tfree(kv->value);
  }
}

1629
static FORCE_INLINE void tFreeClientHbRsp(void* pRsp) {
D
dapan1121 已提交
1630 1631 1632 1633 1634
  SClientHbRsp* rsp = (SClientHbRsp*)pRsp;
  if (rsp->info) taosArrayDestroyEx(rsp->info, tFreeClientKv);
}

static FORCE_INLINE void tFreeClientHbBatchRsp(void* pRsp) {
1635
  SClientHbBatchRsp* rsp = (SClientHbBatchRsp*)pRsp;
D
dapan1121 已提交
1636 1637 1638
  taosArrayDestroyEx(rsp->rsps, tFreeClientHbRsp);
}

S
Shengliang Guan 已提交
1639 1640
/*
 * Serializer pair for SClientHbBatchRsp; only declared in this header.
 * NOTE(review): return-value convention is not visible here -- presumably negative on failure; confirm.
 */
int32_t tSerializeSClientHbBatchRsp(void* buf, int32_t bufLen, const SClientHbBatchRsp* pBatchRsp);
int32_t tDeserializeSClientHbBatchRsp(void* buf, int32_t bufLen, SClientHbBatchRsp* pBatchRsp);
L
Liu Jicong 已提交
1641

S
Shengliang Guan 已提交
1642 1643 1644
/*
 * Encode one SKv: key, valueLen, then the value bytes (valueLen of them) via tEncodeBinary.
 * Returns 0 on success, -1 as soon as any encoder call reports a negative result.
 */
static FORCE_INLINE int32_t tEncodeSKv(SCoder* pEncoder, const SKv* pKv) {
  if (tEncodeI32(pEncoder, pKv->key) < 0) return -1;
  if (tEncodeI32(pEncoder, pKv->valueLen) < 0) return -1;
  if (tEncodeBinary(pEncoder, (const char*)pKv->value, pKv->valueLen) < 0) return -1;
  return 0;
}

S
Shengliang Guan 已提交
1649 1650 1651 1652 1653 1654 1655
/*
 * Decode one SKv written by tEncodeSKv.
 *
 * A buffer of valueLen + 1 bytes is allocated for pKv->value and filled by tDecodeCStrTo.
 * On success the caller owns pKv->value (see tFreeClientKv).
 * Returns 0 on success, -1 on a decoder error, a negative valueLen, or allocation failure;
 * on those failures no buffer is left attached to pKv.
 *
 * NOTE(review): the length actually stored on the wire is consumed inside tDecodeCStrTo and is
 * not compared with valueLen here -- confirm the helper cannot write past valueLen + 1 bytes.
 */
static FORCE_INLINE int32_t tDecodeSKv(SCoder* pDecoder, SKv* pKv) {
  if (tDecodeI32(pDecoder, &pKv->key) < 0) return -1;
  if (tDecodeI32(pDecoder, &pKv->valueLen) < 0) return -1;
  // a negative length would turn into a huge (or zero) size once converted for malloc
  if (pKv->valueLen < 0) return -1;
  pKv->value = malloc((size_t)pKv->valueLen + 1);
  if (pKv->value == NULL) return -1;
  if (tDecodeCStrTo(pDecoder, (char*)pKv->value) < 0) {
    // do not leak the buffer, and do not leave a dangling pointer for a later tFreeClientKv
    free(pKv->value);
    pKv->value = NULL;
    return -1;
  }
  return 0;
}

S
Shengliang Guan 已提交
1658 1659 1660 1661
/*
 * Encode a SClientHbKey as two int32 values: connId, then hbType.
 * Returns 0 on success, -1 if either encoder call reports a negative result.
 */
static FORCE_INLINE int32_t tEncodeSClientHbKey(SCoder* pEncoder, const SClientHbKey* pKey) {
  if (tEncodeI32(pEncoder, pKey->connId) < 0) return -1;
  if (tEncodeI32(pEncoder, pKey->hbType) < 0) return -1;
  return 0;
}

S
Shengliang Guan 已提交
1664 1665 1666 1667
/*
 * Decode a SClientHbKey in the order written by tEncodeSClientHbKey: connId, then hbType.
 * Returns 0 on success, -1 if either decoder call reports a negative result.
 */
static FORCE_INLINE int32_t tDecodeSClientHbKey(SCoder* pDecoder, SClientHbKey* pKey) {
  if (tDecodeI32(pDecoder, &pKey->connId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pKey->hbType) < 0) return -1;
  return 0;
}

/* Per-vgroup entry of an MQ heartbeat topic (element type of SMqHbTopicInfo.pVgInfo). */
typedef struct SMqHbVgInfo {
  int32_t vgId;
} SMqHbVgInfo;

S
Shengliang Guan 已提交
1674 1675
/*
 * Encode a SMqHbVgInfo as one fixed-width int32 (vgId).
 * Returns the length reported by taosEncodeFixedI32.
 */
static FORCE_INLINE int32_t taosEncodeSMqVgInfo(void** buf, const SMqHbVgInfo* pVgInfo) {
  int32_t tlen = 0;
  tlen += taosEncodeFixedI32(buf, pVgInfo->vgId);
  return tlen;
}

/* Read the fixed-width vgId into pVgInfo and return the cursor handed back by the decoder. */
static FORCE_INLINE void* taosDecodeSMqVgInfo(void* buf, SMqHbVgInfo* pVgInfo) {
  void* pNext = taosDecodeFixedI32(buf, &pVgInfo->vgId);
  return pNext;
}

/* Per-topic section of an MQ heartbeat message (element type of SMqHbMsg.pTopics). */
typedef struct SMqHbTopicInfo {
  int32_t epoch;
  int64_t topicUid;
  char    name[TSDB_TOPIC_FNAME_LEN];
  SArray* pVgInfo;  // elements are SMqHbVgInfo (see taosDecodeSMqHbTopicInfoMsg)
} SMqHbTopicInfo;

S
Shengliang Guan 已提交
1692 1693
/*
 * Encode a SMqHbTopicInfo: epoch, topicUid, name, the vgroup count, then each SMqHbVgInfo.
 * Returns the accumulated length reported by the taosEncode* helpers.
 */
static FORCE_INLINE int32_t taosEncodeSMqHbTopicInfoMsg(void** buf, const SMqHbTopicInfo* pTopicInfo) {
  int32_t tlen = 0;
  tlen += taosEncodeFixedI32(buf, pTopicInfo->epoch);
  tlen += taosEncodeFixedI64(buf, pTopicInfo->topicUid);
  tlen += taosEncodeString(buf, pTopicInfo->name);
  // element count goes first so the decoder knows how many entries follow
  int32_t sz = taosArrayGetSize(pTopicInfo->pVgInfo);
  tlen += taosEncodeFixedI32(buf, sz);
  for (int32_t i = 0; i < sz; i++) {
    SMqHbVgInfo* pVgInfo = (SMqHbVgInfo*)taosArrayGet(pTopicInfo->pVgInfo, i);
    tlen += taosEncodeSMqVgInfo(buf, pVgInfo);
  }
  return tlen;
}

/*
 * Decode a SMqHbTopicInfo in the order written by taosEncodeSMqHbTopicInfoMsg.
 *
 * Allocates pTopicInfo->pVgInfo (an array of SMqHbVgInfo); the caller owns it afterwards.
 * Returns the advanced read cursor, or NULL when the array cannot be allocated
 * (the same convention tDecodeSMqSubTopicEp uses).
 */
static FORCE_INLINE void* taosDecodeSMqHbTopicInfoMsg(void* buf, SMqHbTopicInfo* pTopicInfo) {
  buf = taosDecodeFixedI32(buf, &pTopicInfo->epoch);
  buf = taosDecodeFixedI64(buf, &pTopicInfo->topicUid);
  buf = taosDecodeStringTo(buf, pTopicInfo->name);
  int32_t sz;
  buf = taosDecodeFixedI32(buf, &sz);
  pTopicInfo->pVgInfo = taosArrayInit(sz, sizeof(SMqHbVgInfo));
  if (pTopicInfo->pVgInfo == NULL) {
    // pushing into a NULL array below would be invalid; report the failure instead
    return NULL;
  }
  for (int32_t i = 0; i < sz; i++) {
    SMqHbVgInfo vgInfo;
    buf = taosDecodeSMqVgInfo(buf, &vgInfo);
    taosArrayPush(pTopicInfo->pVgInfo, &vgInfo);
  }
  return buf;
}

typedef struct SMqHbMsg {
1722 1723 1724 1725
  int32_t status;  // ask hb endpoint
  int32_t epoch;
  int64_t consumerId;
  SArray* pTopics;  // SArray<SMqHbTopicInfo>
L
Liu Jicong 已提交
1726 1727
} SMqHbMsg;

S
Shengliang Guan 已提交
1728 1729
/*
 * Encode a SMqHbMsg: status, epoch, consumerId, the topic count, then each SMqHbTopicInfo.
 * Returns the accumulated length reported by the taosEncode* helpers.
 */
static FORCE_INLINE int32_t taosEncodeSMqMsg(void** buf, const SMqHbMsg* pMsg) {
  int32_t tlen = 0;
  tlen += taosEncodeFixedI32(buf, pMsg->status);
  tlen += taosEncodeFixedI32(buf, pMsg->epoch);
  tlen += taosEncodeFixedI64(buf, pMsg->consumerId);
  int32_t sz = taosArrayGetSize(pMsg->pTopics);
  tlen += taosEncodeFixedI32(buf, sz);
  for (int32_t i = 0; i < sz; i++) {
    SMqHbTopicInfo* topicInfo = (SMqHbTopicInfo*)taosArrayGet(pMsg->pTopics, i);
    tlen += taosEncodeSMqHbTopicInfoMsg(buf, topicInfo);
  }
  return tlen;
}

/*
 * Decode a SMqHbMsg in the order written by taosEncodeSMqMsg.
 *
 * Allocates pMsg->pTopics (an array of SMqHbTopicInfo); the caller owns it afterwards,
 * including on failure, where it holds the topics decoded so far.
 * Returns the advanced read cursor, or NULL when the array cannot be allocated or the
 * nested topic decoder hands back a NULL cursor.
 */
static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
  buf = taosDecodeFixedI32(buf, &pMsg->status);
  buf = taosDecodeFixedI32(buf, &pMsg->epoch);
  buf = taosDecodeFixedI64(buf, &pMsg->consumerId);
  int32_t sz;
  buf = taosDecodeFixedI32(buf, &sz);
  pMsg->pTopics = taosArrayInit(sz, sizeof(SMqHbTopicInfo));
  if (pMsg->pTopics == NULL) {
    return NULL;
  }
  for (int32_t i = 0; i < sz; i++) {
    SMqHbTopicInfo topicInfo;
    buf = taosDecodeSMqHbTopicInfoMsg(buf, &topicInfo);
    if (buf == NULL) {
      // never keep reading from a NULL cursor or store a half-decoded element
      return NULL;
    }
    taosArrayPush(pMsg->pTopics, &topicInfo);
  }
  return buf;
}

L
Liu Jicong 已提交
1757
typedef struct {
1758 1759 1760 1761
  int64_t leftForVer;
  int32_t vgId;
  int64_t consumerId;
  char    topicName[TSDB_TOPIC_FNAME_LEN];
L
Liu Jicong 已提交
1762
  char    cgroup[TSDB_CGROUP_LEN];
1763 1764 1765 1766
  char*   sql;
  char*   logicalPlan;
  char*   physicalPlan;
  char*   qmsg;
L
Liu Jicong 已提交
1767 1768 1769 1770
} SMqSetCVgReq;

/*
 * Encode a SMqSetCVgReq field by field, in declaration order.
 * All four char* members are passed to taosEncodeString unconditionally.
 * Returns the accumulated length reported by the taosEncode* helpers.
 */
static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) {
  int32_t tlen = 0;
  tlen += taosEncodeFixedI64(buf, pReq->leftForVer);
  tlen += taosEncodeFixedI32(buf, pReq->vgId);
  tlen += taosEncodeFixedI64(buf, pReq->consumerId);
  tlen += taosEncodeString(buf, pReq->topicName);
  tlen += taosEncodeString(buf, pReq->cgroup);
  tlen += taosEncodeString(buf, pReq->sql);
  tlen += taosEncodeString(buf, pReq->logicalPlan);
  tlen += taosEncodeString(buf, pReq->physicalPlan);
  tlen += taosEncodeString(buf, pReq->qmsg);
  return tlen;
}

/*
 * Decode a SMqSetCVgReq in the order written by tEncodeSMqSetCVgReq.
 * topicName and cgroup are decoded into the fixed arrays; sql, logicalPlan, physicalPlan
 * and qmsg receive pointers through taosDecodeString.
 * Returns the advanced read cursor.
 */
static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
  buf = taosDecodeFixedI64(buf, &pReq->leftForVer);
  buf = taosDecodeFixedI32(buf, &pReq->vgId);
  buf = taosDecodeFixedI64(buf, &pReq->consumerId);
  buf = taosDecodeStringTo(buf, pReq->topicName);
  buf = taosDecodeStringTo(buf, pReq->cgroup);
  buf = taosDecodeString(buf, &pReq->sql);
  buf = taosDecodeString(buf, &pReq->logicalPlan);
  buf = taosDecodeString(buf, &pReq->physicalPlan);
  buf = taosDecodeString(buf, &pReq->qmsg);
  return buf;
}

L
Liu Jicong 已提交
1796
/* Request encoded by tEncodeSMqMVRebReq: a vgroup id plus an old and a new consumer id. */
typedef struct {
  int64_t leftForVer;
  int32_t vgId;
  int64_t oldConsumerId;
  int64_t newConsumerId;
} SMqMVRebReq;

/*
 * Write a SMqMVRebReq as leftForVer (i64), vgId (i32), oldConsumerId (i64), newConsumerId (i64).
 * Returns the summed length reported by the fixed-width encoders.
 */
static FORCE_INLINE int32_t tEncodeSMqMVRebReq(void** buf, const SMqMVRebReq* pReq) {
  int32_t nBytes = taosEncodeFixedI64(buf, pReq->leftForVer);
  nBytes += taosEncodeFixedI32(buf, pReq->vgId);
  nBytes += taosEncodeFixedI64(buf, pReq->oldConsumerId);
  nBytes += taosEncodeFixedI64(buf, pReq->newConsumerId);
  return nBytes;
}

/*
 * Read a SMqMVRebReq in the order tEncodeSMqMVRebReq wrote it and
 * return the cursor positioned after the last field.
 */
static FORCE_INLINE void* tDecodeSMqMVRebReq(void* buf, SMqMVRebReq* pReq) {
  void* pCur = buf;
  pCur = taosDecodeFixedI64(pCur, &pReq->leftForVer);
  pCur = taosDecodeFixedI32(pCur, &pReq->vgId);
  pCur = taosDecodeFixedI64(pCur, &pReq->oldConsumerId);
  pCur = taosDecodeFixedI64(pCur, &pReq->newConsumerId);
  return pCur;
}

typedef struct {
L
Liu Jicong 已提交
1821 1822 1823 1824
  SMsgHead header;
  int32_t  vgId;
  int64_t  consumerId;
  char     topicName[TSDB_TOPIC_FNAME_LEN];
L
Liu Jicong 已提交
1825
  char     cgroup[TSDB_CGROUP_LEN];
L
Liu Jicong 已提交
1826 1827
} SMqSetCVgRsp;

L
Liu Jicong 已提交
1828 1829 1830 1831 1832
typedef struct {
  SMsgHead header;
  int32_t  vgId;
  int64_t  consumerId;
  char     topicName[TSDB_TOPIC_FNAME_LEN];
L
Liu Jicong 已提交
1833
  char     cgroup[TSDB_CGROUP_LEN];
L
Liu Jicong 已提交
1834 1835
} SMqMVRebRsp;

L
Liu Jicong 已提交
1836 1837 1838 1839
typedef struct {
  int32_t vgId;
  int64_t offset;
  char    topicName[TSDB_TOPIC_FNAME_LEN];
L
Liu Jicong 已提交
1840
  char    cgroup[TSDB_CGROUP_LEN];
L
Liu Jicong 已提交
1841 1842 1843 1844 1845
} SMqOffset;

/* Commit-offset request: `num` records pointed to by `offsets`. */
typedef struct {
  int32_t    num;      // number of entries in `offsets`
  SMqOffset* offsets;
} SMqCMCommitOffsetReq;
L
Liu Jicong 已提交
1847 1848 1849

/* Commit-offset response; currently carries only a reserved field. */
typedef struct {
  int32_t reserved;
} SMqCMCommitOffsetRsp;
L
Liu Jicong 已提交
1851 1852 1853

/*
 * SCoder-based codecs for SMqOffset and SMqCMCommitOffsetReq; only declared in this header.
 * NOTE(review): presumably 0 on success and negative on failure, like the inline SCoder
 * helpers in this header -- confirm against the definitions.
 */
int32_t tEncodeSMqOffset(SCoder* encoder, const SMqOffset* pOffset);
int32_t tDecodeSMqOffset(SCoder* decoder, SMqOffset* pOffset);
int32_t tEncodeSMqCMCommitOffsetReq(SCoder* encoder, const SMqCMCommitOffsetReq* pReq);
int32_t tDecodeSMqCMCommitOffsetReq(SCoder* decoder, SMqCMCommitOffsetReq* pReq);
L
Liu Jicong 已提交
1856

L
Liu Jicong 已提交
1857 1858
typedef struct {
  uint32_t nCols;
1859
  SSchema* pSchema;
L
Liu Jicong 已提交
1860 1861
} SSchemaWrapper;

S
Shengliang Guan 已提交
1862
/*
 * Encode one SSchema with the buffer-cursor encoders: type (i8), bytes (i32), colId (i32), name.
 * Returns the accumulated length reported by the taosEncode* helpers.
 */
static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema) {
  int32_t tlen = 0;
  tlen += taosEncodeFixedI8(buf, pSchema->type);
  tlen += taosEncodeFixedI32(buf, pSchema->bytes);
  tlen += taosEncodeFixedI32(buf, pSchema->colId);
  tlen += taosEncodeString(buf, pSchema->name);
  return tlen;
}

S
Shengliang Guan 已提交
1871
/*
 * Decode one SSchema in the order written by taosEncodeSSchema; the name is decoded
 * straight into pSchema->name. Returns the advanced read cursor.
 */
static FORCE_INLINE void* taosDecodeSSchema(void* buf, SSchema* pSchema) {
  buf = taosDecodeFixedI8(buf, &pSchema->type);
  buf = taosDecodeFixedI32(buf, &pSchema->bytes);
  buf = taosDecodeFixedI32(buf, &pSchema->colId);
  buf = taosDecodeStringTo(buf, pSchema->name);
  return buf;
}

S
Shengliang Guan 已提交
1879 1880 1881 1882 1883 1884 1885 1886 1887 1888 1889 1890 1891 1892 1893 1894
/*
 * SCoder variant of the SSchema encoder: type, bytes, colId, name, in that order.
 * Stops at the first encoder call that reports a negative result and returns -1; otherwise 0.
 */
static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSchema) {
  // || short-circuits, so a failing step prevents the later ones from running
  if (tEncodeI8(pEncoder, pSchema->type) < 0 ||
      tEncodeI32(pEncoder, pSchema->bytes) < 0 ||
      tEncodeI32(pEncoder, pSchema->colId) < 0 ||
      tEncodeCStr(pEncoder, pSchema->name) < 0) {
    return -1;
  }
  return 0;
}

/*
 * SCoder variant of the SSchema decoder, mirroring tEncodeSSchema field order.
 * Stops at the first decoder call that reports a negative result and returns -1; otherwise 0.
 */
static FORCE_INLINE int32_t tDecodeSSchema(SCoder* pDecoder, SSchema* pSchema) {
  // || short-circuits, so a failing step prevents the later ones from running
  if (tDecodeI8(pDecoder, &pSchema->type) < 0 ||
      tDecodeI32(pDecoder, &pSchema->bytes) < 0 ||
      tDecodeI32(pDecoder, &pSchema->colId) < 0 ||
      tDecodeCStrTo(pDecoder, pSchema->name) < 0) {
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
1895 1896 1897
/*
 * Encode a SSchemaWrapper: nCols (u32) followed by nCols SSchema entries.
 * Returns the accumulated length reported by the encoders.
 */
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) {
  int32_t tlen = 0;
  tlen += taosEncodeFixedU32(buf, pSW->nCols);
  // index has the same unsigned type as nCols, avoiding a signed/unsigned comparison
  for (uint32_t i = 0; i < pSW->nCols; i++) {
    tlen += taosEncodeSSchema(buf, &pSW->pSchema[i]);
  }
  return tlen;
}

/*
 * Decode a SSchemaWrapper written by tEncodeSSchemaWrapper.
 *
 * Allocates pSW->pSchema with calloc (nCols zeroed entries); the caller releases it.
 * Returns the advanced read cursor, or NULL when the allocation fails.
 */
static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) {
  buf = taosDecodeFixedU32(buf, &pSW->nCols);
  pSW->pSchema = (SSchema*)calloc(pSW->nCols, sizeof(SSchema));
  // calloc may legitimately return NULL for a zero-sized request; that is not an error
  if (pSW->pSchema == NULL && pSW->nCols > 0) {
    return NULL;
  }

  // index has the same unsigned type as nCols, avoiding a signed/unsigned comparison
  for (uint32_t i = 0; i < pSW->nCols; i++) {
    buf = taosDecodeSSchema(buf, &pSW->pSchema[i]);
  }
  return buf;
}
C
Cary Xu 已提交
1916 1917 1918 1919 1920 1921 1922 1923 1924 1925 1926 1927 1928 1929
/* Time units, ordered from coarsest (year = 0) to finest (nanosecond = 10); -1 means unknown. */
typedef enum {
  TD_TIME_UNIT_UNKNOWN = -1,
  TD_TIME_UNIT_YEAR = 0,
  TD_TIME_UNIT_SEASON = 1,
  TD_TIME_UNIT_MONTH = 2,
  TD_TIME_UNIT_WEEK = 3,
  TD_TIME_UNIT_DAY = 4,
  TD_TIME_UNIT_HOUR = 5,
  TD_TIME_UNIT_MINUTE = 6,
  TD_TIME_UNIT_SEC = 7,
  TD_TIME_UNIT_MILLISEC = 8,
  TD_TIME_UNIT_MICROSEC = 9,
  TD_TIME_UNIT_NANOSEC = 10
} ETDTimeUnit;
C
Cary Xu 已提交
1930 1931

typedef struct {
C
Cary Xu 已提交
1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944
  int8_t   version;  // for compatibility(default 0)
  int8_t   intervalUnit;
  int8_t   slidingUnit;
  char     indexName[TSDB_INDEX_NAME_LEN];
  char     timezone[TD_TIMEZONE_LEN];  // sma data is invalid if timezone change.
  uint16_t exprLen;
  uint16_t tagsFilterLen;
  int64_t  indexUid;
  tb_uid_t tableUid;  // super/child/common table uid
  int64_t  interval;
  int64_t  sliding;
  char*    expr;  // sma expression
  char*    tagsFilter;
C
Cary Xu 已提交
1945
} STSma;  // Time-range-wise SMA
C
Cary Xu 已提交
1946

C
Cary Xu 已提交
1947
typedef struct {
C
Cary Xu 已提交
1948 1949 1950
  int64_t ver;  // use a general definition
  STSma   tSma;
} SVCreateTSmaReq;
C
Cary Xu 已提交
1951 1952

typedef struct {
L
Liu Jicong 已提交
1953
  int8_t      type;                                // 0 status report, 1 update data
C
Cary Xu 已提交
1954
  char        indexName[TSDB_INDEX_NAME_LEN];  //
C
Cary Xu 已提交
1955 1956 1957 1958 1959
  STimeWindow windows;
} STSmaMsg;

typedef struct {
  int64_t ver;  // use a general definition
C
Cary Xu 已提交
1960
  char    indexName[TSDB_INDEX_NAME_LEN];
C
Cary Xu 已提交
1961
} SVDropTSmaReq;
1962

C
Cary Xu 已提交
1963
/* Placeholder response type shared by the create- and drop-TSMA requests. */
typedef struct {
  int tmp;  // TODO: to avoid compile error
} SVCreateTSmaRsp, SVDropTSmaRsp;

/*
 * Buffer-cursor serializers for the create/drop TSMA requests; only declared in this header.
 * NOTE(review): the signatures match the tEncode / tDecode helpers above (length returned by the
 * serializer, advanced cursor returned by the deserializer) -- confirm against the definitions.
 */
int32_t tSerializeSVCreateTSmaReq(void** buf, SVCreateTSmaReq* pReq);
void*   tDeserializeSVCreateTSmaReq(void* buf, SVCreateTSmaReq* pReq);
int32_t tSerializeSVDropTSmaReq(void** buf, SVDropTSmaReq* pReq);
void*   tDeserializeSVDropTSmaReq(void* buf, SVDropTSmaReq* pReq);
C
Cary Xu 已提交
1971 1972

typedef struct {
C
Cary Xu 已提交
1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 1986 1987 1988 1989 1990 1991 1992 1993 1994 1995 1996
  col_id_t colId;
  uint16_t blockSize;  // sma data block size
  char     data[];
} STSmaColData;

/* Header of the SMA data of one table; the payload follows in the flexible array `data`. */
typedef struct {
  tb_uid_t tableUid;  // super/child/normal table uid
  int32_t  dataLen;   // not including head
  char     data[];
} STSmaTbData;

/* Header of the SMA data of one window; the payload follows in the flexible array `data`. */
typedef struct {
  int64_t indexUid;
  TSKEY   skey;  // startTS of one interval/sliding
  int64_t interval;
  int32_t dataLen;  // not including head
  int8_t  intervalUnit;
  char    data[];
} STSmaDataWrapper;  // sma data for a interval/sliding window

// interval/sliding => window

// => window->table->colId
// => once all tables under a window have been computed, stream computing tells tsdb to clear the window's expired flag
C
Cary Xu 已提交
1997

C
Cary Xu 已提交
1998
// RSma: Rollup SMA
C
Cary Xu 已提交
1999 2000 2001 2002 2003 2004 2005 2006 2007 2008 2009 2010 2011
/* Parameter set of a rollup SMA (element type of SRSma.smaParams). */
typedef struct {
  int64_t  interval;
  int32_t  retention;  // unit: day
  uint16_t days;       // unit: day
  int8_t   intervalUnit;
} SSmaParams;

/* Rollup SMA: a time-range-wise SMA definition plus rollup parameters. */
typedef struct {
  STSma   tsma;
  float   xFilesFactor;
  SArray* smaParams;  // SSmaParams
} SRSma;

C
Cary Xu 已提交
2012 2013 2014 2015 2016
/* A counted array of STSma; `tSma` holds `number` entries and is released by tdDestroyTSmaWrapper. */
typedef struct {
  uint32_t number;
  STSma*   tSma;
} STSmaWrapper;

C
Cary Xu 已提交
2017
/*
 * Release the heap strings owned by a STSma (expr and tagsFilter).
 * A NULL pSma is accepted and ignored; the STSma storage itself is not freed.
 */
static FORCE_INLINE void tdDestroyTSma(STSma* pSma) {
  if (pSma) {
    tfree(pSma->expr);
    tfree(pSma->tagsFilter);
  }
}

C
Cary Xu 已提交
2024
/*
 * Release everything a STSmaWrapper owns: each of its `number` STSma entries
 * (via tdDestroyTSma) and then the tSma array. NULL pSW / NULL tSma are ignored.
 * The STSmaWrapper storage itself is not freed.
 */
static FORCE_INLINE void tdDestroyTSmaWrapper(STSmaWrapper* pSW) {
  if (pSW) {
    if (pSW->tSma) {
      for (uint32_t i = 0; i < pSW->number; ++i) {
        tdDestroyTSma(pSW->tSma + i);
      }
      tfree(pSW->tSma);
    }
  }
}

/*
 * Encode one STSma.
 *
 * Fixed part: version, intervalUnit, slidingUnit, indexName, timezone, exprLen,
 * tagsFilterLen, indexUid, tableUid, interval, sliding.
 * Optional part: expr is written only when exprLen > 0, tagsFilter only when tagsFilterLen > 0.
 * Returns the accumulated length reported by the taosEncode* helpers.
 */
static FORCE_INLINE int32_t tEncodeTSma(void** buf, const STSma* pSma) {
  int32_t tlen = 0;

  tlen += taosEncodeFixedI8(buf, pSma->version);
  tlen += taosEncodeFixedI8(buf, pSma->intervalUnit);
  tlen += taosEncodeFixedI8(buf, pSma->slidingUnit);
  tlen += taosEncodeString(buf, pSma->indexName);
  tlen += taosEncodeString(buf, pSma->timezone);
  tlen += taosEncodeFixedU16(buf, pSma->exprLen);
  tlen += taosEncodeFixedU16(buf, pSma->tagsFilterLen);
  tlen += taosEncodeFixedI64(buf, pSma->indexUid);
  tlen += taosEncodeFixedI64(buf, pSma->tableUid);
  tlen += taosEncodeFixedI64(buf, pSma->interval);
  tlen += taosEncodeFixedI64(buf, pSma->sliding);

  if (pSma->exprLen > 0) {
    tlen += taosEncodeString(buf, pSma->expr);
  }

  if (pSma->tagsFilterLen > 0) {
    tlen += taosEncodeString(buf, pSma->tagsFilter);
  }

  return tlen;
}

/*
 * Write a STSmaWrapper: the entry count (u32) followed by each STSma via tEncodeTSma.
 * Returns the summed length reported by the encoders.
 */
static FORCE_INLINE int32_t tEncodeTSmaWrapper(void** buf, const STSmaWrapper* pSW) {
  int32_t encoded = taosEncodeFixedU32(buf, pSW->number);

  for (uint32_t idx = 0; idx < pSW->number; idx++) {
    encoded += tEncodeTSma(buf, &pSW->tSma[idx]);
  }
  return encoded;
}

/*
 * Decode one STSma in the order written by tEncodeTSma.
 *
 * expr / tagsFilter are allocated only when the decoded exprLen / tagsFilterLen is greater
 * than zero; otherwise they are set to NULL. On success the caller owns both strings
 * (release with tdDestroyTSma).
 * Returns the advanced read cursor, or NULL on allocation failure, in which case anything
 * already allocated for pSma has been released.
 *
 * NOTE(review): the length of the string actually stored on the wire is consumed inside
 * taosDecodeStringTo and is not compared with exprLen / tagsFilterLen here -- confirm it
 * cannot exceed the allocated size.
 */
static FORCE_INLINE void* tDecodeTSma(void* buf, STSma* pSma) {
  // Clear the owned pointers up front so every failure path hands tdDestroyTSma
  // well-defined values even when the caller did not zero *pSma.
  pSma->expr = NULL;
  pSma->tagsFilter = NULL;

  buf = taosDecodeFixedI8(buf, &pSma->version);
  buf = taosDecodeFixedI8(buf, &pSma->intervalUnit);
  buf = taosDecodeFixedI8(buf, &pSma->slidingUnit);
  buf = taosDecodeStringTo(buf, pSma->indexName);
  buf = taosDecodeStringTo(buf, pSma->timezone);
  buf = taosDecodeFixedU16(buf, &pSma->exprLen);
  buf = taosDecodeFixedU16(buf, &pSma->tagsFilterLen);
  buf = taosDecodeFixedI64(buf, &pSma->indexUid);
  buf = taosDecodeFixedI64(buf, &pSma->tableUid);
  buf = taosDecodeFixedI64(buf, &pSma->interval);
  buf = taosDecodeFixedI64(buf, &pSma->sliding);

  if (pSma->exprLen > 0) {
    // one spare byte in case the string decoder appends a terminator and exprLen
    // does not count it -- NOTE(review): confirm the meaning of exprLen
    pSma->expr = (char*)calloc((size_t)pSma->exprLen + 1, 1);
    if (pSma->expr == NULL) {
      tdDestroyTSma(pSma);
      return NULL;
    }
    buf = taosDecodeStringTo(buf, pSma->expr);
  }

  if (pSma->tagsFilterLen > 0) {
    // same spare byte as for expr above
    pSma->tagsFilter = (char*)calloc((size_t)pSma->tagsFilterLen + 1, 1);
    if (pSma->tagsFilter == NULL) {
      tdDestroyTSma(pSma);  // also releases expr
      return NULL;
    }
    buf = taosDecodeStringTo(buf, pSma->tagsFilter);
  }

  return buf;
}

/*
 * Decode a STSmaWrapper written by tEncodeTSmaWrapper.
 *
 * Allocates pSW->tSma with calloc (`number` zeroed entries) and decodes each entry with
 * tDecodeTSma. On success the caller owns the array (release with tdDestroyTSmaWrapper).
 * Returns the advanced read cursor, or NULL on failure; on failure every entry decoded so
 * far and the array itself are released and pSW->tSma is reset to NULL.
 */
static FORCE_INLINE void* tDecodeTSmaWrapper(void* buf, STSmaWrapper* pSW) {
  buf = taosDecodeFixedU32(buf, &pSW->number);

  pSW->tSma = (STSma*)calloc(pSW->number, sizeof(STSma));
  // calloc may legitimately return NULL for a zero-sized request; that is not an error
  if (pSW->tSma == NULL && pSW->number > 0) {
    return NULL;
  }

  for (uint32_t i = 0; i < pSW->number; ++i) {
    buf = tDecodeTSma(buf, pSW->tSma + i);
    if (buf == NULL) {
      // Entry i was already cleaned up by tDecodeTSma before it returned NULL, so only the
      // fully decoded entries [0, i) are released here. (The previous unsigned countdown
      // `j >= 0` could never terminate and decremented i instead of j.)
      for (uint32_t j = 0; j < i; ++j) {
        tdDestroyTSma(pSW->tSma + j);
      }
      free(pSW->tSma);
      pSW->tSma = NULL;  // keep a later tdDestroyTSmaWrapper from touching freed memory
      return NULL;
    }
  }
  return buf;
}
L
Liu Jicong 已提交
2133

L
Liu Jicong 已提交
2134
/* Per-table data of an MQ topic (element type of SMqTopicData.tbData). */
typedef struct {
  int64_t uid;
  int32_t numOfRows;
  char*   colData;
} SMqTbData;

L
Liu Jicong 已提交
2140
typedef struct {
L
Liu Jicong 已提交
2141 2142 2143 2144 2145 2146 2147 2148
  char       topicName[TSDB_TOPIC_FNAME_LEN];
  int64_t    committedOffset;
  int64_t    reqOffset;
  int64_t    rspOffset;
  int32_t    skipLogNum;
  int32_t    bodyLen;
  int32_t    numOfTb;
  SMqTbData* tbData;
L
Liu Jicong 已提交
2149 2150
} SMqTopicData;

L
Liu Jicong 已提交
2151 2152 2153 2154 2155 2156
/* Common head of MQ responses (first member of struct tmq_message_t). */
typedef struct {
  int8_t  mqMsgType;
  int32_t code;
  int32_t epoch;
} SMqRspHead;

L
Liu Jicong 已提交
2157
typedef struct {
L
Liu Jicong 已提交
2158 2159
  int64_t         consumerId;
  SSchemaWrapper* schemas;
L
Liu Jicong 已提交
2160 2161 2162
  int64_t         reqOffset;
  int64_t         rspOffset;
  int32_t         skipLogNum;
L
Liu Jicong 已提交
2163
  int32_t         numOfTopics;
2164
  SArray*         pBlockData;  // SArray<SSDataBlock>
L
Liu Jicong 已提交
2165
} SMqPollRsp;
L
Liu Jicong 已提交
2166

L
Liu Jicong 已提交
2167
// one req for one vg+topic
L
Liu Jicong 已提交
2168
typedef struct {
2169
  SMsgHead head;
L
Liu Jicong 已提交
2170

2171 2172
  int64_t consumerId;
  int64_t blockingTime;
L
Liu Jicong 已提交
2173
  int32_t epoch;
L
Liu Jicong 已提交
2174
  char    cgroup[TSDB_CGROUP_LEN];
L
Liu Jicong 已提交
2175

L
Liu Jicong 已提交
2176
  int64_t currentOffset;
2177
  char    topic[TSDB_TOPIC_FNAME_LEN];
L
Liu Jicong 已提交
2178
} SMqPollReq;
L
Liu Jicong 已提交
2179

L
Liu Jicong 已提交
2180
typedef struct {
L
Liu Jicong 已提交
2181
  int32_t vgId;
L
Liu Jicong 已提交
2182
  int64_t offset;
L
Liu Jicong 已提交
2183 2184 2185
  SEpSet  epSet;
} SMqSubVgEp;

L
Liu Jicong 已提交
2186
typedef struct {
L
Liu Jicong 已提交
2187
  char    topic[TSDB_TOPIC_FNAME_LEN];
2188
  SArray* vgs;  // SArray<SMqSubVgEp>
L
Liu Jicong 已提交
2189 2190
} SMqSubTopicEp;

L
Liu Jicong 已提交
2191
typedef struct {
L
Liu Jicong 已提交
2192
  int64_t consumerId;
L
Liu Jicong 已提交
2193
  char    cgroup[TSDB_CGROUP_LEN];
L
Liu Jicong 已提交
2194 2195 2196
  SArray* topics;  // SArray<SMqSubTopicEp>
} SMqCMGetSubEpRsp;

L
Liu Jicong 已提交
2197 2198 2199
struct tmq_message_t {
  SMqRspHead head;
  union {
L
Liu Jicong 已提交
2200
    SMqPollRsp       consumeRsp;
L
Liu Jicong 已提交
2201 2202
    SMqCMGetSubEpRsp getEpRsp;
  };
L
Liu Jicong 已提交
2203
  void* extra;
L
Liu Jicong 已提交
2204 2205
};

2206
/* Destroy the vgroup array of one SMqSubTopicEp; the structure itself is not freed. */
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
  SArray* pVgs = pSubTopicEp->vgs;
  taosArrayDestroy(pVgs);
}
L
Liu Jicong 已提交
2207

L
Liu Jicong 已提交
2208 2209
/*
 * Encode a SMqSubVgEp: vgId (i32), offset (i64), then the endpoint set.
 * Returns the accumulated length reported by the encoders.
 */
static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) {
  int32_t tlen = 0;
  tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
  tlen += taosEncodeFixedI64(buf, pVgEp->offset);
  tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
  return tlen;
}

/*
 * Decode a SMqSubVgEp in the order written by tEncodeSMqSubVgEp.
 * Returns the advanced read cursor.
 */
static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) {
  buf = taosDecodeFixedI32(buf, &pVgEp->vgId);
  buf = taosDecodeFixedI64(buf, &pVgEp->offset);
  buf = taosDecodeSEpSet(buf, &pVgEp->epSet);
  return buf;
}

L
Liu Jicong 已提交
2223
/*
 * Destroy the `topics` array of a SMqCMGetSubEpRsp, releasing each SMqSubTopicEp with
 * tDeleteSMqSubTopicEp. The response structure itself is not freed.
 *
 * NOTE(review): tDeleteSMqSubTopicEp takes SMqSubTopicEp*, so it is cast to the
 * void (*)(void*) destructor type taosArrayDestroyEx is given here; calling through a
 * converted function pointer type is formally undefined in ISO C -- consider a void* wrapper.
 */
static FORCE_INLINE void tDeleteSMqCMGetSubEpRsp(SMqCMGetSubEpRsp* pRsp) {
  taosArrayDestroyEx(pRsp->topics, (void (*)(void*))tDeleteSMqSubTopicEp);
}

L
Liu Jicong 已提交
2227 2228 2229 2230 2231 2232 2233 2234 2235 2236 2237 2238 2239 2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269 2270 2271 2272 2273 2274 2275 2276 2277 2278 2279 2280 2281 2282 2283
/*
 * Write a SMqSubTopicEp: the topic name, the vgroup count, then every SMqSubVgEp.
 * Returns the summed length reported by the encoders.
 */
static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) {
  int32_t total = taosEncodeString(buf, pTopicEp->topic);
  int32_t nVgs = taosArrayGetSize(pTopicEp->vgs);
  total += taosEncodeFixedI32(buf, nVgs);

  for (int32_t idx = 0; idx < nVgs; ++idx) {
    total += tEncodeSMqSubVgEp(buf, (SMqSubVgEp*)taosArrayGet(pTopicEp->vgs, idx));
  }
  return total;
}

/*
 * Read a SMqSubTopicEp in the order tEncodeSMqSubTopicEp wrote it.
 * Creates pTopicEp->vgs (array of SMqSubVgEp) and appends every decoded vgroup to it.
 * Returns the cursor after the last vgroup, or NULL when the array cannot be created.
 */
static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicEp) {
  int32_t nVgs;
  void*   pCur = taosDecodeStringTo(buf, pTopicEp->topic);
  pCur = taosDecodeFixedI32(pCur, &nVgs);

  pTopicEp->vgs = taosArrayInit(nVgs, sizeof(SMqSubVgEp));
  if (!pTopicEp->vgs) return NULL;

  for (int32_t idx = 0; idx < nVgs; ++idx) {
    SMqSubVgEp oneVg;
    pCur = tDecodeSMqSubVgEp(pCur, &oneVg);
    taosArrayPush(pTopicEp->vgs, &oneVg);
  }
  return pCur;
}

/*
 * Write a SMqCMGetSubEpRsp: consumerId, cgroup, the topic count, then every SMqSubTopicEp.
 * Returns the summed length reported by the encoders.
 */
static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSubEpRsp* pRsp) {
  int32_t total = taosEncodeFixedI64(buf, pRsp->consumerId);
  total += taosEncodeString(buf, pRsp->cgroup);

  int32_t nTopics = taosArrayGetSize(pRsp->topics);
  total += taosEncodeFixedI32(buf, nTopics);

  for (int32_t idx = 0; idx < nTopics; ++idx) {
    total += tEncodeSMqSubTopicEp(buf, (SMqSubTopicEp*)taosArrayGet(pRsp->topics, idx));
  }
  return total;
}

/*
 * Decode a SMqCMGetSubEpRsp in the order written by tEncodeSMqCMGetSubEpRsp.
 *
 * Allocates pRsp->topics (array of SMqSubTopicEp); the caller owns it afterwards and
 * releases it with tDeleteSMqCMGetSubEpRsp -- also after a failure, where it holds the
 * topics decoded so far.
 * Returns the advanced read cursor, or NULL when the array cannot be allocated or a
 * nested topic fails to decode.
 */
static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* pRsp) {
  buf = taosDecodeFixedI64(buf, &pRsp->consumerId);
  buf = taosDecodeStringTo(buf, pRsp->cgroup);
  int32_t sz;
  buf = taosDecodeFixedI32(buf, &sz);
  pRsp->topics = taosArrayInit(sz, sizeof(SMqSubTopicEp));
  if (pRsp->topics == NULL) {
    return NULL;
  }
  for (int32_t i = 0; i < sz; i++) {
    SMqSubTopicEp topicEp;
    buf = tDecodeSMqSubTopicEp(buf, &topicEp);
    if (buf == NULL) {
      // tDecodeSMqSubTopicEp returns NULL when it cannot create its vgs array; in that case
      // topicEp owns nothing, so stop here instead of storing it and reading from a NULL cursor
      return NULL;
    }
    taosArrayPush(pRsp->topics, &topicEp);
  }
  return buf;
}
L
Liu Jicong 已提交
2284

L
Liu Jicong 已提交
2285 2286
#pragma pack(pop)

D
dapan1121 已提交
2287 2288 2289 2290 2291
#ifdef __cplusplus
}
#endif

#endif /*_TD_COMMON_TAOS_MSG_H_*/