提交 3aca8851 编写于 作者: L Liu Jicong

fix invalid wirte

上级 7f50ad02
...@@ -1596,35 +1596,38 @@ typedef struct SMqColData { ...@@ -1596,35 +1596,38 @@ typedef struct SMqColData {
int16_t colId; int16_t colId;
int16_t type; int16_t type;
int16_t bytes; int16_t bytes;
char data[]; } SMqColMeta;
} SMqColData;
typedef struct SMqTbData { typedef struct SMqTbData {
int64_t uid; int64_t uid;
int32_t numOfCols;
int32_t numOfRows; int32_t numOfRows;
SMqColData colData[]; char colData[];
} SMqTbData; } SMqTbData;
typedef struct SMqTopicBlk { typedef struct SMqTopicBlk {
char topicName[TSDB_TOPIC_FNAME_LEN]; char topicName[TSDB_TOPIC_FNAME_LEN];
int64_t committedOffset; int64_t committedOffset;
int64_t reqOffset; int64_t reqOffset;
int64_t rspOffset; int64_t rspOffset;
int32_t skipLogNum; int32_t skipLogNum;
int32_t bodyLen; int32_t bodyLen;
int32_t numOfTb; int32_t numOfTb;
SMqTbData tbData[]; SMqTbData* tbData;
} SMqTopicData; } SMqTopicData;
typedef struct SMqConsumeRsp { typedef struct SMqConsumeRsp {
int64_t reqId; int64_t consumerId;
int64_t consumerId; int32_t numOfCols;
int32_t bodyLen; SMqColMeta* meta;
int32_t numOfTopics; int32_t numOfTopics;
SMqTopicData data[]; SMqTopicData* data;
} SMqConsumeRsp; } SMqConsumeRsp;
static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp* pRsp) {
int32_t tlen = 0;
return tlen;
}
// one req for one vg+topic // one req for one vg+topic
typedef struct SMqConsumeReq { typedef struct SMqConsumeReq {
//0: commit only, current offset //0: commit only, current offset
......
...@@ -328,6 +328,7 @@ struct tmq_t { ...@@ -328,6 +328,7 @@ struct tmq_t {
char clientId[256]; char clientId[256];
int64_t consumerId; int64_t consumerId;
int64_t status; int64_t status;
tsem_t rspSem;
STscObj* pTscObj; STscObj* pTscObj;
tmq_commit_cb* commit_cb; tmq_commit_cb* commit_cb;
int32_t nextTopicIdx; int32_t nextTopicIdx;
...@@ -344,6 +345,7 @@ tmq_t* taos_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t err ...@@ -344,6 +345,7 @@ tmq_t* taos_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t err
strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->clientId, conf->clientId);
strcpy(pTmq->groupId, conf->groupId); strcpy(pTmq->groupId, conf->groupId);
pTmq->commit_cb = conf->commit_cb; pTmq->commit_cb = conf->commit_cb;
tsem_init(&pTmq->rspSem, 0, 0);
pTmq->consumerId = generateRequestId() & ((uint64_t)-1 >> 1); pTmq->consumerId = generateRequestId() & ((uint64_t)-1 >> 1);
pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic)); pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
return pTmq; return pTmq;
...@@ -372,6 +374,14 @@ int32_t tmq_list_append(tmq_list_t* ptr, char* src) { ...@@ -372,6 +374,14 @@ int32_t tmq_list_append(tmq_list_t* ptr, char* src) {
} }
int32_t tmq_null_cb(void* param, const SDataBuf* pMsg, int32_t code) {
if (code == 0) {
//
}
//
return 0;
}
TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
SRequestObj *pRequest = NULL; SRequestObj *pRequest = NULL;
int32_t sz = topic_list->cnt; int32_t sz = topic_list->cnt;
...@@ -433,6 +443,7 @@ TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { ...@@ -433,6 +443,7 @@ TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen }; pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
/*sendInfo->fp*/
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0; int64_t transporterId = 0;
...@@ -641,6 +652,7 @@ int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -641,6 +652,7 @@ int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) {
} }
if(set) tmq->status = 1; if(set) tmq->status = 1;
// unlock // unlock
tsem_post(&tmq->rspSem);
return 0; return 0;
} }
...@@ -679,7 +691,7 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) { ...@@ -679,7 +691,7 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
int64_t transporterId = 0; int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
tsem_wait(&pRequest->body.rspSem); tsem_wait(&tmq->rspSem);
} }
if (taosArrayGetSize(tmq->clientTopics) == 0) { if (taosArrayGetSize(tmq->clientTopics) == 0) {
...@@ -695,13 +707,13 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) { ...@@ -695,13 +707,13 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, nextVgIdx); SMqClientVg* pVg = taosArrayGet(pTopic->vgs, nextVgIdx);
req.offset = pVg->currentOffset; req.offset = pVg->currentOffset;
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
pRequest->body.requestMsg = (SDataBuf){ .pData = &req, .len = sizeof(SMqConsumeReq) }; pRequest->body.requestMsg = (SDataBuf){ .pData = &req, .len = sizeof(SMqConsumeReq) };
pRequest->type = TDMT_VND_CONSUME;
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->requestObjRefId = 0; /*sendInfo->requestObjRefId = 0;*/
sendInfo->param = &tmq_message; /*sendInfo->param = &tmq_message;*/
sendInfo->fp = tmq_poll_cb_inner; /*sendInfo->fp = tmq_poll_cb_inner;*/
int64_t transporterId = 0; int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
......
...@@ -591,7 +591,7 @@ TEST(testCase, tmq_subscribe_Test) { ...@@ -591,7 +591,7 @@ TEST(testCase, tmq_subscribe_Test) {
while (1) { while (1) {
tmq_message_t* msg = tmq_consume_poll(tmq, 0); tmq_message_t* msg = tmq_consume_poll(tmq, 0);
printf("get msg\n"); printf("get msg\n");
if (msg == NULL) break; //if (msg == NULL) break;
} }
} }
......
...@@ -735,23 +735,6 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { ...@@ -735,23 +735,6 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
break; break;
} }
if (pDataBlock != NULL) { if (pDataBlock != NULL) {
SMqTbData tbData = {
.uid = pDataBlock->info.uid,
.numOfCols = pDataBlock->info.numOfCols,
.numOfRows = pDataBlock->info.rows,
};
for (int i = 0; i < pDataBlock->info.numOfCols; i++) {
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, i);
int32_t sz = pColData->info.bytes * pDataBlock->info.rows;
SMqColData colData = {
.bytes = pColData->info.bytes,
.colId = pColData->info.colId,
.type = pColData->info.type,
};
memcpy(colData.data, pColData->pData, colData.bytes * pDataBlock->info.rows);
memcpy(&tbData.colData[i], &colData, sz);
}
/*pDataBlock->info.*/
taosArrayPush(pRes, pDataBlock); taosArrayPush(pRes, pDataBlock);
} else { } else {
break; break;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册