未验证 提交 82342ebd 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #21934 from taosdata/feature/TD-20678

feat:subscribe only meta info
...@@ -131,10 +131,10 @@ static inline int STupleKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, ...@@ -131,10 +131,10 @@ static inline int STupleKeyCmpr(const void* pKey1, int kLen1, const void* pKey2,
enum { enum {
TMQ_MSG_TYPE__DUMMY = 0, TMQ_MSG_TYPE__DUMMY = 0,
TMQ_MSG_TYPE__POLL_RSP, TMQ_MSG_TYPE__POLL_DATA_RSP,
TMQ_MSG_TYPE__POLL_META_RSP, TMQ_MSG_TYPE__POLL_META_RSP,
TMQ_MSG_TYPE__EP_RSP, TMQ_MSG_TYPE__EP_RSP,
TMQ_MSG_TYPE__TAOSX_RSP, TMQ_MSG_TYPE__POLL_DATA_META_RSP,
TMQ_MSG_TYPE__WALINFO_RSP, TMQ_MSG_TYPE__WALINFO_RSP,
TMQ_MSG_TYPE__END_RSP, TMQ_MSG_TYPE__END_RSP,
}; };
......
...@@ -2909,6 +2909,12 @@ enum { ...@@ -2909,6 +2909,12 @@ enum {
TMQ_OFFSET__SNAPSHOT_META = 3, TMQ_OFFSET__SNAPSHOT_META = 3,
}; };
enum {
WITH_DATA = 0,
WITH_META = 1,
ONLY_META = 2,
};
typedef struct { typedef struct {
int8_t type; int8_t type;
union { union {
......
此差异已折叠。
...@@ -123,8 +123,8 @@ typedef struct SSnapContext { ...@@ -123,8 +123,8 @@ typedef struct SSnapContext {
SHashObj* suidInfo; SHashObj* suidInfo;
SArray* idList; SArray* idList;
int32_t index; int32_t index;
bool withMeta; int8_t withMeta;
bool queryMeta; // true-get meta, false-get data int8_t queryMeta; // true-get meta, false-get data
} SSnapContext; } SSnapContext;
typedef struct { typedef struct {
......
...@@ -362,7 +362,7 @@ typedef struct SCreateTopicStmt { ...@@ -362,7 +362,7 @@ typedef struct SCreateTopicStmt {
char subDbName[TSDB_DB_NAME_LEN]; char subDbName[TSDB_DB_NAME_LEN];
char subSTbName[TSDB_TABLE_NAME_LEN]; char subSTbName[TSDB_TABLE_NAME_LEN];
bool ignoreExists; bool ignoreExists;
bool withMeta; int8_t withMeta;
SNode* pQuery; SNode* pQuery;
SNode* pWhere; SNode* pWhere;
} SCreateTopicStmt; } SCreateTopicStmt;
......
...@@ -919,7 +919,7 @@ static void* tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) { ...@@ -919,7 +919,7 @@ static void* tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) { } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper; SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
tDeleteSMqAskEpRsp(&pEpRspWrapper->msg); tDeleteSMqAskEpRsp(&pEpRspWrapper->msg);
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) { } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper; SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
taosMemoryFreeClear(pRsp->pEpset); taosMemoryFreeClear(pRsp->pEpset);
...@@ -932,7 +932,7 @@ static void* tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) { ...@@ -932,7 +932,7 @@ static void* tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
taosMemoryFreeClear(pRsp->pEpset); taosMemoryFreeClear(pRsp->pEpset);
taosMemoryFree(pRsp->metaRsp.metaRsp); taosMemoryFree(pRsp->metaRsp.metaRsp);
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) { } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper; SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
taosMemoryFreeClear(pRsp->pEpset); taosMemoryFreeClear(pRsp->pEpset);
...@@ -1407,7 +1407,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1407,7 +1407,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
strcpy(pRspWrapper->topicName, pParam->topicName); strcpy(pRspWrapper->topicName, pParam->topicName);
pMsg->pEpSet = NULL; pMsg->pEpSet = NULL;
if (rspType == TMQ_MSG_TYPE__POLL_RSP) { if (rspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
tDecodeMqDataRsp(&decoder, &pRspWrapper->dataRsp); tDecodeMqDataRsp(&decoder, &pRspWrapper->dataRsp);
...@@ -1424,7 +1424,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1424,7 +1424,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tDecodeMqMetaRsp(&decoder, &pRspWrapper->metaRsp); tDecodeMqMetaRsp(&decoder, &pRspWrapper->metaRsp);
tDecoderClear(&decoder); tDecoderClear(&decoder);
memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead)); memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
} else if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) { } else if (rspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp); tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp);
...@@ -1884,7 +1884,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1884,7 +1884,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(terrno)); tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(terrno));
return NULL; return NULL;
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) { } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
int32_t consumerEpoch = atomic_load_32(&tmq->epoch); int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
...@@ -1984,7 +1984,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1984,7 +1984,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
pRspWrapper = tmqFreeRspWrapper(pRspWrapper); pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
} }
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) { } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
int32_t consumerEpoch = atomic_load_32(&tmq->epoch); int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
...@@ -2026,7 +2026,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -2026,7 +2026,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
void* pRsp = NULL; void* pRsp = NULL;
int64_t numOfRows = 0; int64_t numOfRows = 0;
if (pollRspWrapper->taosxRsp.createTableNum == 0) { if (pollRspWrapper->taosxRsp.createTableNum == 0) {
pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows); tscError("consumer:0x%" PRIx64" createTableNum should > 0 if rsp type is data_meta", tmq->consumerId);
} else { } else {
pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows); pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
} }
......
...@@ -235,7 +235,7 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr ...@@ -235,7 +235,7 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr
int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot); int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot);
int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData); int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData);
int32_t buildSnapContext(SVnode *pVnode, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta, int32_t buildSnapContext(SVnode *pVnode, int64_t snapVersion, int64_t suid, int8_t subType, int8_t withMeta,
SSnapContext **ctxRet); SSnapContext **ctxRet);
int32_t getTableInfoFromSnapshot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid); int32_t getTableInfoFromSnapshot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid);
SMetaTableInfo getMetaTableInfoFromSnapshot(SSnapContext *ctx); SMetaTableInfo getMetaTableInfoFromSnapshot(SSnapContext *ctx);
......
...@@ -260,7 +260,7 @@ static void saveSuperTableInfoForChildTable(SMetaEntry* me, SHashObj* suidInfo) ...@@ -260,7 +260,7 @@ static void saveSuperTableInfoForChildTable(SMetaEntry* me, SHashObj* suidInfo)
taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &dataTmp, sizeof(STableInfoForChildTable)); taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &dataTmp, sizeof(STableInfoForChildTable));
} }
int32_t buildSnapContext(SVnode* pVnode, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta, int32_t buildSnapContext(SVnode* pVnode, int64_t snapVersion, int64_t suid, int8_t subType, int8_t withMeta,
SSnapContext** ctxRet) { SSnapContext** ctxRet) {
SSnapContext* ctx = taosMemoryCalloc(1, sizeof(SSnapContext)); SSnapContext* ctx = taosMemoryCalloc(1, sizeof(SSnapContext));
if (ctx == NULL) return -1; if (ctx == NULL) return -1;
...@@ -476,7 +476,7 @@ int32_t getTableInfoFromSnapshot(SSnapContext* ctx, void** pBuf, int32_t* contLe ...@@ -476,7 +476,7 @@ int32_t getTableInfoFromSnapshot(SSnapContext* ctx, void** pBuf, int32_t* contLe
if (ctx->index >= taosArrayGetSize(ctx->idList)) { if (ctx->index >= taosArrayGetSize(ctx->idList)) {
metaDebug("tmqsnap get meta end"); metaDebug("tmqsnap get meta end");
ctx->index = 0; ctx->index = 0;
ctx->queryMeta = false; // change to get data ctx->queryMeta = 0; // change to get data
return 0; return 0;
} }
......
...@@ -183,64 +183,64 @@ void tqNotifyClose(STQ* pTq) { ...@@ -183,64 +183,64 @@ void tqNotifyClose(STQ* pTq) {
} }
} }
static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, //static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
int64_t consumerId, int32_t type) { // int64_t consumerId, int32_t type) {
int32_t len = 0; // int32_t len = 0;
int32_t code = 0; // int32_t code = 0;
//
if (type == TMQ_MSG_TYPE__POLL_RSP) { // if (type == TMQ_MSG_TYPE__POLL_DATA_RSP) {
tEncodeSize(tEncodeMqDataRsp, pRsp, len, code); // tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
} else if (type == TMQ_MSG_TYPE__TAOSX_RSP) { // } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code); // tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
} // }
//
if (code < 0) { // if (code < 0) {
return -1; // return -1;
} // }
//
int32_t tlen = sizeof(SMqRspHead) + len; // int32_t tlen = sizeof(SMqRspHead) + len;
void* buf = rpcMallocCont(tlen); // void* buf = rpcMallocCont(tlen);
if (buf == NULL) { // if (buf == NULL) {
return -1; // return -1;
} // }
//
((SMqRspHead*)buf)->mqMsgType = type; // ((SMqRspHead*)buf)->mqMsgType = type;
((SMqRspHead*)buf)->epoch = epoch; // ((SMqRspHead*)buf)->epoch = epoch;
((SMqRspHead*)buf)->consumerId = consumerId; // ((SMqRspHead*)buf)->consumerId = consumerId;
//
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); // void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
//
SEncoder encoder = {0}; // SEncoder encoder = {0};
tEncoderInit(&encoder, abuf, len); // tEncoderInit(&encoder, abuf, len);
//
if (type == TMQ_MSG_TYPE__POLL_RSP) { // if (type == TMQ_MSG_TYPE__POLL_DATA_RSP) {
tEncodeMqDataRsp(&encoder, pRsp); // tEncodeMqDataRsp(&encoder, pRsp);
} else if (type == TMQ_MSG_TYPE__TAOSX_RSP) { // } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp); // tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp);
} // }
//
tEncoderClear(&encoder); // tEncoderClear(&encoder);
//
SRpcMsg rsp = { // SRpcMsg rsp = {
.info = *pRpcHandleInfo, // .info = *pRpcHandleInfo,
.pCont = buf, // .pCont = buf,
.contLen = tlen, // .contLen = tlen,
.code = 0, // .code = 0,
}; // };
//
tmsgSendRsp(&rsp); // tmsgSendRsp(&rsp);
return 0; // return 0;
} //}
int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) { int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) {
SMqDataRsp dataRsp = {0}; SMqDataRsp dataRsp = {0};
dataRsp.head.consumerId = pHandle->consumerId; dataRsp.head.consumerId = pHandle->consumerId;
dataRsp.head.epoch = pHandle->epoch; dataRsp.head.epoch = pHandle->epoch;
dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP; dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_DATA_RSP;
int64_t sver = 0, ever = 0; int64_t sver = 0, ever = 0;
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver, tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_DATA_RSP, sver,
ever); ever);
char buf1[TSDB_OFFSET_LEN] = {0}; char buf1[TSDB_OFFSET_LEN] = {0};
......
...@@ -216,9 +216,9 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea ...@@ -216,9 +216,9 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
code = 0; code = 0;
goto END; goto END;
} else { } else {
if (pHandle->fetchMeta) { if (pHandle->fetchMeta != WITH_DATA) {
SWalCont* pHead = &((*ppCkHead)->head); SWalCont* pHead = &((*ppCkHead)->head);
if (IS_META_MSG(pHead->msgType)) { if (IS_META_MSG(pHead->msgType) && !(pHead->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META)) {
code = walFetchBody(pHandle->pWalReader, ppCkHead); code = walFetchBody(pHandle->pWalReader, ppCkHead);
if (code < 0) { if (code < 0) {
*fetchOffset = offset; *fetchOffset = offset;
......
...@@ -215,19 +215,15 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR ...@@ -215,19 +215,15 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
taosArrayClear(pSchemas); taosArrayClear(pSchemas);
SSubmitTbData* pSubmitTbDataRet = NULL; SSubmitTbData* pSubmitTbDataRet = NULL;
if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) { if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_table;
} }
if (pRsp->withTbName) { if (pRsp->withTbName) {
int64_t uid = pExec->pTqReader->lastBlkUid; int64_t uid = pExec->pTqReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) { if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes); goto loop_table;
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
pSchemas = taosArrayInit(0, sizeof(void*));
continue;
} }
} }
if (pHandle->fetchMeta && pSubmitTbDataRet->pCreateTbReq != NULL) { if (pHandle->fetchMeta != WITH_DATA && pSubmitTbDataRet->pCreateTbReq != NULL) {
if (pRsp->createTableNum == 0) { if (pRsp->createTableNum == 0) {
pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
pRsp->createTableReq = taosArrayInit(0, sizeof(void*)); pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
...@@ -237,7 +233,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR ...@@ -237,7 +233,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
uint32_t len = 0; uint32_t len = 0;
tEncodeSize(tEncodeSVCreateTbReq, pSubmitTbDataRet->pCreateTbReq, len, code); tEncodeSize(tEncodeSVCreateTbReq, pSubmitTbDataRet->pCreateTbReq, len, code);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
continue; goto loop_table;
} }
void* createReq = taosMemoryCalloc(1, len); void* createReq = taosMemoryCalloc(1, len);
SEncoder encoder = {0}; SEncoder encoder = {0};
...@@ -246,7 +242,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR ...@@ -246,7 +242,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
if (code < 0) { if (code < 0) {
tEncoderClear(&encoder); tEncoderClear(&encoder);
taosMemoryFree(createReq); taosMemoryFree(createReq);
continue; goto loop_table;
} }
taosArrayPush(pRsp->createTableLen, &len); taosArrayPush(pRsp->createTableLen, &len);
...@@ -255,6 +251,9 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR ...@@ -255,6 +251,9 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
tEncoderClear(&encoder); tEncoderClear(&encoder);
} }
if (pHandle->fetchMeta == ONLY_META && pSubmitTbDataRet->pCreateTbReq == NULL){
goto loop_table;
}
for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) { for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
SSDataBlock* pBlock = taosArrayGet(pBlocks, i); SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock), tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),
...@@ -265,6 +264,12 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR ...@@ -265,6 +264,12 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
taosArrayPush(pRsp->blockSchema, &pSW); taosArrayPush(pRsp->blockSchema, &pSW);
pRsp->blockNum++; pRsp->blockNum++;
} }
continue;
loop_table:
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
pSchemas = taosArrayInit(0, sizeof(void*));
} }
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) { } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
STqReader* pReader = pExec->pTqReader; STqReader* pReader = pExec->pTqReader;
...@@ -274,19 +279,15 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR ...@@ -274,19 +279,15 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
taosArrayClear(pSchemas); taosArrayClear(pSchemas);
SSubmitTbData* pSubmitTbDataRet = NULL; SSubmitTbData* pSubmitTbDataRet = NULL;
if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) { if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_db;
} }
if (pRsp->withTbName) { if (pRsp->withTbName) {
int64_t uid = pExec->pTqReader->lastBlkUid; int64_t uid = pExec->pTqReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) { if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes); goto loop_db;
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
pSchemas = taosArrayInit(0, sizeof(void*));
continue;
} }
} }
if (pHandle->fetchMeta && pSubmitTbDataRet->pCreateTbReq != NULL) { if (pHandle->fetchMeta != WITH_DATA && pSubmitTbDataRet->pCreateTbReq != NULL) {
if (pRsp->createTableNum == 0) { if (pRsp->createTableNum == 0) {
pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
pRsp->createTableReq = taosArrayInit(0, sizeof(void*)); pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
...@@ -296,7 +297,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR ...@@ -296,7 +297,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
uint32_t len = 0; uint32_t len = 0;
tEncodeSize(tEncodeSVCreateTbReq, pSubmitTbDataRet->pCreateTbReq, len, code); tEncodeSize(tEncodeSVCreateTbReq, pSubmitTbDataRet->pCreateTbReq, len, code);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
continue; goto loop_db;
} }
void* createReq = taosMemoryCalloc(1, len); void* createReq = taosMemoryCalloc(1, len);
SEncoder encoder = {0}; SEncoder encoder = {0};
...@@ -305,7 +306,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR ...@@ -305,7 +306,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
if (code < 0) { if (code < 0) {
tEncoderClear(&encoder); tEncoderClear(&encoder);
taosMemoryFree(createReq); taosMemoryFree(createReq);
continue; goto loop_db;
} }
taosArrayPush(pRsp->createTableLen, &len); taosArrayPush(pRsp->createTableLen, &len);
...@@ -314,6 +315,9 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR ...@@ -314,6 +315,9 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
tEncoderClear(&encoder); tEncoderClear(&encoder);
} }
if (pHandle->fetchMeta == ONLY_META && pSubmitTbDataRet->pCreateTbReq == NULL){
goto loop_db;
}
for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) { for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
SSDataBlock* pBlock = taosArrayGet(pBlocks, i); SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock), tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),
...@@ -324,6 +328,12 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR ...@@ -324,6 +328,12 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
taosArrayPush(pRsp->blockSchema, &pSW); taosArrayPush(pRsp->blockSchema, &pSW);
pRsp->blockNum++; pRsp->blockNum++;
} }
continue;
loop_db:
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
pSchemas = taosArrayInit(0, sizeof(void*));
} }
} }
taosArrayDestroy(pBlocks); taosArrayDestroy(pBlocks);
......
...@@ -123,28 +123,17 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand ...@@ -123,28 +123,17 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
} }
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
walRefLastVer(pTq->pVnode->pWal, pHandle->pRef); walRefLastVer(pTq->pVnode->pWal, pHandle->pRef);
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { SMqDataRsp dataRsp = {0};
SMqDataRsp dataRsp = {0}; tqInitDataRsp(&dataRsp, pRequest);
tqInitDataRsp(&dataRsp, pRequest);
tqOffsetResetToLog(&dataRsp.rspOffset, pHandle->pRef->refVer);
tqOffsetResetToLog(&dataRsp.rspOffset, pHandle->pRef->refVer); tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId, pHandle->subKey, vgId, dataRsp.rspOffset.version);
pHandle->subKey, vgId, dataRsp.rspOffset.version); int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId); tDeleteMqDataRsp(&dataRsp);
tDeleteMqDataRsp(&dataRsp);
*pBlockReturned = true;
*pBlockReturned = true; return code;
return code;
} else {
STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, pRequest);
tqOffsetResetToLog(&taosxRsp.rspOffset, pHandle->pRef->refVer);
int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
tDeleteSTaosxRsp(&taosxRsp);
*pBlockReturned = true;
return code;
}
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) { } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64 tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64
" in vg %d, subkey %s, reset none failed", " in vg %d, subkey %s, reset none failed",
...@@ -187,7 +176,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -187,7 +176,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
} }
} }
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
end : { end : {
char buf[TSDB_OFFSET_LEN] = {0}; char buf[TSDB_OFFSET_LEN] = {0};
...@@ -230,7 +219,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -230,7 +219,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type,
taosxRsp.rspOffset.uid, taosxRsp.rspOffset.ts); taosxRsp.rspOffset.uid, taosxRsp.rspOffset.ts);
if (taosxRsp.blockNum > 0) { if (taosxRsp.blockNum > 0) {
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end; goto end;
} else { } else {
*offset = taosxRsp.rspOffset; *offset = taosxRsp.rspOffset;
...@@ -260,7 +249,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -260,7 +249,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) { if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end; goto end;
} }
...@@ -272,7 +261,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -272,7 +261,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (pHead->msgType != TDMT_VND_SUBMIT) { if (pHead->msgType != TDMT_VND_SUBMIT) {
if (totalRows > 0) { if (totalRows > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1); tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end; goto end;
} }
...@@ -301,7 +290,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -301,7 +290,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (totalRows >= 4096 || taosxRsp.createTableNum > 0) { if (totalRows >= 4096 || taosxRsp.createTableNum > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end; goto end;
} else { } else {
fetchVer++; fetchVer++;
...@@ -396,9 +385,9 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* ...@@ -396,9 +385,9 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp*
int32_t len = 0; int32_t len = 0;
int32_t code = 0; int32_t code = 0;
if (type == TMQ_MSG_TYPE__POLL_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) { if (type == TMQ_MSG_TYPE__POLL_DATA_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) {
tEncodeSize(tEncodeMqDataRsp, pRsp, len, code); tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
} else if (type == TMQ_MSG_TYPE__TAOSX_RSP) { } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code); tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
} }
...@@ -420,9 +409,9 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* ...@@ -420,9 +409,9 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp*
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, abuf, len); tEncoderInit(&encoder, abuf, len);
if (type == TMQ_MSG_TYPE__POLL_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) { if (type == TMQ_MSG_TYPE__POLL_DATA_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) {
tEncodeMqDataRsp(&encoder, pRsp); tEncodeMqDataRsp(&encoder, pRsp);
} else if (type == TMQ_MSG_TYPE__TAOSX_RSP) { } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp); tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp);
} }
......
...@@ -2154,7 +2154,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { ...@@ -2154,7 +2154,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
qDebug("tmqsnap doRawScan called"); qDebug("tmqsnap doRawScan called");
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) { if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
bool hasNext = false; bool hasNext = false;
if (pInfo->dataReader) { if (pInfo->dataReader && pInfo->sContext->withMeta != ONLY_META) {
code = pAPI->tsdReader.tsdNextDataBlock(pInfo->dataReader, &hasNext); code = pAPI->tsdReader.tsdNextDataBlock(pInfo->dataReader, &hasNext);
if (code) { if (code) {
pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->dataReader); pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->dataReader);
...@@ -2180,7 +2180,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { ...@@ -2180,7 +2180,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
SMetaTableInfo mtInfo = pAPI->snapshotFn.getMetaTableInfoFromSnapshot(pInfo->sContext); SMetaTableInfo mtInfo = pAPI->snapshotFn.getMetaTableInfoFromSnapshot(pInfo->sContext);
STqOffsetVal offset = {0}; STqOffsetVal offset = {0};
if (mtInfo.uid == 0) { // read snapshot done, change to get data from wal if (mtInfo.uid == 0 || pInfo->sContext->withMeta == ONLY_META) { // read snapshot done, change to get data from wal
qDebug("tmqsnap read snapshot done, change to get data from wal"); qDebug("tmqsnap read snapshot done, change to get data from wal");
tqOffsetResetToLog(&offset, pInfo->sContext->snapVersion); tqOffsetResetToLog(&offset, pInfo->sContext->snapVersion);
} else { } else {
......
...@@ -206,9 +206,9 @@ SNode* createDropComponentNodeStmt(SAstCreateContext* pCxt, ENodeType type, cons ...@@ -206,9 +206,9 @@ SNode* createDropComponentNodeStmt(SAstCreateContext* pCxt, ENodeType type, cons
SNode* createRestoreComponentNodeStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pDnodeId); SNode* createRestoreComponentNodeStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pDnodeId);
SNode* createCreateTopicStmtUseQuery(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SNode* pQuery); SNode* createCreateTopicStmtUseQuery(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SNode* pQuery);
SNode* createCreateTopicStmtUseDb(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SToken* pSubDbName, SNode* createCreateTopicStmtUseDb(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SToken* pSubDbName,
bool withMeta); int8_t withMeta);
SNode* createCreateTopicStmtUseTable(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SNode* pRealTable, SNode* createCreateTopicStmtUseTable(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SNode* pRealTable,
bool withMeta, SNode* pWhere); int8_t withMeta, SNode* pWhere);
SNode* createDropTopicStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pTopicName); SNode* createDropTopicStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pTopicName);
SNode* createDropCGroupStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pCGroupId, SToken* pTopicName); SNode* createDropCGroupStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pCGroupId, SToken* pTopicName);
SNode* createAlterLocalStmt(SAstCreateContext* pCxt, const SToken* pConfig, const SToken* pValue); SNode* createAlterLocalStmt(SAstCreateContext* pCxt, const SToken* pConfig, const SToken* pValue);
......
...@@ -538,14 +538,18 @@ sma_stream_opt(A) ::= sma_stream_opt(B) MAX_DELAY duration_literal(C). ...@@ -538,14 +538,18 @@ sma_stream_opt(A) ::= sma_stream_opt(B) MAX_DELAY duration_literal(C).
sma_stream_opt(A) ::= sma_stream_opt(B) DELETE_MARK duration_literal(C). { ((SStreamOptions*)B)->pDeleteMark = releaseRawExprNode(pCxt, C); A = B; } sma_stream_opt(A) ::= sma_stream_opt(B) DELETE_MARK duration_literal(C). { ((SStreamOptions*)B)->pDeleteMark = releaseRawExprNode(pCxt, C); A = B; }
/************************************************ create/drop topic ***************************************************/ /************************************************ create/drop topic ***************************************************/
%type with_meta { int32_t }
%destructor with_meta { }
with_meta(A) ::= AS. { A = 0; }
with_meta(A) ::= WITH META AS. { A = 1; }
with_meta(A) ::= ONLY META AS. { A = 2; }
cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) AS query_or_subquery(C). { pCxt->pRootNode = createCreateTopicStmtUseQuery(pCxt, A, &B, C); } cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) AS query_or_subquery(C). { pCxt->pRootNode = createCreateTopicStmtUseQuery(pCxt, A, &B, C); }
cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) AS DATABASE db_name(C). { pCxt->pRootNode = createCreateTopicStmtUseDb(pCxt, A, &B, &C, false); } cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) with_meta(D)
cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) DATABASE db_name(C). { pCxt->pRootNode = createCreateTopicStmtUseDb(pCxt, A, &B, &C, D); }
WITH META AS DATABASE db_name(C). { pCxt->pRootNode = createCreateTopicStmtUseDb(pCxt, A, &B, &C, true); } cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) with_meta(E)
cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) STABLE full_table_name(C) where_clause_opt(D). { pCxt->pRootNode = createCreateTopicStmtUseTable(pCxt, A, &B, C, E, D); }
AS STABLE full_table_name(C) where_clause_opt(D). { pCxt->pRootNode = createCreateTopicStmtUseTable(pCxt, A, &B, C, false, D); }
cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B)
WITH META AS STABLE full_table_name(C) where_clause_opt(D). { pCxt->pRootNode = createCreateTopicStmtUseTable(pCxt, A, &B, C, true, D); }
cmd ::= DROP TOPIC exists_opt(A) topic_name(B). { pCxt->pRootNode = createDropTopicStmt(pCxt, A, &B); } cmd ::= DROP TOPIC exists_opt(A) topic_name(B). { pCxt->pRootNode = createDropTopicStmt(pCxt, A, &B); }
cmd ::= DROP CONSUMER GROUP exists_opt(A) cgroup_name(B) ON topic_name(C). { pCxt->pRootNode = createDropCGroupStmt(pCxt, A, &B, &C); } cmd ::= DROP CONSUMER GROUP exists_opt(A) cgroup_name(B) ON topic_name(C). { pCxt->pRootNode = createDropCGroupStmt(pCxt, A, &B, &C); }
......
...@@ -1715,7 +1715,7 @@ SNode* createCreateTopicStmtUseQuery(SAstCreateContext* pCxt, bool ignoreExists, ...@@ -1715,7 +1715,7 @@ SNode* createCreateTopicStmtUseQuery(SAstCreateContext* pCxt, bool ignoreExists,
} }
SNode* createCreateTopicStmtUseDb(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SToken* pSubDbName, SNode* createCreateTopicStmtUseDb(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SToken* pSubDbName,
bool withMeta) { int8_t withMeta) {
CHECK_PARSER_STATUS(pCxt); CHECK_PARSER_STATUS(pCxt);
if (!checkTopicName(pCxt, pTopicName) || !checkDbName(pCxt, pSubDbName, true)) { if (!checkTopicName(pCxt, pTopicName) || !checkDbName(pCxt, pSubDbName, true)) {
return NULL; return NULL;
...@@ -1730,7 +1730,7 @@ SNode* createCreateTopicStmtUseDb(SAstCreateContext* pCxt, bool ignoreExists, ST ...@@ -1730,7 +1730,7 @@ SNode* createCreateTopicStmtUseDb(SAstCreateContext* pCxt, bool ignoreExists, ST
} }
SNode* createCreateTopicStmtUseTable(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SNode* pRealTable, SNode* createCreateTopicStmtUseTable(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SNode* pRealTable,
bool withMeta, SNode* pWhere) { int8_t withMeta, SNode* pWhere) {
CHECK_PARSER_STATUS(pCxt); CHECK_PARSER_STATUS(pCxt);
if (!checkTopicName(pCxt, pTopicName)) { if (!checkTopicName(pCxt, pTopicName)) {
return NULL; return NULL;
......
...@@ -140,6 +140,7 @@ static SKeyword keywordTable[] = { ...@@ -140,6 +140,7 @@ static SKeyword keywordTable[] = {
{"MAX_SPEED", TK_MAX_SPEED}, {"MAX_SPEED", TK_MAX_SPEED},
{"MERGE", TK_MERGE}, {"MERGE", TK_MERGE},
{"META", TK_META}, {"META", TK_META},
{"ONLY", TK_ONLY},
{"MINROWS", TK_MINROWS}, {"MINROWS", TK_MINROWS},
{"MINUS", TK_MINUS}, {"MINUS", TK_MINUS},
{"MNODE", TK_MNODE}, {"MNODE", TK_MNODE},
......
此差异已折叠。
...@@ -220,6 +220,17 @@ class TDTestCase: ...@@ -220,6 +220,17 @@ class TDTestCase:
return return
def checkWal1VgroupOnlyMeta(self):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
cmdStr = '%s/build/bin/tmq_taosx_ci -c %s -sv 1 -dv 1 -d -onlymeta'%(buildPath, cfgPath)
tdLog.info(cmdStr)
os.system(cmdStr)
self.checkJson(cfgPath, "tmq_taosx_tmp")
return
def checkWal1VgroupTable(self): def checkWal1VgroupTable(self):
buildPath = tdCom.getBuildPath() buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath() cfgPath = tdCom.getClientCfgPath()
...@@ -301,6 +312,8 @@ class TDTestCase: ...@@ -301,6 +312,8 @@ class TDTestCase:
def run(self): def run(self):
tdSql.prepare() tdSql.prepare()
self.checkWal1VgroupOnlyMeta()
self.checkWal1Vgroup() self.checkWal1Vgroup()
self.checkSnapshot1Vgroup() self.checkSnapshot1Vgroup()
......
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册