Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4c2a41ec
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
4c2a41ec
编写于
1月 29, 2023
作者:
X
Xiaoyu Wang
提交者:
GitHub
1月 29, 2023
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #19685 from taosdata/fix/3.0_merge_main
merge main
上级
fe8a4631
eb86f5ae
变更
43
隐藏空白更改
内联
并排
Showing
43 changed file
with
577 addition
and
337 deletion
+577
-337
include/common/tmsg.h
include/common/tmsg.h
+1
-0
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+7
-6
include/libs/sync/sync.h
include/libs/sync/sync.h
+1
-1
source/client/src/clientEnv.c
source/client/src/clientEnv.c
+18
-48
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+1
-1
source/client/src/clientMain.c
source/client/src/clientMain.c
+1
-2
source/client/src/clientRawBlockWrite.c
source/client/src/clientRawBlockWrite.c
+32
-32
source/common/src/tmsg.c
source/common/src/tmsg.c
+2
-1
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+1
-1
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+2
-0
source/dnode/vnode/src/meta/metaQuery.c
source/dnode/vnode/src/meta/metaQuery.c
+25
-1
source/dnode/vnode/src/tsdb/tsdbCacheRead.c
source/dnode/vnode/src/tsdb/tsdbCacheRead.c
+6
-3
source/dnode/vnode/src/tsdb/tsdbFS.c
source/dnode/vnode/src/tsdb/tsdbFS.c
+4
-9
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+8
-5
source/dnode/vnode/src/tsdb/tsdbUtil.c
source/dnode/vnode/src/tsdb/tsdbUtil.c
+23
-0
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+1
-0
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+7
-1
source/libs/catalog/src/ctgAsync.c
source/libs/catalog/src/ctgAsync.c
+2
-0
source/libs/catalog/src/ctgCache.c
source/libs/catalog/src/ctgCache.c
+1
-1
source/libs/command/src/command.c
source/libs/command/src/command.c
+2
-2
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+5
-2
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+9
-1
source/libs/executor/src/sysscanoperator.c
source/libs/executor/src/sysscanoperator.c
+82
-73
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+0
-4
source/libs/function/src/tpercentile.c
source/libs/function/src/tpercentile.c
+7
-6
source/libs/qworker/inc/qwInt.h
source/libs/qworker/inc/qwInt.h
+6
-1
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+4
-2
source/libs/stream/src/streamMeta.c
source/libs/stream/src/streamMeta.c
+1
-0
source/libs/sync/src/syncAppendEntries.c
source/libs/sync/src/syncAppendEntries.c
+2
-2
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+2
-2
source/libs/sync/src/syncPipeline.c
source/libs/sync/src/syncPipeline.c
+8
-3
source/libs/sync/src/syncRaftLog.c
source/libs/sync/src/syncRaftLog.c
+2
-5
source/os/src/osSysinfo.c
source/os/src/osSysinfo.c
+5
-1
source/util/src/tarray.c
source/util/src/tarray.c
+21
-21
source/util/src/tlog.c
source/util/src/tlog.c
+1
-0
source/util/src/tpagedbuf.c
source/util/src/tpagedbuf.c
+142
-96
tests/develop-test/2-query/show_create_db.py
tests/develop-test/2-query/show_create_db.py
+82
-0
tests/parallel_test/cases.task
tests/parallel_test/cases.task
+1
-0
tests/script/api/batchprepare.c
tests/script/api/batchprepare.c
+1
-1
tests/script/tsim/query/sys_tbname.sim
tests/script/tsim/query/sys_tbname.sim
+19
-0
tools/shell/inc/shellInt.h
tools/shell/inc/shellInt.h
+1
-0
tools/shell/src/shellEngine.c
tools/shell/src/shellEngine.c
+1
-3
tools/shell/src/shellMain.c
tools/shell/src/shellMain.c
+30
-0
未找到文件。
include/common/tmsg.h
浏览文件 @
4c2a41ec
...
...
@@ -914,6 +914,7 @@ typedef struct {
int32_t
numOfRetensions
;
SArray
*
pRetensions
;
int8_t
schemaless
;
int16_t
sstTrigger
;
}
SDbCfgRsp
;
int32_t
tSerializeSDbCfgRsp
(
void
*
buf
,
int32_t
bufLen
,
const
SDbCfgRsp
*
pRsp
);
...
...
include/libs/stream/tstream.h
浏览文件 @
4c2a41ec
...
...
@@ -370,7 +370,8 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void
tFreeSStreamTask
(
SStreamTask
*
pTask
);
static
FORCE_INLINE
int32_t
streamTaskInput
(
SStreamTask
*
pTask
,
SStreamQueueItem
*
pItem
)
{
if
(
pItem
->
type
==
STREAM_INPUT__DATA_SUBMIT
)
{
int8_t
type
=
pItem
->
type
;
if
(
type
==
STREAM_INPUT__DATA_SUBMIT
)
{
SStreamDataSubmit2
*
pSubmitClone
=
streamSubmitRefClone
((
SStreamDataSubmit2
*
)
pItem
);
if
(
pSubmitClone
==
NULL
)
{
qDebug
(
"task %d %p submit enqueue failed since out of memory"
,
pTask
->
taskId
,
pTask
);
...
...
@@ -382,19 +383,19 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
pSubmitClone
->
submit
.
msgStr
,
pSubmitClone
->
submit
.
msgLen
,
pSubmitClone
->
submit
.
ver
);
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pSubmitClone
);
// qStreamInput(pTask->exec.executor, pSubmitClone);
}
else
if
(
pItem
->
type
==
STREAM_INPUT__DATA_BLOCK
||
pItem
->
type
==
STREAM_INPUT__DATA_RETRIEVE
||
pItem
->
type
==
STREAM_INPUT__REF_DATA_BLOCK
)
{
}
else
if
(
type
==
STREAM_INPUT__DATA_BLOCK
||
type
==
STREAM_INPUT__DATA_RETRIEVE
||
type
==
STREAM_INPUT__REF_DATA_BLOCK
)
{
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
// qStreamInput(pTask->exec.executor, pItem);
}
else
if
(
pItem
->
type
==
STREAM_INPUT__CHECKPOINT
)
{
}
else
if
(
type
==
STREAM_INPUT__CHECKPOINT
)
{
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
// qStreamInput(pTask->exec.executor, pItem);
}
else
if
(
pItem
->
type
==
STREAM_INPUT__GET_RES
)
{
}
else
if
(
type
==
STREAM_INPUT__GET_RES
)
{
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
// qStreamInput(pTask->exec.executor, pItem);
}
if
(
pItem
->
type
!=
STREAM_INPUT__GET_RES
&&
pItem
->
type
!=
STREAM_INPUT__CHECKPOINT
&&
pTask
->
triggerParam
!=
0
)
{
if
(
type
!=
STREAM_INPUT__GET_RES
&&
type
!=
STREAM_INPUT__CHECKPOINT
&&
pTask
->
triggerParam
!=
0
)
{
atomic_val_compare_exchange_8
(
&
pTask
->
triggerStatus
,
TASK_TRIGGER_STATUS__INACTIVE
,
TASK_TRIGGER_STATUS__ACTIVE
);
}
...
...
include/libs/sync/sync.h
浏览文件 @
4c2a41ec
...
...
@@ -193,7 +193,7 @@ typedef struct SSyncLogStore {
SyncIndex
(
*
syncLogLastIndex
)(
struct
SSyncLogStore
*
pLogStore
);
SyncTerm
(
*
syncLogLastTerm
)(
struct
SSyncLogStore
*
pLogStore
);
int32_t
(
*
syncLogAppendEntry
)(
struct
SSyncLogStore
*
pLogStore
,
SSyncRaftEntry
*
pEntry
);
int32_t
(
*
syncLogAppendEntry
)(
struct
SSyncLogStore
*
pLogStore
,
SSyncRaftEntry
*
pEntry
,
bool
forcSync
);
int32_t
(
*
syncLogGetEntry
)(
struct
SSyncLogStore
*
pLogStore
,
SyncIndex
index
,
SSyncRaftEntry
**
ppEntry
);
int32_t
(
*
syncLogTruncate
)(
struct
SSyncLogStore
*
pLogStore
,
SyncIndex
fromIndex
);
...
...
source/client/src/clientEnv.c
浏览文件 @
4c2a41ec
...
...
@@ -400,45 +400,6 @@ void destroyRequest(SRequestObj *pRequest) {
removeRequest
(
pRequest
->
self
);
}
void
taosClientCrash
(
int
signum
,
void
*
sigInfo
,
void
*
context
)
{
taosIgnSignal
(
SIGTERM
);
taosIgnSignal
(
SIGHUP
);
taosIgnSignal
(
SIGINT
);
taosIgnSignal
(
SIGBREAK
);
#if !defined(WINDOWS)
taosIgnSignal
(
SIGBUS
);
#endif
taosIgnSignal
(
SIGABRT
);
taosIgnSignal
(
SIGFPE
);
taosIgnSignal
(
SIGSEGV
);
char
*
pMsg
=
NULL
;
const
char
*
flags
=
"UTL FATAL "
;
ELogLevel
level
=
DEBUG_FATAL
;
int32_t
dflag
=
255
;
int64_t
msgLen
=
-
1
;
if
(
tsEnableCrashReport
)
{
if
(
taosGenCrashJsonMsg
(
signum
,
&
pMsg
,
lastClusterId
,
appInfo
.
startTime
))
{
taosPrintLog
(
flags
,
level
,
dflag
,
"failed to generate crash json msg"
);
goto
_return
;
}
else
{
msgLen
=
strlen
(
pMsg
);
}
}
_return:
taosLogCrashInfo
(
"taos"
,
pMsg
,
msgLen
,
signum
,
sigInfo
);
#ifdef _TD_DARWIN_64
exit
(
signum
);
#elif defined(WINDOWS)
exit
(
signum
);
#endif
}
void
crashReportThreadFuncUnexpectedStopped
(
void
)
{
atomic_store_32
(
&
clientStop
,
-
1
);
}
static
void
*
tscCrashReportThreadFp
(
void
*
param
)
{
...
...
@@ -535,15 +496,26 @@ void tscStopCrashReport() {
}
}
static
void
tscSetSignalHandle
()
{
#if !defined(WINDOWS)
taosSetSignal
(
SIGBUS
,
taosClientCrash
);
#endif
taosSetSignal
(
SIGABRT
,
taosClientCrash
);
taosSetSignal
(
SIGFPE
,
taosClientCrash
);
taosSetSignal
(
SIGSEGV
,
taosClientCrash
);
void
tscWriteCrashInfo
(
int
signum
,
void
*
sigInfo
,
void
*
context
)
{
char
*
pMsg
=
NULL
;
const
char
*
flags
=
"UTL FATAL "
;
ELogLevel
level
=
DEBUG_FATAL
;
int32_t
dflag
=
255
;
int64_t
msgLen
=
-
1
;
if
(
tsEnableCrashReport
)
{
if
(
taosGenCrashJsonMsg
(
signum
,
&
pMsg
,
lastClusterId
,
appInfo
.
startTime
))
{
taosPrintLog
(
flags
,
level
,
dflag
,
"failed to generate crash json msg"
);
}
else
{
msgLen
=
strlen
(
pMsg
);
}
}
taosLogCrashInfo
(
"taos"
,
pMsg
,
msgLen
,
signum
,
sigInfo
);
}
void
taos_init_imp
(
void
)
{
// In the APIs of other program language, taos_cleanup is not available yet.
// So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
...
...
@@ -567,8 +539,6 @@ void taos_init_imp(void) {
return
;
}
tscSetSignalHandle
();
initQueryModuleMsgHandle
();
if
(
taosConvInit
()
!=
0
)
{
...
...
source/client/src/clientImpl.c
浏览文件 @
4c2a41ec
...
...
@@ -1239,7 +1239,7 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t
int64_t
transporterId
=
0
;
asyncSendMsgToServer
(
pTscObj
->
pAppInfo
->
pTransporter
,
&
pTscObj
->
pAppInfo
->
mgmtEp
.
epSet
,
&
transporterId
,
body
);
tsem_wait
(
&
pRequest
->
body
.
rspSem
);
if
(
pRequest
->
code
!=
TSDB_CODE_SUCCESS
)
{
const
char
*
errorMsg
=
...
...
source/client/src/clientMain.c
浏览文件 @
4c2a41ec
...
...
@@ -528,9 +528,8 @@ void taos_stop_query(TAOS_RES *res) {
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
pRequest
->
killed
=
true
;
int32_t
numOfFields
=
taos_num_fields
(
pRequest
);
// It is not a query, no need to stop.
if
(
numOfFields
==
0
)
{
if
(
NULL
==
pRequest
->
pQuery
||
QUERY_EXEC_MODE_SCHEDULE
!=
pRequest
->
pQuery
->
execMode
)
{
tscDebug
(
"request 0x%"
PRIx64
" no need to be killed since not query"
,
pRequest
->
requestId
);
return
;
}
...
...
source/client/src/clientRawBlockWrite.c
浏览文件 @
4c2a41ec
...
...
@@ -179,7 +179,7 @@ static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) {
}
string
=
cJSON_PrintUnformatted
(
json
);
end:
end:
cJSON_Delete
(
json
);
tFreeSMAltertbReq
(
&
req
);
return
string
;
...
...
@@ -200,7 +200,7 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) {
}
string
=
buildCreateTableJson
(
&
req
.
schemaRow
,
&
req
.
schemaTag
,
req
.
name
,
req
.
suid
,
TSDB_SUPER_TABLE
);
_err:
_err:
tDecoderClear
(
&
coder
);
return
string
;
}
...
...
@@ -220,7 +220,7 @@ static char* processAlterStb(SMqMetaRsp* metaRsp) {
}
string
=
buildAlterSTableJson
(
req
.
alterOriData
,
req
.
alterOriDataLen
);
_err:
_err:
tDecoderClear
(
&
coder
);
return
string
;
}
...
...
@@ -302,7 +302,7 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
cJSON_AddItemToArray
(
tags
,
tag
);
}
end:
end:
cJSON_AddItemToObject
(
json
,
"tags"
,
tags
);
taosArrayDestroy
(
pTagVals
);
}
...
...
@@ -360,7 +360,7 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) {
}
}
_exit:
_exit:
for
(
int32_t
iReq
=
0
;
iReq
<
req
.
nReqs
;
iReq
++
)
{
pCreateReq
=
req
.
pReqs
+
iReq
;
taosMemoryFreeClear
(
pCreateReq
->
comment
);
...
...
@@ -373,7 +373,7 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) {
}
static
char
*
processAutoCreateTable
(
STaosxRsp
*
rsp
)
{
if
(
rsp
->
createTableNum
<=
0
)
{
if
(
rsp
->
createTableNum
<=
0
)
{
uError
(
"WriteRaw:processAutoCreateTable rsp->createTableNum <= 0"
);
goto
_exit
;
}
...
...
@@ -392,14 +392,14 @@ static char* processAutoCreateTable(STaosxRsp* rsp) {
goto
_exit
;
}
if
(
pCreateReq
[
iReq
].
type
!=
TSDB_CHILD_TABLE
)
{
if
(
pCreateReq
[
iReq
].
type
!=
TSDB_CHILD_TABLE
)
{
uError
(
"WriteRaw:processAutoCreateTable pCreateReq[iReq].type != TSDB_CHILD_TABLE"
);
goto
_exit
;
}
}
string
=
buildCreateCTableJson
(
pCreateReq
,
rsp
->
createTableNum
);
_exit:
_exit:
for
(
int
i
=
0
;
i
<
rsp
->
createTableNum
;
i
++
)
{
tDecoderClear
(
&
decoder
[
i
]);
taosMemoryFreeClear
(
pCreateReq
[
i
].
comment
);
...
...
@@ -500,7 +500,7 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
char
*
buf
=
NULL
;
if
(
vAlterTbReq
.
tagType
==
TSDB_DATA_TYPE_JSON
)
{
if
(
!
tTagIsJson
(
vAlterTbReq
.
pTagVal
))
{
if
(
!
tTagIsJson
(
vAlterTbReq
.
pTagVal
))
{
uError
(
"processAlterTable isJson false"
);
goto
_exit
;
}
...
...
@@ -524,7 +524,7 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
}
string
=
cJSON_PrintUnformatted
(
json
);
_exit:
_exit:
cJSON_Delete
(
json
);
tDecoderClear
(
&
decoder
);
return
string
;
...
...
@@ -557,12 +557,12 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) {
string
=
cJSON_PrintUnformatted
(
json
);
_exit:
_exit:
cJSON_Delete
(
json
);
tDecoderClear
(
&
decoder
);
return
string
;
}
static
char
*
processDeleteTable
(
SMqMetaRsp
*
metaRsp
){
static
char
*
processDeleteTable
(
SMqMetaRsp
*
metaRsp
)
{
SDeleteRes
req
=
{
0
};
SDecoder
coder
=
{
0
};
int32_t
code
=
TSDB_CODE_SUCCESS
;
...
...
@@ -596,7 +596,7 @@ static char* processDeleteTable(SMqMetaRsp* metaRsp){
string
=
cJSON_PrintUnformatted
(
json
);
_exit:
_exit:
cJSON_Delete
(
json
);
tDecoderClear
(
&
coder
);
return
string
;
...
...
@@ -638,7 +638,7 @@ static char* processDropTable(SMqMetaRsp* metaRsp) {
string
=
cJSON_PrintUnformatted
(
json
);
_exit:
_exit:
cJSON_Delete
(
json
);
tDecoderClear
(
&
decoder
);
return
string
;
...
...
@@ -726,7 +726,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
code
=
pRequest
->
code
;
taosMemoryFree
(
pCmdMsg
.
pMsg
);
end:
end:
destroyRequest
(
pRequest
);
tFreeSMCreateStbReq
(
&
pReq
);
tDecoderClear
(
&
coder
);
...
...
@@ -796,7 +796,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
code
=
pRequest
->
code
;
taosMemoryFree
(
pCmdMsg
.
pMsg
);
end:
end:
destroyRequest
(
pRequest
);
tDecoderClear
(
&
coder
);
return
code
;
...
...
@@ -857,9 +857,9 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
taosHashSetFreeFp
(
pVgroupHashmap
,
destroyCreateTbReqBatch
);
SRequestConnInfo
conn
=
{.
pTrans
=
pTscObj
->
pAppInfo
->
pTransporter
,
.
requestId
=
pRequest
->
requestId
,
.
requestObjRefId
=
pRequest
->
self
,
.
mgmtEps
=
getEpSet_s
(
&
pTscObj
->
pAppInfo
->
mgmtEp
)};
.
requestId
=
pRequest
->
requestId
,
.
requestObjRefId
=
pRequest
->
self
,
.
mgmtEps
=
getEpSet_s
(
&
pTscObj
->
pAppInfo
->
mgmtEp
)};
pRequest
->
tableList
=
taosArrayInit
(
req
.
nReqs
,
sizeof
(
SName
));
// loop to create table
...
...
@@ -939,7 +939,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
code
=
pRequest
->
code
;
end:
end:
for
(
int32_t
iReq
=
0
;
iReq
<
req
.
nReqs
;
iReq
++
)
{
pCreateReq
=
req
.
pReqs
+
iReq
;
taosMemoryFreeClear
(
pCreateReq
->
comment
);
...
...
@@ -1009,9 +1009,9 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
taosHashSetFreeFp
(
pVgroupHashmap
,
destroyDropTbReqBatch
);
SRequestConnInfo
conn
=
{.
pTrans
=
pTscObj
->
pAppInfo
->
pTransporter
,
.
requestId
=
pRequest
->
requestId
,
.
requestObjRefId
=
pRequest
->
self
,
.
mgmtEps
=
getEpSet_s
(
&
pTscObj
->
pAppInfo
->
mgmtEp
)};
.
requestId
=
pRequest
->
requestId
,
.
requestObjRefId
=
pRequest
->
self
,
.
mgmtEps
=
getEpSet_s
(
&
pTscObj
->
pAppInfo
->
mgmtEp
)};
pRequest
->
tableList
=
taosArrayInit
(
req
.
nReqs
,
sizeof
(
SName
));
// loop to create table
for
(
int32_t
iReq
=
0
;
iReq
<
req
.
nReqs
;
iReq
++
)
{
...
...
@@ -1063,7 +1063,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
}
code
=
pRequest
->
code
;
end:
end:
taosHashCleanup
(
pVgroupHashmap
);
destroyRequest
(
pRequest
);
tDecoderClear
(
&
coder
);
...
...
@@ -1131,7 +1131,7 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
}
taos_free_result
(
res
);
end:
end:
tDecoderClear
(
&
coder
);
return
code
;
}
...
...
@@ -1178,9 +1178,9 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
}
SRequestConnInfo
conn
=
{.
pTrans
=
pTscObj
->
pAppInfo
->
pTransporter
,
.
requestId
=
pRequest
->
requestId
,
.
requestObjRefId
=
pRequest
->
self
,
.
mgmtEps
=
getEpSet_s
(
&
pTscObj
->
pAppInfo
->
mgmtEp
)};
.
requestId
=
pRequest
->
requestId
,
.
requestObjRefId
=
pRequest
->
self
,
.
mgmtEps
=
getEpSet_s
(
&
pTscObj
->
pAppInfo
->
mgmtEp
)};
SVgroupInfo
pInfo
=
{
0
};
SName
pName
=
{
0
};
...
...
@@ -1239,7 +1239,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
code
=
handleAlterTbExecRes
(
pRes
->
res
,
pCatalog
);
}
}
end:
end:
taosArrayDestroy
(
pArray
);
if
(
pVgData
)
taosMemoryFreeClear
(
pVgData
->
pData
);
taosMemoryFreeClear
(
pVgData
);
...
...
@@ -1402,7 +1402,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
launchQueryImpl
(
pRequest
,
pQuery
,
true
,
NULL
);
code
=
pRequest
->
code
;
end:
end:
taosMemoryFreeClear
(
pTableMeta
);
qDestroyQuery
(
pQuery
);
destroyRequest
(
pRequest
);
...
...
@@ -1521,7 +1521,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
launchQueryImpl
(
pRequest
,
pQuery
,
true
,
NULL
);
code
=
pRequest
->
code
;
end:
end:
tDeleteSMqDataRsp
(
&
rspObj
.
rsp
);
tDecoderClear
(
&
decoder
);
qDestroyQuery
(
pQuery
);
...
...
@@ -1619,7 +1619,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
goto
end
;
}
if
(
pCreateReq
.
type
!=
TSDB_CHILD_TABLE
)
{
if
(
pCreateReq
.
type
!=
TSDB_CHILD_TABLE
)
{
uError
(
"WriteRaw:pCreateReq.type != TSDB_CHILD_TABLE. table name: %s"
,
tbName
);
code
=
TSDB_CODE_TSC_INVALID_VALUE
;
goto
end
;
...
...
source/common/src/tmsg.c
浏览文件 @
4c2a41ec
...
...
@@ -2821,8 +2821,8 @@ int32_t tSerializeSDbCfgRsp(void *buf, int32_t bufLen, const SDbCfgRsp *pRsp) {
if
(
tEncodeI8
(
&
encoder
,
pRetension
->
keepUnit
)
<
0
)
return
-
1
;
}
if
(
tEncodeI8
(
&
encoder
,
pRsp
->
schemaless
)
<
0
)
return
-
1
;
if
(
tEncodeI16
(
&
encoder
,
pRsp
->
sstTrigger
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
tEncoderClear
(
&
encoder
);
return
tlen
;
...
...
@@ -2873,6 +2873,7 @@ int32_t tDeserializeSDbCfgRsp(void *buf, int32_t bufLen, SDbCfgRsp *pRsp) {
}
}
if
(
tDecodeI8
(
&
decoder
,
&
pRsp
->
schemaless
)
<
0
)
return
-
1
;
if
(
tDecodeI16
(
&
decoder
,
&
pRsp
->
sstTrigger
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
tDecoderClear
(
&
decoder
);
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
4c2a41ec
...
...
@@ -889,7 +889,7 @@ static int32_t mndProcessGetDbCfgReq(SRpcMsg *pReq) {
cfgRsp
.
numOfRetensions
=
pDb
->
cfg
.
numOfRetensions
;
cfgRsp
.
pRetensions
=
pDb
->
cfg
.
pRetensions
;
cfgRsp
.
schemaless
=
pDb
->
cfg
.
schemaless
;
cfgRsp
.
sstTrigger
=
pDb
->
cfg
.
sstTrigger
;
int32_t
contLen
=
tSerializeSDbCfgRsp
(
NULL
,
0
,
&
cfgRsp
);
void
*
pRsp
=
rpcMallocCont
(
contLen
);
if
(
pRsp
==
NULL
)
{
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
4c2a41ec
...
...
@@ -153,6 +153,8 @@ typedef struct SMTbCursor SMTbCursor;
SMTbCursor
*
metaOpenTbCursor
(
SMeta
*
pMeta
);
void
metaCloseTbCursor
(
SMTbCursor
*
pTbCur
);
int32_t
metaTbCursorNext
(
SMTbCursor
*
pTbCur
,
ETableType
jumpTableType
);
int32_t
metaTbCursorPrev
(
SMTbCursor
*
pTbCur
);
#endif
// tsdb
...
...
source/dnode/vnode/src/meta/metaQuery.c
浏览文件 @
4c2a41ec
...
...
@@ -310,7 +310,7 @@ void metaCloseTbCursor(SMTbCursor *pTbCur) {
}
}
int
metaTbCursorNext
(
SMTbCursor
*
pTbCur
,
ETableType
jumpTableType
)
{
int
32_t
metaTbCursorNext
(
SMTbCursor
*
pTbCur
,
ETableType
jumpTableType
)
{
int
ret
;
void
*
pBuf
;
STbCfg
tbCfg
;
...
...
@@ -334,6 +334,30 @@ int metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType) {
return
0
;
}
int32_t
metaTbCursorPrev
(
SMTbCursor
*
pTbCur
)
{
int
ret
;
void
*
pBuf
;
STbCfg
tbCfg
;
for
(;;)
{
ret
=
tdbTbcPrev
(
pTbCur
->
pDbc
,
&
pTbCur
->
pKey
,
&
pTbCur
->
kLen
,
&
pTbCur
->
pVal
,
&
pTbCur
->
vLen
);
if
(
ret
<
0
)
{
return
-
1
;
}
tDecoderClear
(
&
pTbCur
->
mr
.
coder
);
metaGetTableEntryByVersion
(
&
pTbCur
->
mr
,
((
SUidIdxVal
*
)
pTbCur
->
pVal
)[
0
].
version
,
*
(
tb_uid_t
*
)
pTbCur
->
pKey
);
if
(
pTbCur
->
mr
.
me
.
type
==
TSDB_SUPER_TABLE
)
{
continue
;
}
break
;
}
return
0
;
}
SSchemaWrapper
*
metaGetTableSchema
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
,
int
lock
)
{
void
*
pData
=
NULL
;
int
nData
=
0
;
...
...
source/dnode/vnode/src/tsdb/tsdbCacheRead.c
浏览文件 @
4c2a41ec
...
...
@@ -268,7 +268,10 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
}
taosThreadMutexLock
(
&
pr
->
readerMutex
);
tsdbTakeReadSnap
((
STsdbReader
*
)
pr
,
tsdbCacheQueryReseek
,
&
pr
->
pReadSnap
);
code
=
tsdbTakeReadSnap
((
STsdbReader
*
)
pr
,
tsdbCacheQueryReseek
,
&
pr
->
pReadSnap
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_end
;
}
pr
->
pDataFReader
=
NULL
;
pr
->
pDataFReaderLast
=
NULL
;
...
...
@@ -279,7 +282,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
code
=
doExtractCacheRow
(
pr
,
lruCache
,
pKeyInfo
->
uid
,
&
pRow
,
&
h
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
goto
_end
;
}
if
(
h
==
NULL
)
{
...
...
@@ -352,7 +355,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
STableKeyInfo
*
pKeyInfo
=
&
pr
->
pTableList
[
i
];
code
=
doExtractCacheRow
(
pr
,
lruCache
,
pKeyInfo
->
uid
,
&
pRow
,
&
h
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
goto
_end
;
}
if
(
h
==
NULL
)
{
...
...
source/dnode/vnode/src/tsdb/tsdbFS.c
浏览文件 @
4c2a41ec
...
...
@@ -458,9 +458,8 @@ static int32_t tsdbMergeFileSet(STsdb *pTsdb, SDFileSet *pSetOld, SDFileSet *pSe
taosMemoryFree
(
pHeadF
);
}
}
else
{
nRef
=
pHeadF
->
nRef
;
*
pHeadF
=
*
pSetNew
->
pHeadF
;
pHeadF
->
nRef
=
nRef
;
ASSERT
(
pHeadF
->
offset
==
pSetNew
->
pHeadF
->
offset
);
ASSERT
(
pHeadF
->
size
==
pSetNew
->
pHeadF
->
size
);
}
// data
...
...
@@ -481,9 +480,7 @@ static int32_t tsdbMergeFileSet(STsdb *pTsdb, SDFileSet *pSetOld, SDFileSet *pSe
taosMemoryFree
(
pDataF
);
}
}
else
{
nRef
=
pDataF
->
nRef
;
*
pDataF
=
*
pSetNew
->
pDataF
;
pDataF
->
nRef
=
nRef
;
pDataF
->
size
=
pSetNew
->
pDataF
->
size
;
}
// sma
...
...
@@ -504,9 +501,7 @@ static int32_t tsdbMergeFileSet(STsdb *pTsdb, SDFileSet *pSetOld, SDFileSet *pSe
taosMemoryFree
(
pSmaF
);
}
}
else
{
nRef
=
pSmaF
->
nRef
;
*
pSmaF
=
*
pSetNew
->
pSmaF
;
pSmaF
->
nRef
=
nRef
;
pSmaF
->
size
=
pSetNew
->
pSmaF
->
size
;
}
// stt
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
4c2a41ec
...
...
@@ -1776,12 +1776,15 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
}
if
(
minKey
==
k
.
ts
)
{
STSchema
*
pSchema
=
doGetSchemaForTSRow
(
TSDBROW_SVERSION
(
pRow
),
pReader
,
pBlockScanInfo
->
uid
);
if
(
pSchema
==
NULL
)
{
return
terrno
;
}
if
(
init
)
{
tsdbRowMerge
(
&
merge
,
pRow
);
tsdbRowMerge
rAdd
(
&
merge
,
pRow
,
pSchema
);
}
else
{
init
=
true
;
STSchema
*
pSchema
=
doGetSchemaForTSRow
(
TSDBROW_SVERSION
(
pRow
),
pReader
,
pBlockScanInfo
->
uid
);
int32_t
code
=
tsdbRowMergerInit
(
&
merge
,
pRow
,
pSchema
);
int32_t
code
=
tsdbRowMergerInit
(
&
merge
,
pRow
,
pSchema
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
...
...
@@ -2882,7 +2885,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
if
(
pResBlock
->
info
.
rows
>
0
)
{
tsdbDebug
(
"%p uid:%"
PRIu64
", composed data block created, brange:%"
PRIu64
"-%"
PRIu64
" rows:%d, elapsed time:%.2f ms %s"
,
" rows:%d, elapsed time:%.2f ms %s"
,
pReader
,
pResBlock
->
info
.
id
.
uid
,
pResBlock
->
info
.
window
.
skey
,
pResBlock
->
info
.
window
.
ekey
,
pResBlock
->
info
.
rows
,
el
,
pReader
->
idStr
);
}
...
...
@@ -2932,7 +2935,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
if
(
pResBlock
->
info
.
rows
>
0
)
{
tsdbDebug
(
"%p uid:%"
PRIu64
", composed data block created, brange:%"
PRIu64
"-%"
PRIu64
" rows:%d, elapsed time:%.2f ms %s"
,
" rows:%d, elapsed time:%.2f ms %s"
,
pReader
,
pResBlock
->
info
.
id
.
uid
,
pResBlock
->
info
.
window
.
skey
,
pResBlock
->
info
.
window
.
ekey
,
pResBlock
->
info
.
rows
,
el
,
pReader
->
idStr
);
}
...
...
source/dnode/vnode/src/tsdb/tsdbUtil.c
浏览文件 @
4c2a41ec
...
...
@@ -732,6 +732,7 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema)
tsdbRowGetColVal
(
pRow
,
pTSchema
,
jCol
++
,
pColVal
);
if
(
key
.
version
>
pMerger
->
version
)
{
#if 0
if (!COL_VAL_IS_NONE(pColVal)) {
if ((!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) {
SColVal *tColVal = taosArrayGet(pMerger->pArray, iCol);
...
...
@@ -747,6 +748,28 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema)
taosArraySet(pMerger->pArray, iCol, pColVal);
}
}
#endif
if
(
!
COL_VAL_IS_NONE
(
pColVal
))
{
if
(
IS_VAR_DATA_TYPE
(
pColVal
->
type
))
{
SColVal
*
pTColVal
=
taosArrayGet
(
pMerger
->
pArray
,
iCol
);
if
(
!
COL_VAL_IS_NULL
(
pColVal
))
{
code
=
tRealloc
(
&
pTColVal
->
value
.
pData
,
pColVal
->
value
.
nData
);
if
(
code
)
return
code
;
pTColVal
->
value
.
nData
=
pColVal
->
value
.
nData
;
if
(
pTColVal
->
value
.
nData
)
{
memcpy
(
pTColVal
->
value
.
pData
,
pColVal
->
value
.
pData
,
pTColVal
->
value
.
nData
);
}
pTColVal
->
flag
=
0
;
}
else
{
tFree
(
pTColVal
->
value
.
pData
);
pTColVal
->
value
.
pData
=
NULL
;
taosArraySet
(
pMerger
->
pArray
,
iCol
,
pColVal
);
}
}
else
{
taosArraySet
(
pMerger
->
pArray
,
iCol
,
pColVal
);
}
}
}
else
if
(
key
.
version
<
pMerger
->
version
)
{
SColVal
*
tColVal
=
(
SColVal
*
)
taosArrayGet
(
pMerger
->
pArray
,
iCol
);
if
(
COL_VAL_IS_NONE
(
tColVal
)
&&
!
COL_VAL_IS_NONE
(
pColVal
))
{
...
...
source/libs/catalog/inc/catalogInt.h
浏览文件 @
4c2a41ec
...
...
@@ -805,6 +805,7 @@ int32_t ctgMakeVgArray(SDBVgInfo* dbInfo);
int32_t
ctgAcquireVgMetaFromCache
(
SCatalog
*
pCtg
,
const
char
*
dbFName
,
const
char
*
tbName
,
SCtgDBCache
**
pDb
,
SCtgTbCache
**
pTb
);
int32_t
ctgCopyTbMeta
(
SCatalog
*
pCtg
,
SCtgTbMetaCtx
*
ctx
,
SCtgDBCache
**
pDb
,
SCtgTbCache
**
pTb
,
STableMeta
**
pTableMeta
,
char
*
dbFName
);
void
ctgReleaseVgMetaToCache
(
SCatalog
*
pCtg
,
SCtgDBCache
*
dbCache
,
SCtgTbCache
*
pCache
);
void
ctgReleaseTbMetaToCache
(
SCatalog
*
pCtg
,
SCtgDBCache
*
dbCache
,
SCtgTbCache
*
pCache
);
extern
SCatalogMgmt
gCtgMgmt
;
extern
SCtgDebug
gCTGDebug
;
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
4c2a41ec
...
...
@@ -598,10 +598,16 @@ int32_t ctgGetCachedTbVgMeta(SCatalog* pCtg, const SName* pTableName, SVgroupInf
CTG_ERR_JRET
(
ctgGetVgInfoFromHashValue
(
pCtg
,
dbCache
->
vgCache
.
vgInfo
,
pTableName
,
pVgroup
));
ctgRUnlockVgInfo
(
dbCache
);
SCtgTbMetaCtx
ctx
=
{
0
};
ctx
.
pName
=
(
SName
*
)
pTableName
;
ctx
.
flag
=
CTG_FLAG_UNKNOWN_STB
;
CTG_ERR_JRET
(
ctgCopyTbMeta
(
pCtg
,
&
ctx
,
&
dbCache
,
&
tbCache
,
pTableMeta
,
db
));
code
=
ctgCopyTbMeta
(
pCtg
,
&
ctx
,
&
dbCache
,
&
tbCache
,
pTableMeta
,
db
);
ctgReleaseTbMetaToCache
(
pCtg
,
dbCache
,
tbCache
);
CTG_RET
(
code
);
_return:
...
...
source/libs/catalog/src/ctgAsync.c
浏览文件 @
4c2a41ec
...
...
@@ -999,6 +999,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
CTG_ERR_JRET
(
ctgGetTbMetaFromVnode
(
pCtg
,
pConn
,
pName
,
&
vgInfo
,
NULL
,
tReq
));
ctgReleaseVgInfoToCache
(
pCtg
,
dbCache
);
dbCache
=
NULL
;
}
else
{
SBuildUseDBInput
input
=
{
0
};
...
...
@@ -1168,6 +1169,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
CTG_ERR_JRET
(
ctgGetTbMetaFromVnode
(
pCtg
,
pConn
,
pName
,
&
vgInfo
,
NULL
,
tReq
));
ctgReleaseVgInfoToCache
(
pCtg
,
dbCache
);
dbCache
=
NULL
;
}
else
{
SBuildUseDBInput
input
=
{
0
};
...
...
source/libs/catalog/src/ctgCache.c
浏览文件 @
4c2a41ec
...
...
@@ -2118,7 +2118,7 @@ int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation) {
_return:
if
(
dbCache
)
{
if
(
code
==
TSDB_CODE_SUCCESS
&&
dbCache
)
{
ctgWUnlockVgInfo
(
dbCache
);
}
...
...
source/libs/command/src/command.c
浏览文件 @
4c2a41ec
...
...
@@ -281,10 +281,10 @@ static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbFName, S
len
+=
sprintf
(
buf2
+
VARSTR_HEADER_SIZE
,
"CREATE DATABASE `%s` BUFFER %d CACHESIZE %d CACHEMODEL '%s' COMP %d DURATION %dm "
"WAL_FSYNC_PERIOD %d MAXROWS %d MINROWS %d KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d PRECISION '%s' REPLICA %d "
"WAL_FSYNC_PERIOD %d MAXROWS %d MINROWS %d
STT_TRIGGER %d
KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d PRECISION '%s' REPLICA %d "
"WAL_LEVEL %d VGROUPS %d SINGLE_STABLE %d"
,
dbFName
,
pCfg
->
buffer
,
pCfg
->
cacheSize
,
cacheModelStr
(
pCfg
->
cacheLast
),
pCfg
->
compression
,
pCfg
->
daysPerFile
,
pCfg
->
walFsyncPeriod
,
pCfg
->
maxRows
,
pCfg
->
minRows
,
pCfg
->
daysToKeep0
,
pCfg
->
daysToKeep1
,
pCfg
->
daysToKeep2
,
pCfg
->
walFsyncPeriod
,
pCfg
->
maxRows
,
pCfg
->
minRows
,
pCfg
->
sstTrigger
,
pCfg
->
daysToKeep0
,
pCfg
->
daysToKeep1
,
pCfg
->
daysToKeep2
,
pCfg
->
pages
,
pCfg
->
pageSize
,
prec
,
pCfg
->
replications
,
pCfg
->
walLevel
,
pCfg
->
numOfVgroups
,
1
==
pCfg
->
numOfStables
);
...
...
source/libs/executor/src/executor.c
浏览文件 @
4c2a41ec
...
...
@@ -24,12 +24,16 @@
static
TdThreadOnce
initPoolOnce
=
PTHREAD_ONCE_INIT
;
int32_t
exchangeObjRefPool
=
-
1
;
static
void
initRefPool
()
{
exchangeObjRefPool
=
taosOpenRef
(
1024
,
doDestroyExchangeOperatorInfo
);
}
static
void
cleanupRefPool
()
{
int32_t
ref
=
atomic_val_compare_exchange_32
(
&
exchangeObjRefPool
,
exchangeObjRefPool
,
0
);
taosCloseRef
(
ref
);
}
static
void
initRefPool
()
{
exchangeObjRefPool
=
taosOpenRef
(
1024
,
doDestroyExchangeOperatorInfo
);
atexit
(
cleanupRefPool
);
}
static
int32_t
doSetSMABlock
(
SOperatorInfo
*
pOperator
,
void
*
input
,
size_t
numOfBlocks
,
int32_t
type
,
char
*
id
)
{
ASSERT
(
pOperator
!=
NULL
);
if
(
pOperator
->
operatorType
!=
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
...
...
@@ -448,7 +452,6 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
SExecTaskInfo
**
pTask
=
(
SExecTaskInfo
**
)
pTaskInfo
;
taosThreadOnce
(
&
initPoolOnce
,
initRefPool
);
atexit
(
cleanupRefPool
);
qDebug
(
"start to create subplan task, TID:0x%"
PRIx64
" QID:0x%"
PRIx64
,
taskId
,
pSubplan
->
id
.
queryId
);
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
4c2a41ec
...
...
@@ -593,8 +593,11 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
int32_t
pageId
=
0
;
pPage
=
getNewBufPage
(
pInfo
->
pBuf
,
&
pageId
);
taosArrayPush
(
p
->
pPageList
,
&
pageId
);
if
(
pPage
==
NULL
)
{
return
pPage
;
}
taosArrayPush
(
p
->
pPageList
,
&
pageId
);
*
(
int32_t
*
)
pPage
=
0
;
}
else
{
int32_t
*
curId
=
taosArrayGetLast
(
p
->
pPageList
);
...
...
@@ -612,6 +615,11 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
// add a new page for current group
int32_t
pageId
=
0
;
pPage
=
getNewBufPage
(
pInfo
->
pBuf
,
&
pageId
);
if
(
pPage
==
NULL
)
{
qError
(
"failed to get new buffer, code:%s"
,
tstrerror
(
terrno
));
return
NULL
;
}
taosArrayPush
(
p
->
pPageList
,
&
pageId
);
memset
(
pPage
,
0
,
getBufPageSize
(
pInfo
->
pBuf
));
}
...
...
source/libs/executor/src/sysscanoperator.c
浏览文件 @
4c2a41ec
...
...
@@ -66,7 +66,7 @@ typedef struct SSysTableScanInfo {
int64_t
numOfBlocks
;
// extract basic running information.
SLoadRemoteDataInfo
loadInfo
;
int32_t
tbnameSlotId
;
int32_t
tbnameSlotId
;
}
SSysTableScanInfo
;
typedef
struct
{
...
...
@@ -81,10 +81,10 @@ typedef struct MergeIndex {
}
MergeIndex
;
typedef
struct
SBlockDistInfo
{
SSDataBlock
*
pResBlock
;
STsdbReader
*
pHandle
;
SReadHandle
readHandle
;
uint64_t
uid
;
// table uid
SSDataBlock
*
pResBlock
;
STsdbReader
*
pHandle
;
SReadHandle
readHandle
;
uint64_t
uid
;
// table uid
}
SBlockDistInfo
;
static
int32_t
sysChkFilter__Comm
(
SNode
*
pNode
);
...
...
@@ -129,20 +129,20 @@ static char* SYSTABLE_IDX_COLUMN[] = {"table_name", "db_name", "create_time"
static
char
*
SYSTABLE_SPECIAL_COL
[]
=
{
"db_name"
,
"vgroup_id"
};
static
int32_t
buildSysDbTableInfo
(
const
SSysTableScanInfo
*
pInfo
,
int32_t
capacity
);
static
SSDataBlock
*
buildInfoSchemaTableMetaBlock
(
char
*
tableName
);
static
void
destroySysScanOperator
(
void
*
param
);
static
int32_t
loadSysTableCallback
(
void
*
param
,
SDataBuf
*
pMsg
,
int32_t
code
);
static
SSDataBlock
*
doFilterResult
(
SSDataBlock
*
pDataBlock
,
SFilterInfo
*
pFilterInfo
);
static
int32_t
buildSysDbTableInfo
(
const
SSysTableScanInfo
*
pInfo
,
int32_t
capacity
);
static
SSDataBlock
*
buildInfoSchemaTableMetaBlock
(
char
*
tableName
);
static
void
destroySysScanOperator
(
void
*
param
);
static
int32_t
loadSysTableCallback
(
void
*
param
,
SDataBuf
*
pMsg
,
int32_t
code
);
static
SSDataBlock
*
doFilterResult
(
SSDataBlock
*
pDataBlock
,
SFilterInfo
*
pFilterInfo
);
static
__optSysFilter
optSysGetFilterFunc
(
int32_t
ctype
,
bool
*
reverse
);
static
int32_t
sysTableUserTagsFillOneTableTags
(
const
SSysTableScanInfo
*
pInfo
,
SMetaReader
*
smrSuperTable
,
SMetaReader
*
smrChildTable
,
const
char
*
dbname
,
const
char
*
tableName
,
int32_t
*
pNumOfRows
,
const
SSDataBlock
*
dataBlock
);
static
int32_t
sysTableUserColsFillOneTableCols
(
const
SSysTableScanInfo
*
pInfo
,
const
char
*
dbname
,
int32_t
*
pNumOfRows
,
const
SSDataBlock
*
dataBlock
,
char
*
t
Name
,
SSchemaWrapper
*
schemaRow
,
char
*
t
ableType
);
static
int32_t
sysTableUserColsFillOneTableCols
(
const
SSysTableScanInfo
*
pInfo
,
const
char
*
dbname
,
int32_t
*
pNumOfRows
,
const
SSDataBlock
*
dataBlock
,
char
*
tName
,
SSchemaWrapper
*
schemaRow
,
char
*
tableType
);
static
void
relocateAndFilterSysTagsScanResult
(
SSysTableScanInfo
*
pInfo
,
int32_t
numOfRows
,
SSDataBlock
*
dataBlock
,
SFilterInfo
*
pFilterInfo
);
...
...
@@ -204,11 +204,11 @@ int32_t sysFilte__TableName(void* arg, SNode* pNode, SArray* result) {
if
(
func
==
NULL
)
return
-
1
;
SMetaFltParam
param
=
{.
suid
=
0
,
.
cid
=
0
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
,
.
val
=
pVal
->
datum
.
p
,
.
reverse
=
reverse
,
.
filterFunc
=
func
};
.
cid
=
0
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
,
.
val
=
pVal
->
datum
.
p
,
.
reverse
=
reverse
,
.
filterFunc
=
func
};
return
-
1
;
}
...
...
@@ -223,11 +223,11 @@ int32_t sysFilte__CreateTime(void* arg, SNode* pNode, SArray* result) {
if
(
func
==
NULL
)
return
-
1
;
SMetaFltParam
param
=
{.
suid
=
0
,
.
cid
=
0
,
.
type
=
TSDB_DATA_TYPE_BIGINT
,
.
val
=
&
pVal
->
datum
.
i
,
.
reverse
=
reverse
,
.
filterFunc
=
func
};
.
cid
=
0
,
.
type
=
TSDB_DATA_TYPE_BIGINT
,
.
val
=
&
pVal
->
datum
.
i
,
.
reverse
=
reverse
,
.
filterFunc
=
func
};
int32_t
ret
=
metaFilterCreateTime
(
pMeta
,
&
param
,
result
);
return
ret
;
...
...
@@ -355,9 +355,9 @@ static int32_t optSysMergeRslt(SArray* mRslt, SArray* rslt);
static
SSDataBlock
*
sysTableScanFromMNode
(
SOperatorInfo
*
pOperator
,
SSysTableScanInfo
*
pInfo
,
const
char
*
name
,
SExecTaskInfo
*
pTaskInfo
);
void
extractTbnameSlotId
(
SSysTableScanInfo
*
pInfo
,
const
SScanPhysiNode
*
pScanNode
);
static
SSDataBlock
*
sysTableScanFillTbName
(
SOperatorInfo
*
pOperator
,
const
SSysTableScanInfo
*
pInfo
,
const
char
*
name
,
SSDataBlock
*
pBlock
);
__optSysFilter
optSysGetFilterFunc
(
int32_t
ctype
,
bool
*
reverse
)
{
static
SSDataBlock
*
sysTableScanFillTbName
(
SOperatorInfo
*
pOperator
,
const
SSysTableScanInfo
*
pInfo
,
const
char
*
name
,
SSDataBlock
*
pBlock
);
__optSysFilter
optSysGetFilterFunc
(
int32_t
ctype
,
bool
*
reverse
)
{
if
(
ctype
==
OP_TYPE_LOWER_EQUAL
||
ctype
==
OP_TYPE_LOWER_THAN
)
{
*
reverse
=
true
;
}
...
...
@@ -479,13 +479,13 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
}
}
char
typeName
[
TSDB_TABLE_FNAME_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
SSchemaWrapper
*
schemaRow
=
NULL
;
if
(
smrTable
.
me
.
type
==
TSDB_SUPER_TABLE
)
{
schemaRow
=
&
smrTable
.
me
.
stbEntry
.
schemaRow
;
char
typeName
[
TSDB_TABLE_FNAME_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
SSchemaWrapper
*
schemaRow
=
NULL
;
if
(
smrTable
.
me
.
type
==
TSDB_SUPER_TABLE
)
{
schemaRow
=
&
smrTable
.
me
.
stbEntry
.
schemaRow
;
STR_TO_VARSTR
(
typeName
,
"CHILD_TABLE"
);
}
else
if
(
smrTable
.
me
.
type
==
TSDB_NORMAL_TABLE
)
{
schemaRow
=
&
smrTable
.
me
.
ntbEntry
.
schemaRow
;
}
else
if
(
smrTable
.
me
.
type
==
TSDB_NORMAL_TABLE
)
{
schemaRow
=
&
smrTable
.
me
.
ntbEntry
.
schemaRow
;
STR_TO_VARSTR
(
typeName
,
"NORMAL_TABLE"
);
}
...
...
@@ -507,50 +507,50 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
pInfo
->
pCur
=
metaOpenTbCursor
(
pInfo
->
readHandle
.
meta
);
}
SHashObj
*
stableSchema
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_NO_LOCK
);
SHashObj
*
stableSchema
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_NO_LOCK
);
taosHashSetFreeFp
(
stableSchema
,
tDeleteSSchemaWrapperForHash
);
while
((
ret
=
metaTbCursorNext
(
pInfo
->
pCur
,
TSDB_TABLE_MAX
))
==
0
)
{
char
typeName
[
TSDB_TABLE_FNAME_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
char
tableName
[
TSDB_TABLE_NAME_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
SSchemaWrapper
*
schemaRow
=
NULL
;
SSchemaWrapper
*
schemaRow
=
NULL
;
if
(
pInfo
->
pCur
->
mr
.
me
.
type
==
TSDB_SUPER_TABLE
)
{
if
(
pInfo
->
pCur
->
mr
.
me
.
type
==
TSDB_SUPER_TABLE
)
{
qDebug
(
"sysTableScanUserCols cursor get super table"
);
void
*
schema
=
taosHashGet
(
stableSchema
,
&
pInfo
->
pCur
->
mr
.
me
.
uid
,
sizeof
(
int64_t
));
if
(
schema
==
NULL
)
{
SSchemaWrapper
*
schemaWrapper
=
tCloneSSchemaWrapper
(
&
pInfo
->
pCur
->
mr
.
me
.
stbEntry
.
schemaRow
);
void
*
schema
=
taosHashGet
(
stableSchema
,
&
pInfo
->
pCur
->
mr
.
me
.
uid
,
sizeof
(
int64_t
));
if
(
schema
==
NULL
)
{
SSchemaWrapper
*
schemaWrapper
=
tCloneSSchemaWrapper
(
&
pInfo
->
pCur
->
mr
.
me
.
stbEntry
.
schemaRow
);
taosHashPut
(
stableSchema
,
&
pInfo
->
pCur
->
mr
.
me
.
uid
,
sizeof
(
int64_t
),
&
schemaWrapper
,
POINTER_BYTES
);
}
continue
;
}
else
if
(
pInfo
->
pCur
->
mr
.
me
.
type
==
TSDB_CHILD_TABLE
)
{
}
else
if
(
pInfo
->
pCur
->
mr
.
me
.
type
==
TSDB_CHILD_TABLE
)
{
qDebug
(
"sysTableScanUserCols cursor get child table"
);
STR_TO_VARSTR
(
typeName
,
"CHILD_TABLE"
);
STR_TO_VARSTR
(
tableName
,
pInfo
->
pCur
->
mr
.
me
.
name
);
int64_t
suid
=
pInfo
->
pCur
->
mr
.
me
.
ctbEntry
.
suid
;
void
*
schema
=
taosHashGet
(
stableSchema
,
&
pInfo
->
pCur
->
mr
.
me
.
ctbEntry
.
suid
,
sizeof
(
int64_t
));
if
(
schema
!=
NULL
)
{
schemaRow
=
*
(
SSchemaWrapper
**
)
schema
;
}
else
{
void
*
schema
=
taosHashGet
(
stableSchema
,
&
pInfo
->
pCur
->
mr
.
me
.
ctbEntry
.
suid
,
sizeof
(
int64_t
));
if
(
schema
!=
NULL
)
{
schemaRow
=
*
(
SSchemaWrapper
**
)
schema
;
}
else
{
tDecoderClear
(
&
pInfo
->
pCur
->
mr
.
coder
);
int
code
=
metaGetTableEntryByUid
(
&
pInfo
->
pCur
->
mr
,
suid
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
// terrno has been set by metaGetTableEntryByName, therefore, return directly
qError
(
"sysTableScanUserCols get meta by suid:%"
PRId64
" error, code:%d"
,
suid
,
code
);
qError
(
"sysTableScanUserCols get meta by suid:%"
PRId64
" error, code:%d"
,
suid
,
code
);
blockDataDestroy
(
dataBlock
);
pInfo
->
loadInfo
.
totalRows
=
0
;
taosHashCleanup
(
stableSchema
);
return
NULL
;
}
schemaRow
=
&
pInfo
->
pCur
->
mr
.
me
.
stbEntry
.
schemaRow
;
schemaRow
=
&
pInfo
->
pCur
->
mr
.
me
.
stbEntry
.
schemaRow
;
}
}
else
if
(
pInfo
->
pCur
->
mr
.
me
.
type
==
TSDB_NORMAL_TABLE
)
{
}
else
if
(
pInfo
->
pCur
->
mr
.
me
.
type
==
TSDB_NORMAL_TABLE
)
{
qDebug
(
"sysTableScanUserCols cursor get normal table"
);
schemaRow
=
&
pInfo
->
pCur
->
mr
.
me
.
ntbEntry
.
schemaRow
;
schemaRow
=
&
pInfo
->
pCur
->
mr
.
me
.
ntbEntry
.
schemaRow
;
STR_TO_VARSTR
(
typeName
,
"NORMAL_TABLE"
);
STR_TO_VARSTR
(
tableName
,
pInfo
->
pCur
->
mr
.
me
.
name
);
}
else
{
}
else
{
qDebug
(
"sysTableScanUserCols cursor get invalid table"
);
continue
;
}
...
...
@@ -665,6 +665,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
pInfo
->
pCur
=
metaOpenTbCursor
(
pInfo
->
readHandle
.
meta
);
}
bool
blockFull
=
false
;
while
((
ret
=
metaTbCursorNext
(
pInfo
->
pCur
,
TSDB_SUPER_TABLE
))
==
0
)
{
if
(
pInfo
->
pCur
->
mr
.
me
.
type
!=
TSDB_CHILD_TABLE
)
{
continue
;
...
...
@@ -686,17 +687,25 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
T_LONG_JMP
(
pTaskInfo
->
env
,
terrno
);
}
sysTableUserTagsFillOneTableTags
(
pInfo
,
&
smrSuperTable
,
&
pInfo
->
pCur
->
mr
,
dbname
,
tableName
,
&
numOfRows
,
dataBlock
);
if
((
smrSuperTable
.
me
.
stbEntry
.
schemaTag
.
nCols
+
numOfRows
)
>
pOperator
->
resultInfo
.
capacity
)
{
metaTbCursorPrev
(
pInfo
->
pCur
);
blockFull
=
true
;
}
else
{
sysTableUserTagsFillOneTableTags
(
pInfo
,
&
smrSuperTable
,
&
pInfo
->
pCur
->
mr
,
dbname
,
tableName
,
&
numOfRows
,
dataBlock
);
}
metaReaderClear
(
&
smrSuperTable
);
if
(
numOfRows
>=
pOperator
->
resultInfo
.
capacity
)
{
if
(
blockFull
||
numOfRows
>=
pOperator
->
resultInfo
.
capacity
)
{
relocateAndFilterSysTagsScanResult
(
pInfo
,
numOfRows
,
dataBlock
,
pOperator
->
exprSupp
.
pFilterInfo
);
numOfRows
=
0
;
if
(
pInfo
->
pRes
->
info
.
rows
>
0
)
{
break
;
}
blockFull
=
false
;
}
}
...
...
@@ -902,10 +911,10 @@ static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo,
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
sysTableUserColsFillOneTableCols
(
const
SSysTableScanInfo
*
pInfo
,
const
char
*
dbname
,
int32_t
*
pNumOfRows
,
const
SSDataBlock
*
dataBlock
,
char
*
tName
,
SSchemaWrapper
*
schemaRow
,
char
*
tableType
)
{
if
(
schemaRow
==
NULL
)
{
static
int32_t
sysTableUserColsFillOneTableCols
(
const
SSysTableScanInfo
*
pInfo
,
const
char
*
dbname
,
int32_t
*
pNumOfRows
,
const
SSDataBlock
*
dataBlock
,
char
*
tName
,
SSchemaWrapper
*
schemaRow
,
char
*
tableType
)
{
if
(
schemaRow
==
NULL
)
{
qError
(
"sysTableUserColsFillOneTableCols schemaRow is NULL"
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -941,9 +950,8 @@ static int32_t sysTableUserColsFillOneTableCols(const SSysTableScanInfo* pInfo,
colTypeLen
+=
sprintf
(
varDataVal
(
colTypeStr
)
+
colTypeLen
,
"(%d)"
,
(
int32_t
)(
schemaRow
->
pSchema
[
i
].
bytes
-
VARSTR_HEADER_SIZE
));
}
else
if
(
colType
==
TSDB_DATA_TYPE_NCHAR
)
{
colTypeLen
+=
sprintf
(
varDataVal
(
colTypeStr
)
+
colTypeLen
,
"(%d)"
,
(
int32_t
)((
schemaRow
->
pSchema
[
i
].
bytes
-
VARSTR_HEADER_SIZE
)
/
TSDB_NCHAR_SIZE
));
colTypeLen
+=
sprintf
(
varDataVal
(
colTypeStr
)
+
colTypeLen
,
"(%d)"
,
(
int32_t
)((
schemaRow
->
pSchema
[
i
].
bytes
-
VARSTR_HEADER_SIZE
)
/
TSDB_NCHAR_SIZE
));
}
varDataSetLen
(
colTypeStr
,
colTypeLen
);
colDataAppend
(
pColInfoData
,
numOfRows
,
(
char
*
)
colTypeStr
,
false
);
...
...
@@ -1550,9 +1558,9 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
if
(
pInfo
->
showRewrite
)
{
getDBNameFromCondition
(
pInfo
->
pCondition
,
dbName
);
sprintf
(
pInfo
->
req
.
db
,
"%d.%s"
,
pInfo
->
accountId
,
dbName
);
}
else
if
(
strncasecmp
(
name
,
TSDB_INS_TABLE_COLS
,
TSDB_TABLE_FNAME_LEN
)
==
0
)
{
}
else
if
(
strncasecmp
(
name
,
TSDB_INS_TABLE_COLS
,
TSDB_TABLE_FNAME_LEN
)
==
0
)
{
getDBNameFromCondition
(
pInfo
->
pCondition
,
dbName
);
if
(
dbName
[
0
])
sprintf
(
pInfo
->
req
.
db
,
"%d.%s"
,
pInfo
->
accountId
,
dbName
);
if
(
dbName
[
0
])
sprintf
(
pInfo
->
req
.
db
,
"%d.%s"
,
pInfo
->
accountId
,
dbName
);
sysTableIsCondOnOneTable
(
pInfo
->
pCondition
,
pInfo
->
req
.
filterTb
);
}
...
...
@@ -1573,12 +1581,12 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
return
sysTableScanFillTbName
(
pOperator
,
pInfo
,
name
,
pBlock
);
}
static
SSDataBlock
*
sysTableScanFillTbName
(
SOperatorInfo
*
pOperator
,
const
SSysTableScanInfo
*
pInfo
,
const
char
*
name
,
SSDataBlock
*
pBlock
)
{
static
SSDataBlock
*
sysTableScanFillTbName
(
SOperatorInfo
*
pOperator
,
const
SSysTableScanInfo
*
pInfo
,
const
char
*
name
,
SSDataBlock
*
pBlock
)
{
if
(
pBlock
!=
NULL
)
{
if
(
pInfo
->
tbnameSlotId
!=
-
1
)
{
SColumnInfoData
*
pColumnInfoData
=
(
SColumnInfoData
*
)
taosArrayGet
(
pBlock
->
pDataBlock
,
pInfo
->
tbnameSlotId
);
char
varTbName
[
TSDB_TABLE_FNAME_LEN
-
1
+
VARSTR_HEADER_SIZE
]
=
{
0
};
char
varTbName
[
TSDB_TABLE_FNAME_LEN
-
1
+
VARSTR_HEADER_SIZE
]
=
{
0
};
memcpy
(
varDataVal
(
varTbName
),
name
,
strlen
(
name
));
varDataSetLen
(
varTbName
,
strlen
(
name
));
for
(
int
i
=
0
;
i
<
pBlock
->
info
.
rows
;
++
i
)
{
...
...
@@ -1669,7 +1677,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
SOperatorInfo
*
createSysTableScanOperatorInfo
(
void
*
readHandle
,
SSystemTableScanPhysiNode
*
pScanPhyNode
,
const
char
*
pUser
,
SExecTaskInfo
*
pTaskInfo
)
{
int32_t
code
=
TDB_CODE_SUCCESS
;
int32_t
code
=
TDB_CODE_SUCCESS
;
SSysTableScanInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SSysTableScanInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
...
...
@@ -1717,10 +1725,11 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
setOperatorInfo
(
pOperator
,
"SysTableScanOperator"
,
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
exprSupp
.
numOfExprs
=
taosArrayGetSize
(
pInfo
->
pRes
->
pDataBlock
);
pOperator
->
fpSet
=
createOperatorFpSet
(
optrDummyOpenFn
,
doSysTableScan
,
NULL
,
destroySysScanOperator
,
optrDefaultBufFn
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
optrDummyOpenFn
,
doSysTableScan
,
NULL
,
destroySysScanOperator
,
optrDefaultBufFn
,
NULL
);
return
pOperator
;
_error:
_error:
if
(
pInfo
!=
NULL
)
{
destroySysScanOperator
(
pInfo
);
}
...
...
@@ -1757,7 +1766,7 @@ void destroySysScanOperator(void* param) {
const
char
*
name
=
tNameGetTableName
(
&
pInfo
->
name
);
if
(
strncasecmp
(
name
,
TSDB_INS_TABLE_TABLES
,
TSDB_TABLE_FNAME_LEN
)
==
0
||
strncasecmp
(
name
,
TSDB_INS_TABLE_TAGS
,
TSDB_TABLE_FNAME_LEN
)
==
0
||
strncasecmp
(
name
,
TSDB_INS_TABLE_COLS
,
TSDB_TABLE_FNAME_LEN
)
==
0
||
pInfo
->
pCur
!=
NULL
)
{
strncasecmp
(
name
,
TSDB_INS_TABLE_COLS
,
TSDB_TABLE_FNAME_LEN
)
==
0
||
pInfo
->
pCur
!=
NULL
)
{
metaCloseTbCursor
(
pInfo
->
pCur
);
pInfo
->
pCur
=
NULL
;
}
...
...
@@ -2165,7 +2174,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
// make the valgrind happy that all memory buffer has been initialized already.
if
(
slotId
!=
0
)
{
SColumnInfoData
*
p1
=
taosArrayGet
(
pBlock
->
pDataBlock
,
0
);
int64_t
v
=
0
;
int64_t
v
=
0
;
colDataAppendInt64
(
p1
,
0
,
&
v
);
}
...
...
@@ -2175,10 +2184,10 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
}
static
void
destroyBlockDistScanOperatorInfo
(
void
*
param
)
{
SBlockDistInfo
*
pDistInfo
=
(
SBlockDistInfo
*
)
param
;
blockDataDestroy
(
pDistInfo
->
pResBlock
);
tsdbReaderClose
(
pDistInfo
->
pHandle
);
taosMemoryFreeClear
(
param
);
SBlockDistInfo
*
pDistInfo
=
(
SBlockDistInfo
*
)
param
;
blockDataDestroy
(
pDistInfo
->
pResBlock
);
tsdbReaderClose
(
pDistInfo
->
pHandle
);
taosMemoryFreeClear
(
param
);
}
static
int32_t
initTableblockDistQueryCond
(
uint64_t
uid
,
SQueryTableDataCond
*
pCond
)
{
...
...
@@ -2250,8 +2259,8 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
setOperatorInfo
(
pOperator
,
"DataBlockDistScanOperator"
,
QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
optrDummyOpenFn
,
doBlockInfoScan
,
NULL
,
destroyBlockDistScanOperatorInfo
,
optrDefaultBufFn
,
NULL
);
pOperator
->
fpSet
=
createOperatorFpSet
(
optrDummyOpenFn
,
doBlockInfoScan
,
NULL
,
destroyBlockDistScanOperatorInfo
,
optrDefaultBufFn
,
NULL
);
return
pOperator
;
_error:
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
4c2a41ec
...
...
@@ -3053,14 +3053,12 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf,
if
(
pHandle
->
currentPage
==
-
1
)
{
pPage
=
getNewBufPage
(
pHandle
->
pBuf
,
&
pHandle
->
currentPage
);
if
(
pPage
==
NULL
)
{
terrno
=
TSDB_CODE_NO_AVAIL_DISK
;
return
terrno
;
}
pPage
->
num
=
sizeof
(
SFilePage
);
}
else
{
pPage
=
getBufPage
(
pHandle
->
pBuf
,
pHandle
->
currentPage
);
if
(
pPage
==
NULL
)
{
terrno
=
TSDB_CODE_NO_AVAIL_DISK
;
return
terrno
;
}
if
(
pPage
->
num
+
length
>
getBufPageSize
(
pHandle
->
pBuf
))
{
...
...
@@ -3068,7 +3066,6 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf,
releaseBufPage
(
pHandle
->
pBuf
,
pPage
);
pPage
=
getNewBufPage
(
pHandle
->
pBuf
,
&
pHandle
->
currentPage
);
if
(
pPage
==
NULL
)
{
terrno
=
TSDB_CODE_NO_AVAIL_DISK
;
return
terrno
;
}
pPage
->
num
=
sizeof
(
SFilePage
);
...
...
@@ -3115,7 +3112,6 @@ static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf
if
(
pHandle
->
pBuf
!=
NULL
)
{
SFilePage
*
pPage
=
getBufPage
(
pHandle
->
pBuf
,
pPos
->
pageId
);
if
(
pPage
==
NULL
)
{
terrno
=
TSDB_CODE_NO_AVAIL_DISK
;
return
terrno
;
}
memcpy
(
pPage
->
data
+
pPos
->
offset
,
pBuf
,
length
);
...
...
source/libs/function/src/tpercentile.c
浏览文件 @
4c2a41ec
...
...
@@ -50,8 +50,8 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx)
if
(
pg
==
NULL
)
{
return
NULL
;
}
memcpy
(
buffer
->
data
+
offset
,
pg
->
data
,
(
size_t
)(
pg
->
num
*
pMemBucket
->
bytes
));
memcpy
(
buffer
->
data
+
offset
,
pg
->
data
,
(
size_t
)(
pg
->
num
*
pMemBucket
->
bytes
));
offset
+=
(
int32_t
)(
pg
->
num
*
pMemBucket
->
bytes
);
}
...
...
@@ -116,7 +116,7 @@ int32_t findOnlyResult(tMemBucket *pMemBucket, double *result) {
int32_t
*
pageId
=
taosArrayGet
(
list
,
0
);
SFilePage
*
pPage
=
getBufPage
(
pMemBucket
->
pBuffer
,
*
pageId
);
if
(
pPage
==
NULL
)
{
return
TSDB_CODE_NO_AVAIL_DISK
;
return
terrno
;
}
ASSERT
(
pPage
->
num
==
1
);
...
...
@@ -283,7 +283,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval,
return
NULL
;
}
int32_t
ret
=
createDiskbasedBuf
(
&
pBucket
->
pBuffer
,
pBucket
->
bufPageSize
,
pBucket
->
bufPageSize
*
512
,
"1"
,
tsTempDir
);
int32_t
ret
=
createDiskbasedBuf
(
&
pBucket
->
pBuffer
,
pBucket
->
bufPageSize
,
pBucket
->
bufPageSize
*
1024
,
"1"
,
tsTempDir
);
if
(
ret
!=
0
)
{
tMemBucketDestroy
(
pBucket
);
return
NULL
;
...
...
@@ -395,7 +395,7 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
pSlot
->
info
.
data
=
getNewBufPage
(
pBucket
->
pBuffer
,
&
pageId
);
if
(
pSlot
->
info
.
data
==
NULL
)
{
return
TSDB_CODE_NO_AVAIL_DISK
;
return
terrno
;
}
pSlot
->
info
.
pageId
=
pageId
;
taosArrayPush
(
pPageIdList
,
&
pageId
);
...
...
@@ -489,8 +489,9 @@ int32_t getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction
// data in buffer and file are merged together to be processed.
SFilePage
*
buffer
=
loadDataFromFilePage
(
pMemBucket
,
i
);
if
(
buffer
==
NULL
)
{
return
TSDB_CODE_NO_AVAIL_DISK
;
return
terrno
;
}
int32_t
currentIdx
=
count
-
num
;
char
*
thisVal
=
buffer
->
data
+
pMemBucket
->
bytes
*
currentIdx
;
...
...
@@ -536,7 +537,7 @@ int32_t getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction
int32_t
*
pageId
=
taosArrayGet
(
list
,
f
);
SFilePage
*
pg
=
getBufPage
(
pMemBucket
->
pBuffer
,
*
pageId
);
if
(
pg
==
NULL
)
{
return
TSDB_CODE_NO_AVAIL_DISK
;
return
terrno
;
}
int32_t
code
=
tMemBucketPut
(
pMemBucket
,
pg
->
data
,
(
int32_t
)
pg
->
num
);
...
...
source/libs/qworker/inc/qwInt.h
浏览文件 @
4c2a41ec
...
...
@@ -228,9 +228,14 @@ typedef struct SQWorkerMgmt {
case QW_PHASE_POST_FETCH: \
ctx->inFetch = 0; \
break; \
default: \
case QW_PHASE_PRE_QUERY: \
case QW_PHASE_POST_QUERY: \
case QW_PHASE_PRE_CQUERY: \
case QW_PHASE_POST_CQUERY: \
atomic_store_8(&(ctx)->phase, _value); \
break; \
default: \
break; \
} \
} while (0)
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
4c2a41ec
...
...
@@ -550,7 +550,9 @@ _return:
if
(
ctx
)
{
QW_UPDATE_RSP_CODE
(
ctx
,
code
);
QW_SET_PHASE
(
ctx
,
phase
);
if
(
QW_PHASE_POST_CQUERY
!=
phase
)
{
QW_SET_PHASE
(
ctx
,
phase
);
}
QW_UNLOCK
(
QW_WRITE
,
&
ctx
->
lock
);
qwReleaseTaskCtx
(
mgmt
,
ctx
);
...
...
@@ -757,7 +759,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_LOCK
(
QW_WRITE
,
&
ctx
->
lock
);
if
(
qComplete
||
(
queryStop
&&
(
0
==
atomic_load_8
((
int8_t
*
)
&
ctx
->
queryContinue
)))
||
code
)
{
// Note: query is not running anymore
QW_SET_PHASE
(
ctx
,
0
);
QW_SET_PHASE
(
ctx
,
QW_PHASE_POST_CQUERY
);
QW_UNLOCK
(
QW_WRITE
,
&
ctx
->
lock
);
break
;
}
...
...
source/libs/stream/src/streamMeta.c
浏览文件 @
4c2a41ec
...
...
@@ -207,6 +207,7 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
if
(
ppTask
)
{
SStreamTask
*
pTask
=
*
ppTask
;
taosHashRemove
(
pMeta
->
pTasks
,
&
taskId
,
sizeof
(
int32_t
));
tdbTbDelete
(
pMeta
->
pTaskDb
,
&
taskId
,
sizeof
(
int32_t
),
pMeta
->
txn
);
/*if (pTask->timer) {
* taosTmrStop(pTask->timer);*/
/*pTask->timer = NULL;*/
...
...
source/libs/sync/src/syncAppendEntries.c
浏览文件 @
4c2a41ec
...
...
@@ -357,7 +357,7 @@ int32_t syncNodeOnAppendEntriesOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
ASSERT
(
pAppendEntry
->
index
==
appendIndex
);
// append
code
=
ths
->
pLogStore
->
syncLogAppendEntry
(
ths
->
pLogStore
,
pAppendEntry
);
code
=
ths
->
pLogStore
->
syncLogAppendEntry
(
ths
->
pLogStore
,
pAppendEntry
,
false
);
if
(
code
!=
0
)
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"ignore, append error, append-index:%"
PRId64
,
appendIndex
);
...
...
@@ -398,7 +398,7 @@ int32_t syncNodeOnAppendEntriesOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
}
// append
code
=
ths
->
pLogStore
->
syncLogAppendEntry
(
ths
->
pLogStore
,
pAppendEntry
);
code
=
ths
->
pLogStore
->
syncLogAppendEntry
(
ths
->
pLogStore
,
pAppendEntry
,
false
);
if
(
code
!=
0
)
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"ignore, log not exist, append error, append-index:%"
PRId64
,
appendIndex
);
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
4c2a41ec
...
...
@@ -2488,7 +2488,7 @@ static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
LRUHandle
*
h
=
NULL
;
if
(
ths
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
int32_t
code
=
ths
->
pLogStore
->
syncLogAppendEntry
(
ths
->
pLogStore
,
pEntry
);
int32_t
code
=
ths
->
pLogStore
->
syncLogAppendEntry
(
ths
->
pLogStore
,
pEntry
,
false
);
if
(
code
!=
0
)
{
sError
(
"append noop error"
);
return
-
1
;
...
...
@@ -2731,7 +2731,7 @@ int32_t syncNodeOnClientRequestOld(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRe
if
(
ths
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
// append entry
code
=
ths
->
pLogStore
->
syncLogAppendEntry
(
ths
->
pLogStore
,
pEntry
);
code
=
ths
->
pLogStore
->
syncLogAppendEntry
(
ths
->
pLogStore
,
pEntry
,
false
);
if
(
code
!=
0
)
{
if
(
ths
->
replicaNum
==
1
)
{
if
(
h
)
{
...
...
source/libs/sync/src/syncPipeline.c
浏览文件 @
4c2a41ec
...
...
@@ -364,7 +364,11 @@ _out:
return
ret
;
}
int32_t
syncLogStorePersist
(
SSyncLogStore
*
pLogStore
,
SSyncRaftEntry
*
pEntry
)
{
static
inline
bool
syncLogStoreNeedFlush
(
SSyncRaftEntry
*
pEntry
,
int32_t
replicaNum
)
{
return
(
replicaNum
>
1
)
&&
(
pEntry
->
originalRpcType
==
TDMT_VND_COMMIT
);
}
int32_t
syncLogStorePersist
(
SSyncLogStore
*
pLogStore
,
SSyncNode
*
pNode
,
SSyncRaftEntry
*
pEntry
)
{
ASSERT
(
pEntry
->
index
>=
0
);
SyncIndex
lastVer
=
pLogStore
->
syncLogLastIndex
(
pLogStore
);
if
(
lastVer
>=
pEntry
->
index
&&
pLogStore
->
syncLogTruncate
(
pLogStore
,
pEntry
->
index
)
<
0
)
{
...
...
@@ -374,7 +378,8 @@ int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
lastVer
=
pLogStore
->
syncLogLastIndex
(
pLogStore
);
ASSERT
(
pEntry
->
index
==
lastVer
+
1
);
if
(
pLogStore
->
syncLogAppendEntry
(
pLogStore
,
pEntry
)
<
0
)
{
bool
doFsync
=
syncLogStoreNeedFlush
(
pEntry
,
pNode
->
replicaNum
);
if
(
pLogStore
->
syncLogAppendEntry
(
pLogStore
,
pEntry
,
doFsync
)
<
0
)
{
sError
(
"failed to append sync log entry since %s. index:%"
PRId64
", term:%"
PRId64
""
,
terrstr
(),
pEntry
->
index
,
pEntry
->
term
);
return
-
1
;
...
...
@@ -436,7 +441,7 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p
(
void
)
syncNodeReplicateWithoutLock
(
pNode
);
// persist
if
(
syncLogStorePersist
(
pLogStore
,
pEntry
)
<
0
)
{
if
(
syncLogStorePersist
(
pLogStore
,
p
Node
,
p
Entry
)
<
0
)
{
sError
(
"vgId:%d, failed to persist sync log entry from buffer since %s. index:%"
PRId64
,
pNode
->
vgId
,
terrstr
(),
pEntry
->
index
);
goto
_out
;
...
...
source/libs/sync/src/syncRaftLog.c
浏览文件 @
4c2a41ec
...
...
@@ -23,7 +23,7 @@
// public function
static
int32_t
raftLogRestoreFromSnapshot
(
struct
SSyncLogStore
*
pLogStore
,
SyncIndex
snapshotIndex
);
static
int32_t
raftLogAppendEntry
(
struct
SSyncLogStore
*
pLogStore
,
SSyncRaftEntry
*
pEntry
);
static
int32_t
raftLogAppendEntry
(
struct
SSyncLogStore
*
pLogStore
,
SSyncRaftEntry
*
pEntry
,
bool
forceSync
);
static
int32_t
raftLogTruncate
(
struct
SSyncLogStore
*
pLogStore
,
SyncIndex
fromIndex
);
static
bool
raftLogExist
(
struct
SSyncLogStore
*
pLogStore
,
SyncIndex
index
);
static
int32_t
raftLogUpdateCommitIndex
(
SSyncLogStore
*
pLogStore
,
SyncIndex
index
);
...
...
@@ -192,9 +192,7 @@ SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) {
return
SYNC_TERM_INVALID
;
}
static
inline
bool
raftLogForceSync
(
SSyncRaftEntry
*
pEntry
)
{
return
(
pEntry
->
originalRpcType
==
TDMT_VND_COMMIT
);
}
static
int32_t
raftLogAppendEntry
(
struct
SSyncLogStore
*
pLogStore
,
SSyncRaftEntry
*
pEntry
)
{
static
int32_t
raftLogAppendEntry
(
struct
SSyncLogStore
*
pLogStore
,
SSyncRaftEntry
*
pEntry
,
bool
forceSync
)
{
SSyncLogStoreData
*
pData
=
pLogStore
->
data
;
SWal
*
pWal
=
pData
->
pWal
;
...
...
@@ -221,7 +219,6 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
ASSERT
(
pEntry
->
index
==
index
);
bool
forceSync
=
raftLogForceSync
(
pEntry
);
walFsync
(
pWal
,
forceSync
);
sNTrace
(
pData
->
pSyncNode
,
"write index:%"
PRId64
", type:%s, origin type:%s, elapsed:%"
PRId64
,
pEntry
->
index
,
...
...
source/os/src/osSysinfo.c
浏览文件 @
4c2a41ec
...
...
@@ -840,7 +840,11 @@ int32_t taosGetSystemUUID(char *uid, int32_t uidlen) {
uuid_generate
(
uuid
);
// it's caller's responsibility to make enough space for `uid`, that's 36-char + 1-null
uuid_unparse_lower
(
uuid
,
buf
);
memcpy
(
uid
,
buf
,
uidlen
);
int
n
=
snprintf
(
uid
,
uidlen
,
"%.*s"
,
(
int
)
sizeof
(
buf
),
buf
);
// though less performance, much safer
if
(
n
>=
uidlen
)
{
// target buffer is too small
return
-
1
;
}
return
0
;
#else
int
len
=
0
;
...
...
source/util/src/tarray.c
浏览文件 @
4c2a41ec
...
...
@@ -20,7 +20,10 @@
// todo refactor API
SArray
*
taosArrayInit
(
size_t
size
,
size_t
elemSize
)
{
assert
(
elemSize
>
0
);
if
(
elemSize
==
0
)
{
terrno
=
TSDB_CODE_INVALID_PARA
;
return
NULL
;
}
if
(
size
<
TARRAY_MIN_SIZE
)
{
size
=
TARRAY_MIN_SIZE
;
...
...
@@ -96,8 +99,6 @@ void* taosArrayAddBatch(SArray* pArray, const void* pData, int32_t nEles) {
}
void
taosArrayRemoveDuplicate
(
SArray
*
pArray
,
__compar_fn_t
comparFn
,
void
(
*
fp
)(
void
*
))
{
assert
(
pArray
);
size_t
size
=
pArray
->
size
;
if
(
size
<=
1
)
{
return
;
...
...
@@ -136,8 +137,6 @@ void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)
}
void
taosArrayRemoveDuplicateP
(
SArray
*
pArray
,
__compar_fn_t
comparFn
,
void
(
*
fp
)(
void
*
))
{
assert
(
pArray
);
size_t
size
=
pArray
->
size
;
if
(
size
<=
1
)
{
return
;
...
...
@@ -197,11 +196,10 @@ void* taosArrayReserve(SArray* pArray, int32_t num) {
}
void
*
taosArrayPop
(
SArray
*
pArray
)
{
assert
(
pArray
!=
NULL
);
if
(
pArray
->
size
==
0
)
{
return
NULL
;
}
pArray
->
size
-=
1
;
return
TARRAY_GET_ELEM
(
pArray
,
pArray
->
size
);
}
...
...
@@ -210,16 +208,21 @@ void* taosArrayGet(const SArray* pArray, size_t index) {
if
(
NULL
==
pArray
)
{
return
NULL
;
}
assert
(
index
<
pArray
->
size
);
if
(
index
>=
pArray
->
size
)
{
uError
(
"index is out of range, current:%"
PRIzu
" max:%d"
,
index
,
pArray
->
capacity
);
return
NULL
;
}
return
TARRAY_GET_ELEM
(
pArray
,
index
);
}
void
*
taosArrayGetP
(
const
SArray
*
pArray
,
size_t
index
)
{
assert
(
index
<
pArray
->
size
);
void
*
d
=
TARRAY_GET_ELEM
(
pArray
,
index
)
;
return
*
(
void
**
)
d
;
void
**
p
=
taosArrayGet
(
pArray
,
index
);
if
(
p
==
NULL
)
{
return
NULL
;
}
return
*
p
;
}
void
*
taosArrayGetLast
(
const
SArray
*
pArray
)
{
return
TARRAY_GET_ELEM
(
pArray
,
pArray
->
size
-
1
);
}
...
...
@@ -312,9 +315,12 @@ void taosArrayRemoveBatch(SArray* pArray, size_t index, size_t num, FDelete fp)
}
SArray
*
taosArrayFromList
(
const
void
*
src
,
size_t
size
,
size_t
elemSize
)
{
assert
(
src
!=
NULL
&&
elemSize
>
0
);
SArray
*
pDst
=
taosArrayInit
(
size
,
elemSize
);
if
(
elemSize
<=
0
)
{
terrno
=
TSDB_CODE_INVALID_PARA
;
return
NULL
;
}
SArray
*
pDst
=
taosArrayInit
(
size
,
elemSize
);
memcpy
(
pDst
->
pData
,
src
,
elemSize
*
size
);
pDst
->
size
=
size
;
...
...
@@ -322,8 +328,6 @@ SArray* taosArrayFromList(const void* src, size_t size, size_t elemSize) {
}
SArray
*
taosArrayDup
(
const
SArray
*
pSrc
,
__array_item_dup_fn_t
fn
)
{
assert
(
pSrc
!=
NULL
);
if
(
pSrc
->
size
==
0
)
{
// empty array list
return
taosArrayInit
(
8
,
pSrc
->
elemSize
);
}
...
...
@@ -415,14 +419,10 @@ void taosArrayDestroyEx(SArray* pArray, FDelete fp) {
}
void
taosArraySort
(
SArray
*
pArray
,
__compar_fn_t
compar
)
{
ASSERT
(
pArray
!=
NULL
&&
compar
!=
NULL
);
taosSort
(
pArray
->
pData
,
pArray
->
size
,
pArray
->
elemSize
,
compar
);
}
void
*
taosArraySearch
(
const
SArray
*
pArray
,
const
void
*
key
,
__compar_fn_t
comparFn
,
int32_t
flags
)
{
assert
(
pArray
!=
NULL
&&
comparFn
!=
NULL
);
assert
(
key
!=
NULL
);
return
taosbsearch
(
key
,
pArray
->
pData
,
pArray
->
size
,
pArray
->
elemSize
,
comparFn
,
flags
);
}
...
...
source/util/src/tlog.c
浏览文件 @
4c2a41ec
...
...
@@ -897,6 +897,7 @@ void taosLogCrashInfo(char* nodeType, char* pMsg, int64_t msgLen, int signum, vo
pFile
=
taosOpenFile
(
filepath
,
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_APPEND
);
if
(
pFile
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
taosPrintLog
(
flags
,
level
,
dflag
,
"failed to open file:%s since %s"
,
filepath
,
terrstr
());
goto
_return
;
}
...
...
source/util/src/tpagedbuf.c
浏览文件 @
4c2a41ec
...
...
@@ -5,7 +5,10 @@
#include "thash.h"
#include "tlog.h"
#define GET_DATA_PAYLOAD(_p) ((char*)(_p)->pData + POINTER_BYTES)
#define GET_PAYLOAD_DATA(_p) ((char*)(_p)->pData + POINTER_BYTES)
#define BUF_PAGE_IN_MEM(_p) ((_p)->pData != NULL)
#define CLEAR_BUF_PAGE_IN_MEM_FLAG(_p) ((_p)->pData = NULL)
#define HAS_DATA_IN_DISK(_p) ((_p)->offset >= 0)
#define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages)
typedef
struct
SPageDiskInfo
{
...
...
@@ -14,7 +17,7 @@ typedef struct SPageDiskInfo {
}
SPageDiskInfo
,
SFreeListItem
;
struct
SPageInfo
{
SListNode
*
pn
;
// point to list node struct
SListNode
*
pn
;
// point to list node struct. it is NULL when the page is evicted from the in-memory buffer
void
*
pData
;
int64_t
offset
;
int32_t
pageId
;
...
...
@@ -89,7 +92,7 @@ static char* doDecompressData(void* data, int32_t srcSize, int32_t* dst, SDiskba
return
data
;
}
static
uint64_t
allocatePositionInFile
(
SDiskbasedBuf
*
pBuf
,
size_t
size
)
{
static
uint64_t
allocate
New
PositionInFile
(
SDiskbasedBuf
*
pBuf
,
size_t
size
)
{
if
(
pBuf
->
pFree
==
NULL
)
{
return
pBuf
->
nextPos
;
}
else
{
...
...
@@ -112,10 +115,6 @@ static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) {
}
}
static
void
setPageNotInBuf
(
SPageInfo
*
pPageInfo
)
{
pPageInfo
->
pData
=
NULL
;
}
static
FORCE_INLINE
size_t
getAllocPageSize
(
int32_t
pageSize
)
{
return
pageSize
+
POINTER_BYTES
+
sizeof
(
SFilePage
);
}
/**
* +--------------------------+-------------------+--------------+
* | PTR to SPageInfo (8bytes)| Payload (PageSize)| 2 Extra Bytes|
...
...
@@ -124,23 +123,31 @@ static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize
* @param pg
* @return
*/
static
char
*
doFlushPageToDisk
(
SDiskbasedBuf
*
pBuf
,
SPageInfo
*
pg
)
{
ASSERT
(
!
pg
->
used
&&
pg
->
pData
!=
NULL
);
static
FORCE_INLINE
size_t
getAllocPageSize
(
int32_t
pageSize
)
{
return
pageSize
+
POINTER_BYTES
+
sizeof
(
SFilePage
);
}
static
char
*
doFlushBufPage
(
SDiskbasedBuf
*
pBuf
,
SPageInfo
*
pg
)
{
if
(
pg
->
pData
==
NULL
||
pg
->
used
)
{
uError
(
"invalid params in paged buffer process when flushing buf to disk, %s"
,
pBuf
->
id
);
terrno
=
TSDB_CODE_INVALID_PARA
;
return
NULL
;
}
int32_t
size
=
pBuf
->
pageSize
;
char
*
t
=
NULL
;
if
(
pg
->
offset
==
-
1
||
pg
->
dirty
)
{
void
*
payload
=
GET_
DATA_PAYLOAD
(
pg
);
if
(
(
!
HAS_DATA_IN_DISK
(
pg
))
||
pg
->
dirty
)
{
void
*
payload
=
GET_
PAYLOAD_DATA
(
pg
);
t
=
doCompressData
(
payload
,
pBuf
->
pageSize
,
&
size
,
pBuf
);
ASSERTS
(
size
>=
0
,
"size is negative"
);
if
(
size
<
0
)
{
uError
(
"failed to compress data when flushing data to disk, %s"
,
pBuf
->
id
);
return
NULL
;
}
}
// this page is flushed to disk for the first time
if
(
pg
->
dirty
)
{
if
(
pg
->
offset
==
-
1
)
{
ASSERTS
(
pg
->
dirty
==
true
,
"pg->dirty is false"
);
pg
->
offset
=
allocatePositionInFile
(
pBuf
,
size
);
if
(
!
HAS_DATA_IN_DISK
(
pg
))
{
pg
->
offset
=
allocateNewPositionInFile
(
pBuf
,
size
);
pBuf
->
nextPos
+=
size
;
int32_t
ret
=
taosLSeekFile
(
pBuf
->
pFile
,
pg
->
offset
,
SEEK_SET
);
...
...
@@ -155,6 +162,7 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
return
NULL
;
}
// extend the file size
if
(
pBuf
->
fileSize
<
pg
->
offset
+
size
)
{
pBuf
->
fileSize
=
pg
->
offset
+
size
;
}
...
...
@@ -169,7 +177,7 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
taosArrayPush
(
pBuf
->
pFree
,
&
dinfo
);
// 2. allocate new position, and update the info
pg
->
offset
=
allocatePositionInFile
(
pBuf
,
size
);
pg
->
offset
=
allocate
New
PositionInFile
(
pBuf
,
size
);
pBuf
->
nextPos
+=
size
;
}
...
...
@@ -197,20 +205,19 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
size
=
pg
->
length
;
}
ASSERT
(
size
>
0
||
(
pg
->
offset
==
-
1
&&
pg
->
length
==
-
1
));
char
*
pDataBuf
=
pg
->
pData
;
memset
(
pDataBuf
,
0
,
getAllocPageSize
(
pBuf
->
pageSize
));
#ifdef BUF_PAGE_DEBUG
uDebug
(
"page_flush %p, pageId:%d, offset:%d"
,
pDataBuf
,
pg
->
pageId
,
pg
->
offset
);
#endif
pg
->
length
=
size
;
// on disk size
return
pDataBuf
;
}
static
char
*
flush
PageToDisk
(
SDiskbasedBuf
*
pBuf
,
SPageInfo
*
pg
)
{
static
char
*
flush
BufPage
(
SDiskbasedBuf
*
pBuf
,
SPageInfo
*
pg
)
{
int32_t
ret
=
TSDB_CODE_SUCCESS
;
ASSERT
(((
int64_t
)
pBuf
->
numOfPages
*
pBuf
->
pageSize
)
==
pBuf
->
totalBufSize
&&
pBuf
->
numOfPages
>=
pBuf
->
inMemPages
);
if
(
pBuf
->
pFile
==
NULL
)
{
if
((
ret
=
createDiskFile
(
pBuf
))
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -219,22 +226,27 @@ static char* flushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
}
}
char
*
p
=
doFlushPageToDisk
(
pBuf
,
pg
);
setPageNotInBuf
(
pg
);
pg
->
dirty
=
false
;
char
*
p
=
doFlushBufPage
(
pBuf
,
pg
);
CLEAR_BUF_PAGE_IN_MEM_FLAG
(
pg
);
pg
->
dirty
=
false
;
return
p
;
}
// load file block data in disk
static
int32_t
loadPageFromDisk
(
SDiskbasedBuf
*
pBuf
,
SPageInfo
*
pg
)
{
if
(
pg
->
offset
<
0
||
pg
->
length
<=
0
)
{
uError
(
"failed to load buf page from disk, offset:%"
PRId64
", length:%d, %s"
,
pg
->
offset
,
pg
->
length
,
pBuf
->
id
);
return
TSDB_CODE_INVALID_PARA
;
}
int32_t
ret
=
taosLSeekFile
(
pBuf
->
pFile
,
pg
->
offset
,
SEEK_SET
);
if
(
ret
==
-
1
)
{
ret
=
TAOS_SYSTEM_ERROR
(
errno
);
return
ret
;
}
void
*
pPage
=
(
void
*
)
GET_
DATA_PAYLOAD
(
pg
);
void
*
pPage
=
(
void
*
)
GET_
PAYLOAD_DATA
(
pg
);
ret
=
(
int32_t
)
taosReadFile
(
pBuf
->
pFile
,
pPage
,
pg
->
length
);
if
(
ret
!=
pg
->
length
)
{
ret
=
TAOS_SYSTEM_ERROR
(
errno
);
...
...
@@ -249,10 +261,14 @@ static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
return
0
;
}
static
SPageInfo
*
register
Page
(
SDiskbasedBuf
*
pBuf
,
int32_t
pageId
)
{
static
SPageInfo
*
register
NewPageInfo
(
SDiskbasedBuf
*
pBuf
,
int32_t
pageId
)
{
pBuf
->
numOfPages
+=
1
;
SPageInfo
*
ppi
=
taosMemoryMalloc
(
sizeof
(
SPageInfo
));
if
(
ppi
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
ppi
->
pageId
=
pageId
;
ppi
->
pData
=
NULL
;
...
...
@@ -272,46 +288,33 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) {
SListNode
*
pn
=
NULL
;
while
((
pn
=
tdListNext
(
&
iter
))
!=
NULL
)
{
SPageInfo
*
pageInfo
=
*
(
SPageInfo
**
)
pn
->
data
;
ASSERT
(
pageInfo
->
pageId
>=
0
&&
pageInfo
->
pn
==
pn
);
SPageInfo
*
p
=
*
(
SPageInfo
**
)(
pageInfo
->
pData
);
ASSERT
(
pageInfo
->
pageId
>=
0
&&
pageInfo
->
pn
==
pn
&&
p
==
pageInfo
);
if
(
!
pageInfo
->
used
)
{
// printf("%d is chosen\n", pageInfo->pageId);
break
;
}
else
{
// printf("page %d is used, dirty:%d\n", pageInfo->pageId, pageInfo->dirty);
}
}
return
pn
;
}
static
char
*
evacOneDataPage
(
SDiskbasedBuf
*
pBuf
)
{
char
*
bufPage
=
NULL
;
static
char
*
evictBufPage
(
SDiskbasedBuf
*
pBuf
)
{
SListNode
*
pn
=
getEldestUnrefedPage
(
pBuf
);
terrno
=
0
;
// all pages are referenced by user, try to allocate new space
if
(
pn
==
NULL
)
{
int32_t
prev
=
pBuf
->
inMemPages
;
// increase by 50% of previous mem pages
pBuf
->
inMemPages
=
(
int32_t
)(
pBuf
->
inMemPages
*
1
.
5
f
);
// qWarn("%p in memory buf page not sufficient, expand from %d to %d, page size:%d", pBuf, prev,
// pBuf->inMemPages, pBuf->pageSize);
}
else
{
tdListPopNode
(
pBuf
->
lruList
,
pn
);
if
(
pn
==
NULL
)
{
// no available buffer pages now, return.
return
NULL
;
}
SPageInfo
*
d
=
*
(
SPageInfo
**
)
pn
->
data
;
ASSERTS
(
d
->
pn
==
pn
,
"d->pn not equal pn"
);
terrno
=
0
;
tdListPopNode
(
pBuf
->
lruList
,
pn
);
d
->
pn
=
NULL
;
taosMemoryFreeClear
(
pn
);
SPageInfo
*
d
=
*
(
SPageInfo
**
)
pn
->
data
;
bufPage
=
flushPageToDisk
(
pBuf
,
d
)
;
}
d
->
pn
=
NULL
;
taosMemoryFreeClear
(
pn
);
return
bufPage
;
return
flushBufPage
(
pBuf
,
d
)
;
}
static
void
lruListPushFront
(
SList
*
pList
,
SPageInfo
*
pi
)
{
...
...
@@ -338,13 +341,12 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem
SDiskbasedBuf
*
pPBuf
=
*
pBuf
;
if
(
pPBuf
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
goto
_error
;
}
pPBuf
->
pageSize
=
pagesize
;
pPBuf
->
numOfPages
=
0
;
// all pages are in buffer in the first place
pPBuf
->
totalBufSize
=
0
;
pPBuf
->
inMemPages
=
inMemBufSize
/
pagesize
;
// maximum allowed pages, it is a soft limit.
pPBuf
->
allocateId
=
-
1
;
pPBuf
->
pFile
=
NULL
;
pPBuf
->
id
=
strdup
(
id
);
...
...
@@ -353,33 +355,69 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem
pPBuf
->
freePgList
=
tdListNew
(
POINTER_BYTES
);
// at least more than 2 pages must be in memory
ASSERT
(
inMemBufSize
>=
pagesize
*
2
);
if
(
inMemBufSize
<
pagesize
*
2
)
{
inMemBufSize
=
pagesize
*
2
;
}
pPBuf
->
inMemPages
=
inMemBufSize
/
pagesize
;
// maximum allowed pages, it is a soft limit.
pPBuf
->
lruList
=
tdListNew
(
POINTER_BYTES
);
if
(
pPBuf
->
lruList
==
NULL
)
{
goto
_error
;
}
// init id hash table
_hash_fn_t
fn
=
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
);
pPBuf
->
pIdList
=
taosArrayInit
(
4
,
POINTER_BYTES
);
if
(
pPBuf
->
pIdList
==
NULL
)
{
goto
_error
;
}
pPBuf
->
assistBuf
=
taosMemoryMalloc
(
pPBuf
->
pageSize
+
2
);
// EXTRA BYTES
if
(
pPBuf
->
assistBuf
==
NULL
)
{
goto
_error
;
}
pPBuf
->
all
=
taosHashInit
(
10
,
fn
,
true
,
false
);
pPBuf
->
prefix
=
(
char
*
)
dir
;
if
(
pPBuf
->
all
==
NULL
)
{
goto
_error
;
}
pPBuf
->
prefix
=
(
char
*
)
dir
;
pPBuf
->
emptyDummyIdList
=
taosArrayInit
(
1
,
sizeof
(
int32_t
));
// qDebug("QInfo:0x%"PRIx64" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s", qId,
// pPBuf->pageSize,
// pPBuf->inMemPages, pPBuf->path);
// pPBuf->pageSize, pPBuf->inMemPages, pPBuf->path);
return
TSDB_CODE_SUCCESS
;
_error:
destroyDiskbasedBuf
(
pPBuf
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
static
char
*
doExtractPage
(
SDiskbasedBuf
*
pBuf
)
{
char
*
availablePage
=
NULL
;
if
(
NO_IN_MEM_AVAILABLE_PAGES
(
pBuf
))
{
availablePage
=
evictBufPage
(
pBuf
);
if
(
availablePage
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
uWarn
(
"no available buf pages, current:%d, max:%d"
,
listNEles
(
pBuf
->
lruList
),
pBuf
->
inMemPages
)
}
}
else
{
availablePage
=
taosMemoryCalloc
(
1
,
getAllocPageSize
(
pBuf
->
pageSize
));
// add extract bytes in case of zipped buffer increased.
if
(
availablePage
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
}
}
return
availablePage
;
}
void
*
getNewBufPage
(
SDiskbasedBuf
*
pBuf
,
int32_t
*
pageId
)
{
pBuf
->
statis
.
getPages
+=
1
;
char
*
availablePage
=
NULL
;
if
(
NO_IN_MEM_AVAILABLE_PAGES
(
pBuf
)
)
{
availablePage
=
evacOneDataPage
(
pBuf
)
;
char
*
availablePage
=
doExtractPage
(
pBuf
)
;
if
(
availablePage
==
NULL
)
{
return
NULL
;
}
SPageInfo
*
pi
=
NULL
;
...
...
@@ -394,7 +432,10 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
*
pageId
=
(
++
pBuf
->
allocateId
);
// register page id info
pi
=
registerPage
(
pBuf
,
*
pageId
);
pi
=
registerNewPageInfo
(
pBuf
,
*
pageId
);
if
(
pi
==
NULL
)
{
return
NULL
;
}
// add to hash map
taosHashPut
(
pBuf
->
all
,
pageId
,
sizeof
(
int32_t
),
&
pi
,
POINTER_BYTES
);
...
...
@@ -402,63 +443,62 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
}
// add to LRU list
ASSERT
(
listNEles
(
pBuf
->
lruList
)
<
pBuf
->
inMemPages
&&
pBuf
->
inMemPages
>
0
);
lruListPushFront
(
pBuf
->
lruList
,
pi
);
// allocate buf
if
(
availablePage
==
NULL
)
{
pi
->
pData
=
taosMemoryCalloc
(
1
,
getAllocPageSize
(
pBuf
->
pageSize
));
// add extract bytes in case of zipped buffer increased.
}
else
{
pi
->
pData
=
availablePage
;
}
pi
->
pData
=
availablePage
;
((
void
**
)
pi
->
pData
)[
0
]
=
pi
;
#ifdef BUF_PAGE_DEBUG
uDebug
(
"page_getNewBufPage , pi->pData:%p, pageId:%d, offset:%"
PRId64
,
pi
->
pData
,
pi
->
pageId
,
pi
->
offset
);
#endif
return
(
void
*
)(
GET_DATA_PAYLOAD
(
pi
));
return
(
void
*
)(
GET_PAYLOAD_DATA
(
pi
));
}
void
*
getBufPage
(
SDiskbasedBuf
*
pBuf
,
int32_t
id
)
{
ASSERT
(
pBuf
!=
NULL
&&
id
>=
0
);
if
(
id
<
0
)
{
terrno
=
TSDB_CODE_INVALID_PARA
;
uError
(
"invalid page id:%d, %s"
,
id
,
pBuf
->
id
);
return
NULL
;
}
pBuf
->
statis
.
getPages
+=
1
;
SPageInfo
**
pi
=
taosHashGet
(
pBuf
->
all
,
&
id
,
sizeof
(
int32_t
));
ASSERT
(
pi
!=
NULL
&&
*
pi
!=
NULL
);
if
(
pi
==
NULL
||
*
pi
==
NULL
)
{
uError
(
"failed to locate the buffer page:%d, %s"
,
id
,
pBuf
->
id
);
terrno
=
TSDB_CODE_INVALID_PARA
;
return
NULL
;
}
if
(
(
*
pi
)
->
pData
!=
NULL
)
{
// it is in memory
if
(
BUF_PAGE_IN_MEM
(
*
pi
)
)
{
// it is in memory
// no need to update the LRU list if only one page exists
if
(
pBuf
->
numOfPages
==
1
)
{
(
*
pi
)
->
used
=
true
;
return
(
void
*
)(
GET_
DATA_PAYLOAD
(
*
pi
));
return
(
void
*
)(
GET_
PAYLOAD_DATA
(
*
pi
));
}
SPageInfo
**
pInfo
=
(
SPageInfo
**
)((
*
pi
)
->
pn
->
data
);
ASSERT
(
*
pInfo
==
*
pi
);
if
(
*
pInfo
!=
*
pi
)
{
uError
(
"inconsistently data in paged buffer, pInfo:%p, pi:%p, %s"
,
*
pInfo
,
*
pi
,
pBuf
->
id
);
return
NULL
;
}
lruListMoveToFront
(
pBuf
->
lruList
,
(
*
pi
));
(
*
pi
)
->
used
=
true
;
#ifdef BUF_PAGE_DEBUG
uDebug
(
"page_getBufPage1 pageId:%d, offset:%"
PRId64
,
(
*
pi
)
->
pageId
,
(
*
pi
)
->
offset
);
#endif
return
(
void
*
)(
GET_
DATA_PAYLOAD
(
*
pi
));
return
(
void
*
)(
GET_
PAYLOAD_DATA
(
*
pi
));
}
else
{
// not in memory
ASSERT
((
*
pi
)
->
pData
==
NULL
&&
(
*
pi
)
->
pn
==
NULL
&&
ASSERT
((
!
BUF_PAGE_IN_MEM
(
*
pi
))
&&
(
*
pi
)
->
pn
==
NULL
&&
(((
*
pi
)
->
length
>=
0
&&
(
*
pi
)
->
offset
>=
0
)
||
((
*
pi
)
->
length
==
-
1
&&
(
*
pi
)
->
offset
==
-
1
)));
char
*
availablePage
=
NULL
;
if
(
NO_IN_MEM_AVAILABLE_PAGES
(
pBuf
))
{
availablePage
=
evacOneDataPage
(
pBuf
);
if
(
availablePage
==
NULL
)
{
return
NULL
;
}
}
(
*
pi
)
->
pData
=
doExtractPage
(
pBuf
);
if
(
availablePage
==
NULL
)
{
(
*
pi
)
->
pData
=
taosMemoryCalloc
(
1
,
getAllocPageSize
(
pBuf
->
pageSize
));
}
else
{
(
*
pi
)
->
pData
=
availablePage
;
// failed to evict buffer page, return with error code.
if
((
*
pi
)
->
pData
==
NULL
)
{
return
NULL
;
}
// set the ptr to the new SPageInfo
...
...
@@ -468,23 +508,25 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
(
*
pi
)
->
used
=
true
;
// some data has been flushed to disk, and needs to be loaded into buffer again.
if
(
(
*
pi
)
->
length
>
0
&&
(
*
pi
)
->
offset
>=
0
)
{
if
(
HAS_DATA_IN_DISK
(
*
pi
)
)
{
int32_t
code
=
loadPageFromDisk
(
pBuf
,
*
pi
);
if
(
code
!=
0
)
{
terrno
=
code
;
return
NULL
;
}
}
#ifdef BUF_PAGE_DEBUG
uDebug
(
"page_getBufPage2 pageId:%d, offset:%"
PRId64
,
(
*
pi
)
->
pageId
,
(
*
pi
)
->
offset
);
#endif
return
(
void
*
)(
GET_
DATA_PAYLOAD
(
*
pi
));
return
(
void
*
)(
GET_
PAYLOAD_DATA
(
*
pi
));
}
}
void
releaseBufPage
(
SDiskbasedBuf
*
pBuf
,
void
*
page
)
{
if
(
ASSERTS
(
pBuf
!=
NULL
&&
page
!=
NULL
,
"pBuf or page is NULL"
)
)
{
if
(
page
==
NULL
)
{
return
;
}
SPageInfo
*
ppi
=
getPageInfoFromPayload
(
page
);
releaseBufPageInfo
(
pBuf
,
ppi
);
}
...
...
@@ -493,7 +535,13 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) {
#ifdef BUF_PAGE_DEBUG
uDebug
(
"page_releaseBufPageInfo pageId:%d, used:%d, offset:%"
PRId64
,
pi
->
pageId
,
pi
->
used
,
pi
->
offset
);
#endif
if
(
ASSERTS
(
pi
->
pData
!=
NULL
,
"pi->pData is NULL"
))
{
if
(
pi
==
NULL
)
{
return
;
}
if
(
pi
->
pData
==
NULL
)
{
uError
(
"pi->pData (page data) is null"
);
return
;
}
...
...
@@ -504,7 +552,6 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) {
size_t
getTotalBufSize
(
const
SDiskbasedBuf
*
pBuf
)
{
return
(
size_t
)
pBuf
->
totalBufSize
;
}
SArray
*
getDataBufPagesIdList
(
SDiskbasedBuf
*
pBuf
)
{
ASSERT
(
pBuf
!=
NULL
);
return
pBuf
->
pIdList
;
}
...
...
@@ -582,7 +629,6 @@ SPageInfo* getLastPageInfo(SArray* pList) {
}
int32_t
getPageId
(
const
SPageInfo
*
pPgInfo
)
{
ASSERT
(
pPgInfo
!=
NULL
);
return
pPgInfo
->
pageId
;
}
...
...
tests/develop-test/2-query/show_create_db.py
0 → 100644
浏览文件 @
4c2a41ec
import
sys
from
util.log
import
*
from
util.cases
import
*
from
util.sql
import
*
from
util.dnodes
import
tdDnodes
from
math
import
inf
class
TDTestCase
:
def
caseDescription
(
self
):
'''
case1<shenglian zhou>: [TD-11204]Difference improvement that can ignore negative
'''
return
def
init
(
self
,
conn
,
logSql
,
replicaVer
=
1
):
tdLog
.
debug
(
"start to execute %s"
%
__file__
)
tdSql
.
init
(
conn
.
cursor
(),
False
)
self
.
_conn
=
conn
def
restartTaosd
(
self
,
index
=
1
,
dbname
=
"db"
):
tdDnodes
.
stop
(
index
)
tdDnodes
.
startWithoutSleep
(
index
)
tdSql
.
execute
(
f
"use scd"
)
def
run
(
self
):
print
(
"running {}"
.
format
(
__file__
))
tdSql
.
execute
(
"drop database if exists scd"
)
tdSql
.
execute
(
"create database if not exists scd"
)
tdSql
.
execute
(
'use scd'
)
tdSql
.
execute
(
'create table stb1 (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, c6 float, c7 double, c8 binary(10), c9 nchar(10), c10 tinyint unsigned, c11 smallint unsigned, c12 int unsigned, c13 bigint unsigned) TAGS(t1 int, t2 binary(10), t3 double);'
)
tdSql
.
execute
(
"create table tb1 using stb1 tags(1,'1',1.0);"
)
tdSql
.
execute
(
"create table tb2 using stb1 tags(2,'2',2.0);"
)
tdSql
.
execute
(
"create table tb3 using stb1 tags(3,'3',3.0);"
)
tdSql
.
execute
(
'create database scd2 stt_trigger 3;'
)
tdSql
.
execute
(
'create database scd4 stt_trigger 13;'
)
tdSql
.
query
(
'show create database scd;'
)
tdSql
.
checkRows
(
1
)
tdSql
.
checkData
(
0
,
0
,
'scd'
)
tdSql
.
checkData
(
0
,
1
,
"CREATE DATABASE `scd` BUFFER 256 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 STT_TRIGGER 1 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0"
)
tdSql
.
query
(
'show create database scd2;'
)
tdSql
.
checkRows
(
1
)
tdSql
.
checkData
(
0
,
0
,
'scd2'
)
tdSql
.
checkData
(
0
,
1
,
"CREATE DATABASE `scd2` BUFFER 256 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 STT_TRIGGER 3 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0"
)
tdSql
.
query
(
'show create database scd4'
)
tdSql
.
checkRows
(
1
)
tdSql
.
checkData
(
0
,
0
,
'scd4'
)
tdSql
.
checkData
(
0
,
1
,
"CREATE DATABASE `scd4` BUFFER 256 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 STT_TRIGGER 13 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0"
)
self
.
restartTaosd
(
1
,
dbname
=
'scd'
)
tdSql
.
query
(
'show create database scd;'
)
tdSql
.
checkRows
(
1
)
tdSql
.
checkData
(
0
,
0
,
'scd'
)
tdSql
.
checkData
(
0
,
1
,
"CREATE DATABASE `scd` BUFFER 256 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 STT_TRIGGER 1 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0"
)
tdSql
.
query
(
'show create database scd2;'
)
tdSql
.
checkRows
(
1
)
tdSql
.
checkData
(
0
,
0
,
'scd2'
)
tdSql
.
checkData
(
0
,
1
,
"CREATE DATABASE `scd2` BUFFER 256 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 STT_TRIGGER 3 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0"
)
tdSql
.
query
(
'show create database scd4'
)
tdSql
.
checkRows
(
1
)
tdSql
.
checkData
(
0
,
0
,
'scd4'
)
tdSql
.
checkData
(
0
,
1
,
"CREATE DATABASE `scd4` BUFFER 256 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 STT_TRIGGER 13 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0"
)
tdSql
.
execute
(
'drop database scd'
)
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
"%s successfully executed"
%
__file__
)
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tests/parallel_test/cases.task
浏览文件 @
4c2a41ec
...
...
@@ -1060,6 +1060,7 @@
#develop test
,,n,develop-test,python3 ./test.py -f 2-query/table_count_scan.py
,,n,develop-test,python3 ./test.py -f 2-query/show_create_db.py
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/auto_create_table_json.py
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/custom_col_tag.py
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/default_json.py
...
...
tests/script/api/batchprepare.c
浏览文件 @
4c2a41ec
...
...
@@ -2828,7 +2828,7 @@ void runAll(TAOS *taos) {
printf
(
"%s Begin
\n
"
,
gCaseCtrl
.
caseCatalog
);
runCaseList
(
taos
);
#if
0
#if
1
strcpy
(
gCaseCtrl
.
caseCatalog
,
"Micro DB precision Test"
);
printf
(
"%s Begin
\n
"
,
gCaseCtrl
.
caseCatalog
);
gCaseCtrl
.
precision
=
TIME_PRECISION_MICRO
;
...
...
tests/script/tsim/query/sys_tbname.sim
浏览文件 @
4c2a41ec
...
...
@@ -86,4 +86,23 @@ if $data00 != @ins_tags@ then
return -1
endi
sql create stable stb(ts timestamp, f int) tags(t1 int, t2 int, t3 int, t4 int, t5 int);
$i = 0
$tbNum = 1000
$tbPrefix = stb_tb
while $i < $tbNum
$tb = $tbPrefix . $i
sql create table $tb using stb tags( $i , $i , $i , $i , $i )
$i = $i + 1
endw
sql select tag_value from information_schema.ins_tags where stable_name='stb';
if $rows != 5000 then
print $rows
return -1
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
tools/shell/inc/shellInt.h
浏览文件 @
4c2a41ec
...
...
@@ -148,5 +148,6 @@ void shellRunSingleCommandWebsocketImp(char *command);
// shellMain.c
extern
SShellObj
shell
;
extern
void
tscWriteCrashInfo
(
int
signum
,
void
*
sigInfo
,
void
*
context
);
#endif
/*_TD_SHELL_INT_H_*/
tools/shell/src/shellEngine.c
浏览文件 @
4c2a41ec
...
...
@@ -1137,10 +1137,8 @@ int32_t shellExecute() {
taosSetSignal
(
SIGTERM
,
shellQueryInterruptHandler
);
taosSetSignal
(
SIGHUP
,
shellQueryInterruptHandler
);
taosSetSignal
(
SIGABRT
,
shellQueryInterruptHandler
);
taosSetSignal
(
SIGINT
,
shellQueryInterruptHandler
);
#ifdef WEBSOCKET
if
(
!
shell
.
args
.
restful
&&
!
shell
.
args
.
cloud
)
{
#endif
...
...
tools/shell/src/shellMain.c
浏览文件 @
4c2a41ec
...
...
@@ -19,6 +19,29 @@
SShellObj
shell
=
{
0
};
void
shellCrashHandler
(
int
signum
,
void
*
sigInfo
,
void
*
context
)
{
taosIgnSignal
(
SIGTERM
);
taosIgnSignal
(
SIGHUP
);
taosIgnSignal
(
SIGINT
);
taosIgnSignal
(
SIGBREAK
);
#if !defined(WINDOWS)
taosIgnSignal
(
SIGBUS
);
#endif
taosIgnSignal
(
SIGABRT
);
taosIgnSignal
(
SIGFPE
);
taosIgnSignal
(
SIGSEGV
);
tscWriteCrashInfo
(
signum
,
sigInfo
,
context
);
#ifdef _TD_DARWIN_64
exit
(
signum
);
#elif defined(WINDOWS)
exit
(
signum
);
#endif
}
int
main
(
int
argc
,
char
*
argv
[])
{
shell
.
exit
=
false
;
#ifdef WEBSOCKET
...
...
@@ -26,6 +49,13 @@ int main(int argc, char *argv[]) {
shell
.
args
.
cloud
=
true
;
#endif
#if !defined(WINDOWS)
taosSetSignal
(
SIGBUS
,
shellCrashHandler
);
#endif
taosSetSignal
(
SIGABRT
,
shellCrashHandler
);
taosSetSignal
(
SIGFPE
,
shellCrashHandler
);
taosSetSignal
(
SIGSEGV
,
shellCrashHandler
);
if
(
shellCheckIntSize
()
!=
0
)
{
return
-
1
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录