streamBackendRocksdb.h 7.7 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef _STREAM_BACKEDN_ROCKSDB_H_
#define _STREAM_BACKEDN_ROCKSDB_H_

19 20
#include "rocksdb/c.h"
// #include "streamInc.h"
dengyihao's avatar
dengyihao 已提交
21 22 23 24 25 26
#include "streamState.h"
#include "tcoding.h"
#include "tcommon.h"
#include "tcompare.h"
#include "ttimer.h"

dengyihao's avatar
dengyihao 已提交
27 28 29 30
typedef struct SCfComparator {
  rocksdb_comparator_t** comp;
  int32_t                numOfComp;
} SCfComparator;
31 32 33 34 35 36 37 38 39 40 41 42

typedef struct {
  rocksdb_t*                         db;
  rocksdb_writeoptions_t*            writeOpts;
  rocksdb_readoptions_t*             readOpts;
  rocksdb_options_t*                 dbOpt;
  void*                              param;
  void*                              env;
  rocksdb_cache_t*                   cache;
  TdThreadMutex                      mutex;
  rocksdb_compactionfilterfactory_t* filterFactory;
  SList*                             list;
dengyihao's avatar
dengyihao 已提交
43 44
  TdThreadMutex                      cfMutex;
  SHashObj*                          cfInst;
dengyihao's avatar
dengyihao 已提交
45
  int64_t                            defaultCfInit;
46 47 48 49 50 51 52
} SBackendHandle;

void*      streamBackendInit(const char* path);
void       streamBackendCleanup(void* arg);
SListNode* streamBackendAddCompare(void* backend, void* arg);
void       streamBackendDelCompare(void* backend, void* arg);

dengyihao's avatar
dengyihao 已提交
53 54 55
int  streamStateOpenBackend(void* backend, SStreamState* pState);
void streamStateCloseBackend(SStreamState* pState, bool remove);
void streamStateDestroyCompar(void* arg);
dengyihao's avatar
dengyihao 已提交
56

dengyihao's avatar
dengyihao 已提交
57
// state cf
dengyihao's avatar
dengyihao 已提交
58 59 60 61
int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key);
int32_t streamStateClear_rocksdb(SStreamState* pState);
dengyihao's avatar
dengyihao 已提交
62 63 64 65 66 67 68 69 70 71
int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key);
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key);
SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key);
dengyihao's avatar
dengyihao 已提交
72

dengyihao's avatar
dengyihao 已提交
73 74 75 76
// func cf
int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen);
int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key);
dengyihao's avatar
dengyihao 已提交
77

dengyihao's avatar
dengyihao 已提交
78
//  session cf
dengyihao's avatar
dengyihao 已提交
79
int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen);
dengyihao's avatar
dengyihao 已提交
80 81
int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* key);
dengyihao's avatar
dengyihao 已提交
82 83 84 85 86 87 88
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key);
SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key);
SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key);
int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen);
int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey);
int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
                                                int32_t* pVLen);
dengyihao's avatar
dengyihao 已提交
89
int32_t streamStateSessionClear_rocksdb(SStreamState* pState);
dengyihao's avatar
dengyihao 已提交
90 91 92 93

int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData,
                                              int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen);

dengyihao's avatar
dengyihao 已提交
94 95 96 97
// fill cf
int32_t          streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t          streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t          streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key);
dengyihao's avatar
dengyihao 已提交
98 99 100 101
SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key);
int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key);
dengyihao's avatar
dengyihao 已提交
102 103 104 105 106 107

// partag cf
int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen);
int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen);

// parname cf
dengyihao's avatar
dengyihao 已提交
108 109
int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]);
int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal);
dengyihao's avatar
dengyihao 已提交
110

111
void streamStateDestroy_rocksdb(SStreamState* pState, bool remove);
dengyihao's avatar
dengyihao 已提交
112 113 114 115 116 117 118 119

void*   streamStateCreateBatch();
int32_t streamStateGetBatchSize(void* pBatch);
void    streamStateClearBatch(void* pBatch);
void    streamStateDestroyBatch(void* pBatch);
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
                            void* val, int32_t vlen);
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch);
dengyihao's avatar
dengyihao 已提交
120

dengyihao's avatar
dengyihao 已提交
121
// default cf
dengyihao's avatar
dengyihao 已提交
122 123 124
int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen);
int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen);
int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key);
dengyihao's avatar
dengyihao 已提交
125
int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
dengyihao's avatar
dengyihao 已提交
126 127
void*   streamDefaultIterCreate_rocksdb(SStreamState* pState);
int32_t streamDefaultIterValid_rocksdb(void* iter);
dengyihao's avatar
dengyihao 已提交
128 129 130 131
void    streamDefaultIterSeek_rocksdb(void* iter, const char* key);
void    streamDefaultIterNext_rocksdb(void* iter);
char*   streamDefaultIterKey_rocksdb(void* iter, int32_t* len);
char*   streamDefaultIterVal_rocksdb(void* iter, int32_t* len);
dengyihao's avatar
dengyihao 已提交
132

dengyihao's avatar
dengyihao 已提交
133 134 135 136 137 138 139 140
// batch func
void*   streamStateCreateBatch();
int32_t streamStateGetBatchSize(void* pBatch);
void    streamStateClearBatch(void* pBatch);
void    streamStateDestroyBatch(void* pBatch);
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
                            void* val, int32_t vlen);
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch);
dengyihao's avatar
dengyihao 已提交
141
// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
dengyihao's avatar
dengyihao 已提交
142
#endif