tq.h 4.4 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
refact  
Hongze Cheng 已提交
16 17
#ifndef _TD_TQ_H_
#define _TD_TQ_H_
H
refact  
Hongze Cheng 已提交
18

L
Liu Jicong 已提交
19 20
#include "os.h"

H
refact  
Hongze Cheng 已提交
21 22 23 24
#ifdef __cplusplus
extern "C" {
#endif

L
Liu Jicong 已提交
25
// Common header: it is the first member of every non-empty Tmq*Req / Tmq*Rsp
// struct declared in this header.
typedef struct TmqMsgHead {
  int32_t protoVer;  // presumably the protocol version -- confirm
  int32_t msgType;   // message type; the valid values are not defined here
  int64_t cgId;      // presumably the consumer-group id -- confirm
  int64_t clientId;  // presumably the client id -- confirm
} TmqMsgHead;
L
Liu Jicong 已提交
31

L
Liu Jicong 已提交
32
// A single acknowledgement entry: a topic id paired with a consume offset.
typedef struct TmqOneAck {
  int64_t topicId;
  int64_t consumeOffset;
} TmqOneAck;
L
Liu Jicong 已提交
36

L
Liu Jicong 已提交
37
typedef struct TmqAcks {
L
Liu Jicong 已提交
38 39
  int32_t ackNum;
  //should be sorted
L
Liu Jicong 已提交
40 41
  TmqOneAck acks[];
} TmqAcks;
L
Liu Jicong 已提交
42

L
Liu Jicong 已提交
43
//TODO: put msgs into common
L
Liu Jicong 已提交
44 45 46 47
// Connect request: the common header followed by a list of acks.
// NOTE(review): TmqAcks ends in a flexible array member. ISO C does not allow
// such a struct to be a member of another struct, so this declaration relies
// on a compiler extension -- confirm that every supported compiler accepts it.
typedef struct TmqConnectReq {
  TmqMsgHead head;
  TmqAcks    acks;
} TmqConnectReq;
L
Liu Jicong 已提交
48

L
Liu Jicong 已提交
49 50
// Connect response: the common header plus a one-byte status.
typedef struct TmqConnectRsp {
  TmqMsgHead head;
  int8_t     status;  // status values are not defined in this header
} TmqConnectRsp;
L
Liu Jicong 已提交
53

L
Liu Jicong 已提交
54 55 56
// Disconnect request: carries only the common message header.
// The typedef used to be spelled "TmqDiscconectReq". The correctly spelled
// name (matching the struct tag) is now the primary one; the old spelling is
// kept as an alias so existing users keep compiling.
typedef struct TmqDisconnectReq {
  TmqMsgHead head;
} TmqDisconnectReq, TmqDiscconectReq;
L
Liu Jicong 已提交
57

L
Liu Jicong 已提交
58 59
// Disconnect response: the common header plus a one-byte status.
typedef struct TmqDisconnectRsp {
  TmqMsgHead head;
  int8_t     status;  // status values are not defined in this header
} TmqDisconnectRsp;
L
Liu Jicong 已提交
62

L
Liu Jicong 已提交
63 64 65 66
// Consume request: the common header followed by a list of acks.
// NOTE(review): TmqAcks ends in a flexible array member, so embedding it here
// relies on a compiler extension rather than ISO C -- confirm this is intended.
typedef struct TmqConsumeReq {
  TmqMsgHead head;
  TmqAcks    acks;
} TmqConsumeReq;
L
Liu Jicong 已提交
67

L
Liu Jicong 已提交
68
// One message: a topic id, a length field and the payload bytes held in a
// trailing flexible array member.
typedef struct TmqMsgContent {
  int64_t topicId;
  int64_t msgLen;  // presumably the byte length of msg[] -- confirm
  char    msg[];
} TmqMsgContent;
L
Liu Jicong 已提交
73

L
Liu Jicong 已提交
74 75
typedef struct TmqConsumeRsp {
  TmqMsgHead    head;
L
Liu Jicong 已提交
76
  int64_t       bodySize;
L
Liu Jicong 已提交
77 78
  TmqMsgContent msgs[];
} TmqConsumeRsp;
L
Liu Jicong 已提交
79

L
Liu Jicong 已提交
80 81
// Subscribe request: the common header, a length field and the topic bytes
// held in a trailing flexible array member.
typedef struct TmqSubscribeReq {
  TmqMsgHead head;
  int64_t    topicLen;  // presumably the byte length of topic[] -- confirm
  char       topic[];
} TmqSubscribeReq;
L
Liu Jicong 已提交
85

L
Liu Jicong 已提交
86 87
// Subscribe response: the common header, a vgroup id and an endpoint string
// held in a trailing flexible array member.
// NOTE(review): the struct tag starts with a lowercase 't', unlike the typedef
// name and the other tags in this header. It is left unchanged because
// renaming it would break any user of "struct tmqSubscribeRsp".
typedef struct tmqSubscribeRsp {
  TmqMsgHead head;
  int64_t    vgId;
  char       ep[];  // TSDB_EP_LEN (presumably the expected capacity -- confirm)
} TmqSubscribeRsp;
L
Liu Jicong 已提交
91

L
Liu Jicong 已提交
92
// Heartbeat request placeholder: no members are defined yet.
// NOTE(review): a struct with no members is a compiler extension in C, and its
// size differs between C and C++ -- confirm before this type is used anywhere
// that depends on its size.
typedef struct TmqHeartbeatReq {
} TmqHeartbeatReq;
L
Liu Jicong 已提交
95

L
Liu Jicong 已提交
96
// Heartbeat response placeholder: no members are defined yet.
// NOTE(review): a struct with no members is a compiler extension in C, and its
// size differs between C and C++ -- confirm before this type is used anywhere
// that depends on its size.
typedef struct TmqHeartbeatRsp {
} TmqHeartbeatRsp;
L
Liu Jicong 已提交
99

L
Liu Jicong 已提交
100
// Topic handle placeholder: no members are defined yet. The notes inside list
// what the original author intended to add.
typedef struct TqTopicVhandle {
  // planned: name
  // planned: executor for filter
  // planned: callback for mnode
} TqTopicVhandle;
L
Liu Jicong 已提交
108 109

// Top-level handle type taken by the tq* functions declared in this header.
// No members are defined yet.
typedef struct STQ {
  // planned: the collection of group handles
} STQ;

L
Liu Jicong 已提交
114 115
// Number of TqBufferItem slots in the fixed-size buffer of a TqBufferHandle.
#define TQ_BUFFER_SIZE 8

L
Liu Jicong 已提交
116
//TODO: define a serializer and deserializer
L
Liu Jicong 已提交
117
// One buffer slot: an offset, an executor pointer and a sized content pointer.
typedef struct TqBufferItem {
  int64_t offset;
  // Executors are identical but cannot be used concurrently, so every item
  // must hold its own copy.
  void   *executor;
  int64_t size;     // presumably the byte size of content -- confirm
  void   *content;
} TqBufferItem;
L
Liu Jicong 已提交
125

L
Liu Jicong 已提交
126
typedef struct TqBufferHandle {
L
Liu Jicong 已提交
127 128 129 130 131
  //char* topic; //c style, end with '\0'
  //int64_t cgId;
  //void* ahandle;
  int64_t nextConsumeOffset;
  int64_t topicId;
L
Liu Jicong 已提交
132 133
  int32_t head;
  int32_t tail;
L
Liu Jicong 已提交
134 135
  TqBufferItem buffer[TQ_BUFFER_SIZE];
} TqBufferHandle;
L
Liu Jicong 已提交
136

L
Liu Jicong 已提交
137 138 139 140
// Linked-list node that stores one TqBufferHandle by value and a pointer to
// the next node.
typedef struct TqListHandle {
  TqBufferHandle       bufHandle;
  struct TqListHandle *next;
} TqListHandle;
L
Liu Jicong 已提交
141

L
Liu Jicong 已提交
142
typedef struct TqGroupHandle {
L
Liu Jicong 已提交
143 144 145 146
  int64_t cId;
  int64_t cgId;
  void* ahandle;
  int32_t topicNum;
L
Liu Jicong 已提交
147 148
  TqListHandle *head; 
} TqGroupHandle;
L
Liu Jicong 已提交
149

L
Liu Jicong 已提交
150
typedef struct TqQueryExec {
L
Liu Jicong 已提交
151
  void* src;
L
Liu Jicong 已提交
152
  TqBufferItem* dest;
L
Liu Jicong 已提交
153
  void* executor;
L
Liu Jicong 已提交
154
} TqQueryExec;
L
Liu Jicong 已提交
155

L
Liu Jicong 已提交
156 157 158 159
// Linked-list node that points at one TqQueryExec and at the next node.
typedef struct TqQueryMsg {
  TqQueryExec       *exec;
  struct TqQueryMsg *next;
} TqQueryMsg;
L
Liu Jicong 已提交
160

L
Liu Jicong 已提交
161
//init in each vnode
L
Liu Jicong 已提交
162
STQ* tqInit(void* ref_func(void*), void* unref_func(void*));
L
Liu Jicong 已提交
163 164 165 166
void tqCleanUp(STQ*);

//void* will be replace by a msg type
int tqPushMsg(STQ*, void* msg, int64_t version);
L
Liu Jicong 已提交
167 168
int tqCommit(STQ*);

L
Liu Jicong 已提交
169
int tqConsume(STQ*, TmqConsumeReq*);
H
refact  
Hongze Cheng 已提交
170

L
Liu Jicong 已提交
171
// Group-handle lookup, plus open/close keyed by (topicId, cgId, cId).
// Return-value conventions are not visible in this header.
TqGroupHandle *tqGetGroupHandle(STQ *pTq, int64_t cId);

int tqOpenTCGroup(STQ *pTq, int64_t topicId, int64_t cgId, int64_t cId);
int tqCloseTCGroup(STQ *pTq, int64_t topicId, int64_t cgId, int64_t cId);
L
Liu Jicong 已提交
175
int tqMoveOffsetToNext(TqGroupHandle*);
L
Liu Jicong 已提交
176
int tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset);
L
Liu Jicong 已提交
177 178 179
int tqRegisterContext(TqGroupHandle*, void* ahandle);
int tqLaunchQuery(TqGroupHandle*);
int tqSendLaunchQuery(TqGroupHandle*);
L
Liu Jicong 已提交
180

L
Liu Jicong 已提交
181 182 183 184
// Serialization helpers for the handle types declared in this header.
// What the returned int / void* denotes, and who owns *ppBytes, is defined by
// the implementation and not visible here.
int   tqSerializeGroupHandle(TqGroupHandle *gHandle, void **ppBytes);
void *tqSerializeListHandle(TqListHandle *listHandle, void *ptr);
void *tqSerializeBufHandle(TqBufferHandle *bufHandle, void *ptr);
void *tqSerializeBufItem(TqBufferItem *bufItem, void *ptr);
L
Liu Jicong 已提交
185

L
Liu Jicong 已提交
186 187 188
// Deserialization helpers: each reads from pBytes into the given handle.
// What the returned pointer denotes is defined by the implementation and not
// visible here.
const void *tqDeserializeGroupHandle(const void *pBytes, TqGroupHandle *ghandle);
const void *tqDeserializeBufHandle(const void *pBytes, TqBufferHandle *bufHandle);
const void *tqDeserializeBufItem(const void *pBytes, TqBufferItem *bufItem);
L
Liu Jicong 已提交
189

L
Liu Jicong 已提交
190
// Size query for a group handle; presumably its serialized size in bytes
// (inferred from the "SSize" suffix) -- confirm against the implementation.
int tqGetGHandleSSize(const TqGroupHandle *gHandle);
191 192
// Size queries for TqBufferHandle and TqBufferItem; presumably the serialized
// sizes in bytes (inferred from the "SSize" suffix) -- confirm against the
// implementation.
// Declared with (void): in C an empty parameter list "()" leaves the arguments
// unspecified, so the compiler would accept calls with stray arguments.
int tqBufHandleSSize(void);
int tqBufItemSSize(void);
L
Liu Jicong 已提交
193

H
refact  
Hongze Cheng 已提交
194 195 196 197
#ifdef __cplusplus
}
#endif

L
Liu Jicong 已提交
198
#endif /*_TD_TQ_H_*/