tq.h 5.4 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
H
Hongze Cheng 已提交
15 16 17 18

#ifndef _TD_VNODE_TQ_H_
#define _TD_VNODE_TQ_H_

H
Hongze Cheng 已提交
19 20
#include "vnodeInt.h"

L
Liu Jicong 已提交
21 22 23 24
#include "executor.h"
#include "os.h"
#include "thash.h"
#include "tmsg.h"
25
#include "tqueue.h"
L
Liu Jicong 已提交
26
#include "trpc.h"
L
Liu Jicong 已提交
27 28 29
#include "ttimer.h"
#include "wal.h"

H
Hongze Cheng 已提交
30 31 32 33
#ifdef __cplusplus
extern "C" {
#endif

H
Hongze Cheng 已提交
34 35
// tqDebug ===================
// clang-format off
L
Liu Jicong 已提交
36 37 38 39 40 41
#define tqFatal(...) do { if (tqDebugFlag & DEBUG_FATAL) { taosPrintLog("TQ  FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}     while(0)
#define tqError(...) do { if (tqDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ  ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}     while(0)
#define tqWarn(...)  do { if (tqDebugFlag & DEBUG_WARN)  { taosPrintLog("TQ  WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}       while(0)
#define tqInfo(...)  do { if (tqDebugFlag & DEBUG_INFO)  { taosPrintLog("TQ  ", DEBUG_INFO, 255, __VA_ARGS__); }}            while(0)
#define tqDebug(...) do { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ  ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ  ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0)
L
Liu Jicong 已提交
42 43 44 45 46 47 48 49 50 51

#define IS_META_MSG(x) ( \
     x == TDMT_VND_CREATE_STB     \
  || x == TDMT_VND_ALTER_STB      \
  || x == TDMT_VND_DROP_STB       \
  || x == TDMT_VND_CREATE_TABLE   \
  || x == TDMT_VND_ALTER_TABLE    \
  || x == TDMT_VND_DROP_TABLE     \
  || x == TDMT_VND_DROP_TTL_TABLE \
)
H
Hongze Cheng 已提交
52 53
// clang-format on

H
Hongze Cheng 已提交
54 55
typedef struct STqOffsetStore STqOffsetStore;

L
Liu Jicong 已提交
56 57
// tqRead

H
Hongze Cheng 已提交
58 59 60 61 62 63
struct STqReadHandle {
  int64_t           ver;
  const SSubmitReq* pMsg;
  SSubmitBlk*       pBlock;
  SSubmitMsgIter    msgIter;
  SSubmitBlkIter    blkIter;
L
Liu Jicong 已提交
64 65 66 67 68 69

  SMeta*    pVnodeMeta;
  SHashObj* tbIdHash;
  SArray*   pColIdList;  // SArray<int16_t>

  int32_t         cachedSchemaVer;
L
Liu Jicong 已提交
70
  int64_t         cachedSchemaSuid;
L
Liu Jicong 已提交
71 72
  SSchemaWrapper* pSchemaWrapper;
  STSchema*       pSchema;
H
Hongze Cheng 已提交
73 74
};

L
Liu Jicong 已提交
75 76
// tqPush

L
Liu Jicong 已提交
77
typedef struct {
L
Liu Jicong 已提交
78
  // msg info
L
Liu Jicong 已提交
79
  int64_t consumerId;
L
Liu Jicong 已提交
80 81
  int64_t reqOffset;
  int64_t processedVer;
L
Liu Jicong 已提交
82
  int32_t epoch;
L
Liu Jicong 已提交
83 84 85 86
  int32_t skipLogNum;
  // rpc info
  int64_t        reqId;
  SRpcHandleInfo rpcInfo;
L
Liu Jicong 已提交
87 88
  tmr_h          timerId;
  int8_t         tmrStopped;
L
Liu Jicong 已提交
89
  // exec
L
Liu Jicong 已提交
90 91 92 93
  int8_t       inputStatus;
  int8_t       execStatus;
  SStreamQueue inputQ;
  SRWLatch     lock;
L
Liu Jicong 已提交
94
} STqPushHandle;
L
Liu Jicong 已提交
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113

// tqExec

typedef struct {
  char*       qmsg;
  qTaskInfo_t task[5];
} STqExecCol;

typedef struct {
  int64_t suid;
} STqExecTb;

typedef struct {
  SHashObj* pFilterOutTbUid;
} STqExecDb;

typedef struct {
  int8_t subType;

L
Liu Jicong 已提交
114
  SStreamReader* pExecReader[5];
L
Liu Jicong 已提交
115 116 117 118
  union {
    STqExecCol execCol;
    STqExecTb  execTb;
    STqExecDb  execDb;
L
Liu Jicong 已提交
119
  };
L
Liu Jicong 已提交
120

L
Liu Jicong 已提交
121
} STqExecHandle;
H
Hongze Cheng 已提交
122

L
Liu Jicong 已提交
123 124 125 126 127
typedef struct {
  // info
  char    subKey[TSDB_SUBSCRIBE_KEY_LEN];
  int64_t consumerId;
  int32_t epoch;
L
Liu Jicong 已提交
128
  int8_t  fetchMeta;
L
Liu Jicong 已提交
129 130 131 132 133 134 135 136 137 138

  // reader
  SWalReadHandle* pWalReader;

  // push
  STqPushHandle pushHandle;

  // exec
  STqExecHandle execHandle;
} STqHandle;
139

H
Hongze Cheng 已提交
140
struct STQ {
141 142 143 144 145 146 147 148 149
  char*           path;
  SHashObj*       pushMgr;       // consumerId -> STqHandle*
  SHashObj*       handles;       // subKey -> STqHandle
  SHashObj*       pStreamTasks;  // taksId -> SStreamTask
  STqOffsetStore* pOffsetStore;
  SVnode*         pVnode;
  SWal*           pWal;
  TDB*            pMetaStore;
  TTB*            pExecStore;
H
Hongze Cheng 已提交
150 151 152 153 154 155 156
};

typedef struct {
  int8_t inited;
  tmr_h  timer;
} STqMgmt;

L
Liu Jicong 已提交
157
static STqMgmt tqMgmt = {0};
H
Hongze Cheng 已提交
158

L
Liu Jicong 已提交
159
// tqRead
L
Liu Jicong 已提交
160
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
L
Liu Jicong 已提交
161

L
Liu Jicong 已提交
162
// tqExec
L
Liu Jicong 已提交
163
int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp, int32_t workerId);
L
Liu Jicong 已提交
164
int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal offset, int32_t workerId);
L
Liu Jicong 已提交
165
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp);
L
Liu Jicong 已提交
166

L
Liu Jicong 已提交
167 168 169 170 171 172
// tqMeta
int32_t tqMetaOpen(STQ* pTq);
int32_t tqMetaClose(STQ* pTq);
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle);
int32_t tqMetaDeleteHandle(STQ* pTq, const char* key);

173 174 175
typedef struct {
  int32_t size;
} STqOffsetHead;
L
Liu Jicong 已提交
176

177
STqOffsetStore* tqOffsetOpen();
L
Liu Jicong 已提交
178
void            tqOffsetClose(STqOffsetStore*);
179 180
STqOffset*      tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey);
int32_t         tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset);
L
Liu Jicong 已提交
181
int32_t         tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey);
182 183 184 185
int32_t         tqOffsetSnapshot(STqOffsetStore* pStore);

// tqSink
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
H
Hongze Cheng 已提交
186

L
Liu Jicong 已提交
187 188 189 190 191 192 193 194 195 196 197
static FORCE_INLINE void tqOffsetResetToData(STqOffsetVal* pOffsetVal, int64_t uid, int64_t ts) {
  pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_DATA;
  pOffsetVal->uid = uid;
  pOffsetVal->ts = ts;
}

static FORCE_INLINE void tqOffsetResetToLog(STqOffsetVal* pOffsetVal, int64_t ver) {
  pOffsetVal->type = TMQ_OFFSET__LOG;
  pOffsetVal->version = ver;
}

H
Hongze Cheng 已提交
198 199 200 201
#ifdef __cplusplus
}
#endif

L
Liu Jicong 已提交
202
#endif /*_TD_VNODE_TQ_H_*/