diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b3d595d73b57e6da426575dc97a67b711fea72af..398e46e6a4f74ded6827dd6e070fc27b327460af 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1241,6 +1241,12 @@ typedef struct { char data[]; } SVShowTablesFetchRsp; +typedef struct SMqCMGetSubEpReq { + int64_t consumerId; + int32_t epoch; + char cgroup[TSDB_CONSUMER_GROUP_LEN]; +} SMqCMGetSubEpReq; + #pragma pack(pop) static FORCE_INLINE int32_t tEncodeSMsgHead(void** buf, const SMsgHead* pMsg) { @@ -1562,7 +1568,7 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* tlen += taosEncodeString(buf, pReq->logicalPlan); tlen += taosEncodeString(buf, pReq->physicalPlan); tlen += taosEncodeFixedU32(buf, pReq->qmsgLen); - tlen += taosEncodeBinary(buf, pReq->qmsg, pReq->qmsgLen); + tlen += taosEncodeString(buf, (char*)pReq->qmsg); //tlen += tEncodeSSubQueryMsg(buf, &pReq->msg); return tlen; } @@ -1577,7 +1583,7 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { buf = taosDecodeString(buf, &pReq->logicalPlan); buf = taosDecodeString(buf, &pReq->physicalPlan); buf = taosDecodeFixedU32(buf, &pReq->qmsgLen); - buf = taosDecodeBinary(buf, &pReq->qmsg, pReq->qmsgLen); + buf = taosDecodeString(buf, (char**)&pReq->qmsg); //buf = tDecodeSSubQueryMsg(buf, &pReq->msg); return buf; } @@ -1639,6 +1645,92 @@ typedef struct SMqConsumeReq { char topic[TSDB_TOPIC_FNAME_LEN]; } SMqConsumeReq; +typedef struct SMqSubVgEp { + int32_t vgId; + SEpSet epSet; +} SMqSubVgEp; + +typedef struct SMqSubTopicEp { + char topic[TSDB_TOPIC_FNAME_LEN]; + SArray* vgs; // SArray +} SMqSubTopicEp; + +typedef struct SMqCMGetSubEpRsp { + int64_t consumerId; + char cgroup[TSDB_CONSUMER_GROUP_LEN]; + SArray* topics; // SArray +} SMqCMGetSubEpRsp; + +static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) { + int32_t tlen = 0; + tlen += taosEncodeFixedI16(buf, pVgEp->vgId); + tlen += taosEncodeSEpSet(buf, &pVgEp->epSet); + return tlen; +} + +static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) { + buf = taosDecodeFixedI32(buf, &pVgEp->vgId); + buf = taosDecodeSEpSet(buf, &pVgEp->epSet); + return buf; +} + +static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) { + int32_t tlen = 0; + tlen += taosEncodeString(buf, pTopicEp->topic); + int32_t sz = taosArrayGetSize(pTopicEp->vgs); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SMqSubVgEp* pVgEp = (SMqSubVgEp*)taosArrayGet(pTopicEp->vgs, i); + tlen += tEncodeSMqSubVgEp(buf, pVgEp); + } + return tlen; +} + +static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicEp) { + buf = taosDecodeStringTo(buf, pTopicEp->topic); + int32_t sz; + buf = taosDecodeFixedI32(buf, &sz); + pTopicEp->vgs = taosArrayInit(sz, sizeof(SMqSubVgEp)); + if (pTopicEp->vgs == NULL) { + return NULL; + } + for (int32_t i = 0; i < sz; i++) { + SMqSubVgEp vgEp; + buf = tDecodeSMqSubVgEp(buf, &vgEp); + taosArrayPush(pTopicEp->vgs, &vgEp); + } + return buf; +} + +static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSubEpRsp* pRsp) { + int32_t tlen = 0; + tlen += taosEncodeFixedI64(buf, pRsp->consumerId); + tlen += taosEncodeString(buf, pRsp->cgroup); + int32_t sz = taosArrayGetSize(pRsp->topics); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SMqSubTopicEp* pVgEp = (SMqSubTopicEp*)taosArrayGet(pRsp->topics, i); + tlen += tEncodeSMqSubTopicEp(buf, pVgEp); + } + return tlen; +} + +static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* pRsp) { + buf = taosDecodeFixedI64(buf, &pRsp->consumerId); + buf = taosDecodeStringTo(buf, pRsp->cgroup); + int32_t sz; + buf = taosDecodeFixedI32(buf, &sz); + pRsp->topics = taosArrayInit(sz, sizeof(SMqSubTopicEp)); + if (pRsp->topics == NULL) { + return NULL; + } + for (int32_t i = 0; i < sz; i++) { + SMqSubTopicEp topicEp; + buf = tDecodeSMqSubTopicEp(buf, &topicEp); + taosArrayPush(pRsp->topics, &topicEp); + } + return buf; +} #ifdef __cplusplus } diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 26d8bc81eb2feb5767addae606bcf539a6b858e5..e81dd798f668f9f37d73e5d4b4ed66bae08e61d2 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -140,6 +140,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp) + TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-get-sub-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp) TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-timer", SMqTmrMsg, SMqTmrMsg) // Requests handled by VNODE diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 179e46527f861185cbb48f5cff11f26db9f78178..28a4e0c87d5f27a87ad6b043814fc26a7122a3be 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -263,7 +263,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) typedef struct SMqClientVg { // statistics - int64_t consumeCnt; + int64_t pollCnt; // offset int64_t committedOffset; int64_t currentOffset; @@ -345,6 +345,7 @@ tmq_t* taos_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t err strcpy(pTmq->groupId, conf->groupId); pTmq->commit_cb = conf->commit_cb; pTmq->consumerId = generateRequestId() & ((uint64_t)-1 >> 1); + pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic)); return pTmq; } @@ -411,13 +412,12 @@ TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { tSerializeSCMSubscribeReq(&abuf, &req); /*printf("formatted: %s\n", dagStr);*/ - pRequest = createRequest(tmq->pTscObj, NULL, NULL, TSDB_SQL_SELECT); + pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_SUBSCRIBE); if (pRequest == NULL) { tscError("failed to malloc sqlObj"); } pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen }; - pRequest->type = TDMT_MND_SUBSCRIBE; SMsgSendInfo* body = buildMsgInfoImpl(pRequest); SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); @@ -596,6 +596,37 @@ struct tmq_message_t { SMqConsumeRsp rsp; }; +int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) { + return 0; +} + +int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) { + tmq_t* tmq = (tmq_t*)param; + SMqCMGetSubEpRsp rsp; + tDecodeSMqCMGetSubEpRsp(pMsg->pData, &rsp); + int32_t sz = taosArrayGetSize(rsp.topics); + // TODO: lock + tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic)); + for (int32_t i = 0; i < sz; i++) { + SMqClientTopic topic = {0}; + SMqSubTopicEp* pTopicEp = taosArrayGet(rsp.topics, i); + topic.topicName = strdup(pTopicEp->topic); + int32_t vgSz = taosArrayGetSize(pTopicEp->vgs); + topic.vgs = taosArrayInit(vgSz, sizeof(SMqClientVg)); + for (int32_t j = 0; j < vgSz; j++) { + SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j); + SMqClientVg clientVg = { + .vgId = pVgEp->vgId, + .epSet = pVgEp->epSet + }; + taosArrayPush(topic.vgs, &clientVg); + } + taosArrayPush(tmq->clientTopics, &topic); + } + // unlock + return 0; +} + tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) { if (tmq->clientTopics == NULL || taosArrayGetSize(tmq->clientTopics) == 0) { return NULL; @@ -605,9 +636,38 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) { req.reqType = 1; req.blockingTime = blocking_time; req.consumerId = tmq->consumerId; + tmq_message_t* tmq_message = NULL; strcpy(req.cgroup, tmq->groupId); - SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx); + if (taosArrayGetSize(tmq->clientTopics) == 0) { + int32_t tlen = sizeof(SMqCMGetSubEpReq); + SMqCMGetSubEpReq* buf = malloc(tlen); + if (buf == NULL) { + tscError("failed to malloc get subscribe ep buf"); + } + buf->consumerId = htobe64(buf->consumerId); + + pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_GET_SUB_EP); + if (pRequest == NULL) { + tscError("failed to malloc subscribe ep request"); + } + + pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen }; + + SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); + sendInfo->requestObjRefId = 0; + sendInfo->param = tmq; + sendInfo->fp = tmq_ask_ep_cb; + + SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); + + int64_t transporterId = 0; + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + + tsem_wait(&pRequest->body.rspSem); + } + + SMqClientTopic* pTopic = taosArrayGetP(tmq->clientTopics, tmq->nextTopicIdx); tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics); strcpy(req.topic, pTopic->topicName); int32_t nextVgIdx = pTopic->nextVgIdx; @@ -618,14 +678,17 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) { pRequest->body.requestMsg = (SDataBuf){ .pData = &req, .len = sizeof(SMqConsumeReq) }; pRequest->type = TDMT_VND_CONSUME; - SMsgSendInfo* body = buildMsgInfoImpl(pRequest); + SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); + sendInfo->requestObjRefId = 0; + sendInfo->param = &tmq_message; + sendInfo->fp = tmq_poll_cb_inner; int64_t transporterId = 0; - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, body); + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); tsem_wait(&pRequest->body.rspSem); - return (tmq_message_t*)pRequest->body.resInfo.pData; + return tmq_message; /*tsem_wait(&pRequest->body.rspSem);*/ diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index d62d7cb826ee5b916e362bf9d547b98f769f87d1..a1adf58f6a485c878fb47629451e8b3dacb58170 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -61,472 +61,472 @@ TEST(testCase, connect_Test) { taos_close(pConn); } -TEST(testCase, create_user_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'"); - if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { - printf("failed to create user, reason:%s\n", taos_errstr(pRes)); - } - - taos_free_result(pRes); - taos_close(pConn); -} - -TEST(testCase, create_account_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); +//TEST(testCase, create_user_Test) { + //TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + //assert(pConn != NULL); - TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'"); - if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { - printf("failed to create user, reason:%s\n", taos_errstr(pRes)); - } - - taos_free_result(pRes); - taos_close(pConn); -} - -TEST(testCase, drop_account_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "drop account aabc"); - if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { - printf("failed to create user, reason:%s\n", taos_errstr(pRes)); - } - - taos_free_result(pRes); - taos_close(pConn); -} - -TEST(testCase, show_user_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "show users"); - TAOS_ROW pRow = NULL; - - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); - - char str[512] = {0}; - while ((pRow = taos_fetch_row(pRes)) != NULL) { - int32_t code = taos_print_row(str, pRow, pFields, numOfFields); - printf("%s\n", str); - } - - taos_free_result(pRes); - taos_close(pConn); -} - -TEST(testCase, drop_user_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "drop user abc"); - if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { - printf("failed to create user, reason:%s\n", taos_errstr(pRes)); - } - - taos_free_result(pRes); - taos_close(pConn); -} - -TEST(testCase, show_db_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "show databases"); - TAOS_ROW pRow = NULL; - - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); - - char str[512] = {0}; - while ((pRow = taos_fetch_row(pRes)) != NULL) { - int32_t code = taos_print_row(str, pRow, pFields, numOfFields); - printf("%s\n", str); - } - - taos_close(pConn); -} - -TEST(testCase, create_db_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "create database abc1 vgroups 2"); - if (taos_errno(pRes) != 0) { - printf("error in create db, reason:%s\n", taos_errstr(pRes)); - } - - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - ASSERT_TRUE(pFields == NULL); - - int32_t numOfFields = taos_num_fields(pRes); - ASSERT_EQ(numOfFields, 0); + //TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'"); + //if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { + //printf("failed to create user, reason:%s\n", taos_errstr(pRes)); + //} - taos_free_result(pRes); - - pRes = taos_query(pConn, "create database abc1 vgroups 4"); - if (taos_errno(pRes) != 0) { - printf("error in create db, reason:%s\n", taos_errstr(pRes)); - } - taos_close(pConn); -} - -TEST(testCase, create_dnode_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "create dnode abc1 port 7000"); - if (taos_errno(pRes) != 0) { - printf("error in create dnode, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); + //taos_free_result(pRes); + //taos_close(pConn); +//} - pRes = taos_query(pConn, "create dnode 1.1.1.1 port 9000"); - if (taos_errno(pRes) != 0) { - printf("failed to create dnode, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); +//TEST(testCase, create_account_Test) { + //TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + //assert(pConn != NULL); - taos_close(pConn); -} + //TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'"); + //if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { + //printf("failed to create user, reason:%s\n", taos_errstr(pRes)); + //} -TEST(testCase, drop_dnode_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); + //taos_free_result(pRes); + //taos_close(pConn); +//} - TAOS_RES* pRes = taos_query(pConn, "drop dnode 3"); - if (taos_errno(pRes) != 0) { - printf("error in drop dnode, reason:%s\n", taos_errstr(pRes)); - } +//TEST(testCase, drop_account_Test) { + //TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + //assert(pConn != NULL); - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - ASSERT_TRUE(pFields == NULL); + //TAOS_RES* pRes = taos_query(pConn, "drop account aabc"); + //if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { + //printf("failed to create user, reason:%s\n", taos_errstr(pRes)); + //} - int32_t numOfFields = taos_num_fields(pRes); - ASSERT_EQ(numOfFields, 0); + //taos_free_result(pRes); + //taos_close(pConn); +//} - pRes = taos_query(pConn, "drop dnode 4"); - if (taos_errno(pRes) != 0) { - printf("error in drop dnode, reason:%s\n", taos_errstr(pRes)); - } +//TEST(testCase, show_user_Test) { + //TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + //assert(pConn != NULL); - taos_free_result(pRes); - taos_close(pConn); -} + //TAOS_RES* pRes = taos_query(pConn, "show users"); + //TAOS_ROW pRow = NULL; -TEST(testCase, use_db_test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); + //TAOS_FIELD* pFields = taos_fetch_fields(pRes); + //int32_t numOfFields = taos_num_fields(pRes); - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - if (taos_errno(pRes) != 0) { - printf("error in use db, reason:%s\n", taos_errstr(pRes)); - } + //char str[512] = {0}; + //while ((pRow = taos_fetch_row(pRes)) != NULL) { + //int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + //printf("%s\n", str); + //} - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - ASSERT_TRUE(pFields == NULL); + //taos_free_result(pRes); + //taos_close(pConn); +//} - int32_t numOfFields = taos_num_fields(pRes); - ASSERT_EQ(numOfFields, 0); +//TEST(testCase, drop_user_Test) { + //TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + //assert(pConn != NULL); - taos_close(pConn); -} + //TAOS_RES* pRes = taos_query(pConn, "drop user abc"); + //if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { + //printf("failed to create user, reason:%s\n", taos_errstr(pRes)); + //} - TEST(testCase, drop_db_test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); + //taos_free_result(pRes); + //taos_close(pConn); +//} - showDB(pConn); +//TEST(testCase, show_db_Test) { + //TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + //assert(pConn != NULL); - TAOS_RES* pRes = taos_query(pConn, "drop database abc1"); - if (taos_errno(pRes) != 0) { - printf("failed to drop db, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); + //TAOS_RES* pRes = taos_query(pConn, "show databases"); + //TAOS_ROW pRow = NULL; - showDB(pConn); + //TAOS_FIELD* pFields = taos_fetch_fields(pRes); + //int32_t numOfFields = taos_num_fields(pRes); - pRes = taos_query(pConn, "create database abc1"); - if (taos_errno(pRes) != 0) { - printf("create to drop db, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); - taos_close(pConn); -} + //char str[512] = {0}; + //while ((pRow = taos_fetch_row(pRes)) != NULL) { + //int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + //printf("%s\n", str); + //} -TEST(testCase, create_stable_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); + //taos_close(pConn); +//} - TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2"); - if (taos_errno(pRes) != 0) { - printf("error in create db, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); +//TEST(testCase, create_db_Test) { + //TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + //assert(pConn != NULL); - pRes = taos_query(pConn, "create table if not exists abc1.st1(ts timestamp, k int) tags(a int)"); - if (taos_errno(pRes) != 0) { - printf("error in create stable, reason:%s\n", taos_errstr(pRes)); - } + //TAOS_RES* pRes = taos_query(pConn, "create database abc1 vgroups 2"); + //if (taos_errno(pRes) != 0) { + //printf("error in create db, reason:%s\n", taos_errstr(pRes)); + //} - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - ASSERT_TRUE(pFields == NULL); + //TAOS_FIELD* pFields = taos_fetch_fields(pRes); + //ASSERT_TRUE(pFields == NULL); - int32_t numOfFields = taos_num_fields(pRes); - ASSERT_EQ(numOfFields, 0); - taos_free_result(pRes); + //int32_t numOfFields = taos_num_fields(pRes); + //ASSERT_EQ(numOfFields, 0); - pRes = taos_query(pConn, "create stable if not exists abc1.`123_$^)` (ts timestamp, `abc` int) tags(a int)"); - if (taos_errno(pRes) != 0) { - printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes)); - } + //taos_free_result(pRes); - pRes = taos_query(pConn, "use abc1"); - taos_free_result(pRes); - pRes = taos_query(pConn, "drop stable `123_$^)`"); - if (taos_errno(pRes) != 0) { - printf("failed to drop super table 123_$^), reason:%s\n", taos_errstr(pRes)); - } + //pRes = taos_query(pConn, "create database abc1 vgroups 4"); + //if (taos_errno(pRes) != 0) { + //printf("error in create db, reason:%s\n", taos_errstr(pRes)); + //} + //taos_close(pConn); +//} - taos_close(pConn); -} +//TEST(testCase, create_dnode_Test) { + //TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + //assert(pConn != NULL); -TEST(testCase, create_table_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); + //TAOS_RES* pRes = taos_query(pConn, "create dnode abc1 port 7000"); + //if (taos_errno(pRes) != 0) { + //printf("error in create dnode, reason:%s\n", taos_errstr(pRes)); + //} + //taos_free_result(pRes); - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - taos_free_result(pRes); + //pRes = taos_query(pConn, "create dnode 1.1.1.1 port 9000"); + //if (taos_errno(pRes) != 0) { + //printf("failed to create dnode, reason:%s\n", taos_errstr(pRes)); + //} + //taos_free_result(pRes); - pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k int)"); - ASSERT_EQ(taos_errno(pRes), 0); + //taos_close(pConn); +//} - taos_free_result(pRes); +//TEST(testCase, drop_dnode_Test) { + //TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + //assert(pConn != NULL); - pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k blob)"); - ASSERT_NE(taos_errno(pRes), 0); + //TAOS_RES* pRes = taos_query(pConn, "drop dnode 3"); + //if (taos_errno(pRes) != 0) { + //printf("error in drop dnode, reason:%s\n", taos_errstr(pRes)); + //} - taos_free_result(pRes); - taos_close(pConn); -} + //TAOS_FIELD* pFields = taos_fetch_fields(pRes); + //ASSERT_TRUE(pFields == NULL); -TEST(testCase, create_ctable_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); + //int32_t numOfFields = taos_num_fields(pRes); + //ASSERT_EQ(numOfFields, 0); - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - if (taos_errno(pRes) != 0) { - printf("failed to use db, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); + //pRes = taos_query(pConn, "drop dnode 4"); + //if (taos_errno(pRes) != 0) { + //printf("error in drop dnode, reason:%s\n", taos_errstr(pRes)); + //} - pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int ) tags(a int)"); - if (taos_errno(pRes) != 0) { - printf("failed to create stable, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); + //taos_free_result(pRes); + //taos_close(pConn); +//} - pRes = taos_query(pConn, "create table tm0 using st1 tags(1)"); - if (taos_errno(pRes) != 0) { - printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes)); - } +//TEST(testCase, use_db_test) { + //TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + //assert(pConn != NULL); - taos_free_result(pRes); - taos_close(pConn); -} + //TAOS_RES* pRes = taos_query(pConn, "use abc1"); + //if (taos_errno(pRes) != 0) { + //printf("error in use db, reason:%s\n", taos_errstr(pRes)); + //} -TEST(testCase, show_stable_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != nullptr); + //TAOS_FIELD* pFields = taos_fetch_fields(pRes); + //ASSERT_TRUE(pFields == NULL); - TAOS_RES* pRes = taos_query(pConn, "show abc1.stables"); - if (taos_errno(pRes) != 0) { - printf("failed to show stables, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - ASSERT_TRUE(false); - } + //int32_t numOfFields = taos_num_fields(pRes); + //ASSERT_EQ(numOfFields, 0); - TAOS_ROW pRow = NULL; - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); + //taos_close(pConn); +//} - char str[512] = {0}; - while ((pRow = taos_fetch_row(pRes)) != NULL) { - int32_t code = taos_print_row(str, pRow, pFields, numOfFields); - printf("%s\n", str); - } + //TEST(testCase, drop_db_test) { + //TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + //assert(pConn != NULL); - taos_free_result(pRes); - taos_close(pConn); -} + //showDB(pConn); -TEST(testCase, show_vgroup_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); + //TAOS_RES* pRes = taos_query(pConn, "drop database abc1"); + //if (taos_errno(pRes) != 0) { + //printf("failed to drop db, reason:%s\n", taos_errstr(pRes)); + //} + //taos_free_result(pRes); - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - if (taos_errno(pRes) != 0) { - printf("failed to use db, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); + //showDB(pConn); - pRes = taos_query(pConn, "show vgroups"); - if (taos_errno(pRes) != 0) { - printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - ASSERT_TRUE(false); - } + //pRes = taos_query(pConn, "create database abc1"); + //if (taos_errno(pRes) != 0) { + //printf("create to drop db, reason:%s\n", taos_errstr(pRes)); + //} + //taos_free_result(pRes); + //taos_close(pConn); +//} - TAOS_ROW pRow = NULL; +//TEST(testCase, create_stable_Test) { + //TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + //assert(pConn != NULL); + + //TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2"); + //if (taos_errno(pRes) != 0) { + //printf("error in create db, reason:%s\n", taos_errstr(pRes)); + //} + //taos_free_result(pRes); + + //pRes = taos_query(pConn, "create table if not exists abc1.st1(ts timestamp, k int) tags(a int)"); + //if (taos_errno(pRes) != 0) { + //printf("error in create stable, reason:%s\n", taos_errstr(pRes)); + //} + + //TAOS_FIELD* pFields = taos_fetch_fields(pRes); + //ASSERT_TRUE(pFields == NULL); + + //int32_t numOfFields = taos_num_fields(pRes); + //ASSERT_EQ(numOfFields, 0); + //taos_free_result(pRes); + + //pRes = taos_query(pConn, "create stable if not exists abc1.`123_$^)` (ts timestamp, `abc` int) tags(a int)"); + //if (taos_errno(pRes) != 0) { + //printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes)); + //} + + //pRes = taos_query(pConn, "use abc1"); + //taos_free_result(pRes); + //pRes = taos_query(pConn, "drop stable `123_$^)`"); + //if (taos_errno(pRes) != 0) { + //printf("failed to drop super table 123_$^), reason:%s\n", taos_errstr(pRes)); + //} + + //taos_close(pConn); +//} - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); +//TEST(testCase, create_table_Test) { + //TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + //assert(pConn != NULL); - char str[512] = {0}; - while ((pRow = taos_fetch_row(pRes)) != NULL) { - int32_t code = taos_print_row(str, pRow, pFields, numOfFields); - printf("%s\n", str); - } + //TAOS_RES* pRes = taos_query(pConn, "use abc1"); + //taos_free_result(pRes); - taos_free_result(pRes); - taos_close(pConn); -} + //pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k int)"); + //ASSERT_EQ(taos_errno(pRes), 0); -TEST(testCase, create_multiple_tables) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - ASSERT_NE(pConn, nullptr); + //taos_free_result(pRes); - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - if (taos_errno(pRes) != 0) { - printf("failed to use db, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - taos_close(pConn); - return; - } + //pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k blob)"); + //ASSERT_NE(taos_errno(pRes), 0); - taos_free_result(pRes); + //taos_free_result(pRes); + //taos_close(pConn); +//} - pRes = taos_query(pConn, "create table t_2 using st1 tags(1)"); - if (taos_errno(pRes) != 0) { - printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - ASSERT_TRUE(false); - } +//TEST(testCase, create_ctable_Test) { + //TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + //assert(pConn != NULL); + + //TAOS_RES* pRes = taos_query(pConn, "use abc1"); + //if (taos_errno(pRes) != 0) { + //printf("failed to use db, reason:%s\n", taos_errstr(pRes)); + //} + //taos_free_result(pRes); + + //pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int ) tags(a int)"); + //if (taos_errno(pRes) != 0) { + //printf("failed to create stable, reason:%s\n", taos_errstr(pRes)); + //} + //taos_free_result(pRes); + + //pRes = taos_query(pConn, "create table tm0 using st1 tags(1)"); + //if (taos_errno(pRes) != 0) { + //printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes)); + //} + + //taos_free_result(pRes); + //taos_close(pConn); +//} - taos_free_result(pRes); - pRes = taos_query(pConn, "create table t_3 using st1 tags(2)"); - if (taos_errno(pRes) != 0) { - printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - ASSERT_TRUE(false); - } +//TEST(testCase, show_stable_Test) { + //TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + //assert(pConn != nullptr); + + //TAOS_RES* pRes = taos_query(pConn, "show abc1.stables"); + //if (taos_errno(pRes) != 0) { + //printf("failed to show stables, reason:%s\n", taos_errstr(pRes)); + //taos_free_result(pRes); + //ASSERT_TRUE(false); + //} + + //TAOS_ROW pRow = NULL; + //TAOS_FIELD* pFields = taos_fetch_fields(pRes); + //int32_t numOfFields = taos_num_fields(pRes); + + //char str[512] = {0}; + //while ((pRow = taos_fetch_row(pRes)) != NULL) { + //int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + //printf("%s\n", str); + //} + + //taos_free_result(pRes); + //taos_close(pConn); +//} - TAOS_ROW pRow = NULL; - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); +//TEST(testCase, show_vgroup_Test) { + //TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + //assert(pConn != NULL); - char str[512] = {0}; - while ((pRow = taos_fetch_row(pRes)) != NULL) { - int32_t code = taos_print_row(str, pRow, pFields, numOfFields); - printf("%s\n", str); - } + //TAOS_RES* pRes = taos_query(pConn, "use abc1"); + //if (taos_errno(pRes) != 0) { + //printf("failed to use db, reason:%s\n", taos_errstr(pRes)); + //} + //taos_free_result(pRes); - taos_free_result(pRes); + //pRes = taos_query(pConn, "show vgroups"); + //if (taos_errno(pRes) != 0) { + //printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes)); + //taos_free_result(pRes); + //ASSERT_TRUE(false); + //} - for (int32_t i = 0; i < 20; ++i) { - char sql[512] = {0}; - snprintf(sql, tListLen(sql), - "create table t_x_%d using st1 tags(2) t_x_%d using st1 tags(5) t_x_%d using st1 tags(911)", i, - (i + 1) * 30, (i + 2) * 40); - TAOS_RES* pres = taos_query(pConn, sql); - if (taos_errno(pres) != 0) { - printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres)); - } - taos_free_result(pres); - } + //TAOS_ROW pRow = NULL; - taos_close(pConn); -} + //TAOS_FIELD* pFields = taos_fetch_fields(pRes); + //int32_t numOfFields = taos_num_fields(pRes); -TEST(testCase, show_table_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); + //char str[512] = {0}; + //while ((pRow = taos_fetch_row(pRes)) != NULL) { + //int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + //printf("%s\n", str); + //} - TAOS_RES* pRes = taos_query(pConn, "show tables"); - if (taos_errno(pRes) != 0) { - printf("failed to show tables, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - } + //taos_free_result(pRes); + //taos_close(pConn); +//} - taos_free_result(pRes); +//TEST(testCase, create_multiple_tables) { + //TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + //ASSERT_NE(pConn, nullptr); + + //TAOS_RES* pRes = taos_query(pConn, "use abc1"); + //if (taos_errno(pRes) != 0) { + //printf("failed to use db, reason:%s\n", taos_errstr(pRes)); + //taos_free_result(pRes); + //taos_close(pConn); + //return; + //} + + //taos_free_result(pRes); + + //pRes = taos_query(pConn, "create table t_2 using st1 tags(1)"); + //if (taos_errno(pRes) != 0) { + //printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); + //taos_free_result(pRes); + //ASSERT_TRUE(false); + //} + + //taos_free_result(pRes); + //pRes = taos_query(pConn, "create table t_3 using st1 tags(2)"); + //if (taos_errno(pRes) != 0) { + //printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); + //taos_free_result(pRes); + //ASSERT_TRUE(false); + //} + + //TAOS_ROW pRow = NULL; + //TAOS_FIELD* pFields = taos_fetch_fields(pRes); + //int32_t numOfFields = taos_num_fields(pRes); + + //char str[512] = {0}; + //while ((pRow = taos_fetch_row(pRes)) != NULL) { + //int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + //printf("%s\n", str); + //} + + //taos_free_result(pRes); + + //for (int32_t i = 0; i < 20; ++i) { + //char sql[512] = {0}; + //snprintf(sql, tListLen(sql), + //"create table t_x_%d using st1 tags(2) t_x_%d using st1 tags(5) t_x_%d using st1 tags(911)", i, + //(i + 1) * 30, (i + 2) * 40); + //TAOS_RES* pres = taos_query(pConn, sql); + //if (taos_errno(pres) != 0) { + //printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres)); + //} + //taos_free_result(pres); + //} + + //taos_close(pConn); +//} - pRes = taos_query(pConn, "show abc1.tables"); - if (taos_errno(pRes) != 0) { - printf("failed to show tables, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - } +//TEST(testCase, show_table_Test) { + //TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + //assert(pConn != NULL); - TAOS_ROW pRow = NULL; - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); + //TAOS_RES* pRes = taos_query(pConn, "show tables"); + //if (taos_errno(pRes) != 0) { + //printf("failed to show tables, reason:%s\n", taos_errstr(pRes)); + //taos_free_result(pRes); + //} - int32_t count = 0; - char str[512] = {0}; + //taos_free_result(pRes); - while ((pRow = taos_fetch_row(pRes)) != NULL) { - int32_t code = taos_print_row(str, pRow, pFields, numOfFields); - printf("%d: %s\n", ++count, str); - } + //pRes = taos_query(pConn, "show abc1.tables"); + //if (taos_errno(pRes) != 0) { + //printf("failed to show tables, reason:%s\n", taos_errstr(pRes)); + //taos_free_result(pRes); + //} - taos_free_result(pRes); - taos_close(pConn); -} + //TAOS_ROW pRow = NULL; + //TAOS_FIELD* pFields = taos_fetch_fields(pRes); + //int32_t numOfFields = taos_num_fields(pRes); -TEST(testCase, drop_stable_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != nullptr); + //int32_t count = 0; + //char str[512] = {0}; - TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1"); - if (taos_errno(pRes) != 0) { - printf("error in creating db, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); + //while ((pRow = taos_fetch_row(pRes)) != NULL) { + //int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + //printf("%d: %s\n", ++count, str); + //} - pRes = taos_query(pConn, "use abc1"); - if (taos_errno(pRes) != 0) { - printf("error in using db, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); + //taos_free_result(pRes); + //taos_close(pConn); +//} - pRes = taos_query(pConn, "drop stable st1"); - if (taos_errno(pRes) != 0) { - printf("failed to drop stable, reason:%s\n", taos_errstr(pRes)); - } +//TEST(testCase, drop_stable_Test) { + //TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + //assert(pConn != nullptr); + + //TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1"); + //if (taos_errno(pRes) != 0) { + //printf("error in creating db, reason:%s\n", taos_errstr(pRes)); + //} + //taos_free_result(pRes); + + //pRes = taos_query(pConn, "use abc1"); + //if (taos_errno(pRes) != 0) { + //printf("error in using db, reason:%s\n", taos_errstr(pRes)); + //} + //taos_free_result(pRes); + + //pRes = taos_query(pConn, "drop stable st1"); + //if (taos_errno(pRes) != 0) { + //printf("failed to drop stable, reason:%s\n", taos_errstr(pRes)); + //} + + //taos_free_result(pRes); + //taos_close(pConn); +//} - taos_free_result(pRes); - taos_close(pConn); -} +//TEST(testCase, generated_request_id_test) { + //SHashObj* phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); -TEST(testCase, generated_request_id_test) { - SHashObj* phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); - - for (int32_t i = 0; i < 50000; ++i) { - uint64_t v = generateRequestId(); - void* result = taosHashGet(phash, &v, sizeof(v)); - if (result != nullptr) { - printf("0x%lx, index:%d\n", v, i); - } - assert(result == nullptr); - taosHashPut(phash, &v, sizeof(v), NULL, 0); - } + //for (int32_t i = 0; i < 50000; ++i) { + //uint64_t v = generateRequestId(); + //void* result = taosHashGet(phash, &v, sizeof(v)); + //if (result != nullptr) { + //printf("0x%lx, index:%d\n", v, i); + //} + //assert(result == nullptr); + //taosHashPut(phash, &v, sizeof(v), NULL, 0); + //} - taosHashCleanup(phash); -} + //taosHashCleanup(phash); +//} TEST(testCase, create_topic_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -552,50 +552,54 @@ TEST(testCase, create_topic_Test) { taos_close(pConn); } -TEST(testCase, insert_test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - ASSERT_NE(pConn, nullptr); +//TEST(testCase, insert_test) { + //TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + //ASSERT_NE(pConn, nullptr); - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - taos_free_result(pRes); + //TAOS_RES* pRes = taos_query(pConn, "use abc1"); + //taos_free_result(pRes); - pRes = taos_query(pConn, "insert into t_2 values(now, 1)"); - if (taos_errno(pRes) != 0) { - printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - ASSERT_TRUE(false); - } + //pRes = taos_query(pConn, "insert into t_2 values(now, 1)"); + //if (taos_errno(pRes) != 0) { + //printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); + //taos_free_result(pRes); + //ASSERT_TRUE(false); + //} - taos_free_result(pRes); - taos_close(pConn); -} + //taos_free_result(pRes); + //taos_close(pConn); +//} -#if 0 -TEST(testCase, tmq_subscribe_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); +//TEST(testCase, tmq_subscribe_Test) { + //TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + //assert(pConn != NULL); + + //TAOS_RES* pRes = taos_query(pConn, "use abc1"); + //if (taos_errno(pRes) != 0) { + //printf("error in use db, reason:%s\n", taos_errstr(pRes)); + //} + //taos_free_result(pRes); - tmq_conf_t* conf = tmq_conf_new(); - tmq_conf_set(conf, "group.id", "tg1"); - tmq_t* tmq = taos_consumer_new(pConn, conf, NULL, 0); + //tmq_conf_t* conf = tmq_conf_new(); + //tmq_conf_set(conf, "group.id", "tg1"); + //tmq_t* tmq = taos_consumer_new(pConn, conf, NULL, 0); - tmq_list_t* topic_list = tmq_list_new(); - tmq_list_append(topic_list, "test_topic_1"); - tmq_subscribe(tmq, topic_list); - - while (1) { - tmq_message_t* msg = tmq_consume_poll(tmq, 0); - printf("get msg\n"); - if (msg == NULL) break; - } -} + //tmq_list_t* topic_list = tmq_list_new(); + //tmq_list_append(topic_list, "test_topic_1"); + //tmq_subscribe(tmq, topic_list); + + //while (1) { + //tmq_message_t* msg = tmq_consume_poll(tmq, 0); + //printf("get msg\n"); + //if (msg == NULL) break; + //} +//} TEST(testCase, tmq_consume_Test) { } TEST(testCase, tmq_commit_TEST) { } -#endif //TEST(testCase, insert_test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -615,56 +619,56 @@ TEST(testCase, tmq_commit_TEST) { // taos_close(pConn); //} -TEST(testCase, projection_query_tables) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - ASSERT_NE(pConn, nullptr); - - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - taos_free_result(pRes); - - pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)"); - if (taos_errno(pRes) != 0) { - printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table tu using st1 tags(1)"); - if (taos_errno(pRes) != 0) { - printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); - - for(int32_t i = 0; i < 100000; ++i) { - char sql[512] = {0}; - sprintf(sql, "insert into tu values(now+%da, %d)", i, i); - TAOS_RES* p = taos_query(pConn, sql); - if (taos_errno(p) != 0) { - printf("failed to insert data, reason:%s\n", taos_errstr(p)); - } - - taos_free_result(p); - } - - pRes = taos_query(pConn, "select * from tu"); - if (taos_errno(pRes) != 0) { - printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - ASSERT_TRUE(false); - } - - TAOS_ROW pRow = NULL; - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); - - char str[512] = {0}; - while ((pRow = taos_fetch_row(pRes)) != NULL) { - int32_t code = taos_print_row(str, pRow, pFields, numOfFields); - printf("%s\n", str); - } - - taos_free_result(pRes); - taos_close(pConn); -} +//TEST(testCase, projection_query_tables) { + //TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + //ASSERT_NE(pConn, nullptr); + + //TAOS_RES* pRes = taos_query(pConn, "use abc1"); + //taos_free_result(pRes); + + //pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)"); + //if (taos_errno(pRes) != 0) { + //printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); + //} + //taos_free_result(pRes); + + //pRes = taos_query(pConn, "create table tu using st1 tags(1)"); + //if (taos_errno(pRes) != 0) { + //printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); + //} + //taos_free_result(pRes); + + //for(int32_t i = 0; i < 100000; ++i) { + //char sql[512] = {0}; + //sprintf(sql, "insert into tu values(now+%da, %d)", i, i); + //TAOS_RES* p = taos_query(pConn, sql); + //if (taos_errno(p) != 0) { + //printf("failed to insert data, reason:%s\n", taos_errstr(p)); + //} + + //taos_free_result(p); + //} + + //pRes = taos_query(pConn, "select * from tu"); + //if (taos_errno(pRes) != 0) { + //printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); + //taos_free_result(pRes); + //ASSERT_TRUE(false); + //} + + //TAOS_ROW pRow = NULL; + //TAOS_FIELD* pFields = taos_fetch_fields(pRes); + //int32_t numOfFields = taos_num_fields(pRes); + + //char str[512] = {0}; + //while ((pRow = taos_fetch_row(pRes)) != NULL) { + //int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + //printf("%s\n", str); + //} + + //taos_free_result(pRes); + //taos_close(pConn); +//} //TEST(testCase, projection_query_stables) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 1da13bc3d93b4bc7e083f32477b5cdbf9e70624c..275b09fc0fad0efa45f53cb38c70046a3b7b0ab2 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -115,6 +115,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SUBSCRIBE)] = dndProcessMnodeWriteMsg; /*pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBSCRIBE_RSP)] = dndProcessMnodeWriteMsg;*/ pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GET_SUB_EP)] = dndProcessMnodeReadMsg; // Requests handled by VNODE pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBMIT)] = dndProcessVnodeWriteMsg; diff --git a/source/dnode/mnode/impl/inc/mndConsumer.h b/source/dnode/mnode/impl/inc/mndConsumer.h index 9d1dd084ee27cc44febf1c19a26bc20cb4a35c22..42b2de01cc4ff38d53b310741c1ccf9677f540ae 100644 --- a/source/dnode/mnode/impl/inc/mndConsumer.h +++ b/source/dnode/mnode/impl/inc/mndConsumer.h @@ -25,7 +25,7 @@ extern "C" { int32_t mndInitConsumer(SMnode *pMnode); void mndCleanupConsumer(SMnode *pMnode); -SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int32_t consumerId); +SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId); void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer); SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 7591caebc5014600ca2459de11a3680abaed68cf..e642b578fa6e0a1359a7a2ab1355b5602c192123 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -30,24 +30,23 @@ #define MND_CONSUMER_VER_NUMBER 1 #define MND_CONSUMER_RESERVE_SIZE 64 -static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer); -static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer); -static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pConsumer, SMqConsumerObj *pNewConsumer); -static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg); -static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveConsumer(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); -static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter); +static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer); +static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer); +static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pConsumer, SMqConsumerObj *pNewConsumer); +static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg); +static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveConsumer(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); +static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter); int32_t mndInitConsumer(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_CONSUMER, - .keyType = SDB_KEY_BINARY, + .keyType = SDB_KEY_INT64, .encodeFp = (SdbEncodeFp)mndConsumerActionEncode, .decodeFp = (SdbDecodeFp)mndConsumerActionDecode, .insertFp = (SdbInsertFp)mndConsumerActionInsert, .updateFp = (SdbUpdateFp)mndConsumerActionUpdate, .deleteFp = (SdbDeleteFp)mndConsumerActionDelete}; - return sdbSetTable(pMnode->pSdb, table); } @@ -61,10 +60,10 @@ SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) { SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size); if (pRaw == NULL) goto CM_ENCODE_OVER; - void* buf = malloc(tlen); + void *buf = malloc(tlen); if (buf == NULL) goto CM_ENCODE_OVER; - void* abuf = buf; + void *abuf = buf; tEncodeSMqConsumerObj(&abuf, pConsumer); int32_t dataPos = 0; @@ -106,7 +105,7 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { int32_t dataPos = 0; int32_t len; SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER); - void* buf = malloc(len); + void *buf = malloc(len); if (buf == NULL) goto CM_DECODE_OVER; SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER); SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER); @@ -147,7 +146,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, return 0; } -SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int32_t consumerId) { +SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId) { SSdb *pSdb = pMnode->pSdb; SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &consumerId); if (pConsumer == NULL) { diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 78e9a7c17c24ac8610944c6a9db41f54a4e3878b..df6a4a82f33ff1cabe6ed63916b45e935bc25b05 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -30,6 +30,8 @@ #define MND_SUBSCRIBE_VER_NUMBER 1 #define MND_SUBSCRIBE_RESERVE_SIZE 64 +static char *mndMakeSubscribeKey(char *cgroup, char *topicName); + static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *); static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw); static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *); @@ -41,9 +43,10 @@ static int32_t mndProcessSubscribeRsp(SMnodeMsg *pMsg); static int32_t mndProcessSubscribeInternalReq(SMnodeMsg *pMsg); static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg); static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg); +static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg); static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer, - SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic); + SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic, SMqConsumerEp *pSub); int32_t mndInitSubscribe(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_SUBSCRIBE, @@ -57,9 +60,60 @@ int32_t mndInitSubscribe(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq); mndSetMsgHandle(pMnode, TDMT_VND_MQ_SET_CONN_RSP, mndProcessSubscribeInternalRsp); mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg); + mndSetMsgHandle(pMnode, TDMT_MND_GET_SUB_EP, mndProcessGetSubEpReq); return sdbSetTable(pMnode->pSdb, table); } +static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; + SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->pCont; + SMqCMGetSubEpRsp rsp; + int64_t consumerId = be64toh(pReq->consumerId); + + SMqConsumerObj *pConsumer = mndAcquireConsumer(pMsg->pMnode, consumerId); + if (pConsumer == NULL) { + /*terrno = */ + return -1; + } + ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0); + + strcpy(rsp.cgroup, pReq->cgroup); + rsp.consumerId = consumerId; + SArray *pTopics = pConsumer->topics; + int32_t sz = taosArrayGetSize(pTopics); + rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp)); + for (int32_t i = 0; i < sz; i++) { + SMqSubTopicEp topicEp; + SMqConsumerTopic *pConsumerTopic = taosArrayGet(pTopics, i); + strcpy(topicEp.topic, pConsumerTopic->name); + + SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, pConsumerTopic->name); + int32_t assignedSz = taosArrayGetSize(pSub->assigned); + topicEp.vgs = taosArrayInit(assignedSz, sizeof(SMqSubVgEp)); + for (int32_t j = 0; j < assignedSz; j++) { + SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, i); + if (pCEp->consumerId == consumerId) { + taosArrayPush(pSub->assigned, pCEp); + } + } + if (taosArrayGetSize(topicEp.vgs) != 0) { + taosArrayPush(rsp.topics, &topicEp); + } + } + int32_t tlen = tEncodeSMqCMGetSubEpRsp(NULL, &rsp); + void *buf = malloc(tlen); + if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + void *abuf = buf; + tEncodeSMqCMGetSubEpRsp(&abuf, &rsp); + //TODO: free rsp + pMsg->pCont = buf; + pMsg->contLen = tlen; + return 0; +} + static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) { int i = 0; while (key[i] != ':') { @@ -97,7 +151,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { // build msg - SMqSetCVgReq* pReq = malloc(sizeof(SMqSetCVgReq) + pCEp->qmsgLen); + SMqSetCVgReq *pReq = malloc(sizeof(SMqSetCVgReq)); if (pReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -108,7 +162,8 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { pReq->logicalPlan = strdup(pTopic->logicalPlan); pReq->physicalPlan = strdup(pTopic->physicalPlan); pReq->qmsgLen = pCEp->qmsgLen; - memcpy(pReq->qmsg, pCEp->qmsg, pCEp->qmsgLen); + /*memcpy(pReq->qmsg, pCEp->qmsg, pCEp->qmsgLen);*/ + pReq->qmsg = strdup(pCEp->qmsg); int32_t tlen = tEncodeSMqSetCVgReq(NULL, pReq); void *reqStr = malloc(tlen); if (reqStr == NULL) { @@ -146,11 +201,11 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { } static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) { - //convert phyplan to dag + // convert phyplan to dag SQueryDag *pDag = qStringToDag(pTopic->physicalPlan); - SArray *pArray; - SArray* inner = taosArrayGet(pDag->pSubplans, 0); - SSubplan *plan = taosArrayGetP(inner, 0); + SArray *pArray; + SArray *inner = taosArrayGet(pDag->pSubplans, 0); + SSubplan *plan = taosArrayGetP(inner, 0); plan->execNode.inUse = 0; strcpy(plan->execNode.epAddr[0].fqdn, "localhost"); plan->execNode.epAddr[0].port = 6030; @@ -161,21 +216,24 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas return -1; } int32_t sz = taosArrayGetSize(pArray); - //convert dag to msg + // convert dag to msg for (int32_t i = 0; i < sz; i++) { SMqConsumerEp CEp; CEp.status = 0; CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1; - STaskInfo* pTaskInfo = taosArrayGet(pArray, i); + STaskInfo *pTaskInfo = taosArrayGet(pArray, i); tConvertQueryAddrToEpSet(&CEp.epSet, &pTaskInfo->addr); - /*mDebug("subscribe convert ep %d %s %s %s %s %s\n", CEp.epSet.numOfEps, CEp.epSet.fqdn[0], CEp.epSet.fqdn[1], CEp.epSet.fqdn[2], CEp.epSet.fqdn[3], CEp.epSet.fqdn[4]);*/ + /*mDebug("subscribe convert ep %d %s %s %s %s %s\n", CEp.epSet.numOfEps, CEp.epSet.fqdn[0], CEp.epSet.fqdn[1], + * CEp.epSet.fqdn[2], CEp.epSet.fqdn[3], CEp.epSet.fqdn[4]);*/ CEp.vgId = pTaskInfo->addr.nodeId; - CEp.qmsgLen = pTaskInfo->msg->contentLen; - CEp.qmsg = malloc(CEp.qmsgLen); - if (CEp.qmsg == NULL) { - return -1; - } - memcpy(CEp.qmsg, pTaskInfo->msg->msg, pTaskInfo->msg->contentLen); + CEp.qmsg = strdup(pTaskInfo->msg->msg); + CEp.qmsgLen = strlen(CEp.qmsg) + 1; + printf("abc:\n%s\n", CEp.qmsg); + /*CEp.qmsg = malloc(CEp.qmsgLen);*/ + /*if (CEp.qmsg == NULL) {*/ + /*return -1;*/ + /*}*/ + /*memcpy(CEp.qmsg, pTaskInfo->msg->msg, pTaskInfo->msg->contentLen);*/ taosArrayPush(unassignedVg, &CEp); } @@ -184,7 +242,7 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas } static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer, - SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic) { + SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic, SMqConsumerEp *pCEp) { int32_t sz = taosArrayGetSize(pConsumerTopic->pVgInfo); for (int32_t i = 0; i < sz; i++) { int32_t vgId = *(int32_t *)taosArrayGet(pConsumerTopic->pVgInfo, i); @@ -199,6 +257,8 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume req.sql = pTopic->sql; req.logicalPlan = pTopic->logicalPlan; req.physicalPlan = pTopic->physicalPlan; + req.qmsg = strdup(pCEp->qmsg); + req.qmsgLen = strlen(req.qmsg); int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); void *buf = malloc(sizeof(SMsgHead) + tlen); if (buf == NULL) { @@ -206,18 +266,18 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume return -1; } - SMsgHead* pMsgHead = (SMsgHead*)buf; + SMsgHead *pMsgHead = (SMsgHead *)buf; pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen); pMsgHead->vgId = htonl(vgId); - void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); tEncodeSMqSetCVgReq(&abuf, &req); STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgObj); action.pCont = buf; - action.contLen = tlen; + action.contLen = sizeof(SMsgHead) + tlen; action.msgType = TDMT_VND_MQ_SET_CONN; mndReleaseVgroup(pMnode, pVgObj); @@ -285,7 +345,7 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) { int32_t dataPos = 0; int32_t tlen; SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER); - void *buf = malloc(tlen + 1); + void *buf = malloc(tlen + 1); if (buf == NULL) goto SUB_DECODE_OVER; SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER); SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER); @@ -493,11 +553,11 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - char* key = mndMakeSubscribeKey(consumerGroup, newTopicName); + char *key = mndMakeSubscribeKey(consumerGroup, newTopicName); strcpy(pSub->key, key); // set unassigned vg mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg); - //TODO: disable alter + // TODO: disable alter } taosArrayPush(pSub->availConsumer, &consumerId); @@ -506,12 +566,15 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { if (taosArrayGetSize(pConsumerTopic->pVgInfo) > 0) { ASSERT(taosArrayGetSize(pConsumerTopic->pVgInfo) == 1); - int32_t vgId = *(int32_t *)taosArrayGetLast(pConsumerTopic->pVgInfo); - // send setmsg to vnode - if (mndBuildMqSetConsumerVgReq(pMnode, pTrans, pConsumer, pConsumerTopic, pTopic) < 0) { - // TODO - return -1; + int32_t vgId = *(int32_t *)taosArrayGetLast(pConsumerTopic->pVgInfo); + SMqConsumerEp *pCEp = taosArrayGetLast(pSub->assigned); + if (pCEp->vgId == vgId) { + if (mndBuildMqSetConsumerVgReq(pMnode, pTrans, pConsumer, pConsumerTopic, pTopic, pCEp) < 0) { + // TODO + return -1; + } } + // send setmsg to vnode } SSdbRaw *pRaw = mndSubActionEncode(pSub); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index cfe2d6fd69ae9c05861c3b804dd820e6cc3cb384..c164487aa2f470fffce462e8a29d24a4f4baddf4 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -811,7 +811,7 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { for (int i = 0; i < TQ_BUFFER_SIZE; i++) { pTopic->buffer.output[i].status = 0; STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta); - pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(&req.qmsg, pReadHandle); + pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, pReadHandle); } taosArrayPush(pConsumer->topics, pTopic); terrno = TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index b60943d284ab2b2d58f4cfd9f11547d598103fa2..9d69b24fce78f55805803f502b66199e74ee374f 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -83,8 +83,9 @@ int32_t qCreateExecTask(void* readHandle, int32_t vgId, SSubplan* pSubplan, qTas if (code != TSDB_CODE_SUCCESS) { goto _error; } - - code = dsCreateDataSinker(pSubplan->pDataSink, handle); + if (handle) { + code = dsCreateDataSinker(pSubplan->pDataSink, handle); + } _error: // if failed to add ref for all tables in this query, abort current query @@ -461,4 +462,4 @@ int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCo return error; } -#endif \ No newline at end of file +#endif diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 223fa300dfb43225cf4432eed342881d00146ed8..934b222f641fe4e0b2fdffaf0531ae9f02c3c849 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -1482,13 +1482,14 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) { } int32_t msgSize = sizeof(SSubQueryMsg) + msgLen; - msg = calloc(1, msgSize); if (NULL == msg) { qError("calloc %d failed", msgSize); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SSubQueryMsg *pMsg = (SSubQueryMsg*) msg; + SSubQueryMsg* pMsg = calloc(1, msgSize); + /*SSubQueryMsg *pMsg = (SSubQueryMsg*) msg;*/ + memcpy(pMsg->msg, msg, msgLen); pMsg->header.vgId = tInfo.addr.nodeId; @@ -1497,7 +1498,7 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) { pMsg->taskId = schGenUUID(); pMsg->taskType = TASK_TYPE_PERSISTENT; pMsg->contentLen = msgLen; - memcpy(pMsg->msg, msg, msgLen); + /*memcpy(pMsg->msg, ((SSubQueryMsg*)msg)->msg, msgLen);*/ tInfo.msg = pMsg; diff --git a/source/util/test/CMakeLists.txt b/source/util/test/CMakeLists.txt index 4b6311022d997d87ee09c3b379e7f8e9c28c7dad..383a00232a451f3288fbb0bf629191ce4488f80a 100644 --- a/source/util/test/CMakeLists.txt +++ b/source/util/test/CMakeLists.txt @@ -13,12 +13,12 @@ IF (HEADER_GTEST_INCLUDE_DIR AND (LIB_GTEST_STATIC_DIR OR LIB_GTEST_SHARED_DIR)) LIST(REMOVE_ITEM SOURCE_LIST ${CMAKE_CURRENT_SOURCE_DIR}/trefTest.c) ADD_EXECUTABLE(utilTest ${SOURCE_LIST}) - TARGET_LINK_LIBRARIES(utilTest util common os gtest pthread gcov) + TARGET_LINK_LIBRARIES(utilTest util common os gtest pthread) LIST(REMOVE_ITEM SOURCE_LIST ${CMAKE_CURRENT_SOURCE_DIR}/cacheTest.cpp) LIST(APPEND SOURCE_LIST ${CMAKE_CURRENT_SOURCE_DIR}/hashTest.cpp) ADD_EXECUTABLE(hashTest ${SOURCE_LIST}) - TARGET_LINK_LIBRARIES(hashTest util common os gtest pthread gcov) + TARGET_LINK_LIBRARIES(hashTest util common os gtest pthread) LIST(APPEND BIN_SRC ${CMAKE_CURRENT_SOURCE_DIR}/trefTest.c) ADD_EXECUTABLE(trefTest ${BIN_SRC})