diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 385c123fec8f87c7c0785916139d2082e273f33c..9c36856f1d10bc8e3429645c743aba34807d0c25 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -54,25 +54,28 @@ typedef struct SColumnDataAgg { } SColumnDataAgg; typedef struct SDataBlockInfo { - STimeWindow window; - int32_t rows; - int32_t rowSize; - int16_t numOfCols; - int16_t hasVarCol; - union {int64_t uid; int64_t blockId;}; + STimeWindow window; + int32_t rows; + int32_t rowSize; + int16_t numOfCols; + int16_t hasVarCol; + union { + int64_t uid; + int64_t blockId; + }; } SDataBlockInfo; -//typedef struct SConstantItem { -// SColumnInfo info; -// int32_t startRow; // run-length-encoding to save the space for multiple rows -// int32_t endRow; -// SVariant value; -//} SConstantItem; +// typedef struct SConstantItem { +// SColumnInfo info; +// int32_t startRow; // run-length-encoding to save the space for multiple rows +// int32_t endRow; +// SVariant value; +// } SConstantItem; // info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList); typedef struct SSDataBlock { - SColumnDataAgg *pBlockAgg; - SArray *pDataBlock; // SArray + SColumnDataAgg* pBlockAgg; + SArray* pDataBlock; // SArray SDataBlockInfo info; } SSDataBlock; @@ -108,13 +111,13 @@ static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) { static FORCE_INLINE int32_t tEncodeSMqPollRsp(void** buf, const SMqPollRsp* pRsp) { int32_t tlen = 0; int32_t sz = 0; - tlen += taosEncodeFixedI64(buf, pRsp->consumerId); + // tlen += taosEncodeFixedI64(buf, pRsp->consumerId); tlen += taosEncodeFixedI64(buf, pRsp->reqOffset); tlen += taosEncodeFixedI64(buf, pRsp->rspOffset); tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum); tlen += taosEncodeFixedI32(buf, pRsp->numOfTopics); if (pRsp->numOfTopics == 0) return tlen; - tlen += tEncodeSSchemaWrapper(buf, pRsp->schemas); + tlen += tEncodeSSchemaWrapper(buf, pRsp->schema); if (pRsp->pBlockData) { sz = taosArrayGetSize(pRsp->pBlockData); } @@ -128,15 +131,15 @@ static FORCE_INLINE int32_t tEncodeSMqPollRsp(void** buf, const SMqPollRsp* pRsp static FORCE_INLINE void* tDecodeSMqPollRsp(void* buf, SMqPollRsp* pRsp) { int32_t sz; - buf = taosDecodeFixedI64(buf, &pRsp->consumerId); + // buf = taosDecodeFixedI64(buf, &pRsp->consumerId); buf = taosDecodeFixedI64(buf, &pRsp->reqOffset); buf = taosDecodeFixedI64(buf, &pRsp->rspOffset); buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum); buf = taosDecodeFixedI32(buf, &pRsp->numOfTopics); if (pRsp->numOfTopics == 0) return buf; - pRsp->schemas = (SSchemaWrapper*)calloc(1, sizeof(SSchemaWrapper)); - if (pRsp->schemas == NULL) return NULL; - buf = tDecodeSSchemaWrapper(buf, pRsp->schemas); + pRsp->schema = (SSchemaWrapper*)calloc(1, sizeof(SSchemaWrapper)); + if (pRsp->schema == NULL) return NULL; + buf = tDecodeSSchemaWrapper(buf, pRsp->schema); buf = taosDecodeFixedI32(buf, &sz); pRsp->pBlockData = taosArrayInit(sz, sizeof(SSDataBlock)); for (int32_t i = 0; i < sz; i++) { @@ -148,11 +151,11 @@ static FORCE_INLINE void* tDecodeSMqPollRsp(void* buf, SMqPollRsp* pRsp) { } static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqPollRsp* pRsp) { - if (pRsp->schemas) { - if (pRsp->schemas->nCols) { - tfree(pRsp->schemas->pSchema); + if (pRsp->schema) { + if (pRsp->schema->nCols) { + tfree(pRsp->schema->pSchema); } - free(pRsp->schemas); + free(pRsp->schema); } taosArrayDestroyEx(pRsp->pBlockData, (void (*)(void*))tDeleteSSDataBlock); pRsp->pBlockData = NULL; @@ -196,7 +199,7 @@ typedef struct SGroupbyExpr { typedef struct SFunctParam { int32_t type; - SColumn *pCol; + SColumn* pCol; SVariant param; } SFunctParam; @@ -214,12 +217,12 @@ typedef struct SResSchame { typedef struct SExprBasicInfo { SResSchema resSchema; int16_t numOfParams; // argument value of each function - SFunctParam *pParam; + SFunctParam* pParam; } SExprBasicInfo; typedef struct SExprInfo { - struct SExprBasicInfo base; - struct tExprNode *pExpr; + struct SExprBasicInfo base; + struct tExprNode* pExpr; } SExprInfo; typedef struct SStateWindow { diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 75c2be7932fa69d0fb5d1119d287d2e97fd3e260..22e8e446f0000df7ea5788a17ec40f711809089c 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1282,7 +1282,7 @@ static FORCE_INLINE SMqRebSubscribe* tNewSMqRebSubscribe(const char* key) { if (pRebSub == NULL) { goto _err; } - pRebSub->key = key; + pRebSub->key = strdup(key); pRebSub->lostConsumers = taosArrayInit(0, sizeof(int64_t)); if (pRebSub->lostConsumers == NULL) { goto _err; @@ -2116,25 +2116,16 @@ typedef struct { int8_t mqMsgType; int32_t code; int32_t epoch; + int64_t consumerId; } SMqRspHead; -typedef struct { - int64_t consumerId; - SSchemaWrapper* schemas; - int64_t reqOffset; - int64_t rspOffset; - int32_t skipLogNum; - int32_t numOfTopics; - SArray* pBlockData; // SArray -} SMqPollRsp; - -// one req for one vg+topic typedef struct { SMsgHead head; int64_t consumerId; int64_t blockingTime; int32_t epoch; + int8_t withSchema; char cgroup[TSDB_CGROUP_LEN]; int64_t currentOffset; @@ -2153,19 +2144,21 @@ typedef struct { } SMqSubTopicEp; typedef struct { - int64_t consumerId; - char cgroup[TSDB_CGROUP_LEN]; - SArray* topics; // SArray -} SMqCMGetSubEpRsp; + SMqRspHead head; + // TODO: remove from msg + int64_t reqOffset; + int64_t rspOffset; + int32_t skipLogNum; + int32_t numOfTopics; + SSchemaWrapper* schema; + SArray* pBlockData; // SArray +} SMqPollRsp; typedef struct { SMqRspHead head; - union { - SMqPollRsp consumeRsp; - SMqCMGetSubEpRsp getEpRsp; - }; - void* extra; -} SMqMsgWrapper; + char cgroup[TSDB_CGROUP_LEN]; + SArray* topics; // SArray +} SMqCMGetSubEpRsp; typedef struct { int32_t curBlock; @@ -2173,11 +2166,13 @@ typedef struct { void** uData; } SMqRowIter; -struct tmq_message_t_v1 { - SMqPollRsp rsp; +struct tmq_message_t { + SMqPollRsp msg; + void* vg; SMqRowIter iter; }; +#if 0 struct tmq_message_t { SMqRspHead head; union { @@ -2189,6 +2184,7 @@ struct tmq_message_t { int32_t curRow; void** uData; }; +#endif static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taosArrayDestroy(pSubTopicEp->vgs); } @@ -2241,8 +2237,7 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSubEpRsp* pRsp) { int32_t tlen = 0; - tlen += taosEncodeFixedI64(buf, pRsp->consumerId); - tlen += taosEncodeString(buf, pRsp->cgroup); + // tlen += taosEncodeString(buf, pRsp->cgroup); int32_t sz = taosArrayGetSize(pRsp->topics); tlen += taosEncodeFixedI32(buf, sz); for (int32_t i = 0; i < sz; i++) { @@ -2253,8 +2248,7 @@ static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSu } static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* pRsp) { - buf = taosDecodeFixedI64(buf, &pRsp->consumerId); - buf = taosDecodeStringTo(buf, pRsp->cgroup); + // buf = taosDecodeStringTo(buf, pRsp->cgroup); int32_t sz; buf = taosDecodeFixedI32(buf, &sz); pRsp->topics = taosArrayInit(sz, sizeof(SMqSubTopicEp)); @@ -2275,8 +2269,8 @@ enum { }; typedef struct { - void* inputHandle; - void** executor; + void* inputHandle; + void* executor[4]; } SStreamTaskParRunner; typedef struct { diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index a8fc3947209d85f65e872f320d6044366d2782cd..4cb39ef4f83eb848e85eb0b65c5245a0a6da82af 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -681,7 +681,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) { if (tmq_message == NULL) return 0; - SMqPollRsp* pRsp = &tmq_message->consumeRsp; + SMqPollRsp* pRsp = &tmq_message->msg; return pRsp->skipLogNum; } @@ -690,15 +690,15 @@ void tmqShowMsg(tmq_message_t* tmq_message) { static bool noPrintSchema; char pBuf[128]; - SMqPollRsp* pRsp = &tmq_message->consumeRsp; - int32_t colNum = pRsp->schemas->nCols; + SMqPollRsp* pRsp = &tmq_message->msg; + int32_t colNum = pRsp->schema->nCols; if (!noPrintSchema) { printf("|"); for (int32_t i = 0; i < colNum; i++) { if (i == 0) - printf(" %25s |", pRsp->schemas->pSchema[i].name); + printf(" %25s |", pRsp->schema->pSchema[i].name); else - printf(" %15s |", pRsp->schemas->pSchema[i].name); + printf(" %15s |", pRsp->schema->pSchema[i].name); } printf("\n"); printf("===============================================\n"); @@ -778,19 +778,19 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { goto WRITE_QUEUE_FAIL; } memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); - tDecodeSMqPollRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->consumeRsp); - pRsp->curBlock = 0; - pRsp->curRow = 0; + tDecodeSMqPollRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->msg); + pRsp->iter.curBlock = 0; + pRsp->iter.curRow = 0; // TODO: alloc mem /*pRsp->*/ /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ - if (pRsp->consumeRsp.numOfTopics == 0) { + if (pRsp->msg.numOfTopics == 0) { /*printf("no data\n");*/ taosFreeQitem(pRsp); goto WRITE_QUEUE_FAIL; } - pRsp->extra = pParam->pVg; + pRsp->vg = pParam->pVg; taosWriteQitem(tmq->mqueue, pRsp); atomic_add_fetch_32(&tmq->readyRequest, 1); tsem_post(&tmq->rspSem); @@ -860,14 +860,14 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { } tDeleteSMqCMGetSubEpRsp(&rsp); } else { - tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t)); + SMqCMGetSubEpRsp* pRsp = taosAllocateQitem(sizeof(SMqCMGetSubEpRsp)); if (pRsp == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; code = -1; goto END; } memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); - tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->getEpRsp); + tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pRsp); taosWriteQitem(tmq->mqueue, pRsp); tsem_post(&tmq->rspSem); @@ -983,6 +983,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTo return pReq; } +#if 0 tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) { tmq_message_t* msg = NULL; for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { @@ -1050,6 +1051,7 @@ tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) { } return NULL; } +#endif int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { /*printf("call poll\n");*/ @@ -1111,11 +1113,12 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { } // return -int32_t tmqHandleRes(tmq_t* tmq, tmq_message_t* rspMsg, bool* pReset) { - if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__EP_RSP) { +int32_t tmqHandleRes(tmq_t* tmq, SMqRspHead* rspHead, bool* pReset) { + if (rspHead->mqMsgType == TMQ_MSG_TYPE__EP_RSP) { /*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/ - if (rspMsg->head.epoch > atomic_load_32(&tmq->epoch)) { - tmqUpdateEp(tmq, rspMsg->head.epoch, &rspMsg->getEpRsp); + if (rspHead->epoch > atomic_load_32(&tmq->epoch)) { + SMqCMGetSubEpRsp* rspMsg = (SMqCMGetSubEpRsp*)rspHead; + tmqUpdateEp(tmq, rspHead->epoch, rspMsg); tmqClearUnhandleMsg(tmq); *pReset = true; } else { @@ -1129,21 +1132,22 @@ int32_t tmqHandleRes(tmq_t* tmq, tmq_message_t* rspMsg, bool* pReset) { tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) { while (1) { - tmq_message_t* rspMsg = NULL; - taosGetQitem(tmq->qall, (void**)&rspMsg); - if (rspMsg == NULL) { + SMqRspHead* rspHead = NULL; + taosGetQitem(tmq->qall, (void**)&rspHead); + if (rspHead == NULL) { taosReadAllQitems(tmq->mqueue, tmq->qall); - taosGetQitem(tmq->qall, (void**)&rspMsg); - if (rspMsg == NULL) return NULL; + taosGetQitem(tmq->qall, (void**)&rspHead); + if (rspHead == NULL) return NULL; } - if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) { + if (rspHead->mqMsgType == TMQ_MSG_TYPE__POLL_RSP) { + tmq_message_t* rspMsg = (tmq_message_t*)rspHead; atomic_sub_fetch_32(&tmq->readyRequest, 1); /*printf("handle poll rsp %d\n", rspMsg->head.mqMsgType);*/ - if (rspMsg->head.epoch == atomic_load_32(&tmq->epoch)) { + if (rspMsg->msg.head.epoch == atomic_load_32(&tmq->epoch)) { /*printf("epoch match\n");*/ - SMqClientVg* pVg = rspMsg->extra; - pVg->currentOffset = rspMsg->consumeRsp.rspOffset; + SMqClientVg* pVg = rspMsg->vg; + pVg->currentOffset = rspMsg->msg.rspOffset; atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); return rspMsg; } else { @@ -1153,8 +1157,8 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese } else { /*printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);*/ bool reset = false; - tmqHandleRes(tmq, rspMsg, &reset); - taosFreeQitem(rspMsg); + tmqHandleRes(tmq, rspHead, &reset); + taosFreeQitem(rspHead); if (pollIfReset && reset) { printf("reset and repoll\n"); tmqPollImpl(tmq, blockingTime); @@ -1163,6 +1167,7 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese } } +#if 0 tmq_message_t* tmq_consumer_poll_v1(tmq_t* tmq, int64_t blocking_time) { tmq_message_t* rspMsg = NULL; int64_t startTime = taosGetTimestampMs(); @@ -1185,6 +1190,7 @@ tmq_message_t* tmq_consumer_poll_v1(tmq_t* tmq, int64_t blocking_time) { return NULL; } } +#endif tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { tmq_message_t* rspMsg; @@ -1350,7 +1356,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_v void tmq_message_destroy(tmq_message_t* tmq_message) { if (tmq_message == NULL) return; - SMqPollRsp* pRsp = &tmq_message->consumeRsp; + SMqPollRsp* pRsp = &tmq_message->msg; tDeleteSMqConsumeRsp(pRsp); /*free(tmq_message);*/ taosFreeQitem(tmq_message); @@ -1366,24 +1372,24 @@ const char* tmq_err2str(tmq_resp_err_t err) { } TAOS_ROW tmq_get_row(tmq_message_t* message) { - SMqPollRsp* rsp = &message->consumeRsp; + SMqPollRsp* rsp = &message->msg; while (1) { - if (message->curBlock < taosArrayGetSize(rsp->pBlockData)) { - SSDataBlock* pBlock = taosArrayGet(rsp->pBlockData, message->curBlock); - if (message->curRow < pBlock->info.rows) { + if (message->iter.curBlock < taosArrayGetSize(rsp->pBlockData)) { + SSDataBlock* pBlock = taosArrayGet(rsp->pBlockData, message->iter.curBlock); + if (message->iter.curRow < pBlock->info.rows) { for (int i = 0; i < pBlock->info.numOfCols; i++) { SColumnInfoData* pData = taosArrayGet(pBlock->pDataBlock, i); - if (colDataIsNull_s(pData, message->curRow)) - message->uData[i] = NULL; + if (colDataIsNull_s(pData, message->iter.curRow)) + message->iter.uData[i] = NULL; else { - message->uData[i] = colDataGetData(pData, message->curRow); + message->iter.uData[i] = colDataGetData(pData, message->iter.curRow); } } - message->curRow++; - return message->uData; + message->iter.curRow++; + return message->iter.uData; } else { - message->curBlock++; - message->curRow = 0; + message->iter.curBlock++; + message->iter.curRow = 0; continue; } } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index a2382d1dd1f94db415ed839fc2f59b6a726bdeee..b4caf5ba971eb0cfdce7c774b5788a000060ad1a 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2720,6 +2720,8 @@ int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) { if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->level) < 0) return -1; if (tEncodeI8(pEncoder, pTask->status) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->pipeEnd) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->parallel) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pTask->NextOpEp) < 0) return -1; if (tEncodeCStr(pEncoder, pTask->qmsg) < 0) return -1; tEndEncode(pEncoder); @@ -2732,6 +2734,8 @@ int32_t tDecodeSStreamTask(SCoder *pDecoder, SStreamTask *pTask) { if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->level) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->pipeEnd) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->parallel) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pTask->NextOpEp) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pTask->qmsg) < 0) return -1; tEndDecode(pDecoder); diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index fe01b19d2d25fb396a28af8a5e10eaa1c0e02d66..61aafab8a23bfe7fbbb1fe4f1daaa046323b1c45 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -77,8 +77,9 @@ static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) { } dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); - rpcFreeCont(pMsg->rpcMsg.pCont); - taosFreeQitem(pMsg); + // TODO: handle invalid write + /*rpcFreeCont(pMsg->rpcMsg.pCont);*/ + /*taosFreeQitem(pMsg);*/ } static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 4fbf0352e5c021433e451aa7e5f564e099b5c07b..a448c16ff3f7f8b017234d9c3efa948bcfc774ef 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -272,7 +272,6 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) { /*sdbWrite(pMnode->pSdb, pConsumerRaw);*/ strcpy(rsp.cgroup, pReq->cgroup); - rsp.consumerId = consumerId; if (epoch != pConsumer->epoch) { mInfo("send new assignment to consumer, consumer epoch %d, server epoch %d", epoch, pConsumer->epoch); SArray *pTopics = pConsumer->currentTopics; @@ -322,6 +321,7 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) { } ((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP; ((SMqRspHead *)buf)->epoch = pConsumer->epoch; + ((SMqRspHead *)buf)->consumerId = pConsumer->consumerId; void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); tEncodeSMqCMGetSubEpRsp(&abuf, &rsp); @@ -344,14 +344,14 @@ static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup) } static SMqRebSubscribe *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { - SMqRebSubscribe *pRebSub = taosHashGet(pHash, key, strlen(key)); + SMqRebSubscribe *pRebSub = taosHashGet(pHash, key, strlen(key) + 1); if (pRebSub == NULL) { pRebSub = tNewSMqRebSubscribe(key); if (pRebSub == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - taosHashPut(pHash, key, strlen(key), pRebSub, sizeof(SMqRebSubscribe)); + taosHashPut(pHash, key, strlen(key) + 1, pRebSub, sizeof(SMqRebSubscribe)); } return pRebSub; } @@ -441,6 +441,7 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) { if (pIter == NULL) break; SMqRebSubscribe *pRebSub = (SMqRebSubscribe *)pIter; SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebSub->key); + tfree(pRebSub->key); mInfo("mq rebalance subscription: %s", pSub->key); @@ -503,7 +504,8 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) { atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE); } - mInfo("mq consumer:%" PRId64 ", status change from %d to %d", pRebConsumer->consumerId, status, pRebConsumer->status); + mInfo("mq consumer:%" PRId64 ", status change from %d to %d", pRebConsumer->consumerId, status, + pRebConsumer->status); SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer); sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY); @@ -543,8 +545,8 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) { mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp); mndReleaseTopic(pMnode, pTopic); } else { - mInfo("mq rebalance: assign vgroup %d, from consumer %" PRId64 " to consumer %" PRId64 "", pConsumerEp->vgId, - pConsumerEp->oldConsumerId, pConsumerEp->consumerId); + mInfo("mq rebalance: assign vgroup %d, from consumer %" PRId64 " to consumer %" PRId64 "", + pConsumerEp->vgId, pConsumerEp->oldConsumerId, pConsumerEp->consumerId); mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp); } @@ -1099,7 +1101,8 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) { SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, newTopicName); bool createSub = false; if (pSub == NULL) { - mDebug("create new subscription by consumer %" PRId64 ", group: %s, topic %s", consumerId, cgroup, newTopicName); + mDebug("create new subscription by consumer %" PRId64 ", group: %s, topic %s", consumerId, cgroup, + newTopicName); pSub = mndCreateSubscription(pMnode, pTopic, cgroup); createSub = true; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index a29baca3bd778563869b738f3f49bcd0da8054d3..147f44b26017aa63fd1704aee827526b13bb518c 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -57,7 +57,9 @@ void sndMetaDelete(SStreamMeta *pMeta) { } int32_t sndMetaDeployTask(SStreamMeta *pMeta, SStreamTask *pTask) { - pTask->runner.executor = qCreateStreamExecTaskInfo(pTask->qmsg, NULL); + for (int i = 0; i < pTask->parallel; i++) { + pTask->runner.executor[i] = qCreateStreamExecTaskInfo(pTask->qmsg, NULL); + } return taosHashPut(pMeta->pHash, &pTask->taskId, sizeof(int32_t), pTask, sizeof(void *)); } @@ -95,6 +97,7 @@ void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { SStreamTask *pTask = malloc(sizeof(SStreamTask)); if (pTask == NULL) { ASSERT(0); + return; } SCoder decoder; tCoderInit(&decoder, TD_LITTLE_ENDIAN, msg, pMsg->contLen - sizeof(SMsgHead), TD_DECODER); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b94e7d7c0302e05cf389c80e70a104c75c34d097..8911c1f903c866cf022accc5707b6cd7d8e79ddf 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -245,7 +245,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } SMqPollRsp rsp = { - .consumerId = consumerId, + /*.consumerId = consumerId,*/ .numOfTopics = 0, .pBlockData = NULL, }; @@ -298,7 +298,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } taosArrayPush(pRes, pDataBlock); - rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper; + rsp.schema = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper; rsp.rspOffset = fetchOffset; rsp.numOfTopics = 1; @@ -312,6 +312,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP; ((SMqRspHead*)buf)->epoch = pReq->epoch; + ((SMqRspHead*)buf)->consumerId = consumerId; void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); tEncodeSMqPollRsp(&abuf, &rsp); diff --git a/source/dnode/vnode/src/vnd/vnodeInt.c b/source/dnode/vnode/src/vnd/vnodeInt.c index 7d0b594e956ef8a4c8c113f5ee2ee603d6941ea3..a64e834ff85210cfe2b2a46fd135230f287025ea 100644 --- a/source/dnode/vnode/src/vnd/vnodeInt.c +++ b/source/dnode/vnode/src/vnd/vnodeInt.c @@ -14,8 +14,8 @@ */ #define _DEFAULT_SOURCE -#include "vnd.h" #include "sync.h" +#include "vnd.h" // #include "vnodeInt.h" int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg) { return 0; } @@ -41,6 +41,6 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { } int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { - vInfo("sync message is processed"); + /*vInfo("sync message is processed");*/ return 0; }