未验证 提交 aba4a194 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #1920 from taosdata/hotfix/savedVersion

Hotfix/saved version
...@@ -157,8 +157,8 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char ...@@ -157,8 +157,8 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char
if (rpcRsp.code != 0) { if (rpcRsp.code != 0) {
dError("user:%s, auth msg received from mnode, error:%s", user, tstrerror(rpcRsp.code)); dError("user:%s, auth msg received from mnode, error:%s", user, tstrerror(rpcRsp.code));
} else { } else {
dTrace("user:%s, auth msg received from mnode", user);
SDMAuthRsp *pRsp = rpcRsp.pCont; SDMAuthRsp *pRsp = rpcRsp.pCont;
dTrace("user:%s, auth msg received from mnode", user);
memcpy(secret, pRsp->secret, TSDB_KEY_LEN); memcpy(secret, pRsp->secret, TSDB_KEY_LEN);
memcpy(ckey, pRsp->ckey, TSDB_KEY_LEN); memcpy(ckey, pRsp->ckey, TSDB_KEY_LEN);
*spi = pRsp->spi; *spi = pRsp->spi;
......
...@@ -57,7 +57,7 @@ typedef struct { ...@@ -57,7 +57,7 @@ typedef struct {
// if name is empty(name[0] is zero), get the file from index or after, used by master // if name is empty(name[0] is zero), get the file from index or after, used by master
// if name is provided(name[0] is not zero), get the named file at the specified index, used by unsynced node // if name is provided(name[0] is not zero), get the named file at the specified index, used by unsynced node
// it returns the file magic number and size, if file not there, magic shall be 0. // it returns the file magic number and size, if file not there, magic shall be 0.
typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, int32_t *size); typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, int32_t *size, uint64_t *fversion);
// get the wal file from index or after // get the wal file from index or after
// return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file // return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file
...@@ -73,7 +73,7 @@ typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code); ...@@ -73,7 +73,7 @@ typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code);
typedef void (*FNotifyRole)(void *ahandle, int8_t role); typedef void (*FNotifyRole)(void *ahandle, int8_t role);
// when data file is synced successfully, notity app // when data file is synced successfully, notity app
typedef void (*FNotifyFileSynced)(void *ahandle); typedef void (*FNotifyFileSynced)(void *ahandle, uint64_t fversion);
typedef struct { typedef struct {
int32_t vgId; // vgroup ID int32_t vgId; // vgroup ID
......
...@@ -185,7 +185,7 @@ void sdbUpdateMnodeRoles() { ...@@ -185,7 +185,7 @@ void sdbUpdateMnodeRoles() {
} }
} }
static uint32_t sdbGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size) { static uint32_t sdbGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size, uint64_t *fversion) {
sdbUpdateMnodeRoles(); sdbUpdateMnodeRoles();
return 0; return 0;
} }
......
...@@ -898,9 +898,9 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { ...@@ -898,9 +898,9 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
if (pContext->pRsp) { if (pContext->pRsp) {
// for synchronous API // for synchronous API
tsem_post(pContext->pSem);
memcpy(pContext->pSet, &pContext->ipSet, sizeof(SRpcIpSet)); memcpy(pContext->pSet, &pContext->ipSet, sizeof(SRpcIpSet));
memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg)); memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg));
tsem_post(pContext->pSem);
} else { } else {
// for asynchronous API // for asynchronous API
SRpcIpSet *pIpSet = NULL; SRpcIpSet *pIpSet = NULL;
......
...@@ -37,8 +37,8 @@ typedef struct { ...@@ -37,8 +37,8 @@ typedef struct {
int32_t refCount; // reference count int32_t refCount; // reference count
int status; int status;
int8_t role; int8_t role;
int64_t version; int64_t version; // current version
int64_t savedVersion; int64_t fversion; // version on saved data file
void *wqueue; void *wqueue;
void *rqueue; void *rqueue;
void *wal; void *wal;
...@@ -50,7 +50,7 @@ typedef struct { ...@@ -50,7 +50,7 @@ typedef struct {
STsdbCfg tsdbCfg; STsdbCfg tsdbCfg;
SSyncCfg syncCfg; SSyncCfg syncCfg;
SWalCfg walCfg; SWalCfg walCfg;
char * rootDir; char *rootDir;
} SVnodeObj; } SVnodeObj;
int vnodeWriteToQueue(void *param, void *pHead, int type); int vnodeWriteToQueue(void *param, void *pHead, int type);
......
...@@ -37,10 +37,10 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode); ...@@ -37,10 +37,10 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode);
static int32_t vnodeSaveVersion(SVnodeObj *pVnode); static int32_t vnodeSaveVersion(SVnodeObj *pVnode);
static bool vnodeReadVersion(SVnodeObj *pVnode); static bool vnodeReadVersion(SVnodeObj *pVnode);
static int vnodeProcessTsdbStatus(void *arg, int status); static int vnodeProcessTsdbStatus(void *arg, int status);
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size); static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size, uint64_t *fversion);
static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
static void vnodeNotifyRole(void *ahandle, int8_t role); static void vnodeNotifyRole(void *ahandle, int8_t role);
static void vnodeNotifyFileSynced(void *ahandle); static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion);
static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT; static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT;
...@@ -196,6 +196,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -196,6 +196,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
} }
vnodeReadVersion(pVnode); vnodeReadVersion(pVnode);
pVnode->fversion = pVnode->version;
pVnode->wqueue = dnodeAllocateWqueue(pVnode); pVnode->wqueue = dnodeAllocateWqueue(pVnode);
pVnode->rqueue = dnodeAllocateRqueue(pVnode); pVnode->rqueue = dnodeAllocateRqueue(pVnode);
...@@ -394,7 +395,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status) { ...@@ -394,7 +395,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status) {
SVnodeObj *pVnode = arg; SVnodeObj *pVnode = arg;
if (status == TSDB_STATUS_COMMIT_START) { if (status == TSDB_STATUS_COMMIT_START) {
pVnode->savedVersion = pVnode->version; pVnode->fversion = pVnode->version;
return walRenew(pVnode->wal); return walRenew(pVnode->wal);
} }
...@@ -404,8 +405,9 @@ static int vnodeProcessTsdbStatus(void *arg, int status) { ...@@ -404,8 +405,9 @@ static int vnodeProcessTsdbStatus(void *arg, int status) {
return 0; return 0;
} }
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size) { static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size, uint64_t *fversion) {
SVnodeObj *pVnode = ahandle; SVnodeObj *pVnode = ahandle;
*fversion = pVnode->fversion;
return tsdbGetFileInfo(pVnode->tsdb, name, index, size); return tsdbGetFileInfo(pVnode->tsdb, name, index, size);
} }
...@@ -425,10 +427,14 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) { ...@@ -425,10 +427,14 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) {
cqStop(pVnode->cq); cqStop(pVnode->cq);
} }
static void vnodeNotifyFileSynced(void *ahandle) { static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) {
SVnodeObj *pVnode = ahandle; SVnodeObj *pVnode = ahandle;
vTrace("vgId:%d, data file is synced", pVnode->vgId); vTrace("vgId:%d, data file is synced", pVnode->vgId);
pVnode->fversion = fversion;
pVnode->version = fversion;
vnodeSaveVersion(pVnode);
char rootDir[128] = "\0"; char rootDir[128] = "\0";
sprintf(rootDir, "%s/tsdb", pVnode->rootDir); sprintf(rootDir, "%s/tsdb", pVnode->rootDir);
// clsoe tsdb, then open tsdb // clsoe tsdb, then open tsdb
...@@ -706,14 +712,14 @@ static int32_t vnodeSaveVersion(SVnodeObj *pVnode) { ...@@ -706,14 +712,14 @@ static int32_t vnodeSaveVersion(SVnodeObj *pVnode) {
char * content = calloc(1, maxLen + 1); char * content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"version\": %" PRId64 "\n", pVnode->savedVersion); len += snprintf(content + len, maxLen - len, " \"version\": %" PRId64 "\n", pVnode->fversion);
len += snprintf(content + len, maxLen - len, "}\n"); len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp); fwrite(content, 1, len, fp);
fclose(fp); fclose(fp);
free(content); free(content);
vPrint("vgId:%d, save vnode version:%" PRId64 " succeed", pVnode->vgId, pVnode->savedVersion); vPrint("vgId:%d, save vnode version:%" PRId64 " succeed", pVnode->vgId, pVnode->fversion);
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册