提交 b962651e 编写于 作者: S Shuduo Sang

Merge branch 'develop' into feature/sangshuduo/hivemq-test

......@@ -122,11 +122,16 @@ void dnodeFreeMPeerQueue() {
}
void dnodeDispatchToMPeerQueue(SRpcMsg *pMsg) {
if (!mnodeIsRunning() || tsMPeerQueue == NULL) {
if (!mnodeIsRunning()) {
dnodeSendRedirectMsg(pMsg, false);
} else {
SMnodeMsg *pPeer = mnodeCreateMsg(pMsg);
taosWriteQitem(tsMPeerQueue, TAOS_QTYPE_RPC, pPeer);
if (!mnodeIsReady()) {
SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = mnodeInitCode(), .pCont = NULL};
rpcSendResponse(&rpcRsp);
} else {
SMnodeMsg *pPeer = mnodeCreateMsg(pMsg);
taosWriteQitem(tsMPeerQueue, TAOS_QTYPE_RPC, pPeer);
}
}
rpcFreeCont(pMsg->pCont);
......
......@@ -123,11 +123,16 @@ void dnodeFreeMReadQueue() {
}
void dnodeDispatchToMReadQueue(SRpcMsg *pMsg) {
if (!mnodeIsRunning() || tsMReadQueue == NULL) {
if (!mnodeIsRunning()) {
dnodeSendRedirectMsg(pMsg, true);
} else {
SMnodeMsg *pRead = mnodeCreateMsg(pMsg);
taosWriteQitem(tsMReadQueue, TAOS_QTYPE_RPC, pRead);
if (!mnodeIsReady()) {
SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = mnodeInitCode(), .pCont = NULL};
rpcSendResponse(&rpcRsp);
} else {
SMnodeMsg *pRead = mnodeCreateMsg(pMsg);
taosWriteQitem(tsMReadQueue, TAOS_QTYPE_RPC, pRead);
}
}
rpcFreeCont(pMsg->pCont);
......
......@@ -123,13 +123,18 @@ void dnodeFreeMWritequeue() {
}
void dnodeDispatchToMWriteQueue(SRpcMsg *pMsg) {
if (!mnodeIsRunning() || tsMWriteQueue == NULL) {
if (!mnodeIsRunning()) {
dnodeSendRedirectMsg(pMsg, true);
} else {
SMnodeMsg *pWrite = mnodeCreateMsg(pMsg);
dDebug("msg:%p, app:%p type:%s is put into mwrite queue:%p", pWrite, pWrite->rpcMsg.ahandle,
taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue);
taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite);
if (!mnodeIsReady()) {
SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = mnodeInitCode(), .pCont = NULL};
rpcSendResponse(&rpcRsp);
} else {
SMnodeMsg *pWrite = mnodeCreateMsg(pMsg);
dDebug("msg:%p, app:%p type:%s is put into mwrite queue:%p", pWrite, pWrite->rpcMsg.ahandle,
taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue);
taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite);
}
}
rpcFreeCont(pMsg->pCont);
......@@ -187,7 +192,7 @@ static void *dnodeProcessMWriteQueue(void *param) {
void dnodeReprocessMWriteMsg(void *pMsg) {
SMnodeMsg *pWrite = pMsg;
if (!mnodeIsRunning() || tsMWriteQueue == NULL) {
if (!mnodeIsRunning()) {
dDebug("msg:%p, app:%p type:%s is redirected for mnode not running, retry times:%d", pWrite, pWrite->rpcMsg.ahandle,
taosMsg[pWrite->rpcMsg.msgType], pWrite->retry);
......@@ -196,7 +201,6 @@ void dnodeReprocessMWriteMsg(void *pMsg) {
} else {
dDebug("msg:%p, app:%p type:%s is reput into mwrite queue:%p, retry times:%d", pWrite, pWrite->rpcMsg.ahandle,
taosMsg[pWrite->rpcMsg.msgType], tsMWriteQueue, pWrite->retry);
taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite);
}
}
......
......@@ -63,6 +63,7 @@ static const SDnodeComponent tsDnodeComponents[] = {
{"dnodeeps", dnodeInitEps, dnodeCleanupEps},
{"globalcfg" ,taosCheckGlobalCfg, NULL},
{"mnodeinfos",dnodeInitMInfos, dnodeCleanupMInfos},
{"shell", dnodeInitShell, dnodeCleanupShell},
{"wal", walInit, walCleanUp},
{"check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!!
{"vread", dnodeInitVRead, dnodeCleanupVRead},
......@@ -75,7 +76,6 @@ static const SDnodeComponent tsDnodeComponents[] = {
{"mgmt", dnodeInitMgmt, dnodeCleanupMgmt},
{"modules", dnodeInitModules, dnodeCleanupModules},
{"mgmt-tmr", dnodeInitMgmtTimer, dnodeCleanupMgmtTimer},
{"shell", dnodeInitShell, dnodeCleanupShell},
{"telemetry", dnodeInitTelemetry, dnodeCleanupTelemetry},
};
......
......@@ -144,7 +144,7 @@ static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
int code = mnodeRetriveAuth(user, spi, encrypt, secret, ckey);
if (code != TSDB_CODE_APP_NOT_READY) return code;
if (code != TSDB_CODE_RPC_REDIRECT) return code;
SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg));
tstrncpy(pMsg->user, user, sizeof(pMsg->user));
......
......@@ -65,12 +65,15 @@ void mnodeStopSystem();
void sdbUpdateAsync();
void sdbUpdateSync(void *pMnodes);
bool mnodeIsRunning();
bool mnodeIsReady();
int32_t mnodeInitCode();
int32_t mnodeProcessRead(SMnodeMsg *pMsg);
int32_t mnodeProcessWrite(SMnodeMsg *pMsg);
int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg);
void mnodeProcessPeerRsp(SRpcMsg *pMsg);
int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey);
#ifdef __cplusplus
}
#endif
......
......@@ -125,6 +125,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SHOWOBJ, 0, 0x030B, "Data expir
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_QUERY_ID, 0, 0x030C, "Invalid query id")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STREAM_ID, 0, 0x030D, "Invalid stream id")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CONN_ID, 0, 0x030E, "Invalid connection id")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INIT, 0, 0x030F, "Mnode is initializing")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INIT_SDB, 0, 0x0310, "Mnode is initializing meta data, it takes a while if many tables exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INIT_OTHER, 0, 0x0311, "Mnode is initializing other data")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_OBJ_ALREADY_THERE, 0, 0x0320, "Object already there")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SDB_ERROR, 0, 0x0321, "Unexpected generic error in sdb")
......@@ -184,6 +187,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_DATABASES, 0, 0x0385, "Too many d
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_IN_DROPPING, 0, 0x0386, "Database not available")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_NOT_READY, 0, 0x0387, "Database unsynced")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB_OPTION_DAYS, 0, 0x0390, "Invalid database option: days out of range")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB_OPTION_KEEP, 0, 0x0391, "Invalid database option: keep >= keep2 >= keep1 >= days")
// dnode
TAOS_DEFINE_ERROR(TSDB_CODE_DND_MSG_NOT_PROCESSED, 0, 0x0400, "Message not processed")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_OUT_OF_MEMORY, 0, 0x0401, "Dnode out of memory")
......@@ -261,9 +267,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CONFIG, 0, 0x0900, "Invalid Sy
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync module not enabled")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_VERSION, 0, 0x0902, "Invalid Sync version")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_CONFIRM_EXPIRED, 0, 0x0903, "Sync confirm expired")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_VND_COMMITING, 0, 0x0904, "Vnode is commiting")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_FILE_CHNAGED, 0, 0x0905, "Vnode file is changed")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_APP_ERROR, 0, 0x1000, "Unexpected generic error in sync")
// wal
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal")
......
......@@ -86,7 +86,7 @@ typedef void (*FNotifyFlowCtrl)(int32_t vgId, int32_t level);
typedef int32_t (*FNotifyFileSynced)(int32_t vgId, uint64_t fversion);
// get file version
typedef int32_t (*FGetFileVersion)(int32_t vgId, uint64_t *fver);
typedef int32_t (*FGetVersion)(int32_t vgId, uint64_t *fver, uint64_t *vver);
typedef struct {
int32_t vgId; // vgroup ID
......@@ -100,7 +100,7 @@ typedef struct {
FNotifyRole notifyRole;
FNotifyFlowCtrl notifyFlowCtrl;
FNotifyFileSynced notifyFileSynced;
FGetFileVersion getFileVersion;
FGetVersion getVersion;
} SSyncInfo;
typedef void *tsync_h;
......
......@@ -236,30 +236,28 @@ static int32_t mnodeCheckDbCfg(SDbCfg *pCfg) {
if (pCfg->daysPerFile < TSDB_MIN_DAYS_PER_FILE || pCfg->daysPerFile > TSDB_MAX_DAYS_PER_FILE) {
mError("invalid db option daysPerFile:%d valid range: [%d, %d]", pCfg->daysPerFile, TSDB_MIN_DAYS_PER_FILE,
TSDB_MAX_DAYS_PER_FILE);
return TSDB_CODE_MND_INVALID_DB_OPTION;
return TSDB_CODE_MND_INVALID_DB_OPTION_DAYS;
}
if (pCfg->daysToKeep < TSDB_MIN_KEEP || pCfg->daysToKeep > TSDB_MAX_KEEP) {
mError("invalid db option daysToKeep:%d valid range: [%d, %d]", pCfg->daysToKeep, TSDB_MIN_KEEP, TSDB_MAX_KEEP);
return TSDB_CODE_MND_INVALID_DB_OPTION;
return TSDB_CODE_MND_INVALID_DB_OPTION_KEEP;
}
if (pCfg->daysToKeep < pCfg->daysPerFile) {
mError("invalid db option daysToKeep:%d should larger than daysPerFile:%d", pCfg->daysToKeep, pCfg->daysPerFile);
return TSDB_CODE_MND_INVALID_DB_OPTION;
return TSDB_CODE_MND_INVALID_DB_OPTION_KEEP;
}
#if 0
if (pCfg->daysToKeep2 < TSDB_MIN_KEEP || pCfg->daysToKeep2 > pCfg->daysToKeep) {
mError("invalid db option daysToKeep2:%d valid range: [%d, %d]", pCfg->daysToKeep, TSDB_MIN_KEEP, pCfg->daysToKeep);
return TSDB_CODE_MND_INVALID_DB_OPTION;
return TSDB_CODE_MND_INVALID_DB_OPTION_KEEP;
}
if (pCfg->daysToKeep1 < TSDB_MIN_KEEP || pCfg->daysToKeep1 > pCfg->daysToKeep2) {
mError("invalid db option daysToKeep1:%d valid range: [%d, %d]", pCfg->daysToKeep1, TSDB_MIN_KEEP, pCfg->daysToKeep2);
return TSDB_CODE_MND_INVALID_DB_OPTION;
return TSDB_CODE_MND_INVALID_DB_OPTION_KEEP;
}
#endif
if (pCfg->maxRowsPerFileBlock < TSDB_MIN_MAX_ROW_FBLOCK || pCfg->maxRowsPerFileBlock > TSDB_MAX_MAX_ROW_FBLOCK) {
mError("invalid db option maxRowsPerFileBlock:%d valid range: [%d, %d]", pCfg->maxRowsPerFileBlock,
......
......@@ -17,6 +17,7 @@
#include "os.h"
#include "taosdef.h"
#include "tsched.h"
#include "taoserror.h"
#include "tbalance.h"
#include "tgrant.h"
#include "ttimer.h"
......@@ -37,30 +38,41 @@
#include "mnodeShow.h"
#include "mnodeProfile.h"
typedef enum {
TSDB_MND_STATUS_NOT_RUNNING,
TSDB_MND_STATUS_INIT,
TSDB_MND_STATUS_INIT_SDB,
TSDB_MND_STATUS_INIT_OTHER,
TSDB_MND_STATUS_READY,
TSDB_MND_STATUS_CLEANING,
} EMndStatus;
typedef struct {
const char *const name;
int (*init)();
void (*cleanup)();
EMndStatus status;
} SMnodeComponent;
void *tsMnodeTmr = NULL;
static bool tsMgmtIsRunning = false;
void *tsMnodeTmr = NULL;
static bool tsMgmtIsRunning = false;
static EMndStatus tsMgmtStatus = TSDB_MND_STATUS_NOT_RUNNING;
static const SMnodeComponent tsMnodeComponents[] = {
{"sdbref", sdbInitRef, sdbCleanUpRef},
{"profile", mnodeInitProfile, mnodeCleanupProfile},
{"cluster", mnodeInitCluster, mnodeCleanupCluster},
{"accts", mnodeInitAccts, mnodeCleanupAccts},
{"users", mnodeInitUsers, mnodeCleanupUsers},
{"dnodes", mnodeInitDnodes, mnodeCleanupDnodes},
{"dbs", mnodeInitDbs, mnodeCleanupDbs},
{"vgroups", mnodeInitVgroups, mnodeCleanupVgroups},
{"tables", mnodeInitTables, mnodeCleanupTables},
{"mnodes", mnodeInitMnodes, mnodeCleanupMnodes},
{"sdb", sdbInit, sdbCleanUp},
{"balance", balanceInit, balanceCleanUp},
{"grant", grantInit, grantCleanUp},
{"show", mnodeInitShow, mnodeCleanUpShow}
{"sdbref", sdbInitRef, sdbCleanUpRef, TSDB_MND_STATUS_INIT},
{"profile", mnodeInitProfile, mnodeCleanupProfile, TSDB_MND_STATUS_INIT},
{"cluster", mnodeInitCluster, mnodeCleanupCluster, TSDB_MND_STATUS_INIT},
{"accts", mnodeInitAccts, mnodeCleanupAccts, TSDB_MND_STATUS_INIT},
{"users", mnodeInitUsers, mnodeCleanupUsers, TSDB_MND_STATUS_INIT},
{"dnodes", mnodeInitDnodes, mnodeCleanupDnodes, TSDB_MND_STATUS_INIT},
{"dbs", mnodeInitDbs, mnodeCleanupDbs, TSDB_MND_STATUS_INIT},
{"vgroups", mnodeInitVgroups, mnodeCleanupVgroups, TSDB_MND_STATUS_INIT},
{"tables", mnodeInitTables, mnodeCleanupTables, TSDB_MND_STATUS_INIT},
{"mnodes", mnodeInitMnodes, mnodeCleanupMnodes, TSDB_MND_STATUS_INIT},
{"sdb", sdbInit, sdbCleanUp, TSDB_MND_STATUS_INIT_SDB},
{"balance", balanceInit, balanceCleanUp, TSDB_MND_STATUS_INIT_OTHER},
{"grant", grantInit, grantCleanUp, TSDB_MND_STATUS_INIT_OTHER},
{"show", mnodeInitShow, mnodeCleanUpShow, TSDB_MND_STATUS_INIT_OTHER},
};
static void mnodeInitTimer();
......@@ -76,21 +88,24 @@ static void mnodeCleanupComponents(int32_t stepId) {
static int32_t mnodeInitComponents() {
int32_t code = 0;
for (int32_t i = 0; i < sizeof(tsMnodeComponents) / sizeof(tsMnodeComponents[0]); i++) {
tsMgmtStatus = tsMnodeComponents[i].status;
if (tsMnodeComponents[i].init() != 0) {
mnodeCleanupComponents(i);
code = -1;
break;
}
// sleep(3);
}
return code;
}
int32_t mnodeStartSystem() {
if (tsMgmtIsRunning) {
if (tsMgmtStatus != TSDB_MND_STATUS_NOT_RUNNING) {
mInfo("mnode module already started...");
return 0;
}
tsMgmtStatus = TSDB_MND_STATUS_INIT;
mInfo("starting to initialize mnode ...");
if (mkdir(tsMnodeDir, 0755) != 0 && errno != EEXIST) {
mError("failed to init mnode dir:%s, reason:%s", tsMnodeDir, strerror(errno));
......@@ -106,7 +121,7 @@ int32_t mnodeStartSystem() {
}
grantReset(TSDB_GRANT_ALL, 0);
tsMgmtIsRunning = true;
tsMgmtStatus = TSDB_MND_STATUS_READY;
mInfo("mnode is initialized successfully");
......@@ -126,7 +141,7 @@ int32_t mnodeInitSystem() {
void mnodeCleanupSystem() {
if (tsMgmtIsRunning) {
mInfo("starting to clean up mnode");
tsMgmtIsRunning = false;
tsMgmtStatus = TSDB_MND_STATUS_CLEANING;
dnodeFreeMWritequeue();
dnodeFreeMReadQueue();
......@@ -134,6 +149,7 @@ void mnodeCleanupSystem() {
mnodeCleanupTimer();
mnodeCleanupComponents(sizeof(tsMnodeComponents) / sizeof(tsMnodeComponents[0]) - 1);
tsMgmtStatus = TSDB_MND_STATUS_NOT_RUNNING;
mInfo("mnode is cleaned up");
}
}
......@@ -184,5 +200,29 @@ static bool mnodeNeedStart() {
}
bool mnodeIsRunning() {
return tsMgmtIsRunning;
return (tsMgmtStatus != TSDB_MND_STATUS_NOT_RUNNING && tsMgmtStatus != TSDB_MND_STATUS_CLEANING);
}
bool mnodeIsReady() {
return (tsMgmtStatus == TSDB_MND_STATUS_READY);
}
int32_t mnodeInitCode() {
int32_t code = -1;
switch (tsMgmtStatus) {
case TSDB_MND_STATUS_INIT:
code = TSDB_CODE_MND_INIT;
break;
case TSDB_MND_STATUS_INIT_SDB:
code = TSDB_CODE_MND_INIT_SDB;
break;
case TSDB_MND_STATUS_INIT_OTHER:
code = TSDB_CODE_MND_INIT_OTHER;
break;
default:
code = TSDB_CODE_MND_INIT;
}
return code;
}
......@@ -251,6 +251,16 @@ static void sdbNotifyRole(int32_t vgId, int8_t role) {
sdbUpdateMnodeRoles();
}
static int32_t sdbNotifyFileSynced(int32_t vgId, uint64_t fversion) { return 0; }
static void sdbNotifyFlowCtrl(int32_t vgId, int32_t level) {}
static int32_t sdbGetSyncVersion(int32_t vgId, uint64_t *fver, uint64_t *vver) {
*fver = 0;
*vver = 0;
return 0;
}
// failed to forward, need revert insert
static void sdbHandleFailedConfirm(SSdbRow *pRow) {
SWalHead *pHead = pRow->pHead;
......@@ -299,7 +309,7 @@ void sdbUpdateAsync() {
void sdbUpdateSync(void *pMnodes) {
SMnodeInfos *mnodes = pMnodes;
if (!mnodeIsRunning()) {
if (!mnodeIsReady()) {
mDebug("vgId:1, mnode not start yet, update sync config later");
return;
}
......@@ -372,11 +382,14 @@ void sdbUpdateSync(void *pMnodes) {
syncInfo.version = sdbGetVersion();
syncInfo.syncCfg = syncCfg;
sprintf(syncInfo.path, "%s", tsMnodeDir);
syncInfo.getWalInfo = sdbGetWalInfo;
syncInfo.getFileInfo = sdbGetFileInfo;
syncInfo.getWalInfo = sdbGetWalInfo;
syncInfo.writeToCache = sdbWriteFwdToQueue;
syncInfo.confirmForward = sdbConfirmForward;
syncInfo.notifyRole = sdbNotifyRole;
syncInfo.notifyFileSynced = sdbNotifyFileSynced;
syncInfo.notifyFlowCtrl = sdbNotifyFlowCtrl;
syncInfo.getVersion = sdbGetSyncVersion;
tsSdbMgmt.cfg = syncCfg;
if (tsSdbMgmt.sync) {
......
......@@ -585,10 +585,21 @@ void mnodeDropAllUsers(SAcctObj *pAcct) {
}
int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
*secret = 0;
if (!mnodeIsRunning()) {
mDebug("user:%s, mnode is not running, fail to auth", user);
return TSDB_CODE_RPC_REDIRECT;
}
if (!mnodeIsReady()) {
mDebug("user:%s, failed to auth user, mnode is not ready", user);
return mnodeInitCode();
}
if (!sdbIsMaster()) {
*secret = 0;
mDebug("user:%s, failed to auth user, mnode is not master", user);
return TSDB_CODE_APP_NOT_READY;
return TSDB_CODE_RPC_REDIRECT;
}
SUserObj *pUser = mnodeGetUser(user);
......
......@@ -630,8 +630,16 @@ static void rpcReleaseConn(SRpcConn *pConn) {
} else {
// if there is an outgoing message, free it
if (pConn->outType && pConn->pReqMsg) {
if (pConn->pContext) pConn->pContext->pConn = NULL;
taosRemoveRef(tsRpcRefId, pConn->pContext->rid);
SRpcReqContext *pContext = pConn->pContext;
if (pContext->pRsp) {
// for synchronous API, post semaphore to unblock app
pContext->pRsp->code = TSDB_CODE_RPC_APP_ERROR;
pContext->pRsp->pCont = NULL;
pContext->pRsp->contLen = 0;
tsem_post(pContext->pSem);
}
pContext->pConn = NULL;
taosRemoveRef(tsRpcRefId, pContext->rid);
}
}
......@@ -1551,10 +1559,9 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen) {
if ( !rpcIsReq(pHead->msgType) ) {
// for response, if code is auth failure, it shall bypass the auth process
code = htonl(pHead->code);
if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE ||
code == TSDB_CODE_RPC_AUTH_REQUIRED || code == TSDB_CODE_MND_INVALID_USER || code == TSDB_CODE_RPC_NOT_READY) {
pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
if (code != 0) {
// tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code);
pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
return 0;
}
}
......
......@@ -139,16 +139,14 @@ typedef struct SsyncPeer {
char id[TSDB_EP_LEN + 32]; // peer vgId + end point
uint64_t version;
uint64_t sversion; // track the peer version in retrieve process
uint64_t lastVer; // track the file version while retrieve
uint64_t lastFileVer; // track the file version while retrieve
uint64_t lastWalVer; // track the wal version while retrieve
int32_t syncFd;
int32_t peerFd; // forward FD
int32_t numOfRetrieves; // number of retrieves tried
int32_t fileChanged; // a flag to indicate file is changed during retrieving process
void * timer;
void * pConn;
int32_t notifyFd;
int32_t watchNum;
int32_t *watchFd;
int32_t refCount; // reference count
struct SSyncNode *pSyncNode;
} SSyncPeer;
......@@ -173,7 +171,7 @@ typedef struct SSyncNode {
FNotifyRole notifyRole;
FNotifyFlowCtrl notifyFlowCtrl;
FNotifyFileSynced notifyFileSynced;
FGetFileVersion getFileVersion;
FGetVersion getVersion;
pthread_mutex_t mutex;
} SSyncNode;
......
......@@ -196,7 +196,7 @@ int64_t syncStart(const SSyncInfo *pInfo) {
pNode->confirmForward = pInfo->confirmForward;
pNode->notifyFlowCtrl = pInfo->notifyFlowCtrl;
pNode->notifyFileSynced = pInfo->notifyFileSynced;
pNode->getFileVersion = pInfo->getFileVersion;
pNode->getVersion = pInfo->getVersion;
pNode->selfIndex = -1;
pNode->vgId = pInfo->vgId;
......@@ -498,7 +498,6 @@ int32_t syncDecPeerRef(SSyncPeer *pPeer) {
taosReleaseRef(tsSyncRefId, pPeer->pSyncNode->rid);
sDebug("%s, resource is freed", pPeer->id);
tfree(pPeer->watchFd);
tfree(pPeer);
return 0;
}
......
......@@ -26,39 +26,78 @@
#include "tsync.h"
#include "syncInt.h"
static int32_t syncAreFilesModified(SSyncNode *pNode, SSyncPeer *pPeer) {
if (pNode->getFileVersion == NULL) return TSDB_CODE_SUCCESS;
static int32_t syncGetWalVersion(SSyncNode *pNode, SSyncPeer *pPeer) {
uint64_t fver, wver;
int32_t code = (*pNode->getVersion)(pNode->vgId, &fver, &wver);
if (code != 0) {
sDebug("%s, vnode is commiting while retrieve, last wver:%" PRIu64, pPeer->id, pPeer->lastWalVer);
return -1;
}
pPeer->lastWalVer = wver;
return code;
}
static bool syncIsWalModified(SSyncNode *pNode, SSyncPeer *pPeer) {
uint64_t fver, wver;
int32_t code = (*pNode->getVersion)(pNode->vgId, &fver, &wver);
if (code != 0) {
sDebug("%s, vnode is commiting while retrieve, last wver:%" PRIu64, pPeer->id, pPeer->lastWalVer);
return true;
}
if (wver != pPeer->lastWalVer) {
sDebug("%s, wal is modified while retrieve, wver:%" PRIu64 ", last:%" PRIu64, pPeer->id, wver, pPeer->lastWalVer);
return true;
}
return false;
}
uint64_t fver = 0;
int32_t code = (*pNode->getFileVersion)(pNode->vgId, &fver);
static int32_t syncGetFileVersion(SSyncNode *pNode, SSyncPeer *pPeer) {
uint64_t fver, wver;
int32_t code = (*pNode->getVersion)(pNode->vgId, &fver, &wver);
if (code != 0) {
sInfo("%s, vnode is commiting while retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastVer);
sDebug("%s, vnode is commiting while retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastFileVer);
return -1;
}
pPeer->lastFileVer = fver;
return code;
}
static bool syncAreFilesModified(SSyncNode *pNode, SSyncPeer *pPeer) {
uint64_t fver, wver;
int32_t code = (*pNode->getVersion)(pNode->vgId, &fver, &wver);
if (code != 0) {
sDebug("%s, vnode is commiting while retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastFileVer);
pPeer->fileChanged = 1;
return TSDB_CODE_SYN_VND_COMMITING;
return true;
}
if (fver != pPeer->lastVer) {
sInfo("%s, files are modified while retrieve, fver:%" PRIu64 ", last fver:%" PRIu64, pPeer->id, fver, pPeer->lastVer);
if (fver != pPeer->lastFileVer) {
sDebug("%s, files are modified while retrieve, fver:%" PRIu64 ", last:%" PRIu64, pPeer->id, fver, pPeer->lastFileVer);
pPeer->fileChanged = 1;
return TSDB_CODE_SYN_FILE_CHNAGED;
return true;
}
pPeer->fileChanged = 0;
return TSDB_CODE_SUCCESS;
return false;
}
static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode;
SFileInfo fileInfo; memset(&fileInfo, 0, sizeof(SFileInfo));
SFileAck fileAck = {0};
int32_t code = TSDB_CODE_SYN_APP_ERROR;
int32_t code = -1;
char name[TSDB_FILENAME_LEN * 2] = {0};
if (pNode->getFileVersion) (*pNode->getFileVersion)(pNode->vgId, &pPeer->lastVer);
if (syncGetFileVersion(pNode, pPeer) < 0) return -1;
while (1) {
// retrieve file info
fileInfo.name[0] = 0;
fileInfo.size = 0;
fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX,
&fileInfo.size, &fileInfo.fversion);
// fileInfo.size = htonl(size);
......@@ -67,14 +106,14 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
// send the file info
int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo));
if (ret < 0) {
code = TAOS_SYSTEM_ERROR(errno);
code = -1;
sError("%s, failed to write file:%s info while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
break;
}
// if no file anymore, break
if (fileInfo.magic == 0 || fileInfo.name[0] == 0) {
code = TSDB_CODE_SUCCESS;
code = 0;
sDebug("%s, no more files to sync", pPeer->id);
break;
}
......@@ -82,7 +121,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
// wait for the ack from peer
ret = taosReadMsg(pPeer->syncFd, &fileAck, sizeof(fileAck));
if (ret < 0) {
code = TAOS_SYSTEM_ERROR(errno);
code = -1;
sError("%s, failed to read file:%s ack while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
break;
}
......@@ -103,7 +142,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
// send the file to peer
int32_t sfd = open(name, O_RDONLY);
if (sfd < 0) {
code = TAOS_SYSTEM_ERROR(errno);
code = -1;
sError("%s, failed to open file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
break;
}
......@@ -111,7 +150,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
ret = taosSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size);
close(sfd);
if (ret < 0) {
code = TAOS_SYSTEM_ERROR(errno);
code = -1;
sError("%s, failed to send file:%s while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
break;
}
......@@ -120,128 +159,103 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
fileInfo.index++;
// check if processed files are modified
code = syncAreFilesModified(pNode, pPeer);
if (code != TSDB_CODE_SUCCESS) break;
if (syncAreFilesModified(pNode, pPeer)) {
code = -1;
break;
}
}
if (code != TSDB_CODE_SUCCESS) {
sError("%s, failed to retrieve file since %s", pPeer->id, tstrerror(code));
sError("%s, failed to retrieve file, code:0x%x", pPeer->id, code);
}
return code;
}
/* if only a partial record is read out, set the IN_MODIFY flag in event,
so upper layer will reload the file to get a complete record */
static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead, uint32_t *pEvent) {
int32_t ret;
// if only a partial record is read out, upper layer will reload the file to get a complete record
static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead) {
int32_t ret = read(sfd, pHead, sizeof(SWalHead));
if (ret < 0) {
sError("sfd:%d, failed to read wal head since %s, ret:%d", sfd, strerror(errno), ret);
return -1;
}
ret = read(sfd, pHead, sizeof(SWalHead));
if (ret < 0) return -1;
if (ret == 0) return 0;
if (ret == 0) {
sTrace("sfd:%d, read to the end of file, ret:%d", sfd, ret);
return 0;
}
if (ret != sizeof(SWalHead)) {
// file is not at end yet, it shall be reloaded
*pEvent = *pEvent | IN_MODIFY;
sDebug("sfd:%d, a partial wal head is read out, ret:%d", sfd, ret);
return 0;
}
assert(pHead->len <= TSDB_MAX_WAL_SIZE);
ret = read(sfd, pHead->cont, pHead->len);
if (ret < 0) return -1;
if (ret < 0) {
sError("sfd:%d, failed to read wal content since %s, ret:%d", sfd, strerror(errno), ret);
return -1;
}
if (ret != pHead->len) {
// file is not at end yet, it shall be reloaded
*pEvent = *pEvent | IN_MODIFY;
sDebug("sfd:%d, a partial wal conetnt is read out, ret:%d", sfd, ret);
return 0;
}
return sizeof(SWalHead) + pHead->len;
}
static int32_t syncMonitorLastWal(SSyncPeer *pPeer, char *name) {
pPeer->watchNum = 0;
taosClose(pPeer->notifyFd);
pPeer->notifyFd = inotify_init1(IN_NONBLOCK);
if (pPeer->notifyFd < 0) {
sError("%s, failed to init inotify since %s", pPeer->id, strerror(errno));
return -1;
}
if (pPeer->watchFd == NULL) pPeer->watchFd = malloc(sizeof(int32_t) * tsMaxWatchFiles);
if (pPeer->watchFd == NULL) {
sError("%s, failed to allocate watchFd", pPeer->id);
return -1;
}
memset(pPeer->watchFd, -1, sizeof(int32_t) * tsMaxWatchFiles);
int32_t *wd = pPeer->watchFd;
*wd = inotify_add_watch(pPeer->notifyFd, name, IN_MODIFY | IN_CLOSE_WRITE);
if (*wd == -1) {
sError("%s, failed to watch last wal since %s", pPeer->id, strerror(errno));
static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset) {
int32_t sfd = open(name, O_RDONLY);
if (sfd < 0) {
sError("%s, failed to open wal:%s for retrieve since:%s", pPeer->id, name, tstrerror(errno));
return -1;
}
return 0;
}
static int32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) {
char buf[2048];
int32_t len = read(pPeer->notifyFd, buf, sizeof(buf));
if (len < 0 && errno != EAGAIN) {
sError("%s, failed to read notify FD since %s", pPeer->id, strerror(errno));
int32_t code = taosLSeek(sfd, offset, SEEK_SET);
if (code < 0) {
sError("%s, failed to seek %" PRId64 " in wal:%s for retrieve since:%s", pPeer->id, offset, name, tstrerror(errno));
close(sfd);
return -1;
}
if (len == 0) return 0;
struct inotify_event *event;
for (char *ptr = buf; ptr < buf + len; ptr += sizeof(struct inotify_event) + event->len) {
event = (struct inotify_event *)ptr;
if (event->mask & IN_MODIFY) *pEvent = *pEvent | IN_MODIFY;
if (event->mask & IN_CLOSE_WRITE) *pEvent = *pEvent | IN_CLOSE_WRITE;
}
if (pEvent != 0) sDebug("%s, last wal event:0x%x", pPeer->id, *pEvent);
return 0;
}
sDebug("%s, retrieve last wal:%s, offset:%" PRId64 " fver:%" PRIu64, pPeer->id, name, offset, fversion);
static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset, uint32_t *pEvent) {
SWalHead *pHead = malloc(SYNC_MAX_SIZE);
int32_t code = -1;
int32_t bytes = 0;
int32_t sfd;
sfd = open(name, O_RDONLY);
if (sfd < 0) {
free(pHead);
return -1;
}
(void)lseek(sfd, offset, SEEK_SET);
sDebug("%s, retrieve last wal, offset:%" PRId64 " fver:%" PRIu64, pPeer->id, offset, fversion);
while (1) {
int32_t wsize = syncReadOneWalRecord(sfd, pHead, pEvent);
if (wsize < 0) break;
if (wsize == 0) {
code = 0;
code = syncReadOneWalRecord(sfd, pHead);
if (code < 0) {
sError("%s, failed to read one record from wal:%s", pPeer->id, name);
break;
}
if (code == 0) {
code = bytes;
sDebug("%s, read to the end of wal, bytes:%d", pPeer->id, bytes);
break;
}
sTrace("%s, last wal is forwarded, hver:%" PRIu64, pPeer->id, pHead->version);
int32_t wsize = code;
int32_t ret = taosWriteMsg(pPeer->syncFd, pHead, wsize);
if (ret != wsize) break;
pPeer->sversion = pHead->version;
if (ret != wsize) {
code = -1;
sError("%s, failed to forward wal since %s, hver:%" PRIu64, pPeer->id, strerror(errno), pHead->version);
break;
}
pPeer->sversion = pHead->version;
bytes += wsize;
if (pHead->version >= fversion && fversion > 0) {
code = 0;
bytes = 0;
sDebug("%s, retrieve wal finished, hver:%" PRIu64 " fver:%" PRIu64, pPeer->id, pHead->version, fversion);
break;
}
}
......@@ -249,92 +263,62 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi
free(pHead);
close(sfd);
if (code == 0) return bytes;
return -1;
return code;
}
static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) {
SSyncNode *pNode = pPeer->pSyncNode;
int32_t code = -1;
int32_t once = 0; // last WAL has once ever been processed
int64_t offset = 0;
uint64_t fversion = 0;
char fname[TSDB_FILENAME_LEN * 2] = {0}; // full path to wal file
if (syncAreFilesModified(pNode, pPeer) != 0) return -1;
// get full path to wal file
snprintf(fname, sizeof(fname), "%s/%s", pNode->path, wname);
sDebug("%s, start to retrieve last wal:%s", pPeer->id, fname);
while (1) {
int32_t once = 0; // last WAL has once ever been processed
int64_t offset = 0;
uint64_t fversion = 0;
uint32_t event = 0;
// get full path to wal file
snprintf(fname, sizeof(fname), "%s/%s", pNode->path, wname);
sDebug("%s, start to retrieve last wal:%s", pPeer->id, fname);
// monitor last wal
if (syncMonitorLastWal(pPeer, fname) < 0) break;
while (1) {
int32_t bytes = syncRetrieveLastWal(pPeer, fname, fversion, offset, &event);
if (bytes < 0) break;
if (syncAreFilesModified(pNode, pPeer)) return -1;
if (syncGetWalVersion(pNode, pPeer) < 0) return -1;
// check file changes
if (syncCheckLastWalChanges(pPeer, &event) < 0) break;
// if file is not updated or updated once, set the fversion and sstatus
if (((event & IN_MODIFY) == 0) || once) {
if (fversion == 0) {
pPeer->sstatus = TAOS_SYNC_STATUS_CACHE; // start to forward pkt
sDebug("%s, fversion is 0 then set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
fversion = nodeVersion; // must read data to fversion
}
}
int32_t bytes = syncRetrieveLastWal(pPeer, fname, fversion, offset);
if (bytes < 0) {
sDebug("%s, failed to retrieve last wal", pPeer->id);
return bytes;
}
// if all data up to fversion is read out, it is over
if (pPeer->sversion >= fversion && fversion > 0) {
code = 0;
sDebug("%s, data up to fver:%" PRIu64 " has been read out, bytes:%d", pPeer->id, fversion, bytes);
break;
}
// check file changes
bool walModified = syncIsWalModified(pNode, pPeer);
// if all data are read out, and no update
if ((bytes == 0) && ((event & IN_MODIFY) == 0)) {
// wal file is closed, break
if (event & IN_CLOSE_WRITE) {
code = 0;
sDebug("%s, current wal is closed", pPeer->id);
break;
}
// wal not closed, it means some data not flushed to disk, wait for a while
usleep(10000);
// if file is not updated or updated once, set the fversion and sstatus
if (!walModified || once) {
if (fversion == 0) {
pPeer->sstatus = TAOS_SYNC_STATUS_CACHE; // start to forward pkt
fversion = nodeVersion; // must read data to fversion
sDebug("%s, set sstatus:%s and fver:%" PRIu64, pPeer->id, syncStatus[pPeer->sstatus], fversion);
}
// if bytes>0, file is updated, or fversion is not reached but file still open, read again
once = 1;
offset += bytes;
sDebug("%s, retrieve last wal, bytes:%d", pPeer->id, bytes);
event = event & (~IN_MODIFY); // clear IN_MODIFY flag
}
if (code < 0) break;
if (pPeer->sversion >= fversion && fversion > 0) break;
// if all data up to fversion is read out, it is over
if (pPeer->sversion >= fversion && fversion > 0) {
sDebug("%s, data up to fver:%" PRIu64 " has been read out, bytes:%d sver:%" PRIu64, pPeer->id, fversion, bytes,
pPeer->sversion);
return 0;
}
index++;
wname[0] = 0;
code = (*pNode->getWalInfo)(pNode->vgId, wname, &index);
if (code < 0) break;
if (wname[0] == 0) {
code = 0;
break;
// if all data are read out, and no update
if (bytes == 0 && !walModified) {
// wal not closed, it means some data not flushed to disk, wait for a while
usleep(10000);
}
// current last wal is closed, there is a new one
sDebug("%s, last wal is closed, try new one", pPeer->id);
// if bytes > 0, file is updated, or fversion is not reached but file still open, read again
once = 1;
offset += bytes;
sDebug("%s, continue retrieve last wal, bytes:%d offset:%" PRId64, pPeer->id, bytes, offset);
}
taosClose(pPeer->notifyFd);
return code;
return -1;
}
static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
......@@ -342,7 +326,6 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
char fname[TSDB_FILENAME_LEN * 3];
char wname[TSDB_FILENAME_LEN * 2];
int32_t size;
struct stat fstat;
int32_t code = -1;
int64_t index = 0;
......@@ -350,9 +333,14 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
// retrieve wal info
wname[0] = 0;
code = (*pNode->getWalInfo)(pNode->vgId, wname, &index);
if (code < 0) break; // error
if (code < 0) {
sError("%s, failed to get wal info since:%s, code:0x%x", pPeer->id, strerror(errno), code);
break;
}
if (wname[0] == 0) { // no wal file
sDebug("%s, no wal file", pPeer->id);
code = 0;
sDebug("%s, no wal file anymore", pPeer->id);
break;
}
......@@ -364,20 +352,35 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
// get the full path to wal file
snprintf(fname, sizeof(fname), "%s/%s", pNode->path, wname);
// send wal file,
// inotify is not required, old wal file won't be modified, even remove is ok
if (stat(fname, &fstat) < 0) break;
size = fstat.st_size;
// send wal file, old wal file won't be modified, even remove is ok
struct stat fstat;
if (stat(fname, &fstat) < 0) {
code = -1;
sDebug("%s, failed to stat wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code);
break;
}
size = fstat.st_size;
sDebug("%s, retrieve wal:%s size:%d", pPeer->id, fname, size);
int32_t sfd = open(fname, O_RDONLY);
if (sfd < 0) break;
if (sfd < 0) {
code = -1;
sError("%s, failed to open wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code);
break;
}
code = taosSendFile(pPeer->syncFd, sfd, NULL, size);
close(sfd);
if (code < 0) break;
if (code < 0) {
sError("%s, failed to send wal:%s for retrieve since %s, code:0x%x", pPeer->id, fname, strerror(errno), code);
break;
}
if (syncAreFilesModified(pNode, pPeer) != 0) break;
if (syncAreFilesModified(pNode, pPeer)) {
code = -1;
break;
}
}
if (code == 0) {
......@@ -386,9 +389,9 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
SWalHead walHead;
memset(&walHead, 0, sizeof(walHead));
code = taosWriteMsg(pPeer->syncFd, &walHead, sizeof(walHead));
taosWriteMsg(pPeer->syncFd, &walHead, sizeof(walHead));
} else {
sError("%s, failed to send wal since %s", pPeer->id, strerror(errno));
sError("%s, failed to send wal since %s, code:0x%x", pPeer->id, strerror(errno), code);
}
return code;
......@@ -428,7 +431,7 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
pPeer->sversion = 0;
pPeer->sstatus = TAOS_SYNC_STATUS_FILE;
sInfo("%s, start to retrieve files, set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
if (syncRetrieveFile(pPeer) < 0) {
if (syncRetrieveFile(pPeer) != 0) {
sError("%s, failed to retrieve files", pPeer->id);
return -1;
}
......@@ -437,8 +440,9 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
if (pPeer->sversion == 0) pPeer->sversion = 1;
sInfo("%s, start to retrieve wals", pPeer->id);
if (syncRetrieveWal(pPeer) < 0) {
sError("%s, failed to retrieve wals", pPeer->id);
int32_t code = syncRetrieveWal(pPeer);
if (code != 0) {
sError("%s, failed to retrieve wals, code:0x%x", pPeer->id, code);
return -1;
}
......@@ -474,7 +478,6 @@ void *syncRetrieveData(void *param) {
}
pPeer->fileChanged = 0;
taosClose(pPeer->notifyFd);
taosClose(pPeer->syncFd);
syncDecPeerRef(pPeer);
......
......@@ -38,7 +38,7 @@ static void vnodeCtrlFlow(int32_t vgId, int32_t level);
static int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion);
static void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code);
static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam);
static int32_t vnodeGetFileVersion(int32_t vgId, uint64_t *fver);
static int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver);
#ifndef _SYNC
int64_t syncStart(const SSyncInfo *info) { return NULL; }
......@@ -353,7 +353,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
syncInfo.notifyRole = vnodeNotifyRole;
syncInfo.notifyFlowCtrl = vnodeCtrlFlow;
syncInfo.notifyFileSynced = vnodeNotifyFileSynced;
syncInfo.getFileVersion = vnodeGetFileVersion;
syncInfo.getVersion = vnodeGetVersion;
pVnode->sync = syncStart(&syncInfo);
#ifndef _SYNC
......@@ -771,7 +771,7 @@ static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void
return code;
}
static int32_t vnodeGetFileVersion(int32_t vgId, uint64_t *fver) {
static int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vError("vgId:%d, vnode not found while write to cache", vgId);
......@@ -780,10 +780,11 @@ static int32_t vnodeGetFileVersion(int32_t vgId, uint64_t *fver) {
int32_t code = 0;
if (pVnode->isCommiting) {
vDebug("vgId:%d, vnode is commiting while get file version", vgId);
vDebug("vgId:%d, vnode is commiting while get version", vgId);
code = -1;
} else {
*fver = pVnode->fversion;
*wver = pVnode->version;
}
vnodeRelease(pVnode);
......
#!/bin/bash
ulimit -c unlimited
python3 ./test.py -f insert/basic.py
python3 ./test.py -f insert/int.py
python3 ./test.py -f insert/float.py
python3 ./test.py -f insert/bigint.py
python3 ./test.py -f insert/bool.py
python3 ./test.py -f insert/double.py
python3 ./test.py -f insert/smallint.py
python3 ./test.py -f insert/tinyint.py
python3 ./test.py -f insert/date.py
python3 ./test.py -f insert/binary.py
python3 ./test.py -f insert/nchar.py
#python3 ./test.py -f insert/nchar-boundary.py
python3 ./test.py -f insert/nchar-unicode.py
python3 ./test.py -f insert/multi.py
python3 ./test.py -f insert/randomNullCommit.py
python3 insert/retentionpolicy.py
python3 ./test.py -f insert/alterTableAndInsert.py
python3 ./test.py -f insert/insertIntoTwoTables.py
python3 ./test.py -f table/alter_wal0.py
python3 ./test.py -f table/column_name.py
python3 ./test.py -f table/column_num.py
python3 ./test.py -f table/db_table.py
python3 ./test.py -f table/create_sensitive.py
#python3 ./test.py -f table/tablename-boundary.py
# tag
python3 ./test.py -f tag_lite/filter.py
python3 ./test.py -f tag_lite/create-tags-boundary.py
python3 ./test.py -f tag_lite/3.py
python3 ./test.py -f tag_lite/4.py
python3 ./test.py -f tag_lite/5.py
python3 ./test.py -f tag_lite/6.py
python3 ./test.py -f tag_lite/add.py
python3 ./test.py -f tag_lite/bigint.py
python3 ./test.py -f tag_lite/binary_binary.py
python3 ./test.py -f tag_lite/binary.py
python3 ./test.py -f tag_lite/bool_binary.py
python3 ./test.py -f tag_lite/bool_int.py
python3 ./test.py -f tag_lite/bool.py
python3 ./test.py -f tag_lite/change.py
python3 ./test.py -f tag_lite/column.py
python3 ./test.py -f tag_lite/commit.py
python3 ./test.py -f tag_lite/create.py
python3 ./test.py -f tag_lite/datatype.py
python3 ./test.py -f tag_lite/datatype-without-alter.py
python3 ./test.py -f tag_lite/delete.py
python3 ./test.py -f tag_lite/double.py
python3 ./test.py -f tag_lite/float.py
python3 ./test.py -f tag_lite/int_binary.py
python3 ./test.py -f tag_lite/int_float.py
python3 ./test.py -f tag_lite/int.py
python3 ./test.py -f tag_lite/set.py
python3 ./test.py -f tag_lite/smallint.py
python3 ./test.py -f tag_lite/tinyint.py
#python3 ./test.py -f dbmgmt/database-name-boundary.py
python3 ./test.py -f import_merge/importBlock1HO.py
python3 ./test.py -f import_merge/importBlock1HPO.py
python3 ./test.py -f import_merge/importBlock1H.py
python3 ./test.py -f import_merge/importBlock1S.py
python3 ./test.py -f import_merge/importBlock1Sub.py
python3 ./test.py -f import_merge/importBlock1TO.py
python3 ./test.py -f import_merge/importBlock1TPO.py
python3 ./test.py -f import_merge/importBlock1T.py
python3 ./test.py -f import_merge/importBlock2HO.py
python3 ./test.py -f import_merge/importBlock2HPO.py
python3 ./test.py -f import_merge/importBlock2H.py
python3 ./test.py -f import_merge/importBlock2S.py
python3 ./test.py -f import_merge/importBlock2Sub.py
python3 ./test.py -f import_merge/importBlock2TO.py
python3 ./test.py -f import_merge/importBlock2TPO.py
python3 ./test.py -f import_merge/importBlock2T.py
python3 ./test.py -f import_merge/importBlockbetween.py
python3 ./test.py -f import_merge/importCacheFileHO.py
python3 ./test.py -f import_merge/importCacheFileHPO.py
python3 ./test.py -f import_merge/importCacheFileH.py
python3 ./test.py -f import_merge/importCacheFileS.py
python3 ./test.py -f import_merge/importCacheFileSub.py
python3 ./test.py -f import_merge/importCacheFileTO.py
python3 ./test.py -f import_merge/importCacheFileTPO.py
python3 ./test.py -f import_merge/importCacheFileT.py
python3 ./test.py -f import_merge/importDataH2.py
python3 ./test.py -f import_merge/importDataHO2.py
python3 ./test.py -f import_merge/importDataHO.py
python3 ./test.py -f import_merge/importDataHPO.py
python3 ./test.py -f import_merge/importDataLastHO.py
python3 ./test.py -f import_merge/importDataLastHPO.py
python3 ./test.py -f import_merge/importDataLastH.py
python3 ./test.py -f import_merge/importDataLastS.py
python3 ./test.py -f import_merge/importDataLastSub.py
python3 ./test.py -f import_merge/importDataLastTO.py
python3 ./test.py -f import_merge/importDataLastTPO.py
python3 ./test.py -f import_merge/importDataLastT.py
python3 ./test.py -f import_merge/importDataS.py
python3 ./test.py -f import_merge/importDataSub.py
python3 ./test.py -f import_merge/importDataTO.py
python3 ./test.py -f import_merge/importDataTPO.py
python3 ./test.py -f import_merge/importDataT.py
python3 ./test.py -f import_merge/importHeadOverlap.py
python3 ./test.py -f import_merge/importHeadPartOverlap.py
python3 ./test.py -f import_merge/importHead.py
python3 ./test.py -f import_merge/importHORestart.py
python3 ./test.py -f import_merge/importHPORestart.py
python3 ./test.py -f import_merge/importHRestart.py
python3 ./test.py -f import_merge/importLastHO.py
python3 ./test.py -f import_merge/importLastHPO.py
python3 ./test.py -f import_merge/importLastH.py
python3 ./test.py -f import_merge/importLastS.py
python3 ./test.py -f import_merge/importLastSub.py
python3 ./test.py -f import_merge/importLastTO.py
python3 ./test.py -f import_merge/importLastTPO.py
python3 ./test.py -f import_merge/importLastT.py
python3 ./test.py -f import_merge/importSpan.py
python3 ./test.py -f import_merge/importSRestart.py
python3 ./test.py -f import_merge/importSubRestart.py
python3 ./test.py -f import_merge/importTailOverlap.py
python3 ./test.py -f import_merge/importTailPartOverlap.py
python3 ./test.py -f import_merge/importTail.py
python3 ./test.py -f import_merge/importToCommit.py
python3 ./test.py -f import_merge/importTORestart.py
python3 ./test.py -f import_merge/importTPORestart.py
python3 ./test.py -f import_merge/importTRestart.py
python3 ./test.py -f import_merge/importInsertThenImport.py
python3 ./test.py -f import_merge/importCSV.py
# user
python3 ./test.py -f user/user_create.py
python3 ./test.py -f user/pass_len.py
# stable
python3 ./test.py -f stable/query_after_reset.py
# table
python3 ./test.py -f table/del_stable.py
#query
python3 ./test.py -f query/filter.py
python3 ./test.py -f query/filterCombo.py
python3 ./test.py -f query/queryNormal.py
python3 ./test.py -f query/queryError.py
python3 ./test.py -f query/filterAllIntTypes.py
python3 ./test.py -f query/filterFloatAndDouble.py
python3 ./test.py -f query/filterOtherTypes.py
python3 ./test.py -f query/querySort.py
python3 ./test.py -f query/queryJoin.py
python3 ./test.py -f query/select_last_crash.py
python3 ./test.py -f query/queryNullValueTest.py
python3 ./test.py -f query/queryInsertValue.py
python3 ./test.py -f query/queryConnection.py
python3 ./test.py -f query/queryCountCSVData.py
python3 ./test.py -f query/natualInterval.py
python3 ./test.py -f query/bug1471.py
#python3 ./test.py -f query/dataLossTest.py
python3 ./test.py -f query/bug1874.py
python3 ./test.py -f query/bug1875.py
python3 ./test.py -f query/bug1876.py
python3 ./test.py -f query/bug2218.py
#stream
python3 ./test.py -f stream/metric_1.py
python3 ./test.py -f stream/new.py
python3 ./test.py -f stream/stream1.py
python3 ./test.py -f stream/stream2.py
#python3 ./test.py -f stream/parser.py
python3 ./test.py -f stream/history.py
#alter table
python3 ./test.py -f alter/alter_table_crash.py
# client
python3 ./test.py -f client/client.py
python3 ./test.py -f client/version.py
python3 ./test.py -f client/alterDatabase.py
# Misc
python3 testCompress.py
python3 testNoCompress.py
python3 testMinTablesPerVnode.py
# functions
python3 ./test.py -f functions/function_avg.py -r 1
python3 ./test.py -f functions/function_bottom.py -r 1
python3 ./test.py -f functions/function_count.py -r 1
python3 ./test.py -f functions/function_diff.py -r 1
python3 ./test.py -f functions/function_first.py -r 1
python3 ./test.py -f functions/function_last.py -r 1
python3 ./test.py -f functions/function_last_row.py -r 1
python3 ./test.py -f functions/function_leastsquares.py -r 1
python3 ./test.py -f functions/function_max.py -r 1
python3 ./test.py -f functions/function_min.py -r 1
python3 ./test.py -f functions/function_operations.py -r 1
python3 ./test.py -f functions/function_percentile.py -r 1
python3 ./test.py -f functions/function_spread.py -r 1
python3 ./test.py -f functions/function_stddev.py -r 1
python3 ./test.py -f functions/function_sum.py -r 1
python3 ./test.py -f functions/function_top.py -r 1
#python3 ./test.py -f functions/function_twa.py -r 1
python3 queryCount.py
python3 ./test.py -f query/queryGroupbyWithInterval.py
python3 client/twoClients.py
python3 test.py -f query/queryInterval.py
python3 test.py -f query/queryFillTest.py
# tools
python3 test.py -f tools/taosdemoTest.py
python3 test.py -f tools/taosdumpTest.py
python3 test.py -f tools/lowaTest.py
# subscribe
python3 test.py -f subscribe/singlemeter.py
#python3 test.py -f subscribe/stability.py
python3 test.py -f subscribe/supertable.py
# update
python3 ./test.py -f update/allow_update.py
python3 ./test.py -f update/allow_update-0.py
python3 ./test.py -f update/append_commit_data.py
python3 ./test.py -f update/append_commit_last-0.py
python3 ./test.py -f update/append_commit_last.py
python3 ./test.py -f update/merge_commit_data.py
python3 ./test.py -f update/merge_commit_data-0.py
python3 ./test.py -f update/merge_commit_data2.py
python3 ./test.py -f update/merge_commit_data2_update0.py
python3 ./test.py -f update/merge_commit_last-0.py
python3 ./test.py -f update/merge_commit_last.py
# wal
python3 ./test.py -f wal/addOldWalTest.py
\ No newline at end of file
......@@ -137,6 +137,12 @@ if [ "$2" != "sim" ]; then
elif [ "$1" == "pytest" ]; then
echo "### run Python full test ###"
runPyCaseOneByOne fulltest.sh
elif [ "$1" == "p1" ]; then
echo "### run Python_1 test ###"
runPyCaseOneByOne pytest_1.sh
elif [ "$1" == "p2" ]; then
echo "### run Python_2 test ###"
runPyCaseOneByOne pytest_2.sh
elif [ "$1" == "b2" ] || [ "$1" == "b3" ]; then
exit $(($totalFailed + $totalPyFailed))
elif [ "$1" == "smoke" ] || [ -z "$1" ]; then
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册