未验证 提交 7a0ef5b5 编写于 作者: H Hongze Cheng 提交者: GitHub

Merge pull request #15538 from taosdata/fix/hzcheng_3.0

fix: replica 3 coredump
......@@ -225,9 +225,9 @@ typedef struct SRequestObj {
SArray* targetTableList;
SQueryExecMetric metric;
SRequestSendRecvBody body;
bool syncQuery; // todo refactor: async query object
bool stableQuery; // todo refactor
bool validateOnly; // todo refactor
bool syncQuery; // todo refactor: async query object
bool stableQuery; // todo refactor
bool validateOnly; // todo refactor
bool killed;
uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
uint32_t retry;
......
......@@ -591,7 +591,7 @@ int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray
return code;
}
void freeVgList(void *list) {
void freeVgList(void* list) {
SArray* pList = *(SArray**)list;
taosArrayDestroy(pList);
}
......@@ -1278,8 +1278,8 @@ int32_t doProcessMsgFromServer(void* param) {
char tbuf[40] = {0};
TRACE_TO_STR(trace, tbuf);
tscDebug("processMsgFromServer handle %p, message: %s, code: %s, gtid: %s", pMsg->info.handle, TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code),
tbuf);
tscDebug("processMsgFromServer handle %p, message: %s, code: %s, gtid: %s", pMsg->info.handle,
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), tbuf);
if (pSendInfo->requestObjRefId != 0) {
SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
......@@ -2114,7 +2114,7 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
return NULL;
}
TAOS_RES* pRes = execQuery(connId, sql, sqlLen, validateOnly);
TAOS_RES* pRes = execQuery(*(int64_t*)taos, sql, sqlLen, validateOnly);
return pRes;
#endif
}
......@@ -374,11 +374,12 @@ static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid
p = taosArrayInsert(pMemTable->aTbData, idx, &pTbData);
taosWUnLockLatch(&pMemTable->latch);
tsdbDebug("vgId:%d add table data %p at idx:%d", TD_VID(pMemTable->pTsdb->pVnode), pTbData, idx);
if (p == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
_exit:
*ppTbData = pTbData;
return code;
......
......@@ -694,8 +694,8 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) {
if (pWriter->bDataW.nRow < pWriter->maxRow * 4 / 5) continue;
_write_block:
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdx, &pWriter->blockW,
pWriter->cmprAlg);
code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW,
&pWriter->blockW, pWriter->cmprAlg);
if (code) goto _err;
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);
......@@ -756,7 +756,7 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, TABLEID id) {
if (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) {
ASSERT(pWriter->pDataFReader);
SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlock);
SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx);
int32_t c = tTABLEIDCmprFn(pBlockIdx, &id);
ASSERT(c >= 0);
......@@ -833,7 +833,7 @@ static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) {
}
_exit:
tsdbError("vgId:%d vnode snapshot tsdb writer data end", TD_VID(pTsdb->pVnode));
tsdbInfo("vgId:%d vnode snapshot tsdb writer data end", TD_VID(pTsdb->pVnode));
return code;
_err:
......
......@@ -288,7 +288,7 @@ int vnodeCommit(SVnode *pVnode) {
// apply the commit (TODO)
walEndSnapshot(pVnode->pWal);
vInfo("vgId:%d, commit over", TD_VID(pVnode));
vInfo("vgId:%d, commit end", TD_VID(pVnode));
return 0;
}
......
......@@ -180,6 +180,7 @@ struct SVSnapWriter {
SVnode *pVnode;
int64_t sver;
int64_t ever;
int64_t commitID;
int64_t index;
// meta
SMetaSnapWriter *pMetaSnapWriter;
......@@ -201,7 +202,16 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr
pWriter->sver = sver;
pWriter->ever = ever;
vInfo("vgId:%d vnode snapshot writer opened", TD_VID(pVnode));
// commit it
code = vnodeCommit(pVnode);
if (code) goto _err;
// inc commit ID
pVnode->state.commitID++;
pWriter->commitID = pVnode->state.commitID;
vInfo("vgId:%d vnode snapshot writer opened, sver:%" PRId64 " ever:%" PRId64 " commit id:%" PRId64, TD_VID(pVnode),
sver, ever, pWriter->commitID);
*ppWriter = pWriter;
return code;
......@@ -244,6 +254,8 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
code = vnodeCommitInfo(dir, &info);
if (code) goto _err;
vnodeBegin(pVnode);
} else {
ASSERT(0);
}
......
......@@ -513,7 +513,7 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, void *msg, uint3
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
msgSendInfo->paramFreeFp = taosMemoryFree;
msgSendInfo->paramFreeFp = taosMemoryFree;
SCH_ERR_JRET(schMakeCallbackParam(pJob, pTask, msgType, isHb, trans, &msgSendInfo->param));
SCH_ERR_JRET(schGetCallbackFp(msgType, &msgSendInfo->fp));
......@@ -541,7 +541,7 @@ _return:
}
taosMemoryFree(msg);
SCH_RET(code);
}
......@@ -681,7 +681,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
param->pTrans = pJob->conn.pTrans;
pMsgSendInfo->param = param;
pMsgSendInfo->paramFreeFp = taosMemoryFree;
pMsgSendInfo->paramFreeFp = taosMemoryFree;
pMsgSendInfo->fp = fp;
SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo};
......@@ -801,7 +801,7 @@ int32_t schCloneSMsgSendInfo(void *src, void **dst) {
pDst->param = NULL;
SCH_ERR_JRET(schCloneCallbackParam(pSrc->param, (SSchCallbackParamHeader **)&pDst->param));
pDst->paramFreeFp = taosMemoryFree;
pDst->paramFreeFp = taosMemoryFree;
*dst = pDst;
......@@ -1094,14 +1094,29 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
break;
}
#if 1
SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
msg = NULL;
SCH_ERR_JRET(code);
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) {
SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execId));
}
#else
if (TDMT_VND_SUBMIT != msgType) {
SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
msg = NULL;
SCH_ERR_JRET(code);
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) {
SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execId));
}
} else {
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
}
#endif
return TSDB_CODE_SUCCESS;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册