提交 5891d97a 编写于 作者: H Hongze Cheng

more work

上级 64b1400d
...@@ -3,6 +3,7 @@ add_library(vnode STATIC "") ...@@ -3,6 +3,7 @@ add_library(vnode STATIC "")
target_sources( target_sources(
vnode vnode
PRIVATE PRIVATE
# vnode # vnode
"src/vnd/vnodeOpen.c" "src/vnd/vnodeOpen.c"
"src/vnd/vnodeBufPool.c" "src/vnd/vnodeBufPool.c"
...@@ -14,7 +15,6 @@ target_sources( ...@@ -14,7 +15,6 @@ target_sources(
"src/vnd/vnodeSvr.c" "src/vnd/vnodeSvr.c"
"src/vnd/vnodeSync.c" "src/vnd/vnodeSync.c"
"src/vnd/vnodeSnapshot.c" "src/vnd/vnodeSnapshot.c"
"src/vnd/vnodeUtil.c"
# meta # meta
"src/meta/metaOpen.c" "src/meta/metaOpen.c"
...@@ -66,7 +66,6 @@ target_include_directories( ...@@ -66,7 +66,6 @@ target_include_directories(
PUBLIC "inc" PUBLIC "inc"
PRIVATE "src/inc" PRIVATE "src/inc"
PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar" PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar"
) )
target_link_libraries( target_link_libraries(
vnode vnode
...@@ -80,18 +79,19 @@ target_link_libraries( ...@@ -80,18 +79,19 @@ target_link_libraries(
PUBLIC executor PUBLIC executor
PUBLIC scheduler PUBLIC scheduler
PUBLIC tdb PUBLIC tdb
#PUBLIC bdb
#PUBLIC scalar # PUBLIC bdb
# PUBLIC scalar
PUBLIC transport PUBLIC transport
PUBLIC stream PUBLIC stream
PUBLIC index PUBLIC index
) )
target_compile_definitions(vnode PUBLIC -DMETA_REFACT) target_compile_definitions(vnode PUBLIC -DMETA_REFACT)
if (${BUILD_WITH_INVERTEDINDEX})
add_definitions(-DUSE_INVERTED_INDEX) if(${BUILD_WITH_INVERTEDINDEX})
add_definitions(-DUSE_INVERTED_INDEX)
endif(${BUILD_WITH_INVERTEDINDEX}) endif(${BUILD_WITH_INVERTEDINDEX})
if(${BUILD_TEST}) if(${BUILD_TEST})
add_subdirectory(test) add_subdirectory(test)
endif(${BUILD_TEST}) endif(${BUILD_TEST})
...@@ -159,8 +159,8 @@ int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t * ...@@ -159,8 +159,8 @@ int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *
// SVSnapReader // SVSnapReader
int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapReader **ppReader); int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapReader **ppReader);
int32_t vnodeSnapReaderClose(SVSnapReader *pReader); int32_t vnodeSnapReaderClose(SVSnapReader *pReader);
int32_t vnodeSnapRead(SVSnapReader *pReader, const void **ppData, uint32_t *nData); int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData);
// SVSnapWriter; // SVSnapWriter
int32_t vnodeSnapshotWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter); int32_t vnodeSnapshotWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter);
int32_t vnodeSnapshotWriterClose(SVSnapWriter *pWriter, int8_t rollback); int32_t vnodeSnapshotWriterClose(SVSnapWriter *pWriter, int8_t rollback);
int32_t vnodeSnapshotWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData); int32_t vnodeSnapshotWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData);
......
...@@ -59,7 +59,9 @@ typedef struct SVBufPool SVBufPool; ...@@ -59,7 +59,9 @@ typedef struct SVBufPool SVBufPool;
typedef struct SQWorker SQHandle; typedef struct SQWorker SQHandle;
typedef struct STsdbKeepCfg STsdbKeepCfg; typedef struct STsdbKeepCfg STsdbKeepCfg;
typedef struct SMetaSnapReader SMetaSnapReader; typedef struct SMetaSnapReader SMetaSnapReader;
typedef struct SMetaSnapWriter SMetaSnapWriter;
typedef struct STsdbSnapReader STsdbSnapReader; typedef struct STsdbSnapReader STsdbSnapReader;
typedef struct STsdbSnapWriter STsdbSnapWriter;
#define VNODE_META_DIR "meta" #define VNODE_META_DIR "meta"
#define VNODE_TSDB_DIR "tsdb" #define VNODE_TSDB_DIR "tsdb"
...@@ -72,10 +74,8 @@ typedef struct STsdbSnapReader STsdbSnapReader; ...@@ -72,10 +74,8 @@ typedef struct STsdbSnapReader STsdbSnapReader;
#define VNODE_RSMA2_DIR "rsma2" #define VNODE_RSMA2_DIR "rsma2"
// vnd.h // vnd.h
void* vnodeBufPoolMalloc(SVBufPool* pPool, int size); void* vnodeBufPoolMalloc(SVBufPool* pPool, int size);
void vnodeBufPoolFree(SVBufPool* pPool, void* p); void vnodeBufPoolFree(SVBufPool* pPool, void* p);
int32_t vnodeRealloc(void** pp, int32_t size);
void vnodeFree(void* p);
// meta // meta
typedef struct SMCtbCursor SMCtbCursor; typedef struct SMCtbCursor SMCtbCursor;
...@@ -109,9 +109,6 @@ STSma* metaGetSmaInfoByIndex(SMeta* pMeta, int64_t indexUid); ...@@ -109,9 +109,6 @@ STSma* metaGetSmaInfoByIndex(SMeta* pMeta, int64_t indexUid);
STSmaWrapper* metaGetSmaInfoByTable(SMeta* pMeta, tb_uid_t uid, bool deepCopy); STSmaWrapper* metaGetSmaInfoByTable(SMeta* pMeta, tb_uid_t uid, bool deepCopy);
SArray* metaGetSmaIdsByTable(SMeta* pMeta, tb_uid_t uid); SArray* metaGetSmaIdsByTable(SMeta* pMeta, tb_uid_t uid);
SArray* metaGetSmaTbUids(SMeta* pMeta); SArray* metaGetSmaTbUids(SMeta* pMeta);
int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader);
int32_t metaSnapReaderClose(SMetaSnapReader* pReader);
int32_t metaSnapRead(SMetaSnapReader* pReader, void** ppData, uint32_t* nData);
void* metaGetIdx(SMeta* pMeta); void* metaGetIdx(SMeta* pMeta);
void* metaGetIvtIdx(SMeta* pMeta); void* metaGetIvtIdx(SMeta* pMeta);
int metaTtlSmaller(SMeta* pMeta, uint64_t time, SArray* uidList); int metaTtlSmaller(SMeta* pMeta, uint64_t time, SArray* uidList);
...@@ -131,9 +128,6 @@ int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* p ...@@ -131,9 +128,6 @@ int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* p
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
void* pMemRef); void* pMemRef);
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapReader** ppReader);
int32_t tsdbSnapReaderClose(STsdbSnapReader* pReader);
int32_t tsdbSnapRead(STsdbSnapReader* pReader, void** ppData, uint32_t* nData);
// tq // tq
int tqInit(); int tqInit();
...@@ -180,6 +174,16 @@ int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore); ...@@ -180,6 +174,16 @@ int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore);
void tdUidStoreDestory(STbUidStore* pStore); void tdUidStoreDestory(STbUidStore* pStore);
void* tdUidStoreFree(STbUidStore* pStore); void* tdUidStoreFree(STbUidStore* pStore);
// SMetaSnapReader ========================================
int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader);
int32_t metaSnapReaderClose(SMetaSnapReader** ppReader);
int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData, int64_t* nDatap);
// SMetaSnapWriter ========================================
// STsdbSnapReader ========================================
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapReader** ppReader);
int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader);
int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData, int64_t* nData);
// STsdbSnapWriter ========================================
typedef struct { typedef struct {
int8_t streamType; // sma or other int8_t streamType; // sma or other
int8_t dstType; int8_t dstType;
......
...@@ -15,53 +15,57 @@ ...@@ -15,53 +15,57 @@
#include "meta.h" #include "meta.h"
// SMetaSnapReader ========================================
struct SMetaSnapReader { struct SMetaSnapReader {
SMeta* pMeta; SMeta* pMeta;
TBC* pTbc;
int64_t sver; int64_t sver;
int64_t ever; int64_t ever;
TBC* pTbc;
}; };
int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader) { int32_t metaSnapReaderOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapReader** ppReader) {
int32_t code = 0; int32_t code = 0;
int32_t c = 0; int32_t c = 0;
SMetaSnapReader* pMetaReader = NULL; SMetaSnapReader* pMetaSnapReader = NULL;
pMetaReader = (SMetaSnapReader*)taosMemoryCalloc(1, sizeof(*pMetaReader)); // alloc
if (pMetaReader == NULL) { pMetaSnapReader = (SMetaSnapReader*)taosMemoryCalloc(1, sizeof(*pMetaSnapReader));
if (pMetaSnapReader == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
pMetaReader->pMeta = pMeta; pMetaSnapReader->pMeta = pMeta;
pMetaReader->sver = sver; pMetaSnapReader->sver = sver;
pMetaReader->ever = ever; pMetaSnapReader->ever = ever;
code = tdbTbcOpen(pMeta->pTbDb, &pMetaReader->pTbc, NULL);
// impl
code = tdbTbcOpen(pMeta->pTbDb, &pMetaSnapReader->pTbc, NULL);
if (code) { if (code) {
goto _err; goto _err;
} }
code = tdbTbcMoveTo(pMetaReader->pTbc, &(STbDbKey){.version = sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c); code = tdbTbcMoveTo(pMetaSnapReader->pTbc, &(STbDbKey){.version = sver, .uid = INT64_MIN}, sizeof(STbDbKey), &c);
if (code) { if (code) {
goto _err; goto _err;
} }
*ppReader = pMetaReader; *ppReader = pMetaSnapReader;
return code; return code;
_err: _err:
metaError("vgId:%d meta snap reader open failed since %s", TD_VID(pMeta->pVnode), tstrerror(code));
*ppReader = NULL; *ppReader = NULL;
return code; return code;
} }
int32_t metaSnapReaderClose(SMetaSnapReader* pReader) { int32_t metaSnapReaderClose(SMetaSnapReader** ppReader) {
if (pReader) { tdbTbcClose((*ppReader)->pTbc);
tdbTbcClose(pReader->pTbc); taosMemoryFree(*ppReader);
taosMemoryFree(pReader); *ppReader = NULL;
}
return 0; return 0;
} }
int32_t metaSnapRead(SMetaSnapReader* pReader, void** ppData, uint32_t* nDatap) { int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData, int64_t* nDatap) {
const void* pKey = NULL; const void* pKey = NULL;
const void* pData = NULL; const void* pData = NULL;
int32_t nKey = 0; int32_t nKey = 0;
...@@ -82,7 +86,7 @@ int32_t metaSnapRead(SMetaSnapReader* pReader, void** ppData, uint32_t* nDatap) ...@@ -82,7 +86,7 @@ int32_t metaSnapRead(SMetaSnapReader* pReader, void** ppData, uint32_t* nDatap)
} }
// copy the data // copy the data
if (vnodeRealloc(ppData, nData) < 0) { if (tRealloc(ppData, nData) < 0) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
return code; return code;
} }
...@@ -90,4 +94,6 @@ int32_t metaSnapRead(SMetaSnapReader* pReader, void** ppData, uint32_t* nDatap) ...@@ -90,4 +94,6 @@ int32_t metaSnapRead(SMetaSnapReader* pReader, void** ppData, uint32_t* nDatap)
memcpy(*ppData, pData, nData); memcpy(*ppData, pData, nData);
*nDatap = nData; *nDatap = nData;
return code; return code;
} }
\ No newline at end of file
// SMetaSnapWriter ========================================
\ No newline at end of file
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include "tsdb.h" #include "tsdb.h"
// STsdbSnapReader ========================================
struct STsdbSnapReader { struct STsdbSnapReader {
STsdb* pTsdb; STsdb* pTsdb;
int64_t sver; int64_t sver;
...@@ -59,7 +60,7 @@ _err: ...@@ -59,7 +60,7 @@ _err:
return code; return code;
} }
int32_t tsdbSnapRead(STsdbSnapReader* pReader, void** ppData, uint32_t* nData) { int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData, int64_t* nData) {
int32_t code = 0; int32_t code = 0;
// read data file // read data file
...@@ -73,8 +74,8 @@ _err: ...@@ -73,8 +74,8 @@ _err:
return code; return code;
} }
int32_t tsdbSnapReaderClose(STsdbSnapReader* pReader) { int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) {
int32_t code = 0; int32_t code = 0;
taosMemoryFree(pReader); taosMemoryFree(ppReader);
return code; return code;
} }
...@@ -26,8 +26,8 @@ struct SVSnapReader { ...@@ -26,8 +26,8 @@ struct SVSnapReader {
// tsdb // tsdb
int8_t tsdbDone; int8_t tsdbDone;
STsdbSnapReader *pTsdbReader; STsdbSnapReader *pTsdbReader;
void *pData; uint8_t *pData;
int32_t nData; int64_t nData;
}; };
int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapReader **ppReader) { int32_t vnodeSnapReaderOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapReader **ppReader) {
...@@ -61,15 +61,15 @@ _err: ...@@ -61,15 +61,15 @@ _err:
int32_t vnodeSnapReaderClose(SVSnapReader *pReader) { int32_t vnodeSnapReaderClose(SVSnapReader *pReader) {
int32_t code = 0; int32_t code = 0;
vnodeFree(pReader->pData); tFree(pReader->pData);
if (pReader->pTsdbReader) tsdbSnapReaderClose(pReader->pTsdbReader); if (pReader->pTsdbReader) tsdbSnapReaderClose(&pReader->pTsdbReader);
if (pReader->pMetaReader) metaSnapReaderClose(pReader->pMetaReader); if (pReader->pMetaReader) metaSnapReaderClose(&pReader->pMetaReader);
taosMemoryFree(pReader); taosMemoryFree(pReader);
return code; return code;
} }
int32_t vnodeSnapRead(SVSnapReader *pReader, const void **ppData, uint32_t *nData) { int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) {
int32_t code = 0; int32_t code = 0;
if (!pReader->metaDone) { if (!pReader->metaDone) {
...@@ -78,7 +78,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, const void **ppData, uint32_t *nDat ...@@ -78,7 +78,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, const void **ppData, uint32_t *nDat
if (code == TSDB_CODE_VND_READ_END) { if (code == TSDB_CODE_VND_READ_END) {
pReader->metaDone = 1; pReader->metaDone = 1;
} else { } else {
return code; goto _err;
} }
} else { } else {
*ppData = pReader->pData; *ppData = pReader->pData;
...@@ -93,7 +93,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, const void **ppData, uint32_t *nDat ...@@ -93,7 +93,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, const void **ppData, uint32_t *nDat
if (code == TSDB_CODE_VND_READ_END) { if (code == TSDB_CODE_VND_READ_END) {
pReader->tsdbDone = 1; pReader->tsdbDone = 1;
} else { } else {
return code; goto _err;
} }
} else { } else {
*ppData = pReader->pData; *ppData = pReader->pData;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnd.h"
int32_t vnodeRealloc(void** pp, int32_t size) {
uint8_t* p = NULL;
int32_t csize = 0;
if (*pp) {
p = (uint8_t*)(*pp) - sizeof(int32_t);
csize = *(int32_t*)p;
}
if (csize >= size) {
return 0;
}
p = (uint8_t*)taosMemoryRealloc(p, size);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
*(int32_t*)p = size;
*pp = p + sizeof(int32_t);
return 0;
}
void vnodeFree(void* p) {
if (p) {
taosMemoryFree(((uint8_t*)p) - sizeof(int32_t));
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册