tq.h 4.9 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
refact  
Hongze Cheng 已提交
16 17
#ifndef _TD_TQ_H_
#define _TD_TQ_H_
H
refact  
Hongze Cheng 已提交
18

L
Liu Jicong 已提交
19
#include "common.h"
L
Liu Jicong 已提交
20
#include "executor.h"
L
Liu Jicong 已提交
21
#include "mallocator.h"
L
Liu Jicong 已提交
22
#include "meta.h"
L
Liu Jicong 已提交
23
#include "os.h"
L
Liu Jicong 已提交
24
#include "scheduler.h"
L
Liu Jicong 已提交
25
#include "taoserror.h"
L
Liu Jicong 已提交
26
#include "tlist.h"
L
Liu Jicong 已提交
27
#include "tmsg.h"
L
Liu Jicong 已提交
28
#include "trpc.h"
L
Liu Jicong 已提交
29
#include "ttimer.h"
L
Liu Jicong 已提交
30
#include "tutil.h"
L
Liu Jicong 已提交
31
#include "vnode.h"
L
Liu Jicong 已提交
32
#include "wal.h"
L
Liu Jicong 已提交
33

H
refact  
Hongze Cheng 已提交
34 35 36 37
#ifdef __cplusplus
extern "C" {
#endif

L
Liu Jicong 已提交
38 39
// Number of STqTaskItem slots in STqBuffer.output.
#define TQ_BUFFER_SIZE 8

L
Liu Jicong 已提交
40 41 42 43 44
// Pair of opaque pointers that travel together.
// NOTE(review): looks like response-routing state (a connection handle plus an
// application handle) — confirm against the code that fills it in.
typedef struct STqRspHandle {
  void* handle;
  void* ahandle;
} STqRspHandle;

L
Liu Jicong 已提交
45 46
// Status values for a task item. The numeric values are spelled out so they
// stay stable if enumerators are ever reordered.
// NOTE(review): STqTaskItem.status is an int8_t; presumably it stores one of
// these values — confirm.
typedef enum {
  TQ_ITEM_READY = 0,
  TQ_ITEM_PROCESS = 1,
  TQ_ITEM_EMPTY = 2
} STqItemStatus;

L
Liu Jicong 已提交
47
typedef struct STqTaskItem {
L
Liu Jicong 已提交
48 49 50 51 52 53
  int8_t         status;
  int64_t        offset;
  void*          dst;
  qTaskInfo_t    task;
  STqReadHandle* pReadHandle;
  SSubQueryMsg*  pQueryMsg;
L
Liu Jicong 已提交
54 55 56 57 58 59 60 61 62
} STqTaskItem;

// new version
// Fixed array of TQ_BUFFER_SIZE task items together with a first/last offset.
// NOTE(review): presumably firstOffset/lastOffset bound the offsets currently
// held in output[] — confirm against the code that maintains them.
typedef struct STqBuffer {
  int64_t     firstOffset;
  int64_t     lastOffset;
  STqTaskItem output[TQ_BUFFER_SIZE];
} STqBuffer;

L
Liu Jicong 已提交
63 64 65 66 67 68 69 70
typedef struct STqTopicHandle {
  char            topicName[TSDB_TOPIC_FNAME_LEN];
  char*           sql;
  char*           logicalPlan;
  char*           physicalPlan;
  int64_t         committedOffset;
  int64_t         currentOffset;
  STqBuffer       buffer;
L
Liu Jicong 已提交
71
  SWalReadHandle* pReadhandle;
L
Liu Jicong 已提交
72 73 74 75
} STqTopicHandle;

typedef struct STqConsumerHandle {
  int64_t consumerId;
L
Liu Jicong 已提交
76
  int64_t epoch;
L
Liu Jicong 已提交
77
  char    cgroup[TSDB_TOPIC_FNAME_LEN];
L
Liu Jicong 已提交
78 79
  SArray* topics;  // SArray<STqClientTopic>
} STqConsumerHandle;
L
Liu Jicong 已提交
80

L
Liu Jicong 已提交
81
typedef struct STqMemRef {
L
Liu Jicong 已提交
82
  SMemAllocatorFactory* pAllocatorFactory;
L
Liu Jicong 已提交
83 84
  SMemAllocator*        pAllocator;
} STqMemRef;
L
Liu Jicong 已提交
85

L
Liu Jicong 已提交
86
// Fixed-size header followed by a variable-length payload (flexible array
// member). This is the type produced by FTqSerialize and consumed by
// FTqDeserialize.
// NOTE(review): presumably `action` holds a TQ_ACTION_* value, `ver` holds
// TQ_SVER and `ssize` is the serialized size in bytes — confirm.
typedef struct STqSerializedHead {
  int16_t ver;
  int16_t action;
  int32_t checksum;
  int64_t ssize;
  char    content[];
} STqSerializedHead;
L
Liu Jicong 已提交
93

L
Liu Jicong 已提交
94 95 96
// Callback types stored in STqMetaStore (pSerializer / pDeserializer / pDeleter).
//
// FTqSerialize:   takes an object and an in/out pointer to a serialized head;
//                 returns an int.
// FTqDeserialize: takes a serialized head and an out pointer to the object;
//                 returns a const void*.
// FTqDelete:      takes an object pointer; returns nothing.
// NOTE(review): return-value meaning and buffer ownership (who allocates or
// reallocates *ppHead / *ppObj) are not visible in this header — confirm.
typedef int (*FTqSerialize)(const void* pObj, STqSerializedHead** ppHead);
typedef const void* (*FTqDeserialize)(const STqSerializedHead* pHead, void** ppObj);
typedef void (*FTqDelete)(void*);
L
Liu Jicong 已提交
97 98 99 100 101

// Number of hash buckets (array length of STqMetaStore.bucket) and the
// matching low-bits mask. The mask is derived from the size so the two cannot
// drift apart.
#define TQ_BUCKET_SIZE 256
#define TQ_BUCKET_MASK (TQ_BUCKET_SIZE - 1)

// Index page geometry. Derived values are computed from TQ_PAGE_SIZE and
// TQ_IDX_SIZE rather than repeated as literals.
#define TQ_PAGE_SIZE 4096
// key + offset + size
#define TQ_IDX_SIZE 24
// 4096 / 24 = 170 index entries per page
#define TQ_MAX_IDX_ONE_PAGE (TQ_PAGE_SIZE / TQ_IDX_SIZE)
// 24 * 170 = 4080 bytes of index entries per page
#define TQ_IDX_PAGE_BODY_SIZE (TQ_IDX_SIZE * TQ_MAX_IDX_ONE_PAGE)
// 4096 - 4080 = 16 bytes left over for the page head
#define TQ_IDX_PAGE_HEAD_SIZE (TQ_PAGE_SIZE - TQ_IDX_PAGE_BODY_SIZE)

L
Liu Jicong 已提交
111 112
// Action codes. TQ_ACTION_CONST is also the value of TQ_CONST_DELETE.
// NOTE(review): presumably these are the values written to
// STqSerializedHead.action — confirm.
#define TQ_ACTION_CONST 0
#define TQ_ACTION_INUSE 1
#define TQ_ACTION_INUSE_CONT 2
#define TQ_ACTION_INTXN 3
L
Liu Jicong 已提交
115

L
Liu Jicong 已提交
116
// NOTE(review): presumably the serialization format version stored in
// STqSerializedHead.ver — confirm.
#define TQ_SVER 0
L
Liu Jicong 已提交
117

L
Liu Jicong 已提交
118 119 120
// Bit flags combined into a tqConfigFlag word (see STqMetaStore.tqConfigFlag).
// Bit 0 (value 1) selects the update mode; bit 1 (value 2) selects the
// duplicate-in-transaction policy. The zero-valued names are the "bit clear"
// alternatives and therefore cannot be tested with a bitwise AND.

// TODO: inplace mode is not implemented
#define TQ_UPDATE_INPLACE 0
#define TQ_UPDATE_APPEND 1

#define TQ_DUP_INTXN_REWRITE 0
#define TQ_DUP_INTXN_REJECT 2

// Returns true when the TQ_UPDATE_APPEND bit is set in tqConfigFlag.
// The explicit comparison keeps the result 0/1 even if `bool` is not _Bool.
static inline bool tqUpdateAppend(int32_t tqConfigFlag) { return (tqConfigFlag & TQ_UPDATE_APPEND) != 0; }

// Returns true when the TQ_DUP_INTXN_REJECT bit is set in tqConfigFlag.
static inline bool tqDupIntxnReject(int32_t tqConfigFlag) { return (tqConfigFlag & TQ_DUP_INTXN_REJECT) != 0; }
L
Liu Jicong 已提交
128 129

// Sentinel object whose address is used as the delete token.
// NOTE(review): because this is a `static` object defined in a header, every
// translation unit that includes this file gets its own copy with its own
// address. If TQ_DELETE_TOKEN values are ever compared across translation
// units the comparison will fail — confirm that all comparisons happen in a
// single .c file, or move the definition to a .c file with an extern here.
static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST;

// Address of TQ_CONST_DELETE as a void*. Parenthesized so the cast cannot
// bind to neighbouring operators at the expansion site. The cast drops const;
// the pointed-to object must never be written through this pointer.
#define TQ_DELETE_TOKEN ((void*)&TQ_CONST_DELETE)
L
Liu Jicong 已提交
132

L
Liu Jicong 已提交
133
// One metadata entry: a 64-bit key, an offset, a serialized size, and two
// value pointers (one "in use", one "in transaction").
// NOTE(review): either value pointer may presumably hold TQ_DELETE_TOKEN
// instead of a real object — confirm in the implementation.
typedef struct STqMetaHandle {
  int64_t key;
  int64_t offset;
  int64_t serializedSize;
  void*   valueInUse;
  void*   valueInTxn;
} STqMetaHandle;
L
Liu Jicong 已提交
140

L
Liu Jicong 已提交
141 142 143 144 145 146 147
// List node wrapping one STqMetaHandle. `next` links nodes in a singly linked
// chain (STqMetaStore.bucket holds pointers to this type); unpersistPrev /
// unpersistNext link the node into a second, doubly linked list.
typedef struct STqMetaList {
  STqMetaHandle       handle;
  struct STqMetaList* next;
  // struct STqMetaList* inTxnPrev;
  // struct STqMetaList* inTxnNext;
  struct STqMetaList* unpersistPrev;
  struct STqMetaList* unpersistNext;
} STqMetaList;
L
Liu Jicong 已提交
149

L
Liu Jicong 已提交
150
typedef struct STqMetaStore {
L
Liu Jicong 已提交
151 152 153
  STqMetaList* bucket[TQ_BUCKET_SIZE];
  // a table head
  STqMetaList* unpersistHead;
L
Liu Jicong 已提交
154
  // topics that are not connectted
L
Liu Jicong 已提交
155
  STqMetaList* unconnectTopic;
L
Liu Jicong 已提交
156

L
Liu Jicong 已提交
157 158 159
  // TODO:temporaral use, to be replaced by unified tfile
  int fileFd;
  // TODO:temporaral use, to be replaced by unified tfile
L
Liu Jicong 已提交
160 161
  int idxFd;

L
Liu Jicong 已提交
162 163 164 165 166
  char*          dirPath;
  int32_t        tqConfigFlag;
  FTqSerialize   pSerializer;
  FTqDeserialize pDeserializer;
  FTqDelete      pDeleter;
L
Liu Jicong 已提交
167
} STqMetaStore;
L
Liu Jicong 已提交
168

L
Liu Jicong 已提交
169
typedef struct STQ {
L
Liu Jicong 已提交
170 171
  // the collection of groups
  // the handle of meta kvstore
L
Liu Jicong 已提交
172 173 174
  char*         path;
  STqCfg*       tqConfig;
  STqMemRef     tqMemRef;
L
Liu Jicong 已提交
175
  STqMetaStore* tqMeta;
L
Liu Jicong 已提交
176
  SWal*         pWal;
L
Liu Jicong 已提交
177
  SMeta*        pMeta;
L
Liu Jicong 已提交
178 179
} STQ;

L
Liu Jicong 已提交
180 181 182 183 184 185 186 187 188 189 190
// Module-level state: an "inited" flag and a timer handle.
// NOTE(review): presumably set up by tqInit() and torn down by tqCleanUp() —
// confirm in the implementation.
typedef struct STqMgmt {
  int8_t inited;
  tmr_h  timer;
} STqMgmt;

// NOTE(review): this is a `static` object DEFINED in a header, so every
// translation unit that includes this file gets its own private, zero-
// initialized copy. State written in one .c file is not visible in another.
// If a single shared instance is intended, declare it `extern` here and define
// it once in a .c file — confirm intent before changing.
static STqMgmt tqMgmt;

// init once
// Module-wide setup and teardown. Declared with (void) so that C callers get
// a real prototype: in C, empty parentheses leave the parameter list
// unspecified and disable argument checking.
// NOTE(review): the meaning of tqInit()'s int return is not visible here —
// presumably 0 on success; confirm.
int  tqInit(void);
void tqCleanUp(void);

L
Liu Jicong 已提交
191
// open in each vnode
L
Liu Jicong 已提交
192
STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAllocatorFactory* allocFac);
L
Liu Jicong 已提交
193
void tqClose(STQ*);
L
Liu Jicong 已提交
194

L
Liu Jicong 已提交
195
// void* will be replace by a msg type
L
Liu Jicong 已提交
196
int tqPushMsg(STQ*, void* msg, int64_t version);
L
Liu Jicong 已提交
197
int tqCommit(STQ*);
H
refact  
Hongze Cheng 已提交
198

S
Shengliang 已提交
199
// Request handlers. tqProcessConsumeReq receives the request as an SRpcMsg;
// tqProcessSetConnReq receives it as a raw char buffer.
// NOTE(review): the meaning of the int32_t return values and the expected
// layout of `msg` are not visible here — confirm in the implementation.
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
L
Liu Jicong 已提交
201

H
refact  
Hongze Cheng 已提交
202 203 204 205
#ifdef __cplusplus
}
#endif

L
Liu Jicong 已提交
206
#endif /*_TD_TQ_H_*/