提交 56c98d77 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 d998912c
......@@ -78,7 +78,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v
* @param SReadHandle
* @return
*/
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, SSchemaWrapper** pSchema);
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, uint64_t id);
/**
* set the task Id, usually used by message queue process
......@@ -89,6 +89,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3
void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId);
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);
/**
* Set multiple input data blocks for the stream scan.
* @param tinfo
......
......@@ -799,7 +799,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
STqHandle tqHandle = {0};
pHandle = &tqHandle;
/*taosInitRWLatch(&pExec->lock);*/
uint64_t oldConsumerId = pHandle->consumerId;
memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
......@@ -834,7 +833,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
req.qmsg = NULL;
pHandle->execHandle.task =
qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId, &pHandle->execHandle.numOfCols, NULL);
qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId, &pHandle->execHandle.numOfCols, req.newConsumerId);
void* scanner = NULL;
qExtractStreamScanner(pHandle->execHandle.task, &scanner);
pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
......@@ -847,7 +846,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
(SSnapContext**)(&handle.sContext));
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, NULL);
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
pHandle->execHandle.execTb.suid = req.suid;
......@@ -865,7 +864,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
(SSnapContext**)(&handle.sContext));
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, NULL);
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
}
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
......
......@@ -307,7 +307,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
handle.execHandle.task =
qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, vgId, &handle.execHandle.numOfCols, NULL);
` qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, vgId, &handle.execHandle.numOfCols, 0);
if (handle.execHandle.task == NULL) {
tqError("cannot create exec task for %s", handle.subKey);
code = -1;
......@@ -332,7 +332,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta,
(SSnapContext**)(&reader.sContext));
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, NULL);
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0);
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
......@@ -341,7 +341,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, handle.execHandle.execTb.suid);
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid);
tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
}
handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode);
tqReaderSetTbUidList(handle.execHandle.pExecReader, tbUidList);
......@@ -349,9 +349,9 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
buildSnapContext(reader.meta, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType,
handle.fetchMeta, (SSnapContext**)(&reader.sContext));
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, NULL);
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0);
}
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode));
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, vgId);
taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle));
}
......
......@@ -482,12 +482,6 @@ typedef struct SStreamScanInfo {
} SStreamScanInfo;
typedef struct {
// int8_t subType;
// bool withMeta;
// int64_t suid;
// int64_t snapVersion;
// void *metaInfo;
// void *dataInfo;
SVnode* vnode;
SSDataBlock pRes; // result SSDataBlock
STsdbReader* dataReader;
......@@ -690,6 +684,8 @@ typedef struct SStreamFillOperatorInfo {
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, char* dbFName);
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
__optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn, __optr_explain_fn_t explain);
int32_t optrDummyOpenFn(SOperatorInfo* pOperator);
......
......@@ -242,29 +242,27 @@ int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks,
return code;
}
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, SSchemaWrapper** pSchema) {
if (msg == NULL) {
// create raw scan
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, uint64_t id) {
if (msg == NULL) { // create raw scan
SExecTaskInfo* pTaskInfo = doCreateExecTaskInfo(0, id, vgId, OPTR_EXEC_MODEL_QUEUE, "");
if (NULL == pTaskInfo) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
pTaskInfo->cost.created = taosGetTimestampUs();
pTaskInfo->execModel = OPTR_EXEC_MODEL_QUEUE;
pTaskInfo->pRoot = createRawScanOperatorInfo(pReaderHandle, pTaskInfo);
if (NULL == pTaskInfo->pRoot) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pTaskInfo);
return NULL;
}
qDebug("create raw scan task info completed, vgId:%d, %s", vgId, GET_TASKID(pTaskInfo));
return pTaskInfo;
}
struct SSubplan* pPlan = NULL;
int32_t code = qStringToSubplan(msg, &pPlan);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
......@@ -292,9 +290,6 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3
}
}
if (pSchema) {
*pSchema = tCloneSSchemaWrapper(((SExecTaskInfo*)pTaskInfo)->schemaInfo.qsw);
}
return pTaskInfo;
}
......@@ -1138,6 +1133,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
STableKeyInfo* pTableInfo = tableListGetInfo(pTableListInfo, 0);
uid = pTableInfo->uid;
ts = INT64_MIN;
pScanInfo->currentTable = 0;
} else {
taosRUnLockLatch(&pTaskInfo->lock);
qError("no table in table list, %s", id);
......@@ -1210,10 +1206,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo);
pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
// if (pTableListInfo == NULL) {
// pTableListInfo = tableListCreate();
// }
tableListAddTableInfo(pTableListInfo, mtInfo.uid, 0);
STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
......
......@@ -1974,7 +1974,7 @@ char* buildTaskId(uint64_t taskId, uint64_t queryId) {
return p;
}
static SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, char* dbFName) {
SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, char* dbFName) {
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
if (pTaskInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -1982,6 +1982,7 @@ static SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, in
}
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
pTaskInfo->cost.created = taosGetTimestampUs();
pTaskInfo->schemaInfo.dbname = taosStrdup(dbFName);
pTaskInfo->execModel = model;
......@@ -2465,7 +2466,6 @@ int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHand
goto _complete;
}
(*pTaskInfo)->cost.created = taosGetTimestampUs();
return TSDB_CODE_SUCCESS;
_complete:
......
......@@ -83,7 +83,7 @@ class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
tdSql.init(conn.cursor(), True)
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册