tq.h 3.2 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
refact  
Hongze Cheng 已提交
16 17
#ifndef _TD_TQ_H_
#define _TD_TQ_H_
H
refact  
Hongze Cheng 已提交
18

L
Liu Jicong 已提交
19 20
#include "os.h"

H
refact  
Hongze Cheng 已提交
21 22 23 24
#ifdef __cplusplus
extern "C" {
#endif

L
Liu Jicong 已提交
25 26
/*
 * Common header used as the first member (`head`) of the tmq connect,
 * disconnect, consume and subscribe message structs declared in this file.
 *
 * NOTE(review): field meanings below are inferred from the field names only;
 * confirm against the implementation before relying on them.
 */
typedef struct tmqMsgHead {
  int32_t headLen;   // presumably the length of this header — TODO confirm unit
  int32_t protoVer;  // presumably the protocol version — TODO confirm
  int64_t cgId;      // presumably the consumer group id (cf. cgId in tqGroupHandle)
  int64_t topicId;   // topic identifier
  int64_t clientId;  // client identifier
  int32_t checksum;  // checksum; covered range/algorithm not visible here
  int32_t msgType;   // message type discriminator; valid values not visible here
} tmqMsgHead;

//TODO: put msgs into common
/* Connect request message: consists solely of the common tmqMsgHead header. */
struct tmqConnectReq {
  tmqMsgHead head;
};
typedef struct tmqConnectReq tmqConnectReq;

typedef struct tmqConnectResp {
L
Liu Jicong 已提交
41 42
  tmqMsgHead head;
  int8_t status;
L
Liu Jicong 已提交
43 44 45
} tmqConnectResp;

typedef struct tmqDisconnectReq {
L
Liu Jicong 已提交
46
  tmqMsgHead head;
L
Liu Jicong 已提交
47 48 49
} tmqDisconnectReq;

typedef struct tmqDisconnectResp {
L
Liu Jicong 已提交
50 51
  tmqMsgHead head;
  int8_t status;
L
Liu Jicong 已提交
52 53 54
} tmqDiconnectResp;

typedef struct tmqConsumeReq {
L
Liu Jicong 已提交
55 56
  tmqMsgHead head;
  int64_t commitOffset;
L
Liu Jicong 已提交
57 58 59
} tmqConsumeReq;

typedef struct tmqConsumeResp {
L
Liu Jicong 已提交
60 61
  tmqMsgHead head;
  char content[];
L
Liu Jicong 已提交
62 63
} tmqConsumeResp;

L
Liu Jicong 已提交
64 65 66 67 68
//
/*
 * Subscribe request message. Note that the struct tag (tmqMnodeSubscribeReq)
 * and the typedef name (tmqSubscribeReq) differ.
 *
 * `topic` is a C99 flexible array member, so sizeof does not include it.
 */
typedef struct tmqMnodeSubscribeReq {
  tmqMsgHead head;
  int64_t    topicLen;  // presumably the length of topic[] — TODO confirm unit/terminator
  char       topic[];
} tmqSubscribeReq;

L
Liu Jicong 已提交
71 72 73 74
/*
 * Subscribe response message. Note that the struct tag (tmqMnodeSubscribeResp)
 * and the typedef name (tmqSubscribeResp) differ.
 *
 * `ep` is a C99 flexible array member, so sizeof does not include it.
 */
typedef struct tmqMnodeSubscribeResp {
  tmqMsgHead head;
  int64_t    vgId;
  char       ep[];  // original note: TSDB_EP_LEN (presumably the intended length — confirm)
} tmqSubscribeResp;

/*
 * Heartbeat request message. Declared with no members.
 *
 * NOTE(review): a member-less struct is a compiler extension in C (size 0
 * under GCC) whereas C++ gives it size 1 — confirm this is intentional for a
 * header that is also consumed from C++.
 */
struct tmqHeartbeatReq {
};
typedef struct tmqHeartbeatReq tmqHeartbeatReq;

/*
 * Heartbeat response message. Declared with no members.
 *
 * NOTE(review): a member-less struct is a compiler extension in C (size 0
 * under GCC) whereas C++ gives it size 1 — confirm this is intentional for a
 * header that is also consumed from C++.
 */
struct tmqHeartbeatResp {
};
typedef struct tmqHeartbeatResp tmqHeartbeatResp;

L
Liu Jicong 已提交
85 86 87 88 89 90 91
/*
 * Handle for a single topic (STQ's notes describe it as the value type of the
 * topic set). No members are declared yet; the placeholder comments inside
 * list what it is planned to hold.
 */
typedef struct tqTopicVhandle {
  // name
  //
  // executor for filter
  //
  // callback for mnode
  //
} tqTopicVhandle;
L
Liu Jicong 已提交
93 94 95 96 97 98 99 100 101 102 103

/*
 * Top-level tq object, returned by tqInit() and passed to most tq* functions.
 * No members are declared yet; planned contents per the original notes:
 *
 *   - the set of topics:
 *       key   = topicName (string)
 *       value = tqTopicVhandle
 *
 *   - a map:
 *       key   = <topic: string, cgId: int64_t>
 *       value = consumeOffset (int64_t)
 */
struct STQ {
};
typedef struct STQ STQ;

L
Liu Jicong 已提交
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
// Number of tqBufferItem slots in the buffer[] array of tqGroupHandle.
#define TQ_BUFFER_SIZE 8

/*
 * One slot of the per-group buffer held in tqGroupHandle.buffer.
 * Both pointers are opaque (void *); their concrete types and ownership are
 * not visible in this header.
 */
struct tqBufferItem {
  int64_t offset;
  void   *executor;
  void   *content;
};
typedef struct tqBufferItem tqBufferItem;

/*
 * Per-(topic, consumer group) handle holding a fixed-size array of
 * TQ_BUFFER_SIZE buffer items.
 */
struct tqGroupHandle {
  char        *topic;          // C-style string, terminated with '\0'
  int64_t      cgId;
  void        *ahandle;        // opaque; concrete type not visible in this header
  int64_t      consumeOffset;
  int32_t      head;           // presumably an index into buffer[] — TODO confirm
  int32_t      tail;           // presumably an index into buffer[] — TODO confirm
  tqBufferItem buffer[TQ_BUFFER_SIZE];
};
typedef struct tqGroupHandle tqGroupHandle;

L
Liu Jicong 已提交
122
//init in each vnode
L
Liu Jicong 已提交
123
/*
 * Creates an STQ instance.
 *
 * ref_func / unref_func: callbacks taking and returning void *. Their exact
 * contract is not visible in this header (presumably reference acquire and
 * release hooks — TODO confirm).
 *
 * Returns a pointer to the STQ; failure behaviour is defined by the
 * implementation.
 */
STQ *tqInit(void *(*ref_func)(void *), void *(*unref_func)(void *));
L
Liu Jicong 已提交
124 125 126 127
/* Cleans up an STQ (presumably one obtained from tqInit — TODO confirm). */
void tqCleanUp(STQ *pTq);

/*
 * Pushes a message, tagged with `version`, to the given STQ.
 * The void * message parameter is a placeholder and will be replaced by a
 * dedicated message type. Return value semantics are defined by the
 * implementation.
 */
int tqPushMsg(STQ *pTq, void *msg, int64_t version);
L
Liu Jicong 已提交
128 129
/* Commits the given STQ; what is committed and the return convention are defined by the implementation. */
int tqCommit(STQ *pTq);

L
Liu Jicong 已提交
130
//void* will be replaced by a msg type
L
Liu Jicong 已提交
131
/* Handles a consume request (tmqConsumeReq) against the given STQ; return convention is defined by the implementation. */
int tqHandleConsumeMsg(STQ *pTq, tmqConsumeReq *msg);
H
refact  
Hongze Cheng 已提交
132

L
Liu Jicong 已提交
133 134 135 136 137 138 139 140 141 142 143
/*
 * Looks up a group handle in the STQ by cId (presumably the client/consumer
 * id — TODO confirm). The result when nothing matches is defined by the
 * implementation.
 */
tqGroupHandle *tqFindGHandleBycId(STQ *pTq, int64_t cId);

/*
 * Group / offset / query operations. All return int; the return convention is
 * defined by the implementation and is not visible in this header.
 */

/* Open / close the group identified by (topicId, cgId) for cId. */
int tqOpenTCGroup(STQ *pTq, int64_t topicId, int64_t cgId, int64_t cId);
int tqCloseTCGroup(STQ *pTq, int64_t topicId, int64_t cgId, int64_t cId);

/* Offset handling on a group handle, or on the group identified by (topicId, cgId). */
int tqMoveOffsetToNext(tqGroupHandle *gHandle);
int tqResetOffset(STQ *pTq, int64_t topicId, int64_t cgId, int64_t offset);

/* The trailing void * arguments are opaque; their concrete types are not visible here. */
int tqFetchMsg(tqGroupHandle *gHandle, void *);
int tqRegisterContext(tqGroupHandle *gHandle, void *);

/* Query launch for the group identified by (topicId, cgId); `query` is opaque. */
int tqLaunchQuery(STQ *pTq, int64_t topicId, int64_t cgId, void *query);
int tqSendLaunchQuery(STQ *pTq, int64_t topicId, int64_t cgId, void *query);

H
refact  
Hongze Cheng 已提交
144 145 146 147
#ifdef __cplusplus
}
#endif

L
Liu Jicong 已提交
148
#endif /*_TD_TQ_H_*/