diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 3b291b22712f8e6fb987ffd5041a2fc67a056a6e..8cac9b3398a0bc4569b52acc7858bbc1e50de542 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -174,7 +174,7 @@ static void syncConnCallback(void *param, TAOS_RES *tres, int code) { TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port) { STscObj *pObj = NULL; - SSqlObj *pSql = taosConnectImpl(ip, user, pass, auth, db, port, syncConnCallback, NULL, &pObj); + SSqlObj *pSql = taosConnectImpl(ip, user, pass, auth, db, port, syncConnCallback, NULL, (void **)&pObj); if (pSql != NULL) { pSql->fp = syncConnCallback; pSql->param = pSql; @@ -245,11 +245,11 @@ static void asyncConnCallback(void *param, TAOS_RES *tres, int code) { TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int), void *param, TAOS **taos) { STscObj *pObj = NULL; - SSqlObj* pSql = taosConnectImpl(ip, user, pass, NULL, db, port, asyncConnCallback, param, &pObj); + SSqlObj *pSql = taosConnectImpl(ip, user, pass, NULL, db, port, asyncConnCallback, param, (void **)&pObj); if (pSql == NULL) { return NULL; } - + if (taos) *taos = pObj; pSql->fetchFp = fp; diff --git a/src/inc/taos.h b/src/inc/taos.h index 66b5f930bca4116ae72c1ee60753e4cc22c3b088..315313734753de73bf477b1f67783a45c38c87c9 100644 --- a/src/inc/taos.h +++ b/src/inc/taos.h @@ -22,13 +22,12 @@ extern "C" { #endif -typedef struct STscObj TAOS; -typedef struct STscStmt TAOS_STMT; -typedef struct SSqlObj TAOS_RES; -typedef struct SSqlStream TAOS_STREAM; -typedef struct SSub TAOS_SUB; -typedef unsigned char** TAOS_ROW; - +typedef void TAOS; +typedef void TAOS_STMT; +typedef void TAOS_RES; +typedef void TAOS_STREAM; +typedef void TAOS_SUB; +typedef void **TAOS_ROW; // Data type definition #define TSDB_DATA_TYPE_NULL 0 // 1 bytes diff --git a/src/inc/vnode.h b/src/inc/vnode.h index ed6c232fdb35efbe7733f88a66f847fd221edf91..fdce4d62794075bd2e7027b125780fbd7a2deaed 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -56,6 +56,7 @@ void vnodeRelease(void *pVnode); // dec refCount void* vnodeGetWal(void *pVnode); int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item); +int32_t vnodeCheckWrite(void *pVnode); int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes); void vnodeBuildStatusMsg(void *param); void vnodeConfirmForward(void *param, uint64_t version, int32_t code); @@ -65,6 +66,7 @@ int32_t vnodeInitResources(); void vnodeCleanupResources(); int32_t vnodeProcessRead(void *pVnode, SReadMsg *pReadMsg); +int32_t vnodeCheckRead(void *pVnode); #ifdef __cplusplus } diff --git a/src/sync/CMakeLists.txt b/src/sync/CMakeLists.txt index efdf2bd185cb5db46d7f0918725ebe8886d5bd98..60271c771ca0a01bd449cb878fe2269759250fd3 100644 --- a/src/sync/CMakeLists.txt +++ b/src/sync/CMakeLists.txt @@ -5,14 +5,14 @@ INCLUDE_DIRECTORIES(inc) AUX_SOURCE_DIRECTORY(src SRC) IF (TD_LINUX) - LIST(REMOVE_ITEM SRC ./src/tarbitrator.c) + LIST(REMOVE_ITEM SRC src/tarbitrator.c) ADD_LIBRARY(sync ${SRC}) TARGET_LINK_LIBRARIES(sync tutil pthread common) - LIST(APPEND BIN_SRC ./src/tarbitrator.c) - LIST(APPEND BIN_SRC ./src/taosTcpPool.c) + LIST(APPEND BIN_SRC src/tarbitrator.c) + LIST(APPEND BIN_SRC src/taosTcpPool.c) ADD_EXECUTABLE(tarbitrator ${BIN_SRC}) TARGET_LINK_LIBRARIES(tarbitrator sync common osdetail tutil) - ADD_SUBDIRECTORY(test) + #ADD_SUBDIRECTORY(test) ENDIF () diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index bc728e86d917c649f1c6eb04e870c0261814a904..e529f27f55abe99e0a842fb839f2916a0f303bfc 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -465,9 +465,10 @@ void *vnodeAcquireRqueue(int32_t vgId) { SVnodeObj *pVnode = vnodeAcquire(vgId); if (pVnode == NULL) return NULL; - if (pVnode->status == TAOS_VN_STATUS_RESET) { - terrno = TSDB_CODE_APP_NOT_READY; - vInfo("vgId:%d, status is in reset", vgId); + int32_t code = vnodeCheckRead(pVnode); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + vInfo("vgId:%d, can not provide read service, status is %s", vgId, vnodeStatus[pVnode->status]); vnodeRelease(pVnode); return NULL; } @@ -479,13 +480,14 @@ void *vnodeAcquireWqueue(int32_t vgId) { SVnodeObj *pVnode = vnodeAcquire(vgId); if (pVnode == NULL) return NULL; - if (pVnode->status == TAOS_VN_STATUS_RESET) { - terrno = TSDB_CODE_APP_NOT_READY; - vInfo("vgId:%d, status is in reset", vgId); + int32_t code = vnodeCheckWrite(pVnode); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + vInfo("vgId:%d, can not provide write service, status is %s", vgId, vnodeStatus[pVnode->status]); vnodeRelease(pVnode); return NULL; } - + return pVnode->wqueue; } diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 11bf569a405ba0f20024bbd584090e0529ef925b..1da84bf9b41c1bfc011a97cb0c06a71795a204b0 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -39,7 +39,13 @@ void vnodeInitReadFp(void) { vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg; } -static int32_t vnodeProcessReadImp(SVnodeObj *pVnode, SReadMsg *pReadMsg) { +// +// After the fetch request enters the vnode queue, if the vnode cannot provide services, the process function are +// still required, or there will be a deadlock, so we don’t do any check here, but put the check codes before the +// request enters the queue +// +int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { + SVnodeObj *pVnode = (SVnodeObj *)param; int msgType = pReadMsg->rpcMsg.msgType; if (vnodeProcessReadMsgFp[msgType] == NULL) { @@ -47,53 +53,35 @@ static int32_t vnodeProcessReadImp(SVnodeObj *pVnode, SReadMsg *pReadMsg) { return TSDB_CODE_VND_MSG_NOT_PROCESSED; } + return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg); +} + +int32_t vnodeCheckRead(void *param) { + SVnodeObj *pVnode = param; if (pVnode->status != TAOS_VN_STATUS_READY) { - vDebug("vgId:%d, msgType:%s not processed, vnode status is %s", pVnode->vgId, taosMsg[msgType], - vnodeStatus[pVnode->status]); + vDebug("vgId:%d, vnode status is %s, recCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status], + pVnode->refCount, pVnode); return TSDB_CODE_APP_NOT_READY; } // tsdb may be in reset state if (pVnode->tsdb == NULL) { - vDebug("vgId:%d, msgType:%s not processed, tsdb is null", pVnode->vgId, taosMsg[msgType]); - return TSDB_CODE_APP_NOT_READY; - } - - if (pVnode->status == TAOS_VN_STATUS_CLOSING) { - vDebug("vgId:%d, msgType:%s not processed, vstatus is %s", pVnode->vgId, taosMsg[msgType], - vnodeStatus[pVnode->status]); + vDebug("vgId:%d, tsdb is null, recCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); return TSDB_CODE_APP_NOT_READY; } if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) { - vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%s", pVnode->vgId, taosMsg[msgType], - pVnode->syncCfg.replica, syncRole[pVnode->role]); + vDebug("vgId:%d, replica:%d role:%s, recCount:%d pVnode:%p", pVnode->vgId, pVnode->syncCfg.replica, + syncRole[pVnode->role], pVnode->refCount, pVnode); return TSDB_CODE_APP_NOT_READY; } - return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg); + return TSDB_CODE_SUCCESS; } +static int32_t vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void *ahandle) { + int32_t code = vnodeCheckRead(pVnode); + if (code != TSDB_CODE_SUCCESS) return code; -int32_t vnodeProcessRead(void *param, SReadMsg *pRead) { - SVnodeObj *pVnode = (SVnodeObj *)param; - int32_t code = vnodeProcessReadImp(pVnode, pRead); - - if (code == TSDB_CODE_APP_NOT_READY && pRead->rpcMsg.msgType == TSDB_MSG_TYPE_QUERY) { - // After the fetch request enters the vnode queue - // If the vnode cannot provide services, the following operations are still required - // Or, there will be a deadlock - void **qhandle = (void **)pRead->pCont; - vError("QInfo:%p msg:%p will be killed for vstatus is %s", *qhandle, pRead, vnodeStatus[pVnode->status]); - - // qKillQuery(*qhandle); - // qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, true); - return TSDB_CODE_APP_NOT_READY; - } else { - return code; - } -} - -static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void *ahandle) { SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; pRead->pCont = qhandle; @@ -104,6 +92,8 @@ static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void *a vDebug("QInfo:%p add to vread queue for exec query, msg:%p", *qhandle, pRead); taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead); + + return TSDB_CODE_SUCCESS; } /** @@ -122,8 +112,13 @@ static int32_t vnodeDumpQueryResult(SRspRet *pRet, void *pVnode, void **handle, if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) { if (continueExec) { *freeHandle = false; - vnodePutItemIntoReadQueue(pVnode, handle, ahandle); - pRet->qhandle = *handle; + code = vnodePutItemIntoReadQueue(pVnode, handle, ahandle); + if (code != TSDB_CODE_SUCCESS) { + *freeHandle = true; + return code; + } else { + pRet->qhandle = *handle; + } } else { *freeHandle = true; vDebug("QInfo:%p exec completed, free handle:%d", *handle, *freeHandle); @@ -226,7 +221,12 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { if (handle != NULL) { vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app", vgId, *handle); - vnodePutItemIntoReadQueue(pVnode, handle, pReadMsg->rpcMsg.ahandle); + code = vnodePutItemIntoReadQueue(pVnode, handle, pReadMsg->rpcMsg.ahandle); + if (code != TSDB_CODE_SUCCESS) { + pRsp->code = code; + qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); + return pRsp->code; + } } } else { assert(pCont != NULL); diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 492a396d936b0126c04ef093cfb1a820e3565e79..855df81a1b9029d7ddd2f5e5ca6d42e8c680f7ac 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -56,15 +56,6 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { return TSDB_CODE_VND_MSG_NOT_PROCESSED; } - if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) { - vDebug("vgId:%d, msgType:%s not processed, no write auth", pVnode->vgId, taosMsg[pHead->msgType]); - return TSDB_CODE_VND_NO_WRITE_AUTH; - } - - // tsdb may be in reset state - if (pVnode->tsdb == NULL) return TSDB_CODE_APP_NOT_READY; - if (pVnode->status == TAOS_VN_STATUS_CLOSING) return TSDB_CODE_APP_NOT_READY; - if (pHead->version == 0) { // from client or CQ if (pVnode->status != TAOS_VN_STATUS_READY) { vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[pHead->msgType], @@ -105,6 +96,28 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { return syncCode; } +int32_t vnodeCheckWrite(void *param) { + SVnodeObj *pVnode = param; + if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) { + vDebug("vgId:%d, no write auth, recCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); + return TSDB_CODE_VND_NO_WRITE_AUTH; + } + + // tsdb may be in reset state + if (pVnode->tsdb == NULL) { + vDebug("vgId:%d, tsdb is null, recCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode); + return TSDB_CODE_APP_NOT_READY; + } + + if (pVnode->status == TAOS_VN_STATUS_CLOSING) { + vDebug("vgId:%d, vnode status is %s, recCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status], + pVnode->refCount, pVnode); + return TSDB_CODE_APP_NOT_READY; + } + + return TSDB_CODE_SUCCESS; +} + void vnodeConfirmForward(void *param, uint64_t version, int32_t code) { SVnodeObj *pVnode = (SVnodeObj *)param; syncConfirmForward(pVnode->sync, version, code);