tq.h 6.9 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
refact  
Hongze Cheng 已提交
16 17
#ifndef _TD_TQ_H_
#define _TD_TQ_H_
H
refact  
Hongze Cheng 已提交
18

L
Liu Jicong 已提交
19
#include "os.h"
L
Liu Jicong 已提交
20
#include "tutil.h"
L
Liu Jicong 已提交
21
#include "mallocator.h"
L
Liu Jicong 已提交
22

H
refact  
Hongze Cheng 已提交
23 24 25 26
#ifdef __cplusplus
extern "C" {
#endif

L
Liu Jicong 已提交
27
typedef struct TmqMsgHead {
L
Liu Jicong 已提交
28
  int32_t protoVer;
L
Liu Jicong 已提交
29
  int32_t msgType;
L
Liu Jicong 已提交
30
  int64_t cgId;
L
Liu Jicong 已提交
31
  int64_t clientId;
L
Liu Jicong 已提交
32
} TmqMsgHead;
L
Liu Jicong 已提交
33

L
Liu Jicong 已提交
34
typedef struct TmqOneAck {
L
Liu Jicong 已提交
35 36
  int64_t topicId;
  int64_t consumeOffset;
L
Liu Jicong 已提交
37
} TmqOneAck;
L
Liu Jicong 已提交
38

L
Liu Jicong 已提交
39
typedef struct TmqAcks {
L
Liu Jicong 已提交
40
  int32_t ackNum;
L
Liu Jicong 已提交
41
  // should be sorted
L
Liu Jicong 已提交
42 43
  TmqOneAck acks[];
} TmqAcks;
L
Liu Jicong 已提交
44

L
Liu Jicong 已提交
45
// TODO: put msgs into common
L
Liu Jicong 已提交
46 47
typedef struct TmqConnectReq {
  TmqMsgHead head;
L
Liu Jicong 已提交
48
  TmqAcks    acks;
L
Liu Jicong 已提交
49
} TmqConnectReq;
L
Liu Jicong 已提交
50

L
Liu Jicong 已提交
51 52
typedef struct TmqConnectRsp {
  TmqMsgHead head;
L
Liu Jicong 已提交
53
  int8_t     status;
L
Liu Jicong 已提交
54
} TmqConnectRsp;
L
Liu Jicong 已提交
55

L
Liu Jicong 已提交
56 57 58
typedef struct TmqDisconnectReq {
  TmqMsgHead head;
} TmqDiscconectReq;
L
Liu Jicong 已提交
59

L
Liu Jicong 已提交
60 61
typedef struct TmqDisconnectRsp {
  TmqMsgHead head;
L
Liu Jicong 已提交
62
  int8_t     status;
L
Liu Jicong 已提交
63
} TmqDisconnectRsp;
L
Liu Jicong 已提交
64

L
Liu Jicong 已提交
65 66
typedef struct TmqConsumeReq {
  TmqMsgHead head;
L
Liu Jicong 已提交
67
  TmqAcks    acks;
L
Liu Jicong 已提交
68
} TmqConsumeReq;
L
Liu Jicong 已提交
69

L
Liu Jicong 已提交
70
typedef struct TmqMsgContent {
L
Liu Jicong 已提交
71 72 73
  int64_t topicId;
  int64_t msgLen;
  char    msg[];
L
Liu Jicong 已提交
74
} TmqMsgContent;
L
Liu Jicong 已提交
75

L
Liu Jicong 已提交
76 77
typedef struct TmqConsumeRsp {
  TmqMsgHead    head;
L
Liu Jicong 已提交
78
  int64_t       bodySize;
L
Liu Jicong 已提交
79 80
  TmqMsgContent msgs[];
} TmqConsumeRsp;
L
Liu Jicong 已提交
81

L
Liu Jicong 已提交
82 83
typedef struct TmqSubscribeReq {
  TmqMsgHead head;
L
Liu Jicong 已提交
84 85
  int32_t    topicNum;
  int64_t    topic[];
L
Liu Jicong 已提交
86
} TmqSubscribeReq;
L
Liu Jicong 已提交
87

L
Liu Jicong 已提交
88 89
typedef struct tmqSubscribeRsp {
  TmqMsgHead head;
L
Liu Jicong 已提交
90 91
  int64_t    vgId;
  char       ep[TSDB_EP_LEN];  // TSDB_EP_LEN
L
Liu Jicong 已提交
92
} TmqSubscribeRsp;
L
Liu Jicong 已提交
93

L
Liu Jicong 已提交
94 95
typedef struct TmqHeartbeatReq {
} TmqHeartbeatReq;
L
Liu Jicong 已提交
96

L
Liu Jicong 已提交
97 98
typedef struct TmqHeartbeatRsp {
} TmqHeartbeatRsp;
L
Liu Jicong 已提交
99

L
Liu Jicong 已提交
100
typedef struct TqTopicVhandle {
L
Liu Jicong 已提交
101
  int64_t topicId;
L
Liu Jicong 已提交
102 103 104 105
  // executor for filter
  void* filterExec;
  // callback for mnode
  // trigger when vnode list associated topic change
L
Liu Jicong 已提交
106
  void* (*mCallback)(void*, void*);
L
Liu Jicong 已提交
107
} TqTopicVhandle;
L
Liu Jicong 已提交
108 109


L
Liu Jicong 已提交
110 111
#define TQ_BUFFER_SIZE 8

L
Liu Jicong 已提交
112
typedef struct TqBufferItem {
L
Liu Jicong 已提交
113
  int64_t offset;
L
Liu Jicong 已提交
114
  // executors are identical but not concurrent
L
Liu Jicong 已提交
115
  // so there must be a copy in each item
L
Liu Jicong 已提交
116
  void*   executor;
L
Liu Jicong 已提交
117
  int64_t size;
L
Liu Jicong 已提交
118
  void*   content;
L
Liu Jicong 已提交
119
} TqBufferItem;
L
Liu Jicong 已提交
120

L
Liu Jicong 已提交
121
typedef struct TqBufferHandle {
L
Liu Jicong 已提交
122 123 124 125 126 127 128
  // char* topic; //c style, end with '\0'
  // int64_t cgId;
  // void* ahandle;
  int64_t      nextConsumeOffset;
  int64_t      topicId;
  int32_t      head;
  int32_t      tail;
L
Liu Jicong 已提交
129 130
  TqBufferItem buffer[TQ_BUFFER_SIZE];
} TqBufferHandle;
L
Liu Jicong 已提交
131

L
Liu Jicong 已提交
132
typedef struct TqListHandle {
L
Liu Jicong 已提交
133
  TqBufferHandle       bufHandle;
L
Liu Jicong 已提交
134 135
  struct TqListHandle* next;
} TqListHandle;
L
Liu Jicong 已提交
136

L
Liu Jicong 已提交
137
typedef struct TqGroupHandle {
L
Liu Jicong 已提交
138 139 140 141 142
  int64_t       cId;
  int64_t       cgId;
  void*         ahandle;
  int32_t       topicNum;
  TqListHandle* head;
L
Liu Jicong 已提交
143
} TqGroupHandle;
L
Liu Jicong 已提交
144

L
Liu Jicong 已提交
145
typedef struct TqQueryExec {
L
Liu Jicong 已提交
146
  void*         src;
L
Liu Jicong 已提交
147
  TqBufferItem* dest;
L
Liu Jicong 已提交
148
  void*         executor;
L
Liu Jicong 已提交
149
} TqQueryExec;
L
Liu Jicong 已提交
150

L
Liu Jicong 已提交
151
typedef struct TqQueryMsg {
L
Liu Jicong 已提交
152 153
  TqQueryExec*       exec;
  struct TqQueryMsg* next;
L
Liu Jicong 已提交
154
} TqQueryMsg;
L
Liu Jicong 已提交
155

L
Liu Jicong 已提交
156 157
typedef struct TqLogReader {
  void* logHandle;
L
Liu Jicong 已提交
158 159 160 161
  int32_t (*logRead)(void* logHandle, void** data, int64_t ver);
  int64_t (*logGetFirstVer)(void* logHandle);
  int64_t (*logGetSnapshotVer)(void* logHandle);
  int64_t (*logGetLastVer)(void* logHandle);
L
Liu Jicong 已提交
162 163
} TqLogReader;

H
refact  
Hongze Cheng 已提交
164
typedef struct STqCfg {
L
Liu Jicong 已提交
165
  // TODO
H
refact  
Hongze Cheng 已提交
166
} STqCfg;
L
Liu Jicong 已提交
167

L
Liu Jicong 已提交
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228
typedef struct TqMemRef {
  SMemAllocatorFactory *pAlloctorFactory;
  SMemAllocator *pAllocator;
} TqMemRef;

typedef struct TqSerializedHead {
  int16_t ver;
  int16_t action;
  int32_t checksum;
  int64_t ssize;
  char    content[];
} TqSerializedHead;

typedef int (*TqSerializeFun)(const void* pObj, TqSerializedHead** ppHead);
typedef const void* (*TqDeserializeFun)(const TqSerializedHead* pHead, void** ppObj);
typedef void (*TqDeleteFun)(void*);

#define TQ_BUCKET_MASK 0xFF
#define TQ_BUCKET_SIZE 256

#define TQ_PAGE_SIZE 4096
//key + offset + size
#define TQ_IDX_SIZE 24
//4096 / 24
#define TQ_MAX_IDX_ONE_PAGE 170
//24 * 170
#define TQ_IDX_PAGE_BODY_SIZE 4080
//4096 - 4080
#define TQ_IDX_PAGE_HEAD_SIZE 16

#define TQ_ACTION_CONST      0
#define TQ_ACTION_INUSE      1
#define TQ_ACTION_INUSE_CONT 2
#define TQ_ACTION_INTXN      3

#define TQ_SVER              0

//TODO: inplace mode is not implemented
#define TQ_UPDATE_INPLACE    0
#define TQ_UPDATE_APPEND     1

#define TQ_DUP_INTXN_REWRITE 0
#define TQ_DUP_INTXN_REJECT  2

static inline bool TqUpdateAppend(int32_t tqConfigFlag) {
  return tqConfigFlag & TQ_UPDATE_APPEND;
}

static inline bool TqDupIntxnReject(int32_t tqConfigFlag) {
  return tqConfigFlag & TQ_DUP_INTXN_REJECT;
}

static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST;
#define TQ_DELETE_TOKEN  (void*)&TQ_CONST_DELETE

typedef struct TqMetaHandle {
  int64_t key;
  int64_t offset;
  int64_t serializedSize;
  void*   valueInUse;
  void*   valueInTxn;
L
Liu Jicong 已提交
229
} STqMetaHandle;
L
Liu Jicong 已提交
230 231

typedef struct TqMetaList {
L
Liu Jicong 已提交
232
  STqMetaHandle handle;
L
Liu Jicong 已提交
233 234 235 236 237
  struct TqMetaList* next;
  //struct TqMetaList* inTxnPrev;
  //struct TqMetaList* inTxnNext;
  struct TqMetaList* unpersistPrev;
  struct TqMetaList* unpersistNext;
L
Liu Jicong 已提交
238
} STqMetaList;
L
Liu Jicong 已提交
239 240

typedef struct TqMetaStore {
L
Liu Jicong 已提交
241
  STqMetaList*      bucket[TQ_BUCKET_SIZE];
L
Liu Jicong 已提交
242
  //a table head
L
Liu Jicong 已提交
243
  STqMetaList*      unpersistHead;
L
Liu Jicong 已提交
244 245 246 247 248 249 250 251 252
 //TODO:temporaral use, to be replaced by unified tfile
  int              fileFd;
  //TODO:temporaral use, to be replaced by unified tfile
  int              idxFd;
  char*            dirPath;
  int32_t          tqConfigFlag;
  TqSerializeFun   pSerializer;
  TqDeserializeFun pDeserializer;
  TqDeleteFun      pDeleter;
L
Liu Jicong 已提交
253
} STqMetaStore;
L
Liu Jicong 已提交
254

L
Liu Jicong 已提交
255 256 257
typedef struct STQ {
  // the collection of group handle
  // the handle of kvstore
L
Liu Jicong 已提交
258
  char*  path;
L
Liu Jicong 已提交
259
  STqCfg*      tqConfig;
L
Liu Jicong 已提交
260
  TqLogReader* tqLogReader; 
L
Liu Jicong 已提交
261
  TqMemRef     tqMemRef;
L
Liu Jicong 已提交
262
  STqMetaStore* tqMeta;
L
Liu Jicong 已提交
263 264 265
} STQ;

// open in each vnode
H
refact  
Hongze Cheng 已提交
266
STQ* tqOpen(const char* path, STqCfg* tqConfig, TqLogReader* tqLogReader, SMemAllocatorFactory *allocFac);
L
Liu Jicong 已提交
267
void tqDestroy(STQ*);
L
Liu Jicong 已提交
268

L
Liu Jicong 已提交
269
// void* will be replace by a msg type
L
Liu Jicong 已提交
270
int tqPushMsg(STQ*, void* msg, int64_t version);
L
Liu Jicong 已提交
271 272
int tqCommit(STQ*);

L
Liu Jicong 已提交
273
int tqConsume(STQ*, TmqConsumeReq*);
H
refact  
Hongze Cheng 已提交
274

L
Liu Jicong 已提交
275
TqGroupHandle* tqGetGroupHandle(STQ*, int64_t cId);
L
Liu Jicong 已提交
276

L
Liu Jicong 已提交
277
TqGroupHandle* tqOpenTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
L
Liu Jicong 已提交
278
int tqCloseTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
L
Liu Jicong 已提交
279
int tqMoveOffsetToNext(TqGroupHandle*);
L
Liu Jicong 已提交
280
int tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset);
L
Liu Jicong 已提交
281 282 283
int tqRegisterContext(TqGroupHandle*, void* ahandle);
int tqLaunchQuery(TqGroupHandle*);
int tqSendLaunchQuery(TqGroupHandle*);
L
Liu Jicong 已提交
284

L
Liu Jicong 已提交
285
int   tqSerializeGroupHandle(const TqGroupHandle* gHandle, TqSerializedHead** ppHead);
L
Liu Jicong 已提交
286

L
Liu Jicong 已提交
287
const void* tqDeserializeGroupHandle(const TqSerializedHead* pHead, TqGroupHandle** gHandle);
L
Liu Jicong 已提交
288

H
refact  
Hongze Cheng 已提交
289 290 291 292
#ifdef __cplusplus
}
#endif

L
Liu Jicong 已提交
293
#endif /*_TD_TQ_H_*/