sdb.h 12.7 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16 17
#ifndef _TD_SDB_H_
#define _TD_SDB_H_
S
Shengliang Guan 已提交
18

L
Liu Jicong 已提交
19 20
#include "os.h"

21 22 23 24
#include "thash.h"
#include "tlockfree.h"
#include "tlog.h"
#include "tmsg.h"
25
#include "wal.h"
26

S
Shengliang Guan 已提交
27 28 29
#ifdef __cplusplus
extern "C" {
#endif
S
Shengliang Guan 已提交
30

S
Shengliang Guan 已提交
31 32 33 34 35 36 37 38 39
// clang-format off
/*
 * Mnode logging helpers.
 *
 * Each macro forwards its printf-style arguments to taosPrintLog() only when
 * the matching DEBUG_* bit is set in mDebugFlag. Fatal/error/warn/info pass
 * 255 as the third taosPrintLog() argument; debug/trace pass mDebugFlag.
 *
 * The expansion is a bare brace block, not a single statement.
 */
#define mFatal(...) \
  { \
    if (mDebugFlag & DEBUG_FATAL) { \
      taosPrintLog("MND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \
    } \
  }
#define mError(...) \
  { \
    if (mDebugFlag & DEBUG_ERROR) { \
      taosPrintLog("MND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \
    } \
  }
#define mWarn(...) \
  { \
    if (mDebugFlag & DEBUG_WARN) { \
      taosPrintLog("MND WARN ", DEBUG_WARN, 255, __VA_ARGS__); \
    } \
  }
#define mInfo(...) \
  { \
    if (mDebugFlag & DEBUG_INFO) { \
      taosPrintLog("MND ", DEBUG_INFO, 255, __VA_ARGS__); \
    } \
  }
#define mDebug(...) \
  { \
    if (mDebugFlag & DEBUG_DEBUG) { \
      taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); \
    } \
  }
#define mTrace(...) \
  { \
    if (mDebugFlag & DEBUG_TRACE) { \
      taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); \
    } \
  }
// clang-format on

40 41 42 43 44 45
/*
 * Read one fixed-width value and advance the cursor.
 *
 * Calls func(pData, dataPos, val); a non-zero result jumps to label `pos`,
 * otherwise dataPos is advanced by sizeof(type).
 *
 * pData   - object to read from (first argument of func)
 * dataPos - lvalue cursor, updated in place
 * val     - destination passed through to func
 * pos     - goto label used on failure
 * func    - reader returning 0 on success
 * type    - type whose size is added to dataPos
 *
 * The body now uses the pData parameter; it previously referenced a variable
 * literally named `pRaw` from the caller's scope and ignored the argument.
 * The expansion is a brace block, so call sites need no trailing semicolon.
 */
#define SDB_GET_VAL(pData, dataPos, val, pos, func, type) \
  {                                                       \
    if (func(pData, dataPos, val) != 0) {                 \
      goto pos;                                           \
    }                                                     \
    dataPos += sizeof(type);                              \
  }

48
/*
 * Read valLen bytes and advance the cursor.
 *
 * Calls sdbGetRawBinary(pRaw, dataPos, val, valLen); a non-zero result jumps
 * to label `pos`, otherwise dataPos is advanced by valLen.
 * dataPos must be an lvalue. The expansion is a brace block, so call sites
 * need no trailing semicolon.
 */
#define SDB_GET_BINARY(pRaw, dataPos, val, valLen, pos)     \
  {                                                         \
    if (sdbGetRawBinary(pRaw, dataPos, val, valLen) != 0) { \
      goto pos;                                             \
    }                                                       \
    dataPos += valLen;                                      \
  }

56 57 58
/*
 * Typed readers built on SDB_GET_VAL: each pairs one sdbGetRawIntN() reader
 * with the integer type whose size is added to dataPos.
 * The first parameter is named pRaw to match the SDB_SET_* family.
 */
#define SDB_GET_INT64(pRaw, dataPos, val, pos) SDB_GET_VAL(pRaw, dataPos, val, pos, sdbGetRawInt64, int64_t)
#define SDB_GET_INT32(pRaw, dataPos, val, pos) SDB_GET_VAL(pRaw, dataPos, val, pos, sdbGetRawInt32, int32_t)
#define SDB_GET_INT16(pRaw, dataPos, val, pos) SDB_GET_VAL(pRaw, dataPos, val, pos, sdbGetRawInt16, int16_t)
#define SDB_GET_INT8(pRaw, dataPos, val, pos)  SDB_GET_VAL(pRaw, dataPos, val, pos, sdbGetRawInt8, int8_t)
S
Shengliang Guan 已提交
60

61 62 63 64
/*
 * Skip valLen reserved bytes: they are read through SDB_GET_BINARY into a
 * zero-initialised scratch buffer that is then discarded.
 * On failure control jumps to label `pos`.
 *
 * valLen must be an integer constant expression, because the scratch array
 * carries an initializer (variable-length arrays cannot be initialised).
 */
#define SDB_GET_RESERVE(pRaw, dataPos, valLen, pos) \
  {                                                 \
    char val[valLen] = {0};                         \
    SDB_GET_BINARY(pRaw, dataPos, val, valLen, pos) \
  }

67 68 69 70 71 72
/*
 * Write one fixed-width value and advance the cursor.
 *
 * Calls func(pRaw, dataPos, val); a non-zero result jumps to label `pos`,
 * otherwise dataPos is advanced by sizeof(type).
 * dataPos must be an lvalue. The expansion is a brace block, so call sites
 * need no trailing semicolon.
 */
#define SDB_SET_VAL(pRaw, dataPos, val, pos, func, type) \
  {                                                      \
    if (func(pRaw, dataPos, val) != 0) {                 \
      goto pos;                                          \
    }                                                    \
    dataPos += sizeof(type);                             \
  }

75 76 77
/*
 * Typed writers built on SDB_SET_VAL: each pairs one sdbSetRawIntN() writer
 * with the integer type whose size is added to dataPos.
 */
#define SDB_SET_INT64(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt64, int64_t)
#define SDB_SET_INT32(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt32, int32_t)
#define SDB_SET_INT16(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt16, int16_t)
#define SDB_SET_INT8(pRaw, dataPos, val, pos)  SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt8, int8_t)
79 80

/*
 * Write valLen bytes and advance the cursor.
 *
 * Calls sdbSetRawBinary(pRaw, dataPos, val, valLen); a non-zero result jumps
 * to label `pos`, otherwise dataPos is advanced by valLen.
 * dataPos must be an lvalue. The expansion is a brace block, so call sites
 * need no trailing semicolon.
 */
#define SDB_SET_BINARY(pRaw, dataPos, val, valLen, pos)     \
  {                                                         \
    if (sdbSetRawBinary(pRaw, dataPos, val, valLen) != 0) { \
      goto pos;                                             \
    }                                                       \
    dataPos += valLen;                                      \
  }

88 89 90 91
/*
 * Write valLen zero bytes as reserved space, via SDB_SET_BINARY from a
 * zero-initialised scratch buffer. On failure control jumps to label `pos`.
 *
 * valLen must be an integer constant expression, because the scratch array
 * carries an initializer (variable-length arrays cannot be initialised).
 */
#define SDB_SET_RESERVE(pRaw, dataPos, valLen, pos) \
  {                                                 \
    char val[valLen] = {0};                         \
    SDB_SET_BINARY(pRaw, dataPos, val, valLen, pos) \
  }

94
/*
 * Record the payload length of a raw object.
 *
 * Calls sdbSetRawDataLen(pRaw, dataLen); a non-zero result jumps to label
 * `pos`. The expansion is a brace block, so call sites need no trailing
 * semicolon.
 */
#define SDB_SET_DATALEN(pRaw, dataLen, pos)     \
  {                                             \
    if (sdbSetRawDataLen(pRaw, dataLen) != 0) { \
      goto pos;                                 \
    }                                           \
  }

S
Shengliang Guan 已提交
101
// Forward declarations so the callback typedefs below can refer to these
// types by pointer. SMnode is never completed in this header.
typedef struct SMnode  SMnode;
typedef struct SSdb    SSdb;
typedef struct SSdbRaw SSdbRaw;
typedef struct SSdbRow SSdbRow;
S
Shengliang Guan 已提交
105 106 107 108 109 110 111
/*
 * Per-table callback signatures. One of each (except sdbTraverseFp) is carried
 * by SSdbTable and stored per ESdbType in SSdb.
 * NOTE(review): when exactly each hook is invoked is decided by the sdb
 * implementation, which is not visible from this header.
 */
typedef int32_t (*SdbInsertFp)(SSdb *pSdb, void *pObj);
typedef int32_t (*SdbUpdateFp)(SSdb *pSdb, void *pSrcObj, void *pDstObj);
typedef int32_t (*SdbDeleteFp)(SSdb *pSdb, void *pObj, bool callFunc);
typedef int32_t (*SdbDeployFp)(SMnode *pMnode);
// Decode turns a raw record into a row; encode turns an object into a raw record.
typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw);
typedef SSdbRaw *(*SdbEncodeFp)(void *pObj);
// Visitor passed to sdbTraverse(); p1..p3 are forwarded from that call.
// NOTE(review): the meaning of the bool result (continue/stop) is not visible here — confirm.
typedef bool (*sdbTraverseFp)(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3);
112 113 114 115 116 117 118

// Key kind of an sdb table; stored per table in SSdb::keyTypes and supplied
// through SSdbTable::keyType.
typedef enum {
  SDB_KEY_BINARY = 1,
  SDB_KEY_INT32 = 2,
  SDB_KEY_INT64 = 3,
} EKeyType;

S
Shengliang Guan 已提交
119
// Status of a row (SSdbRow::status); also accepted by sdbSetRawStatus().
// Values are spelled out explicitly.
// NOTE(review): SSdbRaw::status is an int8_t, so values presumably must fit in 8 bits — confirm.
typedef enum {
  SDB_STATUS_INIT = 0,
  SDB_STATUS_CREATING = 1,
  SDB_STATUS_DROPPING = 2,
  SDB_STATUS_DROPPED = 3,
  SDB_STATUS_READY = 4,
} ESdbStatus;

S
Shengliang Guan 已提交
127
// Identifier of each sdb table. SDB_MAX is the table count and sizes every
// per-table array in SSdb, so it must remain the last enumerator.
// NOTE(review): whether the numeric order of the tables is significant
// (e.g. for load or deploy order) cannot be determined from this header.
typedef enum {
  SDB_TRANS = 0,
  SDB_CLUSTER = 1,
  SDB_MNODE = 2,
  SDB_QNODE = 3,
  SDB_SNODE = 4,
  SDB_BNODE = 5,
  SDB_DNODE = 6,
  SDB_USER = 7,
  SDB_AUTH = 8,
  SDB_ACCT = 9,
  SDB_STREAM = 10,
  SDB_OFFSET = 11,
  SDB_SUBSCRIBE = 12,
  SDB_CONSUMER = 13,
  SDB_TOPIC = 14,
  SDB_VGROUP = 15,
  SDB_SMA = 16,
  SDB_STB = 17,
  SDB_DB = 18,
  SDB_FUNC = 19,
  SDB_MAX = 20
} ESdbType;

S
Shengliang Guan 已提交
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
// Raw (encoded) record: a fixed header followed by a variable-length payload.
typedef struct SSdbRaw {
  int8_t  type;      // presumably an ESdbType value narrowed to 8 bits — confirm
  int8_t  status;    // presumably an ESdbStatus value narrowed to 8 bits — confirm
  int8_t  sver;      // version tag; see sdbAllocRaw() / sdbGetRawSoftVer()
  int8_t  reserved;
  int32_t dataLen;   // presumably the number of payload bytes in pData — confirm
  char    pData[];   // flexible array member holding the payload
} SSdbRaw;

// In-memory row: a small header followed by the object itself.
typedef struct SSdbRow {
  ESdbType   type;
  ESdbStatus status;
  int32_t    refCount;  // NOTE(review): presumably maintained by sdbAcquire()/sdbRelease() — confirm
  char       pObj[];    // flexible array member holding the object
} SSdbRow;

typedef struct SSdb {
  SMnode        *pMnode;
169
  SWal          *pWal;
S
Shengliang Guan 已提交
170 171 172
  char          *currDir;
  char          *tmpDir;
  int64_t        lastCommitVer;
173
  int64_t        lastCommitTerm;
S
Shengliang Guan 已提交
174 175
  int64_t        curVer;
  int64_t        curTerm;
S
Shengliang Guan 已提交
176
  int64_t        curConfig;
S
Shengliang Guan 已提交
177 178 179 180 181 182 183 184 185 186 187
  int64_t        tableVer[SDB_MAX];
  int64_t        maxId[SDB_MAX];
  EKeyType       keyTypes[SDB_MAX];
  SHashObj      *hashObjs[SDB_MAX];
  TdThreadRwlock locks[SDB_MAX];
  SdbInsertFp    insertFps[SDB_MAX];
  SdbUpdateFp    updateFps[SDB_MAX];
  SdbDeleteFp    deleteFps[SDB_MAX];
  SdbDeployFp    deployFps[SDB_MAX];
  SdbEncodeFp    encodeFps[SDB_MAX];
  SdbDecodeFp    decodeFps[SDB_MAX];
S
Shengliang Guan 已提交
188
  TdThreadMutex  filelock;
S
Shengliang Guan 已提交
189 190 191 192
} SSdb;

// Iterator handle used by the sdbStartRead/sdbDoRead/sdbStopRead and
// sdbStartWrite/sdbDoWrite/sdbStopWrite functions.
typedef struct SSdbIter {
  TdFilePtr file;
  int64_t   total;  // NOTE(review): presumably a running byte count — confirm
  char     *name;   // NOTE(review): presumably the file name or path — confirm
} SSdbIter;
S
Shengliang Guan 已提交
196

S
Shengliang Guan 已提交
197
typedef struct {
S
Shengliang Guan 已提交
198 199
  ESdbType    sdbType;
  EKeyType    keyType;
S
Shengliang Guan 已提交
200 201 202 203 204 205
  SdbDeployFp deployFp;
  SdbEncodeFp encodeFp;
  SdbDecodeFp decodeFp;
  SdbInsertFp insertFp;
  SdbUpdateFp updateFp;
  SdbDeleteFp deleteFp;
S
Shengliang Guan 已提交
206
} SSdbTable;
S
Shengliang Guan 已提交
207

S
Shengliang Guan 已提交
208 209
typedef struct SSdbOpt {
  const char *path;
S
Shengliang Guan 已提交
210
  SMnode     *pMnode;
211
  SWal       *pWal;
S
Shengliang Guan 已提交
212
} SSdbOpt;
S
Shengliang Guan 已提交
213

S
Shengliang Guan 已提交
214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235
/**
 * @brief Initialize and start the sdb.
 *
 * @param pOption Option of the sdb.
 * @return SSdb* The sdb object.
 */
SSdb *sdbInit(SSdbOpt *pOption);

/**
 * @brief Stop and cleanup the sdb.
 *
 * @param pSdb The sdb object to close.
 */
void sdbCleanup(SSdb *pSdb);

/**
 * @brief Set the properties of sdb table.
 *
 * @param pSdb The sdb object.
 * @param table The properties of the table.
 * @return int32_t 0 for success, -1 for failure.
 */
S
Shengliang Guan 已提交
236
int32_t sdbSetTable(SSdb *pSdb, SSdbTable table);
S
Shengliang Guan 已提交
237 238 239 240 241 242 243

/**
 * @brief Set the initial rows of sdb.
 *
 * @param pSdb The sdb object.
 * @return int32_t 0 for success, -1 for failure.
 */
S
Shengliang Guan 已提交
244
int32_t sdbDeploy(SSdb *pSdb);
S
Shengliang Guan 已提交
245 246 247 248 249 250 251

/**
 * @brief Load sdb from file.
 *
 * @param pSdb The sdb object.
 * @return int32_t 0 for success, -1 for failure.
 */
S
Shengliang Guan 已提交
252
int32_t sdbReadFile(SSdb *pSdb);
S
Shengliang Guan 已提交
253

254 255 256 257 258 259 260 261
/**
 * @brief Write sdb file.
 *
 * @param pSdb The sdb object.
 * @return int32_t 0 for success, -1 for failure.
 */
int32_t sdbWriteFile(SSdb *pSdb);

S
Shengliang Guan 已提交
262
/**
S
Shengliang Guan 已提交
263
 * @brief Parse and write raw data to sdb, then free the pRaw object
S
Shengliang Guan 已提交
264 265 266 267 268
 *
 * @param pSdb The sdb object.
 * @param pRaw The raw data.
 * @return int32_t 0 for success, -1 for failure.
 */
S
Shengliang Guan 已提交
269
int32_t sdbWrite(SSdb *pSdb, SSdbRaw *pRaw);
S
Shengliang Guan 已提交
270

S
Shengliang Guan 已提交
271 272 273 274 275 276 277
/**
 * @brief Parse and write raw data to sdb.
 *
 * @param pSdb The sdb object.
 * @param pRaw The raw data.
 * @return int32_t 0 for success, -1 for failure.
 */
278
int32_t sdbWriteWithoutFree(SSdb *pSdb, SSdbRaw *pRaw);
S
Shengliang Guan 已提交
279

S
Shengliang Guan 已提交
280 281 282 283 284 285 286 287
/**
 * @brief Acquire a row from sdb
 *
 * @param pSdb The sdb object.
 * @param type The type of the row.
 * @param pKey The key value of the row.
 * @return void* The object of the row.
 */
L
Liu Jicong 已提交
288
void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey);
S
Shengliang Guan 已提交
289 290 291 292 293 294 295 296 297 298 299 300 301 302

/**
 * @brief Release a row from sdb.
 *
 * @param pSdb The sdb object.
 * @param pObj The object of the row.
 */
void sdbRelease(SSdb *pSdb, void *pObj);

/**
 * @brief Traverse a sdb table
 *
 * @param pSdb The sdb object.
 * @param type The type of the table.
S
Shengliang Guan 已提交
303
 * @param pIter The initial iterator of the table.
S
Shengliang Guan 已提交
304 305 306 307
 * @param pObj The object of the row just fetched.
 * @return void* The next iterator of the table.
 */
void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj);
308
void *sdbFetchAll(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj, ESdbStatus *status) ;
S
Shengliang Guan 已提交
309 310 311 312 313 314 315 316 317

/**
 * @brief Cancel a traversal
 *
 * @param pSdb The sdb object.
 * @param type The initial iterator of table.
 */
void sdbCancelFetch(SSdb *pSdb, void *pIter);

S
Shengliang Guan 已提交
318 319 320 321 322 323 324 325 326 327 328 329
/**
 * @brief Traverse a sdb
 *
 * @param pSdb The sdb object.
 * @param type The initial iterator of table.
 * @param fp The function pointer.
 * @param p1 The callback param.
 * @param p2 The callback param.
 * @param p3 The callback param.
 */
void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2, void *p3);

S
Shengliang Guan 已提交
330 331 332 333 334
/**
 * @brief Get the number of rows in the table
 *
 * @param pSdb The sdb object.
 * @param pIter The type of the table.
S
Shengliang Guan 已提交
335
 * @return int32_t The number of rows in the table
S
Shengliang Guan 已提交
336
 */
S
Shengliang Guan 已提交
337
int32_t sdbGetSize(SSdb *pSdb, ESdbType type);
S
Shengliang Guan 已提交
338

S
Shengliang Guan 已提交
339 340 341 342 343
/**
 * @brief Get the max id of the table, keyType of table should be INT32
 *
 * @param pSdb The sdb object.
 * @param pIter The type of the table.
S
Shengliang Guan 已提交
344
 * @return int32_t The max id of the table
S
Shengliang Guan 已提交
345 346 347
 */
int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type);

348 349 350 351 352 353 354 355 356
/**
 * @brief Get the version of the table
 *
 * @param pSdb The sdb object.
 * @param pIter The type of the table.
 * @return int32_t The version of the table
 */
int64_t sdbGetTableVer(SSdb *pSdb, ESdbType type);

S
Shengliang Guan 已提交
357
/**
 * @brief Update the apply index of sdb.
 *
 * @param pSdb The sdb object.
 * @param index The update value of the apply index.
 */
void    sdbSetApplyIndex(SSdb *pSdb, int64_t index);

/** @brief Update the apply term of sdb. */
void    sdbSetApplyTerm(SSdb *pSdb, int64_t term);

/** @brief Update the current config value of sdb. */
void    sdbSetCurConfig(SSdb *pSdb, int64_t config);

/** @brief Get the apply index of sdb. */
int64_t sdbGetApplyIndex(SSdb *pSdb);

/** @brief Get the apply term of sdb. */
int64_t sdbGetApplyTerm(SSdb *pSdb);

/** @brief Get the commit index of sdb. */
int64_t sdbGetCommitIndex(SSdb *pSdb);

/** @brief Get the commit term of sdb. */
int64_t sdbGetCommitTerm(SSdb *pSdb);

/** @brief Get the current config value of sdb. */
int64_t sdbGetCurConfig(SSdb *pSdb);
S
Shengliang Guan 已提交
372

S
Shengliang Guan 已提交
373
/*
 * Raw record helpers.
 *
 * The SDB_SET_* and SDB_GET_* macros in this header treat any non-zero return
 * from the sdbSetRaw* and sdbGetRaw* functions as failure, and advance the
 * caller's dataPos themselves; these functions take dataPos by value.
 */
SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen);
void     sdbFreeRaw(SSdbRaw *pRaw);
int32_t  sdbSetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t val);
int32_t  sdbSetRawInt16(SSdbRaw *pRaw, int32_t dataPos, int16_t val);
int32_t  sdbSetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t val);
int32_t  sdbSetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t val);
int32_t  sdbSetRawBinary(SSdbRaw *pRaw, int32_t dataPos, const char *pVal, int32_t valLen);
int32_t  sdbSetRawDataLen(SSdbRaw *pRaw, int32_t dataLen);
int32_t  sdbSetRawStatus(SSdbRaw *pRaw, ESdbStatus status);
int32_t  sdbGetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t *val);
int32_t  sdbGetRawInt16(SSdbRaw *pRaw, int32_t dataPos, int16_t *val);
int32_t  sdbGetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t *val);
int32_t  sdbGetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t *val);
int32_t  sdbGetRawBinary(SSdbRaw *pRaw, int32_t dataPos, char *pVal, int32_t valLen);
int32_t  sdbGetRawSoftVer(SSdbRaw *pRaw, int8_t *sver);
int32_t  sdbGetRawTotalSize(SSdbRaw *pRaw);

SSdbRow *sdbAllocRow(int32_t objSize);
void    *sdbGetRowObj(SSdbRow *pRow);
S
Shengliang Guan 已提交
392
void     sdbFreeRow(SSdb *pSdb, SSdbRow *pRow, bool callFunc);
S
Shengliang Guan 已提交
393

S
Shengliang Guan 已提交
394 395 396 397 398 399 400
/*
 * Iterator-based read of the sdb: start yields an SSdbIter through ppIter,
 * sdbDoRead() hands out a buffer and its length through ppBuf/len, and stop
 * ends the session.
 * NOTE(review): who owns and frees *ppBuf, and how end-of-data is signalled,
 * are not visible from this header — confirm against the implementation.
 */
int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter);
int32_t sdbStopRead(SSdb *pSdb, SSdbIter *pIter);
int32_t sdbDoRead(SSdb *pSdb, SSdbIter *pIter, void **ppBuf, int32_t *len);

/*
 * Iterator-based write into the sdb: start yields an SSdbIter through ppIter,
 * sdbDoWrite() consumes len bytes from pBuf, and stop ends the session.
 * NOTE(review): isApply presumably selects between applying and discarding
 * the written data — confirm against the implementation.
 */
int32_t sdbStartWrite(SSdb *pSdb, SSdbIter **ppIter);
int32_t sdbStopWrite(SSdb *pSdb, SSdbIter *pIter, bool isApply);
int32_t sdbDoWrite(SSdb *pSdb, SSdbIter *pIter, void *pBuf, int32_t len);
S
Shengliang Guan 已提交
401

S
Shengliang Guan 已提交
402
const char *sdbTableName(ESdbType type);
S
Shengliang Guan 已提交
403
const char *sdbStatusName(ESdbStatus status);
S
Shengliang Guan 已提交
404
void        sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper);
S
Shengliang Guan 已提交
405
int32_t     sdbGetIdFromRaw(SSdb *pSdb, SSdbRaw *pRaw);
S
Shengliang Guan 已提交
406

S
Shengliang Guan 已提交
407 408 409
#ifdef __cplusplus
}
#endif
S
Shengliang Guan 已提交
410

S
Shengliang Guan 已提交
411
#endif /*_TD_SDB_H_*/