tq.h 7.5 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
refact  
Hongze Cheng 已提交
16 17
#ifndef _TD_TQ_H_
#define _TD_TQ_H_
H
refact  
Hongze Cheng 已提交
18

L
Liu Jicong 已提交
19
#include "common.h"
L
Liu Jicong 已提交
20
#include "mallocator.h"
L
Liu Jicong 已提交
21
#include "os.h"
L
Liu Jicong 已提交
22
#include "taoserror.h"
H
Hongze Cheng 已提交
23
#include "tmsg.h"
L
Liu Jicong 已提交
24
#include "tlist.h"
L
Liu Jicong 已提交
25
#include "trpc.h"
L
Liu Jicong 已提交
26
#include "ttimer.h"
L
Liu Jicong 已提交
27
#include "tutil.h"
L
Liu Jicong 已提交
28

H
refact  
Hongze Cheng 已提交
29 30 31 32
#ifdef __cplusplus
extern "C" {
#endif

L
Liu Jicong 已提交
33
typedef struct STqMsgHead {
L
Liu Jicong 已提交
34
  int32_t protoVer;
L
Liu Jicong 已提交
35
  int32_t msgType;
L
Liu Jicong 已提交
36
  int64_t cgId;
L
Liu Jicong 已提交
37
  int64_t clientId;
L
Liu Jicong 已提交
38
} STqMsgHead;
L
Liu Jicong 已提交
39

L
Liu Jicong 已提交
40
typedef struct STqOneAck {
L
Liu Jicong 已提交
41 42
  int64_t topicId;
  int64_t consumeOffset;
L
Liu Jicong 已提交
43
} STqOneAck;
L
Liu Jicong 已提交
44

L
Liu Jicong 已提交
45
typedef struct STqAcks {
L
Liu Jicong 已提交
46
  int32_t ackNum;
L
Liu Jicong 已提交
47
  // should be sorted
L
Liu Jicong 已提交
48 49
  STqOneAck acks[];
} STqAcks;
L
Liu Jicong 已提交
50

L
Liu Jicong 已提交
51 52 53 54 55
typedef struct STqSetCurReq {
  STqMsgHead head;
  int64_t    topicId;
  int64_t    offset;
} STqSetCurReq;
L
Liu Jicong 已提交
56

L
Liu Jicong 已提交
57
typedef struct STqConsumeReq {
L
Liu Jicong 已提交
58
  STqMsgHead head;
L
Liu Jicong 已提交
59
  int64_t    blockingTime;  // milisec
L
Liu Jicong 已提交
60
  STqAcks    acks;
L
Liu Jicong 已提交
61
} STqConsumeReq;
L
Liu Jicong 已提交
62

L
Liu Jicong 已提交
63
typedef struct STqMsgContent {
L
Liu Jicong 已提交
64 65 66
  int64_t topicId;
  int64_t msgLen;
  char    msg[];
L
Liu Jicong 已提交
67
} STqMsgContent;
L
Liu Jicong 已提交
68

L
Liu Jicong 已提交
69
typedef struct STqConsumeRsp {
L
Liu Jicong 已提交
70
  STqMsgHead    head;
L
Liu Jicong 已提交
71
  int64_t       bodySize;
L
Liu Jicong 已提交
72
  STqMsgContent msgs[];
L
Liu Jicong 已提交
73
} STqConsumeRsp;
L
Liu Jicong 已提交
74

L
Liu Jicong 已提交
75 76
typedef struct STqSubscribeReq {
  STqMsgHead head;
L
Liu Jicong 已提交
77 78
  int32_t    topicNum;
  int64_t    topic[];
L
Liu Jicong 已提交
79
} STqSubscribeReq;
L
Liu Jicong 已提交
80

L
Liu Jicong 已提交
81 82
typedef struct STqSubscribeRsp {
  STqMsgHead head;
L
Liu Jicong 已提交
83 84
  int64_t    vgId;
  char       ep[TSDB_EP_LEN];  // TSDB_EP_LEN
L
Liu Jicong 已提交
85
} STqSubscribeRsp;
L
Liu Jicong 已提交
86

L
Liu Jicong 已提交
87 88
typedef struct STqHeartbeatReq {
} STqHeartbeatReq;
L
Liu Jicong 已提交
89

L
Liu Jicong 已提交
90 91
typedef struct STqHeartbeatRsp {
} STqHeartbeatRsp;
L
Liu Jicong 已提交
92

L
Liu Jicong 已提交
93
typedef struct STqTopicVhandle {
L
Liu Jicong 已提交
94
  int64_t topicId;
L
Liu Jicong 已提交
95 96 97 98
  // executor for filter
  void* filterExec;
  // callback for mnode
  // trigger when vnode list associated topic change
L
Liu Jicong 已提交
99
  void* (*mCallback)(void*, void*);
L
Liu Jicong 已提交
100
} STqTopicVhandle;
L
Liu Jicong 已提交
101

L
Liu Jicong 已提交
102 103
#define TQ_BUFFER_SIZE 8

L
Liu Jicong 已提交
104 105
typedef struct STqExec {
  void* runtimeEnv;
L
Liu Jicong 已提交
106
  SSDataBlock* (*exec)(void* runtimeEnv);
L
Liu Jicong 已提交
107
  void* (*assign)(void* runtimeEnv, void* inputData);
L
Liu Jicong 已提交
108
  void (*clear)(void* runtimeEnv);
L
Liu Jicong 已提交
109 110 111 112
  char* (*serialize)(struct STqExec*);
  struct STqExec* (*deserialize)(char*);
} STqExec;

L
Liu Jicong 已提交
113 114 115 116 117
typedef struct STqRspHandle {
  void* handle;
  void* ahandle;
} STqRspHandle;

L
Liu Jicong 已提交
118 119 120
typedef enum { TQ_ITEM_READY, TQ_ITEM_PROCESS, TQ_ITEM_EMPTY } STqItemStatus;

typedef struct STqTopic STqTopic;
L
Liu Jicong 已提交
121

L
Liu Jicong 已提交
122
typedef struct STqBufferItem {
L
Liu Jicong 已提交
123
  int64_t offset;
L
Liu Jicong 已提交
124
  // executors are identical but not concurrent
L
Liu Jicong 已提交
125
  // so there must be a copy in each item
L
Liu Jicong 已提交
126 127 128 129 130
  STqExec*  executor;
  int32_t   status;
  int64_t   size;
  void*     content;
  STqTopic* pTopic;
L
Liu Jicong 已提交
131
} STqMsgItem;
L
Liu Jicong 已提交
132

L
Liu Jicong 已提交
133
struct STqTopic {
L
Liu Jicong 已提交
134 135 136
  // char* topic; //c style, end with '\0'
  // int64_t cgId;
  // void* ahandle;
L
Liu Jicong 已提交
137 138
  // int32_t    head;
  // int32_t    tail;
L
Liu Jicong 已提交
139 140 141
  int64_t    nextConsumeOffset;
  int64_t    floatingCursor;
  int64_t    topicId;
L
Liu Jicong 已提交
142
  void*      logReader;
L
Liu Jicong 已提交
143
  STqMsgItem buffer[TQ_BUFFER_SIZE];
L
Liu Jicong 已提交
144
};
L
Liu Jicong 已提交
145 146

typedef struct STqListHandle {
L
Liu Jicong 已提交
147
  STqTopic              topic;
L
Liu Jicong 已提交
148
  struct STqListHandle* next;
L
Liu Jicong 已提交
149
} STqList;
L
Liu Jicong 已提交
150

L
Liu Jicong 已提交
151
typedef struct STqGroup {
L
Liu Jicong 已提交
152 153 154 155 156
  int64_t clientId;
  int64_t cgId;
  void*   ahandle;
  int32_t topicNum;
  STqList*     head;
L
Liu Jicong 已提交
157 158
  SList*       topicList;  // SList<STqTopic>
  STqRspHandle rspHandle;
L
Liu Jicong 已提交
159
} STqGroup;
L
Liu Jicong 已提交
160 161

typedef struct STqQueryMsg {
L
Liu Jicong 已提交
162
  STqMsgItem*         item;
L
Liu Jicong 已提交
163 164 165
  struct STqQueryMsg* next;
} STqQueryMsg;

L
Liu Jicong 已提交
166
typedef struct STqLogHandle {
L
Liu Jicong 已提交
167
  void* logHandle;
L
Liu Jicong 已提交
168 169 170 171
  void* (*openLogReader)(void* logHandle);
  void (*closeLogReader)(void* logReader);
  int32_t (*logRead)(void* logReader, void** data, int64_t ver);

L
Liu Jicong 已提交
172 173 174
  int64_t (*logGetFirstVer)(void* logHandle);
  int64_t (*logGetSnapshotVer)(void* logHandle);
  int64_t (*logGetLastVer)(void* logHandle);
L
Liu Jicong 已提交
175
} STqLogHandle;
L
Liu Jicong 已提交
176

H
refact  
Hongze Cheng 已提交
177
typedef struct STqCfg {
L
Liu Jicong 已提交
178
  // TODO
H
refact  
Hongze Cheng 已提交
179
} STqCfg;
L
Liu Jicong 已提交
180

L
Liu Jicong 已提交
181
typedef struct STqMemRef {
L
Liu Jicong 已提交
182
  SMemAllocatorFactory* pAllocatorFactory;
L
Liu Jicong 已提交
183 184
  SMemAllocator*        pAllocator;
} STqMemRef;
L
Liu Jicong 已提交
185

L
Liu Jicong 已提交
186
typedef struct STqSerializedHead {
L
Liu Jicong 已提交
187 188 189 190 191
  int16_t ver;
  int16_t action;
  int32_t checksum;
  int64_t ssize;
  char    content[];
L
Liu Jicong 已提交
192
} STqSerializedHead;
L
Liu Jicong 已提交
193

L
Liu Jicong 已提交
194 195 196
typedef int (*FTqSerialize)(const void* pObj, STqSerializedHead** ppHead);
typedef const void* (*FTqDeserialize)(const STqSerializedHead* pHead, void** ppObj);
typedef void (*FTqDelete)(void*);
L
Liu Jicong 已提交
197 198 199 200 201

#define TQ_BUCKET_MASK 0xFF
#define TQ_BUCKET_SIZE 256

#define TQ_PAGE_SIZE 4096
L
Liu Jicong 已提交
202
// key + offset + size
L
Liu Jicong 已提交
203
#define TQ_IDX_SIZE 24
L
Liu Jicong 已提交
204
// 4096 / 24
L
Liu Jicong 已提交
205
#define TQ_MAX_IDX_ONE_PAGE 170
L
Liu Jicong 已提交
206
// 24 * 170
L
Liu Jicong 已提交
207
#define TQ_IDX_PAGE_BODY_SIZE 4080
L
Liu Jicong 已提交
208
// 4096 - 4080
L
Liu Jicong 已提交
209 210
#define TQ_IDX_PAGE_HEAD_SIZE 16

L
Liu Jicong 已提交
211 212
#define TQ_ACTION_CONST 0
#define TQ_ACTION_INUSE 1
L
Liu Jicong 已提交
213
#define TQ_ACTION_INUSE_CONT 2
L
Liu Jicong 已提交
214
#define TQ_ACTION_INTXN 3
L
Liu Jicong 已提交
215

L
Liu Jicong 已提交
216
#define TQ_SVER 0
L
Liu Jicong 已提交
217

L
Liu Jicong 已提交
218 219 220
// TODO: inplace mode is not implemented
#define TQ_UPDATE_INPLACE 0
#define TQ_UPDATE_APPEND 1
L
Liu Jicong 已提交
221 222

#define TQ_DUP_INTXN_REWRITE 0
L
Liu Jicong 已提交
223
#define TQ_DUP_INTXN_REJECT 2
L
Liu Jicong 已提交
224

L
Liu Jicong 已提交
225
static inline bool tqUpdateAppend(int32_t tqConfigFlag) { return tqConfigFlag & TQ_UPDATE_APPEND; }
L
Liu Jicong 已提交
226

L
Liu Jicong 已提交
227
static inline bool tqDupIntxnReject(int32_t tqConfigFlag) { return tqConfigFlag & TQ_DUP_INTXN_REJECT; }
L
Liu Jicong 已提交
228 229

static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST;
L
Liu Jicong 已提交
230 231

#define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE
L
Liu Jicong 已提交
232

L
Liu Jicong 已提交
233
typedef struct STqMetaHandle {
L
Liu Jicong 已提交
234 235 236 237 238
  int64_t key;
  int64_t offset;
  int64_t serializedSize;
  void*   valueInUse;
  void*   valueInTxn;
L
Liu Jicong 已提交
239
} STqMetaHandle;
L
Liu Jicong 已提交
240

L
Liu Jicong 已提交
241 242 243 244 245 246 247
typedef struct STqMetaList {
  STqMetaHandle       handle;
  struct STqMetaList* next;
  // struct STqMetaList* inTxnPrev;
  // struct STqMetaList* inTxnNext;
  struct STqMetaList* unpersistPrev;
  struct STqMetaList* unpersistNext;
L
Liu Jicong 已提交
248
} STqMetaList;
L
Liu Jicong 已提交
249

L
Liu Jicong 已提交
250
typedef struct STqMetaStore {
L
Liu Jicong 已提交
251 252 253
  STqMetaList* bucket[TQ_BUCKET_SIZE];
  // a table head
  STqMetaList* unpersistHead;
L
Liu Jicong 已提交
254 255
  // topics that are not connectted
  STqMetaList*   unconnectTopic;
L
Liu Jicong 已提交
256

L
Liu Jicong 已提交
257 258 259
  // TODO:temporaral use, to be replaced by unified tfile
  int fileFd;
  // TODO:temporaral use, to be replaced by unified tfile
L
Liu Jicong 已提交
260 261
  int idxFd;

L
Liu Jicong 已提交
262 263 264 265 266
  char*          dirPath;
  int32_t        tqConfigFlag;
  FTqSerialize   pSerializer;
  FTqDeserialize pDeserializer;
  FTqDelete      pDeleter;
L
Liu Jicong 已提交
267
} STqMetaStore;
L
Liu Jicong 已提交
268

L
Liu Jicong 已提交
269
typedef struct STQ {
L
Liu Jicong 已提交
270 271
  // the collection of groups
  // the handle of meta kvstore
L
Liu Jicong 已提交
272 273
  char*         path;
  STqCfg*       tqConfig;
L
Liu Jicong 已提交
274
  STqLogHandle* tqLogHandle;
L
Liu Jicong 已提交
275
  STqMemRef     tqMemRef;
L
Liu Jicong 已提交
276
  STqMetaStore* tqMeta;
L
Liu Jicong 已提交
277 278
} STQ;

L
Liu Jicong 已提交
279 280 281 282 283 284 285 286 287 288 289
typedef struct STqMgmt {
  int8_t inited;
  tmr_h  timer;
} STqMgmt;

static STqMgmt tqMgmt;

// init once
int  tqInit();
void tqCleanUp();

L
Liu Jicong 已提交
290
// open in each vnode
L
Liu Jicong 已提交
291
STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogHandle* tqLogHandle, SMemAllocatorFactory* allocFac);
L
Liu Jicong 已提交
292
void tqClose(STQ*);
L
Liu Jicong 已提交
293

L
Liu Jicong 已提交
294
// void* will be replace by a msg type
L
Liu Jicong 已提交
295
int tqPushMsg(STQ*, void* msg, int64_t version);
L
Liu Jicong 已提交
296
int tqCommit(STQ*);
L
Liu Jicong 已提交
297
int tqConsume(STQ*, SRpcMsg* pReq, SRpcMsg** pRsp);
H
refact  
Hongze Cheng 已提交
298

L
Liu Jicong 已提交
299 300 301 302 303 304 305 306 307 308 309
int tqSetCursor(STQ*, STqSetCurReq* pMsg);
int tqBufferSetOffset(STqTopic*, int64_t offset);

STqTopic* tqFindTopic(STqGroup*, int64_t topicId);

STqGroup* tqGetGroup(STQ*, int64_t clientId);

STqGroup* tqOpenGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int       tqCloseGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int       tqRegisterContext(STqGroup*, void* ahandle);
int       tqSendLaunchQuery(STqMsgItem*, int64_t offset);
L
Liu Jicong 已提交
310

L
Liu Jicong 已提交
311
int tqSerializeGroup(const STqGroup*, STqSerializedHead**);
L
Liu Jicong 已提交
312

L
Liu Jicong 已提交
313
const void* tqDeserializeGroup(const STqSerializedHead*, STqGroup**);
L
Liu Jicong 已提交
314

L
Liu Jicong 已提交
315
static int tqQueryExecuting(int32_t status) { return status; }
L
Liu Jicong 已提交
316

H
refact  
Hongze Cheng 已提交
317 318 319 320
#ifdef __cplusplus
}
#endif

L
Liu Jicong 已提交
321
#endif /*_TD_TQ_H_*/