提交 45e7597d 编写于 作者: wmmhello's avatar wmmhello

feat:get data from snapshot for taosx

上级 c3cd858a
...@@ -157,6 +157,7 @@ typedef struct SQueryTableDataCond { ...@@ -157,6 +157,7 @@ typedef struct SQueryTableDataCond {
STimeWindow twindows; STimeWindow twindows;
int64_t startVersion; int64_t startVersion;
int64_t endVersion; int64_t endVersion;
int64_t schemaVersion;
} SQueryTableDataCond; } SQueryTableDataCond;
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock); int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock);
......
...@@ -2589,12 +2589,7 @@ enum { ...@@ -2589,12 +2589,7 @@ enum {
typedef struct { typedef struct {
int8_t type; int8_t type;
union { union {
// snapshot meta // snapshot
struct {
int64_t muid;
int64_t mversion;
};
// snapshot data
struct { struct {
int64_t uid; int64_t uid;
int64_t ts; int64_t ts;
......
...@@ -43,6 +43,9 @@ typedef struct SReadHandle { ...@@ -43,6 +43,9 @@ typedef struct SReadHandle {
int32_t numOfVgroups; int32_t numOfVgroups;
void* sContext; // SSnapContext* void* sContext; // SSnapContext*
void* pWalReader;
SHashObj *pFilterOutTbUid;
} SReadHandle; } SReadHandle;
// in queue mode, data streams are seperated by msg // in queue mode, data streams are seperated by msg
...@@ -178,7 +181,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -178,7 +181,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
void* qStreamExtractMetaMsg(qTaskInfo_t tinfo); SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo);
void* qExtractReaderFromStreamScanner(void* scanner); void* qExtractReaderFromStreamScanner(void* scanner);
......
...@@ -5617,7 +5617,7 @@ bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) { ...@@ -5617,7 +5617,7 @@ bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
} else if (pLeft->type == TMQ_OFFSET__SNAPSHOT_DATA) { } else if (pLeft->type == TMQ_OFFSET__SNAPSHOT_DATA) {
return pLeft->uid == pRight->uid && pLeft->ts == pRight->ts; return pLeft->uid == pRight->uid && pLeft->ts == pRight->ts;
} else if (pLeft->type == TMQ_OFFSET__SNAPSHOT_META) { } else if (pLeft->type == TMQ_OFFSET__SNAPSHOT_META) {
return pLeft->muid == pRight->muid && pLeft->mversion == pRight->mversion; return pLeft->uid == pRight->uid;
} else { } else {
ASSERT(0); ASSERT(0);
/*ASSERT(pLeft->type == TMQ_OFFSET__RESET_NONE || pLeft->type == TMQ_OFFSET__RESET_EARLIEAST ||*/ /*ASSERT(pLeft->type == TMQ_OFFSET__RESET_NONE || pLeft->type == TMQ_OFFSET__RESET_EARLIEAST ||*/
......
...@@ -159,7 +159,6 @@ typedef struct SMetaTableInfo{ ...@@ -159,7 +159,6 @@ typedef struct SMetaTableInfo{
}SMetaTableInfo; }SMetaTableInfo;
typedef struct SSnapContext { typedef struct SSnapContext {
SMeta *pMeta; SMeta *pMeta;
int64_t snapVersion; int64_t snapVersion;
TBC *pCur; TBC *pCur;
...@@ -167,6 +166,7 @@ typedef struct SSnapContext { ...@@ -167,6 +166,7 @@ typedef struct SSnapContext {
int8_t subType; int8_t subType;
SHashObj *idVersion; SHashObj *idVersion;
SHashObj *suidInfo; SHashObj *suidInfo;
bool withMeta;
bool queryMetaOrData; // true-get meta, false-get data bool queryMetaOrData; // true-get meta, false-get data
}SSnapContext; }SSnapContext;
...@@ -204,6 +204,8 @@ int32_t tqReaderSetDataMsg(STqReader *pReader, SSubmitReq *pMsg, int64_t ver); ...@@ -204,6 +204,8 @@ int32_t tqReaderSetDataMsg(STqReader *pReader, SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock(STqReader *pReader); bool tqNextDataBlock(STqReader *pReader);
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader); int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader);
int64_t tqFetchLog(SWalReader *pWalReader, bool fetchMeta, int64_t* fetchOffset, SWalCkHead** ppCkHead);
SSDataBlock* tqLogScanExec(int8_t subType, STqReader* pReader, SHashObj* pFilterOutTbUid, SSDataBlock* block);
// sma // sma
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days); int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
...@@ -217,11 +219,11 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr ...@@ -217,11 +219,11 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr
int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot); int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot);
int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData); int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData);
int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta, SSnapContext* ctx); int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta, SSnapContext** ctxRet);
int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, int16_t *type); int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid);
SMetaTableInfo getUidfromSnapShot(SSnapContext* ctx); SMetaTableInfo getUidfromSnapShot(SSnapContext* ctx);
int32_t setMetaForSnapShot(SSnapContext* ctx, int64_t uid, int64_t ver); int32_t setForSnapShot(SSnapContext* ctx, int64_t uid);
int32_t setDataForSnapShot(SSnapContext* ctx, int64_t uid); int32_t destroySnapContext(SSnapContext* ctx);
// structs // structs
struct STsdbCfg { struct STsdbCfg {
......
...@@ -88,7 +88,7 @@ typedef struct { ...@@ -88,7 +88,7 @@ typedef struct {
STqExecTb execTb; STqExecTb execTb;
STqExecDb execDb; STqExecDb execDb;
}; };
int32_t numOfCols; // number of out pout column, temporarily used // int32_t numOfCols; // number of out pout column, temporarily used
SSchemaWrapper* pSchemaWrapper; // columns that are involved in query SSchemaWrapper* pSchemaWrapper; // columns that are involved in query
} STqExecHandle; } STqExecHandle;
...@@ -101,9 +101,6 @@ typedef struct { ...@@ -101,9 +101,6 @@ typedef struct {
int64_t snapshotVer; int64_t snapshotVer;
SSnapContext* sContext;
// TODO remove
SWalReader* pWalReader;
SWalRef* pRef; SWalRef* pRef;
...@@ -138,11 +135,8 @@ int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle); ...@@ -138,11 +135,8 @@ int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle);
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle); int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
// tqRead // tqRead
int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* offset); int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset);
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
// tqExec
int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp);
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp); int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp);
// tqMeta // tqMeta
...@@ -176,10 +170,9 @@ static FORCE_INLINE void tqOffsetResetToData(STqOffsetVal* pOffsetVal, int64_t u ...@@ -176,10 +170,9 @@ static FORCE_INLINE void tqOffsetResetToData(STqOffsetVal* pOffsetVal, int64_t u
pOffsetVal->ts = ts; pOffsetVal->ts = ts;
} }
static FORCE_INLINE void tqOffsetResetToMeta(STqOffsetVal* pOffsetVal, int64_t uid, int64_t version) { static FORCE_INLINE void tqOffsetResetToMeta(STqOffsetVal* pOffsetVal, int64_t uid) {
pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_META; pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_META;
pOffsetVal->muid = uid; pOffsetVal->uid = uid;
pOffsetVal->mversion = version;
} }
static FORCE_INLINE void tqOffsetResetToLog(STqOffsetVal* pOffsetVal, int64_t ver) { static FORCE_INLINE void tqOffsetResetToLog(STqOffsetVal* pOffsetVal, int64_t ver) {
......
...@@ -209,12 +209,22 @@ static void destroySTableInfoForChildTable(void* data) { ...@@ -209,12 +209,22 @@ static void destroySTableInfoForChildTable(void* data) {
tDeleteSSchemaWrapper(pData->schemaRow); tDeleteSSchemaWrapper(pData->schemaRow);
} }
int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta, SSnapContext* ctx){ static void clearAndMoveToFirst(SSnapContext* ctx){
tdbTbcClose(ctx->pCur);
tdbTbcOpen(ctx->pMeta->pTbDb, &ctx->pCur, NULL);
tdbTbcMoveToFirst(ctx->pCur);
}
int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta, SSnapContext** ctxRet){
SSnapContext* ctx = taosMemoryCalloc(1, sizeof(SSnapContext));
if(ctx == NULL) return -1;
*ctxRet = ctx;
ctx->pMeta = pMeta; ctx->pMeta = pMeta;
ctx->snapVersion = snapVersion; ctx->snapVersion = snapVersion;
ctx->suid = suid; ctx->suid = suid;
ctx->subType = subType; ctx->subType = subType;
ctx->queryMetaOrData = withMeta; ctx->queryMetaOrData = withMeta;
ctx->withMeta = withMeta;
int32_t ret = tdbTbcOpen(pMeta->pTbDb, &ctx->pCur, NULL); int32_t ret = tdbTbcOpen(pMeta->pTbDb, &ctx->pCur, NULL);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
...@@ -243,7 +253,7 @@ int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t ...@@ -243,7 +253,7 @@ int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t
if(tmp->version > ctx->snapVersion) break; if(tmp->version > ctx->snapVersion) break;
taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &tmp->version, sizeof(int64_t)); taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &tmp->version, sizeof(int64_t));
} }
tdbTbcMoveToFirst(ctx->pCur); clearAndMoveToFirst(ctx);
return TDB_CODE_SUCCESS; return TDB_CODE_SUCCESS;
} }
...@@ -251,7 +261,7 @@ int32_t destroySnapContext(SSnapContext* ctx){ ...@@ -251,7 +261,7 @@ int32_t destroySnapContext(SSnapContext* ctx){
tdbTbcClose(ctx->pCur); tdbTbcClose(ctx->pCur);
taosHashCleanup(ctx->idVersion); taosHashCleanup(ctx->idVersion);
taosHashCleanup(ctx->suidInfo); taosHashCleanup(ctx->suidInfo);
taosMemoryFree(ctx);
return 0; return 0;
} }
...@@ -334,25 +344,15 @@ static void saveSuperTableInfoForChildTable(SMetaEntry *me, SHashObj *suidInfo){ ...@@ -334,25 +344,15 @@ static void saveSuperTableInfoForChildTable(SMetaEntry *me, SHashObj *suidInfo){
taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &dataTmp, sizeof(STableInfoForChildTable)); taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &dataTmp, sizeof(STableInfoForChildTable));
} }
int32_t setMetaForSnapShot(SSnapContext* ctx, int64_t uid, int64_t ver){ int32_t setForSnapShot(SSnapContext* ctx, int64_t uid){
int c = 0; int c = 0;
ctx->queryMetaOrData = true; // change to get data
if(uid == 0 && ver == 0){ if(uid == -1){
tdbTbcMoveToFirst(ctx->pCur);
return c; return c;
} }
STbDbKey key = {.version = ver, .uid = uid};
tdbTbcMoveTo(ctx->pCur, &key, sizeof(key), &c);
return c;
}
int32_t setDataForSnapShot(SSnapContext* ctx, int64_t uid){
int c = 0;
ctx->queryMetaOrData = false; // change to get data
if(uid == 0){ if(uid == 0){
tdbTbcMoveToFirst(ctx->pCur); clearAndMoveToFirst(ctx);
return c; return c;
} }
...@@ -367,7 +367,7 @@ int32_t setDataForSnapShot(SSnapContext* ctx, int64_t uid){ ...@@ -367,7 +367,7 @@ int32_t setDataForSnapShot(SSnapContext* ctx, int64_t uid){
return c; return c;
} }
int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, int16_t *type){ int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid){
int32_t ret = 0; int32_t ret = 0;
void *pKey = NULL; void *pKey = NULL;
void *pVal = NULL; void *pVal = NULL;
...@@ -377,13 +377,13 @@ int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, in ...@@ -377,13 +377,13 @@ int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, in
ret = tdbTbcNext(ctx->pCur, &pKey, &kLen, &pVal, &vLen); ret = tdbTbcNext(ctx->pCur, &pKey, &kLen, &pVal, &vLen);
if (ret < 0) { if (ret < 0) {
ctx->queryMetaOrData = false; // change to get data ctx->queryMetaOrData = false; // change to get data
tdbTbcMoveToFirst(ctx->pCur); clearAndMoveToFirst(ctx);
return 0; return 0;
} }
STbDbKey *tmp = (STbDbKey*)pKey; STbDbKey *tmp = (STbDbKey*)pKey;
if(tmp->version > ctx->snapVersion) { if(tmp->version > ctx->snapVersion) {
tdbTbcMoveToFirst(ctx->pCur); clearAndMoveToFirst(ctx);
ctx->queryMetaOrData = false; // change to get data ctx->queryMetaOrData = false; // change to get data
return 0; return 0;
} }
...@@ -394,6 +394,7 @@ int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, in ...@@ -394,6 +394,7 @@ int32_t getMetafromSnapShot(SSnapContext* ctx, void **pBuf, int32_t *contLen, in
} }
ASSERT(*ver == tmp->version); ASSERT(*ver == tmp->version);
*uid = tmp->uid;
SDecoder dc = {0}; SDecoder dc = {0};
SMetaEntry me = {0}; SMetaEntry me = {0};
tDecoderInit(&dc, pVal, vLen); tDecoderInit(&dc, pVal, vLen);
......
...@@ -294,12 +294,10 @@ static int32_t tqInitMetaRsp(SMqMetaRsp* pRsp, const SMqPollReq* pReq) { return ...@@ -294,12 +294,10 @@ static int32_t tqInitMetaRsp(SMqMetaRsp* pRsp, const SMqPollReq* pReq) { return
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
SMqPollReq* pReq = pMsg->pCont; SMqPollReq* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId; int64_t consumerId = pReq->consumerId;
int64_t timeout = pReq->timeout;
int32_t reqEpoch = pReq->epoch; int32_t reqEpoch = pReq->epoch;
int32_t code = 0; int32_t code = 0;
STqOffsetVal reqOffset = pReq->reqOffset; STqOffsetVal reqOffset = pReq->reqOffset;
STqOffsetVal fetchOffsetNew; STqOffsetVal fetchOffsetNew;
SWalCkHead* pCkHead = NULL;
// 1.find handle // 1.find handle
STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey)); STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
...@@ -329,6 +327,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -329,6 +327,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("tmq poll: consumer %ld (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId, tqDebug("tmq poll: consumer %ld (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId,
pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf); pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf);
SMqMetaRsp metaRsp = {0};
SMqDataRsp dataRsp = {0}; SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType); tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType);
...@@ -347,7 +346,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -347,7 +346,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) { if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
if (pReq->useSnapshot){ if (pReq->useSnapshot){
if (pHandle->fetchMeta){ if (pHandle->fetchMeta){
tqOffsetResetToMeta(&fetchOffsetNew, 0, 0); tqOffsetResetToMeta(&fetchOffsetNew, 0);
} else { } else {
tqOffsetResetToData(&fetchOffsetNew, 0, 0); tqOffsetResetToData(&fetchOffsetNew, 0, 0);
} }
...@@ -373,151 +372,26 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -373,151 +372,26 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
} }
} }
// 3.query tqScan(pTq, pHandle, &dataRsp, &metaRsp, &fetchOffsetNew);
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
/*if (fetchOffsetNew.type == TMQ_OFFSET__LOG) {*/
/*fetchOffsetNew.version++;*/
/*}*/
if (tqScan(pTq, pHandle, &dataRsp, &fetchOffsetNew) < 0) {
ASSERT(0);
code = -1;
goto OVER;
}
if (dataRsp.blockNum == 0) {
// TODO add to async task pool
/*dataRsp.rspOffset.version--;*/
}
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1;
}
goto OVER;
}
if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_DATA)) { if(dataRsp.blockNum != 0){
if (tqScan(pTq, pHandle, &dataRsp, &fetchOffsetNew) < 0) {
ASSERT(0);
}
if (dataRsp.blockNum == 0) {
// TODO add to async task pool
}
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1; code = -1;
} }
goto OVER; goto OVER;
} }
if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_META)) {
SSnapContext* sContext = pHandle->sContext;
if(setMetaForSnapShot(sContext, fetchOffsetNew.muid, fetchOffsetNew.mversion) != 0) {
qError("setMetaForSnapShot error. uid:%"PRIi64" ,version:%"PRIi64, fetchOffsetNew.muid, fetchOffsetNew.mversion);
code = -1;
goto OVER;
}
void* data = NULL;
int32_t dataLen = 0;
int16_t type = 0;
if(getMetafromSnapShot(sContext, &data, &dataLen, &type) < 0){
qError("getMetafromSnapShot error");
taosMemoryFreeClear(data);
code = -1;
goto OVER;
}
if(!sContext->queryMetaOrData){ // change to get data next poll request if(metaRsp.metaRspLen > 0){
fetchOffsetNew.type = TMQ_OFFSET__SNAPSHOT_DATA;
fetchOffsetNew.uid = 0;
fetchOffsetNew.ts = 0;
}
SMqMetaRsp metaRsp = {0};
metaRsp.rspOffset = fetchOffsetNew;
metaRsp.resMsgType = type;
metaRsp.metaRspLen = dataLen;
metaRsp.metaRsp = data;
if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) { if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
taosMemoryFreeClear(data);
code = -1; code = -1;
goto OVER;
} }
taosMemoryFreeClear(data);
code = 0;
goto OVER; goto OVER;
} }
if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && (fetchOffsetNew.type == TMQ_OFFSET__LOG)) { tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, no data", consumerId, pHandle->subKey,
int64_t fetchVer = fetchOffsetNew.version + 1; TD_VID(pTq->pVnode));
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) {
code = -1;
goto OVER;
}
walSetReaderCapacity(pHandle->pWalReader, 2048);
while (1) {
consumerEpoch = atomic_load_32(&pHandle->epoch);
if (consumerEpoch > reqEpoch) {
tqWarn("tmq poll: consumer %ld (epoch %d), subkey %s, vg %d offset %" PRId64
", found new consumer epoch %d, discard req epoch %d",
consumerId, pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch);
break;
}
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) {
// TODO add push mgr
tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer);
ASSERT(dataRsp.rspOffset.version >= dataRsp.reqOffset.version);
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1;
}
goto OVER;
}
SWalCont* pHead = &pCkHead->head;
tqDebug("tmq poll: consumer:%" PRId64 ", (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId,
pReq->epoch, TD_VID(pTq->pVnode), fetchVer, pHead->msgType);
if (pHead->msgType == TDMT_VND_SUBMIT) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
if (tqLogScanExec(pTq, &pHandle->execHandle, pCont, &dataRsp) < 0) {
/*ASSERT(0);*/
}
// TODO batch optimization:
// TODO continue scan until meeting batch requirement
if (dataRsp.blockNum > 0 /* threshold */) {
tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer);
ASSERT(dataRsp.rspOffset.version >= dataRsp.reqOffset.version);
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1;
}
goto OVER;
} else {
fetchVer++;
}
} else {
ASSERT(pHandle->fetchMeta);
ASSERT(IS_META_MSG(pHead->msgType));
tqDebug("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
SMqMetaRsp metaRsp = {0};
tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer);
metaRsp.resMsgType = pHead->msgType;
metaRsp.metaRspLen = pHead->bodyLen;
metaRsp.metaRsp = pHead->body;
if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
code = -1;
goto OVER;
}
code = 0;
goto OVER;
}
}
}
OVER: OVER:
if (pCkHead) taosMemoryFree(pCkHead);
// TODO wrap in destroy func // TODO wrap in destroy func
taosArrayDestroy(dataRsp.blockDataLen); taosArrayDestroy(dataRsp.blockDataLen);
taosArrayDestroyP(dataRsp.blockData, (FDelete)taosMemoryFree); taosArrayDestroyP(dataRsp.blockData, (FDelete)taosMemoryFree);
...@@ -530,6 +404,8 @@ OVER: ...@@ -530,6 +404,8 @@ OVER:
taosArrayDestroyP(dataRsp.blockTbName, (FDelete)taosMemoryFree); taosArrayDestroyP(dataRsp.blockTbName, (FDelete)taosMemoryFree);
} }
taosMemoryFreeClear(metaRsp.metaRsp);
return code; return code;
} }
...@@ -604,7 +480,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -604,7 +480,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
req.qmsg = NULL; req.qmsg = NULL;
pHandle->execHandle.task = pHandle->execHandle.task =
qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, NULL,
&pHandle->execHandle.pSchemaWrapper); &pHandle->execHandle.pSchemaWrapper);
ASSERT(pHandle->execHandle.task); ASSERT(pHandle->execHandle.task);
void* scanner = NULL; void* scanner = NULL;
...@@ -613,25 +489,17 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -613,25 +489,17 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
ASSERT(pHandle->execHandle.pExecReader); ASSERT(pHandle->execHandle.pExecReader);
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode); pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
pHandle->execHandle.execDb.pFilterOutTbUid = pHandle->execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta, handle.sContext); buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta, (SSnapContext **)(&handle.sContext));
pHandle->sContext = handle.sContext; handle.tqReader = pHandle->execHandle.pExecReader;
handle.pWalReader = ((STqReader*)handle.tqReader)->pWalReader;
handle.pFilterOutTbUid = pHandle->execHandle.execDb.pFilterOutTbUid;
pHandle->execHandle.task = pHandle->execHandle.task =
qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL); qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta, handle.sContext);
pHandle->sContext = handle.sContext;
pHandle->execHandle.execTb.suid = req.suid;
pHandle->execHandle.task =
qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t)); SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList); vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList);
tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, req.suid); tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, req.suid);
...@@ -642,6 +510,12 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -642,6 +510,12 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode); pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList); tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList);
taosArrayDestroy(tbUidList); taosArrayDestroy(tbUidList);
buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta, (SSnapContext **)(&handle.sContext));
handle.tqReader = pHandle->execHandle.pExecReader;
handle.pWalReader = ((STqReader*)handle.tqReader)->pWalReader;
pHandle->execHandle.task =
qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
} }
taosHashPut(pTq->handles, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); taosHashPut(pTq->handles, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
tqDebug("try to persist handle %s consumer %ld", req.subKey, pHandle->consumerId); tqDebug("try to persist handle %s consumer %ld", req.subKey, pHandle->consumerId);
......
...@@ -60,7 +60,7 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) { ...@@ -60,7 +60,7 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
return 0; return 0;
} }
int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) { int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* pOffset) {
const STqExecHandle* pExec = &pHandle->execHandle; const STqExecHandle* pExec = &pHandle->execHandle;
qTaskInfo_t task = pExec->task; qTaskInfo_t task = pExec->task;
...@@ -91,16 +91,20 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa ...@@ -91,16 +91,20 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
if (pDataBlock != NULL) { if (pDataBlock != NULL) {
if (pRsp->withTbName) { if (pRsp->withTbName) {
int64_t uid = 0;
if (pOffset->type == TMQ_OFFSET__LOG) { if (pOffset->type == TMQ_OFFSET__LOG) {
int64_t uid = pExec->pExecReader->msgIter.uid; uid = pExec->pExecReader->msgIter.uid;
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
continue;
}
} else { } else {
pRsp->withTbName = 0; uid = pDataBlock->info.uid;
}
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
continue;
} }
} }
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols); if(pRsp->withSchema){
tqAddBlockSchemaToRsp(pExec, pRsp);
}
tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock));
pRsp->blockNum++; pRsp->blockNum++;
if (pOffset->type == TMQ_OFFSET__LOG) { if (pOffset->type == TMQ_OFFSET__LOG) {
continue; continue;
...@@ -108,17 +112,6 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa ...@@ -108,17 +112,6 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
rowCnt += pDataBlock->info.rows; rowCnt += pDataBlock->info.rows;
if (rowCnt <= 4096) continue; if (rowCnt <= 4096) continue;
} }
} else {
if(pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA){
SMetaTableInfo mtInfo = getUidfromSnapShot(pHandle->sContext);
if (mtInfo.uid == 0){ //read snapshot done, change to get data from wal
}else{
pOffset->uid = mtInfo.uid;
qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
continue;
}
}
} }
if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
...@@ -128,27 +121,19 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa ...@@ -128,27 +121,19 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
continue; continue;
} }
void* meta = qStreamExtractMetaMsg(task); if (pRsp->blockNum > 0){
if (meta != NULL) { qStreamExtractOffset(task, &pRsp->rspOffset);
// tq add meta to rsp break;
} }
if (qStreamExtractOffset(task, &pRsp->rspOffset) < 0) { SMqMetaRsp* tmp = qStreamExtractMetaMsg(task);
ASSERT(0); if(tmp->rspOffset.type == TMQ_OFFSET__SNAPSHOT_DATA){
} qStreamPrepareScan(task, &tmp->rspOffset, pHandle->execHandle.subType);
tmp->rspOffset.type = TMQ_OFFSET__SNAPSHOT_META;
ASSERT(pRsp->rspOffset.type != 0); continue;
#if 0
if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
if (pRsp->blockNum > 0) {
ASSERT(pRsp->rspOffset.version > pRsp->reqOffset.version);
} else {
ASSERT(pRsp->rspOffset.version >= pRsp->reqOffset.version);
}
} }
#endif
*pMetaRsp = *tmp;
tqDebug("task exec exited"); tqDebug("task exec exited");
break; break;
} }
...@@ -206,56 +191,22 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S ...@@ -206,56 +191,22 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S
} }
#endif #endif
int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp) { SSDataBlock* tqLogScanExec(int8_t subType, STqReader* pReader, SHashObj* pFilterOutTbUid, SSDataBlock* block) {
ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN); if (subType == TOPIC_SUB_TYPE__TABLE) {
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
pRsp->withSchema = 1;
STqReader* pReader = pExec->pExecReader;
tqReaderSetDataMsg(pReader, pReq, 0);
while (tqNextDataBlock(pReader)) { while (tqNextDataBlock(pReader)) {
SSDataBlock block = {0}; if (tqRetrieveDataBlock(block, pReader) < 0) {
if (tqRetrieveDataBlock(&block, pReader) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
} }
if (pRsp->withTbName) { return block;
int64_t uid = pExec->pExecReader->msgIter.uid;
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
blockDataFreeRes(&block);
continue;
}
}
tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock));
blockDataFreeRes(&block);
tqAddBlockSchemaToRsp(pExec, pRsp);
pRsp->blockNum++;
} }
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) { } else if (subType == TOPIC_SUB_TYPE__DB) {
pRsp->withSchema = 1; while (tqNextDataBlockFilterOut(pReader, pFilterOutTbUid)) {
STqReader* pReader = pExec->pExecReader; if (tqRetrieveDataBlock(block, pReader) < 0) {
tqReaderSetDataMsg(pReader, pReq, 0);
while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
SSDataBlock block = {0};
if (tqRetrieveDataBlock(&block, pReader) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
} }
if (pRsp->withTbName) { return block;
int64_t uid = pExec->pExecReader->msgIter.uid;
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
blockDataFreeRes(&block);
continue;
}
}
tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock));
blockDataFreeRes(&block);
tqAddBlockSchemaToRsp(pExec, pRsp);
pRsp->blockNum++;
} }
} }
if (pRsp->blockNum == 0) { return NULL;
return -1;
}
return 0;
} }
...@@ -70,25 +70,35 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { ...@@ -70,25 +70,35 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
} }
walRefVer(handle.pRef, handle.snapshotVer); walRefVer(handle.pRef, handle.snapshotVer);
SReadHandle reader = {
.meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode,
.initTableReader = true,
.initTqReader = true,
.version = handle.snapshotVer,
};
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
SReadHandle reader = {
.meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode,
.initTableReader = true,
.initTqReader = true,
.version = handle.snapshotVer,
};
handle.execHandle.task = qCreateQueueExecTaskInfo( handle.execHandle.task = qCreateQueueExecTaskInfo(
handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper); handle.execHandle.execCol.qmsg, &reader, NULL, &handle.execHandle.pSchemaWrapper);
ASSERT(handle.execHandle.task); ASSERT(handle.execHandle.task);
void* scanner = NULL; void* scanner = NULL;
qExtractStreamScanner(handle.execHandle.task, &scanner); qExtractStreamScanner(handle.execHandle.task, &scanner);
ASSERT(scanner); ASSERT(scanner);
handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
ASSERT(handle.execHandle.pExecReader); ASSERT(handle.execHandle.pExecReader);
} else { } else if(handle.execHandle.subType == TOPIC_SUB_TYPE__DB){
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode);
buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta, (SSnapContext **)(&reader.sContext));
reader.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
reader.tqReader = handle.execHandle.pExecReader;
reader.pFilterOutTbUid = handle.execHandle.execDb.pFilterOutTbUid;
handle.execHandle.task =
qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL);
handle.execHandle.execDb.pFilterOutTbUid = handle.execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
} }
......
...@@ -15,22 +15,21 @@ ...@@ -15,22 +15,21 @@
#include "tq.h" #include "tq.h"
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) { int64_t tqFetchLog(SWalReader *pWalReader, bool fetchMeta, int64_t* fetchOffset, SWalCkHead** ppCkHead) {
int32_t code = 0; int32_t code = 0;
taosThreadMutexLock(&pHandle->pWalReader->mutex); taosThreadMutexLock(&pWalReader->mutex);
int64_t offset = *fetchOffset; int64_t offset = *fetchOffset;
while (1) { while (1) {
if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) { if (walFetchHead(pWalReader, offset, *ppCkHead) < 0) {
tqDebug("tmq poll: consumer:%" PRId64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return", tqDebug("tmq poll no more log to return");
pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode), offset);
*fetchOffset = offset - 1; *fetchOffset = offset - 1;
code = -1; code = -1;
goto END; goto END;
} }
if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) { if ((*ppCkHead)->head.msgType == TDMT_VND_SUBMIT) {
code = walFetchBody(pHandle->pWalReader, ppCkHead); code = walFetchBody(pWalReader, ppCkHead);
if (code < 0) { if (code < 0) {
ASSERT(0); ASSERT(0);
...@@ -42,10 +41,10 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea ...@@ -42,10 +41,10 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
code = 0; code = 0;
goto END; goto END;
} else { } else {
if (pHandle->fetchMeta) { if (fetchMeta) {
SWalCont* pHead = &((*ppCkHead)->head); SWalCont* pHead = &((*ppCkHead)->head);
if (IS_META_MSG(pHead->msgType)) { if (IS_META_MSG(pHead->msgType)) {
code = walFetchBody(pHandle->pWalReader, ppCkHead); code = walFetchBody(pWalReader, ppCkHead);
if (code < 0) { if (code < 0) {
ASSERT(0); ASSERT(0);
...@@ -58,7 +57,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea ...@@ -58,7 +57,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
goto END; goto END;
} }
} }
code = walSkipFetchBody(pHandle->pWalReader, *ppCkHead); code = walSkipFetchBody(pWalReader, *ppCkHead);
if (code < 0) { if (code < 0) {
ASSERT(0); ASSERT(0);
*fetchOffset = offset; *fetchOffset = offset;
...@@ -69,7 +68,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea ...@@ -69,7 +68,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
} }
} }
END: END:
taosThreadMutexUnlock(&pHandle->pWalReader->mutex); taosThreadMutexUnlock(&pWalReader->mutex);
return code; return code;
} }
......
...@@ -2581,10 +2581,10 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl ...@@ -2581,10 +2581,10 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
} }
if (pCond->suid != 0) { if (pCond->suid != 0) {
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1); pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, pCond->schemaVersion);
} else if (taosArrayGetSize(pTableList) > 0) { } else if (taosArrayGetSize(pTableList) > 0) {
STableKeyInfo* pKey = taosArrayGet(pTableList, 0); STableKeyInfo* pKey = taosArrayGet(pTableList, 0);
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1); pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, pCond->schemaVersion);
} }
int32_t numOfTables = taosArrayGetSize(pTableList); int32_t numOfTables = taosArrayGetSize(pTableList);
......
...@@ -149,7 +149,7 @@ typedef struct { ...@@ -149,7 +149,7 @@ typedef struct {
//TODO remove prepareStatus //TODO remove prepareStatus
STqOffsetVal prepareStatus; // for tmq STqOffsetVal prepareStatus; // for tmq
STqOffsetVal lastStatus; // for tmq STqOffsetVal lastStatus; // for tmq
void* metaBlk; // for tmq fetching meta SMqMetaRsp metaRsp; // for tmq fetching meta
SSDataBlock* pullOverBlk; // for streaming SSDataBlock* pullOverBlk; // for streaming
SWalFilterCond cond; SWalFilterCond cond;
int64_t lastScanUid; int64_t lastScanUid;
...@@ -498,7 +498,8 @@ typedef struct SStreamRawScanInfo{ ...@@ -498,7 +498,8 @@ typedef struct SStreamRawScanInfo{
// void *metaInfo; // void *metaInfo;
// void *dataInfo; // void *dataInfo;
SReadHandle * readHandle; SWalCkHead* pCkHead;
SReadHandle* readHandle;
SSDataBlock pRes; // result SSDataBlock SSDataBlock pRes; // result SSDataBlock
uint64_t groupId; uint64_t groupId;
STsdbReader* dataReader; STsdbReader* dataReader;
......
...@@ -847,6 +847,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi ...@@ -847,6 +847,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
pCond->type = TIMEWINDOW_RANGE_CONTAINED; pCond->type = TIMEWINDOW_RANGE_CONTAINED;
pCond->startVersion = -1; pCond->startVersion = -1;
pCond->endVersion = -1; pCond->endVersion = -1;
pCond->schemaVersion = -1;
// pCond->type = pTableScanNode->scanFlag; // pCond->type = pTableScanNode->scanFlag;
int32_t j = 0; int32_t j = 0;
......
...@@ -154,13 +154,13 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n ...@@ -154,13 +154,13 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n
// extract the number of output columns // extract the number of output columns
SDataBlockDescNode* pDescNode = pPlan->pNode->pOutputDataBlockDesc; SDataBlockDescNode* pDescNode = pPlan->pNode->pOutputDataBlockDesc;
*numOfCols = 0; if(numOfCols) *numOfCols = 0;
SNode* pNode; SNode* pNode;
FOREACH(pNode, pDescNode->pSlots) { FOREACH(pNode, pDescNode->pSlots) {
SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode; SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
if (pSlotDesc->output) { if (pSlotDesc->output) {
++(*numOfCols); if(numOfCols) ++(*numOfCols);
} }
} }
...@@ -585,10 +585,10 @@ const SSchemaWrapper* qExtractSchemaFromStreamScanner(void* scanner) { ...@@ -585,10 +585,10 @@ const SSchemaWrapper* qExtractSchemaFromStreamScanner(void* scanner) {
return pInfo->tqReader->pSchemaWrapper; return pInfo->tqReader->pSchemaWrapper;
} }
void* qStreamExtractMetaMsg(qTaskInfo_t tinfo) { SMqMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE); ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
return pTaskInfo->streamInfo.metaBlk; return &pTaskInfo->streamInfo.metaRsp;
} }
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) { int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
...@@ -613,6 +613,7 @@ int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* s ...@@ -613,6 +613,7 @@ int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* s
pCond->type = TIMEWINDOW_RANGE_CONTAINED; pCond->type = TIMEWINDOW_RANGE_CONTAINED;
pCond->startVersion = -1; pCond->startVersion = -1;
pCond->endVersion = sContext->snapVersion; pCond->endVersion = sContext->snapVersion;
pCond->schemaVersion = sContext->snapVersion;
for (int32_t i = 0; i < pCond->numOfCols; ++i) { for (int32_t i = 0; i < pCond->numOfCols; ++i) {
pCond->colList[i].type = mtInfo.schema->pSchema[i].type; pCond->colList[i].type = mtInfo.schema->pSchema[i].type;
...@@ -722,7 +723,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -722,7 +723,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
}else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA){ }else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA){
SStreamRawScanInfo* pInfo = pOperator->info; SStreamRawScanInfo* pInfo = pOperator->info;
SSnapContext* sContext = pInfo->sContext; SSnapContext* sContext = pInfo->sContext;
if(setDataForSnapShot(sContext, pOffset->uid) != 0) { if(setForSnapShot(sContext, pOffset->uid) != 0) {
qError("setDataForSnapShot error. uid:%"PRIi64, pOffset->uid); qError("setDataForSnapShot error. uid:%"PRIi64, pOffset->uid);
return -1; return -1;
} }
...@@ -732,6 +733,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -732,6 +733,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
if(pOffset->ts == 0) pOffset->ts = INT64_MIN; if(pOffset->ts == 0) pOffset->ts = INT64_MIN;
if (pOffset->uid == 0) { if (pOffset->uid == 0) {
qError("setDataForSnapShot error. uid = 0 ");
return -1; return -1;
} }
...@@ -746,6 +748,17 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -746,6 +748,17 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
tsdbReaderOpen(pInfo->readHandle->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList, &pInfo->dataReader, NULL); tsdbReaderOpen(pInfo->readHandle->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList, &pInfo->dataReader, NULL);
qDebug("tsdb reader snapshot change to uid %ld ts %ld", pOffset->uid, pOffset->ts); qDebug("tsdb reader snapshot change to uid %ld ts %ld", pOffset->uid, pOffset->ts);
}else if(pOffset->type == TMQ_OFFSET__SNAPSHOT_META){
SStreamRawScanInfo* pInfo = pOperator->info;
SSnapContext* sContext = pInfo->sContext;
if(setForSnapShot(sContext, pOffset->uid) != 0) {
qError("setForSnapShot error. uid:%"PRIi64" ,version:%"PRIi64, pOffset->uid);
return -1;
}
}else if (pOffset->type == TMQ_OFFSET__LOG) {
SStreamRawScanInfo* pInfo = pOperator->info;
tsdbReaderClose(pInfo->dataReader);
pInfo->dataReader = NULL;
} }
return 0; return 0;
} }
...@@ -3924,6 +3924,7 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC ...@@ -3924,6 +3924,7 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC
pCond->type = TIMEWINDOW_RANGE_CONTAINED; pCond->type = TIMEWINDOW_RANGE_CONTAINED;
pCond->startVersion = -1; pCond->startVersion = -1;
pCond->endVersion = -1; pCond->endVersion = -1;
pCond->schemaVersion = -1;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -1456,45 +1456,140 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { ...@@ -1456,45 +1456,140 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
// NOTE: this operator does never check if current status is done or not // NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamRawScanInfo* pInfo = pOperator->info; SStreamRawScanInfo* pInfo = pOperator->info;
pTaskInfo->streamInfo.metaRsp.metaRspLen = 0; // use metaRspLen !=0 to judge if data is meta
qDebug("stream scan called"); qDebug("stream scan called");
ASSERT(pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA); if(pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA){
SSDataBlock* pBlock = &pInfo->pRes;
SSDataBlock* pBlock = &pInfo->pRes; while (tsdbNextDataBlock(pInfo->dataReader)) {
if (isTaskKilled(pTaskInfo)) {
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
while (tsdbNextDataBlock(pInfo->dataReader)) { SDataBlockInfo binfo = pBlock->info;
if (isTaskKilled(pTaskInfo)) { tsdbRetrieveDataBlockInfo(pInfo->dataReader, &binfo);
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
SDataBlockInfo binfo = pBlock->info; pBlock->info = binfo;
tsdbRetrieveDataBlockInfo(pInfo->dataReader, &binfo);
pBlock->info = binfo; SArray* pCols = tsdbRetrieveDataBlock(pInfo->dataReader, NULL);
pBlock->pDataBlock = pCols;
if (pCols == NULL) {
SMetaTableInfo mtInfo = getUidfromSnapShot(pInfo->sContext);
if (mtInfo.uid == 0){ //read snapshot done, change to get data from wal
return NULL;
}else{
pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid;
qStreamPrepareScan(pTaskInfo, &pTaskInfo->streamInfo.prepareStatus, pInfo->sContext->subType);
continue;
}
}
SArray* pCols = tsdbRetrieveDataBlock(pInfo->dataReader, NULL); pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA;
if (pCols == NULL) { pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.uid;
pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey;
return pBlock;
}
qDebug("stream scan tsdb return null");
return NULL;
}else if(pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META){
SSnapContext *sContext = pInfo->sContext;
void* data = NULL;
int32_t dataLen = 0;
int16_t type = 0;
int64_t uid = 0;
if(getMetafromSnapShot(sContext, &data, &dataLen, &type, &uid) < 0){
qError("getMetafromSnapShot error");
taosMemoryFreeClear(data);
return NULL; return NULL;
} }
// size_t numOfSrcCols = taosArrayGetSize(pCols); if(!sContext->queryMetaOrData){ // change to get data next poll request
// for (int i = 0; i < taosArrayGetSize(pCols); i++) { pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META;
// SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, pmInfo->targetSlotId); pTaskInfo->streamInfo.lastStatus.uid = uid;
// colDataAssign(pDst, p, pBlock->info.rows, &pBlock->info); pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__SNAPSHOT_DATA;
// } pTaskInfo->streamInfo.metaRsp.rspOffset.uid = 0;
pTaskInfo->streamInfo.metaRsp.rspOffset.ts = 0;
}else{
pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META;
pTaskInfo->streamInfo.lastStatus.uid = uid;
pTaskInfo->streamInfo.metaRsp.rspOffset = pTaskInfo->streamInfo.lastStatus;
pTaskInfo->streamInfo.metaRsp.resMsgType = type;
pTaskInfo->streamInfo.metaRsp.metaRspLen = dataLen;
pTaskInfo->streamInfo.metaRsp.metaRsp = data;
}
pBlock->pDataBlock = pCols; return NULL;
}else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
if(pInfo->pCkHead == NULL){
pInfo->pCkHead = taosMemoryCalloc(1, sizeof(SWalCkHead) + 2048);
if (pInfo->pCkHead == NULL) {
pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_DATA; }
pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.uid; walSetReaderCapacity(pInfo->readHandle->pWalReader, 2048);
pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey; }
return pBlock; int64_t fetchVer = pTaskInfo->streamInfo.prepareStatus.version;
SWalCont* pHead = &pInfo->pCkHead->head;
if(pHead->msgType != TDMT_VND_SUBMIT){
fetchVer++;
if (tqFetchLog(pInfo->readHandle->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) {
return NULL;
}
qDebug("tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType);
pHead = &pInfo->pCkHead->head;
if(pHead->msgType == TDMT_VND_SUBMIT){
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
tqReaderSetDataMsg(pInfo->readHandle->tqReader, pCont, 0);
}else if(pInfo->sContext->withMeta){
ASSERT(IS_META_MSG(pHead->msgType));
qDebug("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer;
pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__LOG;
pTaskInfo->streamInfo.metaRsp.resMsgType = pHead->msgType;
pTaskInfo->streamInfo.metaRsp.metaRspLen = pHead->bodyLen;
pTaskInfo->streamInfo.metaRsp.metaRsp = taosMemoryMalloc(pHead->bodyLen);
memcpy(pTaskInfo->streamInfo.metaRsp.metaRsp, pHead->body, pHead->bodyLen);
return NULL;
}
}
if (pHead->msgType == TDMT_VND_SUBMIT) {
while(1){
blockDataFreeRes(&pInfo->pRes);
SSDataBlock* block = tqLogScanExec(pInfo->sContext->subType, pInfo->readHandle->tqReader, pInfo->readHandle->pFilterOutTbUid, &pInfo->pRes);
if(!block){
fetchVer++;
if (tqFetchLog(pInfo->readHandle->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) {
return NULL;
}
pHead = &pInfo->pCkHead->head;
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
tqReaderSetDataMsg(pInfo->readHandle->tqReader, pCont, 0);
}
return block;
}
}
} }
qDebug("stream scan tsdb return null");
return NULL; return NULL;
} }
static void destroyRawScanOperatorInfo(void* param, int32_t numOfOutput) {
SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param;
taosMemoryFreeClear(pRawScan->pCkHead);
if (pRawScan->readHandle->tqReader) {
tqCloseReader(pRawScan->readHandle->tqReader);
}
blockDataFreeRes(&pRawScan->pRes);
tsdbReaderClose(pRawScan->dataReader);
destroySnapContext(pRawScan->sContext);
taosHashCleanup(pRawScan->readHandle->pFilterOutTbUid);
taosMemoryFree(pRawScan);
}
// for subscribing db or stb (not including column), // for subscribing db or stb (not including column),
// if this scan is used, meta data can be return // if this scan is used, meta data can be return
// and schemas are decided when scanning // and schemas are decided when scanning
...@@ -1519,7 +1614,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT ...@@ -1519,7 +1614,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, NULL, pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, destroyRawScanOperatorInfo,
NULL, NULL, NULL); NULL, NULL, NULL);
return pOperator; return pOperator;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册