From 660a600dd6bc52a6afd8f626f3967b1cd6d1203e Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 28 Jan 2022 13:15:41 +0800 Subject: [PATCH] refactor --- source/client/src/clientImpl.c | 230 +++++++++++++++++++-------------- 1 file changed, 130 insertions(+), 100 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 3b71805e14..7e95d55ec7 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -21,6 +21,93 @@ } \ } while (0) +typedef struct SMqClientVg { + // statistics + int64_t pollCnt; + // offset + int64_t committedOffset; + int64_t currentOffset; + //connection info + int32_t vgId; + SEpSet epSet; +} SMqClientVg; + +typedef struct SMqClientTopic { + // subscribe info + int32_t sqlLen; + char* sql; + char* topicName; + int64_t topicId; + int32_t nextVgIdx; + SArray* vgs; //SArray +} SMqClientTopic; + +struct tmq_resp_err_t { + int32_t code; +}; + +struct tmq_topic_vgroup_t { + char* topic; + int32_t vgId; + int64_t commitOffset; +}; + +struct tmq_topic_vgroup_list_t { + int32_t cnt; + int32_t size; + tmq_topic_vgroup_t* elems; +}; + +typedef struct SMqConsumeCbParam { + tmq_t* tmq; + SMqClientVg* pVg; + tmq_message_t** retMsg; +} SMqConsumeCbParam; + +struct tmq_conf_t { + char clientId[256]; + char groupId[256]; + char* ip; + uint16_t port; + tmq_commit_cb* commit_cb; +}; + +struct tmq_message_t { + SMqConsumeRsp rsp; +}; + + +tmq_conf_t* tmq_conf_new() { + tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t)); + return conf; +} + +int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) { + if (strcmp(key, "group.id") == 0) { + strcpy(conf->groupId, value); + } + if (strcmp(key, "client.id") == 0) { + strcpy(conf->clientId, value); + } + return 0; +} + +struct tmq_t { + char groupId[256]; + char clientId[256]; + SRWLatch lock; + int64_t consumerId; + int64_t epoch; + int64_t status; + tsem_t rspSem; + STscObj* pTscObj; + tmq_commit_cb* commit_cb; + int32_t nextTopicIdx; + SArray* clientTopics; //SArray + //stat + int64_t pollCnt; +}; + static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet); static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest); static void destroySendMsgInfo(SMsgSendInfo* pMsgBody); @@ -259,81 +346,6 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) } -typedef struct SMqClientVg { - // statistics - int64_t pollCnt; - // offset - int64_t committedOffset; - int64_t currentOffset; - //connection info - int32_t vgId; - SEpSet epSet; -} SMqClientVg; - -typedef struct SMqClientTopic { - // subscribe info - int32_t sqlLen; - char* sql; - char* topicName; - int64_t topicId; - int32_t nextVgIdx; - SArray* vgs; //SArray -} SMqClientTopic; - -struct tmq_resp_err_t { - int32_t code; -}; - -struct tmq_topic_vgroup_t { - char* topic; - int32_t vgId; - int64_t commitOffset; -}; - -struct tmq_topic_vgroup_list_t { - int32_t cnt; - int32_t size; - tmq_topic_vgroup_t* elems; -}; - -struct tmq_conf_t { - char clientId[256]; - char groupId[256]; - char* ip; - uint16_t port; - tmq_commit_cb* commit_cb; -}; - -tmq_conf_t* tmq_conf_new() { - tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t)); - return conf; -} - -int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) { - if (strcmp(key, "group.id") == 0) { - strcpy(conf->groupId, value); - } - if (strcmp(key, "client.id") == 0) { - strcpy(conf->clientId, value); - } - return 0; -} - -struct tmq_t { - char groupId[256]; - char clientId[256]; - SRWLatch lock; - int64_t consumerId; - int64_t epoch; - int64_t status; - tsem_t rspSem; - STscObj* pTscObj; - tmq_commit_cb* commit_cb; - int32_t nextTopicIdx; - SArray* clientTopics; //SArray - //stat - int64_t pollCnt; -}; tmq_t* taos_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) { tmq_t* pTmq = calloc(sizeof(tmq_t), 1); @@ -580,7 +592,7 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql, int tlen = tSerializeSCMCreateTopicReq(NULL, &req); void* buf = malloc(tlen); - if(buf == NULL) { + if (buf == NULL) { goto _return; } @@ -614,10 +626,6 @@ _return: /*typedef SMqConsumeRsp tmq_message_t;*/ -struct tmq_message_t { - SMqConsumeRsp rsp; -}; - int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) { if (code == -1) { printf("discard\n"); @@ -766,47 +774,69 @@ END: return 0; } +SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, int32_t type, SMqClientTopic* pTopic, SMqClientVg** ppVg) { + SMqConsumeReq* pReq = malloc(sizeof(SMqConsumeReq)); + if (pReq == NULL) { + return NULL; + } + pReq->reqType = type; + pReq->blockingTime = blocking_time; + pReq->consumerId = tmq->consumerId; + strcpy(pReq->cgroup, tmq->groupId); + + tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics); + strcpy(pReq->topic, pTopic->topicName); + pTopic->nextVgIdx = (pTopic->nextVgIdx + 1 % taosArrayGetSize(pTopic->vgs)); + SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx); + pReq->offset = pVg->currentOffset+1; + + pReq->head.vgId = htonl(pVg->vgId); + pReq->head.contLen = htonl(sizeof(SMqConsumeReq)); + return pReq; +} + tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { + tmq_message_t* tmq_message = NULL; + int64_t status = atomic_load_64(&tmq->status); tmqAsyncAskEp(tmq, status == 0 || taosArrayGetSize(tmq->clientTopics)); - if (blocking_time < 0) blocking_time = 500; + /*if (blocking_time < 0) blocking_time = 500;*/ + blocking_time = 1000; if (taosArrayGetSize(tmq->clientTopics) == 0) { tscDebug("consumer:%ld poll but not assigned", tmq->consumerId); usleep(blocking_time * 1000); return NULL; } - - SMqConsumeReq* pReq = malloc(sizeof(SMqConsumeReq)); - pReq->reqType = 1; - pReq->blockingTime = blocking_time; - pReq->consumerId = tmq->consumerId; - tmq_message_t* tmq_message = NULL; - strcpy(pReq->cgroup, tmq->groupId); - SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx); - tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics); - strcpy(pReq->topic, pTopic->topicName); - int32_t vgSz = taosArrayGetSize(pTopic->vgs); - if (vgSz == 0) { - free(pReq); + if (taosArrayGetSize(pTopic->vgs) == 0) { usleep(blocking_time * 1000); return NULL; } - pTopic->nextVgIdx = (pTopic->nextVgIdx + 1 % vgSz); - SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx); - pReq->offset = pVg->currentOffset+1; - pReq->head.vgId = htonl(pVg->vgId); - pReq->head.contLen = htonl(sizeof(SMqConsumeReq)); + SMqClientVg* pVg = NULL; + SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, TMQ_REQ_TYPE_CONSUME_ONLY, pTopic, &pVg); + if (pReq == NULL) { + usleep(blocking_time * 1000); + return NULL; + } + + SMqConsumeCbParam* param = malloc(sizeof(SMqConsumeCbParam)); + if (param == NULL) { + usleep(blocking_time * 1000); + return NULL; + } + param->tmq = tmq; + param->retMsg = &tmq_message; + param->pVg = pVg; SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); pRequest->body.requestMsg = (SDataBuf){ .pData = pReq, .len = sizeof(SMqConsumeReq) }; SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); sendInfo->requestObjRefId = 0; - sendInfo->param = pVg; + sendInfo->param = param; sendInfo->fp = tmq_poll_cb_inner; /*printf("req offset: %ld\n", pReq->offset);*/ -- GitLab