未验证 提交 fb3e5bae 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2658 from taosdata/feature/vnode

Feature/vnode
...@@ -198,6 +198,10 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi ...@@ -198,6 +198,10 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
// user-defined callback function is stored in fetchFp
pSql->fetchFp = fp;
pSql->fp = tscAsyncFetchRowsProxy;
if (pRes->qhandle == 0) { if (pRes->qhandle == 0) {
tscError("qhandle is NULL"); tscError("qhandle is NULL");
pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE; pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE;
...@@ -205,10 +209,6 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi ...@@ -205,10 +209,6 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
return; return;
} }
// user-defined callback function is stored in fetchFp
pSql->fetchFp = fp;
pSql->fp = tscAsyncFetchRowsProxy;
pSql->param = param; pSql->param = param;
tscResetForNextRetrieve(pRes); tscResetForNextRetrieve(pRes);
......
...@@ -115,7 +115,7 @@ SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t po ...@@ -115,7 +115,7 @@ SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t po
uint64_t expireTime = CONN_KEEP_TIME * 1000 + (uint64_t)taosGetTimestampMs(); uint64_t expireTime = CONN_KEEP_TIME * 1000 + (uint64_t)taosGetTimestampMs();
SConnObj *pConn = taosCacheUpdateExpireTimeByName(tsMnodeConnCache, &connId, sizeof(int32_t), expireTime); SConnObj *pConn = taosCacheUpdateExpireTimeByName(tsMnodeConnCache, &connId, sizeof(int32_t), expireTime);
if (pConn == NULL) { if (pConn == NULL) {
mError("connId:%d, is already destroyed, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port); mDebug("connId:%d, is already destroyed, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port);
return NULL; return NULL;
} }
......
...@@ -137,7 +137,7 @@ void httpReleaseContext(HttpContext *pContext) { ...@@ -137,7 +137,7 @@ void httpReleaseContext(HttpContext *pContext) {
assert(refCount >= 0); assert(refCount >= 0);
HttpContext **ppContext = pContext->ppContext; HttpContext **ppContext = pContext->ppContext;
httpDebug("context:%p, is releasd, data:%p refCount:%d", pContext, ppContext, refCount); httpDebug("context:%p, is released, data:%p refCount:%d", pContext, ppContext, refCount);
if (tsHttpServer.contextCache != NULL) { if (tsHttpServer.contextCache != NULL) {
taosCacheRelease(tsHttpServer.contextCache, (void **)(&ppContext), false); taosCacheRelease(tsHttpServer.contextCache, (void **)(&ppContext), false);
......
...@@ -47,6 +47,10 @@ void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numO ...@@ -47,6 +47,10 @@ void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numO
} }
} }
if (tscResultsetFetchCompleted(result)) {
isContinue = false;
}
if (isContinue) { if (isContinue) {
// retrieve next batch of rows // retrieve next batch of rows
httpDebug("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, continue retrieve, numOfRows:%d, sql:%s", httpDebug("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, continue retrieve, numOfRows:%d, sql:%s",
...@@ -75,7 +79,8 @@ void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int code) { ...@@ -75,7 +79,8 @@ void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int code) {
HttpContext *pContext = (HttpContext *)param; HttpContext *pContext = (HttpContext *)param;
if (pContext == NULL) return; if (pContext == NULL) return;
HttpSqlCmds * multiCmds = pContext->multiCmds; code = taos_errno(result);
HttpSqlCmds *multiCmds = pContext->multiCmds;
HttpEncodeMethod *encode = pContext->encodeMethod; HttpEncodeMethod *encode = pContext->encodeMethod;
HttpSqlCmd *singleCmd = multiCmds->cmds + multiCmds->pos; HttpSqlCmd *singleCmd = multiCmds->cmds + multiCmds->pos;
...@@ -109,8 +114,8 @@ void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int code) { ...@@ -109,8 +114,8 @@ void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int code) {
return; return;
} }
int num_fields = taos_field_count(result); bool isUpdate = tscIsUpdateQuery(result);
if (num_fields == 0) { if (isUpdate) {
// not select or show commands // not select or show commands
int affectRows = taos_affected_rows(result); int affectRows = taos_affected_rows(result);
httpDebug("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, affect rows:%d, sql:%s", httpDebug("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, affect rows:%d, sql:%s",
...@@ -221,9 +226,9 @@ void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int num ...@@ -221,9 +226,9 @@ void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int num
if (numOfRows < 0) { if (numOfRows < 0) {
httpError("context:%p, fd:%d, ip:%s, user:%s, retrieve failed, code:%s", pContext, pContext->fd, pContext->ipstr, httpError("context:%p, fd:%d, ip:%s, user:%s, retrieve failed, code:%s", pContext, pContext->fd, pContext->ipstr,
pContext->user, tstrerror(numOfRows)); pContext->user, tstrerror(numOfRows));
} }
taos_free_result(result); taos_free_result(result);
if (encode->stopJsonFp) { if (encode->stopJsonFp) {
(encode->stopJsonFp)(pContext, &pContext->singleCmd); (encode->stopJsonFp)(pContext, &pContext->singleCmd);
...@@ -238,6 +243,11 @@ void httpProcessSingleSqlCallBack(void *param, TAOS_RES *result, int unUsedCode) ...@@ -238,6 +243,11 @@ void httpProcessSingleSqlCallBack(void *param, TAOS_RES *result, int unUsedCode)
if (pContext == NULL) return; if (pContext == NULL) return;
int32_t code = taos_errno(result); int32_t code = taos_errno(result);
if (code != unUsedCode) {
httpError("context:%p, fd:%d, ip:%s, user:%s, resultset code:%s input code:%s not matched, sqlObj:%p", pContext,
pContext->fd, pContext->ipstr, pContext->user, tstrerror(code), tstrerror(unUsedCode), (SSqlObj *)result);
}
HttpEncodeMethod *encode = pContext->encodeMethod; HttpEncodeMethod *encode = pContext->encodeMethod;
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
......
...@@ -6333,6 +6333,7 @@ int32_t qKillQuery(qinfo_t qinfo) { ...@@ -6333,6 +6333,7 @@ int32_t qKillQuery(qinfo_t qinfo) {
return TSDB_CODE_QRY_INVALID_QHANDLE; return TSDB_CODE_QRY_INVALID_QHANDLE;
} }
sem_post(&pQInfo->dataReady);
setQueryKilled(pQInfo); setQueryKilled(pQInfo);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -6545,13 +6546,14 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) { ...@@ -6545,13 +6546,14 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
SQueryMgmt *pQueryMgmt = pMgmt; SQueryMgmt *pQueryMgmt = pMgmt;
if (pQueryMgmt->qinfoPool == NULL) { if (pQueryMgmt->qinfoPool == NULL) {
qError("QInfo:%p failed to add qhandle into qMgmt, since qMgmt is closed", (void *)qInfo);
return NULL; return NULL;
} }
pthread_mutex_lock(&pQueryMgmt->lock); pthread_mutex_lock(&pQueryMgmt->lock);
if (pQueryMgmt->closed) { if (pQueryMgmt->closed) {
pthread_mutex_unlock(&pQueryMgmt->lock); pthread_mutex_unlock(&pQueryMgmt->lock);
qError("QInfo:%p failed to add qhandle into cache, since qMgmt is colsing", (void *)qInfo);
return NULL; return NULL;
} else { } else {
uint64_t handleVal = (uint64_t) qInfo; uint64_t handleVal = (uint64_t) qInfo;
......
...@@ -294,7 +294,7 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v ...@@ -294,7 +294,7 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v
} }
} else { // old data exists, update the node } else { // old data exists, update the node
pNode = taosUpdateCacheImpl(pCacheObj, pOld, key, keyLen, pData, dataSize, duration * 1000L); pNode = taosUpdateCacheImpl(pCacheObj, pOld, key, keyLen, pData, dataSize, duration * 1000L);
uDebug("cache:%s, key:%p, %p exist in cache, updated", pCacheObj->name, key, pNode->data); uDebug("cache:%s, key:%p, %p exist in cache, updated old:%p", pCacheObj->name, key, pNode->data, pOld);
} }
__cache_unlock(pCacheObj); __cache_unlock(pCacheObj);
...@@ -307,26 +307,30 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen ...@@ -307,26 +307,30 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
return NULL; return NULL;
} }
void *pData = NULL;
__cache_rd_lock(pCacheObj); __cache_rd_lock(pCacheObj);
SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen); SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen);
int32_t ref = 0; int32_t ref = 0;
if (ptNode != NULL) { if (ptNode != NULL) {
ref = T_REF_INC(*ptNode); ref = T_REF_INC(*ptNode);
pData = (*ptNode)->data;
} }
__cache_unlock(pCacheObj); __cache_unlock(pCacheObj);
if (ptNode != NULL) { if (pData != NULL) {
atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1); atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1);
uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, (*ptNode)->data, ref); uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, pData, ref);
} else { } else {
atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1); atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1);
uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key); uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key);
} }
atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1); atomic_add_fetch_32(&pCacheObj->statistics.totalAccess, 1);
return (ptNode != NULL) ? (*ptNode)->data : NULL; return pData;
} }
void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, void *key, size_t keyLen, uint64_t expireTime) { void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, void *key, size_t keyLen, uint64_t expireTime) {
...@@ -453,21 +457,20 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { ...@@ -453,21 +457,20 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
} else { } else {
uDebug("cache:%s, key:%p, %p is released, refcnt:%d", pCacheObj->name, pNode->key, pNode->data, T_REF_VAL_GET(pNode) - 1); uDebug("cache:%s, key:%p, %p is released, refcnt:%d", pCacheObj->name, pNode->key, pNode->data, T_REF_VAL_GET(pNode) - 1);
__cache_wr_lock(pCacheObj);
// NOTE: once refcount is decrease, pNode may be freed by other thread immediately. // NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
int32_t ref = T_REF_DEC(pNode); int32_t ref = T_REF_DEC(pNode);
if (inTrashCan) { if (inTrashCan && (ref == 0)) {
// Remove it if the ref count is 0. // Remove it if the ref count is 0.
// The ref count does not need to load and check again after lock acquired, since ref count can not be increased when // The ref count does not need to load and check again after lock acquired, since ref count can not be increased when
// the node is in trashcan. // the node is in trashcan.
if (ref == 0) { assert(pNode->pTNodeHeader->pData == pNode);
__cache_wr_lock(pCacheObj); taosRemoveFromTrashCan(pCacheObj, pNode->pTNodeHeader);
assert(pNode->pTNodeHeader->pData == pNode);
taosRemoveFromTrashCan(pCacheObj, pNode->pTNodeHeader);
__cache_unlock(pCacheObj);
}
} }
__cache_unlock(pCacheObj);
} }
// else { // else {
......
...@@ -108,9 +108,10 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -108,9 +108,10 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t) pQInfo); handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t) pQInfo);
if (handle == NULL) { // failed to register qhandle if (handle == NULL) { // failed to register qhandle
vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo,
tstrerror(pRsp->code));
pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE; pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE;
qDestroyQueryInfo(pQInfo); // destroy it directly qDestroyQueryInfo(pQInfo); // destroy it directly
vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void*) pQInfo, tstrerror(pRsp->code));
} else { } else {
assert(*handle == pQInfo); assert(*handle == pQInfo);
pRsp->qhandle = htobe64((uint64_t) pQInfo); pRsp->qhandle = htobe64((uint64_t) pQInfo);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册