tq.h 6.9 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
refact  
Hongze Cheng 已提交
16 17
#ifndef _TD_TQ_H_
#define _TD_TQ_H_
H
refact  
Hongze Cheng 已提交
18

L
Liu Jicong 已提交
19
#include "common.h"
L
Liu Jicong 已提交
20
#include "mallocator.h"
L
Liu Jicong 已提交
21
#include "os.h"
L
Liu Jicong 已提交
22 23
#include "taoserror.h"
#include "taosmsg.h"
L
Liu Jicong 已提交
24
#include "tlist.h"
L
Liu Jicong 已提交
25
#include "tutil.h"
L
Liu Jicong 已提交
26

H
refact  
Hongze Cheng 已提交
27 28 29 30
#ifdef __cplusplus
extern "C" {
#endif

L
Liu Jicong 已提交
31
typedef struct STqMsgHead {
L
Liu Jicong 已提交
32
  int32_t protoVer;
L
Liu Jicong 已提交
33
  int32_t msgType;
L
Liu Jicong 已提交
34
  int64_t cgId;
L
Liu Jicong 已提交
35
  int64_t clientId;
L
Liu Jicong 已提交
36
} STqMsgHead;
L
Liu Jicong 已提交
37

L
Liu Jicong 已提交
38
typedef struct STqOneAck {
L
Liu Jicong 已提交
39 40
  int64_t topicId;
  int64_t consumeOffset;
L
Liu Jicong 已提交
41
} STqOneAck;
L
Liu Jicong 已提交
42

L
Liu Jicong 已提交
43
typedef struct STqAcks {
L
Liu Jicong 已提交
44
  int32_t ackNum;
L
Liu Jicong 已提交
45
  // should be sorted
L
Liu Jicong 已提交
46 47
  STqOneAck acks[];
} STqAcks;
L
Liu Jicong 已提交
48

L
Liu Jicong 已提交
49 50 51 52 53
typedef struct STqSetCurReq {
  STqMsgHead head;
  int64_t    topicId;
  int64_t    offset;
} STqSetCurReq;
L
Liu Jicong 已提交
54

L
Liu Jicong 已提交
55
typedef struct STqConsumeReq {
L
Liu Jicong 已提交
56 57
  STqMsgHead head;
  STqAcks    acks;
L
Liu Jicong 已提交
58
} STqConsumeReq;
L
Liu Jicong 已提交
59

L
Liu Jicong 已提交
60
typedef struct STqMsgContent {
L
Liu Jicong 已提交
61 62 63
  int64_t topicId;
  int64_t msgLen;
  char    msg[];
L
Liu Jicong 已提交
64
} STqMsgContent;
L
Liu Jicong 已提交
65

L
Liu Jicong 已提交
66
typedef struct STqConsumeRsp {
L
Liu Jicong 已提交
67
  STqMsgHead    head;
L
Liu Jicong 已提交
68
  int64_t       bodySize;
L
Liu Jicong 已提交
69
  STqMsgContent msgs[];
L
Liu Jicong 已提交
70
} STqConsumeRsp;
L
Liu Jicong 已提交
71

L
Liu Jicong 已提交
72 73
typedef struct STqSubscribeReq {
  STqMsgHead head;
L
Liu Jicong 已提交
74 75
  int32_t    topicNum;
  int64_t    topic[];
L
Liu Jicong 已提交
76
} STqSubscribeReq;
L
Liu Jicong 已提交
77

L
Liu Jicong 已提交
78 79
typedef struct STqSubscribeRsp {
  STqMsgHead head;
L
Liu Jicong 已提交
80 81
  int64_t    vgId;
  char       ep[TSDB_EP_LEN];  // TSDB_EP_LEN
L
Liu Jicong 已提交
82
} STqSubscribeRsp;
L
Liu Jicong 已提交
83

L
Liu Jicong 已提交
84 85
typedef struct STqHeartbeatReq {
} STqHeartbeatReq;
L
Liu Jicong 已提交
86

L
Liu Jicong 已提交
87 88
typedef struct STqHeartbeatRsp {
} STqHeartbeatRsp;
L
Liu Jicong 已提交
89

L
Liu Jicong 已提交
90
typedef struct STqTopicVhandle {
L
Liu Jicong 已提交
91
  int64_t topicId;
L
Liu Jicong 已提交
92 93 94 95
  // executor for filter
  void* filterExec;
  // callback for mnode
  // trigger when vnode list associated topic change
L
Liu Jicong 已提交
96
  void* (*mCallback)(void*, void*);
L
Liu Jicong 已提交
97
} STqTopicVhandle;
L
Liu Jicong 已提交
98

L
Liu Jicong 已提交
99 100
#define TQ_BUFFER_SIZE 8

L
Liu Jicong 已提交
101 102
typedef struct STqExec {
  void* runtimeEnv;
L
Liu Jicong 已提交
103 104 105
  SSDataBlock* (*exec)(void* runtimeEnv);
  void* (*assign)(void* runtimeEnv, SSubmitBlk* inputData);
  void (*clear)(void* runtimeEnv);
L
Liu Jicong 已提交
106 107 108 109
  char* (*serialize)(struct STqExec*);
  struct STqExec* (*deserialize)(char*);
} STqExec;

L
Liu Jicong 已提交
110
typedef struct STqBufferItem {
L
Liu Jicong 已提交
111
  int64_t offset;
L
Liu Jicong 已提交
112
  // executors are identical but not concurrent
L
Liu Jicong 已提交
113
  // so there must be a copy in each item
L
Liu Jicong 已提交
114 115 116 117
  STqExec* executor;
  int32_t  status;
  int64_t  size;
  void*    content;
L
Liu Jicong 已提交
118
} STqMsgItem;
L
Liu Jicong 已提交
119

L
Liu Jicong 已提交
120
typedef struct STqTopic {
L
Liu Jicong 已提交
121 122 123
  // char* topic; //c style, end with '\0'
  // int64_t cgId;
  // void* ahandle;
L
Liu Jicong 已提交
124 125 126 127 128 129 130
  int64_t    nextConsumeOffset;
  int64_t    floatingCursor;
  int64_t    topicId;
  int32_t    head;
  int32_t    tail;
  STqMsgItem buffer[TQ_BUFFER_SIZE];
} STqTopic;
L
Liu Jicong 已提交
131 132

typedef struct STqListHandle {
L
Liu Jicong 已提交
133
  STqTopic              topic;
L
Liu Jicong 已提交
134
  struct STqListHandle* next;
L
Liu Jicong 已提交
135
} STqList;
L
Liu Jicong 已提交
136

L
Liu Jicong 已提交
137
typedef struct STqGroup {
L
Liu Jicong 已提交
138
  int64_t  clientId;
L
Liu Jicong 已提交
139 140 141 142 143
  int64_t  cgId;
  void*    ahandle;
  int32_t  topicNum;
  STqList* head;
  SList*   topicList;  // SList<STqTopic>
L
Liu Jicong 已提交
144
  void*    returnMsg;  // SVReadMsg
L
Liu Jicong 已提交
145
} STqGroup;
L
Liu Jicong 已提交
146 147

typedef struct STqQueryMsg {
L
Liu Jicong 已提交
148
  STqMsgItem*         item;
L
Liu Jicong 已提交
149 150 151 152
  struct STqQueryMsg* next;
} STqQueryMsg;

typedef struct STqLogReader {
L
Liu Jicong 已提交
153
  void* logHandle;
L
Liu Jicong 已提交
154 155 156 157
  int32_t (*logRead)(void* logHandle, void** data, int64_t ver);
  int64_t (*logGetFirstVer)(void* logHandle);
  int64_t (*logGetSnapshotVer)(void* logHandle);
  int64_t (*logGetLastVer)(void* logHandle);
L
Liu Jicong 已提交
158
} STqLogReader;
L
Liu Jicong 已提交
159

H
refact  
Hongze Cheng 已提交
160
typedef struct STqCfg {
L
Liu Jicong 已提交
161
  // TODO
H
refact  
Hongze Cheng 已提交
162
} STqCfg;
L
Liu Jicong 已提交
163

L
Liu Jicong 已提交
164 165 166 167
typedef struct STqMemRef {
  SMemAllocatorFactory* pAlloctorFactory;
  SMemAllocator*        pAllocator;
} STqMemRef;
L
Liu Jicong 已提交
168

L
Liu Jicong 已提交
169
typedef struct STqSerializedHead {
L
Liu Jicong 已提交
170 171 172 173 174
  int16_t ver;
  int16_t action;
  int32_t checksum;
  int64_t ssize;
  char    content[];
L
Liu Jicong 已提交
175
} STqSerializedHead;
L
Liu Jicong 已提交
176

L
Liu Jicong 已提交
177 178 179
typedef int (*FTqSerialize)(const void* pObj, STqSerializedHead** ppHead);
typedef const void* (*FTqDeserialize)(const STqSerializedHead* pHead, void** ppObj);
typedef void (*FTqDelete)(void*);
L
Liu Jicong 已提交
180 181 182 183 184

#define TQ_BUCKET_MASK 0xFF
#define TQ_BUCKET_SIZE 256

#define TQ_PAGE_SIZE 4096
L
Liu Jicong 已提交
185
// key + offset + size
L
Liu Jicong 已提交
186
#define TQ_IDX_SIZE 24
L
Liu Jicong 已提交
187
// 4096 / 24
L
Liu Jicong 已提交
188
#define TQ_MAX_IDX_ONE_PAGE 170
L
Liu Jicong 已提交
189
// 24 * 170
L
Liu Jicong 已提交
190
#define TQ_IDX_PAGE_BODY_SIZE 4080
L
Liu Jicong 已提交
191
// 4096 - 4080
L
Liu Jicong 已提交
192 193
#define TQ_IDX_PAGE_HEAD_SIZE 16

L
Liu Jicong 已提交
194 195
#define TQ_ACTION_CONST 0
#define TQ_ACTION_INUSE 1
L
Liu Jicong 已提交
196
#define TQ_ACTION_INUSE_CONT 2
L
Liu Jicong 已提交
197
#define TQ_ACTION_INTXN 3
L
Liu Jicong 已提交
198

L
Liu Jicong 已提交
199
#define TQ_SVER 0
L
Liu Jicong 已提交
200

L
Liu Jicong 已提交
201 202 203
// TODO: inplace mode is not implemented
#define TQ_UPDATE_INPLACE 0
#define TQ_UPDATE_APPEND 1
L
Liu Jicong 已提交
204 205

#define TQ_DUP_INTXN_REWRITE 0
L
Liu Jicong 已提交
206
#define TQ_DUP_INTXN_REJECT 2
L
Liu Jicong 已提交
207

L
Liu Jicong 已提交
208
static inline bool tqUpdateAppend(int32_t tqConfigFlag) { return tqConfigFlag & TQ_UPDATE_APPEND; }
L
Liu Jicong 已提交
209

L
Liu Jicong 已提交
210
static inline bool tqDupIntxnReject(int32_t tqConfigFlag) { return tqConfigFlag & TQ_DUP_INTXN_REJECT; }
L
Liu Jicong 已提交
211 212

static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST;
L
Liu Jicong 已提交
213 214

#define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE
L
Liu Jicong 已提交
215

L
Liu Jicong 已提交
216
typedef struct STqMetaHandle {
L
Liu Jicong 已提交
217 218 219 220 221
  int64_t key;
  int64_t offset;
  int64_t serializedSize;
  void*   valueInUse;
  void*   valueInTxn;
L
Liu Jicong 已提交
222
} STqMetaHandle;
L
Liu Jicong 已提交
223

L
Liu Jicong 已提交
224 225 226 227 228 229 230
typedef struct STqMetaList {
  STqMetaHandle       handle;
  struct STqMetaList* next;
  // struct STqMetaList* inTxnPrev;
  // struct STqMetaList* inTxnNext;
  struct STqMetaList* unpersistPrev;
  struct STqMetaList* unpersistNext;
L
Liu Jicong 已提交
231
} STqMetaList;
L
Liu Jicong 已提交
232

L
Liu Jicong 已提交
233
typedef struct STqMetaStore {
L
Liu Jicong 已提交
234 235 236
  STqMetaList* bucket[TQ_BUCKET_SIZE];
  // a table head
  STqMetaList* unpersistHead;
L
Liu Jicong 已提交
237

L
Liu Jicong 已提交
238 239 240
  // TODO:temporaral use, to be replaced by unified tfile
  int fileFd;
  // TODO:temporaral use, to be replaced by unified tfile
L
Liu Jicong 已提交
241 242
  int idxFd;

L
Liu Jicong 已提交
243 244 245 246 247
  char*          dirPath;
  int32_t        tqConfigFlag;
  FTqSerialize   pSerializer;
  FTqDeserialize pDeserializer;
  FTqDelete      pDeleter;
L
Liu Jicong 已提交
248
} STqMetaStore;
L
Liu Jicong 已提交
249

L
Liu Jicong 已提交
250
typedef struct STQ {
L
Liu Jicong 已提交
251 252
  // the collection of groups
  // the handle of meta kvstore
L
Liu Jicong 已提交
253 254 255 256
  char*         path;
  STqCfg*       tqConfig;
  STqLogReader* tqLogReader;
  STqMemRef     tqMemRef;
L
Liu Jicong 已提交
257
  STqMetaStore* tqMeta;
L
Liu Jicong 已提交
258 259 260
} STQ;

// open in each vnode
L
Liu Jicong 已提交
261
STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac);
L
Liu Jicong 已提交
262
void tqClose(STQ*);
L
Liu Jicong 已提交
263

L
Liu Jicong 已提交
264
// void* will be replace by a msg type
L
Liu Jicong 已提交
265
int tqPushMsg(STQ*, void* msg, int64_t version);
L
Liu Jicong 已提交
266
int tqCommit(STQ*);
L
Liu Jicong 已提交
267
int tqConsume(STQ*, STqConsumeReq*);
H
refact  
Hongze Cheng 已提交
268

L
Liu Jicong 已提交
269 270 271 272 273 274 275 276 277 278 279
int tqSetCursor(STQ*, STqSetCurReq* pMsg);
int tqBufferSetOffset(STqTopic*, int64_t offset);

STqTopic* tqFindTopic(STqGroup*, int64_t topicId);

STqGroup* tqGetGroup(STQ*, int64_t clientId);

STqGroup* tqOpenGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int       tqCloseGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int       tqRegisterContext(STqGroup*, void* ahandle);
int       tqSendLaunchQuery(STqMsgItem*, int64_t offset);
L
Liu Jicong 已提交
280

L
Liu Jicong 已提交
281
int tqSerializeGroup(const STqGroup*, STqSerializedHead**);
L
Liu Jicong 已提交
282

L
Liu Jicong 已提交
283
const void* tqDeserializeGroup(const STqSerializedHead*, STqGroup**);
L
Liu Jicong 已提交
284

L
Liu Jicong 已提交
285
static int tqQueryExecuting(int32_t status) { return status; }
L
Liu Jicong 已提交
286

H
refact  
Hongze Cheng 已提交
287 288 289 290
#ifdef __cplusplus
}
#endif

L
Liu Jicong 已提交
291
#endif /*_TD_TQ_H_*/