提交 2332725e 编写于 作者: J jtao1735

Merge branch 'develop' into feature/unified

...@@ -188,7 +188,7 @@ static int32_t dnodeOpenVnodes() { ...@@ -188,7 +188,7 @@ static int32_t dnodeOpenVnodes() {
free(vnodeList); free(vnodeList);
dPrint("there are total vnodes:%d, failed to open:%d", numOfVnodes, failed); dPrint("there are total vnodes:%d, openned:%d failed:%d", numOfVnodes, numOfVnodes-failed, failed);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -54,8 +54,8 @@ typedef struct { ...@@ -54,8 +54,8 @@ typedef struct {
int role[TAOS_SYNC_MAX_REPLICA]; int role[TAOS_SYNC_MAX_REPLICA];
} SNodesRole; } SNodesRole;
// if name is null, get the file from index or after, used by master // if name is empty(name[0] is zero), get the file from index or after, used by master
// if name is provided, get the named file at the specified index, used by unsynced node // if name is provided(name[0] is not zero), get the named file at the specified index, used by unsynced node
// it returns the file magic number and size, if file not there, magic shall be 0. // it returns the file magic number and size, if file not there, magic shall be 0.
typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, int32_t *size); typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, int32_t *size);
...@@ -72,6 +72,9 @@ typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code); ...@@ -72,6 +72,9 @@ typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code);
// when role is changed, call this to notify app // when role is changed, call this to notify app
typedef void (*FNotifyRole)(void *ahandle, int8_t role); typedef void (*FNotifyRole)(void *ahandle, int8_t role);
// when data file is synced successfully, notity app
typedef void (*FNotifyFileSynced)(void *ahandle);
typedef struct { typedef struct {
int32_t vgId; // vgroup ID int32_t vgId; // vgroup ID
uint64_t version; // initial version uint64_t version; // initial version
...@@ -84,7 +87,7 @@ typedef struct { ...@@ -84,7 +87,7 @@ typedef struct {
FWriteToCache writeToCache; FWriteToCache writeToCache;
FConfirmForward confirmForward; FConfirmForward confirmForward;
FNotifyRole notifyRole; FNotifyRole notifyRole;
FNotifyFileSynced notifyFileSynced;
} SSyncInfo; } SSyncInfo;
typedef void* tsync_h; typedef void* tsync_h;
......
...@@ -188,7 +188,7 @@ static void taosAcceptTcpConnection(void *arg) { ...@@ -188,7 +188,7 @@ static void taosAcceptTcpConnection(void *arg) {
sockFd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port); sockFd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
if (sockFd < 0) return; if (sockFd < 0) return;
tTrace("%s TCP server is ready, ip:%s:%hu", pServerObj->label, pServerObj->ip, pServerObj->port); tTrace("%s TCP server is ready, ip:0x%x:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
while (1) { while (1) {
socklen_t addrlen = sizeof(caddr); socklen_t addrlen = sizeof(caddr);
......
...@@ -42,6 +42,7 @@ static int vnodeWalCallback(void *arg); ...@@ -42,6 +42,7 @@ static int vnodeWalCallback(void *arg);
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size); static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size);
static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
static void vnodeNotifyRole(void *ahandle, int8_t role); static void vnodeNotifyRole(void *ahandle, int8_t role);
static void vnodeNotifyFileSynced(void *ahandle);
static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT; static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT;
...@@ -230,6 +231,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -230,6 +231,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
syncInfo.writeToCache = vnodeWriteToQueue; syncInfo.writeToCache = vnodeWriteToQueue;
syncInfo.confirmForward = dnodeSendRpcWriteRsp; syncInfo.confirmForward = dnodeSendRpcWriteRsp;
syncInfo.notifyRole = vnodeNotifyRole; syncInfo.notifyRole = vnodeNotifyRole;
syncInfo.notifyFileSynced = vnodeNotifyFileSynced;
pVnode->sync = syncStart(&syncInfo); pVnode->sync = syncStart(&syncInfo);
// start continuous query // start continuous query
...@@ -401,6 +403,13 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) { ...@@ -401,6 +403,13 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) {
cqStop(pVnode->cq); cqStop(pVnode->cq);
} }
static void vnodeNotifyFileSynced(void *ahandle) {
SVnodeObj *pVnode = ahandle;
dTrace("pVnode:%p vgId:%d, data file is synced", pVnode, pVnode->vgId);
// clsoe tsdb, then open tsdb
}
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
char cfgFile[TSDB_FILENAME_LEN + 30] = {0}; char cfgFile[TSDB_FILENAME_LEN + 30] = {0};
sprintf(cfgFile, "%s/vnode%d/config.json", tsVnodeDir, pVnodeCfg->cfg.vgId); sprintf(cfgFile, "%s/vnode%d/config.json", tsVnodeDir, pVnodeCfg->cfg.vgId);
......
...@@ -51,7 +51,7 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { ...@@ -51,7 +51,7 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL)
return TSDB_CODE_MSG_NOT_PROCESSED; return TSDB_CODE_MSG_NOT_PROCESSED;
if (pVnode->status != TAOS_VN_STATUS_READY) if (pVnode->status != TAOS_VN_STATUS_READY && qtype == TAOS_QTYPE_RPC)
return TSDB_CODE_NOT_ACTIVE_VNODE; return TSDB_CODE_NOT_ACTIVE_VNODE;
if (pHead->version == 0) { // from client if (pHead->version == 0) { // from client
......
...@@ -79,6 +79,8 @@ void *walOpen(const char *path, const SWalCfg *pCfg) { ...@@ -79,6 +79,8 @@ void *walOpen(const char *path, const SWalCfg *pCfg) {
pthread_mutex_destroy(&pWal->mutex); pthread_mutex_destroy(&pWal->mutex);
free(pWal); free(pWal);
pWal = NULL; pWal = NULL;
} else {
wTrace("wal:%s, it is open, level:%d", path, pWal->level);
} }
return pWal; return pWal;
...@@ -177,8 +179,11 @@ void walFsync(void *handle) { ...@@ -177,8 +179,11 @@ void walFsync(void *handle) {
SWal *pWal = handle; SWal *pWal = handle;
if (pWal->level == TAOS_WAL_FSYNC) if (pWal->level == TAOS_WAL_FSYNC && pWal->fd >=0) {
fsync(pWal->fd); if (fsync(pWal->fd) < 0) {
wError("wal:%s, fsync failed(%s)", pWal->name, strerror(errno));
}
}
} }
int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) { int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册