Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a34d3442
T
TDengine
项目概览
taosdata
/
TDengine
11 个月 前同步成功
通知
1179
Star
22014
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
a34d3442
编写于
7月 12, 2022
作者:
L
Liu Jicong
提交者:
GitHub
7月 12, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #14808 from taosdata/feature/stream
refactor(tmq): prepare only needed
上级
558829dc
8e3f5135
变更
19
隐藏空白更改
内联
并排
Showing
19 changed file
with
245 addition
and
169 deletion
+245
-169
include/libs/executor/executor.h
include/libs/executor/executor.h
+9
-6
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+3
-0
include/util/tlog.h
include/util/tlog.h
+1
-1
source/client/src/tmq.c
source/client/src/tmq.c
+29
-31
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+13
-12
source/common/src/tmsg.c
source/common/src/tmsg.c
+5
-0
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+3
-3
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+12
-13
source/dnode/vnode/src/tq/tqExec.c
source/dnode/vnode/src/tq/tqExec.c
+7
-5
source/dnode/vnode/src/tq/tqMeta.c
source/dnode/vnode/src/tq/tqMeta.c
+14
-6
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+40
-43
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+25
-1
source/libs/executor/src/executorMain.c
source/libs/executor/src/executorMain.c
+43
-36
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+10
-2
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+14
-2
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+8
-6
source/libs/stream/src/stream.c
source/libs/stream/src/stream.c
+2
-1
source/libs/stream/src/streamExec.c
source/libs/stream/src/streamExec.c
+2
-0
source/libs/wal/src/walRead.c
source/libs/wal/src/walRead.c
+5
-1
未找到文件。
include/libs/executor/executor.h
浏览文件 @
a34d3442
...
...
@@ -42,25 +42,28 @@ typedef struct SReadHandle {
bool
initTqReader
;
}
SReadHandle
;
// in queue mode, data streams are seperated by msg
typedef
enum
{
OPTR_EXEC_MODEL_BATCH
=
0x1
,
OPTR_EXEC_MODEL_STREAM
=
0x2
,
OPTR_EXEC_MODEL_QUEUE
=
0x3
,
}
EOPTR_EXEC_MODEL
;
/**
* Create the exec task for stream
ing
mode
* Create the exec task for stream mode
* @param pMsg
* @param
stream
ReadHandle
* @param
S
ReadHandle
* @return
*/
qTaskInfo_t
qCreateStreamExecTaskInfo
(
void
*
msg
,
SReadHandle
*
readers
);
/**
* Switch the stream scan to snapshot mode
* @param tinfo
* Create the exec task for queue mode
* @param pMsg
* @param SReadHandle
* @return
*/
int32_t
qStreamScanSnapshot
(
qTaskInfo_t
tinfo
);
qTaskInfo_t
qCreateQueueExecTaskInfo
(
void
*
msg
,
SReadHandle
*
readers
);
/**
* Set the input data block for the stream scan.
...
...
@@ -111,7 +114,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
* @return
*/
int32_t
qGetQueryTableSchemaVersion
(
qTaskInfo_t
tinfo
,
char
*
dbName
,
char
*
tableName
,
int32_t
*
sversion
,
int32_t
*
tversion
);
int32_t
*
tversion
);
/**
* The main task execution function, including query on both table and multiple tables,
...
...
include/libs/stream/tstream.h
浏览文件 @
a34d3442
...
...
@@ -14,6 +14,7 @@
*/
#include "os.h"
#include "query.h"
#include "tdatablock.h"
#include "tmsg.h"
#include "tmsgcb.h"
...
...
@@ -119,6 +120,7 @@ static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) { return queue
static
FORCE_INLINE
void
*
streamQueueNextItem
(
SStreamQueue
*
queue
)
{
int8_t
dequeueFlag
=
atomic_exchange_8
(
&
queue
->
status
,
STREAM_QUEUE__PROCESSING
);
if
(
dequeueFlag
==
STREAM_QUEUE__FAILED
)
{
ASSERT
(
0
);
ASSERT
(
queue
->
qItem
!=
NULL
);
return
streamQueueCurItem
(
queue
);
}
else
{
...
...
@@ -305,6 +307,7 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
atomic_store_8
(
&
pTask
->
inputStatus
,
TASK_INPUT_STATUS__FAILED
);
return
-
1
;
}
qInfo
(
"task %d %p submit enqueue %p %p %p"
,
pTask
->
taskId
,
pTask
,
pItem
,
pSubmitClone
,
pSubmitClone
->
data
);
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pSubmitClone
);
}
else
if
(
pItem
->
type
==
STREAM_INPUT__DATA_BLOCK
||
pItem
->
type
==
STREAM_INPUT__DATA_RETRIEVE
)
{
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
...
...
include/util/tlog.h
浏览文件 @
a34d3442
...
...
@@ -94,7 +94,7 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons
#define pError(...) { taosPrintLog("APP ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }
#define pPrint(...) { taosPrintLog("APP ", DEBUG_INFO, 255, __VA_ARGS__); }
// clang-format on
#define BUF_PAGE_DEBUG
//
#define BUF_PAGE_DEBUG
#ifdef __cplusplus
}
#endif
...
...
source/client/src/tmq.c
浏览文件 @
a34d3442
...
...
@@ -1149,11 +1149,10 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tDecoderInit
(
&
decoder
,
POINTER_SHIFT
(
pMsg
->
pData
,
sizeof
(
SMqRspHead
)),
pMsg
->
len
-
sizeof
(
SMqRspHead
));
tDecodeSMqDataRsp
(
&
decoder
,
&
pRspWrapper
->
dataRsp
);
memcpy
(
&
pRspWrapper
->
dataRsp
,
pMsg
->
pData
,
sizeof
(
SMqRspHead
));
/*tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->dataRsp);*/
}
else
{
ASSERT
(
rspType
==
TMQ_MSG_TYPE__POLL_META_RSP
);
memcpy
(
&
pRspWrapper
->
metaRsp
,
pMsg
->
pData
,
sizeof
(
SMqRspHead
));
tDecodeSMqMetaRsp
(
POINTER_SHIFT
(
pMsg
->
pData
,
sizeof
(
SMqRspHead
)),
&
pRspWrapper
->
metaRsp
);
memcpy
(
&
pRspWrapper
->
metaRsp
,
pMsg
->
pData
,
sizeof
(
SMqRspHead
));
}
taosMemoryFree
(
pMsg
->
pData
);
...
...
@@ -2427,15 +2426,15 @@ static void destroyCreateTbReqBatch(void* data) {
taosArrayDestroy
(
pTbBatch
->
req
.
pArray
);
}
static
int32_t
taosCreateTable
(
TAOS
*
taos
,
void
*
meta
,
int32_t
metaLen
)
{
SVCreateTbBatchReq
req
=
{
0
};
SDecoder
coder
=
{
0
};
int32_t
code
=
TSDB_CODE_SUCCESS
;
SRequestObj
*
pRequest
=
NULL
;
SQuery
*
pQuery
=
NULL
;
SHashObj
*
pVgroupHashmap
=
NULL
;
static
int32_t
taosCreateTable
(
TAOS
*
taos
,
void
*
meta
,
int32_t
metaLen
)
{
SVCreateTbBatchReq
req
=
{
0
};
SDecoder
coder
=
{
0
};
int32_t
code
=
TSDB_CODE_SUCCESS
;
SRequestObj
*
pRequest
=
NULL
;
SQuery
*
pQuery
=
NULL
;
SHashObj
*
pVgroupHashmap
=
NULL
;
code
=
buildRequest
(
*
(
int64_t
*
)
taos
,
""
,
0
,
NULL
,
false
,
&
pRequest
);
code
=
buildRequest
(
*
(
int64_t
*
)
taos
,
""
,
0
,
NULL
,
false
,
&
pRequest
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
end
;
}
...
...
@@ -2455,8 +2454,8 @@ static int32_t taosCreateTable(TAOS *taos, void *meta, int32_t metaLen){
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
SVCreateTbReq
*
pCreateReq
=
NULL
;
SCatalog
*
pCatalog
=
NULL
;
SVCreateTbReq
*
pCreateReq
=
NULL
;
SCatalog
*
pCatalog
=
NULL
;
code
=
catalogGetHandle
(
pTscObj
->
pAppInfo
->
clusterId
,
&
pCatalog
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
end
;
...
...
@@ -2540,13 +2539,13 @@ static void destroyDropTbReqBatch(void* data) {
taosArrayDestroy
(
pTbBatch
->
req
.
pArray
);
}
static
int32_t
taosDropTable
(
TAOS
*
taos
,
void
*
meta
,
int32_t
metaLen
)
{
SVDropTbBatchReq
req
=
{
0
};
SDecoder
coder
=
{
0
};
int32_t
code
=
TSDB_CODE_SUCCESS
;
SRequestObj
*
pRequest
=
NULL
;
SQuery
*
pQuery
=
NULL
;
SHashObj
*
pVgroupHashmap
=
NULL
;
static
int32_t
taosDropTable
(
TAOS
*
taos
,
void
*
meta
,
int32_t
metaLen
)
{
SVDropTbBatchReq
req
=
{
0
};
SDecoder
coder
=
{
0
};
int32_t
code
=
TSDB_CODE_SUCCESS
;
SRequestObj
*
pRequest
=
NULL
;
SQuery
*
pQuery
=
NULL
;
SHashObj
*
pVgroupHashmap
=
NULL
;
code
=
buildRequest
(
*
(
int64_t
*
)
taos
,
""
,
0
,
NULL
,
false
,
&
pRequest
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -2568,8 +2567,8 @@ static int32_t taosDropTable(TAOS *taos, void *meta, int32_t metaLen){
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
SVDropTbReq
*
pDropReq
=
NULL
;
SCatalog
*
pCatalog
=
NULL
;
SVDropTbReq
*
pDropReq
=
NULL
;
SCatalog
*
pCatalog
=
NULL
;
code
=
catalogGetHandle
(
pTscObj
->
pAppInfo
->
clusterId
,
&
pCatalog
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
end
;
...
...
@@ -2640,17 +2639,16 @@ end:
return
code
;
}
static
int32_t
taosAlterTable
(
TAOS
*
taos
,
void
*
meta
,
int32_t
metaLen
){
SVAlterTbReq
req
=
{
0
};
SDecoder
coder
=
{
0
};
int32_t
code
=
TSDB_CODE_SUCCESS
;
SRequestObj
*
pRequest
=
NULL
;
SQuery
*
pQuery
=
NULL
;
SArray
*
pArray
=
NULL
;
SVgDataBlocks
*
pVgData
=
NULL
;
static
int32_t
taosAlterTable
(
TAOS
*
taos
,
void
*
meta
,
int32_t
metaLen
)
{
SVAlterTbReq
req
=
{
0
};
SDecoder
coder
=
{
0
};
int32_t
code
=
TSDB_CODE_SUCCESS
;
SRequestObj
*
pRequest
=
NULL
;
SQuery
*
pQuery
=
NULL
;
SArray
*
pArray
=
NULL
;
SVgDataBlocks
*
pVgData
=
NULL
;
code
=
buildRequest
(
*
(
int64_t
*
)
taos
,
""
,
0
,
NULL
,
false
,
&
pRequest
);
code
=
buildRequest
(
*
(
int64_t
*
)
taos
,
""
,
0
,
NULL
,
false
,
&
pRequest
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
end
;
}
...
...
source/common/src/tdatablock.c
浏览文件 @
a34d3442
...
...
@@ -1738,56 +1738,57 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
int32_t
colNum
=
taosArrayGetSize
(
pDataBlock
->
pDataBlock
);
int32_t
rows
=
pDataBlock
->
info
.
rows
;
int32_t
len
=
0
;
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
"
\n
%s |block type %d |child id %d|group id:%"
PRIu64
"|
\n
"
,
flag
,
(
int32_t
)
pDataBlock
->
info
.
type
,
pDataBlock
->
info
.
childId
,
pDataBlock
->
info
.
groupId
);
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
"
\n
%s |block type %d |child id %d|group id:%"
PRIu64
"| uid:%ld
\n
"
,
flag
,
(
int32_t
)
pDataBlock
->
info
.
type
,
pDataBlock
->
info
.
childId
,
pDataBlock
->
info
.
groupId
,
pDataBlock
->
info
.
uid
);
if
(
len
>=
size
-
1
)
return
dumpBuf
;
for
(
int32_t
j
=
0
;
j
<
rows
;
j
++
)
{
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
"%s |"
,
flag
);
if
(
len
>=
size
-
1
)
return
dumpBuf
;
if
(
len
>=
size
-
1
)
return
dumpBuf
;
for
(
int32_t
k
=
0
;
k
<
colNum
;
k
++
)
{
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
k
);
void
*
var
=
POINTER_SHIFT
(
pColInfoData
->
pData
,
j
*
pColInfoData
->
info
.
bytes
);
if
(
colDataIsNull
(
pColInfoData
,
rows
,
j
,
NULL
)
||
!
pColInfoData
->
pData
)
{
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
" %15s |"
,
"NULL"
);
if
(
len
>=
size
-
1
)
return
dumpBuf
;
if
(
len
>=
size
-
1
)
return
dumpBuf
;
continue
;
}
switch
(
pColInfoData
->
info
.
type
)
{
case
TSDB_DATA_TYPE_TIMESTAMP
:
formatTimestamp
(
pBuf
,
*
(
uint64_t
*
)
var
,
TSDB_TIME_PRECISION_MILLI
);
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
" %25s |"
,
pBuf
);
if
(
len
>=
size
-
1
)
return
dumpBuf
;
if
(
len
>=
size
-
1
)
return
dumpBuf
;
break
;
case
TSDB_DATA_TYPE_INT
:
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
" %15d |"
,
*
(
int32_t
*
)
var
);
if
(
len
>=
size
-
1
)
return
dumpBuf
;
if
(
len
>=
size
-
1
)
return
dumpBuf
;
break
;
case
TSDB_DATA_TYPE_UINT
:
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
" %15u |"
,
*
(
uint32_t
*
)
var
);
if
(
len
>=
size
-
1
)
return
dumpBuf
;
if
(
len
>=
size
-
1
)
return
dumpBuf
;
break
;
case
TSDB_DATA_TYPE_BIGINT
:
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
" %15ld |"
,
*
(
int64_t
*
)
var
);
if
(
len
>=
size
-
1
)
return
dumpBuf
;
if
(
len
>=
size
-
1
)
return
dumpBuf
;
break
;
case
TSDB_DATA_TYPE_UBIGINT
:
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
" %15lu |"
,
*
(
uint64_t
*
)
var
);
if
(
len
>=
size
-
1
)
return
dumpBuf
;
if
(
len
>=
size
-
1
)
return
dumpBuf
;
break
;
case
TSDB_DATA_TYPE_FLOAT
:
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
" %15f |"
,
*
(
float
*
)
var
);
if
(
len
>=
size
-
1
)
return
dumpBuf
;
if
(
len
>=
size
-
1
)
return
dumpBuf
;
break
;
case
TSDB_DATA_TYPE_DOUBLE
:
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
" %15lf |"
,
*
(
double
*
)
var
);
if
(
len
>=
size
-
1
)
return
dumpBuf
;
if
(
len
>=
size
-
1
)
return
dumpBuf
;
break
;
}
}
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
"
\n
"
);
if
(
len
>=
size
-
1
)
return
dumpBuf
;
if
(
len
>=
size
-
1
)
return
dumpBuf
;
}
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
"%s |end
\n
"
,
flag
);
return
dumpBuf
;
...
...
source/common/src/tmsg.c
浏览文件 @
a34d3442
...
...
@@ -5526,6 +5526,11 @@ bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
ASSERT
(
0
);
// TODO
return
pLeft
->
uid
==
pRight
->
uid
&&
pLeft
->
ts
==
pRight
->
ts
;
}
else
{
ASSERT
(
0
);
/*ASSERT(pLeft->type == TMQ_OFFSET__RESET_NONE || pLeft->type == TMQ_OFFSET__RESET_EARLIEAST ||*/
/*pLeft->type == TMQ_OFFSET__RESET_LATEST);*/
/*return true;*/
}
}
return
false
;
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
a34d3442
...
...
@@ -89,8 +89,6 @@ typedef struct {
STqExecTb
execTb
;
STqExecDb
execDb
;
};
// TODO remove it
int64_t
tsdbEndVer
;
}
STqExecHandle
;
...
...
@@ -101,6 +99,8 @@ typedef struct {
int32_t
epoch
;
int8_t
fetchMeta
;
int64_t
snapshotVer
;
// TODO remove
SWalReader
*
pWalReader
;
...
...
@@ -131,7 +131,7 @@ typedef struct {
static
STqMgmt
tqMgmt
=
{
0
};
// tqRead
int64_t
tqScan
(
STQ
*
pTq
,
const
STq
ExecHandle
*
pExec
,
SMqDataRsp
*
pRsp
,
STqOffsetVal
*
offset
);
int64_t
tqScan
(
STQ
*
pTq
,
const
STq
Handle
*
pHandle
,
SMqDataRsp
*
pRsp
,
STqOffsetVal
*
offset
);
int64_t
tqFetchLog
(
STQ
*
pTq
,
STqHandle
*
pHandle
,
int64_t
*
fetchOffset
,
SWalCkHead
**
pHeadWithCkSum
);
// tqExec
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
a34d3442
...
...
@@ -284,7 +284,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
fetchOffsetNew
=
pOffset
->
val
;
char
formatBuf
[
80
];
tFormatOffset
(
formatBuf
,
80
,
&
fetchOffsetNew
);
tqDebug
(
"tmq poll: consumer %"
PRId64
", subkey %s, offset reset to %s"
,
consumerId
,
pHandle
->
subKey
,
formatBuf
);
tqDebug
(
"tmq poll: consumer %"
PRId64
", subkey %s, vg %d, offset reset to %s"
,
consumerId
,
pHandle
->
subKey
,
TD_VID
(
pTq
->
pVnode
),
formatBuf
);
}
else
{
if
(
reqOffset
.
type
==
TMQ_OFFSET__RESET_EARLIEAST
)
{
if
(
pReq
->
useSnapshot
&&
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
...
...
@@ -299,8 +300,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
}
}
else
if
(
reqOffset
.
type
==
TMQ_OFFSET__RESET_LATEST
)
{
tqOffsetResetToLog
(
&
dataRsp
.
rspOffset
,
walGetLastVer
(
pTq
->
pVnode
->
pWal
));
tqDebug
(
"tmq poll: consumer %ld, subkey %s, offset reset to %ld"
,
consumerId
,
pHandle
->
subKey
,
dataRsp
.
rspOffset
.
version
);
tqDebug
(
"tmq poll: consumer %ld, subkey %s,
vg %d,
offset reset to %ld"
,
consumerId
,
pHandle
->
subKey
,
TD_VID
(
pTq
->
pVnode
),
dataRsp
.
rspOffset
.
version
);
if
(
tqSendDataRsp
(
pTq
,
pMsg
,
pReq
,
&
dataRsp
)
<
0
)
{
code
=
-
1
;
}
...
...
@@ -318,10 +319,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
// 3.query
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
if
(
fetchOffsetNew
.
type
==
TMQ_OFFSET__LOG
)
{
fetchOffsetNew
.
version
++
;
}
if
(
tqScan
(
pTq
,
&
pHandle
->
exec
Handle
,
&
dataRsp
,
&
fetchOffsetNew
)
<
0
)
{
/*if (fetchOffsetNew.type == TMQ_OFFSET__LOG) {*/
/*fetchOffsetNew.version++;*/
/*}*/
if
(
tqScan
(
pTq
,
p
Handle
,
&
dataRsp
,
&
fetchOffsetNew
)
<
0
)
{
ASSERT
(
0
);
code
=
-
1
;
goto
OVER
;
...
...
@@ -480,30 +481,28 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle
->
fetchMeta
=
req
.
withMeta
;
pHandle
->
pWalReader
=
walOpenReader
(
pTq
->
pVnode
->
pWal
,
NULL
);
/*for (int32_t i = 0; i < 5; i++) {*/
/*pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);*/
/*}*/
// TODO version should be assigned in preprocess
int64_t
ver
=
walGetCommittedVer
(
pTq
->
pVnode
->
pWal
);
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
pHandle
->
execHandle
.
execCol
.
qmsg
=
req
.
qmsg
;
pHandle
->
snapshotVer
=
ver
;
req
.
qmsg
=
NULL
;
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
SReadHandle
handle
=
{
.
tqReader
=
pHandle
->
execHandle
.
pExecReader
[
i
],
.
meta
=
pTq
->
pVnode
->
pMeta
,
.
vnode
=
pTq
->
pVnode
,
.
initTableReader
=
true
,
.
initTqReader
=
true
,
.
version
=
ver
,
};
pHandle
->
execHandle
.
execCol
.
task
[
i
]
=
qCreate
Stream
ExecTaskInfo
(
pHandle
->
execHandle
.
execCol
.
qmsg
,
&
handle
);
pHandle
->
execHandle
.
execCol
.
task
[
i
]
=
qCreate
Queue
ExecTaskInfo
(
pHandle
->
execHandle
.
execCol
.
qmsg
,
&
handle
);
ASSERT
(
pHandle
->
execHandle
.
execCol
.
task
[
i
]);
void
*
scanner
=
NULL
;
qExtractStreamScanner
(
pHandle
->
execHandle
.
execCol
.
task
[
i
],
&
scanner
);
ASSERT
(
scanner
);
pHandle
->
execHandle
.
pExecReader
[
i
]
=
qExtractReaderFromStreamScanner
(
scanner
);
ASSERT
(
pHandle
->
execHandle
.
pExecReader
[
i
]);
pHandle
->
execHandle
.
tsdbEndVer
=
ver
;
}
}
else
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__DB
)
{
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
...
...
source/dnode/vnode/src/tq/tqExec.c
浏览文件 @
a34d3442
...
...
@@ -59,13 +59,13 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
return
0
;
}
int64_t
tqScan
(
STQ
*
pTq
,
const
STqExecHandle
*
pExec
,
SMqDataRsp
*
pRsp
,
STqOffsetVal
*
pOffset
)
{
qTaskInfo_t
task
=
pExec
->
execCol
.
task
[
0
];
int64_t
tqScan
(
STQ
*
pTq
,
const
STqHandle
*
pHandle
,
SMqDataRsp
*
pRsp
,
STqOffsetVal
*
pOffset
)
{
const
STqExecHandle
*
pExec
=
&
pHandle
->
execHandle
;
qTaskInfo_t
task
=
pExec
->
execCol
.
task
[
0
];
if
(
qStreamPrepareScan
(
task
,
pOffset
)
<
0
)
{
ASSERT
(
pOffset
->
type
==
TMQ_OFFSET__LOG
);
pRsp
->
rspOffset
=
*
pOffset
;
pRsp
->
rspOffset
.
version
--
;
return
0
;
}
...
...
@@ -73,9 +73,11 @@ int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffset
while
(
1
)
{
SSDataBlock
*
pDataBlock
=
NULL
;
uint64_t
ts
=
0
;
tqDebug
(
"task start to execute"
);
if
(
qExecTask
(
task
,
&
pDataBlock
,
&
ts
)
<
0
)
{
ASSERT
(
0
);
}
tqDebug
(
"task execute end, get %p"
,
pDataBlock
);
if
(
pDataBlock
!=
NULL
)
{
tqAddBlockDataToRsp
(
pDataBlock
,
pRsp
);
...
...
@@ -97,7 +99,7 @@ int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffset
}
if
(
pRsp
->
blockNum
==
0
&&
pOffset
->
type
==
TMQ_OFFSET__SNAPSHOT_DATA
)
{
tqOffsetResetToLog
(
pOffset
,
p
Exec
->
tsdbEnd
Ver
+
1
);
tqOffsetResetToLog
(
pOffset
,
p
Handle
->
snapshot
Ver
+
1
);
qStreamPrepareScan
(
task
,
pOffset
);
continue
;
}
...
...
@@ -116,7 +118,7 @@ int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffset
if
(
pRsp
->
reqOffset
.
type
==
TMQ_OFFSET__LOG
)
{
ASSERT
(
pRsp
->
rspOffset
.
version
+
1
>=
pRsp
->
reqOffset
.
version
);
}
tqDebug
(
"task exec exited"
);
break
;
}
...
...
source/dnode/vnode/src/tq/tqMeta.c
浏览文件 @
a34d3442
...
...
@@ -19,6 +19,7 @@ static int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
if
(
tStartEncode
(
pEncoder
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pHandle
->
subKey
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pHandle
->
consumerId
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pHandle
->
snapshotVer
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pHandle
->
epoch
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pHandle
->
execHandle
.
subType
)
<
0
)
return
-
1
;
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
...
...
@@ -32,6 +33,7 @@ static int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
if
(
tStartDecode
(
pDecoder
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
pDecoder
,
pHandle
->
subKey
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pHandle
->
consumerId
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pHandle
->
snapshotVer
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pHandle
->
epoch
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pHandle
->
execHandle
.
subType
)
<
0
)
return
-
1
;
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
...
...
@@ -78,19 +80,25 @@ int32_t tqMetaOpen(STQ* pTq) {
tDecoderInit
(
&
decoder
,
(
uint8_t
*
)
pVal
,
vLen
);
tDecodeSTqHandle
(
&
decoder
,
&
handle
);
handle
.
pWalReader
=
walOpenReader
(
pTq
->
pVnode
->
pWal
,
NULL
);
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
handle
.
execHandle
.
pExecReader
[
i
]
=
tqOpenReader
(
pTq
->
pVnode
);
}
/*for (int32_t i = 0; i < 5; i++) {*/
/*handle.execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);*/
/*}*/
if
(
handle
.
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
SReadHandle
reader
=
{
.
tqReader
=
handle
.
execHandle
.
pExecReader
[
i
],
.
meta
=
pTq
->
pVnode
->
pMeta
,
.
pMsgCb
=
&
pTq
->
pVnode
->
msgCb
,
.
vnode
=
pTq
->
pVnode
,
.
initTableReader
=
true
,
.
initTqReader
=
true
,
.
version
=
handle
.
snapshotVer
,
};
handle
.
execHandle
.
execCol
.
task
[
i
]
=
qCreate
Stream
ExecTaskInfo
(
handle
.
execHandle
.
execCol
.
qmsg
,
&
reader
);
handle
.
execHandle
.
execCol
.
task
[
i
]
=
qCreate
Queue
ExecTaskInfo
(
handle
.
execHandle
.
execCol
.
qmsg
,
&
reader
);
ASSERT
(
handle
.
execHandle
.
execCol
.
task
[
i
]);
void
*
scanner
=
NULL
;
qExtractStreamScanner
(
handle
.
execHandle
.
execCol
.
task
[
i
],
&
scanner
);
ASSERT
(
scanner
);
handle
.
execHandle
.
pExecReader
[
i
]
=
qExtractReaderFromStreamScanner
(
scanner
);
ASSERT
(
handle
.
execHandle
.
pExecReader
[
i
]);
}
}
else
{
handle
.
execHandle
.
execDb
.
pFilterOutTbUid
=
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
a34d3442
...
...
@@ -63,15 +63,15 @@ typedef struct SBlockLoadSuppInfo {
}
SBlockLoadSuppInfo
;
typedef
struct
SFilesetIter
{
int32_t
numOfFiles
;
// number of total files
int32_t
index
;
// current accessed index in the list
SArray
*
pFileList
;
// data file list
int32_t
order
;
int32_t
numOfFiles
;
// number of total files
int32_t
index
;
// current accessed index in the list
SArray
*
pFileList
;
// data file list
int32_t
order
;
}
SFilesetIter
;
typedef
struct
SFileDataBlockInfo
{
int32_t
tbBlockIdx
;
// index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
tbBlockIdx
;
// index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
uint64_t
uid
;
}
SFileDataBlockInfo
;
...
...
@@ -119,10 +119,10 @@ struct STsdbReader {
int32_t
type
;
// query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
SBlockLoadSuppInfo
suppInfo
;
SIOCostSummary
cost
;
STSchema
*
pSchema
;
SDataFReader
*
pFileReader
;
SVersionRange
verRange
;
SIOCostSummary
cost
;
STSchema
*
pSchema
;
SDataFReader
*
pFileReader
;
SVersionRange
verRange
;
};
static
SFileDataBlockInfo
*
getCurrentBlockInfo
(
SDataBlockIter
*
pBlockIter
);
...
...
@@ -287,9 +287,7 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, const STsdbFSState* pFSt
return
TSDB_CODE_SUCCESS
;
}
static
void
cleanupFilesetIterator
(
SFilesetIter
*
pIter
)
{
taosArrayDestroy
(
pIter
->
pFileList
);
}
static
void
cleanupFilesetIterator
(
SFilesetIter
*
pIter
)
{
taosArrayDestroy
(
pIter
->
pFileList
);
}
static
bool
filesetIteratorNext
(
SFilesetIter
*
pIter
,
STsdbReader
*
pReader
)
{
bool
asc
=
ASCENDING_TRAVERSE
(
pIter
->
order
);
...
...
@@ -304,6 +302,7 @@ static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
STimeWindow
win
=
{
0
};
while
(
1
)
{
/*if (pReader->pFileReader != NULL) tsdbDataFReaderClose(&pReader->pFileReader);*/
pReader
->
status
.
pCurrentFileset
=
(
SDFileSet
*
)
taosArrayGet
(
pIter
->
pFileList
,
pIter
->
index
);
int32_t
code
=
tsdbDataFReaderOpen
(
&
pReader
->
pFileReader
,
pReader
->
pTsdb
,
pReader
->
status
.
pCurrentFileset
);
...
...
@@ -349,9 +348,7 @@ static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
}
}
static
void
cleanupDataBlockIterator
(
SDataBlockIter
*
pIter
)
{
taosArrayDestroy
(
pIter
->
blockList
);
}
static
void
cleanupDataBlockIterator
(
SDataBlockIter
*
pIter
)
{
taosArrayDestroy
(
pIter
->
blockList
);
}
static
void
initReaderStatus
(
SReaderStatus
*
pStatus
)
{
pStatus
->
pTableIter
=
NULL
;
...
...
@@ -392,8 +389,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
initReaderStatus
(
&
pReader
->
status
);
pReader
->
pTsdb
=
getTsdbByRetentions
(
pVnode
,
pCond
->
twindows
.
skey
,
pVnode
->
config
.
tsdbCfg
.
retentions
,
idstr
,
&
level
);
pReader
->
pTsdb
=
getTsdbByRetentions
(
pVnode
,
pCond
->
twindows
.
skey
,
pVnode
->
config
.
tsdbCfg
.
retentions
,
idstr
,
&
level
);
pReader
->
suid
=
pCond
->
suid
;
pReader
->
order
=
pCond
->
order
;
pReader
->
capacity
=
4096
;
...
...
@@ -833,7 +829,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
uint8_t
*
pb
=
NULL
,
*
pb1
=
NULL
;
int32_t
code
=
tsdbReadColData
(
pReader
->
pFileReader
,
&
pBlockScanInfo
->
blockIdx
,
pBlock
,
pSupInfo
->
colIds
,
numOfCols
,
pBlockData
,
&
pb
,
&
pb1
);
pBlockData
,
&
pb
,
&
pb1
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
...
...
@@ -1459,18 +1455,18 @@ static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBl
}
TSDBKEY
*
pFirst
=
taosArrayGet
(
pBlockScanInfo
->
delSkyline
,
0
);
TSDBKEY
*
pLast
=
taosArrayGetLast
(
pBlockScanInfo
->
delSkyline
);
TSDBKEY
*
pLast
=
taosArrayGetLast
(
pBlockScanInfo
->
delSkyline
);
// ts is not overlap
if
(
pBlock
->
minKey
.
ts
>
pLast
->
ts
||
pBlock
->
maxKey
.
ts
<
pFirst
->
ts
)
{
return
false
;
}
int32_t
step
=
ASCENDING_TRAVERSE
(
order
)
?
1
:
-
1
;
int32_t
step
=
ASCENDING_TRAVERSE
(
order
)
?
1
:
-
1
;
// version is not overlap
size_t
num
=
taosArrayGetSize
(
pBlockScanInfo
->
delSkyline
);
for
(
int32_t
i
=
pBlockScanInfo
->
fileDelIndex
;
i
<
num
;
i
+=
step
)
{
for
(
int32_t
i
=
pBlockScanInfo
->
fileDelIndex
;
i
<
num
;
i
+=
step
)
{
TSDBKEY
*
p
=
taosArrayGet
(
pBlockScanInfo
->
delSkyline
,
i
);
if
(
p
->
ts
>=
pBlock
->
minKey
.
ts
&&
p
->
ts
<=
pBlock
->
maxKey
.
ts
)
{
if
(
p
->
version
>=
pBlock
->
minVersion
)
{
...
...
@@ -1502,8 +1498,8 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBloc
}
// has duplicated ts of different version in this block
bool
hasDup
=
(
pBlock
->
nSubBlock
==
1
)
?
pBlock
->
hasDup
:
true
;
bool
overlapWithDel
=
overlapWithDelSkyline
(
pScanInfo
,
pBlock
,
pReader
->
order
);
bool
hasDup
=
(
pBlock
->
nSubBlock
==
1
)
?
pBlock
->
hasDup
:
true
;
bool
overlapWithDel
=
overlapWithDelSkyline
(
pScanInfo
,
pBlock
,
pReader
->
order
);
return
(
overlapWithNeighbor
||
hasDup
||
dataBlockPartiallyRequired
(
&
pReader
->
window
,
&
pReader
->
verRange
,
pBlock
)
||
keyOverlapFileBlock
(
key
,
pBlock
,
&
pReader
->
verRange
)
||
(
pBlock
->
nRow
>
pReader
->
capacity
)
||
overlapWithDel
);
...
...
@@ -2220,17 +2216,18 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret
}
SVersionRange
getQueryVerRange
(
SVnode
*
pVnode
,
SQueryTableDataCond
*
pCond
,
int8_t
level
)
{
int64_t
startVer
=
(
pCond
->
startVersion
==
-
1
)
?
0
:
pCond
->
startVersion
;
int64_t
startVer
=
(
pCond
->
startVersion
==
-
1
)
?
0
:
pCond
->
startVersion
;
if
(
VND_IS_RSMA
(
pVnode
))
{
return
(
SVersionRange
){.
minVer
=
startVer
,
.
maxVer
=
tdRSmaGetMaxSubmitVer
(
pVnode
->
pSma
,
level
)};
}
int64_t
endVer
=
0
;
if
(
pCond
->
endVersion
==
-
1
)
{
// user not specified end version, set current maximum version of vnode as the endVersion
if
(
pCond
->
endVersion
==
-
1
)
{
// user not specified end version, set current maximum version of vnode as the endVersion
endVer
=
pVnode
->
state
.
applied
;
}
else
{
endVer
=
(
pCond
->
endVersion
>
pVnode
->
state
.
applied
)
?
pVnode
->
state
.
applied
:
pCond
->
endVersion
;
endVer
=
(
pCond
->
endVersion
>
pVnode
->
state
.
applied
)
?
pVnode
->
state
.
applied
:
pCond
->
endVersion
;
}
return
(
SVersionRange
){.
minVer
=
startVer
,
.
maxVer
=
endVer
};
...
...
@@ -2274,9 +2271,9 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32
if
(
pDelList
==
NULL
)
{
return
false
;
}
size_t
num
=
taosArrayGetSize
(
pDelList
);
bool
asc
=
ASCENDING_TRAVERSE
(
order
);
int32_t
step
=
asc
?
1
:
-
1
;
size_t
num
=
taosArrayGetSize
(
pDelList
);
bool
asc
=
ASCENDING_TRAVERSE
(
order
);
int32_t
step
=
asc
?
1
:
-
1
;
if
(
asc
)
{
if
(
*
index
>=
num
-
1
)
{
...
...
@@ -2823,7 +2820,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
taosMemoryFree
(
pSupInfo
->
colIds
);
taosArrayDestroy
(
pSupInfo
->
pColAgg
);
for
(
int32_t
i
=
0
;
i
<
blockDataGetNumOfCols
(
pReader
->
pResBlock
);
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
blockDataGetNumOfCols
(
pReader
->
pResBlock
);
++
i
)
{
if
(
pSupInfo
->
buildBuf
[
i
]
!=
NULL
)
{
taosMemoryFreeClear
(
pSupInfo
->
buildBuf
[
i
]);
}
...
...
@@ -2835,7 +2832,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
destroyBlockScanInfo
(
pReader
->
status
.
pTableMap
);
blockDataDestroy
(
pReader
->
pResBlock
);
if
(
pReader
->
pFileReader
!=
NULL
)
tsdbDataFReaderClose
(
&
pReader
->
pFileReader
);
#if 0
// if (pReader->status.pTableScanInfo != NULL) {
// pReader->status.pTableScanInfo = destroyTableCheckInfo(pReader->status.pTableScanInfo);
...
...
@@ -3011,8 +3008,8 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
return
TSDB_CODE_SUCCESS
;
}
pReader
->
order
=
pCond
->
order
;
pReader
->
type
=
BLOCK_LOAD_OFFSET_ORDER
;
pReader
->
order
=
pCond
->
order
;
pReader
->
type
=
BLOCK_LOAD_OFFSET_ORDER
;
pReader
->
status
.
loadFromFile
=
true
;
pReader
->
status
.
pTableIter
=
NULL
;
...
...
@@ -3028,6 +3025,8 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
int32_t
numOfTables
=
1
;
SDataBlockIter
*
pBlockIter
=
&
pReader
->
status
.
blockIter
;
tsdbDataFReaderClose
(
&
pReader
->
pFileReader
);
STsdbFSState
*
pFState
=
pReader
->
pTsdb
->
fs
->
cState
;
initFilesetIterator
(
&
pReader
->
status
.
fileIter
,
pFState
,
pReader
->
order
,
pReader
->
idStr
);
resetDataBlockIterator
(
&
pReader
->
status
.
blockIter
,
pReader
->
order
);
...
...
@@ -3114,13 +3113,12 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
pTableBlockInfo
->
numOfBlocks
+=
pBlockIter
->
numOfBlocks
;
}
/*
hasNext = blockIteratorNext(&pStatus->blockIter);
*/
/*
hasNext = blockIteratorNext(&pStatus->blockIter);
*/
// tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables,
// pReader->pFileGroup->fid, pReader->idStr);
// tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables,
// pReader->pFileGroup->fid, pReader->idStr);
}
return
code
;
...
...
@@ -3158,7 +3156,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
return
rows
;
}
int32_t
tsdbGetTableSchema
(
SVnode
*
pVnode
,
int64_t
uid
,
STSchema
**
pSchema
,
int64_t
*
suid
)
{
int32_t
tsdbGetTableSchema
(
SVnode
*
pVnode
,
int64_t
uid
,
STSchema
**
pSchema
,
int64_t
*
suid
)
{
int32_t
sversion
=
1
;
SMetaReader
mr
=
{
0
};
...
...
@@ -3171,7 +3169,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6
}
*
suid
=
0
;
if
(
mr
.
me
.
type
==
TSDB_CHILD_TABLE
)
{
*
suid
=
mr
.
me
.
ctbEntry
.
suid
;
code
=
metaGetTableEntryByUid
(
&
mr
,
*
suid
);
...
...
@@ -3188,8 +3186,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6
metaReaderClear
(
&
mr
);
*
pSchema
=
metaGetTbTSchema
(
pVnode
->
pMeta
,
uid
,
sversion
);
return
TSDB_CODE_SUCCESS
;
}
source/libs/executor/src/executor.c
浏览文件 @
a34d3442
...
...
@@ -106,6 +106,30 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
return
code
;
}
qTaskInfo_t
qCreateQueueExecTaskInfo
(
void
*
msg
,
SReadHandle
*
readers
)
{
if
(
msg
==
NULL
)
{
// TODO create raw scan
return
NULL
;
}
struct
SSubplan
*
plan
=
NULL
;
int32_t
code
=
qStringToSubplan
(
msg
,
&
plan
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
code
;
return
NULL
;
}
qTaskInfo_t
pTaskInfo
=
NULL
;
code
=
qCreateExecTask
(
readers
,
0
,
0
,
plan
,
&
pTaskInfo
,
NULL
,
NULL
,
OPTR_EXEC_MODEL_QUEUE
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
// TODO: destroy SSubplan & pTaskInfo
terrno
=
code
;
return
NULL
;
}
return
pTaskInfo
;
}
qTaskInfo_t
qCreateStreamExecTaskInfo
(
void
*
msg
,
SReadHandle
*
readers
)
{
if
(
msg
==
NULL
)
{
return
NULL
;
...
...
@@ -186,7 +210,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
}
int32_t
qGetQueryTableSchemaVersion
(
qTaskInfo_t
tinfo
,
char
*
dbName
,
char
*
tableName
,
int32_t
*
sversion
,
int32_t
*
tversion
)
{
int32_t
*
tversion
)
{
ASSERT
(
tinfo
!=
NULL
&&
dbName
!=
NULL
&&
tableName
!=
NULL
);
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
...
...
source/libs/executor/src/executorMain.c
浏览文件 @
a34d3442
...
...
@@ -269,13 +269,13 @@ const STqOffset* qExtractStatusFromStreamScanner(void* scanner) {
void
*
qStreamExtractMetaMsg
(
qTaskInfo_t
tinfo
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
ASSERT
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_
STREAM
);
ASSERT
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_
QUEUE
);
return
pTaskInfo
->
streamInfo
.
metaBlk
;
}
int32_t
qStreamExtractOffset
(
qTaskInfo_t
tinfo
,
STqOffsetVal
*
pOffset
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
ASSERT
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_
STREAM
);
ASSERT
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_
QUEUE
);
memcpy
(
pOffset
,
&
pTaskInfo
->
streamInfo
.
lastStatus
,
sizeof
(
STqOffsetVal
));
return
0
;
}
...
...
@@ -283,35 +283,41 @@ int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
int32_t
qStreamPrepareScan
(
qTaskInfo_t
tinfo
,
const
STqOffsetVal
*
pOffset
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
SOperatorInfo
*
pOperator
=
pTaskInfo
->
pRoot
;
ASSERT
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_
STREAM
);
ASSERT
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_
QUEUE
);
pTaskInfo
->
streamInfo
.
prepareStatus
=
*
pOffset
;
// TODO: optimize
/*if (pTaskInfo->streamInfo.lastStatus.type != pOffset->type ||*/
/*pTaskInfo->streamInfo.prepareStatus.version != pTaskInfo->streamInfo.lastStatus.version) {*/
while
(
1
)
{
uint8_t
type
=
pOperator
->
operatorType
;
pOperator
->
status
=
OP_OPENED
;
if
(
type
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
SStreamScanInfo
*
pInfo
=
pOperator
->
info
;
if
(
pOffset
->
type
==
TMQ_OFFSET__LOG
)
{
if
(
tqSeekVer
(
pInfo
->
tqReader
,
pOffset
->
version
)
<
0
)
{
return
-
1
;
}
ASSERT
(
pInfo
->
tqReader
->
pWalReader
->
curVersion
==
pOffset
->
version
);
}
else
if
(
pOffset
->
type
==
TMQ_OFFSET__SNAPSHOT_DATA
)
{
/*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/
int64_t
uid
=
pOffset
->
uid
;
int64_t
ts
=
pOffset
->
ts
;
if
(
uid
==
0
)
{
if
(
taosArrayGetSize
(
pTaskInfo
->
tableqinfoList
.
pTableList
)
!=
0
)
{
STableKeyInfo
*
pTableInfo
=
taosArrayGet
(
pTaskInfo
->
tableqinfoList
.
pTableList
,
0
);
uid
=
pTableInfo
->
uid
;
ts
=
INT64_MIN
;
if
(
!
tOffsetEqual
(
pOffset
,
&
pTaskInfo
->
streamInfo
.
lastStatus
))
{
while
(
1
)
{
uint8_t
type
=
pOperator
->
operatorType
;
pOperator
->
status
=
OP_OPENED
;
if
(
type
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
SStreamScanInfo
*
pInfo
=
pOperator
->
info
;
if
(
pOffset
->
type
==
TMQ_OFFSET__LOG
)
{
#if 0
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) &&
pInfo->tqReader->pWalReader->curVersion != pOffset->version) {
qError("prepare scan ver %ld actual ver %ld, last %ld", pOffset->version,
pInfo->tqReader->pWalReader->curVersion, pTaskInfo->streamInfo.lastStatus.version);
ASSERT(0);
}
}
if
(
pTaskInfo
->
streamInfo
.
lastStatus
.
type
!=
TMQ_OFFSET__SNAPSHOT_DATA
||
pTaskInfo
->
streamInfo
.
lastStatus
.
uid
!=
uid
||
pTaskInfo
->
streamInfo
.
lastStatus
.
ts
!=
ts
)
{
#endif
if
(
tqSeekVer
(
pInfo
->
tqReader
,
pOffset
->
version
+
1
)
<
0
)
{
return
-
1
;
}
ASSERT
(
pInfo
->
tqReader
->
pWalReader
->
curVersion
==
pOffset
->
version
+
1
);
}
else
if
(
pOffset
->
type
==
TMQ_OFFSET__SNAPSHOT_DATA
)
{
/*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/
int64_t
uid
=
pOffset
->
uid
;
int64_t
ts
=
pOffset
->
ts
;
if
(
uid
==
0
)
{
if
(
taosArrayGetSize
(
pTaskInfo
->
tableqinfoList
.
pTableList
)
!=
0
)
{
STableKeyInfo
*
pTableInfo
=
taosArrayGet
(
pTaskInfo
->
tableqinfoList
.
pTableList
,
0
);
uid
=
pTableInfo
->
uid
;
ts
=
INT64_MIN
;
}
}
/*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/
/*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/
STableScanInfo
*
pTableScanInfo
=
pInfo
->
pTableScanOp
->
info
;
int32_t
tableSz
=
taosArrayGetSize
(
pTaskInfo
->
tableqinfoList
.
pTableList
);
bool
found
=
false
;
...
...
@@ -320,6 +326,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
if
(
pTableInfo
->
uid
==
uid
)
{
found
=
true
;
pTableScanInfo
->
currentTable
=
i
;
break
;
}
}
...
...
@@ -335,18 +342,18 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
qDebug
(
"tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d"
,
uid
,
ts
,
pTableScanInfo
->
currentTable
,
tableSz
);
}
/*}*/
}
else
{
ASSERT
(
0
);
}
return
0
;
}
else
{
ASSERT
(
0
);
ASSERT
(
pOperator
->
numOfDownstream
==
1
);
pOperator
=
pOperator
->
pDownstream
[
0
];
}
return
0
;
}
else
{
ASSERT
(
pOperator
->
numOfDownstream
==
1
);
pOperator
=
pOperator
->
pDownstream
[
0
];
}
}
/*}*/
return
0
;
}
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
a34d3442
...
...
@@ -3235,6 +3235,10 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
if
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_QUEUE
)
{
pOperator
->
status
=
OP_OPENED
;
return
NULL
;
}
return
NULL
;
}
...
...
@@ -3268,11 +3272,15 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
while
(
1
)
{
// The downstream exec may change the value of the newgroup, so use a local variable instead.
qDebug
(
"projection call next"
);
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
if
(
pBlock
==
NULL
)
{
// TODO optimize
/*if (pTaskInfo->execModel != OPTR_EXEC_MODEL_STREAM) {*/
qDebug
(
"projection get null"
);
/*if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH) {*/
doSetOperatorCompleted
(
pOperator
);
/*} else if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {*/
/*pOperator->status = OP_RES_TO_RETURN;*/
/*}*/
break
;
}
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
a34d3442
...
...
@@ -1236,6 +1236,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
/*return NULL;*/
/*}*/
qDebug
(
"stream scan called"
);
if
(
pTaskInfo
->
streamInfo
.
prepareStatus
.
type
==
TMQ_OFFSET__LOG
)
{
while
(
1
)
{
SFetchRet
ret
=
{
0
};
...
...
@@ -1247,6 +1248,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
}
// TODO clean data block
if
(
pInfo
->
pRes
->
info
.
rows
>
0
)
{
qDebug
(
"stream scan log return %d rows"
,
pInfo
->
pRes
->
info
.
rows
);
return
pInfo
->
pRes
;
}
}
else
if
(
ret
.
fetchType
==
FETCH_TYPE__META
)
{
...
...
@@ -1257,6 +1259,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
}
else
if
(
ret
.
fetchType
==
FETCH_TYPE__NONE
)
{
pTaskInfo
->
streamInfo
.
lastStatus
=
ret
.
offset
;
ASSERT
(
pTaskInfo
->
streamInfo
.
lastStatus
.
version
+
1
>=
pTaskInfo
->
streamInfo
.
prepareStatus
.
version
);
qDebug
(
"stream scan log return null"
);
return
NULL
;
}
else
{
ASSERT
(
0
);
...
...
@@ -1264,7 +1267,12 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
}
}
else
if
(
pTaskInfo
->
streamInfo
.
prepareStatus
.
type
==
TMQ_OFFSET__SNAPSHOT_DATA
)
{
SSDataBlock
*
pResult
=
doTableScan
(
pInfo
->
pTableScanOp
);
return
pResult
&&
pResult
->
info
.
rows
>
0
?
pResult
:
NULL
;
if
(
pResult
&&
pResult
->
info
.
rows
>
0
)
{
qDebug
(
"stream scan tsdb return %d rows"
,
pResult
->
info
.
rows
);
return
pResult
;
}
qDebug
(
"stream scan tsdb return null"
);
return
NULL
;
}
else
if
(
pTaskInfo
->
streamInfo
.
prepareStatus
.
type
==
TMQ_OFFSET__SNAPSHOT_META
)
{
// TODO scan meta
ASSERT
(
0
);
...
...
@@ -1287,6 +1295,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pBlock
->
info
.
calWin
.
ekey
=
INT64_MAX
;
blockDataUpdateTsWindow
(
pBlock
,
0
);
switch
(
pBlock
->
info
.
type
)
{
case
STREAM_NORMAL
:
case
STREAM_GET_ALL
:
return
pBlock
;
case
STREAM_RETRIEVE
:
{
pInfo
->
blockType
=
STREAM_INPUT__DATA_SUBMIT
;
pInfo
->
scanMode
=
STREAM_SCAN_FROM_DATAREADER_RETRIEVE
;
...
...
@@ -1316,6 +1327,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
}
return
pBlock
;
}
else
if
(
pInfo
->
blockType
==
STREAM_INPUT__DATA_SUBMIT
)
{
qDebug
(
"scan mode %d"
,
pInfo
->
scanMode
);
if
(
pInfo
->
scanMode
==
STREAM_SCAN_FROM_RES
)
{
blockDataDestroy
(
pInfo
->
pUpdateRes
);
pInfo
->
scanMode
=
STREAM_SCAN_FROM_READERHANDLE
;
...
...
@@ -1410,7 +1422,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
}
}
}
qDebug
(
"scan rows: %d"
,
pBlockInfo
->
rows
);
return
(
pBlockInfo
->
rows
==
0
)
?
NULL
:
pInfo
->
pRes
;
#if 0
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
a34d3442
...
...
@@ -1350,13 +1350,13 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
if
(
chIds
&&
pPullDataMap
)
{
SArray
*
chAy
=
*
(
SArray
**
)
chIds
;
int32_t
size
=
taosArrayGetSize
(
chAy
);
q
Info
(
"window %"
PRId64
" wait child size:%d"
,
win
.
skey
,
size
);
q
Debug
(
"window %"
PRId64
" wait child size:%d"
,
win
.
skey
,
size
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
q
Info
(
"window %"
PRId64
" wait chid id:%d"
,
win
.
skey
,
*
(
int32_t
*
)
taosArrayGet
(
chAy
,
i
));
q
Debug
(
"window %"
PRId64
" wait chid id:%d"
,
win
.
skey
,
*
(
int32_t
*
)
taosArrayGet
(
chAy
,
i
));
}
continue
;
}
else
if
(
pPullDataMap
)
{
q
Info
(
"close window %"
PRId64
,
win
.
skey
);
q
Debug
(
"close window %"
PRId64
,
win
.
skey
);
}
SResultRowPosition
*
pPos
=
(
SResultRowPosition
*
)
pIte
;
if
(
pSup
->
calTrigger
==
STREAM_TRIGGER_WINDOW_CLOSE
)
{
...
...
@@ -2509,8 +2509,8 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
if
(
IS_FINAL_OP
(
pInfo
))
{
forwardRows
=
1
;
}
else
{
forwardRows
=
getNumOfRowsInTimeWindow
(
&
pSDataBlock
->
info
,
tsCols
,
startPos
,
nextWin
.
ekey
,
binarySearchForKey
,
NULL
,
TSDB_ORDER_ASC
);
forwardRows
=
getNumOfRowsInTimeWindow
(
&
pSDataBlock
->
info
,
tsCols
,
startPos
,
nextWin
.
ekey
,
binarySearchForKey
,
NULL
,
TSDB_ORDER_ASC
);
}
if
(
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
&&
pUpdated
)
{
saveResultRow
(
pResult
,
tableGroupId
,
pUpdated
);
...
...
@@ -2627,6 +2627,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
qDebug
(
"interval status %d %s"
,
pOperator
->
status
,
IS_FINAL_OP
(
pInfo
)
?
"interval Final"
:
"interval Semi"
);
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
}
else
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
...
...
@@ -2677,7 +2679,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
clearSpecialDataBlock
(
pInfo
->
pUpdateRes
);
removeDeleteResults
(
pUpdated
,
pInfo
->
pDelWins
);
pOperator
->
status
=
OP_RES_TO_RETURN
;
q
Info
(
"%s return data"
,
IS_FINAL_OP
(
pInfo
)
?
"interval Final"
:
"interval Semi"
);
q
Debug
(
"%s return data"
,
IS_FINAL_OP
(
pInfo
)
?
"interval Final"
:
"interval Semi"
);
break
;
}
printDataBlock
(
pBlock
,
IS_FINAL_OP
(
pInfo
)
?
"interval Final recv"
:
"interval Semi recv"
);
...
...
source/libs/stream/src/stream.c
浏览文件 @
a34d3442
...
...
@@ -173,7 +173,8 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
}
int32_t
streamProcessDispatchReq
(
SStreamTask
*
pTask
,
SStreamDispatchReq
*
pReq
,
SRpcMsg
*
pRsp
)
{
qInfo
(
"task %d receive dispatch req from node %d task %d"
,
pTask
->
taskId
,
pReq
->
upstreamNodeId
,
pReq
->
upstreamTaskId
);
qDebug
(
"task %d receive dispatch req from node %d task %d"
,
pTask
->
taskId
,
pReq
->
upstreamNodeId
,
pReq
->
upstreamTaskId
);
// 1. handle input
streamTaskEnqueue
(
pTask
,
pReq
,
pRsp
);
...
...
source/libs/stream/src/streamExec.c
浏览文件 @
a34d3442
...
...
@@ -26,10 +26,12 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
}
else
if
(
pItem
->
type
==
STREAM_INPUT__DATA_SUBMIT
)
{
ASSERT
(
pTask
->
isDataScan
);
SStreamDataSubmit
*
pSubmit
=
(
SStreamDataSubmit
*
)
data
;
qDebug
(
"task %d %p set submit input %p %p %d"
,
pTask
->
taskId
,
pTask
,
pSubmit
,
pSubmit
->
data
,
*
pSubmit
->
dataRef
);
qSetStreamInput
(
exec
,
pSubmit
->
data
,
STREAM_INPUT__DATA_SUBMIT
,
false
);
}
else
if
(
pItem
->
type
==
STREAM_INPUT__DATA_BLOCK
||
pItem
->
type
==
STREAM_INPUT__DATA_RETRIEVE
)
{
SStreamDataBlock
*
pBlock
=
(
SStreamDataBlock
*
)
data
;
SArray
*
blocks
=
pBlock
->
blocks
;
qDebug
(
"task %d %p set ssdata input"
,
pTask
->
taskId
,
pTask
);
qSetMultiStreamInput
(
exec
,
blocks
->
pData
,
blocks
->
size
,
STREAM_INPUT__DATA_BLOCK
,
false
);
}
else
if
(
pItem
->
type
==
STREAM_INPUT__DROP
)
{
// TODO exec drop
...
...
source/libs/wal/src/walRead.c
浏览文件 @
a34d3442
...
...
@@ -66,6 +66,7 @@ void walCloseReader(SWalReader *pRead) {
}
int32_t
walNextValidMsg
(
SWalReader
*
pRead
)
{
wDebug
(
"vgId:%d wal start to fetch"
,
pRead
->
pWal
->
cfg
.
vgId
);
int64_t
fetchVer
=
pRead
->
curVersion
;
int64_t
endVer
=
pRead
->
cond
.
scanUncommited
?
walGetLastVer
(
pRead
->
pWal
)
:
walGetCommittedVer
(
pRead
->
pWal
);
while
(
fetchVer
<=
endVer
)
{
...
...
@@ -176,7 +177,7 @@ int32_t walReadSeekVerImpl(SWalReader *pRead, int64_t ver) {
return
-
1
;
}
wDebug
(
"wal version reset from %ld
to %ld"
,
pRead
->
curVersion
,
ver
);
wDebug
(
"wal version reset from %ld
(invalid: %d) to %ld"
,
pRead
->
curVersion
,
pRead
->
curInvalid
,
ver
);
pRead
->
curVersion
=
ver
;
return
0
;
...
...
@@ -242,6 +243,7 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
return
-
1
;
}
}
pRead
->
curInvalid
=
0
;
return
0
;
}
...
...
@@ -301,6 +303,7 @@ static int32_t walSkipFetchBodyNew(SWalReader *pRead) {
int64_t
code
;
ASSERT
(
pRead
->
curVersion
==
pRead
->
pHead
->
head
.
version
);
ASSERT
(
pRead
->
curInvalid
==
0
);
code
=
taosLSeekFile
(
pRead
->
pLogFile
,
pRead
->
pHead
->
head
.
bodyLen
,
SEEK_CUR
);
if
(
code
<
0
)
{
...
...
@@ -404,6 +407,7 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
}
int32_t
walReadVer
(
SWalReader
*
pRead
,
int64_t
ver
)
{
wDebug
(
"vgId:%d wal start to read ver %ld"
,
pRead
->
pWal
->
cfg
.
vgId
,
ver
);
int64_t
contLen
;
bool
seeked
=
false
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录