提交 4ed47a02 编写于 作者: L Liu Jicong

feat(tmq): scan tsdb as snapshot

上级 14a488c9
......@@ -76,19 +76,41 @@ int32_t init_env() {
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct0 values(now, 1, 2, 'a')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists ct1 using st1 tags(2000)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes));
printf("failed to create child table ct1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct1 values(now, 3, 4, 'b')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists ct3 using st1 tags(3000)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table tu3, reason:%s\n", taos_errstr(pRes));
printf("failed to create child table ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into ct3 values(now, 5, 6, 'c')");
if (taos_errno(pRes) != 0) {
printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
return 0;
}
......@@ -168,6 +190,9 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "experiment.use.snapshot", "true");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
assert(tmq);
......
......@@ -113,8 +113,8 @@ typedef struct SQueryTableDataCond {
int32_t type; // data block load type:
int32_t numOfTWindows;
STimeWindow* twindows;
int32_t startVersion;
int32_t endVersion;
int64_t startVersion;
int64_t endVersion;
} SQueryTableDataCond;
void* blockDataDestroy(SSDataBlock* pBlock);
......
......@@ -696,12 +696,12 @@ typedef struct {
typedef STableCfg STableCfgRsp;
int32_t tSerializeSTableCfgReq(void *buf, int32_t bufLen, STableCfgReq *pReq);
int32_t tDeserializeSTableCfgReq(void *buf, int32_t bufLen, STableCfgReq *pReq);
int32_t tSerializeSTableCfgReq(void* buf, int32_t bufLen, STableCfgReq* pReq);
int32_t tDeserializeSTableCfgReq(void* buf, int32_t bufLen, STableCfgReq* pReq);
int32_t tSerializeSTableCfgRsp(void *buf, int32_t bufLen, STableCfgRsp *pRsp);
int32_t tDeserializeSTableCfgRsp(void *buf, int32_t bufLen, STableCfgRsp *pRsp);
void tFreeSTableCfgRsp(STableCfgRsp *pRsp);
int32_t tSerializeSTableCfgRsp(void* buf, int32_t bufLen, STableCfgRsp* pRsp);
int32_t tDeserializeSTableCfgRsp(void* buf, int32_t bufLen, STableCfgRsp* pRsp);
void tFreeSTableCfgRsp(STableCfgRsp* pRsp);
typedef struct {
char db[TSDB_DB_FNAME_LEN];
......@@ -2652,6 +2652,7 @@ typedef struct {
SMsgHead head;
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
int8_t withTbName;
int8_t useSnapshot;
int32_t epoch;
uint64_t reqId;
int64_t consumerId;
......
......@@ -36,11 +36,13 @@ typedef struct SReadHandle {
void* vnode;
void* mnd;
SMsgCb* pMsgCb;
int8_t initTsdbReader;
} SReadHandle;
enum {
STREAM_DATA_TYPE_SUBMIT_BLOCK = 1,
STREAM_DATA_TYPE_SSDATA_BLOCK = 2,
STREAM_DATA_TYPE_FROM_SNAPSHOT = 3,
};
typedef enum {
......@@ -56,6 +58,13 @@ typedef enum {
*/
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle);
/**
* Switch the stream scan to snapshot mode
* @param tinfo
* @return
*/
int32_t qStreamScanSnapshot(qTaskInfo_t tinfo);
/**
* Set the input data block for the stream scan.
* @param tinfo
......
......@@ -54,6 +54,7 @@ struct tmq_conf_t {
int8_t autoCommit;
int8_t resetOffset;
int8_t withTbName;
int8_t useSnapshot;
uint16_t port;
int32_t autoCommitInterval;
char* ip;
......@@ -69,6 +70,7 @@ struct tmq_t {
char groupId[TSDB_CGROUP_LEN];
char clientId[256];
int8_t withTbName;
int8_t useSnapshot;
int8_t autoCommit;
int32_t autoCommitInterval;
int32_t resetOffsetCfg;
......@@ -282,6 +284,18 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
}
}
if (strcmp(key, "experiment.use.snapshot") == 0) {
if (strcmp(value, "true") == 0) {
conf->useSnapshot = true;
return TMQ_CONF_OK;
} else if (strcmp(value, "false") == 0) {
conf->useSnapshot = false;
return TMQ_CONF_OK;
} else {
return TMQ_CONF_INVALID;
}
}
if (strcmp(key, "td.connect.ip") == 0) {
conf->ip = strdup(value);
return TMQ_CONF_OK;
......@@ -953,6 +967,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
strcpy(pTmq->clientId, conf->clientId);
strcpy(pTmq->groupId, conf->groupId);
pTmq->withTbName = conf->withTbName;
pTmq->useSnapshot = conf->useSnapshot;
pTmq->autoCommit = conf->autoCommit;
pTmq->autoCommitInterval = conf->autoCommitInterval;
pTmq->commitCb = conf->commitCb;
......@@ -1534,6 +1549,8 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic*
pReq->currentOffset = reqOffset;
pReq->reqId = generateRequestId();
pReq->useSnapshot = tmq->useSnapshot;
pReq->head.vgId = htonl(pVg->vgId);
pReq->head.contLen = htonl(sizeof(SMqPollReq));
return pReq;
......
......@@ -509,7 +509,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
mndAddTaskToTaskSet(taskOneLevel, pTask);
// input
// source
pTask->isDataScan = 1;
// trigger
......
......@@ -150,6 +150,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead*
// tqExec
int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkRsp* pRsp, int32_t workerId);
int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataBlkRsp* pRsp, int32_t workerId);
int32_t tqSendPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataBlkRsp* pRsp);
// tqMeta
......
......@@ -227,19 +227,16 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
consumerEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, consumerEpoch, reqEpoch);
}
SWalHead* pHeadWithCkSum = taosMemoryMalloc(sizeof(SWalHead) + 2048);
if (pHeadWithCkSum == NULL) {
return -1;
}
walSetReaderCapacity(pHandle->pWalReader, 2048);
SMqDataBlkRsp rsp = {0};
rsp.reqOffset = pReq->currentOffset;
rsp.blockData = taosArrayInit(0, sizeof(void*));
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
if (rsp.blockData == NULL || rsp.blockDataLen == NULL) {
return -1;
}
rsp.withTbName = pReq->withTbName;
if (rsp.withTbName) {
rsp.blockTbName = taosArrayInit(0, sizeof(void*));
......@@ -253,6 +250,28 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
rsp.blockSchema = taosArrayInit(0, sizeof(void*));
}
#if 1
if (pReq->useSnapshot) {
int64_t lastVer = walGetCommittedVer(pTq->pWal);
if (rsp.reqOffset < lastVer) {
tqScanSnapshot(pTq, &pHandle->execHandle, &rsp, workerId);
if (rsp.blockNum != 0) {
rsp.withTbName = false;
rsp.rspOffset = lastVer;
goto SEND_RSP;
}
}
}
#endif
SWalHead* pHeadWithCkSum = taosMemoryMalloc(sizeof(SWalHead) + 2048);
if (pHeadWithCkSum == NULL) {
return -1;
}
walSetReaderCapacity(pHandle->pWalReader, 2048);
while (1) {
consumerEpoch = atomic_load_32(&pHandle->epoch);
if (consumerEpoch > reqEpoch) {
......@@ -292,6 +311,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
metaRsp.metaRsp = pHead->body;
if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
code = -1;
goto OVER;
}
code = 0;
goto OVER;
......@@ -308,6 +328,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
taosMemoryFree(pHeadWithCkSum);
SEND_RSP:
ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum);
ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum);
if (rsp.withSchema) {
......@@ -376,6 +397,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
SReadHandle handle = {
.reader = pHandle->execHandle.pExecReader[i],
.meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode,
.initTsdbReader = 1,
};
pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle);
ASSERT(pHandle->execHandle.execCol.task[i]);
......@@ -448,6 +471,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
.reader = pStreamReader,
.meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode,
.initTsdbReader = 1,
};
/*pTask->exec.inputHandle = pStreamReader;*/
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
......
......@@ -60,6 +60,30 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, const STqExecHandle* pExec, SMqD
return 0;
}
int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataBlkRsp* pRsp, int32_t workerId) {
ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN);
qTaskInfo_t task = pExec->execCol.task[workerId];
if (qStreamScanSnapshot(task) < 0) {
ASSERT(0);
}
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
if (qExecTask(task, &pDataBlock, &ts) < 0) {
ASSERT(0);
}
if (pDataBlock == NULL) break;
ASSERT(pDataBlock->info.rows != 0);
ASSERT(pDataBlock->info.numOfCols != 0);
tqAddBlockDataToRsp(pDataBlock, pRsp);
pRsp->blockNum++;
}
return 0;
}
int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkRsp* pRsp, int32_t workerId) {
if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) {
qTaskInfo_t task = pExec->execCol.task[workerId];
......
......@@ -67,6 +67,9 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock);
taosArrayPush(pInfo->pBlockLists, &p);
}
} else if (type == STREAM_DATA_TYPE_FROM_SNAPSHOT) {
// do nothing
ASSERT(pInfo->blockType == STREAM_DATA_TYPE_FROM_SNAPSHOT);
} else {
ASSERT(0);
}
......@@ -75,6 +78,14 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
}
}
int32_t qStreamScanSnapshot(qTaskInfo_t tinfo) {
if (tinfo == NULL) {
return TSDB_CODE_QRY_APP_ERROR;
}
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return doSetStreamBlock(pTaskInfo->pRoot, NULL, 0, STREAM_DATA_TYPE_FROM_SNAPSHOT, 0, NULL);
}
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid) {
return qSetMultiStreamInput(tinfo, input, 1, type, assignUid);
}
......@@ -106,14 +117,6 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
return NULL;
}
// print those info into log
#if 0
pMsg->sId = pMsg->sId;
pMsg->queryId = pMsg->queryId;
pMsg->taskId = pMsg->taskId;
pMsg->contentLen = pMsg->contentLen;
#endif
/*qDebugL("stream task string %s", (const char*)msg);*/
struct SSubplan* plan = NULL;
......
......@@ -1340,7 +1340,7 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR
}
if (rowRes != NULL) {
int32_t totalRows = pBlock->info.rows;
int32_t totalRows = pBlock->info.rows;
SSDataBlock* px = createOneDataBlock(pBlock, true);
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
......@@ -3968,7 +3968,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
}
}
}
int32_t len = (int32_t)(pStart - (char*)keyBuf);
int32_t len = (int32_t)(pStart - (char*)keyBuf);
uint64_t* pGroupId = taosHashGet(pTableListInfo->map, keyBuf, len);
......@@ -4047,12 +4047,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
tsdbReaderT pDataReader = NULL;
if (pHandle) {
if (pHandle->vnode) {
// for stram
if (pHandle->initTsdbReader) {
// for stream
ASSERT(pHandle->vnode);
pDataReader =
doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond);
} else {
// for tq
ASSERT(pHandle->meta);
getTableList(pHandle->meta, pScanPhyNode, pTableListInfo, pTagCond);
}
}
......@@ -4502,16 +4504,12 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
(*pTaskInfo)->sql = sql;
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId,
&(*pTaskInfo)->tableqinfoList, pPlan->pTagCond);
if (NULL == (*pTaskInfo)->pRoot) {
code = (*pTaskInfo)->code;
goto _complete;
}
if ((*pTaskInfo)->pRoot == NULL) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _complete;
}
return code;
_complete:
......
......@@ -928,7 +928,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current);
blockDataUpdateTsWindow(pBlock, 0);
return pBlock;
} else {
} else if (pInfo->blockType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
if (pInfo->scanMode == STREAM_SCAN_FROM_RES) {
blockDataDestroy(pInfo->pUpdateRes);
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
......@@ -1062,6 +1062,15 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
}
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
} else if (pInfo->blockType == STREAM_DATA_TYPE_FROM_SNAPSHOT) {
SSDataBlock* pResult = doTableScan(pInfo->pOperatorDumy);
if (pResult) {
return pResult->info.rows > 0 ? pResult : NULL;
}
return NULL;
} else {
ASSERT(0);
return NULL;
}
}
......
......@@ -80,7 +80,6 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
}
qRes->type = STREAM_INPUT__DATA_BLOCK;
qRes->blocks = pRes;
/*qRes->sourceVg = pTask->nodeId;*/
if (streamTaskOutput(pTask, qRes) < 0) {
streamQueueProcessFail(pTask->inputQueue);
taosArrayDestroy(pRes);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册