diff --git a/src/connector/go b/src/connector/go index 8d7bf743852897110cbdcc7c4322cd7a74d4167b..050667e5b4d0eafa5387e4283e713559b421203f 160000 --- a/src/connector/go +++ b/src/connector/go @@ -1 +1 @@ -Subproject commit 8d7bf743852897110cbdcc7c4322cd7a74d4167b +Subproject commit 050667e5b4d0eafa5387e4283e713559b421203f diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index e0345eb1f6f72c28a99b99546f2d0c2a96b8ed37..9d1d16e51e60ced5a07f3aa7d0a5a0f946771be2 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -212,7 +212,7 @@ static void *dnodeProcessVWriteQueue(void *param) { pWrite->code = vnodeProcessWrite(pVnode, pWrite->pHead, qtype, &pWrite->rspRet); if (pWrite->code <= 0) pWrite->processedCount = 1; - if (pWrite->pHead->msgType != TSDB_MSG_TYPE_SUBMIT) forceFsync = true; + if (pWrite->code == 0 && pWrite->pHead->msgType != TSDB_MSG_TYPE_SUBMIT) forceFsync = true; dTrace("msg:%p is processed in vwrite queue, result:%s", pWrite, tstrerror(pWrite->code)); } @@ -225,11 +225,13 @@ static void *dnodeProcessVWriteQueue(void *param) { taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite); if (qtype == TAOS_QTYPE_RPC) { dnodeSendRpcVWriteRsp(pVnode, pWrite, pWrite->code); - } else if (qtype == TAOS_QTYPE_FWD) { - vnodeConfirmForward(pVnode, pWrite->pHead->version, 0); - taosFreeQitem(pWrite); - vnodeRelease(pVnode); } else { + if (qtype == TAOS_QTYPE_FWD) { + vnodeConfirmForward(pVnode, pWrite->pHead->version, 0); + } + if (pWrite->rspRet.rsp) { + rpcFreeCont(pWrite->rspRet.rsp); + } taosFreeQitem(pWrite); vnodeRelease(pVnode); } diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 1d5119dd05a0c6d0b5c31f9ef6733753da3c3f55..20c7af6a21d9a385d00fb3e52fe2a6ffd3d0729e 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -332,7 +332,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf #define TSDB_MIN_DAYS_PER_FILE 1 #define TSDB_MAX_DAYS_PER_FILE 3650 -#define TSDB_DEFAULT_DAYS_PER_FILE 2 +#define TSDB_DEFAULT_DAYS_PER_FILE 10 #define TSDB_MIN_KEEP 1 // data in db to be reserved. #define TSDB_MAX_KEEP 365000 // data in db to be reserved. diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 0cdb24a4da2bcb63b94f1d668eb79e8494704f03..a913deea3772437b76391f95d51509bf61a31754 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -126,7 +126,7 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_ // the TSDB repository info typedef struct STsdbRepoInfo { STsdbCfg tsdbCfg; - int64_t version; // version of the repository + uint64_t version; // version of the repository int64_t tsdbTotalDataSize; // the original inserted data size int64_t tsdbTotalDiskSize; // the total disk size taken by this TSDB repository // TODO: Other informations to add @@ -136,7 +136,7 @@ STsdbRepoInfo *tsdbGetStatus(TSDB_REPO_T *pRepo); // the meter information report structure typedef struct { STableCfg tableCfg; - int64_t version; + uint64_t version; int64_t tableTotalDataSize; // In bytes int64_t tableTotalDiskSize; // In bytes } STableInfo; diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 12d977f32b224f9ce85db93fb4afc3d6f0e550f8..003ecd0d24498d51d5e82d041740aa6f0a180280 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -71,7 +71,7 @@ typedef struct _SSdbTable { typedef struct { ESyncRole role; ESdbStatus status; - int64_t version; + uint64_t version; int64_t sync; void * wal; SSyncCfg cfg; diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index f1ce1d43aa01f5e903f220b472be0090ceaecae5..93739ca3d14a2895aedb3bbe37085c95ac722c96 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -352,7 +352,7 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) { int32_t retLen = write(pPeer->peerFd, msg, msgLen); if (retLen == msgLen) { - sDebug("%s, forward-rsp is sent, ver:%" PRIu64, pPeer->id, version); + sDebug("%s, forward-rsp is sent, code:%x hver:%" PRIu64, pPeer->id, code, version); } else { sDebug("%s, failed to send forward ack, restart", pPeer->id); syncRestartConnection(pPeer); @@ -498,7 +498,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn); if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) { int32_t checkMs = 100 + (pNode->vgId * 10) % 100; - if (pNode->vgId > 1) checkMs = tsStatusInterval * 2000 + checkMs; + if (pNode->vgId > 1) checkMs = tsStatusInterval * 1000 + checkMs; sDebug("%s, start to check peer connection after %d ms", pPeer->id, checkMs); taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, tsSyncTmrCtrl, &pPeer->timer); } @@ -575,6 +575,17 @@ static void syncChooseMaster(SSyncNode *pNode) { if (index == pNode->selfIndex) { sInfo("vgId:%d, start to work as master", pNode->vgId); nodeRole = TAOS_SYNC_ROLE_MASTER; + +#if 0 + for (int32_t i = 0; i < pNode->replica; ++i) { + pPeer = pNode->peerInfo[i]; + if (pPeer->version == nodeVersion) { + pPeer->role = TAOS_SYNC_ROLE_SLAVE; + pPeer->sstatus = TAOS_SYNC_STATUS_CACHE; + sInfo("%s, it shall work as slave", pPeer->id); + } + } +#endif syncResetFlowCtrl(pNode); (*pNode->notifyRole)(pNode->ahandle, nodeRole); } else { @@ -831,7 +842,7 @@ static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) { SSyncFwds *pSyncFwds = pNode->pSyncFwds; SFwdInfo * pFwdInfo; - sDebug("%s, forward-rsp is received, ver:%" PRIu64, pPeer->id, pFwdRsp->version); + sDebug("%s, forward-rsp is received, code:%x ver:%" PRIu64, pPeer->id, pFwdRsp->code, pFwdRsp->version); SFwdInfo *pFirst = pSyncFwds->fwdInfo + pSyncFwds->first; if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) { @@ -1097,7 +1108,7 @@ static void syncProcessBrokenLink(void *param) { SSyncPeer *pPeer = param; SSyncNode *pNode = pPeer->pSyncNode; - if (taosAcquireRef(tsSyncRefId, pNode->rid) < 0) return; + if (taosAcquireRef(tsSyncRefId, pNode->rid) == NULL) return; pthread_mutex_lock(&(pNode->mutex)); sDebug("%s, TCP link is broken(%s)", pPeer->id, strerror(errno)); @@ -1125,10 +1136,9 @@ static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) { } SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->last; + memset(pFwdInfo, 0, sizeof(SFwdInfo)); pFwdInfo->version = version; pFwdInfo->mhandle = mhandle; - pFwdInfo->acks = 0; - pFwdInfo->confirmed = 0; pFwdInfo->time = time; pSyncFwds->fwds++; @@ -1210,13 +1220,17 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle int32_t fwdLen; int32_t code = 0; - if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pWalHead->version != nodeVersion + 1) { - sError("vgId:%d, received ver:%" PRIu64 ", inconsistent with last ver:%" PRIu64 ", restart connection", pNode->vgId, - pWalHead->version, nodeVersion); - for (int32_t i = 0; i < pNode->replica; ++i) { - pPeer = pNode->peerInfo[i]; - syncRestartConnection(pPeer); + + if (pWalHead->version > nodeVersion + 1) { + sError("vgId:%d, hver:%" PRIu64 ", inconsistent with ver:%" PRIu64, pNode->vgId, pWalHead->version, nodeVersion); + if (nodeRole == TAOS_SYNC_ROLE_SLAVE) { + sInfo("vgId:%d, restart connection", pNode->vgId); + for (int32_t i = 0; i < pNode->replica; ++i) { + pPeer = pNode->peerInfo[i]; + syncRestartConnection(pPeer); + } } + return TSDB_CODE_SYN_INVALID_VERSION; } diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 2ded3b668b47ac3dcf35fef287bae8f056977cb2..a18fa6d0efdaadc3429e7469065cd8a5ba417737 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -294,6 +294,7 @@ STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo) { int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg) { // TODO: think about multithread cases STsdbRepo *pRepo = (STsdbRepo *)repo; + STsdbCfg config = pRepo->config; STsdbCfg * pRCfg = &pRepo->config; if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) return -1; @@ -308,22 +309,25 @@ int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg) { bool configChanged = false; if (pRCfg->compression != pCfg->compression) { tsdbAlterCompression(pRepo, pCfg->compression); + config.compression = pCfg->compression; configChanged = true; } if (pRCfg->keep != pCfg->keep) { if (tsdbAlterKeep(pRepo, pCfg->keep) < 0) { tsdbError("vgId:%d failed to configure repo when alter keep since %s", REPO_ID(pRepo), tstrerror(terrno)); + config.keep = pCfg->keep; return -1; } configChanged = true; } if (pRCfg->totalBlocks != pCfg->totalBlocks) { tsdbAlterCacheTotalBlocks(pRepo, pCfg->totalBlocks); + config.totalBlocks = pCfg->totalBlocks; configChanged = true; } if (configChanged) { - if (tsdbSaveConfig(pRepo->rootDir, &pRepo->config) < 0) { + if (tsdbSaveConfig(pRepo->rootDir, &config) < 0) { tsdbError("vgId:%d failed to configure repository while save config since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; } diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 702f482471f8b35674bb9e04c8a44d54dcdb5f16..36b57a7dcd094f4aac3ce5d0bfe5dcbfc6dfad40 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -462,8 +462,8 @@ _exit: tdFreeDataCols(pDataCols); tsdbDestroyCommitIters(iters, pMem->maxTables); tsdbDestroyHelper(&whelper); - tsdbEndCommit(pRepo); tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId); + tsdbEndCommit(pRepo); return NULL; } diff --git a/src/util/tests/trefTest.c b/src/util/tests/trefTest.c index 6887b24abdae9f5d9949fee0073a35eb717eb4f7..454860410b4dd0c3fdf65c4b7fd7950a1a7d4446 100644 --- a/src/util/tests/trefTest.c +++ b/src/util/tests/trefTest.c @@ -77,8 +77,8 @@ void *acquireRelease(void *param) { printf("a"); id = random() % pSpace->refNum; - code = taosAcquireRef(pSpace->rsetId, pSpace->p[id]); - if (code >= 0) { + void *p = taosAcquireRef(pSpace->rsetId, pSpace->p[id]); + if (p) { usleep(id % 5 + 1); taosReleaseRef(pSpace->rsetId, pSpace->p[id]); } diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 7813c5217b90c9de99fe14c8aef012c084d6af3c..021c392c5211f58d2c46ffb64807b6d434528969 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -637,9 +637,8 @@ static int vnodeResetTsdb(SVnodeObj *pVnode) { char rootDir[128] = "\0"; sprintf(rootDir, "%s/tsdb", pVnode->rootDir); - if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_RESET) != - TAOS_VN_STATUS_READY) { - return -1; + if (pVnode->status != TAOS_VN_STATUS_CLOSING && pVnode->status != TAOS_VN_STATUS_INIT) { + pVnode->status = TAOS_VN_STATUS_RESET; } void *tsdb = pVnode->tsdb; diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 9fa3a11c9c659b44a4edc12110969feb6779a8b7..4cc8819bcba07847cb3fa1b09f0663b6368b148b 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -56,19 +56,19 @@ int32_t vnodeProcessRead(void *param, SVReadMsg *pRead) { static int32_t vnodeCheckRead(void *param) { SVnodeObj *pVnode = param; if (pVnode->status != TAOS_VN_STATUS_READY) { - vDebug("vgId:%d, vnode status is %s, recCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status], + vDebug("vgId:%d, vnode status is %s, refCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status], pVnode->refCount, pVnode); return TSDB_CODE_APP_NOT_READY; } // tsdb may be in reset state if (pVnode->tsdb == NULL) { - vDebug("vgId:%d, tsdb is null, recCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); + vDebug("vgId:%d, tsdb is null, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); return TSDB_CODE_APP_NOT_READY; } if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) { - vDebug("vgId:%d, replica:%d role:%s, recCount:%d pVnode:%p", pVnode->vgId, pVnode->syncCfg.replica, + vDebug("vgId:%d, replica:%d role:%s, refCount:%d pVnode:%p", pVnode->vgId, pVnode->syncCfg.replica, syncRole[pVnode->role], pVnode->refCount, pVnode); return TSDB_CODE_APP_NOT_READY; } diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index e67c544fb23456d8775077e8a86f1745508bd0fc..80da91602b13c4677020196532524188f34e0617 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -103,18 +103,18 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara static int32_t vnodeCheckWrite(void *param) { SVnodeObj *pVnode = param; if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) { - vDebug("vgId:%d, no write auth, recCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); + vDebug("vgId:%d, no write auth, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); return TSDB_CODE_VND_NO_WRITE_AUTH; } // tsdb may be in reset state if (pVnode->tsdb == NULL) { - vDebug("vgId:%d, tsdb is null, recCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); + vDebug("vgId:%d, tsdb is null, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); return TSDB_CODE_APP_NOT_READY; } if (pVnode->status == TAOS_VN_STATUS_CLOSING) { - vDebug("vgId:%d, vnode status is %s, recCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status], + vDebug("vgId:%d, vnode status is %s, refCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status], pVnode->refCount, pVnode); return TSDB_CODE_APP_NOT_READY; } diff --git a/tests/examples/JDBC/connectionPools/pom.xml b/tests/examples/JDBC/connectionPools/pom.xml index d117c59637b00ebc1894e533fc8fd6a4f99d203f..2793f0a83ddc88711796c133802c82979ae14be5 100644 --- a/tests/examples/JDBC/connectionPools/pom.xml +++ b/tests/examples/JDBC/connectionPools/pom.xml @@ -42,7 +42,7 @@ com.mchange c3p0 - 0.9.5.2 + 0.9.5.4