提交 0e8aaf4c 编写于 作者: L Liu Jicong

feat(tmq): push optimization

上级 093b3e80
...@@ -29,13 +29,13 @@ typedef void* DataSinkHandle; ...@@ -29,13 +29,13 @@ typedef void* DataSinkHandle;
struct SRpcMsg; struct SRpcMsg;
struct SSubplan; struct SSubplan;
typedef int32_t (*localFetchFp)(void *, uint64_t, uint64_t, uint64_t, int64_t, int32_t, void**, SArray*); typedef int32_t (*localFetchFp)(void*, uint64_t, uint64_t, uint64_t, int64_t, int32_t, void**, SArray*);
typedef struct { typedef struct {
void *handle; void* handle;
bool localExec; bool localExec;
localFetchFp fp; localFetchFp fp;
SArray *explainRes; SArray* explainRes;
} SLocalFetch; } SLocalFetch;
typedef struct { typedef struct {
...@@ -51,9 +51,9 @@ typedef struct { ...@@ -51,9 +51,9 @@ typedef struct {
bool initTqReader; bool initTqReader;
int32_t numOfVgroups; int32_t numOfVgroups;
void* sContext; // SSnapContext* void* sContext; // SSnapContext*
void* pStateBackend; void* pStateBackend;
} SReadHandle; } SReadHandle;
// in queue mode, data streams are seperated by msg // in queue mode, data streams are seperated by msg
...@@ -136,7 +136,7 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table ...@@ -136,7 +136,7 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table
* @param handle * @param handle
* @return * @return
*/ */
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SLocalFetch *pLocal); int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SLocalFetch* pLocal);
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds); int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds);
/** /**
...@@ -195,6 +195,8 @@ int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts); ...@@ -195,6 +195,8 @@ int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts);
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType); int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType);
int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq);
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo); SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo);
......
...@@ -217,7 +217,7 @@ int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); ...@@ -217,7 +217,7 @@ int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqSeekVer(STqReader *pReader, int64_t ver); int32_t tqSeekVer(STqReader *pReader, int64_t ver);
int32_t tqNextBlock(STqReader *pReader, SFetchRet *ret); int32_t tqNextBlock(STqReader *pReader, SFetchRet *ret);
int32_t tqReaderSetDataMsg(STqReader *pReader, SSubmitReq *pMsg, int64_t ver); int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock(STqReader *pReader); bool tqNextDataBlock(STqReader *pReader);
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader); int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader);
......
...@@ -113,10 +113,20 @@ typedef struct { ...@@ -113,10 +113,20 @@ typedef struct {
} STqHandle; } STqHandle;
typedef struct {
SMqDataRsp dataRsp;
SMqRspHead rspHead;
STqHandle* pHandle;
SRpcHandleInfo pInfo;
} STqPushEntry;
struct STQ { struct STQ {
SVnode* pVnode; SVnode* pVnode;
char* path; char* path;
SHashObj* pPushMgr; // consumerId -> STqHandle*
SRWLatch pushLock;
SHashObj* pPushMgr; // consumerId -> STqPushEntry
SHashObj* pHandle; // subKey -> STqHandle SHashObj* pHandle; // subKey -> STqHandle
SHashObj* pCheckInfo; // topic -> SAlterCheckInfo SHashObj* pCheckInfo; // topic -> SAlterCheckInfo
...@@ -146,7 +156,9 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea ...@@ -146,7 +156,9 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
// tqExec // tqExec
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp); int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp);
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols);
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp); int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp);
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry);
// tqMeta // tqMeta
int32_t tqMetaOpen(STQ* pTq); int32_t tqMetaOpen(STQ* pTq);
......
...@@ -78,7 +78,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { ...@@ -78,7 +78,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
taosHashSetFreeFp(pTq->pHandle, destroySTqHandle); taosHashSetFreeFp(pTq->pHandle, destroySTqHandle);
pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); taosInitRWLatch(&pTq->pushLock);
pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
taosHashSetFreeFp(pTq->pPushMgr, taosMemoryFree);
pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
...@@ -153,6 +155,65 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, ...@@ -153,6 +155,65 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq,
return 0; return 0;
} }
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
SMqDataRsp* pRsp = &pPushEntry->dataRsp;
ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
ASSERT(!pRsp->withSchema);
ASSERT(taosArrayGetSize(pRsp->blockSchema) == 0);
if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
if (pRsp->blockNum > 0) {
ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version);
} else {
ASSERT(pRsp->rspOffset.version >= pRsp->reqOffset.version);
}
}
int32_t len = 0;
int32_t code = 0;
tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
if (code < 0) {
return -1;
}
int32_t tlen = sizeof(SMqRspHead) + len;
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
return -1;
}
memcpy(buf, &pPushEntry->rspHead, sizeof(SMqRspHead));
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
SEncoder encoder = {0};
tEncoderInit(&encoder, abuf, len);
tEncodeSMqDataRsp(&encoder, pRsp);
tEncoderClear(&encoder);
SRpcMsg rsp = {
.info = pPushEntry->pInfo,
.pCont = buf,
.contLen = tlen,
.code = 0,
};
tmsgSendRsp(&rsp);
char buf1[80] = {0};
char buf2[80] = {0};
tFormatOffset(buf1, 80, &pRsp->reqOffset);
tFormatOffset(buf2, 80, &pRsp->rspOffset);
tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) push rsp, block num: %d, reqOffset:%s, rspOffset:%s",
TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2);
return 0;
}
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp) { int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp) {
ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); ASSERT(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); ASSERT(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
...@@ -477,11 +538,31 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -477,11 +538,31 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
SMqDataRsp dataRsp = {0}; SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType); tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
// lock
taosWLockLatch(&pTq->pushLock);
tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew); tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew);
#if 1 #if 1
if (dataRsp.blockNum == 0) {
STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry));
if (pPushEntry != NULL) {
pPushEntry->pHandle = pHandle;
pPushEntry->pInfo = pMsg->info;
memcpy(&pPushEntry->dataRsp, &dataRsp, sizeof(SMqDataRsp));
pPushEntry->rspHead.consumerId = consumerId;
pPushEntry->rspHead.epoch = reqEpoch;
pPushEntry->rspHead.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey) + 1, &pPushEntry, sizeof(void*));
tqDebug("tmq poll: consumer %ld, subkey %s, vg %d save handle to push mgr", consumerId, pHandle->subKey,
TD_VID(pTq->pVnode));
// unlock
taosWUnLockLatch(&pTq->pushLock);
return 0;
}
}
taosWUnLockLatch(&pTq->pushLock);
#endif #endif
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1; code = -1;
} }
...@@ -615,9 +696,14 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe ...@@ -615,9 +696,14 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
int32_t code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); int32_t code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
ASSERT(code == 0); if (code != 0) {
tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
}
tqOffsetDelete(pTq->pOffsetStore, pReq->subKey); code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
if (code != 0) {
tqError("cannot process tq delete req %s, since no such offset", pReq->subKey);
}
if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) { if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
ASSERT(0); ASSERT(0);
...@@ -756,7 +842,9 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe ...@@ -756,7 +842,9 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
atomic_add_fetch_32(&pHandle->epoch, 1); atomic_add_fetch_32(&pHandle->epoch, 1);
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
// TODO // TODO
ASSERT(0);
} }
// close handle
} }
return 0; return 0;
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "tq.h" #include "tq.h"
static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols) { int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols) {
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
void* buf = taosMemoryCalloc(1, dataStrLen); void* buf = taosMemoryCalloc(1, dataStrLen);
if (buf == NULL) return -1; if (buf == NULL) return -1;
...@@ -243,7 +243,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp ...@@ -243,7 +243,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
} }
if (pHandle->fetchMeta) { if (pHandle->fetchMeta) {
SSubmitBlk* pBlk = pReader->pBlock; SSubmitBlk* pBlk = pReader->pBlock;
int32_t schemaLen = htonl(pBlk->schemaLen); int32_t schemaLen = htonl(pBlk->schemaLen);
if (schemaLen > 0) { if (schemaLen > 0) {
if (pRsp->createTableNum == 0) { if (pRsp->createTableNum == 0) {
pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
...@@ -278,7 +278,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp ...@@ -278,7 +278,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp
} }
if (pHandle->fetchMeta) { if (pHandle->fetchMeta) {
SSubmitBlk* pBlk = pReader->pBlock; SSubmitBlk* pBlk = pReader->pBlock;
int32_t schemaLen = htonl(pBlk->schemaLen); int32_t schemaLen = htonl(pBlk->schemaLen);
if (schemaLen > 0) { if (schemaLen > 0) {
if (pRsp->createTableNum == 0) { if (pRsp->createTableNum == 0) {
pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
......
...@@ -213,6 +213,80 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ ...@@ -213,6 +213,80 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
#endif #endif
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
if (msgType == TDMT_VND_SUBMIT) {
// lock push mgr to avoid potential msg lost
taosWLockLatch(&pTq->pushLock);
if (taosHashGetSize(pTq->pPushMgr) != 0) {
SArray* cachedKeys = taosArrayInit(0, sizeof(void*));
SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t));
void* data = taosMemoryMalloc(msgLen);
if (data == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to copy data for stream since out of memory");
return -1;
}
memcpy(data, msg, msgLen);
SSubmitReq* pReq = (SSubmitReq*)data;
pReq->version = ver;
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pTq->pPushMgr, pIter);
if (pIter == NULL) break;
STqPushEntry* pPushEntry = *(STqPushEntry**)pIter;
STqExecHandle* pExec = &pPushEntry->pHandle->execHandle;
qTaskInfo_t task = pExec->task;
SMqDataRsp* pRsp = &pPushEntry->dataRsp;
// prepare scan mem data
qStreamScanMemData(task, pReq);
// exec
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
if (qExecTask(task, NULL, &ts) < 0) {
ASSERT(0);
}
if (pDataBlock == NULL) {
break;
}
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols);
pRsp->blockNum++;
}
if (pRsp->blockNum > 0) {
// set offset
tqOffsetResetToLog(&pRsp->rspOffset, ver);
// remove from hash
size_t kLen;
void* key = taosHashGetKey(pPushEntry, &kLen);
void* keyCopy = taosMemoryMalloc(kLen);
memcpy(keyCopy, key, kLen);
taosArrayPush(cachedKeys, &keyCopy);
taosArrayPush(cachedKeyLens, &kLen);
if (taosHashRemove(pTq->pPushMgr, key, kLen) != 0) {
ASSERT(0);
}
tqPushDataRsp(pTq, pPushEntry);
}
}
// delete entry
for (int32_t i = 0; i < taosArrayGetSize(cachedKeys); i++) {
void* key = taosArrayGetP(cachedKeys, i);
size_t kLen = *(size_t*)taosArrayGet(cachedKeyLens, i);
taosHashRemove(pTq->pPushMgr, key, kLen);
}
taosArrayDestroyP(cachedKeys, (FDelete)taosMemoryFree);
taosArrayDestroy(cachedKeyLens);
}
// unlock
taosWUnLockLatch(&pTq->pushLock);
}
if (vnodeIsRoleLeader(pTq->pVnode)) { if (vnodeIsRoleLeader(pTq->pVnode)) {
if (msgType == TDMT_VND_SUBMIT) { if (msgType == TDMT_VND_SUBMIT) {
if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0; if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0;
......
...@@ -15,21 +15,20 @@ ...@@ -15,21 +15,20 @@
#include "tq.h" #include "tq.h"
bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){ if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE) {
if(pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE){
return true; return true;
} }
int16_t msgType = pHead->msgType; int16_t msgType = pHead->msgType;
char* body = pHead->body; char* body = pHead->body;
int32_t bodyLen = pHead->bodyLen; int32_t bodyLen = pHead->bodyLen;
int64_t tbSuid = pHandle->execHandle.execTb.suid; int64_t tbSuid = pHandle->execHandle.execTb.suid;
int64_t realTbSuid = 0; int64_t realTbSuid = 0;
SDecoder coder; SDecoder coder;
void* data = POINTER_SHIFT(body, sizeof(SMsgHead)); void* data = POINTER_SHIFT(body, sizeof(SMsgHead));
int32_t len = bodyLen - sizeof(SMsgHead); int32_t len = bodyLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len); tDecoderInit(&coder, data, len);
if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) { if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) {
...@@ -43,38 +42,38 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){ ...@@ -43,38 +42,38 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){
if (tDecodeSVDropStbReq(&coder, &req) < 0) { if (tDecodeSVDropStbReq(&coder, &req) < 0) {
goto end; goto end;
} }
realTbSuid = req.suid; realTbSuid = req.suid;
} else if (msgType == TDMT_VND_CREATE_TABLE) { } else if (msgType == TDMT_VND_CREATE_TABLE) {
SVCreateTbBatchReq req = {0}; SVCreateTbBatchReq req = {0};
if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) { if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) {
goto end; goto end;
} }
int32_t needRebuild = 0; int32_t needRebuild = 0;
SVCreateTbReq* pCreateReq = NULL; SVCreateTbReq* pCreateReq = NULL;
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq; pCreateReq = req.pReqs + iReq;
if(pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid){ if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
needRebuild++; needRebuild++;
} }
} }
if(needRebuild == 0){ if (needRebuild == 0) {
// do nothing // do nothing
}else if(needRebuild == req.nReqs){ } else if (needRebuild == req.nReqs) {
realTbSuid = tbSuid; realTbSuid = tbSuid;
}else{ } else {
realTbSuid = tbSuid; realTbSuid = tbSuid;
SVCreateTbBatchReq reqNew = {0}; SVCreateTbBatchReq reqNew = {0};
reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq)); reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq));
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq; pCreateReq = req.pReqs + iReq;
if(pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid){ if (pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid) {
reqNew.nReqs++; reqNew.nReqs++;
taosArrayPush(reqNew.pArray, pCreateReq); taosArrayPush(reqNew.pArray, pCreateReq);
} }
} }
int tlen; int tlen;
int32_t ret = 0; int32_t ret = 0;
tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, ret); tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, ret);
void* buf = taosMemoryMalloc(tlen); void* buf = taosMemoryMalloc(tlen);
...@@ -107,7 +106,7 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){ ...@@ -107,7 +106,7 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){
} }
} }
} else if (msgType == TDMT_VND_ALTER_TABLE) { } else if (msgType == TDMT_VND_ALTER_TABLE) {
SVAlterTbReq req = {0}; SVAlterTbReq req = {0};
if (tDecodeSVAlterTbReq(&coder, &req) < 0) { if (tDecodeSVAlterTbReq(&coder, &req) < 0) {
goto end; goto end;
...@@ -129,32 +128,32 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){ ...@@ -129,32 +128,32 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){
goto end; goto end;
} }
int32_t needRebuild = 0; int32_t needRebuild = 0;
SVDropTbReq* pDropReq = NULL; SVDropTbReq* pDropReq = NULL;
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pDropReq = req.pReqs + iReq; pDropReq = req.pReqs + iReq;
if(pDropReq->suid == tbSuid){ if (pDropReq->suid == tbSuid) {
needRebuild++; needRebuild++;
} }
} }
if(needRebuild == 0){ if (needRebuild == 0) {
// do nothing // do nothing
}else if(needRebuild == req.nReqs){ } else if (needRebuild == req.nReqs) {
realTbSuid = tbSuid; realTbSuid = tbSuid;
}else{ } else {
realTbSuid = tbSuid; realTbSuid = tbSuid;
SVDropTbBatchReq reqNew = {0}; SVDropTbBatchReq reqNew = {0};
reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq)); reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq));
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pDropReq = req.pReqs + iReq; pDropReq = req.pReqs + iReq;
if(pDropReq->suid == tbSuid){ if (pDropReq->suid == tbSuid) {
reqNew.nReqs++; reqNew.nReqs++;
taosArrayPush(reqNew.pArray, pDropReq); taosArrayPush(reqNew.pArray, pDropReq);
} }
} }
int tlen; int tlen;
int32_t ret = 0; int32_t ret = 0;
tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, ret); tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, ret);
void* buf = taosMemoryMalloc(tlen); void* buf = taosMemoryMalloc(tlen);
...@@ -177,11 +176,11 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){ ...@@ -177,11 +176,11 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){
goto end; goto end;
} }
realTbSuid = req.suid; realTbSuid = req.suid;
} else{ } else {
ASSERT(0); ASSERT(0);
} }
end: end:
tDecoderClear(&coder); tDecoderClear(&coder);
return tbSuid == realTbSuid; return tbSuid == realTbSuid;
} }
...@@ -224,7 +223,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea ...@@ -224,7 +223,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
code = -1; code = -1;
goto END; goto END;
} }
if(isValValidForTable(pHandle, pHead)){ if (isValValidForTable(pHandle, pHead)) {
*fetchOffset = offset; *fetchOffset = offset;
code = 0; code = 0;
goto END; goto END;
...@@ -241,7 +240,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea ...@@ -241,7 +240,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
offset++; offset++;
} }
} }
END: END:
taosThreadMutexUnlock(&pHandle->pWalReader->mutex); taosThreadMutexUnlock(&pHandle->pWalReader->mutex);
return code; return code;
} }
...@@ -348,7 +347,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { ...@@ -348,7 +347,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
} }
} }
int32_t tqReaderSetDataMsg(STqReader* pReader, SSubmitReq* pMsg, int64_t ver) { int32_t tqReaderSetDataMsg(STqReader* pReader, const SSubmitReq* pMsg, int64_t ver) {
pReader->pMsg = pMsg; pReader->pMsg = pMsg;
if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1; if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1;
......
...@@ -808,7 +808,6 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq ...@@ -808,7 +808,6 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
SSubmitRsp submitRsp = {0}; SSubmitRsp submitRsp = {0};
SSubmitMsgIter msgIter = {0}; SSubmitMsgIter msgIter = {0};
SSubmitBlk *pBlock; SSubmitBlk *pBlock;
SSubmitRsp rsp = {0};
SVCreateTbReq createTbReq = {0}; SVCreateTbReq createTbReq = {0};
SDecoder decoder = {0}; SDecoder decoder = {0};
int32_t nRows; int32_t nRows;
...@@ -921,7 +920,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq ...@@ -921,7 +920,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
} }
if (taosArrayGetSize(newTbUids) > 0) { if (taosArrayGetSize(newTbUids) > 0) {
vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode), (int32_t)taosArrayGetSize(newTbUids)); vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode),
(int32_t)taosArrayGetSize(newTbUids));
} }
tqUpdateTbUidList(pVnode->pTq, newTbUids, true); tqUpdateTbUidList(pVnode->pTq, newTbUids, true);
......
...@@ -145,6 +145,7 @@ typedef struct { ...@@ -145,6 +145,7 @@ typedef struct {
SMqMetaRsp metaRsp; // for tmq fetching meta SMqMetaRsp metaRsp; // for tmq fetching meta
int8_t returned; int8_t returned;
int64_t snapshotVer; int64_t snapshotVer;
const SSubmitReq* pReq;
SSchemaWrapper* schema; SSchemaWrapper* schema;
char tbName[TSDB_TABLE_NAME_LEN]; char tbName[TSDB_TABLE_NAME_LEN];
......
...@@ -486,7 +486,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SL ...@@ -486,7 +486,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SL
if (pLocal) { if (pLocal) {
memcpy(&pTaskInfo->localFetch, pLocal, sizeof(*pLocal)); memcpy(&pTaskInfo->localFetch, pLocal, sizeof(*pLocal));
} }
taosArrayClearEx(pResList, freeBlock); taosArrayClearEx(pResList, freeBlock);
int64_t curOwner = 0; int64_t curOwner = 0;
...@@ -773,6 +773,14 @@ int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* s ...@@ -773,6 +773,14 @@ int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* s
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
ASSERT(pTaskInfo->streamInfo.pReq == NULL);
pTaskInfo->streamInfo.pReq = pReq;
return 0;
}
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) { int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot; SOperatorInfo* pOperator = pTaskInfo->pRoot;
......
...@@ -1430,6 +1430,41 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1430,6 +1430,41 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
SStreamScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
qDebug("queue scan called"); qDebug("queue scan called");
if (pTaskInfo->streamInfo.pReq != NULL) {
if (pInfo->tqReader->pMsg == NULL) {
pInfo->tqReader->pMsg = pTaskInfo->streamInfo.pReq;
const SSubmitReq* pSubmit = pInfo->tqReader->pMsg;
if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {
qError("submit msg messed up when initing stream submit block %p", pSubmit);
pInfo->tqReader->pMsg = NULL;
ASSERT(0);
}
}
blockDataCleanup(pInfo->pRes);
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
while (tqNextDataBlock(pInfo->tqReader)) {
SSDataBlock block = {0};
int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader);
if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
continue;
}
setBlockIntoRes(pInfo, &block);
if (pBlockInfo->rows > 0) {
return pInfo->pRes;
}
}
pInfo->tqReader->pMsg = NULL;
pTaskInfo->streamInfo.pReq = NULL;
}
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
if (pResult && pResult->info.rows > 0) { if (pResult && pResult->info.rows > 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册