Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d947619b
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
d947619b
编写于
3月 30, 2023
作者:
H
Haojun Liao
提交者:
GitHub
3月 30, 2023
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #20691 from taosdata/feature/3_liaohj
fix(stream):fix the race condition when creating new tables.
上级
a778dc8c
71d26202
变更
14
隐藏空白更改
内联
并排
Showing
14 changed file
with
187 addition
and
163 deletion
+187
-163
include/libs/executor/executor.h
include/libs/executor/executor.h
+2
-1
source/client/src/clientTmq.c
source/client/src/clientTmq.c
+8
-4
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+5
-6
source/dnode/vnode/src/tq/tqMeta.c
source/dnode/vnode/src/tq/tqMeta.c
+5
-5
source/dnode/vnode/src/tq/tqScan.c
source/dnode/vnode/src/tq/tqScan.c
+3
-3
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+1
-0
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+12
-19
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+16
-1
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+112
-101
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+3
-2
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+15
-18
tests/system-test/2-query/columnLenUpdated.py
tests/system-test/2-query/columnLenUpdated.py
+1
-1
tests/system-test/7-tmq/subscribeDb.py
tests/system-test/7-tmq/subscribeDb.py
+2
-2
tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg.py
tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg.py
+2
-0
未找到文件。
include/libs/executor/executor.h
浏览文件 @
d947619b
...
...
@@ -78,7 +78,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v
* @param SReadHandle
* @return
*/
qTaskInfo_t
qCreateQueueExecTaskInfo
(
void
*
msg
,
SReadHandle
*
pReaderHandle
,
int32_t
vgId
,
int32_t
*
numOfCols
,
SSchemaWrapper
**
pSchema
);
qTaskInfo_t
qCreateQueueExecTaskInfo
(
void
*
msg
,
SReadHandle
*
pReaderHandle
,
int32_t
vgId
,
int32_t
*
numOfCols
,
uint64_t
id
);
/**
* set the task Id, usually used by message queue process
...
...
@@ -89,6 +89,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3
void
qSetTaskId
(
qTaskInfo_t
tinfo
,
uint64_t
taskId
,
uint64_t
queryId
);
int32_t
qSetStreamOpOpen
(
qTaskInfo_t
tinfo
);
/**
* Set multiple input data blocks for the stream scan.
* @param tinfo
...
...
source/client/src/clientTmq.c
浏览文件 @
d947619b
...
...
@@ -535,10 +535,14 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
atomic_add_fetch_32
(
&
pParamSet
->
totalRspNum
,
1
);
SEp
*
pEp
=
GET_ACTIVE_EP
(
&
pVg
->
epSet
);
tscDebug
(
"consumer:0x%"
PRIx64
" topic:%s on vgId:%d send offset:%"
PRId64
" prev:%"
PRId64
", ep:%s:%d, ordinal:%d/%d, req:0x%"
PRIx64
,
tmq
->
consumerId
,
pOffset
->
subKey
,
pVg
->
vgId
,
pOffset
->
val
.
version
,
pVg
->
committedOffset
.
version
,
pEp
->
fqdn
,
pEp
->
port
,
index
+
1
,
totalVgroups
,
pMsgSendInfo
->
requestId
);
char
offsetBuf
[
80
]
=
{
0
};
tFormatOffset
(
offsetBuf
,
tListLen
(
offsetBuf
),
&
pOffset
->
val
);
char
commitBuf
[
80
]
=
{
0
};
tFormatOffset
(
commitBuf
,
tListLen
(
commitBuf
),
&
pVg
->
committedOffset
);
tscDebug
(
"consumer:0x%"
PRIx64
" topic:%s on vgId:%d send offset:%s prev:%s, ep:%s:%d, ordinal:%d/%d, req:0x%"
PRIx64
,
tmq
->
consumerId
,
pOffset
->
subKey
,
pVg
->
vgId
,
offsetBuf
,
commitBuf
,
pEp
->
fqdn
,
pEp
->
port
,
index
+
1
,
totalVgroups
,
pMsgSendInfo
->
requestId
);
int64_t
transporterId
=
0
;
asyncSendMsgToServer
(
tmq
->
pTscObj
->
pAppInfo
->
pTransporter
,
&
pVg
->
epSet
,
&
transporterId
,
pMsgSendInfo
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
d947619b
...
...
@@ -399,8 +399,8 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
char
formatBuf
[
80
];
tFormatOffset
(
formatBuf
,
80
,
pOffsetVal
);
tqDebug
(
"tmq poll: consumer:0x%"
PRIx64
", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue.
"
,
consumerId
,
pHandle
->
subKey
,
vgId
,
formatBuf
);
tqDebug
(
"tmq poll: consumer:0x%"
PRIx64
", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue.
reqId:0x%"
PRIx64
,
consumerId
,
pHandle
->
subKey
,
vgId
,
formatBuf
,
pRequest
->
reqId
);
return
0
;
}
else
{
// no poll occurs in this vnode for this topic, let's seek to the right offset value.
...
...
@@ -799,7 +799,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
STqHandle
tqHandle
=
{
0
};
pHandle
=
&
tqHandle
;
/*taosInitRWLatch(&pExec->lock);*/
uint64_t
oldConsumerId
=
pHandle
->
consumerId
;
memcpy
(
pHandle
->
subKey
,
req
.
subKey
,
TSDB_SUBSCRIBE_KEY_LEN
);
...
...
@@ -834,7 +833,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
req
.
qmsg
=
NULL
;
pHandle
->
execHandle
.
task
=
qCreateQueueExecTaskInfo
(
pHandle
->
execHandle
.
execCol
.
qmsg
,
&
handle
,
vgId
,
&
pHandle
->
execHandle
.
numOfCols
,
NULL
);
qCreateQueueExecTaskInfo
(
pHandle
->
execHandle
.
execCol
.
qmsg
,
&
handle
,
vgId
,
&
pHandle
->
execHandle
.
numOfCols
,
req
.
newConsumerId
);
void
*
scanner
=
NULL
;
qExtractStreamScanner
(
pHandle
->
execHandle
.
task
,
&
scanner
);
pHandle
->
execHandle
.
pExecReader
=
qExtractReaderFromStreamScanner
(
scanner
);
...
...
@@ -847,7 +846,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
buildSnapContext
(
handle
.
meta
,
handle
.
version
,
0
,
pHandle
->
execHandle
.
subType
,
pHandle
->
fetchMeta
,
(
SSnapContext
**
)(
&
handle
.
sContext
));
pHandle
->
execHandle
.
task
=
qCreateQueueExecTaskInfo
(
NULL
,
&
handle
,
vgId
,
NULL
,
NULL
);
pHandle
->
execHandle
.
task
=
qCreateQueueExecTaskInfo
(
NULL
,
&
handle
,
vgId
,
NULL
,
req
.
newConsumerId
);
}
else
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
pHandle
->
pWalReader
=
walOpenReader
(
pVnode
->
pWal
,
NULL
);
pHandle
->
execHandle
.
execTb
.
suid
=
req
.
suid
;
...
...
@@ -865,7 +864,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
buildSnapContext
(
handle
.
meta
,
handle
.
version
,
req
.
suid
,
pHandle
->
execHandle
.
subType
,
pHandle
->
fetchMeta
,
(
SSnapContext
**
)(
&
handle
.
sContext
));
pHandle
->
execHandle
.
task
=
qCreateQueueExecTaskInfo
(
NULL
,
&
handle
,
vgId
,
NULL
,
NULL
);
pHandle
->
execHandle
.
task
=
qCreateQueueExecTaskInfo
(
NULL
,
&
handle
,
vgId
,
NULL
,
req
.
newConsumerId
);
}
taosHashPut
(
pTq
->
pHandle
,
req
.
subKey
,
strlen
(
req
.
subKey
),
pHandle
,
sizeof
(
STqHandle
));
...
...
source/dnode/vnode/src/tq/tqMeta.c
浏览文件 @
d947619b
...
...
@@ -307,7 +307,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
if
(
handle
.
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
handle
.
execHandle
.
task
=
qCreateQueueExecTaskInfo
(
handle
.
execHandle
.
execCol
.
qmsg
,
&
reader
,
vgId
,
&
handle
.
execHandle
.
numOfCols
,
NULL
);
qCreateQueueExecTaskInfo
(
handle
.
execHandle
.
execCol
.
qmsg
,
&
reader
,
vgId
,
&
handle
.
execHandle
.
numOfCols
,
0
);
if
(
handle
.
execHandle
.
task
==
NULL
)
{
tqError
(
"cannot create exec task for %s"
,
handle
.
subKey
);
code
=
-
1
;
...
...
@@ -332,7 +332,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
buildSnapContext
(
reader
.
meta
,
reader
.
version
,
0
,
handle
.
execHandle
.
subType
,
handle
.
fetchMeta
,
(
SSnapContext
**
)(
&
reader
.
sContext
));
handle
.
execHandle
.
task
=
qCreateQueueExecTaskInfo
(
NULL
,
&
reader
,
vgId
,
NULL
,
NULL
);
handle
.
execHandle
.
task
=
qCreateQueueExecTaskInfo
(
NULL
,
&
reader
,
vgId
,
NULL
,
0
);
}
else
if
(
handle
.
execHandle
.
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
handle
.
pWalReader
=
walOpenReader
(
pTq
->
pVnode
->
pWal
,
NULL
);
...
...
@@ -341,7 +341,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
tqDebug
(
"vgId:%d, tq try to get all ctb, suid:%"
PRId64
,
pTq
->
pVnode
->
config
.
vgId
,
handle
.
execHandle
.
execTb
.
suid
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
tbUidList
);
i
++
)
{
int64_t
tbUid
=
*
(
int64_t
*
)
taosArrayGet
(
tbUidList
,
i
);
tqDebug
(
"vgId:%d, idx %d, uid:%"
PRId64
,
TD_VID
(
pTq
->
pVnode
)
,
i
,
tbUid
);
tqDebug
(
"vgId:%d, idx %d, uid:%"
PRId64
,
vgId
,
i
,
tbUid
);
}
handle
.
execHandle
.
pExecReader
=
tqOpenReader
(
pTq
->
pVnode
);
tqReaderSetTbUidList
(
handle
.
execHandle
.
pExecReader
,
tbUidList
);
...
...
@@ -349,9 +349,9 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
buildSnapContext
(
reader
.
meta
,
reader
.
version
,
handle
.
execHandle
.
execTb
.
suid
,
handle
.
execHandle
.
subType
,
handle
.
fetchMeta
,
(
SSnapContext
**
)(
&
reader
.
sContext
));
handle
.
execHandle
.
task
=
qCreateQueueExecTaskInfo
(
NULL
,
&
reader
,
vgId
,
NULL
,
NULL
);
handle
.
execHandle
.
task
=
qCreateQueueExecTaskInfo
(
NULL
,
&
reader
,
vgId
,
NULL
,
0
);
}
tqDebug
(
"tq restore %s consumer %"
PRId64
" vgId:%d"
,
handle
.
subKey
,
handle
.
consumerId
,
TD_VID
(
pTq
->
pVnode
)
);
tqDebug
(
"tq restore %s consumer %"
PRId64
" vgId:%d"
,
handle
.
subKey
,
handle
.
consumerId
,
vgId
);
taosHashPut
(
pTq
->
pHandle
,
pKey
,
kLen
,
&
handle
,
sizeof
(
STqHandle
));
}
...
...
source/dnode/vnode/src/tq/tqScan.c
浏览文件 @
d947619b
...
...
@@ -74,14 +74,14 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
qTaskInfo_t
task
=
pExec
->
task
;
if
(
qStreamPrepareScan
(
task
,
pOffset
,
pHandle
->
execHandle
.
subType
)
<
0
)
{
tqDebug
(
"prepare scan failed,
return, consumer:0x%"
PRIx64
,
pHandle
->
consumerId
);
tqDebug
(
"prepare scan failed,
vgId:%d, consumer:0x%"
PRIx64
,
vgId
,
pHandle
->
consumerId
);
if
(
pOffset
->
type
==
TMQ_OFFSET__LOG
)
{
pRsp
->
rspOffset
=
*
pOffset
;
return
code
;
}
else
{
tqOffsetResetToLog
(
pOffset
,
pHandle
->
snapshotVer
);
if
(
qStreamPrepareScan
(
task
,
pOffset
,
pHandle
->
execHandle
.
subType
)
<
0
)
{
tqDebug
(
"prepare scan failed,
return, consumer:0x%"
PRIx64
,
pHandle
->
consumerId
);
tqDebug
(
"prepare scan failed,
vgId:%d, consumer:0x%"
PRIx64
,
vgId
,
pHandle
->
consumerId
);
pRsp
->
rspOffset
=
*
pOffset
;
return
code
;
}
...
...
@@ -92,7 +92,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
SSDataBlock
*
pDataBlock
=
NULL
;
uint64_t
ts
=
0
;
tqDebug
(
"vgId:%d, tmq task start to execute, consumer:0x%"
PRIx64
,
vgId
,
pHandle
->
consumerId
);
tqDebug
(
"vgId:%d, tmq task start to execute, consumer:0x%"
PRIx64
,
vgId
,
pHandle
->
consumerId
);
code
=
qExecTask
(
task
,
&
pDataBlock
,
&
ts
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
source/libs/executor/inc/executil.h
浏览文件 @
d947619b
...
...
@@ -107,6 +107,7 @@ int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t
uint64_t
tableListGetSize
(
const
STableListInfo
*
pTableList
);
uint64_t
tableListGetSuid
(
const
STableListInfo
*
pTableList
);
STableKeyInfo
*
tableListGetInfo
(
const
STableListInfo
*
pTableList
,
int32_t
index
);
int32_t
tableListFind
(
const
STableListInfo
*
pTableList
,
uint64_t
uid
,
int32_t
startIndex
);
size_t
getResultRowSize
(
struct
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
);
void
initResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
);
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
d947619b
...
...
@@ -143,10 +143,7 @@ typedef struct {
SQueryTableDataCond
tableCond
;
int64_t
fillHistoryVer1
;
int64_t
fillHistoryVer2
;
// int8_t triggerSaved;
// int64_t deleteMarkSaved;
SStreamState
*
pState
;
SStreamState
*
pState
;
}
SStreamTaskInfo
;
typedef
struct
{
...
...
@@ -168,15 +165,14 @@ typedef struct STaskStopInfo {
}
STaskStopInfo
;
struct
SExecTaskInfo
{
STaskIdInfo
id
;
uint32_t
status
;
STimeWindow
window
;
STaskCostInfo
cost
;
int64_t
owner
;
// if it is in execution
int32_t
code
;
int32_t
qbufQuota
;
// total available buffer (in KB) during execution query
int64_t
version
;
// used for stream to record wal version, why not move to sschemainfo
STaskIdInfo
id
;
uint32_t
status
;
STimeWindow
window
;
STaskCostInfo
cost
;
int64_t
owner
;
// if it is in execution
int32_t
code
;
int32_t
qbufQuota
;
// total available buffer (in KB) during execution query
int64_t
version
;
// used for stream to record wal version, why not move to sschemainfo
SStreamTaskInfo
streamInfo
;
SSchemaInfo
schemaInfo
;
STableListInfo
*
pTableInfoList
;
// this is a table list
...
...
@@ -188,6 +184,7 @@ struct SExecTaskInfo {
SLocalFetch
localFetch
;
SArray
*
pResultBlockList
;
// result block list
STaskStopInfo
stopInfo
;
SRWLatch
lock
;
// secure the access of STableListInfo
};
enum
{
...
...
@@ -485,12 +482,6 @@ typedef struct SStreamScanInfo {
}
SStreamScanInfo
;
typedef
struct
{
// int8_t subType;
// bool withMeta;
// int64_t suid;
// int64_t snapVersion;
// void *metaInfo;
// void *dataInfo;
SVnode
*
vnode
;
SSDataBlock
pRes
;
// result SSDataBlock
STsdbReader
*
dataReader
;
...
...
@@ -693,6 +684,8 @@ typedef struct SStreamFillOperatorInfo {
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
SExecTaskInfo
*
doCreateExecTaskInfo
(
uint64_t
queryId
,
uint64_t
taskId
,
int32_t
vgId
,
EOPTR_EXEC_MODEL
model
,
char
*
dbFName
);
SOperatorFpSet
createOperatorFpSet
(
__optr_open_fn_t
openFn
,
__optr_fn_t
nextFn
,
__optr_fn_t
cleanup
,
__optr_close_fn_t
closeFn
,
__optr_reqBuf_fn_t
reqBufFn
,
__optr_explain_fn_t
explain
);
int32_t
optrDummyOpenFn
(
SOperatorInfo
*
pOperator
);
...
...
source/libs/executor/src/executil.c
浏览文件 @
d947619b
...
...
@@ -34,7 +34,7 @@ struct STableListInfo {
int32_t
numOfOuputGroups
;
// the data block will be generated one by one
int32_t
*
groupOffset
;
// keep the offset value for each group in the tableList
SArray
*
pTableList
;
SHashObj
*
map
;
// speedup acquire the tableQueryInfo by table uid
SHashObj
*
map
;
// speedup acquire the tableQueryInfo by table uid
uint64_t
suid
;
};
...
...
@@ -1804,6 +1804,21 @@ STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index)
return
taosArrayGet
(
pTableList
->
pTableList
,
index
);
}
int32_t
tableListFind
(
const
STableListInfo
*
pTableList
,
uint64_t
uid
,
int32_t
startIndex
)
{
int32_t
numOfTables
=
taosArrayGetSize
(
pTableList
->
pTableList
);
if
(
startIndex
>=
numOfTables
)
{
return
-
1
;
}
for
(
int32_t
i
=
startIndex
;
i
<
numOfTables
;
++
i
)
{
STableKeyInfo
*
p
=
taosArrayGet
(
pTableList
->
pTableList
,
i
);
if
(
p
->
uid
==
uid
)
{
return
i
;
}
}
return
-
1
;
}
uint64_t
getTableGroupId
(
const
STableListInfo
*
pTableList
,
uint64_t
tableUid
)
{
int32_t
*
slot
=
taosHashGet
(
pTableList
->
map
,
&
tableUid
,
sizeof
(
tableUid
));
ASSERT
(
pTableList
->
map
!=
NULL
&&
slot
!=
NULL
);
...
...
source/libs/executor/src/executor.c
浏览文件 @
d947619b
...
...
@@ -242,30 +242,28 @@ int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks,
return
code
;
}
qTaskInfo_t
qCreateQueueExecTaskInfo
(
void
*
msg
,
SReadHandle
*
pReaderHandle
,
int32_t
vgId
,
int32_t
*
numOfCols
,
SSchemaWrapper
**
pSchema
)
{
if
(
msg
==
NULL
)
{
// create raw scan
SExecTaskInfo
*
pTaskInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SExecTaskInfo
));
qTaskInfo_t
qCreateQueueExecTaskInfo
(
void
*
msg
,
SReadHandle
*
pReaderHandle
,
int32_t
vgId
,
int32_t
*
numOfCols
,
uint64_t
id
)
{
if
(
msg
==
NULL
)
{
// create raw scan
SExecTaskInfo
*
pTaskInfo
=
doCreateExecTaskInfo
(
0
,
id
,
vgId
,
OPTR_EXEC_MODEL_QUEUE
,
""
);
if
(
NULL
==
pTaskInfo
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
setTaskStatus
(
pTaskInfo
,
TASK_NOT_COMPLETED
);
pTaskInfo
->
cost
.
created
=
taosGetTimestampUs
();
pTaskInfo
->
execModel
=
OPTR_EXEC_MODEL_QUEUE
;
pTaskInfo
->
pRoot
=
createRawScanOperatorInfo
(
pReaderHandle
,
pTaskInfo
);
if
(
NULL
==
pTaskInfo
->
pRoot
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
taosMemoryFree
(
pTaskInfo
);
return
NULL
;
}
qDebug
(
"create raw scan task info completed, vgId:%d, %s"
,
vgId
,
GET_TASKID
(
pTaskInfo
));
return
pTaskInfo
;
}
struct
SSubplan
*
pPlan
=
NULL
;
int32_t
code
=
qStringToSubplan
(
msg
,
&
pPlan
);
int32_t
code
=
qStringToSubplan
(
msg
,
&
pPlan
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
code
;
return
NULL
;
...
...
@@ -292,9 +290,6 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3
}
}
if
(
pSchema
)
{
*
pSchema
=
tCloneSSchemaWrapper
(((
SExecTaskInfo
*
)
pTaskInfo
)
->
schemaInfo
.
qsw
);
}
return
pTaskInfo
;
}
...
...
@@ -410,6 +405,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
}
STableListInfo
*
pTableListInfo
=
pTaskInfo
->
pTableInfoList
;
taosWLockLatch
(
&
pTaskInfo
->
lock
);
for
(
int32_t
i
=
0
;
i
<
numOfQualifiedTables
;
++
i
)
{
uint64_t
*
uid
=
taosArrayGet
(
qa
,
i
);
...
...
@@ -424,6 +420,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
taosMemoryFree
(
keyBuf
);
taosArrayDestroy
(
qa
);
taosWUnLockLatch
(
&
pTaskInfo
->
lock
);
return
code
;
}
}
...
...
@@ -445,6 +442,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
tableListAddTableInfo
(
pTableListInfo
,
keyInfo
.
uid
,
keyInfo
.
groupId
);
}
taosWUnLockLatch
(
&
pTaskInfo
->
lock
);
if
(
keyBuf
!=
NULL
)
{
taosMemoryFree
(
keyBuf
);
}
...
...
@@ -452,7 +450,9 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
taosArrayDestroy
(
qa
);
}
else
{
// remove the table id in current list
qDebug
(
" %d remove child tables from the stream scanner"
,
(
int32_t
)
taosArrayGetSize
(
tableIdList
));
taosWLockLatch
(
&
pTaskInfo
->
lock
);
code
=
tqReaderRemoveTbUidList
(
pScanInfo
->
tqReader
,
tableIdList
);
taosWUnLockLatch
(
&
pTaskInfo
->
lock
);
}
return
code
;
...
...
@@ -1000,6 +1000,7 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) {
}
return
0
;
}
bool
qStreamRecoverScanFinished
(
qTaskInfo_t
tinfo
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
return
pTaskInfo
->
streamInfo
.
recoverScanFinished
;
...
...
@@ -1080,8 +1081,11 @@ int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) {
}
int32_t
qStreamPrepareScan
(
qTaskInfo_t
tinfo
,
STqOffsetVal
*
pOffset
,
int8_t
subType
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
SOperatorInfo
*
pOperator
=
pTaskInfo
->
pRoot
;
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
SOperatorInfo
*
pOperator
=
pTaskInfo
->
pRoot
;
STableListInfo
*
pTableListInfo
=
pTaskInfo
->
pTableInfoList
;
const
char
*
id
=
GET_TASKID
(
pTaskInfo
);
pTaskInfo
->
streamInfo
.
prepareStatus
=
*
pOffset
;
pTaskInfo
->
streamInfo
.
returned
=
0
;
...
...
@@ -1094,21 +1098,24 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
// TODO add more check
if
(
pOperator
->
operatorType
!=
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
if
(
pOperator
->
numOfDownstream
!=
1
)
{
qError
(
"
pOperator->numOfDownstream != 1:%d"
,
pOperator
->
numOfDownstream
);
if
(
pOperator
->
numOfDownstream
!=
1
)
{
qError
(
"
invalid operator, number of downstream:%d, %s"
,
pOperator
->
numOfDownstream
,
id
);
return
-
1
;
}
pOperator
=
pOperator
->
pDownstream
[
0
];
}
SStreamScanInfo
*
pInfo
=
pOperator
->
info
;
STableScanInfo
*
pScanInfo
=
pInfo
->
pTableScanOp
->
info
;
STableScanBase
*
pScanBaseInfo
=
&
pScanInfo
->
base
;
if
(
pOffset
->
type
==
TMQ_OFFSET__LOG
)
{
STableScanInfo
*
pTSInfo
=
pInfo
->
pTableScanOp
->
info
;
tsdbReaderClose
(
pTSInfo
->
base
.
dataReader
)
;
pTSInfo
->
base
.
dataReader
=
NULL
;
tsdbReaderClose
(
pScanBaseInfo
->
dataReader
)
;
pScanBaseInfo
->
dataReader
=
NULL
;
// let's seek to the next version in wal file
if
(
tqSeekVer
(
pInfo
->
tqReader
,
pOffset
->
version
+
1
,
pTaskInfo
->
id
.
str
)
<
0
)
{
qError
(
"tqSeekVer failed ver:%"
PRId64
,
pOffset
->
version
+
1
);
qError
(
"tqSeekVer failed ver:%"
PRId64
", %s"
,
pOffset
->
version
+
1
,
id
);
return
-
1
;
}
}
else
if
(
pOffset
->
type
==
TMQ_OFFSET__SNAPSHOT_DATA
)
{
...
...
@@ -1116,121 +1123,125 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
// those data are from the snapshot in tsdb, besides the data in the wal file.
int64_t
uid
=
pOffset
->
uid
;
int64_t
ts
=
pOffset
->
ts
;
int32_t
index
=
0
;
// this value may be changed if new tables are created
taosRLockLatch
(
&
pTaskInfo
->
lock
);
int32_t
numOfTables
=
tableListGetSize
(
pTableListInfo
);
if
(
uid
==
0
)
{
if
(
tableListGetSize
(
pTaskInfo
->
pTableInfoList
)
!=
0
)
{
STableKeyInfo
*
pTableInfo
=
tableListGetInfo
(
pTa
skInfo
->
pTableInfoList
,
0
);
if
(
numOfTables
!=
0
)
{
STableKeyInfo
*
pTableInfo
=
tableListGetInfo
(
pTa
bleListInfo
,
0
);
uid
=
pTableInfo
->
uid
;
ts
=
INT64_MIN
;
pScanInfo
->
currentTable
=
0
;
}
else
{
qError
(
"uid == 0 and tablelist size is 0"
);
taosRUnLockLatch
(
&
pTaskInfo
->
lock
);
qError
(
"no table in table list, %s"
,
id
);
return
-
1
;
}
}
/*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/
/*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/
STableScanInfo
*
pTableScanInfo
=
pInfo
->
pTableScanOp
->
info
;
int32_t
numOfTables
=
tableListGetSize
(
pTaskInfo
->
pTableInfoList
);
qDebug
(
"switch to table uid:%"
PRId64
" ts:%"
PRId64
"% "
PRId64
" rows returned"
,
uid
,
ts
,
pInfo
->
pTableScanOp
->
resultInfo
.
totalRows
);
pInfo
->
pTableScanOp
->
resultInfo
.
totalRows
=
0
;
bool
found
=
false
;
for
(
int32_t
i
=
0
;
i
<
numOfTables
;
i
++
)
{
STableKeyInfo
*
pTableInfo
=
tableListGetInfo
(
pTaskInfo
->
pTableInfoList
,
i
);
if
(
pTableInfo
->
uid
==
uid
)
{
found
=
true
;
pTableScanInfo
->
currentTable
=
i
;
break
;
}
}
// TODO after dropping table, table may not found
if
(
!
found
){
qError
(
"uid not found in tablelist %"
PRId64
,
uid
);
// start from current accessed position
// we cannot start from the pScanInfo->currentTable, since the commit offset may cause the rollback of the start
// position, let's find it from the beginning.
index
=
tableListFind
(
pTableListInfo
,
uid
,
0
);
taosRUnLockLatch
(
&
pTaskInfo
->
lock
);
if
(
index
>=
0
)
{
pScanInfo
->
currentTable
=
index
;
}
else
{
qError
(
"vgId:%d uid:%"
PRIu64
" not found in table list, total:%d, index:%d %s"
,
pTaskInfo
->
id
.
vgId
,
uid
,
numOfTables
,
pScanInfo
->
currentTable
,
id
);
return
-
1
;
}
if
(
pTableScanInfo
->
base
.
dataReader
==
NULL
)
{
STableKeyInfo
*
pList
=
tableListGetInfo
(
pTaskInfo
->
pTableInfoList
,
0
);
int32_t
num
=
tableListGetSize
(
pTaskInfo
->
pTableInfoList
);
STableKeyInfo
keyInfo
=
{.
uid
=
uid
};
int64_t
oldSkey
=
pScanBaseInfo
->
cond
.
twindows
.
skey
;
// let's start from the next ts that returned to consumer.
pScanBaseInfo
->
cond
.
twindows
.
skey
=
ts
+
1
;
pScanInfo
->
scanTimes
=
0
;
if
(
tsdbReaderOpen
(
pTableScanInfo
->
base
.
readHandle
.
vnode
,
&
pTableScanInfo
->
base
.
cond
,
pList
,
num
,
pTableScanInfo
->
pResBlock
,
&
pTableScanInfo
->
base
.
dataReader
,
NULL
)
<
0
||
pTableScanInfo
->
base
.
dataReader
==
NULL
)
{
qError
(
"tsdbReaderOpen failed. uid:%"
PRIi64
,
pOffset
->
uid
);
if
(
pScanBaseInfo
->
dataReader
==
NULL
)
{
int32_t
code
=
tsdbReaderOpen
(
pScanBaseInfo
->
readHandle
.
vnode
,
&
pScanBaseInfo
->
cond
,
&
keyInfo
,
1
,
pScanInfo
->
pResBlock
,
&
pScanBaseInfo
->
dataReader
,
id
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"prepare read tsdb snapshot failed, uid:%"
PRId64
", code:%s %s"
,
pOffset
->
uid
,
tstrerror
(
code
),
id
);
terrno
=
code
;
return
-
1
;
}
}
STableKeyInfo
tki
=
{.
uid
=
uid
};
tsdbSetTableList
(
pTableScanInfo
->
base
.
dataReader
,
&
tki
,
1
);
int64_t
oldSkey
=
pTableScanInfo
->
base
.
cond
.
twindows
.
skey
;
pTableScanInfo
->
base
.
cond
.
twindows
.
skey
=
ts
+
1
;
tsdbReaderReset
(
pTableScanInfo
->
base
.
dataReader
,
&
pTableScanInfo
->
base
.
cond
);
pTableScanInfo
->
base
.
cond
.
twindows
.
skey
=
oldSkey
;
pTableScanInfo
->
scanTimes
=
0
;
qDebug
(
"tsdb reader created with offset(snapshot) uid:%"
PRId64
" ts:%"
PRId64
" table index:%d, total:%d, %s"
,
uid
,
pScanBaseInfo
->
cond
.
twindows
.
skey
,
pScanInfo
->
currentTable
,
numOfTables
,
id
);
}
else
{
tsdbSetTableList
(
pScanBaseInfo
->
dataReader
,
&
keyInfo
,
1
);
tsdbReaderReset
(
pScanBaseInfo
->
dataReader
,
&
pScanBaseInfo
->
cond
);
qDebug
(
"tsdb reader offset seek snapshot to uid:%"
PRId64
" ts %"
PRId64
" table index:%d numOfTable:%d, %s"
,
uid
,
pScanBaseInfo
->
cond
.
twindows
.
skey
,
pScanInfo
->
currentTable
,
numOfTables
,
id
);
}
qDebug
(
"tsdb reader offset seek snapshot to uid:%"
PRId64
" ts %"
PRId64
", table cur set to %d , all table num %d"
,
uid
,
ts
,
pTableScanInfo
->
currentTable
,
numOfTables
)
;
// restore the key value
pScanBaseInfo
->
cond
.
twindows
.
skey
=
oldSkey
;
}
else
{
qError
(
"invalid pOffset->type:%d"
,
pOffset
->
type
);
return
-
1
;
}
}
else
if
(
pOffset
->
type
==
TMQ_OFFSET__SNAPSHOT_DATA
)
{
SStreamRawScanInfo
*
pInfo
=
pOperator
->
info
;
SSnapContext
*
sContext
=
pInfo
->
sContext
;
if
(
setForSnapShot
(
sContext
,
pOffset
->
uid
)
!=
0
)
{
qError
(
"setDataForSnapShot error. uid:%"
PRIi64
,
pOffset
->
uid
);
qError
(
"invalid pOffset->type:%d, %s"
,
pOffset
->
type
,
id
);
return
-
1
;
}
SMetaTableInfo
mtInfo
=
getUidfromSnapShot
(
sContext
);
tsdbReaderClose
(
pInfo
->
dataReader
);
pInfo
->
dataReader
=
NULL
;
}
else
{
// subType == TOPIC_SUB_TYPE__TABLE/TOPIC_SUB_TYPE__DB
if
(
pOffset
->
type
==
TMQ_OFFSET__SNAPSHOT_DATA
)
{
SStreamRawScanInfo
*
pInfo
=
pOperator
->
info
;
SSnapContext
*
sContext
=
pInfo
->
sContext
;
if
(
setForSnapShot
(
sContext
,
pOffset
->
uid
)
!=
0
)
{
qError
(
"setDataForSnapShot error. uid:%"
PRId64
" , %s"
,
pOffset
->
uid
,
id
);
return
-
1
;
}
cleanupQueryTableDataCond
(
&
pTaskInfo
->
streamInfo
.
tableCond
);
tableListClear
(
pTaskInfo
->
pTableInfoList
);
SMetaTableInfo
mtInfo
=
getUidfromSnapShot
(
sContext
);
tsdbReaderClose
(
pInfo
->
dataReader
);
pInfo
->
dataReader
=
NULL
;
if
(
mtInfo
.
uid
==
0
)
{
return
0
;
// no data
}
cleanupQueryTableDataCond
(
&
pTaskInfo
->
streamInfo
.
tableCond
);
tableListClear
(
pTableListInfo
);
initQueryTableDataCondForTmq
(
&
pTaskInfo
->
streamInfo
.
tableCond
,
sContext
,
&
mtInfo
);
pTaskInfo
->
streamInfo
.
tableCond
.
twindows
.
skey
=
pOffset
->
ts
;
if
(
mtInfo
.
uid
==
0
)
{
return
0
;
// no data
}
if
(
pTaskInfo
->
pTableInfoList
==
NULL
)
{
pTaskInfo
->
pTableInfoList
=
tableListCreate
();
}
initQueryTableDataCondForTmq
(
&
pTaskInfo
->
streamInfo
.
tableCond
,
sContext
,
&
mtInfo
);
pTaskInfo
->
streamInfo
.
tableCond
.
twindows
.
skey
=
pOffset
->
ts
;
tableListAddTableInfo
(
pTaskInfo
->
pTableInfoList
,
mtInfo
.
uid
,
0
);
tableListAddTableInfo
(
pTableListInfo
,
mtInfo
.
uid
,
0
);
STableKeyInfo
*
pList
=
tableListGetInfo
(
pTaskInfo
->
pTableInfoList
,
0
);
int32_t
size
=
tableListGetSize
(
pTaskInfo
->
pTableInfoList
);
STableKeyInfo
*
pList
=
tableListGetInfo
(
pTableListInfo
,
0
);
int32_t
size
=
tableListGetSize
(
pTableListInfo
);
tsdbReaderOpen
(
pInfo
->
vnode
,
&
pTaskInfo
->
streamInfo
.
tableCond
,
pList
,
size
,
NULL
,
&
pInfo
->
dataReader
,
NULL
);
tsdbReaderOpen
(
pInfo
->
vnode
,
&
pTaskInfo
->
streamInfo
.
tableCond
,
pList
,
size
,
NULL
,
&
pInfo
->
dataReader
,
NULL
);
cleanupQueryTableDataCond
(
&
pTaskInfo
->
streamInfo
.
tableCond
);
strcpy
(
pTaskInfo
->
streamInfo
.
tbName
,
mtInfo
.
tbName
);
tDeleteSSchemaWrapper
(
pTaskInfo
->
streamInfo
.
schema
);
pTaskInfo
->
streamInfo
.
schema
=
mtInfo
.
schema
;
cleanupQueryTableDataCond
(
&
pTaskInfo
->
streamInfo
.
tableCond
);
strcpy
(
pTaskInfo
->
streamInfo
.
tbName
,
mtInfo
.
tbName
);
tDeleteSSchemaWrapper
(
pTaskInfo
->
streamInfo
.
schema
);
pTaskInfo
->
streamInfo
.
schema
=
mtInfo
.
schema
;
qDebug
(
"tmqsnap qStreamPrepareScan snapshot data uid:%"
PRId64
" ts %"
PRId64
,
mtInfo
.
uid
,
pOffset
->
ts
);
}
else
if
(
pOffset
->
type
==
TMQ_OFFSET__SNAPSHOT_META
)
{
SStreamRawScanInfo
*
pInfo
=
pOperator
->
info
;
SSnapContext
*
sContext
=
pInfo
->
sContext
;
if
(
setForSnapShot
(
sContext
,
pOffset
->
uid
)
!=
0
)
{
qError
(
"setForSnapShot error. uid:%"
PRIu64
" ,version:%"
PRId64
,
pOffset
->
uid
,
pOffset
->
version
);
return
-
1
;
qDebug
(
"tmqsnap qStreamPrepareScan snapshot data uid:%"
PRId64
" ts %"
PRId64
" %s"
,
mtInfo
.
uid
,
pOffset
->
ts
,
id
);
}
else
if
(
pOffset
->
type
==
TMQ_OFFSET__SNAPSHOT_META
)
{
SStreamRawScanInfo
*
pInfo
=
pOperator
->
info
;
SSnapContext
*
sContext
=
pInfo
->
sContext
;
if
(
setForSnapShot
(
sContext
,
pOffset
->
uid
)
!=
0
)
{
qError
(
"setForSnapShot error. uid:%"
PRIu64
" ,version:%"
PRId64
,
pOffset
->
uid
,
pOffset
->
version
);
return
-
1
;
}
qDebug
(
"tmqsnap qStreamPrepareScan snapshot meta uid:%"
PRId64
" ts %"
PRId64
" %s"
,
pOffset
->
uid
,
pOffset
->
ts
,
id
);
}
else
if
(
pOffset
->
type
==
TMQ_OFFSET__LOG
)
{
SStreamRawScanInfo
*
pInfo
=
pOperator
->
info
;
tsdbReaderClose
(
pInfo
->
dataReader
);
pInfo
->
dataReader
=
NULL
;
qDebug
(
"tmqsnap qStreamPrepareScan snapshot log, %s"
,
id
);
}
qDebug
(
"tmqsnap qStreamPrepareScan snapshot meta uid:%"
PRId64
" ts %"
PRId64
,
pOffset
->
uid
,
pOffset
->
ts
);
}
else
if
(
pOffset
->
type
==
TMQ_OFFSET__LOG
)
{
SStreamRawScanInfo
*
pInfo
=
pOperator
->
info
;
tsdbReaderClose
(
pInfo
->
dataReader
);
pInfo
->
dataReader
=
NULL
;
qDebug
(
"tmqsnap qStreamPrepareScan snapshot log"
);
}
return
0
;
}
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
d947619b
...
...
@@ -1974,7 +1974,7 @@ char* buildTaskId(uint64_t taskId, uint64_t queryId) {
return
p
;
}
static
SExecTaskInfo
*
doCreateExecTaskInfo
(
uint64_t
queryId
,
uint64_t
taskId
,
int32_t
vgId
,
EOPTR_EXEC_MODEL
model
,
char
*
dbFName
)
{
SExecTaskInfo
*
doCreateExecTaskInfo
(
uint64_t
queryId
,
uint64_t
taskId
,
int32_t
vgId
,
EOPTR_EXEC_MODEL
model
,
char
*
dbFName
)
{
SExecTaskInfo
*
pTaskInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SExecTaskInfo
));
if
(
pTaskInfo
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -1982,6 +1982,7 @@ static SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, in
}
setTaskStatus
(
pTaskInfo
,
TASK_NOT_COMPLETED
);
pTaskInfo
->
cost
.
created
=
taosGetTimestampUs
();
pTaskInfo
->
schemaInfo
.
dbname
=
taosStrdup
(
dbFName
);
pTaskInfo
->
execModel
=
model
;
...
...
@@ -1989,6 +1990,7 @@ static SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, in
pTaskInfo
->
stopInfo
.
pStopInfo
=
taosArrayInit
(
4
,
sizeof
(
SExchangeOpStopInfo
));
pTaskInfo
->
pResultBlockList
=
taosArrayInit
(
128
,
POINTER_BYTES
);
taosInitRWLatch
(
&
pTaskInfo
->
lock
);
pTaskInfo
->
id
.
vgId
=
vgId
;
pTaskInfo
->
id
.
queryId
=
queryId
;
pTaskInfo
->
id
.
str
=
buildTaskId
(
taskId
,
queryId
);
...
...
@@ -2464,7 +2466,6 @@ int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHand
goto
_complete
;
}
(
*
pTaskInfo
)
->
cost
.
created
=
taosGetTimestampUs
();
return
TSDB_CODE_SUCCESS
;
_complete:
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
d947619b
...
...
@@ -779,13 +779,14 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
// if no data, switch to next table and continue scan
pInfo
->
currentTable
++
;
if
(
pInfo
->
currentTable
>=
numOfTables
)
{
qDebug
(
"all table checked in table list, total:%d, return NULL, %s"
,
numOfTables
,
GET_TASKID
(
pTaskInfo
));
return
NULL
;
}
STableKeyInfo
*
pTableInfo
=
tableListGetInfo
(
pTaskInfo
->
pTableInfoList
,
pInfo
->
currentTable
);
tsdbSetTableList
(
pInfo
->
base
.
dataReader
,
pTableInfo
,
1
);
qDebug
(
"set uid:%"
PRIu64
" into scanner, total tables:%d, index:%d %s"
,
pTableInfo
->
uid
,
numOfTables
,
pInfo
->
currentTable
,
pTaskInfo
->
id
.
str
);
qDebug
(
"set uid:%"
PRIu64
" into scanner, total tables:%d, index:%d
/%d
%s"
,
pTableInfo
->
uid
,
numOfTables
,
pInfo
->
currentTable
,
numOfTables
,
GET_TASKID
(
pTaskInfo
)
);
tsdbReaderReset
(
pInfo
->
base
.
dataReader
,
&
pInfo
->
base
.
cond
);
pInfo
->
scanTimes
=
0
;
...
...
@@ -1595,19 +1596,16 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
static
SSDataBlock
*
doQueueScan
(
SOperatorInfo
*
pOperator
)
{
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SStreamScanInfo
*
pInfo
=
pOperator
->
info
;
const
char
*
id
=
GET_TASKID
(
pTaskInfo
);
qDebug
(
"start to exec queue scan
"
);
qDebug
(
"start to exec queue scan
, %s"
,
id
);
if
(
pTaskInfo
->
streamInfo
.
submit
.
msgStr
!=
NULL
)
{
if
(
pInfo
->
tqReader
->
msg2
.
msgStr
==
NULL
)
{
/*pInfo->tqReader->pMsg = pTaskInfo->streamInfo.pReq;*/
/*const SSubmitReq* pSubmit = pInfo->tqReader->pMsg;*/
/*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/
/*void* msgStr = pTaskInfo->streamInfo.*/
if
(
pInfo
->
tqReader
->
msg2
.
msgStr
==
NULL
)
{
SPackedData
submit
=
pTaskInfo
->
streamInfo
.
submit
;
if
(
tqReaderSetSubmitReq2
(
pInfo
->
tqReader
,
submit
.
msgStr
,
submit
.
msgLen
,
submit
.
ver
)
<
0
)
{
qError
(
"submit msg messed up when initing stream submit block %p
"
,
submit
.
msgStr
);
qError
(
"submit msg messed up when initing stream submit block %p
, %s"
,
submit
.
msgStr
,
id
);
pInfo
->
tqReader
->
msg2
=
(
SPackedData
){
0
};
pInfo
->
tqReader
->
setMsg
=
0
;
ASSERT
(
0
);
...
...
@@ -1641,18 +1639,20 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if
(
pTaskInfo
->
streamInfo
.
prepareStatus
.
type
==
TMQ_OFFSET__SNAPSHOT_DATA
)
{
SSDataBlock
*
pResult
=
doTableScan
(
pInfo
->
pTableScanOp
);
if
(
pResult
&&
pResult
->
info
.
rows
>
0
)
{
qDebug
(
"queue scan tsdb return %d rows min:%"
PRId64
" max:%"
PRId64
" wal curVersion:%"
PRId64
,
pResult
->
info
.
rows
,
pResult
->
info
.
window
.
skey
,
pResult
->
info
.
window
.
ekey
,
pInfo
->
tqReader
->
pWalReader
->
curVersion
);
qDebug
(
"queue scan tsdb return %d rows min:%"
PRId64
" max:%"
PRId64
" wal curVersion:%"
PRId64
" %s"
,
pResult
->
info
.
rows
,
pResult
->
info
.
window
.
skey
,
pResult
->
info
.
window
.
ekey
,
pInfo
->
tqReader
->
pWalReader
->
curVersion
,
id
);
pTaskInfo
->
streamInfo
.
returned
=
1
;
return
pResult
;
}
else
{
// no data has return already, try to extract data in the WAL
if
(
!
pTaskInfo
->
streamInfo
.
returned
)
{
STableScanInfo
*
pTSInfo
=
pInfo
->
pTableScanOp
->
info
;
tsdbReaderClose
(
pTSInfo
->
base
.
dataReader
);
qDebug
(
"3"
);
pTSInfo
->
base
.
dataReader
=
NULL
;
tqOffsetResetToLog
(
&
pTaskInfo
->
streamInfo
.
prepareStatus
,
pTaskInfo
->
streamInfo
.
snapshotVer
);
qDebug
(
"queue scan tsdb over, switch to wal ver %"
PRId64
""
,
pTaskInfo
->
streamInfo
.
snapshotVer
+
1
);
qDebug
(
"queue scan tsdb over, switch to wal ver:%"
PRId64
" %s"
,
pTaskInfo
->
streamInfo
.
snapshotVer
+
1
,
id
);
if
(
tqSeekVer
(
pInfo
->
tqReader
,
pTaskInfo
->
streamInfo
.
snapshotVer
+
1
,
pTaskInfo
->
id
.
str
)
<
0
)
{
tqOffsetResetToLog
(
&
pTaskInfo
->
streamInfo
.
lastStatus
,
pTaskInfo
->
streamInfo
.
snapshotVer
);
return
NULL
;
...
...
@@ -1669,7 +1669,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if
(
tqNextBlock
(
pInfo
->
tqReader
,
&
ret
)
<
0
)
{
// if the end is reached, terrno is 0
if
(
terrno
!=
0
)
{
qError
(
"failed to get next log block since %s
"
,
terrstr
()
);
qError
(
"failed to get next log block since %s
, %s"
,
terrstr
(),
id
);
}
}
...
...
@@ -1684,9 +1684,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
}
else
if
(
ret
.
fetchType
==
FETCH_TYPE__META
)
{
qError
(
"unexpected ret.fetchType:%d"
,
ret
.
fetchType
);
continue
;
// pTaskInfo->streamInfo.lastStatus = ret.offset;
// pTaskInfo->streamInfo.metaBlk = ret.meta;
// return NULL;
}
else
if
(
ret
.
fetchType
==
FETCH_TYPE__NONE
||
(
ret
.
fetchType
==
FETCH_TYPE__SEP
&&
pOperator
->
status
==
OP_EXEC_RECV
))
{
pTaskInfo
->
streamInfo
.
lastStatus
=
ret
.
offset
;
...
...
@@ -1698,7 +1695,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
}
}
}
else
{
qError
(
"unexpected streamInfo prepare type: %d
"
,
pTaskInfo
->
streamInfo
.
prepareStatus
.
type
);
qError
(
"unexpected streamInfo prepare type: %d
%s"
,
pTaskInfo
->
streamInfo
.
prepareStatus
.
type
,
id
);
return
NULL
;
}
}
...
...
tests/system-test/2-query/columnLenUpdated.py
浏览文件 @
d947619b
...
...
@@ -83,7 +83,7 @@ class TDTestCase:
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
self
.
replicaVar
=
int
(
replicaVar
)
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
())
tdSql
.
init
(
conn
.
cursor
()
,
True
)
def
getBuildPath
(
self
):
selfPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
...
...
tests/system-test/7-tmq/subscribeDb.py
浏览文件 @
d947619b
...
...
@@ -13,11 +13,11 @@ from util.dnodes import *
class
TDTestCase
:
hostname
=
socket
.
gethostname
()
#rpcDebugFlagVal = '143'
#
rpcDebugFlagVal = '143'
#clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
#clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
#updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
#updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal
#
updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal
#print ("===================: ", updatecfgDict)
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
...
...
tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg.py
浏览文件 @
d947619b
...
...
@@ -16,6 +16,8 @@ sys.path.append("./7-tmq")
from
tmqCommon
import
*
class
TDTestCase
:
updatecfgDict
=
{
"tsdbDebugFlag"
:
135
}
def
__init__
(
self
):
self
.
vgroups
=
4
self
.
ctbNum
=
10
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录