提交 e571ddf3 编写于 作者: K kailixu

fix: tsma transaction refactor

上级 a696cd9f
...@@ -157,6 +157,7 @@ static int32_t vmTsmaAdjustDays(SVnodeCfg *pCfg, SCreateVnodeReq *pReq) { ...@@ -157,6 +157,7 @@ static int32_t vmTsmaAdjustDays(SVnodeCfg *pCfg, SCreateVnodeReq *pReq) {
return 0; return 0;
} }
#if 0
static int32_t vmTsmaProcessCreate(SVnode *pVnode, SCreateVnodeReq *pReq) { static int32_t vmTsmaProcessCreate(SVnode *pVnode, SCreateVnodeReq *pReq) {
if (pReq->isTsma) { if (pReq->isTsma) {
SMsgHead *smaMsg = pReq->pTsma; SMsgHead *smaMsg = pReq->pTsma;
...@@ -165,6 +166,7 @@ static int32_t vmTsmaProcessCreate(SVnode *pVnode, SCreateVnodeReq *pReq) { ...@@ -165,6 +166,7 @@ static int32_t vmTsmaProcessCreate(SVnode *pVnode, SCreateVnodeReq *pReq) {
} }
return 0; return 0;
} }
#endif
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SCreateVnodeReq req = {0}; SCreateVnodeReq req = {0};
...@@ -245,12 +247,14 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -245,12 +247,14 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
goto _OVER; goto _OVER;
} }
#if 0
code = vmTsmaProcessCreate(pImpl, &req); code = vmTsmaProcessCreate(pImpl, &req);
if (code != 0) { if (code != 0) {
dError("vgId:%d, failed to create tsma since %s", req.vgId, terrstr()); dError("vgId:%d, failed to create tsma since %s", req.vgId, terrstr());
code = terrno; code = terrno;
goto _OVER; goto _OVER;
} }
#endif
code = vnodeStart(pImpl); code = vnodeStart(pImpl);
if (code != 0) { if (code != 0) {
......
...@@ -457,8 +457,10 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, ...@@ -457,8 +457,10 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans,
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen); void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
if (pReq == NULL) {
taosMemoryFreeClear(pSmaReq); taosMemoryFreeClear(pSmaReq);
if (pReq == NULL) return -1; return -1;
}
action.pCont = pReq; action.pCont = pReq;
action.contLen = contLen; action.contLen = contLen;
...@@ -466,6 +468,18 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, ...@@ -466,6 +468,18 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans,
action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST; action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFreeClear(pSmaReq);
taosMemoryFree(pReq);
return -1;
}
action.pCont = pSmaReq;
action.contLen = smaContLen;
action.msgType = TDMT_VND_CREATE_SMA;
action.acceptableCode = TSDB_CODE_TSMA_ALREADY_EXIST;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFreeClear(pSmaReq);
taosMemoryFree(pReq); taosMemoryFree(pReq);
return -1; return -1;
} }
......
...@@ -25,14 +25,13 @@ static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char * ...@@ -25,14 +25,13 @@ static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *
static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg); static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg);
static int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days); static int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
// TODO: Who is responsible for resource allocate and release?
int32_t tdProcessTSmaInsert(SSma *pSma, int64_t indexUid, const char *msg) { int32_t tdProcessTSmaInsert(SSma *pSma, int64_t indexUid, const char *msg) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdProcessTSmaInsertImpl(pSma, indexUid, msg)) < 0) { if ((code = tdProcessTSmaInsertImpl(pSma, indexUid, msg)) < 0) {
smaWarn("vgId:%d, insert tsma data failed since %s", SMA_VID(pSma), tstrerror(terrno)); smaWarn("vgId:%d, insert tsma data failed since %s", SMA_VID(pSma), tstrerror(terrno));
} }
// TODO: destroy SSDataBlocks(msg)
return code; return code;
} }
...@@ -42,7 +41,6 @@ int32_t tdProcessTSmaCreate(SSma *pSma, int64_t version, const char *msg) { ...@@ -42,7 +41,6 @@ int32_t tdProcessTSmaCreate(SSma *pSma, int64_t version, const char *msg) {
if ((code = tdProcessTSmaCreateImpl(pSma, version, msg)) < 0) { if ((code = tdProcessTSmaCreateImpl(pSma, version, msg)) < 0) {
smaWarn("vgId:%d, create tsma failed since %s", SMA_VID(pSma), tstrerror(terrno)); smaWarn("vgId:%d, create tsma failed since %s", SMA_VID(pSma), tstrerror(terrno));
} }
// TODO: destroy SSDataBlocks(msg)
return code; return code;
} }
......
...@@ -1716,6 +1716,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1716,6 +1716,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
/*resetTableScanInfo(pTSInfo, pWin);*/ /*resetTableScanInfo(pTSInfo, pWin);*/
tsdbReaderClose(pTSInfo->base.dataReader); tsdbReaderClose(pTSInfo->base.dataReader);
pTSInfo->base.dataReader = NULL; pTSInfo->base.dataReader = NULL;
pInfo->pTableScanOp->status = OP_OPENED;
pTSInfo->scanTimes = 0; pTSInfo->scanTimes = 0;
pTSInfo->currentGroupId = -1; pTSInfo->currentGroupId = -1;
......
...@@ -6,6 +6,7 @@ sql connect ...@@ -6,6 +6,7 @@ sql connect
print =============== create database print =============== create database
sql create database d1 keep 36500d vgroups 1 sql create database d1 keep 36500d vgroups 1
sql alter local 'querySmaOptimize' '1';
sql use d1 sql use d1
print =============== create super table, include column type for count/sum/min/max/first print =============== create super table, include column type for count/sum/min/max/first
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册