tmsg.h 69.3 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef _TD_COMMON_TAOS_MSG_H_
#define _TD_COMMON_TAOS_MSG_H_

#include "taosdef.h"
#include "taoserror.h"
H
more  
Hongze Cheng 已提交
21
#include "tarray.h"
H
more  
Hongze Cheng 已提交
22
#include "tcoding.h"
S
Shengliang Guan 已提交
23
#include "tencode.h"
L
Liu Jicong 已提交
24
#include "thash.h"
H
Hongze Cheng 已提交
25
#include "tlist.h"
D
dapan1121 已提交
26
#include "tname.h"
L
Liu Jicong 已提交
27
#include "trow.h"
L
Liu Jicong 已提交
28
#include "tuuid.h"
D
dapan1121 已提交
29

S
Shengliang Guan 已提交
30 31 32 33
#ifdef __cplusplus
extern "C" {
#endif

H
Hongze Cheng 已提交
34
/* ------------------------ MESSAGE DEFINITIONS ------------------------ */
H
Hongze Cheng 已提交
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
#define TD_MSG_NUMBER_
#undef TD_MSG_DICT_
#undef TD_MSG_INFO_
#undef TD_MSG_SEG_CODE_
#include "tmsgdef.h"

#undef TD_MSG_NUMBER_
#undef TD_MSG_DICT_
#undef TD_MSG_INFO_
#define TD_MSG_SEG_CODE_
#include "tmsgdef.h"

#undef TD_MSG_NUMBER_
#undef TD_MSG_DICT_
#undef TD_MSG_INFO_
#undef TD_MSG_SEG_CODE_
#include "tmsgdef.h"

S
Shengliang Guan 已提交
53 54
extern char*   tMsgInfo[];
extern int32_t tMsgDict[];
H
Hongze Cheng 已提交
55 56

/*
 * Message-type helpers. A message type value is decomposed into:
 *   - TMSG_SEG_CODE: bits 8..15 (the segment code)
 *   - TMSG_SEG_SEQ : bits 0..7  (the sequence number within the segment)
 * TMSG_INDEX adds the sequence number to tMsgDict[segment code]; TMSG_INFO
 * reads tMsgInfo[] at that same index. Neither macro performs bounds checking,
 * and TYPE is evaluated more than once by TMSG_INFO / TMSG_INDEX.
 */
#define TMSG_SEG_CODE(TYPE) (((TYPE)&0xff00) >> 8)
#define TMSG_SEG_SEQ(TYPE)  ((TYPE)&0xff)
#define TMSG_INFO(TYPE)     tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)]
#define TMSG_INDEX(TYPE)    (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE))
D
dapan1121 已提交
60

H
Hongze Cheng 已提交
61 62
typedef uint16_t tmsg_t;

H
Hongze Cheng 已提交
63
/* ------------------------ OTHER DEFINITIONS ------------------------ */
D
dapan1121 已提交
64
// IE type
L
Liu Jicong 已提交
65 66 67 68
#define TSDB_IE_TYPE_SEC         1
#define TSDB_IE_TYPE_META        2
#define TSDB_IE_TYPE_MGMT_IP     3
#define TSDB_IE_TYPE_DNODE_CFG   4
D
dapan1121 已提交
69
#define TSDB_IE_TYPE_NEW_VERSION 5
L
Liu Jicong 已提交
70
#define TSDB_IE_TYPE_DNODE_EXT   6
D
dapan1121 已提交
71 72
#define TSDB_IE_TYPE_DNODE_STATE 7

L
Liu Jicong 已提交
73
// Heartbeat types. HEARTBEAT_TYPE_MAX is declared last, so its value equals
// the number of types declared before it; add new types above it.
typedef enum {
  HEARTBEAT_TYPE_MQ = 0,
  HEARTBEAT_TYPE_QUERY,
  // types can be added here
  //
  HEARTBEAT_TYPE_MAX
} EHbType;

D
dapan1121 已提交
81
// Heartbeat key identifiers. Numbering starts at 1 (0 is not assigned to any key).
enum {
  HEARTBEAT_KEY_DBINFO = 1,
  HEARTBEAT_KEY_STBINFO,
  HEARTBEAT_KEY_MQ_TMP,
};

S
Shengliang Guan 已提交
87 88
// Management-table identifiers. Values are assigned implicitly from 0, so the
// numeric value of each entry depends on its position: append new entries
// immediately before TSDB_MGMT_TABLE_MAX to keep existing values stable.
typedef enum _mgmt_table {
  TSDB_MGMT_TABLE_START,
  TSDB_MGMT_TABLE_ACCT,
  TSDB_MGMT_TABLE_USER,
  TSDB_MGMT_TABLE_DB,
  TSDB_MGMT_TABLE_TABLE,
  TSDB_MGMT_TABLE_DNODE,
  TSDB_MGMT_TABLE_MNODE,
  TSDB_MGMT_TABLE_QNODE,
  TSDB_MGMT_TABLE_SNODE,
  TSDB_MGMT_TABLE_BNODE,
  TSDB_MGMT_TABLE_VGROUP,
  TSDB_MGMT_TABLE_STB,
  TSDB_MGMT_TABLE_MODULE,
  TSDB_MGMT_TABLE_QUERIES,
  TSDB_MGMT_TABLE_STREAMS,
  TSDB_MGMT_TABLE_VARIABLES,
  TSDB_MGMT_TABLE_CONNS,
  TSDB_MGMT_TABLE_TRANS,
  TSDB_MGMT_TABLE_GRANTS,
  TSDB_MGMT_TABLE_VNODES,
  TSDB_MGMT_TABLE_CLUSTER,
  TSDB_MGMT_TABLE_STREAMTABLES,
  TSDB_MGMT_TABLE_TP,
  TSDB_MGMT_TABLE_FUNC,
  TSDB_MGMT_TABLE_INDEX,
  TSDB_MGMT_TABLE_MAX,  // sentinel: one past the last real table id
} EShowType;
D
dapan1121 已提交
115

116 117 118 119
#define TSDB_ALTER_TABLE_ADD_TAG             1
#define TSDB_ALTER_TABLE_DROP_TAG            2
#define TSDB_ALTER_TABLE_UPDATE_TAG_NAME     3
#define TSDB_ALTER_TABLE_UPDATE_TAG_VAL      4
L
Liu Jicong 已提交
120 121
#define TSDB_ALTER_TABLE_ADD_COLUMN          5
#define TSDB_ALTER_TABLE_DROP_COLUMN         6
S
Shengliang Guan 已提交
122
#define TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES 7
L
Liu Jicong 已提交
123
#define TSDB_ALTER_TABLE_UPDATE_TAG_BYTES    8
124 125
#define TSDB_ALTER_TABLE_UPDATE_OPTIONS      9
#define TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME  10
D
dapan1121 已提交
126

L
Liu Jicong 已提交
127 128
#define TSDB_FILL_NONE      0
#define TSDB_FILL_NULL      1
H
Hongze Cheng 已提交
129
#define TSDB_FILL_SET_VALUE 2
L
Liu Jicong 已提交
130 131 132 133 134 135 136 137 138 139
#define TSDB_FILL_LINEAR    3
#define TSDB_FILL_PREV      4
#define TSDB_FILL_NEXT      5

// Alter-user operation codes. These are sequential values (0x1..0x8), not bit
// flags, so they must be compared with ==, not tested with &.
#define TSDB_ALTER_USER_PASSWD          0x1
#define TSDB_ALTER_USER_SUPERUSER       0x2
#define TSDB_ALTER_USER_ADD_READ_DB     0x3
#define TSDB_ALTER_USER_REMOVE_READ_DB  0x4
#define TSDB_ALTER_USER_CLEAR_READ_DB   0x5
#define TSDB_ALTER_USER_ADD_WRITE_DB    0x6
#define TSDB_ALTER_USER_REMOVE_WRITE_DB 0x7
#define TSDB_ALTER_USER_CLEAR_WRITE_DB  0x8

// NOTE(review): shares the value 0x2 with TSDB_ALTER_USER_SUPERUSER — confirm
// whether this is an intentional alias or a leftover from an older scheme.
#define TSDB_ALTER_USER_PRIVILEGES 0x2

H
Hongze Cheng 已提交
145
#define TSDB_KILL_MSG_LEN 30
D
dapan1121 已提交
146

D
dapan1121 已提交
147 148
#define TSDB_TABLE_NUM_UNIT 100000

L
Liu Jicong 已提交
149
// Vnode access bits (read = 0x1, write = 0x2); ALL is the bitwise OR of both.
// The "ACCCESS" spelling is part of the existing macro names and is kept so
// that current users keep compiling.
#define TSDB_VN_READ_ACCCESS  ((char)0x1)
#define TSDB_VN_WRITE_ACCCESS ((char)0x2)
#define TSDB_VN_ALL_ACCCESS   (TSDB_VN_READ_ACCCESS | TSDB_VN_WRITE_ACCCESS)
D
dapan1121 已提交
152

H
Hongze Cheng 已提交
153
// Column flag values
#define TSDB_COL_NORMAL 0x0u  // the normal column of the table
#define TSDB_COL_TAG    0x1u  // the tag column type
#define TSDB_COL_UDC    0x2u  // the user specified normal string column, it is a dummy column
#define TSDB_COL_TMP    0x4u  // internal column generated by the previous operators
#define TSDB_COL_NULL   0x8u  // the column filter NULL or not

// Column flag predicates. The first three mask out TSDB_COL_NULL before
// testing, so the NULL-filter bit never affects the column-kind result.
// The argument is fully parenthesized so that expressions such as
// `a | b` are evaluated as a whole before masking.
#define TSDB_COL_IS_TAG(f)        ((((f) & (~(TSDB_COL_NULL))) & TSDB_COL_TAG) != 0)
#define TSDB_COL_IS_NORMAL_COL(f) (((f) & (~(TSDB_COL_NULL))) == TSDB_COL_NORMAL)
#define TSDB_COL_IS_UD_COL(f)     (((f) & (~(TSDB_COL_NULL))) == TSDB_COL_UDC)
#define TSDB_COL_REQ_NULL(f)      (((f)&TSDB_COL_NULL) != 0)
S
Shengliang Guan 已提交
163

L
Liu Jicong 已提交
164 165
#define TD_SUPER_TABLE  TSDB_SUPER_TABLE
#define TD_CHILD_TABLE  TSDB_CHILD_TABLE
S
Shengliang Guan 已提交
166 167
#define TD_NORMAL_TABLE TSDB_NORMAL_TABLE

S
Shengliang Guan 已提交
168
typedef struct {
S
Shengliang Guan 已提交
169
  int32_t vgId;
D
dapan1121 已提交
170 171
  char*   dbFName;
  char*   tbName;
D
dapan1121 已提交
172 173
} SBuildTableMetaInput;

S
Shengliang Guan 已提交
174
typedef struct {
D
dapan1121 已提交
175
  char    db[TSDB_DB_FNAME_LEN];
D
dapan1121 已提交
176
  int64_t dbId;
177
  int32_t vgVersion;
L
Liu Jicong 已提交
178
  int32_t numOfTable;  // unit is TSDB_TABLE_NUM_UNIT
D
dapan1121 已提交
179 180
} SBuildUseDBInput;

S
Shengliang Guan 已提交
181 182 183 184 185 186
// Named field descriptor: a fixed-size name buffer plus a type code and a size.
// SMCreateStbReq in this header documents its pColumns/pTags/pSmas as arrays of SField.
typedef struct SField {
  char    name[TSDB_COL_NAME_LEN];  // fixed-size name buffer
  uint8_t type;                     // NOTE(review): presumably a column data-type code — confirm
  int32_t bytes;                    // NOTE(review): presumably the field width in bytes — confirm
} SField;

S
Shengliang Guan 已提交
187
// One retention entry: two values (freq, keep), each with its own unit code.
// NOTE(review): the encoding of freqUnit/keepUnit is not visible in this header — confirm.
typedef struct SRetention {
  int32_t freq;
  int32_t keep;
  int8_t  freqUnit;  // unit applied to `freq`
  int8_t  keepUnit;  // unit applied to `keep`
} SRetention;

D
dapan1121 已提交
194 195 196
#pragma pack(push, 1)

// Use a null-terminated string instead of a char array to avoid excessive memory consumption when there are more than 1M tableMeta entries
H
Haojun Liao 已提交
197
typedef struct SEp {
D
dapan1121 已提交
198
  char     fqdn[TSDB_FQDN_LEN];
D
dapan1121 已提交
199
  uint16_t port;
H
Haojun Liao 已提交
200
} SEp;
D
dapan1121 已提交
201

S
Shengliang Guan 已提交
202
// Common message header: two 32-bit fields, 8 bytes total.
typedef struct {
  int32_t contLen;  // NOTE(review): presumably the content length of the message — confirm what it covers
  int32_t vgId;     // NOTE(review): presumably the target vgroup id — confirm
} SMsgHead;

// Submit message for one table
// Fixed 32-byte block header followed by a variable-length payload in data[].
// Every field sits on its natural alignment boundary, so the layout is the
// same with or without 1-byte packing.
typedef struct SSubmitBlk {
  int64_t uid;        // table unique id
  int64_t suid;       // stable id
  int32_t sversion;   // data schema version
  int32_t dataLen;    // data part length, not including the SSubmitBlk head
  int32_t schemaLen;  // schema length, if length is 0, no schema exists
  int16_t numOfRows;  // total number of rows in current submit block
  int16_t padding;    // TODO just for padding here
  char    data[];     // flexible array member: payload starts right after the header
} SSubmitBlk;

// Submit message for this TSDB
S
Shengliang Guan 已提交
220
typedef struct {
S
Shengliang Guan 已提交
221
  SMsgHead header;
H
Hongze Cheng 已提交
222
  int64_t  version;
S
Shengliang Guan 已提交
223 224 225
  int32_t  length;
  int32_t  numOfBlocks;
  char     blocks[];
S
Shengliang Guan 已提交
226
} SSubmitReq;
D
dapan1121 已提交
227

H
Hongze Cheng 已提交
228 229 230
typedef struct {
  int32_t totalLen;
  int32_t len;
231
  STSRow* row;
H
Hongze Cheng 已提交
232 233 234
} SSubmitBlkIter;

typedef struct {
C
Cary Xu 已提交
235 236 237
  int32_t     totalLen;
  int32_t     len;
  const void* pMsg;
H
Hongze Cheng 已提交
238 239
} SSubmitMsgIter;

C
Cary Xu 已提交
240
int32_t tInitSubmitMsgIter(const SSubmitReq* pMsg, SSubmitMsgIter* pIter);
S
Shengliang Guan 已提交
241 242
int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
int32_t tInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
C
Cary Xu 已提交
243
STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter);
H
Hongze Cheng 已提交
244

D
dapan1121 已提交
245 246 247 248 249
// Describes one failed block in a submit response (four 32-bit fields).
typedef struct {
  int32_t index;  // index of failed block in submit blocks
  int32_t vnode;  // vnode index of failed block
  int32_t sid;    // table index of failed block
  int32_t code;   // errorcode while write data to vnode, such as not created, dropped, no space, invalid table
} SSubmitRspBlock;
D
dapan1121 已提交
251 252

typedef struct {
S
Shengliang Guan 已提交
253 254 255 256 257 258 259
  int32_t         code;          // 0-success, > 0 error code
  int32_t         numOfRows;     // number of records the client is trying to write
  int32_t         affectedRows;  // number of records actually written
  int32_t         failedRows;    // number of failed records (exclude duplicate records)
  int32_t         numOfFailedBlocks;
  SSubmitRspBlock failedBlocks[];
} SSubmitRsp;
D
dapan1121 已提交
260 261

typedef struct SSchema {
262
  int8_t   type;
dengyihao's avatar
dengyihao 已提交
263
  int8_t   index;  // default is 0, not index created
264 265 266
  col_id_t colId;
  int32_t  bytes;
  char     name[TSDB_COL_NAME_LEN];
D
dapan1121 已提交
267 268
} SSchema;

C
Cary Xu 已提交
269 270 271 272 273 274 275 276
// Column schema entry with an `sma` selector. Field order and types parallel
// SSchema in this header, with `sma` in the slot where SSchema has `index`.
typedef struct {
  int8_t   type;   // NOTE(review): presumably the column data-type code — confirm
  int8_t   sma;  // ETsdbBSmaType and default is TSDB_BSMA_TYPE_I
  col_id_t colId;  // column id
  int32_t  bytes;  // NOTE(review): presumably the column width in bytes — confirm
  char     name[TSDB_COL_NAME_LEN];  // fixed-size name buffer
} SSchemaEx;

dengyihao's avatar
dengyihao 已提交
277 278
// Field accessors for a pointer `s` to a schema struct. Each macro expands to
// the member lvalue, so it can be read or assigned. SSCHMEA_SMA requires a
// struct that has an `sma` member (SSchemaEx does; SSchema does not).
// The "SSCHMEA" spelling is part of the existing macro names and is kept.
#define SSCHMEA_TYPE(s)  ((s)->type)
#define SSCHMEA_SMA(s)   ((s)->sma)
#define SSCHMEA_COLID(s) ((s)->colId)
#define SSCHMEA_BYTES(s) ((s)->bytes)
#define SSCHMEA_NAME(s)  ((s)->name)
C
Cary Xu 已提交
282

D
dapan1121 已提交
283
typedef struct {
S
Shengliang Guan 已提交
284 285
  char    name[TSDB_TABLE_FNAME_LEN];
  int8_t  igExists;
S
sma  
Shengliang Guan 已提交
286 287 288
  float   xFilesFactor;
  int32_t aggregationMethod;
  int32_t delay;
S
sma  
Shengliang Guan 已提交
289
  int32_t ttl;
S
Shengliang Guan 已提交
290
  int32_t numOfColumns;
S
Shengliang Guan 已提交
291
  int32_t numOfTags;
S
sma  
Shengliang Guan 已提交
292
  int32_t numOfSmas;
S
sma  
Shengliang Guan 已提交
293
  int32_t commentLen;
S
sma  
Shengliang Guan 已提交
294 295 296
  SArray* pColumns;  // array of SField
  SArray* pTags;     // array of SField
  SArray* pSmas;     // array of SField
S
sma  
Shengliang Guan 已提交
297
  char*   comment;
S
Shengliang Guan 已提交
298
} SMCreateStbReq;
D
dapan1121 已提交
299

S
Shengliang Guan 已提交
300 301
int32_t tSerializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq);
int32_t tDeserializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq);
S
Shengliang Guan 已提交
302
void    tFreeSMCreateStbReq(SMCreateStbReq* pReq);
S
Shengliang Guan 已提交
303

D
dapan1121 已提交
304 305 306
typedef struct {
  char   name[TSDB_TABLE_FNAME_LEN];
  int8_t igNotExists;
S
Shengliang Guan 已提交
307
} SMDropStbReq;
S
Shengliang Guan 已提交
308

S
Shengliang Guan 已提交
309 310
int32_t tSerializeSMDropStbReq(void* buf, int32_t bufLen, SMDropStbReq* pReq);
int32_t tDeserializeSMDropStbReq(void* buf, int32_t bufLen, SMDropStbReq* pReq);
S
Shengliang Guan 已提交
311

S
Shengliang Guan 已提交
312 313 314
typedef struct {
  char    name[TSDB_TABLE_FNAME_LEN];
  int8_t  alterType;
S
Shengliang Guan 已提交
315 316
  int32_t numOfFields;
  SArray* pFields;
S
Shengliang Guan 已提交
317
} SMAltertbReq;
D
dapan1121 已提交
318

S
Shengliang Guan 已提交
319 320 321
int32_t tSerializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq);
int32_t tDeserializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq);
void    tFreeSMAltertbReq(SMAltertbReq* pReq);
D
dapan1121 已提交
322 323

typedef struct SEpSet {
324 325 326
  int8_t inUse;
  int8_t numOfEps;
  SEp    eps[TSDB_MAX_REPLICA];
D
dapan1121 已提交
327 328
} SEpSet;

S
Shengliang Guan 已提交
329 330 331 332
int32_t tEncodeSEpSet(SCoder* pEncoder, const SEpSet* pEp);
int32_t tDecodeSEpSet(SCoder* pDecoder, SEpSet* pEp);
int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp);
void*   taosDecodeSEpSet(void* buf, SEpSet* pEp);
L
Liu Jicong 已提交
333

D
dapan1121 已提交
334 335
typedef struct {
  int32_t pid;
S
Shengliang Guan 已提交
336 337
  char    app[TSDB_APP_NAME_LEN];
  char    db[TSDB_DB_NAME_LEN];
338
  int64_t startTime;
S
Shengliang Guan 已提交
339
} SConnectReq;
L
Liu Jicong 已提交
340

S
Shengliang Guan 已提交
341 342
int32_t tSerializeSConnectReq(void* buf, int32_t bufLen, SConnectReq* pReq);
int32_t tDeserializeSConnectReq(void* buf, int32_t bufLen, SConnectReq* pReq);
343

L
Liu Jicong 已提交
344 345 346 347 348 349
typedef struct {
  int32_t acctId;
  int64_t clusterId;
  int32_t connId;
  int8_t  superUser;
  SEpSet  epSet;
350
  char    sVersion[128];
L
Liu Jicong 已提交
351
} SConnectRsp;
L
Liu Jicong 已提交
352

S
Shengliang Guan 已提交
353 354 355
int32_t tSerializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp);
int32_t tDeserializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp);

L
Liu Jicong 已提交
356 357 358 359 360 361 362 363 364 365
// Account request payload. The same struct is exported under two typedef
// names, so create-account and alter-account requests share one layout.
typedef struct {
  char    user[TSDB_USER_LEN];      // fixed-size user name buffer
  char    pass[TSDB_PASSWORD_LEN];  // fixed-size password buffer
  // NOTE(review): the max* fields below look like per-account limits — confirm semantics of 0 / negative values
  int32_t maxUsers;
  int32_t maxDbs;
  int32_t maxTimeSeries;
  int32_t maxStreams;
  int32_t accessState;  // Configured only by command
  int64_t maxStorage;   // In unit of GB
} SCreateAcctReq, SAlterAcctReq;
L
Liu Jicong 已提交
366

S
Shengliang Guan 已提交
367 368
int32_t tSerializeSCreateAcctReq(void* buf, int32_t bufLen, SCreateAcctReq* pReq);
int32_t tDeserializeSCreateAcctReq(void* buf, int32_t bufLen, SCreateAcctReq* pReq);
S
Shengliang Guan 已提交
369

L
Liu Jicong 已提交
370 371 372
typedef struct {
  char user[TSDB_USER_LEN];
} SDropUserReq, SDropAcctReq;
D
dapan1121 已提交
373

S
Shengliang Guan 已提交
374 375
int32_t tSerializeSDropUserReq(void* buf, int32_t bufLen, SDropUserReq* pReq);
int32_t tDeserializeSDropUserReq(void* buf, int32_t bufLen, SDropUserReq* pReq);
S
Shengliang Guan 已提交
376

D
dapan1121 已提交
377
typedef struct {
S
Shengliang Guan 已提交
378 379
  int8_t createType;
  int8_t superUser;  // denote if it is a super user or not
S
Shengliang Guan 已提交
380
  char   user[TSDB_USER_LEN];
S
Shengliang Guan 已提交
381
  char   pass[TSDB_USET_PASSWORD_LEN];
S
Shengliang Guan 已提交
382 383
} SCreateUserReq;

S
Shengliang Guan 已提交
384 385
int32_t tSerializeSCreateUserReq(void* buf, int32_t bufLen, SCreateUserReq* pReq);
int32_t tDeserializeSCreateUserReq(void* buf, int32_t bufLen, SCreateUserReq* pReq);
S
Shengliang Guan 已提交
386 387 388

typedef struct {
  int8_t alterType;
S
Shengliang Guan 已提交
389
  int8_t superUser;
S
Shengliang Guan 已提交
390
  char   user[TSDB_USER_LEN];
S
Shengliang Guan 已提交
391
  char   pass[TSDB_USET_PASSWORD_LEN];
S
Shengliang Guan 已提交
392 393 394
  char   dbname[TSDB_DB_FNAME_LEN];
} SAlterUserReq;

S
Shengliang Guan 已提交
395 396
int32_t tSerializeSAlterUserReq(void* buf, int32_t bufLen, SAlterUserReq* pReq);
int32_t tDeserializeSAlterUserReq(void* buf, int32_t bufLen, SAlterUserReq* pReq);
D
dapan1121 已提交
397

S
Shengliang Guan 已提交
398 399 400 401
typedef struct {
  char user[TSDB_USER_LEN];
} SGetUserAuthReq;

S
Shengliang Guan 已提交
402 403
int32_t tSerializeSGetUserAuthReq(void* buf, int32_t bufLen, SGetUserAuthReq* pReq);
int32_t tDeserializeSGetUserAuthReq(void* buf, int32_t bufLen, SGetUserAuthReq* pReq);
S
Shengliang Guan 已提交
404 405 406 407 408 409 410 411

typedef struct {
  char      user[TSDB_USER_LEN];
  int8_t    superAuth;
  SHashObj* readDbs;
  SHashObj* writeDbs;
} SGetUserAuthRsp;

S
Shengliang Guan 已提交
412 413
int32_t tSerializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp);
int32_t tDeserializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp);
S
Shengliang Guan 已提交
414

D
dapan1121 已提交
415
typedef struct {
S
Shengliang Guan 已提交
416 417 418
  int16_t colId;     // column id
  int16_t colIndex;  // column index in colList if it is a normal column or index in tagColList if a tag
  int16_t flag;      // denote if it is a tag or a normal column
419
  char    name[TSDB_DB_FNAME_LEN];
D
dapan1121 已提交
420 421
} SColIndex;

S
Shengliang Guan 已提交
422
// One column filter condition: two relational-operator codes plus a pair of
// 64-bit operand slots. The operand slots are an anonymous union, so the same
// 16 bytes are viewed as an integer pair, a double pair, or a (pz, len) pair.
typedef struct {
  int16_t lowerRelOptr;
  int16_t upperRelOptr;
  int16_t filterstr;  // denote if current column is char(binary/nchar)

  union {
    struct {
      int64_t lowerBndi;
      int64_t upperBndi;
    };
    struct {
      double lowerBndd;
      double upperBndd;
    };
    struct {
      // NOTE(review): presumably pz carries a pointer value and len its length
      // when filterstr is set — confirm against the users of this struct
      int64_t pz;
      int64_t len;
    };
  };
} SColumnFilterInfo;

S
Shengliang Guan 已提交
443
typedef struct {
S
Shengliang Guan 已提交
444 445 446
  int16_t numOfFilters;
  union {
    int64_t            placeholder;
H
Hongze Cheng 已提交
447
    SColumnFilterInfo* filterInfo;
D
dapan1121 已提交
448 449 450
  };
} SColumnFilterList;
/*
L
Liu Jicong 已提交
451
 * for client side struct, only column id, type, bytes are necessary
D
dapan1121 已提交
452 453
 * But for data in vnode side, we need all the following information.
 */
S
Shengliang Guan 已提交
454
typedef struct {
H
Haojun Liao 已提交
455
  union {
456 457
    col_id_t colId;
    int16_t  slotId;
H
Haojun Liao 已提交
458 459
  };

L
Liu Jicong 已提交
460 461 462 463
  int16_t type;
  int32_t bytes;
  uint8_t precision;
  uint8_t scale;
D
dapan1121 已提交
464 465
} SColumnInfo;

S
Shengliang Guan 已提交
466
typedef struct {
L
Liu Jicong 已提交
467 468
  int64_t uid;
  TSKEY   key;  // last accessed ts, for subscription
D
dapan1121 已提交
469 470 471 472 473 474 475 476
} STableIdInfo;

// A pair of timestamp keys delimiting a window.
// NOTE(review): presumably skey = start key and ekey = end key; whether ekey
// is inclusive is not visible in this header — confirm.
typedef struct STimeWindow {
  TSKEY skey;
  TSKEY ekey;
} STimeWindow;

// Location and shape of the compressed timestamp buffer inside a message body.
typedef struct {
  int32_t tsOffset;       // offset value in current msg body, NOTE: ts list is compressed
  int32_t tsLen;          // total length of ts comp block
  int32_t tsNumOfBlocks;  // ts comp block numbers
  int32_t tsOrder;        // ts comp block order
} STsBufInfo;

S
Shengliang Guan 已提交
483
// Interval-window parameters: three 64-bit durations (interval, sliding,
// offset), one unit character for each, plus the client timezone and the
// time precision. All fields are naturally aligned (8-byte header, then
// three int64_t), so the layout is the same with or without 1-byte packing.
typedef struct {
  int32_t tz;  // query client timezone
  char    intervalUnit;
  char    slidingUnit;
  // TODO: remove offsetUnit; the offset is a number of precision ticks and must be an immutable duration.
  char    offsetUnit;
  int8_t  precision;
  int64_t interval;
  int64_t sliding;
  int64_t offset;
} SInterval;

typedef struct {
S
Shengliang Guan 已提交
496
  int32_t code;
D
dapan1121 已提交
497 498
} SQueryTableRsp;

L
Liu Jicong 已提交
499
int32_t tSerializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp);
D
dapan1121 已提交
500

L
Liu Jicong 已提交
501
int32_t tDeserializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp);
D
dapan1121 已提交
502

D
dapan1121 已提交
503
typedef struct {
504
  char    db[TSDB_DB_FNAME_LEN];
505
  int32_t numOfVgroups;
S
Shengliang Guan 已提交
506 507 508 509 510 511
  int32_t cacheBlockSize;  // MB
  int32_t totalBlocks;
  int32_t daysPerFile;
  int32_t daysToKeep0;
  int32_t daysToKeep1;
  int32_t daysToKeep2;
S
Shengliang Guan 已提交
512 513
  int32_t minRows;
  int32_t maxRows;
S
Shengliang Guan 已提交
514 515
  int32_t commitTime;
  int32_t fsyncPeriod;
D
dapan1121 已提交
516
  int32_t ttl;
S
Shengliang Guan 已提交
517
  int8_t  walLevel;
S
Shengliang Guan 已提交
518 519 520 521 522 523
  int8_t  precision;  // time resolution
  int8_t  compression;
  int8_t  replications;
  int8_t  quorum;
  int8_t  update;
  int8_t  cacheLastRow;
S
Shengliang Guan 已提交
524
  int8_t  ignoreExist;
X
Xiaoyu Wang 已提交
525
  int8_t  streamMode;
D
dapan1121 已提交
526
  int8_t  singleSTable;
S
Shengliang Guan 已提交
527 528
  int32_t numOfRetensions;
  SArray* pRetensions;  // SRetention
S
Shengliang Guan 已提交
529
} SCreateDbReq;
S
Shengliang Guan 已提交
530

S
Shengliang Guan 已提交
531 532
int32_t tSerializeSCreateDbReq(void* buf, int32_t bufLen, SCreateDbReq* pReq);
int32_t tDeserializeSCreateDbReq(void* buf, int32_t bufLen, SCreateDbReq* pReq);
S
Shengliang Guan 已提交
533
void    tFreeSCreateDbReq(SCreateDbReq* pReq);
S
Shengliang Guan 已提交
534

S
Shengliang Guan 已提交
535
typedef struct {
536
  char    db[TSDB_DB_FNAME_LEN];
S
Shengliang Guan 已提交
537 538 539 540 541 542 543 544
  int32_t totalBlocks;
  int32_t daysToKeep0;
  int32_t daysToKeep1;
  int32_t daysToKeep2;
  int32_t fsyncPeriod;
  int8_t  walLevel;
  int8_t  quorum;
  int8_t  cacheLastRow;
X
Xiaoyu Wang 已提交
545
  int8_t  replications;
S
Shengliang Guan 已提交
546
} SAlterDbReq;
S
Shengliang Guan 已提交
547

S
Shengliang Guan 已提交
548 549 550
int32_t tSerializeSAlterDbReq(void* buf, int32_t bufLen, SAlterDbReq* pReq);
int32_t tDeserializeSAlterDbReq(void* buf, int32_t bufLen, SAlterDbReq* pReq);

S
Shengliang Guan 已提交
551
typedef struct {
S
Shengliang Guan 已提交
552
  char   db[TSDB_DB_FNAME_LEN];
S
Shengliang Guan 已提交
553
  int8_t ignoreNotExists;
S
Shengliang Guan 已提交
554
} SDropDbReq;
S
Shengliang Guan 已提交
555

S
Shengliang Guan 已提交
556 557 558
int32_t tSerializeSDropDbReq(void* buf, int32_t bufLen, SDropDbReq* pReq);
int32_t tDeserializeSDropDbReq(void* buf, int32_t bufLen, SDropDbReq* pReq);

S
Shengliang Guan 已提交
559
typedef struct {
L
Liu Jicong 已提交
560 561
  char    db[TSDB_DB_FNAME_LEN];
  int64_t uid;
S
Shengliang Guan 已提交
562 563
} SDropDbRsp;

S
Shengliang Guan 已提交
564 565 566
int32_t tSerializeSDropDbRsp(void* buf, int32_t bufLen, SDropDbRsp* pRsp);
int32_t tDeserializeSDropDbRsp(void* buf, int32_t bufLen, SDropDbRsp* pRsp);

S
Shengliang Guan 已提交
567 568
typedef struct {
  char    db[TSDB_DB_FNAME_LEN];
D
dapan1121 已提交
569
  int64_t dbId;
570
  int32_t vgVersion;
L
Liu Jicong 已提交
571
  int32_t numOfTable;  // unit is TSDB_TABLE_NUM_UNIT
S
Shengliang Guan 已提交
572
} SUseDbReq;
S
Shengliang Guan 已提交
573

S
Shengliang Guan 已提交
574 575
int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq);
int32_t tDeserializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq);
S
Shengliang Guan 已提交
576 577

typedef struct {
L
Liu Jicong 已提交
578 579 580 581 582 583
  char    db[TSDB_DB_FNAME_LEN];
  int64_t uid;
  int32_t vgVersion;
  int32_t vgNum;
  int8_t  hashMethod;
  SArray* pVgroupInfos;  // Array of SVgroupInfo
S
Shengliang Guan 已提交
584 585
} SUseDbRsp;

L
Liu Jicong 已提交
586
int32_t tSerializeSUseDbRsp(void* buf, int32_t bufLen, const SUseDbRsp* pRsp);
S
Shengliang Guan 已提交
587
int32_t tDeserializeSUseDbRsp(void* buf, int32_t bufLen, SUseDbRsp* pRsp);
L
Liu Jicong 已提交
588 589
int32_t tSerializeSUseDbRspImp(SCoder* pEncoder, const SUseDbRsp* pRsp);
int32_t tDeserializeSUseDbRspImp(SCoder* pDecoder, SUseDbRsp* pRsp);
S
Shengliang Guan 已提交
590 591
void    tFreeSUsedbRsp(SUseDbRsp* pRsp);

D
dapan1121 已提交
592
typedef struct {
dengyihao's avatar
dengyihao 已提交
593
  char db[TSDB_DB_FNAME_LEN];
D
dapan1121 已提交
594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625
} SDbCfgReq;

int32_t tSerializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq);
int32_t tDeserializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq);

// Database configuration carried in a response. The fields parallel the
// option fields of SCreateDbReq in this header (same names and types), without
// the db name, ignoreExist and retention members.
// NOTE(review): units are not stated here; SCreateDbReq annotates cacheBlockSize
// as MB and precision as "time resolution" — presumably the same applies. Confirm.
typedef struct {
  int32_t numOfVgroups;
  int32_t cacheBlockSize;
  int32_t totalBlocks;
  int32_t daysPerFile;
  int32_t daysToKeep0;
  int32_t daysToKeep1;
  int32_t daysToKeep2;
  int32_t minRows;
  int32_t maxRows;
  int32_t commitTime;
  int32_t fsyncPeriod;
  int32_t ttl;
  int8_t  walLevel;
  int8_t  precision;
  int8_t  compression;
  int8_t  replications;
  int8_t  quorum;
  int8_t  update;
  int8_t  cacheLastRow;
  int8_t  streamMode;
  int8_t  singleSTable;
} SDbCfgRsp;

int32_t tSerializeSDbCfgRsp(void* buf, int32_t bufLen, const SDbCfgRsp* pRsp);
int32_t tDeserializeSDbCfgRsp(void* buf, int32_t bufLen, SDbCfgRsp* pRsp);

D
dapan1121 已提交
626 627 628 629 630 631 632 633
typedef struct {
  int32_t rowNum;
} SQnodeListReq;

int32_t tSerializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq);
int32_t tDeserializeSQnodeListReq(void* buf, int32_t bufLen, SQnodeListReq* pReq);

typedef struct {
L
Liu Jicong 已提交
634
  SArray* epSetList;  // SArray<SEpSet>
D
dapan1121 已提交
635 636 637 638 639 640
} SQnodeListRsp;

// SQnodeListRsp helpers (declarations only; the definitions are not in this header).
// The serialize/deserialize pair takes a caller-supplied buffer `buf` together
// with its length `bufLen`; tFreeSQnodeListRsp takes only the response.
// NOTE(review): the meaning of the int32_t return values and exactly what
// tFreeSQnodeListRsp releases (presumably epSetList) are not visible here — confirm.
int32_t tSerializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp);
int32_t tDeserializeSQnodeListRsp(void* buf, int32_t bufLen, SQnodeListRsp* pRsp);
void    tFreeSQnodeListRsp(SQnodeListRsp* pRsp);

S
Shengliang Guan 已提交
641 642 643 644 645 646 647
typedef struct {
  SArray* pArray;  // Array of SUseDbRsp
} SUseDbBatchRsp;

int32_t tSerializeSUseDbBatchRsp(void* buf, int32_t bufLen, SUseDbBatchRsp* pRsp);
int32_t tDeserializeSUseDbBatchRsp(void* buf, int32_t bufLen, SUseDbBatchRsp* pRsp);
void    tFreeSUseDbBatchRsp(SUseDbBatchRsp* pRsp);
S
Shengliang Guan 已提交
648 649

typedef struct {
S
Shengliang Guan 已提交
650
  char db[TSDB_DB_FNAME_LEN];
S
Shengliang Guan 已提交
651 652 653 654
} SSyncDbReq, SCompactDbReq;

int32_t tSerializeSSyncDbReq(void* buf, int32_t bufLen, SSyncDbReq* pReq);
int32_t tDeserializeSSyncDbReq(void* buf, int32_t bufLen, SSyncDbReq* pReq);
D
dapan1121 已提交
655 656

typedef struct {
S
Shengliang Guan 已提交
657
  char    name[TSDB_FUNC_NAME_LEN];
S
Shengliang 已提交
658
  int8_t  igExists;
S
Shengliang Guan 已提交
659 660
  int8_t  funcType;
  int8_t  scriptType;
S
Shengliang Guan 已提交
661
  int8_t  outputType;
S
Shengliang Guan 已提交
662
  int32_t outputLen;
S
Shengliang Guan 已提交
663
  int32_t bufSize;
S
Shengliang 已提交
664
  int64_t signature;
S
Shengliang Guan 已提交
665 666
  int32_t commentSize;
  int32_t codeSize;
S
Shengliang Guan 已提交
667 668
  char    pComment[TSDB_FUNC_COMMENT_LEN];
  char    pCode[TSDB_FUNC_CODE_LEN];
S
Shengliang Guan 已提交
669
} SCreateFuncReq;
D
dapan1121 已提交
670

S
Shengliang Guan 已提交
671 672 673
int32_t tSerializeSCreateFuncReq(void* buf, int32_t bufLen, SCreateFuncReq* pReq);
int32_t tDeserializeSCreateFuncReq(void* buf, int32_t bufLen, SCreateFuncReq* pReq);

D
dapan1121 已提交
674
typedef struct {
S
Shengliang 已提交
675 676
  char   name[TSDB_FUNC_NAME_LEN];
  int8_t igNotExists;
S
Shengliang Guan 已提交
677
} SDropFuncReq;
S
Shengliang Guan 已提交
678

S
Shengliang Guan 已提交
679 680 681
int32_t tSerializeSDropFuncReq(void* buf, int32_t bufLen, SDropFuncReq* pReq);
int32_t tDeserializeSDropFuncReq(void* buf, int32_t bufLen, SDropFuncReq* pReq);

S
Shengliang Guan 已提交
682 683
typedef struct {
  int32_t numOfFuncs;
S
Shengliang Guan 已提交
684
  SArray* pFuncNames;
S
Shengliang Guan 已提交
685
} SRetrieveFuncReq;
D
dapan1121 已提交
686

S
Shengliang Guan 已提交
687 688 689
int32_t tSerializeSRetrieveFuncReq(void* buf, int32_t bufLen, SRetrieveFuncReq* pReq);
int32_t tDeserializeSRetrieveFuncReq(void* buf, int32_t bufLen, SRetrieveFuncReq* pReq);

D
dapan1121 已提交
690 691
typedef struct {
  char    name[TSDB_FUNC_NAME_LEN];
S
Shengliang Guan 已提交
692 693 694 695
  int8_t  funcType;
  int8_t  scriptType;
  int8_t  outputType;
  int32_t outputLen;
D
dapan1121 已提交
696
  int32_t bufSize;
S
Shengliang 已提交
697
  int64_t signature;
S
Shengliang Guan 已提交
698 699
  int32_t commentSize;
  int32_t codeSize;
S
Shengliang Guan 已提交
700 701
  char    pComment[TSDB_FUNC_COMMENT_LEN];
  char    pCode[TSDB_FUNC_CODE_LEN];
S
Shengliang Guan 已提交
702
} SFuncInfo;
D
dapan1121 已提交
703 704

typedef struct {
S
Shengliang Guan 已提交
705
  int32_t numOfFuncs;
S
Shengliang Guan 已提交
706
  SArray* pFuncInfos;
S
Shengliang Guan 已提交
707
} SRetrieveFuncRsp;
D
dapan1121 已提交
708

S
Shengliang Guan 已提交
709 710 711
int32_t tSerializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp);
int32_t tDeserializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp);

D
dapan1121 已提交
712
typedef struct {
S
Shengliang Guan 已提交
713
  int32_t statusInterval;
S
Shengliang Guan 已提交
714
  int64_t checkTime;                  // 1970-01-01 00:00:00.000
S
Shengliang Guan 已提交
715 716 717
  char    timezone[TD_TIMEZONE_LEN];  // tsTimezone
  char    locale[TD_LOCALE_LEN];      // tsLocale
  char    charset[TD_LOCALE_LEN];     // tsCharset
S
Shengliang Guan 已提交
718
} SClusterCfg;
D
dapan1121 已提交
719

720 721 722 723 724 725 726 727 728 729 730 731
// Aggregate vnode statistics: three 32-bit vnode counts followed by 64-bit
// request counters.
// NOTE(review): the counting scope (per dnode vs. cluster) and the reset
// behaviour of the counters are not visible in this header — confirm.
typedef struct {
  int32_t openVnodes;
  int32_t totalVnodes;
  int32_t masterNum;
  int64_t numOfSelectReqs;
  int64_t numOfInsertReqs;
  int64_t numOfInsertSuccessReqs;
  int64_t numOfBatchInsertReqs;
  int64_t numOfBatchInsertSuccessReqs;
  int64_t errors;
} SVnodesStat;

S
Shengliang Guan 已提交
732 733 734
typedef struct {
  int32_t vgId;
  int8_t  role;
S
Shengliang Guan 已提交
735 736
  int64_t numOfTables;
  int64_t numOfTimeSeries;
S
Shengliang Guan 已提交
737 738 739
  int64_t totalStorage;
  int64_t compStorage;
  int64_t pointsWritten;
S
Shengliang Guan 已提交
740 741 742 743 744
  int64_t numOfSelectReqs;
  int64_t numOfInsertReqs;
  int64_t numOfInsertSuccessReqs;
  int64_t numOfBatchInsertReqs;
  int64_t numOfBatchInsertSuccessReqs;
S
Shengliang Guan 已提交
745 746 747
} SVnodeLoad;

typedef struct {
S
Shengliang Guan 已提交
748 749
  int32_t     sver;  // software version
  int64_t     dver;  // dnode table version in sdb
S
Shengliang Guan 已提交
750
  int32_t     dnodeId;
751
  int64_t     clusterId;
S
Shengliang Guan 已提交
752 753
  int64_t     rebootTime;
  int64_t     updateTime;
S
Shengliang Guan 已提交
754 755
  int32_t     numOfCores;
  int32_t     numOfSupportVnodes;
S
Shengliang Guan 已提交
756 757
  char        dnodeEp[TSDB_EP_LEN];
  SClusterCfg clusterCfg;
S
Shengliang Guan 已提交
758
  SArray*     pVloads;  // array of SVnodeLoad
S
Shengliang Guan 已提交
759
} SStatusReq;
D
dapan1121 已提交
760

S
Shengliang Guan 已提交
761 762
int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);
int32_t tDeserializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);
S
Shengliang Guan 已提交
763

D
dapan1121 已提交
764
typedef struct {
S
Shengliang Guan 已提交
765
  int32_t dnodeId;
766
  int64_t clusterId;
D
dapan1121 已提交
767 768 769
} SDnodeCfg;

typedef struct {
770 771 772
  int32_t id;
  int8_t  isMnode;
  SEp     ep;
D
dapan1121 已提交
773 774 775
} SDnodeEp;

typedef struct {
776
  int64_t   dver;
S
Shengliang Guan 已提交
777
  SDnodeCfg dnodeCfg;
S
Shengliang Guan 已提交
778
  SArray*   pDnodeEps;  // Array of SDnodeEp
D
dapan1121 已提交
779 780
} SStatusRsp;

S
Shengliang Guan 已提交
781 782
int32_t tSerializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp);
int32_t tDeserializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp);
S
shm  
Shengliang Guan 已提交
783
void    tFreeSStatusRsp(SStatusRsp* pRsp);
S
Shengliang Guan 已提交
784 785

typedef struct {
S
Shengliang Guan 已提交
786 787 788 789 790
  int32_t reserved;
} SMTimerReq;

int32_t tSerializeSMTimerMsg(void* buf, int32_t bufLen, SMTimerReq* pReq);
int32_t tDeserializeSMTimerMsg(void* buf, int32_t bufLen, SMTimerReq* pReq);
S
Shengliang Guan 已提交
791

D
dapan1121 已提交
792
typedef struct {
S
Shengliang Guan 已提交
793 794 795
  int32_t  id;
  uint16_t port;                 // node sync Port
  char     fqdn[TSDB_FQDN_LEN];  // node FQDN
S
Shengliang Guan 已提交
796 797 798
} SReplica;

typedef struct {
S
Shengliang Guan 已提交
799
  int32_t  vgId;
S
Shengliang Guan 已提交
800
  int32_t  dnodeId;
801
  char     db[TSDB_DB_FNAME_LEN];
L
Liu Jicong 已提交
802
  int64_t  dbUid;
803
  int32_t  vgVersion;
S
Shengliang Guan 已提交
804 805 806 807 808 809
  int32_t  cacheBlockSize;
  int32_t  totalBlocks;
  int32_t  daysPerFile;
  int32_t  daysToKeep0;
  int32_t  daysToKeep1;
  int32_t  daysToKeep2;
S
Shengliang Guan 已提交
810 811 812
  int32_t  minRows;
  int32_t  maxRows;
  int32_t  commitTime;
S
Shengliang Guan 已提交
813
  int32_t  fsyncPeriod;
D
dapan1121 已提交
814 815
  uint32_t hashBegin;
  uint32_t hashEnd;
L
Liu Jicong 已提交
816
  int8_t   hashMethod;
S
Shengliang Guan 已提交
817
  int8_t   walLevel;
S
Shengliang Guan 已提交
818 819 820
  int8_t   precision;
  int8_t   compression;
  int8_t   quorum;
S
Shengliang Guan 已提交
821 822
  int8_t   update;
  int8_t   cacheLastRow;
S
Shengliang Guan 已提交
823
  int8_t   replica;
S
Shengliang Guan 已提交
824
  int8_t   selfIndex;
L
Liu Jicong 已提交
825
  int8_t   streamMode;
S
Shengliang Guan 已提交
826
  SReplica replicas[TSDB_MAX_REPLICA];
S
sma  
Shengliang Guan 已提交
827 828
  int32_t  numOfRetensions;
  SArray*  pRetensions;  // SRetention
S
Shengliang Guan 已提交
829
} SCreateVnodeReq, SAlterVnodeReq;
D
dapan1121 已提交
830

S
Shengliang Guan 已提交
831 832
int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
int32_t tDeserializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
S
sma  
Shengliang Guan 已提交
833
int32_t tFreeSCreateVnodeReq(SCreateVnodeReq* pReq);
S
Shengliang Guan 已提交
834

S
Shengliang Guan 已提交
835
typedef struct {
L
Liu Jicong 已提交
836 837 838 839
  int32_t vgId;
  int32_t dnodeId;
  int64_t dbUid;
  char    db[TSDB_DB_FNAME_LEN];
S
Shengliang Guan 已提交
840
} SDropVnodeReq, SSyncVnodeReq, SCompactVnodeReq;
S
Shengliang Guan 已提交
841

S
Shengliang Guan 已提交
842 843
int32_t tSerializeSDropVnodeReq(void* buf, int32_t bufLen, SDropVnodeReq* pReq);
int32_t tDeserializeSDropVnodeReq(void* buf, int32_t bufLen, SDropVnodeReq* pReq);
S
Shengliang Guan 已提交
844

D
dapan1121 已提交
845
typedef struct {
D
dapan1121 已提交
846
  SMsgHead header;
D
dapan1121 已提交
847 848
  char     dbFName[TSDB_DB_FNAME_LEN];
  char     tbName[TSDB_TABLE_NAME_LEN];
S
Shengliang Guan 已提交
849
} STableInfoReq;
D
dapan1121 已提交
850

S
Shengliang Guan 已提交
851 852 853
int32_t tSerializeSTableInfoReq(void* buf, int32_t bufLen, STableInfoReq* pReq);
int32_t tDeserializeSTableInfoReq(void* buf, int32_t bufLen, STableInfoReq* pReq);

D
dapan1121 已提交
854
typedef struct {
S
Shengliang Guan 已提交
855
  int8_t  metaClone;  // create local clone of the cached table meta
D
dapan1121 已提交
856 857 858 859
  int32_t numOfVgroups;
  int32_t numOfTables;
  int32_t numOfUdfs;
  char    tableNames[];
S
Shengliang Guan 已提交
860
} SMultiTableInfoReq;
D
dapan1121 已提交
861

H
Haojun Liao 已提交
862
// todo refactor
D
dapan1121 已提交
863
typedef struct SVgroupInfo {
864 865 866
  int32_t  vgId;
  uint32_t hashBegin;
  uint32_t hashEnd;
L
Liu Jicong 已提交
867
  SEpSet   epSet;
L
Liu Jicong 已提交
868 869 870 871
  union {
    int32_t numOfTable;  // unit is TSDB_TABLE_NUM_UNIT
    int32_t taskId;      // used in stream
  };
D
dapan1121 已提交
872 873
} SVgroupInfo;

D
dapan1121 已提交
874
typedef struct {
H
Haojun Liao 已提交
875 876
  int32_t     numOfVgroups;
  SVgroupInfo vgroups[];
S
Shengliang Guan 已提交
877
} SVgroupsInfo;
D
dapan1121 已提交
878

S
Shengliang Guan 已提交
879
typedef struct {
D
dapan1121 已提交
880 881 882
  char     tbName[TSDB_TABLE_NAME_LEN];
  char     stbName[TSDB_TABLE_NAME_LEN];
  char     dbFName[TSDB_DB_FNAME_LEN];
L
Liu Jicong 已提交
883
  int64_t  dbId;
S
Shengliang Guan 已提交
884 885 886 887 888 889 890
  int32_t  numOfTags;
  int32_t  numOfColumns;
  int8_t   precision;
  int8_t   tableType;
  int8_t   update;
  int32_t  sversion;
  int32_t  tversion;
L
Liu Jicong 已提交
891 892
  uint64_t suid;
  uint64_t tuid;
S
Shengliang Guan 已提交
893
  int32_t  vgId;
S
Shengliang Guan 已提交
894
  SSchema* pSchemas;
S
Shengliang Guan 已提交
895
} STableMetaRsp;
D
dapan1121 已提交
896

S
Shengliang Guan 已提交
897 898 899 900 901 902 903 904 905 906 907 908
int32_t tSerializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp);
int32_t tDeserializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp);
void    tFreeSTableMetaRsp(STableMetaRsp* pRsp);

typedef struct {
  SArray* pArray;  // Array of STableMetaRsp
} STableMetaBatchRsp;

int32_t tSerializeSTableMetaBatchRsp(void* buf, int32_t bufLen, STableMetaBatchRsp* pRsp);
int32_t tDeserializeSTableMetaBatchRsp(void* buf, int32_t bufLen, STableMetaBatchRsp* pRsp);
void    tFreeSTableMetaBatchRsp(STableMetaBatchRsp* pRsp);

S
Shengliang Guan 已提交
909
// Header for a multi-table meta reply. The variable-length body is carried in
// the flexible array member meta[], so the struct must be allocated with room
// for that body after the fixed fields.
typedef struct {
  int32_t numOfTables;
  int32_t numOfVgroup;
  int32_t numOfUdf;
  int32_t contLen;
  int8_t  compressed;  // denote if compressed or not
  int32_t rawLen;      // size before compress
  uint8_t metaClone;   // make meta clone after retrieve meta from mnode
  char    meta[];
} SMultiTableMeta;

typedef struct {
  int32_t dataLen;
  char    name[TSDB_TABLE_FNAME_LEN];
H
Hongze Cheng 已提交
923
  char*   data;
D
dapan1121 已提交
924 925 926 927 928 929 930 931
} STagData;

/*
 * sql: show tables like '%a_%'
 * payload is the query condition, e.g., '%a_%'
 * payloadLen is the length of payload
 */
typedef struct {
L
Liu Jicong 已提交
932
  int32_t type;
933
  char    db[TSDB_DB_FNAME_LEN];
S
Shengliang Guan 已提交
934 935
  int32_t payloadLen;
  char*   payload;
S
Shengliang Guan 已提交
936
} SShowReq;
D
dapan1121 已提交
937

S
Shengliang Guan 已提交
938 939 940 941
int32_t tSerializeSShowReq(void* buf, int32_t bufLen, SShowReq* pReq);
int32_t tDeserializeSShowReq(void* buf, int32_t bufLen, SShowReq* pReq);
void    tFreeSShowReq(SShowReq* pReq);

S
Shengliang Guan 已提交
942
typedef struct {
943
  int64_t       showId;
S
Shengliang Guan 已提交
944
  STableMetaRsp tableMeta;
S
Shengliang Guan 已提交
945
} SShowRsp, SVShowTablesRsp;
D
dapan1121 已提交
946

S
Shengliang Guan 已提交
947 948 949
int32_t tSerializeSShowRsp(void* buf, int32_t bufLen, SShowRsp* pRsp);
int32_t tDeserializeSShowRsp(void* buf, int32_t bufLen, SShowRsp* pRsp);
void    tFreeSShowRsp(SShowRsp* pRsp);
D
dapan1121 已提交
950

951
typedef struct {
952 953
  int32_t type;
  char    db[TSDB_DB_FNAME_LEN];
H
Haojun Liao 已提交
954
  char    tb[TSDB_TABLE_NAME_LEN];
955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971
  int64_t showId;
  int8_t  free;
} SRetrieveTableReq;

int32_t tSerializeSRetrieveTableReq(void* buf, int32_t bufLen, SRetrieveTableReq* pReq);
int32_t tDeserializeSRetrieveTableReq(void* buf, int32_t bufLen, SRetrieveTableReq* pReq);

// Retrieve response: fixed header fields followed by a variable-length payload
// carried in the flexible array member data[].
typedef struct {
  int64_t useconds;
  int8_t  completed;  // all results are returned to client
  int8_t  precision;
  int8_t  compressed;
  int32_t compLen;  // NOTE(review): presumably the payload length when `compressed` is set - confirm
  int32_t numOfRows;
  char    data[];  // payload; its size is not fixed by this struct
} SRetrieveTableRsp;

H
Haojun Liao 已提交
972
typedef struct {
L
Liu Jicong 已提交
973 974 975 976 977 978 979 980
  int64_t handle;
  int64_t useconds;
  int8_t  completed;  // all results are returned to client
  int8_t  precision;
  int8_t  compressed;
  int32_t compLen;
  int32_t numOfRows;
  char    data[];
H
Haojun Liao 已提交
981 982
} SRetrieveMetaTableRsp;

D
dapan1121 已提交
983 984 985 986
typedef struct SExplainExecInfo {
  uint64_t startupCost;
  uint64_t totalCost;
  uint64_t numOfRows;
dengyihao's avatar
dengyihao 已提交
987
  void*    verboseInfo;
D
dapan1121 已提交
988 989
} SExplainExecInfo;

D
dapan1121 已提交
990
typedef struct {
D
dapan1121 已提交
991
  int32_t           numOfPlans;
dengyihao's avatar
dengyihao 已提交
992
  SExplainExecInfo* subplanInfo;
D
dapan1121 已提交
993 994
} SExplainRsp;

D
dapan1121 已提交
995 996
int32_t tSerializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp);
int32_t tDeserializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp);
D
dapan1121 已提交
997

D
dapan1121 已提交
998
typedef struct {
S
Shengliang Guan 已提交
999
  char    fqdn[TSDB_FQDN_LEN];  // end point, hostname:port
1000
  int32_t port;
S
Shengliang Guan 已提交
1001
} SCreateDnodeReq;
S
Shengliang Guan 已提交
1002

S
Shengliang Guan 已提交
1003 1004 1005
int32_t tSerializeSCreateDnodeReq(void* buf, int32_t bufLen, SCreateDnodeReq* pReq);
int32_t tDeserializeSCreateDnodeReq(void* buf, int32_t bufLen, SCreateDnodeReq* pReq);

S
Shengliang Guan 已提交
1006 1007
typedef struct {
  int32_t dnodeId;
1008 1009
  char    fqdn[TSDB_FQDN_LEN];
  int32_t port;
S
Shengliang Guan 已提交
1010
} SDropDnodeReq;
S
Shengliang Guan 已提交
1011

S
Shengliang Guan 已提交
1012 1013 1014
int32_t tSerializeSDropDnodeReq(void* buf, int32_t bufLen, SDropDnodeReq* pReq);
int32_t tDeserializeSDropDnodeReq(void* buf, int32_t bufLen, SDropDnodeReq* pReq);

S
Shengliang Guan 已提交
1015 1016
typedef struct {
  int32_t dnodeId;
1017
  char    config[TSDB_DNODE_CONFIG_LEN];
S
Shengliang Guan 已提交
1018
  char    value[TSDB_DNODE_VALUE_LEN];
S
Shengliang Guan 已提交
1019
} SMCfgDnodeReq, SDCfgDnodeReq;
D
dapan1121 已提交
1020

S
Shengliang Guan 已提交
1021 1022 1023
int32_t tSerializeSMCfgDnodeReq(void* buf, int32_t bufLen, SMCfgDnodeReq* pReq);
int32_t tDeserializeSMCfgDnodeReq(void* buf, int32_t bufLen, SMCfgDnodeReq* pReq);

S
Shengliang Guan 已提交
1024 1025
typedef struct {
  int32_t dnodeId;
dengyihao's avatar
dengyihao 已提交
1026 1027 1028
} SMCreateMnodeReq, SMDropMnodeReq, SDDropMnodeReq, SMCreateQnodeReq, SMDropQnodeReq, SDCreateQnodeReq, SDDropQnodeReq,
    SMCreateSnodeReq, SMDropSnodeReq, SDCreateSnodeReq, SDDropSnodeReq, SMCreateBnodeReq, SMDropBnodeReq,
    SDCreateBnodeReq, SDDropBnodeReq;
S
Shengliang Guan 已提交
1029

1030 1031
int32_t tSerializeSCreateDropMQSBNodeReq(void* buf, int32_t bufLen, SMCreateQnodeReq* pReq);
int32_t tDeserializeSCreateDropMQSBNodeReq(void* buf, int32_t bufLen, SMCreateQnodeReq* pReq);
S
Shengliang Guan 已提交
1032

D
dapan1121 已提交
1033
typedef struct {
S
Shengliang Guan 已提交
1034 1035
  int32_t  dnodeId;
  int8_t   replica;
S
Shengliang Guan 已提交
1036
  SReplica replicas[TSDB_MAX_REPLICA];
S
Shengliang Guan 已提交
1037
} SDCreateMnodeReq, SDAlterMnodeReq;
D
dapan1121 已提交
1038

S
Shengliang Guan 已提交
1039 1040 1041
int32_t tSerializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq);
int32_t tDeserializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq);

D
dapan1121 已提交
1042
typedef struct {
S
Shengliang Guan 已提交
1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053
  char    sql[TSDB_SHOW_SQL_LEN];
  int32_t queryId;
  int64_t useconds;
  int64_t stime;
  int64_t qId;
  int64_t sqlObjId;
  int32_t pid;
  char    fqdn[TSDB_FQDN_LEN];
  int8_t  stableQuery;
  int32_t numOfSub;
  char    subSqlInfo[TSDB_SHOW_SUBQUERY_LEN];  // include subqueries' index, Obj IDs and states(C-complete/I-imcomplete)
D
dapan1121 已提交
1054 1055 1056
} SQueryDesc;

typedef struct {
S
Shengliang Guan 已提交
1057 1058 1059 1060 1061 1062
  int32_t connId;
  int32_t pid;
  int32_t numOfQueries;
  int32_t numOfStreams;
  char    app[TSDB_APP_NAME_LEN];
  char    pData[];
S
Shengliang Guan 已提交
1063
} SHeartBeatReq;
D
dapan1121 已提交
1064 1065

typedef struct {
S
Shengliang Guan 已提交
1066 1067 1068 1069 1070 1071
  int32_t connId;
  int32_t queryId;
  int32_t streamId;
  int32_t totalDnodes;
  int32_t onlineDnodes;
  int8_t  killConnection;
S
Shengliang Guan 已提交
1072
  int8_t  align[3];
S
Shengliang Guan 已提交
1073
  SEpSet  epSet;
D
dapan1121 已提交
1074 1075
} SHeartBeatRsp;

S
Shengliang Guan 已提交
1076 1077 1078
typedef struct {
  int32_t connId;
  int32_t queryId;
S
Shengliang Guan 已提交
1079
} SKillQueryReq;
S
Shengliang Guan 已提交
1080

S
Shengliang Guan 已提交
1081 1082 1083
int32_t tSerializeSKillQueryReq(void* buf, int32_t bufLen, SKillQueryReq* pReq);
int32_t tDeserializeSKillQueryReq(void* buf, int32_t bufLen, SKillQueryReq* pReq);

S
Shengliang Guan 已提交
1084 1085
typedef struct {
  int32_t connId;
S
Shengliang Guan 已提交
1086
} SKillConnReq;
D
dapan1121 已提交
1087

S
Shengliang Guan 已提交
1088 1089 1090
int32_t tSerializeSKillConnReq(void* buf, int32_t bufLen, SKillConnReq* pReq);
int32_t tDeserializeSKillConnReq(void* buf, int32_t bufLen, SKillConnReq* pReq);

S
Shengliang Guan 已提交
1091 1092 1093 1094 1095 1096 1097
typedef struct {
  int32_t transId;
} SKillTransReq;

int32_t tSerializeSKillTransReq(void* buf, int32_t bufLen, SKillTransReq* pReq);
int32_t tDeserializeSKillTransReq(void* buf, int32_t bufLen, SKillTransReq* pReq);

D
dapan1121 已提交
1098 1099 1100 1101
typedef struct {
  char user[TSDB_USER_LEN];
  char spi;
  char encrypt;
1102 1103
  char secret[TSDB_PASSWORD_LEN];
  char ckey[TSDB_PASSWORD_LEN];
S
Shengliang Guan 已提交
1104
} SAuthReq, SAuthRsp;
D
dapan1121 已提交
1105

S
Shengliang Guan 已提交
1106 1107 1108
int32_t tSerializeSAuthReq(void* buf, int32_t bufLen, SAuthReq* pReq);
int32_t tDeserializeSAuthReq(void* buf, int32_t bufLen, SAuthReq* pReq);

D
dapan1121 已提交
1109
typedef struct {
S
Shengliang Guan 已提交
1110 1111 1112
  int8_t finished;
  char   name[TSDB_STEP_NAME_LEN];
  char   desc[TSDB_STEP_DESC_LEN];
S
Shengliang Guan 已提交
1113
} SStartupReq;
D
dapan1121 已提交
1114

1115 1116 1117 1118 1119 1120 1121
/**
 * The layout of the query message payload is as follows:
 * +--------------------+---------------------------------+
 * |Sql statement       | Physical plan                   |
 * |(denoted by sqlLen) |(In JSON, denoted by phyLen)     |
 * +--------------------+---------------------------------+
 */
L
Liu Jicong 已提交
1122
typedef struct SSubQueryMsg {
D
dapan1121 已提交
1123
  SMsgHead header;
D
dapan1121 已提交
1124
  uint64_t sId;
H
Hongze Cheng 已提交
1125 1126
  uint64_t queryId;
  uint64_t taskId;
D
dapan1121 已提交
1127
  int64_t  refId;
D
dapan1121 已提交
1128
  int8_t   taskType;
D
dapan1121 已提交
1129
  int8_t   explain;
1130
  uint32_t sqlLen;  // the query sql,
1131
  uint32_t phyLen;
H
Hongze Cheng 已提交
1132
  char     msg[];
D
dapan1121 已提交
1133
} SSubQueryMsg;
D
dapan 已提交
1134

1135 1136 1137 1138 1139 1140 1141
typedef struct {
  SMsgHead header;
  uint64_t sId;
  uint64_t queryId;
  uint64_t taskId;
} SSinkDataReq;

D
dapan1121 已提交
1142 1143 1144 1145 1146 1147 1148
typedef struct {
  SMsgHead header;
  uint64_t sId;
  uint64_t queryId;
  uint64_t taskId;
} SQueryContinueReq;

S
Shengliang Guan 已提交
1149
typedef struct {
D
dapan1121 已提交
1150
  SMsgHead header;
D
dapan1121 已提交
1151
  uint64_t sId;
H
Hongze Cheng 已提交
1152 1153
  uint64_t queryId;
  uint64_t taskId;
S
Shengliang Guan 已提交
1154
} SResReadyReq;
D
dapan 已提交
1155

S
Shengliang Guan 已提交
1156
typedef struct {
D
dapan1121 已提交
1157 1158 1159
  int32_t code;
} SResReadyRsp;

S
Shengliang Guan 已提交
1160
typedef struct {
D
dapan1121 已提交
1161
  SMsgHead header;
D
dapan1121 已提交
1162
  uint64_t sId;
H
Hongze Cheng 已提交
1163 1164
  uint64_t queryId;
  uint64_t taskId;
S
Shengliang Guan 已提交
1165
} SResFetchReq;
D
dapan 已提交
1166

S
Shengliang Guan 已提交
1167
typedef struct {
D
dapan1121 已提交
1168
  SMsgHead header;
D
dapan1121 已提交
1169
  uint64_t sId;
S
Shengliang Guan 已提交
1170
} SSchTasksStatusReq;
D
dapan1121 已提交
1171

S
Shengliang Guan 已提交
1172
// Status record for one task. SSchedulerStatusRsp.taskStatus and
// SSchedulerHbRsp.taskStatus are documented in this file as SArray<STaskStatus>.
typedef struct {
  uint64_t queryId;
  uint64_t taskId;
  int64_t  refId;
  int8_t   status;
} STaskStatus;

S
Shengliang Guan 已提交
1179
typedef struct {
L
Liu Jicong 已提交
1180 1181
  int64_t refId;
  SArray* taskStatus;  // SArray<STaskStatus>
D
dapan1121 已提交
1182 1183
} SSchedulerStatusRsp;

D
dapan1121 已提交
1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198
// Action record for one task. SSchedulerHbReq.taskAction is documented in this
// file as SArray<STaskAction>.
typedef struct {
  uint64_t queryId;
  uint64_t taskId;
  int8_t   action;  // NOTE(review): the set of valid action codes is not visible here - confirm
} STaskAction;

// Pairs a node id with an endpoint. Embedded as `epId` in SSchedulerHbReq and
// SSchedulerHbRsp.
typedef struct SQueryNodeEpId {
  int32_t nodeId;  // vgId or qnodeId
  SEp     ep;
} SQueryNodeEpId;

typedef struct {
  SMsgHead       header;
  uint64_t       sId;
  SQueryNodeEpId epId;
L
Liu Jicong 已提交
1199
  SArray*        taskAction;  // SArray<STaskAction>
D
dapan1121 已提交
1200 1201
} SSchedulerHbReq;

L
Liu Jicong 已提交
1202 1203 1204
int32_t tSerializeSSchedulerHbReq(void* buf, int32_t bufLen, SSchedulerHbReq* pReq);
int32_t tDeserializeSSchedulerHbReq(void* buf, int32_t bufLen, SSchedulerHbReq* pReq);
void    tFreeSSchedulerHbReq(SSchedulerHbReq* pReq);
D
dapan1121 已提交
1205 1206 1207

typedef struct {
  SQueryNodeEpId epId;
L
Liu Jicong 已提交
1208
  SArray*        taskStatus;  // SArray<STaskStatus>
D
dapan1121 已提交
1209 1210
} SSchedulerHbRsp;

L
Liu Jicong 已提交
1211 1212 1213
int32_t tSerializeSSchedulerHbRsp(void* buf, int32_t bufLen, SSchedulerHbRsp* pRsp);
int32_t tDeserializeSSchedulerHbRsp(void* buf, int32_t bufLen, SSchedulerHbRsp* pRsp);
void    tFreeSSchedulerHbRsp(SSchedulerHbRsp* pRsp);
D
dapan1121 已提交
1214

S
Shengliang Guan 已提交
1215
typedef struct {
D
dapan1121 已提交
1216
  SMsgHead header;
D
dapan1121 已提交
1217
  uint64_t sId;
H
Hongze Cheng 已提交
1218 1219
  uint64_t queryId;
  uint64_t taskId;
D
dapan1121 已提交
1220
  int64_t  refId;
S
Shengliang Guan 已提交
1221
} STaskCancelReq;
D
dapan1121 已提交
1222

S
Shengliang Guan 已提交
1223
typedef struct {
D
dapan1121 已提交
1224 1225 1226
  int32_t code;
} STaskCancelRsp;

S
Shengliang Guan 已提交
1227
typedef struct {
D
dapan1121 已提交
1228
  SMsgHead header;
D
dapan1121 已提交
1229
  uint64_t sId;
H
Hongze Cheng 已提交
1230 1231
  uint64_t queryId;
  uint64_t taskId;
D
dapan1121 已提交
1232
  int64_t  refId;
S
Shengliang Guan 已提交
1233
} STaskDropReq;
D
dapan1121 已提交
1234

S
Shengliang Guan 已提交
1235
typedef struct {
D
dapan1121 已提交
1236 1237 1238
  int32_t code;
} STaskDropRsp;

L
Liu Jicong 已提交
1239
typedef struct {
L
Liu Jicong 已提交
1240
  char   name[TSDB_TOPIC_FNAME_LEN];
L
Liu Jicong 已提交
1241
  char   outputSTbName[TSDB_TABLE_FNAME_LEN];
L
Liu Jicong 已提交
1242 1243
  int8_t igExists;
  char*  sql;
L
Liu Jicong 已提交
1244
  char*  ast;
L
Liu Jicong 已提交
1245
} SCMCreateStreamReq;
L
Liu Jicong 已提交
1246

L
Liu Jicong 已提交
1247 1248 1249 1250 1251 1252 1253 1254
typedef struct {
  int64_t streamId;
} SCMCreateStreamRsp;

int32_t tSerializeSCMCreateStreamReq(void* buf, int32_t bufLen, const SCMCreateStreamReq* pReq);
int32_t tDeserializeSCMCreateStreamReq(void* buf, int32_t bufLen, SCMCreateStreamReq* pReq);
void    tFreeSCMCreateStreamReq(SCMCreateStreamReq* pReq);

L
Liu Jicong 已提交
1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265
typedef struct {
  char    name[TSDB_TOPIC_FNAME_LEN];
  int64_t streamId;
  char*   sql;
  char*   executorMsg;
} SMVCreateStreamReq, SMSCreateStreamReq;

typedef struct {
  int64_t streamId;
} SMVCreateStreamRsp, SMSCreateStreamRsp;

L
Liu Jicong 已提交
1266 1267 1268 1269
typedef struct {
  char   name[TSDB_TOPIC_FNAME_LEN];
  int8_t igExists;
  char*  sql;
1270 1271
  char*  ast;
  char   subscribeDbName[TSDB_DB_NAME_LEN];
L
Liu Jicong 已提交
1272 1273 1274 1275 1276
} SCMCreateTopicReq;

int32_t tSerializeSCMCreateTopicReq(void* buf, int32_t bufLen, const SCMCreateTopicReq* pReq);
int32_t tDeserializeSCMCreateTopicReq(void* buf, int32_t bufLen, SCMCreateTopicReq* pReq);
void    tFreeSCMCreateTopicReq(SCMCreateTopicReq* pReq);
L
Liu Jicong 已提交
1277 1278 1279

typedef struct {
  int64_t topicId;
L
Liu Jicong 已提交
1280
} SCMCreateTopicRsp;
L
Liu Jicong 已提交
1281

L
Liu Jicong 已提交
1282 1283
int32_t tSerializeSCMCreateTopicRsp(void* buf, int32_t bufLen, const SCMCreateTopicRsp* pRsp);
int32_t tDeserializeSCMCreateTopicRsp(void* buf, int32_t bufLen, SCMCreateTopicRsp* pRsp);
L
Liu Jicong 已提交
1284 1285

typedef struct {
L
Liu Jicong 已提交
1286
  int32_t topicNum;
L
Liu Jicong 已提交
1287
  int64_t consumerId;
L
Liu Jicong 已提交
1288
  char*   consumerGroup;
L
Liu Jicong 已提交
1289
  SArray* topicNames;  // SArray<char*>
L
Liu Jicong 已提交
1290 1291
} SCMSubscribeReq;

S
Shengliang Guan 已提交
1292 1293
/**
 * Encode a subscribe request through the cursor *buf.
 *
 * Fields are written in this order: topicNum, consumerId, consumerGroup, then
 * topicNum strings taken from pReq->topicNames.
 *
 * @param buf  encode cursor, handed unchanged to the taosEncode* helpers
 * @param pReq request to encode; topicNames must hold at least topicNum entries
 * @return sum of the values returned by the taosEncode* helpers
 */
static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) {
  int32_t encodedLen = taosEncodeFixedI32(buf, pReq->topicNum);
  encodedLen += taosEncodeFixedI64(buf, pReq->consumerId);
  encodedLen += taosEncodeString(buf, pReq->consumerGroup);

  int32_t idx = 0;
  while (idx < pReq->topicNum) {
    char* topicName = (char*)taosArrayGetP(pReq->topicNames, idx);
    encodedLen += taosEncodeString(buf, topicName);
    idx++;
  }

  return encodedLen;
}

/**
 * Decode a subscribe request from buf into pReq.
 *
 * Reads topicNum, consumerId and consumerGroup, creates pReq->topicNames and
 * then decodes topicNum strings into it. The decoded strings and the array are
 * left in pReq for the caller to release.
 *
 * @param buf  decode cursor
 * @param pReq request to fill in
 * @return the cursor after the last decoded field, or NULL when the topic-name
 *         array cannot be created. In that case pReq->consumerGroup may already
 *         hold a decoded string and pReq->topicNames is NULL.
 *         NOTE(review): NULL is assumed to be an acceptable failure value for
 *         callers of this decoder - confirm.
 */
static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq* pReq) {
  buf = taosDecodeFixedI32(buf, &pReq->topicNum);
  buf = taosDecodeFixedI64(buf, &pReq->consumerId);
  buf = taosDecodeString(buf, &pReq->consumerGroup);

  pReq->topicNames = taosArrayInit(pReq->topicNum, sizeof(void*));
  if (pReq->topicNames == NULL) {
    // Pushing into a NULL array below would dereference it; stop here instead.
    return NULL;
  }

  for (int32_t i = 0; i < pReq->topicNum; i++) {
    char* name = NULL;  // never push an indeterminate pointer into the array
    buf = taosDecodeString(buf, &name);
    taosArrayPush(pReq->topicNames, &name);
  }
  return buf;
}

L
Liu Jicong 已提交
1317
typedef struct SMqSubTopic {
L
Liu Jicong 已提交
1318
  int32_t vgId;
L
Liu Jicong 已提交
1319 1320 1321 1322 1323
  int64_t topicId;
  SEpSet  epSet;
} SMqSubTopic;

typedef struct {
L
Liu Jicong 已提交
1324
  int32_t     topicNum;
L
Liu Jicong 已提交
1325
  SMqSubTopic topics[];
L
Liu Jicong 已提交
1326 1327
} SCMSubscribeRsp;

S
Shengliang Guan 已提交
1328 1329
/**
 * Encode a subscribe response through the cursor *buf.
 *
 * Writes topicNum, then vgId, topicId and epSet for each of the topicNum
 * entries in pRsp->topics.
 *
 * @param buf  encode cursor, handed unchanged to the taosEncode* helpers
 * @param pRsp response to encode
 * @return sum of the values returned by the taosEncode* helpers
 */
static FORCE_INLINE int32_t tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribeRsp* pRsp) {
  int32_t encodedLen = taosEncodeFixedI32(buf, pRsp->topicNum);

  for (int32_t idx = 0; idx < pRsp->topicNum; idx++) {
    const SMqSubTopic* pTopic = &pRsp->topics[idx];
    encodedLen += taosEncodeFixedI32(buf, pTopic->vgId);
    encodedLen += taosEncodeFixedI64(buf, pTopic->topicId);
    encodedLen += taosEncodeSEpSet(buf, &pTopic->epSet);
  }

  return encodedLen;
}

/**
 * Decode a subscribe response from buf into pRsp.
 *
 * Reads topicNum, then fills pRsp->topics[0 .. topicNum-1] in place. No storage
 * is allocated here, so pRsp must already have room for every decoded entry.
 *
 * @param buf  decode cursor
 * @param pRsp response to fill in
 * @return the cursor returned by the last taosDecode* call
 */
static FORCE_INLINE void* tDeserializeSCMSubscribeRsp(void* buf, SCMSubscribeRsp* pRsp) {
  void* cursor = taosDecodeFixedI32(buf, &pRsp->topicNum);

  for (int32_t idx = 0; idx < pRsp->topicNum; idx++) {
    SMqSubTopic* pTopic = &pRsp->topics[idx];
    cursor = taosDecodeFixedI32(cursor, &pTopic->vgId);
    cursor = taosDecodeFixedI64(cursor, &pTopic->topicId);
    cursor = taosDecodeSEpSet(cursor, &pTopic->epSet);
  }

  return cursor;
}

// Subscribe request encoded by tSerializeSMVSubscribeReq() and decoded by
// tDeserializeSMVSubscribeReq(). The three string fields are assigned by
// taosDecodeString() when the request is decoded.
typedef struct {
  int64_t topicId;
  int64_t consumerId;
  int64_t consumerGroupId;
  int64_t offset;
  char*   sql;
  char*   logicalPlan;
  char*   physicalPlan;
} SMVSubscribeReq;

S
Shengliang Guan 已提交
1359 1360
/**
 * Encode an SMVSubscribeReq through the cursor *buf.
 *
 * Field order: topicId, consumerId, consumerGroupId, offset, sql, logicalPlan,
 * physicalPlan.
 *
 * @param buf  encode cursor, handed unchanged to the taosEncode* helpers
 * @param pReq request to encode
 * @return sum of the values returned by the taosEncode* helpers
 */
static FORCE_INLINE int32_t tSerializeSMVSubscribeReq(void** buf, SMVSubscribeReq* pReq) {
  // Fixed-width identifiers first.
  int32_t encodedLen = taosEncodeFixedI64(buf, pReq->topicId);
  encodedLen += taosEncodeFixedI64(buf, pReq->consumerId);
  encodedLen += taosEncodeFixedI64(buf, pReq->consumerGroupId);
  encodedLen += taosEncodeFixedI64(buf, pReq->offset);

  // Then the three strings.
  encodedLen += taosEncodeString(buf, pReq->sql);
  encodedLen += taosEncodeString(buf, pReq->logicalPlan);
  encodedLen += taosEncodeString(buf, pReq->physicalPlan);

  return encodedLen;
}

/**
 * Decode an SMVSubscribeReq from buf into pReq.
 *
 * Field order mirrors tSerializeSMVSubscribeReq(): four 64-bit integers, then
 * sql, logicalPlan and physicalPlan.
 *
 * @param buf  decode cursor
 * @param pReq request to fill in; the three string pointers are assigned by
 *             taosDecodeString()
 * @return the cursor returned by the last taosDecode* call
 */
static FORCE_INLINE void* tDeserializeSMVSubscribeReq(void* buf, SMVSubscribeReq* pReq) {
  void* cursor = buf;

  // Fixed-width identifiers.
  cursor = taosDecodeFixedI64(cursor, &pReq->topicId);
  cursor = taosDecodeFixedI64(cursor, &pReq->consumerId);
  cursor = taosDecodeFixedI64(cursor, &pReq->consumerGroupId);
  cursor = taosDecodeFixedI64(cursor, &pReq->offset);

  // Strings.
  cursor = taosDecodeString(cursor, &pReq->sql);
  cursor = taosDecodeString(cursor, &pReq->logicalPlan);
  cursor = taosDecodeString(cursor, &pReq->physicalPlan);

  return cursor;
}

L
Liu Jicong 已提交
1382
typedef struct {
1383 1384 1385 1386
  const char* key;
  SArray*     lostConsumers;     // SArray<int64_t>
  SArray*     removedConsumers;  // SArray<int64_t>
  SArray*     newConsumers;      // SArray<int64_t>
L
Liu Jicong 已提交
1387 1388 1389
} SMqRebSubscribe;

/**
 * Allocate a rebalance record for `key` with three empty consumer-id arrays.
 *
 * @param key subscription key; it is copied with strdup(), so the caller keeps
 *            ownership of the original string
 * @return the new record, or NULL when `key` is NULL or any allocation fails.
 *         On failure everything allocated so far is released.
 */
static FORCE_INLINE SMqRebSubscribe* tNewSMqRebSubscribe(const char* key) {
  // strdup(NULL) is undefined behaviour, so reject a missing key up front.
  if (key == NULL) {
    return NULL;
  }

  SMqRebSubscribe* pRebSub = (SMqRebSubscribe*)taosMemoryCalloc(1, sizeof(SMqRebSubscribe));
  if (pRebSub == NULL) {
    // Nothing to clean up yet; the _err path dereferences pRebSub and must not
    // be entered with a NULL record.
    return NULL;
  }

  pRebSub->lostConsumers = taosArrayInit(0, sizeof(int64_t));
  if (pRebSub->lostConsumers == NULL) {
    goto _err;
  }
  pRebSub->removedConsumers = taosArrayInit(0, sizeof(int64_t));
  if (pRebSub->removedConsumers == NULL) {
    goto _err;
  }
  pRebSub->newConsumers = taosArrayInit(0, sizeof(int64_t));
  if (pRebSub->newConsumers == NULL) {
    goto _err;
  }

  // Copy the key last so the error path never owns a key that it would have to
  // release: a failed strdup() leaves pRebSub->key NULL.
  pRebSub->key = strdup(key);
  if (pRebSub->key == NULL) {
    goto _err;
  }
  return pRebSub;

_err:
  // The record was zero-filled by calloc, so arrays that were never created
  // are still NULL here.
  taosArrayDestroy(pRebSub->lostConsumers);
  taosArrayDestroy(pRebSub->removedConsumers);
  taosArrayDestroy(pRebSub->newConsumers);
  taosMemoryFreeClear(pRebSub);
  return NULL;
}

L
Liu Jicong 已提交
1416
// this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization or
1417
// deserialization
L
Liu Jicong 已提交
1418
typedef struct {
1419
  SHashObj* rebSubHash;  // SHashObj<key, SMqRebSubscribe>
L
Liu Jicong 已提交
1420 1421
} SMqDoRebalanceMsg;

L
Liu Jicong 已提交
1422
typedef struct {
L
Liu Jicong 已提交
1423
  int64_t status;
L
Liu Jicong 已提交
1424 1425
} SMVSubscribeRsp;

L
Liu Jicong 已提交
1426 1427 1428
typedef struct {
  char   name[TSDB_TABLE_FNAME_LEN];
  int8_t igNotExists;
S
Shengliang Guan 已提交
1429 1430
} SMDropTopicReq;

S
Shengliang Guan 已提交
1431
int32_t tSerializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq);
S
Shengliang Guan 已提交
1432
int32_t tDeserializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq);
L
Liu Jicong 已提交
1433 1434 1435 1436 1437

typedef struct {
  char    name[TSDB_TABLE_FNAME_LEN];
  int8_t  alterType;
  SSchema schema;
S
Shengliang Guan 已提交
1438
} SAlterTopicReq;
L
Liu Jicong 已提交
1439 1440 1441 1442

typedef struct {
  SMsgHead head;
  char     name[TSDB_TABLE_FNAME_LEN];
L
Liu Jicong 已提交
1443
  int64_t  tuid;
L
Liu Jicong 已提交
1444 1445 1446 1447 1448
  int32_t  sverson;
  int32_t  execLen;
  char*    executor;
  int32_t  sqlLen;
  char*    sql;
S
Shengliang Guan 已提交
1449
} SDCreateTopicReq;
L
Liu Jicong 已提交
1450 1451 1452 1453

typedef struct {
  SMsgHead head;
  char     name[TSDB_TABLE_FNAME_LEN];
L
Liu Jicong 已提交
1454
  int64_t  tuid;
S
Shengliang Guan 已提交
1455
} SDDropTopicReq;
S
Shengliang Guan 已提交
1456

C
Cary Xu 已提交
1457
typedef struct {
C
Cary Xu 已提交
1458 1459 1460 1461
  float      xFilesFactor;
  int32_t    delay;
  int8_t     nFuncIds;
  func_id_t* pFuncIds;
C
Cary Xu 已提交
1462 1463
} SRSmaParam;

H
more  
Hongze Cheng 已提交
1464
typedef struct SVCreateTbReq {
L
Liu Jicong 已提交
1465
  int64_t  ver;  // use a general definition
D
dapan1121 已提交
1466
  char*    dbFName;
H
more  
Hongze Cheng 已提交
1467 1468 1469 1470
  char*    name;
  uint32_t ttl;
  uint32_t keep;
  union {
C
Cary Xu 已提交
1471
    uint8_t info;
H
more  
Hongze Cheng 已提交
1472
    struct {
C
Cary Xu 已提交
1473 1474 1475 1476 1477 1478
      uint8_t rollup : 1;  // 1 means rollup sma
      uint8_t type : 7;
    };
  };
  union {
    struct {
dengyihao's avatar
dengyihao 已提交
1479 1480 1481 1482 1483 1484
      tb_uid_t    suid;
      col_id_t    nCols;
      col_id_t    nBSmaCols;
      SSchemaEx*  pSchema;
      col_id_t    nTagCols;
      SSchema*    pTagSchema;
C
Cary Xu 已提交
1485
      SRSmaParam* pRSmaParam;
H
more  
Hongze Cheng 已提交
1486 1487 1488 1489 1490 1491
    } stbCfg;
    struct {
      tb_uid_t suid;
      SKVRow   pTag;
    } ctbCfg;
    struct {
dengyihao's avatar
dengyihao 已提交
1492 1493 1494
      col_id_t    nCols;
      col_id_t    nBSmaCols;
      SSchemaEx*  pSchema;
C
Cary Xu 已提交
1495
      SRSmaParam* pRSmaParam;
H
more  
Hongze Cheng 已提交
1496 1497
    } ntbCfg;
  };
S
Shengliang Guan 已提交
1498
} SVCreateTbReq, SVUpdateTbReq;
H
more  
Hongze Cheng 已提交
1499

H
more  
Hongze Cheng 已提交
1500
typedef struct {
D
dapan1121 已提交
1501
  int32_t code;
S
Shengliang Guan 已提交
1502
} SVCreateTbRsp, SVUpdateTbRsp;
H
more  
Hongze Cheng 已提交
1503

S
Shengliang Guan 已提交
1504 1505
int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
void*   tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq);
H
more  
Hongze Cheng 已提交
1506

H
more  
Hongze Cheng 已提交
1507
typedef struct {
L
Liu Jicong 已提交
1508 1509
  int64_t ver;  // use a general definition
  SArray* pArray;
H
more  
Hongze Cheng 已提交
1510
} SVCreateTbBatchReq;
H
more  
Hongze Cheng 已提交
1511

D
dapan 已提交
1512 1513
int32_t tSerializeSVCreateTbBatchReq(void** buf, SVCreateTbBatchReq* pReq);
void*   tDeserializeSVCreateTbBatchReq(void* buf, SVCreateTbBatchReq* pReq);
D
dapan1121 已提交
1514

S
Shengliang Guan 已提交
1515
typedef struct {
L
Liu Jicong 已提交
1516
  SArray* rspList;  // SArray<SVCreateTbRsp>
S
Shengliang Guan 已提交
1517
} SVCreateTbBatchRsp;
H
more  
Hongze Cheng 已提交
1518

L
Liu Jicong 已提交
1519 1520
int32_t tSerializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp);
int32_t tDeserializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatchRsp* pRsp);
S
Shengliang Guan 已提交
1521 1522

typedef struct {
L
Liu Jicong 已提交
1523
  int64_t  ver;
S
Shengliang Guan 已提交
1524 1525 1526
  char*    name;
  uint8_t  type;
  tb_uid_t suid;
S
Shengliang Guan 已提交
1527 1528 1529
} SVDropTbReq;

typedef struct {
L
Liu Jicong 已提交
1530
  int tmp;  // TODO: to avoid compile error
S
Shengliang Guan 已提交
1531 1532
} SVDropTbRsp;

S
Shengliang Guan 已提交
1533 1534 1535
int32_t tSerializeSVDropTbReq(void** buf, SVDropTbReq* pReq);
void*   tDeserializeSVDropTbReq(void* buf, SVDropTbReq* pReq);

S
Shengliang Guan 已提交
1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554
typedef struct {
  SMsgHead head;
  int64_t  uid;
  int32_t  tid;
  int16_t  tversion;
  int16_t  colId;
  int8_t   type;
  int16_t  bytes;
  int32_t  tagValLen;
  int16_t  numOfTags;
  int32_t  schemaLen;
  char     data[];
} SUpdateTagValReq;

typedef struct {
  SMsgHead head;
} SUpdateTagValRsp;

typedef struct {
H
Hongze Cheng 已提交
1555 1556 1557
  SMsgHead head;
} SVShowTablesReq;

S
Shengliang Guan 已提交
1558
typedef struct {
H
Hongze Cheng 已提交
1559
  SMsgHead head;
H
Haojun Liao 已提交
1560
  int32_t  id;
H
Hongze Cheng 已提交
1561 1562
} SVShowTablesFetchReq;

S
Shengliang Guan 已提交
1563
typedef struct {
H
Hongze Cheng 已提交
1564 1565 1566 1567 1568 1569 1570 1571 1572
  int64_t useconds;
  int8_t  completed;  // all results are returned to client
  int8_t  precision;
  int8_t  compressed;
  int32_t compLen;
  int32_t numOfRows;
  char    data[];
} SVShowTablesFetchRsp;

L
Liu Jicong 已提交
1573 1574 1575
typedef struct SMqCMGetSubEpReq {
  int64_t consumerId;
  int32_t epoch;
L
Liu Jicong 已提交
1576
  char    cgroup[TSDB_CGROUP_LEN];
L
Liu Jicong 已提交
1577 1578
} SMqCMGetSubEpReq;

L
Liu Jicong 已提交
1579 1580 1581 1582 1583 1584 1585 1586
/**
 * Encode a message head through the cursor *buf: contLen first, then vgId.
 *
 * @param buf  encode cursor, handed unchanged to taosEncodeFixedI32()
 * @param pMsg head to encode
 * @return sum of the two taosEncodeFixedI32() return values
 */
static FORCE_INLINE int32_t tEncodeSMsgHead(void** buf, const SMsgHead* pMsg) {
  int32_t total = taosEncodeFixedI32(buf, pMsg->contLen);
  total += taosEncodeFixedI32(buf, pMsg->vgId);
  return total;
}

typedef struct SMqHbRsp {
1587
  int8_t status;  // idle or not
L
Liu Jicong 已提交
1588
  int8_t vnodeChanged;
1589
  int8_t epChanged;  // should use new epset
L
Liu Jicong 已提交
1590 1591 1592 1593
  int8_t reserved;
  SEpSet epSet;
} SMqHbRsp;

S
Shengliang Guan 已提交
1594 1595
// Encodes status, vnodeChanged, epChanged and epSet (in that order) and
// returns the accumulated byte count. The `reserved` field is not written.
static FORCE_INLINE int32_t taosEncodeSMqHbRsp(void** buf, const SMqHbRsp* pRsp) {
  int32_t nBytes = 0;
  nBytes += taosEncodeFixedI8(buf, pRsp->status);
  nBytes += taosEncodeFixedI8(buf, pRsp->vnodeChanged);
  nBytes += taosEncodeFixedI8(buf, pRsp->epChanged);
  nBytes += taosEncodeSEpSet(buf, &pRsp->epSet);
  return nBytes;
}

// Reads status, vnodeChanged, epChanged and epSet (in that order) into pRsp
// and returns the cursor handed back by the last decode call. The `reserved`
// field is not touched.
static FORCE_INLINE void* taosDecodeSMqHbRsp(void* buf, SMqHbRsp* pRsp) {
  void* cursor = buf;
  cursor = taosDecodeFixedI8(cursor, &pRsp->status);
  cursor = taosDecodeFixedI8(cursor, &pRsp->vnodeChanged);
  cursor = taosDecodeFixedI8(cursor, &pRsp->epChanged);
  cursor = taosDecodeSEpSet(cursor, &pRsp->epSet);
  return cursor;
}

typedef struct SMqHbOneTopicBatchRsp {
1612
  char    topicName[TSDB_TOPIC_FNAME_LEN];
L
Liu Jicong 已提交
1613 1614 1615
  SArray* rsps;  // SArray<SMqHbRsp>
} SMqHbOneTopicBatchRsp;

S
Shengliang Guan 已提交
1616 1617
// Encodes the topic name, the element count of `rsps`, and then every
// SMqHbRsp element in array order. Returns the accumulated byte count.
static FORCE_INLINE int32_t taosEncodeSMqHbOneTopicBatchRsp(void** buf, const SMqHbOneTopicBatchRsp* pBatchRsp) {
  int32_t nBytes = taosEncodeString(buf, pBatchRsp->topicName);
  int32_t numRsps = taosArrayGetSize(pBatchRsp->rsps);
  nBytes += taosEncodeFixedI32(buf, numRsps);
  for (int32_t idx = 0; idx < numRsps; ++idx) {
    nBytes += taosEncodeSMqHbRsp(buf, (SMqHbRsp*)taosArrayGet(pBatchRsp->rsps, idx));
  }
  return nBytes;
}

// Decodes a topic name, an element count, and that many SMqHbRsp entries into
// a freshly created pBatchRsp->rsps array.
// Returns the advanced decode cursor, or NULL if the array cannot be created.
static FORCE_INLINE void* taosDecodeSMqHbOneTopicBatchRsp(void* buf, SMqHbOneTopicBatchRsp* pBatchRsp) {
  int32_t sz;
  buf = taosDecodeStringTo(buf, pBatchRsp->topicName);
  buf = taosDecodeFixedI32(buf, &sz);
  pBatchRsp->rsps = taosArrayInit(sz, sizeof(SMqHbRsp));
  if (pBatchRsp->rsps == NULL) {
    return NULL;
  }
  for (int32_t i = 0; i < sz; i++) {
    SMqHbRsp rsp = {0};  // `reserved` is not on the wire; keep it defined
    buf = taosDecodeSMqHbRsp(buf, &rsp);
    // The push result must not be stored in `buf`: `buf` is the decode cursor
    // and overwriting it would derail every later read.
    taosArrayPush(pBatchRsp->rsps, &rsp);
  }
  return buf;
}

typedef struct SMqHbBatchRsp {
1642 1643
  int64_t consumerId;
  SArray* batchRsps;  // SArray<SMqHbOneTopicBatchRsp>
L
Liu Jicong 已提交
1644 1645
} SMqHbBatchRsp;

S
Shengliang Guan 已提交
1646 1647
// Encodes consumerId, the element count of `batchRsps`, and then every
// SMqHbOneTopicBatchRsp element in array order.
// Returns the accumulated byte count.
static FORCE_INLINE int32_t taosEncodeSMqHbBatchRsp(void** buf, const SMqHbBatchRsp* pBatchRsp) {
  int32_t tlen = 0;
  tlen += taosEncodeFixedI64(buf, pBatchRsp->consumerId);
  // The count must come from the array itself; it was previously read from an
  // uninitialized local, which made both the written count and the loop bound
  // indeterminate.
  int32_t sz = taosArrayGetSize(pBatchRsp->batchRsps);
  tlen += taosEncodeFixedI32(buf, sz);
  for (int32_t i = 0; i < sz; i++) {
    SMqHbOneTopicBatchRsp* pRsp = (SMqHbOneTopicBatchRsp*)taosArrayGet(pBatchRsp->batchRsps, i);
    tlen += taosEncodeSMqHbOneTopicBatchRsp(buf, pRsp);
  }
  return tlen;
}

// Decodes consumerId, an element count, and that many SMqHbOneTopicBatchRsp
// entries into a freshly created pBatchRsp->batchRsps array.
// Returns the advanced decode cursor, or NULL if the array cannot be created
// or a nested decode reports failure.
static FORCE_INLINE void* taosDecodeSMqHbBatchRsp(void* buf, SMqHbBatchRsp* pBatchRsp) {
  buf = taosDecodeFixedI64(buf, &pBatchRsp->consumerId);
  int32_t sz;
  buf = taosDecodeFixedI32(buf, &sz);
  pBatchRsp->batchRsps = taosArrayInit(sz, sizeof(SMqHbOneTopicBatchRsp));
  if (pBatchRsp->batchRsps == NULL) {
    return NULL;
  }
  for (int32_t i = 0; i < sz; i++) {
    SMqHbOneTopicBatchRsp rsp = {0};
    buf = taosDecodeSMqHbOneTopicBatchRsp(buf, &rsp);
    if (buf == NULL) {
      return NULL;
    }
    // The push result must not be stored in `buf`: `buf` is the decode cursor
    // and overwriting it would derail every later read.
    taosArrayPush(pBatchRsp->batchRsps, &rsp);
  }
  return buf;
}

// Key/value pair: an integer key plus a value buffer and its length.
typedef struct {
  int32_t key;
  int32_t valueLen;
  void*   value;
} SKv;

// Key of a client heartbeat entry: a connection id plus a heartbeat type.
typedef struct {
  int32_t connId;
  int32_t hbType;
} SClientHbKey;

// One client heartbeat request: its key plus a hash of key/value entries.
typedef struct {
  SClientHbKey connKey;
  SHashObj*    info;  // hash<Skv.key, Skv>
} SClientHbReq;

// Batch of client heartbeat requests sharing one request id.
typedef struct {
  int64_t reqId;
  SArray* reqs;  // SArray<SClientHbReq>
} SClientHbBatchReq;

typedef struct {
  SClientHbKey connKey;
  int32_t      status;
D
dapan1121 已提交
1695
  SArray*      info;  // Array<Skv>
L
Liu Jicong 已提交
1696 1697 1698 1699 1700 1701 1702 1703
} SClientHbRsp;

// Batch of client heartbeat responses carrying a request id and a response id.
typedef struct {
  int64_t reqId;
  int64_t rspId;
  SArray* rsps;  // SArray<SClientHbRsp>
} SClientHbBatchRsp;

1704
// Hash callback for heartbeat keys; forwards key and keyLen to taosIntHash_64
// and returns its result.
static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) {
  return taosIntHash_64(key, keyLen);
}
L
Liu Jicong 已提交
1705

1706 1707
// Walks every entry of `info`, treats it as an SKv and releases its `value`
// buffer. The hash table itself is not destroyed here.
static FORCE_INLINE void tFreeReqKvHash(SHashObj* info) {
  for (void* pIter = taosHashIterate(info, NULL); pIter != NULL; pIter = taosHashIterate(info, pIter)) {
    SKv* pEntry = (SKv*)pIter;
    taosMemoryFreeClear(pEntry->value);
  }
}

1715
static FORCE_INLINE void tFreeClientHbReq(void* pReq) {
L
Liu Jicong 已提交
1716
  SClientHbReq* req = (SClientHbReq*)pReq;
D
dapan1121 已提交
1717 1718 1719 1720
  if (req->info) {
    tFreeReqKvHash(req->info);
    taosHashCleanup(req->info);
  }
L
Liu Jicong 已提交
1721 1722
}

S
Shengliang Guan 已提交
1723 1724
// Declarations only; the bodies live outside this header.
// NOTE(review): presumably encode/decode an SClientHbBatchReq to/from buf[0..bufLen) - confirm.
int32_t tSerializeSClientHbBatchReq(void* buf, int32_t bufLen, const SClientHbBatchReq* pReq);
int32_t tDeserializeSClientHbBatchReq(void* buf, int32_t bufLen, SClientHbBatchReq* pReq);
L
Liu Jicong 已提交
1725 1726

static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) {
1727
  SClientHbBatchReq* req = (SClientHbBatchReq*)pReq;
L
Liu Jicong 已提交
1728 1729 1730 1731 1732
  if (deep) {
    taosArrayDestroyEx(req->reqs, tFreeClientHbReq);
  } else {
    taosArrayDestroy(req->reqs);
  }
wafwerar's avatar
wafwerar 已提交
1733
  taosMemoryFree(pReq);
L
Liu Jicong 已提交
1734 1735
}

1736 1737
static FORCE_INLINE void tFreeClientKv(void* pKv) {
  SKv* kv = (SKv*)pKv;
D
dapan1121 已提交
1738
  if (kv) {
wafwerar's avatar
wafwerar 已提交
1739
    taosMemoryFreeClear(kv->value);
D
dapan1121 已提交
1740 1741 1742
  }
}

1743
static FORCE_INLINE void tFreeClientHbRsp(void* pRsp) {
D
dapan1121 已提交
1744 1745 1746 1747 1748
  SClientHbRsp* rsp = (SClientHbRsp*)pRsp;
  if (rsp->info) taosArrayDestroyEx(rsp->info, tFreeClientKv);
}

static FORCE_INLINE void tFreeClientHbBatchRsp(void* pRsp) {
1749
  SClientHbBatchRsp* rsp = (SClientHbBatchRsp*)pRsp;
D
dapan1121 已提交
1750 1751 1752
  taosArrayDestroyEx(rsp->rsps, tFreeClientHbRsp);
}

S
Shengliang Guan 已提交
1753 1754
// Declarations only; the bodies live outside this header.
// NOTE(review): presumably encode/decode an SClientHbBatchRsp to/from buf[0..bufLen) - confirm.
int32_t tSerializeSClientHbBatchRsp(void* buf, int32_t bufLen, const SClientHbBatchRsp* pBatchRsp);
int32_t tDeserializeSClientHbBatchRsp(void* buf, int32_t bufLen, SClientHbBatchRsp* pBatchRsp);
L
Liu Jicong 已提交
1755

S
Shengliang Guan 已提交
1756 1757 1758
// Encodes key, valueLen and then the value bytes (valueLen of them) with the
// SCoder API. Returns 0 on success, -1 as soon as any step reports an error.
static FORCE_INLINE int32_t tEncodeSKv(SCoder* pEncoder, const SKv* pKv) {
  if (tEncodeI32(pEncoder, pKv->key) < 0 || tEncodeI32(pEncoder, pKv->valueLen) < 0) {
    return -1;
  }
  if (tEncodeBinary(pEncoder, (const char*)pKv->value, pKv->valueLen) < 0) {
    return -1;
  }
  return 0;
}

S
Shengliang Guan 已提交
1763 1764 1765
// Decodes key and valueLen, allocates valueLen + 1 bytes for `value`, and
// fills the buffer through tDecodeCStrTo.
// Returns 0 on success. Returns -1 on any decode error, a negative valueLen,
// or allocation failure; on every path past the length read pKv->value is
// either a live buffer (success) or NULL (failure), so nothing leaks.
// NOTE(review): the payload is written by tEncodeBinary but read by
// tDecodeCStrTo, and the copy length comes from the wire rather than from
// valueLen - confirm the two always agree.
static FORCE_INLINE int32_t tDecodeSKv(SCoder* pDecoder, SKv* pKv) {
  if (tDecodeI32(pDecoder, &pKv->key) < 0) return -1;
  if (tDecodeI32(pDecoder, &pKv->valueLen) < 0) return -1;

  pKv->value = NULL;
  if (pKv->valueLen < 0) return -1;  // a negative length would under-allocate below

  pKv->value = taosMemoryMalloc((size_t)pKv->valueLen + 1);
  if (pKv->value == NULL) return -1;
  if (tDecodeCStrTo(pDecoder, (char*)pKv->value) < 0) {
    taosMemoryFreeClear(pKv->value);  // do not leak the buffer on a failed read
    return -1;
  }
  return 0;
}

S
Shengliang Guan 已提交
1772 1773 1774 1775
// Encodes connId and then hbType. Returns 0 on success, -1 on the first
// encoder error.
static FORCE_INLINE int32_t tEncodeSClientHbKey(SCoder* pEncoder, const SClientHbKey* pKey) {
  if (tEncodeI32(pEncoder, pKey->connId) < 0 || tEncodeI32(pEncoder, pKey->hbType) < 0) {
    return -1;
  }
  return 0;
}

S
Shengliang Guan 已提交
1778 1779 1780 1781
// Decodes connId and then hbType into pKey. Returns 0 on success, -1 on the
// first decoder error.
static FORCE_INLINE int32_t tDecodeSClientHbKey(SCoder* pDecoder, SClientHbKey* pKey) {
  if (tDecodeI32(pDecoder, &pKey->connId) < 0 || tDecodeI32(pDecoder, &pKey->hbType) < 0) {
    return -1;
  }
  return 0;
}

// Heartbeat entry that carries a single vgroup id.
typedef struct SMqHbVgInfo {
  int32_t vgId;
} SMqHbVgInfo;

S
Shengliang Guan 已提交
1788 1789
// Encodes the vgId of pVgInfo and returns the byte count reported by the
// encoder.
static FORCE_INLINE int32_t taosEncodeSMqVgInfo(void** buf, const SMqHbVgInfo* pVgInfo) {
  int32_t nBytes = taosEncodeFixedI32(buf, pVgInfo->vgId);
  return nBytes;
}

// Decodes one 32-bit vgId into pVgInfo and returns the advanced cursor.
static FORCE_INLINE void* taosDecodeSMqVgInfo(void* buf, SMqHbVgInfo* pVgInfo) {
  return taosDecodeFixedI32(buf, &pVgInfo->vgId);
}

// Per-topic heartbeat info: epoch, topic uid, topic name and a vgroup list.
typedef struct SMqHbTopicInfo {
  int32_t epoch;
  int64_t topicUid;
  char    name[TSDB_TOPIC_FNAME_LEN];
  SArray* pVgInfo;  // elements are SMqHbVgInfo (see taosDecodeSMqHbTopicInfoMsg)
} SMqHbTopicInfo;

S
Shengliang Guan 已提交
1806 1807
// Encodes epoch, topicUid, name, the element count of pVgInfo and then every
// SMqHbVgInfo element in array order. Returns the accumulated byte count.
static FORCE_INLINE int32_t taosEncodeSMqHbTopicInfoMsg(void** buf, const SMqHbTopicInfo* pTopicInfo) {
  int32_t nBytes = taosEncodeFixedI32(buf, pTopicInfo->epoch);
  nBytes += taosEncodeFixedI64(buf, pTopicInfo->topicUid);
  nBytes += taosEncodeString(buf, pTopicInfo->name);

  int32_t numVgs = taosArrayGetSize(pTopicInfo->pVgInfo);
  nBytes += taosEncodeFixedI32(buf, numVgs);
  for (int32_t idx = 0; idx < numVgs; ++idx) {
    nBytes += taosEncodeSMqVgInfo(buf, (SMqHbVgInfo*)taosArrayGet(pTopicInfo->pVgInfo, idx));
  }
  return nBytes;
}

// Decodes epoch, topicUid, name and an element count, then creates pVgInfo and
// appends that many decoded SMqHbVgInfo entries. Returns the advanced cursor.
// The result of taosArrayInit is not checked here.
static FORCE_INLINE void* taosDecodeSMqHbTopicInfoMsg(void* buf, SMqHbTopicInfo* pTopicInfo) {
  int32_t numVgs;
  void*   cursor = taosDecodeFixedI32(buf, &pTopicInfo->epoch);
  cursor = taosDecodeFixedI64(cursor, &pTopicInfo->topicUid);
  cursor = taosDecodeStringTo(cursor, pTopicInfo->name);
  cursor = taosDecodeFixedI32(cursor, &numVgs);

  pTopicInfo->pVgInfo = taosArrayInit(numVgs, sizeof(SMqHbVgInfo));
  for (int32_t idx = 0; idx < numVgs; ++idx) {
    SMqHbVgInfo decoded;
    cursor = taosDecodeSMqVgInfo(cursor, &decoded);
    taosArrayPush(pTopicInfo->pVgInfo, &decoded);
  }
  return cursor;
}

typedef struct SMqHbMsg {
1836 1837 1838 1839
  int32_t status;  // ask hb endpoint
  int32_t epoch;
  int64_t consumerId;
  SArray* pTopics;  // SArray<SMqHbTopicInfo>
L
Liu Jicong 已提交
1840 1841
} SMqHbMsg;

S
Shengliang Guan 已提交
1842 1843
// Encodes status, epoch, consumerId, the element count of pTopics and then
// every SMqHbTopicInfo element in array order. Returns the accumulated byte
// count.
static FORCE_INLINE int32_t taosEncodeSMqMsg(void** buf, const SMqHbMsg* pMsg) {
  int32_t nBytes = taosEncodeFixedI32(buf, pMsg->status);
  nBytes += taosEncodeFixedI32(buf, pMsg->epoch);
  nBytes += taosEncodeFixedI64(buf, pMsg->consumerId);

  int32_t numTopics = taosArrayGetSize(pMsg->pTopics);
  nBytes += taosEncodeFixedI32(buf, numTopics);
  for (int32_t idx = 0; idx < numTopics; ++idx) {
    nBytes += taosEncodeSMqHbTopicInfoMsg(buf, (SMqHbTopicInfo*)taosArrayGet(pMsg->pTopics, idx));
  }
  return nBytes;
}

// Decodes status, epoch, consumerId and an element count, then creates pTopics
// and appends that many decoded SMqHbTopicInfo entries. Returns the advanced
// cursor. The result of taosArrayInit is not checked here.
static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
  int32_t numTopics;
  void*   cursor = taosDecodeFixedI32(buf, &pMsg->status);
  cursor = taosDecodeFixedI32(cursor, &pMsg->epoch);
  cursor = taosDecodeFixedI64(cursor, &pMsg->consumerId);
  cursor = taosDecodeFixedI32(cursor, &numTopics);

  pMsg->pTopics = taosArrayInit(numTopics, sizeof(SMqHbTopicInfo));
  for (int32_t idx = 0; idx < numTopics; ++idx) {
    SMqHbTopicInfo decoded;
    cursor = taosDecodeSMqHbTopicInfoMsg(cursor, &decoded);
    taosArrayPush(pMsg->pTopics, &decoded);
  }
  return cursor;
}

L
Liu Jicong 已提交
1871
typedef struct {
1872 1873
  int64_t leftForVer;
  int32_t vgId;
L
Liu Jicong 已提交
1874
  int32_t epoch;
1875 1876
  int64_t consumerId;
  char    topicName[TSDB_TOPIC_FNAME_LEN];
L
Liu Jicong 已提交
1877
  char    cgroup[TSDB_CGROUP_LEN];
1878 1879 1880 1881
  char*   sql;
  char*   logicalPlan;
  char*   physicalPlan;
  char*   qmsg;
L
Liu Jicong 已提交
1882 1883 1884 1885
} SMqSetCVgReq;

// Encodes every field of pReq in declaration order: four fixed-width integers,
// the two inline name buffers, then the four string pointers.
// Returns the accumulated byte count.
static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) {
  int32_t nBytes = 0;

  // fixed-width header
  nBytes += taosEncodeFixedI64(buf, pReq->leftForVer);
  nBytes += taosEncodeFixedI32(buf, pReq->vgId);
  nBytes += taosEncodeFixedI32(buf, pReq->epoch);
  nBytes += taosEncodeFixedI64(buf, pReq->consumerId);

  // inline names
  nBytes += taosEncodeString(buf, pReq->topicName);
  nBytes += taosEncodeString(buf, pReq->cgroup);

  // out-of-line strings
  nBytes += taosEncodeString(buf, pReq->sql);
  nBytes += taosEncodeString(buf, pReq->logicalPlan);
  nBytes += taosEncodeString(buf, pReq->physicalPlan);
  nBytes += taosEncodeString(buf, pReq->qmsg);

  return nBytes;
}

// Decodes every field of pReq in declaration order. The two names are copied
// into the inline buffers (taosDecodeStringTo); sql, logicalPlan, physicalPlan
// and qmsg are filled through taosDecodeString, which receives the address of
// the pointer field. Returns the advanced cursor.
static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
  void* cursor = buf;

  // fixed-width header
  cursor = taosDecodeFixedI64(cursor, &pReq->leftForVer);
  cursor = taosDecodeFixedI32(cursor, &pReq->vgId);
  cursor = taosDecodeFixedI32(cursor, &pReq->epoch);
  cursor = taosDecodeFixedI64(cursor, &pReq->consumerId);

  // inline names
  cursor = taosDecodeStringTo(cursor, pReq->topicName);
  cursor = taosDecodeStringTo(cursor, pReq->cgroup);

  // out-of-line strings
  cursor = taosDecodeString(cursor, &pReq->sql);
  cursor = taosDecodeString(cursor, &pReq->logicalPlan);
  cursor = taosDecodeString(cursor, &pReq->physicalPlan);
  cursor = taosDecodeString(cursor, &pReq->qmsg);

  return cursor;
}

L
Liu Jicong 已提交
1913 1914 1915 1916 1917 1918 1919 1920 1921 1922 1923 1924 1925 1926 1927 1928 1929 1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943
// Request made of fixed ids plus a fixed-size topic name buffer; its fields
// mirror the leading fields of SMqSetCVgReq.
typedef struct {
  int64_t leftForVer;
  int32_t vgId;
  int32_t epoch;
  int64_t consumerId;
  char    topicName[TSDB_TOPIC_FNAME_LEN];
} SMqCancelConnReq;

// Encodes leftForVer, vgId, epoch, consumerId and topicName in that order and
// returns the accumulated byte count.
static FORCE_INLINE int32_t tEncodeSMqCancelConnReq(void** buf, const SMqCancelConnReq* pReq) {
  int32_t nBytes = taosEncodeFixedI64(buf, pReq->leftForVer);
  nBytes += taosEncodeFixedI32(buf, pReq->vgId);
  nBytes += taosEncodeFixedI32(buf, pReq->epoch);
  nBytes += taosEncodeFixedI64(buf, pReq->consumerId);
  nBytes += taosEncodeString(buf, pReq->topicName);
  return nBytes;
}

// Decodes leftForVer, vgId, epoch, consumerId and topicName in that order into
// pReq and returns the advanced cursor.
static FORCE_INLINE void* tDecodeSMqCancelConnReq(void* buf, SMqCancelConnReq* pReq) {
  void* cursor = taosDecodeFixedI64(buf, &pReq->leftForVer);
  cursor = taosDecodeFixedI32(cursor, &pReq->vgId);
  cursor = taosDecodeFixedI32(cursor, &pReq->epoch);
  cursor = taosDecodeFixedI64(cursor, &pReq->consumerId);
  cursor = taosDecodeStringTo(cursor, pReq->topicName);
  return cursor;
}

// Response with a single reserved byte and no other fields.
typedef struct {
  int8_t reserved;
} SMqCancelConnRsp;

L
Liu Jicong 已提交
1944
// Request naming a vgroup, an old and a new consumer id, and a topic string
// (tDecodeSMqMVRebReq fills `topic` via taosDecodeString).
typedef struct {
  int64_t leftForVer;
  int32_t vgId;
  int64_t oldConsumerId;
  int64_t newConsumerId;
  char*   topic;
} SMqMVRebReq;

// Encodes leftForVer, vgId, oldConsumerId, newConsumerId and topic in that
// order and returns the accumulated byte count.
static FORCE_INLINE int32_t tEncodeSMqMVRebReq(void** buf, const SMqMVRebReq* pReq) {
  int32_t nBytes = taosEncodeFixedI64(buf, pReq->leftForVer);
  nBytes += taosEncodeFixedI32(buf, pReq->vgId);
  nBytes += taosEncodeFixedI64(buf, pReq->oldConsumerId);
  nBytes += taosEncodeFixedI64(buf, pReq->newConsumerId);
  nBytes += taosEncodeString(buf, pReq->topic);
  return nBytes;
}

// Decodes leftForVer, vgId, oldConsumerId, newConsumerId and topic in that
// order into pReq and returns the advanced cursor. `topic` is filled through
// taosDecodeString, which receives the address of the pointer field.
static FORCE_INLINE void* tDecodeSMqMVRebReq(void* buf, SMqMVRebReq* pReq) {
  void* cursor = taosDecodeFixedI64(buf, &pReq->leftForVer);
  cursor = taosDecodeFixedI32(cursor, &pReq->vgId);
  cursor = taosDecodeFixedI64(cursor, &pReq->oldConsumerId);
  cursor = taosDecodeFixedI64(cursor, &pReq->newConsumerId);
  cursor = taosDecodeString(cursor, &pReq->topic);
  return cursor;
}

typedef struct {
L
Liu Jicong 已提交
1972 1973 1974 1975
  SMsgHead header;
  int32_t  vgId;
  int64_t  consumerId;
  char     topicName[TSDB_TOPIC_FNAME_LEN];
L
Liu Jicong 已提交
1976
  char     cgroup[TSDB_CGROUP_LEN];
L
Liu Jicong 已提交
1977 1978
} SMqSetCVgRsp;

L
Liu Jicong 已提交
1979 1980 1981 1982 1983
typedef struct {
  SMsgHead header;
  int32_t  vgId;
  int64_t  consumerId;
  char     topicName[TSDB_TOPIC_FNAME_LEN];
L
Liu Jicong 已提交
1984
  char     cgroup[TSDB_CGROUP_LEN];
L
Liu Jicong 已提交
1985 1986
} SMqMVRebRsp;

L
Liu Jicong 已提交
1987 1988 1989 1990
typedef struct {
  int32_t vgId;
  int64_t offset;
  char    topicName[TSDB_TOPIC_FNAME_LEN];
L
Liu Jicong 已提交
1991
  char    cgroup[TSDB_CGROUP_LEN];
L
Liu Jicong 已提交
1992 1993 1994 1995 1996
} SMqOffset;

// Commit request: a count plus a pointer to SMqOffset records.
// NOTE(review): `offsets` presumably points at `num` elements - confirm with the coder.
typedef struct {
  int32_t    num;
  SMqOffset* offsets;
} SMqCMCommitOffsetReq;
L
Liu Jicong 已提交
1998 1999 2000

// Commit response with a single reserved 32-bit field.
typedef struct {
  int32_t reserved;
} SMqCMCommitOffsetRsp;
L
Liu Jicong 已提交
2002 2003 2004

// Declarations only; the bodies live outside this header.
// NOTE(review): presumably SCoder-based encode/decode of one SMqOffset - confirm.
int32_t tEncodeSMqOffset(SCoder* encoder, const SMqOffset* pOffset);
int32_t tDecodeSMqOffset(SCoder* decoder, SMqOffset* pOffset);
L
Liu Jicong 已提交
2005 2006
// Declarations only; the bodies live outside this header.
// NOTE(review): presumably SCoder-based encode/decode of an SMqCMCommitOffsetReq - confirm.
int32_t tEncodeSMqCMCommitOffsetReq(SCoder* encoder, const SMqCMCommitOffsetReq* pReq);
int32_t tDecodeSMqCMCommitOffsetReq(SCoder* decoder, SMqCMCommitOffsetReq* pReq);
L
Liu Jicong 已提交
2007

L
Liu Jicong 已提交
2008 2009
typedef struct {
  uint32_t nCols;
C
Cary Xu 已提交
2010 2011 2012 2013
  union {
    SSchema*   pSchema;
    SSchemaEx* pSchemaEx;
  };
L
Liu Jicong 已提交
2014 2015
} SSchemaWrapper;

S
Shengliang Guan 已提交
2016
// Encodes type, bytes, colId and name of one column (in that order) and
// returns the accumulated byte count.
static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema) {
  int32_t nBytes = taosEncodeFixedI8(buf, pSchema->type);
  nBytes += taosEncodeFixedI32(buf, pSchema->bytes);
  nBytes += taosEncodeFixedI16(buf, pSchema->colId);
  nBytes += taosEncodeString(buf, pSchema->name);
  return nBytes;
}

S
Shengliang Guan 已提交
2025
// Decodes type, bytes, colId and name of one column (in that order) into
// pSchema and returns the advanced cursor.
static FORCE_INLINE void* taosDecodeSSchema(void* buf, SSchema* pSchema) {
  void* cursor = taosDecodeFixedI8(buf, &pSchema->type);
  cursor = taosDecodeFixedI32(cursor, &pSchema->bytes);
  cursor = taosDecodeFixedI16(cursor, &pSchema->colId);
  cursor = taosDecodeStringTo(cursor, pSchema->name);
  return cursor;
}

S
Shengliang Guan 已提交
2033 2034 2035
// SCoder variant: encodes type, bytes, colId and name of one column in that
// order. Returns 0 on success, -1 on the first encoder error.
static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSchema) {
  if (tEncodeI8(pEncoder, pSchema->type) < 0 || tEncodeI32(pEncoder, pSchema->bytes) < 0 ||
      tEncodeI16(pEncoder, pSchema->colId) < 0 || tEncodeCStr(pEncoder, pSchema->name) < 0) {
    return -1;
  }
  return 0;
}

// SCoder variant: decodes type, bytes, colId and name of one column in that
// order. Returns 0 on success, -1 on the first decoder error.
static FORCE_INLINE int32_t tDecodeSSchema(SCoder* pDecoder, SSchema* pSchema) {
  if (tDecodeI8(pDecoder, &pSchema->type) < 0 || tDecodeI32(pDecoder, &pSchema->bytes) < 0 ||
      tDecodeI16(pDecoder, &pSchema->colId) < 0 || tDecodeCStrTo(pDecoder, pSchema->name) < 0) {
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
2049
// Encodes nCols followed by each of the nCols SSchema entries in order.
// Returns the accumulated byte count.
static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) {
  int32_t tlen = 0;
  tlen += taosEncodeFixedU32(buf, pSW->nCols);
  // nCols is uint32_t, so the index is unsigned too (no signed/unsigned compare).
  for (uint32_t i = 0; i < pSW->nCols; i++) {
    tlen += taosEncodeSSchema(buf, &pSW->pSchema[i]);
  }
  return tlen;
}

L
Liu Jicong 已提交
2058
// Decodes nCols, allocates a zeroed array of nCols SSchema entries into
// pSW->pSchema, and decodes each entry in order.
// Returns the advanced cursor, or NULL when the allocation yields NULL.
static FORCE_INLINE void* taosDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW) {
  buf = taosDecodeFixedU32(buf, &pSW->nCols);
  pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
  if (pSW->pSchema == NULL) {
    return NULL;
  }

  // nCols is uint32_t, so the index is unsigned too (no signed/unsigned compare).
  for (uint32_t i = 0; i < pSW->nCols; i++) {
    buf = taosDecodeSSchema(buf, &pSW->pSchema[i]);
  }
  return buf;
}
S
Shengliang Guan 已提交
2070

L
Liu Jicong 已提交
2071 2072 2073 2074 2075 2076 2077 2078 2079 2080 2081 2082 2083 2084 2085 2086 2087 2088 2089 2090 2091
// SCoder variant: encodes nCols followed by each SSchema entry.
// Returns -1 on the first encoder error; on success returns pEncoder->pos
// (not 0, unlike the other tEncode* helpers in this header).
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SCoder* pEncoder, const SSchemaWrapper* pSW) {
  if (tEncodeU32(pEncoder, pSW->nCols) < 0) return -1;
  // nCols is uint32_t, so the index is unsigned too (no signed/unsigned compare).
  for (uint32_t i = 0; i < pSW->nCols; i++) {
    if (tEncodeSSchema(pEncoder, &pSW->pSchema[i]) < 0) return -1;
  }
  return pEncoder->pos;
}

// SCoder variant: decodes nCols, resizes pSW->pSchema to nCols entries with
// taosMemoryRealloc, and decodes each entry in order.
// Returns 0 on success, -1 on a decoder error or when the reallocation yields
// NULL (pSW->pSchema is left untouched in that case).
// NOTE(review): pSW->pSchema is passed to realloc, so it presumably must be
// NULL or a live allocation on entry - confirm at the call sites.
static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SCoder* pDecoder, SSchemaWrapper* pSW) {
  if (tDecodeU32(pDecoder, &pSW->nCols) < 0) return -1;
  void* ptr = taosMemoryRealloc(pSW->pSchema, pSW->nCols * sizeof(SSchema));
  if (ptr == NULL) {
    return -1;
  }
  pSW->pSchema = (SSchema*)ptr;
  // nCols is uint32_t, so the index is unsigned too (no signed/unsigned compare).
  for (uint32_t i = 0; i < pSW->nCols; i++) {
    if (tDecodeSSchema(pDecoder, &pSW->pSchema[i]) < 0) return -1;
  }
  return 0;
}

S
Shengliang Guan 已提交
2092 2093 2094 2095 2096 2097 2098
// Create-SMA request: names, window parameters, and four strings with their
// lengths.
typedef struct {
  char    name[TSDB_TABLE_FNAME_LEN];
  char    stb[TSDB_TABLE_FNAME_LEN];
  int8_t  igExists;
  int8_t  intervalUnit;
  int8_t  slidingUnit;
  int8_t  timezone;
  int32_t dstVgId;  // for stream
  int64_t interval;
  int64_t offset;
  int64_t sliding;
  int32_t exprLen;        // strlen + 1
  int32_t tagsFilterLen;  // strlen + 1
  int32_t sqlLen;         // strlen + 1
  int32_t astLen;         // strlen + 1
  char*   expr;
  char*   tagsFilter;
  char*   sql;
  char*   ast;
} SMCreateSmaReq;

// Declarations only; the bodies live outside this header.
// NOTE(review): presumably encode/decode/release an SMCreateSmaReq - confirm.
int32_t tSerializeSMCreateSmaReq(void* buf, int32_t bufLen, SMCreateSmaReq* pReq);
int32_t tDeserializeSMCreateSmaReq(void* buf, int32_t bufLen, SMCreateSmaReq* pReq);
void    tFreeSMCreateSmaReq(SMCreateSmaReq* pReq);

// Drop-SMA request: a fixed-size name buffer plus an igNotExists flag.
typedef struct {
  char   name[TSDB_TABLE_FNAME_LEN];
  int8_t igNotExists;
} SMDropSmaReq;

// Declarations only; the bodies live outside this header.
// NOTE(review): presumably encode/decode an SMDropSmaReq to/from buf[0..bufLen) - confirm.
int32_t tSerializeSMDropSmaReq(void* buf, int32_t bufLen, SMDropSmaReq* pReq);
int32_t tDeserializeSMDropSmaReq(void* buf, int32_t bufLen, SMDropSmaReq* pReq);

C
Cary Xu 已提交
2125
typedef struct {
L
Liu Jicong 已提交
2126 2127 2128
  int8_t   version;       // for compatibility(default 0)
  int8_t   intervalUnit;  // MACRO: TIME_UNIT_XXX
  int8_t   slidingUnit;   // MACRO: TIME_UNIT_XXX
L
Liu Jicong 已提交
2129
  int8_t   timezoneInt;   // sma data expired if timezone changes.
C
Cary Xu 已提交
2130
  char     indexName[TSDB_INDEX_NAME_LEN];
X
Xiaoyu Wang 已提交
2131 2132
  int32_t  exprLen;
  int32_t  tagsFilterLen;
C
Cary Xu 已提交
2133 2134 2135
  int64_t  indexUid;
  tb_uid_t tableUid;  // super/child/common table uid
  int64_t  interval;
L
Liu Jicong 已提交
2136
  int64_t  offset;  // use unit by precision of DB
C
Cary Xu 已提交
2137 2138 2139
  int64_t  sliding;
  char*    expr;  // sma expression
  char*    tagsFilter;
C
Cary Xu 已提交
2140
} STSma;  // Time-range-wise SMA
C
Cary Xu 已提交
2141

C
Cary Xu 已提交
2142
typedef struct {
C
Cary Xu 已提交
2143 2144 2145
  int64_t ver;  // use a general definition
  STSma   tSma;
} SVCreateTSmaReq;
C
Cary Xu 已提交
2146 2147

// TSMA message: a type tag, the index uid and a window start key.
typedef struct {
  int8_t  type;  // 0 status report, 1 update data
  int64_t indexUid;
  int64_t skey;  // start TS key of interval/sliding window
} STSmaMsg;

typedef struct {
  int64_t ver;  // use a general definition
C
Cary Xu 已提交
2155
  int64_t indexUid;
C
Cary Xu 已提交
2156
  char    indexName[TSDB_INDEX_NAME_LEN];
C
Cary Xu 已提交
2157
} SVDropTSmaReq;
2158

C
Cary Xu 已提交
2159
// Placeholder response type shared by the create and drop TSMA operations;
// the single member exists only so the struct is not empty.
typedef struct {
  int tmp;  // TODO: to avoid compile error
} SVCreateTSmaRsp, SVDropTSmaRsp;

// Declarations only; the bodies live outside this header.
// NOTE(review): presumably cursor-style encode/decode of SVCreateTSmaReq and SVDropTSmaReq - confirm.
int32_t tSerializeSVCreateTSmaReq(void** buf, SVCreateTSmaReq* pReq);
void*   tDeserializeSVCreateTSmaReq(void* buf, SVCreateTSmaReq* pReq);
int32_t tSerializeSVDropTSmaReq(void** buf, SVDropTSmaReq* pReq);
void*   tDeserializeSVDropTSmaReq(void* buf, SVDropTSmaReq* pReq);
C
Cary Xu 已提交
2167

C
Cary Xu 已提交
2168
// RSma: Rollup SMA
C
Cary Xu 已提交
2169 2170 2171 2172 2173 2174 2175 2176 2177 2178 2179 2180 2181
// One parameter set of a rollup SMA: interval with its unit, plus retention
// and days.
typedef struct {
  int64_t  interval;
  int32_t  retention;  // unit: day
  uint16_t days;       // unit: day
  int8_t   intervalUnit;
} SSmaParams;

// Rollup SMA: an embedded STSma, a files factor and a parameter list.
typedef struct {
  STSma   tsma;
  float   xFilesFactor;
  SArray* smaParams;  // SSmaParams
} SRSma;

C
Cary Xu 已提交
2182 2183 2184 2185 2186
// Counted array of STSma: `tSma` points at `number` elements (the coders and
// destructors in this header iterate exactly `number` entries).
typedef struct {
  uint32_t number;
  STSma*   tSma;
} STSmaWrapper;

C
Cary Xu 已提交
2187
// Releases the two heap strings owned by an STSma (expr and tagsFilter).
// A NULL argument is ignored; the STSma itself is not freed.
static FORCE_INLINE void tdDestroyTSma(STSma* pSma) {
  if (pSma == NULL) {
    return;
  }
  taosMemoryFreeClear(pSma->expr);
  taosMemoryFreeClear(pSma->tagsFilter);
}

C
Cary Xu 已提交
2194
// Destroys every STSma held by the wrapper and then releases the element
// array. Does nothing when pSW or pSW->tSma is NULL; the wrapper object itself
// is not freed.
static FORCE_INLINE void tdDestroyTSmaWrapper(STSmaWrapper* pSW) {
  if (pSW == NULL || pSW->tSma == NULL) {
    return;
  }
  for (uint32_t idx = 0; idx < pSW->number; ++idx) {
    tdDestroyTSma(&pSW->tSma[idx]);
  }
  taosMemoryFreeClear(pSW->tSma);
}

C
Cary Xu 已提交
2205 2206
// Full teardown of a heap-allocated wrapper: first its contents (via
// tdDestroyTSmaWrapper), then the wrapper object itself.
// pSW is received by value, so the caller's own pointer variable is not
// modified by this call.
static FORCE_INLINE void tdFreeTSmaWrapper(STSmaWrapper* pSW) {
  /* contents first ... */
  tdDestroyTSmaWrapper(pSW);
  /* ... then the container */
  taosMemoryFreeClear(pSW);
}

C
Cary Xu 已提交
2210 2211 2212
// Encodes one STSma: the four int8 fields, the index name, the two string
// lengths, then indexUid, tableUid, interval, offset and sliding. `expr` and
// `tagsFilter` are appended only when their length field is greater than
// zero. Returns the accumulated byte count.
static FORCE_INLINE int32_t tEncodeTSma(void** buf, const STSma* pSma) {
  int32_t nBytes = 0;

  // single-byte header fields
  nBytes += taosEncodeFixedI8(buf, pSma->version);
  nBytes += taosEncodeFixedI8(buf, pSma->intervalUnit);
  nBytes += taosEncodeFixedI8(buf, pSma->slidingUnit);
  nBytes += taosEncodeFixedI8(buf, pSma->timezoneInt);

  // name and optional-string lengths
  nBytes += taosEncodeString(buf, pSma->indexName);
  nBytes += taosEncodeFixedI32(buf, pSma->exprLen);
  nBytes += taosEncodeFixedI32(buf, pSma->tagsFilterLen);

  // 64-bit ids and window parameters
  nBytes += taosEncodeFixedI64(buf, pSma->indexUid);
  nBytes += taosEncodeFixedI64(buf, pSma->tableUid);
  nBytes += taosEncodeFixedI64(buf, pSma->interval);
  nBytes += taosEncodeFixedI64(buf, pSma->offset);
  nBytes += taosEncodeFixedI64(buf, pSma->sliding);

  // optional strings
  if (pSma->exprLen > 0) nBytes += taosEncodeString(buf, pSma->expr);
  if (pSma->tagsFilterLen > 0) nBytes += taosEncodeString(buf, pSma->tagsFilter);

  return nBytes;
}

// Encodes the element count followed by each STSma in array order and returns
// the accumulated byte count.
static FORCE_INLINE int32_t tEncodeTSmaWrapper(void** buf, const STSmaWrapper* pSW) {
  int32_t nBytes = taosEncodeFixedU32(buf, pSW->number);
  for (uint32_t idx = 0; idx < pSW->number; ++idx) {
    nBytes += tEncodeTSma(buf, &pSW->tSma[idx]);
  }
  return nBytes;
}

// Decodes one STSma in the field order written by tEncodeTSma. `expr` and
// `tagsFilter` are read only when their length field is greater than zero and
// are NULL otherwise.
// Returns the advanced cursor, or NULL when a string decode fails; in that
// case both string fields have been released and cleared via tdDestroyTSma().
static FORCE_INLINE void* tDecodeTSma(void* buf, STSma* pSma) {
  // Clear both owned pointers first so every failure path can call
  // tdDestroyTSma() without touching an indeterminate pointer, even when the
  // caller did not zero the struct.
  pSma->expr = NULL;
  pSma->tagsFilter = NULL;

  buf = taosDecodeFixedI8(buf, &pSma->version);
  buf = taosDecodeFixedI8(buf, &pSma->intervalUnit);
  buf = taosDecodeFixedI8(buf, &pSma->slidingUnit);
  buf = taosDecodeFixedI8(buf, &pSma->timezoneInt);
  buf = taosDecodeStringTo(buf, pSma->indexName);
  buf = taosDecodeFixedI32(buf, &pSma->exprLen);
  buf = taosDecodeFixedI32(buf, &pSma->tagsFilterLen);
  buf = taosDecodeFixedI64(buf, &pSma->indexUid);
  buf = taosDecodeFixedI64(buf, &pSma->tableUid);
  buf = taosDecodeFixedI64(buf, &pSma->interval);
  buf = taosDecodeFixedI64(buf, &pSma->offset);
  buf = taosDecodeFixedI64(buf, &pSma->sliding);

  if (pSma->exprLen > 0) {
    if ((buf = taosDecodeString(buf, &pSma->expr)) == NULL) {
      tdDestroyTSma(pSma);
      return NULL;
    }
  }

  if (pSma->tagsFilterLen > 0) {
    if ((buf = taosDecodeString(buf, &pSma->tagsFilter)) == NULL) {
      tdDestroyTSma(pSma);
      return NULL;
    }
  }

  return buf;
}

// Decodes an element count, allocates a zeroed STSma array of that size into
// pSW->tSma, and decodes each element in order.
// Returns the advanced cursor on success. Returns NULL when the allocation
// yields NULL or when any element fails to decode; on an element failure all
// previously decoded elements are destroyed, the array is freed and
// pSW->tSma is reset to NULL.
static FORCE_INLINE void* tDecodeTSmaWrapper(void* buf, STSmaWrapper* pSW) {
  buf = taosDecodeFixedU32(buf, &pSW->number);

  pSW->tSma = (STSma*)taosMemoryCalloc(pSW->number, sizeof(STSma));
  if (pSW->tSma == NULL) {
    return NULL;
  }

  for (uint32_t i = 0; i < pSW->number; ++i) {
    if ((buf = tDecodeTSma(buf, pSW->tSma + i)) == NULL) {
      // Element i was already cleaned up by tDecodeTSma on its failure path,
      // so only the fully decoded elements [0, i) remain to be destroyed.
      // Counting upwards with an unsigned index is required here: a
      // "j >= 0" countdown never terminates for an unsigned j.
      for (uint32_t j = 0; j < i; ++j) {
        tdDestroyTSma(pSW->tSma + j);
      }
      taosMemoryFreeClear(pSW->tSma);  // also NULLs the field: no dangling pointer
      return NULL;
    }
  }
  return buf;
}
L
Liu Jicong 已提交
2301

dengyihao's avatar
dengyihao 已提交
2302
// Create-full-text request; currently holds a single int field.
typedef struct {
  int idx;
} SMCreateFullTextReq;

// Declarations only; the bodies live outside this header.
// NOTE(review): presumably encode/decode/release an SMCreateFullTextReq - confirm.
int32_t tSerializeSMCreateFullTextReq(void* buf, int32_t bufLen, SMCreateFullTextReq* pReq);
int32_t tDeserializeSMCreateFullTextReq(void* buf, int32_t bufLen, SMCreateFullTextReq* pReq);
void    tFreeSMCreateFullTextReq(SMCreateFullTextReq* pReq);

dengyihao's avatar
dengyihao 已提交
2310 2311 2312 2313 2314 2315 2316 2317
// Drop-full-text request: a fixed-size name buffer plus an igNotExists flag.
typedef struct {
  char   name[TSDB_TABLE_FNAME_LEN];
  int8_t igNotExists;
} SMDropFullTextReq;

// Declarations only; the bodies live outside this header.
// NOTE(review): presumably encode/decode an SMDropFullTextReq to/from buf[0..bufLen) - confirm.
int32_t tSerializeSMDropFullTextReq(void* buf, int32_t bufLen, SMDropFullTextReq* pReq);
int32_t tDeserializeSMDropFullTextReq(void* buf, int32_t bufLen, SMDropFullTextReq* pReq);

D
dapan1121 已提交
2318 2319 2320 2321 2322 2323 2324 2325 2326 2327 2328 2329 2330 2331 2332 2333 2334 2335
// Index lookup request holding a single fixed-size index name buffer.
typedef struct {
  char indexFName[TSDB_INDEX_FNAME_LEN];
} SUserIndexReq;

// Declarations only; the bodies live outside this header.
// NOTE(review): presumably encode/decode an SUserIndexReq to/from buf[0..bufLen) - confirm.
int32_t tSerializeSUserIndexReq(void* buf, int32_t bufLen, SUserIndexReq* pReq);
int32_t tDeserializeSUserIndexReq(void* buf, int32_t bufLen, SUserIndexReq* pReq);

// Index lookup response: five fixed-size text buffers (database, table and
// column names, index type and index extensions).
typedef struct {
  char dbFName[TSDB_DB_FNAME_LEN];
  char tblFName[TSDB_TABLE_FNAME_LEN];
  char colName[TSDB_COL_NAME_LEN];
  char indexType[TSDB_INDEX_TYPE_LEN];
  char indexExts[TSDB_INDEX_EXTS_LEN];
} SUserIndexRsp;

// Declarations only; the bodies live outside this header.
// NOTE(review): presumably encode/decode an SUserIndexRsp to/from buf[0..bufLen) - confirm.
int32_t tSerializeSUserIndexRsp(void* buf, int32_t bufLen, const SUserIndexRsp* pRsp);
int32_t tDeserializeSUserIndexRsp(void* buf, int32_t bufLen, SUserIndexRsp* pRsp);

L
Liu Jicong 已提交
2336 2337 2338 2339
// Common head embedded at the start of SMqPollRsp and SMqCMGetSubEpRsp:
// message type, result code, epoch and consumer id.
typedef struct {
  int8_t  mqMsgType;
  int32_t code;
  int32_t epoch;
  int64_t consumerId;
} SMqRspHead;

L
Liu Jicong 已提交
2343
typedef struct {
2344
  SMsgHead head;
L
Liu Jicong 已提交
2345

2346 2347
  int64_t consumerId;
  int64_t blockingTime;
L
Liu Jicong 已提交
2348
  int32_t epoch;
L
Liu Jicong 已提交
2349
  int8_t  withSchema;
L
Liu Jicong 已提交
2350
  char    cgroup[TSDB_CGROUP_LEN];
L
Liu Jicong 已提交
2351

dengyihao's avatar
dengyihao 已提交
2352
  int64_t  currentOffset;
L
add log  
Liu Jicong 已提交
2353
  uint64_t reqId;
dengyihao's avatar
dengyihao 已提交
2354
  char     topic[TSDB_TOPIC_FNAME_LEN];
L
Liu Jicong 已提交
2355
} SMqPollReq;
L
Liu Jicong 已提交
2356

L
Liu Jicong 已提交
2357
typedef struct {
L
Liu Jicong 已提交
2358
  int32_t vgId;
L
Liu Jicong 已提交
2359
  int64_t offset;
L
Liu Jicong 已提交
2360 2361 2362
  SEpSet  epSet;
} SMqSubVgEp;

L
Liu Jicong 已提交
2363
typedef struct {
L
Liu Jicong 已提交
2364 2365 2366 2367 2368
  char        topic[TSDB_TOPIC_FNAME_LEN];
  int8_t      isSchemaAdaptive;
  SArray*     vgs;  // SArray<SMqSubVgEp>
  int32_t     numOfFields;
  TAOS_FIELD* fields;
L
Liu Jicong 已提交
2369 2370
} SMqSubTopicEp;

L
Liu Jicong 已提交
2371
typedef struct {
L
Liu Jicong 已提交
2372
  SMqRspHead head;
L
Liu Jicong 已提交
2373 2374 2375 2376 2377
  int64_t    reqOffset;
  int64_t    rspOffset;
  int32_t    skipLogNum;
  // TODO: replace with topic name
  int32_t numOfTopics;
L
Liu Jicong 已提交
2378 2379 2380 2381
  // TODO: remove from msg
  SSchemaWrapper* schema;
  SArray*         pBlockData;  // SArray<SSDataBlock>
} SMqPollRsp;
L
Liu Jicong 已提交
2382

L
Liu Jicong 已提交
2383
typedef struct {
L
Liu Jicong 已提交
2384
  SMqRspHead head;
L
Liu Jicong 已提交
2385 2386 2387
  char       cgroup[TSDB_CGROUP_LEN];
  SArray*    topics;  // SArray<SMqSubTopicEp>
} SMqCMGetSubEpRsp;
L
Liu Jicong 已提交
2388

2389
// Release the vgroup array owned by a topic entry.
// Only `vgs` is destroyed; `fields` and the entry itself are left alone.
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
  SArray* pVgList = pSubTopicEp->vgs;
  taosArrayDestroy(pVgList);
}
L
Liu Jicong 已提交
2390

L
Liu Jicong 已提交
2391 2392
// Encode one SMqSubVgEp through the cursor `buf`: vgId (fixed i32), offset (fixed i64),
// then the endpoint set.
// Returns the sum of the lengths reported by the three encoders.
static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) {
  int32_t total = taosEncodeFixedI32(buf, pVgEp->vgId);
  total += taosEncodeFixedI64(buf, pVgEp->offset);
  total += taosEncodeSEpSet(buf, &pVgEp->epSet);
  return total;
}

// Decode one SMqSubVgEp starting at `buf`, mirroring tEncodeSMqSubVgEp:
// vgId (fixed i32), offset (fixed i64), endpoint set.
// Returns whatever position the last decoder (taosDecodeSEpSet) reports.
static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) {
  void* cursor = taosDecodeFixedI32(buf, &pVgEp->vgId);
  cursor = taosDecodeFixedI64(cursor, &pVgEp->offset);
  return taosDecodeSEpSet(cursor, &pVgEp->epSet);
}

L
Liu Jicong 已提交
2406
// Destroy the topic list of a response, running tDeleteSMqSubTopicEp on every element
// so that each topic's vgroup array is released as well.
static FORCE_INLINE void tDeleteSMqCMGetSubEpRsp(SMqCMGetSubEpRsp* pRsp) {
  // The array destructor expects a void(*)(void*) callback, hence the cast.
  void (*freeTopic)(void*) = (void (*)(void*))tDeleteSMqSubTopicEp;
  taosArrayDestroyEx(pRsp->topics, freeTopic);
}

L
Liu Jicong 已提交
2410 2411 2412
// Encode one topic entry through the cursor `buf`, in this order:
//   topic name (string), isSchemaAdaptive (fixed i8), vgroup count (fixed i32),
//   each SMqSubVgEp, numOfFields (fixed i32).
// Returns the sum of the lengths reported by the individual encoders.
static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) {
  int32_t total = taosEncodeString(buf, pTopicEp->topic);
  total += taosEncodeFixedI8(buf, pTopicEp->isSchemaAdaptive);

  const int32_t numVgs = taosArrayGetSize(pTopicEp->vgs);
  total += taosEncodeFixedI32(buf, numVgs);
  int32_t idx = 0;
  while (idx < numVgs) {
    SMqSubVgEp* pEntry = (SMqSubVgEp*)taosArrayGet(pTopicEp->vgs, idx);
    total += tEncodeSMqSubVgEp(buf, pEntry);
    idx++;
  }

  // Only the field count is written; the `fields` array itself is not encoded.
  total += taosEncodeFixedI32(buf, pTopicEp->numOfFields);
  return total;
}

// Decode one topic entry starting at `buf`, mirroring tEncodeSMqSubTopicEp.
// Allocates pTopicEp->vgs (SArray<SMqSubVgEp>); the caller releases it with tDeleteSMqSubTopicEp.
// pTopicEp->fields is not assigned by this function.
// Returns the position after the entry, or NULL when the vgroup array cannot be allocated.
static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicEp) {
  void* cursor = taosDecodeStringTo(buf, pTopicEp->topic);
  cursor = taosDecodeFixedI8(cursor, &pTopicEp->isSchemaAdaptive);

  int32_t numVgs;
  cursor = taosDecodeFixedI32(cursor, &numVgs);

  pTopicEp->vgs = taosArrayInit(numVgs, sizeof(SMqSubVgEp));
  if (pTopicEp->vgs == NULL) {
    return NULL;
  }

  for (int32_t remaining = numVgs; remaining > 0; remaining--) {
    SMqSubVgEp entry;
    cursor = tDecodeSMqSubVgEp(cursor, &entry);
    taosArrayPush(pTopicEp->vgs, &entry);
  }

  // Only the field count is read; the `fields` array is not part of the encoded form.
  cursor = taosDecodeFixedI32(cursor, &pTopicEp->numOfFields);
  return cursor;
}

// Encode the topic list of a response through the cursor `buf`: topic count (fixed i32)
// followed by every SMqSubTopicEp. `head` and `cgroup` are not written here.
// Returns the sum of the lengths reported by the individual encoders.
static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSubEpRsp* pRsp) {
  const int32_t numTopics = taosArrayGetSize(pRsp->topics);
  int32_t       total = taosEncodeFixedI32(buf, numTopics);

  int32_t idx = 0;
  while (idx < numTopics) {
    SMqSubTopicEp* pTopic = (SMqSubTopicEp*)taosArrayGet(pRsp->topics, idx);
    total += tEncodeSMqSubTopicEp(buf, pTopic);
    idx++;
  }
  return total;
}

// Decode the topic list of a response starting at `buf`: a fixed i32 topic count followed
// by that many SMqSubTopicEp records. `head` and `cgroup` are not read from the stream.
//
// Allocates pRsp->topics (SArray<SMqSubTopicEp>); release it with tDeleteSMqCMGetSubEpRsp.
//
// Returns the position after the last record, or NULL when the topic array cannot be
// allocated or a topic record fails to decode. When a record fails, pRsp->topics keeps
// the topics decoded so far and stays owned by pRsp, so the caller can still free it
// through tDeleteSMqCMGetSubEpRsp.
static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* pRsp) {
  int32_t sz = 0;
  buf = taosDecodeFixedI32(buf, &sz);
  pRsp->topics = taosArrayInit(sz, sizeof(SMqSubTopicEp));
  if (pRsp->topics == NULL) {
    return NULL;
  }
  for (int32_t i = 0; i < sz; i++) {
    // Zero-initialize: the topic decoder never assigns `fields`, so without this an
    // indeterminate pointer would be copied into the array.
    SMqSubTopicEp topicEp = {0};
    buf = tDecodeSMqSubTopicEp(buf, &topicEp);
    if (buf == NULL) {
      // The topic decoder signals failure with NULL (its vgs allocation failed, so it
      // owns nothing). Stop here rather than feed a NULL cursor to the next decode.
      return NULL;
    }
    taosArrayPush(pRsp->topics, &topicEp);
  }
  return buf;
}
L
Liu Jicong 已提交
2471 2472
#pragma pack(pop)

D
dapan1121 已提交
2473 2474 2475 2476 2477
#ifdef __cplusplus
}
#endif

#endif /*_TD_COMMON_TAOS_MSG_H_*/