diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 4e05e1ce4c7fea9719e6249bd4b8ed48e5dce4d4..cae67673e91f2e7625a641b78b22c2de9eff557d 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -198,6 +198,10 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; + // user-defined callback function is stored in fetchFp + pSql->fetchFp = fp; + pSql->fp = tscAsyncFetchRowsProxy; + if (pRes->qhandle == 0) { tscError("qhandle is NULL"); pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE; @@ -205,10 +209,6 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi return; } - // user-defined callback function is stored in fetchFp - pSql->fetchFp = fp; - pSql->fp = tscAsyncFetchRowsProxy; - pSql->param = param; tscResetForNextRetrieve(pRes); diff --git a/src/mnode/src/mnodeProfile.c b/src/mnode/src/mnodeProfile.c index af4a09a45a1fc314a648ec528cb4caaf9860df7a..9121f31131bc0cafa85830409bf9d40390da6790 100644 --- a/src/mnode/src/mnodeProfile.c +++ b/src/mnode/src/mnodeProfile.c @@ -115,7 +115,7 @@ SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t po uint64_t expireTime = CONN_KEEP_TIME * 1000 + (uint64_t)taosGetTimestampMs(); SConnObj *pConn = taosCacheUpdateExpireTimeByName(tsMnodeConnCache, &connId, sizeof(int32_t), expireTime); if (pConn == NULL) { - mError("connId:%d, is already destroyed, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port); + mDebug("connId:%d, is already destroyed, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port); return NULL; } diff --git a/src/plugins/http/src/httpContext.c b/src/plugins/http/src/httpContext.c index 98fba9cb3b3f39a70fd102721a022ad923e0e43c..cefcca78215da2267001099aa7f728527f481ffa 100644 --- a/src/plugins/http/src/httpContext.c +++ b/src/plugins/http/src/httpContext.c @@ -137,7 +137,7 @@ void httpReleaseContext(HttpContext *pContext) { assert(refCount >= 0); HttpContext **ppContext = pContext->ppContext; - httpDebug("context:%p, is releasd, data:%p refCount:%d", pContext, ppContext, refCount); + httpDebug("context:%p, is released, data:%p refCount:%d", pContext, ppContext, refCount); if (tsHttpServer.contextCache != NULL) { taosCacheRelease(tsHttpServer.contextCache, (void **)(&ppContext), false); diff --git a/src/plugins/http/src/httpSql.c b/src/plugins/http/src/httpSql.c index 7a515d124ebee78cb7d574a9f340d4e3f62d7cc2..d8ac170d6d749ae2bf6ed313081cb2a98ca09d2a 100644 --- a/src/plugins/http/src/httpSql.c +++ b/src/plugins/http/src/httpSql.c @@ -47,6 +47,10 @@ void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numO } } + if (tscResultsetFetchCompleted(result)) { + isContinue = false; + } + if (isContinue) { // retrieve next batch of rows httpDebug("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, continue retrieve, numOfRows:%d, sql:%s", @@ -75,7 +79,8 @@ void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int code) { HttpContext *pContext = (HttpContext *)param; if (pContext == NULL) return; - HttpSqlCmds * multiCmds = pContext->multiCmds; + code = taos_errno(result); + HttpSqlCmds *multiCmds = pContext->multiCmds; HttpEncodeMethod *encode = pContext->encodeMethod; HttpSqlCmd *singleCmd = multiCmds->cmds + multiCmds->pos; @@ -109,8 +114,8 @@ void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int code) { return; } - int num_fields = taos_field_count(result); - if (num_fields == 0) { + bool isUpdate = tscIsUpdateQuery(result); + if (isUpdate) { // not select or show commands int affectRows = taos_affected_rows(result); httpDebug("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, affect rows:%d, sql:%s", @@ -221,9 +226,9 @@ void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int num if (numOfRows < 0) { httpError("context:%p, fd:%d, ip:%s, user:%s, retrieve failed, code:%s", pContext, pContext->fd, pContext->ipstr, pContext->user, tstrerror(numOfRows)); - } - - taos_free_result(result); + } + + taos_free_result(result); if (encode->stopJsonFp) { (encode->stopJsonFp)(pContext, &pContext->singleCmd); @@ -238,6 +243,11 @@ void httpProcessSingleSqlCallBack(void *param, TAOS_RES *result, int unUsedCode) if (pContext == NULL) return; int32_t code = taos_errno(result); + if (code != unUsedCode) { + httpError("context:%p, fd:%d, ip:%s, user:%s, resultset code:%s input code:%s not matched, sqlObj:%p", pContext, + pContext->fd, pContext->ipstr, pContext->user, tstrerror(code), tstrerror(unUsedCode), (SSqlObj *)result); + } + HttpEncodeMethod *encode = pContext->encodeMethod; if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 37defc3ccf4b961c090c69c5f0785927a18160bc..f94c65834dd0c4154f5f2274163df7c878f9a7ac 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -6333,6 +6333,7 @@ int32_t qKillQuery(qinfo_t qinfo) { return TSDB_CODE_QRY_INVALID_QHANDLE; } + sem_post(&pQInfo->dataReady); setQueryKilled(pQInfo); return TSDB_CODE_SUCCESS; } @@ -6545,13 +6546,14 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) { SQueryMgmt *pQueryMgmt = pMgmt; if (pQueryMgmt->qinfoPool == NULL) { + qError("QInfo:%p failed to add qhandle into qMgmt, since qMgmt is closed", (void *)qInfo); return NULL; } pthread_mutex_lock(&pQueryMgmt->lock); if (pQueryMgmt->closed) { pthread_mutex_unlock(&pQueryMgmt->lock); - + qError("QInfo:%p failed to add qhandle into cache, since qMgmt is colsing", (void *)qInfo); return NULL; } else { uint64_t handleVal = (uint64_t) qInfo; diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index df63d567c795c69b031c6ab97112a1b51d0c2e94..072e9939de3c672a80cebe93b2075a347991411a 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -294,7 +294,7 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v } } else { // old data exists, update the node pNode = taosUpdateCacheImpl(pCacheObj, pOld, key, keyLen, pData, dataSize, duration * 1000L); - uDebug("cache:%s, key:%p, %p exist in cache, updated", pCacheObj->name, key, pNode->data); + uDebug("cache:%s, key:%p, %p exist in cache, updated old:%p", pCacheObj->name, key, pNode->data, pOld); } __cache_unlock(pCacheObj); @@ -307,26 +307,30 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen return NULL; } + void *pData = NULL; + __cache_rd_lock(pCacheObj); - + SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen); int32_t ref = 0; if (ptNode != NULL) { ref = T_REF_INC(*ptNode); + pData = (*ptNode)->data; } + __cache_unlock(pCacheObj); - - if (ptNode != NULL) { + + if (pData != NULL) { atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1); - uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, (*ptNode)->data, ref); + uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, pData, ref); } else { atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1); uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key); } - + atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1); - return (ptNode != NULL) ? (*ptNode)->data : NULL; + return pData; } void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, void *key, size_t keyLen, uint64_t expireTime) { @@ -453,21 +457,20 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { } else { uDebug("cache:%s, key:%p, %p is released, refcnt:%d", pCacheObj->name, pNode->key, pNode->data, T_REF_VAL_GET(pNode) - 1); + __cache_wr_lock(pCacheObj); + // NOTE: once refcount is decrease, pNode may be freed by other thread immediately. int32_t ref = T_REF_DEC(pNode); - if (inTrashCan) { + if (inTrashCan && (ref == 0)) { // Remove it if the ref count is 0. // The ref count does not need to load and check again after lock acquired, since ref count can not be increased when // the node is in trashcan. - if (ref == 0) { - __cache_wr_lock(pCacheObj); - assert(pNode->pTNodeHeader->pData == pNode); - taosRemoveFromTrashCan(pCacheObj, pNode->pTNodeHeader); - __cache_unlock(pCacheObj); - } - + assert(pNode->pTNodeHeader->pData == pNode); + taosRemoveFromTrashCan(pCacheObj, pNode->pTNodeHeader); } + + __cache_unlock(pCacheObj); } // else { diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 987783711b2452b17c56568fe4c63ed506c51e30..f529b713cf659d923e64ba8bf49798a0dde0f195 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -108,9 +108,10 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { if (code == TSDB_CODE_SUCCESS) { handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t) pQInfo); if (handle == NULL) { // failed to register qhandle + vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo, + tstrerror(pRsp->code)); pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE; qDestroyQueryInfo(pQInfo); // destroy it directly - vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void*) pQInfo, tstrerror(pRsp->code)); } else { assert(*handle == pQInfo); pRsp->qhandle = htobe64((uint64_t) pQInfo);