vnodeInt.h 7.2 KB
Newer Older
H
save  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
refact  
Hongze Cheng 已提交
16 17
#ifndef _TD_VNODE_DEF_H_
#define _TD_VNODE_DEF_H_
H
save  
Hongze Cheng 已提交
18

H
Hongze Cheng 已提交
19
#include "executor.h"
H
Hongze Cheng 已提交
20 21 22
#include "filter.h"
#include "qworker.h"
#include "sync.h"
H
Hongze Cheng 已提交
23
#include "tchecksum.h"
H
more  
Hongze Cheng 已提交
24
#include "tcoding.h"
H
Hongze Cheng 已提交
25
#include "tcompare.h"
H
Hongze Cheng 已提交
26
#include "tcompression.h"
L
Liu Jicong 已提交
27
#include "tdatablock.h"
H
Hongze Cheng 已提交
28
#include "tdb.h"
H
Hongze Cheng 已提交
29
#include "tencode.h"
H
refact  
Hongze Cheng 已提交
30
#include "tfs.h"
H
Hongze Cheng 已提交
31
#include "tglobal.h"
H
Hongze Cheng 已提交
32
#include "tjson.h"
H
Hongze Cheng 已提交
33
#include "tlist.h"
H
refact  
Hongze Cheng 已提交
34
#include "tlockfree.h"
H
Hongze Cheng 已提交
35
#include "tlosertree.h"
H
Hongze Cheng 已提交
36
#include "tmallocator.h"
37
#include "tmsgcb.h"
H
Hongze Cheng 已提交
38
#include "tskiplist.h"
H
Hongze Cheng 已提交
39
#include "tstream.h"
H
Hongze Cheng 已提交
40
#include "ttime.h"
H
Hongze Cheng 已提交
41 42
#include "ttimer.h"
#include "wal.h"
H
save  
Hongze Cheng 已提交
43

H
Hongze Cheng 已提交
44 45
#include "vnode.h"

H
save  
Hongze Cheng 已提交
46 47 48 49
#ifdef __cplusplus
extern "C" {
#endif

H
Hongze Cheng 已提交
50 51 52 53 54 55 56
typedef struct SVnodeInfo SVnodeInfo;
typedef struct SMeta      SMeta;
typedef struct STsdb      STsdb;
typedef struct STQ        STQ;
typedef struct SVState    SVState;
typedef struct SVBufPool  SVBufPool;
typedef struct SQWorker   SQHandle;
H
refact  
Hongze Cheng 已提交
57

C
Cary Xu 已提交
58 59 60 61 62
#define VNODE_META_DIR  "meta"
#define VNODE_TSDB_DIR  "tsdb"
#define VNODE_TQ_DIR    "tq"
#define VNODE_WAL_DIR   "wal"
#define VNODE_TSMA_DIR  "tsma"
C
Cary Xu 已提交
63
#define VNODE_RSMA0_DIR "tsdb"
C
Cary Xu 已提交
64 65
#define VNODE_RSMA1_DIR "rsma1"
#define VNODE_RSMA2_DIR "rsma2"
H
Hongze Cheng 已提交
66

H
Hongze Cheng 已提交
67 68 69 70 71 72 73 74 75 76 77 78 79
// vnd.h
void* vnodeBufPoolMalloc(SVBufPool* pPool, int size);
void  vnodeBufPoolFree(SVBufPool* pPool, void* p);

// meta
typedef struct SMCtbCursor SMCtbCursor;
typedef struct STbUidStore STbUidStore;

int             metaOpen(SVnode* pVnode, SMeta** ppMeta);
int             metaClose(SMeta* pMeta);
int             metaBegin(SMeta* pMeta);
int             metaCommit(SMeta* pMeta);
int             metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
H
Hongze Cheng 已提交
80
int             metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq);
H
Hongze Cheng 已提交
81
int             metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq);
H
Hongze Cheng 已提交
82
int             metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq);
H
Hongze Cheng 已提交
83 84 85 86 87 88 89 90 91 92 93 94 95 96
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline);
STSchema*       metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver);
int             metaGetTableEntryByName(SMetaReader* pReader, const char* name);
int             metaGetTbNum(SMeta* pMeta);
SMCtbCursor*    metaOpenCtbCursor(SMeta* pMeta, tb_uid_t uid);
void            metaCloseCtbCurosr(SMCtbCursor* pCtbCur);
tb_uid_t        metaCtbCursorNext(SMCtbCursor* pCtbCur);
SArray*         metaGetSmaTbUids(SMeta* pMeta, bool isDup);
void*           metaGetSmaInfoByIndex(SMeta* pMeta, int64_t indexUid, bool isDecode);
STSmaWrapper*   metaGetSmaInfoByTable(SMeta* pMeta, tb_uid_t uid);
int32_t         metaCreateTSma(SMeta* pMeta, SSmaCfg* pCfg);
int32_t         metaDropTSma(SMeta* pMeta, int64_t indexUid);

// tsdb
C
Cary Xu 已提交
97
int          tsdbOpen(SVnode* pVnode, int8_t type);
98 99 100 101 102 103 104
int          tsdbClose(STsdb* pTsdb);
int          tsdbBegin(STsdb* pTsdb);
int          tsdbCommit(STsdb* pTsdb);
int32_t      tsdbUpdateSmaWindow(STsdb* pTsdb, SSubmitReq* pMsg, int64_t version);
int32_t      tsdbCreateTSma(STsdb* pTsdb, char* pMsg);
int32_t      tsdbInsertTSmaData(STsdb* pTsdb, int64_t indexUid, const char* msg);
int          tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
H
Hongze Cheng 已提交
105
int          tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp);
106
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
H
Hongze Cheng 已提交
107
                             uint64_t taskId);
108 109 110
tsdbReaderT  tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
                                 void* pMemRef);
int32_t      tsdbGetTableGroupFromIdListT(STsdb* tsdb, SArray* pTableIdList, STableGroupInfo* pGroupInfo);
H
Hongze Cheng 已提交
111 112 113 114 115 116 117 118 119 120 121 122

// tq
STQ*    tqOpen(const char* path, SVnode* pVnode, SWal* pWal);
void    tqClose(STQ*);
int     tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int     tqCommit(STQ*);
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId);
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId);

C
Cary Xu 已提交
123 124
// sma

125
int32_t tsdbRegisterRSma(STsdb* pTsdb, SMeta* pMeta, SVCreateStbReq* pReq, SMsgCb* pMsgCb);
C
Cary Xu 已提交
126 127 128 129
int32_t tsdbFetchTbUidList(STsdb* pTsdb, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid);
int32_t tsdbUpdateTbUidList(STsdb* pTsdb, STbUidStore* pUidStore);
void    tsdbUidStoreDestory(STbUidStore* pStore);
void*   tsdbUidStoreFree(STbUidStore* pStore);
C
Cary Xu 已提交
130
int32_t tsdbTriggerRSma(STsdb* pTsdb, void* pMsg, int32_t inputType);
C
Cary Xu 已提交
131

L
Liu Jicong 已提交
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
typedef struct {
  int8_t  streamType;  // sma or other
  int8_t  dstType;
  int16_t padding;
  int32_t smaId;
  int64_t tbUid;
  int64_t lastReceivedVer;
  int64_t lastCommittedVer;
} SStreamSinkInfo;

typedef struct {
  SVnode*   pVnode;
  SHashObj* pHash;  // streamId -> SStreamSinkInfo
} SSink;

H
refact  
Hongze Cheng 已提交
147 148
// SVState
struct SVState {
H
Hongze Cheng 已提交
149
  // int64_t processed;
H
refact  
Hongze Cheng 已提交
150 151 152 153
  int64_t committed;
  int64_t applied;
};

H
Hongze Cheng 已提交
154 155 156 157 158
struct SVnodeInfo {
  SVnodeCfg config;
  SVState   state;
};

C
Cary Xu 已提交
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
typedef enum {
  TSDB_TYPE_TSDB = 0,     // TSDB
  TSDB_TYPE_TSMA = 1,     // TSMA
  TSDB_TYPE_RSMA_L0 = 2,  // RSMA Level 0
  TSDB_TYPE_RSMA_L1 = 3,  // RSMA Level 1
  TSDB_TYPE_RSMA_L2 = 4,  // RSMA Level 2
} ETsdbType;

typedef struct {
  int8_t  precision;  // precision always be used with below keep cfgs
  int32_t days;
  int32_t keep0;
  int32_t keep1;
  int32_t keep2;
} STsdbKeepCfg;

H
save  
Hongze Cheng 已提交
175
struct SVnode {
H
refact  
Hongze Cheng 已提交
176 177 178
  char*      path;
  SVnodeCfg  config;
  SVState    state;
H
Hongze Cheng 已提交
179 180
  STfs*      pTfs;
  SMsgCb     msgCb;
H
Hongze Cheng 已提交
181 182 183 184
  SVBufPool* pPool;
  SVBufPool* inUse;
  SVBufPool* onCommit;
  SVBufPool* onRecycle;
H
refact  
Hongze Cheng 已提交
185 186
  SMeta*     pMeta;
  STsdb*     pTsdb;
C
Cary Xu 已提交
187 188
  STsdb*     pRSma1;
  STsdb*     pRSma2;
H
refact  
Hongze Cheng 已提交
189
  SWal*      pWal;
L
Liu Jicong 已提交
190 191
  STQ*       pTq;
  SSink*     pSink;
192
  int64_t    sync;
H
refact  
Hongze Cheng 已提交
193 194
  tsem_t     canCommit;
  SQHandle*  pQuery;
H
save  
Hongze Cheng 已提交
195 196
};

C
Cary Xu 已提交
197 198 199 200 201
#define VND_TSDB(vnd)       ((vnd)->pTsdb)
#define VND_RSMA0(vnd)      ((vnd)->pTsdb)
#define VND_RSMA1(vnd)      ((vnd)->pRSma1)
#define VND_RSMA2(vnd)      ((vnd)->pRSma2)
#define VND_RETENTIONS(vnd) (&(vnd)->config.tsdbCfg.retentions)
C
Cary Xu 已提交
202

H
Hongze Cheng 已提交
203 204
struct STbUidStore {
  tb_uid_t  suid;
C
Cary Xu 已提交
205
  tb_uid_t  uid;  // TODO: just for debugging, remove when uid provided in SSDataBlock
H
Hongze Cheng 已提交
206 207 208 209
  SArray*   tbUids;
  SHashObj* uidHash;
};

H
refact  
Hongze Cheng 已提交
210 211
#define TD_VID(PVNODE) (PVNODE)->config.vgId

C
Cary Xu 已提交
212
static FORCE_INLINE bool vnodeIsRollup(SVnode* pVnode) {
C
Cary Xu 已提交
213 214 215
  SRetention* pRetention = &(pVnode->config.tsdbCfg.retentions[0]);
  return (pRetention->freq > 0 && pRetention->keep > 0);
}
C
Cary Xu 已提交
216

L
Liu Jicong 已提交
217
// sma
L
Liu Jicong 已提交
218
void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
L
Liu Jicong 已提交
219

H
save  
Hongze Cheng 已提交
220 221 222 223
#ifdef __cplusplus
}
#endif

L
Liu Jicong 已提交
224
#endif /*_TD_VNODE_DEF_H_*/