tq.h 5.2 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
refact  
Hongze Cheng 已提交
16 17
#ifndef _TD_TQ_H_
#define _TD_TQ_H_
H
refact  
Hongze Cheng 已提交
18

L
Liu Jicong 已提交
19
#include "os.h"
L
Liu Jicong 已提交
20
#include "tutil.h"
L
Liu Jicong 已提交
21
#include "mallocator.h"
L
Liu Jicong 已提交
22

H
refact  
Hongze Cheng 已提交
23 24 25 26
#ifdef __cplusplus
extern "C" {
#endif

L
Liu Jicong 已提交
27
/*
 * Common message header. It is the first member of the connect, disconnect,
 * consume and subscribe request/response structs declared in this header.
 */
typedef struct TmqMsgHead {
  int32_t protoVer;  // presumably the protocol version -- confirm with the sender
  int32_t msgType;   // message kind; the set of valid values is not defined in this header
  int64_t cgId;      // presumably the consumer group id (cf. TqGroupHandle.cgId) -- confirm
  int64_t clientId;
} TmqMsgHead;
L
Liu Jicong 已提交
33

L
Liu Jicong 已提交
34
/* One acknowledgement entry: a topic id paired with a consume offset. */
typedef struct TmqOneAck {
  int64_t topicId;
  int64_t consumeOffset;
} TmqOneAck;
L
Liu Jicong 已提交
38

L
Liu Jicong 已提交
39
typedef struct TmqAcks {
L
Liu Jicong 已提交
40
  int32_t ackNum;
L
Liu Jicong 已提交
41
  // should be sorted
L
Liu Jicong 已提交
42 43
  TmqOneAck acks[];
} TmqAcks;
L
Liu Jicong 已提交
44

L
Liu Jicong 已提交
45
// TODO: put msgs into common
L
Liu Jicong 已提交
46 47
typedef struct TmqConnectReq {
  TmqMsgHead head;
L
Liu Jicong 已提交
48
  TmqAcks    acks;
L
Liu Jicong 已提交
49
} TmqConnectReq;
L
Liu Jicong 已提交
50

L
Liu Jicong 已提交
51 52
/* Connect response: the common header plus a one-byte status. */
typedef struct TmqConnectRsp {
  TmqMsgHead head;
  int8_t     status;  // status values are not defined in this header
} TmqConnectRsp;
L
Liu Jicong 已提交
55

L
Liu Jicong 已提交
56 57 58
/*
 * Disconnect request: carries only the common message header.
 *
 * The typedef name used to be misspelled "TmqDiscconectReq". The correctly
 * spelled name matches the struct tag; the old spelling is kept as an alias
 * so that existing users keep compiling.
 */
typedef struct TmqDisconnectReq {
  TmqMsgHead head;
} TmqDisconnectReq;

// Deprecated misspelling, retained for source compatibility only.
typedef TmqDisconnectReq TmqDiscconectReq;
L
Liu Jicong 已提交
59

L
Liu Jicong 已提交
60 61
/* Disconnect response: the common header plus a one-byte status. */
typedef struct TmqDisconnectRsp {
  TmqMsgHead head;
  int8_t     status;  // status values are not defined in this header
} TmqDisconnectRsp;
L
Liu Jicong 已提交
64

L
Liu Jicong 已提交
65 66
typedef struct TmqConsumeReq {
  TmqMsgHead head;
L
Liu Jicong 已提交
67
  TmqAcks    acks;
L
Liu Jicong 已提交
68
} TmqConsumeReq;
L
Liu Jicong 已提交
69

L
Liu Jicong 已提交
70
/*
 * One message payload attributed to a topic.
 * msg is a flexible array member; the payload bytes follow the fixed fields.
 */
typedef struct TmqMsgContent {
  int64_t topicId;
  int64_t msgLen;  // presumably the byte length of msg -- confirm with the producer
  char    msg[];
} TmqMsgContent;
L
Liu Jicong 已提交
75

L
Liu Jicong 已提交
76 77
typedef struct TmqConsumeRsp {
  TmqMsgHead    head;
L
Liu Jicong 已提交
78
  int64_t       bodySize;
L
Liu Jicong 已提交
79 80
  TmqMsgContent msgs[];
} TmqConsumeRsp;
L
Liu Jicong 已提交
81

L
Liu Jicong 已提交
82 83
/*
 * Subscribe request: the common header followed by a variable-length list of
 * 64-bit topic values stored in a flexible array member.
 */
typedef struct TmqSubscribeReq {
  TmqMsgHead head;
  int32_t    topicNum;  // presumably the number of entries in topic -- confirm
  int64_t    topic[];   // presumably topic ids (cf. the topicId fields elsewhere) -- confirm
} TmqSubscribeReq;
L
Liu Jicong 已提交
87

L
Liu Jicong 已提交
88 89
typedef struct tmqSubscribeRsp {
  TmqMsgHead head;
L
Liu Jicong 已提交
90 91
  int64_t    vgId;
  char       ep[TSDB_EP_LEN];  // TSDB_EP_LEN
L
Liu Jicong 已提交
92
} TmqSubscribeRsp;
L
Liu Jicong 已提交
93

L
Liu Jicong 已提交
94 95
// Heartbeat request placeholder: no members are declared yet.
// NOTE(review): a struct without named members is not valid ISO C (it is
// accepted as a GNU extension) and its size differs between C and C++; give
// it at least one member before it is exchanged between components.
typedef struct TmqHeartbeatReq {
} TmqHeartbeatReq;
L
Liu Jicong 已提交
96

L
Liu Jicong 已提交
97 98
// Heartbeat response placeholder: no members are declared yet.
// NOTE(review): a struct without named members is not valid ISO C (it is
// accepted as a GNU extension) and its size differs between C and C++; give
// it at least one member before it is exchanged between components.
typedef struct TmqHeartbeatRsp {
} TmqHeartbeatRsp;
L
Liu Jicong 已提交
99

L
Liu Jicong 已提交
100
/* Per-topic handle holding a filter executor and a callback pointer. */
typedef struct TqTopicVhandle {
  int64_t topicId;
  // executor for filter
  void* filterExec;
  // callback for mnode
  // trigger when vnode list associated topic change
  void* (*mCallback)(void*, void*);  // argument and return meanings are not specified here
} TqTopicVhandle;
L
Liu Jicong 已提交
108 109


L
Liu Jicong 已提交
110 111
// Number of TqBufferItem slots in TqBufferHandle.buffer.
#define TQ_BUFFER_SIZE 8

L
Liu Jicong 已提交
112
// TODO: define a serializer and deserializer
L
Liu Jicong 已提交
113
/* One slot of a TqBufferHandle: an offset, an executor and a content blob. */
typedef struct TqBufferItem {
  int64_t offset;
  // executors are identical but not concurrent
  // so it must be a copy in each item
  void*   executor;
  int64_t size;     // presumably the byte size of content -- confirm with the serializer
  void*   content;  // ownership is not specified in this header
} TqBufferItem;
L
Liu Jicong 已提交
121

L
Liu Jicong 已提交
122
typedef struct TqBufferHandle {
L
Liu Jicong 已提交
123 124 125 126 127 128 129
  // char* topic; //c style, end with '\0'
  // int64_t cgId;
  // void* ahandle;
  int64_t      nextConsumeOffset;
  int64_t      topicId;
  int32_t      head;
  int32_t      tail;
L
Liu Jicong 已提交
130 131
  TqBufferItem buffer[TQ_BUFFER_SIZE];
} TqBufferHandle;
L
Liu Jicong 已提交
132

L
Liu Jicong 已提交
133
typedef struct TqListHandle {
L
Liu Jicong 已提交
134
  TqBufferHandle       bufHandle;
L
Liu Jicong 已提交
135 136
  struct TqListHandle* next;
} TqListHandle;
L
Liu Jicong 已提交
137

L
Liu Jicong 已提交
138
typedef struct TqGroupHandle {
L
Liu Jicong 已提交
139 140 141 142 143
  int64_t       cId;
  int64_t       cgId;
  void*         ahandle;
  int32_t       topicNum;
  TqListHandle* head;
L
Liu Jicong 已提交
144
} TqGroupHandle;
L
Liu Jicong 已提交
145

L
Liu Jicong 已提交
146
typedef struct TqQueryExec {
L
Liu Jicong 已提交
147
  void*         src;
L
Liu Jicong 已提交
148
  TqBufferItem* dest;
L
Liu Jicong 已提交
149
  void*         executor;
L
Liu Jicong 已提交
150
} TqQueryExec;
L
Liu Jicong 已提交
151

L
Liu Jicong 已提交
152
typedef struct TqQueryMsg {
L
Liu Jicong 已提交
153 154
  TqQueryExec*       exec;
  struct TqQueryMsg* next;
L
Liu Jicong 已提交
155
} TqQueryMsg;
L
Liu Jicong 已提交
156

L
Liu Jicong 已提交
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180
/*
 * Log access table: an opaque handle plus four function pointers.
 * Each function pointer takes a void* handle as its first argument,
 * presumably the logHandle stored here -- confirm with the code that fills
 * this table in.
 */
typedef struct TqLogReader {
  // opaque log handle
  void* logHandle;

  // read callback: takes a handle, an output pointer and a 64-bit version
  int32_t (*walRead)(void* handle, void** ppData, int64_t version);

  // version queries: each takes only the handle and returns a 64-bit value
  int64_t (*walGetFirstVer)(void* handle);
  int64_t (*walGetSnapshotVer)(void* handle);
  int64_t (*walGetLastVer)(void* handle);
} TqLogReader;

// Configuration placeholder: no options are declared yet.
// NOTE(review): a struct without named members is not valid ISO C (it is
// accepted as a GNU extension) and its size differs between C and C++.
typedef struct TqConfig {
  // TODO
} TqConfig;

/*
 * TQ instance state. Its members have the same names and types as the four
 * arguments of tqOpen().
 * NOTE(review): the two notes inside the struct mention a group-handle
 * collection and a kvstore handle; neither exists as a member yet.
 */
typedef struct STQ {
  // the collection of group handle
  // the handle of kvstore
  const char*           path;
  TqConfig*             tqConfig;
  TqLogReader*          tqLogReader;
  SMemAllocatorFactory* allocFac;
} STQ;

// open in each vnode
// The four arguments have the same names and types as the members of STQ.
// Failure behaviour is not visible from this header.
STQ* tqOpen(const char* path, TqConfig* tqConfig, TqLogReader* tqLogReader, SMemAllocatorFactory* allocFac);

// Counterpart of tqOpen(); takes the STQ pointer to tear down.
void tqDestroy(STQ* pTq);
L
Liu Jicong 已提交
181

L
Liu Jicong 已提交
182
// void* will be replace by a msg type
L
Liu Jicong 已提交
183
int tqPushMsg(STQ*, void* msg, int64_t version);
L
Liu Jicong 已提交
184 185
int tqCommit(STQ*);

L
Liu Jicong 已提交
186
int tqConsume(STQ*, TmqConsumeReq*);
H
refact  
Hongze Cheng 已提交
187

L
Liu Jicong 已提交
188
// Group-handle API. Result conventions (the int codes, and what
// tqGetGroupHandle returns when no handle matches) are defined by the
// implementation and are not visible from this header.
TqGroupHandle* tqGetGroupHandle(STQ*, int64_t cId);

int tqOpenTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int tqCloseTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int tqMoveOffsetToNext(TqGroupHandle*);
int tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset);
int tqRegisterContext(TqGroupHandle*, void* ahandle);
int tqLaunchQuery(TqGroupHandle*);
int tqSendLaunchQuery(TqGroupHandle*);
L
Liu Jicong 已提交
197

L
Liu Jicong 已提交
198 199 200 201
// Serialization entry points for the group, list, buffer and item handle types.
// NOTE(review): the three void* variants take a ptr and return a void*,
// presumably the write position after the serialized data -- confirm against
// the implementation. Who allocates and frees *ppBytes is not specified here.
int   tqSerializeGroupHandle(TqGroupHandle* gHandle, void** ppBytes);
void* tqSerializeListHandle(TqListHandle* listHandle, void* ptr);
void* tqSerializeBufHandle(TqBufferHandle* bufHandle, void* ptr);
void* tqSerializeBufItem(TqBufferItem* bufItem, void* ptr);
L
Liu Jicong 已提交
202

L
Liu Jicong 已提交
203 204 205
// Deserialization entry points; each reads from pBytes into the given handle.
// NOTE(review): the const void* result is presumably the read position after
// the consumed bytes -- confirm against the implementation. This header
// declares no list-handle counterpart to tqSerializeListHandle.
const void* tqDeserializeGroupHandle(const void* pBytes, TqGroupHandle* ghandle);
const void* tqDeserializeBufHandle(const void* pBytes, TqBufferHandle* bufHandle);
const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem* bufItem);
L
Liu Jicong 已提交
206

L
Liu Jicong 已提交
207
// Size query for a group handle; presumably its serialized size in bytes
// ("SSize") -- confirm against the implementation.
int tqGetGHandleSSize(const TqGroupHandle* gHandle);
208 209
// Size queries for TqBufferHandle and TqBufferItem; presumably serialized
// sizes in bytes ("SSize") -- confirm against the implementation.
// Declared with (void) so these are real prototypes: before C23 an empty
// parameter list leaves the arguments unchecked.
int tqBufHandleSSize(void);
int tqBufItemSSize(void);
L
Liu Jicong 已提交
210

H
refact  
Hongze Cheng 已提交
211 212 213 214
#ifdef __cplusplus
}
#endif

L
Liu Jicong 已提交
215
#endif /*_TD_TQ_H_*/