提交 8623f145 编写于 作者: S Shengliang Guan

TD-1652

上级 7f152760
...@@ -210,12 +210,12 @@ void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code) { ...@@ -210,12 +210,12 @@ void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code) {
static void *dnodeProcessWriteQueue(void *param) { static void *dnodeProcessWriteQueue(void *param) {
SWriteWorker *pWorker = (SWriteWorker *)param; SWriteWorker *pWorker = (SWriteWorker *)param;
SWriteMsg *pWrite; SWriteMsg * pWrite;
SWalHead *pHead; SWalHead * pHead;
int32_t numOfMsgs; int32_t numOfMsgs;
int type; int type;
void *pVnode, *item; void * pVnode, *item;
SRspRet *pRspRet; SRspRet * pRspRet;
dDebug("write worker:%d is running", pWorker->workerId); dDebug("write worker:%d is running", pWorker->workerId);
...@@ -237,16 +237,21 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -237,16 +237,21 @@ static void *dnodeProcessWriteQueue(void *param) {
pHead->msgType = pWrite->rpcMsg.msgType; pHead->msgType = pWrite->rpcMsg.msgType;
pHead->version = 0; pHead->version = 0;
pHead->len = pWrite->contLen; pHead->len = pWrite->contLen;
dDebug("%p, rpc msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType]); dDebug("%p, rpc msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle,
taosMsg[pWrite->rpcMsg.msgType]);
} else { } else {
pHead = (SWalHead *)item; pHead = (SWalHead *)item;
dTrace("%p, wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType], pHead->version); dTrace("%p, wal msg:%s will be processed in vwrite queue, version:%" PRIu64, pHead, taosMsg[pHead->msgType],
pHead->version);
} }
int32_t code = vnodeProcessWrite(pVnode, type, pHead, pRspRet); int32_t code = vnodeProcessWrite(pVnode, type, pHead, pRspRet);
if (pWrite) { dTrace("%p, msg:%s is processed in vwrite queue, version:%" PRIu64 ", result:%s", pHead, taosMsg[pHead->msgType],
pHead->version, tstrerror(code));
if (pWrite) {
pWrite->rpcMsg.code = code; pWrite->rpcMsg.code = code;
if (code <= 0) pWrite->processedCount = 1; if (code <= 0) pWrite->processedCount = 1;
} }
} }
...@@ -258,7 +263,7 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -258,7 +263,7 @@ static void *dnodeProcessWriteQueue(void *param) {
taosGetQitem(pWorker->qall, &type, &item); taosGetQitem(pWorker->qall, &type, &item);
if (type == TAOS_QTYPE_RPC) { if (type == TAOS_QTYPE_RPC) {
pWrite = (SWriteMsg *)item; pWrite = (SWriteMsg *)item;
dnodeSendRpcVnodeWriteRsp(pVnode, item, pWrite->rpcMsg.code); dnodeSendRpcVnodeWriteRsp(pVnode, item, pWrite->rpcMsg.code);
} else if (type == TAOS_QTYPE_FWD) { } else if (type == TAOS_QTYPE_FWD) {
pHead = (SWalHead *)item; pHead = (SWalHead *)item;
vnodeConfirmForward(pVnode, pHead->version, 0); vnodeConfirmForward(pVnode, pHead->version, 0);
...@@ -279,13 +284,13 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) { ...@@ -279,13 +284,13 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
int32_t num = taosGetQueueNumber(pWorker->qset); int32_t num = taosGetQueueNumber(pWorker->qset);
if (num > 0) { if (num > 0) {
usleep(30000); usleep(30000);
sched_yield(); sched_yield();
} else { } else {
taosFreeQall(pWorker->qall); taosFreeQall(pWorker->qall);
taosCloseQset(pWorker->qset); taosCloseQset(pWorker->qset);
pWorker->qset = NULL; pWorker->qset = NULL;
dDebug("write worker:%d is released", pWorker->workerId); dDebug("write worker:%d is released", pWorker->workerId);
pthread_exit(NULL); pthread_exit(NULL);
} }
} }
...@@ -247,6 +247,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_CPU_LIMITED, 0, 0x080B, "CPU cores ...@@ -247,6 +247,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_CPU_LIMITED, 0, 0x080B, "CPU cores
// sync // sync
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CONFIG, 0, 0x0900, "Invalid Sync Configuration") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CONFIG, 0, 0x0900, "Invalid Sync Configuration")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync module not enabled") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync module not enabled")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_VERSION, 0, 0x0902, "Invalid Sync version")
// wal // wal
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal")
......
...@@ -594,7 +594,7 @@ static int sdbWrite(void *param, void *data, int type) { ...@@ -594,7 +594,7 @@ static int sdbWrite(void *param, void *data, int type) {
pthread_mutex_unlock(&tsSdbObj.mutex); pthread_mutex_unlock(&tsSdbObj.mutex);
sdbError("table:%s, failed to restore %s record:%s from source(%d), ver:%" PRId64 " too large, sdb ver:%" PRId64, sdbError("table:%s, failed to restore %s record:%s from source(%d), ver:%" PRId64 " too large, sdb ver:%" PRId64,
pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), type, pHead->version, tsSdbObj.version); pTable->tableName, sdbGetActionStr(action), sdbGetKeyStr(pTable, pHead->cont), type, pHead->version, tsSdbObj.version);
return TSDB_CODE_MND_APP_ERROR; return TSDB_CODE_SYN_INVALID_VERSION;
} else { } else {
tsSdbObj.version = pHead->version; tsSdbObj.version = pHead->version;
} }
......
...@@ -313,7 +313,8 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { ...@@ -313,7 +313,8 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) {
// always update version // always update version
nodeVersion = pWalHead->version; nodeVersion = pWalHead->version;
sDebug("replica:%d nodeRole:%d qtype:%d", pNode->replica, nodeRole, qtype); sDebug("vgId:%d, replica:%d nodeRole:%s qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica, syncRole[nodeRole],
qtype, pWalHead->version);
if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0; if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0;
...@@ -883,7 +884,7 @@ static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) { ...@@ -883,7 +884,7 @@ static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) {
SSyncNode * pNode = pPeer->pSyncNode; SSyncNode * pNode = pPeer->pSyncNode;
SPeersStatus *pPeersStatus = (SPeersStatus *)cont; SPeersStatus *pPeersStatus = (SPeersStatus *)cont;
sDebug("%s, status msg received, self:%s ver:%" PRIu64 " peer:%s ver:%" PRIu64 ", ack:%d", pPeer->id, sDebug("%s, status msg is received, self:%s ver:%" PRIu64 " peer:%s ver:%" PRIu64 ", ack:%d", pPeer->id,
syncRole[nodeRole], nodeVersion, syncRole[pPeersStatus->role], pPeersStatus->version, pPeersStatus->ack); syncRole[nodeRole], nodeVersion, syncRole[pPeersStatus->role], pPeersStatus->version, pPeersStatus->ack);
pPeer->version = pPeersStatus->version; pPeer->version = pPeersStatus->version;
...@@ -970,7 +971,8 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) { ...@@ -970,7 +971,8 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) {
int retLen = write(pPeer->peerFd, msg, statusMsgLen); int retLen = write(pPeer->peerFd, msg, statusMsgLen);
if (retLen == statusMsgLen) { if (retLen == statusMsgLen) {
sDebug("%s, status msg is sent", pPeer->id); sDebug("%s, status msg is sent, self:%s ver:%" PRIu64 ", ack:%d", pPeer->id, syncRole[pPeersStatus->role],
pPeersStatus->version, pPeersStatus->ack);
} else { } else {
sDebug("%s, failed to send status msg, restart", pPeer->id); sDebug("%s, failed to send status msg, restart", pPeer->id);
syncRestartConnection(pPeer); syncRestartConnection(pPeer);
......
...@@ -49,18 +49,18 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { ...@@ -49,18 +49,18 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
if (pVnode->status != TAOS_VN_STATUS_READY) { if (pVnode->status != TAOS_VN_STATUS_READY) {
vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[msgType], pVnode->status); vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[msgType], pVnode->status);
return TSDB_CODE_APP_NOT_READY; return TSDB_CODE_APP_NOT_READY;
} }
// tsdb may be in reset state // tsdb may be in reset state
if (pVnode->tsdb == NULL) return TSDB_CODE_APP_NOT_READY; if (pVnode->tsdb == NULL) return TSDB_CODE_APP_NOT_READY;
if (pVnode->status == TAOS_VN_STATUS_CLOSING) if (pVnode->status == TAOS_VN_STATUS_CLOSING) return TSDB_CODE_APP_NOT_READY;
return TSDB_CODE_APP_NOT_READY;
// TODO: Later, let slave to support query // TODO: Later, let slave to support query
// if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) { // if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) { if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role); vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%s", pVnode->vgId, taosMsg[msgType],
pVnode->syncCfg.replica, syncRole[pVnode->role]);
return TSDB_CODE_APP_NOT_READY; return TSDB_CODE_APP_NOT_READY;
} }
...@@ -80,7 +80,7 @@ static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle) { ...@@ -80,7 +80,7 @@ static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle) {
taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead); taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead);
} }
static int32_t vnodeDumpQueryResult(SRspRet *pRet, void* pVnode, void** handle, bool* freeHandle) { static int32_t vnodeDumpQueryResult(SRspRet *pRet, void *pVnode, void **handle, bool *freeHandle) {
bool continueExec = false; bool continueExec = false;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -106,55 +106,56 @@ static int32_t vnodeDumpQueryResult(SRspRet *pRet, void* pVnode, void** handle, ...@@ -106,55 +106,56 @@ static int32_t vnodeDumpQueryResult(SRspRet *pRet, void* pVnode, void** handle,
return code; return code;
} }
static void vnodeBuildNoResultQueryRsp(SRspRet* pRet) { static void vnodeBuildNoResultQueryRsp(SRspRet *pRet) {
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
pRet->len = sizeof(SRetrieveTableRsp); pRet->len = sizeof(SRetrieveTableRsp);
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
SRetrieveTableRsp* pRsp = pRet->rsp; SRetrieveTableRsp *pRsp = pRet->rsp;
pRsp->completed = true; pRsp->completed = true;
} }
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
void *pCont = pReadMsg->pCont; void * pCont = pReadMsg->pCont;
int32_t contLen = pReadMsg->contLen; int32_t contLen = pReadMsg->contLen;
SRspRet *pRet = &pReadMsg->rspRet; SRspRet *pRet = &pReadMsg->rspRet;
SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pCont; SQueryTableMsg *pQueryTableMsg = (SQueryTableMsg *)pCont;
memset(pRet, 0, sizeof(SRspRet)); memset(pRet, 0, sizeof(SRspRet));
// qHandle needs to be freed correctly // qHandle needs to be freed correctly
if (pReadMsg->rpcMsg.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { if (pReadMsg->rpcMsg.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
SRetrieveTableMsg* killQueryMsg = (SRetrieveTableMsg*) pReadMsg->pCont; SRetrieveTableMsg *killQueryMsg = (SRetrieveTableMsg *)pReadMsg->pCont;
killQueryMsg->free = htons(killQueryMsg->free); killQueryMsg->free = htons(killQueryMsg->free);
killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle); killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle);
vWarn("QInfo:%p connection %p broken, kill query", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle); vWarn("QInfo:%p connection %p broken, kill query", (void *)killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1); assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1);
void** qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t) killQueryMsg->qhandle); void **qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t)killQueryMsg->qhandle);
if (qhandle == NULL || *qhandle == NULL) { if (qhandle == NULL || *qhandle == NULL) {
vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle); vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void *)killQueryMsg->qhandle,
pReadMsg->rpcMsg.handle);
} else { } else {
assert(*qhandle == (void*) killQueryMsg->qhandle); assert(*qhandle == (void *)killQueryMsg->qhandle);
qKillQuery(*qhandle); qKillQuery(*qhandle);
qReleaseQInfo(pVnode->qMgmt, (void**) &qhandle, true); qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, true);
} }
return TSDB_CODE_TSC_QUERY_CANCELLED; return TSDB_CODE_TSC_QUERY_CANCELLED;
} }
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
void** handle = NULL; void ** handle = NULL;
if (contLen != 0) { if (contLen != 0) {
qinfo_t pQInfo = NULL; qinfo_t pQInfo = NULL;
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo); code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo);
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->code = code; pRsp->code = code;
pRsp->qhandle = 0; pRsp->qhandle = 0;
pRet->len = sizeof(SQueryTableRsp); pRet->len = sizeof(SQueryTableRsp);
...@@ -163,7 +164,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -163,7 +164,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
// current connect is broken // current connect is broken
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t) pQInfo); handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t)pQInfo);
if (handle == NULL) { // failed to register qhandle, todo add error test case if (handle == NULL) { // failed to register qhandle, todo add error test case
vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo, vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo,
tstrerror(pRsp->code)); tstrerror(pRsp->code));
...@@ -171,13 +172,15 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -171,13 +172,15 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
qDestroyQueryInfo(pQInfo); // destroy it directly qDestroyQueryInfo(pQInfo); // destroy it directly
} else { } else {
assert(*handle == pQInfo); assert(*handle == pQInfo);
pRsp->qhandle = htobe64((uint64_t) pQInfo); pRsp->qhandle = htobe64((uint64_t)pQInfo);
} }
if (handle != NULL && vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { if (handle != NULL &&
vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle); vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle,
pReadMsg->rpcMsg.handle);
pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
return pRsp->code; return pRsp->code;
} }
} else { } else {
...@@ -190,12 +193,12 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -190,12 +193,12 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
} }
} else { } else {
assert(pCont != NULL); assert(pCont != NULL);
void** qhandle = (void**) pCont; void **qhandle = (void **)pCont;
vDebug("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, *qhandle); vDebug("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, *qhandle);
bool freehandle = false; bool freehandle = false;
bool buildRes = qTableQuery(*qhandle); // do execute query bool buildRes = qTableQuery(*qhandle); // do execute query
// build query rsp, the retrieve request has reached here already // build query rsp, the retrieve request has reached here already
if (buildRes) { if (buildRes) {
...@@ -233,16 +236,17 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -233,16 +236,17 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
pRetrieve->free = htons(pRetrieve->free); pRetrieve->free = htons(pRetrieve->free);
pRetrieve->qhandle = htobe64(pRetrieve->qhandle); pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, (void*) pRetrieve->qhandle, pRetrieve->free, pReadMsg->rpcMsg.handle); vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, (void *)pRetrieve->qhandle,
pRetrieve->free, pReadMsg->rpcMsg.handle);
memset(pRet, 0, sizeof(SRspRet)); memset(pRet, 0, sizeof(SRspRet));
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
void** handle = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qhandle); void ** handle = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qhandle);
if (handle == NULL || (*handle) != (void*) pRetrieve->qhandle) { if (handle == NULL || (*handle) != (void *)pRetrieve->qhandle) {
code = TSDB_CODE_QRY_INVALID_QHANDLE; code = TSDB_CODE_QRY_INVALID_QHANDLE;
vDebug("vgId:%d, invalid qhandle in retrieving result, QInfo:%p", pVnode->vgId, (void*) pRetrieve->qhandle); vDebug("vgId:%d, invalid qhandle in retrieving result, QInfo:%p", pVnode->vgId, (void *)pRetrieve->qhandle);
vnodeBuildNoResultQueryRsp(pRet); vnodeBuildNoResultQueryRsp(pRet);
return code; return code;
} }
...@@ -250,7 +254,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -250,7 +254,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (pRetrieve->free == 1) { if (pRetrieve->free == 1) {
vWarn("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle); vWarn("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
qKillQuery(*handle); qKillQuery(*handle);
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
vnodeBuildNoResultQueryRsp(pRet); vnodeBuildNoResultQueryRsp(pRet);
code = TSDB_CODE_TSC_QUERY_CANCELLED; code = TSDB_CODE_TSC_QUERY_CANCELLED;
...@@ -259,26 +263,27 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -259,26 +263,27 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
// register the qhandle to connect to quit query immediate if connection is broken // register the qhandle to connect to quit query immediate if connection is broken
if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle); vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle,
pReadMsg->rpcMsg.handle);
code = TSDB_CODE_RPC_NETWORK_UNAVAIL; code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
qKillQuery(*handle); qKillQuery(*handle);
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
return code; return code;
} }
bool freeHandle = true; bool freeHandle = true;
bool buildRes = false; bool buildRes = false;
code = qRetrieveQueryResultInfo(*handle, &buildRes, pReadMsg->rpcMsg.handle); code = qRetrieveQueryResultInfo(*handle, &buildRes, pReadMsg->rpcMsg.handle);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
//TODO handle malloc failure // TODO handle malloc failure
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
pRet->len = sizeof(SRetrieveTableRsp); pRet->len = sizeof(SRetrieveTableRsp);
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
freeHandle = true; freeHandle = true;
} else { // result is not ready, return immediately } else { // result is not ready, return immediately
if (!buildRes) { if (!buildRes) {
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false); qReleaseQInfo(pVnode->qMgmt, (void **)&handle, false);
return TSDB_CODE_QRY_NOT_READY; return TSDB_CODE_QRY_NOT_READY;
} }
...@@ -288,7 +293,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -288,7 +293,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
// If qhandle is not added into vread queue, the query should be completed already or paused with error. // If qhandle is not added into vread queue, the query should be completed already or paused with error.
// Here free qhandle immediately // Here free qhandle immediately
if (freeHandle) { if (freeHandle) {
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true); qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
} }
return code; return code;
...@@ -296,13 +301,13 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -296,13 +301,13 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
// notify connection(handle) that current qhandle is created, if current connection from // notify connection(handle) that current qhandle is created, if current connection from
// client is broken, the query needs to be killed immediately. // client is broken, the query needs to be killed immediately.
int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId) { int32_t vnodeNotifyCurrentQhandle(void *handle, void *qhandle, int32_t vgId) {
SRetrieveTableMsg* killQueryMsg = rpcMallocCont(sizeof(SRetrieveTableMsg)); SRetrieveTableMsg *killQueryMsg = rpcMallocCont(sizeof(SRetrieveTableMsg));
killQueryMsg->qhandle = htobe64((uint64_t) qhandle); killQueryMsg->qhandle = htobe64((uint64_t)qhandle);
killQueryMsg->free = htons(1); killQueryMsg->free = htons(1);
killQueryMsg->header.vgId = htonl(vgId); killQueryMsg->header.vgId = htonl(vgId);
killQueryMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg)); killQueryMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));
vDebug("QInfo:%p register qhandle to connect:%p", qhandle, handle); vDebug("QInfo:%p register qhandle to connect:%p", qhandle, handle);
return rpcReportProgress(handle, (char*) killQueryMsg, sizeof(SRetrieveTableMsg)); return rpcReportProgress(handle, (char *)killQueryMsg, sizeof(SRetrieveTableMsg));
} }
...@@ -47,11 +47,11 @@ void vnodeInitWriteFp(void) { ...@@ -47,11 +47,11 @@ void vnodeInitWriteFp(void) {
int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
int32_t code = 0; int32_t code = 0;
SVnodeObj *pVnode = (SVnodeObj *)param1; SVnodeObj *pVnode = (SVnodeObj *)param1;
SWalHead *pHead = param2; SWalHead * pHead = param2;
if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) { if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) {
vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[pHead->msgType]); vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[pHead->msgType]);
return TSDB_CODE_VND_MSG_NOT_PROCESSED; return TSDB_CODE_VND_MSG_NOT_PROCESSED;
} }
if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) { if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) {
...@@ -59,28 +59,29 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { ...@@ -59,28 +59,29 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
return TSDB_CODE_VND_NO_WRITE_AUTH; return TSDB_CODE_VND_NO_WRITE_AUTH;
} }
// tsdb may be in reset state // tsdb may be in reset state
if (pVnode->tsdb == NULL) return TSDB_CODE_APP_NOT_READY; if (pVnode->tsdb == NULL) return TSDB_CODE_APP_NOT_READY;
if (pVnode->status == TAOS_VN_STATUS_CLOSING) if (pVnode->status == TAOS_VN_STATUS_CLOSING) return TSDB_CODE_APP_NOT_READY;
return TSDB_CODE_APP_NOT_READY;
if (pHead->version == 0) { // from client or CQ
if (pHead->version == 0) { // from client or CQ
if (pVnode->status != TAOS_VN_STATUS_READY) { if (pVnode->status != TAOS_VN_STATUS_READY) {
vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->status); vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[pHead->msgType],
pVnode->status);
return TSDB_CODE_APP_NOT_READY; // it may be in deleting or closing state return TSDB_CODE_APP_NOT_READY; // it may be in deleting or closing state
} }
if (pVnode->role != TAOS_SYNC_ROLE_MASTER) { if (pVnode->role != TAOS_SYNC_ROLE_MASTER) {
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->syncCfg.replica, pVnode->role); vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%s", pVnode->vgId, taosMsg[pHead->msgType],
pVnode->syncCfg.replica, syncRole[pVnode->role]);
return TSDB_CODE_APP_NOT_READY; return TSDB_CODE_APP_NOT_READY;
} }
// assign version // assign version
pVnode->version++; pVnode->version++;
pHead->version = pVnode->version; pHead->version = pVnode->version;
if (pVnode->delay) usleep(pVnode->delay*1000); if (pVnode->delay) usleep(pVnode->delay * 1000);
} else { // from wal or forward } else { // from wal or forward
// for data from WAL or forward, version may be smaller // for data from WAL or forward, version may be smaller
if (pHead->version <= pVnode->version) return 0; if (pHead->version <= pVnode->version) return 0;
} }
...@@ -91,12 +92,12 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { ...@@ -91,12 +92,12 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
code = walWrite(pVnode->wal, pHead); code = walWrite(pVnode->wal, pHead);
if (code < 0) return code; if (code < 0) return code;
// forward to peers, even it is WAL/FWD, it shall be called to update version in sync // forward to peers, even it is WAL/FWD, it shall be called to update version in sync
int32_t syncCode = 0; int32_t syncCode = 0;
syncCode = syncForwardToPeer(pVnode->sync, pHead, item, qtype); syncCode = syncForwardToPeer(pVnode->sync, pHead, item, qtype);
if (syncCode < 0) return syncCode; if (syncCode < 0) return syncCode;
// write data locally // write data locally
code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item); code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item);
if (code < 0) return code; if (code < 0) return code;
...@@ -115,14 +116,14 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR ...@@ -115,14 +116,14 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR
// save insert result into item // save insert result into item
SShellSubmitRspMsg *pRsp = NULL; SShellSubmitRspMsg *pRsp = NULL;
if (pRet) { if (pRet) {
pRet->len = sizeof(SShellSubmitRspMsg); pRet->len = sizeof(SShellSubmitRspMsg);
pRet->rsp = rpcMallocCont(pRet->len); pRet->rsp = rpcMallocCont(pRet->len);
pRsp = pRet->rsp; pRsp = pRet->rsp;
} }
if (tsdbInsertData(pVnode->tsdb, pCont, pRsp) < 0) code = terrno; if (tsdbInsertData(pVnode->tsdb, pCont, pRsp) < 0) code = terrno;
return code; return code;
} }
...@@ -191,7 +192,7 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR ...@@ -191,7 +192,7 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR
int vnodeWriteToQueue(void *param, void *data, int type) { int vnodeWriteToQueue(void *param, void *data, int type) {
SVnodeObj *pVnode = param; SVnodeObj *pVnode = param;
SWalHead *pHead = data; SWalHead * pHead = data;
int size = sizeof(SWalHead) + pHead->len; int size = sizeof(SWalHead) + pHead->len;
SWalHead *pWal = (SWalHead *)taosAllocateQitem(size); SWalHead *pWal = (SWalHead *)taosAllocateQitem(size);
...@@ -204,4 +205,3 @@ int vnodeWriteToQueue(void *param, void *data, int type) { ...@@ -204,4 +205,3 @@ int vnodeWriteToQueue(void *param, void *data, int type) {
return 0; return 0;
} }
...@@ -63,7 +63,7 @@ static void walRelease(SWal *pWal); ...@@ -63,7 +63,7 @@ static void walRelease(SWal *pWal);
static void walModuleInitFunc() { static void walModuleInitFunc() {
walTmrCtrl = taosTmrInit(1000, 100, 300000, "WAL"); walTmrCtrl = taosTmrInit(1000, 100, 300000, "WAL");
if (walTmrCtrl == NULL) if (walTmrCtrl == NULL)
walModuleInit = PTHREAD_ONCE_INIT; walModuleInit = PTHREAD_ONCE_INIT;
else else
wDebug("WAL module is initialized"); wDebug("WAL module is initialized");
...@@ -90,7 +90,7 @@ void *walOpen(const char *path, const SWalCfg *pCfg) { ...@@ -90,7 +90,7 @@ void *walOpen(const char *path, const SWalCfg *pCfg) {
return NULL; return NULL;
} }
atomic_add_fetch_32(&tsWalNum, 1); atomic_add_fetch_32(&tsWalNum, 1);
pWal->fd = -1; pWal->fd = -1;
pWal->max = pCfg->wals; pWal->max = pCfg->wals;
pWal->id = 0; pWal->id = 0;
...@@ -117,18 +117,17 @@ void *walOpen(const char *path, const SWalCfg *pCfg) { ...@@ -117,18 +117,17 @@ void *walOpen(const char *path, const SWalCfg *pCfg) {
walRelease(pWal); walRelease(pWal);
pWal = NULL; pWal = NULL;
} }
if (pCfg->keep == 1) return pWal; if (pCfg->keep == 1) return pWal;
if (walHandleExistingFiles(path) == 0) if (walHandleExistingFiles(path) == 0) walRenew(pWal);
walRenew(pWal);
if (pWal && pWal->fd <0) { if (pWal && pWal->fd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
wError("wal:%s, failed to open(%s)", path, strerror(errno)); wError("wal:%s, failed to open(%s)", path, strerror(errno));
walRelease(pWal); walRelease(pWal);
pWal = NULL; pWal = NULL;
} }
if (pWal) wDebug("wal:%s, it is open, level:%d fsyncPeriod:%d", path, pWal->level, pWal->fsyncPeriod); if (pWal) wDebug("wal:%s, it is open, level:%d fsyncPeriod:%d", path, pWal->level, pWal->fsyncPeriod);
return pWal; return pWal;
...@@ -154,7 +153,7 @@ int walAlter(twalh wal, const SWalCfg *pCfg) { ...@@ -154,7 +153,7 @@ int walAlter(twalh wal, const SWalCfg *pCfg) {
pWal->fsyncPeriod = pCfg->fsyncPeriod; pWal->fsyncPeriod = pCfg->fsyncPeriod;
if (walNeedFsyncTimer(pWal)) { if (walNeedFsyncTimer(pWal)) {
wInfo("wal:%s, reset fsync timer, walLevel:%d fsyncPeriod:%d", pWal->name, pWal->level, pWal->fsyncPeriod); wInfo("wal:%s, reset fsync timer, walLevel:%d fsyncPeriod:%d", pWal->name, pWal->level, pWal->fsyncPeriod);
taosTmrReset(walProcessFsyncTimer, pWal->fsyncPeriod, pWal, &pWal->timer,walTmrCtrl); taosTmrReset(walProcessFsyncTimer, pWal->fsyncPeriod, pWal, &pWal->timer, walTmrCtrl);
} else { } else {
wInfo("wal:%s, stop fsync timer, walLevel:%d fsyncPeriod:%d", pWal->name, pWal->level, pWal->fsyncPeriod); wInfo("wal:%s, stop fsync timer, walLevel:%d fsyncPeriod:%d", pWal->name, pWal->level, pWal->fsyncPeriod);
taosTmrStop(pWal->timer); taosTmrStop(pWal->timer);
...@@ -167,16 +166,16 @@ int walAlter(twalh wal, const SWalCfg *pCfg) { ...@@ -167,16 +166,16 @@ int walAlter(twalh wal, const SWalCfg *pCfg) {
void walClose(void *handle) { void walClose(void *handle) {
if (handle == NULL) return; if (handle == NULL) return;
SWal *pWal = handle; SWal *pWal = handle;
taosClose(pWal->fd); taosClose(pWal->fd);
if (pWal->timer) taosTmrStopA(&pWal->timer); if (pWal->timer) taosTmrStopA(&pWal->timer);
if (pWal->keep == 0) { if (pWal->keep == 0) {
// remove all files in the directory // remove all files in the directory
for (int i=0; i<pWal->num; ++i) { for (int i = 0; i < pWal->num; ++i) {
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", pWal->path, walPrefix, pWal->id-i); snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", pWal->path, walPrefix, pWal->id - i);
if (remove(pWal->name) <0) { if (remove(pWal->name) < 0) {
wError("wal:%s, failed to remove", pWal->name); wError("wal:%s, failed to remove", pWal->name);
} else { } else {
wDebug("wal:%s, it is removed", pWal->name); wDebug("wal:%s, it is removed", pWal->name);
...@@ -197,7 +196,7 @@ int walRenew(void *handle) { ...@@ -197,7 +196,7 @@ int walRenew(void *handle) {
pthread_mutex_lock(&pWal->mutex); pthread_mutex_lock(&pWal->mutex);
if (pWal->fd >=0) { if (pWal->fd >= 0) {
close(pWal->fd); close(pWal->fd);
pWal->id++; pWal->id++;
wDebug("wal:%s, it is closed", pWal->name); wDebug("wal:%s, it is closed", pWal->name);
...@@ -218,7 +217,7 @@ int walRenew(void *handle) { ...@@ -218,7 +217,7 @@ int walRenew(void *handle) {
// remove the oldest wal file // remove the oldest wal file
char name[TSDB_FILENAME_LEN * 3]; char name[TSDB_FILENAME_LEN * 3];
snprintf(name, sizeof(name), "%s/%s%d", pWal->path, walPrefix, pWal->id - pWal->max); snprintf(name, sizeof(name), "%s/%s%d", pWal->path, walPrefix, pWal->id - pWal->max);
if (remove(name) <0) { if (remove(name) < 0) {
wError("wal:%s, failed to remove(%s)", name, strerror(errno)); wError("wal:%s, failed to remove(%s)", name, strerror(errno));
} else { } else {
wDebug("wal:%s, it is removed", name); wDebug("wal:%s, it is removed", name);
...@@ -226,8 +225,8 @@ int walRenew(void *handle) { ...@@ -226,8 +225,8 @@ int walRenew(void *handle) {
pWal->num--; pWal->num--;
} }
} }
pthread_mutex_unlock(&pWal->mutex); pthread_mutex_unlock(&pWal->mutex);
return terrno; return terrno;
...@@ -239,15 +238,18 @@ int walWrite(void *handle, SWalHead *pHead) { ...@@ -239,15 +238,18 @@ int walWrite(void *handle, SWalHead *pHead) {
terrno = 0; terrno = 0;
// no wal // no wal
if (pWal->level == TAOS_WAL_NOLOG) return 0; if (pWal->level == TAOS_WAL_NOLOG) return 0;
if (pHead->version <= pWal->version) return 0; if (pHead->version <= pWal->version) {
wError("wal:%s, failed to write ver:%" PRIu64 ", last ver:%" PRIu64, pWal->name, pHead->version, pWal->version);
return 0;
}
pHead->signature = walSignature; pHead->signature = walSignature;
taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead)); taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead));
int contLen = pHead->len + sizeof(SWalHead); int contLen = pHead->len + sizeof(SWalHead);
if(taosTWrite(pWal->fd, pHead, contLen) != contLen) { if (taosTWrite(pWal->fd, pHead, contLen) != contLen) {
wError("wal:%s, failed to write(%s)", pWal->name, strerror(errno)); wError("wal:%s, failed to write(%s)", pWal->name, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
} else { } else {
...@@ -258,7 +260,6 @@ int walWrite(void *handle, SWalHead *pHead) { ...@@ -258,7 +260,6 @@ int walWrite(void *handle, SWalHead *pHead) {
} }
void walFsync(void *handle) { void walFsync(void *handle) {
SWal *pWal = handle; SWal *pWal = handle;
if (pWal == NULL || pWal->level != TAOS_WAL_FSYNC || pWal->fd < 0) return; if (pWal == NULL || pWal->level != TAOS_WAL_FSYNC || pWal->fd < 0) return;
...@@ -276,12 +277,11 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) ...@@ -276,12 +277,11 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int))
uint32_t maxId = 0, minId = -1, index =0; uint32_t maxId = 0, minId = -1, index =0;
terrno = 0; terrno = 0;
int plen = strlen(walPrefix); int plen = strlen(walPrefix);
char opath[TSDB_FILENAME_LEN+5]; char opath[TSDB_FILENAME_LEN + 5];
int slen = snprintf(opath, sizeof(opath), "%s", pWal->path); int slen = snprintf(opath, sizeof(opath), "%s", pWal->path);
if ( pWal->keep == 0) if (pWal->keep == 0) strcpy(opath + slen, "/old");
strcpy(opath+slen, "/old");
DIR *dir = opendir(opath); DIR *dir = opendir(opath);
if (dir == NULL && errno == ENOENT) return 0; if (dir == NULL && errno == ENOENT) return 0;
...@@ -290,8 +290,8 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) ...@@ -290,8 +290,8 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int))
return terrno; return terrno;
} }
while ((ent = readdir(dir))!= NULL) { while ((ent = readdir(dir)) != NULL) {
if ( strncmp(ent->d_name, walPrefix, plen) == 0) { if (strncmp(ent->d_name, walPrefix, plen) == 0) {
index = atol(ent->d_name + plen); index = atol(ent->d_name + plen);
if (index > maxId) maxId = index; if (index > maxId) maxId = index;
if (index < minId) minId = index; if (index < minId) minId = index;
...@@ -306,13 +306,13 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) ...@@ -306,13 +306,13 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int))
return terrno; return terrno;
} }
if ( count != (maxId-minId+1) ) { if (count != (maxId - minId + 1)) {
wError("wal:%s, messed up, count:%d max:%d min:%d", opath, count, maxId, minId); wError("wal:%s, messed up, count:%d max:%d min:%d", opath, count, maxId, minId);
terrno = TSDB_CODE_WAL_APP_ERROR; terrno = TSDB_CODE_WAL_APP_ERROR;
} else { } else {
wDebug("wal:%s, %d files will be restored", opath, count); wDebug("wal:%s, %d files will be restored", opath, count);
for (index = minId; index<=maxId; ++index) { for (index = minId; index <= maxId; ++index) {
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", opath, walPrefix, index); snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", opath, walPrefix, index);
terrno = walRestoreWalFile(pWal, pVnode, writeFp); terrno = walRestoreWalFile(pWal, pVnode, writeFp);
if (terrno < 0) break; if (terrno < 0) break;
...@@ -328,7 +328,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) ...@@ -328,7 +328,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int))
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
} }
} }
} else { } else {
// open the existing WAL file in append mode // open the existing WAL file in append mode
pWal->num = count; pWal->num = count;
pWal->id = maxId; pWal->id = maxId;
...@@ -345,9 +345,9 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) ...@@ -345,9 +345,9 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int))
} }
int walGetWalFile(void *handle, char *name, uint32_t *index) { int walGetWalFile(void *handle, char *name, uint32_t *index) {
SWal *pWal = handle; SWal * pWal = handle;
int code = 1; int code = 1;
int32_t first = 0; int32_t first = 0;
name[0] = 0; name[0] = 0;
if (pWal == NULL || pWal->num == 0) return 0; if (pWal == NULL || pWal->num == 0) return 0;
...@@ -359,18 +359,17 @@ int walGetWalFile(void *handle, char *name, uint32_t *index) { ...@@ -359,18 +359,17 @@ int walGetWalFile(void *handle, char *name, uint32_t *index) {
if (*index < first && *index > pWal->id) { if (*index < first && *index > pWal->id) {
code = -1; // index out of range code = -1; // index out of range
} else { } else {
sprintf(name, "wal/%s%d", walPrefix, *index); sprintf(name, "wal/%s%d", walPrefix, *index);
code = (*index == pWal->id) ? 0:1; code = (*index == pWal->id) ? 0 : 1;
} }
pthread_mutex_unlock(&(pWal->mutex)); pthread_mutex_unlock(&(pWal->mutex));
return code; return code;
} }
static void walRelease(SWal *pWal) { static void walRelease(SWal *pWal) {
pthread_mutex_destroy(&pWal->mutex); pthread_mutex_destroy(&pWal->mutex);
pWal->signature = NULL; pWal->signature = NULL;
free(pWal); free(pWal);
...@@ -385,12 +384,12 @@ static void walRelease(SWal *pWal) { ...@@ -385,12 +384,12 @@ static void walRelease(SWal *pWal) {
static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) { static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
char *name = pWal->name; char *name = pWal->name;
int size = 1024 * 1024; // default 1M buffer size int size = 1024 * 1024; // default 1M buffer size
terrno = 0; terrno = 0;
char *buffer = malloc(size); char *buffer = malloc(size);
if (buffer == NULL) { if (buffer == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return terrno; return terrno;
} }
...@@ -487,21 +486,21 @@ int walHandleExistingFiles(const char *path) { ...@@ -487,21 +486,21 @@ int walHandleExistingFiles(const char *path) {
} else { } else {
// move all files to old directory // move all files to old directory
int count = 0; int count = 0;
while ((ent = readdir(dir))!= NULL) { while ((ent = readdir(dir)) != NULL) {
if ( strncmp(ent->d_name, walPrefix, plen) == 0) { if (strncmp(ent->d_name, walPrefix, plen) == 0) {
snprintf(oname, sizeof(oname), "%s/%s", path, ent->d_name); snprintf(oname, sizeof(oname), "%s/%s", path, ent->d_name);
snprintf(nname, sizeof(nname), "%s/old/%s", path, ent->d_name); snprintf(nname, sizeof(nname), "%s/old/%s", path, ent->d_name);
if (taosMkDir(opath, 0755) != 0) { if (taosMkDir(opath, 0755) != 0) {
wError("wal:%s, failed to create directory:%s(%s)", oname, opath, strerror(errno)); wError("wal:%s, failed to create directory:%s(%s)", oname, opath, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
break; break;
} }
if (rename(oname, nname) < 0) { if (rename(oname, nname) < 0) {
wError("wal:%s, failed to move to new:%s", oname, nname); wError("wal:%s, failed to move to new:%s", oname, nname);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
break; break;
} }
count++; count++;
} }
...@@ -509,34 +508,34 @@ int walHandleExistingFiles(const char *path) { ...@@ -509,34 +508,34 @@ int walHandleExistingFiles(const char *path) {
wDebug("wal:%s, %d files are moved for restoration", path, count); wDebug("wal:%s, %d files are moved for restoration", path, count);
} }
closedir(dir); closedir(dir);
return terrno; return terrno;
} }
static int walRemoveWalFiles(const char *path) { static int walRemoveWalFiles(const char *path) {
int plen = strlen(walPrefix); int plen = strlen(walPrefix);
char name[TSDB_FILENAME_LEN * 3]; char name[TSDB_FILENAME_LEN * 3];
terrno = 0; terrno = 0;
struct dirent *ent; struct dirent *ent;
DIR *dir = opendir(path); DIR *dir = opendir(path);
if (dir == NULL && errno == ENOENT) return 0; if (dir == NULL && errno == ENOENT) return 0;
if (dir == NULL) { if (dir == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return terrno; return terrno;
} }
while ((ent = readdir(dir))!= NULL) { while ((ent = readdir(dir)) != NULL) {
if ( strncmp(ent->d_name, walPrefix, plen) == 0) { if (strncmp(ent->d_name, walPrefix, plen) == 0) {
snprintf(name, sizeof(name), "%s/%s", path, ent->d_name); snprintf(name, sizeof(name), "%s/%s", path, ent->d_name);
if (remove(name) <0) { if (remove(name) < 0) {
wError("wal:%s, failed to remove(%s)", name, strerror(errno)); wError("wal:%s, failed to remove(%s)", name, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
} }
} }
} }
closedir(dir); closedir(dir);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册