vnode.h 6.6 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef _TD_VNODE_H_
#define _TD_VNODE_H_

H
refact  
Hongze Cheng 已提交
19
#include "os.h"
S
Shengliang Guan 已提交
20
#include "tmsgcb.h"
L
Liu Jicong 已提交
21 22
#include "tqueue.h"
#include "trpc.h"
H
refact  
Hongze Cheng 已提交
23 24

#include "meta.h"
H
refact  
Hongze Cheng 已提交
25
#include "tarray.h"
S
Shengliang Guan 已提交
26
#include "tfs.h"
H
refact  
Hongze Cheng 已提交
27
#include "tsdb.h"
H
refact  
Hongze Cheng 已提交
28
#include "wal.h"
S
Shengliang Guan 已提交
29

S
Shengliang Guan 已提交
30 31 32 33
#ifdef __cplusplus
extern "C" {
#endif

H
save  
Hongze Cheng 已提交
34
/* ------------------------ TYPES EXPOSED ------------------------ */
S
shm  
Shengliang Guan 已提交
35 36
typedef struct SMgmtWrapper SMgmtWrapper;
typedef struct SVnode       SVnode;
L
Liu Jicong 已提交
37
/**
 * @brief TQ configuration embedded in SVnodeCfg (member tqCfg).
 *
 * Currently a placeholder: it carries a single reserved field and nothing else.
 */
typedef struct {
  // TODO
  int32_t reserved;
} STqCfg;

L
Liu Jicong 已提交
42
typedef struct {
H
Hongze Cheng 已提交
43
  int32_t  vgId;
D
dapan1121 已提交
44
  uint64_t dbId;
S
Shengliang Guan 已提交
45
  STfs    *pTfs;
H
Hongze Cheng 已提交
46 47 48 49
  uint64_t wsize;
  uint64_t ssize;
  uint64_t lsize;
  bool     isHeapAllocator;
H
more  
Hongze Cheng 已提交
50 51
  uint32_t ttl;
  uint32_t keep;
L
Liu Jicong 已提交
52
  int8_t   streamMode;
H
Hongze Cheng 已提交
53
  bool     isWeak;
H
more  
Hongze Cheng 已提交
54 55
  STsdbCfg tsdbCfg;
  SMetaCfg metaCfg;
H
Hongze Cheng 已提交
56 57
  STqCfg   tqCfg;
  SWalCfg  walCfg;
S
Shengliang Guan 已提交
58
  SMsgCb   msgCb;
D
dapan1121 已提交
59 60
  uint32_t hashBegin;
  uint32_t hashEnd;
L
Liu Jicong 已提交
61
  int8_t   hashMethod;
H
more  
Hongze Cheng 已提交
62
} SVnodeCfg;
H
save  
Hongze Cheng 已提交
63

L
Liu Jicong 已提交
64
typedef struct {
65
  int64_t           ver;
L
Liu Jicong 已提交
66
  int64_t           tbUid;
67
  SHashObj         *tbIdHash;
S
Shengliang Guan 已提交
68
  const SSubmitReq *pMsg;
69 70 71 72 73 74 75 76
  SSubmitBlk       *pBlock;
  SSubmitMsgIter    msgIter;
  SSubmitBlkIter    blkIter;
  SMeta            *pVnodeMeta;
  SArray           *pColIdList;  // SArray<int32_t>
  int32_t           sver;
  SSchemaWrapper   *pSchemaWrapper;
  STSchema         *pSchema;
L
Liu Jicong 已提交
77 78
} STqReadHandle;

H
save  
Hongze Cheng 已提交
79
/* ------------------------ SVnode ------------------------ */
H
more  
Hongze Cheng 已提交
80 81
/**
 * @brief Initialize the vnode module
 *
 * @return int 0 for success and -1 for failure
 */
int vnodeInit(void);
H
more  
Hongze Cheng 已提交
86 87

/**
 * @brief Cleanup the vnode module
 *
 */
void vnodeCleanup(void);
H
more  
Hongze Cheng 已提交
92

H
refact  
Hongze Cheng 已提交
93 94 95 96
/**
 * @brief Open a VNODE.
 *
 * @param path path of the vnode
H
refact  
Hongze Cheng 已提交
97
 * @param pVnodeCfg options of the vnode
H
refact  
Hongze Cheng 已提交
98 99
 * @return SVnode* The vnode object
 */
S
Shengliang Guan 已提交
100
SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg);
H
refact  
Hongze Cheng 已提交
101 102 103 104

/**
 * @brief Close a VNODE
 *
 * @param pVnode The vnode object to close
 */
void vnodeClose(SVnode *pVnode);

/**
 * @brief Destroy a VNODE.
 *
 * The vnode is identified by its path rather than by an SVnode handle.
 *
 * @param path Path of the VNODE.
 */
void vnodeDestroy(const char *path);

/**
 * @brief Process an array of write messages.
 *
 * @param pVnode The vnode object.
 * @param pMsgs The array of SRpcMsg
 */
S
Shengliang Guan 已提交
122
void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs);
H
refact  
Hongze Cheng 已提交
123 124 125 126 127 128

/**
 * @brief Apply a write request message.
 *
 * @param pVnode The vnode object.
 * @param pMsg The request message
 * @param pRsp The response message
 * @return int 0 for success, -1 for failure
 */
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);

L
Liu Jicong 已提交
134 135 136 137 138 139 140 141 142 143
/**
 * @brief Process a consume message.
 *
 * @param pVnode The vnode object.
 * @param pMsg The request message
 * @param pRsp The response message (output: passed as a pointer to a pointer)
 * @return int 0 for success, -1 for failure
 */
int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);

H
refact  
Hongze Cheng 已提交
144 145 146 147 148 149 150 151 152
/**
 * @brief Process the sync request
 *
 * @param pVnode The vnode object.
 * @param pMsg The request message
 * @param pRsp The response message
 * @return int presumably 0 for success and -1 for failure, like the other
 *         message handlers in this header - TODO confirm
 */
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
H
save  
Hongze Cheng 已提交
153

154 155 156 157 158 159 160
/**
 * @brief Process a query message.
 *
 * @param pVnode The vnode object.
 * @param pMsg The request message
 * @return int 0 for success, -1 for failure
 */
S
Shengliang 已提交
161
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
162 163 164 165 166 167 168 169

/**
 * @brief Process a fetch message.
 *
 * @param pVnode The vnode object.
 * @param pMsg The request message
 * @return int 0 for success, -1 for failure
 */
L
Liu Jicong 已提交
170
int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
171

H
refact  
Hongze Cheng 已提交
172
/* ------------------------ SVnodeCfg ------------------------ */
H
refact  
Hongze Cheng 已提交
173 174 175 176 177
/**
 * @brief Initialize VNODE options.
 *
 * @param pOptions The options object to be initialized. It should not be NULL.
 */
H
refact  
Hongze Cheng 已提交
178
void vnodeOptionsInit(SVnodeCfg *pOptions);
H
refact  
Hongze Cheng 已提交
179 180 181 182 183 184

/**
 * @brief Clear VNODE options.
 *
 * @param pOptions Options to clear.
 */
H
refact  
Hongze Cheng 已提交
185
void vnodeOptionsClear(SVnodeCfg *pOptions);
H
save  
Hongze Cheng 已提交
186

D
dapan1121 已提交
187
/**
 * @brief Validate a table name against the vnode options.
 *
 * NOTE(review): presumably hashes tableFName with hashMethod and checks the
 * result against [hashBegin, hashEnd] of the options - confirm against the
 * implementation, including the meaning of the return value.
 *
 * @param pVnodeOptions The vnode options.
 * @param tableFName The table name to validate (presumably the full name - confirm).
 * @return int Validation result.
 */
int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName);
D
dapan1121 已提交
188

H
refact  
Hongze Cheng 已提交
189 190
/* ------------------------ FOR COMPILE ------------------------ */

S
Shengliang Guan 已提交
191 192 193
int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg);
int32_t vnodeCompact(SVnode *pVnode);
int32_t vnodeSync(SVnode *pVnode);
S
Shengliang Guan 已提交
194
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
S
Shengliang Guan 已提交
195

L
Liu Jicong 已提交
196
/* ------------------------- TQ READ --------------------------- */
L
Liu Jicong 已提交
197

L
Liu Jicong 已提交
198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213
/* Stream token kinds. NOTE(review): presumably the discriminator stored in
 * STqStreamToken.type - confirm. Values start at 1; 0 is not a valid kind here. */
enum {
  TQ_STREAM_TOKEN__DATA       = 1,
  TQ_STREAM_TOKEN__WATERMARK  = 2,
  TQ_STREAM_TOKEN__CHECKPOINT = 3,
};

/**
 * @brief A stream token: a one-byte type tag followed by a payload.
 *
 * NOTE(review): type presumably holds one of the TQ_STREAM_TOKEN__* values and
 * selects which union member is meaningful - confirm against the users.
 */
typedef struct {
  int8_t type;
  int8_t reserved[7];  // together with type this occupies 8 bytes ahead of the payload
  union {
    void   *data;
    int64_t wmTs;
    int64_t checkpointId;
  };
} STqStreamToken;

214
/**
 * @brief Create a read handle for scanning submit messages.
 *
 * @param pMeta The meta object (presumably kept in the handle as pVnodeMeta - confirm).
 * @return STqReadHandle* The new read handle (presumably NULL on failure - confirm).
 */
STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta);
L
Liu Jicong 已提交
215

216
/**
 * @brief Set the column id list of a read handle.
 *
 * Only the pointer is stored; the array is not copied and any previously set
 * list is not freed here.
 *
 * @param pReadHandle The read handle. Must not be NULL (it is dereferenced).
 * @param pColIdList The column id list, SArray<int32_t>.
 */
static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList) {
  pReadHandle->pColIdList = pColIdList;
}

L
Liu Jicong 已提交
220 221
// static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, int64_t tbUid) {
// pHandle->tbUid = tbUid;
//}
L
Liu Jicong 已提交
223

224
/**
 * @brief Replace the table uid set of a read handle with the given list.
 *
 * If the handle already owns a uid hash table it is emptied and reused;
 * otherwise a new one is created. (Creating a new table unconditionally would
 * overwrite the pointer to the existing one and leak it.)
 *
 * @param pHandle The read handle. Must not be NULL (it is dereferenced).
 * @param tbUidList The table uids to install; elements are read as int64_t.
 * @return int 0 for success; -1 if the hash table cannot be created, in which
 *         case terrno is set to TSDB_CODE_OUT_OF_MEMORY.
 */
static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList) {
  if (pHandle->tbIdHash != NULL) {
    // Drop the old entries but keep the table itself.
    taosHashClear(pHandle->tbIdHash);
  } else {
    pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
    if (pHandle->tbIdHash == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

  for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
    int64_t *pKey = (int64_t *)taosArrayGet(tbUidList, i);
    // The hash is used as a set: the uid is the key and there is no value.
    taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
  }

  return 0;
}

L
fix  
Liu Jicong 已提交
243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259
/**
 * @brief Add the given table uids to the uid set of a read handle.
 *
 * Entries already in the set are kept; the hash table is created on first use.
 *
 * @param pHandle The read handle. Must not be NULL (it is dereferenced).
 * @param tbUidList The table uids to add; elements are read as int64_t.
 * @return int 0 for success; -1 if the hash table cannot be created, in which
 *         case terrno is set to TSDB_CODE_OUT_OF_MEMORY.
 */
static FORCE_INLINE int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList) {
  // Create the set lazily the first time uids are added.
  if (pHandle->tbIdHash == NULL) {
    pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  }
  if (pHandle->tbIdHash == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int idx = 0;
  while (idx < taosArrayGetSize(tbUidList)) {
    int64_t *pUid = (int64_t *)taosArrayGet(tbUidList, idx);
    // Key only, no value: the hash acts as a set of uids.
    taosHashPut(pHandle->tbIdHash, pUid, sizeof(int64_t), NULL, 0);
    idx++;
  }

  return 0;
}

L
Liu Jicong 已提交
260 261 262
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
bool    tqNextDataBlock(STqReadHandle *pHandle);
int     tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockInfo);
L
Liu Jicong 已提交
263
// return SArray<SColumnInfoData>
264
SArray *tqRetrieveDataBlock(STqReadHandle *pHandle);
L
Liu Jicong 已提交
265

S
Shengliang Guan 已提交
266 267 268 269
#ifdef __cplusplus
}
#endif

270
#endif /*_TD_VNODE_H_*/