tq.h 7.0 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
refact  
Hongze Cheng 已提交
16 17
#ifndef _TD_TQ_H_
#define _TD_TQ_H_
H
refact  
Hongze Cheng 已提交
18

L
Liu Jicong 已提交
19
#include "mallocator.h"
L
Liu Jicong 已提交
20
#include "os.h"
L
Liu Jicong 已提交
21
#include "tutil.h"
L
Liu Jicong 已提交
22

H
refact  
Hongze Cheng 已提交
23 24 25 26
#ifdef __cplusplus
extern "C" {
#endif

L
Liu Jicong 已提交
27
typedef struct TmqMsgHead {
L
Liu Jicong 已提交
28
  int32_t protoVer;
L
Liu Jicong 已提交
29
  int32_t msgType;
L
Liu Jicong 已提交
30
  int64_t cgId;
L
Liu Jicong 已提交
31
  int64_t clientId;
L
Liu Jicong 已提交
32
} TmqMsgHead;
L
Liu Jicong 已提交
33

L
Liu Jicong 已提交
34
typedef struct TmqOneAck {
L
Liu Jicong 已提交
35 36
  int64_t topicId;
  int64_t consumeOffset;
L
Liu Jicong 已提交
37
} TmqOneAck;
L
Liu Jicong 已提交
38

L
Liu Jicong 已提交
39
typedef struct TmqAcks {
L
Liu Jicong 已提交
40
  int32_t ackNum;
L
Liu Jicong 已提交
41
  // should be sorted
L
Liu Jicong 已提交
42 43
  TmqOneAck acks[];
} TmqAcks;
L
Liu Jicong 已提交
44

L
Liu Jicong 已提交
45
// TODO: put msgs into common
L
Liu Jicong 已提交
46 47
typedef struct TmqConnectReq {
  TmqMsgHead head;
L
Liu Jicong 已提交
48
  TmqAcks    acks;
L
Liu Jicong 已提交
49
} TmqConnectReq;
L
Liu Jicong 已提交
50

L
Liu Jicong 已提交
51 52
typedef struct TmqConnectRsp {
  TmqMsgHead head;
L
Liu Jicong 已提交
53
  int8_t     status;
L
Liu Jicong 已提交
54
} TmqConnectRsp;
L
Liu Jicong 已提交
55

L
Liu Jicong 已提交
56 57 58
typedef struct TmqDisconnectReq {
  TmqMsgHead head;
} TmqDiscconectReq;
L
Liu Jicong 已提交
59

L
Liu Jicong 已提交
60 61
typedef struct TmqDisconnectRsp {
  TmqMsgHead head;
L
Liu Jicong 已提交
62
  int8_t     status;
L
Liu Jicong 已提交
63
} TmqDisconnectRsp;
L
Liu Jicong 已提交
64

L
Liu Jicong 已提交
65 66
typedef struct TmqConsumeReq {
  TmqMsgHead head;
L
Liu Jicong 已提交
67
  TmqAcks    acks;
L
Liu Jicong 已提交
68
} TmqConsumeReq;
L
Liu Jicong 已提交
69

L
Liu Jicong 已提交
70
typedef struct TmqMsgContent {
L
Liu Jicong 已提交
71 72 73
  int64_t topicId;
  int64_t msgLen;
  char    msg[];
L
Liu Jicong 已提交
74
} TmqMsgContent;
L
Liu Jicong 已提交
75

L
Liu Jicong 已提交
76 77
typedef struct TmqConsumeRsp {
  TmqMsgHead    head;
L
Liu Jicong 已提交
78
  int64_t       bodySize;
L
Liu Jicong 已提交
79 80
  TmqMsgContent msgs[];
} TmqConsumeRsp;
L
Liu Jicong 已提交
81

L
Liu Jicong 已提交
82 83
typedef struct TmqSubscribeReq {
  TmqMsgHead head;
L
Liu Jicong 已提交
84 85
  int32_t    topicNum;
  int64_t    topic[];
L
Liu Jicong 已提交
86
} TmqSubscribeReq;
L
Liu Jicong 已提交
87

L
Liu Jicong 已提交
88 89
typedef struct tmqSubscribeRsp {
  TmqMsgHead head;
L
Liu Jicong 已提交
90 91
  int64_t    vgId;
  char       ep[TSDB_EP_LEN];  // TSDB_EP_LEN
L
Liu Jicong 已提交
92
} TmqSubscribeRsp;
L
Liu Jicong 已提交
93

L
Liu Jicong 已提交
94 95
typedef struct TmqHeartbeatReq {
} TmqHeartbeatReq;
L
Liu Jicong 已提交
96

L
Liu Jicong 已提交
97 98
typedef struct TmqHeartbeatRsp {
} TmqHeartbeatRsp;
L
Liu Jicong 已提交
99

L
Liu Jicong 已提交
100
typedef struct STqTopicVhandle {
L
Liu Jicong 已提交
101
  int64_t topicId;
L
Liu Jicong 已提交
102 103 104 105
  // executor for filter
  void* filterExec;
  // callback for mnode
  // trigger when vnode list associated topic change
L
Liu Jicong 已提交
106
  void* (*mCallback)(void*, void*);
L
Liu Jicong 已提交
107
} STqTopicVhandle;
L
Liu Jicong 已提交
108

L
Liu Jicong 已提交
109 110
#define TQ_BUFFER_SIZE 8

L
Liu Jicong 已提交
111
typedef struct STqBufferItem {
L
Liu Jicong 已提交
112
  int64_t offset;
L
Liu Jicong 已提交
113
  // executors are identical but not concurrent
L
Liu Jicong 已提交
114
  // so there must be a copy in each item
L
Liu Jicong 已提交
115
  void*   executor;
L
Liu Jicong 已提交
116
  int64_t size;
L
Liu Jicong 已提交
117
  void*   content;
L
Liu Jicong 已提交
118
} STqBufferItem;
L
Liu Jicong 已提交
119

L
Liu Jicong 已提交
120
typedef struct STqBufferHandle {
L
Liu Jicong 已提交
121 122 123
  // char* topic; //c style, end with '\0'
  // int64_t cgId;
  // void* ahandle;
L
Liu Jicong 已提交
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
  int64_t       nextConsumeOffset;
  int64_t       floatingCursor;
  int64_t       topicId;
  int32_t       head;
  int32_t       tail;
  STqBufferItem buffer[TQ_BUFFER_SIZE];
} STqBufferHandle;

typedef struct STqListHandle {
  STqBufferHandle       bufHandle;
  struct STqListHandle* next;
} STqListHandle;

typedef struct STqGroupHandle {
  int64_t        cId;
  int64_t        cgId;
  void*          ahandle;
  int32_t        topicNum;
  STqListHandle* head;
} STqGroupHandle;

typedef struct STqQueryExec {
  void*          src;
  STqBufferItem* dest;
  void*          executor;
} STqQueryExec;

typedef struct STqQueryMsg {
  STqQueryExec*       exec;
  struct STqQueryMsg* next;
} STqQueryMsg;

typedef struct STqLogReader {
L
Liu Jicong 已提交
157
  void* logHandle;
L
Liu Jicong 已提交
158 159 160 161
  int32_t (*logRead)(void* logHandle, void** data, int64_t ver);
  int64_t (*logGetFirstVer)(void* logHandle);
  int64_t (*logGetSnapshotVer)(void* logHandle);
  int64_t (*logGetLastVer)(void* logHandle);
L
Liu Jicong 已提交
162
} STqLogReader;
L
Liu Jicong 已提交
163

H
refact  
Hongze Cheng 已提交
164
typedef struct STqCfg {
L
Liu Jicong 已提交
165
  // TODO
H
refact  
Hongze Cheng 已提交
166
} STqCfg;
L
Liu Jicong 已提交
167

L
Liu Jicong 已提交
168 169 170 171
typedef struct STqMemRef {
  SMemAllocatorFactory* pAlloctorFactory;
  SMemAllocator*        pAllocator;
} STqMemRef;
L
Liu Jicong 已提交
172

L
Liu Jicong 已提交
173
typedef struct STqSerializedHead {
L
Liu Jicong 已提交
174 175 176 177 178
  int16_t ver;
  int16_t action;
  int32_t checksum;
  int64_t ssize;
  char    content[];
L
Liu Jicong 已提交
179
} STqSerializedHead;
L
Liu Jicong 已提交
180

L
Liu Jicong 已提交
181 182 183
typedef int (*FTqSerialize)(const void* pObj, STqSerializedHead** ppHead);
typedef const void* (*FTqDeserialize)(const STqSerializedHead* pHead, void** ppObj);
typedef void (*FTqDelete)(void*);
L
Liu Jicong 已提交
184 185 186 187 188

#define TQ_BUCKET_MASK 0xFF
#define TQ_BUCKET_SIZE 256

#define TQ_PAGE_SIZE 4096
L
Liu Jicong 已提交
189
// key + offset + size
L
Liu Jicong 已提交
190
#define TQ_IDX_SIZE 24
L
Liu Jicong 已提交
191
// 4096 / 24
L
Liu Jicong 已提交
192
#define TQ_MAX_IDX_ONE_PAGE 170
L
Liu Jicong 已提交
193
// 24 * 170
L
Liu Jicong 已提交
194
#define TQ_IDX_PAGE_BODY_SIZE 4080
L
Liu Jicong 已提交
195
// 4096 - 4080
L
Liu Jicong 已提交
196 197
#define TQ_IDX_PAGE_HEAD_SIZE 16

L
Liu Jicong 已提交
198 199
#define TQ_ACTION_CONST 0
#define TQ_ACTION_INUSE 1
L
Liu Jicong 已提交
200
#define TQ_ACTION_INUSE_CONT 2
L
Liu Jicong 已提交
201
#define TQ_ACTION_INTXN 3
L
Liu Jicong 已提交
202

L
Liu Jicong 已提交
203
#define TQ_SVER 0
L
Liu Jicong 已提交
204

L
Liu Jicong 已提交
205 206 207
// TODO: inplace mode is not implemented
#define TQ_UPDATE_INPLACE 0
#define TQ_UPDATE_APPEND 1
L
Liu Jicong 已提交
208 209

#define TQ_DUP_INTXN_REWRITE 0
L
Liu Jicong 已提交
210
#define TQ_DUP_INTXN_REJECT 2
L
Liu Jicong 已提交
211

L
Liu Jicong 已提交
212
static inline bool TqUpdateAppend(int32_t tqConfigFlag) { return tqConfigFlag & TQ_UPDATE_APPEND; }
L
Liu Jicong 已提交
213

L
Liu Jicong 已提交
214
static inline bool TqDupIntxnReject(int32_t tqConfigFlag) { return tqConfigFlag & TQ_DUP_INTXN_REJECT; }
L
Liu Jicong 已提交
215 216

static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST;
L
Liu Jicong 已提交
217 218

#define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE
L
Liu Jicong 已提交
219 220 221 222 223 224 225

typedef struct TqMetaHandle {
  int64_t key;
  int64_t offset;
  int64_t serializedSize;
  void*   valueInUse;
  void*   valueInTxn;
L
Liu Jicong 已提交
226
} STqMetaHandle;
L
Liu Jicong 已提交
227 228

typedef struct TqMetaList {
L
Liu Jicong 已提交
229
  STqMetaHandle      handle;
L
Liu Jicong 已提交
230
  struct TqMetaList* next;
L
Liu Jicong 已提交
231 232
  // struct TqMetaList* inTxnPrev;
  // struct TqMetaList* inTxnNext;
L
Liu Jicong 已提交
233 234
  struct TqMetaList* unpersistPrev;
  struct TqMetaList* unpersistNext;
L
Liu Jicong 已提交
235
} STqMetaList;
L
Liu Jicong 已提交
236 237

typedef struct TqMetaStore {
L
Liu Jicong 已提交
238 239 240 241 242 243 244 245 246 247 248 249
  STqMetaList* bucket[TQ_BUCKET_SIZE];
  // a table head
  STqMetaList* unpersistHead;
  // TODO:temporaral use, to be replaced by unified tfile
  int fileFd;
  // TODO:temporaral use, to be replaced by unified tfile
  int            idxFd;
  char*          dirPath;
  int32_t        tqConfigFlag;
  FTqSerialize   pSerializer;
  FTqDeserialize pDeserializer;
  FTqDelete      pDeleter;
L
Liu Jicong 已提交
250
} STqMetaStore;
L
Liu Jicong 已提交
251

L
Liu Jicong 已提交
252 253 254
typedef struct STQ {
  // the collection of group handle
  // the handle of kvstore
L
Liu Jicong 已提交
255 256 257 258
  char*         path;
  STqCfg*       tqConfig;
  STqLogReader* tqLogReader;
  STqMemRef     tqMemRef;
L
Liu Jicong 已提交
259
  STqMetaStore* tqMeta;
L
Liu Jicong 已提交
260 261 262
} STQ;

// open in each vnode
L
Liu Jicong 已提交
263
STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac);
L
Liu Jicong 已提交
264
void tqDestroy(STQ*);
L
Liu Jicong 已提交
265

L
Liu Jicong 已提交
266
// void* will be replace by a msg type
L
Liu Jicong 已提交
267
int tqPushMsg(STQ*, void* msg, int64_t version);
L
Liu Jicong 已提交
268 269
int tqCommit(STQ*);

L
Liu Jicong 已提交
270
int tqConsume(STQ*, TmqConsumeReq*);
H
refact  
Hongze Cheng 已提交
271

L
Liu Jicong 已提交
272
STqGroupHandle* tqGetGroupHandle(STQ*, int64_t cId);
L
Liu Jicong 已提交
273

L
Liu Jicong 已提交
274 275 276 277 278 279 280
STqGroupHandle* tqOpenTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int             tqCloseTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int             tqMoveOffsetToNext(STqGroupHandle*);
int             tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset);
int             tqRegisterContext(STqGroupHandle*, void* ahandle);
int             tqLaunchQuery(STqGroupHandle*);
int             tqSendLaunchQuery(STqGroupHandle*);
L
Liu Jicong 已提交
281

L
Liu Jicong 已提交
282
int tqSerializeGroupHandle(const STqGroupHandle* gHandle, STqSerializedHead** ppHead);
L
Liu Jicong 已提交
283

L
Liu Jicong 已提交
284
const void* tqDeserializeGroupHandle(const STqSerializedHead* pHead, STqGroupHandle** gHandle);
L
Liu Jicong 已提交
285

H
refact  
Hongze Cheng 已提交
286 287 288 289
#ifdef __cplusplus
}
#endif

L
Liu Jicong 已提交
290
#endif /*_TD_TQ_H_*/