tq.h 6.4 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
H
Hongze Cheng 已提交
15 16 17 18

#ifndef _TD_VNODE_TQ_H_
#define _TD_VNODE_TQ_H_

H
Hongze Cheng 已提交
19 20
#include "vnodeInt.h"

L
Liu Jicong 已提交
21 22 23 24
#include "executor.h"
#include "os.h"
#include "thash.h"
#include "tmsg.h"
25
#include "tqueue.h"
L
Liu Jicong 已提交
26
#include "trpc.h"
L
Liu Jicong 已提交
27 28 29
#include "ttimer.h"
#include "wal.h"

H
Hongze Cheng 已提交
30 31 32 33
#ifdef __cplusplus
extern "C" {
#endif

H
Hongze Cheng 已提交
34 35
// tqDebug ===================
// clang-format off
L
Liu Jicong 已提交
36 37 38 39 40 41
// Leveled logging macros for the TQ module.
// Each macro emits through taosPrintLog() only when the matching DEBUG_* bit is set in
// tqDebugFlag. The do { ... } while(0) wrapper makes every macro expand to a single statement.
// FATAL/ERROR/WARN/INFO pass the constant 255 as the third taosPrintLog() argument, while
// DEBUG/TRACE pass tqDebugFlag itself.
// NOTE(review): tqDebugFlag, DEBUG_* and taosPrintLog are declared outside this header.
#define tqFatal(...) do { if (tqDebugFlag & DEBUG_FATAL) { taosPrintLog("TQ  FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}     while(0)
#define tqError(...) do { if (tqDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ  ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}     while(0)
#define tqWarn(...)  do { if (tqDebugFlag & DEBUG_WARN)  { taosPrintLog("TQ  WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}       while(0)
#define tqInfo(...)  do { if (tqDebugFlag & DEBUG_INFO)  { taosPrintLog("TQ  ", DEBUG_INFO, 255, __VA_ARGS__); }}            while(0)
#define tqDebug(...) do { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ  ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ  ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0)
L
Liu Jicong 已提交
42

H
Hongze Cheng 已提交
43 44
// clang-format on

H
Hongze Cheng 已提交
45 46
// Forward declaration only: the struct body is not given in the visible part of this header,
// so code here handles STqOffsetStore exclusively through pointers.
typedef struct STqOffsetStore STqOffsetStore;

L
Liu Jicong 已提交
47
// tqPush
48 49
// Negative sentinel IDs.
// NOTE(review): the names suggest they tag internal "scan WAL" and "check stream task status"
// requests rather than real task IDs — confirm against the callers.
#define EXTRACT_DATA_FROM_WAL_ID    (-1)
#define STREAM_TASK_STATUS_CHECK_ID (-2)
L
Liu Jicong 已提交
50 51 52

// tqExec
// Execution parameters for the `execCol` member of the STqExecHandle union.
typedef struct {
  char* qmsg;  // SubPlanToString
} STqExecCol;

typedef struct {
L
Liu Jicong 已提交
57
  int64_t suid;
58 59
  char*   qmsg;   // SubPlanToString
  SNode*  node;
L
Liu Jicong 已提交
60 61 62
} STqExecTb;

typedef struct {
L
Liu Jicong 已提交
63
  SHashObj* pFilterOutTbUid;
L
Liu Jicong 已提交
64 65 66
} STqExecDb;

typedef struct {
dengyihao's avatar
dengyihao 已提交
67 68 69
  int8_t      subType;
  STqReader*  pTqReader;
  qTaskInfo_t task;
L
Liu Jicong 已提交
70 71 72 73
  union {
    STqExecCol execCol;
    STqExecTb  execTb;
    STqExecDb  execDb;
L
Liu Jicong 已提交
74
  };
dengyihao's avatar
dengyihao 已提交
75
  int32_t numOfCols;  // number of out pout column, temporarily used
L
Liu Jicong 已提交
76
} STqExecHandle;
H
Hongze Cheng 已提交
77

dengyihao's avatar
dengyihao 已提交
78
// Status stored in STqHandle.status.
typedef enum tq_handle_status {
  TMQ_HANDLE_STATUS_IDLE = 0,
  TMQ_HANDLE_STATUS_EXEC = 1,
} tq_handle_status;
82

L
Liu Jicong 已提交
83
typedef struct {
84 85 86 87 88 89 90
  char          subKey[TSDB_SUBSCRIBE_KEY_LEN];
  int64_t       consumerId;
  int32_t       epoch;
  int8_t        fetchMeta;
  int64_t       snapshotVer;
  SWalReader*   pWalReader;
  SWalRef*      pRef;
wmmhello's avatar
wmmhello 已提交
91
//  STqPushHandle pushHandle;    // push
92
  STqExecHandle execHandle;    // exec
93
  SRpcMsg*      msg;
wmmhello's avatar
wmmhello 已提交
94
  tq_handle_status        status;
L
Liu Jicong 已提交
95
} STqHandle;
L
Liu Jicong 已提交
96

H
Hongze Cheng 已提交
97
struct STQ {
98 99 100 101
  SVnode*         pVnode;
  char*           path;
  int64_t         walLogLastVer;
  SRWLatch        lock;
wmmhello's avatar
wmmhello 已提交
102
  SHashObj*       pPushMgr;    // subKey -> STqHandle
103 104
  SHashObj*       pHandle;     // subKey -> STqHandle
  SHashObj*       pCheckInfo;  // topic -> SAlterCheckInfo
105
  STqOffsetStore* pOffsetStore;
106 107 108 109
  TDB*            pMetaDB;
  TTB*            pExecStore;
  TTB*            pCheckStore;
  SStreamMeta*    pStreamMeta;
H
Hongze Cheng 已提交
110 111 112 113 114 115 116
};

// Module-level management state: an "inited" flag plus a timer handle.
// This is the type of the tqMgmt object defined in this header.
typedef struct {
  int8_t inited;
  tmr_h  timer;
} STqMgmt;

117 118 119 120
// Header record carrying a single 32-bit size.
// NOTE(review): presumably the length prefix of a serialized offset entry — confirm
// against the offset file reader/writer.
typedef struct {
  int32_t size;
} STqOffsetHead;

L
Liu Jicong 已提交
121
// Zero-initialized module management state.
// NOTE(review): because this is a `static` definition in a header, every translation unit
// that includes tq.h gets its own private copy. If one shared instance is intended, this
// should become an `extern` declaration with a single definition in a .c file.
static STqMgmt tqMgmt = {0};
H
Hongze Cheng 已提交
122

L
Liu Jicong 已提交
123 124
// STqHandle encode/decode through SEncoder/SDecoder, plus a destructor taking void*.
// NOTE(review): the void* signature of tqDestroyTqHandle suggests use as a generic
// free-callback (e.g. for a hash table) — confirm at the registration site.
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle);
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
void    tqDestroyTqHandle(void* data);
L
Liu Jicong 已提交
126

L
Liu Jicong 已提交
127
// tqRead
L
Liu Jicong 已提交
128
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset);
129
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset);
130
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum, uint64_t reqId);
L
Liu Jicong 已提交
131

wmmhello's avatar
wmmhello 已提交
132
// tqExec
133
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows);
L
Liu Jicong 已提交
134
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision);
dengyihao's avatar
dengyihao 已提交
135 136
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
                      int32_t type, int32_t vgId);
137 138
//int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId);
int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId);
L
Liu Jicong 已提交
139

L
Liu Jicong 已提交
140 141 142 143 144
// tqMeta
// Open/close of the TQ meta store and save/delete/restore of handles and check-info
// entries, addressed by a string key.
// NOTE(review): presumably backed by STQ.pMetaDB / pExecStore / pCheckStore — confirm.
int32_t tqMetaOpen(STQ* pTq);
int32_t tqMetaClose(STQ* pTq);
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle);
int32_t tqMetaDeleteHandle(STQ* pTq, const char* key);
int32_t tqMetaRestoreHandle(STQ* pTq);
int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen);
int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key);
int32_t tqMetaRestoreCheckInfo(STQ* pTq);
int32_t tqMetaGetHandle(STQ* pTq, const char* key);
int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle);
L
Liu Jicong 已提交
151

152
// Offset store API. tqOffsetOpen() returns a store pointer that tqOffsetClose() accepts;
// entries are addressed by subscribe key.
// NOTE(review): ownership of the STqOffset* returned by tqOffsetRead() is not visible
// here — confirm before freeing or caching it.
STqOffsetStore* tqOffsetOpen(STQ* pTq);
void            tqOffsetClose(STqOffsetStore* pStore);
STqOffset*      tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey);
int32_t         tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset);
int32_t         tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey);
int32_t         tqOffsetCommitFile(STqOffsetStore* pStore);
158 159

// tqSink
H
Haojun Liao 已提交
160 161 162
int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
                         const char* pIdStr);
void    tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
H
Hongze Cheng 已提交
163

L
Liu Jicong 已提交
164
// tqOffset
165
char*   tqOffsetBuildFName(const char* path, int32_t fVer);
L
Liu Jicong 已提交
166 167
int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);

L
Liu Jicong 已提交
168
// tqStream
169
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
170
int32_t tqStreamTasksScanWal(STQ* pTq);
171
int32_t tqStreamTasksStatusCheck(STQ* pTq);
172 173

// tq util
174
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock);
175
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
176
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
H
Haojun Liao 已提交
177
                        int32_t type, int64_t sver, int64_t ever);
178
int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq);
H
Hongze Cheng 已提交
179 180 181 182
#ifdef __cplusplus
}
#endif

L
Liu Jicong 已提交
183
#endif /*_TD_VNODE_TQ_H_*/