提交 01c94a77 编写于 作者: L Liu Jicong

feat(tmq): push optimization

上级 1ad4d163
......@@ -515,7 +515,7 @@ int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_comm
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
topic = pMetaRspObj->topic;
vgId = pMetaRspObj->vgId;
} else if(TD_RES_TMQ_METADATA(msg)) {
} else if (TD_RES_TMQ_METADATA(msg)) {
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)msg;
topic = pRspObj->topic;
vgId = pRspObj->vgId;
......@@ -715,7 +715,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
int32_t epoch = tmq->epoch;
SMqHbReq* pReq = taosMemoryMalloc(sizeof(SMqHbReq));
if (pReq == NULL) goto OVER;
pReq->consumerId = consumerId;
pReq->consumerId = htobe64(consumerId);
pReq->epoch = epoch;
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
......@@ -1661,9 +1661,9 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
// build rsp
void* pRsp = NULL;
if(pollRspWrapper->taosxRsp.createTableNum == 0){
if (pollRspWrapper->taosxRsp.createTableNum == 0) {
pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
}else{
} else {
pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper);
}
taosFreeQitem(pollRspWrapper);
......@@ -1850,12 +1850,12 @@ const char* tmq_get_table_name(TAOS_RES* res) {
return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
} else if (TD_RES_TMQ_METADATA(res)) {
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res;
if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 ||
pRspObj->resIter >= pRspObj->rsp.blockNum) {
return NULL;
}
return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 ||
pRspObj->resIter >= pRspObj->rsp.blockNum) {
return NULL;
}
return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
}
return NULL;
}
......
......@@ -272,6 +272,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pConsumer == NULL) {
mError("consumer %ld not exist", consumerId);
terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
return -1;
}
......
......@@ -379,6 +379,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
topicObj.ast = strdup(pCreate->ast);
topicObj.astLen = strlen(pCreate->ast) + 1;
qDebugL("ast %s", topicObj.ast);
SNode *pAst = NULL;
if (nodesStringToNode(pCreate->ast, &pAst) != 0) {
taosMemoryFree(topicObj.ast);
......
......@@ -65,6 +65,11 @@ static void destroySTqHandle(void* data) {
}
}
static void tqPushEntryFree(void* data) {
void* p = *(void**)data;
taosMemoryFree(p);
}
STQ* tqOpen(const char* path, SVnode* pVnode) {
STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
if (pTq == NULL) {
......@@ -80,7 +85,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
taosInitRWLatch(&pTq->pushLock);
pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
taosHashSetFreeFp(pTq->pPushMgr, taosMemoryFree);
taosHashSetFreeFp(pTq->pPushMgr, tqPushEntryFree);
pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
......@@ -548,6 +553,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
if (pPushEntry != NULL) {
pPushEntry->pHandle = pHandle;
pPushEntry->pInfo = pMsg->info;
dataRsp.withTbName = 0;
memcpy(&pPushEntry->dataRsp, &dataRsp, sizeof(SMqDataRsp));
pPushEntry->rspHead.consumerId = consumerId;
pPushEntry->rspHead.epoch = reqEpoch;
......
......@@ -245,7 +245,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
if (qExecTask(task, NULL, &ts) < 0) {
if (qExecTask(task, &pDataBlock, &ts) < 0) {
ASSERT(0);
}
......@@ -256,21 +256,19 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols);
pRsp->blockNum++;
}
if (pRsp->blockNum > 0) {
// set offset
tqOffsetResetToLog(&pRsp->rspOffset, ver);
// remove from hash
size_t kLen;
void* key = taosHashGetKey(pPushEntry, &kLen);
void* key = taosHashGetKey(pIter, &kLen);
void* keyCopy = taosMemoryMalloc(kLen);
memcpy(keyCopy, key, kLen);
taosArrayPush(cachedKeys, &keyCopy);
taosArrayPush(cachedKeyLens, &kLen);
if (taosHashRemove(pTq->pPushMgr, key, kLen) != 0) {
ASSERT(0);
}
tqPushDataRsp(pTq, pPushEntry);
}
}
......@@ -278,7 +276,9 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
for (int32_t i = 0; i < taosArrayGetSize(cachedKeys); i++) {
void* key = taosArrayGetP(cachedKeys, i);
size_t kLen = *(size_t*)taosArrayGet(cachedKeyLens, i);
taosHashRemove(pTq->pPushMgr, key, kLen);
if (taosHashRemove(pTq->pPushMgr, key, kLen) != 0) {
ASSERT(0);
}
}
taosArrayDestroyP(cachedKeys, (FDelete)taosMemoryFree);
taosArrayDestroy(cachedKeyLens);
......
......@@ -53,7 +53,7 @@ static void destroyIndefinitOperatorInfo(void* param) {
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode,
SExecTaskInfo* pTaskInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t code = TSDB_CODE_SUCCESS;
SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -184,7 +184,7 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS
if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) {
int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows);
blockDataKeepFirstNRows(pBlock, keepRows);
//TODO: optimize it later when partition by + limit
// TODO: optimize it later when partition by + limit
if ((pLimitInfo->slimit.limit == -1 && pLimitInfo->currentGroupId == 0) ||
(pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) {
doSetOperatorCompleted(pOperator);
......@@ -206,6 +206,10 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
blockDataCleanup(pFinalRes);
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pTaskInfo->streamInfo.pReq) {
pOperator->status = OP_OPENED;
}
if (pOperator->status == OP_EXEC_DONE) {
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
pOperator->status = OP_OPENED;
......@@ -254,7 +258,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
}
setInfoForNewGroup(pBlock, pLimitInfo, pOperator);
if (pOperator->status == OP_EXEC_DONE) {
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH && pOperator->status == OP_EXEC_DONE) {
break;
}
......
......@@ -1443,6 +1443,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {
qError("submit msg messed up when initing stream submit block %p", pSubmit);
pInfo->tqReader->pMsg = NULL;
pTaskInfo->streamInfo.pReq = NULL;
ASSERT(0);
}
}
......@@ -1468,6 +1469,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
pInfo->tqReader->pMsg = NULL;
pTaskInfo->streamInfo.pReq = NULL;
return NULL;
}
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册