tq.h 4.4 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
refact  
Hongze Cheng 已提交
16 17
#ifndef _TD_TQ_H_
#define _TD_TQ_H_
H
refact  
Hongze Cheng 已提交
18

L
Liu Jicong 已提交
19 20
#include "os.h"

H
refact  
Hongze Cheng 已提交
21 22 23 24
#ifdef __cplusplus
extern "C" {
#endif

L
Liu Jicong 已提交
25
// Header embedded as the first member of the tmq request/response structs
// in this file that carry one.
// Field meanings are inferred from the names only — confirm with the protocol owner.
typedef struct tmqMsgHead {
  int32_t protoVer;  // presumably the protocol version
  int32_t msgType;   // presumably the message kind
  int64_t cgId;      // presumably the consumer-group id
  int64_t clientId;  // presumably the client id
} tmqMsgHead;

L
Liu Jicong 已提交
32 33 34 35 36 37 38 39 40 41 42
// One acknowledgement entry: a topic id paired with a consume offset.
typedef struct tmqOneAck {
  int64_t topicId;        // id of the topic this entry refers to
  int64_t consumeOffset;  // offset recorded for that topic
} tmqOneAck;

// Variable-length collection of tmqOneAck entries.
typedef struct tmqAcks {
  int32_t   ackNum;  // presumably the number of entries in acks[] — confirm
  // should be sorted
  tmqOneAck acks[];  // flexible array member
} tmqAcks;

L
Liu Jicong 已提交
43 44 45
//TODO: put msgs into common
typedef struct tmqConnectReq {
  tmqMsgHead head;
L
Liu Jicong 已提交
46
  tmqAcks acks;
L
Liu Jicong 已提交
47 48
} tmqConnectReq;

L
Liu Jicong 已提交
49
typedef struct tmqConnectRsp {
L
Liu Jicong 已提交
50 51
  tmqMsgHead head;
  int8_t status;
L
Liu Jicong 已提交
52
} tmqConnectRsp;
L
Liu Jicong 已提交
53 54

typedef struct tmqDisconnectReq {
L
Liu Jicong 已提交
55
  tmqMsgHead head;
L
Liu Jicong 已提交
56 57
} tmqDisconnectReq;

L
Liu Jicong 已提交
58
typedef struct tmqDisconnectRsp {
L
Liu Jicong 已提交
59 60
  tmqMsgHead head;
  int8_t status;
L
Liu Jicong 已提交
61
} tmqDiconnectRsp;
L
Liu Jicong 已提交
62 63

typedef struct tmqConsumeReq {
L
Liu Jicong 已提交
64
  tmqMsgHead head;
L
Liu Jicong 已提交
65
  tmqAcks acks;
L
Liu Jicong 已提交
66 67
} tmqConsumeReq;

L
Liu Jicong 已提交
68 69 70 71 72 73 74 75 76 77 78
// One message payload tagged with a topic id.
typedef struct tmqMsgContent {
  int64_t topicId;
  int64_t msgLen;  // presumably the byte length of msg[] — confirm
  char msg[];      // flexible array member holding the payload bytes
} tmqMsgContent;

// Consume response: message header, a body size, then trailing message data.
typedef struct tmqConsumeRsp {
  tmqMsgHead head;
  int64_t bodySize;  // presumably the total size of the data that follows — confirm
  // NOTE(review): tmqMsgContent itself ends in a flexible array member, so
  // msgs[i] indexing cannot account for each entry's variable length, and
  // ISO C does not allow such a type as an array element — confirm how
  // readers are expected to walk this area.
  tmqMsgContent msgs[];
} tmqConsumeRsp;
L
Liu Jicong 已提交
79

L
Liu Jicong 已提交
80 81 82 83
// Subscribe request. Note the struct tag (tmqMnodeSubscribeReq) and the
// typedef name (tmqSubscribeReq) differ.
typedef struct tmqMnodeSubscribeReq {
  tmqMsgHead head;
  int64_t    topicLen;  // presumably the byte length of topic[] — confirm
  char       topic[];   // flexible array member
} tmqSubscribeReq;

L
Liu Jicong 已提交
86
typedef struct tmqMnodeSubscribeRsp {
L
Liu Jicong 已提交
87 88 89
  tmqMsgHead head;
  int64_t vgId;
  char ep[]; //TSDB_EP_LEN
L
Liu Jicong 已提交
90
} tmqSubscribeRsp;
L
Liu Jicong 已提交
91 92 93 94 95

// Heartbeat request; no members are defined yet.
// NOTE(review): a struct with no members is a compiler extension in C, and
// its size differs between C and C++ builds — confirm before relying on sizeof.
typedef struct tmqHeartbeatReq {
} tmqHeartbeatReq;

L
Liu Jicong 已提交
96
// Heartbeat response; no members are defined yet.
// NOTE(review): a struct with no members is a compiler extension in C, and
// its size differs between C and C++ builds — confirm before relying on sizeof.
typedef struct tmqHeartbeatRsp {
} tmqHeartbeatRsp;
L
Liu Jicong 已提交
99

L
Liu Jicong 已提交
100 101 102 103 104 105 106
// Per-topic handle; currently a placeholder with no members. The comments
// inside list the members that are planned.
typedef struct tqTopicVhandle {
  //name
  //
  //executor for filter
  //
  //callback for mnode
  //
} tqTopicVhandle;
L
Liu Jicong 已提交
108 109

// Top-level tq object passed (by pointer) to the tq* functions declared in
// this header; currently a placeholder with no members.
typedef struct STQ {
  //the collection of group handle
} STQ;

L
Liu Jicong 已提交
114 115
// Number of tqBufferItem slots in tqBufferHandle.buffer.
#define TQ_BUFFER_SIZE 8

L
Liu Jicong 已提交
116
//TODO: define a serializer and deserializer
L
Liu Jicong 已提交
117 118
// One slot of a tqBufferHandle buffer.
typedef struct tqBufferItem {
  int64_t offset;
  //executors are identical but not concurrent
  //so it must be a copy in each item
  void*   executor;
  int64_t size;     // presumably the byte size of content — confirm
  void*   content;
} tqBufferItem;

L
Liu Jicong 已提交
126 127 128 129 130 131
typedef struct tqBufferHandle {
  //char* topic; //c style, end with '\0'
  //int64_t cgId;
  //void* ahandle;
  int64_t nextConsumeOffset;
  int64_t topicId;
L
Liu Jicong 已提交
132 133 134
  int32_t head;
  int32_t tail;
  tqBufferItem buffer[TQ_BUFFER_SIZE];
L
Liu Jicong 已提交
135 136 137
} tqBufferHandle;

typedef struct tqListHandle {
138
  tqBufferHandle bufHandle;
L
Liu Jicong 已提交
139 140 141 142 143 144 145 146 147
  struct tqListHandle* next;
} tqListHandle;

// Group handle: ids, an opaque handle, and the head of a tqListHandle list.
typedef struct tqGroupHandle {
  int64_t       cId;
  int64_t       cgId;
  void*         ahandle;
  int32_t       topicNum;  // presumably the number of nodes reachable from head — confirm
  tqListHandle* head;
} tqGroupHandle;

L
Liu Jicong 已提交
150 151 152 153 154 155 156 157 158 159 160
// Describes one query execution: an opaque source, the buffer item that is
// the destination, and an opaque executor.
typedef struct tqQueryExec {
  void*         src;
  tqBufferItem* dest;
  void*         executor;
} tqQueryExec;

// List node that points at one tqQueryExec and at the next tqQueryMsg.
typedef struct tqQueryMsg {
  tqQueryExec*       exec;
  struct tqQueryMsg* next;
} tqQueryMsg;

L
Liu Jicong 已提交
161
//init in each vnode
L
Liu Jicong 已提交
162
STQ* tqInit(void* ref_func(void*), void* unref_func(void*));
L
Liu Jicong 已提交
163 164 165 166
void tqCleanUp(STQ*);

//void* will be replaced by a msg type
// NOTE(review): the meaning of the int return values is not visible here.
int tqPushMsg(STQ* pTq, void* msg, int64_t version);
int tqCommit(STQ* pTq);

L
Liu Jicong 已提交
169
// Takes the tq object and a consume request.
// NOTE(review): return-value convention is not visible here.
int tqConsume(STQ* pTq, tmqConsumeReq* pReq);
H
refact  
Hongze Cheng 已提交
170

L
Liu Jicong 已提交
171
// Returns a group handle pointer for the given cId.
// NOTE(review): behaviour when no handle exists is not visible here.
tqGroupHandle* tqGetGroupHandle(STQ* pTq, int64_t cId);
L
Liu Jicong 已提交
172 173 174 175 176 177

// Group/offset operations. All return int.
// NOTE(review): the return-value convention is not visible in this header.
int tqOpenTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId);
int tqCloseTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId);
int tqMoveOffsetToNext(tqGroupHandle* gHandle);
int tqResetOffset(STQ* pTq, int64_t topicId, int64_t cgId, int64_t offset);
int tqRegisterContext(tqGroupHandle* gHandle, void* ctx);
int tqLaunchQuery(tqGroupHandle* gHandle);
int tqSendLaunchQuery(tqGroupHandle* gHandle);
L
Liu Jicong 已提交
180

181 182 183 184
// Serialization entry points. The group-handle variant returns int and takes
// a void** out-parameter; the other three take and return a void*.
// NOTE(review): presumably the returned void* is the write position after the
// serialized data — confirm against the implementation.
int   tqSerializeGroupHandle(tqGroupHandle* gHandle, void** ppBytes);
void* tqSerializeListHandle(tqListHandle* listHandle, void* ptr);
void* tqSerializeBufHandle(tqBufferHandle* bufHandle, void* ptr);
void* tqSerializeBufItem(tqBufferItem* bufItem, void* ptr);
L
Liu Jicong 已提交
185

186 187 188
// Deserialization entry points: each takes a const byte pointer and a
// destination object, and returns a const void*.
// NOTE(review): presumably the return value is the read position after the
// consumed bytes — confirm against the implementation.
const void* tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle* ghandle);
const void* tqDeserializeBufHandle(const void* pBytes, tqBufferHandle* bufHandle);
const void* tqDeserializeBufItem(const void* pBytes, tqBufferItem* bufItem);
L
Liu Jicong 已提交
189 190

// NOTE(review): presumably returns the serialized size of the given group
// handle — confirm against the implementation.
int tqGetGHandleSSize(const tqGroupHandle* gHandle);
191 192
// Both functions take no arguments. "(void)" is used because an empty
// parameter list in C (before C23) declares a function with unspecified
// parameters, which disables argument checking at call sites.
// NOTE(review): presumably these return the serialized sizes of
// tqBufferHandle and tqBufferItem — confirm against the implementation.
int tqBufHandleSSize(void);
int tqBufItemSSize(void);
L
Liu Jicong 已提交
193

H
refact  
Hongze Cheng 已提交
194 195 196 197
#ifdef __cplusplus
}
#endif

L
Liu Jicong 已提交
198
#endif /*_TD_TQ_H_*/