storageapi.h 17.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef TDENGINE_STORAGEAPI_H
#define TDENGINE_STORAGEAPI_H

19 20
#include "function.h"
#include "index.h"
21 22
#include "taosdef.h"
#include "tcommon.h"
23 24 25
#include "tmsg.h"
#include "tscalablebf.h"
#include "tsimplehash.h"
26 27 28 29 30 31 32 33 34 35 36 37 38

#ifdef __cplusplus
extern "C" {
#endif

/* Time-window range constants. */
#define TIMEWINDOW_RANGE_CONTAINED 1
#define TIMEWINDOW_RANGE_EXTERNAL  2

/* Cache-scan retrieve options; each value is a distinct single bit. */
#define CACHESCAN_RETRIEVE_TYPE_ALL    0x1
#define CACHESCAN_RETRIEVE_TYPE_SINGLE 0x2
#define CACHESCAN_RETRIEVE_LAST_ROW    0x4
#define CACHESCAN_RETRIEVE_LAST        0x8

/* Meta reader flag bit (presumably stored in SMetaReader.flags -- confirm against callers). */
#define META_READER_NOLOCK 0x1

/* Forward declaration only: struct SMeta is used here as an opaque type. */
typedef struct SMeta SMeta;

/* Callback type: takes one untyped pointer and returns a TSKEY. */
typedef TSKEY (*GetTsFun)(void *);

44 45 46 47 48
typedef struct SMetaEntry {
  int64_t  version;
  int8_t   type;
  int8_t   flags;  // TODO: need refactor?
  tb_uid_t uid;
49
  char*    name;
50 51 52 53 54 55 56
  union {
    struct {
      SSchemaWrapper schemaRow;
      SSchemaWrapper schemaTag;
      SRSmaParam     rsmaParam;
    } stbEntry;
    struct {
57
      int64_t  btime;
58 59
      int32_t  ttlDays;
      int32_t  commentLen;
60
      char*    comment;
61
      tb_uid_t suid;
62
      uint8_t* pTags;
63 64
    } ctbEntry;
    struct {
65
      int64_t        btime;
66 67
      int32_t        ttlDays;
      int32_t        commentLen;
68
      char*          comment;
69 70 71 72
      int32_t        ncid;  // next column id
      SSchemaWrapper schemaRow;
    } ntbEntry;
    struct {
73
      STSma* tsma;
74 75 76
    } smaEntry;
  };

77
  uint8_t* pBuf;
78 79
} SMetaEntry;

80
typedef struct SMetaReader {
81 82 83 84 85 86 87
  int32_t            flags;
  void*              pMeta;
  SDecoder           coder;
  SMetaEntry         me;
  void*              pBuf;
  int32_t            szBuf;
  struct SStoreMeta* pAPI;
88 89 90
} SMetaReader;

typedef struct SMTbCursor {
91 92 93 94
  void*       pMeta;
  void*       pDbc;
  void*       pKey;
  void*       pVal;
95 96 97
  int32_t     kLen;
  int32_t     vLen;
  SMetaReader mr;
98
  int8_t      paused;
99 100 101 102 103 104 105 106 107
} SMTbCursor;

/* Row-buffer position record: two untyped pointers and two boolean status flags. */
struct SRowBuffPos {
  void *pRowBuff;
  void *pKey;
  bool  beFlushed;
  bool  beUsed;
};
typedef struct SRowBuffPos SRowBuffPos;

108 109 110 111
// tq
typedef struct SMetaTableInfo {
  int64_t         suid;
  int64_t         uid;
112
  SSchemaWrapper* schema;
113 114 115 116
  char            tbName[TSDB_TABLE_NAME_LEN];
} SMetaTableInfo;

typedef struct SSnapContext {
117 118 119 120 121 122 123 124 125
  SMeta*    pMeta;  // todo remove it
  int64_t   snapVersion;
  void*     pCur;
  int64_t   suid;
  int8_t    subType;
  SHashObj* idVersion;
  SHashObj* suidInfo;
  SArray*   idList;
  int32_t   index;
126 127
  int8_t    withMeta;
  int8_t    queryMeta;  // true-get meta, false-get data
128 129 130 131 132 133 134 135 136 137 138 139 140
} SSnapContext;

/* Statistics record: a uid paired with a 64-bit counter. */
typedef struct {
  int64_t uid;     /* table uid */
  int64_t ctbNum;  /* presumably the child-table count -- confirm */
} SMetaStbStats;

// void    tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList);
// int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList);
// int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList);
// int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
// bool    tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid);
// bool    tqCurrentBlockConsumed(const STqReader* pReader);
141
// int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id);
142 143
// bool    tqNextBlockInWal(STqReader* pReader, const char* idstr);
// bool    tqNextBlockImpl(STqReader *pReader, const char* idstr);
144 145 146
// int32_t        getTableInfoFromSnapshot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type,
//                                         int64_t *uid);
// SMetaTableInfo getMetaTableInfoFromSnapshot(SSnapContext *ctx);
// int32_t        setForSnapShot(SSnapContext *ctx, int64_t uid);
// int32_t        destroySnapContext(SSnapContext *ctx);
147

148
// clang-format off
149
/*-------------------------------------------------new api format---------------------------------------------------*/
150
typedef struct TsdReader {
151
  int32_t      (*tsdReaderOpen)(void* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
152 153
                           SSDataBlock* pResBlock, void** ppReader, const char* idstr, bool countOnly,
                           SHashObj** pIgnoreTables);
154 155 156 157
  void         (*tsdReaderClose)();
  void         (*tsdSetReaderTaskId)(void *pReader, const char *pId);
  int32_t      (*tsdSetQueryTableList)();
  int32_t      (*tsdNextDataBlock)();
158

159
  int32_t      (*tsdReaderRetrieveBlockSMAInfo)();
160 161
  SSDataBlock *(*tsdReaderRetrieveDataBlock)();

162
  void         (*tsdReaderReleaseDataBlock)();
163

164 165 166 167
  int32_t      (*tsdReaderResetStatus)();
  int32_t      (*tsdReaderGetDataBlockDistInfo)();
  int64_t      (*tsdReaderGetNumOfInMemRows)();
  void         (*tsdReaderNotifyClosing)();
168
} TsdReader;
169

170
typedef struct SStoreCacheReader {
171 172 173 174 175 176
  int32_t  (*openReader)(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
                         SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr);
  void    *(*closeReader)(void *pReader);
  int32_t  (*retrieveRows)(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds,
                           SArray *pTableUidList);
  int32_t  (*reuseReader)(void *pReader, void *pTableIdList, int32_t numOfTables);
177
} SStoreCacheReader;
178

179 180
// clang-format on

181 182 183 184 185 186 187 188 189
/*------------------------------------------------------------------------------------------------------------------*/
/*
void    tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList);
int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList);
int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
bool    tqReaderIsQueriedTable(STqReader* pReader, uint64_t uid);
bool    tqCurrentBlockConsumed(const STqReader* pReader);

190
int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id);
191 192 193 194 195
bool    tqNextBlockInWal(STqReader* pReader, const char* idstr);
bool    tqNextBlockImpl(STqReader *pReader, const char* idstr);

 int32_t    tqRetrieveDataBlock(STqReader *pReader, SSDataBlock **pRes, const char* idstr);
STqReader *tqReaderOpen(void *pVnode);
196
void       tqReaderClose(STqReader *);
197 198 199 200 201 202 203

int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
bool    tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
SWalReader* tqGetWalReader(STqReader* pReader);
int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet);
*/
// todo rename
204
typedef struct SStoreTqReader {
205
  struct STqReader* (*tqReaderOpen)();
206 207 208 209 210 211
  void (*tqReaderClose)();

  int32_t (*tqReaderSeek)();
  int32_t (*tqRetrieveBlock)();
  bool (*tqReaderNextBlockInWal)();
  bool (*tqNextBlockImpl)();  // todo remove it
H
Haojun Liao 已提交
212
  SSDataBlock* (*tqGetResultBlock)();
213 214

  void (*tqReaderSetColIdList)();
215
  int32_t (*tqReaderSetQueryTableList)();
216 217 218 219 220 221 222

  int32_t (*tqReaderAddTables)();
  int32_t (*tqReaderRemoveTables)();

  bool (*tqReaderIsQueriedTable)();
  bool (*tqReaderCurrentBlockConsumed)();

223 224
  struct SWalReader* (*tqReaderGetWalReader)();  // todo remove it
  int32_t (*tqReaderRetrieveTaosXBlock)();       // todo remove it
225 226

  int32_t (*tqReaderSetSubmitMsg)();  // todo remove it
227
  bool (*tqReaderNextBlockFilterOut)();
228
} SStoreTqReader;
229 230

typedef struct SStoreSnapshotFn {
231
  int32_t (*setForSnapShot)(SSnapContext* ctx, int64_t uid);
232
  int32_t (*destroySnapshot)(SSnapContext* ctx);
233 234
  SMetaTableInfo (*getMetaTableInfoFromSnapshot)(SSnapContext* ctx);
  int32_t (*getTableInfoFromSnapshot)(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid);
235 236
} SStoreSnapshotFn;

237
typedef struct SStoreMeta {
238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272
  SMTbCursor* (*openTableMetaCursor)(void* pVnode);                     // metaOpenTbCursor
  void (*closeTableMetaCursor)(SMTbCursor* pTbCur);                     // metaCloseTbCursor
  void (*pauseTableMetaCursor)(SMTbCursor* pTbCur);                     // metaPauseTbCursor
  void (*resumeTableMetaCursor)(SMTbCursor* pTbCur, int8_t first);      // metaResumeTbCursor
  int32_t (*cursorNext)(SMTbCursor* pTbCur, ETableType jumpTableType);  // metaTbCursorNext
  int32_t (*cursorPrev)(SMTbCursor* pTbCur, ETableType jumpTableType);  // metaTbCursorPrev

  int32_t (*getTableTags)(void* pVnode, uint64_t suid, SArray* uidList);
  int32_t (*getTableTagsByUid)(void* pVnode, int64_t suid, SArray* uidList);
  const void* (*extractTagVal)(const void* tag, int16_t type, STagVal* tagVal);  // todo remove it

  int32_t (*getTableUidByName)(void* pVnode, char* tbName, uint64_t* uid);
  int32_t (*getTableTypeByName)(void* pVnode, char* tbName, ETableType* tbType);
  int32_t (*getTableNameByUid)(void* pVnode, uint64_t uid, char* tbName);
  bool (*isTableExisted)(void* pVnode, tb_uid_t uid);

  int32_t (*metaGetCachedTbGroup)(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList);
  int32_t (*metaPutTbGroupToCache)(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
                                   int32_t payloadLen);

  int32_t (*getCachedTableList)(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1,
                                bool* acquireRes);
  int32_t (*putCachedTableList)(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
                                int32_t payloadLen, double selectivityRatio);

  void* (*storeGetIndexInfo)();
  void* (*getInvertIndex)(void* pVnode);
  int32_t (*getChildTableList)(
      void* pVnode, int64_t suid,
      SArray* list);  // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter]
  int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList);  // vnodeGetStbIdList  & vnodeGetAllTableList
  void* storeGetVersionRange;
  void* storeGetLastTimestamp;

  int32_t (*getTableSchema)(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid);  // tsdbGetTableSchema
273 274

  // db name, vgId, numOfTables, numOfSTables
275 276 277 278 279 280
  int32_t (*getNumOfChildTables)(
      void* pVnode, int64_t uid,
      int64_t* numOfTables);  // int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo);
  void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables,
                       int64_t* numOfNormalTables);  // vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId) &
                                                     // metaGetTbNum(SMeta *pMeta) & metaGetNtbNum(SMeta *pMeta);
281

H
Haojun Liao 已提交
282
  int64_t (*getNumOfRowsInMem)(void* pVnode);
283 284 285 286 287
  /**
int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list);
int32_t vnodeGetCtbIdListByFilter(void *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg);
int32_t vnodeGetStbIdList(void *pVnode, int64_t suid, SArray *list);
 */
288
} SStoreMeta;
289

H
Haojun Liao 已提交
290
typedef struct SStoreMetaReader {
291 292 293 294 295 296
  void (*initReader)(SMetaReader* pReader, void* pVnode, int32_t flags, SStoreMeta* pAPI);
  void (*clearReader)(SMetaReader* pReader);
  void (*readerReleaseLock)(SMetaReader* pReader);
  int32_t (*getTableEntryByUid)(SMetaReader* pReader, tb_uid_t uid);
  int32_t (*getTableEntryByName)(SMetaReader* pReader, const char* name);
  int32_t (*getEntryGetUidCache)(SMetaReader* pReader, tb_uid_t uid);
H
Haojun Liao 已提交
297
} SStoreMetaReader;
298 299

typedef struct SUpdateInfo {
300
  SArray*      pTsBuckets;
301
  uint64_t     numBuckets;
302
  SArray*      pTsSBFs;
303 304 305 306
  uint64_t     numSBFs;
  int64_t      interval;
  int64_t      watermark;
  TSKEY        minTS;
307 308
  SScalableBf* pCloseWinSBF;
  SHashObj*    pMap;
309 310 311 312
  uint64_t     maxDataVersion;
} SUpdateInfo;

/*
 * Stream-state cursor. The first four handles are stored untyped; the trailing
 * comments record the rocksdb types they stand for.
 */
typedef struct {
  void*   iter;      //  rocksdb_iterator_t*    iter;
  void*   snapshot;  //  rocksdb_snapshot_t*    snapshot;
  void*   readOpt;   //  rocksdb_readoptions_t* readOpt;
  void*   db;        //  rocksdb_t*             db;
  void*   pCur;
  int64_t number;
} SStreamStateCur;

typedef struct SStateStore {
  int32_t (*streamStatePutParName)(SStreamState* pState, int64_t groupId, const char* tbname);
  int32_t (*streamStateGetParName)(SStreamState* pState, int64_t groupId, void** pVal);

  int32_t (*streamStateAddIfNotExist)(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
  int32_t (*streamStateReleaseBuf)(SStreamState* pState, const SWinKey* key, void* pVal);
327
  void (*streamStateFreeVal)(void* val);
328 329 330

  int32_t (*streamStatePut)(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
  int32_t (*streamStateGet)(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
331
  bool (*streamStateCheck)(SStreamState* pState, const SWinKey* key);
332 333 334
  int32_t (*streamStateGetByPos)(SStreamState* pState, void* pos, void** pVal);
  int32_t (*streamStateDel)(SStreamState* pState, const SWinKey* key);
  int32_t (*streamStateClear)(SStreamState* pState);
335
  void (*streamStateSetNumber)(SStreamState* pState, int32_t number);
336 337 338 339 340 341 342
  int32_t (*streamStateSaveInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen);
  int32_t (*streamStateGetInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen);

  int32_t (*streamStateFillPut)(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
  int32_t (*streamStateFillGet)(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
  int32_t (*streamStateFillDel)(SStreamState* pState, const SWinKey* key);

343 344
  int32_t (*streamStateCurNext)(SStreamState* pState, SStreamStateCur* pCur);
  int32_t (*streamStateCurPrev)(SStreamState* pState, SStreamStateCur* pCur);
345

346 347 348 349 350
  SStreamStateCur* (*streamStateGetAndCheckCur)(SStreamState* pState, SWinKey* key);
  SStreamStateCur* (*streamStateSeekKeyNext)(SStreamState* pState, const SWinKey* key);
  SStreamStateCur* (*streamStateFillSeekKeyNext)(SStreamState* pState, const SWinKey* key);
  SStreamStateCur* (*streamStateFillSeekKeyPrev)(SStreamState* pState, const SWinKey* key);
  void (*streamStateFreeCur)(SStreamStateCur* pCur);
351

352 353
  int32_t (*streamStateGetGroupKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
  int32_t (*streamStateGetKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
354

355 356
  int32_t (*streamStateSessionAddIfNotExist)(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
                                             int32_t* pVLen);
357 358 359 360
  int32_t (*streamStateSessionPut)(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen);
  int32_t (*streamStateSessionGet)(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen);
  int32_t (*streamStateSessionDel)(SStreamState* pState, const SSessionKey* key);
  int32_t (*streamStateSessionClear)(SStreamState* pState);
361
  int32_t (*streamStateSessionGetKVByCur)(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen);
362
  int32_t (*streamStateStateAddIfNotExist)(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
363
                                           state_key_cmpr_fn fn, void** pVal, int32_t* pVLen);
364
  int32_t (*streamStateSessionGetKeyByRange)(SStreamState* pState, const SSessionKey* range, SSessionKey* curKey);
365

366 367 368 369 370
  SUpdateInfo* (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark);
  TSKEY (*updateInfoFillBlockData)(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol);
  bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts);
  bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid);
  void (*updateInfoDestroy)(SUpdateInfo* pInfo);
L
liuyao 已提交
371 372
  void (*windowSBfDelete)(SUpdateInfo *pInfo, uint64_t count);
  void (*windowSBfAdd)(SUpdateInfo *pInfo, uint64_t count);
373

374 375 376 377 378
  SUpdateInfo* (*updateInfoInitP)(SInterval* pInterval, int64_t watermark);
  void (*updateInfoAddCloseWindowSBF)(SUpdateInfo* pInfo);
  void (*updateInfoDestoryColseWinSBF)(SUpdateInfo* pInfo);
  int32_t (*updateInfoSerialize)(void* buf, int32_t bufLen, const SUpdateInfo* pInfo);
  int32_t (*updateInfoDeserialize)(void* buf, int32_t bufLen, SUpdateInfo* pInfo);
379

380 381 382
  SStreamStateCur* (*streamStateSessionSeekKeyNext)(SStreamState* pState, const SSessionKey* key);
  SStreamStateCur* (*streamStateSessionSeekKeyCurrentPrev)(SStreamState* pState, const SSessionKey* key);
  SStreamStateCur* (*streamStateSessionSeekKeyCurrentNext)(SStreamState* pState, const SSessionKey* key);
383

384
  struct SStreamFileState* (*streamFileStateInit)(int64_t memSize, uint32_t keySize, uint32_t rowSize,
385
                                                  uint32_t selectRowSize, GetTsFun fp, void* pFile, TSKEY delMark, const char*id);
386

387 388 389
  void (*streamFileStateDestroy)(struct SStreamFileState* pFileState);
  void (*streamFileStateClear)(struct SStreamFileState* pFileState);
  bool (*needClearDiskBuff)(struct SStreamFileState* pFileState);
390 391

  SStreamState* (*streamStateOpen)(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages);
392 393 394 395 396
  void (*streamStateClose)(SStreamState* pState, bool remove);
  int32_t (*streamStateBegin)(SStreamState* pState);
  int32_t (*streamStateCommit)(SStreamState* pState);
  void (*streamStateDestroy)(SStreamState* pState, bool remove);
  int32_t (*streamStateDeleteCheckPoint)(SStreamState* pState, TSKEY mark);
L
liuyao 已提交
397
  void (*streamStateReloadInfo)(SStreamState* pState, TSKEY ts);
398 399 400
} SStateStore;

typedef struct SStorageAPI {
H
Haojun Liao 已提交
401 402 403 404 405 406 407 408
  SStoreMeta          metaFn;  // todo: refactor
  TsdReader           tsdReader;
  SStoreMetaReader    metaReaderFn;
  SStoreCacheReader   cacheFn;
  SStoreSnapshotFn    snapshotFn;
  SStoreTqReader      tqReaderFn;
  SStateStore         stateStore;
  SMetaDataFilterAPI  metaFilter;
409
  SFunctionStateStore functionStore;
410 411 412 413 414 415 416
} SStorageAPI;

#ifdef __cplusplus
}
#endif

#endif  // TDENGINE_STORAGEAPI_H