tq.h 7.0 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
refact  
Hongze Cheng 已提交
16 17
#ifndef _TD_TQ_H_
#define _TD_TQ_H_
H
refact  
Hongze Cheng 已提交
18

L
Liu Jicong 已提交
19
#include "mallocator.h"
L
Liu Jicong 已提交
20
#include "os.h"
L
Liu Jicong 已提交
21
#include "tlist.h"
L
Liu Jicong 已提交
22
#include "tutil.h"
L
Liu Jicong 已提交
23

H
refact  
Hongze Cheng 已提交
24 25 26 27
#ifdef __cplusplus
extern "C" {
#endif

L
Liu Jicong 已提交
28
typedef struct STqMsgHead {
L
Liu Jicong 已提交
29
  int32_t protoVer;
L
Liu Jicong 已提交
30
  int32_t msgType;
L
Liu Jicong 已提交
31
  int64_t cgId;
L
Liu Jicong 已提交
32
  int64_t clientId;
L
Liu Jicong 已提交
33
} STqMsgHead;
L
Liu Jicong 已提交
34

L
Liu Jicong 已提交
35
typedef struct STqOneAck {
L
Liu Jicong 已提交
36 37
  int64_t topicId;
  int64_t consumeOffset;
L
Liu Jicong 已提交
38
} STqOneAck;
L
Liu Jicong 已提交
39

L
Liu Jicong 已提交
40
typedef struct STqAcks {
L
Liu Jicong 已提交
41
  int32_t ackNum;
L
Liu Jicong 已提交
42
  // should be sorted
L
Liu Jicong 已提交
43 44
  STqOneAck acks[];
} STqAcks;
L
Liu Jicong 已提交
45

L
Liu Jicong 已提交
46 47 48 49 50
typedef struct STqSetCurReq {
  STqMsgHead head;
  int64_t    topicId;
  int64_t    offset;
} STqSetCurReq;
L
Liu Jicong 已提交
51

L
Liu Jicong 已提交
52
typedef struct STqConsumeReq {
L
Liu Jicong 已提交
53 54
  STqMsgHead head;
  STqAcks    acks;
L
Liu Jicong 已提交
55
} STqConsumeReq;
L
Liu Jicong 已提交
56

L
Liu Jicong 已提交
57
typedef struct STqMsgContent {
L
Liu Jicong 已提交
58 59 60
  int64_t topicId;
  int64_t msgLen;
  char    msg[];
L
Liu Jicong 已提交
61
} STqMsgContent;
L
Liu Jicong 已提交
62

L
Liu Jicong 已提交
63
typedef struct STqConsumeRsp {
L
Liu Jicong 已提交
64
  STqMsgHead    head;
L
Liu Jicong 已提交
65
  int64_t       bodySize;
L
Liu Jicong 已提交
66
  STqMsgContent msgs[];
L
Liu Jicong 已提交
67
} STqConsumeRsp;
L
Liu Jicong 已提交
68

L
Liu Jicong 已提交
69 70
typedef struct STqSubscribeReq {
  STqMsgHead head;
L
Liu Jicong 已提交
71 72
  int32_t    topicNum;
  int64_t    topic[];
L
Liu Jicong 已提交
73
} STqSubscribeReq;
L
Liu Jicong 已提交
74

L
Liu Jicong 已提交
75 76
typedef struct STqSubscribeRsp {
  STqMsgHead head;
L
Liu Jicong 已提交
77 78
  int64_t    vgId;
  char       ep[TSDB_EP_LEN];  // TSDB_EP_LEN
L
Liu Jicong 已提交
79
} STqSubscribeRsp;
L
Liu Jicong 已提交
80

L
Liu Jicong 已提交
81 82
typedef struct STqHeartbeatReq {
} STqHeartbeatReq;
L
Liu Jicong 已提交
83

L
Liu Jicong 已提交
84 85
typedef struct STqHeartbeatRsp {
} STqHeartbeatRsp;
L
Liu Jicong 已提交
86

L
Liu Jicong 已提交
87
typedef struct STqTopicVhandle {
L
Liu Jicong 已提交
88
  int64_t topicId;
L
Liu Jicong 已提交
89 90 91 92
  // executor for filter
  void* filterExec;
  // callback for mnode
  // trigger when vnode list associated topic change
L
Liu Jicong 已提交
93
  void* (*mCallback)(void*, void*);
L
Liu Jicong 已提交
94
} STqTopicVhandle;
L
Liu Jicong 已提交
95

L
Liu Jicong 已提交
96 97
#define TQ_BUFFER_SIZE 8

L
Liu Jicong 已提交
98 99 100 101 102 103 104 105 106 107
typedef struct STqExec {
  void* runtimeEnv;
  // return type will be SSDataBlock
  void* (*exec)(void* runtimeEnv);
  // inputData type will be submitblk
  void* (*assign)(void* runtimeEnv, void* inputData);
  char* (*serialize)(struct STqExec*);
  struct STqExec* (*deserialize)(char*);
} STqExec;

L
Liu Jicong 已提交
108
typedef struct STqBufferItem {
L
Liu Jicong 已提交
109
  int64_t offset;
L
Liu Jicong 已提交
110
  // executors are identical but not concurrent
L
Liu Jicong 已提交
111
  // so there must be a copy in each item
L
Liu Jicong 已提交
112 113 114 115
  STqExec* executor;
  int32_t  status;
  int64_t  size;
  void*    content;
L
Liu Jicong 已提交
116
} STqMsgItem;
L
Liu Jicong 已提交
117

L
Liu Jicong 已提交
118
typedef struct STqTopic {
L
Liu Jicong 已提交
119 120 121
  // char* topic; //c style, end with '\0'
  // int64_t cgId;
  // void* ahandle;
L
Liu Jicong 已提交
122 123 124 125 126 127 128
  int64_t    nextConsumeOffset;
  int64_t    floatingCursor;
  int64_t    topicId;
  int32_t    head;
  int32_t    tail;
  STqMsgItem buffer[TQ_BUFFER_SIZE];
} STqTopic;
L
Liu Jicong 已提交
129 130

typedef struct STqListHandle {
L
Liu Jicong 已提交
131
  STqTopic              topic;
L
Liu Jicong 已提交
132
  struct STqListHandle* next;
L
Liu Jicong 已提交
133
} STqList;
L
Liu Jicong 已提交
134

L
Liu Jicong 已提交
135 136 137 138 139 140 141 142
typedef struct STqGroup {
  int64_t  cId;
  int64_t  cgId;
  void*    ahandle;
  int32_t  topicNum;
  STqList* head;
  SList*   topicList;  // SList<STqTopic>
} STqGroup;
L
Liu Jicong 已提交
143 144

typedef struct STqQueryExec {
L
Liu Jicong 已提交
145 146 147
  void*       src;
  STqMsgItem* dest;
  void*       executor;
L
Liu Jicong 已提交
148 149 150 151 152 153 154 155
} STqQueryExec;

typedef struct STqQueryMsg {
  STqQueryExec*       exec;
  struct STqQueryMsg* next;
} STqQueryMsg;

typedef struct STqLogReader {
L
Liu Jicong 已提交
156
  void* logHandle;
L
Liu Jicong 已提交
157 158 159 160
  int32_t (*logRead)(void* logHandle, void** data, int64_t ver);
  int64_t (*logGetFirstVer)(void* logHandle);
  int64_t (*logGetSnapshotVer)(void* logHandle);
  int64_t (*logGetLastVer)(void* logHandle);
L
Liu Jicong 已提交
161
} STqLogReader;
L
Liu Jicong 已提交
162

H
refact  
Hongze Cheng 已提交
163
typedef struct STqCfg {
L
Liu Jicong 已提交
164
  // TODO
H
refact  
Hongze Cheng 已提交
165
} STqCfg;
L
Liu Jicong 已提交
166

L
Liu Jicong 已提交
167 168 169 170
typedef struct STqMemRef {
  SMemAllocatorFactory* pAlloctorFactory;
  SMemAllocator*        pAllocator;
} STqMemRef;
L
Liu Jicong 已提交
171

L
Liu Jicong 已提交
172
typedef struct STqSerializedHead {
L
Liu Jicong 已提交
173 174 175 176 177
  int16_t ver;
  int16_t action;
  int32_t checksum;
  int64_t ssize;
  char    content[];
L
Liu Jicong 已提交
178
} STqSerializedHead;
L
Liu Jicong 已提交
179

L
Liu Jicong 已提交
180 181 182
typedef int (*FTqSerialize)(const void* pObj, STqSerializedHead** ppHead);
typedef const void* (*FTqDeserialize)(const STqSerializedHead* pHead, void** ppObj);
typedef void (*FTqDelete)(void*);
L
Liu Jicong 已提交
183 184 185 186 187

#define TQ_BUCKET_MASK 0xFF
#define TQ_BUCKET_SIZE 256

#define TQ_PAGE_SIZE 4096
L
Liu Jicong 已提交
188
// key + offset + size
L
Liu Jicong 已提交
189
#define TQ_IDX_SIZE 24
L
Liu Jicong 已提交
190
// 4096 / 24
L
Liu Jicong 已提交
191
#define TQ_MAX_IDX_ONE_PAGE 170
L
Liu Jicong 已提交
192
// 24 * 170
L
Liu Jicong 已提交
193
#define TQ_IDX_PAGE_BODY_SIZE 4080
L
Liu Jicong 已提交
194
// 4096 - 4080
L
Liu Jicong 已提交
195 196
#define TQ_IDX_PAGE_HEAD_SIZE 16

L
Liu Jicong 已提交
197 198
#define TQ_ACTION_CONST 0
#define TQ_ACTION_INUSE 1
L
Liu Jicong 已提交
199
#define TQ_ACTION_INUSE_CONT 2
L
Liu Jicong 已提交
200
#define TQ_ACTION_INTXN 3
L
Liu Jicong 已提交
201

L
Liu Jicong 已提交
202
#define TQ_SVER 0
L
Liu Jicong 已提交
203

L
Liu Jicong 已提交
204 205 206
// TODO: inplace mode is not implemented
#define TQ_UPDATE_INPLACE 0
#define TQ_UPDATE_APPEND 1
L
Liu Jicong 已提交
207 208

#define TQ_DUP_INTXN_REWRITE 0
L
Liu Jicong 已提交
209
#define TQ_DUP_INTXN_REJECT 2
L
Liu Jicong 已提交
210

L
Liu Jicong 已提交
211
static inline bool tqUpdateAppend(int32_t tqConfigFlag) { return tqConfigFlag & TQ_UPDATE_APPEND; }
L
Liu Jicong 已提交
212

L
Liu Jicong 已提交
213
static inline bool tqDupIntxnReject(int32_t tqConfigFlag) { return tqConfigFlag & TQ_DUP_INTXN_REJECT; }
L
Liu Jicong 已提交
214 215

static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST;
L
Liu Jicong 已提交
216 217

#define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE
L
Liu Jicong 已提交
218

L
Liu Jicong 已提交
219
typedef struct STqMetaHandle {
L
Liu Jicong 已提交
220 221 222 223 224
  int64_t key;
  int64_t offset;
  int64_t serializedSize;
  void*   valueInUse;
  void*   valueInTxn;
L
Liu Jicong 已提交
225
} STqMetaHandle;
L
Liu Jicong 已提交
226

L
Liu Jicong 已提交
227 228 229 230 231 232 233
typedef struct STqMetaList {
  STqMetaHandle       handle;
  struct STqMetaList* next;
  // struct STqMetaList* inTxnPrev;
  // struct STqMetaList* inTxnNext;
  struct STqMetaList* unpersistPrev;
  struct STqMetaList* unpersistNext;
L
Liu Jicong 已提交
234
} STqMetaList;
L
Liu Jicong 已提交
235

L
Liu Jicong 已提交
236
typedef struct STqMetaStore {
L
Liu Jicong 已提交
237 238 239
  STqMetaList* bucket[TQ_BUCKET_SIZE];
  // a table head
  STqMetaList* unpersistHead;
L
Liu Jicong 已提交
240

L
Liu Jicong 已提交
241 242 243
  // TODO:temporaral use, to be replaced by unified tfile
  int fileFd;
  // TODO:temporaral use, to be replaced by unified tfile
L
Liu Jicong 已提交
244 245
  int idxFd;

L
Liu Jicong 已提交
246 247 248 249 250
  char*          dirPath;
  int32_t        tqConfigFlag;
  FTqSerialize   pSerializer;
  FTqDeserialize pDeserializer;
  FTqDelete      pDeleter;
L
Liu Jicong 已提交
251
} STqMetaStore;
L
Liu Jicong 已提交
252

L
Liu Jicong 已提交
253
typedef struct STQ {
L
Liu Jicong 已提交
254 255
  // the collection of groups
  // the handle of meta kvstore
L
Liu Jicong 已提交
256 257 258 259
  char*         path;
  STqCfg*       tqConfig;
  STqLogReader* tqLogReader;
  STqMemRef     tqMemRef;
L
Liu Jicong 已提交
260
  STqMetaStore* tqMeta;
L
Liu Jicong 已提交
261
  STqExec*      tqExec;
L
Liu Jicong 已提交
262 263 264
} STQ;

// open in each vnode
L
Liu Jicong 已提交
265 266
STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac,
            STqExec* tqExec);
L
Liu Jicong 已提交
267
void tqClose(STQ*);
L
Liu Jicong 已提交
268

L
Liu Jicong 已提交
269
// void* will be replace by a msg type
L
Liu Jicong 已提交
270
int tqPushMsg(STQ*, void* msg, int64_t version);
L
Liu Jicong 已提交
271
int tqCommit(STQ*);
L
Liu Jicong 已提交
272
int tqConsume(STQ*, STqConsumeReq*);
H
refact  
Hongze Cheng 已提交
273

L
Liu Jicong 已提交
274 275 276 277 278 279 280 281 282 283 284
int tqSetCursor(STQ*, STqSetCurReq* pMsg);
int tqBufferSetOffset(STqTopic*, int64_t offset);

STqTopic* tqFindTopic(STqGroup*, int64_t topicId);

STqGroup* tqGetGroup(STQ*, int64_t clientId);

STqGroup* tqOpenGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int       tqCloseGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int       tqRegisterContext(STqGroup*, void* ahandle);
int       tqSendLaunchQuery(STqMsgItem*, int64_t offset);
L
Liu Jicong 已提交
285

L
Liu Jicong 已提交
286
int tqSerializeGroup(const STqGroup*, STqSerializedHead**);
L
Liu Jicong 已提交
287

L
Liu Jicong 已提交
288
const void* tqDeserializeGroup(const STqSerializedHead*, STqGroup**);
L
Liu Jicong 已提交
289

L
Liu Jicong 已提交
290
static int tqQueryExecuting(int32_t status) { return status; }
L
Liu Jicong 已提交
291

H
refact  
Hongze Cheng 已提交
292 293 294 295
#ifdef __cplusplus
}
#endif

L
Liu Jicong 已提交
296
#endif /*_TD_TQ_H_*/