提交 9ebdee32 编写于 作者: L Liu Jicong

fix txn

上级 e916f83f
...@@ -78,6 +78,7 @@ struct tmq_t { ...@@ -78,6 +78,7 @@ struct tmq_t {
STscObj* pTscObj; STscObj* pTscObj;
tmq_commit_cb* commit_cb; tmq_commit_cb* commit_cb;
int32_t nextTopicIdx; int32_t nextTopicIdx;
int8_t epStatus;
int32_t waitingRequest; int32_t waitingRequest;
int32_t readyRequest; int32_t readyRequest;
SArray* clientTopics; // SArray<SMqClientTopic> SArray* clientTopics; // SArray<SMqClientTopic>
...@@ -311,6 +312,7 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs ...@@ -311,6 +312,7 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
pTmq->epoch = 0; pTmq->epoch = 0;
pTmq->waitingRequest = 0; pTmq->waitingRequest = 0;
pTmq->readyRequest = 0; pTmq->readyRequest = 0;
pTmq->epStatus = 0;
// set conf // set conf
strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->clientId, conf->clientId);
strcpy(pTmq->groupId, conf->groupId); strcpy(pTmq->groupId, conf->groupId);
...@@ -833,7 +835,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -833,7 +835,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
tmq_t* tmq = pParam->tmq; tmq_t* tmq = pParam->tmq;
if (code != 0) { if (code != 0) {
tscWarn("msg discard, code:%x", code); tscWarn("msg discard, code:%x", code);
goto WRITE_QUEUE_FAIL; goto CREATE_MSG_FAIL;
} }
int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch; int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
...@@ -873,7 +875,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -873,7 +875,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
/*SMqConsumeRsp* pRsp = taosMemoryCalloc(1, sizeof(SMqConsumeRsp));*/ /*SMqConsumeRsp* pRsp = taosMemoryCalloc(1, sizeof(SMqConsumeRsp));*/
tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t)); tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t));
if (pRsp == NULL) { if (pRsp == NULL) {
goto WRITE_QUEUE_FAIL; goto CREATE_MSG_FAIL;
} }
memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqPollRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->msg); tDecodeSMqPollRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->msg);
...@@ -886,7 +888,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -886,7 +888,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
if (pRsp->msg.numOfTopics == 0) { if (pRsp->msg.numOfTopics == 0) {
/*printf("no data\n");*/ /*printf("no data\n");*/
taosFreeQitem(pRsp); taosFreeQitem(pRsp);
goto WRITE_QUEUE_FAIL; goto CREATE_MSG_FAIL;
} }
#endif #endif
...@@ -899,7 +901,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -899,7 +901,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
tsem_post(&tmq->rspSem); tsem_post(&tmq->rspSem);
return 0; return 0;
WRITE_QUEUE_FAIL: CREATE_MSG_FAIL:
if (pParam->epoch == tmq->epoch) { if (pParam->epoch == tmq->epoch) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
} }
...@@ -940,7 +942,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { ...@@ -940,7 +942,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
for (int32_t k = 0; k < vgNumCur; k++) { for (int32_t k = 0; k < vgNumCur; k++) {
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k); SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k);
sprintf(vgKey, "%s:%d", topic.topicName, pVgCur->vgId); sprintf(vgKey, "%s:%d", topic.topicName, pVgCur->vgId);
printf("epoch %d vg %d build %s\n", epoch, pVgCur->vgId, vgKey); tscDebug("epoch %d vg %d build %s\n", epoch, pVgCur->vgId, vgKey);
taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t)); taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t));
} }
break; break;
...@@ -954,12 +956,12 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { ...@@ -954,12 +956,12 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId); sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);
int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey)); int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
int64_t offset = pVgEp->offset; int64_t offset = pVgEp->offset;
printf("epoch %d vg %d offset og to %ld\n", epoch, pVgEp->vgId, offset); tscDebug("epoch %d vg %d offset og to %ld\n", epoch, pVgEp->vgId, offset);
if (pOffset != NULL) { if (pOffset != NULL) {
offset = *pOffset; offset = *pOffset;
printf("epoch %d vg %d found %s\n", epoch, pVgEp->vgId, vgKey); tscDebug("epoch %d vg %d found %s\n", epoch, pVgEp->vgId, vgKey);
} }
printf("epoch %d vg %d offset set to %ld\n", epoch, pVgEp->vgId, offset); tscDebug("epoch %d vg %d offset set to %ld\n", epoch, pVgEp->vgId, offset);
SMqClientVg clientVg = { SMqClientVg clientVg = {
.pollCnt = 0, .pollCnt = 0,
.currentOffset = offset, .currentOffset = offset,
...@@ -1020,6 +1022,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -1020,6 +1022,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
} }
END: END:
atomic_store_8(&tmq->epStatus, 0);
if (pParam->sync) { if (pParam->sync) {
tsem_post(&pParam->rspSem); tsem_post(&pParam->rspSem);
} }
...@@ -1027,6 +1030,10 @@ END: ...@@ -1027,6 +1030,10 @@ END:
} }
int32_t tmqAskEp(tmq_t* tmq, bool sync) { int32_t tmqAskEp(tmq_t* tmq, bool sync) {
int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
if (epStatus == 1) {
return 0;
}
int32_t tlen = sizeof(SMqCMGetSubEpReq); int32_t tlen = sizeof(SMqCMGetSubEpReq);
SMqCMGetSubEpReq* req = taosMemoryMalloc(tlen); SMqCMGetSubEpReq* req = taosMemoryMalloc(tlen);
if (req == NULL) { if (req == NULL) {
...@@ -1207,7 +1214,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { ...@@ -1207,7 +1214,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
if (vgStatus != TMQ_VG_STATUS__IDLE) { if (vgStatus != TMQ_VG_STATUS__IDLE) {
/*printf("skip vg %d\n", pVg->vgId);*/ tscDebug("skip vg %d", pVg->vgId);
continue; continue;
} }
SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg); SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg);
...@@ -1251,7 +1258,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { ...@@ -1251,7 +1258,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
int64_t transporterId = 0; int64_t transporterId = 0;
/*printf("send poll\n");*/ /*printf("send poll\n");*/
atomic_add_fetch_32(&tmq->waitingRequest, 1); atomic_add_fetch_32(&tmq->waitingRequest, 1);
/*tscDebug("tmq send poll: vg %d, req offset %ld", pVg->vgId, pVg->currentOffset);*/ tscDebug("tmq send poll: vg %d, req offset %ld", pVg->vgId, pVg->currentOffset);
/*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/ /*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
pVg->pollCnt++; pVg->pollCnt++;
...@@ -1315,7 +1322,7 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese ...@@ -1315,7 +1322,7 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese
tmqHandleNoPollRsp(tmq, rspHead, &reset); tmqHandleNoPollRsp(tmq, rspHead, &reset);
taosFreeQitem(rspHead); taosFreeQitem(rspHead);
if (pollIfReset && reset) { if (pollIfReset && reset) {
printf("reset and repoll\n"); tscDebug("reset and repoll\n");
tmqPollImpl(tmq, blockingTime); tmqPollImpl(tmq, blockingTime);
} }
} }
......
...@@ -160,6 +160,7 @@ static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) { ...@@ -160,6 +160,7 @@ static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) { static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
mTrace("consumer:%" PRId64 ", perform update action", pOldConsumer->consumerId); mTrace("consumer:%" PRId64 ", perform update action", pOldConsumer->consumerId);
pOldConsumer->epoch++;
// TODO handle update // TODO handle update
/*taosWLockLatch(&pOldConsumer->lock);*/ /*taosWLockLatch(&pOldConsumer->lock);*/
......
...@@ -446,26 +446,21 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) { ...@@ -446,26 +446,21 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb || if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb ||
(vgThisConsumerAfterRb != 0 && status != MQ_CONSUMER_STATUS__ACTIVE) || (vgThisConsumerAfterRb != 0 && status != MQ_CONSUMER_STATUS__ACTIVE) ||
(vgThisConsumerAfterRb == 0 && status != MQ_CONSUMER_STATUS__LOST)) { (vgThisConsumerAfterRb == 0 && status != MQ_CONSUMER_STATUS__LOST)) {
SMqConsumerObj* pNewRebConsumer = taosMemoryMalloc(sizeof(SMqConsumerObj)); /*if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb) {*/
ASSERT(pNewRebConsumer); /*pRebConsumer->epoch++;*/
memcpy(pNewRebConsumer, pRebConsumer, sizeof(SMqConsumerObj)); /*}*/
pNewRebConsumer->currentTopics = taosArrayDup(pRebConsumer->currentTopics);
pNewRebConsumer->recentRemovedTopics = taosArrayDup(pRebConsumer->recentRemovedTopics);
if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb) {
pNewRebConsumer->epoch++;
}
if (vgThisConsumerAfterRb != 0) { if (vgThisConsumerAfterRb != 0) {
atomic_store_32(&pNewRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE); atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);
} else { } else {
atomic_store_32(&pNewRebConsumer->status, MQ_CONSUMER_STATUS__IDLE); atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE);
} }
mInfo("mq consumer:%" PRId64 ", status change from %d to %d", pNewRebConsumer->consumerId, status, mInfo("mq consumer:%" PRId64 ", status change from %d to %d", pRebConsumer->consumerId, status,
pNewRebConsumer->status); pRebConsumer->status);
SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pNewRebConsumer); SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer);
sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY); sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
mndTransAppendRedolog(pTrans, pConsumerRaw); mndTransAppendCommitlog(pTrans, pConsumerRaw);
} }
mndReleaseConsumer(pMnode, pRebConsumer); mndReleaseConsumer(pMnode, pRebConsumer);
} }
...@@ -512,7 +507,7 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) { ...@@ -512,7 +507,7 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
// TODO: log rebalance statistics // TODO: log rebalance statistics
SSdbRaw *pSubRaw = mndSubActionEncode(pSub); SSdbRaw *pSubRaw = mndSubActionEncode(pSub);
sdbSetRawStatus(pSubRaw, SDB_STATUS_READY); sdbSetRawStatus(pSubRaw, SDB_STATUS_UPDATING);
mndTransAppendRedolog(pTrans, pSubRaw); mndTransAppendRedolog(pTrans, pSubRaw);
} }
mndReleaseSubscribe(pMnode, pSub); mndReleaseSubscribe(pMnode, pSub);
......
...@@ -264,7 +264,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -264,7 +264,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
fetchOffset = pReq->currentOffset + 1; fetchOffset = pReq->currentOffset + 1;
} }
printf("tmq poll vg %d req %ld %ld\n", pTq->pVnode->vgId, pReq->currentOffset, fetchOffset); /*printf("tmq poll vg %d req %ld %ld\n", pTq->pVnode->vgId, pReq->currentOffset, fetchOffset);*/
SMqPollRsp rsp = { SMqPollRsp rsp = {
/*.consumerId = consumerId,*/ /*.consumerId = consumerId,*/
...@@ -299,8 +299,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -299,8 +299,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
// response to user // response to user
break; break;
} }
printf("vg %d offset %ld msgType %d from epoch %d\n", pTq->pVnode->vgId, fetchOffset, pHead->msgType, /*printf("vg %d offset %ld msgType %d from epoch %d\n", pTq->pVnode->vgId, fetchOffset, pHead->msgType, pReq->epoch);*/
pReq->epoch);
/*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/ /*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/
/*pHead = pTopic->pReadhandle->pHead;*/ /*pHead = pTopic->pReadhandle->pHead;*/
if (pHead->msgType == TDMT_VND_SUBMIT) { if (pHead->msgType == TDMT_VND_SUBMIT) {
...@@ -353,7 +352,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -353,7 +352,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
pMsg->pCont = buf; pMsg->pCont = buf;
pMsg->contLen = tlen; pMsg->contLen = tlen;
pMsg->code = 0; pMsg->code = 0;
printf("vg %d offset %ld msgType %d from epoch %d actual rsp\n", pTq->pVnode->vgId, fetchOffset, pHead->msgType, pReq->epoch); /*printf("vg %d offset %ld msgType %d from epoch %d actual rsp\n", pTq->pVnode->vgId, fetchOffset, pHead->msgType, pReq->epoch);*/
tmsgSendRsp(pMsg); tmsgSendRsp(pMsg);
taosMemoryFree(pHead); taosMemoryFree(pHead);
return 0; return 0;
...@@ -384,7 +383,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -384,7 +383,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
pMsg->contLen = tlen; pMsg->contLen = tlen;
pMsg->code = 0; pMsg->code = 0;
tmsgSendRsp(pMsg); tmsgSendRsp(pMsg);
printf("vg %d offset %ld from epoch %d not rsp\n", pTq->pVnode->vgId, fetchOffset, pReq->epoch); /*printf("vg %d offset %ld from epoch %d not rsp\n", pTq->pVnode->vgId, fetchOffset, pReq->epoch);*/
/*}*/ /*}*/
return 0; return 0;
...@@ -451,7 +450,7 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { ...@@ -451,7 +450,7 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle); pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle);
ASSERT(pTopic->buffer.output[i].task); ASSERT(pTopic->buffer.output[i].task);
} }
printf("set topic %s to consumer %ld on vg %d\n", pTopic->topicName, req.consumerId, pTq->pVnode->vgId); /*printf("set topic %s to consumer %ld on vg %d\n", pTopic->topicName, req.consumerId, pTq->pVnode->vgId);*/
taosArrayPush(pConsumer->topics, pTopic); taosArrayPush(pConsumer->topics, pTopic);
tqHandleMovePut(pTq->tqMeta, req.consumerId, pConsumer); tqHandleMovePut(pTq->tqMeta, req.consumerId, pConsumer);
tqHandleCommit(pTq->tqMeta, req.consumerId); tqHandleCommit(pTq->tqMeta, req.consumerId);
......
...@@ -2304,8 +2304,6 @@ int32_t catalogInit(SCatalogCfg *cfg) { ...@@ -2304,8 +2304,6 @@ int32_t catalogInit(SCatalogCfg *cfg) {
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
CTG_ERR_RET(ctgStartUpdateThread());
tsem_init(&gCtgMgmt.queue.reqSem, 0, 0); tsem_init(&gCtgMgmt.queue.reqSem, 0, 0);
tsem_init(&gCtgMgmt.queue.rspSem, 0, 0); tsem_init(&gCtgMgmt.queue.rspSem, 0, 0);
...@@ -2316,6 +2314,8 @@ int32_t catalogInit(SCatalogCfg *cfg) { ...@@ -2316,6 +2314,8 @@ int32_t catalogInit(SCatalogCfg *cfg) {
} }
gCtgMgmt.queue.tail = gCtgMgmt.queue.head; gCtgMgmt.queue.tail = gCtgMgmt.queue.head;
CTG_ERR_RET(ctgStartUpdateThread());
qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stbRentSec:%u", gCtgMgmt.cfg.maxDBCacheNum, gCtgMgmt.cfg.maxTblCacheNum, gCtgMgmt.cfg.dbRentSec, gCtgMgmt.cfg.stbRentSec); qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stbRentSec:%u", gCtgMgmt.cfg.maxDBCacheNum, gCtgMgmt.cfg.maxTblCacheNum, gCtgMgmt.cfg.dbRentSec, gCtgMgmt.cfg.stbRentSec);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -35,7 +35,7 @@ sql connect ...@@ -35,7 +35,7 @@ sql connect
$dbNamme = d0 $dbNamme = d0
print =============== create database , vgroup 1 print =============== create database , vgroup 1
sql create database $dbNamme vgroups 10 sql create database $dbNamme vgroups 1
sql show databases sql show databases
print $data00 $data01 $data02 print $data00 $data01 $data02
if $rows != 2 then if $rows != 2 then
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册