提交 cae57255 编写于 作者: H Hongze Cheng

more work

上级 c58b43ba
...@@ -33,10 +33,10 @@ ...@@ -33,10 +33,10 @@
#include "tlist.h" #include "tlist.h"
#include "tlockfree.h" #include "tlockfree.h"
#include "tlosertree.h" #include "tlosertree.h"
#include "tlrucache.h"
#include "tmallocator.h" #include "tmallocator.h"
#include "tmsgcb.h" #include "tmsgcb.h"
#include "tskiplist.h" #include "tskiplist.h"
#include "tlrucache.h"
#include "tstream.h" #include "tstream.h"
#include "ttime.h" #include "ttime.h"
#include "ttimer.h" #include "ttimer.h"
...@@ -88,7 +88,7 @@ int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* p ...@@ -88,7 +88,7 @@ int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* p
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq); int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq);
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq); int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq);
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids); int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids);
int metaTtlDropTable(SMeta *pMeta, int64_t ttl, SArray *tbUids); int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline); SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline);
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver); STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver);
...@@ -107,7 +107,7 @@ int32_t metaSnapshotReaderClose(SMetaSnapshotReader* pReader); ...@@ -107,7 +107,7 @@ int32_t metaSnapshotReaderClose(SMetaSnapshotReader* pReader);
int32_t metaSnapshotRead(SMetaSnapshotReader* pReader, void** ppData, uint32_t* nData); int32_t metaSnapshotRead(SMetaSnapshotReader* pReader, void** ppData, uint32_t* nData);
void* metaGetIdx(SMeta* pMeta); void* metaGetIdx(SMeta* pMeta);
void* metaGetIvtIdx(SMeta* pMeta); void* metaGetIvtIdx(SMeta* pMeta);
int metaTtlSmaller(SMeta *pMeta, uint64_t time, SArray *uidList); int metaTtlSmaller(SMeta* pMeta, uint64_t time, SArray* uidList);
int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg); int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg);
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
...@@ -183,9 +183,9 @@ typedef struct { ...@@ -183,9 +183,9 @@ typedef struct {
// SVState // SVState
struct SVState { struct SVState {
// int64_t processed;
int64_t committed; int64_t committed;
int64_t applied; int64_t applied;
int64_t commitID;
}; };
struct SVnodeInfo { struct SVnodeInfo {
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
typedef struct { typedef struct {
STsdb *pTsdb; STsdb *pTsdb;
/* commit data */ /* commit data */
int64_t commitID;
int32_t minutes; int32_t minutes;
int8_t precision; int8_t precision;
int32_t minRow; int32_t minRow;
...@@ -837,6 +838,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { ...@@ -837,6 +838,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
// unlock(); // unlock();
pCommitter->pTsdb = pTsdb; pCommitter->pTsdb = pTsdb;
pCommitter->commitID = pTsdb->pVnode->state.commitID;
pCommitter->minutes = pTsdb->keepCfg.days; pCommitter->minutes = pTsdb->keepCfg.days;
pCommitter->precision = pTsdb->keepCfg.precision; pCommitter->precision = pTsdb->keepCfg.precision;
pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows; pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
......
...@@ -40,6 +40,7 @@ int vnodeBegin(SVnode *pVnode) { ...@@ -40,6 +40,7 @@ int vnodeBegin(SVnode *pVnode) {
/* pthread_mutex_unlock(); */ /* pthread_mutex_unlock(); */
pVnode->state.commitID++;
// begin meta // begin meta
if (metaBegin(pVnode->pMeta) < 0) { if (metaBegin(pVnode->pMeta) < 0) {
vError("vgId:%d, failed to begin meta since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to begin meta since %s", TD_VID(pVnode), tstrerror(terrno));
...@@ -210,7 +211,8 @@ int vnodeCommit(SVnode *pVnode) { ...@@ -210,7 +211,8 @@ int vnodeCommit(SVnode *pVnode) {
SVnodeInfo info = {0}; SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN]; char dir[TSDB_FILENAME_LEN];
vInfo("vgId:%d, start to commit, version: %" PRId64, TD_VID(pVnode), pVnode->state.applied); vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID,
pVnode->state.applied);
pVnode->onCommit = pVnode->inUse; pVnode->onCommit = pVnode->inUse;
pVnode->inUse = NULL; pVnode->inUse = NULL;
...@@ -278,7 +280,7 @@ static int vnodeCommitImpl(void *arg) { ...@@ -278,7 +280,7 @@ static int vnodeCommitImpl(void *arg) {
// metaCommit(pVnode->pMeta); // metaCommit(pVnode->pMeta);
tqCommit(pVnode->pTq); tqCommit(pVnode->pTq);
tsdbCommit(pVnode->pTsdb); // tsdbCommit(pVnode->pTsdb, );
// vnodeBufPoolRecycle(pVnode); // vnodeBufPoolRecycle(pVnode);
tsem_post(&(pVnode->canCommit)); tsem_post(&(pVnode->canCommit));
...@@ -302,6 +304,7 @@ static int vnodeEncodeState(const void *pObj, SJson *pJson) { ...@@ -302,6 +304,7 @@ static int vnodeEncodeState(const void *pObj, SJson *pJson) {
if (tjsonAddIntegerToObject(pJson, "commit version", pState->committed) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "commit version", pState->committed) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "applied version", pState->applied) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "applied version", pState->applied) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "commit ID", pState->commitID) < 0) return -1;
return 0; return 0;
} }
...@@ -314,6 +317,8 @@ static int vnodeDecodeState(const SJson *pJson, void *pObj) { ...@@ -314,6 +317,8 @@ static int vnodeDecodeState(const SJson *pJson, void *pObj) {
if (code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "applied version", pState->applied, code); tjsonGetNumberValue(pJson, "applied version", pState->applied, code);
if (code < 0) return -1; if (code < 0) return -1;
tjsonGetNumberValue(pJson, "commit ID", pState->commitID, code);
if (code < 0) return -1;
return 0; return 0;
} }
......
...@@ -37,6 +37,7 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { ...@@ -37,6 +37,7 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
info.config = *pCfg; info.config = *pCfg;
info.state.committed = -1; info.state.committed = -1;
info.state.applied = -1; info.state.applied = -1;
info.state.commitID = 0;
if (vnodeSaveInfo(dir, &info) < 0 || vnodeCommitInfo(dir, &info) < 0) { if (vnodeSaveInfo(dir, &info) < 0 || vnodeCommitInfo(dir, &info) < 0) {
vError("vgId:%d, failed to save vnode config since %s", pCfg->vgId, tstrerror(terrno)); vError("vgId:%d, failed to save vnode config since %s", pCfg->vgId, tstrerror(terrno));
...@@ -79,6 +80,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -79,6 +80,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
pVnode->config = info.config; pVnode->config = info.config;
pVnode->state.committed = info.state.committed; pVnode->state.committed = info.state.committed;
pVnode->state.applied = info.state.committed; pVnode->state.applied = info.state.committed;
pVnode->state.commitID = info.state.commitID;
pVnode->pTfs = pTfs; pVnode->pTfs = pTfs;
pVnode->msgCb = msgCb; pVnode->msgCb = msgCb;
pVnode->syncCount = 0; pVnode->syncCount = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册