tq.h 4.6 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
refact  
Hongze Cheng 已提交
16 17
#ifndef _TD_TQ_H_
#define _TD_TQ_H_
H
refact  
Hongze Cheng 已提交
18

L
Liu Jicong 已提交
19
#include "os.h"
L
Liu Jicong 已提交
20
#include "tutil.h"
L
Liu Jicong 已提交
21

H
refact  
Hongze Cheng 已提交
22 23 24 25
#ifdef __cplusplus
extern "C" {
#endif

L
Liu Jicong 已提交
26
// Header shared by the Tmq request/response messages in this file; the
// messages that carry it embed it as their first member, `head`.
typedef struct TmqMsgHead {
  int32_t protoVer;  // presumably the protocol version -- confirm with the sender
  int32_t msgType;   // presumably identifies which Tmq* message follows -- confirm
  int64_t cgId;      // presumably the consumer-group id (cf. TqGroupHandle.cgId) -- confirm
  int64_t clientId;
} TmqMsgHead;
L
Liu Jicong 已提交
32

L
Liu Jicong 已提交
33
// One acknowledgement entry: a topic id paired with a consume offset.
// Element type of TmqAcks.acks.
typedef struct TmqOneAck {
  int64_t topicId;
  int64_t consumeOffset;
} TmqOneAck;
L
Liu Jicong 已提交
37

L
Liu Jicong 已提交
38
typedef struct TmqAcks {
L
Liu Jicong 已提交
39 40
  int32_t ackNum;
  //should be sorted
L
Liu Jicong 已提交
41 42
  TmqOneAck acks[];
} TmqAcks;
L
Liu Jicong 已提交
43

L
Liu Jicong 已提交
44
// TODO: move these message definitions into common
L
Liu Jicong 已提交
45 46 47 48
// Connect request: the common message header followed by an ack list.
typedef struct TmqConnectReq TmqConnectReq;
struct TmqConnectReq {
  TmqMsgHead head;
  TmqAcks    acks;  // TmqAcks ends in a flexible array member; keep this field last
};
L
Liu Jicong 已提交
49

L
Liu Jicong 已提交
50 51
// Connect response: the common message header plus a one-byte status.
typedef struct TmqConnectRsp {
  TmqMsgHead head;
  int8_t     status;  // NOTE(review): status code values are not defined in this header
} TmqConnectRsp;
L
Liu Jicong 已提交
54

L
Liu Jicong 已提交
55 56 57
// Disconnect request: carries only the common message header.
typedef struct TmqDisconnectReq {
  TmqMsgHead head;
} TmqDisconnectReq;

// The typedef was originally published under the misspelled name below.
// Keep it as an alias so existing users of the old spelling still compile.
typedef TmqDisconnectReq TmqDiscconectReq;
L
Liu Jicong 已提交
58

L
Liu Jicong 已提交
59 60
// Disconnect response: the common message header plus a one-byte status.
typedef struct TmqDisconnectRsp {
  TmqMsgHead head;
  int8_t     status;  // NOTE(review): status code values are not defined in this header
} TmqDisconnectRsp;
L
Liu Jicong 已提交
63

L
Liu Jicong 已提交
64 65 66 67
// Consume request: the common message header followed by an ack list.
// This is the request type accepted by tqConsume().
typedef struct TmqConsumeReq TmqConsumeReq;
struct TmqConsumeReq {
  TmqMsgHead head;
  TmqAcks    acks;  // TmqAcks ends in a flexible array member; keep this field last
};
L
Liu Jicong 已提交
68

L
Liu Jicong 已提交
69
// One message payload tagged with its topic id. `msg` is a flexible array
// member, so the payload bytes are stored directly after the fixed fields.
typedef struct TmqMsgContent {
  int64_t topicId;
  int64_t msgLen;  // presumably the number of bytes in msg[] -- confirm
  char    msg[];
} TmqMsgContent;
L
Liu Jicong 已提交
74

L
Liu Jicong 已提交
75 76
typedef struct TmqConsumeRsp {
  TmqMsgHead    head;
L
Liu Jicong 已提交
77
  int64_t       bodySize;
L
Liu Jicong 已提交
78 79
  TmqMsgContent msgs[];
} TmqConsumeRsp;
L
Liu Jicong 已提交
80

L
Liu Jicong 已提交
81 82
// Subscribe request: the common message header followed by a
// variable-length list of 64-bit topic values.
typedef struct TmqSubscribeReq {
  TmqMsgHead head;
  int32_t    topicNum;  // presumably the number of entries in topic[] -- confirm
  int64_t    topic[];   // presumably topic ids -- confirm
} TmqSubscribeReq;
L
Liu Jicong 已提交
86

L
Liu Jicong 已提交
87 88
typedef struct tmqSubscribeRsp {
  TmqMsgHead head;
L
Liu Jicong 已提交
89
  int64_t vgId;
L
Liu Jicong 已提交
90
  char ep[TSDB_EP_LEN]; //TSDB_EP_LEN
L
Liu Jicong 已提交
91
} TmqSubscribeRsp;
L
Liu Jicong 已提交
92

L
Liu Jicong 已提交
93
// Heartbeat request. Placeholder: no fields are defined yet.
// NOTE(review): a struct with no members is a compiler extension in C and
// has a different size in C and C++; give it a member before it is used
// across that boundary.
typedef struct TmqHeartbeatReq {
} TmqHeartbeatReq;
L
Liu Jicong 已提交
96

L
Liu Jicong 已提交
97
// Heartbeat response. Placeholder: no fields are defined yet.
// NOTE(review): a struct with no members is a compiler extension in C and
// has a different size in C and C++; give it a member before it is used
// across that boundary.
typedef struct TmqHeartbeatRsp {
} TmqHeartbeatRsp;
L
Liu Jicong 已提交
100

L
Liu Jicong 已提交
101
// Per-topic handle: the topic id, an opaque filter executor, and a
// callback.
typedef struct TqTopicVhandle {
  int64_t topicId;
  // executor for the filter
  void*   filterExec;
  // callback for the mnode;
  // triggered when the vnode list associated with the topic changes
  void* (*mCallback)(void*, void*);
} TqTopicVhandle;
L
Liu Jicong 已提交
109 110

// Top-level tq object created by tqInit() and passed to the tq* functions.
// No fields are defined yet; the comments inside list what it is planned
// to hold.
typedef struct STQ {
  // the collection of group handles
  // the handle of the kvstore
} STQ;

L
Liu Jicong 已提交
115 116
// Number of TqBufferItem slots stored inline in each TqBufferHandle.
#define TQ_BUFFER_SIZE 8

L
Liu Jicong 已提交
117
//TODO: define a serializer and deserializer
L
Liu Jicong 已提交
118
// One slot of a TqBufferHandle's buffer: an offset, the executor copy for
// this slot, and an opaque content pointer with its size.
typedef struct TqBufferItem {
  int64_t offset;
  // executors are identical but not concurrent,
  // so each item must hold its own copy
  void*   executor;
  int64_t size;     // presumably the byte size of `content` -- confirm
  void*   content;
} TqBufferItem;
L
Liu Jicong 已提交
126

L
Liu Jicong 已提交
127
typedef struct TqBufferHandle {
L
Liu Jicong 已提交
128 129 130 131 132
  //char* topic; //c style, end with '\0'
  //int64_t cgId;
  //void* ahandle;
  int64_t nextConsumeOffset;
  int64_t topicId;
L
Liu Jicong 已提交
133 134
  int32_t head;
  int32_t tail;
L
Liu Jicong 已提交
135 136
  TqBufferItem buffer[TQ_BUFFER_SIZE];
} TqBufferHandle;
L
Liu Jicong 已提交
137

L
Liu Jicong 已提交
138 139 140 141
// Singly linked list node that stores one TqBufferHandle by value.
typedef struct TqListHandle TqListHandle;
struct TqListHandle {
  TqBufferHandle bufHandle;
  TqListHandle*  next;  // following node in the list
};
L
Liu Jicong 已提交
142

L
Liu Jicong 已提交
143
typedef struct TqGroupHandle {
L
Liu Jicong 已提交
144 145 146 147
  int64_t cId;
  int64_t cgId;
  void* ahandle;
  int32_t topicNum;
L
Liu Jicong 已提交
148 149
  TqListHandle *head; 
} TqGroupHandle;
L
Liu Jicong 已提交
150

L
Liu Jicong 已提交
151
typedef struct TqQueryExec {
L
Liu Jicong 已提交
152
  void* src;
L
Liu Jicong 已提交
153
  TqBufferItem* dest;
L
Liu Jicong 已提交
154
  void* executor;
L
Liu Jicong 已提交
155
} TqQueryExec;
L
Liu Jicong 已提交
156

L
Liu Jicong 已提交
157 158 159 160
// Singly linked list node that points at one TqQueryExec.
typedef struct TqQueryMsg TqQueryMsg;
struct TqQueryMsg {
  TqQueryExec* exec;
  TqQueryMsg*  next;  // following node in the list
};
L
Liu Jicong 已提交
161

L
Liu Jicong 已提交
162
// initialized once in each vnode
L
Liu Jicong 已提交
163
// Returns an STQ handle. Both parameters are callbacks that take and
// return void*; presumably reference/unreference hooks, going by their
// names -- confirm against the implementation.
STQ* tqInit(void* (*ref_func)(void*), void* (*unref_func)(void*));
L
Liu Jicong 已提交
164 165 166 167
// Takes the STQ handle; presumably the counterpart of tqInit() -- confirm.
void tqCleanUp(STQ* pTq);

// The untyped `msg` pointer will be replaced by a dedicated message type.
int tqPushMsg(STQ* pTq, void* msg, int64_t version);
L
Liu Jicong 已提交
168 169
// NOTE(review): the meaning of the int result is not visible in this
// header -- confirm against the implementation.
int tqCommit(STQ* pTq);

L
Liu Jicong 已提交
170
// Takes the STQ handle and a consume request (header plus ack list).
// NOTE(review): the meaning of the int result is not visible in this header.
int tqConsume(STQ* pTq, TmqConsumeReq* pReq);
H
refact  
Hongze Cheng 已提交
171

L
Liu Jicong 已提交
172
// Returns a group handle for the given cId (cf. TqGroupHandle.cId).
// NOTE(review): behaviour when no handle matches is not visible here.
TqGroupHandle* tqGetGroupHandle(STQ* pTq, int64_t cId);
L
Liu Jicong 已提交
173 174 175

// Open/close pair keyed by (topicId, cgId, cId).
// NOTE(review): the meaning of the int results is not visible in this header.
int tqOpenTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId);
int tqCloseTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId);
L
Liu Jicong 已提交
176
// Operates on a group handle; presumably advances its consume offset(s),
// going by the name -- confirm against the implementation.
int tqMoveOffsetToNext(TqGroupHandle* gHandle);
L
Liu Jicong 已提交
177
// Takes a (topicId, cgId) pair and an offset; presumably sets that pair's
// consume offset to `offset` -- confirm against the implementation.
int tqResetOffset(STQ* pTq, int64_t topicId, int64_t cgId, int64_t offset);
L
Liu Jicong 已提交
178 179 180
// Takes a group handle and an opaque pointer named like
// TqGroupHandle.ahandle; presumably stores it there -- confirm.
int tqRegisterContext(TqGroupHandle* gHandle, void* ahandle);

// Query launch entry points operating on a group handle.
// NOTE(review): how these two differ is not visible in this header.
int tqLaunchQuery(TqGroupHandle* gHandle);
int tqSendLaunchQuery(TqGroupHandle* gHandle);
L
Liu Jicong 已提交
181

L
Liu Jicong 已提交
182 183 184 185
// Serializers for the handle structs.
// tqSerializeGroupHandle passes its buffer through `ppBytes`; the other
// three take a `ptr` and return a void*.
// NOTE(review): buffer ownership and the meaning of the returned pointer
// (presumably the position after the written bytes) are not visible in
// this header -- confirm against the implementation.
int   tqSerializeGroupHandle(TqGroupHandle* gHandle, void** ppBytes);
void* tqSerializeListHandle(TqListHandle* listHandle, void* ptr);
void* tqSerializeBufHandle(TqBufferHandle* bufHandle, void* ptr);
void* tqSerializeBufItem(TqBufferItem* bufItem, void* ptr);
L
Liu Jicong 已提交
186

L
Liu Jicong 已提交
187 188 189
// Deserializers: each reads from `pBytes` into the given struct and
// returns a const void*.
// NOTE(review): the returned pointer is presumably the position after the
// consumed bytes -- confirm against the implementation.
const void* tqDeserializeGroupHandle(const void* pBytes, TqGroupHandle* ghandle);
const void* tqDeserializeBufHandle(const void* pBytes, TqBufferHandle* bufHandle);
const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem* bufItem);
L
Liu Jicong 已提交
190

L
Liu Jicong 已提交
191
// Returns an int computed from a group handle; presumably its serialized
// size, going by the name -- confirm against the implementation.
int tqGetGHandleSSize(const TqGroupHandle* gHandle);
192 193
// Size helpers that take no arguments; presumably the serialized sizes of
// TqBufferHandle and TqBufferItem, going by the names -- confirm.
// Declared with (void): in C an empty parameter list leaves the arguments
// unspecified, so calls with stray arguments would not be rejected.
int tqBufHandleSSize(void);
int tqBufItemSSize(void);
L
Liu Jicong 已提交
194

H
refact  
Hongze Cheng 已提交
195 196 197 198
#ifdef __cplusplus
}
#endif

L
Liu Jicong 已提交
199
#endif /*_TD_TQ_H_*/