diff --git a/Jenkinsfile b/Jenkinsfile index 6d563d132b30e8c27f8c57f6f03f5d733703d3f3..08b6121f12916a8be712dbc4585f167d4dc679f0 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -118,6 +118,22 @@ pipeline { date''' } } + stage('connector'){ + agent{label "release"} + steps{ + sh''' + cd ${WORKSPACE} + git checkout develop + cd tests/gotest + bash batchtest.sh + cd ${WORKSPACE}/tests/examples/JDBC/JDBCDemo/ + mvn clean package assembly:single >/dev/null + java -jar target/jdbcChecker-SNAPSHOT-jar-with-dependencies.jar -host 127.0.0.1 + cd ${WORKSPACE}/tests/examples/python/PYTHONConnectorChecker + python3 PythonChecker.py + ''' + } + } } } diff --git a/documentation20/webdocs/markdowndocs/cluster-ch.md b/documentation20/webdocs/markdowndocs/cluster-ch.md index 10579df79a2573fef8fdcb16a239580c96ce711a..10c28c284c870d1a522a178beb5303901f079a5a 100644 --- a/documentation20/webdocs/markdowndocs/cluster-ch.md +++ b/documentation20/webdocs/markdowndocs/cluster-ch.md @@ -213,8 +213,6 @@ SHOW MNODES; ## Arbitrator的使用 -如果副本数为偶数,当一个vnode group里一半或超过一半的vnode不工作时,是无法从中选出master的。同理,一半或超过一半的mnode不工作时,是无法选出mnode的master的,因为存在“split brain”问题。为解决这个问题,TDengine引入了arbitrator的概念。Arbitrator模拟一个vnode或mnode在工作,但只简单的负责网络连接,不处理任何数据插入或访问。只要包含arbitrator在内,超过半数的vnode或mnode工作,那么该vnode group或mnode组就可以正常的提供数据插入或查询服务。比如对于副本数为2的情形,如果一个节点A离线,但另外一个节点B正常,而且能连接到arbitrator, 那么节点B就能正常工作。 +如果副本数为偶数,当一个vnode group里一半vnode不工作时,是无法从中选出master的。同理,一半mnode不工作时,是无法选出mnode的master的,因为存在“split brain”问题。为解决这个问题,TDengine引入了arbitrator的概念。Arbitrator模拟一个vnode或mnode在工作,但只简单的负责网络连接,不处理任何数据插入或访问。只要包含arbitrator在内,超过半数的vnode或mnode工作,那么该vnode group或mnode组就可以正常的提供数据插入或查询服务。比如对于副本数为2的情形,如果一个节点A离线,但另外一个节点B正常,而且能连接到arbitrator, 那么节点B就能正常工作。 -下载最新arbitrator及之前版本的安装包,请点击[安装包下载](https://www.taosdata.com/cn/all-downloads/),在TDengine Arbitrator Linux一节中,选择适合的版本下载并安装。 - -TDengine Arbitrator安装包里带有一个执行程序tarbitrator, 找任何一台Linux服务器运行它即可。该程序对系统资源几乎没有要求,只需要保证有网络连接即可。该应用的命令行参数`-p`可以指定其对外服务的端口号,缺省是6042。配置每个taosd实例时,可以在配置文件taos.cfg里将参数arbitrator设置为arbitrator的End Point。如果该参数配置了,当副本数为偶数数,系统将自动连接配置的arbitrator。 +TDengine提供一个执行程序tarbitrator, 找任何一台Linux服务器运行它即可。请点击[安装包下载](https://www.taosdata.com/cn/all-downloads/),在TDengine Arbitrator Linux一节中,选择适合的版本下载并安装。该程序对系统资源几乎没有要求,只需要保证有网络连接即可。该应用的命令行参数`-p`可以指定其对外服务的端口号,缺省是6042。配置每个taosd实例时,可以在配置文件taos.cfg里将参数arbitrator设置为arbitrator的End Point。如果该参数配置了,当副本数为偶数数,系统将自动连接配置的arbitrator。如果副本数为奇数,即使配置了arbitrator, 系统也不会去建立连接。 diff --git a/src/balance/src/balance.c b/src/balance/src/balance.c index 0fa4b3f34615aad3fb749d44cb27b3feab10f49c..0e9bb85b25defd169fea8711d3e0b40304500de4 100644 --- a/src/balance/src/balance.c +++ b/src/balance/src/balance.c @@ -216,8 +216,8 @@ static bool balanceCheckVgroupReady(SVgObj *pVgroup, SVnodeGid *pRmVnode) { SVnodeGid *pVnode = pVgroup->vnodeGid + i; if (pVnode == pRmVnode) continue; - mTrace("vgId:%d, change vgroup status, dnode:%d status:%d", pVgroup->vgId, pVnode->pDnode->dnodeId, - pVnode->pDnode->status); + mTrace("vgId:%d, check vgroup status, dnode:%d status:%d, vnode role:%s", pVgroup->vgId, pVnode->pDnode->dnodeId, + pVnode->pDnode->status, syncRole[pVnode->role]); if (pVnode->pDnode->status == TAOS_DN_STATUS_DROPPING) continue; if (pVnode->pDnode->status == TAOS_DN_STATUS_OFFLINE) continue; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index e99297ef16907facd8b6c89885140c442de2890d..9730a27ab3a9b192856ca4e7d9237eb460b2d8d8 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -604,7 +604,8 @@ static UNUSED_FUNC bool tscKillQueryInDnode(SSqlObj* pSql) { return true; } - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + if ((pQueryInfo == NULL) || tscIsTwoStageSTableQuery(pQueryInfo, 0)) { return true; } @@ -718,6 +719,7 @@ static void tscKillSTableQuery(SSqlObj *pSql) { SSqlCmd* pCmd = &pSql->cmd; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) { return; } @@ -766,6 +768,7 @@ void taos_stop_query(TAOS_RES *res) { pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { assert(pSql->pRpcCtx == NULL); tscKillSTableQuery(pSql); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 1a8ed2e72520625d2b7fed35750994232d761224..e53b58daa27b23f32c4487b38c8de8b3afd7cb7f 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -114,9 +114,9 @@ bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex) { } // for select query super table, the super table vgroup list can not be null in any cases. - if (pQueryInfo->command == TSDB_SQL_SELECT && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { -// assert(pTableMetaInfo->vgroupList != NULL); // if retrieve vgroupInfo failed, the value may be null - } + // if (pQueryInfo->command == TSDB_SQL_SELECT && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { + // assert(pTableMetaInfo->vgroupList != NULL); + // } if ((pQueryInfo->type & TSDB_QUERY_TYPE_FREE_RESOURCE) == TSDB_QUERY_TYPE_FREE_RESOURCE) { return false; diff --git a/src/connector/python/linux/python2/taos/cursor.py b/src/connector/python/linux/python2/taos/cursor.py index 8c268d8afba2b971709fba6f157abfebdc8dfd1a..37c02d330e856717b5ed0bdac76723cf64d3860b 100644 --- a/src/connector/python/linux/python2/taos/cursor.py +++ b/src/connector/python/linux/python2/taos/cursor.py @@ -192,8 +192,10 @@ class TDengineCursor(object): buffer = [[] for i in range(len(self._fields))] self._rowcount = 0 while True: - block, num_of_fields = CTaosInterface.fetchBlock( - self._result, self._fields) + block, num_of_fields = CTaosInterface.fetchBlock(self._result, self._fields) + errno = CTaosInterface.libtaos.taos_errno(self._result) + if errno != 0: + raise ProgrammingError(CTaosInterface.errStr(self._result), errno) if num_of_fields == 0: break self._rowcount += num_of_fields diff --git a/src/connector/python/linux/python3/taos/cursor.py b/src/connector/python/linux/python3/taos/cursor.py index 3f0f315d3388fcac1c752c847b7fa1c412b06749..ec7a85ee1a3f8cb0cd49aca8c2a4242dca89021e 100644 --- a/src/connector/python/linux/python3/taos/cursor.py +++ b/src/connector/python/linux/python3/taos/cursor.py @@ -207,8 +207,10 @@ class TDengineCursor(object): buffer = [[] for i in range(len(self._fields))] self._rowcount = 0 while True: - block, num_of_fields = CTaosInterface.fetchBlock( - self._result, self._fields) + block, num_of_fields = CTaosInterface.fetchBlock(self._result, self._fields) + errno = CTaosInterface.libtaos.taos_errno(self._result) + if errno != 0: + raise ProgrammingError(CTaosInterface.errStr(self._result), errno) if num_of_fields == 0: break self._rowcount += num_of_fields diff --git a/src/connector/python/windows/python2/taos/cursor.py b/src/connector/python/windows/python2/taos/cursor.py index 7eee3bfc8f8559454f53aa967f5ad0294a1cb2bf..8714fe77cb739f23f79247a41d72aa127b6d6d25 100644 --- a/src/connector/python/windows/python2/taos/cursor.py +++ b/src/connector/python/windows/python2/taos/cursor.py @@ -142,6 +142,9 @@ class TDengineCursor(object): self._rowcount = 0 while True: block, num_of_fields = CTaosInterface.fetchBlock(self._result, self._fields) + errno = CTaosInterface.libtaos.taos_errno(self._result) + if errno != 0: + raise ProgrammingError(CTaosInterface.errStr(self._result), errno) if num_of_fields == 0: break self._rowcount += num_of_fields for i in range(len(self._fields)): diff --git a/src/connector/python/windows/python3/taos/cursor.py b/src/connector/python/windows/python3/taos/cursor.py index 5f5aa4e1d7d9b454132f533a9e84cca2859db735..c2c442b06ee71ae90ec63662886c709e38d4d2ad 100644 --- a/src/connector/python/windows/python3/taos/cursor.py +++ b/src/connector/python/windows/python3/taos/cursor.py @@ -142,6 +142,9 @@ class TDengineCursor(object): self._rowcount = 0 while True: block, num_of_fields = CTaosInterface.fetchBlock(self._result, self._fields) + errno = CTaosInterface.libtaos.taos_errno(self._result) + if errno != 0: + raise ProgrammingError(CTaosInterface.errStr(self._result), errno) if num_of_fields == 0: break self._rowcount += num_of_fields for i in range(len(self._fields)): diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index f6909e54d41a4ce5936ba4b1b3ff1c79e8766564..e61158ef30dddd6037af36c61be1fd522f8af10b 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -189,7 +189,6 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) { void dnodeDispatchNonRspMsg(void *pVnode, SReadMsg *pRead, int32_t code) { rpcFreeCont(pRead->rpcMsg.pCont); vnodeRelease(pVnode); - return; } static void *dnodeProcessReadQueue(void *param) { diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 15ddb6afee7c3ac2914df8133df24f6ef80a0a8a..ed6c232fdb35efbe7733f88a66f847fd221edf91 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -41,6 +41,8 @@ typedef struct { SRpcMsg rpcMsg; } SReadMsg; +extern char *vnodeStatus[]; + int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg); int32_t vnodeDrop(int32_t vgId); int32_t vnodeOpen(int32_t vgId, char *rootDir); diff --git a/src/mnode/src/mnodeProfile.c b/src/mnode/src/mnodeProfile.c index fc76e3dce39dc2f6eeee3aac19d13f2dc605c1a1..f8f99e22c6f437b1eece0d704c8c4551bc434110 100644 --- a/src/mnode/src/mnodeProfile.c +++ b/src/mnode/src/mnodeProfile.c @@ -111,7 +111,6 @@ void mnodeReleaseConn(SConnObj *pConn) { } SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t port) { - uint64_t expireTime = CONN_KEEP_TIME * 1000 + (uint64_t)taosGetTimestampMs(); SConnObj *pConn = taosCacheAcquireByKey(tsMnodeConnCache, &connId, sizeof(int32_t)); if (pConn == NULL) { mDebug("connId:%d, is already destroyed, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port); @@ -126,7 +125,7 @@ SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t po } // mDebug("connId:%d, is incoming, user:%s ip:%s:%u", connId, pConn->user, taosIpStr(pConn->ip), pConn->port); - pConn->lastAccess = expireTime; + pConn->lastAccess = CONN_KEEP_TIME * 1000 + (uint64_t)taosGetTimestampMs(); return pConn; } @@ -626,7 +625,7 @@ static int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg) { SCMKillConnMsg *pKill = pMsg->rpcMsg.pCont; int32_t connId = atoi(pKill->queryId); - SConnObj * pConn = taosCacheAcquireByKey(tsMnodeConnCache, &connId, sizeof(int32_t)); + SConnObj *pConn = taosCacheAcquireByKey(tsMnodeConnCache, &connId, sizeof(int32_t)); if (pConn == NULL) { mError("connId:%s, failed to kill, conn not exist", pKill->queryId); return TSDB_CODE_MND_INVALID_CONN_ID; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index f6c2ce29d63212400eef8b1f6d535237af37b57a..3cb8db9fafe4ea8fa985377424152c7d0daecf12 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -7118,6 +7118,7 @@ void qCleanupQueryMgmt(void* pQMgmt) { void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) { if (pMgmt == NULL) { + terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; return NULL; } @@ -7126,6 +7127,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) { SQueryMgmt *pQueryMgmt = pMgmt; if (pQueryMgmt->qinfoPool == NULL) { qError("QInfo:%p failed to add qhandle into qMgmt, since qMgmt is closed", (void *)qInfo); + terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; return NULL; } @@ -7133,6 +7135,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) { if (pQueryMgmt->closed) { // pthread_mutex_unlock(&pQueryMgmt->lock); qError("QInfo:%p failed to add qhandle into cache, since qMgmt is colsing", (void *)qInfo); + terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; return NULL; } else { TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE) qInfo; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index a9bcf948b6cb00c25a0b283a0cb63126925abd8c..bc728e86d917c649f1c6eb04e870c0261814a904 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -56,6 +56,14 @@ int syncGetNodesRole(tsync_h shandle, SNodesRole * cfg) { return 0; } void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code) {} #endif +char* vnodeStatus[] = { + "init", + "ready", + "closing", + "updating", + "reset" +}; + int32_t vnodeInitResources() { int code = syncInit(); if (code != 0) return code; @@ -74,6 +82,7 @@ int32_t vnodeInitResources() { void vnodeCleanupResources() { if (tsDnodeVnodesHash != NULL) { + vDebug("vnode list is cleanup"); taosHashCleanup(tsDnodeVnodesHash); tsDnodeVnodesHash = NULL; } @@ -84,9 +93,10 @@ void vnodeCleanupResources() { int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { int32_t code; - SVnodeObj *pTemp = (SVnodeObj *)taosHashGet(tsDnodeVnodesHash, (const char *)&pVnodeCfg->cfg.vgId, sizeof(int32_t)); - if (pTemp != NULL) { - vInfo("vgId:%d, vnode already exist, pVnode:%p", pVnodeCfg->cfg.vgId, pTemp); + SVnodeObj *pVnode = vnodeAcquire(pVnodeCfg->cfg.vgId); + if (pVnode != NULL) { + vDebug("vgId:%d, vnode already exist, refCount:%d pVnode:%p", pVnodeCfg->cfg.vgId, pVnode->refCount, pVnode); + vnodeRelease(pVnode); return TSDB_CODE_SUCCESS; } @@ -143,22 +153,24 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { return TSDB_CODE_VND_INIT_FAILED; } - vInfo("vgId:%d, vnode is created, walLevel:%d fsyncPeriod:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.walLevel, pVnodeCfg->cfg.fsyncPeriod); + vInfo("vgId:%d, vnode dir is created, walLevel:%d fsyncPeriod:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.walLevel, + pVnodeCfg->cfg.fsyncPeriod); code = vnodeOpen(pVnodeCfg->cfg.vgId, rootDir); return code; } int32_t vnodeDrop(int32_t vgId) { - SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t)); - if (ppVnode == NULL || *ppVnode == NULL) { - vDebug("vgId:%d, failed to drop, vgId not find", vgId); + SVnodeObj *pVnode = vnodeAcquire(vgId); + if (pVnode == NULL) { + vDebug("vgId:%d, failed to drop, vnode not find", vgId); return TSDB_CODE_VND_INVALID_VGROUP_ID; } - SVnodeObj *pVnode = *ppVnode; - vTrace("vgId:%d, vnode will be dropped, refCount:%d", pVnode->vgId, pVnode->refCount); + vInfo("vgId:%d, vnode will be dropped, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); pVnode->dropped = 1; + + vnodeRelease(pVnode); vnodeCleanUp(pVnode); return TSDB_CODE_SUCCESS; @@ -340,11 +352,11 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { } int32_t vnodeClose(int32_t vgId) { - SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t)); - if (ppVnode == NULL || *ppVnode == NULL) return 0; + SVnodeObj *pVnode = vnodeAcquire(vgId); + if (pVnode == NULL) return 0; - SVnodeObj *pVnode = *ppVnode; - vDebug("vgId:%d, vnode will be closed", pVnode->vgId); + vDebug("vgId:%d, vnode will be closed, pVnode:%p", pVnode->vgId, pVnode); + vnodeRelease(pVnode); vnodeCleanUp(pVnode); return 0; @@ -355,21 +367,27 @@ void vnodeRelease(void *pVnodeRaw) { int32_t vgId = pVnode->vgId; int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); + vTrace("vgId:%d, release vnode, refCount:%d pVnode:%p", vgId, refCount, pVnode); assert(refCount >= 0); if (refCount > 0) { - vDebug("vgId:%d, release vnode, refCount:%d", vgId, refCount); - if (pVnode->status == TAOS_VN_STATUS_RESET && refCount == 2) + if (pVnode->status == TAOS_VN_STATUS_RESET && refCount == 2) { tsem_post(&pVnode->sem); + } return; } - qCleanupQueryMgmt(pVnode->qMgmt); - pVnode->qMgmt = NULL; + vDebug("vgId:%d, vnode will be destroyed, refCount:%d pVnode:%p", vgId, refCount, pVnode); + + if (pVnode->qMgmt) { + qCleanupQueryMgmt(pVnode->qMgmt); + pVnode->qMgmt = NULL; + } - if (pVnode->tsdb) + if (pVnode->tsdb) { tsdbCloseRepo(pVnode->tsdb, 1); - pVnode->tsdb = NULL; + pVnode->tsdb = NULL; + } // stop continuous query if (pVnode->cq) { @@ -378,18 +396,21 @@ void vnodeRelease(void *pVnodeRaw) { cqClose(cq); } - if (pVnode->wal) + if (pVnode->wal) { walClose(pVnode->wal); - pVnode->wal = NULL; + pVnode->wal = NULL; + } - if (pVnode->wqueue) + if (pVnode->wqueue) { dnodeFreeVnodeWqueue(pVnode->wqueue); - pVnode->wqueue = NULL; + pVnode->wqueue = NULL; + } - if (pVnode->rqueue) + if (pVnode->rqueue) { dnodeFreeVnodeRqueue(pVnode->rqueue); - pVnode->rqueue = NULL; - + pVnode->rqueue = NULL; + } + taosTFree(pVnode->rootDir); if (pVnode->dropped) { @@ -413,22 +434,31 @@ void vnodeRelease(void *pVnodeRaw) { free(pVnode); int32_t count = taosHashGetSize(tsDnodeVnodesHash); - vDebug("vgId:%d, vnode is released, vnodes:%d", vgId, count); + vDebug("vgId:%d, vnode is destroyed, vnodes:%d", vgId, count); +} + +static void vnodeIncRef(void *ptNode) { + assert(ptNode != NULL); + + SVnodeObj **ppVnode = (SVnodeObj **)ptNode; + assert(ppVnode); + assert(*ppVnode); + + SVnodeObj *pVnode = *ppVnode; + atomic_add_fetch_32(&pVnode->refCount, 1); + vTrace("vgId:%d, get vnode, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); } void *vnodeAcquire(int32_t vgId) { - SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t)); + SVnodeObj **ppVnode = taosHashGetCB(tsDnodeVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, NULL, sizeof(void *)); + if (ppVnode == NULL || *ppVnode == NULL) { terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; - vInfo("vgId:%d, not exist", vgId); + vDebug("vgId:%d, not exist", vgId); return NULL; } - SVnodeObj *pVnode = *ppVnode; - atomic_add_fetch_32(&pVnode->refCount, 1); - vDebug("vgId:%d, get vnode, refCount:%d", pVnode->vgId, pVnode->refCount); - - return pVnode; + return *ppVnode; } void *vnodeAcquireRqueue(int32_t vgId) { @@ -528,7 +558,7 @@ void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) { if (pVnode != NULL) { pVnode->accessState = pAccess[i].accessState; if (pVnode->accessState != TSDB_VN_ALL_ACCCESS) { - vDebug("vgId:%d, access state is set to %d", pAccess[i].vgId, pVnode->accessState) + vDebug("vgId:%d, access state is set to %d", pAccess[i].vgId, pVnode->accessState); } vnodeRelease(pVnode); } @@ -538,11 +568,12 @@ void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) { static void vnodeCleanUp(SVnodeObj *pVnode) { // remove from hash, so new messages wont be consumed taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t)); - int i = 0; if (pVnode->status != TAOS_VN_STATUS_INIT) { // it may be in updateing or reset state, then it shall wait - while (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_CLOSING) != TAOS_VN_STATUS_READY) { + int i = 0; + while (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_CLOSING) != + TAOS_VN_STATUS_READY) { if (++i % 1000 == 0) { sched_yield(); } @@ -556,7 +587,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { syncStop(sync); } - vTrace("vgId:%d, vnode will cleanup, refCount:%d", pVnode->vgId, pVnode->refCount); + vDebug("vgId:%d, vnode will cleanup, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); // release local resources only after cutting off outside connections qQueryMgmtNotifyClosed(pVnode->qMgmt); @@ -613,17 +644,19 @@ static int vnodeResetTsdb(SVnodeObj *pVnode) char rootDir[128] = "\0"; sprintf(rootDir, "%s/tsdb", pVnode->rootDir); - if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_RESET) != TAOS_VN_STATUS_READY) + if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_RESET) != TAOS_VN_STATUS_READY) { return -1; + } void *tsdb = pVnode->tsdb; pVnode->tsdb = NULL; // acquire vnode - int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); + int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); - if (refCount > 2) + if (refCount > 2) { tsem_wait(&pVnode->sem); + } // close tsdb, then open tsdb tsdbCloseRepo(tsdb, 0); diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index a86d83cf2d739508487deb35944f6e2dc7c6dff9..11bf569a405ba0f20024bbd584090e0529ef925b 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -39,8 +39,7 @@ void vnodeInitReadFp(void) { vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg; } -int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { - SVnodeObj *pVnode = (SVnodeObj *)param; +static int32_t vnodeProcessReadImp(SVnodeObj *pVnode, SReadMsg *pReadMsg) { int msgType = pReadMsg->rpcMsg.msgType; if (vnodeProcessReadMsgFp[msgType] == NULL) { @@ -49,16 +48,23 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { } if (pVnode->status != TAOS_VN_STATUS_READY) { - vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[msgType], pVnode->status); + vDebug("vgId:%d, msgType:%s not processed, vnode status is %s", pVnode->vgId, taosMsg[msgType], + vnodeStatus[pVnode->status]); return TSDB_CODE_APP_NOT_READY; } // tsdb may be in reset state - if (pVnode->tsdb == NULL) return TSDB_CODE_APP_NOT_READY; - if (pVnode->status == TAOS_VN_STATUS_CLOSING) return TSDB_CODE_APP_NOT_READY; + if (pVnode->tsdb == NULL) { + vDebug("vgId:%d, msgType:%s not processed, tsdb is null", pVnode->vgId, taosMsg[msgType]); + return TSDB_CODE_APP_NOT_READY; + } + + if (pVnode->status == TAOS_VN_STATUS_CLOSING) { + vDebug("vgId:%d, msgType:%s not processed, vstatus is %s", pVnode->vgId, taosMsg[msgType], + vnodeStatus[pVnode->status]); + return TSDB_CODE_APP_NOT_READY; + } - // TODO: Later, let slave to support query - // if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) { if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) { vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%s", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, syncRole[pVnode->role]); @@ -68,6 +74,25 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg); } +int32_t vnodeProcessRead(void *param, SReadMsg *pRead) { + SVnodeObj *pVnode = (SVnodeObj *)param; + int32_t code = vnodeProcessReadImp(pVnode, pRead); + + if (code == TSDB_CODE_APP_NOT_READY && pRead->rpcMsg.msgType == TSDB_MSG_TYPE_QUERY) { + // After the fetch request enters the vnode queue + // If the vnode cannot provide services, the following operations are still required + // Or, there will be a deadlock + void **qhandle = (void **)pRead->pCont; + vError("QInfo:%p msg:%p will be killed for vstatus is %s", *qhandle, pRead, vnodeStatus[pVnode->status]); + + // qKillQuery(*qhandle); + // qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, true); + return TSDB_CODE_APP_NOT_READY; + } else { + return code; + } +} + static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void *ahandle) { SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; @@ -175,11 +200,13 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { // current connect is broken if (code == TSDB_CODE_SUCCESS) { handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t)pQInfo); - if (handle == NULL) { // failed to register qhandle, todo add error test case + if (handle == NULL) { // failed to register qhandle + pRsp->code = terrno; + terrno = 0; vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo, tstrerror(pRsp->code)); - pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE; qDestroyQueryInfo(pQInfo); // destroy it directly + return pRsp->code; } else { assert(*handle == pQInfo); pRsp->qhandle = htobe64((uint64_t)pQInfo); diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 0a139f6b81ff4e9d8a8fdbbe31aed0a3d2eb7041..492a396d936b0126c04ef093cfb1a820e3565e79 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -202,7 +202,7 @@ int vnodeWriteCqMsgToQueue(void *param, void *data, int type) { memcpy(pWal, pHead, size); atomic_add_fetch_32(&pVnode->refCount, 1); - vDebug("CQ: vgId:%d, get vnode wqueue, refCount:%d", pVnode->vgId, pVnode->refCount); + vTrace("CQ: vgId:%d, get vnode wqueue, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); taosWriteQitem(pVnode->wqueue, type, pSync); @@ -219,7 +219,7 @@ int vnodeWriteToQueue(void *param, void *data, int type) { memcpy(pWal, pHead, size); atomic_add_fetch_32(&pVnode->refCount, 1); - vDebug("vgId:%d, get vnode wqueue, refCount:%d", pVnode->vgId, pVnode->refCount); + vTrace("vgId:%d, get vnode wqueue, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); taosWriteQitem(pVnode->wqueue, type, pWal); diff --git a/tests/pytest/concurrent_inquiry.py b/tests/pytest/concurrent_inquiry.py index 2460900891cff4c7f37d4f0c1e782769e0e6b328..39a4cb48fdc22060f63f443a4ac8142cd6a6903e 100644 --- a/tests/pytest/concurrent_inquiry.py +++ b/tests/pytest/concurrent_inquiry.py @@ -12,108 +12,105 @@ # -*- coding: utf-8 -*- import threading import taos - +import sys import json import time import random # query sql query_sql = [ # first supertable -"select count(*) from test.meters where c1 > 50;", -"select count(*) from test.meters where c2 >= 50 and c2 < 100;", -"select count(*) from test.meters where c3 != 5;", +"select count(*) from test.meters ;", "select count(*) from test.meters where t3 > 2;", "select count(*) from test.meters where ts <> '2020-05-13 10:00:00.002';", -"select count(*) from test.meters where t7 like 'fi%';", -"select count(*) from test.meters where t7 like '_econd';", +"select count(*) from test.meters where t7 like 'taos_1%';", +"select count(*) from test.meters where t7 like '_____2';", +"select count(*) from test.meters where t8 like '%思%';", "select count(*) from test.meters interval(1n) order by ts desc;", -"select max(c0) from test.meters group by tbname", -"select first(*) from test.meters;", -"select last(*) from test.meters;", +#"select max(c0) from test.meters group by tbname", +"select first(ts) from test.meters where t5 >5000 and t5<5100;", +"select last(ts) from test.meters where t5 >5000 and t5<5100;", "select last_row(*) from test.meters;", "select twa(c1) from test.t1 where ts > 1500000001000 and ts < 1500000101000" , -"select avg(c1) from test.meters;", +"select avg(c1) from test.meters where t5 >5000 and t5<5100;", "select bottom(c1, 2) from test.t1;", "select diff(c1) from test.t1;", "select leastsquares(c1, 1, 1) from test.t1 ;", -"select max(c1) from test.meters;", -"select min(c1) from test.meters;", -"select c1 + c2 * c3 + c1 / c5 + c4 + c2 from test.t1;", +"select max(c1) from test.meters where t5 >5000 and t5<5100;", +"select min(c1) from test.meters where t5 >5000 and t5<5100;", +"select c1 + c2 + c1 / c5 + c4 + c2 from test.t1;", "select percentile(c1, 50) from test.t1;", "select spread(c1) from test.t1 ;", "select stddev(c1) from test.t1;", -"select sum(c1) from test.meters;", -"select top(c1, 2) from test.meters;" -"select twa(c6) from test.t1 where ts > 1500000001000 and ts < 1500000101000" , -"select avg(c6) from test.meters;", -"select bottom(c6, 2) from test.t1;", -"select diff(c6) from test.t1;", -"select leastsquares(c6, 1, 1) from test.t1 ;", -"select max(c6) from test.meters;", -"select min(c6) from test.meters;", -"select c6 + c2 * c3 + c6 / c5 + c4 + c2 from test.t1;", -"select percentile(c6, 50) from test.t1;", -"select spread(c6) from test.t1 ;", -"select stddev(c6) from test.t1;", -"select sum(c6) from test.meters;", -"select top(c6, 2) from test.meters;", +"select sum(c1) from test.meters where t5 >5000 and t5<5100;", +"select top(c1, 2) from test.meters where t5 >5000 and t5<5100;" +"select twa(c4) from test.t1 where ts > 1500000001000 and ts < 1500000101000" , +"select avg(c4) from test.meters where t5 >5000 and t5<5100;", +"select bottom(c4, 2) from test.t1 where t5 >5000 and t5<5100;", +"select diff(c4) from test.t1 where t5 >5000 and t5<5100;", +"select leastsquares(c4, 1, 1) from test.t1 ;", +"select max(c4) from test.meters where t5 >5000 and t5<5100;", +"select min(c4) from test.meters where t5 >5000 and t5<5100;", +"select c5 + c2 + c4 / c5 + c4 + c2 from test.t1 ;", +"select percentile(c5, 50) from test.t1;", +"select spread(c5) from test.t1 ;", +"select stddev(c5) from test.t1 where t5 >5000 and t5<5100;", +"select sum(c5) from test.meters where t5 >5000 and t5<5100;", +"select top(c5, 2) from test.meters where t5 >5000 and t5<5100;", #all vnode -"select count(*) from test.meters where t5 >2500 and t5<7500", -"select max(c0),avg(c1) from test.meters where t5 >2500 and t5<7500", -"select sum(c5),avg(c1) from test.meters where t5 >2500 and t5<7500", -"select max(c0),min(c6) from test.meters where t5 >2500 and t5<7500", -"select min(c0),avg(c6) from test.meters where t5 >2500 and t5<7500", +"select count(*) from test.meters where t5 >5000 and t5<5100", +"select max(c0),avg(c1) from test.meters where t5 >5000 and t5<5100", +"select sum(c5),avg(c1) from test.meters where t5 >5000 and t5<5100", +"select max(c0),min(c5) from test.meters where t5 >5000 and t5<5100", +"select min(c0),avg(c5) from test.meters where t5 >5000 and t5<5100", # second supertable -"select count(*) from test.meters1 where c1 > 50;", -"select count(*) from test.meters1 where c2 >= 50 and c2 < 100;", -"select count(*) from test.meters1 where c3 != 5;", "select count(*) from test.meters1 where t3 > 2;", "select count(*) from test.meters1 where ts <> '2020-05-13 10:00:00.002';", -"select count(*) from test.meters1 where t7 like 'fi%';", -"select count(*) from test.meters1 where t7 like '_econd';", +"select count(*) from test.meters where t7 like 'taos_1%';", +"select count(*) from test.meters where t7 like '_____2';", +"select count(*) from test.meters where t8 like '%思%';", "select count(*) from test.meters1 interval(1n) order by ts desc;", -"select max(c0) from test.meters1 group by tbname", -"select first(*) from test.meters1;", -"select last(*) from test.meters1;", -"select last_row(*) from test.meters1;", +#"select max(c0) from test.meters1 group by tbname", +"select first(ts) from test.meters1 where t5 >5000 and t5<5100;", +"select last(ts) from test.meters1 where t5 >5000 and t5<5100;", +"select last_row(*) from test.meters1 ;", "select twa(c1) from test.m1 where ts > 1500000001000 and ts < 1500000101000" , -"select avg(c1) from test.meters1;", -"select bottom(c1, 2) from test.m1;", -"select diff(c1) from test.m1;", +"select avg(c1) from test.meters1 where t5 >5000 and t5<5100;", +"select bottom(c1, 2) from test.m1 where t5 >5000 and t5<5100;", +"select diff(c1) from test.m1 ;", "select leastsquares(c1, 1, 1) from test.m1 ;", -"select max(c1) from test.meters1;", -"select min(c1) from test.meters1;", -"select c1 + c2 * c3 + c1 / c5 + c3 + c2 from test.m1;", +"select max(c1) from test.meters1 where t5 >5000 and t5<5100;", +"select min(c1) from test.meters1 where t5 >5000 and t5<5100;", +"select c1 + c2 + c1 / c0 + c2 from test.m1 ;", "select percentile(c1, 50) from test.m1;", "select spread(c1) from test.m1 ;", "select stddev(c1) from test.m1;", -"select sum(c1) from test.meters1;", -"select top(c1, 2) from test.meters1;", -"select twa(c6) from test.m1 where ts > 1500000001000 and ts < 1500000101000" , -"select avg(c6) from test.meters1;", -"select bottom(c6, 2) from test.m1;", -"select diff(c6) from test.m1;", -"select leastsquares(c6, 1, 1) from test.m1 ;", -"select max(c6) from test.meters1;", -"select min(c6) from test.meters1;", -"select c6 + c2 * c3 + c6 / c5 + c3 + c2 from test.m1;", -"select percentile(c6, 50) from test.m1;", -"select spread(c6) from test.m1 ;", -"select stddev(c6) from test.m1;", -"select sum(c6) from test.meters1;", -"select top(c6, 2) from test.meters1;", -"select count(*) from test.meters1 where t5 >2500 and t5<7500", +"select sum(c1) from test.meters1 where t5 >5000 and t5<5100;", +"select top(c1, 2) from test.meters1 where t5 >5000 and t5<5100;", +"select twa(c5) from test.m1 where ts > 1500000001000 and ts < 1500000101000" , +"select avg(c5) from test.meters1 where t5 >5000 and t5<5100;", +"select bottom(c5, 2) from test.m1;", +"select diff(c5) from test.m1;", +"select leastsquares(c5, 1, 1) from test.m1 ;", +"select max(c5) from test.meters1 where t5 >5000 and t5<5100;", +"select min(c5) from test.meters1 where t5 >5000 and t5<5100;", +"select c5 + c2 + c4 / c5 + c0 from test.m1;", +"select percentile(c4, 50) from test.m1;", +"select spread(c4) from test.m1 ;", +"select stddev(c4) from test.m1;", +"select sum(c4) from test.meters1 where t5 >5100 and t5<5300;", +"select top(c4, 2) from test.meters1 where t5 >5100 and t5<5300;", +"select count(*) from test.meters1 where t5 >5100 and t5<5300", #all vnode -"select count(*) from test.meters1 where t5 >2500 and t5<7500", -"select max(c0),avg(c1) from test.meters1 where t5 >2500 and t5<7500", -"select sum(c5),avg(c1) from test.meters1 where t5 >2500 and t5<7500", -"select max(c0),min(c6) from test.meters1 where t5 >2500 and t5<7500", -"select min(c0),avg(c6) from test.meters1 where t5 >2500 and t5<7500", +"select count(*) from test.meters1 where t5 >5100 and t5<5300", +"select max(c0),avg(c1) from test.meters1 where t5 >5000 and t5<5100", +"select sum(c5),avg(c1) from test.meters1 where t5 >5000 and t5<5100", +"select max(c0),min(c5) from test.meters1 where t5 >5000 and t5<5100", +"select min(c0),avg(c5) from test.meters1 where t5 >5000 and t5<5100", #join -"select * from meters,meters1 where meters.ts = meters1.ts and meters.t5 = meters1.t5", -"select * from meters,meters1 where meters.ts = meters1.ts and meters.t7 = meters1.t7", -"select * from meters,meters1 where meters.ts = meters1.ts and meters.t8 = meters1.t8", -"select meters.ts,meters1.c2 from meters,meters1 where meters.ts = meters1.ts and meters.t8 = meters1.t8" +# "select * from meters,meters1 where meters.ts = meters1.ts and meters.t5 = meters1.t5", +# "select * from meters,meters1 where meters.ts = meters1.ts and meters.t7 = meters1.t7", +# "select * from meters,meters1 where meters.ts = meters1.ts and meters.t8 = meters1.t8", +# "select meters.ts,meters1.c2 from meters,meters1 where meters.ts = meters1.ts and meters.t8 = meters1.t8" ] class ConcurrentInquiry: @@ -121,7 +118,8 @@ class ConcurrentInquiry: self.numOfTherads = 50 self.ts=1500000001000 - + def SetThreadsNum(self,num): + self.numOfTherads=num def query_thread(self,threadID): host = "10.211.55.14" user = "root" @@ -142,12 +140,16 @@ class ConcurrentInquiry: for i in ran_query_sql: print("Thread %d : %s"% (threadID,i)) try: + start = time.time() cl.execute(i) cl.fetchall + end = time.time() + print("time cost :",end-start) except Exception as e: print( "Failure thread%d, sql: %s,exception: %s" % (threadID, str(i),str(e))) + exit(-1) print("Thread %d: finishing" % threadID) @@ -155,9 +157,9 @@ class ConcurrentInquiry: def run(self): - + threads = [] - for i in range(50): + for i in range(self.numOfTherads): thread = threading.Thread(target=self.query_thread, args=(i,)) threads.append(thread) thread.start() diff --git a/tests/pytest/fulltest.sh b/tests/pytest/fulltest.sh index fd973b8b7604c9c6b7148a336ca1ac082cd1b548..4b27c42d3875fec5f645e73c3f081ec08329f607 100755 --- a/tests/pytest/fulltest.sh +++ b/tests/pytest/fulltest.sh @@ -149,6 +149,7 @@ python3 ./test.py -f query/queryNullValueTest.py python3 ./test.py -f query/queryInsertValue.py python3 ./test.py -f query/queryConnection.py python3 ./test.py -f query/natualInterval.py +python3 ./test.py -f query/bug1471.py #stream python3 ./test.py -f stream/metric_1.py diff --git a/tests/pytest/query/bug1471.py b/tests/pytest/query/bug1471.py new file mode 100644 index 0000000000000000000000000000000000000000..f1cb0bdcdfbb085410d0001606208e11373687a1 --- /dev/null +++ b/tests/pytest/query/bug1471.py @@ -0,0 +1,73 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import taos +from util.log import * +from util.cases import * +from util.sql import * +import time +import threading + + +class myThread(threading.Thread): + def __init__(self, conn): + threading.Thread.__init__(self) + self.event = threading.Event() + self.conn = taos.connect(conn._host, port=conn._port, config=conn._config) + + def run(self): + cur = self.conn.cursor() + self.event.wait() + cur.execute("drop database db") + cur.close() + self.conn.close() + + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor()) + + def run(self): + for i in range(50): + print("round", i) + thread = myThread(tdSql.cursor._connection) + thread.start() + + tdSql.execute('reset query cache') + tdSql.execute('drop database if exists db') + tdSql.execute('create database db') + tdSql.execute('use db') + tdSql.execute("create table car (ts timestamp, s int)") + tdSql.execute("insert into car values('2020-10-19 17:00:00', 123)") + + thread.event.set() + try: + tdSql.query("select s from car where ts = '2020-10-19 17:00:00'") + except Exception as e: + pass + else: + tdSql.checkData(0, 0, 123) + + thread.join() + time.sleep(0.2) + + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase())