提交 8537e4e8 编写于 作者: wmmhello's avatar wmmhello

feat:add tag filter for stable subscribe

上级 8c1c2a2f
......@@ -1949,6 +1949,7 @@ typedef struct {
char* ast;
char subStbName[TSDB_TABLE_FNAME_LEN];
};
char* subStbFilterAst;
} SCMCreateTopicReq;
int32_t tSerializeSCMCreateTopicReq(void* buf, int32_t bufLen, const SCMCreateTopicReq* pReq);
......@@ -2758,6 +2759,7 @@ static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pR
tlen += taosEncodeString(buf, pReq->qmsg);
} else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
tlen += taosEncodeFixedI64(buf, pReq->suid);
tlen += taosEncodeString(buf, pReq->qmsg);
}
return tlen;
}
......@@ -2773,6 +2775,7 @@ static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq)
if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) {
buf = taosDecodeString(buf, &pReq->qmsg);
} else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
buf = taosDecodeString(buf, &pReq->qmsg);
buf = taosDecodeFixedI64(buf, &pReq->suid);
}
return (void*)buf;
......
......@@ -82,6 +82,8 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols,
uint64_t id);
int32_t qGetTableList(int64_t suid, void* metaHandle, void* pVnode, void* pTagCond, void* pTagIndexCond, SArray **tableList);
/**
* set the task Id, usually used by message queue process
* @param tinfo
......
......@@ -3841,6 +3841,10 @@ int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTo
if (tEncodeI32(&encoder, strlen(pReq->sql)) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
if (TOPIC_SUB_TYPE__TABLE == pReq->subType) {
if (tEncodeI32(&encoder, strlen(pReq->subStbFilterAst)) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->subStbFilterAst) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
......@@ -3879,6 +3883,15 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR
if (tDecodeCStrTo(&decoder, pReq->sql) < 0) return -1;
}
if (TOPIC_SUB_TYPE__TABLE == pReq->subType) {
if (tDecodeI32(&decoder, &astLen) < 0) return -1;
if (astLen > 0) {
pReq->subStbFilterAst = taosMemoryCalloc(1, astLen + 1);
if (pReq->subStbFilterAst == NULL) return -1;
if (tDecodeCStrTo(&decoder, pReq->subStbFilterAst) < 0) return -1;
}
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
......@@ -3889,6 +3902,8 @@ void tFreeSCMCreateTopicReq(SCMCreateTopicReq *pReq) {
taosMemoryFreeClear(pReq->sql);
if (TOPIC_SUB_TYPE__COLUMN == pReq->subType) {
taosMemoryFreeClear(pReq->ast);
}else if(TOPIC_SUB_TYPE__TABLE == pReq->subType) {
taosMemoryFreeClear(pReq->subStbFilterAst);
}
}
......
......@@ -591,6 +591,8 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
} else if(pTopic->subType == TOPIC_SUB_TYPE__TABLE){
pVgEp->qmsg = taosStrdup(pTopic->ast);
} else {
pVgEp->qmsg = taosStrdup("");
}
......
......@@ -462,6 +462,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
topicObj.stbUid = pStb->uid;
mndReleaseStb(pMnode, pStb);
topicObj.ast = taosStrdup(pCreate->ast);
topicObj.astLen = strlen(pCreate->ast) + 1;
}
/*} else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {*/
/*topicObj.ast = NULL;*/
......
......@@ -72,6 +72,8 @@ typedef struct {
typedef struct {
int64_t suid;
char* qmsg; // SubPlanToString
SNode* node;
} STqExecTb;
typedef struct {
......
......@@ -70,6 +70,8 @@ static void destroyTqHandle(void* data) {
} else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
walCloseReader(pData->pWalReader);
tqCloseReader(pData->execHandle.pTqReader);
taosMemoryFreeClear(pData->execHandle.execTb.qmsg);
nodesDestroyNode(pData->execHandle.execTb.node);
}
if(pData->msg != NULL) {
rpcFreeCont(pData->msg->pCont);
......@@ -470,7 +472,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
STqHandle tqHandle = {0};
pHandle = &tqHandle;
uint64_t oldConsumerId = pHandle->consumerId;
memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
pHandle->consumerId = req.newConsumerId;
pHandle->epoch = -1;
......@@ -514,14 +515,22 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
pHandle->execHandle.execTb.suid = req.suid;
pHandle->execHandle.execTb.qmsg = req.qmsg;
req.qmsg = NULL;
if (nodesStringToNode(pHandle->execHandle.execTb.qmsg, &pHandle->execHandle.execTb.node) != 0) {
tqError("nodesStringToNode error in sub stable, since %s", terrstr());
return -1;
}
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
vnodeGetCtbIdList(pVnode, req.suid, tbUidList);
tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pVnode->config.vgId, req.suid);
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
SArray* tbUidList = NULL;
ret = qGetTableList(req.suid, pVnode->pMeta, pVnode, pHandle->execHandle.execTb.node, NULL, &tbUidList);
if(ret != TDB_CODE_SUCCESS) {
tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, req.subKey, pHandle->consumerId);
taosArrayDestroy(tbUidList);
goto end;
}
tqDebug("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pVnode->config.vgId, req.suid);
pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList);
taosArrayDestroy(tbUidList);
......@@ -532,8 +541,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
}
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey,
pHandle->consumerId, oldConsumerId);
tqDebug("try to persist handle %s consumer:0x%" PRIx64, req.subKey,
pHandle->consumerId);
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
goto end;
} else {
......
......@@ -37,6 +37,7 @@ int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
}
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
if (tEncodeI64(pEncoder, pHandle->execHandle.execTb.suid) < 0) return -1;
if (tEncodeCStr(pEncoder, pHandle->execHandle.execTb.qmsg) < 0) return -1;
}
tEndEncode(pEncoder);
return pEncoder->pos;
......@@ -64,6 +65,7 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
}
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
if (tDecodeI64(pDecoder, &pHandle->execHandle.execTb.suid) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execTb.qmsg) < 0) return -1;
}
tEndDecode(pDecoder);
return 0;
......@@ -336,13 +338,19 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
vnodeGetCtbIdList(pTq->pVnode, handle.execHandle.execTb.suid, tbUidList);
tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, handle.execHandle.execTb.suid);
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
if (nodesStringToNode(handle.execHandle.execTb.qmsg, &handle.execHandle.execTb.node) != 0) {
tqError("nodesStringToNode error in sub stable, since %s", terrstr());
return -1;
}
SArray* tbUidList = NULL;
int ret = qGetTableList(handle.execHandle.execTb.suid, pTq->pVnode->pMeta, pTq->pVnode, handle.execHandle.execTb.node, NULL, &tbUidList);
if(ret != TDB_CODE_SUCCESS) {
tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, handle.subKey, handle.consumerId);
taosArrayDestroy(tbUidList);
goto end;
}
tqDebug("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pTq->pVnode->config.vgId, handle.execHandle.execTb.suid);
handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode);
tqReaderSetTbUidList(handle.execHandle.pTqReader, tbUidList);
taosArrayDestroy(tbUidList);
......
......@@ -1040,34 +1040,15 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
}
} else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
if (isAdd) {
SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
SMetaReader mr = {0};
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); ++i) {
uint64_t* id = (uint64_t*)taosArrayGet(tbUidList, i);
int32_t code = metaGetTableEntryByUidCache(&mr, *id);
if (code != TSDB_CODE_SUCCESS) {
tqError("failed to get table meta, uid:%" PRIu64 " code:%s", *id, tstrerror(terrno));
continue;
}
tDecoderClear(&mr.coder);
if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != pTqHandle->execHandle.execTb.suid) {
tqDebug("table uid %" PRId64 " does not add to tq handle", *id);
continue;
}
tqDebug("table uid %" PRId64 " add to tq handle", *id);
taosArrayPush(qa, id);
}
metaReaderClear(&mr);
if (taosArrayGetSize(qa) > 0) {
tqReaderAddTbUidList(pTqHandle->execHandle.pTqReader, qa);
}
taosArrayDestroy(qa);
SArray* list = NULL;
int ret = qGetTableList(pTqHandle->execHandle.execTb.suid, pTq->pVnode->pMeta, pTq->pVnode, pTqHandle->execHandle.execTb.node, NULL, &list);
if(ret != TDB_CODE_SUCCESS) {
tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey, pTqHandle->consumerId);
taosArrayDestroy(list);
return ret;
}
tqReaderSetTbUidList(pTqHandle->execHandle.pTqReader, list);
taosArrayDestroy(list);
} else {
tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList);
}
......
......@@ -935,6 +935,7 @@ static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SN
SArray* pBlockList = NULL;
SSDataBlock* pResBlock = NULL;
SScalarParam output = {0};
SArray* pUidTagList = NULL;
tagFilterAssist ctx = {0};
ctx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
......@@ -954,7 +955,7 @@ static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SN
SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
// int64_t stt = taosGetTimestampUs();
SArray* pUidTagList = taosArrayInit(10, sizeof(STUidTagInfo));
pUidTagList = taosArrayInit(10, sizeof(STUidTagInfo));
copyExistedUids(pUidTagList, pUidList);
FilterCondType condType = checkTagCond(pTagCond);
......@@ -1121,6 +1122,19 @@ _end:
return code;
}
int32_t qGetTableList(int64_t suid, void* metaHandle, void* pVnode, void* pTagCond, void* pTagIndexCond, SArray **tableList){
SScanPhysiNode node = {0};
node.suid = suid;
node.uid = suid;
node.tableType = TSDB_SUPER_TABLE;
STableListInfo* pTableListInfo = tableListCreate();
int code = getTableList(metaHandle, pVnode, &node, pTagCond, pTagIndexCond, pTableListInfo, "qGetTableList");
*tableList = pTableListInfo->pTableList;
pTableListInfo->pTableList = NULL;
tableListDestroy(pTableListInfo);
return code;
}
size_t getTableTagsBufLen(const SNodeList* pGroups) {
size_t keyLen = 0;
......
......@@ -5764,6 +5764,7 @@ static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pS
toName(pCxt->pParseCxt->acctId, pStmt->subDbName, pStmt->subSTbName, &name);
tNameGetFullDbName(&name, pReq->subDbName);
tNameExtractFullName(&name, pReq->subStbName);
code = nodesNodeToString(pStmt->pQuery, false, &pReq->subStbFilterAst, NULL);
} else if ('\0' != pStmt->subDbName[0]) {
pReq->subType = TOPIC_SUB_TYPE__DB;
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->subDbName, strlen(pStmt->subDbName));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册