提交 41ed0138 编写于 作者: wmmhello's avatar wmmhello

fix:compile error

上级 07bdb95b
...@@ -82,7 +82,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v ...@@ -82,7 +82,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols,
uint64_t id); uint64_t id);
int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray **tableList); int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray **tableList, void* pTaskInfo);
/** /**
* set the task Id, usually used by message queue process * set the task Id, usually used by message queue process
......
...@@ -712,8 +712,12 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -712,8 +712,12 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
} }
} }
buildSnapContext(handle.vnode, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
(SSnapContext**)(&handle.sContext));
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
SArray* tbUidList = NULL; SArray* tbUidList = NULL;
ret = qGetTableList(req.suid, pVnode, pHandle->execHandle.execTb.node, &tbUidList); ret = qGetTableList(req.suid, pVnode, pHandle->execHandle.execTb.node, &tbUidList, pHandle->execHandle.task);
if(ret != TDB_CODE_SUCCESS) { if(ret != TDB_CODE_SUCCESS) {
tqError("qGetTableList error:%d vgId:%d, subkey:%s consumer:0x%" PRIx64, ret, pVnode->config.vgId, req.subKey, pHandle->consumerId); tqError("qGetTableList error:%d vgId:%d, subkey:%s consumer:0x%" PRIx64, ret, pVnode->config.vgId, req.subKey, pHandle->consumerId);
taosArrayDestroy(tbUidList); taosArrayDestroy(tbUidList);
...@@ -723,10 +727,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -723,10 +727,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
pHandle->execHandle.pTqReader = tqReaderOpen(pVnode); pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList); tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList);
taosArrayDestroy(tbUidList); taosArrayDestroy(tbUidList);
buildSnapContext(handle.vnode, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
(SSnapContext**)(&handle.sContext));
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
} }
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
......
...@@ -345,9 +345,12 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { ...@@ -345,9 +345,12 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
return -1; return -1;
} }
} }
buildSnapContext(reader.vnode, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType,
handle.fetchMeta, (SSnapContext**)(&reader.sContext));
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0);
SArray* tbUidList = NULL; SArray* tbUidList = NULL;
int ret = qGetTableList(handle.execHandle.execTb.suid, pTq->pVnode, handle.execHandle.execTb.node, &tbUidList); int ret = qGetTableList(handle.execHandle.execTb.suid, pTq->pVnode, handle.execHandle.execTb.node, &tbUidList, handle.execHandle.task);
if(ret != TDB_CODE_SUCCESS) { if(ret != TDB_CODE_SUCCESS) {
tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, handle.subKey, handle.consumerId); tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, handle.subKey, handle.consumerId);
taosArrayDestroy(tbUidList); taosArrayDestroy(tbUidList);
...@@ -357,10 +360,6 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { ...@@ -357,10 +360,6 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode); handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode);
tqReaderSetTbUidList(handle.execHandle.pTqReader, tbUidList); tqReaderSetTbUidList(handle.execHandle.pTqReader, tbUidList);
taosArrayDestroy(tbUidList); taosArrayDestroy(tbUidList);
buildSnapContext(reader.vnode, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType,
handle.fetchMeta, (SSnapContext**)(&reader.sContext));
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0);
} }
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, vgId); tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, vgId);
taosWLockLatch(&pTq->lock); taosWLockLatch(&pTq->lock);
......
...@@ -1084,7 +1084,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { ...@@ -1084,7 +1084,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
} else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
if (isAdd) { if (isAdd) {
SArray* list = NULL; SArray* list = NULL;
int ret = qGetTableList(pTqHandle->execHandle.execTb.suid, pTq->pVnode, pTqHandle->execHandle.execTb.node, &list); int ret = qGetTableList(pTqHandle->execHandle.execTb.suid, pTq->pVnode, pTqHandle->execHandle.execTb.node, &list, pTqHandle->execHandle.task);
if(ret != TDB_CODE_SUCCESS) { if(ret != TDB_CODE_SUCCESS) {
tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey, pTqHandle->consumerId); tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey, pTqHandle->consumerId);
taosArrayDestroy(list); taosArrayDestroy(list);
......
...@@ -1152,14 +1152,14 @@ _end: ...@@ -1152,14 +1152,14 @@ _end:
return code; return code;
} }
int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray **tableList){ int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray **tableList, void* pTaskInfo){
SSubplan *pSubplan = (SSubplan *)node; SSubplan *pSubplan = (SSubplan *)node;
SScanPhysiNode pNode = {0}; SScanPhysiNode pNode = {0};
pNode.suid = suid; pNode.suid = suid;
pNode.uid = suid; pNode.uid = suid;
pNode.tableType = TSDB_SUPER_TABLE; pNode.tableType = TSDB_SUPER_TABLE;
STableListInfo* pTableListInfo = tableListCreate(); STableListInfo* pTableListInfo = tableListCreate();
int code = getTableList(pVnode, &pNode, pSubplan->pTagCond, pSubplan->pTagIndexCond, pTableListInfo, "qGetTableList"); int code = getTableList(pVnode, &pNode, pSubplan->pTagCond, pSubplan->pTagIndexCond, pTableListInfo, "qGetTableList", &((SExecTaskInfo*)pTaskInfo)->storageAPI);
*tableList = pTableListInfo->pTableList; *tableList = pTableListInfo->pTableList;
pTableListInfo->pTableList = NULL; pTableListInfo->pTableList = NULL;
tableListDestroy(pTableListInfo); tableListDestroy(pTableListInfo);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册