提交 fd44bdbf 编写于 作者: L Liu Jicong

implement get vg info

上级 39ad8042
......@@ -204,12 +204,19 @@ DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr);
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList);
DLL_EXPORT TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision);
/* --------------------------TMQ INTERFACE------------------------------- */
typedef struct tmq_resp_err_t tmq_resp_err_t;
typedef struct tmq_topic_vgroup_t tmq_topic_vgroup_t;
typedef struct tmq_topic_vgroup_list_t tmq_topic_vgroup_list_t;
typedef void (tmq_commit_cb(tmq_t*, tmq_resp_err_t, tmq_topic_vgroup_list_t*, void* param));
DLL_EXPORT tmq_list_t* tmq_list_new();
DLL_EXPORT int32_t tmq_list_append(tmq_list_t*, char*);
DLL_EXPORT tmq_conf_t* tmq_conf_new();
DLL_EXPORT int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value);
DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb);
DLL_EXPORT TAOS_RES *taos_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen);
......@@ -217,13 +224,7 @@ DLL_EXPORT tmq_t* taos_consumer_new(void* conn, tmq_conf_t* conf, char* errstr,
DLL_EXPORT TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list);
DLL_EXPORT tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time);
DLL_EXPORT int32_t tmq_topic_num(tmq_message_t* msg);
DLL_EXPORT char* tmq_get_topic(tmq_message_topic_t* msg);
DLL_EXPORT int32_t tmq_get_vgId(tmq_message_topic_t* msg);
DLL_EXPORT tmq_message_tb_t* tmq_get_next_tb(tmq_message_topic_t* msg, tmq_tb_iter_t* iter);
DLL_EXPORT tmq_message_col_t* tmq_get_next_col(tmq_message_tb_t* msg, tmq_col_iter_t* iter);
DLL_EXPORT tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time);
#ifdef __cplusplus
}
......
......@@ -254,6 +254,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_TOPIC_OPTION_UNCHNAGED TAOS_DEF_ERROR_CODE(0, 0x03E5)
#define TSDB_CODE_MND_NAME_CONFLICT_WITH_STB TAOS_DEF_ERROR_CODE(0, 0x03E6)
#define TSDB_CODE_MND_CONSUMER_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E7)
#define TSDB_CODE_MND_UNSUPPORTED_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03E7)
// dnode
#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400)
......
......@@ -280,23 +280,21 @@ typedef struct SMqClientTopic {
SArray* vgs; //SArray<SMqClientVg>
} SMqClientTopic;
typedef struct tmq_resp_err_t {
struct tmq_resp_err_t {
int32_t code;
} tmq_resp_err_t;
};
typedef struct tmq_topic_vgroup_t {
struct tmq_topic_vgroup_t {
char* topic;
int32_t vgId;
int64_t commitOffset;
} tmq_topic_vgroup_t;
};
typedef struct tmq_topic_vgroup_list_t {
struct tmq_topic_vgroup_list_t {
int32_t cnt;
int32_t size;
tmq_topic_vgroup_t* elems;
} tmq_topic_vgroup_list_t;
typedef void (tmq_commit_cb(tmq_t*, tmq_resp_err_t, tmq_topic_vgroup_list_t*, void* param));
};
struct tmq_conf_t {
char clientId[256];
......@@ -677,8 +675,7 @@ int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
tmq_t* tmq = pParam->tmq;
if (code != 0) {
printf("exit wait %d\n", pParam->wait);
printf("get topic endpoint error, not ready, wait:%d\n", pParam->wait);
if (pParam->wait) {
tsem_post(&tmq->rspSem);
}
......@@ -769,7 +766,7 @@ END:
return 0;
}
tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
int64_t status = atomic_load_64(&tmq->status);
tmqAsyncAskEp(tmq, status == 0 || taosArrayGetSize(tmq->clientTopics));
......
......@@ -562,9 +562,9 @@ TEST(testCase, insert_test) {
taos_free_result(pRes);
taos_close(pConn);
}
#endif
TEST(testCase, create_topic_Test) {
TEST(testCase, create_topic_ctb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
......@@ -583,13 +583,37 @@ TEST(testCase, create_topic_Test) {
taos_free_result(pRes);
char* sql = "select * from tu";
pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql));
pRes = taos_create_topic(pConn, "test_ctb_topic_1", sql, strlen(sql));
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, create_topic_stb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
//taos_free_result(pRes);
TEST(testCase, tmq_subscribe_Test) {
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == nullptr);
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0);
taos_free_result(pRes);
char* sql = "select * from st1";
pRes = taos_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));
taos_free_result(pRes);
taos_close(pConn);
}
#if 0
TEST(testCase, tmq_subscribe_ctb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
......@@ -604,7 +628,32 @@ TEST(testCase, tmq_subscribe_Test) {
tmq_t* tmq = taos_consumer_new(pConn, conf, NULL, 0);
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_topic_1");
tmq_list_append(topic_list, "test_ctb_topic_1");
tmq_subscribe(tmq, topic_list);
while (1) {
tmq_message_t* msg = tmq_consumer_poll(tmq, 1000);
//printf("get msg\n");
//if (msg == NULL) break;
}
}
TEST(testCase, tmq_subscribe_stb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg2");
tmq_t* tmq = taos_consumer_new(pConn, conf, NULL, 0);
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_stb_topic_1");
tmq_subscribe(tmq, topic_list);
while (1) {
......@@ -613,6 +662,7 @@ TEST(testCase, tmq_subscribe_Test) {
//if (msg == NULL) break;
}
}
#endif
TEST(testCase, tmq_consume_Test) {
}
......
......@@ -96,7 +96,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
bool found = 0;
bool changed = 0;
for (int32_t j = 0; j < taosArrayGetSize(pSub->availConsumer); j++) {
if (*(int64_t*)taosArrayGet(pSub->availConsumer, j) == consumerId) {
if (*(int64_t *)taosArrayGet(pSub->availConsumer, j) == consumerId) {
found = 1;
break;
}
......@@ -105,16 +105,13 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
taosArrayPush(pSub->availConsumer, &consumerId);
}
int32_t assignedSz = taosArrayGetSize(pSub->assigned);
int32_t assignedSz = taosArrayGetSize(pSub->assigned);
topicEp.vgs = taosArrayInit(assignedSz, sizeof(SMqSubVgEp));
for (int32_t j = 0; j < assignedSz; j++) {
SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, j);
if (pCEp->consumerId == consumerId) {
pCEp->lastConsumerHbTs = currentTs;
SMqSubVgEp vgEp = {
.epSet = pCEp->epSet,
.vgId = pCEp->vgId
};
SMqSubVgEp vgEp = {.epSet = pCEp->epSet, .vgId = pCEp->vgId};
taosArrayPush(topicEp.vgs, &vgEp);
changed = 1;
}
......@@ -123,7 +120,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
taosArrayPush(rsp.topics, &topicEp);
}
if (changed || found) {
SSdbRaw* pRaw = mndSubActionEncode(pSub);
SSdbRaw *pRaw = mndSubActionEncode(pSub);
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
sdbWriteNotFree(pMnode->pSdb, pRaw);
}
......@@ -137,7 +134,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
}
void *abuf = buf;
tEncodeSMqCMGetSubEpRsp(&abuf, &rsp);
//TODO: free rsp
// TODO: free rsp
pMsg->pCont = buf;
pMsg->contLen = tlen;
return 0;
......@@ -164,9 +161,9 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
int32_t sz;
while (pIter != NULL) {
for (int i = 0; i < taosArrayGetSize(pSub->assigned); i++) {
SMqConsumerEp* pCEp = taosArrayGet(pSub->assigned, i);
int64_t consumerId = pCEp->consumerId;
if(pCEp->lastConsumerHbTs != -1 && currentTs - pCEp->lastConsumerHbTs > MND_SUBSCRIBE_REBALANCE_MS) {
SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, i);
int64_t consumerId = pCEp->consumerId;
if (pCEp->lastConsumerHbTs != -1 && currentTs - pCEp->lastConsumerHbTs > MND_SUBSCRIBE_REBALANCE_MS) {
// put consumer into lostConsumer
taosArrayPush(pSub->lostConsumer, pCEp);
// put vg into unassgined
......@@ -176,7 +173,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
taosArrayRemove(pSub->assigned, i);
// remove from available consumer
for (int j = 0; j < taosArrayGetSize(pSub->availConsumer); j++) {
if (*(int64_t*)taosArrayGet(pSub->availConsumer, i) == pCEp->consumerId) {
if (*(int64_t *)taosArrayGet(pSub->availConsumer, i) == pCEp->consumerId) {
taosArrayRemove(pSub->availConsumer, j);
break;
}
......@@ -209,7 +206,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
taosArrayPush(pSub->assigned, pCEp);
pSub->nextConsumerIdx = (pSub->nextConsumerIdx + 1) % taosArrayGetSize(pSub->availConsumer);
SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, consumerId);
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
pConsumer->epoch++;
/*SSdbRaw* pConsumerRaw = mndConsumerActionEncode(pConsumer);*/
/*sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);*/
......@@ -269,33 +266,50 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) {
// convert phyplan to dag
SQueryDag *pDag = qStringToDag(pTopic->physicalPlan);
SArray *pArray;
SArray *pArray = NULL;
SArray *inner = taosArrayGet(pDag->pSubplans, 0);
SSubplan *plan = taosArrayGetP(inner, 0);
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
plan->execNode.nodeId = 2;
SEpSet* pEpSet = &plan->execNode.epset;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid != pTopic->dbUid) continue;
plan->execNode.nodeId = pVgroup->vgId;
plan->execNode.epset = mndGetVgroupEpset(pMnode, pVgroup);
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
mError("unsupport topic: %s, sql: %s", pTopic->name, pTopic->sql);
return -1;
}
if (pArray && taosArrayGetSize(pArray) != 1) {
terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
mError("unsupport topic: %s, sql: %s, plan level: %ld", pTopic->name, pTopic->sql, taosArrayGetSize(pArray));
return -1;
}
pEpSet->inUse = 0;
addEpIntoEpSet(pEpSet, "localhost", 6030);
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
return -1;
}
int32_t sz = taosArrayGetSize(pArray);
// convert dag to msg
for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp CEp;
CEp.status = 0;
CEp.consumerId = -1;
CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
STaskInfo *pTaskInfo = taosArrayGet(pArray, i);
STaskInfo *pTaskInfo = taosArrayGet(pArray, 0);
CEp.epSet = pTaskInfo->addr.epset;
/*mDebug("subscribe convert ep %d %s %s %s %s %s\n", CEp.epSet.numOfEps, CEp.epSet.fqdn[0], CEp.epSet.fqdn[1],
* CEp.epSet.fqdn[2], CEp.epSet.fqdn[3], CEp.epSet.fqdn[4]);*/
CEp.vgId = pTaskInfo->addr.nodeId;
ASSERT(CEp.vgId == pVgroup->vgId);
CEp.qmsg = strdup(pTaskInfo->msg->msg);
taosArrayPush(unassignedVg, &CEp);
//TODO: free taskInfo
taosArrayDestroy(pArray);
/*SEpSet *pEpSet = &plan->execNode.epset;*/
/*pEpSet->inUse = 0;*/
/*addEpIntoEpSet(pEpSet, "localhost", 6030);*/
}
/*qDestroyQueryDag(pDag);*/
......@@ -608,7 +622,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
}
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, consumerGroup, newTopicName);
bool create = false;
bool create = false;
if (pSub == NULL) {
mDebug("create new subscription, group: %s, topic %s", consumerGroup, newTopicName);
pSub = tNewSubscribeObj();
......@@ -624,7 +638,10 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
strcpy(pSub->key, key);
free(key);
// set unassigned vg
mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg);
if (mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg) < 0) {
//TODO: free memory
return -1;
}
// TODO: disable alter
create = true;
}
......
......@@ -866,18 +866,16 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
if (pHandle->pBlock == NULL) return false;
pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid);
if (pHandle->tbUid == pHandle->pBlock->uid) {
/*if (pHandle->tbUid == pHandle->pBlock->uid) {*/
ASSERT(pHandle->tbIdHash);
void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t));
if (ret != NULL) {
pHandle->pBlock->tid = htonl(pHandle->pBlock->tid);
pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion);
pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen);
pHandle->pBlock->schemaLen = htonl(pHandle->pBlock->schemaLen);
pHandle->pBlock->numOfRows = htons(pHandle->pBlock->numOfRows);
return true;
} else if (pHandle->tbIdHash != NULL) {
void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t));
if (ret != NULL) {
return true;
}
}
}
return false;
......
......@@ -5477,7 +5477,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExp
// set the extract column id to streamHandle
tqReadHandleSetColIdList((STqReadHandle* )streamReadHandle, pColList);
tqReadHandleSetTbUid(streamReadHandle, pTableIdList);
tqReadHandleSetTbUidList(streamReadHandle, pTableIdList);
pInfo->readerHandle = streamReadHandle;
......@@ -7776,7 +7776,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask
SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode; // simple child table.
STableGroupInfo groupInfo = {0};
int32_t code = doCreateTableGroup(((STqReadHandle*)readerHandle)->pMeta, pScanPhyNode->tableType, pScanPhyNode->uid, &groupInfo, queryId, taskId);
int32_t code = doCreateTableGroup(((STqReadHandle*)readerHandle)->pVnodeMeta, pScanPhyNode->tableType, pScanPhyNode->uid, &groupInfo, queryId, taskId);
SArray* pa = taosArrayGetP(groupInfo.pGroupList, 0);
ASSERT(taosArrayGetSize(groupInfo.pGroupList) == 1);
......
......@@ -254,6 +254,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_RETRIEVE, "Invalid func retriev
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_ALREADY_EXIST, "Transaction already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_EXIST, "Transaction not exists")
// mnode-topic
TAOS_DEFINE_ERROR(TSDB_CODE_MND_UNSUPPORTED_TOPIC, "Topic with STable not supported yet")
// dnode
TAOS_DEFINE_ERROR(TSDB_CODE_DND_ACTION_IN_PROGRESS, "Action in progress")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_OFFLINE, "Dnode is offline")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册