diff --git a/include/client/taos.h b/include/client/taos.h index 029aad8715aae49d80f4b171ca2937f01dd6682a..ac2893651d82663a4b58d02f0fb9e37f1db80014 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -202,34 +202,36 @@ enum tmq_resp_err_t { typedef enum tmq_resp_err_t tmq_resp_err_t; -typedef struct tmq_t tmq_t; -typedef struct tmq_topic_vgroup_t tmq_topic_vgroup_t; +typedef struct tmq_t tmq_t; +typedef struct tmq_topic_vgroup_t tmq_topic_vgroup_t; typedef struct tmq_topic_vgroup_list_t tmq_topic_vgroup_list_t; -typedef struct tmq_conf_t tmq_conf_t; -typedef struct tmq_list_t tmq_list_t; -typedef struct tmq_message_t tmq_message_t; +typedef struct tmq_conf_t tmq_conf_t; +typedef struct tmq_list_t tmq_list_t; +typedef struct tmq_message_t tmq_message_t; -typedef void (tmq_commit_cb(tmq_t*, tmq_resp_err_t, tmq_topic_vgroup_list_t*, void* param)); -DLL_EXPORT tmq_list_t* tmq_list_new(); -DLL_EXPORT int32_t tmq_list_append(tmq_list_t*, char*); +typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *, void *param)); -DLL_EXPORT TAOS_RES *taos_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen); -DLL_EXPORT tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen); +DLL_EXPORT tmq_list_t *tmq_list_new(); +DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, char *); + +DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen); +DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen); +DLL_EXPORT void tmq_message_destroy(tmq_message_t* tmq_message); /* ------------------------TMQ CONSUMER INTERFACE------------------------ */ -DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list); +DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, tmq_list_t *topic_list); #if 0 DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq); DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_topic_vgroup_list_t** topics); #endif -DLL_EXPORT tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time); +DLL_EXPORT tmq_message_t *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time); #if 0 DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t* tmq); DLL_EXPORT tmq_resp_err_t tmq_assign(tmq_t* tmq, const tmq_topic_vgroup_list_t* vgroups); DLL_EXPORT tmq_resp_err_t tmq_assignment(tmq_t* tmq, tmq_topic_vgroup_list_t** vgroups); #endif -DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async); +DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, int32_t async); #if 0 DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async); #endif @@ -243,10 +245,13 @@ enum tmq_conf_res_t { typedef enum tmq_conf_res_t tmq_conf_res_t; -DLL_EXPORT tmq_conf_t* tmq_conf_new(); -DLL_EXPORT void tmq_conf_destroy(tmq_conf_t* conf); -DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value); -DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb); +DLL_EXPORT tmq_conf_t *tmq_conf_new(); +DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf); +DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value); +DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb); + +//temporary used function for demo only +void tmqShowMsg(tmq_message_t* tmq_message); #ifdef __cplusplus } diff --git a/include/common/tmsg.h b/include/common/tmsg.h index a5b9d356f86ee71d7dcdb78d7c67a74f220eef02..7cd8ffb6f3b8f2e9a82f72263bd47d02e6627c9d 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1570,6 +1570,7 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) { } typedef struct SMqSetCVgReq { + int64_t leftForVer; int32_t vgId; int64_t oldConsumerId; int64_t newConsumerId; @@ -1604,6 +1605,7 @@ static FORCE_INLINE void* tDecodeSSubQueryMsg(void* buf, SSubQueryMsg* pMsg) { static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) { int32_t tlen = 0; + tlen += taosEncodeFixedI64(buf, pReq->leftForVer); tlen += taosEncodeFixedI32(buf, pReq->vgId); tlen += taosEncodeFixedI64(buf, pReq->oldConsumerId); tlen += taosEncodeFixedI64(buf, pReq->newConsumerId); @@ -1612,12 +1614,13 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* tlen += taosEncodeString(buf, pReq->sql); tlen += taosEncodeString(buf, pReq->logicalPlan); tlen += taosEncodeString(buf, pReq->physicalPlan); - tlen += taosEncodeString(buf, (char*)pReq->qmsg); + tlen += taosEncodeString(buf, pReq->qmsg); //tlen += tEncodeSSubQueryMsg(buf, &pReq->msg); return tlen; } static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { + buf = taosDecodeFixedI64(buf, &pReq->leftForVer); buf = taosDecodeFixedI32(buf, &pReq->vgId); buf = taosDecodeFixedI64(buf, &pReq->oldConsumerId); buf = taosDecodeFixedI64(buf, &pReq->newConsumerId); @@ -1626,7 +1629,7 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { buf = taosDecodeString(buf, &pReq->sql); buf = taosDecodeString(buf, &pReq->logicalPlan); buf = taosDecodeString(buf, &pReq->physicalPlan); - buf = taosDecodeString(buf, (char**)&pReq->qmsg); + buf = taosDecodeString(buf, &pReq->qmsg); //buf = tDecodeSSubQueryMsg(buf, &pReq->msg); return buf; } diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 421c8fae30357e73f235362b3c34018b979d24fc..d5296620086ae1df57abcbae55d1212d6e504c54 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -34,18 +34,18 @@ struct tmq_list_t { struct tmq_topic_vgroup_t { char* topic; int32_t vgId; - int64_t commitOffset; + int64_t offset; }; struct tmq_topic_vgroup_list_t { - int32_t cnt; - int32_t size; + int32_t cnt; + int32_t size; tmq_topic_vgroup_t* elems; }; struct tmq_conf_t { - char clientId[256]; - char groupId[256]; + char clientId[256]; + char groupId[256]; /*char* ip;*/ /*uint16_t port;*/ tmq_commit_cb* commit_cb; @@ -62,9 +62,9 @@ struct tmq_t { STscObj* pTscObj; tmq_commit_cb* commit_cb; int32_t nextTopicIdx; - SArray* clientTopics; //SArray - //stat - int64_t pollCnt; + SArray* clientTopics; // SArray + // stat + int64_t pollCnt; }; struct tmq_message_t { @@ -77,7 +77,7 @@ typedef struct SMqClientVg { // offset int64_t committedOffset; int64_t currentOffset; - //connection info + // connection info int32_t vgId; SEpSet epSet; } SMqClientVg; @@ -89,9 +89,14 @@ typedef struct SMqClientTopic { char* topicName; int64_t topicId; int32_t nextVgIdx; - SArray* vgs; //SArray + SArray* vgs; // SArray } SMqClientTopic; +typedef struct SMqSubscribeCbParam { + tmq_t* tmq; + tsem_t rspSem; + tmq_resp_err_t rspErr; +} SMqSubscribeCbParam; typedef struct SMqAskEpCbParam { tmq_t* tmq; @@ -102,13 +107,15 @@ typedef struct SMqConsumeCbParam { tmq_t* tmq; SMqClientVg* pVg; tmq_message_t** retMsg; + tsem_t rspSem; } SMqConsumeCbParam; -typedef struct SMqSubscribeCbParam { - tmq_t* tmq; - tsem_t rspSem; - tmq_resp_err_t rspErr; -} SMqSubscribeCbParam; +typedef struct SMqCommitCbParam { + tmq_t* tmq; + SMqClientVg* pVg; + int32_t async; + tsem_t rspSem; +} SMqCommitCbParam; tmq_conf_t* tmq_conf_new() { tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t)); @@ -116,7 +123,7 @@ tmq_conf_t* tmq_conf_new() { } void tmq_conf_destroy(tmq_conf_t* conf) { - if(conf) free(conf); + if (conf) free(conf); } tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) { @@ -130,7 +137,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value } tmq_list_t* tmq_list_new() { - tmq_list_t *ptr = malloc(sizeof(tmq_list_t) + 8 * sizeof(char*)); + tmq_list_t* ptr = malloc(sizeof(tmq_list_t) + 8 * sizeof(char*)); if (ptr == NULL) { return ptr; } @@ -140,7 +147,7 @@ tmq_list_t* tmq_list_new() { } int32_t tmq_list_append(tmq_list_t* ptr, char* src) { - if (ptr->cnt >= ptr->tot-1) return -1; + if (ptr->cnt >= ptr->tot - 1) return -1; ptr->elems[ptr->cnt] = strdup(src); ptr->cnt++; return 0; @@ -153,6 +160,16 @@ int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) { return 0; } +int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { + SMqCommitCbParam* pParam = (SMqCommitCbParam*) param; + tmq_resp_err_t rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL; + if (pParam->tmq->commit_cb) { + pParam->tmq->commit_cb(pParam->tmq, rspErr, NULL, NULL); + } + if (!pParam->async) tsem_post(&pParam->rspSem); + return 0; +} + tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) { tmq_t* pTmq = calloc(sizeof(tmq_t), 1); if (pTmq == NULL) { @@ -161,7 +178,7 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs pTmq->pTscObj = (STscObj*)conn; pTmq->status = 0; pTmq->pollCnt = 0; - pTmq->epoch = 0; + pTmq->epoch = 0; taosInitRWLatch(&pTmq->lock); strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->groupId, conf->groupId); @@ -173,9 +190,9 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs } tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { - SRequestObj *pRequest = NULL; - int32_t sz = topic_list->cnt; - //destroy ex + SRequestObj* pRequest = NULL; + int32_t sz = topic_list->cnt; + // destroy ex taosArrayDestroy(tmq->clientTopics); tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic)); @@ -190,34 +207,34 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { SName name = {0}; char* dbName = getDbOfConnection(tmq->pTscObj); - tNameSetDbName(&name, tmq->pTscObj->acctId, dbName, strlen(dbName)); + tNameSetDbName(&name, tmq->pTscObj->acctId, dbName, strlen(dbName)); tNameFromString(&name, topicName, T_NAME_TABLE); char* topicFname = calloc(1, TSDB_TOPIC_FNAME_LEN); if (topicFname == NULL) { - } tNameExtractFullName(&name, topicFname); tscDebug("subscribe topic: %s", topicFname); SMqClientTopic topic = { - .nextVgIdx = 0, - .sql = NULL, - .sqlLen = 0, - .topicId = 0, - .topicName = topicFname, - .vgs = NULL + .nextVgIdx = 0, + .sql = NULL, + .sqlLen = 0, + .topicId = 0, + .topicName = topicFname, + .vgs = NULL }; topic.vgs = taosArrayInit(0, sizeof(SMqClientVg)); - taosArrayPush(tmq->clientTopics, &topic); + taosArrayPush(tmq->clientTopics, &topic); /*SMqClientTopic topic = {*/ - /*.*/ + /*.*/ /*};*/ taosArrayPush(req.topicNames, &topicFname); + free(dbName); } - int tlen = tSerializeSCMSubscribeReq(NULL, &req); + int tlen = tSerializeSCMSubscribeReq(NULL, &req); void* buf = malloc(tlen); - if(buf == NULL) { + if (buf == NULL) { goto _return; } @@ -230,13 +247,10 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { tscError("failed to malloc sqlObj"); } - SMqSubscribeCbParam param = { - .rspErr = TMQ_RESP_ERR__SUCCESS, - .tmq = tmq - }; + SMqSubscribeCbParam param = {.rspErr = TMQ_RESP_ERR__SUCCESS, .tmq = tmq}; tsem_init(¶m.rspSem, 0, 0); - pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen }; + pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen}; SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); sendInfo->param = ¶m; @@ -251,18 +265,16 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { _return: /*if (sendInfo != NULL) {*/ - /*destroySendMsgInfo(sendInfo);*/ + /*destroySendMsgInfo(sendInfo);*/ /*}*/ return param.rspErr; } -void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) { - conf->commit_cb = cb; -} +void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) { conf->commit_cb = cb; } SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) { - tmq_t* pTmq = (void*)param; + tmq_t* pTmq = (void*)param; SArray* pArray = taosArrayInit(0, sizeof(SKv)); if (pArray == NULL) { return NULL; @@ -276,12 +288,12 @@ SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) { } pMqHb->consumerId = connKey.connId; SArray* clientTopics = pTmq->clientTopics; - int sz = taosArrayGetSize(clientTopics); + int sz = taosArrayGetSize(clientTopics); for (int i = 0; i < sz; i++) { SMqClientTopic* pCTopic = taosArrayGet(clientTopics, i); /*if (pCTopic->vgId == -1) {*/ - /*pMqHb->status = 1;*/ - /*break;*/ + /*pMqHb->status = 1;*/ + /*break;*/ /*}*/ } kv.value = pMqHb; @@ -291,24 +303,11 @@ SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) { return pArray; } -tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) { - tmq_t* pTmq = malloc(sizeof(tmq_t)); - if (pTmq == NULL) { - return NULL; - } - strcpy(pTmq->groupId, conf->groupId); - strcpy(pTmq->clientId, conf->clientId); - pTmq->pTscObj = (STscObj*)conn; - pTmq->pTscObj->connType = HEARTBEAT_TYPE_MQ; - - return pTmq; -} - -TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) { - STscObj *pTscObj = (STscObj*)taos; - SRequestObj *pRequest = NULL; - SQueryNode *pQueryNode = NULL; - char *pStr = NULL; +TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) { + STscObj* pTscObj = (STscObj*)taos; + SRequestObj* pRequest = NULL; + SQueryNode* pQueryNode = NULL; + char* pStr = NULL; terrno = TSDB_CODE_SUCCESS; if (taos == NULL || topicName == NULL || sql == NULL) { @@ -334,17 +333,18 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql, CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return); - SQueryStmtInfo* pQueryStmtInfo = (SQueryStmtInfo* ) pQueryNode; + SQueryStmtInfo* pQueryStmtInfo = (SQueryStmtInfo*)pQueryNode; pQueryStmtInfo->info.continueQuery = true; // todo check for invalid sql statement and return with error code - SSchema *schema = NULL; + SSchema* schema = NULL; int32_t numOfCols = 0; - CHECK_CODE_GOTO(qCreateQueryDag(pQueryNode, &pRequest->body.pDag, &schema, &numOfCols, NULL, pRequest->requestId), _return); + CHECK_CODE_GOTO(qCreateQueryDag(pQueryNode, &pRequest->body.pDag, &schema, &numOfCols, NULL, pRequest->requestId), + _return); pStr = qDagToString(pRequest->body.pDag); - if(pStr == NULL) { + if (pStr == NULL) { goto _return; } @@ -352,24 +352,24 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql, // The topic should be related to a database that the queried table is belonged to. SName name = {0}; - char dbName[TSDB_DB_FNAME_LEN] = {0}; - tNameGetFullDbName(&((SQueryStmtInfo*) pQueryNode)->pTableMetaInfo[0]->name, dbName); + char dbName[TSDB_DB_FNAME_LEN] = {0}; + tNameGetFullDbName(&((SQueryStmtInfo*)pQueryNode)->pTableMetaInfo[0]->name, dbName); - tNameFromString(&name, dbName, T_NAME_ACCT|T_NAME_DB); + tNameFromString(&name, dbName, T_NAME_ACCT | T_NAME_DB); tNameFromString(&name, topicName, T_NAME_TABLE); char topicFname[TSDB_TOPIC_FNAME_LEN] = {0}; tNameExtractFullName(&name, topicFname); SCMCreateTopicReq req = { - .name = (char*) topicFname, - .igExists = 1, - .physicalPlan = (char*) pStr, - .sql = (char*) sql, - .logicalPlan = "no logic plan", + .name = (char*)topicFname, + .igExists = 1, + .physicalPlan = (char*)pStr, + .sql = (char*)sql, + .logicalPlan = "no logic plan", }; - int tlen = tSerializeSCMCreateTopicReq(NULL, &req); + int tlen = tSerializeSCMCreateTopicReq(NULL, &req); void* buf = malloc(tlen); if (buf == NULL) { goto _return; @@ -379,11 +379,11 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql, tSerializeSCMCreateTopicReq(&abuf, &req); /*printf("formatted: %s\n", dagStr);*/ - pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen }; + pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen}; pRequest->type = TDMT_MND_CREATE_TOPIC; SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); - SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); + SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); int64_t transporterId = 0; asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); @@ -393,7 +393,7 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql, _return: qDestroyQuery(pQueryNode); /*if (sendInfo != NULL) {*/ - /*destroySendMsgInfo(sendInfo);*/ + /*destroySendMsgInfo(sendInfo);*/ /*}*/ if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) { @@ -403,7 +403,7 @@ _return: return pRequest; } -static char *formatTimestamp(char *buf, int64_t val, int precision) { +static char* formatTimestamp(char* buf, int64_t val, int precision) { time_t tt; int32_t ms = 0; if (precision == TSDB_TIME_PRECISION_NANO) { @@ -439,7 +439,7 @@ static char *formatTimestamp(char *buf, int64_t val, int precision) { } } - struct tm *ptm = localtime(&tt); + struct tm* ptm = localtime(&tt); size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm); if (precision == TSDB_TIME_PRECISION_NANO) { @@ -453,43 +453,35 @@ static char *formatTimestamp(char *buf, int64_t val, int precision) { return buf; } -int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) { - if (code == -1) { - printf("msg discard\n"); - free(param); - return 0; - } - char pBuf[128]; - SMqConsumeCbParam* pParam = (SMqConsumeCbParam*)param; - SMqClientVg* pVg = pParam->pVg; - SMqConsumeRsp rsp = {0}; - tDecodeSMqConsumeRsp(pMsg->pData, &rsp); - if (rsp.numOfTopics == 0) { - /*printf("no data\n");*/ - free(param); - return 0; +void tmqShowMsg(tmq_message_t* tmq_message) { + if (tmq_message == NULL) return; + + static bool noPrintSchema; + char pBuf[128]; + SMqConsumeRsp* pRsp = (SMqConsumeRsp*)tmq_message; + int32_t colNum = pRsp->schemas->nCols; + if (!noPrintSchema) { + printf("|"); + for (int32_t i = 0; i < colNum; i++) { + if (i == 0) + printf(" %25s |", pRsp->schemas->pSchema[i].name); + else + printf(" %15s |", pRsp->schemas->pSchema[i].name); + } + printf("\n"); + printf("===============================================\n"); + noPrintSchema = true; } - int32_t colNum = rsp.schemas->nCols; - pVg->currentOffset = rsp.rspOffset; - /*printf("rsp offset: %ld\n", rsp.rspOffset);*/ - /*printf("-----msg begin----\n");*/ - printf("|"); - for (int32_t i = 0; i < colNum; i++) { - if (i == 0) printf(" %25s |", rsp.schemas->pSchema[i].name); - else printf(" %15s |", rsp.schemas->pSchema[i].name); - } - printf("\n"); - printf("===============================================\n"); - int32_t sz = taosArrayGetSize(rsp.pBlockData); + int32_t sz = taosArrayGetSize(pRsp->pBlockData); for (int32_t i = 0; i < sz; i++) { - SSDataBlock* pDataBlock = taosArrayGet(rsp.pBlockData, i); - int32_t rows = pDataBlock->info.rows; + SSDataBlock* pDataBlock = taosArrayGet(pRsp->pBlockData, i); + int32_t rows = pDataBlock->info.rows; for (int32_t j = 0; j < rows; j++) { printf("|"); for (int32_t k = 0; k < colNum; k++) { SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); - void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); - switch(pColInfoData->info.type) { + void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); + switch (pColInfoData->info.type) { case TSDB_DATA_TYPE_TIMESTAMP: formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI); printf(" %25s |", pBuf); @@ -503,15 +495,42 @@ int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) { printf("\n"); } } - tDeleteSMqConsumeRsp(&rsp); - free(param); +} + +int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { + SMqConsumeCbParam* pParam = (SMqConsumeCbParam*)param; + SMqClientVg* pVg = pParam->pVg; + if (code != 0) { + printf("msg discard\n"); + tsem_post(&pParam->rspSem); + return 0; + } + + SMqConsumeRsp* pRsp = calloc(1, sizeof(SMqConsumeRsp)); + if (pRsp == NULL) { + tsem_post(&pParam->rspSem); + return -1; + } + tDecodeSMqConsumeRsp(pMsg->pData, pRsp); + printf("rsp %ld %ld\n", pRsp->committedOffset, pRsp->rspOffset); + if (pRsp->numOfTopics == 0) { + /*printf("no data\n");*/ + free(pRsp); + tsem_post(&pParam->rspSem); + return 0; + } + *pParam->retMsg = (tmq_message_t*)pRsp; + pVg->currentOffset = pRsp->rspOffset; + /*printf("rsp offset: %ld\n", rsp.rspOffset);*/ + /*printf("-----msg begin----\n");*/ + tsem_post(&pParam->rspSem); /*printf("\n-----msg end------\n");*/ return 0; } -int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) { +int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; - tmq_t* tmq = pParam->tmq; + tmq_t* tmq = pParam->tmq; if (code != 0) { printf("get topic endpoint error, not ready, wait:%d\n", pParam->wait); if (pParam->wait) { @@ -520,15 +539,15 @@ int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) { return 0; } tscDebug("tmq ask ep cb called"); - bool set = false; + bool set = false; SMqCMGetSubEpRsp rsp; tDecodeSMqCMGetSubEpRsp(pMsg->pData, &rsp); int32_t sz = taosArrayGetSize(rsp.topics); // TODO: lock - /*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/ - /*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/ + /*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/ + /*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/ if (rsp.epoch != tmq->epoch) { - //TODO + // TODO if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics); tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic)); for (int32_t i = 0; i < sz; i++) { @@ -540,12 +559,7 @@ int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) { for (int32_t j = 0; j < vgSz; j++) { SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j); SMqClientVg clientVg = { - .pollCnt = 0, - .committedOffset = -1, - .currentOffset = -1, - .vgId = pVgEp->vgId, - .epSet = pVgEp->epSet - }; + .pollCnt = 0, .committedOffset = -1, .currentOffset = -1, .vgId = pVgEp->vgId, .epSet = pVgEp->epSet}; taosArrayPush(topic.vgs, &clientVg); set = true; } @@ -562,66 +576,67 @@ int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) { tsem_post(&tmq->rspSem); } tDeleteSMqCMGetSubEpRsp(&rsp); - free(pParam); return 0; } int32_t tmqAsyncAskEp(tmq_t* tmq, bool wait) { - int32_t tlen = sizeof(SMqCMGetSubEpReq); - SMqCMGetSubEpReq* buf = malloc(tlen); - if (buf == NULL) { - goto END; - tscError("failed to malloc get subscribe ep buf"); - } - buf->consumerId = htobe64(tmq->consumerId); - strcpy(buf->cgroup, tmq->groupId); - - SRequestObj *pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_GET_SUB_EP); - if (pRequest == NULL) { - goto END; - tscError("failed to malloc subscribe ep request"); - } + int32_t tlen = sizeof(SMqCMGetSubEpReq); + SMqCMGetSubEpReq* buf = malloc(tlen); + if (buf == NULL) { + tscError("failed to malloc get subscribe ep buf"); + goto END; + } + buf->consumerId = htobe64(tmq->consumerId); + strcpy(buf->cgroup, tmq->groupId); - pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen }; + SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_GET_SUB_EP); + if (pRequest == NULL) { + tscError("failed to malloc subscribe ep request"); + goto END; + } - SMqAskEpCbParam *pParam = malloc(sizeof(SMqAskEpCbParam)); - if (pParam == NULL) { - goto END; - } - pParam->tmq = tmq; - pParam->wait = wait; + pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen}; - SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); - sendInfo->requestObjRefId = 0; - sendInfo->param = pParam; - sendInfo->fp = tmq_ask_ep_cb; + SMqAskEpCbParam *pParam = malloc(sizeof(SMqAskEpCbParam)); + if (pParam == NULL) { + tscError("failed to malloc subscribe param"); + goto END; + } + pParam->tmq = tmq; + pParam->wait = wait; - SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); + SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); + sendInfo->requestObjRefId = 0; + sendInfo->param = pParam; + sendInfo->fp = tmqAskEpCb; - int64_t transporterId = 0; - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); + + int64_t transporterId = 0; + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); END: - if (wait) tsem_wait(&tmq->rspSem); - return 0; + if (wait) tsem_wait(&tmq->rspSem); + return 0; } -SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, int32_t type, SMqClientTopic* pTopic, SMqClientVg** ppVg) { +SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, int32_t type, SMqClientTopic* pTopic, + SMqClientVg* pVg) { SMqConsumeReq* pReq = malloc(sizeof(SMqConsumeReq)); if (pReq == NULL) { return NULL; } pReq->reqType = type; + strcpy(pReq->topic, pTopic->topicName); pReq->blockingTime = blocking_time; pReq->consumerId = tmq->consumerId; strcpy(pReq->cgroup, tmq->groupId); - tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics); - strcpy(pReq->topic, pTopic->topicName); - pTopic->nextVgIdx = (pTopic->nextVgIdx + 1 % taosArrayGetSize(pTopic->vgs)); - SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx); - pReq->offset = pVg->currentOffset+1; - *ppVg = pVg; + if (type == TMQ_REQ_TYPE_COMMIT_ONLY) { + pReq->offset = pVg->currentOffset; + } else { + pReq->offset = pVg->currentOffset + 1; + } pReq->head.vgId = htonl(pVg->vgId); pReq->head.contLen = htonl(sizeof(SMqConsumeReq)); @@ -632,10 +647,11 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { tmq_message_t* tmq_message = NULL; int64_t status = atomic_load_64(&tmq->status); - tmqAsyncAskEp(tmq, taosArrayGetSize(tmq->clientTopics)); + tmqAsyncAskEp(tmq, status == 0); - /*if (blocking_time < 0) blocking_time = 500;*/ - blocking_time = 1; + if (blocking_time < 0) blocking_time = 1; + if (blocking_time > 1000) blocking_time = 1000; + /*blocking_time = 1;*/ if (taosArrayGetSize(tmq->clientTopics) == 0) { tscDebug("consumer:%ld poll but not assigned", tmq->consumerId); @@ -648,8 +664,10 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { return NULL; } - SMqClientVg* pVg = NULL; - SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, TMQ_REQ_TYPE_CONSUME_ONLY, pTopic, &pVg); + tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics); + pTopic->nextVgIdx = (pTopic->nextVgIdx + 1 % taosArrayGetSize(pTopic->vgs)); + SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx); + SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, TMQ_REQ_TYPE_CONSUME_ONLY, pTopic, pVg); if (pReq == NULL) { usleep(blocking_time * 1000); return NULL; @@ -663,14 +681,15 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { param->tmq = tmq; param->retMsg = &tmq_message; param->pVg = pVg; + tsem_init(¶m->rspSem, 0, 0); SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); - pRequest->body.requestMsg = (SDataBuf){ .pData = pReq, .len = sizeof(SMqConsumeReq) }; + pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq)}; SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); sendInfo->requestObjRefId = 0; sendInfo->param = param; - sendInfo->fp = tmq_poll_cb_inner; + sendInfo->fp = tmqPollCb; /*printf("req offset: %ld\n", pReq->offset);*/ @@ -678,34 +697,92 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); tmq->pollCnt++; - usleep(blocking_time * 1000); + tsem_wait(¶m->rspSem); + tsem_destroy(¶m->rspSem); + free(param); + + if (tmq_message == NULL) { + usleep(blocking_time * 1000); + } return tmq_message; /*tsem_wait(&pRequest->body.rspSem);*/ /*if (body != NULL) {*/ - /*destroySendMsgInfo(body);*/ + /*destroySendMsgInfo(body);*/ /*}*/ /*if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {*/ - /*pRequest->code = terrno;*/ + /*pRequest->code = terrno;*/ /*}*/ /*return pRequest;*/ } tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) { - SMqConsumeReq req = {0}; + + if (tmq_topic_vgroup_list != NULL) { + //TODO + } + + //TODO: change semaphore to gate + for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { + SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); + for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { + SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); + SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, 0, TMQ_REQ_TYPE_COMMIT_ONLY, pTopic, pVg); + + SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); + pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq)}; + SMqCommitCbParam *pParam = malloc(sizeof(SMqCommitCbParam)); + if (pParam == NULL) { + continue; + } + pParam->tmq = tmq; + pParam->pVg = pVg; + pParam->async = async; + if (!async) tsem_init(&pParam->rspSem, 0, 0); + + SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); + sendInfo->requestObjRefId = 0; + sendInfo->param = pParam; + sendInfo->fp = tmqCommitCb; + + int64_t transporterId = 0; + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); + + if (!async) tsem_wait(&pParam->rspSem); + } + } + return 0; } void tmq_message_destroy(tmq_message_t* tmq_message) { if (tmq_message == NULL) return; + SMqConsumeRsp* pRsp = (SMqConsumeRsp*)tmq_message; + tDeleteSMqConsumeRsp(pRsp); + free(tmq_message); } +#if 0 +tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) { + tmq_t* pTmq = malloc(sizeof(tmq_t)); + if (pTmq == NULL) { + return NULL; + } + strcpy(pTmq->groupId, conf->groupId); + strcpy(pTmq->clientId, conf->clientId); + pTmq->pTscObj = (STscObj*)conn; + pTmq->pTscObj->connType = HEARTBEAT_TYPE_MQ; + return pTmq; +} + + static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { assert(pMsgBody != NULL); tfree(pMsgBody->msgInfo.pData); tfree(pMsgBody); } +#endif diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 7ef06397f0a91a9e053aec94bf38dfa7f96e6a89..b7ee64e19fae0a8cdd048f2964392b2e4e30b192 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -583,7 +583,7 @@ TEST(testCase, create_topic_ctb_Test) { taos_free_result(pRes); char* sql = "select * from tu"; - pRes = taos_create_topic(pConn, "test_ctb_topic_1", sql, strlen(sql)); + pRes = tmq_create_topic(pConn, "test_ctb_topic_1", sql, strlen(sql)); taos_free_result(pRes); taos_close(pConn); } @@ -607,7 +607,7 @@ TEST(testCase, create_topic_stb_Test) { taos_free_result(pRes); char* sql = "select * from st1"; - pRes = taos_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql)); + pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql)); taos_free_result(pRes); taos_close(pConn); } @@ -633,11 +633,11 @@ TEST(testCase, tmq_subscribe_ctb_Test) { while (1) { tmq_message_t* msg = tmq_consumer_poll(tmq, 1000); + tmq_message_destroy(msg); //printf("get msg\n"); //if (msg == NULL) break; } } -#endif TEST(testCase, tmq_subscribe_stb_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -656,11 +656,18 @@ TEST(testCase, tmq_subscribe_stb_Test) { tmq_list_t* topic_list = tmq_list_new(); tmq_list_append(topic_list, "test_stb_topic_1"); tmq_subscribe(tmq, topic_list); - + + int cnt = 1; while (1) { tmq_message_t* msg = tmq_consumer_poll(tmq, 1000); + if (msg == NULL) continue; + tmqShowMsg(msg); + if (cnt++ % 10 == 0){ + tmq_commit(tmq, NULL, 0); + } + //tmq_commit(tmq, NULL, 0); + tmq_message_destroy(msg); //printf("get msg\n"); - //if (msg == NULL) break; } } @@ -669,6 +676,7 @@ TEST(testCase, tmq_consume_Test) { TEST(testCase, tmq_commit_TEST) { } +#endif #if 0 TEST(testCase, projection_query_tables) { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index a9a633cf63e20931b6a09e8daebc9c3d37db2d6d..ab9a1cb5eab31b38f51d02d6fb6e878c4a3120b9 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -633,7 +633,7 @@ typedef struct SMqConsumerTopic { } SMqConsumerTopic; static FORCE_INLINE SMqConsumerTopic* tNewConsumerTopic(int64_t consumerId, SMqTopicObj* pTopic, - SMqSubscribeObj* pSub) { + SMqSubscribeObj* pSub, int64_t* oldConsumerId) { SMqConsumerTopic* pCTopic = malloc(sizeof(SMqConsumerTopic)); if (pCTopic == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -646,6 +646,7 @@ static FORCE_INLINE SMqConsumerTopic* tNewConsumerTopic(int64_t consumerId, SMqT int32_t unassignedVgSz = taosArrayGetSize(pSub->unassignedVg); if (unassignedVgSz > 0) { SMqConsumerEp* pCEp = taosArrayPop(pSub->unassignedVg); + *oldConsumerId = pCEp->consumerId; pCEp->consumerId = consumerId; taosArrayPush(pCTopic->pVgInfo, &pCEp->vgId); taosArrayPush(pSub->assigned, pCEp); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 47bdefea3d06d658f682c823c4c87dd8f47d6795..bf138d335be2fd6331345a6302aa845e2ea1f00c 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -48,7 +48,8 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg); static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg); static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer, - SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic, SMqConsumerEp *pSub); + SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic, SMqConsumerEp *pSub, + int64_t oldConsumerId); int32_t mndInitSubscribe(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_SUBSCRIBE, @@ -166,7 +167,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { if (pCEp->lastConsumerHbTs != -1 && currentTs - pCEp->lastConsumerHbTs > MND_SUBSCRIBE_REBALANCE_MS) { // put consumer into lostConsumer taosArrayPush(pSub->lostConsumer, pCEp); - // put vg into unassgined + // put vg into unassigned taosArrayPush(pSub->unassignedVg, pCEp); // remove from assigned // TODO: swap with last one, reduce size and reset i @@ -202,6 +203,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { for (int32_t i = 0; i < sz; i++) { int64_t consumerId = *(int64_t *)taosArrayGet(pSub->availConsumer, pSub->nextConsumerIdx); SMqConsumerEp *pCEp = taosArrayPop(pSub->unassignedVg); + int64_t oldConsumerId = pCEp->consumerId; pCEp->consumerId = consumerId; taosArrayPush(pSub->assigned, pCEp); pSub->nextConsumerIdx = (pSub->nextConsumerIdx + 1) % taosArrayGetSize(pSub->availConsumer); @@ -222,7 +224,9 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { req.logicalPlan = pTopic->logicalPlan; req.physicalPlan = pTopic->physicalPlan; req.qmsg = pCEp->qmsg; + req.oldConsumerId = oldConsumerId; req.newConsumerId = consumerId; + req.vgId = pCEp->vgId; int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); void *buf = malloc(sizeof(SMsgHead) + tlen); if (buf == NULL) { @@ -237,6 +241,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { tEncodeSMqSetCVgReq(&abuf, &req); // persist msg + // TODO: no need for txn STransAction action = {0}; action.epSet = pCEp->epSet; action.pCont = buf; @@ -303,13 +308,12 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas ASSERT(CEp.vgId == pVgroup->vgId); CEp.qmsg = strdup(pTaskInfo->msg->msg); taosArrayPush(unassignedVg, &CEp); - //TODO: free taskInfo + // TODO: free taskInfo taosArrayDestroy(pArray); /*SEpSet *pEpSet = &plan->execNode.epset;*/ /*pEpSet->inUse = 0;*/ /*addEpIntoEpSet(pEpSet, "localhost", 6030);*/ - } /*qDestroyQueryDag(pDag);*/ @@ -317,14 +321,15 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas } static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer, - SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic, SMqConsumerEp *pCEp) { + SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic, SMqConsumerEp *pCEp, + int64_t oldConsumerId) { int32_t sz = taosArrayGetSize(pConsumerTopic->pVgInfo); for (int32_t i = 0; i < sz; i++) { int32_t vgId = *(int32_t *)taosArrayGet(pConsumerTopic->pVgInfo, i); SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); SMqSetCVgReq req = { .vgId = vgId, - .oldConsumerId = -1, + .oldConsumerId = oldConsumerId, .newConsumerId = pConsumer->consumerId, }; strcpy(req.cgroup, pConsumer->cgroup); @@ -459,10 +464,6 @@ static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubsc return 0; } -static void *mndBuildMqVGroupSetReq(SMnode *pMnode, char *topicName, int32_t vgId, int64_t consumerId, char *cgroup) { - return 0; -} - static char *mndMakeSubscribeKey(char *cgroup, char *topicName) { char *key = malloc(TSDB_SHOW_SUBQUERY_LEN); if (key == NULL) { @@ -642,7 +643,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { free(key); // set unassigned vg if (mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg) < 0) { - //TODO: free memory + // TODO: free memory return -1; } // TODO: disable alter @@ -650,7 +651,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { } taosArrayPush(pSub->availConsumer, &consumerId); - SMqConsumerTopic *pConsumerTopic = tNewConsumerTopic(consumerId, pTopic, pSub); + int64_t oldConsumerId; + SMqConsumerTopic *pConsumerTopic = tNewConsumerTopic(consumerId, pTopic, pSub, &oldConsumerId); taosArrayPush(pConsumer->topics, pConsumerTopic); if (taosArrayGetSize(pConsumerTopic->pVgInfo) > 0) { @@ -658,7 +660,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { int32_t vgId = *(int32_t *)taosArrayGetLast(pConsumerTopic->pVgInfo); SMqConsumerEp *pCEp = taosArrayGetLast(pSub->assigned); if (pCEp->vgId == vgId) { - if (mndBuildMqSetConsumerVgReq(pMnode, pTrans, pConsumer, pConsumerTopic, pTopic, pCEp) < 0) { + if (mndBuildMqSetConsumerVgReq(pMnode, pTrans, pConsumer, pConsumerTopic, pTopic, pCEp, oldConsumerId) < 0) { // TODO return -1; } diff --git a/source/dnode/vnode/src/inc/tqMetaStore.h b/source/dnode/vnode/src/inc/tqMetaStore.h index ef71d8bf145ae535edf7740e864661e576078814..3bf9bb713880b6d2d499a7b317dfcd61fdcab5b9 100644 --- a/source/dnode/vnode/src/inc/tqMetaStore.h +++ b/source/dnode/vnode/src/inc/tqMetaStore.h @@ -40,6 +40,7 @@ int32_t tqHandleCopyPut(STqMetaStore*, int64_t key, void* value, size_t vsize); // delete committed kv pair // notice that a delete action still needs to be committed int32_t tqHandleDel(STqMetaStore*, int64_t key); +int32_t tqHandlePurge(STqMetaStore*, int64_t key); int32_t tqHandleCommit(STqMetaStore*, int64_t key); int32_t tqHandleAbort(STqMetaStore*, int64_t key); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 139120a46d7ccd11143ad7713ef244de3cd3cd7f..4e52ecd8585240f26ebff9e417889eb22efd5e98 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -25,17 +25,6 @@ // handle management message // -int tqGroupSSize(const STqGroup* pGroup); -int tqTopicSSize(); -int tqItemSSize(); - -void* tqSerializeListHandle(STqList* listHandle, void* ptr); -void* tqSerializeTopic(STqTopic* pTopic, void* ptr); -void* tqSerializeItem(STqMsgItem* pItem, void* ptr); - -const void* tqDeserializeTopic(const void* pBytes, STqTopic* pTopic); -const void* tqDeserializeItem(const void* pBytes, STqMsgItem* pItem); - int tqInit() { int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 1); if (old == 1) return 0; @@ -88,177 +77,6 @@ void tqClose(STQ* pTq) { // TODO } -static int tqProtoCheck(STqMsgHead* pMsg) { - // TODO - return pMsg->protoVer == 0; -} - -static int tqAckOneTopic(STqTopic* pTopic, STqOneAck* pAck, STqQueryMsg** ppQuery) { - // clean old item and move forward - int32_t consumeOffset = pAck->consumeOffset; - int idx = consumeOffset % TQ_BUFFER_SIZE; - ASSERT(pTopic->buffer[idx].content && pTopic->buffer[idx].executor); - tfree(pTopic->buffer[idx].content); - if (1 /* TODO: need to launch new query */) { - STqQueryMsg* pNewQuery = malloc(sizeof(STqQueryMsg)); - if (pNewQuery == NULL) { - terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; - return -1; - } - // TODO: lock executor - // TODO: read from wal and assign to src - /*pNewQuery->exec->executor = pTopic->buffer[idx].executor;*/ - /*pNewQuery->exec->src = 0;*/ - /*pNewQuery->exec->dest = &pTopic->buffer[idx];*/ - /*pNewQuery->next = *ppQuery;*/ - /**ppQuery = pNewQuery;*/ - } - return 0; -} - -static int tqAck(STqGroup* pGroup, STqAcks* pAcks) { - int32_t ackNum = pAcks->ackNum; - STqOneAck* acks = pAcks->acks; - // double ptr for acks and list - int i = 0; - STqList* node = pGroup->head; - int ackCnt = 0; - STqQueryMsg* pQuery = NULL; - while (i < ackNum && node->next) { - if (acks[i].topicId == node->next->topic.topicId) { - ackCnt++; - tqAckOneTopic(&node->next->topic, &acks[i], &pQuery); - } else if (acks[i].topicId < node->next->topic.topicId) { - i++; - } else { - node = node->next; - } - } - if (pQuery) { - // post message - } - return ackCnt; -} - -static int tqCommitGroup(STqGroup* pGroup) { - // persist modification into disk - return 0; -} - -int tqCreateGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId, STqGroup** ppGroup) { - // create in disk - STqGroup* pGroup = (STqGroup*)malloc(sizeof(STqGroup)); - if (pGroup == NULL) { - // TODO - return -1; - } - *ppGroup = pGroup; - memset(pGroup, 0, sizeof(STqGroup)); - - pGroup->topicList = tdListNew(sizeof(STqTopic)); - if (pGroup->topicList == NULL) { - free(pGroup); - return -1; - } - *ppGroup = pGroup; - - return 0; -} - -STqGroup* tqOpenGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { - STqGroup* pGroup = tqHandleGet(pTq->tqMeta, cId); - if (pGroup == NULL) { - int code = tqCreateGroup(pTq, topicId, cgId, cId, &pGroup); - if (code < 0) { - // TODO - return NULL; - } - tqHandleMovePut(pTq->tqMeta, cId, pGroup); - } - ASSERT(pGroup); - - return pGroup; -} - -int tqCloseGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { - // TODO - return 0; -} - -int tqDropGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { - // delete from disk - return 0; -} - -static int tqFetch(STqGroup* pGroup, STqConsumeRsp** pRsp) { - STqList* pHead = pGroup->head; - STqList* pNode = pHead; - int totSize = 0; - int numOfMsgs = 0; - // TODO: make it a macro - int sizeLimit = 4 * 1024; - - void* ptr = realloc(*pRsp, sizeof(STqConsumeRsp) + sizeLimit); - if (ptr == NULL) { - terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; - return -1; - } - *pRsp = ptr; - STqMsgContent* buffer = (*pRsp)->msgs; - - // iterate the list to get msgs of all topics - // until all topic iterated or msgs over sizeLimit - while (pNode->next) { - pNode = pNode->next; - STqTopic* pTopic = &pNode->topic; - int idx = pTopic->nextConsumeOffset % TQ_BUFFER_SIZE; - if (pTopic->buffer[idx].content != NULL && pTopic->buffer[idx].offset == pTopic->nextConsumeOffset) { - totSize += pTopic->buffer[idx].size; - if (totSize > sizeLimit) { - void* ptr = realloc(*pRsp, sizeof(STqConsumeRsp) + totSize); - if (ptr == NULL) { - totSize -= pTopic->buffer[idx].size; - terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; - // return msgs already copied - break; - } - *pRsp = ptr; - break; - } - *((int64_t*)buffer) = pTopic->topicId; - buffer = POINTER_SHIFT(buffer, sizeof(int64_t)); - *((int64_t*)buffer) = pTopic->buffer[idx].size; - buffer = POINTER_SHIFT(buffer, sizeof(int64_t)); - memcpy(buffer, pTopic->buffer[idx].content, pTopic->buffer[idx].size); - buffer = POINTER_SHIFT(buffer, pTopic->buffer[idx].size); - numOfMsgs++; - if (totSize > sizeLimit) { - break; - } - } - } - (*pRsp)->bodySize = totSize; - return numOfMsgs; -} - -STqGroup* tqGetGroup(STQ* pTq, int64_t clientId) { return tqHandleGet(pTq->tqMeta, clientId); } - -int tqSendLaunchQuery(STqMsgItem* bufItem, int64_t offset) { - if (tqQueryExecuting(bufItem->status)) { - return 0; - } - bufItem->status = 1; - // load data from wal or buffer pool - // put into exec - // send exec into non blocking queue - // when query finished, put into buffer pool - return 0; -} - -/*int tqMoveOffsetToNext(TqGroupHandle* gHandle) {*/ -/*return 0;*/ -/*}*/ - int tqPushMsg(STQ* pTq, void* p, int64_t version) { // add reference // judge and launch new query @@ -270,218 +88,6 @@ int tqCommit(STQ* pTq) { return 0; } -int tqBufferSetOffset(STqTopic* pTopic, int64_t offset) { - int code; - memset(pTopic->buffer, 0, sizeof(pTopic->buffer)); - // launch query - for (int i = offset; i < offset + TQ_BUFFER_SIZE; i++) { - int pos = i % TQ_BUFFER_SIZE; - code = tqSendLaunchQuery(&pTopic->buffer[pos], offset); - if (code < 0) { - // TODO: error handling - } - } - // set offset - pTopic->nextConsumeOffset = offset; - pTopic->floatingCursor = offset; - return 0; -} - -STqTopic* tqFindTopic(STqGroup* pGroup, int64_t topicId) { - // TODO - return NULL; -} - -int tqSetCursor(STQ* pTq, STqSetCurReq* pMsg) { - int code; - int64_t clientId = pMsg->head.clientId; - int64_t topicId = pMsg->topicId; - int64_t offset = pMsg->offset; - STqGroup* gHandle = tqGetGroup(pTq, clientId); - if (gHandle == NULL) { - // client not connect - return -1; - } - STqTopic* topicHandle = tqFindTopic(gHandle, topicId); - if (topicHandle == NULL) { - return -1; - } - if (pMsg->offset == topicHandle->nextConsumeOffset) { - return 0; - } - // TODO: check log last version - - code = tqBufferSetOffset(topicHandle, offset); - if (code < 0) { - // set error code - return -1; - } - - return 0; -} - -// temporary -int tqProcessCMsg(STQ* pTq, STqConsumeReq* pMsg, STqRspHandle* pRsp) { - int64_t clientId = pMsg->head.clientId; - STqGroup* pGroup = tqGetGroup(pTq, clientId); - if (pGroup == NULL) { - terrno = TSDB_CODE_TQ_GROUP_NOT_SET; - return -1; - } - pGroup->rspHandle.handle = pRsp->handle; - pGroup->rspHandle.ahandle = pRsp->ahandle; - - return 0; -} - -int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) { - STqConsumeReq* pMsg = pReq->pCont; - int64_t clientId = pMsg->head.clientId; - STqGroup* pGroup = tqGetGroup(pTq, clientId); - if (pGroup == NULL) { - terrno = TSDB_CODE_TQ_GROUP_NOT_SET; - return -1; - } - - SList* topicList = pGroup->topicList; - - int totSize = 0; - int numOfMsgs = 0; - int sizeLimit = 4096; - - STqConsumeRsp* pCsmRsp = (*pRsp)->pCont; - void* ptr = realloc((*pRsp)->pCont, sizeof(STqConsumeRsp) + sizeLimit); - if (ptr == NULL) { - terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; - return -1; - } - (*pRsp)->pCont = ptr; - - SListIter iter; - tdListInitIter(topicList, &iter, TD_LIST_FORWARD); - - STqMsgContent* buffer = NULL; - SArray* pArray = taosArrayInit(0, sizeof(void*)); - - SListNode* pn; - while ((pn = tdListNext(&iter)) != NULL) { - STqTopic* pTopic = *(STqTopic**)pn->data; - int idx = pTopic->floatingCursor % TQ_BUFFER_SIZE; - STqMsgItem* pItem = &pTopic->buffer[idx]; - if (pItem->content != NULL && pItem->offset == pTopic->floatingCursor) { - if (pItem->status == TQ_ITEM_READY) { - // if has data - totSize += pTopic->buffer[idx].size; - if (totSize > sizeLimit) { - void* ptr = realloc((*pRsp)->pCont, sizeof(STqConsumeRsp) + totSize); - if (ptr == NULL) { - totSize -= pTopic->buffer[idx].size; - terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; - // return msgs already copied - break; - } - (*pRsp)->pCont = ptr; - break; - } - *((int64_t*)buffer) = htobe64(pTopic->topicId); - buffer = POINTER_SHIFT(buffer, sizeof(int64_t)); - *((int64_t*)buffer) = htobe64(pTopic->buffer[idx].size); - buffer = POINTER_SHIFT(buffer, sizeof(int64_t)); - memcpy(buffer, pTopic->buffer[idx].content, pTopic->buffer[idx].size); - buffer = POINTER_SHIFT(buffer, pTopic->buffer[idx].size); - numOfMsgs++; - if (totSize > sizeLimit) { - break; - } - } else if (pItem->status == TQ_ITEM_PROCESS) { - // if not have data but in process - - } else if (pItem->status == TQ_ITEM_EMPTY) { - // if not have data and not in process - int32_t old = atomic_val_compare_exchange_32(&pItem->status, TQ_ITEM_EMPTY, TQ_ITEM_PROCESS); - if (old != TQ_ITEM_EMPTY) { - continue; - } - pItem->offset = pTopic->floatingCursor; - taosArrayPush(pArray, &pItem); - } else { - ASSERT(0); - } - } - } - - if (numOfMsgs > 0) { - // set code and other msg - rpcSendResponse(*pRsp); - } else { - // most recent data has been fetched - - // enable timer for blocking wait - // once new data written when waiting, launch query and rsp - } - - // fetched a num of msgs, rpc response - for (int i = 0; i < pArray->size; i++) { - STqMsgItem* pItem = taosArrayGet(pArray, i); - - // read from wal - void* raw = NULL; - /*int code = pTq->tqLogReader->logRead(, &raw, pItem->offset);*/ - /*int code = pTq->tqLogHandle->logRead(pItem->pTopic->logReader, &raw, pItem->offset);*/ - /*if (code < 0) {*/ - // TODO: error - /*}*/ - // get msgType - // if submitblk - pItem->executor->assign(pItem->executor->runtimeEnv, raw); - SSDataBlock* content = pItem->executor->exec(pItem->executor->runtimeEnv); - pItem->content = content; - // if other type, send just put into buffer - /*pItem->content = raw;*/ - - int32_t old = atomic_val_compare_exchange_32(&pItem->status, TQ_ITEM_PROCESS, TQ_ITEM_READY); - ASSERT(old == TQ_ITEM_PROCESS); - } - taosArrayDestroy(pArray); - - return 0; -} - -#if 0 -int tqConsume(STQ* pTq, STqConsumeReq* pMsg) { - if (!tqProtoCheck((STqMsgHead*)pMsg)) { - // proto version invalid - return -1; - } - int64_t clientId = pMsg->head.clientId; - STqGroup* pGroup = tqGetGroup(pTq, clientId); - if (pGroup == NULL) { - // client not connect - return -1; - } - if (pMsg->acks.ackNum != 0) { - if (tqAck(pGroup, &pMsg->acks) != 0) { - // ack not success - return -1; - } - } - - STqConsumeRsp* pRsp = (STqConsumeRsp*)pMsg; - - if (tqFetch(pGroup, (void**)&pRsp->msgs) <= 0) { - // fetch error - return -1; - } - - // judge and launch new query - /*if (tqSendLaunchQuery(gHandle)) {*/ - // launch query error - /*return -1;*/ - /*}*/ - return 0; -} -#endif - int tqSerializeConsumer(const STqConsumerHandle* pConsumer, STqSerializedHead** ppHead) { int32_t num = taosArrayGetSize(pConsumer->topics); int32_t sz = sizeof(STqSerializedHead) + sizeof(int64_t) * 2 + TSDB_TOPIC_FNAME_LEN + @@ -535,138 +141,6 @@ const void* tqDeserializeConsumer(const STqSerializedHead* pHead, STqConsumerHan return NULL; } -#if 0 -int tqSerializeGroup(const STqGroup* pGroup, STqSerializedHead** ppHead) { - // calculate size - int sz = tqGroupSSize(pGroup) + sizeof(STqSerializedHead); - if (sz > (*ppHead)->ssize) { - void* tmpPtr = realloc(*ppHead, sz); - if (tmpPtr == NULL) { - free(*ppHead); - // TODO: memory err - return -1; - } - *ppHead = tmpPtr; - (*ppHead)->ssize = sz; - } - void* ptr = (*ppHead)->content; - // do serialization - *(int64_t*)ptr = pGroup->clientId; - ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); - *(int64_t*)ptr = pGroup->cgId; - ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); - *(int32_t*)ptr = pGroup->topicNum; - ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); - if (pGroup->topicNum > 0) { - tqSerializeListHandle(pGroup->head, ptr); - } - return 0; -} - -void* tqSerializeListHandle(STqList* listHandle, void* ptr) { - STqList* node = listHandle; - ASSERT(node != NULL); - while (node) { - ptr = tqSerializeTopic(&node->topic, ptr); - node = node->next; - } - return ptr; -} - -void* tqSerializeTopic(STqTopic* pTopic, void* ptr) { - *(int64_t*)ptr = pTopic->nextConsumeOffset; - ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); - *(int64_t*)ptr = pTopic->topicId; - ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); - /**(int32_t*)ptr = pTopic->head;*/ - /*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/ - /**(int32_t*)ptr = pTopic->tail;*/ - /*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/ - for (int i = 0; i < TQ_BUFFER_SIZE; i++) { - ptr = tqSerializeItem(&pTopic->buffer[i], ptr); - } - return ptr; -} - -void* tqSerializeItem(STqMsgItem* bufItem, void* ptr) { - // TODO: do we need serialize this? - // mainly for executor - return ptr; -} - -const void* tqDeserializeGroup(const STqSerializedHead* pHead, STqGroup** ppGroup) { - STqGroup* gHandle = *ppGroup; - const void* ptr = pHead->content; - gHandle->clientId = *(int64_t*)ptr; - ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); - gHandle->cgId = *(int64_t*)ptr; - ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); - gHandle->ahandle = NULL; - gHandle->topicNum = *(int32_t*)ptr; - ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); - gHandle->head = NULL; - STqList* node = gHandle->head; - for (int i = 0; i < gHandle->topicNum; i++) { - if (gHandle->head == NULL) { - if ((node = malloc(sizeof(STqList))) == NULL) { - // TODO: error - return NULL; - } - node->next = NULL; - ptr = tqDeserializeTopic(ptr, &node->topic); - gHandle->head = node; - } else { - node->next = malloc(sizeof(STqList)); - if (node->next == NULL) { - // TODO: error - return NULL; - } - node->next->next = NULL; - ptr = tqDeserializeTopic(ptr, &node->next->topic); - node = node->next; - } - } - return ptr; -} - -const void* tqDeserializeTopic(const void* pBytes, STqTopic* topic) { - const void* ptr = pBytes; - topic->nextConsumeOffset = *(int64_t*)ptr; - ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); - topic->topicId = *(int64_t*)ptr; - ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); - /*topic->head = *(int32_t*)ptr;*/ - /*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/ - /*topic->tail = *(int32_t*)ptr;*/ - /*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/ - for (int i = 0; i < TQ_BUFFER_SIZE; i++) { - ptr = tqDeserializeItem(ptr, &topic->buffer[i]); - } - return ptr; -} - -const void* tqDeserializeItem(const void* pBytes, STqMsgItem* bufItem) { return pBytes; } - -// TODO: make this a macro -int tqGroupSSize(const STqGroup* gHandle) { - return sizeof(int64_t) * 2 // cId + cgId - + sizeof(int32_t) // topicNum - + gHandle->topicNum * tqTopicSSize(); -} - -// TODO: make this a macro -int tqTopicSSize() { - return sizeof(int64_t) * 2 // nextConsumeOffset + topicId - + sizeof(int32_t) * 2 // head + tail - + TQ_BUFFER_SIZE * tqItemSSize(); -} - -int tqItemSSize() { - // TODO: do this need serialization? - // mainly for executor - return 0; -} -#endif int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { SMqConsumeReq* pReq = pMsg->pCont; @@ -675,7 +149,6 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { int64_t fetchOffset = pReq->offset; int64_t blockingTime = pReq->blockingTime; - int rspLen = 0; SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 0, .pBlockData = NULL}; STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); @@ -694,11 +167,25 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { if (strcmp(pTopic->topicName, pReq->topic) != 0) { continue; } + + if (pReq->reqType == TMQ_REQ_TYPE_COMMIT_ONLY) { + pTopic->committedOffset = pReq->offset; + pMsg->pCont = NULL; + pMsg->contLen = 0; + pMsg->code = 0; + rpcSendResponse(pMsg); + return 0; + } + + if (pReq->reqType == TMQ_REQ_TYPE_CONSUME_AND_COMMIT) { + pTopic->committedOffset = pReq->offset-1; + } + rsp.committedOffset = pTopic->committedOffset; rsp.reqOffset = pReq->offset; rsp.skipLogNum = 0; - if (fetchOffset == -1) { + if (fetchOffset <= pTopic->committedOffset) { fetchOffset = pTopic->committedOffset + 1; } int8_t pos; @@ -802,11 +289,22 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { - SMqSetCVgReq req; + SMqSetCVgReq req = {0}; tDecodeSMqSetCVgReq(msg, &req); - STqConsumerHandle* pConsumer = calloc(sizeof(STqConsumerHandle), 1); + + STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, req.oldConsumerId); if (pConsumer == NULL) { - return -1; + pConsumer = calloc(sizeof(STqConsumerHandle), 1); + if (pConsumer == NULL) { + terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; + return -1; + } + } else { + tqHandleMovePut(pTq->tqMeta, req.newConsumerId, pConsumer); + tqHandleCommit(pTq->tqMeta, req.newConsumerId); + tqHandlePurge(pTq->tqMeta, req.oldConsumerId); + terrno = TSDB_CODE_SUCCESS; + return 0; } strcpy(pConsumer->cgroup, req.cgroup); pConsumer->topics = taosArrayInit(0, sizeof(STqTopicHandle)); @@ -819,9 +317,9 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { return -1; } strcpy(pTopic->topicName, req.topicName); - pTopic->sql = strdup(req.sql); - pTopic->logicalPlan = strdup(req.logicalPlan); - pTopic->physicalPlan = strdup(req.physicalPlan); + pTopic->sql = req.sql; + pTopic->logicalPlan = req.logicalPlan; + pTopic->physicalPlan = req.physicalPlan; pTopic->committedOffset = -1; pTopic->currentOffset = -1; diff --git a/source/dnode/vnode/src/tq/tqMetaStore.c b/source/dnode/vnode/src/tq/tqMetaStore.c index 57e20010e3b12b615875e1ecaba1eddd05161562..d220966ba6d359e5a54e3b77d32633b445545be6 100644 --- a/source/dnode/vnode/src/tq/tqMetaStore.c +++ b/source/dnode/vnode/src/tq/tqMetaStore.c @@ -584,12 +584,30 @@ int32_t tqHandleDel(STqMetaStore* pMeta, int64_t key) { int64_t bucketKey = key & TQ_BUCKET_MASK; STqMetaList* pNode = pMeta->bucket[bucketKey]; while (pNode) { - if (pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { - if (pNode->handle.valueInTxn) { - pMeta->pDeleter(pNode->handle.valueInTxn); + if (pNode->handle.key == key) { + if (pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { + if (pNode->handle.valueInTxn) { + pMeta->pDeleter(pNode->handle.valueInTxn); + } + + pNode->handle.valueInTxn = TQ_DELETE_TOKEN; + tqLinkUnpersist(pMeta, pNode); + return 0; } + } else { + pNode = pNode->next; + } + } + terrno = TSDB_CODE_TQ_META_NO_SUCH_KEY; + return -1; +} - pNode->handle.valueInTxn = TQ_DELETE_TOKEN; +int32_t tqHandlePurge(STqMetaStore* pMeta, int64_t key) { + int64_t bucketKey = key & TQ_BUCKET_MASK; + STqMetaList* pNode = pMeta->bucket[bucketKey]; + while (pNode) { + if (pNode->handle.key == key) { + pNode->handle.valueInUse = TQ_DELETE_TOKEN; tqLinkUnpersist(pMeta, pNode); return 0; } else { diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index 326d99ddbbc6e912596324ac44c3ac5aa4f0eb75..faf3171d76e909346fb5024eae249c4e7808b393 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -16,6 +16,7 @@ #include "tq.h" #include "vnd.h" +#if 0 int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) { switch (pMsg->msgType) { case TDMT_VND_MQ_SET_CUR: @@ -26,6 +27,7 @@ int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) { } return 0; } +#endif int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { SRpcMsg *pMsg; diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 96d0c6c385d30ca5121354283da88851a8404640..41fff4d228b86033f3bdbb0b9ba45e72aa62805d 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -169,10 +169,17 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { return -1; } - ASSERT(pRead->pHead->head.version == ver); + if (pRead->pHead->head.version != ver) { + /*wError("unexpected wal log version: %ld, read request version:%ld", pRead->pHead->head.version, ver);*/ + pRead->curVersion = -1; + terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + return -1; + } code = walValidBodyCksum(pRead->pHead); if (code != 0) { + /*wError("unexpected wal log version: checksum not passed");*/ + pRead->curVersion = -1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; }