提交 922711df 编写于 作者: H Haojun Liao

Merge branch 'develop' into feature/query

Subproject commit 8d7bf743852897110cbdcc7c4322cd7a74d4167b Subproject commit 050667e5b4d0eafa5387e4283e713559b421203f
...@@ -212,7 +212,7 @@ static void *dnodeProcessVWriteQueue(void *param) { ...@@ -212,7 +212,7 @@ static void *dnodeProcessVWriteQueue(void *param) {
pWrite->code = vnodeProcessWrite(pVnode, pWrite->pHead, qtype, &pWrite->rspRet); pWrite->code = vnodeProcessWrite(pVnode, pWrite->pHead, qtype, &pWrite->rspRet);
if (pWrite->code <= 0) pWrite->processedCount = 1; if (pWrite->code <= 0) pWrite->processedCount = 1;
if (pWrite->pHead->msgType != TSDB_MSG_TYPE_SUBMIT) forceFsync = true; if (pWrite->code == 0 && pWrite->pHead->msgType != TSDB_MSG_TYPE_SUBMIT) forceFsync = true;
dTrace("msg:%p is processed in vwrite queue, result:%s", pWrite, tstrerror(pWrite->code)); dTrace("msg:%p is processed in vwrite queue, result:%s", pWrite, tstrerror(pWrite->code));
} }
...@@ -225,11 +225,13 @@ static void *dnodeProcessVWriteQueue(void *param) { ...@@ -225,11 +225,13 @@ static void *dnodeProcessVWriteQueue(void *param) {
taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite); taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite);
if (qtype == TAOS_QTYPE_RPC) { if (qtype == TAOS_QTYPE_RPC) {
dnodeSendRpcVWriteRsp(pVnode, pWrite, pWrite->code); dnodeSendRpcVWriteRsp(pVnode, pWrite, pWrite->code);
} else if (qtype == TAOS_QTYPE_FWD) {
vnodeConfirmForward(pVnode, pWrite->pHead->version, 0);
taosFreeQitem(pWrite);
vnodeRelease(pVnode);
} else { } else {
if (qtype == TAOS_QTYPE_FWD) {
vnodeConfirmForward(pVnode, pWrite->pHead->version, 0);
}
if (pWrite->rspRet.rsp) {
rpcFreeCont(pWrite->rspRet.rsp);
}
taosFreeQitem(pWrite); taosFreeQitem(pWrite);
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
......
...@@ -332,7 +332,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf ...@@ -332,7 +332,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf
#define TSDB_MIN_DAYS_PER_FILE 1 #define TSDB_MIN_DAYS_PER_FILE 1
#define TSDB_MAX_DAYS_PER_FILE 3650 #define TSDB_MAX_DAYS_PER_FILE 3650
#define TSDB_DEFAULT_DAYS_PER_FILE 2 #define TSDB_DEFAULT_DAYS_PER_FILE 10
#define TSDB_MIN_KEEP 1 // data in db to be reserved. #define TSDB_MIN_KEEP 1 // data in db to be reserved.
#define TSDB_MAX_KEEP 365000 // data in db to be reserved. #define TSDB_MAX_KEEP 365000 // data in db to be reserved.
......
...@@ -126,7 +126,7 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_ ...@@ -126,7 +126,7 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
// the TSDB repository info // the TSDB repository info
typedef struct STsdbRepoInfo { typedef struct STsdbRepoInfo {
STsdbCfg tsdbCfg; STsdbCfg tsdbCfg;
int64_t version; // version of the repository uint64_t version; // version of the repository
int64_t tsdbTotalDataSize; // the original inserted data size int64_t tsdbTotalDataSize; // the original inserted data size
int64_t tsdbTotalDiskSize; // the total disk size taken by this TSDB repository int64_t tsdbTotalDiskSize; // the total disk size taken by this TSDB repository
// TODO: Other informations to add // TODO: Other informations to add
...@@ -136,7 +136,7 @@ STsdbRepoInfo *tsdbGetStatus(TSDB_REPO_T *pRepo); ...@@ -136,7 +136,7 @@ STsdbRepoInfo *tsdbGetStatus(TSDB_REPO_T *pRepo);
// the meter information report structure // the meter information report structure
typedef struct { typedef struct {
STableCfg tableCfg; STableCfg tableCfg;
int64_t version; uint64_t version;
int64_t tableTotalDataSize; // In bytes int64_t tableTotalDataSize; // In bytes
int64_t tableTotalDiskSize; // In bytes int64_t tableTotalDiskSize; // In bytes
} STableInfo; } STableInfo;
......
...@@ -71,7 +71,7 @@ typedef struct _SSdbTable { ...@@ -71,7 +71,7 @@ typedef struct _SSdbTable {
typedef struct { typedef struct {
ESyncRole role; ESyncRole role;
ESdbStatus status; ESdbStatus status;
int64_t version; uint64_t version;
int64_t sync; int64_t sync;
void * wal; void * wal;
SSyncCfg cfg; SSyncCfg cfg;
......
...@@ -352,7 +352,7 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) { ...@@ -352,7 +352,7 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {
int32_t retLen = write(pPeer->peerFd, msg, msgLen); int32_t retLen = write(pPeer->peerFd, msg, msgLen);
if (retLen == msgLen) { if (retLen == msgLen) {
sDebug("%s, forward-rsp is sent, ver:%" PRIu64, pPeer->id, version); sDebug("%s, forward-rsp is sent, code:%x hver:%" PRIu64, pPeer->id, code, version);
} else { } else {
sDebug("%s, failed to send forward ack, restart", pPeer->id); sDebug("%s, failed to send forward ack, restart", pPeer->id);
syncRestartConnection(pPeer); syncRestartConnection(pPeer);
...@@ -498,7 +498,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { ...@@ -498,7 +498,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn); int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn);
if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) { if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) {
int32_t checkMs = 100 + (pNode->vgId * 10) % 100; int32_t checkMs = 100 + (pNode->vgId * 10) % 100;
if (pNode->vgId > 1) checkMs = tsStatusInterval * 2000 + checkMs; if (pNode->vgId > 1) checkMs = tsStatusInterval * 1000 + checkMs;
sDebug("%s, start to check peer connection after %d ms", pPeer->id, checkMs); sDebug("%s, start to check peer connection after %d ms", pPeer->id, checkMs);
taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, tsSyncTmrCtrl, &pPeer->timer); taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, tsSyncTmrCtrl, &pPeer->timer);
} }
...@@ -575,6 +575,17 @@ static void syncChooseMaster(SSyncNode *pNode) { ...@@ -575,6 +575,17 @@ static void syncChooseMaster(SSyncNode *pNode) {
if (index == pNode->selfIndex) { if (index == pNode->selfIndex) {
sInfo("vgId:%d, start to work as master", pNode->vgId); sInfo("vgId:%d, start to work as master", pNode->vgId);
nodeRole = TAOS_SYNC_ROLE_MASTER; nodeRole = TAOS_SYNC_ROLE_MASTER;
#if 0
for (int32_t i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i];
if (pPeer->version == nodeVersion) {
pPeer->role = TAOS_SYNC_ROLE_SLAVE;
pPeer->sstatus = TAOS_SYNC_STATUS_CACHE;
sInfo("%s, it shall work as slave", pPeer->id);
}
}
#endif
syncResetFlowCtrl(pNode); syncResetFlowCtrl(pNode);
(*pNode->notifyRole)(pNode->ahandle, nodeRole); (*pNode->notifyRole)(pNode->ahandle, nodeRole);
} else { } else {
...@@ -831,7 +842,7 @@ static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) { ...@@ -831,7 +842,7 @@ static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) {
SSyncFwds *pSyncFwds = pNode->pSyncFwds; SSyncFwds *pSyncFwds = pNode->pSyncFwds;
SFwdInfo * pFwdInfo; SFwdInfo * pFwdInfo;
sDebug("%s, forward-rsp is received, ver:%" PRIu64, pPeer->id, pFwdRsp->version); sDebug("%s, forward-rsp is received, code:%x ver:%" PRIu64, pPeer->id, pFwdRsp->code, pFwdRsp->version);
SFwdInfo *pFirst = pSyncFwds->fwdInfo + pSyncFwds->first; SFwdInfo *pFirst = pSyncFwds->fwdInfo + pSyncFwds->first;
if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) { if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) {
...@@ -1097,7 +1108,7 @@ static void syncProcessBrokenLink(void *param) { ...@@ -1097,7 +1108,7 @@ static void syncProcessBrokenLink(void *param) {
SSyncPeer *pPeer = param; SSyncPeer *pPeer = param;
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
if (taosAcquireRef(tsSyncRefId, pNode->rid) < 0) return; if (taosAcquireRef(tsSyncRefId, pNode->rid) == NULL) return;
pthread_mutex_lock(&(pNode->mutex)); pthread_mutex_lock(&(pNode->mutex));
sDebug("%s, TCP link is broken(%s)", pPeer->id, strerror(errno)); sDebug("%s, TCP link is broken(%s)", pPeer->id, strerror(errno));
...@@ -1125,10 +1136,9 @@ static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) { ...@@ -1125,10 +1136,9 @@ static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) {
} }
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->last; SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->last;
memset(pFwdInfo, 0, sizeof(SFwdInfo));
pFwdInfo->version = version; pFwdInfo->version = version;
pFwdInfo->mhandle = mhandle; pFwdInfo->mhandle = mhandle;
pFwdInfo->acks = 0;
pFwdInfo->confirmed = 0;
pFwdInfo->time = time; pFwdInfo->time = time;
pSyncFwds->fwds++; pSyncFwds->fwds++;
...@@ -1210,13 +1220,17 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle ...@@ -1210,13 +1220,17 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
int32_t fwdLen; int32_t fwdLen;
int32_t code = 0; int32_t code = 0;
if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pWalHead->version != nodeVersion + 1) {
sError("vgId:%d, received ver:%" PRIu64 ", inconsistent with last ver:%" PRIu64 ", restart connection", pNode->vgId, if (pWalHead->version > nodeVersion + 1) {
pWalHead->version, nodeVersion); sError("vgId:%d, hver:%" PRIu64 ", inconsistent with ver:%" PRIu64, pNode->vgId, pWalHead->version, nodeVersion);
for (int32_t i = 0; i < pNode->replica; ++i) { if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
pPeer = pNode->peerInfo[i]; sInfo("vgId:%d, restart connection", pNode->vgId);
syncRestartConnection(pPeer); for (int32_t i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i];
syncRestartConnection(pPeer);
}
} }
return TSDB_CODE_SYN_INVALID_VERSION; return TSDB_CODE_SYN_INVALID_VERSION;
} }
......
...@@ -294,6 +294,7 @@ STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo) { ...@@ -294,6 +294,7 @@ STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo) {
int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg) { int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg) {
// TODO: think about multithread cases // TODO: think about multithread cases
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
STsdbCfg config = pRepo->config;
STsdbCfg * pRCfg = &pRepo->config; STsdbCfg * pRCfg = &pRepo->config;
if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) return -1; if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) return -1;
...@@ -308,22 +309,25 @@ int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg) { ...@@ -308,22 +309,25 @@ int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg) {
bool configChanged = false; bool configChanged = false;
if (pRCfg->compression != pCfg->compression) { if (pRCfg->compression != pCfg->compression) {
tsdbAlterCompression(pRepo, pCfg->compression); tsdbAlterCompression(pRepo, pCfg->compression);
config.compression = pCfg->compression;
configChanged = true; configChanged = true;
} }
if (pRCfg->keep != pCfg->keep) { if (pRCfg->keep != pCfg->keep) {
if (tsdbAlterKeep(pRepo, pCfg->keep) < 0) { if (tsdbAlterKeep(pRepo, pCfg->keep) < 0) {
tsdbError("vgId:%d failed to configure repo when alter keep since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d failed to configure repo when alter keep since %s", REPO_ID(pRepo), tstrerror(terrno));
config.keep = pCfg->keep;
return -1; return -1;
} }
configChanged = true; configChanged = true;
} }
if (pRCfg->totalBlocks != pCfg->totalBlocks) { if (pRCfg->totalBlocks != pCfg->totalBlocks) {
tsdbAlterCacheTotalBlocks(pRepo, pCfg->totalBlocks); tsdbAlterCacheTotalBlocks(pRepo, pCfg->totalBlocks);
config.totalBlocks = pCfg->totalBlocks;
configChanged = true; configChanged = true;
} }
if (configChanged) { if (configChanged) {
if (tsdbSaveConfig(pRepo->rootDir, &pRepo->config) < 0) { if (tsdbSaveConfig(pRepo->rootDir, &config) < 0) {
tsdbError("vgId:%d failed to configure repository while save config since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d failed to configure repository while save config since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1; return -1;
} }
......
...@@ -462,8 +462,8 @@ _exit: ...@@ -462,8 +462,8 @@ _exit:
tdFreeDataCols(pDataCols); tdFreeDataCols(pDataCols);
tsdbDestroyCommitIters(iters, pMem->maxTables); tsdbDestroyCommitIters(iters, pMem->maxTables);
tsdbDestroyHelper(&whelper); tsdbDestroyHelper(&whelper);
tsdbEndCommit(pRepo);
tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId); tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId);
tsdbEndCommit(pRepo);
return NULL; return NULL;
} }
......
...@@ -77,8 +77,8 @@ void *acquireRelease(void *param) { ...@@ -77,8 +77,8 @@ void *acquireRelease(void *param) {
printf("a"); printf("a");
id = random() % pSpace->refNum; id = random() % pSpace->refNum;
code = taosAcquireRef(pSpace->rsetId, pSpace->p[id]); void *p = taosAcquireRef(pSpace->rsetId, pSpace->p[id]);
if (code >= 0) { if (p) {
usleep(id % 5 + 1); usleep(id % 5 + 1);
taosReleaseRef(pSpace->rsetId, pSpace->p[id]); taosReleaseRef(pSpace->rsetId, pSpace->p[id]);
} }
......
...@@ -637,9 +637,8 @@ static int vnodeResetTsdb(SVnodeObj *pVnode) { ...@@ -637,9 +637,8 @@ static int vnodeResetTsdb(SVnodeObj *pVnode) {
char rootDir[128] = "\0"; char rootDir[128] = "\0";
sprintf(rootDir, "%s/tsdb", pVnode->rootDir); sprintf(rootDir, "%s/tsdb", pVnode->rootDir);
if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_RESET) != if (pVnode->status != TAOS_VN_STATUS_CLOSING && pVnode->status != TAOS_VN_STATUS_INIT) {
TAOS_VN_STATUS_READY) { pVnode->status = TAOS_VN_STATUS_RESET;
return -1;
} }
void *tsdb = pVnode->tsdb; void *tsdb = pVnode->tsdb;
......
...@@ -56,19 +56,19 @@ int32_t vnodeProcessRead(void *param, SVReadMsg *pRead) { ...@@ -56,19 +56,19 @@ int32_t vnodeProcessRead(void *param, SVReadMsg *pRead) {
static int32_t vnodeCheckRead(void *param) { static int32_t vnodeCheckRead(void *param) {
SVnodeObj *pVnode = param; SVnodeObj *pVnode = param;
if (pVnode->status != TAOS_VN_STATUS_READY) { if (pVnode->status != TAOS_VN_STATUS_READY) {
vDebug("vgId:%d, vnode status is %s, recCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status], vDebug("vgId:%d, vnode status is %s, refCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status],
pVnode->refCount, pVnode); pVnode->refCount, pVnode);
return TSDB_CODE_APP_NOT_READY; return TSDB_CODE_APP_NOT_READY;
} }
// tsdb may be in reset state // tsdb may be in reset state
if (pVnode->tsdb == NULL) { if (pVnode->tsdb == NULL) {
vDebug("vgId:%d, tsdb is null, recCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); vDebug("vgId:%d, tsdb is null, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
return TSDB_CODE_APP_NOT_READY; return TSDB_CODE_APP_NOT_READY;
} }
if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) { if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
vDebug("vgId:%d, replica:%d role:%s, recCount:%d pVnode:%p", pVnode->vgId, pVnode->syncCfg.replica, vDebug("vgId:%d, replica:%d role:%s, refCount:%d pVnode:%p", pVnode->vgId, pVnode->syncCfg.replica,
syncRole[pVnode->role], pVnode->refCount, pVnode); syncRole[pVnode->role], pVnode->refCount, pVnode);
return TSDB_CODE_APP_NOT_READY; return TSDB_CODE_APP_NOT_READY;
} }
......
...@@ -103,18 +103,18 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara ...@@ -103,18 +103,18 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
static int32_t vnodeCheckWrite(void *param) { static int32_t vnodeCheckWrite(void *param) {
SVnodeObj *pVnode = param; SVnodeObj *pVnode = param;
if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) { if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) {
vDebug("vgId:%d, no write auth, recCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); vDebug("vgId:%d, no write auth, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
return TSDB_CODE_VND_NO_WRITE_AUTH; return TSDB_CODE_VND_NO_WRITE_AUTH;
} }
// tsdb may be in reset state // tsdb may be in reset state
if (pVnode->tsdb == NULL) { if (pVnode->tsdb == NULL) {
vDebug("vgId:%d, tsdb is null, recCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); vDebug("vgId:%d, tsdb is null, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
return TSDB_CODE_APP_NOT_READY; return TSDB_CODE_APP_NOT_READY;
} }
if (pVnode->status == TAOS_VN_STATUS_CLOSING) { if (pVnode->status == TAOS_VN_STATUS_CLOSING) {
vDebug("vgId:%d, vnode status is %s, recCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status], vDebug("vgId:%d, vnode status is %s, refCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status],
pVnode->refCount, pVnode); pVnode->refCount, pVnode);
return TSDB_CODE_APP_NOT_READY; return TSDB_CODE_APP_NOT_READY;
} }
......
...@@ -42,7 +42,7 @@ ...@@ -42,7 +42,7 @@
<dependency> <dependency>
<groupId>com.mchange</groupId> <groupId>com.mchange</groupId>
<artifactId>c3p0</artifactId> <artifactId>c3p0</artifactId>
<version>0.9.5.2</version> <version>0.9.5.4</version>
</dependency> </dependency>
<!-- log4j --> <!-- log4j -->
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册