未验证 提交 b89b2df2 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #16524 from taosdata/feature/stream

feat(tmq): support taosx
...@@ -88,4 +88,3 @@ Standard: Auto ...@@ -88,4 +88,3 @@ Standard: Auto
TabWidth: 8 TabWidth: 8
UseTab: Never UseTab: Never
... ...
...@@ -2626,6 +2626,22 @@ typedef struct { ...@@ -2626,6 +2626,22 @@ typedef struct {
}; };
} STqOffsetVal; } STqOffsetVal;
static FORCE_INLINE void tqOffsetResetToData(STqOffsetVal* pOffsetVal, int64_t uid, int64_t ts) {
pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_DATA;
pOffsetVal->uid = uid;
pOffsetVal->ts = ts;
}
static FORCE_INLINE void tqOffsetResetToMeta(STqOffsetVal* pOffsetVal, int64_t uid) {
pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_META;
pOffsetVal->uid = uid;
}
static FORCE_INLINE void tqOffsetResetToLog(STqOffsetVal* pOffsetVal, int64_t ver) {
pOffsetVal->type = TMQ_OFFSET__LOG;
pOffsetVal->version = ver;
}
int32_t tEncodeSTqOffsetVal(SEncoder* pEncoder, const STqOffsetVal* pOffsetVal); int32_t tEncodeSTqOffsetVal(SEncoder* pEncoder, const STqOffsetVal* pOffsetVal);
int32_t tDecodeSTqOffsetVal(SDecoder* pDecoder, STqOffsetVal* pOffsetVal); int32_t tDecodeSTqOffsetVal(SDecoder* pDecoder, STqOffsetVal* pOffsetVal);
int32_t tFormatOffset(char* buf, int32_t maxLen, const STqOffsetVal* pVal); int32_t tFormatOffset(char* buf, int32_t maxLen, const STqOffsetVal* pVal);
...@@ -2957,6 +2973,7 @@ typedef struct { ...@@ -2957,6 +2973,7 @@ typedef struct {
int32_t tEncodeSMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pRsp); int32_t tEncodeSMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pRsp);
int32_t tDecodeSMqDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp); int32_t tDecodeSMqDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp);
void tDeleteSMqDataRsp(SMqDataRsp* pRsp);
typedef struct { typedef struct {
SMqRspHead head; SMqRspHead head;
......
...@@ -811,8 +811,19 @@ int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) { ...@@ -811,8 +811,19 @@ int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
} }
int32_t tmq_unsubscribe(tmq_t* tmq) { int32_t tmq_unsubscribe(tmq_t* tmq) {
int32_t rsp;
int32_t retryCnt = 0;
tmq_list_t* lst = tmq_list_new(); tmq_list_t* lst = tmq_list_new();
int32_t rsp = tmq_subscribe(tmq, lst); while (1) {
rsp = tmq_subscribe(tmq, lst);
if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
break;
} else {
retryCnt++;
taosMsleep(500);
}
}
tmq_list_destroy(lst); tmq_list_destroy(lst);
return rsp; return rsp;
} }
......
...@@ -5889,6 +5889,13 @@ int32_t tDecodeSMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { ...@@ -5889,6 +5889,13 @@ int32_t tDecodeSMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
return 0; return 0;
} }
void tDeleteSMqDataRsp(SMqDataRsp *pRsp) {
taosArrayDestroy(pRsp->blockDataLen);
taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree);
taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSSchemaWrapper);
taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree);
}
int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) { int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) {
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->reqOffset) < 0) return -1; if (tEncodeSTqOffsetVal(pEncoder, &pRsp->reqOffset) < 0) return -1;
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1; if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1;
......
...@@ -763,8 +763,9 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl ...@@ -763,8 +763,9 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
int32_t cols = 0; int32_t cols = 0;
char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
tNameFromString(&n, pTopic->name, T_NAME_ACCT | T_NAME_DB); strcpy(varDataVal(topicName), mndGetDbStr(pTopic->name));
tNameGetDbName(&n, varDataVal(topicName)); /*tNameFromString(&n, pTopic->name, T_NAME_ACCT | T_NAME_DB);*/
/*tNameGetDbName(&n, varDataVal(topicName));*/
varDataSetLen(topicName, strlen(varDataVal(topicName))); varDataSetLen(topicName, strlen(varDataVal(topicName)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)topicName, false); colDataAppend(pColInfo, numOfRows, (const char *)topicName, false);
......
...@@ -67,21 +67,21 @@ typedef struct { ...@@ -67,21 +67,21 @@ typedef struct {
// tqExec // tqExec
typedef struct { typedef struct {
char* qmsg; char* qmsg;
} STqExecCol; } STqExecCol;
typedef struct { typedef struct {
int64_t suid; int64_t suid;
} STqExecTb; } STqExecTb;
typedef struct { typedef struct {
SHashObj* pFilterOutTbUid; SHashObj* pFilterOutTbUid;
} STqExecDb; } STqExecDb;
typedef struct { typedef struct {
int8_t subType; int8_t subType;
STqReader* pExecReader; STqReader* pExecReader;
qTaskInfo_t task; qTaskInfo_t task;
union { union {
STqExecCol execCol; STqExecCol execCol;
...@@ -144,7 +144,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* ...@@ -144,7 +144,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum); int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
// tqExec // tqExec
int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp); int32_t tqLogScanExec(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, SMqDataRsp* pRsp);
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp); int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp);
// tqMeta // tqMeta
...@@ -175,22 +175,6 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data); ...@@ -175,22 +175,6 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
char* tqOffsetBuildFName(const char* path, int32_t ver); char* tqOffsetBuildFName(const char* path, int32_t ver);
int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname); int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
static FORCE_INLINE void tqOffsetResetToData(STqOffsetVal* pOffsetVal, int64_t uid, int64_t ts) {
pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_DATA;
pOffsetVal->uid = uid;
pOffsetVal->ts = ts;
}
static FORCE_INLINE void tqOffsetResetToMeta(STqOffsetVal* pOffsetVal, int64_t uid) {
pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_META;
pOffsetVal->uid = uid;
}
static FORCE_INLINE void tqOffsetResetToLog(STqOffsetVal* pOffsetVal, int64_t ver) {
pOffsetVal->type = TMQ_OFFSET__LOG;
pOffsetVal->version = ver;
}
// tqStream // tqStream
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask); int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask);
......
...@@ -357,8 +357,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -357,8 +357,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
TD_VID(pTq->pVnode), formatBuf); TD_VID(pTq->pVnode), formatBuf);
} else { } else {
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) { if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
if (pReq->useSnapshot){ if (pReq->useSnapshot) {
if (pHandle->fetchMeta){ if (pHandle->fetchMeta) {
tqOffsetResetToMeta(&fetchOffsetNew, 0); tqOffsetResetToMeta(&fetchOffsetNew, 0);
} else { } else {
tqOffsetResetToData(&fetchOffsetNew, 0, 0); tqOffsetResetToData(&fetchOffsetNew, 0, 0);
...@@ -373,43 +373,47 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -373,43 +373,47 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1; code = -1;
} }
goto OVER; tDeleteSMqDataRsp(&dataRsp);
return code;
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) { } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
tqError("tmq poll: subkey %s, no offset committed for consumer %" PRId64 tqError("tmq poll: subkey %s, no offset committed for consumer %" PRId64
" in vg %d, subkey %s, reset none failed", " in vg %d, subkey %s, reset none failed",
pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pReq->subKey); pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pReq->subKey);
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
code = -1; code = -1;
goto OVER; tDeleteSMqDataRsp(&dataRsp);
return code;
} }
} }
} }
if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN || fetchOffsetNew.type != TMQ_OFFSET__LOG){ if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN || fetchOffsetNew.type != TMQ_OFFSET__LOG) {
SMqMetaRsp metaRsp = {0}; SMqMetaRsp metaRsp = {0};
tqScan(pTq, pHandle, &dataRsp, &metaRsp, &fetchOffsetNew); tqScan(pTq, pHandle, &dataRsp, &metaRsp, &fetchOffsetNew);
if(metaRsp.metaRspLen > 0){ if (metaRsp.metaRspLen > 0) {
if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) { if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
code = -1; code = -1;
} }
tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, send meta offset type:%d,uid:%ld,version:%ld", consumerId, pHandle->subKey, tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, send meta offset type:%d,uid:%ld,version:%ld", consumerId,
TD_VID(pTq->pVnode), metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.version); pHandle->subKey, TD_VID(pTq->pVnode), metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
metaRsp.rspOffset.version);
taosMemoryFree(metaRsp.metaRsp); taosMemoryFree(metaRsp.metaRsp);
goto OVER; goto OVER;
} }
if (dataRsp.blockNum > 0){ if (dataRsp.blockNum > 0) {
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1; code = -1;
} }
goto OVER; goto OVER;
}else{ } else {
fetchOffsetNew = dataRsp.rspOffset; fetchOffsetNew = dataRsp.rspOffset;
} }
tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%ld,version:%ld", consumerId, pHandle->subKey, tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%ld,version:%ld",
TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid, dataRsp.rspOffset.version); consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type,
dataRsp.rspOffset.uid, dataRsp.rspOffset.version);
} }
if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && fetchOffsetNew.type == TMQ_OFFSET__LOG) { if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && fetchOffsetNew.type == TMQ_OFFSET__LOG) {
...@@ -426,7 +430,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -426,7 +430,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
consumerEpoch = atomic_load_32(&pHandle->epoch); consumerEpoch = atomic_load_32(&pHandle->epoch);
if (consumerEpoch > reqEpoch) { if (consumerEpoch > reqEpoch) {
tqWarn("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, vg %d offset %" PRId64 tqWarn("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, vg %d offset %" PRId64
", found new consumer epoch %d, discard req epoch %d", ", found new consumer epoch %d, discard req epoch %d",
consumerId, pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch); consumerId, pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch);
break; break;
} }
...@@ -449,7 +453,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -449,7 +453,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
if (pHead->msgType == TDMT_VND_SUBMIT) { if (pHead->msgType == TDMT_VND_SUBMIT) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body; SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
if (tqLogScanExec(pTq, &pHandle->execHandle, pCont, &dataRsp) < 0) { if (tqLogScanExec(pTq, pHandle, pCont, &dataRsp) < 0) {
/*ASSERT(0);*/ /*ASSERT(0);*/
} }
// TODO batch optimization: // TODO batch optimization:
...@@ -490,18 +494,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -490,18 +494,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
OVER: OVER:
if (pCkHead) taosMemoryFree(pCkHead); if (pCkHead) taosMemoryFree(pCkHead);
// TODO wrap in destroy func tDeleteSMqDataRsp(&dataRsp);
taosArrayDestroy(dataRsp.blockDataLen);
taosArrayDestroyP(dataRsp.blockData, (FDelete)taosMemoryFree);
if (dataRsp.withSchema) {
taosArrayDestroyP(dataRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
}
if (dataRsp.withTbName) {
taosArrayDestroyP(dataRsp.blockTbName, (FDelete)taosMemoryFree);
}
return code; return code;
} }
...@@ -629,9 +622,9 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe ...@@ -629,9 +622,9 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList); tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList);
taosArrayDestroy(tbUidList); taosArrayDestroy(tbUidList);
buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta, (SSnapContext **)(&handle.sContext)); buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
pHandle->execHandle.task = (SSnapContext**)(&handle.sContext));
qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL); pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
} }
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId); tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId);
......
...@@ -60,6 +60,46 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) { ...@@ -60,6 +60,46 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
return 0; return 0;
} }
int64_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
const STqExecHandle* pExec = &pHandle->execHandle;
ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN);
qTaskInfo_t task = pExec->task;
if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
tqDebug("prepare scan failed, return");
if (pOffset->type == TMQ_OFFSET__LOG) {
pRsp->rspOffset = *pOffset;
return 0;
} else {
tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
tqDebug("prepare scan failed, return");
pRsp->rspOffset = *pOffset;
return 0;
}
}
}
int32_t rowCnt = 0;
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
tqDebug("tmq task start to execute");
if (qExecTask(task, &pDataBlock, &ts) < 0) {
ASSERT(0);
}
tqDebug("tmq task execute end, get %p", pDataBlock);
if (pDataBlock) {
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols);
pRsp->blockNum++;
}
}
return 0;
}
int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* pOffset) { int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* pOffset) {
const STqExecHandle* pExec = &pHandle->execHandle; const STqExecHandle* pExec = &pHandle->execHandle;
qTaskInfo_t task = pExec->task; qTaskInfo_t task = pExec->task;
...@@ -102,18 +142,18 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* ...@@ -102,18 +142,18 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
taosArrayPush(pRsp->blockTbName, &tbName); taosArrayPush(pRsp->blockTbName, &tbName);
} }
} }
if(pRsp->withSchema){ if (pRsp->withSchema) {
if (pOffset->type == TMQ_OFFSET__LOG) { if (pOffset->type == TMQ_OFFSET__LOG) {
tqAddBlockSchemaToRsp(pExec, pRsp); tqAddBlockSchemaToRsp(pExec, pRsp);
}else{ } else {
SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task)); SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
taosArrayPush(pRsp->blockSchema, &pSW); taosArrayPush(pRsp->blockSchema, &pSW);
} }
} }
if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN){ if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols); tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols);
}else{ } else {
tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock)); tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock));
} }
pRsp->blockNum++; pRsp->blockNum++;
...@@ -125,17 +165,9 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* ...@@ -125,17 +165,9 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
} }
} }
if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN){ if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) {
if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { if (pDataBlock == NULL && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
tqDebug("vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode), if (qStreamExtractPrepareUid(task) != 0) {
pHandle->snapshotVer + 1);
tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
continue;
}
}else{
if (pDataBlock == NULL && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA){
if(qStreamExtractPrepareUid(task) != 0){
continue; continue;
} }
tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode), tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
...@@ -143,13 +175,13 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* ...@@ -143,13 +175,13 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
break; break;
} }
if (pRsp->blockNum > 0){ if (pRsp->blockNum > 0) {
tqDebug("tmqsnap task exec exited, get data"); tqDebug("tmqsnap task exec exited, get data");
break; break;
} }
SMqMetaRsp* tmp = qStreamExtractMetaMsg(task); SMqMetaRsp* tmp = qStreamExtractMetaMsg(task);
if(tmp->rspOffset.type == TMQ_OFFSET__SNAPSHOT_DATA){ if (tmp->rspOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
tqOffsetResetToData(pOffset, tmp->rspOffset.uid, tmp->rspOffset.ts); tqOffsetResetToData(pOffset, tmp->rspOffset.uid, tmp->rspOffset.ts);
qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType); qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
tmp->rspOffset.type = TMQ_OFFSET__SNAPSHOT_META; tmp->rspOffset.type = TMQ_OFFSET__SNAPSHOT_META;
...@@ -173,57 +205,8 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* ...@@ -173,57 +205,8 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
return 0; return 0;
} }
#if 0 int32_t tqLogScanExec(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, SMqDataRsp* pRsp) {
int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal offset, int32_t workerId) { STqExecHandle* pExec = &pHandle->execHandle;
ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN);
qTaskInfo_t task = pExec->execCol.task[workerId];
if (qStreamPrepareTsdbScan(task, offset.uid, offset.ts) < 0) {
ASSERT(0);
}
int32_t rowCnt = 0;
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
if (qExecTask(task, &pDataBlock, &ts) < 0) {
ASSERT(0);
}
if (pDataBlock == NULL) break;
ASSERT(pDataBlock->info.rows != 0);
ASSERT(taosArrayGetSize(pDataBlock->pDataBlock) != 0);
tqAddBlockDataToRsp(pDataBlock, pRsp);
if (pRsp->withTbName) {
pRsp->withTbName = 0;
#if 0
int64_t uid;
int64_t ts;
if (qGetStreamScanStatus(task, &uid, &ts) < 0) {
ASSERT(0);
}
tqAddTbNameToRsp(pTq, uid, pRsp);
#endif
}
pRsp->blockNum++;
rowCnt += pDataBlock->info.rows;
if (rowCnt >= 4096) break;
}
int64_t uid;
int64_t ts;
if (qGetStreamScanStatus(task, &uid, &ts) < 0) {
ASSERT(0);
}
tqOffsetResetToData(&pRsp->rspOffset, uid, ts);
return 0;
}
#endif
int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp) {
ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN); ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN);
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
...@@ -268,6 +251,28 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR ...@@ -268,6 +251,28 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
tqAddBlockSchemaToRsp(pExec, pRsp); tqAddBlockSchemaToRsp(pExec, pRsp);
pRsp->blockNum++; pRsp->blockNum++;
} }
#if 0
if (pHandle->fetchMeta && pRsp->blockNum) {
SSubmitMsgIter iter = {0};
tInitSubmitMsgIter(pReq, &iter);
STaosxRsp* pXrsp = (STaosxRsp*)pRsp;
while (1) {
SSubmitBlk* pBlk = NULL;
if (tGetSubmitMsgNext(&iter, &pBlk) < 0) return -1;
if (pBlk->schemaLen > 0) {
if (pXrsp->createTableNum == 0) {
pXrsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
pXrsp->createTableReq = taosArrayInit(0, sizeof(void*));
}
void* createReq = taosMemoryCalloc(1, pBlk->schemaLen);
memcpy(createReq, pBlk->data, pBlk->schemaLen);
taosArrayPush(pXrsp->createTableLen, &pBlk->schemaLen);
taosArrayPush(pXrsp->createTableReq, &createReq);
pXrsp->createTableNum++;
}
}
}
#endif
} }
if (pRsp->blockNum == 0) { if (pRsp->blockNum == 0) {
......
...@@ -143,6 +143,8 @@ typedef struct { ...@@ -143,6 +143,8 @@ typedef struct {
STqOffsetVal prepareStatus; // for tmq STqOffsetVal prepareStatus; // for tmq
STqOffsetVal lastStatus; // for tmq STqOffsetVal lastStatus; // for tmq
SMqMetaRsp metaRsp; // for tmq fetching meta SMqMetaRsp metaRsp; // for tmq fetching meta
int64_t snapshotVer;
SSchemaWrapper *schema; SSchemaWrapper *schema;
char tbName[TSDB_TABLE_NAME_LEN]; char tbName[TSDB_TABLE_NAME_LEN];
SSDataBlock* pullOverBlk; // for streaming SSDataBlock* pullOverBlk; // for streaming
...@@ -486,24 +488,23 @@ typedef struct SStreamScanInfo { ...@@ -486,24 +488,23 @@ typedef struct SStreamScanInfo {
STimeWindowAggSupp twAggSup; STimeWindowAggSupp twAggSup;
SSDataBlock* pUpdateDataRes; SSDataBlock* pUpdateDataRes;
// status for tmq // status for tmq
// SSchemaWrapper schema; SNodeList* pGroupTags;
SNodeList* pGroupTags; SNode* pTagCond;
SNode* pTagCond; SNode* pTagIndexCond;
SNode* pTagIndexCond;
} SStreamScanInfo; } SStreamScanInfo;
typedef struct SStreamRawScanInfo{ typedef struct {
// int8_t subType; // int8_t subType;
// bool withMeta; // bool withMeta;
// int64_t suid; // int64_t suid;
// int64_t snapVersion; // int64_t snapVersion;
// void *metaInfo; // void *metaInfo;
// void *dataInfo; // void *dataInfo;
SVnode* vnode; SVnode* vnode;
SSDataBlock pRes; // result SSDataBlock SSDataBlock pRes; // result SSDataBlock
STsdbReader* dataReader; STsdbReader* dataReader;
SSnapContext* sContext; SSnapContext* sContext;
}SStreamRawScanInfo; } SStreamRawScanInfo;
typedef struct SSysTableScanInfo { typedef struct SSysTableScanInfo {
SRetrieveMetaTableRsp* pRsp; SRetrieveMetaTableRsp* pRsp;
...@@ -528,14 +529,14 @@ typedef struct SBlockDistInfo { ...@@ -528,14 +529,14 @@ typedef struct SBlockDistInfo {
SSDataBlock* pResBlock; SSDataBlock* pResBlock;
void* pHandle; void* pHandle;
SReadHandle readHandle; SReadHandle readHandle;
uint64_t uid; // table uid uint64_t uid; // table uid
} SBlockDistInfo; } SBlockDistInfo;
// todo remove this // todo remove this
typedef struct SOptrBasicInfo { typedef struct SOptrBasicInfo {
SResultRowInfo resultRowInfo; SResultRowInfo resultRowInfo;
SSDataBlock* pRes; SSDataBlock* pRes;
bool mergeResultBlock; bool mergeResultBlock;
} SOptrBasicInfo; } SOptrBasicInfo;
typedef struct SIntervalAggOperatorInfo { typedef struct SIntervalAggOperatorInfo {
......
...@@ -139,7 +139,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO ...@@ -139,7 +139,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema) { qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema) {
if (msg == NULL) { if (msg == NULL) {
// TODO create raw scan // create raw scan
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo)); SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
if (NULL == pTaskInfo) { if (NULL == pTaskInfo) {
...@@ -151,7 +151,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n ...@@ -151,7 +151,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n
pTaskInfo->cost.created = taosGetTimestampMs(); pTaskInfo->cost.created = taosGetTimestampMs();
pTaskInfo->execModel = OPTR_EXEC_MODEL_QUEUE; pTaskInfo->execModel = OPTR_EXEC_MODEL_QUEUE;
pTaskInfo->pRoot = createRawScanOperatorInfo(readers, pTaskInfo); pTaskInfo->pRoot = createRawScanOperatorInfo(readers, pTaskInfo);
if(NULL == pTaskInfo->pRoot){ if (NULL == pTaskInfo->pRoot) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pTaskInfo); taosMemoryFree(pTaskInfo);
return NULL; return NULL;
...@@ -834,11 +834,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -834,11 +834,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
} else { } else {
ASSERT(0); ASSERT(0);
} }
}else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA){ } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
SStreamRawScanInfo* pInfo = pOperator->info; SStreamRawScanInfo* pInfo = pOperator->info;
SSnapContext* sContext = pInfo->sContext; SSnapContext* sContext = pInfo->sContext;
if(setForSnapShot(sContext, pOffset->uid) != 0) { if (setForSnapShot(sContext, pOffset->uid) != 0) {
qError("setDataForSnapShot error. uid:%"PRIi64, pOffset->uid); qError("setDataForSnapShot error. uid:%" PRIi64, pOffset->uid);
return -1; return -1;
} }
...@@ -847,27 +847,29 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ...@@ -847,27 +847,29 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
pInfo->dataReader = NULL; pInfo->dataReader = NULL;
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
taosArrayDestroy(pTaskInfo->tableqinfoList.pTableList); taosArrayDestroy(pTaskInfo->tableqinfoList.pTableList);
if(mtInfo.uid == 0) return 0; // no data if (mtInfo.uid == 0) return 0; // no data
initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, mtInfo); initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, mtInfo);
pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts; pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
pTaskInfo->tableqinfoList.pTableList = taosArrayInit(1, sizeof(STableKeyInfo)); pTaskInfo->tableqinfoList.pTableList = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &(STableKeyInfo){.uid = mtInfo.uid, .groupId = 0}); taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &(STableKeyInfo){.uid = mtInfo.uid, .groupId = 0});
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList, &pInfo->dataReader, NULL); tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList,
&pInfo->dataReader, NULL);
strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName); strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);
tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema); tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema);
pTaskInfo->streamInfo.schema = mtInfo.schema; pTaskInfo->streamInfo.schema = mtInfo.schema;
qDebug("tmqsnap qStreamPrepareScan snapshot data uid %ld ts %ld", mtInfo.uid, pOffset->ts); qDebug("tmqsnap qStreamPrepareScan snapshot data uid %ld ts %ld", mtInfo.uid, pOffset->ts);
}else if(pOffset->type == TMQ_OFFSET__SNAPSHOT_META){ } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) {
SStreamRawScanInfo* pInfo = pOperator->info; SStreamRawScanInfo* pInfo = pOperator->info;
SSnapContext* sContext = pInfo->sContext; SSnapContext* sContext = pInfo->sContext;
if(setForSnapShot(sContext, pOffset->uid) != 0) { if (setForSnapShot(sContext, pOffset->uid) != 0) {
qError("setForSnapShot error. uid:%"PRIi64" ,version:%"PRIi64, pOffset->uid); qError("setForSnapShot error. uid:%" PRIi64 " ,version:%" PRIi64, pOffset->uid);
return -1; return -1;
} }
qDebug("tmqsnap qStreamPrepareScan snapshot meta uid %ld ts %ld", pOffset->uid); qDebug("tmqsnap qStreamPrepareScan snapshot meta uid %ld ts %ld", pOffset->uid);
}else if (pOffset->type == TMQ_OFFSET__LOG) { } else if (pOffset->type == TMQ_OFFSET__LOG) {
SStreamRawScanInfo* pInfo = pOperator->info; SStreamRawScanInfo* pInfo = pOperator->info;
tsdbReaderClose(pInfo->dataReader); tsdbReaderClose(pInfo->dataReader);
pInfo->dataReader = NULL; pInfo->dataReader = NULL;
......
...@@ -268,7 +268,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR ...@@ -268,7 +268,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
// add a new result set for a new group // add a new result set for a new group
SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset}; SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos, tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
sizeof(SResultRowPosition)); sizeof(SResultRowPosition));
} }
// 2. set the new time window to be the new active time window // 2. set the new time window to be the new active time window
...@@ -2815,92 +2815,6 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan ...@@ -2815,92 +2815,6 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
} }
} }
} }
#if 0
int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
uint8_t type = pOperator->operatorType;
pOperator->status = OP_OPENED;
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamScanInfo* pScanInfo = pOperator->info;
pScanInfo->blockType = STREAM_INPUT__TABLE_SCAN;
pScanInfo->pTableScanOp->status = OP_OPENED;
STableScanInfo* pInfo = pScanInfo->pTableScanOp->info;
ASSERT(pInfo->scanMode == TABLE_SCAN__TABLE_ORDER);
if (uid == 0) {
pInfo->noTable = 1;
return TSDB_CODE_SUCCESS;
}
/*if (pSnapShotScanInfo->dataReader == NULL) {*/
/*pSnapShotScanInfo->dataReader = tsdbReaderOpen(pHandle->vnode, &pSTInfo->cond, tableList, 0, 0);*/
/*pSnapShotScanInfo->scanMode = TABLE_SCAN__TABLE_ORDER;*/
/*}*/
pInfo->noTable = 0;
if (pInfo->lastStatus.uid != uid || pInfo->lastStatus.ts != ts) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
bool found = false;
for (int32_t i = 0; i < tableSz; i++) {
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i);
if (pTableInfo->uid == uid) {
found = true;
pInfo->currentTable = i;
}
}
// TODO after processing drop, found can be false
ASSERT(found);
tsdbSetTableId(pInfo->dataReader, uid);
int64_t oldSkey = pInfo->cond.twindows.skey;
pInfo->cond.twindows.skey = ts + 1;
tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
pInfo->cond.twindows.skey = oldSkey;
pInfo->scanTimes = 0;
qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid, ts,
pInfo->currentTable, tableSz);
}
return TSDB_CODE_SUCCESS;
} else {
if (pOperator->numOfDownstream == 1) {
return doPrepareScan(pOperator->pDownstream[0], uid, ts);
} else if (pOperator->numOfDownstream == 0) {
qError("failed to find stream scan operator to set the input data block");
return TSDB_CODE_QRY_APP_ERROR;
} else {
qError("join not supported for stream block scan");
return TSDB_CODE_QRY_APP_ERROR;
}
}
}
int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts) {
int32_t type = pOperator->operatorType;
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamScanInfo* pScanInfo = pOperator->info;
STableScanInfo* pSnapShotScanInfo = pScanInfo->pTableScanOp->info;
*uid = pSnapShotScanInfo->lastStatus.uid;
*ts = pSnapShotScanInfo->lastStatus.ts;
} else {
if (pOperator->pDownstream[0] == NULL) {
return TSDB_CODE_INVALID_PARA;
} else {
doGetScanStatus(pOperator->pDownstream[0], uid, ts);
}
}
return TSDB_CODE_SUCCESS;
}
#endif
// this is a blocking operator // this is a blocking operator
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
...@@ -3024,7 +2938,7 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len ...@@ -3024,7 +2938,7 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len
SResultRow* pRow = (SResultRow*)((char*)pPage + pos->offset); SResultRow* pRow = (SResultRow*)((char*)pPage + pos->offset);
setBufPageDirty(pPage, true); setBufPageDirty(pPage, true);
releaseBufPage(pSup->pResultBuf, pPage); releaseBufPage(pSup->pResultBuf, pPage);
int32_t iter = 0; int32_t iter = 0;
void* pIter = NULL; void* pIter = NULL;
while ((pIter = tSimpleHashIterate(pSup->pResultRowHashTable, pIter, &iter))) { while ((pIter = tSimpleHashIterate(pSup->pResultRowHashTable, pIter, &iter))) {
...@@ -3434,7 +3348,7 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul ...@@ -3434,7 +3348,7 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
const char* pKey) { const char* pKey) {
int32_t code = 0; int32_t code = 0;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pAggSup->currentPageId = -1; pAggSup->currentPageId = -1;
...@@ -4294,42 +4208,6 @@ SArray* extractColumnInfo(SNodeList* pNodeList) { ...@@ -4294,42 +4208,6 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
return pList; return pList;
} }
#if 0
STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableListInfo* pTableListInfo, const char* idstr) {
int32_t code = getTableList(pHandle->meta, pHandle->vnode, &pTableScanNode->scan, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
if (taosArrayGetSize(pTableListInfo->pTableList) == 0) {
code = 0;
qDebug("no table qualified for query, %s", idstr);
goto _error;
}
SQueryTableDataCond cond = {0};
code = initQueryTableDataCond(&cond, pTableScanNode);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
STsdbReader* pReader;
code = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, &pReader, idstr);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
cleanupQueryTableDataCond(&cond);
return pReader;
_error:
terrno = code;
return NULL;
}
#endif
static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanInfo** ppInfo) { static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanInfo** ppInfo) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream == 0) { if (pOperator->numOfDownstream == 0) {
......
...@@ -1219,7 +1219,7 @@ static void setBlockGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32 ...@@ -1219,7 +1219,7 @@ static void setBlockGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock) { static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock) {
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
SOperatorInfo* pOperator = pInfo->pStreamScanOp; SOperatorInfo* pOperator = pInfo->pStreamScanOp;
SExecTaskInfo* pTaskInfo = pInfo->pStreamScanOp->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows); blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
...@@ -1228,7 +1228,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock ...@@ -1228,7 +1228,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
pInfo->pRes->info.type = STREAM_NORMAL; pInfo->pRes->info.type = STREAM_NORMAL;
pInfo->pRes->info.version = pBlock->info.version; pInfo->pRes->info.version = pBlock->info.version;
uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); uint64_t* groupIdPre = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
if (groupIdPre) { if (groupIdPre) {
pInfo->pRes->info.groupId = *groupIdPre; pInfo->pRes->info.groupId = *groupIdPre;
} else { } else {
...@@ -1276,48 +1276,29 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock ...@@ -1276,48 +1276,29 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
return 0; return 0;
} }
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
// NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
#if 0 qDebug("queue scan called");
SStreamState* pState = pTaskInfo->streamInfo.pState; if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
if (pState) { SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
printf(">>>>>>>> stream write backend\n"); if (pResult && pResult->info.rows > 0) {
SWinKey key = { qDebug("queue scan tsdb return %d rows", pResult->info.rows);
.ts = 1, return pResult;
.groupId = 2, } else {
}; STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
char tmp[100] = "abcdefg1"; tsdbReaderClose(pTSInfo->dataReader);
if (streamStatePut(pState, &key, &tmp, strlen(tmp) + 1) < 0) { pTSInfo->dataReader = NULL;
ASSERT(0); tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer);
} qDebug("queue scan tsdb over, switch to wal ver %d", pTaskInfo->streamInfo.snapshotVer + 1);
if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1) < 0) {
key.ts = 2; return NULL;
char tmp2[100] = "abcdefg2"; }
if (streamStatePut(pState, &key, &tmp2, strlen(tmp2) + 1) < 0) { ASSERT(pInfo->tqReader->pWalReader->curVersion == pTaskInfo->streamInfo.snapshotVer + 1);
ASSERT(0);
}
key.groupId = 5;
key.ts = 1;
char tmp3[100] = "abcdefg3";
if (streamStatePut(pState, &key, &tmp3, strlen(tmp3) + 1) < 0) {
ASSERT(0);
}
char* val2 = NULL;
int32_t sz;
if (streamStateGet(pState, &key, (void**)&val2, &sz) < 0) {
ASSERT(0);
} }
printf("stream read %s %d\n", val2, sz);
streamFreeVal(val2);
} }
#endif
qDebug("stream scan called");
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) { if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
while (1) { while (1) {
SFetchRet ret = {0}; SFetchRet ret = {0};
...@@ -1329,21 +1310,21 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1329,21 +1310,21 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
} }
// TODO clean data block // TODO clean data block
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
qDebug("stream scan log return %d rows", pInfo->pRes->info.rows); qDebug("queue scan log return %d rows", pInfo->pRes->info.rows);
return pInfo->pRes; return pInfo->pRes;
} }
} else if (ret.fetchType == FETCH_TYPE__META) { } else if (ret.fetchType == FETCH_TYPE__META) {
ASSERT(0); ASSERT(0);
// pTaskInfo->streamInfo.lastStatus = ret.offset; // pTaskInfo->streamInfo.lastStatus = ret.offset;
// pTaskInfo->streamInfo.metaBlk = ret.meta; // pTaskInfo->streamInfo.metaBlk = ret.meta;
// return NULL; // return NULL;
} else if (ret.fetchType == FETCH_TYPE__NONE) { } else if (ret.fetchType == FETCH_TYPE__NONE) {
pTaskInfo->streamInfo.lastStatus = ret.offset; pTaskInfo->streamInfo.lastStatus = ret.offset;
ASSERT(pTaskInfo->streamInfo.lastStatus.version >= pTaskInfo->streamInfo.prepareStatus.version); ASSERT(pTaskInfo->streamInfo.lastStatus.version >= pTaskInfo->streamInfo.prepareStatus.version);
ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 == pInfo->tqReader->pWalReader->curVersion); ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 == pInfo->tqReader->pWalReader->curVersion);
char formatBuf[80]; char formatBuf[80];
tFormatOffset(formatBuf, 80, &ret.offset); tFormatOffset(formatBuf, 80, &ret.offset);
qDebug("stream scan log return null, offset %s", formatBuf); qDebug("queue scan log return null, offset %s", formatBuf);
return NULL; return NULL;
} else { } else {
ASSERT(0); ASSERT(0);
...@@ -1357,7 +1338,53 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1357,7 +1338,53 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
} }
qDebug("stream scan tsdb return null"); qDebug("stream scan tsdb return null");
return NULL; return NULL;
} else {
ASSERT(0);
return NULL;
}
}
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
// NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamScanInfo* pInfo = pOperator->info;
qDebug("stream scan called");
#if 0
SStreamState* pState = pTaskInfo->streamInfo.pState;
if (pState) {
printf(">>>>>>>> stream write backend\n");
SWinKey key = {
.ts = 1,
.groupId = 2,
};
char tmp[100] = "abcdefg1";
if (streamStatePut(pState, &key, &tmp, strlen(tmp) + 1) < 0) {
ASSERT(0);
}
key.ts = 2;
char tmp2[100] = "abcdefg2";
if (streamStatePut(pState, &key, &tmp2, strlen(tmp2) + 1) < 0) {
ASSERT(0);
}
key.groupId = 5;
key.ts = 1;
char tmp3[100] = "abcdefg3";
if (streamStatePut(pState, &key, &tmp3, strlen(tmp3) + 1) < 0) {
ASSERT(0);
}
char* val2 = NULL;
int32_t sz;
if (streamStateGet(pState, &key, (void**)&val2, &sz) < 0) {
ASSERT(0);
}
printf("stream read %s %d\n", val2, sz);
streamFreeVal(val2);
} }
#endif
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE) { if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE) {
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
...@@ -1554,14 +1581,14 @@ static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) { ...@@ -1554,14 +1581,14 @@ static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) {
} }
static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
// NOTE: this operator does never check if current status is done or not // NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamRawScanInfo* pInfo = pOperator->info; SStreamRawScanInfo* pInfo = pOperator->info;
pTaskInfo->streamInfo.metaRsp.metaRspLen = 0; // use metaRspLen !=0 to judge if data is meta pTaskInfo->streamInfo.metaRsp.metaRspLen = 0; // use metaRspLen !=0 to judge if data is meta
pTaskInfo->streamInfo.metaRsp.metaRsp = NULL; pTaskInfo->streamInfo.metaRsp.metaRsp = NULL;
qDebug("tmqsnap doRawScan called"); qDebug("tmqsnap doRawScan called");
if(pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA){ if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pBlock = &pInfo->pRes; SSDataBlock* pBlock = &pInfo->pRes;
if (pInfo->dataReader && tsdbNextDataBlock(pInfo->dataReader)) { if (pInfo->dataReader && tsdbNextDataBlock(pInfo->dataReader)) {
...@@ -1585,42 +1612,38 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { ...@@ -1585,42 +1612,38 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
} }
SMetaTableInfo mtInfo = getUidfromSnapShot(pInfo->sContext); SMetaTableInfo mtInfo = getUidfromSnapShot(pInfo->sContext);
if (mtInfo.uid == 0){ //read snapshot done, change to get data from wal if (mtInfo.uid == 0) { // read snapshot done, change to get data from wal
qDebug("tmqsnap read snapshot done, change to get data from wal"); qDebug("tmqsnap read snapshot done, change to get data from wal");
pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid; pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid;
pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG; pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
pTaskInfo->streamInfo.lastStatus.version = pInfo->sContext->snapVersion; pTaskInfo->streamInfo.lastStatus.version = pInfo->sContext->snapVersion;
tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema); } else {
}else{
pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid; pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid;
pTaskInfo->streamInfo.prepareStatus.ts = INT64_MIN; pTaskInfo->streamInfo.prepareStatus.ts = INT64_MIN;
qDebug("tmqsnap change get data uid:%ld", mtInfo.uid); qDebug("tmqsnap change get data uid:%ld", mtInfo.uid);
qStreamPrepareScan(pTaskInfo, &pTaskInfo->streamInfo.prepareStatus, pInfo->sContext->subType); qStreamPrepareScan(pTaskInfo, &pTaskInfo->streamInfo.prepareStatus, pInfo->sContext->subType);
strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);
tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema);
pTaskInfo->streamInfo.schema = mtInfo.schema;
} }
qDebug("tmqsnap stream scan tsdb return null"); qDebug("tmqsnap stream scan tsdb return null");
return NULL; return NULL;
}else if(pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META){ } else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META) {
SSnapContext *sContext = pInfo->sContext; SSnapContext* sContext = pInfo->sContext;
void* data = NULL; void* data = NULL;
int32_t dataLen = 0; int32_t dataLen = 0;
int16_t type = 0; int16_t type = 0;
int64_t uid = 0; int64_t uid = 0;
if(getMetafromSnapShot(sContext, &data, &dataLen, &type, &uid) < 0){ if (getMetafromSnapShot(sContext, &data, &dataLen, &type, &uid) < 0) {
qError("tmqsnap getMetafromSnapShot error"); qError("tmqsnap getMetafromSnapShot error");
taosMemoryFreeClear(data); taosMemoryFreeClear(data);
return NULL; return NULL;
} }
if(!sContext->queryMetaOrData){ // change to get data next poll request if (!sContext->queryMetaOrData) { // change to get data next poll request
pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META; pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META;
pTaskInfo->streamInfo.lastStatus.uid = uid; pTaskInfo->streamInfo.lastStatus.uid = uid;
pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__SNAPSHOT_DATA; pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__SNAPSHOT_DATA;
pTaskInfo->streamInfo.metaRsp.rspOffset.uid = 0; pTaskInfo->streamInfo.metaRsp.rspOffset.uid = 0;
pTaskInfo->streamInfo.metaRsp.rspOffset.ts = INT64_MIN; pTaskInfo->streamInfo.metaRsp.rspOffset.ts = INT64_MIN;
}else{ } else {
pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META; pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META;
pTaskInfo->streamInfo.lastStatus.uid = uid; pTaskInfo->streamInfo.lastStatus.uid = uid;
pTaskInfo->streamInfo.metaRsp.rspOffset = pTaskInfo->streamInfo.lastStatus; pTaskInfo->streamInfo.metaRsp.rspOffset = pTaskInfo->streamInfo.lastStatus;
...@@ -1631,44 +1654,44 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { ...@@ -1631,44 +1654,44 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
// else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) { // else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
// int64_t fetchVer = pTaskInfo->streamInfo.prepareStatus.version + 1; // int64_t fetchVer = pTaskInfo->streamInfo.prepareStatus.version + 1;
// //
// while(1){ // while(1){
// if (tqFetchLog(pInfo->tqReader->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) { // if (tqFetchLog(pInfo->tqReader->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) {
// qDebug("tmqsnap tmq poll: consumer log end. offset %" PRId64, fetchVer); // qDebug("tmqsnap tmq poll: consumer log end. offset %" PRId64, fetchVer);
// pTaskInfo->streamInfo.lastStatus.version = fetchVer; // pTaskInfo->streamInfo.lastStatus.version = fetchVer;
// pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG; // pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
// return NULL; // return NULL;
// } // }
// SWalCont* pHead = &pInfo->pCkHead->head; // SWalCont* pHead = &pInfo->pCkHead->head;
// qDebug("tmqsnap tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType); // qDebug("tmqsnap tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType);
// //
// if (pHead->msgType == TDMT_VND_SUBMIT) { // if (pHead->msgType == TDMT_VND_SUBMIT) {
// SSubmitReq* pCont = (SSubmitReq*)&pHead->body; // SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
// tqReaderSetDataMsg(pInfo->tqReader, pCont, 0); // tqReaderSetDataMsg(pInfo->tqReader, pCont, 0);
// SSDataBlock* block = tqLogScanExec(pInfo->sContext->subType, pInfo->tqReader, pInfo->pFilterOutTbUid, &pInfo->pRes); // SSDataBlock* block = tqLogScanExec(pInfo->sContext->subType, pInfo->tqReader, pInfo->pFilterOutTbUid,
// if(block){ // &pInfo->pRes); if(block){
// pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG; // pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
// pTaskInfo->streamInfo.lastStatus.version = fetchVer; // pTaskInfo->streamInfo.lastStatus.version = fetchVer;
// qDebug("tmqsnap fetch data msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); // qDebug("tmqsnap fetch data msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
// return block; // return block;
// }else{ // }else{
// fetchVer++; // fetchVer++;
// } // }
// } else{ // } else{
// ASSERT(pInfo->sContext->withMeta); // ASSERT(pInfo->sContext->withMeta);
// ASSERT(IS_META_MSG(pHead->msgType)); // ASSERT(IS_META_MSG(pHead->msgType));
// qDebug("tmqsnap fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); // qDebug("tmqsnap fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
// pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer; // pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer;
// pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__LOG; // pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__LOG;
// pTaskInfo->streamInfo.metaRsp.resMsgType = pHead->msgType; // pTaskInfo->streamInfo.metaRsp.resMsgType = pHead->msgType;
// pTaskInfo->streamInfo.metaRsp.metaRspLen = pHead->bodyLen; // pTaskInfo->streamInfo.metaRsp.metaRspLen = pHead->bodyLen;
// pTaskInfo->streamInfo.metaRsp.metaRsp = taosMemoryMalloc(pHead->bodyLen); // pTaskInfo->streamInfo.metaRsp.metaRsp = taosMemoryMalloc(pHead->bodyLen);
// memcpy(pTaskInfo->streamInfo.metaRsp.metaRsp, pHead->body, pHead->bodyLen); // memcpy(pTaskInfo->streamInfo.metaRsp.metaRsp, pHead->body, pHead->bodyLen);
// return NULL; // return NULL;
// } // }
// } // }
return NULL; return NULL;
} }
...@@ -1689,7 +1712,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT ...@@ -1689,7 +1712,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
// create tq reader // create tq reader
SStreamRawScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamRawScanInfo)); SStreamRawScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamRawScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL; return NULL;
...@@ -1699,13 +1722,12 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT ...@@ -1699,13 +1722,12 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
pInfo->sContext = pHandle->sContext; pInfo->sContext = pHandle->sContext;
pOperator->name = "RawStreamScanOperator"; pOperator->name = "RawStreamScanOperator";
// pOperator->blocking = false; // pOperator->blocking = false;
// pOperator->status = OP_NOT_OPENED; // pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, destroyRawScanOperatorInfo, pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, destroyRawScanOperatorInfo, NULL, NULL, NULL);
NULL, NULL, NULL);
return pOperator; return pOperator;
} }
...@@ -1724,7 +1746,7 @@ static void destroyStreamScanOperatorInfo(void* param) { ...@@ -1724,7 +1746,7 @@ static void destroyStreamScanOperatorInfo(void* param) {
} }
if (pStreamScan->pPseudoExpr) { if (pStreamScan->pPseudoExpr) {
destroyExprInfo(pStreamScan->pPseudoExpr, pStreamScan->numOfPseudoExpr); destroyExprInfo(pStreamScan->pPseudoExpr, pStreamScan->numOfPseudoExpr);
taosMemoryFreeClear(pStreamScan->pPseudoExpr); taosMemoryFree(pStreamScan->pPseudoExpr);
} }
updateInfoDestroy(pStreamScan->pUpdateInfo); updateInfoDestroy(pStreamScan->pUpdateInfo);
...@@ -1815,6 +1837,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -1815,6 +1837,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->readHandle = *pHandle; pInfo->readHandle = *pHandle;
pInfo->tableUid = pScanPhyNode->uid; pInfo->tableUid = pScanPhyNode->uid;
pTaskInfo->streamInfo.snapshotVer = pHandle->version;
// set the extract column id to streamHandle // set the extract column id to streamHandle
tqReaderSetColIdList(pInfo->tqReader, pColIds); tqReaderSetColIdList(pInfo->tqReader, pColIds);
...@@ -1858,8 +1881,9 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -1858,8 +1881,9 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamScan, NULL, NULL, destroyStreamScanOperatorInfo, __optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan;
NULL, NULL, NULL); pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, nextFn, NULL, NULL, destroyStreamScanOperatorInfo, NULL, NULL, NULL);
return pOperator; return pOperator;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册