未验证 提交 851193a0 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #21060 from taosdata/fix/TS-3222

opti:change push mgr to consume msg for subscribe
......@@ -534,7 +534,7 @@ static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSm
uint16_t *index = colHash ? (uint16_t *)taosHashGet(colHash, kv->key, kv->keyLen) : NULL;
if (index) {
if (colField[*index].type != kv->type) {
uError("SML:0x%" PRIx64 " point type and db type mismatch. point type: %d, db type: %d, key: %s", info->id, colField[*index].type, kv->type, kv->key);
uError("SML:0x%" PRIx64 " point type and db type mismatch. db type: %d, point type: %d, key: %s", info->id, colField[*index].type, kv->type, kv->key);
return TSDB_CODE_SML_INVALID_DATA;
}
......
......@@ -1377,7 +1377,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN);
tscDebug("consumer:0x%" PRIx64 ", update topic:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet);
tscDebug("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet);
pTopic->vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
for (int32_t j = 0; j < vgNumGet; j++) {
......@@ -1447,14 +1447,14 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
if (pTopicCur->vgs) {
int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
tscDebug("consumer:0x%" PRIx64 ", new vg num: %d", tmq->consumerId, vgNumCur);
tscDebug("consumer:0x%" PRIx64 ", current vg num: %d", tmq->consumerId, vgNumCur);
for (int32_t j = 0; j < vgNumCur; j++) {
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
makeTopicVgroupKey(vgKey, pTopicCur->topicName, pVgCur->vgId);
char buf[80];
tFormatOffset(buf, 80, &pVgCur->currentOffset);
tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId,
tscDebug("consumer:0x%" PRIx64 ", doUpdateLocalEp current vg, epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, tmq->epoch, pVgCur->vgId,
vgKey, buf);
SVgroupSaveInfo info = {.offset = pVgCur->currentOffset, .numOfRows = pVgCur->numOfRows};
......@@ -1790,8 +1790,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
pVg->epSet = *pollRspWrapper->pEpset;
}
// update the local offset value only for the returned values.
pVg->currentOffset = pDataRsp->rspOffset;
if(pDataRsp->rspOffset.type != 0){ // if offset is validate
pVg->currentOffset = pDataRsp->rspOffset; // update the local offset value only for the returned values.
}
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
char buf[80];
......@@ -1801,12 +1802,13 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
" total:%" PRId64 " reqId:0x%" PRIx64,
tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId);
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
pVg->emptyBlockReceiveTs = taosGetTimestampMs();
taosFreeQitem(pollRspWrapper);
} else { // build rsp
int64_t numOfRows = 0;
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
tmq->totalRows += numOfRows;
pVg->emptyBlockReceiveTs = 0;
tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
" vg total:%" PRId64 " total:%" PRId64 ", reqId:0x%" PRIx64,
tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows,
......@@ -1828,7 +1830,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
SMqClientVg* pVg = pollRspWrapper->vgHandle;
pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset;
if(pollRspWrapper->metaRsp.rspOffset.type != 0){ // if offset is validate
pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset;
}
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// build rsp
SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
......@@ -1846,7 +1850,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) {
SMqClientVg* pVg = pollRspWrapper->vgHandle;
pVg->currentOffset = pollRspWrapper->taosxRsp.rspOffset;
if(pollRspWrapper->taosxRsp.rspOffset.type != 0){ // if offset is validate
pVg->currentOffset = pollRspWrapper->taosxRsp.rspOffset;
}
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
if (pollRspWrapper->taosxRsp.blockNum == 0) {
......
......@@ -5329,9 +5329,9 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
SMsgHead *pHead = buf;
pHead->vgId = pReq->head.vgId;
pHead->contLen = pReq->head.contLen;
// SMsgHead *pHead = buf;
// pHead->vgId = pReq->head.vgId;
// pHead->contLen = pReq->head.contLen;
SDecoder decoder = {0};
tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen);
......@@ -6839,10 +6839,8 @@ int32_t tEncodeSTqOffsetVal(SEncoder *pEncoder, const STqOffsetVal *pOffsetVal)
if (tEncodeI64(pEncoder, pOffsetVal->ts) < 0) return -1;
} else if (pOffsetVal->type == TMQ_OFFSET__LOG) {
if (tEncodeI64(pEncoder, pOffsetVal->version) < 0) return -1;
} else if (pOffsetVal->type < 0) {
// do nothing
} else {
ASSERT(0);
// do nothing
}
return 0;
}
......@@ -6854,10 +6852,8 @@ int32_t tDecodeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffsetVal) {
if (tDecodeI64(pDecoder, &pOffsetVal->ts) < 0) return -1;
} else if (pOffsetVal->type == TMQ_OFFSET__LOG) {
if (tDecodeI64(pDecoder, &pOffsetVal->version) < 0) return -1;
} else if (pOffsetVal->type < 0) {
// do nothing
} else {
ASSERT(0);
// do nothing
}
return 0;
}
......
......@@ -519,7 +519,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
......@@ -449,7 +449,6 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
// 1. check consumer status
int32_t status = atomic_load_32(&pConsumer->status);
#if 1
if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
mInfo("try to recover consumer:0x%" PRIx64, consumerId);
SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg));
......@@ -463,7 +462,6 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg);
}
#endif
if (status != MQ_CONSUMER_STATUS__READY) {
mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status));
......
......@@ -100,6 +100,7 @@ typedef struct {
SWalRef* pRef;
STqPushHandle pushHandle; // push
STqExecHandle execHandle; // exec
SRpcMsg* msg;
} STqHandle;
typedef struct {
......@@ -113,7 +114,7 @@ struct STQ {
char* path;
int64_t walLogLastVer;
SRWLatch lock;
SHashObj* pPushMgr; // consumerId -> STqPushEntry
SHashObj* pPushMgr; // subKey -> STqHandle
SHashObj* pHandle; // subKey -> STqHandle
SHashObj* pCheckInfo; // topic -> SAlterCheckInfo
STqOffsetStore* pOffsetStore;
......@@ -146,7 +147,7 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows);
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision);
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type);
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry);
int32_t tqPushDataRsp(STQ* pTq, STqHandle* pHandle);
// tqMeta
int32_t tqMetaOpen(STQ* pTq);
......
......@@ -193,9 +193,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode);
void tqNotifyClose(STQ*);
void tqClose(STQ*);
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp,
int32_t type);
int tqUnregisterPushHandle(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer);
int tqRegisterPushEntry(STQ* pTq, void* handle, SRpcMsg* pMsg);
int tqUnregisterPushHandle(STQ* pTq, void* pHandle);
int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed.
int tqCommit(STQ*);
......@@ -214,6 +213,7 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit);
int32_t tqProcessSubmitReqForSubscribe(STQ* pTq);
int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver);
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
......
......@@ -71,18 +71,11 @@ static void destroyTqHandle(void* data) {
walCloseReader(pData->pWalReader);
tqCloseReader(pData->execHandle.pTqReader);
}
}
static void tqPushEntryFree(void* data) {
STqPushEntry* p = *(void**)data;
if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) {
tDeleteSMqDataRsp(p->pDataRsp);
} else if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__TAOSX_RSP) {
tDeleteSTaosxRsp((STaosxRsp*)p->pDataRsp);
if(pData->msg != NULL) {
rpcFreeCont(pData->msg->pCont);
taosMemoryFree(pData->msg);
pData->msg = NULL;
}
taosMemoryFree(p->pDataRsp);
taosMemoryFree(p);
}
static bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
......@@ -105,8 +98,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
taosHashSetFreeFp(pTq->pHandle, destroyTqHandle);
taosInitRWLatch(&pTq->lock);
pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
taosHashSetFreeFp(pTq->pPushMgr, tqPushEntryFree);
pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);
......@@ -228,17 +220,19 @@ static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqData
return 0;
}
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
SMqDataRsp* pRsp = pPushEntry->pDataRsp;
SMqRspHead* pHeader = &pPushEntry->pDataRsp->head;
doSendDataRsp(&pPushEntry->info, pRsp, pHeader->epoch, pHeader->consumerId, pHeader->mqMsgType);
int32_t tqPushDataRsp(STQ* pTq, STqHandle* pHandle) {
SMqDataRsp dataRsp = {0};
dataRsp.head.consumerId = pHandle->consumerId;
dataRsp.head.epoch = pHandle->epoch;
dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
doSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP);
char buf1[80] = {0};
char buf2[80] = {0};
tFormatOffset(buf1, tListLen(buf1), &pRsp->reqOffset);
tFormatOffset(buf2, tListLen(buf2), &pRsp->rspOffset);
tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset);
tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset);
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s",
TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2);
TD_VID(pTq->pVnode), dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2);
return 0;
}
......@@ -382,13 +376,13 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey);
taosWLockLatch(&pTq->lock);
int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey));
if (code != 0) {
tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey);
}
taosWUnLockLatch(&pTq->lock);
int32_t code = 0;
// taosWLockLatch(&pTq->lock);
// int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey));
// if (code != 0) {
// tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey);
// }
// taosWUnLockLatch(&pTq->lock);
STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
if (pHandle) {
......@@ -557,13 +551,12 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
}
taosWLockLatch(&pTq->lock);
atomic_store_32(&pHandle->epoch, -1);
atomic_store_32(&pHandle->epoch, 0);
// remove if it has been register in the push manager, and return one empty block to consumer
tqUnregisterPushHandle(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true);
tqUnregisterPushHandle(pTq, pHandle);
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
atomic_add_fetch_32(&pHandle->epoch, 1);
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
qStreamCloseTsdbReader(pTaskInfo);
......@@ -1069,6 +1062,34 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
return 0;
}
int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
taosWLockLatch(&pTq->lock);
if(taosHashGetSize(pTq->pPushMgr) > 0){
void *pIter = taosHashIterate(pTq->pPushMgr, NULL);
while(pIter){
STqHandle* pHandle = *(STqHandle**)pIter;
tqDebug("vgId:%d start set submit for pHandle:%p, consume id:0x%"PRIx64, vgId, pHandle, pHandle->consumerId);
if(ASSERT(pHandle->msg != NULL)){
tqError("pHandle->msg should not be null");
break;
}else{
SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, .pCont = pHandle->msg->pCont, .contLen = pHandle->msg->contLen, .info = pHandle->msg->info};
tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg);
taosMemoryFree(pHandle->msg);
pHandle->msg = NULL;
}
pIter = taosHashIterate(pTq->pPushMgr, pIter);
}
taosHashClear(pTq->pPushMgr);
}
// unlock
taosWUnLockLatch(&pTq->lock);
return 0;
}
int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
#if 0
void* pIter = NULL;
......
......@@ -206,121 +206,60 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
}
#endif
typedef struct {
void* pKey;
int64_t keyLen;
} SItem;
static void recordPushedEntry(SArray* cachedKey, void* pIter);
static void doRemovePushedEntry(SArray* pCachedKeys, STQ* pTq);
static void freeItem(void* param) {
SItem* p = (SItem*)param;
taosMemoryFree(p->pKey);
}
static void doPushDataForEntry(void* pIter, STqExecHandle* pExec, STQ* pTq, int64_t ver, int32_t vgId, char* pData,
int32_t dataLen, SArray* pCachedKey) {
STqPushEntry* pPushEntry = *(STqPushEntry**)pIter;
SMqDataRsp* pRsp = pPushEntry->pDataRsp;
if (pRsp->reqOffset.version >= ver) {
tqDebug("vgId:%d, push entry req version %" PRId64 ", while push version %" PRId64 ", skip", vgId,
pRsp->reqOffset.version, ver);
return;
}
qTaskInfo_t pTaskInfo = pExec->task;
// prepare scan mem data
SPackedData submit = {.msgStr = pData, .msgLen = dataLen, .ver = ver};
if (qStreamSetScanMemData(pTaskInfo, submit) != 0) {
return;
}
qStreamSetOpen(pTaskInfo);
// here start to scan submit block to extract the subscribed data
int32_t totalRows = 0;
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
if (qExecTask(pTaskInfo, &pDataBlock, &ts) < 0) {
tqDebug("vgId:%d, tq exec error since %s", vgId, terrstr());
}
if (pDataBlock == NULL) {
break;
}
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
pRsp->blockNum++;
totalRows += pDataBlock->info.rows;
}
tqDebug("vgId:%d, tq handle push, subkey:%s, block num:%d, rows:%d", vgId, pPushEntry->subKey, pRsp->blockNum,
totalRows);
if (pRsp->blockNum > 0) {
tqOffsetResetToLog(&pRsp->rspOffset, ver);
tqPushDataRsp(pTq, pPushEntry);
recordPushedEntry(pCachedKey, pIter);
}
}
int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg));
int32_t len = msgLen - sizeof(SSubmitReq2Msg);
int32_t vgId = TD_VID(pTq->pVnode);
// void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg));
// int32_t len = msgLen - sizeof(SSubmitReq2Msg);
// int32_t vgId = TD_VID(pTq->pVnode);
if (msgType == TDMT_VND_SUBMIT) {
tqProcessSubmitReqForSubscribe(pTq);
// lock push mgr to avoid potential msg lost
taosWLockLatch(&pTq->lock);
int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr);
if (numOfRegisteredPush > 0) {
tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d",
vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush);
void* data = taosMemoryMalloc(len);
if (data == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to copy data for stream since out of memory, vgId:%d", vgId);
taosWUnLockLatch(&pTq->lock);
return -1;
}
memcpy(data, pReq, len);
SArray* cachedKey = taosArrayInit(0, sizeof(SItem));
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pTq->pPushMgr, pIter);
if (pIter == NULL) {
break;
}
STqPushEntry* pPushEntry = *(STqPushEntry**)pIter;
STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey));
if (pHandle == NULL) {
tqDebug("vgId:%d, failed to find handle %s in pushing data to consumer, ignore", pTq->pVnode->config.vgId,
pPushEntry->subKey);
continue;
}
STqExecHandle* pExec = &pHandle->execHandle;
doPushDataForEntry(pIter, pExec, pTq, ver, vgId, data, len, cachedKey);
}
doRemovePushedEntry(cachedKey, pTq);
taosArrayDestroyEx(cachedKey, freeItem);
taosMemoryFree(data);
}
// unlock
taosWUnLockLatch(&pTq->lock);
// taosWLockLatch(&pTq->lock);
//
// int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr);
// if (numOfRegisteredPush > 0) {
// tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d",
// vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush);
//
// void* data = taosMemoryMalloc(len);
// if (data == NULL) {
// terrno = TSDB_CODE_OUT_OF_MEMORY;
// tqError("failed to copy data for stream since out of memory, vgId:%d", vgId);
// taosWUnLockLatch(&pTq->lock);
// return -1;
// }
//
// memcpy(data, pReq, len);
//
// SArray* cachedKey = taosArrayInit(0, sizeof(SItem));
// void* pIter = NULL;
//
// while (1) {
// pIter = taosHashIterate(pTq->pPushMgr, pIter);
// if (pIter == NULL) {
// break;
// }
//
// STqPushEntry* pPushEntry = *(STqPushEntry**)pIter;
//
// STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey));
// if (pHandle == NULL) {
// tqDebug("vgId:%d, failed to find handle %s in pushing data to consumer, ignore", pTq->pVnode->config.vgId,
// pPushEntry->subKey);
// continue;
// }
//
// STqExecHandle* pExec = &pHandle->execHandle;
// doPushDataForEntry(pIter, pExec, pTq, ver, vgId, data, len, cachedKey);
// }
//
// doRemovePushedEntry(cachedKey, pTq);
// taosArrayDestroyEx(cachedKey, freeItem);
// taosMemoryFree(data);
// }
//
// // unlock
// taosWUnLockLatch(&pTq->lock);
}
tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, (int)taosHashGetSize(pTq->pStreamMeta->pTasks));
......@@ -362,83 +301,39 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
return 0;
}
int32_t tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp,
int32_t type) {
uint64_t consumerId = pRequest->consumerId;
int32_t vgId = TD_VID(pTq->pVnode);
STqHandle* pTqHandle = pHandle;
STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry));
if (pPushEntry == NULL) {
tqDebug("tmq poll: consumer:0x%" PRIx64 ", vgId:%d failed to malloc, size:%d", consumerId, vgId,
(int32_t)sizeof(STqPushEntry));
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pPushEntry->info = pRpcMsg->info;
memcpy(pPushEntry->subKey, pTqHandle->subKey, TSDB_SUBSCRIBE_KEY_LEN);
if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
pPushEntry->pDataRsp = taosMemoryCalloc(1, sizeof(STaosxRsp));
memcpy(pPushEntry->pDataRsp, pDataRsp, sizeof(STaosxRsp));
} else if (type == TMQ_MSG_TYPE__POLL_RSP) {
pPushEntry->pDataRsp = taosMemoryCalloc(1, sizeof(SMqDataRsp));
memcpy(pPushEntry->pDataRsp, pDataRsp, sizeof(SMqDataRsp));
int32_t tqRegisterPushEntry(STQ* pTq, void* handle, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode);
STqHandle* pHandle = (STqHandle*) handle;
if(pHandle->msg == NULL){
pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg));
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
pHandle->msg->pCont = rpcMallocCont(pMsg->contLen);
}else{
void *tmp = pHandle->msg->pCont;
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
pHandle->msg->pCont = tmp;
}
SMqRspHead* pHead = &pPushEntry->pDataRsp->head;
pHead->consumerId = consumerId;
pHead->epoch = pRequest->epoch;
pHead->mqMsgType = type;
taosHashPut(pTq->pPushMgr, pTqHandle->subKey, strlen(pTqHandle->subKey), &pPushEntry, sizeof(void*));
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s offset:%" PRId64 ", vgId:%d save handle to push mgr, total:%d",
consumerId, pTqHandle->subKey, pDataRsp->reqOffset.version, vgId, taosHashGetSize(pTq->pPushMgr));
memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen);
pHandle->msg->contLen = pMsg->contLen;
int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES);
tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64", register to pHandle:%p, pCont:%p, len:%d", vgId, ret, pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen);
return 0;
}
int32_t tqUnregisterPushHandle(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer) {
int32_t vgId = TD_VID(pTq->pVnode);
STqPushEntry** pEntry = taosHashGet(pTq->pPushMgr, pKey, keyLen);
if (pEntry != NULL) {
uint64_t cId = (*pEntry)->pDataRsp->head.consumerId;
ASSERT(consumerId == cId);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s vgId:%d remove from push mgr, remains:%d", consumerId,
(*pEntry)->subKey, vgId, taosHashGetSize(pTq->pPushMgr) - 1);
int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) {
STqHandle *pHandle = (STqHandle*)handle;
int32_t vgId = TD_VID(pTq->pVnode);
if (rspConsumer) { // rsp the old consumer with empty block.
tqPushDataRsp(pTq, *pEntry);
}
int32_t ret = taosHashRemove(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey));
tqError("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId);
if(pHandle->msg != NULL) {
tqPushDataRsp(pTq, pHandle);
taosHashRemove(pTq->pPushMgr, pKey, keyLen);
rpcFreeCont(pHandle->msg->pCont);
taosMemoryFree(pHandle->msg);
pHandle->msg = NULL;
}
return 0;
}
void recordPushedEntry(SArray* cachedKey, void* pIter) {
size_t kLen = 0;
void* key = taosHashGetKey(pIter, &kLen);
SItem item = {.pKey = strndup(key, kLen), .keyLen = kLen};
taosArrayPush(cachedKey, &item);
}
void doRemovePushedEntry(SArray* pCachedKeys, STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
int32_t numOfKeys = (int32_t)taosArrayGetSize(pCachedKeys);
for (int32_t i = 0; i < numOfKeys; i++) {
SItem* pItem = taosArrayGet(pCachedKeys, i);
if (taosHashRemove(pTq->pPushMgr, pItem->pKey, pItem->keyLen) != 0) {
tqError("vgId:%d, tq push hash remove key error, key: %s", vgId, (char*)pItem->pKey);
}
}
if (numOfKeys > 0) {
tqDebug("vgId:%d, pushed %d items and remain:%d", vgId, numOfKeys, (int32_t)taosHashGetSize(pTq->pPushMgr));
}
}
......@@ -169,20 +169,20 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
// lock
taosWLockLatch(&pTq->lock);
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
int code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
if(code != 0) {
goto end;
}
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
// lock
taosWLockLatch(&pTq->lock);
code = tqRegisterPushEntry(pTq, pHandle, pMsg);
taosWUnLockLatch(&pTq->lock);
tDeleteSMqDataRsp(&dataRsp);
return code;
}
......@@ -197,7 +197,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
tFormatOffset(buf, 80, &dataRsp.rspOffset);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64 " code:%d",
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
taosWUnLockLatch(&pTq->lock);
// taosWUnLockLatch(&pTq->lock);
tDeleteSMqDataRsp(&dataRsp);
}
return code;
......@@ -211,6 +211,12 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
SMqMetaRsp metaRsp = {0};
STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, pRequest);
qTaskInfo_t task = pHandle->execHandle.task;
if(qTaskIsExecuting(task)){
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
tDeleteSTaosxRsp(&taosxRsp);
return code;
}
if (offset->type != TMQ_OFFSET__LOG) {
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) {
......
......@@ -448,7 +448,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
walApplyVer(pVnode->pWal, version);
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
/*vInfo("vgId:%d, push msg end", pVnode->config.vgId);*/
// /*vInfo("vgId:%d, push msg end", pVnode->config.vgId);*/
vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
}
......@@ -487,11 +487,16 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
vTrace("message in vnode query queue is processing");
if ((pMsg->msgType == TDMT_SCH_QUERY) && !syncIsReadyForRead(pVnode->sync)) {
if ((pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_VND_TMQ_CONSUME) && !syncIsReadyForRead(pVnode->sync)) {
vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
return 0;
}
if (pMsg->msgType == TDMT_VND_TMQ_CONSUME && !pVnode->restored) {
vnodeRedirectRpcMsg(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
return 0;
}
SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
switch (pMsg->msgType) {
case TDMT_SCH_QUERY:
......@@ -499,6 +504,8 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
case TDMT_SCH_QUERY_CONTINUE:
return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
case TDMT_VND_TMQ_CONSUME:
return tqProcessPollReq(pVnode->pTq, pMsg);
default:
vError("unknown msg type:%d in query queue", pMsg->msgType);
return TSDB_CODE_APP_ERROR;
......@@ -508,17 +515,12 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg);
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
pMsg->msgType == TDMT_VND_BATCH_META || pMsg->msgType == TDMT_VND_TMQ_CONSUME) &&
pMsg->msgType == TDMT_VND_BATCH_META) &&
!syncIsReadyForRead(pVnode->sync)) {
vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
return 0;
}
if (pMsg->msgType == TDMT_VND_TMQ_CONSUME && !pVnode->restored) {
vnodeRedirectRpcMsg(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
return 0;
}
switch (pMsg->msgType) {
case TDMT_SCH_FETCH:
case TDMT_SCH_MERGE_FETCH:
......@@ -537,8 +539,6 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return vnodeGetTableCfg(pVnode, pMsg, true);
case TDMT_VND_BATCH_META:
return vnodeGetBatchMeta(pVnode, pMsg);
case TDMT_VND_TMQ_CONSUME:
return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_RUN:
return tqProcessTaskRunReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_DISPATCH:
......
......@@ -262,7 +262,7 @@ static int32_t walFetchBodyNew(SWalReader *pReader) {
SWalCont *pReadHead = &pReader->pHead->head;
int64_t ver = pReadHead->version;
wDebug("vgId:%d, wal starts to fetch body, ver:%" PRId64 " ,len:%d", pReader->pWal->cfg.vgId, ver,
wDebug("vgId:%d, wal starts to fetch body, ver:%" PRId64 " ,len:%d, total", pReader->pWal->cfg.vgId, ver,
pReadHead->bodyLen);
if (pReader->capacity < pReadHead->bodyLen) {
......
......@@ -232,7 +232,7 @@ void saveConfigToLogFile() {
taosFprintfFile(g_fp, "%s:%s, ", g_stConfInfo.stThreads[i].key[k], g_stConfInfo.stThreads[i].value[k]);
}
taosFprintfFile(g_fp, "\n");
taosFprintfFile(g_fp, " expect rows: %" PRIx64 "\n", g_stConfInfo.stThreads[i].expectMsgCnt);
taosFprintfFile(g_fp, " expect rows: %" PRId64 "\n", g_stConfInfo.stThreads[i].expectMsgCnt);
}
char tmpString[128];
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册