diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 6d14cd010c99fe7dbe833d9b74dab7865146775a..6cc19fbdfbbd6457babe0eda48890e17b9d215a4 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1596,35 +1596,38 @@ typedef struct SMqColData { int16_t colId; int16_t type; int16_t bytes; - char data[]; -} SMqColData; +} SMqColMeta; typedef struct SMqTbData { int64_t uid; - int32_t numOfCols; int32_t numOfRows; - SMqColData colData[]; + char colData[]; } SMqTbData; typedef struct SMqTopicBlk { - char topicName[TSDB_TOPIC_FNAME_LEN]; - int64_t committedOffset; - int64_t reqOffset; - int64_t rspOffset; - int32_t skipLogNum; - int32_t bodyLen; - int32_t numOfTb; - SMqTbData tbData[]; + char topicName[TSDB_TOPIC_FNAME_LEN]; + int64_t committedOffset; + int64_t reqOffset; + int64_t rspOffset; + int32_t skipLogNum; + int32_t bodyLen; + int32_t numOfTb; + SMqTbData* tbData; } SMqTopicData; typedef struct SMqConsumeRsp { - int64_t reqId; - int64_t consumerId; - int32_t bodyLen; - int32_t numOfTopics; - SMqTopicData data[]; + int64_t consumerId; + int32_t numOfCols; + SMqColMeta* meta; + int32_t numOfTopics; + SMqTopicData* data; } SMqConsumeRsp; +static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp* pRsp) { + int32_t tlen = 0; + return tlen; +} + // one req for one vg+topic typedef struct SMqConsumeReq { //0: commit only, current offset diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index da66e7e529e54131a204bbce1dc1b44e61cdb08a..d273ab9e5f59b9b0ed3350876d9f4337ba976780 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -328,6 +328,7 @@ struct tmq_t { char clientId[256]; int64_t consumerId; int64_t status; + tsem_t rspSem; STscObj* pTscObj; tmq_commit_cb* commit_cb; int32_t nextTopicIdx; @@ -344,6 +345,7 @@ tmq_t* taos_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t err strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->groupId, conf->groupId); pTmq->commit_cb = conf->commit_cb; + tsem_init(&pTmq->rspSem, 0, 0); pTmq->consumerId = generateRequestId() & ((uint64_t)-1 >> 1); pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic)); return pTmq; @@ -372,6 +374,14 @@ int32_t tmq_list_append(tmq_list_t* ptr, char* src) { } +int32_t tmq_null_cb(void* param, const SDataBuf* pMsg, int32_t code) { + if (code == 0) { + // + } + // + return 0; +} + TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { SRequestObj *pRequest = NULL; int32_t sz = topic_list->cnt; @@ -433,6 +443,7 @@ TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen }; SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); + /*sendInfo->fp*/ SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); int64_t transporterId = 0; @@ -641,6 +652,7 @@ int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) { } if(set) tmq->status = 1; // unlock + tsem_post(&tmq->rspSem); return 0; } @@ -679,7 +691,7 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) { int64_t transporterId = 0; asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); - tsem_wait(&pRequest->body.rspSem); + tsem_wait(&tmq->rspSem); } if (taosArrayGetSize(tmq->clientTopics) == 0) { @@ -695,13 +707,13 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, nextVgIdx); req.offset = pVg->currentOffset; + pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); pRequest->body.requestMsg = (SDataBuf){ .pData = &req, .len = sizeof(SMqConsumeReq) }; - pRequest->type = TDMT_VND_CONSUME; SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); - sendInfo->requestObjRefId = 0; - sendInfo->param = &tmq_message; - sendInfo->fp = tmq_poll_cb_inner; + /*sendInfo->requestObjRefId = 0;*/ + /*sendInfo->param = &tmq_message;*/ + /*sendInfo->fp = tmq_poll_cb_inner;*/ int64_t transporterId = 0; asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index ea87542e8a11e4b9e4d8c13b5766e56c97155790..1136a813d3727ae6bfa3a8299f68b48493294022 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -591,7 +591,7 @@ TEST(testCase, tmq_subscribe_Test) { while (1) { tmq_message_t* msg = tmq_consume_poll(tmq, 0); printf("get msg\n"); - if (msg == NULL) break; + //if (msg == NULL) break; } } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 314e8d7644a8427a87b256184990446448b2a98b..fb9d6a213cc821c9e95368773a2a1c6903a12013 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -735,23 +735,6 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { break; } if (pDataBlock != NULL) { - SMqTbData tbData = { - .uid = pDataBlock->info.uid, - .numOfCols = pDataBlock->info.numOfCols, - .numOfRows = pDataBlock->info.rows, - }; - for (int i = 0; i < pDataBlock->info.numOfCols; i++) { - SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, i); - int32_t sz = pColData->info.bytes * pDataBlock->info.rows; - SMqColData colData = { - .bytes = pColData->info.bytes, - .colId = pColData->info.colId, - .type = pColData->info.type, - }; - memcpy(colData.data, pColData->pData, colData.bytes * pDataBlock->info.rows); - memcpy(&tbData.colData[i], &colData, sz); - } - /*pDataBlock->info.*/ taosArrayPush(pRes, pDataBlock); } else { break;