tq.h 3.7 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
H
Hongze Cheng 已提交
15 16 17 18

#ifndef _TD_VNODE_TQ_H_
#define _TD_VNODE_TQ_H_

H
Hongze Cheng 已提交
19 20
#include "vnodeInt.h"

L
Liu Jicong 已提交
21 22 23 24
#include "executor.h"
#include "os.h"
#include "thash.h"
#include "tmsg.h"
25
#include "tqueue.h"
L
Liu Jicong 已提交
26
#include "trpc.h"
L
Liu Jicong 已提交
27 28 29
#include "ttimer.h"
#include "wal.h"

H
Hongze Cheng 已提交
30 31 32 33
#ifdef __cplusplus
extern "C" {
#endif

H
Hongze Cheng 已提交
34 35
// tqDebug ===================
// clang-format off
L
Liu Jicong 已提交
36 37 38 39 40 41
#define tqFatal(...) do { if (tqDebugFlag & DEBUG_FATAL) { taosPrintLog("TQ  FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}     while(0)
#define tqError(...) do { if (tqDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ  ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}     while(0)
#define tqWarn(...)  do { if (tqDebugFlag & DEBUG_WARN)  { taosPrintLog("TQ  WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}       while(0)
#define tqInfo(...)  do { if (tqDebugFlag & DEBUG_INFO)  { taosPrintLog("TQ  ", DEBUG_INFO, 255, __VA_ARGS__); }}            while(0)
#define tqDebug(...) do { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ  ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ  ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0)
H
Hongze Cheng 已提交
42 43
// clang-format on

H
Hongze Cheng 已提交
44 45 46
typedef struct STqOffsetCfg   STqOffsetCfg;
typedef struct STqOffsetStore STqOffsetStore;

H
Hongze Cheng 已提交
47 48 49 50 51 52 53 54
struct STqReadHandle {
  int64_t           ver;
  SHashObj*         tbIdHash;
  const SSubmitReq* pMsg;
  SSubmitBlk*       pBlock;
  SSubmitMsgIter    msgIter;
  SSubmitBlkIter    blkIter;
  SMeta*            pVnodeMeta;
L
Liu Jicong 已提交
55
  SArray*           pColIdList;  // SArray<int16_t>
H
Hongze Cheng 已提交
56
  int32_t           sver;
L
Liu Jicong 已提交
57
  int64_t           cachedSchemaUid;
H
Hongze Cheng 已提交
58 59 60 61
  SSchemaWrapper*   pSchemaWrapper;
  STSchema*         pSchema;
};

H
Hongze Cheng 已提交
62
typedef struct {
L
Liu Jicong 已提交
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
  int64_t  consumerId;
  int32_t  epoch;
  int32_t  skipLogNum;
  int64_t  reqOffset;
  SRWLatch lock;
  SRpcMsg* handle;
} STqPushHandle;

typedef struct {
  char          subKey[TSDB_SUBSCRIBE_KEY_LEN];
  int64_t       consumerId;
  int32_t       epoch;
  int8_t        subType;
  int8_t        withTbName;
  int8_t        withSchema;
  int8_t        withTag;
  char*         qmsg;
80
  SHashObj*     pDropTbUid;
L
Liu Jicong 已提交
81
  STqPushHandle pushHandle;
L
Liu Jicong 已提交
82
  // SRWLatch        lock;
L
Liu Jicong 已提交
83
  SWalReadHandle* pWalReader;
L
Liu Jicong 已提交
84 85 86
  // task number should be the same with fetch thread
  STqReadHandle* pExecReader[5];
  qTaskInfo_t    task[5];
L
Liu Jicong 已提交
87
} STqExec;
H
Hongze Cheng 已提交
88

89 90 91
int32_t tEncodeSTqExec(SEncoder* pEncoder, const STqExec* pExec);
int32_t tDecodeSTqExec(SDecoder* pDecoder, STqExec* pExec);

H
Hongze Cheng 已提交
92
struct STQ {
L
Liu Jicong 已提交
93
  char*     path;
L
Liu Jicong 已提交
94 95
  SHashObj* pushMgr;  // consumerId -> STqExec*
  SHashObj* execs;    // subKey -> STqExec
L
Liu Jicong 已提交
96 97 98
  SHashObj* pStreamTasks;
  SVnode*   pVnode;
  SWal*     pWal;
99
  TDB*      pTdb;
H
Hongze Cheng 已提交
100 101 102 103 104 105 106
};

typedef struct {
  int8_t inited;
  tmr_h  timer;
} STqMgmt;

L
Liu Jicong 已提交
107
static STqMgmt tqMgmt = {0};
H
Hongze Cheng 已提交
108

H
Hongze Cheng 已提交
109 110 111 112
// init once
int  tqInit();
void tqCleanUp();

H
Hongze Cheng 已提交
113 114 115 116 117 118 119 120 121
// tqOffset
STqOffsetStore* STqOffsetOpen(STqOffsetCfg*);
void            STqOffsetClose(STqOffsetStore*);

int64_t tqOffsetFetch(STqOffsetStore* pStore, const char* subscribeKey);
int32_t tqOffsetCommit(STqOffsetStore* pStore, const char* subscribeKey, int64_t offset);
int32_t tqOffsetPersist(STqOffsetStore* pStore, const char* subscribeKey);
int32_t tqOffsetPersistAll(STqOffsetStore* pStore);

H
Hongze Cheng 已提交
122 123 124 125
#ifdef __cplusplus
}
#endif

L
Liu Jicong 已提交
126
#endif /*_TD_VNODE_TQ_H_*/