tq.h 6.6 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
refact  
Hongze Cheng 已提交
16 17
#ifndef _TD_TQ_H_
#define _TD_TQ_H_
H
refact  
Hongze Cheng 已提交
18

L
Liu Jicong 已提交
19
#include "mallocator.h"
L
Liu Jicong 已提交
20
#include "os.h"
L
Liu Jicong 已提交
21
#include "tlist.h"
L
Liu Jicong 已提交
22
#include "tutil.h"
L
Liu Jicong 已提交
23

H
refact  
Hongze Cheng 已提交
24 25 26 27
#ifdef __cplusplus
extern "C" {
#endif

L
Liu Jicong 已提交
28
typedef struct STqMsgHead {
L
Liu Jicong 已提交
29
  int32_t protoVer;
L
Liu Jicong 已提交
30
  int32_t msgType;
L
Liu Jicong 已提交
31
  int64_t cgId;
L
Liu Jicong 已提交
32
  int64_t clientId;
L
Liu Jicong 已提交
33
} STqMsgHead;
L
Liu Jicong 已提交
34

L
Liu Jicong 已提交
35
typedef struct STqOneAck {
L
Liu Jicong 已提交
36 37
  int64_t topicId;
  int64_t consumeOffset;
L
Liu Jicong 已提交
38
} STqOneAck;
L
Liu Jicong 已提交
39

L
Liu Jicong 已提交
40
typedef struct STqAcks {
L
Liu Jicong 已提交
41
  int32_t ackNum;
L
Liu Jicong 已提交
42
  // should be sorted
L
Liu Jicong 已提交
43 44
  STqOneAck acks[];
} STqAcks;
L
Liu Jicong 已提交
45

L
Liu Jicong 已提交
46 47 48 49 50
typedef struct STqSetCurReq {
  STqMsgHead head;
  int64_t    topicId;
  int64_t    offset;
} STqSetCurReq;
L
Liu Jicong 已提交
51

L
Liu Jicong 已提交
52
typedef struct STqConsumeReq {
L
Liu Jicong 已提交
53 54
  STqMsgHead head;
  STqAcks    acks;
L
Liu Jicong 已提交
55
} STqConsumeReq;
L
Liu Jicong 已提交
56

L
Liu Jicong 已提交
57
typedef struct STqMsgContent {
L
Liu Jicong 已提交
58 59 60
  int64_t topicId;
  int64_t msgLen;
  char    msg[];
L
Liu Jicong 已提交
61
} STqMsgContent;
L
Liu Jicong 已提交
62

L
Liu Jicong 已提交
63
typedef struct STqConsumeRsp {
L
Liu Jicong 已提交
64
  STqMsgHead    head;
L
Liu Jicong 已提交
65
  int64_t       bodySize;
L
Liu Jicong 已提交
66
  STqMsgContent msgs[];
L
Liu Jicong 已提交
67
} STqConsumeRsp;
L
Liu Jicong 已提交
68

L
Liu Jicong 已提交
69 70
typedef struct STqSubscribeReq {
  STqMsgHead head;
L
Liu Jicong 已提交
71 72
  int32_t    topicNum;
  int64_t    topic[];
L
Liu Jicong 已提交
73
} STqSubscribeReq;
L
Liu Jicong 已提交
74

L
Liu Jicong 已提交
75 76
typedef struct STqSubscribeRsp {
  STqMsgHead head;
L
Liu Jicong 已提交
77 78
  int64_t    vgId;
  char       ep[TSDB_EP_LEN];  // TSDB_EP_LEN
L
Liu Jicong 已提交
79
} STqSubscribeRsp;
L
Liu Jicong 已提交
80

L
Liu Jicong 已提交
81 82
typedef struct STqHeartbeatReq {
} STqHeartbeatReq;
L
Liu Jicong 已提交
83

L
Liu Jicong 已提交
84 85
typedef struct STqHeartbeatRsp {
} STqHeartbeatRsp;
L
Liu Jicong 已提交
86

L
Liu Jicong 已提交
87
typedef struct STqTopicVhandle {
L
Liu Jicong 已提交
88
  int64_t topicId;
L
Liu Jicong 已提交
89 90 91 92
  // executor for filter
  void* filterExec;
  // callback for mnode
  // trigger when vnode list associated topic change
L
Liu Jicong 已提交
93
  void* (*mCallback)(void*, void*);
L
Liu Jicong 已提交
94
} STqTopicVhandle;
L
Liu Jicong 已提交
95

L
Liu Jicong 已提交
96 97
#define TQ_BUFFER_SIZE 8

L
Liu Jicong 已提交
98
typedef struct STqBufferItem {
L
Liu Jicong 已提交
99
  int64_t offset;
L
Liu Jicong 已提交
100
  // executors are identical but not concurrent
L
Liu Jicong 已提交
101
  // so there must be a copy in each item
L
Liu Jicong 已提交
102
  void*   executor;
L
Liu Jicong 已提交
103
  int32_t status;
L
Liu Jicong 已提交
104
  int64_t size;
L
Liu Jicong 已提交
105
  void*   content;
L
Liu Jicong 已提交
106
} STqMsgItem;
L
Liu Jicong 已提交
107

L
Liu Jicong 已提交
108
typedef struct STqTopic {
L
Liu Jicong 已提交
109 110 111
  // char* topic; //c style, end with '\0'
  // int64_t cgId;
  // void* ahandle;
L
Liu Jicong 已提交
112 113 114 115 116 117 118
  int64_t    nextConsumeOffset;
  int64_t    floatingCursor;
  int64_t    topicId;
  int32_t    head;
  int32_t    tail;
  STqMsgItem buffer[TQ_BUFFER_SIZE];
} STqTopic;
L
Liu Jicong 已提交
119 120

typedef struct STqListHandle {
L
Liu Jicong 已提交
121
  STqTopic              topic;
L
Liu Jicong 已提交
122
  struct STqListHandle* next;
L
Liu Jicong 已提交
123
} STqList;
L
Liu Jicong 已提交
124

L
Liu Jicong 已提交
125 126 127 128 129 130 131 132
typedef struct STqGroup {
  int64_t  cId;
  int64_t  cgId;
  void*    ahandle;
  int32_t  topicNum;
  STqList* head;
  SList*   topicList;  // SList<STqTopic>
} STqGroup;
L
Liu Jicong 已提交
133 134

typedef struct STqQueryExec {
L
Liu Jicong 已提交
135 136 137
  void*       src;
  STqMsgItem* dest;
  void*       executor;
L
Liu Jicong 已提交
138 139 140 141 142 143 144 145
} STqQueryExec;

typedef struct STqQueryMsg {
  STqQueryExec*       exec;
  struct STqQueryMsg* next;
} STqQueryMsg;

typedef struct STqLogReader {
L
Liu Jicong 已提交
146
  void* logHandle;
L
Liu Jicong 已提交
147 148 149 150
  int32_t (*logRead)(void* logHandle, void** data, int64_t ver);
  int64_t (*logGetFirstVer)(void* logHandle);
  int64_t (*logGetSnapshotVer)(void* logHandle);
  int64_t (*logGetLastVer)(void* logHandle);
L
Liu Jicong 已提交
151
} STqLogReader;
L
Liu Jicong 已提交
152

H
refact  
Hongze Cheng 已提交
153
typedef struct STqCfg {
L
Liu Jicong 已提交
154
  // TODO
H
refact  
Hongze Cheng 已提交
155
} STqCfg;
L
Liu Jicong 已提交
156

L
Liu Jicong 已提交
157 158 159 160
typedef struct STqMemRef {
  SMemAllocatorFactory* pAlloctorFactory;
  SMemAllocator*        pAllocator;
} STqMemRef;
L
Liu Jicong 已提交
161

L
Liu Jicong 已提交
162
typedef struct STqSerializedHead {
L
Liu Jicong 已提交
163 164 165 166 167
  int16_t ver;
  int16_t action;
  int32_t checksum;
  int64_t ssize;
  char    content[];
L
Liu Jicong 已提交
168
} STqSerializedHead;
L
Liu Jicong 已提交
169

L
Liu Jicong 已提交
170 171 172
typedef int (*FTqSerialize)(const void* pObj, STqSerializedHead** ppHead);
typedef const void* (*FTqDeserialize)(const STqSerializedHead* pHead, void** ppObj);
typedef void (*FTqDelete)(void*);
L
Liu Jicong 已提交
173 174 175 176 177

#define TQ_BUCKET_MASK 0xFF
#define TQ_BUCKET_SIZE 256

#define TQ_PAGE_SIZE 4096
L
Liu Jicong 已提交
178
// key + offset + size
L
Liu Jicong 已提交
179
#define TQ_IDX_SIZE 24
L
Liu Jicong 已提交
180
// 4096 / 24
L
Liu Jicong 已提交
181
#define TQ_MAX_IDX_ONE_PAGE 170
L
Liu Jicong 已提交
182
// 24 * 170
L
Liu Jicong 已提交
183
#define TQ_IDX_PAGE_BODY_SIZE 4080
L
Liu Jicong 已提交
184
// 4096 - 4080
L
Liu Jicong 已提交
185 186
#define TQ_IDX_PAGE_HEAD_SIZE 16

L
Liu Jicong 已提交
187 188
#define TQ_ACTION_CONST 0
#define TQ_ACTION_INUSE 1
L
Liu Jicong 已提交
189
#define TQ_ACTION_INUSE_CONT 2
L
Liu Jicong 已提交
190
#define TQ_ACTION_INTXN 3
L
Liu Jicong 已提交
191

L
Liu Jicong 已提交
192
#define TQ_SVER 0
L
Liu Jicong 已提交
193

L
Liu Jicong 已提交
194 195 196
// TODO: inplace mode is not implemented
#define TQ_UPDATE_INPLACE 0
#define TQ_UPDATE_APPEND 1
L
Liu Jicong 已提交
197 198

#define TQ_DUP_INTXN_REWRITE 0
L
Liu Jicong 已提交
199
#define TQ_DUP_INTXN_REJECT 2
L
Liu Jicong 已提交
200

L
Liu Jicong 已提交
201
static inline bool tqUpdateAppend(int32_t tqConfigFlag) { return tqConfigFlag & TQ_UPDATE_APPEND; }
L
Liu Jicong 已提交
202

L
Liu Jicong 已提交
203
static inline bool tqDupIntxnReject(int32_t tqConfigFlag) { return tqConfigFlag & TQ_DUP_INTXN_REJECT; }
L
Liu Jicong 已提交
204 205

static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST;
L
Liu Jicong 已提交
206 207

#define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE
L
Liu Jicong 已提交
208

L
Liu Jicong 已提交
209
typedef struct STqMetaHandle {
L
Liu Jicong 已提交
210 211 212 213 214
  int64_t key;
  int64_t offset;
  int64_t serializedSize;
  void*   valueInUse;
  void*   valueInTxn;
L
Liu Jicong 已提交
215
} STqMetaHandle;
L
Liu Jicong 已提交
216

L
Liu Jicong 已提交
217 218 219 220 221 222 223
typedef struct STqMetaList {
  STqMetaHandle       handle;
  struct STqMetaList* next;
  // struct STqMetaList* inTxnPrev;
  // struct STqMetaList* inTxnNext;
  struct STqMetaList* unpersistPrev;
  struct STqMetaList* unpersistNext;
L
Liu Jicong 已提交
224
} STqMetaList;
L
Liu Jicong 已提交
225

L
Liu Jicong 已提交
226
typedef struct STqMetaStore {
L
Liu Jicong 已提交
227 228 229
  STqMetaList* bucket[TQ_BUCKET_SIZE];
  // a table head
  STqMetaList* unpersistHead;
L
Liu Jicong 已提交
230

L
Liu Jicong 已提交
231 232 233
  // TODO:temporaral use, to be replaced by unified tfile
  int fileFd;
  // TODO:temporaral use, to be replaced by unified tfile
L
Liu Jicong 已提交
234 235
  int idxFd;

L
Liu Jicong 已提交
236 237 238 239 240
  char*          dirPath;
  int32_t        tqConfigFlag;
  FTqSerialize   pSerializer;
  FTqDeserialize pDeserializer;
  FTqDelete      pDeleter;
L
Liu Jicong 已提交
241
} STqMetaStore;
L
Liu Jicong 已提交
242

L
Liu Jicong 已提交
243
typedef struct STQ {
L
Liu Jicong 已提交
244 245
  // the collection of groups
  // the handle of meta kvstore
L
Liu Jicong 已提交
246 247 248 249
  char*         path;
  STqCfg*       tqConfig;
  STqLogReader* tqLogReader;
  STqMemRef     tqMemRef;
L
Liu Jicong 已提交
250
  STqMetaStore* tqMeta;
L
Liu Jicong 已提交
251 252 253
} STQ;

// open in each vnode
L
Liu Jicong 已提交
254
STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac);
L
Liu Jicong 已提交
255
void tqClose(STQ*);
L
Liu Jicong 已提交
256

L
Liu Jicong 已提交
257
// void* will be replace by a msg type
L
Liu Jicong 已提交
258
int tqPushMsg(STQ*, void* msg, int64_t version);
L
Liu Jicong 已提交
259
int tqCommit(STQ*);
L
Liu Jicong 已提交
260
int tqConsume(STQ*, STqConsumeReq*);
H
refact  
Hongze Cheng 已提交
261

L
Liu Jicong 已提交
262 263 264 265 266 267 268 269 270 271 272
int tqSetCursor(STQ*, STqSetCurReq* pMsg);
int tqBufferSetOffset(STqTopic*, int64_t offset);

STqTopic* tqFindTopic(STqGroup*, int64_t topicId);

STqGroup* tqGetGroup(STQ*, int64_t clientId);

STqGroup* tqOpenGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int       tqCloseGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int       tqRegisterContext(STqGroup*, void* ahandle);
int       tqSendLaunchQuery(STqMsgItem*, int64_t offset);
L
Liu Jicong 已提交
273

L
Liu Jicong 已提交
274
int tqSerializeGroup(const STqGroup*, STqSerializedHead**);
L
Liu Jicong 已提交
275

L
Liu Jicong 已提交
276
const void* tqDeserializeGroup(const STqSerializedHead*, STqGroup**);
L
Liu Jicong 已提交
277

L
Liu Jicong 已提交
278
static int tqQueryExecuting(int32_t status) { return status; }
L
Liu Jicong 已提交
279

H
refact  
Hongze Cheng 已提交
280 281 282 283
#ifdef __cplusplus
}
#endif

L
Liu Jicong 已提交
284
#endif /*_TD_TQ_H_*/