Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8f0f1f01
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
8f0f1f01
编写于
1月 11, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/feature/3.0_liaohj' into feature/qnode
上级
9f864b74
52457838
变更
14
隐藏空白更改
内联
并排
Showing
14 changed file
with
261 addition
and
155 deletion
+261
-155
include/common/tmsg.h
include/common/tmsg.h
+8
-0
include/libs/executor/executor.h
include/libs/executor/executor.h
+8
-4
source/libs/executor/inc/dataSinkMgt.h
source/libs/executor/inc/dataSinkMgt.h
+7
-4
source/libs/executor/inc/executorInt.h
source/libs/executor/inc/executorInt.h
+1
-0
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+5
-2
source/libs/executor/src/dataDispatcher.c
source/libs/executor/src/dataDispatcher.c
+3
-3
source/libs/executor/src/dataSinkMgt.c
source/libs/executor/src/dataSinkMgt.c
+1
-0
source/libs/executor/src/executorMain.c
source/libs/executor/src/executorMain.c
+52
-36
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+1
-0
source/libs/qworker/inc/qworkerInt.h
source/libs/qworker/inc/qworkerInt.h
+3
-2
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+129
-61
src/inc/tsdb.h
src/inc/tsdb.h
+8
-8
src/query/inc/qExecutor.h
src/query/inc/qExecutor.h
+2
-2
src/query/src/qExecutor.c
src/query/src/qExecutor.c
+33
-33
未找到文件。
include/common/tmsg.h
浏览文件 @
8f0f1f01
...
...
@@ -986,6 +986,14 @@ typedef struct {
char
msg
[];
}
SSubQueryMsg
;
typedef
struct
{
SMsgHead
header
;
uint64_t
sId
;
uint64_t
queryId
;
uint64_t
taskId
;
}
SSinkDataReq
;
typedef
struct
{
SMsgHead
header
;
uint64_t
sId
;
...
...
include/libs/executor/executor.h
浏览文件 @
8f0f1f01
...
...
@@ -21,6 +21,9 @@ extern "C" {
#endif
typedef
void
*
qTaskInfo_t
;
typedef
void
*
DataSinkHandle
;
struct
SSubplan
;
/**
* Create the exec task object according to task json
...
...
@@ -34,13 +37,14 @@ typedef void* qTaskInfo_t;
int32_t
qCreateExecTask
(
void
*
tsdb
,
int32_t
vgId
,
struct
SSubplan
*
pPlan
,
qTaskInfo_t
*
pTaskInfo
);
/**
*
t
he main task execution function, including query on both table and multiple tables,
*
T
he main task execution function, including query on both table and multiple tables,
* which are decided according to the tag or table name query conditions
*
* @param qinfo
* @param tinfo
* @param handle
* @return
*/
bool
qExecTask
(
qTaskInfo_t
qTask
,
SSDataBlock
**
pRes
);
int32_t
qExecTask
(
qTaskInfo_t
tinfo
,
DataSinkHandle
*
handle
);
/**
* Retrieve the produced results information, if current query is not paused or completed,
...
...
@@ -62,7 +66,7 @@ int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspCo
* @param contLen payload length
* @return
*/
int32_t
qDumpRetrieveResult
(
qTaskInfo_t
qinfo
,
SRetrieveTableRsp
**
pRsp
,
int32_t
*
contLen
,
bool
*
continueExec
);
//
int32_t qDumpRetrieveResult(qTaskInfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen, bool* continueExec);
/**
* return the transporter context (RPC)
...
...
source/libs/executor/inc/dataSinkMgt.h
浏览文件 @
8f0f1f01
...
...
@@ -21,12 +21,13 @@ extern "C" {
#endif
#include "os.h"
#include "executor.h"
#include "executorimpl.h"
#define DS_CAPACITY_ENOUGH 1
#define DS_
CAPACITY_FULL
2
#define DS_
DATA_FULL
2
#define DS_NEED_SCHEDULE 3
#define DS_
END
4
#define DS_
QUERY_END
4
#define DS_IN_PROCESS 5
struct
SDataSink
;
...
...
@@ -39,8 +40,6 @@ typedef struct SDataSinkMgtCfg {
int32_t
dsDataSinkMgtInit
(
SDataSinkMgtCfg
*
cfg
);
typedef
void
*
DataSinkHandle
;
typedef
struct
SInputData
{
const
SSDataBlock
*
pData
;
SHashObj
*
pTableRetrieveTsMap
;
...
...
@@ -68,6 +67,10 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH
*/
int32_t
dsPutDataBlock
(
DataSinkHandle
handle
,
const
SInputData
*
pInput
,
int32_t
*
pStatus
);
/**
*
* @param handle
*/
void
dsEndPut
(
DataSinkHandle
handle
);
/**
...
...
source/libs/executor/inc/executorInt.h
浏览文件 @
8f0f1f01
...
...
@@ -20,6 +20,7 @@
extern
"C"
{
#endif
#ifdef __cplusplus
}
#endif
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
8f0f1f01
...
...
@@ -20,14 +20,16 @@
#include "ttszip.h"
#include "tvariant.h"
#include "
thash
.h"
#include "
dataSinkMgt
.h"
#include "executil.h"
#include "planner.h"
#include "taosdef.h"
#include "tarray.h"
#include "tfilter.h"
#include "thash.h"
#include "tlockfree.h"
#include "tpagedfile.h"
#include "
planne
r.h"
#include "
executo
r.h"
struct
SColumnFilterElem
;
...
...
@@ -256,6 +258,7 @@ typedef struct SExecTaskInfo {
// void* rspContext; // response context
char
*
sql
;
// query sql string
jmp_buf
env
;
//
DataSinkHandle
dsHandle
;
struct
SOperatorInfo
*
pRoot
;
}
SExecTaskInfo
;
...
...
source/libs/executor/src/dataDispatcher.c
浏览文件 @
8f0f1f01
...
...
@@ -124,7 +124,7 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
static
int32_t
updateStatus
(
SDataDispatchHandle
*
pDispatcher
)
{
pthread_mutex_lock
(
&
pDispatcher
->
mutex
);
int32_t
status
=
taosQueueSize
(
pDispatcher
->
pDataBlocks
)
<
pDispatcher
->
pManager
->
cfg
.
maxDataBlockNumPerQuery
?
DS_CAPACITY_ENOUGH
:
DS_
CAPACITY
_FULL
;
int32_t
status
=
taosQueueSize
(
pDispatcher
->
pDataBlocks
)
<
pDispatcher
->
pManager
->
cfg
.
maxDataBlockNumPerQuery
?
DS_CAPACITY_ENOUGH
:
DS_
DATA
_FULL
;
pDispatcher
->
status
=
status
;
pthread_mutex_unlock
(
&
pDispatcher
->
mutex
);
return
status
;
...
...
@@ -152,14 +152,14 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput,
static
void
endPut
(
struct
SDataSinkHandle
*
pHandle
)
{
SDataDispatchHandle
*
pDispatcher
=
(
SDataDispatchHandle
*
)
pHandle
;
pthread_mutex_lock
(
&
pDispatcher
->
mutex
);
pDispatcher
->
status
=
DS_END
;
pDispatcher
->
status
=
DS_
QUERY_
END
;
pthread_mutex_unlock
(
&
pDispatcher
->
mutex
);
}
static
int32_t
getDataLength
(
SDataSinkHandle
*
pHandle
,
int32_t
*
pStatus
)
{
SDataDispatchHandle
*
pDispatcher
=
(
SDataDispatchHandle
*
)
pHandle
;
if
(
taosQueueEmpty
(
pDispatcher
->
pDataBlocks
))
{
*
pStatus
=
getStatus
(
pDispatcher
)
?
DS_END
:
DS_IN_PROCESS
;
*
pStatus
=
getStatus
(
pDispatcher
)
?
DS_
QUERY_
END
:
DS_IN_PROCESS
;
return
0
;
}
SDataDispatchBuf
*
pBuf
=
NULL
;
...
...
source/libs/executor/src/dataSinkMgt.c
浏览文件 @
8f0f1f01
...
...
@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tarray.h"
#include "dataSinkMgt.h"
#include "dataSinkInt.h"
#include "planner.h"
...
...
source/libs/executor/src/executorMain.c
浏览文件 @
8f0f1f01
...
...
@@ -13,9 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <dataSinkMgt.h>
#include "exception.h"
#include "os.h"
#include "tarray.h"
#include "dataSinkMgt.h"
#include "exception.h"
#include "tcache.h"
#include "tglobal.h"
#include "tmsg.h"
...
...
@@ -69,8 +70,9 @@ void freeParam(STaskParam *param) {
int32_t
qCreateExecTask
(
void
*
tsdb
,
int32_t
vgId
,
SSubplan
*
pSubplan
,
qTaskInfo_t
*
pTaskInfo
)
{
assert
(
tsdb
!=
NULL
&&
pSubplan
!=
NULL
);
SExecTaskInfo
**
pTask
=
(
SExecTaskInfo
**
)
pTaskInfo
;
int32_t
code
=
doCreateExecTaskInfo
(
pSubplan
,
(
SExecTaskInfo
**
)
pTaskInfo
,
tsdb
);
int32_t
code
=
doCreateExecTaskInfo
(
pSubplan
,
pTask
,
tsdb
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
...
...
@@ -81,8 +83,7 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_
goto
_error
;
}
DataSinkHandle
pHandle
=
NULL
;
code
=
dsCreateDataSinker
(
pSubplan
->
pDataSink
,
&
pHandle
);
code
=
dsCreateDataSinker
(
pSubplan
->
pDataSink
,
(
*
pTask
)
->
dsHandle
);
_error:
// if failed to add ref for all tables in this query, abort current query
...
...
@@ -134,64 +135,79 @@ int waitMoment(SQInfo* pQInfo){
}
#endif
bool
qExecTask
(
qTaskInfo_t
tinfo
,
SSDataBlock
**
pRes
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
int64_t
threadId
=
taosGetSelfPthreadId
();
int32_t
qExecTask
(
qTaskInfo_t
tinfo
,
DataSinkHandle
*
handle
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
int64_t
threadId
=
taosGetSelfPthreadId
();
int64_t
curOwner
=
0
;
if
((
curOwner
=
atomic_val_compare_exchange_64
(
&
pTaskInfo
->
owner
,
0
,
threadId
))
!=
0
)
{
qError
(
"QInfo:0x%"
PRIx64
"-%p qhandle is now executed by thread:%p"
,
GET_TASKID
(
pTaskInfo
),
pTaskInfo
,
(
void
*
)
curOwner
);
qError
(
"QInfo:0x%"
PRIx64
"-%p qhandle is now executed by thread:%p"
,
GET_TASKID
(
pTaskInfo
),
pTaskInfo
,
(
void
*
)
curOwner
);
pTaskInfo
->
code
=
TSDB_CODE_QRY_IN_EXEC
;
return
fals
e
;
return
pTaskInfo
->
cod
e
;
}
if
(
pTaskInfo
->
cost
.
start
==
0
)
{
if
(
pTaskInfo
->
cost
.
start
==
0
)
{
pTaskInfo
->
cost
.
start
=
taosGetTimestampMs
();
}
if
(
isTaskKilled
(
pTaskInfo
))
{
qDebug
(
"QInfo:0x%"
PRIx64
" it is already killed, abort"
,
GET_TASKID
(
pTaskInfo
));
// return doBuildResCheck(pTaskInfo)
;
qDebug
(
"QInfo:0x%"
PRIx64
" it is already killed, abort"
,
GET_TASKID
(
pTaskInfo
));
return
pTaskInfo
->
code
;
}
// STaskRuntimeEnv* pRuntimeEnv = &pTaskInfo->runtimeEnv;
// if (pTaskInfo->tableqinfoGroupInfo.numOfTables == 0) {
// qDebug("QInfo:0x%"PRIx64" no table exists for query, abort", GET_TASKID(pTaskInfo));
// setTaskStatus(pTaskInfo, TASK_COMPLETED);
// return doBuildResCheck(pTaskInfo);
// }
// STaskRuntimeEnv* pRuntimeEnv = &pTaskInfo->runtimeEnv;
// if (pTaskInfo->tableqinfoGroupInfo.numOfTables == 0) {
// qDebug("QInfo:0x%"PRIx64" no table exists for query, abort", GET_TASKID(pTaskInfo));
// setTaskStatus(pTaskInfo, TASK_COMPLETED);
// return doBuildResCheck(pTaskInfo);
// }
// error occurs, record the error code and return to client
int32_t
ret
=
setjmp
(
pTaskInfo
->
env
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
publishQueryAbortEvent
(
pTaskInfo
,
ret
);
pTaskInfo
->
code
=
ret
;
qDebug
(
"QInfo:0x%"
PRIx64
" query abort due to error/cancel occurs, code:%s"
,
GET_TASKID
(
pTaskInfo
),
tstrerror
(
pTaskInfo
->
code
));
// return doBuildResCheck(pTaskInfo)
;
qDebug
(
"QInfo:0x%"
PRIx64
" query abort due to error/cancel occurs, code:%s"
,
GET_TASKID
(
pTaskInfo
),
tstrerror
(
pTaskInfo
->
code
));
return
pTaskInfo
->
code
;
}
qDebug
(
"QInfo:0x%"
PRIx64
" query task is launched"
,
GET_TASKID
(
pTaskInfo
));
qDebug
(
"QInfo:0x%"
PRIx64
" query task is launched"
,
GET_TASKID
(
pTaskInfo
));
bool
newgroup
=
false
;
publishOperatorProfEvent
(
pTaskInfo
->
pRoot
,
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
int64_t
st
=
0
;
int64_t
st
=
taosGetTimestampUs
();
*
pRes
=
pTaskInfo
->
pRoot
->
exec
(
pTaskInfo
->
pRoot
,
&
newgroup
);
// todo put the result into sink node.
handle
=
&
pTaskInfo
->
dsHandle
;
pTaskInfo
->
cost
.
elapsedTime
+=
(
taosGetTimestampUs
()
-
st
);
publishOperatorProfEvent
(
pTaskInfo
->
pRoot
,
QUERY_PROF_AFTER_OPERATOR_EXEC
);
while
(
1
)
{
st
=
taosGetTimestampUs
();
SSDataBlock
*
pRes
=
pTaskInfo
->
pRoot
->
exec
(
pTaskInfo
->
pRoot
,
&
newgroup
);
if
(
isTaskKilled
(
pTaskInfo
))
{
qDebug
(
"QInfo:0x%"
PRIx64
" query is killed"
,
GET_TASKID
(
pTaskInfo
));
// } else if (GET_NUM_OF_RESULTS(pRuntimeEnv) == 0) {
// qDebug("QInfo:0x%"PRIx64" over, %u tables queried, total %"PRId64" rows returned", pTaskInfo->qId, pRuntimeEnv->tableqinfoGroupInfo.numOfTables,
// pRuntimeEnv->resultInfo.total);
}
else
{
// qDebug("QInfo:0x%"PRIx64" query paused, %d rows returned, total:%" PRId64 " rows", pTaskInfo->qId,
// GET_NUM_OF_RESULTS(pRuntimeEnv), pRuntimeEnv->resultInfo.total);
pTaskInfo
->
cost
.
elapsedTime
+=
(
taosGetTimestampUs
()
-
st
);
publishOperatorProfEvent
(
pTaskInfo
->
pRoot
,
QUERY_PROF_AFTER_OPERATOR_EXEC
);
if
(
pRes
==
NULL
)
{
// no results generated yet, abort
return
pTaskInfo
->
code
;
}
int32_t
status
=
0
;
SInputData
inputData
=
{.
pData
=
pRes
,
.
pTableRetrieveTsMap
=
NULL
};
pTaskInfo
->
code
=
dsPutDataBlock
(
pTaskInfo
->
dsHandle
,
&
inputData
,
&
status
);
if
(
isTaskKilled
(
pTaskInfo
))
{
qDebug
(
"QInfo:0x%"
PRIx64
" task is killed"
,
GET_TASKID
(
pTaskInfo
));
// } else if (GET_NUM_OF_RESULTS(pRuntimeEnv) == 0) {
// qDebug("QInfo:0x%"PRIx64" over, %u tables queried, total %"PRId64" rows returned", pTaskInfo->qId, pRuntimeEnv->tableqinfoGroupInfo.numOfTables,
// pRuntimeEnv->resultInfo.total);
}
if
(
status
==
DS_DATA_FULL
)
{
qDebug
(
"QInfo:0x%"
PRIx64
" query paused, %d rows returned, total:%"
PRId64
" rows, in sinkNode:%d"
,
GET_TASKID
(
pTaskInfo
),
0
,
0L
,
0
);
return
pTaskInfo
->
code
;
}
}
// return doBuildResCheck(pTaskInfo);
}
int32_t
qRetrieveQueryResultInfo
(
qTaskInfo_t
qinfo
,
bool
*
buildRes
,
void
*
pRspContext
)
{
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
8f0f1f01
...
...
@@ -7181,6 +7181,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId) {
pthread_mutex_init
(
&
pTaskInfo
->
lock
,
NULL
);
pTaskInfo
->
cost
.
created
=
taosGetTimestampMs
();
pTaskInfo
->
id
.
queryId
=
queryId
;
return
pTaskInfo
;
}
...
...
source/libs/qworker/inc/qworkerInt.h
浏览文件 @
8f0f1f01
...
...
@@ -67,11 +67,12 @@ typedef struct SQWTaskStatus {
bool
drop
;
}
SQWTaskStatus
;
typedef
struct
SQWorkerTaskHandleCache
{
typedef
struct
SQWorkerTaskHandle
s
Cache
{
SRWLatch
lock
;
bool
needRsp
;
qTaskInfo_t
taskHandle
;
DataSinkHandle
sinkHandle
;
}
SQWorkerTaskHandleCache
;
}
SQWorkerTaskHandle
s
Cache
;
typedef
struct
SQWSchStatus
{
int32_t
lastAccessTs
;
// timestamp in second
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
8f0f1f01
...
...
@@ -91,16 +91,16 @@ int32_t qwUpdateTaskInfo(SQWTaskStatus *task, int8_t type, void *data) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwAddTask
AndSink
ToCache
(
SQWorkerMgmt
*
mgmt
,
uint64_t
qId
,
uint64_t
tId
,
qTaskInfo_t
taskHandle
,
DataSinkHandle
sinkHandle
)
{
int32_t
qwAddTask
Handles
ToCache
(
SQWorkerMgmt
*
mgmt
,
uint64_t
qId
,
uint64_t
tId
,
qTaskInfo_t
taskHandle
,
DataSinkHandle
sinkHandle
)
{
char
id
[
sizeof
(
qId
)
+
sizeof
(
tId
)]
=
{
0
};
QW_SET_QTID
(
id
,
qId
,
tId
);
SQWorker
R
esCache
resCache
=
{
0
};
SQWorker
TaskHandl
esCache
resCache
=
{
0
};
resCache
.
taskHandle
=
taskHandle
;
resCache
.
sinkHandle
=
sinkHandle
;
QW_LOCK
(
QW_WRITE
,
&
mgmt
->
resLock
);
if
(
0
!=
taosHashPut
(
mgmt
->
resHash
,
id
,
sizeof
(
id
),
&
resCache
,
sizeof
(
SQWorker
R
esCache
)))
{
if
(
0
!=
taosHashPut
(
mgmt
->
resHash
,
id
,
sizeof
(
id
),
&
resCache
,
sizeof
(
SQWorker
TaskHandl
esCache
)))
{
QW_UNLOCK
(
QW_WRITE
,
&
mgmt
->
resLock
);
qError
(
"taosHashPut queryId[%"
PRIx64
"] taskId[%"
PRIx64
"] to resHash failed"
,
qId
,
tId
);
return
TSDB_CODE_QRY_APP_ERROR
;
...
...
@@ -249,13 +249,13 @@ static int32_t qwAddTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_
QW_RET
(
code
);
}
static
FORCE_INLINE
int32_t
qwAcquireTask
ResCache
(
int32_t
rwType
,
SQWorkerMgmt
*
mgmt
,
uint64_t
queryId
,
uint64_t
taskId
,
SQWorkerResCache
**
r
es
)
{
static
FORCE_INLINE
int32_t
qwAcquireTask
Handles
(
int32_t
rwType
,
SQWorkerMgmt
*
mgmt
,
uint64_t
queryId
,
uint64_t
taskId
,
SQWorkerTaskHandlesCache
**
handl
es
)
{
char
id
[
sizeof
(
queryId
)
+
sizeof
(
taskId
)]
=
{
0
};
QW_SET_QTID
(
id
,
queryId
,
taskId
);
QW_LOCK
(
rwType
,
&
mgmt
->
resLock
);
*
r
es
=
taosHashGet
(
mgmt
->
resHash
,
id
,
sizeof
(
id
));
if
(
NULL
==
(
*
r
es
))
{
*
handl
es
=
taosHashGet
(
mgmt
->
resHash
,
id
,
sizeof
(
id
));
if
(
NULL
==
(
*
handl
es
))
{
QW_UNLOCK
(
rwType
,
&
mgmt
->
resLock
);
return
TSDB_CODE_QRY_RES_CACHE_NOT_EXIST
;
}
...
...
@@ -605,19 +605,34 @@ int32_t qwBuildAndSendStatusRsp(SRpcMsg *pMsg, SSchedulerStatusRsp *sStatus) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendFetchRsp
(
SRpcMsg
*
pMsg
,
void
*
data
)
{
SRetrieveTableRsp
*
pRsp
=
(
SRetrieveTableRsp
*
)
rpcMallocCont
(
sizeof
(
SRetrieveTableRsp
));
int32_t
qwInitFetchRsp
(
int32_t
length
,
SRetrieveTableRsp
**
rsp
)
{
int32_t
msgSize
=
sizeof
(
SRetrieveTableRsp
)
+
length
;
SRetrieveTableRsp
*
pRsp
=
(
SRetrieveTableRsp
*
)
rpcMallocCont
(
msgSize
);
if
(
NULL
==
pRsp
)
{
qError
(
"rpcMallocCont %d failed"
,
msgSize
);
QW_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
memset
(
pRsp
,
0
,
sizeof
(
SRetrieveTableRsp
));
//TODO fill msg
pRsp
->
completed
=
true
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwBuildAndSendFetchRsp
(
SRpcMsg
*
pMsg
,
SRetrieveTableRsp
*
pRsp
,
int32_t
dataLength
,
int32_t
code
)
{
if
(
NULL
==
pRsp
)
{
pRsp
=
(
SRetrieveTableRsp
*
)
rpcMallocCont
(
sizeof
(
SRetrieveTableRsp
));
memset
(
pRsp
,
0
,
sizeof
(
SRetrieveTableRsp
));
dataLength
=
0
;
}
SRpcMsg
rpcRsp
=
{
.
handle
=
pMsg
->
handle
,
.
ahandle
=
pMsg
->
ahandle
,
.
pCont
=
pRsp
,
.
contLen
=
sizeof
(
*
pRsp
),
.
code
=
0
,
.
contLen
=
sizeof
(
*
pRsp
)
+
dataLength
,
.
code
=
code
,
};
rpcSendResponse
(
&
rpcRsp
);
...
...
@@ -850,12 +865,16 @@ int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryI
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwHandleFetch
(
SQWorker
ResCache
*
r
es
,
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
queryId
,
uint64_t
taskId
,
SRpcMsg
*
pMsg
)
{
int32_t
qwHandleFetch
(
SQWorker
TaskHandlesCache
*
handl
es
,
SQWorkerMgmt
*
mgmt
,
uint64_t
sId
,
uint64_t
queryId
,
uint64_t
taskId
,
SRpcMsg
*
pMsg
)
{
SQWSchStatus
*
sch
=
NULL
;
SQWTaskStatus
*
task
=
NULL
;
int32_t
code
=
0
;
int32_t
needRsp
=
true
;
void
*
data
=
NULL
;
int32_t
sinkStatus
=
0
;
int32_t
dataLength
=
0
;
SRetrieveTableRsp
*
rsp
=
NULL
;
bool
queryEnd
=
false
;
QW_ERR_JRET
(
qwAcquireScheduler
(
QW_READ
,
mgmt
,
sId
,
&
sch
,
QW_NOT_EXIST_RET_ERR
));
QW_ERR_JRET
(
qwAcquireTask
(
QW_READ
,
sch
,
queryId
,
taskId
,
&
task
));
...
...
@@ -871,26 +890,61 @@ int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t sId, u
qError
(
"invalid status %d for fetch"
,
task
->
status
);
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
code
=
dsGetDataLength
(
handles
->
sinkHandle
,
&
dataLength
,
&
queryEnd
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
qError
(
"dsGetDataLength failed, code:%x"
,
code
);
QW_ERR_JRET
(
code
);
}
if
(
QW_GOT_RES_DATA
(
res
->
data
))
{
data
=
res
->
data
;
if
(
QW_LOW_RES_DATA
(
res
->
data
))
{
if
(
task
->
status
==
JOB_TASK_STATUS_PARTIAL_SUCCEED
)
{
//TODO add query back to queue
}
if
(
dataLength
>
0
)
{
SOutPutData
output
=
{
0
};
QW_ERR_JRET
(
qwInitFetchRsp
(
dataLength
,
&
rsp
));
output
.
pData
=
rsp
->
data
;
code
=
dsGetDataBlock
(
handles
->
sinkHandle
,
&
output
);
if
(
code
)
{
qError
(
"dsGetDataBlock failed, code:%x"
,
code
);
QW_ERR_JRET
(
code
);
}
if
(
DS_BUF_EMPTY
==
output
.
bufStatus
&&
output
.
queryEnd
)
{
rsp
->
completed
=
1
;
}
if
(
output
.
needSchedule
)
{
//TODO
}
if
((
!
output
.
queryEnd
)
&&
DS_BUF_LOW
==
output
.
bufStatus
)
{
//TODO
//UPDATE STATUS TO EXECUTING
}
}
else
{
if
(
task
->
status
!=
JOB_TASK_STATUS_EXECUTING
)
{
qError
(
"invalid
status %d for fetch without res"
,
task
->
status
);
QW_ERR_JRET
(
TSDB_CODE_QRY_
APP_ERROR
);
if
(
dataLength
<
0
)
{
qError
(
"invalid
length from dsGetDataLength, length:%d"
,
dataLength
);
QW_ERR_JRET
(
TSDB_CODE_QRY_
INVALID_INPUT
);
}
//TODO SET FLAG FOR QUERY TO SEND RSP WHEN RES READY
if
(
queryEnd
)
{
QW_ERR_JRET
(
qwQueryPostProcess
(
mgmt
,
sId
,
queryId
,
taskId
,
JOB_TASK_STATUS_SUCCEED
,
code
));
}
else
{
if
(
task
->
status
!=
JOB_TASK_STATUS_EXECUTING
)
{
qError
(
"invalid status %d for fetch without res"
,
task
->
status
);
QW_ERR_JRET
(
TSDB_CODE_QRY_APP_ERROR
);
}
QW_LOCK
(
QW_WRITE
,
&
handles
->
lock
);
handles
->
needRsp
=
true
;
QW_UNLOCK
(
QW_WRITE
,
&
handles
->
lock
);
needRsp
=
false
;
needRsp
=
false
;
}
}
_return:
if
(
task
)
{
QW_UNLOCK
(
QW_READ
,
&
task
->
lock
);
qwReleaseTask
(
QW_READ
,
sch
);
...
...
@@ -901,7 +955,7 @@ _return:
}
if
(
needRsp
)
{
qwBuildAndSendFetchRsp
(
pMsg
,
r
es
->
data
);
qwBuildAndSendFetchRsp
(
pMsg
,
r
sp
,
dataLength
,
code
);
}
QW_RET
(
code
);
...
...
@@ -1011,7 +1065,6 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
msg
->
taskId
=
htobe64
(
msg
->
taskId
);
msg
->
contentLen
=
ntohl
(
msg
->
contentLen
);
bool
queryDone
=
false
;
bool
queryRsped
=
false
;
bool
needStop
=
false
;
struct
SSubplan
*
plan
=
NULL
;
...
...
@@ -1039,20 +1092,19 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_ERR_JRET
(
qwBuildAndSendQueryRsp
(
pMsg
,
TSDB_CODE_SUCCESS
));
queryRsped
=
true
;
SSDataBlock
*
pRes
=
NULL
;
code
=
qExecTask
(
pTaskInfo
,
&
pRes
);
queryDone
=
false
;
DataSinkHandle
sinkHandle
=
NULL
;
code
=
qExecTask
(
pTaskInfo
,
&
sinkHandle
);
if
(
code
)
{
QW_ERR_JRET
(
code
);
}
else
{
QW_ERR_JRET
(
qwAddTask
AndSinkToCache
(
qWorkerMgmt
,
msg
->
queryId
,
msg
->
taskId
,
pTaskInfo
,
NULL
));
QW_ERR_JRET
(
qwAddTask
HandlesToCache
(
qWorkerMgmt
,
msg
->
queryId
,
msg
->
taskId
,
pTaskInfo
,
sinkHandle
));
QW_ERR_JRET
(
qwUpdateTaskStatus
(
qWorkerMgmt
,
msg
->
sId
,
msg
->
queryId
,
msg
->
taskId
,
JOB_TASK_STATUS_PARTIAL_SUCCEED
));
}
_return:
if
(
queryRsped
)
{
code
=
qwCheckAndSendReadyRsp
(
qWorkerMgmt
,
msg
->
sId
,
msg
->
queryId
,
msg
->
taskId
,
pMsg
,
code
);
}
else
{
...
...
@@ -1062,8 +1114,6 @@ _return:
int8_t
status
=
0
;
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
status
=
JOB_TASK_STATUS_FAILED
;
}
else
if
(
queryDone
)
{
status
=
JOB_TASK_STATUS_SUCCEED
;
}
else
{
status
=
JOB_TASK_STATUS_PARTIAL_SUCCEED
;
}
...
...
@@ -1073,6 +1123,49 @@ _return:
QW_RET
(
code
);
}
int32_t
qWorkerProcessQueryContinueMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
int32_t
code
=
0
;
int8_t
status
=
0
;
bool
queryDone
=
false
;
uint64_t
sId
,
qId
,
tId
;
//TODO call executer to continue execute subquery
code
=
0
;
void
*
data
=
NULL
;
queryDone
=
false
;
//TODO call executer to continue execute subquery
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
status
=
JOB_TASK_STATUS_FAILED
;
}
else
if
(
queryDone
)
{
status
=
JOB_TASK_STATUS_SUCCEED
;
}
else
{
status
=
JOB_TASK_STATUS_PARTIAL_SUCCEED
;
}
code
=
qwQueryPostProcess
(
qWorkerMgmt
,
sId
,
qId
,
tId
,
status
,
code
);
QW_RET
(
code
);
}
int32_t
qWorkerProcessSinkDataMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
){
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
SSinkDataReq
*
msg
=
pMsg
->
pCont
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
qError
(
"invalid sink data msg"
);
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
//TODO
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessReadyMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
){
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
return
TSDB_CODE_QRY_INVALID_INPUT
;
...
...
@@ -1135,12 +1228,12 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_ERR_RET
(
qwUpdateSchLastAccess
(
qWorkerMgmt
,
msg
->
sId
));
void
*
data
=
NULL
;
SQWorker
ResCache
*
r
es
=
NULL
;
SQWorker
TaskHandlesCache
*
handl
es
=
NULL
;
int32_t
code
=
0
;
QW_ERR_RET
(
qwAcquireTask
ResCache
(
QW_READ
,
qWorkerMgmt
,
msg
->
queryId
,
msg
->
taskId
,
&
r
es
));
QW_ERR_RET
(
qwAcquireTask
Handles
(
QW_READ
,
qWorkerMgmt
,
msg
->
queryId
,
msg
->
taskId
,
&
handl
es
));
QW_ERR_JRET
(
qwHandleFetch
(
r
es
,
qWorkerMgmt
,
msg
->
sId
,
msg
->
queryId
,
msg
->
taskId
,
pMsg
));
QW_ERR_JRET
(
qwHandleFetch
(
handl
es
,
qWorkerMgmt
,
msg
->
sId
,
msg
->
queryId
,
msg
->
taskId
,
pMsg
));
_return:
...
...
@@ -1218,31 +1311,6 @@ int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg)
QW_ERR_RET
(
qwBuildAndSendShowFetchRsp
(
pMsg
,
pFetchReq
));
}
int32_t
qWorkerContinueQuery
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
int32_t
code
=
0
;
int8_t
status
=
0
;
bool
queryDone
=
false
;
uint64_t
sId
,
qId
,
tId
;
//TODO call executer to continue execute subquery
code
=
0
;
void
*
data
=
NULL
;
queryDone
=
false
;
//TODO call executer to continue execute subquery
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
status
=
JOB_TASK_STATUS_FAILED
;
}
else
if
(
queryDone
)
{
status
=
JOB_TASK_STATUS_SUCCEED
;
}
else
{
status
=
JOB_TASK_STATUS_PARTIAL_SUCCEED
;
}
code
=
qwQueryPostProcess
(
qWorkerMgmt
,
sId
,
qId
,
tId
,
status
,
code
);
QW_RET
(
code
);
}
void
qWorkerDestroy
(
void
**
qWorkerMgmt
)
{
if
(
NULL
==
qWorkerMgmt
||
NULL
==
*
qWorkerMgmt
)
{
return
;
...
...
src/inc/tsdb.h
浏览文件 @
8f0f1f01
...
...
@@ -274,7 +274,7 @@ TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable
TsdbQueryHandleT
tsdbQueryCacheLast
(
STsdbRepo
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupList
,
uint64_t
qId
,
SMemRef
*
pMemRef
);
bool
isTsdbCacheLastRow
(
TsdbQueryHandleT
*
p
Query
Handle
);
bool
isTsdbCacheLastRow
(
TsdbQueryHandleT
*
p
TsdbRead
Handle
);
/**
...
...
@@ -308,19 +308,19 @@ int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT* pHandle);
/**
* move to next block if exists
*
* @param p
Query
Handle
* @param p
TsdbRead
Handle
* @return
*/
bool
tsdbNextDataBlock
(
TsdbQueryHandleT
p
Query
Handle
);
bool
tsdbNextDataBlock
(
TsdbQueryHandleT
p
TsdbRead
Handle
);
/**
* Get current data block information
*
* @param p
Query
Handle
* @param p
TsdbRead
Handle
* @param pBlockInfo
* @return
*/
void
tsdbRetrieveDataBlockInfo
(
TsdbQueryHandleT
*
p
Query
Handle
,
SDataBlockInfo
*
pBlockInfo
);
void
tsdbRetrieveDataBlockInfo
(
TsdbQueryHandleT
*
p
TsdbRead
Handle
,
SDataBlockInfo
*
pBlockInfo
);
/**
*
...
...
@@ -332,7 +332,7 @@ void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pQueryHandle, SDataBlockInfo *p
* @pBlockStatis the pre-calculated value for current data blocks. if the block is a cache block, always return 0
* @return
*/
int32_t
tsdbRetrieveDataBlockStatisInfo
(
TsdbQueryHandleT
*
p
Query
Handle
,
SDataStatis
**
pBlockStatis
);
int32_t
tsdbRetrieveDataBlockStatisInfo
(
TsdbQueryHandleT
*
p
TsdbRead
Handle
,
SDataStatis
**
pBlockStatis
);
/**
*
...
...
@@ -340,11 +340,11 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT *pQueryHandle, SDataSta
* the returned data block must be satisfied with the time window condition in any cases,
* which means the SData data block is not actually the completed disk data blocks.
*
* @param p
Query
Handle query handle
* @param p
TsdbRead
Handle query handle
* @param pColumnIdList required data columns id list
* @return
*/
SArray
*
tsdbRetrieveDataBlock
(
TsdbQueryHandleT
*
p
Query
Handle
,
SArray
*
pColumnIdList
);
SArray
*
tsdbRetrieveDataBlock
(
TsdbQueryHandleT
*
p
TsdbRead
Handle
,
SArray
*
pColumnIdList
);
/**
* Get the qualified table id for a super table according to the tag query expression.
...
...
src/query/inc/qExecutor.h
浏览文件 @
8f0f1f01
...
...
@@ -284,7 +284,7 @@ typedef struct SQueryRuntimeEnv {
uint32_t
status
;
// query status
void
*
qinfo
;
uint8_t
scanFlag
;
// denotes reversed scan of data or not
void
*
p
Query
Handle
;
void
*
p
TsdbRead
Handle
;
int32_t
prevGroupId
;
// previous executed group id
bool
enableGroupData
;
...
...
@@ -418,7 +418,7 @@ typedef struct SQueryParam {
}
SQueryParam
;
typedef
struct
STableScanInfo
{
void
*
p
Query
Handle
;
void
*
p
TsdbRead
Handle
;
int32_t
numOfBlocks
;
int32_t
numOfSkipped
;
int32_t
numOfBlockStatis
;
...
...
src/query/src/qExecutor.c
浏览文件 @
8f0f1f01
...
...
@@ -2326,8 +2326,8 @@ _clean:
static
void
doFreeQueryHandle
(
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
SQueryAttr
*
pQueryAttr
=
pRuntimeEnv
->
pQueryAttr
;
tsdbCleanupQueryHandle
(
pRuntimeEnv
->
p
Query
Handle
);
pRuntimeEnv
->
p
Query
Handle
=
NULL
;
tsdbCleanupQueryHandle
(
pRuntimeEnv
->
p
TsdbRead
Handle
);
pRuntimeEnv
->
p
TsdbRead
Handle
=
NULL
;
SMemRef
*
pMemRef
=
&
pQueryAttr
->
memRef
;
assert
(
pMemRef
->
ref
==
0
&&
pMemRef
->
snapshot
.
imem
==
NULL
&&
pMemRef
->
snapshot
.
mem
==
NULL
);
...
...
@@ -3148,10 +3148,10 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
}
else
if
((
*
status
)
==
BLK_DATA_STATIS_NEEDED
)
{
// this function never returns error?
pCost
->
loadBlockStatis
+=
1
;
tsdbRetrieveDataBlockStatisInfo
(
pTableScanInfo
->
p
Query
Handle
,
&
pBlock
->
pBlockStatis
);
tsdbRetrieveDataBlockStatisInfo
(
pTableScanInfo
->
p
TsdbRead
Handle
,
&
pBlock
->
pBlockStatis
);
if
(
pBlock
->
pBlockStatis
==
NULL
)
{
// data block statistics does not exist, load data block
pBlock
->
pDataBlock
=
tsdbRetrieveDataBlock
(
pTableScanInfo
->
p
Query
Handle
,
NULL
);
pBlock
->
pDataBlock
=
tsdbRetrieveDataBlock
(
pTableScanInfo
->
p
TsdbRead
Handle
,
NULL
);
pCost
->
totalCheckedRows
+=
pBlock
->
info
.
rows
;
}
}
else
{
...
...
@@ -3159,7 +3159,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
// load the data block statistics to perform further filter
pCost
->
loadBlockStatis
+=
1
;
tsdbRetrieveDataBlockStatisInfo
(
pTableScanInfo
->
p
Query
Handle
,
&
pBlock
->
pBlockStatis
);
tsdbRetrieveDataBlockStatisInfo
(
pTableScanInfo
->
p
TsdbRead
Handle
,
&
pBlock
->
pBlockStatis
);
if
(
pQueryAttr
->
topBotQuery
&&
pBlock
->
pBlockStatis
!=
NULL
)
{
{
// set previous window
...
...
@@ -3205,7 +3205,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
pCost
->
totalCheckedRows
+=
pBlockInfo
->
rows
;
pCost
->
loadBlocks
+=
1
;
pBlock
->
pDataBlock
=
tsdbRetrieveDataBlock
(
pTableScanInfo
->
p
Query
Handle
,
NULL
);
pBlock
->
pDataBlock
=
tsdbRetrieveDataBlock
(
pTableScanInfo
->
p
TsdbRead
Handle
,
NULL
);
if
(
pBlock
->
pDataBlock
==
NULL
)
{
return
terrno
;
}
...
...
@@ -4494,7 +4494,7 @@ void queryCostStatis(SQInfo *pQInfo) {
//
// assert(pQueryAttr->pos >= 0 && pQueryAttr->pos <= pBlockInfo->rows - 1);
//
// SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->p
Query
Handle, NULL);
// SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->p
TsdbRead
Handle, NULL);
// SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
//
// // update the pQueryAttr->limit.offset value, and pQueryAttr->pos value
...
...
@@ -4521,15 +4521,15 @@ void queryCostStatis(SQInfo *pQInfo) {
// int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
//
// STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
// TsdbQueryHandleT p
QueryHandle = pRuntimeEnv->pQuery
Handle;
// TsdbQueryHandleT p
TsdbReadHandle = pRuntimeEnv->pTsdbRead
Handle;
//
// SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
// while (tsdbNextDataBlock(p
Query
Handle)) {
// while (tsdbNextDataBlock(p
TsdbRead
Handle)) {
// if (isQueryKilled(pRuntimeEnv->qinfo)) {
// longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
// }
//
// tsdbRetrieveDataBlockInfo(p
Query
Handle, &blockInfo);
// tsdbRetrieveDataBlockInfo(p
TsdbRead
Handle, &blockInfo);
//
// if (pQueryAttr->limit.offset > blockInfo.rows) {
// pQueryAttr->limit.offset -= blockInfo.rows;
...
...
@@ -4562,7 +4562,7 @@ void queryCostStatis(SQInfo *pQInfo) {
//
// // load the data block and check data remaining in current data block
// // TODO optimize performance
// SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->p
Query
Handle, NULL);
// SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->p
TsdbRead
Handle, NULL);
// SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
//
// tw = *win;
...
...
@@ -4627,8 +4627,8 @@ void queryCostStatis(SQInfo *pQInfo) {
// STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
//
// SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
// while (tsdbNextDataBlock(pRuntimeEnv->p
Query
Handle)) {
// tsdbRetrieveDataBlockInfo(pRuntimeEnv->p
Query
Handle, &blockInfo);
// while (tsdbNextDataBlock(pRuntimeEnv->p
TsdbRead
Handle)) {
// tsdbRetrieveDataBlockInfo(pRuntimeEnv->p
TsdbRead
Handle, &blockInfo);
//
// if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
// if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) {
...
...
@@ -4674,7 +4674,7 @@ void queryCostStatis(SQInfo *pQInfo) {
// */
// if ((tw.skey <= blockInfo.window.ekey && ascQuery) || (tw.ekey >= blockInfo.window.skey && !ascQuery)) {
//
// SArray *pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->p
Query
Handle, NULL);
// SArray *pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->p
TsdbRead
Handle, NULL);
// SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0);
//
// if ((win.ekey > blockInfo.window.ekey && ascQuery) || (win.ekey < blockInfo.window.skey && !ascQuery)) {
...
...
@@ -4748,7 +4748,7 @@ static int32_t setupQueryHandle(void* tsdb, SQueryRuntimeEnv* pRuntimeEnv, int64
terrno
=
TSDB_CODE_SUCCESS
;
if
(
isFirstLastRowQuery
(
pQueryAttr
))
{
pRuntimeEnv
->
p
Query
Handle
=
tsdbQueryLastRow
(
tsdb
,
&
cond
,
&
pQueryAttr
->
tableGroupInfo
,
qId
,
&
pQueryAttr
->
memRef
);
pRuntimeEnv
->
p
TsdbRead
Handle
=
tsdbQueryLastRow
(
tsdb
,
&
cond
,
&
pQueryAttr
->
tableGroupInfo
,
qId
,
&
pQueryAttr
->
memRef
);
// update the query time window
pQueryAttr
->
window
=
cond
.
twindow
;
...
...
@@ -4769,11 +4769,11 @@ static int32_t setupQueryHandle(void* tsdb, SQueryRuntimeEnv* pRuntimeEnv, int64
}
}
}
else
if
(
isCachedLastQuery
(
pQueryAttr
))
{
pRuntimeEnv
->
p
Query
Handle
=
tsdbQueryCacheLast
(
tsdb
,
&
cond
,
&
pQueryAttr
->
tableGroupInfo
,
qId
,
&
pQueryAttr
->
memRef
);
pRuntimeEnv
->
p
TsdbRead
Handle
=
tsdbQueryCacheLast
(
tsdb
,
&
cond
,
&
pQueryAttr
->
tableGroupInfo
,
qId
,
&
pQueryAttr
->
memRef
);
}
else
if
(
pQueryAttr
->
pointInterpQuery
)
{
pRuntimeEnv
->
p
Query
Handle
=
tsdbQueryRowsInExternalWindow
(
tsdb
,
&
cond
,
&
pQueryAttr
->
tableGroupInfo
,
qId
,
&
pQueryAttr
->
memRef
);
pRuntimeEnv
->
p
TsdbRead
Handle
=
tsdbQueryRowsInExternalWindow
(
tsdb
,
&
cond
,
&
pQueryAttr
->
tableGroupInfo
,
qId
,
&
pQueryAttr
->
memRef
);
}
else
{
pRuntimeEnv
->
p
Query
Handle
=
tsdbQueryTables
(
tsdb
,
&
cond
,
&
pQueryAttr
->
tableGroupInfo
,
qId
,
&
pQueryAttr
->
memRef
);
pRuntimeEnv
->
p
TsdbRead
Handle
=
tsdbQueryTables
(
tsdb
,
&
cond
,
&
pQueryAttr
->
tableGroupInfo
,
qId
,
&
pQueryAttr
->
memRef
);
}
return
terrno
;
...
...
@@ -4831,19 +4831,19 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
switch
(
tbScanner
)
{
case
OP_TableBlockInfoScan
:
{
pRuntimeEnv
->
proot
=
createTableBlockInfoScanOperator
(
pRuntimeEnv
->
p
Query
Handle
,
pRuntimeEnv
);
pRuntimeEnv
->
proot
=
createTableBlockInfoScanOperator
(
pRuntimeEnv
->
p
TsdbRead
Handle
,
pRuntimeEnv
);
break
;
}
case
OP_TableSeqScan
:
{
pRuntimeEnv
->
proot
=
createTableSeqScanOperator
(
pRuntimeEnv
->
p
Query
Handle
,
pRuntimeEnv
);
pRuntimeEnv
->
proot
=
createTableSeqScanOperator
(
pRuntimeEnv
->
p
TsdbRead
Handle
,
pRuntimeEnv
);
break
;
}
case
OP_DataBlocksOptScan
:
{
pRuntimeEnv
->
proot
=
createDataBlocksOptScanInfo
(
pRuntimeEnv
->
p
Query
Handle
,
pRuntimeEnv
,
getNumOfScanTimes
(
pQueryAttr
),
pQueryAttr
->
needReverseScan
?
1
:
0
);
pRuntimeEnv
->
proot
=
createDataBlocksOptScanInfo
(
pRuntimeEnv
->
p
TsdbRead
Handle
,
pRuntimeEnv
,
getNumOfScanTimes
(
pQueryAttr
),
pQueryAttr
->
needReverseScan
?
1
:
0
);
break
;
}
case
OP_TableScan
:
{
pRuntimeEnv
->
proot
=
createTableScanOperator
(
pRuntimeEnv
->
p
Query
Handle
,
pRuntimeEnv
,
getNumOfScanTimes
(
pQueryAttr
));
pRuntimeEnv
->
proot
=
createTableScanOperator
(
pRuntimeEnv
->
p
TsdbRead
Handle
,
pRuntimeEnv
,
getNumOfScanTimes
(
pQueryAttr
));
break
;
}
default:
{
// do nothing
...
...
@@ -4974,13 +4974,13 @@ static SSDataBlock* doTableScanImpl(void* param, bool* newgroup) {
*
newgroup
=
false
;
while
(
tsdbNextDataBlock
(
pTableScanInfo
->
p
Query
Handle
))
{
while
(
tsdbNextDataBlock
(
pTableScanInfo
->
p
TsdbRead
Handle
))
{
if
(
isQueryKilled
(
pOperator
->
pRuntimeEnv
->
qinfo
))
{
longjmp
(
pOperator
->
pRuntimeEnv
->
env
,
TSDB_CODE_TSC_QUERY_CANCELLED
);
}
pTableScanInfo
->
numOfBlocks
+=
1
;
tsdbRetrieveDataBlockInfo
(
pTableScanInfo
->
p
Query
Handle
,
&
pBlock
->
info
);
tsdbRetrieveDataBlockInfo
(
pTableScanInfo
->
p
TsdbRead
Handle
,
&
pBlock
->
info
);
// todo opt
if
(
pTableGroupInfo
->
numOfTables
>
1
||
(
pRuntimeEnv
->
current
==
NULL
&&
pTableGroupInfo
->
numOfTables
==
1
))
{
...
...
@@ -5037,7 +5037,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
}
if
(
++
pTableScanInfo
->
current
>=
pTableScanInfo
->
times
)
{
if
(
pTableScanInfo
->
reverseTimes
<=
0
||
isTsdbCacheLastRow
(
pTableScanInfo
->
p
Query
Handle
))
{
if
(
pTableScanInfo
->
reverseTimes
<=
0
||
isTsdbCacheLastRow
(
pTableScanInfo
->
p
TsdbRead
Handle
))
{
return
NULL
;
}
else
{
break
;
...
...
@@ -5046,7 +5046,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
// do prepare for the next round table scan operation
STsdbQueryCond
cond
=
createTsdbQueryCond
(
pQueryAttr
,
&
pQueryAttr
->
window
);
tsdbResetQueryHandle
(
pTableScanInfo
->
p
Query
Handle
,
&
cond
);
tsdbResetQueryHandle
(
pTableScanInfo
->
p
TsdbRead
Handle
,
&
cond
);
setQueryStatus
(
pRuntimeEnv
,
QUERY_NOT_COMPLETED
);
pRuntimeEnv
->
scanFlag
=
REPEAT_SCAN
;
...
...
@@ -5069,7 +5069,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
setupEnvForReverseScan
(
pRuntimeEnv
,
pTableScanInfo
->
pResultRowInfo
,
pTableScanInfo
->
pCtx
,
pTableScanInfo
->
numOfOutput
);
STsdbQueryCond
cond
=
createTsdbQueryCond
(
pQueryAttr
,
&
pQueryAttr
->
window
);
tsdbResetQueryHandle
(
pTableScanInfo
->
p
Query
Handle
,
&
cond
);
tsdbResetQueryHandle
(
pTableScanInfo
->
p
TsdbRead
Handle
,
&
cond
);
qDebug
(
"QInfo:0x%"
PRIx64
" start to reverse scan data blocks due to query func required, qrange:%"
PRId64
"-%"
PRId64
,
GET_QID
(
pRuntimeEnv
),
cond
.
twindow
.
skey
,
cond
.
twindow
.
ekey
);
...
...
@@ -5112,8 +5112,8 @@ static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) {
tableBlockDist
.
maxRows
=
INT_MIN
;
tableBlockDist
.
minRows
=
INT_MAX
;
tsdbGetFileBlocksDistInfo
(
pTableScanInfo
->
p
Query
Handle
,
&
tableBlockDist
);
tableBlockDist
.
numOfRowsInMemTable
=
(
int32_t
)
tsdbGetNumOfRowsInMemTable
(
pTableScanInfo
->
p
Query
Handle
);
tsdbGetFileBlocksDistInfo
(
pTableScanInfo
->
p
TsdbRead
Handle
,
&
tableBlockDist
);
tableBlockDist
.
numOfRowsInMemTable
=
(
int32_t
)
tsdbGetNumOfRowsInMemTable
(
pTableScanInfo
->
p
TsdbRead
Handle
);
SSDataBlock
*
pBlock
=
&
pTableScanInfo
->
block
;
pBlock
->
info
.
rows
=
1
;
...
...
@@ -5142,7 +5142,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
assert
(
repeatTime
>
0
);
STableScanInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
STableScanInfo
));
pInfo
->
p
Query
Handle
=
pTsdbQueryHandle
;
pInfo
->
p
TsdbRead
Handle
=
pTsdbQueryHandle
;
pInfo
->
times
=
repeatTime
;
pInfo
->
reverseTimes
=
0
;
pInfo
->
order
=
pRuntimeEnv
->
pQueryAttr
->
order
.
order
;
...
...
@@ -5165,7 +5165,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
SOperatorInfo
*
createTableSeqScanOperator
(
void
*
pTsdbQueryHandle
,
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
STableScanInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
STableScanInfo
));
pInfo
->
p
Query
Handle
=
pTsdbQueryHandle
;
pInfo
->
p
TsdbRead
Handle
=
pTsdbQueryHandle
;
pInfo
->
times
=
1
;
pInfo
->
reverseTimes
=
0
;
pInfo
->
order
=
pRuntimeEnv
->
pQueryAttr
->
order
.
order
;
...
...
@@ -5189,7 +5189,7 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE
SOperatorInfo
*
createTableBlockInfoScanOperator
(
void
*
pTsdbQueryHandle
,
SQueryRuntimeEnv
*
pRuntimeEnv
)
{
STableScanInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
STableScanInfo
));
pInfo
->
p
Query
Handle
=
pTsdbQueryHandle
;
pInfo
->
p
TsdbRead
Handle
=
pTsdbQueryHandle
;
pInfo
->
block
.
pDataBlock
=
taosArrayInit
(
1
,
sizeof
(
SColumnInfoData
));
SColumnInfoData
infoData
=
{{
0
}};
...
...
@@ -5271,7 +5271,7 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime
assert
(
repeatTime
>
0
);
STableScanInfo
*
pInfo
=
calloc
(
1
,
sizeof
(
STableScanInfo
));
pInfo
->
p
Query
Handle
=
pTsdbQueryHandle
;
pInfo
->
p
TsdbRead
Handle
=
pTsdbQueryHandle
;
pInfo
->
times
=
repeatTime
;
pInfo
->
reverseTimes
=
reverseTime
;
pInfo
->
current
=
0
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录