未验证 提交 a15f7c6f 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #4166 from taosdata/feature/wal

add log for wal
...@@ -218,6 +218,8 @@ int32_t (*monitorStartSystemFp)() = NULL; ...@@ -218,6 +218,8 @@ int32_t (*monitorStartSystemFp)() = NULL;
void (*monitorStopSystemFp)() = NULL; void (*monitorStopSystemFp)() = NULL;
void (*monitorExecuteSQLFp)(char *sql) = NULL; void (*monitorExecuteSQLFp)(char *sql) = NULL;
char *qtypeStr[] = {"rpc", "fwd", "wal", "cq", "query"};
static pthread_once_t tsInitGlobalCfgOnce = PTHREAD_ONCE_INIT; static pthread_once_t tsInitGlobalCfgOnce = PTHREAD_ONCE_INIT;
void taosSetAllDebugFlag() { void taosSetAllDebugFlag() {
......
...@@ -207,8 +207,8 @@ static void *dnodeProcessVWriteQueue(void *param) { ...@@ -207,8 +207,8 @@ static void *dnodeProcessVWriteQueue(void *param) {
bool forceFsync = false; bool forceFsync = false;
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite); taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite);
dTrace("%p, msg:%p:%s will be processed in vwrite queue, qtype:%d version:%" PRIu64, pWrite->rpcAhandle, pWrite, dTrace("%p, msg:%p:%s will be processed in vwrite queue, qtype:%s hver:%" PRIu64, pWrite->rpcAhandle, pWrite,
taosMsg[pWrite->pHead->msgType], qtype, pWrite->pHead->version); taosMsg[pWrite->pHead->msgType], qtypeStr[qtype], pWrite->pHead->version);
pWrite->code = vnodeProcessWrite(pVnode, pWrite->pHead, qtype, &pWrite->rspRet); pWrite->code = vnodeProcessWrite(pVnode, pWrite->pHead, qtype, &pWrite->rspRet);
if (pWrite->code <= 0) pWrite->processedCount = 1; if (pWrite->code <= 0) pWrite->processedCount = 1;
......
...@@ -467,6 +467,8 @@ typedef enum { ...@@ -467,6 +467,8 @@ typedef enum {
TSDB_CHECK_ITEM_MAX TSDB_CHECK_ITEM_MAX
} ECheckItemType; } ECheckItemType;
extern char *qtypeStr[];
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -1221,8 +1221,8 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle ...@@ -1221,8 +1221,8 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
// always update version // always update version
nodeVersion = pWalHead->version; nodeVersion = pWalHead->version;
sDebug("vgId:%d, replica:%d nodeRole:%s qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica, syncRole[nodeRole], sDebug("vgId:%d, forward to peer, replica:%d role:%s qtype:%s hver:%" PRIu64, pNode->vgId, pNode->replica,
qtype, pWalHead->version); syncRole[nodeRole], qtypeStr[qtype], pWalHead->version);
if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0; if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0;
......
...@@ -153,7 +153,7 @@ static int syncRestoreWal(SSyncPeer *pPeer) { ...@@ -153,7 +153,7 @@ static int syncRestoreWal(SSyncPeer *pPeer) {
ret = taosReadMsg(pPeer->syncFd, pHead->cont, pHead->len); ret = taosReadMsg(pPeer->syncFd, pHead->cont, pHead->len);
if (ret < 0) break; if (ret < 0) break;
sDebug("%s, restore a record, ver:%" PRIu64, pPeer->id, pHead->version); sDebug("%s, restore a record, qtype:wal hver:%" PRIu64, pPeer->id, pHead->version);
(*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_WAL, NULL); (*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_WAL, NULL);
} }
......
...@@ -256,7 +256,7 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, ...@@ -256,7 +256,7 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion,
} }
(void)lseek(sfd, offset, SEEK_SET); (void)lseek(sfd, offset, SEEK_SET);
sDebug("%s, retrieve last wal, offset:%" PRId64 " fversion:%" PRIu64, pPeer->id, offset, fversion); sDebug("%s, retrieve last wal, offset:%" PRId64 " fver:%" PRIu64, pPeer->id, offset, fversion);
while (1) { while (1) {
int wsize = syncReadOneWalRecord(sfd, pHead, pEvent); int wsize = syncReadOneWalRecord(sfd, pHead, pEvent);
...@@ -325,7 +325,7 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) { ...@@ -325,7 +325,7 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) {
// if all data up to fversion is read out, it is over // if all data up to fversion is read out, it is over
if (pPeer->sversion >= fversion && fversion > 0) { if (pPeer->sversion >= fversion && fversion > 0) {
code = 0; code = 0;
sDebug("%s, data up to fversion:%" PRId64 " has been read out, bytes:%d", pPeer->id, fversion, bytes); sDebug("%s, data up to fver:%" PRIu64 " has been read out, bytes:%d", pPeer->id, fversion, bytes);
break; break;
} }
......
...@@ -577,10 +577,12 @@ static int vnodeProcessTsdbStatus(void *arg, int status) { ...@@ -577,10 +577,12 @@ static int vnodeProcessTsdbStatus(void *arg, int status) {
if (status == TSDB_STATUS_COMMIT_START) { if (status == TSDB_STATUS_COMMIT_START) {
pVnode->fversion = pVnode->version; pVnode->fversion = pVnode->version;
vDebug("vgId:%d, start commit, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
return walRenew(pVnode->wal); return walRenew(pVnode->wal);
} }
if (status == TSDB_STATUS_COMMIT_OVER) { if (status == TSDB_STATUS_COMMIT_OVER) {
vDebug("vgId:%d, commit over, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion, pVnode->version);
return vnodeSaveVersion(pVnode); return vnodeSaveVersion(pVnode);
} }
...@@ -656,11 +658,12 @@ static int vnodeResetTsdb(SVnodeObj *pVnode) { ...@@ -656,11 +658,12 @@ static int vnodeResetTsdb(SVnodeObj *pVnode) {
static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) {
SVnodeObj *pVnode = ahandle; SVnodeObj *pVnode = ahandle;
vDebug("vgId:%d, data file is synced, fversion:%" PRId64, pVnode->vgId, fversion);
pVnode->fversion = fversion; pVnode->fversion = fversion;
pVnode->version = fversion; pVnode->version = fversion;
vnodeSaveVersion(pVnode); vnodeSaveVersion(pVnode);
vDebug("vgId:%d, data file is synced, fver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId, pVnode->fversion,
pVnode->version);
return vnodeResetTsdb(pVnode); return vnodeResetTsdb(pVnode);
} }
...@@ -64,7 +64,7 @@ int32_t vnodeReadVersion(SVnodeObj *pVnode) { ...@@ -64,7 +64,7 @@ int32_t vnodeReadVersion(SVnodeObj *pVnode) {
pVnode->version = (uint64_t)ver->valueint; pVnode->version = (uint64_t)ver->valueint;
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
vInfo("vgId:%d, read %s successfully, version:%" PRIu64, pVnode->vgId, file, pVnode->version); vInfo("vgId:%d, read %s successfully, fver:%" PRIu64, pVnode->vgId, file, pVnode->version);
PARSE_VER_ERROR: PARSE_VER_ERROR:
if (content != NULL) free(content); if (content != NULL) free(content);
...@@ -98,6 +98,6 @@ int32_t vnodeSaveVersion(SVnodeObj *pVnode) { ...@@ -98,6 +98,6 @@ int32_t vnodeSaveVersion(SVnodeObj *pVnode) {
free(content); free(content);
terrno = 0; terrno = 0;
vInfo("vgId:%d, successed to write %s, version:%" PRIu64, pVnode->vgId, file, pVnode->fversion); vInfo("vgId:%d, successed to write %s, fver:%" PRIu64, pVnode->vgId, file, pVnode->fversion);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
\ No newline at end of file
...@@ -52,20 +52,24 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara ...@@ -52,20 +52,24 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
SRspRet * pRspRet = rparam; SRspRet * pRspRet = rparam;
if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) { if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) {
vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[pHead->msgType]); vError("vgId:%d, msg:%s not processed since no handle, qtype:%s hver:%" PRIu64, pVnode->vgId,
taosMsg[pHead->msgType], qtypeStr[qtype], pHead->version);
return TSDB_CODE_VND_MSG_NOT_PROCESSED; return TSDB_CODE_VND_MSG_NOT_PROCESSED;
} }
vTrace("vgId:%d, msg:%s will be processed in vnode, qtype:%s hver:%" PRIu64 " vver:%" PRIu64, pVnode->vgId,
taosMsg[pHead->msgType], qtypeStr[qtype], pHead->version, pVnode->version);
if (pHead->version == 0) { // from client or CQ if (pHead->version == 0) { // from client or CQ
if (pVnode->status != TAOS_VN_STATUS_READY) { if (pVnode->status != TAOS_VN_STATUS_READY) {
vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[pHead->msgType], vDebug("vgId:%d, msg:%s not processed since vstatus:%d, qtype:%s hver:%" PRIu64, pVnode->vgId,
pVnode->status); taosMsg[pHead->msgType], pVnode->status, qtypeStr[qtype], pHead->version);
return TSDB_CODE_APP_NOT_READY; // it may be in deleting or closing state return TSDB_CODE_APP_NOT_READY; // it may be in deleting or closing state
} }
if (pVnode->role != TAOS_SYNC_ROLE_MASTER) { if (pVnode->role != TAOS_SYNC_ROLE_MASTER) {
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%s", pVnode->vgId, taosMsg[pHead->msgType], vDebug("vgId:%d, msg:%s not processed since replica:%d role:%s, qtype:%s hver:%" PRIu64, pVnode->vgId,
pVnode->syncCfg.replica, syncRole[pVnode->role]); taosMsg[pHead->msgType], pVnode->syncCfg.replica, syncRole[pVnode->role], qtypeStr[qtype], pHead->version);
return TSDB_CODE_APP_NOT_READY; return TSDB_CODE_APP_NOT_READY;
} }
......
...@@ -99,8 +99,8 @@ int32_t walWrite(void *handle, SWalHead *pHead) { ...@@ -99,8 +99,8 @@ int32_t walWrite(void *handle, SWalHead *pHead) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%s, failed to write since %s", pWal->vgId, pWal->name, strerror(errno)); wError("vgId:%d, file:%s, failed to write since %s", pWal->vgId, pWal->name, strerror(errno));
} else { } else {
wTrace("vgId:%d, fileId:%" PRId64 " fd:%d, write wal ver:%" PRId64 ", head ver:%" PRIu64 ", len:%d ", pWal->vgId, wTrace("vgId:%d, write wal, fileId:%" PRId64 " fd:%d hver:%" PRId64 " wver:%" PRIu64 " len:%d", pWal->vgId,
pWal->fileId, pWal->fd, pWal->version, pHead->version, pHead->len); pWal->fileId, pWal->fd, pHead->version, pWal->version, pHead->len);
pWal->version = pHead->version; pWal->version = pHead->version;
} }
...@@ -261,7 +261,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -261,7 +261,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
} }
if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) { if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
wError("vgId:%d, file:%s, wal head cksum is messed up, ver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
pHead->version, pHead->len, offset); pHead->version, pHead->len, offset);
code = walSkipCorruptedRecord(pWal, pHead, fd, &offset); code = walSkipCorruptedRecord(pWal, pHead, fd, &offset);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -297,8 +297,8 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -297,8 +297,8 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
offset = offset + sizeof(SWalHead) + pHead->len; offset = offset + sizeof(SWalHead) + pHead->len;
wTrace("vgId:%d, fileId:%" PRId64 ", restore wal ver:%" PRIu64 ", head ver:%" PRIu64 " len:%d", pWal->vgId, fileId, wTrace("vgId:%d, restore wal, fileId:%" PRId64 " hver:%" PRIu64 " wver:%" PRIu64 " len:%d", pWal->vgId,
pWal->version, pHead->version, pHead->len); fileId, pHead->version, pWal->version, pHead->len);
pWal->version = pHead->version; pWal->version = pHead->version;
(*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL, NULL); (*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL, NULL);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册