vnodeInt.h 19.0 KB
Newer Older
H
save  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
refact  
Hongze Cheng 已提交
16 17
#ifndef _TD_VNODE_DEF_H_
#define _TD_VNODE_DEF_H_
H
save  
Hongze Cheng 已提交
18

H
Hongze Cheng 已提交
19
#include "executor.h"
H
Hongze Cheng 已提交
20 21
#include "filter.h"
#include "qworker.h"
22
#include "rocksdb/c.h"
H
Hongze Cheng 已提交
23
#include "sync.h"
H
Hongze Cheng 已提交
24
#include "tRealloc.h"
H
Hongze Cheng 已提交
25
#include "tchecksum.h"
H
more  
Hongze Cheng 已提交
26
#include "tcoding.h"
H
Hongze Cheng 已提交
27
#include "tcompare.h"
H
Hongze Cheng 已提交
28
#include "tcompression.h"
L
Liu Jicong 已提交
29
#include "tdatablock.h"
H
Hongze Cheng 已提交
30
#include "tdb.h"
H
Hongze Cheng 已提交
31
#include "tencode.h"
H
refact  
Hongze Cheng 已提交
32
#include "tfs.h"
H
Hongze Cheng 已提交
33
#include "tglobal.h"
H
Hongze Cheng 已提交
34
#include "tjson.h"
H
Hongze Cheng 已提交
35
#include "tlist.h"
H
refact  
Hongze Cheng 已提交
36
#include "tlockfree.h"
H
Hongze Cheng 已提交
37
#include "tlosertree.h"
H
Hongze Cheng 已提交
38
#include "tlrucache.h"
39
#include "tmsgcb.h"
H
Hongze Cheng 已提交
40
#include "trbtree.h"
H
Hongze Cheng 已提交
41
#include "tref.h"
H
Hongze Cheng 已提交
42
#include "tskiplist.h"
H
Hongze Cheng 已提交
43
#include "tstream.h"
H
Hongze Cheng 已提交
44
#include "ttime.h"
H
Hongze Cheng 已提交
45 46
#include "ttimer.h"
#include "wal.h"
H
save  
Hongze Cheng 已提交
47

H
Hongze Cheng 已提交
48 49
#include "vnode.h"

H
save  
Hongze Cheng 已提交
50 51 52 53
#ifdef __cplusplus
extern "C" {
#endif

L
Liu Jicong 已提交
54 55 56 57 58 59
typedef struct SVnodeInfo         SVnodeInfo;
typedef struct SMeta              SMeta;
typedef struct SSma               SSma;
typedef struct STsdb              STsdb;
typedef struct STQ                STQ;
typedef struct SVState            SVState;
C
Cary Xu 已提交
60
typedef struct SVStatis           SVStatis;
L
Liu Jicong 已提交
61
typedef struct SVBufPool          SVBufPool;
62
typedef struct SQueueWorker       SQHandle;
L
Liu Jicong 已提交
63 64 65 66 67 68 69 70 71 72 73 74 75
typedef struct STsdbKeepCfg       STsdbKeepCfg;
typedef struct SMetaSnapReader    SMetaSnapReader;
typedef struct SMetaSnapWriter    SMetaSnapWriter;
typedef struct STsdbSnapReader    STsdbSnapReader;
typedef struct STsdbSnapWriter    STsdbSnapWriter;
typedef struct STqSnapReader      STqSnapReader;
typedef struct STqSnapWriter      STqSnapWriter;
typedef struct STqOffsetReader    STqOffsetReader;
typedef struct STqOffsetWriter    STqOffsetWriter;
typedef struct SStreamTaskReader  SStreamTaskReader;
typedef struct SStreamTaskWriter  SStreamTaskWriter;
typedef struct SStreamStateReader SStreamStateReader;
typedef struct SStreamStateWriter SStreamStateWriter;
76 77
typedef struct SRSmaSnapReader    SRSmaSnapReader;
typedef struct SRSmaSnapWriter    SRSmaSnapWriter;
L
Liu Jicong 已提交
78
typedef struct SSnapDataHdr       SSnapDataHdr;
H
Hongze Cheng 已提交
79
typedef struct SCommitInfo        SCommitInfo;
H
Hongze Cheng 已提交
80
typedef struct SCompactInfo       SCompactInfo;
H
Hongze Cheng 已提交
81
typedef struct SQueryNode         SQueryNode;
H
refact  
Hongze Cheng 已提交
82

C
Cary Xu 已提交
83 84 85 86 87
#define VNODE_META_DIR  "meta"
#define VNODE_TSDB_DIR  "tsdb"
#define VNODE_TQ_DIR    "tq"
#define VNODE_WAL_DIR   "wal"
#define VNODE_TSMA_DIR  "tsma"
C
Cary Xu 已提交
88
#define VNODE_RSMA_DIR  "rsma"
C
Cary Xu 已提交
89
#define VNODE_RSMA0_DIR "tsdb"
C
Cary Xu 已提交
90 91
#define VNODE_RSMA1_DIR "rsma1"
#define VNODE_RSMA2_DIR "rsma2"
H
Hongze Cheng 已提交
92

H
Hongze Cheng 已提交
93 94
#define VNODE_BUFPOOL_SEGMENTS 3

H
Hongze Cheng 已提交
95
#define VND_INFO_FNAME "vnode.json"
96
#define VND_INFO_FNAME_TMP "vnode_tmp.json"
H
Hongze Cheng 已提交
97

H
Hongze Cheng 已提交
98
// vnd.h
H
Hongze Cheng 已提交
99 100 101 102 103 104 105
typedef int32_t (*_query_reseek_func_t)(void* pQHandle);
struct SQueryNode {
  SQueryNode*          pNext;
  SQueryNode**         ppNext;
  void*                pQHandle;
  _query_reseek_func_t reseek;
};
106

107 108 109 110
#if 1  // refact APIs below (TODO)
typedef SVCreateTbReq   STbCfg;
typedef SVCreateTSmaReq SSmaCfg;

111 112 113 114 115 116
SMTbCursor* metaOpenTbCursor(void* pVnode);
void        metaCloseTbCursor(SMTbCursor* pTbCur);
void        metaPauseTbCursor(SMTbCursor* pTbCur);
void        metaResumeTbCursor(SMTbCursor* pTbCur, int8_t first);
int32_t     metaTbCursorNext(SMTbCursor* pTbCur, ETableType jumpTableType);
int32_t     metaTbCursorPrev(SMTbCursor* pTbCur, ETableType jumpTableType);
117 118 119

#endif

H
Hongze Cheng 已提交
120
void* vnodeBufPoolMalloc(SVBufPool* pPool, int size);
121
void* vnodeBufPoolMallocAligned(SVBufPool* pPool, int size);
H
Hongze Cheng 已提交
122
void  vnodeBufPoolFree(SVBufPool* pPool, void* p);
H
Hongze Cheng 已提交
123
void  vnodeBufPoolRef(SVBufPool* pPool);
H
Hongze Cheng 已提交
124
void  vnodeBufPoolUnRef(SVBufPool* pPool, bool proactive);
H
Hongze Cheng 已提交
125
int   vnodeDecodeInfo(uint8_t* pData, SVnodeInfo* pInfo);
H
Hongze Cheng 已提交
126

H
Hongze Cheng 已提交
127
int32_t vnodeBufPoolRegisterQuery(SVBufPool* pPool, SQueryNode* pQNode);
H
Hongze Cheng 已提交
128
void    vnodeBufPoolDeregisterQuery(SVBufPool* pPool, SQueryNode* pQNode, bool proactive);
H
Hongze Cheng 已提交
129

H
Hongze Cheng 已提交
130 131
// meta
typedef struct SMCtbCursor SMCtbCursor;
C
Cary Xu 已提交
132
typedef struct SMStbCursor SMStbCursor;
H
Hongze Cheng 已提交
133 134
typedef struct STbUidStore STbUidStore;

135 136 137 138
#define META_BEGIN_HEAP_BUFFERPOOL 0
#define META_BEGIN_HEAP_OS         1
#define META_BEGIN_HEAP_NIL        2

H
Hongze Cheng 已提交
139
int             metaOpen(SVnode* pVnode, SMeta** ppMeta, int8_t rollback);
S
Shungang Li 已提交
140
int             metaUpgrade(SVnode* pVnode, SMeta** ppMeta);
141
int             metaClose(SMeta** pMeta);
142
int             metaBegin(SMeta* pMeta, int8_t fromSys);
143 144 145
TXN*            metaGetTxn(SMeta* pMeta);
int             metaCommit(SMeta* pMeta, TXN* txn);
int             metaFinishCommit(SMeta* pMeta, TXN* txn);
146
int             metaPrepareAsyncCommit(SMeta* pMeta);
147
int             metaAbort(SMeta* pMeta);
H
Hongze Cheng 已提交
148
int             metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
H
Hongze Cheng 已提交
149
int             metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
150
int             metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList);
L
Liu Jicong 已提交
151
int             metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** pMetaRsp);
H
Hongze Cheng 已提交
152
int             metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t* tbUid);
M
Minglei Jin 已提交
153
int32_t         metaTrimTables(SMeta* pMeta);
154
int             metaTtlDropTable(SMeta* pMeta, int64_t timePointMs, SArray* tbUids);
H
more  
Hongze Cheng 已提交
155
int             metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
156
int             metaUpdateChangeTime(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs);
157 158
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);
STSchema*       metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);
H
Hongze Cheng 已提交
159
int32_t         metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema** ppTSchema);
H
Hongze Cheng 已提交
160
int             metaGetTableEntryByName(SMetaReader* pReader, const char* name);
H
Hongze Cheng 已提交
161
int             metaAlterCache(SMeta* pMeta, int32_t nPage);
dengyihao's avatar
dengyihao 已提交
162

163 164
int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid);
int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid);
165

dengyihao's avatar
dengyihao 已提交
166
int metaAddIndexToSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
dengyihao's avatar
dengyihao 已提交
167
int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq);
dengyihao's avatar
dengyihao 已提交
168

dengyihao's avatar
dengyihao 已提交
169
int64_t       metaGetTimeSeriesNum(SMeta* pMeta);
170
SMCtbCursor*  metaOpenCtbCursor(void* pVnode, tb_uid_t uid, int lock);
171
void          metaCloseCtbCursor(SMCtbCursor* pCtbCur, int lock);
dengyihao's avatar
dengyihao 已提交
172 173 174 175 176 177 178 179 180 181
tb_uid_t      metaCtbCursorNext(SMCtbCursor* pCtbCur);
SMStbCursor*  metaOpenStbCursor(SMeta* pMeta, tb_uid_t uid);
void          metaCloseStbCursor(SMStbCursor* pStbCur);
tb_uid_t      metaStbCursorNext(SMStbCursor* pStbCur);
STSma*        metaGetSmaInfoByIndex(SMeta* pMeta, int64_t indexUid);
STSmaWrapper* metaGetSmaInfoByTable(SMeta* pMeta, tb_uid_t uid, bool deepCopy);
SArray*       metaGetSmaIdsByTable(SMeta* pMeta, tb_uid_t uid);
SArray*       metaGetSmaTbUids(SMeta* pMeta);
void*         metaGetIdx(SMeta* pMeta);
void*         metaGetIvtIdx(SMeta* pMeta);
C
Cary Xu 已提交
182

H
Hongze Cheng 已提交
183 184
int64_t metaGetTbNum(SMeta* pMeta);
void    metaReaderDoInit(SMetaReader* pReader, SMeta* pMeta, int32_t flags);
185

186 187
int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg);
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
H
Hongze Cheng 已提交
188

H
Hongze Cheng 已提交
189 190 191 192 193 194
typedef struct SMetaInfo {
  int64_t uid;
  int64_t suid;
  int64_t version;
  int32_t skmVer;
} SMetaInfo;
195
int32_t metaGetInfo(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo, SMetaReader* pReader);
H
Hongze Cheng 已提交
196

H
Hongze Cheng 已提交
197
// tsdb
198 199 200
int     tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg, int8_t rollback);
int     tsdbClose(STsdb** pTsdb);
int32_t tsdbBegin(STsdb* pTsdb);
H
Hongze Cheng 已提交
201 202
// int32_t tsdbPrepareCommit(STsdb* pTsdb);
// int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo);
203
int32_t tsdbCacheCommit(STsdb* pTsdb);
H
Hongze Cheng 已提交
204
int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo);
H
Hongze Cheng 已提交
205 206
// int32_t tsdbFinishCommit(STsdb* pTsdb);
// int32_t tsdbRollbackCommit(STsdb* pTsdb);
207 208
int     tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg);
int     tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq2* pMsg, SSubmitRsp2* pRsp);
H
Hongze Cheng 已提交
209
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, int32_t* affectedRows);
210 211
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);
H
Hongze Cheng 已提交
212 213

// tq
X
Xiaoyu Wang 已提交
214 215 216
int  tqInit();
void tqCleanUp();
STQ* tqOpen(const char* path, SVnode* pVnode);
H
Haojun Liao 已提交
217
void tqNotifyClose(STQ*);
X
Xiaoyu Wang 已提交
218 219
void tqClose(STQ*);
int  tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
220
int  tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
221
int  tqUnregisterPushHandle(STQ* pTq, void* pHandle);
222
int  tqStartStreamTasks(STQ* pTq);  // restore all stream tasks after vnode launching completed.
223
int  tqCheckStreamStatus(STQ* pTq);
224

H
Hongze Cheng 已提交
225
int     tqCommit(STQ*);
226
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
L
Liu Jicong 已提交
227
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId);
228 229 230
// tq-mq
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
L
Liu Jicong 已提交
231 232
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
233
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
234
int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg);
L
Liu Jicong 已提交
235
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
236
int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg);
237
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg);
238
int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg);
239

240 241 242
// tq-stream
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
5
54liuyao 已提交
243 244
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
245
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg);
246
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, SRpcMsg* pMsg);
L
Liu Jicong 已提交
247
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
L
Liu Jicong 已提交
248
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
L
Liu Jicong 已提交
249
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
L
Liu Jicong 已提交
250 251
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
252
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg);
253 254
int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg);
L
Liu Jicong 已提交
255
int32_t tqCheckLogInWal(STQ* pTq, int64_t version);
H
Hongze Cheng 已提交
256

C
Cary Xu 已提交
257
// sma
C
Cary Xu 已提交
258 259
int32_t smaInit();
void    smaCleanUp();
H
Hongze Cheng 已提交
260
int32_t smaOpen(SVnode* pVnode, int8_t rollback);
261
int32_t smaClose(SSma* pSma);
C
Cary Xu 已提交
262
int32_t smaBegin(SSma* pSma);
K
kailixu 已提交
263
int32_t smaPrepareAsyncCommit(SSma* pSma);
H
Hongze Cheng 已提交
264
int32_t smaCommit(SSma* pSma, SCommitInfo* pInfo);
C
Cary Xu 已提交
265
int32_t smaFinishCommit(SSma* pSma);
C
Cary Xu 已提交
266
int32_t smaPostCommit(SSma* pSma);
C
Cary Xu 已提交
267
int32_t smaDoRetention(SSma* pSma, int64_t now);
C
Cary Xu 已提交
268

C
Cary Xu 已提交
269
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
270 271
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);

272
int32_t tdProcessRSmaCreate(SSma* pSma, SVCreateStbReq* pReq);
K
kailixu 已提交
273
int32_t tdProcessRSmaSubmit(SSma* pSma, int64_t version, void* pReq, void* pMsg, int32_t len, int32_t inputType);
274
int32_t tdProcessRSmaDrop(SSma* pSma, SVDropStbReq* pReq);
275
int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid);
276
int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore, bool isAdd);
277 278
void*   tdUidStoreFree(STbUidStore* pStore);

H
Hongze Cheng 已提交
279 280 281
// SMetaSnapReader ========================================
int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader);
int32_t metaSnapReaderClose(SMetaSnapReader** ppReader);
H
Hongze Cheng 已提交
282
int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData);
H
Hongze Cheng 已提交
283
// SMetaSnapWriter ========================================
H
Hongze Cheng 已提交
284 285 286
int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWriter** ppWriter);
int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback);
H
Hongze Cheng 已提交
287
// STsdbSnapReader ========================================
C
Cary Xu 已提交
288
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, STsdbSnapReader** ppReader);
H
Hongze Cheng 已提交
289
int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader);
H
Hongze Cheng 已提交
290
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData);
H
Hongze Cheng 已提交
291
// STsdbSnapWriter ========================================
H
Hongze Cheng 已提交
292
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter);
H
Hongze Cheng 已提交
293
int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr);
H
Hongze Cheng 已提交
294
int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter);
H
Hongze Cheng 已提交
295
int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback);
L
Liu Jicong 已提交
296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315
// STqSnapshotReader ==
int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapReader** ppReader);
int32_t tqSnapReaderClose(STqSnapReader** ppReader);
int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData);
// STqSnapshotWriter ======================================
int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter);
int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback);
int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
// STqOffsetReader ========================================
int32_t tqOffsetReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqOffsetReader** ppReader);
int32_t tqOffsetReaderClose(STqOffsetReader** ppReader);
int32_t tqOffsetSnapRead(STqOffsetReader* pReader, uint8_t** ppData);
// STqOffsetWriter ========================================
int32_t tqOffsetWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqOffsetWriter** ppWriter);
int32_t tqOffsetWriterClose(STqOffsetWriter** ppWriter, int8_t rollback);
int32_t tqOffsetSnapWrite(STqOffsetWriter* pWriter, uint8_t* pData, uint32_t nData);
// SStreamTaskWriter ======================================
// SStreamTaskReader ======================================
// SStreamStateWriter =====================================
// SStreamStateReader =====================================
316 317 318 319 320 321 322 323
// SRSmaSnapReader ========================================
int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapReader** ppReader);
int32_t rsmaSnapReaderClose(SRSmaSnapReader** ppReader);
int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData);
// SRSmaSnapWriter ========================================
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWriter** ppWriter);
int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback);
H
Hongze Cheng 已提交
324

L
Liu Jicong 已提交
325 326 327 328 329 330 331 332 333 334 335 336 337 338 339
typedef struct {
  int8_t  streamType;  // sma or other
  int8_t  dstType;
  int16_t padding;
  int32_t smaId;
  int64_t tbUid;
  int64_t lastReceivedVer;
  int64_t lastCommittedVer;
} SStreamSinkInfo;

typedef struct {
  SVnode*   pVnode;
  SHashObj* pHash;  // streamId -> SStreamSinkInfo
} SSink;

H
refact  
Hongze Cheng 已提交
340 341 342 343
// SVState
struct SVState {
  int64_t committed;
  int64_t applied;
H
Hongze Cheng 已提交
344
  int64_t applyTerm;
H
Hongze Cheng 已提交
345
  int64_t commitID;
H
Hongze Cheng 已提交
346
  int64_t commitTerm;
H
refact  
Hongze Cheng 已提交
347 348
};

C
Cary Xu 已提交
349
struct SVStatis {
C
Cary Xu 已提交
350 351 352 353
  int64_t nInsert;              // delta
  int64_t nInsertSuccess;       // delta
  int64_t nBatchInsert;         // delta
  int64_t nBatchInsertSuccess;  // delta
C
Cary Xu 已提交
354 355
};

H
Hongze Cheng 已提交
356 357 358
struct SVnodeInfo {
  SVnodeCfg config;
  SVState   state;
C
Cary Xu 已提交
359
  SVStatis  statis;
H
Hongze Cheng 已提交
360 361
};

C
Cary Xu 已提交
362 363 364 365 366 367 368 369
typedef enum {
  TSDB_TYPE_TSDB = 0,     // TSDB
  TSDB_TYPE_TSMA = 1,     // TSMA
  TSDB_TYPE_RSMA_L0 = 2,  // RSMA Level 0
  TSDB_TYPE_RSMA_L1 = 3,  // RSMA Level 1
  TSDB_TYPE_RSMA_L2 = 4,  // RSMA Level 2
} ETsdbType;

L
Liu Jicong 已提交
370
struct STsdbKeepCfg {
C
Cary Xu 已提交
371 372 373 374 375
  int8_t  precision;  // precision always be used with below keep cfgs
  int32_t days;
  int32_t keep0;
  int32_t keep1;
  int32_t keep2;
376
};
C
Cary Xu 已提交
377

378 379 380 381 382
typedef struct SVCommitSched {
  int64_t commitMs;
  int64_t maxWaitMs;
} SVCommitSched;

H
save  
Hongze Cheng 已提交
383
struct SVnode {
H
Hongze Cheng 已提交
384 385 386 387 388
  char*     path;
  SVnodeCfg config;
  SVState   state;
  SVStatis  statis;
  STfs*     pTfs;
389
  int32_t   diskPrimary;
H
Hongze Cheng 已提交
390 391 392
  SMsgCb    msgCb;

  // Buffer Pool
H
Hongze Cheng 已提交
393 394
  TdThreadMutex mutex;
  TdThreadCond  poolNotEmpty;
H
Hongze Cheng 已提交
395 396
  SVBufPool*    aBufPool[VNODE_BUFPOOL_SEGMENTS];
  SVBufPool*    freeList;
H
Hongze Cheng 已提交
397
  SVBufPool*    inUse;
H
Hongze Cheng 已提交
398 399 400
  SVBufPool*    onCommit;
  SVBufPool*    recycleHead;
  SVBufPool*    recycleTail;
H
Hongze Cheng 已提交
401
  SVBufPool*    onRecycle;
H
Hongze Cheng 已提交
402

H
Hongze Cheng 已提交
403 404 405 406 407 408 409 410
  SMeta*        pMeta;
  SSma*         pSma;
  STsdb*        pTsdb;
  SWal*         pWal;
  STQ*          pTq;
  SSink*        pSink;
  tsem_t        canCommit;
  int64_t       sync;
411
  TdThreadMutex lock;
412
  bool          blocked;
413
  bool          restored;
H
Hongze Cheng 已提交
414
  tsem_t        syncSem;
415 416
  int32_t       blockSec;
  int64_t       blockSeq;
H
Hongze Cheng 已提交
417
  SQHandle*     pQuery;
H
save  
Hongze Cheng 已提交
418 419
};

C
Cary Xu 已提交
420
#define TD_VID(PVNODE) ((PVNODE)->config.vgId)
421

C
Cary Xu 已提交
422 423
#define VND_TSDB(vnd)       ((vnd)->pTsdb)
#define VND_RSMA0(vnd)      ((vnd)->pTsdb)
C
Cary Xu 已提交
424 425
#define VND_RSMA1(vnd)      ((vnd)->pSma->pRSmaTsdb[TSDB_RETENTION_L0])
#define VND_RSMA2(vnd)      ((vnd)->pSma->pRSmaTsdb[TSDB_RETENTION_L1])
C
Cary Xu 已提交
426
#define VND_RETENTIONS(vnd) (&(vnd)->config.tsdbCfg.retentions)
C
Cary Xu 已提交
427 428
#define VND_IS_RSMA(v)      ((v)->config.isRsma == 1)
#define VND_IS_TSMA(v)      ((v)->config.isTsma == 1)
C
Cary Xu 已提交
429

430 431 432 433
#define TSDB_CACHE_NO(c)       ((c).cacheLast == 0)
#define TSDB_CACHE_LAST_ROW(c) (((c).cacheLast & 1) > 0)
#define TSDB_CACHE_LAST(c)     (((c).cacheLast & 2) > 0)

H
Hongze Cheng 已提交
434 435 436 437 438 439
struct STbUidStore {
  tb_uid_t  suid;
  SArray*   tbUids;
  SHashObj* uidHash;
};

440 441 442 443
struct SSma {
  bool          locked;
  TdThreadMutex mutex;
  SVnode*       pVnode;
C
Cary Xu 已提交
444
  STsdb*        pRSmaTsdb[TSDB_RETENTION_L2];
445 446 447 448 449 450
  void*         pTSmaEnv;
  void*         pRSmaEnv;
};

#define SMA_CFG(s)        (&(s)->pVnode->config)
#define SMA_TSDB_CFG(s)   (&(s)->pVnode->config.tsdbCfg)
H
Hongze Cheng 已提交
451
#define SMA_RETENTION(s)  ((SRetention*)&(s)->pVnode->config.tsdbCfg.retentions)
452 453 454 455 456 457 458
#define SMA_LOCKED(s)     ((s)->locked)
#define SMA_META(s)       ((s)->pVnode->pMeta)
#define SMA_VID(s)        TD_VID((s)->pVnode)
#define SMA_TFS(s)        ((s)->pVnode->pTfs)
#define SMA_TSMA_ENV(s)   ((s)->pTSmaEnv)
#define SMA_RSMA_ENV(s)   ((s)->pRSmaEnv)
#define SMA_RSMA_TSDB0(s) ((s)->pVnode->pTsdb)
C
Cary Xu 已提交
459 460
#define SMA_RSMA_TSDB1(s) ((s)->pRSmaTsdb[TSDB_RETENTION_L0])
#define SMA_RSMA_TSDB2(s) ((s)->pRSmaTsdb[TSDB_RETENTION_L1])
H
refact  
Hongze Cheng 已提交
461

L
Liu Jicong 已提交
462
// sma
L
Liu Jicong 已提交
463
void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
L
Liu Jicong 已提交
464

C
Cary Xu 已提交
465
enum {
H
Hongze Cheng 已提交
466
  SNAP_DATA_CFG = 0,
C
Cary Xu 已提交
467 468 469 470 471 472 473 474 475 476
  SNAP_DATA_META = 1,
  SNAP_DATA_TSDB = 2,
  SNAP_DATA_DEL = 3,
  SNAP_DATA_RSMA1 = 4,
  SNAP_DATA_RSMA2 = 5,
  SNAP_DATA_QTASK = 6,
  SNAP_DATA_TQ_HANDLE = 7,
  SNAP_DATA_TQ_OFFSET = 8,
  SNAP_DATA_STREAM_TASK = 9,
  SNAP_DATA_STREAM_STATE = 10,
C
Cary Xu 已提交
477 478
};

H
Hongze Cheng 已提交
479 480
struct SSnapDataHdr {
  int8_t  type;
K
kailixu 已提交
481
  int8_t  flag;
H
Hongze Cheng 已提交
482
  int64_t index;
H
Hongze Cheng 已提交
483 484 485 486
  int64_t size;
  uint8_t data[];
};

H
Hongze Cheng 已提交
487 488 489
struct SCommitInfo {
  SVnodeInfo info;
  SVnode*    pVnode;
490
  TXN*       txn;
H
Hongze Cheng 已提交
491
};
H
Hongze Cheng 已提交
492

H
Hongze Cheng 已提交
493
struct SCompactInfo {
H
Hongze Cheng 已提交
494 495 496 497
  SVnode*     pVnode;
  int32_t     flag;
  int64_t     commitID;
  STimeWindow tw;
H
Hongze Cheng 已提交
498
};
H
Hongze Cheng 已提交
499

500 501
void initStorageAPI(SStorageAPI* pAPI);

H
save  
Hongze Cheng 已提交
502 503 504 505
#ifdef __cplusplus
}
#endif

L
Liu Jicong 已提交
506
#endif /*_TD_VNODE_DEF_H_*/