提交 4e640b6d 编写于 作者: L Liu Jicong

enh(tmq): update tb list for existing topic

上级 9e71d03d
...@@ -42,7 +42,7 @@ typedef struct SReadHandle { ...@@ -42,7 +42,7 @@ typedef struct SReadHandle {
#define STREAM_DATA_TYPE_SSDATA_BLOCK 0x2 #define STREAM_DATA_TYPE_SSDATA_BLOCK 0x2
typedef enum { typedef enum {
OPTR_EXEC_MODEL_BATCH = 0x1, OPTR_EXEC_MODEL_BATCH = 0x1,
OPTR_EXEC_MODEL_STREAM = 0x2, OPTR_EXEC_MODEL_STREAM = 0x2,
} EOPTR_EXEC_MODEL; } EOPTR_EXEC_MODEL;
...@@ -81,7 +81,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO ...@@ -81,7 +81,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
* @param isAdd * @param isAdd
* @return * @return
*/ */
int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isAdd); int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd);
/** /**
* Create the exec task object according to task json * Create the exec task object according to task json
...@@ -169,7 +169,7 @@ int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type); ...@@ -169,7 +169,7 @@ int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type);
void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet); void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes); int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t* resNum, SExplainExecInfo** pRes);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -119,7 +119,7 @@ enum { ...@@ -119,7 +119,7 @@ enum {
enum { enum {
TMQ_CONSUMER_STATUS__INIT = 0, TMQ_CONSUMER_STATUS__INIT = 0,
TMQ_CONSUMER_STATUS__READY, TMQ_CONSUMER_STATUS__READY,
/*TMQ_CONSUMER_STATUS__NO_TOPIC,*/ TMQ_CONSUMER_STATUS__NO_TOPIC,
}; };
enum { enum {
...@@ -753,19 +753,12 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in ...@@ -753,19 +753,12 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
} }
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
if (topic_list == NULL) {
return TMQ_RESP_ERR__FAIL;
}
const SArray* container = &topic_list->container; const SArray* container = &topic_list->container;
int32_t sz = taosArrayGetSize(container); int32_t sz = taosArrayGetSize(container);
void* buf = NULL; void* buf = NULL;
SCMSubscribeReq req = {0}; SCMSubscribeReq req = {0};
int32_t code = -1; int32_t code = -1;
if (sz == 0) {
return TMQ_RESP_ERR__FAIL;
}
req.consumerId = tmq->consumerId; req.consumerId = tmq->consumerId;
tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN); tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
req.topicNames = taosArrayInit(sz, sizeof(void*)); req.topicNames = taosArrayInit(sz, sizeof(void*));
...@@ -1083,13 +1076,10 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { ...@@ -1083,13 +1076,10 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
taosHashCleanup(pHash); taosHashCleanup(pHash);
tmq->clientTopics = newTopics; tmq->clientTopics = newTopics;
ASSERT(taosArrayGetSize(tmq->clientTopics) != 0); if (taosArrayGetSize(tmq->clientTopics) == 0)
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY); atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
else
/*if (taosArrayGetSize(tmq->clientTopics) == 0)*/ atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
/*atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);*/
/*else*/
/*atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);*/
atomic_store_32(&tmq->epoch, epoch); atomic_store_32(&tmq->epoch, epoch);
return set; return set;
......
...@@ -96,8 +96,8 @@ STSmaWrapper* metaGetSmaInfoByTable(SMeta* pMeta, tb_uid_t uid, bool deepCopy) ...@@ -96,8 +96,8 @@ STSmaWrapper* metaGetSmaInfoByTable(SMeta* pMeta, tb_uid_t uid, bool deepCopy)
SArray* metaGetSmaIdsByTable(SMeta* pMeta, tb_uid_t uid); SArray* metaGetSmaIdsByTable(SMeta* pMeta, tb_uid_t uid);
SArray* metaGetSmaTbUids(SMeta* pMeta); SArray* metaGetSmaTbUids(SMeta* pMeta);
int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg); int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg);
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
// tsdb // tsdb
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg); int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg);
...@@ -117,6 +117,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal); ...@@ -117,6 +117,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal);
void tqClose(STQ*); void tqClose(STQ*);
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqCommit(STQ*); int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList);
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId); int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId);
......
...@@ -101,6 +101,21 @@ static void tdSRowDemo() { ...@@ -101,6 +101,21 @@ static void tdSRowDemo() {
taosMemoryFree(pTSChema); taosMemoryFree(pTSChema);
} }
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList) {
void* pIter = NULL;
STqExec* pExec = NULL;
while (1) {
pIter = taosHashIterate(pTq->execs, pIter);
if (pIter == NULL) break;
pExec = (STqExec*)pIter;
for (int32_t i = 0; i < 5; i++) {
int32_t code = qUpdateQualifiedTableId(pExec->task[i], tbUidList, true);
ASSERT(code == 0);
}
}
return 0;
}
int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
if (msgType != TDMT_VND_SUBMIT) return 0; if (msgType != TDMT_VND_SUBMIT) return 0;
void* pIter = NULL; void* pIter = NULL;
......
...@@ -333,6 +333,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, ...@@ -333,6 +333,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
SVCreateTbRsp cRsp = {0}; SVCreateTbRsp cRsp = {0};
char tbName[TSDB_TABLE_FNAME_LEN]; char tbName[TSDB_TABLE_FNAME_LEN];
STbUidStore *pStore = NULL; STbUidStore *pStore = NULL;
SArray *tbUids = NULL;
pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP; pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
pRsp->code = TSDB_CODE_SUCCESS; pRsp->code = TSDB_CODE_SUCCESS;
...@@ -348,7 +349,8 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, ...@@ -348,7 +349,8 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
} }
rsp.pArray = taosArrayInit(req.nReqs, sizeof(cRsp)); rsp.pArray = taosArrayInit(req.nReqs, sizeof(cRsp));
if (rsp.pArray == NULL) { tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
if (rsp.pArray == NULL || tbUids == NULL) {
rcode = -1; rcode = -1;
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
...@@ -376,6 +378,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, ...@@ -376,6 +378,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
} else { } else {
cRsp.code = TSDB_CODE_SUCCESS; cRsp.code = TSDB_CODE_SUCCESS;
tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid); tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
taosArrayPush(tbUids, &pCreateReq->uid);
} }
taosArrayPush(rsp.pArray, &cRsp); taosArrayPush(rsp.pArray, &cRsp);
...@@ -383,6 +386,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, ...@@ -383,6 +386,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
tDecoderClear(&decoder); tDecoderClear(&decoder);
tqUpdateTbUidList(pVnode->pTq, tbUids);
tdUpdateTbUidList(pVnode->pSma, pStore); tdUpdateTbUidList(pVnode->pSma, pStore);
tdUidStoreFree(pStore); tdUidStoreFree(pStore);
...@@ -402,6 +406,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, ...@@ -402,6 +406,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
_exit: _exit:
taosArrayDestroy(rsp.pArray); taosArrayDestroy(rsp.pArray);
taosArrayDestroy(tbUids);
tDecoderClear(&decoder); tDecoderClear(&decoder);
tEncoderClear(&encoder); tEncoderClear(&encoder);
return rcode; return rcode;
......
...@@ -126,7 +126,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) { ...@@ -126,7 +126,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
return pTaskInfo; return pTaskInfo;
} }
int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isAdd) { int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
// traverse to the stream scanner node to add this table id // traverse to the stream scanner node to add this table id
...@@ -141,12 +141,12 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isA ...@@ -141,12 +141,12 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isA
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, pScanInfo->readHandle.meta, 0); metaReaderInit(&mr, pScanInfo->readHandle.meta, 0);
for(int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) {
int64_t* id = (int64_t*)taosArrayGet(tableIdList, i); int64_t* id = (int64_t*)taosArrayGet(tableIdList, i);
int32_t code = metaGetTableEntryByUid(&mr, *id); int32_t code = metaGetTableEntryByUid(&mr, *id);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("failed to get table meta, uid:%"PRIu64" code:%s", *id, tstrerror(terrno)); qError("failed to get table meta, uid:%" PRIu64 " code:%s", *id, tstrerror(terrno));
continue; continue;
} }
...@@ -160,7 +160,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isA ...@@ -160,7 +160,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isA
metaReaderClear(&mr); metaReaderClear(&mr);
qDebug(" %d qualified child tables added into stream scanner", (int32_t) taosArrayGetSize(qa)); qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa));
int32_t code = tqReadHandleAddTbUidList(pScanInfo->streamBlockReader, qa); int32_t code = tqReadHandleAddTbUidList(pScanInfo->streamBlockReader, qa);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册