tq.h 7.7 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
refact  
Hongze Cheng 已提交
16 17
#ifndef _TD_TQ_H_
#define _TD_TQ_H_
H
refact  
Hongze Cheng 已提交
18

L
Liu Jicong 已提交
19
#include "common.h"
L
Liu Jicong 已提交
20
#include "executor.h"
L
Liu Jicong 已提交
21
#include "mallocator.h"
L
Liu Jicong 已提交
22
#include "meta.h"
L
Liu Jicong 已提交
23
#include "os.h"
L
Liu Jicong 已提交
24
#include "scheduler.h"
L
Liu Jicong 已提交
25
#include "taoserror.h"
L
Liu Jicong 已提交
26
#include "tlist.h"
L
Liu Jicong 已提交
27
#include "tmsg.h"
L
Liu Jicong 已提交
28
#include "trpc.h"
L
Liu Jicong 已提交
29
#include "ttimer.h"
L
Liu Jicong 已提交
30
#include "tutil.h"
L
Liu Jicong 已提交
31
#include "vnode.h"
L
Liu Jicong 已提交
32
#include "wal.h"
L
Liu Jicong 已提交
33

H
refact  
Hongze Cheng 已提交
34 35 36 37
#ifdef __cplusplus
extern "C" {
#endif

L
Liu Jicong 已提交
38
/**
 * Fixed-size header that the TQ request/response structs in this file
 * (STqSetCurReq, STqConsumeReq, STqConsumeRsp, STqSubscribeReq) embed as
 * their first member.
 */
typedef struct STqMsgHead {
  int32_t protoVer;  // presumably the protocol version -- TODO confirm
  int32_t msgType;
  int64_t cgId;      // presumably the consumer-group id (cf. STqGroup.cgId) -- TODO confirm
  int64_t clientId;
} STqMsgHead;
L
Liu Jicong 已提交
44

L
Liu Jicong 已提交
45
/** One acknowledgement entry; this is the element type of STqAcks.acks[]. */
typedef struct STqOneAck {
  int64_t topicId;
  int64_t consumeOffset;  // presumably the offset consumed for topicId -- TODO confirm
} STqOneAck;
L
Liu Jicong 已提交
49

L
Liu Jicong 已提交
50
typedef struct STqAcks {
L
Liu Jicong 已提交
51
  int32_t ackNum;
L
Liu Jicong 已提交
52
  // should be sorted
L
Liu Jicong 已提交
53 54
  STqOneAck acks[];
} STqAcks;
L
Liu Jicong 已提交
55

L
Liu Jicong 已提交
56 57 58 59 60
// Request type taken by tqSetCursor(): the common message header followed by
// a topic id and an offset.
typedef struct STqSetCurReq {
  STqMsgHead head;
  int64_t    topicId;
  int64_t    offset;
} STqSetCurReq;
L
Liu Jicong 已提交
61

L
Liu Jicong 已提交
62
typedef struct STqConsumeReq {
L
Liu Jicong 已提交
63
  STqMsgHead head;
L
Liu Jicong 已提交
64
  int64_t    blockingTime;  // milisec
L
Liu Jicong 已提交
65
  STqAcks    acks;
L
Liu Jicong 已提交
66
} STqConsumeReq;
L
Liu Jicong 已提交
67

L
Liu Jicong 已提交
68
/** Header of one variable-length message; the payload bytes follow in msg[]. */
typedef struct STqMsgContent {
  int64_t topicId;
  int64_t msgLen;  // presumably the byte length of msg[] -- TODO confirm
  char    msg[];
} STqMsgContent;
L
Liu Jicong 已提交
73

L
Liu Jicong 已提交
74
typedef struct STqConsumeRsp {
L
Liu Jicong 已提交
75
  STqMsgHead    head;
L
Liu Jicong 已提交
76
  int64_t       bodySize;
L
Liu Jicong 已提交
77
  STqMsgContent msgs[];
L
Liu Jicong 已提交
78
} STqConsumeRsp;
L
Liu Jicong 已提交
79

L
Liu Jicong 已提交
80 81
/**
 * Subscribe request: the common message header, an int32_t topicNum and a
 * flexible array of int64_t topic values.
 */
typedef struct STqSubscribeReq {
  STqMsgHead head;
  int32_t    topicNum;  // presumably the number of entries in topic[] -- TODO confirm
  int64_t    topic[];
} STqSubscribeReq;
L
Liu Jicong 已提交
85

L
Liu Jicong 已提交
86 87
// Placeholder type with no members.
// NOTE(review): an empty struct is not valid ISO C. GNU C accepts it with
// size 0 while C++ gives it size 1, so its size differs between C and C++
// users of this header.
typedef struct STqHeartbeatReq {
} STqHeartbeatReq;
L
Liu Jicong 已提交
88

L
Liu Jicong 已提交
89 90
// Placeholder type with no members.
// NOTE(review): an empty struct is not valid ISO C. GNU C accepts it with
// size 0 while C++ gives it size 1, so its size differs between C and C++
// users of this header.
typedef struct STqHeartbeatRsp {
} STqHeartbeatRsp;
L
Liu Jicong 已提交
91

L
Liu Jicong 已提交
92 93
// Number of slots in the fixed-size arrays STqTopic.buffer and STqBuffer.output.
#define TQ_BUFFER_SIZE 8

L
Liu Jicong 已提交
94 95
typedef struct STqExec {
  void* runtimeEnv;
L
Liu Jicong 已提交
96
  SSDataBlock* (*exec)(void* runtimeEnv);
L
Liu Jicong 已提交
97
  void* (*assign)(void* runtimeEnv, void* inputData);
L
Liu Jicong 已提交
98
  void (*clear)(void* runtimeEnv);
L
Liu Jicong 已提交
99 100 101 102
  char* (*serialize)(struct STqExec*);
  struct STqExec* (*deserialize)(char*);
} STqExec;

L
Liu Jicong 已提交
103 104 105 106 107
// Pair of opaque pointers stored in STqGroup.rspHandle.
typedef struct STqRspHandle {
  void* handle;
  void* ahandle;
} STqRspHandle;

L
Liu Jicong 已提交
108 109 110
// Item states. NOTE(review): STqMsgItem.status is declared int32_t rather than
// this enum type; presumably it holds one of these values -- TODO confirm.
typedef enum { TQ_ITEM_READY, TQ_ITEM_PROCESS, TQ_ITEM_EMPTY } STqItemStatus;

typedef struct STqTopic STqTopic;
L
Liu Jicong 已提交
111

L
Liu Jicong 已提交
112
typedef struct STqBufferItem {
L
Liu Jicong 已提交
113
  int64_t offset;
L
Liu Jicong 已提交
114
  // executors are identical but not concurrent
L
Liu Jicong 已提交
115
  // so there must be a copy in each item
L
Liu Jicong 已提交
116 117 118 119 120
  STqExec*  executor;
  int32_t   status;
  int64_t   size;
  void*     content;
  STqTopic* pTopic;
L
Liu Jicong 已提交
121
} STqMsgItem;
L
Liu Jicong 已提交
122

L
Liu Jicong 已提交
123
struct STqTopic {
L
Liu Jicong 已提交
124 125 126
  // char* topic; //c style, end with '\0'
  // int64_t cgId;
  // void* ahandle;
L
Liu Jicong 已提交
127 128
  // int32_t    head;
  // int32_t    tail;
L
Liu Jicong 已提交
129 130 131
  int64_t    nextConsumeOffset;
  int64_t    floatingCursor;
  int64_t    topicId;
L
Liu Jicong 已提交
132
  void*      logReader;
L
Liu Jicong 已提交
133
  STqMsgItem buffer[TQ_BUFFER_SIZE];
L
Liu Jicong 已提交
134
};
L
Liu Jicong 已提交
135 136

typedef struct STqListHandle {
L
Liu Jicong 已提交
137
  STqTopic              topic;
L
Liu Jicong 已提交
138
  struct STqListHandle* next;
L
Liu Jicong 已提交
139
} STqList;
L
Liu Jicong 已提交
140

L
Liu Jicong 已提交
141
typedef struct STqGroup {
L
Liu Jicong 已提交
142 143 144 145
  int64_t      clientId;
  int64_t      cgId;
  void*        ahandle;
  int32_t      topicNum;
L
Liu Jicong 已提交
146
  STqList*     head;
L
Liu Jicong 已提交
147 148
  SList*       topicList;  // SList<STqTopic>
  STqRspHandle rspHandle;
L
Liu Jicong 已提交
149
} STqGroup;
L
Liu Jicong 已提交
150

L
Liu Jicong 已提交
151
typedef struct STqTaskItem {
L
Liu Jicong 已提交
152 153 154 155 156 157
  int8_t         status;
  int64_t        offset;
  void*          dst;
  qTaskInfo_t    task;
  STqReadHandle* pReadHandle;
  SSubQueryMsg*  pQueryMsg;
L
Liu Jicong 已提交
158 159 160 161 162 163 164 165 166
} STqTaskItem;

// new version
// Two offsets plus a fixed array of TQ_BUFFER_SIZE task items; embedded by
// value in STqTopicHandle.buffer.
typedef struct STqBuffer {
  int64_t     firstOffset;
  int64_t     lastOffset;
  STqTaskItem output[TQ_BUFFER_SIZE];
} STqBuffer;

L
Liu Jicong 已提交
167 168 169 170 171 172 173 174
typedef struct STqTopicHandle {
  char            topicName[TSDB_TOPIC_FNAME_LEN];
  char*           sql;
  char*           logicalPlan;
  char*           physicalPlan;
  int64_t         committedOffset;
  int64_t         currentOffset;
  STqBuffer       buffer;
L
Liu Jicong 已提交
175
  SWalReadHandle* pReadhandle;
L
Liu Jicong 已提交
176 177 178 179
} STqTopicHandle;

typedef struct STqConsumerHandle {
  int64_t consumerId;
L
Liu Jicong 已提交
180
  int64_t epoch;
L
Liu Jicong 已提交
181
  char    cgroup[TSDB_TOPIC_FNAME_LEN];
L
Liu Jicong 已提交
182 183
  SArray* topics;  // SArray<STqClientTopic>
} STqConsumerHandle;
L
Liu Jicong 已提交
184

L
Liu Jicong 已提交
185
typedef struct STqQueryMsg {
L
Liu Jicong 已提交
186
  STqMsgItem*         item;
L
Liu Jicong 已提交
187 188 189 190
  struct STqQueryMsg* next;
} STqQueryMsg;

typedef struct STqMemRef {
L
Liu Jicong 已提交
191
  SMemAllocatorFactory* pAllocatorFactory;
L
Liu Jicong 已提交
192 193
  SMemAllocator*        pAllocator;
} STqMemRef;
L
Liu Jicong 已提交
194

L
Liu Jicong 已提交
195
/**
 * Header of a serialized object as produced and consumed by the
 * FTqSerialize / FTqDeserialize callbacks; the serialized bytes follow in
 * content[].
 */
typedef struct STqSerializedHead {
  int16_t ver;       // presumably TQ_SVER -- TODO confirm
  int16_t action;    // presumably one of TQ_ACTION_* -- TODO confirm
  int32_t checksum;
  int64_t ssize;
  char    content[];
} STqSerializedHead;
L
Liu Jicong 已提交
202

L
Liu Jicong 已提交
203 204 205
// Callback types stored in STqMetaStore (pSerializer, pDeserializer, pDeleter).
// FTqSerialize:   takes an object and an out-pointer to a serialized head; returns int.
// FTqDeserialize: takes a serialized head and an out-pointer to an object.
// FTqDelete:      takes the object pointer.
// NOTE(review): return-value conventions and buffer ownership are not visible
// in this header -- confirm against the implementation.
typedef int (*FTqSerialize)(const void* pObj, STqSerializedHead** ppHead);
typedef const void* (*FTqDeserialize)(const STqSerializedHead* pHead, void** ppObj);
typedef void (*FTqDelete)(void*);
L
Liu Jicong 已提交
206 207 208 209 210

// STqMetaStore.bucket has TQ_BUCKET_SIZE entries; TQ_BUCKET_MASK equals
// TQ_BUCKET_SIZE - 1, so the two must be changed together.
#define TQ_BUCKET_MASK 0xFF
#define TQ_BUCKET_SIZE 256

// Page and index-page layout constants. The derived values are written as
// literals and kept in sync by hand; the comment above each one gives the
// relation it must satisfy.
#define TQ_PAGE_SIZE 4096
// key + offset + size
#define TQ_IDX_SIZE 24
// 4096 / 24
#define TQ_MAX_IDX_ONE_PAGE 170
// 24 * 170
#define TQ_IDX_PAGE_BODY_SIZE 4080
// 4096 - 4080
#define TQ_IDX_PAGE_HEAD_SIZE 16

L
Liu Jicong 已提交
220 221
// Action codes. TQ_ACTION_CONST is also the value of TQ_CONST_DELETE.
#define TQ_ACTION_CONST 0
#define TQ_ACTION_INUSE 1
#define TQ_ACTION_INUSE_CONT 2
#define TQ_ACTION_INTXN 3

#define TQ_SVER 0

// Bits tested against tqConfigFlag by tqUpdateAppend().
// TODO: inplace mode is not implemented
#define TQ_UPDATE_INPLACE 0
#define TQ_UPDATE_APPEND 1

// Bits tested against tqConfigFlag by tqDupIntxnReject().
#define TQ_DUP_INTXN_REWRITE 0
#define TQ_DUP_INTXN_REJECT 2
L
Liu Jicong 已提交
233

L
Liu Jicong 已提交
234
// Returns true when the TQ_UPDATE_APPEND bit is set in tqConfigFlag.
static inline bool tqUpdateAppend(int32_t tqConfigFlag) {
  return (tqConfigFlag & TQ_UPDATE_APPEND) != 0;
}
L
Liu Jicong 已提交
235

L
Liu Jicong 已提交
236
// Returns true when the TQ_DUP_INTXN_REJECT bit is set in tqConfigFlag.
static inline bool tqDupIntxnReject(int32_t tqConfigFlag) {
  return (tqConfigFlag & TQ_DUP_INTXN_REJECT) != 0;
}
L
Liu Jicong 已提交
237 238

// Object whose address is used as the TQ_DELETE_TOKEN sentinel pointer.
// NOTE(review): being `static` in a header, every translation unit gets its
// own copy with a distinct address. A token produced in one .c file will not
// compare equal to TQ_DELETE_TOKEN evaluated in another -- confirm that all
// uses live in a single translation unit.
static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST;
L
Liu Jicong 已提交
239 240

// Sentinel pointer value: the address of TQ_CONST_DELETE cast to void*.
// The expansion is fully parenthesized so it stays a single operand in any
// surrounding expression (e.g. `sizeof TQ_DELETE_TOKEN`).
#define TQ_DELETE_TOKEN ((void*)&TQ_CONST_DELETE)
L
Liu Jicong 已提交
241

L
Liu Jicong 已提交
242
/**
 * One meta entry: a key, an offset, a serialized size and two value pointers
 * (valueInUse, valueInTxn). Embedded by value in STqMetaList.handle.
 */
typedef struct STqMetaHandle {
  int64_t key;
  int64_t offset;
  int64_t serializedSize;
  void*   valueInUse;
  void*   valueInTxn;
} STqMetaHandle;
L
Liu Jicong 已提交
249

L
Liu Jicong 已提交
250 251 252 253 254 255 256
/**
 * List node wrapping an STqMetaHandle. Each node carries a `next` link (the
 * element type of STqMetaStore.bucket[]) and a separate prev/next pair for
 * the "unpersist" list headed by STqMetaStore.unpersistHead.
 */
typedef struct STqMetaList {
  STqMetaHandle       handle;
  struct STqMetaList* next;
  // struct STqMetaList* inTxnPrev;
  // struct STqMetaList* inTxnNext;
  struct STqMetaList* unpersistPrev;
  struct STqMetaList* unpersistNext;
} STqMetaList;
L
Liu Jicong 已提交
258

L
Liu Jicong 已提交
259
typedef struct STqMetaStore {
L
Liu Jicong 已提交
260 261 262
  STqMetaList* bucket[TQ_BUCKET_SIZE];
  // a table head
  STqMetaList* unpersistHead;
L
Liu Jicong 已提交
263
  // topics that are not connectted
L
Liu Jicong 已提交
264
  STqMetaList* unconnectTopic;
L
Liu Jicong 已提交
265

L
Liu Jicong 已提交
266 267 268
  // TODO:temporaral use, to be replaced by unified tfile
  int fileFd;
  // TODO:temporaral use, to be replaced by unified tfile
L
Liu Jicong 已提交
269 270
  int idxFd;

L
Liu Jicong 已提交
271 272 273 274 275
  char*          dirPath;
  int32_t        tqConfigFlag;
  FTqSerialize   pSerializer;
  FTqDeserialize pDeserializer;
  FTqDelete      pDeleter;
L
Liu Jicong 已提交
276
} STqMetaStore;
L
Liu Jicong 已提交
277

L
Liu Jicong 已提交
278
typedef struct STQ {
L
Liu Jicong 已提交
279 280
  // the collection of groups
  // the handle of meta kvstore
L
Liu Jicong 已提交
281 282 283
  char*         path;
  STqCfg*       tqConfig;
  STqMemRef     tqMemRef;
L
Liu Jicong 已提交
284
  STqMetaStore* tqMeta;
L
Liu Jicong 已提交
285
  SWal*         pWal;
L
Liu Jicong 已提交
286
  SMeta*        pMeta;
L
Liu Jicong 已提交
287 288
} STQ;

L
Liu Jicong 已提交
289 290 291 292 293 294 295 296 297 298 299
// Type of the file-scope tqMgmt object: an `inited` flag and a timer handle.
typedef struct STqMgmt {
  int8_t inited;
  tmr_h  timer;
} STqMgmt;

// NOTE(review): this defines a file-scope static object in a header, so every
// translation unit that includes it gets its own separate tqMgmt instance.
// If one shared instance is intended, declare it `extern` here and define it
// in exactly one .c file.
static STqMgmt tqMgmt;

// init once
// Declared with (void) so these are real prototypes: in C an empty parameter
// list leaves the arguments unchecked.
int  tqInit(void);
void tqCleanUp(void);

L
Liu Jicong 已提交
300
// open in each vnode
L
Liu Jicong 已提交
301
// Returns an STQ handle built from a path, a WAL, a meta handle, a TQ config
// and an allocator factory.
// NOTE(review): failure behaviour (e.g. a NULL return) and ownership of the
// arguments are not visible in this header -- confirm against the
// implementation.
STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAllocatorFactory* allocFac);
L
Liu Jicong 已提交
302
// Takes an STQ handle; presumably the counterpart of tqOpen() -- TODO confirm.
void tqClose(STQ*);
L
Liu Jicong 已提交
303

L
Liu Jicong 已提交
304
// void* will be replaced by a msg type
L
Liu Jicong 已提交
305
// Takes the STQ handle, an untyped message pointer and an int64_t version.
// NOTE(review): the meaning of the int return value is not visible in this
// header.
int tqPushMsg(STQ*, void* msg, int64_t version);
L
Liu Jicong 已提交
306
// NOTE(review): what is committed and the meaning of the int return value are
// not visible in this header.
int tqCommit(STQ*);
H
refact  
Hongze Cheng 已提交
307

S
Shengliang Guan 已提交
308 309
// Takes the STQ handle and an STqSetCurReq (header, topicId, offset).
// NOTE(review): the meaning of the int return value is not visible in this
// header.
int tqSetCursor(STQ*, STqSetCurReq* pMsg);

L
Liu Jicong 已提交
310 311
#if 0
// Disabled declarations, excluded from compilation.
// NOTE(review): tqSetCursor is also declared outside this block with the same
// signature.
int tqConsume(STQ*, SRpcMsg* pReq, SRpcMsg** pRsp);
int tqSetCursor(STQ*, STqSetCurReq* pMsg);
int tqBufferSetOffset(STqTopic*, int64_t offset);
STqTopic* tqFindTopic(STqGroup*, int64_t topicId);
STqGroup* tqGetGroup(STQ*, int64_t clientId);
STqGroup* tqOpenGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int       tqCloseGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int       tqRegisterContext(STqGroup*, void* ahandle);
int       tqSendLaunchQuery(STqMsgItem*, int64_t offset);
#endif
L
Liu Jicong 已提交
321

S
Shengliang 已提交
322
// Takes the STQ handle and an RPC message.
// NOTE(review): the int32_t return convention is not visible in this header.
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg);
L
Liu Jicong 已提交
323
// Takes the STQ handle and a raw char* message buffer.
// NOTE(review): the buffer format/length and the int32_t return convention
// are not visible in this header.
int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
L
Liu Jicong 已提交
324

H
refact  
Hongze Cheng 已提交
325 326 327 328
#ifdef __cplusplus
}
#endif

L
Liu Jicong 已提交
329
#endif /*_TD_TQ_H_*/