From 8623f145139af5e5a7ea233b397fbcfb3f2c8dec Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 14 Oct 2020 22:20:26 +0800 Subject: [PATCH] TD-1652 --- src/dnode/src/dnodeVWrite.c | 37 +++++++------ src/inc/taoserror.h | 1 + src/mnode/src/mnodeSdb.c | 2 +- src/sync/src/syncMain.c | 8 +-- src/vnode/src/vnodeRead.c | 93 +++++++++++++++++--------------- src/vnode/src/vnodeWrite.c | 34 ++++++------ src/wal/src/walMain.c | 105 ++++++++++++++++++------------------ 7 files changed, 146 insertions(+), 134 deletions(-) diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index 3f2c9df222..311bebc788 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -210,12 +210,12 @@ void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code) { static void *dnodeProcessWriteQueue(void *param) { SWriteWorker *pWorker = (SWriteWorker *)param; - SWriteMsg *pWrite; - SWalHead *pHead; + SWriteMsg * pWrite; + SWalHead * pHead; int32_t numOfMsgs; int type; - void *pVnode, *item; - SRspRet *pRspRet; + void * pVnode, *item; + SRspRet * pRspRet; dDebug("write worker:%d is running", pWorker->workerId); @@ -237,16 +237,21 @@ static void *dnodeProcessWriteQueue(void *param) { pHead->msgType = pWrite->rpcMsg.msgType; pHead->version = 0; pHead->len = pWrite->contLen; - dDebug("%p, rpc msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType]); + dDebug("%p, rpc msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle, + taosMsg[pWrite->rpcMsg.msgType]); } else { pHead = (SWalHead *)item; - dTrace("%p, wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType], pHead->version); + dTrace("%p, wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType], + pHead->version); } int32_t code = vnodeProcessWrite(pVnode, type, pHead, pRspRet); - if (pWrite) { + dTrace("%p, msg:%s is processed in vwrite queue, version:%" PRIu64 ", result:%s", pHead, taosMsg[pHead->msgType], + pHead->version, tstrerror(code)); + + if (pWrite) { pWrite->rpcMsg.code = code; - if (code <= 0) pWrite->processedCount = 1; + if (code <= 0) pWrite->processedCount = 1; } } @@ -258,7 +263,7 @@ static void *dnodeProcessWriteQueue(void *param) { taosGetQitem(pWorker->qall, &type, &item); if (type == TAOS_QTYPE_RPC) { pWrite = (SWriteMsg *)item; - dnodeSendRpcVnodeWriteRsp(pVnode, item, pWrite->rpcMsg.code); + dnodeSendRpcVnodeWriteRsp(pVnode, item, pWrite->rpcMsg.code); } else if (type == TAOS_QTYPE_FWD) { pHead = (SWalHead *)item; vnodeConfirmForward(pVnode, pHead->version, 0); @@ -279,13 +284,13 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) { int32_t num = taosGetQueueNumber(pWorker->qset); if (num > 0) { - usleep(30000); - sched_yield(); + usleep(30000); + sched_yield(); } else { - taosFreeQall(pWorker->qall); - taosCloseQset(pWorker->qset); - pWorker->qset = NULL; - dDebug("write worker:%d is released", pWorker->workerId); - pthread_exit(NULL); + taosFreeQall(pWorker->qall); + taosCloseQset(pWorker->qset); + pWorker->qset = NULL; + dDebug("write worker:%d is released", pWorker->workerId); + pthread_exit(NULL); } } diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 786342b5a6..2600bd2c55 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -247,6 +247,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_CPU_LIMITED, 0, 0x080B, "CPU cores // sync TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CONFIG, 0, 0x0900, "Invalid Sync Configuration") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync module not enabled") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_VERSION, 0, 0x0902, "Invalid Sync version") // wal TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal") diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 895aa0400a..14558485aa 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -594,7 +594,7 @@ static int sdbWrite(void *param, void *data, int type) { pthread_mutex_unlock(&tsSdbObj.mutex); sdbError("table:%s, failed to restore %s record:%s from source(%d), ver:%" PRId64 " too large, sdb ver:%" PRId64, pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), type, pHead->version, tsSdbObj.version); - return TSDB_CODE_MND_APP_ERROR; + return TSDB_CODE_SYN_INVALID_VERSION; } else { tsSdbObj.version = pHead->version; } diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 4d5e19af77..f5f1ff5853 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -313,7 +313,8 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { // always update version nodeVersion = pWalHead->version; - sDebug("replica:%d nodeRole:%d qtype:%d", pNode->replica, nodeRole, qtype); + sDebug("vgId:%d, replica:%d nodeRole:%s qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica, syncRole[nodeRole], + qtype, pWalHead->version); if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0; @@ -883,7 +884,7 @@ static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) { SSyncNode * pNode = pPeer->pSyncNode; SPeersStatus *pPeersStatus = (SPeersStatus *)cont; - sDebug("%s, status msg received, self:%s ver:%" PRIu64 " peer:%s ver:%" PRIu64 ", ack:%d", pPeer->id, + sDebug("%s, status msg is received, self:%s ver:%" PRIu64 " peer:%s ver:%" PRIu64 ", ack:%d", pPeer->id, syncRole[nodeRole], nodeVersion, syncRole[pPeersStatus->role], pPeersStatus->version, pPeersStatus->ack); pPeer->version = pPeersStatus->version; @@ -970,7 +971,8 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) { int retLen = write(pPeer->peerFd, msg, statusMsgLen); if (retLen == statusMsgLen) { - sDebug("%s, status msg is sent", pPeer->id); + sDebug("%s, status msg is sent, self:%s ver:%" PRIu64 ", ack:%d", pPeer->id, syncRole[pPeersStatus->role], + pPeersStatus->version, pPeersStatus->ack); } else { sDebug("%s, failed to send status msg, restart", pPeer->id); syncRestartConnection(pPeer); diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index a3ed080f24..baaeae2a81 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -49,18 +49,18 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { if (pVnode->status != TAOS_VN_STATUS_READY) { vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[msgType], pVnode->status); - return TSDB_CODE_APP_NOT_READY; + return TSDB_CODE_APP_NOT_READY; } - // tsdb may be in reset state + // tsdb may be in reset state if (pVnode->tsdb == NULL) return TSDB_CODE_APP_NOT_READY; - if (pVnode->status == TAOS_VN_STATUS_CLOSING) - return TSDB_CODE_APP_NOT_READY; + if (pVnode->status == TAOS_VN_STATUS_CLOSING) return TSDB_CODE_APP_NOT_READY; // TODO: Later, let slave to support query // if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) { - if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) { - vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role); + if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) { + vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%s", pVnode->vgId, taosMsg[msgType], + pVnode->syncCfg.replica, syncRole[pVnode->role]); return TSDB_CODE_APP_NOT_READY; } @@ -80,7 +80,7 @@ static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle) { taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead); } -static int32_t vnodeDumpQueryResult(SRspRet *pRet, void* pVnode, void** handle, bool* freeHandle) { +static int32_t vnodeDumpQueryResult(SRspRet *pRet, void *pVnode, void **handle, bool *freeHandle) { bool continueExec = false; int32_t code = TSDB_CODE_SUCCESS; @@ -106,55 +106,56 @@ static int32_t vnodeDumpQueryResult(SRspRet *pRet, void* pVnode, void** handle, return code; } -static void vnodeBuildNoResultQueryRsp(SRspRet* pRet) { +static void vnodeBuildNoResultQueryRsp(SRspRet *pRet) { pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); pRet->len = sizeof(SRetrieveTableRsp); memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); - SRetrieveTableRsp* pRsp = pRet->rsp; + SRetrieveTableRsp *pRsp = pRet->rsp; pRsp->completed = true; } static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { - void *pCont = pReadMsg->pCont; + void * pCont = pReadMsg->pCont; int32_t contLen = pReadMsg->contLen; SRspRet *pRet = &pReadMsg->rspRet; - SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pCont; + SQueryTableMsg *pQueryTableMsg = (SQueryTableMsg *)pCont; memset(pRet, 0, sizeof(SRspRet)); // qHandle needs to be freed correctly if (pReadMsg->rpcMsg.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { - SRetrieveTableMsg* killQueryMsg = (SRetrieveTableMsg*) pReadMsg->pCont; + SRetrieveTableMsg *killQueryMsg = (SRetrieveTableMsg *)pReadMsg->pCont; killQueryMsg->free = htons(killQueryMsg->free); killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle); - vWarn("QInfo:%p connection %p broken, kill query", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle); + vWarn("QInfo:%p connection %p broken, kill query", (void *)killQueryMsg->qhandle, pReadMsg->rpcMsg.handle); assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1); - void** qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t) killQueryMsg->qhandle); + void **qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t)killQueryMsg->qhandle); if (qhandle == NULL || *qhandle == NULL) { - vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle); + vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void *)killQueryMsg->qhandle, + pReadMsg->rpcMsg.handle); } else { - assert(*qhandle == (void*) killQueryMsg->qhandle); + assert(*qhandle == (void *)killQueryMsg->qhandle); qKillQuery(*qhandle); - qReleaseQInfo(pVnode->qMgmt, (void**) &qhandle, true); + qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, true); } return TSDB_CODE_TSC_QUERY_CANCELLED; } int32_t code = TSDB_CODE_SUCCESS; - void** handle = NULL; + void ** handle = NULL; if (contLen != 0) { qinfo_t pQInfo = NULL; code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo); - SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); - pRsp->code = code; + SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp)); + pRsp->code = code; pRsp->qhandle = 0; pRet->len = sizeof(SQueryTableRsp); @@ -163,7 +164,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { // current connect is broken if (code == TSDB_CODE_SUCCESS) { - handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t) pQInfo); + handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t)pQInfo); if (handle == NULL) { // failed to register qhandle, todo add error test case vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo, tstrerror(pRsp->code)); @@ -171,13 +172,15 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { qDestroyQueryInfo(pQInfo); // destroy it directly } else { assert(*handle == pQInfo); - pRsp->qhandle = htobe64((uint64_t) pQInfo); + pRsp->qhandle = htobe64((uint64_t)pQInfo); } - if (handle != NULL && vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { - vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle); + if (handle != NULL && + vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { + vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle, + pReadMsg->rpcMsg.handle); pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); + qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); return pRsp->code; } } else { @@ -190,12 +193,12 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { } } else { assert(pCont != NULL); - void** qhandle = (void**) pCont; + void **qhandle = (void **)pCont; vDebug("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, *qhandle); bool freehandle = false; - bool buildRes = qTableQuery(*qhandle); // do execute query + bool buildRes = qTableQuery(*qhandle); // do execute query // build query rsp, the retrieve request has reached here already if (buildRes) { @@ -233,16 +236,17 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { pRetrieve->free = htons(pRetrieve->free); pRetrieve->qhandle = htobe64(pRetrieve->qhandle); - vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, (void*) pRetrieve->qhandle, pRetrieve->free, pReadMsg->rpcMsg.handle); + vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, (void *)pRetrieve->qhandle, + pRetrieve->free, pReadMsg->rpcMsg.handle); memset(pRet, 0, sizeof(SRspRet)); int32_t code = TSDB_CODE_SUCCESS; - void** handle = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qhandle); - if (handle == NULL || (*handle) != (void*) pRetrieve->qhandle) { + void ** handle = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qhandle); + if (handle == NULL || (*handle) != (void *)pRetrieve->qhandle) { code = TSDB_CODE_QRY_INVALID_QHANDLE; - vDebug("vgId:%d, invalid qhandle in retrieving result, QInfo:%p", pVnode->vgId, (void*) pRetrieve->qhandle); - + vDebug("vgId:%d, invalid qhandle in retrieving result, QInfo:%p", pVnode->vgId, (void *)pRetrieve->qhandle); + vnodeBuildNoResultQueryRsp(pRet); return code; } @@ -250,7 +254,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { if (pRetrieve->free == 1) { vWarn("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle); qKillQuery(*handle); - qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); + qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); vnodeBuildNoResultQueryRsp(pRet); code = TSDB_CODE_TSC_QUERY_CANCELLED; @@ -259,26 +263,27 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { // register the qhandle to connect to quit query immediate if connection is broken if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { - vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle); + vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle, + pReadMsg->rpcMsg.handle); code = TSDB_CODE_RPC_NETWORK_UNAVAIL; qKillQuery(*handle); - qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); + qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); return code; } bool freeHandle = true; - bool buildRes = false; + bool buildRes = false; code = qRetrieveQueryResultInfo(*handle, &buildRes, pReadMsg->rpcMsg.handle); if (code != TSDB_CODE_SUCCESS) { - //TODO handle malloc failure + // TODO handle malloc failure pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); pRet->len = sizeof(SRetrieveTableRsp); memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); freeHandle = true; - } else { // result is not ready, return immediately + } else { // result is not ready, return immediately if (!buildRes) { - qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false); + qReleaseQInfo(pVnode->qMgmt, (void **)&handle, false); return TSDB_CODE_QRY_NOT_READY; } @@ -288,7 +293,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { // If qhandle is not added into vread queue, the query should be completed already or paused with error. // Here free qhandle immediately if (freeHandle) { - qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); + qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); } return code; @@ -296,13 +301,13 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { // notify connection(handle) that current qhandle is created, if current connection from // client is broken, the query needs to be killed immediately. -int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId) { - SRetrieveTableMsg* killQueryMsg = rpcMallocCont(sizeof(SRetrieveTableMsg)); - killQueryMsg->qhandle = htobe64((uint64_t) qhandle); +int32_t vnodeNotifyCurrentQhandle(void *handle, void *qhandle, int32_t vgId) { + SRetrieveTableMsg *killQueryMsg = rpcMallocCont(sizeof(SRetrieveTableMsg)); + killQueryMsg->qhandle = htobe64((uint64_t)qhandle); killQueryMsg->free = htons(1); killQueryMsg->header.vgId = htonl(vgId); killQueryMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg)); vDebug("QInfo:%p register qhandle to connect:%p", qhandle, handle); - return rpcReportProgress(handle, (char*) killQueryMsg, sizeof(SRetrieveTableMsg)); + return rpcReportProgress(handle, (char *)killQueryMsg, sizeof(SRetrieveTableMsg)); } diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 0c310439bb..b1f4539a6c 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -47,11 +47,11 @@ void vnodeInitWriteFp(void) { int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { int32_t code = 0; SVnodeObj *pVnode = (SVnodeObj *)param1; - SWalHead *pHead = param2; + SWalHead * pHead = param2; if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) { vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[pHead->msgType]); - return TSDB_CODE_VND_MSG_NOT_PROCESSED; + return TSDB_CODE_VND_MSG_NOT_PROCESSED; } if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) { @@ -59,28 +59,29 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { return TSDB_CODE_VND_NO_WRITE_AUTH; } - // tsdb may be in reset state + // tsdb may be in reset state if (pVnode->tsdb == NULL) return TSDB_CODE_APP_NOT_READY; - if (pVnode->status == TAOS_VN_STATUS_CLOSING) - return TSDB_CODE_APP_NOT_READY; - - if (pHead->version == 0) { // from client or CQ + if (pVnode->status == TAOS_VN_STATUS_CLOSING) return TSDB_CODE_APP_NOT_READY; + + if (pHead->version == 0) { // from client or CQ if (pVnode->status != TAOS_VN_STATUS_READY) { - vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->status); + vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[pHead->msgType], + pVnode->status); return TSDB_CODE_APP_NOT_READY; // it may be in deleting or closing state } if (pVnode->role != TAOS_SYNC_ROLE_MASTER) { - vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->syncCfg.replica, pVnode->role); + vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%s", pVnode->vgId, taosMsg[pHead->msgType], + pVnode->syncCfg.replica, syncRole[pVnode->role]); return TSDB_CODE_APP_NOT_READY; } // assign version pVnode->version++; pHead->version = pVnode->version; - if (pVnode->delay) usleep(pVnode->delay*1000); + if (pVnode->delay) usleep(pVnode->delay * 1000); - } else { // from wal or forward + } else { // from wal or forward // for data from WAL or forward, version may be smaller if (pHead->version <= pVnode->version) return 0; } @@ -91,12 +92,12 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { code = walWrite(pVnode->wal, pHead); if (code < 0) return code; - // forward to peers, even it is WAL/FWD, it shall be called to update version in sync + // forward to peers, even it is WAL/FWD, it shall be called to update version in sync int32_t syncCode = 0; syncCode = syncForwardToPeer(pVnode->sync, pHead, item, qtype); if (syncCode < 0) return syncCode; - // write data locally + // write data locally code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item); if (code < 0) return code; @@ -115,14 +116,14 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR // save insert result into item SShellSubmitRspMsg *pRsp = NULL; - if (pRet) { + if (pRet) { pRet->len = sizeof(SShellSubmitRspMsg); pRet->rsp = rpcMallocCont(pRet->len); pRsp = pRet->rsp; } if (tsdbInsertData(pVnode->tsdb, pCont, pRsp) < 0) code = terrno; - + return code; } @@ -191,7 +192,7 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR int vnodeWriteToQueue(void *param, void *data, int type) { SVnodeObj *pVnode = param; - SWalHead *pHead = data; + SWalHead * pHead = data; int size = sizeof(SWalHead) + pHead->len; SWalHead *pWal = (SWalHead *)taosAllocateQitem(size); @@ -204,4 +205,3 @@ int vnodeWriteToQueue(void *param, void *data, int type) { return 0; } - diff --git a/src/wal/src/walMain.c b/src/wal/src/walMain.c index 5d7f8ee20b..7e0f987213 100644 --- a/src/wal/src/walMain.c +++ b/src/wal/src/walMain.c @@ -63,7 +63,7 @@ static void walRelease(SWal *pWal); static void walModuleInitFunc() { walTmrCtrl = taosTmrInit(1000, 100, 300000, "WAL"); - if (walTmrCtrl == NULL) + if (walTmrCtrl == NULL) walModuleInit = PTHREAD_ONCE_INIT; else wDebug("WAL module is initialized"); @@ -90,7 +90,7 @@ void *walOpen(const char *path, const SWalCfg *pCfg) { return NULL; } - atomic_add_fetch_32(&tsWalNum, 1); + atomic_add_fetch_32(&tsWalNum, 1); pWal->fd = -1; pWal->max = pCfg->wals; pWal->id = 0; @@ -117,18 +117,17 @@ void *walOpen(const char *path, const SWalCfg *pCfg) { walRelease(pWal); pWal = NULL; } - + if (pCfg->keep == 1) return pWal; - if (walHandleExistingFiles(path) == 0) - walRenew(pWal); + if (walHandleExistingFiles(path) == 0) walRenew(pWal); - if (pWal && pWal->fd <0) { + if (pWal && pWal->fd < 0) { terrno = TAOS_SYSTEM_ERROR(errno); wError("wal:%s, failed to open(%s)", path, strerror(errno)); walRelease(pWal); pWal = NULL; - } + } if (pWal) wDebug("wal:%s, it is open, level:%d fsyncPeriod:%d", path, pWal->level, pWal->fsyncPeriod); return pWal; @@ -154,7 +153,7 @@ int walAlter(twalh wal, const SWalCfg *pCfg) { pWal->fsyncPeriod = pCfg->fsyncPeriod; if (walNeedFsyncTimer(pWal)) { wInfo("wal:%s, reset fsync timer, walLevel:%d fsyncPeriod:%d", pWal->name, pWal->level, pWal->fsyncPeriod); - taosTmrReset(walProcessFsyncTimer, pWal->fsyncPeriod, pWal, &pWal->timer,walTmrCtrl); + taosTmrReset(walProcessFsyncTimer, pWal->fsyncPeriod, pWal, &pWal->timer, walTmrCtrl); } else { wInfo("wal:%s, stop fsync timer, walLevel:%d fsyncPeriod:%d", pWal->name, pWal->level, pWal->fsyncPeriod); taosTmrStop(pWal->timer); @@ -167,16 +166,16 @@ int walAlter(twalh wal, const SWalCfg *pCfg) { void walClose(void *handle) { if (handle == NULL) return; - - SWal *pWal = handle; + + SWal *pWal = handle; taosClose(pWal->fd); if (pWal->timer) taosTmrStopA(&pWal->timer); if (pWal->keep == 0) { // remove all files in the directory - for (int i=0; inum; ++i) { - snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", pWal->path, walPrefix, pWal->id-i); - if (remove(pWal->name) <0) { + for (int i = 0; i < pWal->num; ++i) { + snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", pWal->path, walPrefix, pWal->id - i); + if (remove(pWal->name) < 0) { wError("wal:%s, failed to remove", pWal->name); } else { wDebug("wal:%s, it is removed", pWal->name); @@ -197,7 +196,7 @@ int walRenew(void *handle) { pthread_mutex_lock(&pWal->mutex); - if (pWal->fd >=0) { + if (pWal->fd >= 0) { close(pWal->fd); pWal->id++; wDebug("wal:%s, it is closed", pWal->name); @@ -218,7 +217,7 @@ int walRenew(void *handle) { // remove the oldest wal file char name[TSDB_FILENAME_LEN * 3]; snprintf(name, sizeof(name), "%s/%s%d", pWal->path, walPrefix, pWal->id - pWal->max); - if (remove(name) <0) { + if (remove(name) < 0) { wError("wal:%s, failed to remove(%s)", name, strerror(errno)); } else { wDebug("wal:%s, it is removed", name); @@ -226,8 +225,8 @@ int walRenew(void *handle) { pWal->num--; } - } - + } + pthread_mutex_unlock(&pWal->mutex); return terrno; @@ -239,15 +238,18 @@ int walWrite(void *handle, SWalHead *pHead) { terrno = 0; - // no wal + // no wal if (pWal->level == TAOS_WAL_NOLOG) return 0; - if (pHead->version <= pWal->version) return 0; + if (pHead->version <= pWal->version) { + wError("wal:%s, failed to write ver:%" PRIu64 ", last ver:%" PRIu64, pWal->name, pHead->version, pWal->version); + return 0; + } pHead->signature = walSignature; taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead)); int contLen = pHead->len + sizeof(SWalHead); - if(taosTWrite(pWal->fd, pHead, contLen) != contLen) { + if (taosTWrite(pWal->fd, pHead, contLen) != contLen) { wError("wal:%s, failed to write(%s)", pWal->name, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); } else { @@ -258,7 +260,6 @@ int walWrite(void *handle, SWalHead *pHead) { } void walFsync(void *handle) { - SWal *pWal = handle; if (pWal == NULL || pWal->level != TAOS_WAL_FSYNC || pWal->fd < 0) return; @@ -276,12 +277,11 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) uint32_t maxId = 0, minId = -1, index =0; terrno = 0; - int plen = strlen(walPrefix); - char opath[TSDB_FILENAME_LEN+5]; - + int plen = strlen(walPrefix); + char opath[TSDB_FILENAME_LEN + 5]; + int slen = snprintf(opath, sizeof(opath), "%s", pWal->path); - if ( pWal->keep == 0) - strcpy(opath+slen, "/old"); + if (pWal->keep == 0) strcpy(opath + slen, "/old"); DIR *dir = opendir(opath); if (dir == NULL && errno == ENOENT) return 0; @@ -290,8 +290,8 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) return terrno; } - while ((ent = readdir(dir))!= NULL) { - if ( strncmp(ent->d_name, walPrefix, plen) == 0) { + while ((ent = readdir(dir)) != NULL) { + if (strncmp(ent->d_name, walPrefix, plen) == 0) { index = atol(ent->d_name + plen); if (index > maxId) maxId = index; if (index < minId) minId = index; @@ -306,13 +306,13 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) return terrno; } - if ( count != (maxId-minId+1) ) { + if (count != (maxId - minId + 1)) { wError("wal:%s, messed up, count:%d max:%d min:%d", opath, count, maxId, minId); terrno = TSDB_CODE_WAL_APP_ERROR; } else { wDebug("wal:%s, %d files will be restored", opath, count); - for (index = minId; index<=maxId; ++index) { + for (index = minId; index <= maxId; ++index) { snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", opath, walPrefix, index); terrno = walRestoreWalFile(pWal, pVnode, writeFp); if (terrno < 0) break; @@ -328,7 +328,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) terrno = TAOS_SYSTEM_ERROR(errno); } } - } else { + } else { // open the existing WAL file in append mode pWal->num = count; pWal->id = maxId; @@ -345,9 +345,9 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) } int walGetWalFile(void *handle, char *name, uint32_t *index) { - SWal *pWal = handle; + SWal * pWal = handle; int code = 1; - int32_t first = 0; + int32_t first = 0; name[0] = 0; if (pWal == NULL || pWal->num == 0) return 0; @@ -359,18 +359,17 @@ int walGetWalFile(void *handle, char *name, uint32_t *index) { if (*index < first && *index > pWal->id) { code = -1; // index out of range - } else { + } else { sprintf(name, "wal/%s%d", walPrefix, *index); - code = (*index == pWal->id) ? 0:1; + code = (*index == pWal->id) ? 0 : 1; } pthread_mutex_unlock(&(pWal->mutex)); return code; -} +} static void walRelease(SWal *pWal) { - pthread_mutex_destroy(&pWal->mutex); pWal->signature = NULL; free(pWal); @@ -385,12 +384,12 @@ static void walRelease(SWal *pWal) { static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) { char *name = pWal->name; - int size = 1024 * 1024; // default 1M buffer size + int size = 1024 * 1024; // default 1M buffer size terrno = 0; char *buffer = malloc(size); if (buffer == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); return terrno; } @@ -487,21 +486,21 @@ int walHandleExistingFiles(const char *path) { } else { // move all files to old directory int count = 0; - while ((ent = readdir(dir))!= NULL) { - if ( strncmp(ent->d_name, walPrefix, plen) == 0) { + while ((ent = readdir(dir)) != NULL) { + if (strncmp(ent->d_name, walPrefix, plen) == 0) { snprintf(oname, sizeof(oname), "%s/%s", path, ent->d_name); snprintf(nname, sizeof(nname), "%s/old/%s", path, ent->d_name); if (taosMkDir(opath, 0755) != 0) { wError("wal:%s, failed to create directory:%s(%s)", oname, opath, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); - break; + break; } if (rename(oname, nname) < 0) { wError("wal:%s, failed to move to new:%s", oname, nname); terrno = TAOS_SYSTEM_ERROR(errno); break; - } + } count++; } @@ -509,34 +508,34 @@ int walHandleExistingFiles(const char *path) { wDebug("wal:%s, %d files are moved for restoration", path, count); } - + closedir(dir); return terrno; } static int walRemoveWalFiles(const char *path) { - int plen = strlen(walPrefix); - char name[TSDB_FILENAME_LEN * 3]; - + int plen = strlen(walPrefix); + char name[TSDB_FILENAME_LEN * 3]; + terrno = 0; struct dirent *ent; - DIR *dir = opendir(path); + DIR *dir = opendir(path); if (dir == NULL && errno == ENOENT) return 0; if (dir == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); return terrno; - } + } - while ((ent = readdir(dir))!= NULL) { - if ( strncmp(ent->d_name, walPrefix, plen) == 0) { + while ((ent = readdir(dir)) != NULL) { + if (strncmp(ent->d_name, walPrefix, plen) == 0) { snprintf(name, sizeof(name), "%s/%s", path, ent->d_name); - if (remove(name) <0) { + if (remove(name) < 0) { wError("wal:%s, failed to remove(%s)", name, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); } } - } + } closedir(dir); -- GitLab