Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
177fe40e
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
177fe40e
编写于
10月 17, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
差异文件
merge 3.0
上级
68e0ddfa
ced97588
变更
29
隐藏空白更改
内联
并排
Showing
29 changed file
with
367 addition
and
157 deletion
+367
-157
docs/examples/python/tmq_example.py
docs/examples/python/tmq_example.py
+57
-5
source/client/src/clientHb.c
source/client/src/clientHb.c
+1
-1
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+2
-2
source/client/src/clientRawBlockWrite.c
source/client/src/clientRawBlockWrite.c
+13
-7
source/client/src/clientSml.c
source/client/src/clientSml.c
+25
-11
source/client/src/clientStmt.c
source/client/src/clientStmt.c
+3
-0
source/common/src/tdataformat.c
source/common/src/tdataformat.c
+4
-0
source/dnode/vnode/src/meta/metaTable.c
source/dnode/vnode/src/meta/metaTable.c
+6
-1
source/dnode/vnode/src/tsdb/tsdbFS.c
source/dnode/vnode/src/tsdb/tsdbFS.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbMergeTree.c
source/dnode/vnode/src/tsdb/tsdbMergeTree.c
+6
-9
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+10
-5
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+11
-8
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+2
-2
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+8
-8
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+6
-31
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+25
-19
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+1
-3
source/libs/executor/src/tsimplehash.c
source/libs/executor/src/tsimplehash.c
+2
-2
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+2
-1
source/libs/planner/src/planOptimizer.c
source/libs/planner/src/planOptimizer.c
+2
-3
source/util/src/tbloomfilter.c
source/util/src/tbloomfilter.c
+4
-4
tests/system-test/1-insert/alter_database.py
tests/system-test/1-insert/alter_database.py
+58
-0
tests/system-test/7-tmq/create_wrong_topic.py
tests/system-test/7-tmq/create_wrong_topic.py
+77
-0
tests/system-test/fulltest.sh
tests/system-test/fulltest.sh
+3
-3
tools/shell/src/shellTire.c
tools/shell/src/shellTire.c
+3
-4
tools/taosws-rs
tools/taosws-rs
+1
-0
utils/test/c/sml_test.c
utils/test/c/sml_test.c
+4
-4
utils/test/c/tmqSim.c
utils/test/c/tmqSim.c
+29
-22
utils/test/c/tmq_taosx_ci.c
utils/test/c/tmq_taosx_ci.c
+1
-1
未找到文件。
docs/examples/python/tmq_example.py
浏览文件 @
177fe40e
import
taos
from
taos.tmq
import
TaosConsumer
consumer
=
TaosConsumer
(
'topic_ctb_column'
,
group_id
=
'vg2'
)
for
msg
in
consumer
:
for
row
in
msg
:
print
(
row
)
from
taos.tmq
import
*
conn
=
taos
.
connect
()
print
(
"init"
)
conn
.
execute
(
"drop database if exists py_tmq"
)
conn
.
execute
(
"create database if not exists py_tmq vgroups 2"
)
conn
.
select_db
(
"py_tmq"
)
conn
.
execute
(
"create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)"
)
conn
.
execute
(
"create table if not exists tb1 using stb1 tags(1)"
)
conn
.
execute
(
"create table if not exists tb2 using stb1 tags(2)"
)
conn
.
execute
(
"create table if not exists tb3 using stb1 tags(3)"
)
print
(
"create topic"
)
conn
.
execute
(
"drop topic if exists topic_ctb_column"
)
conn
.
execute
(
"create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1"
)
print
(
"build consumer"
)
conf
=
TaosTmqConf
()
conf
.
set
(
"group.id"
,
"tg2"
)
conf
.
set
(
"td.connect.user"
,
"root"
)
conf
.
set
(
"td.connect.pass"
,
"taosdata"
)
conf
.
set
(
"enable.auto.commit"
,
"true"
)
def
tmq_commit_cb_print
(
tmq
,
resp
,
offset
,
param
=
None
):
print
(
f
"commit:
{
resp
}
, tmq:
{
tmq
}
, offset:
{
offset
}
, param:
{
param
}
"
)
conf
.
set_auto_commit_cb
(
tmq_commit_cb_print
,
None
)
tmq
=
conf
.
new_consumer
()
print
(
"build topic list"
)
topic_list
=
TaosTmqList
()
topic_list
.
append
(
"topic_ctb_column"
)
print
(
"basic consume loop"
)
tmq
.
subscribe
(
topic_list
)
sub_list
=
tmq
.
subscription
()
print
(
"subscribed topics: "
,
sub_list
)
while
1
:
res
=
tmq
.
poll
(
1000
)
if
res
:
topic
=
res
.
get_topic_name
()
vg
=
res
.
get_vgroup_id
()
db
=
res
.
get_db_name
()
print
(
f
"topic:
{
topic
}
\n
vgroup id:
{
vg
}
\n
db:
{
db
}
"
)
for
row
in
res
:
print
(
row
)
source/client/src/clientHb.c
浏览文件 @
177fe40e
...
...
@@ -173,7 +173,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
pTscObj
->
pAppInfo
->
totalDnodes
=
pRsp
->
query
->
totalDnodes
;
pTscObj
->
pAppInfo
->
onlineDnodes
=
pRsp
->
query
->
onlineDnodes
;
pTscObj
->
connId
=
pRsp
->
query
->
connId
;
tscTrace
(
"conn %
p
hb rsp, dnodes %d/%d"
,
pTscObj
->
connId
,
pTscObj
->
pAppInfo
->
onlineDnodes
,
tscTrace
(
"conn %
u
hb rsp, dnodes %d/%d"
,
pTscObj
->
connId
,
pTscObj
->
pAppInfo
->
onlineDnodes
,
pTscObj
->
pAppInfo
->
totalDnodes
);
if
(
pRsp
->
query
->
killRid
)
{
...
...
source/client/src/clientImpl.c
浏览文件 @
177fe40e
...
...
@@ -186,7 +186,7 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
STscObj
*
pTscObj
=
(
*
pRequest
)
->
pTscObj
;
if
(
taosHashPut
(
pTscObj
->
pRequests
,
&
(
*
pRequest
)
->
self
,
sizeof
((
*
pRequest
)
->
self
),
&
(
*
pRequest
)
->
self
,
sizeof
((
*
pRequest
)
->
self
)))
{
tscError
(
"%
d failed to add to request container, reqId:0x%"
PRIx64
", conn:%d
, %s"
,
(
*
pRequest
)
->
self
,
tscError
(
"%
"
PRIx64
" failed to add to request container, reqId:0x%"
PRIu64
", conn:%"
PRIx64
"
, %s"
,
(
*
pRequest
)
->
self
,
(
*
pRequest
)
->
requestId
,
pTscObj
->
id
,
sql
);
taosMemoryFree
(
param
);
...
...
@@ -371,7 +371,7 @@ int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
pInfo
->
pQnodeList
=
taosArrayDup
(
pNodeList
);
taosArraySort
(
pInfo
->
pQnodeList
,
compareQueryNodeLoad
);
tscDebug
(
"QnodeList updated in cluster 0x%"
PRIx64
", num:%d"
,
pInfo
->
clusterId
,
taosArrayGetSize
(
pInfo
->
pQnodeList
));
(
int
)
taosArrayGetSize
(
pInfo
->
pQnodeList
));
}
taosThreadMutexUnlock
(
&
pInfo
->
qnodeMutex
);
...
...
source/client/src/clientRawBlockWrite.c
浏览文件 @
177fe40e
...
...
@@ -410,6 +410,7 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
SDecoder
decoder
=
{
0
};
SVAlterTbReq
vAlterTbReq
=
{
0
};
char
*
string
=
NULL
;
cJSON
*
json
=
NULL
;
// decode
void
*
data
=
POINTER_SHIFT
(
metaRsp
->
metaRsp
,
sizeof
(
SMsgHead
));
...
...
@@ -419,7 +420,7 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
goto
_exit
;
}
cJSON
*
json
=
cJSON_CreateObject
();
json
=
cJSON_CreateObject
();
if
(
json
==
NULL
)
{
goto
_exit
;
}
...
...
@@ -524,6 +525,7 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) {
SDecoder
decoder
=
{
0
};
SVDropStbReq
req
=
{
0
};
char
*
string
=
NULL
;
cJSON
*
json
=
NULL
;
// decode
void
*
data
=
POINTER_SHIFT
(
metaRsp
->
metaRsp
,
sizeof
(
SMsgHead
));
...
...
@@ -533,7 +535,7 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) {
goto
_exit
;
}
cJSON
*
json
=
cJSON_CreateObject
();
json
=
cJSON_CreateObject
();
if
(
json
==
NULL
)
{
goto
_exit
;
}
...
...
@@ -556,6 +558,7 @@ static char* processDropTable(SMqMetaRsp* metaRsp) {
SDecoder
decoder
=
{
0
};
SVDropTbBatchReq
req
=
{
0
};
char
*
string
=
NULL
;
cJSON
*
json
=
NULL
;
// decode
void
*
data
=
POINTER_SHIFT
(
metaRsp
->
metaRsp
,
sizeof
(
SMsgHead
));
...
...
@@ -565,7 +568,7 @@ static char* processDropTable(SMqMetaRsp* metaRsp) {
goto
_exit
;
}
cJSON
*
json
=
cJSON_CreateObject
();
json
=
cJSON_CreateObject
();
if
(
json
==
NULL
)
{
goto
_exit
;
}
...
...
@@ -684,7 +687,7 @@ end:
static
int32_t
taosDropStb
(
TAOS
*
taos
,
void
*
meta
,
int32_t
metaLen
)
{
SVDropStbReq
req
=
{
0
};
SDecoder
coder
;
SDecoder
coder
=
{
0
}
;
SMDropStbReq
pReq
=
{
0
};
int32_t
code
=
TSDB_CODE_SUCCESS
;
SRequestObj
*
pRequest
=
NULL
;
...
...
@@ -1212,6 +1215,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
int32_t
code
=
TSDB_CODE_SUCCESS
;
STableMeta
*
pTableMeta
=
NULL
;
SQuery
*
pQuery
=
NULL
;
SSubmitReq
*
subReq
=
NULL
;
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
createRequest
(
*
(
int64_t
*
)
taos
,
TSDB_SQL_INSERT
);
if
(
!
pRequest
)
{
...
...
@@ -1228,8 +1232,8 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
}
SName
pName
=
{
TSDB_TABLE_NAME_T
,
pRequest
->
pTscObj
->
acctId
,
{
0
},
{
0
}};
strcpy
(
pName
.
dbname
,
pRequest
->
pDb
);
strcpy
(
pName
.
tname
,
tbname
);
tstrncpy
(
pName
.
dbname
,
pRequest
->
pDb
,
sizeof
(
pName
.
dbname
)
);
tstrncpy
(
pName
.
tname
,
tbname
,
sizeof
(
pName
.
tname
)
);
struct
SCatalog
*
pCatalog
=
NULL
;
code
=
catalogGetHandle
(
pRequest
->
pTscObj
->
pAppInfo
->
clusterId
,
&
pCatalog
);
...
...
@@ -1278,7 +1282,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
int32_t
submitLen
=
sizeof
(
SSubmitBlk
)
+
schemaLen
+
rows
*
extendedRowSize
;
int32_t
totalLen
=
sizeof
(
SSubmitReq
)
+
submitLen
;
SSubmitReq
*
subReq
=
taosMemoryCalloc
(
1
,
totalLen
);
subReq
=
taosMemoryCalloc
(
1
,
totalLen
);
SSubmitBlk
*
blk
=
POINTER_SHIFT
(
subReq
,
sizeof
(
SSubmitReq
));
void
*
blkSchema
=
POINTER_SHIFT
(
blk
,
sizeof
(
SSubmitBlk
));
STSRow
*
rowData
=
POINTER_SHIFT
(
blkSchema
,
schemaLen
);
...
...
@@ -1352,6 +1356,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
if
(
NULL
==
pQuery
)
{
uError
(
"create SQuery error"
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
taosMemoryFree
(
subReq
);
goto
end
;
}
pQuery
->
execMode
=
QUERY_EXEC_MODE_SCHEDULE
;
...
...
@@ -1390,6 +1395,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
end:
taosMemoryFreeClear
(
pTableMeta
);
qDestroyQuery
(
pQuery
);
taosMemoryFree
(
subReq
);
return
code
;
}
...
...
source/client/src/clientSml.c
浏览文件 @
177fe40e
...
...
@@ -299,6 +299,7 @@ static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool
for
(;
i
<
taosArrayGetSize
(
cols
);
i
++
)
{
SSmlKv
*
kv
=
(
SSmlKv
*
)
taosArrayGetP
(
cols
,
i
);
if
(
taosHashGet
(
hashTmp
,
kv
->
key
,
kv
->
keyLen
)
==
NULL
)
{
taosHashCleanup
(
hashTmp
);
return
-
1
;
}
}
...
...
@@ -430,7 +431,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
STableMeta
*
pTableMeta
=
NULL
;
SName
pName
=
{
TSDB_TABLE_NAME_T
,
info
->
taos
->
acctId
,
{
0
},
{
0
}};
strcpy
(
pName
.
dbname
,
info
->
pRequest
->
pDb
);
tstrncpy
(
pName
.
dbname
,
info
->
pRequest
->
pDb
,
sizeof
(
pName
.
dbname
)
);
SRequestConnInfo
conn
=
{
0
};
conn
.
pTrans
=
info
->
taos
->
pAppInfo
->
pTransporter
;
...
...
@@ -874,7 +875,8 @@ static int32_t smlParseTS(SSmlHandle *info, const char *data, int32_t len, SArra
kv
->
i
=
ts
;
kv
->
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
kv
->
length
=
(
int16_t
)
tDataTypes
[
kv
->
type
].
bytes
;
if
(
cols
)
taosArrayPush
(
cols
,
&
kv
);
taosArrayPush
(
cols
,
&
kv
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1009,6 +1011,7 @@ static void smlParseTelnetElement(const char **sql, const char **data, int32_t *
static
int32_t
smlParseTelnetTags
(
const
char
*
data
,
SArray
*
cols
,
char
*
childTableName
,
SHashObj
*
dumplicateKey
,
SSmlMsgBuf
*
msg
)
{
if
(
!
cols
)
return
TSDB_CODE_OUT_OF_MEMORY
;
const
char
*
sql
=
data
;
size_t
childTableNameLen
=
strlen
(
tsSmlChildTableName
);
while
(
*
sql
!=
'\0'
)
{
...
...
@@ -1082,7 +1085,7 @@ static int32_t smlParseTelnetTags(const char *data, SArray *cols, char *childTab
kv
->
length
=
valueLen
;
kv
->
type
=
TSDB_DATA_TYPE_NCHAR
;
if
(
cols
)
taosArrayPush
(
cols
,
&
kv
);
taosArrayPush
(
cols
,
&
kv
);
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -1370,8 +1373,14 @@ static int32_t smlKvTimeHashCompare(const void *key1, const void *key2) {
SHashObj
*
s2
=
*
(
SHashObj
**
)
key2
;
SSmlKv
*
kv1
=
*
(
SSmlKv
**
)
taosHashGet
(
s1
,
TS
,
TS_LEN
);
SSmlKv
*
kv2
=
*
(
SSmlKv
**
)
taosHashGet
(
s2
,
TS
,
TS_LEN
);
ASSERT
(
kv1
->
type
==
TSDB_DATA_TYPE_TIMESTAMP
);
ASSERT
(
kv2
->
type
==
TSDB_DATA_TYPE_TIMESTAMP
);
if
(
!
kv1
||
kv1
->
type
!=
TSDB_DATA_TYPE_TIMESTAMP
){
uError
(
"smlKvTimeHashCompare kv1"
);
return
-
1
;
}
if
(
!
kv2
||
kv2
->
type
!=
TSDB_DATA_TYPE_TIMESTAMP
){
uError
(
"smlKvTimeHashCompare kv2"
);
return
-
1
;
}
if
(
kv1
->
i
<
kv2
->
i
)
{
return
-
1
;
}
else
if
(
kv1
->
i
>
kv2
->
i
)
{
...
...
@@ -1735,7 +1744,7 @@ static int32_t smlParseTSFromJSON(SSmlHandle *info, cJSON *root, SArray *cols) {
kv
->
i
=
tsVal
;
kv
->
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
kv
->
length
=
(
int16_t
)
tDataTypes
[
kv
->
type
].
bytes
;
if
(
cols
)
taosArrayPush
(
cols
,
&
kv
);
taosArrayPush
(
cols
,
&
kv
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1932,6 +1941,7 @@ static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) {
}
static
int32_t
smlParseColsFromJSON
(
cJSON
*
root
,
SArray
*
cols
)
{
if
(
!
cols
)
return
TSDB_CODE_OUT_OF_MEMORY
;
cJSON
*
metricVal
=
cJSON_GetObjectItem
(
root
,
"value"
);
if
(
metricVal
==
NULL
)
{
return
TSDB_CODE_TSC_INVALID_JSON
;
...
...
@@ -1941,7 +1951,7 @@ static int32_t smlParseColsFromJSON(cJSON *root, SArray *cols) {
if
(
!
kv
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
if
(
cols
)
taosArrayPush
(
cols
,
&
kv
);
taosArrayPush
(
cols
,
&
kv
);
kv
->
key
=
VALUE
;
kv
->
keyLen
=
VALUE_LEN
;
...
...
@@ -1955,7 +1965,9 @@ static int32_t smlParseColsFromJSON(cJSON *root, SArray *cols) {
static
int32_t
smlParseTagsFromJSON
(
cJSON
*
root
,
SArray
*
pKVs
,
char
*
childTableName
,
SHashObj
*
dumplicateKey
,
SSmlMsgBuf
*
msg
)
{
int32_t
ret
=
TSDB_CODE_SUCCESS
;
if
(
!
pKVs
){
return
TSDB_CODE_OUT_OF_MEMORY
;
}
cJSON
*
tags
=
cJSON_GetObjectItem
(
root
,
"tags"
);
if
(
tags
==
NULL
||
tags
->
type
!=
cJSON_Object
)
{
return
TSDB_CODE_TSC_INVALID_JSON
;
...
...
@@ -1985,14 +1997,14 @@ static int32_t smlParseTagsFromJSON(cJSON *root, SArray *pKVs, char *childTableN
return
TSDB_CODE_TSC_INVALID_JSON
;
}
memset
(
childTableName
,
0
,
TSDB_TABLE_NAME_LEN
);
strncpy
(
childTableName
,
tag
->
valuestring
,
TSDB_TABLE_NAME_LEN
);
t
strncpy
(
childTableName
,
tag
->
valuestring
,
TSDB_TABLE_NAME_LEN
);
continue
;
}
// add kv to SSmlKv
SSmlKv
*
kv
=
(
SSmlKv
*
)
taosMemoryCalloc
(
sizeof
(
SSmlKv
),
1
);
if
(
!
kv
)
return
TSDB_CODE_OUT_OF_MEMORY
;
if
(
pKVs
)
taosArrayPush
(
pKVs
,
&
kv
);
taosArrayPush
(
pKVs
,
&
kv
);
// key
kv
->
keyLen
=
keyLen
;
...
...
@@ -2103,6 +2115,8 @@ static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql) {
if
(
!
oneTable
)
{
tinfo
=
smlBuildTableInfo
();
if
(
!
tinfo
)
{
smlDestroyCols
(
cols
);
if
(
info
->
dataFormat
)
taosArrayDestroy
(
cols
);
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
taosHashPut
(
info
->
childTables
,
elements
.
measure
,
elements
.
measureTagsLen
,
&
tinfo
,
POINTER_BYTES
);
...
...
@@ -2295,7 +2309,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
SSmlTableInfo
*
tableData
=
*
oneTable
;
SName
pName
=
{
TSDB_TABLE_NAME_T
,
info
->
taos
->
acctId
,
{
0
},
{
0
}};
strcpy
(
pName
.
dbname
,
info
->
pRequest
->
pDb
);
tstrncpy
(
pName
.
dbname
,
info
->
pRequest
->
pDb
,
sizeof
(
pName
.
dbname
)
);
memcpy
(
pName
.
tname
,
tableData
->
childTableName
,
strlen
(
tableData
->
childTableName
));
SRequestConnInfo
conn
=
{
0
};
...
...
source/client/src/clientStmt.c
浏览文件 @
177fe40e
...
...
@@ -201,6 +201,9 @@ int32_t stmtCacheBlock(STscStmt* pStmt) {
}
STableDataBlocks
**
pSrc
=
taosHashGet
(
pStmt
->
exec
.
pBlockHash
,
pStmt
->
bInfo
.
tbFName
,
strlen
(
pStmt
->
bInfo
.
tbFName
));
if
(
!
pSrc
){
return
TSDB_CODE_OUT_OF_MEMORY
;
}
STableDataBlocks
*
pDst
=
NULL
;
STMT_ERR_RET
(
qCloneStmtDataBlock
(
&
pDst
,
*
pSrc
));
...
...
source/common/src/tdataformat.c
浏览文件 @
177fe40e
...
...
@@ -916,6 +916,10 @@ char *tTagValToData(const STagVal *value, bool isJson) {
}
bool
tTagGet
(
const
STag
*
pTag
,
STagVal
*
pTagVal
)
{
if
(
!
pTag
||
!
pTagVal
){
return
false
;
}
int16_t
lidx
=
0
;
int16_t
ridx
=
pTag
->
nTag
-
1
;
int16_t
midx
;
...
...
source/dnode/vnode/src/meta/metaTable.c
浏览文件 @
177fe40e
...
...
@@ -51,7 +51,7 @@ static int metaUpdateMetaRsp(tb_uid_t uid, char *tbName, SSchemaWrapper *pSchema
return
-
1
;
}
str
cpy
(
pMetaRsp
->
tbName
,
tbName
);
str
ncpy
(
pMetaRsp
->
tbName
,
tbName
,
TSDB_TABLE_NAME_LEN
);
pMetaRsp
->
numOfColumns
=
pSchema
->
nCols
;
pMetaRsp
->
tableType
=
TSDB_NORMAL_TABLE
;
pMetaRsp
->
sversion
=
pSchema
->
version
;
...
...
@@ -817,6 +817,11 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
const
void
*
pData
=
NULL
;
int
nData
=
0
;
if
(
pAlterTbReq
->
tagName
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
return
-
1
;
}
// search name index
ret
=
tdbTbGet
(
pMeta
->
pNameIdx
,
pAlterTbReq
->
tbName
,
strlen
(
pAlterTbReq
->
tbName
)
+
1
,
&
pVal
,
&
nVal
);
if
(
ret
<
0
)
{
...
...
source/dnode/vnode/src/tsdb/tsdbFS.c
浏览文件 @
177fe40e
...
...
@@ -819,7 +819,7 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) {
nRef
=
atomic_sub_fetch_32
(
&
fSet
.
pHeadF
->
nRef
,
1
);
if
(
nRef
==
0
)
{
tsdbHeadFileName
(
pTsdb
,
pSetOld
->
diskId
,
pSetOld
->
fid
,
fSet
.
pHeadF
,
fname
);
taosRemoveFile
(
fname
);
(
void
)
taosRemoveFile
(
fname
);
taosMemoryFree
(
fSet
.
pHeadF
);
}
}
else
{
...
...
source/dnode/vnode/src/tsdb/tsdbMergeTree.c
浏览文件 @
177fe40e
...
...
@@ -151,9 +151,6 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
", last file index:%d, last block index:%d, entry:%d, %p, elapsed time:%.2f ms, %s"
,
pInfo
->
loadBlocks
,
pIter
->
uid
,
pIter
->
iStt
,
pIter
->
iSttBlk
,
pInfo
->
currentLoadBlockIndex
,
pBlock
,
el
,
idStr
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_exit
;
}
pInfo
->
blockIndex
[
pInfo
->
currentLoadBlockIndex
]
=
pIter
->
iSttBlk
;
tsdbDebug
(
"last block index list:%d, %d, %s"
,
pInfo
->
blockIndex
[
0
],
pInfo
->
blockIndex
[
1
],
idStr
);
...
...
@@ -466,8 +463,8 @@ static void findNextValidRow(SLDataIter *pIter, const char *idStr) {
}
bool
tLDataIterNextRow
(
SLDataIter
*
pIter
,
const
char
*
idStr
)
{
int32_t
code
=
0
;
int32_t
step
=
pIter
->
backward
?
-
1
:
1
;
terrno
=
TSDB_CODE_SUCCESS
;
// no qualified last file block in current file, no need to fetch row
if
(
pIter
->
pSttBlk
==
NULL
)
{
...
...
@@ -476,6 +473,10 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
int32_t
iBlockL
=
pIter
->
iSttBlk
;
SBlockData
*
pBlockData
=
loadLastBlock
(
pIter
,
idStr
);
if
(
pBlockData
==
NULL
&&
terrno
!=
TSDB_CODE_SUCCESS
)
{
goto
_exit
;
}
pIter
->
iRow
+=
step
;
while
(
1
)
{
...
...
@@ -501,11 +502,7 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
pIter
->
rInfo
.
row
=
tsdbRowFromBlockData
(
pBlockData
,
pIter
->
iRow
);
_exit:
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
code
;
}
return
(
code
==
TSDB_CODE_SUCCESS
)
&&
(
pIter
->
pSttBlk
!=
NULL
);
return
(
terrno
==
TSDB_CODE_SUCCESS
)
&&
(
pIter
->
pSttBlk
!=
NULL
);
}
SRowInfo
*
tLDataIterGet
(
SLDataIter
*
pIter
)
{
return
&
pIter
->
rInfo
;
}
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
177fe40e
...
...
@@ -340,7 +340,7 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdb
pIter
->
pLastBlockReader
=
taosMemoryCalloc
(
1
,
sizeof
(
struct
SLastBlockReader
));
if
(
pIter
->
pLastBlockReader
==
NULL
)
{
int32_t
code
=
TSDB_CODE_OUT_OF_MEMORY
;
tsdbError
(
"failed to prepare the last block iterator, code:%
d
%s"
,
tstrerror
(
code
),
pReader
->
idStr
);
tsdbError
(
"failed to prepare the last block iterator, code:%
s
%s"
,
tstrerror
(
code
),
pReader
->
idStr
);
return
code
;
}
}
...
...
@@ -646,7 +646,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
double
el
=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
tsdbDebug
(
"load block of %
d
tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
"load block of %
"
PRIzu
"
tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
"time:%.2f ms %s"
,
numOfTables
,
pBlockNum
->
numOfBlocks
,
numOfQTable
,
pBlockNum
->
numOfLastFiles
,
sizeInDisk
/
1000
.
0
,
el
,
pReader
->
idStr
);
...
...
@@ -1515,6 +1515,11 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
taosMemoryFree
(
pReader
->
pMemSchema
);
int32_t
code
=
metaGetTbTSchemaEx
(
pReader
->
pTsdb
->
pVnode
->
pMeta
,
pReader
->
suid
,
uid
,
sversion
,
&
pReader
->
pMemSchema
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
code
;
return
NULL
;
}
return
pReader
->
pMemSchema
;
}
...
...
@@ -3085,7 +3090,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
doMergeRowsInBuf
(
&
pBlockScanInfo
->
iter
,
pBlockScanInfo
->
uid
,
k
.
ts
,
pBlockScanInfo
->
delSkyline
,
&
merge
,
pReader
);
tRowMerge
(
&
merge
,
piRow
);
doMergeRowsInBuf
(
&
pBlockScanInfo
->
iiter
,
pBlockScanInfo
->
uid
,
k
.
ts
,
pBlockScanInfo
->
delSkyline
,
&
merge
,
pReader
);
doMergeRowsInBuf
(
&
pBlockScanInfo
->
iiter
,
pBlockScanInfo
->
uid
,
i
k
.
ts
,
pBlockScanInfo
->
delSkyline
,
&
merge
,
pReader
);
}
int32_t
code
=
tRowMergerGetRow
(
&
merge
,
pTSRow
);
...
...
@@ -3443,7 +3448,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
return
code
;
_err:
tsdbError
(
"failed to create data reader, code:%s %s"
,
tstrerror
(
code
),
pReader
->
idS
tr
);
tsdbError
(
"failed to create data reader, code:%s %s"
,
tstrerror
(
code
),
ids
tr
);
return
code
;
}
...
...
@@ -3732,7 +3737,7 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
}
int32_t
tsdbReaderReset
(
STsdbReader
*
pReader
,
SQueryTableDataCond
*
pCond
)
{
if
(
isEmptyQueryTimeWindow
(
&
pReader
->
window
))
{
if
(
isEmptyQueryTimeWindow
(
&
pReader
->
window
)
||
pReader
->
pReadSnap
==
NULL
)
{
return
TSDB_CODE_SUCCESS
;
}
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
177fe40e
...
...
@@ -41,7 +41,10 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t
nReqs
;
tDecoderInit
(
&
dc
,
(
uint8_t
*
)
pMsg
->
pCont
+
sizeof
(
SMsgHead
),
pMsg
->
contLen
-
sizeof
(
SMsgHead
));
tStartDecode
(
&
dc
);
if
(
tStartDecode
(
&
dc
)
<
0
)
{
code
=
TSDB_CODE_INVALID_MSG
;
return
code
;
}
if
(
tDecodeI32v
(
&
dc
,
&
nReqs
)
<
0
)
{
code
=
TSDB_CODE_INVALID_MSG
;
...
...
@@ -167,9 +170,9 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
int32_t
ret
;
if
(
!
pVnode
->
inUse
)
{
terrno
=
TSDB_CODE_VND_NOT_SYNCED
;
vError
(
"vgId:%d, not ready to write since %s"
,
TD_VID
(
pVnode
),
terrstr
());
return
-
1
;
terrno
=
TSDB_CODE_VND_NOT_SYNCED
;
vError
(
"vgId:%d, not ready to write since %s"
,
TD_VID
(
pVnode
),
terrstr
());
return
-
1
;
}
vDebug
(
"vgId:%d, start to process write request %s, index:%"
PRId64
,
TD_VID
(
pVnode
),
TMSG_INFO
(
pMsg
->
msgType
),
...
...
@@ -293,14 +296,14 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
vInfo
(
"vgId:%d, commit at version %"
PRId64
,
TD_VID
(
pVnode
),
version
);
// commit current change
if
(
vnodeCommit
(
pVnode
)
<
0
)
{
vError
(
"vgId:%d, failed to commit vnode since %s."
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
goto
_err
;
vError
(
"vgId:%d, failed to commit vnode since %s."
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
goto
_err
;
}
// start a new one
if
(
vnodeBegin
(
pVnode
)
<
0
)
{
vError
(
"vgId:%d, failed to begin vnode since %s."
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
goto
_err
;
vError
(
"vgId:%d, failed to begin vnode since %s."
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
goto
_err
;
}
}
...
...
source/libs/executor/src/executil.c
浏览文件 @
177fe40e
...
...
@@ -1138,7 +1138,7 @@ static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, i
s
.
bytes
=
bytes
;
s
.
slotId
=
slotId
;
s
.
precision
=
precision
;
strncpy
(
s
.
name
,
name
,
tListLen
(
s
.
name
));
t
strncpy
(
s
.
name
,
name
,
tListLen
(
s
.
name
));
return
s
;
}
...
...
@@ -1366,7 +1366,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
fmGetFuncExecFuncs
(
pCtx
->
functionId
,
&
pCtx
->
fpSet
);
}
else
{
char
*
udfName
=
pExpr
->
pExpr
->
_function
.
pFunctNode
->
functionName
;
strncpy
(
pCtx
->
udfName
,
udfName
,
TSDB_FUNC_NAME_LEN
);
t
strncpy
(
pCtx
->
udfName
,
udfName
,
TSDB_FUNC_NAME_LEN
);
fmGetUdafExecFuncs
(
pCtx
->
functionId
,
&
pCtx
->
fpSet
);
}
pCtx
->
fpSet
.
getEnv
(
pExpr
->
pExpr
->
_function
.
pFunctNode
,
&
env
);
...
...
source/libs/executor/src/executor.c
浏览文件 @
177fe40e
...
...
@@ -616,7 +616,7 @@ int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
int32_t
nOptrWithVal
=
0
;
int32_t
code
=
encodeOperator
(
pTaskInfo
->
pRoot
,
pOutput
,
len
,
&
nOptrWithVal
);
if
((
code
==
TSDB_CODE_SUCCESS
)
&&
(
nOptrWithVal
=
0
))
{
if
((
code
==
TSDB_CODE_SUCCESS
)
&&
(
nOptrWithVal
=
=
0
))
{
taosMemoryFreeClear
(
*
pOutput
);
*
len
=
0
;
}
...
...
@@ -701,10 +701,10 @@ int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
return
0
;
}
int32_t
initQueryTableDataCondForTmq
(
SQueryTableDataCond
*
pCond
,
SSnapContext
*
sContext
,
SMetaTableInfo
m
tInfo
)
{
int32_t
initQueryTableDataCondForTmq
(
SQueryTableDataCond
*
pCond
,
SSnapContext
*
sContext
,
SMetaTableInfo
*
pM
tInfo
)
{
memset
(
pCond
,
0
,
sizeof
(
SQueryTableDataCond
));
pCond
->
order
=
TSDB_ORDER_ASC
;
pCond
->
numOfCols
=
mtInfo
.
schema
->
nCols
;
pCond
->
numOfCols
=
pMtInfo
->
schema
->
nCols
;
pCond
->
colList
=
taosMemoryCalloc
(
pCond
->
numOfCols
,
sizeof
(
SColumnInfo
));
if
(
pCond
->
colList
==
NULL
)
{
terrno
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
...
...
@@ -712,15 +712,15 @@ int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* s
}
pCond
->
twindows
=
(
STimeWindow
){.
skey
=
INT64_MIN
,
.
ekey
=
INT64_MAX
};
pCond
->
suid
=
mtInfo
.
suid
;
pCond
->
suid
=
pMtInfo
->
suid
;
pCond
->
type
=
TIMEWINDOW_RANGE_CONTAINED
;
pCond
->
startVersion
=
-
1
;
pCond
->
endVersion
=
sContext
->
snapVersion
;
for
(
int32_t
i
=
0
;
i
<
pCond
->
numOfCols
;
++
i
)
{
pCond
->
colList
[
i
].
type
=
mtInfo
.
schema
->
pSchema
[
i
].
type
;
pCond
->
colList
[
i
].
bytes
=
mtInfo
.
schema
->
pSchema
[
i
].
bytes
;
pCond
->
colList
[
i
].
colId
=
mtInfo
.
schema
->
pSchema
[
i
].
colId
;
pCond
->
colList
[
i
].
type
=
pMtInfo
->
schema
->
pSchema
[
i
].
type
;
pCond
->
colList
[
i
].
bytes
=
pMtInfo
->
schema
->
pSchema
[
i
].
bytes
;
pCond
->
colList
[
i
].
colId
=
pMtInfo
->
schema
->
pSchema
[
i
].
colId
;
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -844,7 +844,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
taosArrayDestroy
(
pTaskInfo
->
tableqinfoList
.
pTableList
);
if
(
mtInfo
.
uid
==
0
)
return
0
;
// no data
initQueryTableDataCondForTmq
(
&
pTaskInfo
->
streamInfo
.
tableCond
,
sContext
,
mtInfo
);
initQueryTableDataCondForTmq
(
&
pTaskInfo
->
streamInfo
.
tableCond
,
sContext
,
&
mtInfo
);
pTaskInfo
->
streamInfo
.
tableCond
.
twindows
.
skey
=
pOffset
->
ts
;
pTaskInfo
->
tableqinfoList
.
pTableList
=
taosArrayInit
(
1
,
sizeof
(
STableKeyInfo
));
taosArrayPush
(
pTaskInfo
->
tableqinfoList
.
pTableList
,
&
(
STableKeyInfo
){.
uid
=
mtInfo
.
uid
,
.
groupId
=
0
});
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
177fe40e
...
...
@@ -895,33 +895,6 @@ static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockI
}
#endif
static
uint32_t
doFilterByBlockTimeWindow
(
STableScanInfo
*
pTableScanInfo
,
SSDataBlock
*
pBlock
)
{
#if 0
SqlFunctionCtx* pCtx = pTableScanInfo->pCtx;
uint32_t status = BLK_DATA_NOT_LOAD;
int32_t numOfOutput = 0; // pTableScanInfo->numOfOutput;
for (int32_t i = 0; i < numOfOutput; ++i) {
int32_t functionId = pCtx[i].functionId;
int32_t colId = pTableScanInfo->pExpr[i].base.pParam[0].pCol->colId;
// group by + first/last should not apply the first/last block filter
if (functionId < 0) {
status |= BLK_DATA_DATA_LOAD;
return status;
} else {
// status |= aAggs[functionId].dataReqFunc(&pTableScanInfo->pCtx[i], &pBlock->info.window, colId);
// if ((status & BLK_DATA_DATA_LOAD) == BLK_DATA_DATA_LOAD) {
// return status;
// }
}
}
return status;
#endif
return
0
;
}
int32_t
loadDataBlockOnDemand
(
SExecTaskInfo
*
pTaskInfo
,
STableScanInfo
*
pTableScanInfo
,
SSDataBlock
*
pBlock
,
uint32_t
*
status
)
{
*
status
=
BLK_DATA_NOT_LOAD
;
...
...
@@ -1802,7 +1775,7 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
}
else
{
taosMemoryFree
(
pMsg
->
pData
);
pSourceDataInfo
->
code
=
code
;
qDebug
(
"%s fetch rsp received, index:%d,
error:%d
"
,
pSourceDataInfo
->
taskId
,
index
,
tstrerror
(
code
));
qDebug
(
"%s fetch rsp received, index:%d,
code:%s
"
,
pSourceDataInfo
->
taskId
,
index
,
tstrerror
(
code
));
}
pSourceDataInfo
->
status
=
EX_SOURCE_DATA_READY
;
...
...
@@ -2816,6 +2789,8 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
blockDataUpdateTsWindow
(
pBlock
,
pInfo
->
primarySrcSlotId
);
blockDataCleanup
(
pInfo
->
pRes
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pBlock
->
info
.
rows
);
blockDataEnsureCapacity
(
pInfo
->
pFinalRes
,
pBlock
->
info
.
rows
);
doApplyScalarCalculation
(
pOperator
,
pBlock
,
order
,
scanFlag
);
if
(
pInfo
->
curGroupId
==
0
||
pInfo
->
curGroupId
==
pInfo
->
pRes
->
info
.
groupId
)
{
...
...
@@ -3397,8 +3372,8 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
SSchema
*
pSchema
=
&
pqSw
->
pSchema
[
pqSw
->
nCols
++
];
pSchema
->
colId
=
pColNode
->
colId
;
pSchema
->
type
=
pColNode
->
node
.
resType
.
type
;
pSchema
->
type
=
pColNode
->
node
.
resType
.
bytes
;
strncpy
(
pSchema
->
name
,
pColNode
->
colName
,
tListLen
(
pSchema
->
name
));
pSchema
->
bytes
=
pColNode
->
node
.
resType
.
bytes
;
t
strncpy
(
pSchema
->
name
,
pColNode
->
colName
,
tListLen
(
pSchema
->
name
));
}
// this the tags and pseudo function columns, we only keep the tag columns
...
...
@@ -3412,7 +3387,7 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
SSchema
*
pSchema
=
&
pqSw
->
pSchema
[
pqSw
->
nCols
++
];
pSchema
->
colId
=
pColNode
->
colId
;
pSchema
->
type
=
pColNode
->
node
.
resType
.
type
;
pSchema
->
type
=
pColNode
->
node
.
resType
.
bytes
;
pSchema
->
bytes
=
pColNode
->
node
.
resType
.
bytes
;
strncpy
(
pSchema
->
name
,
pColNode
->
colName
,
tListLen
(
pSchema
->
name
));
}
}
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
177fe40e
...
...
@@ -284,6 +284,19 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
return
true
;
}
static
void
doSetTagColumnData
(
STableScanInfo
*
pTableScanInfo
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
)
{
if
(
pTableScanInfo
->
pseudoSup
.
numOfExprs
>
0
)
{
SExprSupp
*
pSup
=
&
pTableScanInfo
->
pseudoSup
;
int32_t
code
=
addTagPseudoColumnData
(
&
pTableScanInfo
->
readHandle
,
pSup
->
pExprInfo
,
pSup
->
numOfExprs
,
pBlock
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
}
}
static
int32_t
loadDataBlock
(
SOperatorInfo
*
pOperator
,
STableScanInfo
*
pTableScanInfo
,
SSDataBlock
*
pBlock
,
uint32_t
*
status
)
{
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
...
...
@@ -312,8 +325,9 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
}
else
if
(
*
status
==
FUNC_DATA_REQUIRED_NOT_LOAD
)
{
qDebug
(
"%s data block skipped, brange:%"
PRId64
"-%"
PRId64
", rows:%d"
,
GET_TASKID
(
pTaskInfo
),
pBlockInfo
->
window
.
skey
,
pBlockInfo
->
window
.
ekey
,
pBlockInfo
->
rows
);
pCost
->
skipBlocks
+=
1
;
doSetTagColumnData
(
pTableScanInfo
,
pBlock
,
pTaskInfo
);
pCost
->
skipBlocks
+=
1
;
return
TSDB_CODE_SUCCESS
;
}
else
if
(
*
status
==
FUNC_DATA_REQUIRED_STATIS_LOAD
)
{
pCost
->
loadBlockStatis
+=
1
;
...
...
@@ -322,6 +336,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
if
(
success
)
{
// failed to load the block sma data, data block statistics does not exist, load data block instead
qDebug
(
"%s data block SMA loaded, brange:%"
PRId64
"-%"
PRId64
", rows:%d"
,
GET_TASKID
(
pTaskInfo
),
pBlockInfo
->
window
.
skey
,
pBlockInfo
->
window
.
ekey
,
pBlockInfo
->
rows
);
doSetTagColumnData
(
pTableScanInfo
,
pBlock
,
pTaskInfo
);
return
TSDB_CODE_SUCCESS
;
}
else
{
qDebug
(
"%s failed to load SMA, since not all columns have SMA"
,
GET_TASKID
(
pTaskInfo
));
...
...
@@ -371,17 +386,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
}
relocateColumnData
(
pBlock
,
pTableScanInfo
->
pColMatchInfo
,
pCols
,
true
);
// currently only the tbname pseudo column
if
(
pTableScanInfo
->
pseudoSup
.
numOfExprs
>
0
)
{
SExprSupp
*
pSup
=
&
pTableScanInfo
->
pseudoSup
;
int32_t
code
=
addTagPseudoColumnData
(
&
pTableScanInfo
->
readHandle
,
pSup
->
pExprInfo
,
pSup
->
numOfExprs
,
pBlock
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
}
doSetTagColumnData
(
pTableScanInfo
,
pBlock
,
pTaskInfo
);
if
(
pTableScanInfo
->
pFilterNode
!=
NULL
)
{
int64_t
st
=
taosGetTimestampUs
();
...
...
@@ -1079,12 +1084,13 @@ static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_
static
STimeWindow
getSlidingWindow
(
TSKEY
*
startTsCol
,
TSKEY
*
endTsCol
,
uint64_t
*
gpIdCol
,
SInterval
*
pInterval
,
SDataBlockInfo
*
pDataBlockInfo
,
int32_t
*
pRowIndex
,
bool
hasGroup
)
{
SResultRowInfo
dumyInfo
;
SResultRowInfo
dumyInfo
=
{
0
}
;
dumyInfo
.
cur
.
pageId
=
-
1
;
STimeWindow
win
=
getActiveTimeWindow
(
NULL
,
&
dumyInfo
,
startTsCol
[
*
pRowIndex
],
pInterval
,
TSDB_ORDER_ASC
);
STimeWindow
endWin
=
win
;
STimeWindow
preWin
=
win
;
uint64_t
groupId
=
gpIdCol
[
*
pRowIndex
];
while
(
1
)
{
if
(
hasGroup
)
{
(
*
pRowIndex
)
+=
1
;
...
...
@@ -1148,6 +1154,9 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32
pResult
->
info
.
rows
++
;
}
}
blockDataDestroy
(
tmpBlock
);
if
(
pResult
->
info
.
rows
>
0
)
{
pResult
->
info
.
calWin
=
pInfo
->
updateWin
;
return
pResult
;
...
...
@@ -1316,7 +1325,7 @@ static void calBlockTag(SExprSupp* pTagCalSup, SSDataBlock* pBlock, SSDataBlock*
blockDataEnsureCapacity
(
pResBlock
,
1
);
projectApplyFunctions
(
pTagCalSup
->
pExprInfo
,
pResBlock
,
pSrcBlock
,
pTagCalSup
->
pCtx
,
pTagCalSup
->
numOfExprs
,
NULL
);
projectApplyFunctions
(
pTagCalSup
->
pExprInfo
,
pResBlock
,
pSrcBlock
,
pTagCalSup
->
pCtx
,
1
,
NULL
);
ASSERT
(
pResBlock
->
info
.
rows
==
1
);
// build tagArray
...
...
@@ -1543,7 +1552,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
tsdbReaderClose
(
pTSInfo
->
dataReader
);
pTSInfo
->
dataReader
=
NULL
;
tqOffsetResetToLog
(
&
pTaskInfo
->
streamInfo
.
prepareStatus
,
pTaskInfo
->
streamInfo
.
snapshotVer
);
qDebug
(
"queue scan tsdb over, switch to wal ver %
d"
,
pTaskInfo
->
streamInfo
.
snapshotVer
+
1
);
qDebug
(
"queue scan tsdb over, switch to wal ver %
"
PRId64
,
pTaskInfo
->
streamInfo
.
snapshotVer
+
1
);
if
(
tqSeekVer
(
pInfo
->
tqReader
,
pTaskInfo
->
streamInfo
.
snapshotVer
+
1
)
<
0
)
{
return
NULL
;
}
...
...
@@ -2120,9 +2129,6 @@ static void destroyStreamScanOperatorInfo(void* param) {
taosMemoryFree
(
pStreamScan
->
pPseudoExpr
);
}
cleanupExprSupp
(
&
pStreamScan
->
tbnameCalSup
);
cleanupExprSupp
(
&
pStreamScan
->
tagCalSup
);
updateInfoDestroy
(
pStreamScan
->
pUpdateInfo
);
blockDataDestroy
(
pStreamScan
->
pRes
);
blockDataDestroy
(
pStreamScan
->
pUpdateRes
);
...
...
@@ -2995,7 +3001,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
while
(
1
)
{
int64_t
startTs
=
taosGetTimestampUs
();
strncpy
(
pInfo
->
req
.
tb
,
tNameGetTableName
(
&
pInfo
->
name
),
tListLen
(
pInfo
->
req
.
tb
));
t
strncpy
(
pInfo
->
req
.
tb
,
tNameGetTableName
(
&
pInfo
->
name
),
tListLen
(
pInfo
->
req
.
tb
));
strcpy
(
pInfo
->
req
.
user
,
pInfo
->
pUser
);
int32_t
contLen
=
tSerializeSRetrieveTableReq
(
NULL
,
0
,
&
pInfo
->
req
);
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
177fe40e
...
...
@@ -2842,14 +2842,13 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
int32_t
numOfCols
=
0
;
SExprInfo
*
pExprInfo
=
createExprInfo
(
pSessionNode
->
window
.
pFuncs
,
NULL
,
&
numOfCols
);
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pSessionNode
->
window
.
node
.
pOutputDataBlockDesc
);
initBasicInfo
(
&
pInfo
->
binfo
,
pResBlock
);
int32_t
code
=
initAggInfo
(
&
pOperator
->
exprSupp
,
&
pInfo
->
aggSup
,
pExprInfo
,
numOfCols
,
keyBufSize
,
pTaskInfo
->
id
.
str
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
initBasicInfo
(
&
pInfo
->
binfo
,
pResBlock
);
pInfo
->
twAggSup
.
waterMark
=
pSessionNode
->
window
.
watermark
;
pInfo
->
twAggSup
.
calTrigger
=
pSessionNode
->
window
.
triggerType
;
pInfo
->
gap
=
pSessionNode
->
gap
;
...
...
@@ -4666,7 +4665,6 @@ void destroyStreamStateOperatorInfo(void* param) {
SStreamSessionAggOperatorInfo
*
pChInfo
=
pChild
->
info
;
destroyStreamSessionAggOperatorInfo
(
pChInfo
);
taosMemoryFreeClear
(
pChild
);
taosMemoryFreeClear
(
pChInfo
);
}
}
colDataDestroy
(
&
pInfo
->
twAggSup
.
timeWindowData
);
...
...
source/libs/executor/src/tsimplehash.c
浏览文件 @
177fe40e
...
...
@@ -18,7 +18,7 @@
#include "tlog.h"
#define SHASH_DEFAULT_LOAD_FACTOR 0.75
#define HASH_MAX_CAPACITY (1024 * 1024 * 16)
#define HASH_MAX_CAPACITY (1024 * 1024 * 16
L
)
#define SHASH_NEED_RESIZE(_h) ((_h)->size >= (_h)->capacity * SHASH_DEFAULT_LOAD_FACTOR)
#define GET_SHASH_NODE_KEY(_n, _dl) ((char *)(_n) + sizeof(SHNode) + (_dl))
...
...
@@ -104,7 +104,7 @@ static void tSimpleHashTableResize(SSHashObj *pHashObj) {
int32_t
newCapacity
=
(
int32_t
)(
pHashObj
->
capacity
<<
1u
);
if
(
newCapacity
>
HASH_MAX_CAPACITY
)
{
uDebug
(
"current capacity:%
zu
, maximum capacity:%"
PRIu64
", no resize applied due to limitation is reached"
,
uDebug
(
"current capacity:%
"
PRIzu
"
, maximum capacity:%"
PRIu64
", no resize applied due to limitation is reached"
,
pHashObj
->
capacity
,
HASH_MAX_CAPACITY
);
return
;
}
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
177fe40e
...
...
@@ -2357,7 +2357,8 @@ static int32_t setTableIndex(STranslateContext* pCxt, SName* pName, SRealTableNo
}
static
int32_t
setTableCacheLastMode
(
STranslateContext
*
pCxt
,
SSelectStmt
*
pSelect
)
{
if
((
!
pSelect
->
hasLastRowFunc
&&
!
pSelect
->
hasLastFunc
)
||
QUERY_NODE_REAL_TABLE
!=
nodeType
(
pSelect
->
pFromTable
))
{
if
((
!
pSelect
->
hasLastRowFunc
&&
!
pSelect
->
hasLastFunc
)
||
QUERY_NODE_REAL_TABLE
!=
nodeType
(
pSelect
->
pFromTable
)
||
TSDB_SYSTEM_TABLE
==
((
SRealTableNode
*
)
pSelect
->
pFromTable
)
->
pMeta
->
tableType
)
{
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/planner/src/planOptimizer.c
浏览文件 @
177fe40e
...
...
@@ -124,9 +124,8 @@ static void optSetParentOrder(SLogicNode* pNode, EOrder order) {
EDealRes
scanPathOptHaveNormalColImpl
(
SNode
*
pNode
,
void
*
pContext
)
{
if
(
QUERY_NODE_COLUMN
==
nodeType
(
pNode
))
{
// *((bool*)pContext) =
// (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType && COLUMN_TYPE_TBNAME != ((SColumnNode*)pNode)->colType);
*
((
bool
*
)
pContext
)
=
true
;
*
((
bool
*
)
pContext
)
=
(
COLUMN_TYPE_TAG
!=
((
SColumnNode
*
)
pNode
)
->
colType
&&
COLUMN_TYPE_TBNAME
!=
((
SColumnNode
*
)
pNode
)
->
colType
);
return
*
((
bool
*
)
pContext
)
?
DEAL_RES_END
:
DEAL_RES_IGNORE_CHILD
;
}
return
DEAL_RES_CONTINUE
;
...
...
source/util/src/tbloomfilter.c
浏览文件 @
177fe40e
...
...
@@ -19,12 +19,12 @@
#include "taos.h"
#include "taoserror.h"
#define UNIT_NUM_BITS 64
#define UNIT_ADDR_NUM_BITS 6
#define UNIT_NUM_BITS 64
ULL
#define UNIT_ADDR_NUM_BITS 6
ULL
static
FORCE_INLINE
bool
setBit
(
uint64_t
*
buf
,
uint64_t
index
)
{
uint64_t
unitIndex
=
index
>>
UNIT_ADDR_NUM_BITS
;
uint64_t
mask
=
1
<<
(
index
%
UNIT_NUM_BITS
);
uint64_t
mask
=
1
ULL
<<
(
index
%
UNIT_NUM_BITS
);
uint64_t
old
=
buf
[
unitIndex
];
buf
[
unitIndex
]
|=
mask
;
return
buf
[
unitIndex
]
!=
old
;
...
...
@@ -32,7 +32,7 @@ static FORCE_INLINE bool setBit(uint64_t *buf, uint64_t index) {
static
FORCE_INLINE
bool
getBit
(
uint64_t
*
buf
,
uint64_t
index
)
{
uint64_t
unitIndex
=
index
>>
UNIT_ADDR_NUM_BITS
;
uint64_t
mask
=
1
<<
(
index
%
UNIT_NUM_BITS
);
uint64_t
mask
=
1
ULL
<<
(
index
%
UNIT_NUM_BITS
);
return
buf
[
unitIndex
]
&
mask
;
}
...
...
tests/system-test/1-insert/alter_database.py
0 → 100644
浏览文件 @
177fe40e
import
taos
import
sys
import
time
import
socket
import
os
import
threading
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
class
TDTestCase
:
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
"start to execute %s"
%
__file__
)
tdSql
.
init
(
conn
.
cursor
(),
logSql
)
self
.
buffer_boundary
=
[
3
,
4097
,
8193
,
12289
,
16384
]
self
.
buffer_error
=
[
self
.
buffer_boundary
[
0
]
-
1
,
self
.
buffer_boundary
[
-
1
]
+
1
,
12289
,
96
]
# pages_boundary >= 64
self
.
pages_boundary
=
[
64
,
128
,
512
]
self
.
pages_error
=
[
self
.
pages_boundary
[
0
]
-
1
]
def
alter_buffer
(
self
):
tdSql
.
execute
(
'create database db'
)
for
buffer
in
self
.
buffer_boundary
:
tdSql
.
execute
(
f
'alter database db buffer
{
buffer
}
'
)
tdSql
.
query
(
'select * from information_schema.ins_databases where name = "db"'
)
tdSql
.
checkEqual
(
tdSql
.
queryResult
[
0
][
8
],
buffer
)
tdSql
.
execute
(
'drop database db'
)
tdSql
.
execute
(
'create database db vgroups 10'
)
for
buffer
in
self
.
buffer_error
:
tdSql
.
error
(
f
'alter database db buffer
{
buffer
}
'
)
tdSql
.
execute
(
'drop database db'
)
def
alter_pages
(
self
):
tdSql
.
execute
(
'create database db'
)
for
pages
in
self
.
pages_boundary
:
tdSql
.
execute
(
f
'alter database db pages
{
pages
}
'
)
tdSql
.
query
(
'select * from information_schema.ins_databases where name = "db"'
)
tdSql
.
checkEqual
(
tdSql
.
queryResult
[
0
][
10
],
pages
)
tdSql
.
execute
(
'drop database db'
)
tdSql
.
execute
(
'create database db'
)
tdSql
.
query
(
'select * from information_schema.ins_databases where name = "db"'
)
self
.
pages_error
.
append
(
tdSql
.
queryResult
[
0
][
10
])
for
pages
in
self
.
pages_error
:
tdSql
.
error
(
f
'alter database db pages
{
pages
}
'
)
tdSql
.
execute
(
'drop database db'
)
def
run
(
self
):
tdSql
.
error
(
'create database db1 vgroups 10 buffer 12289'
)
self
.
alter_buffer
()
self
.
alter_pages
()
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
\ No newline at end of file
tests/system-test/7-tmq/create_wrong_topic.py
0 → 100644
浏览文件 @
177fe40e
import
taos
import
sys
import
time
import
socket
import
os
import
threading
from
util.common
import
*
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
from
util.sqlset
import
*
class
TDTestCase
:
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
"start to execute %s"
%
__file__
)
tdSql
.
init
(
conn
.
cursor
(),
logSql
)
self
.
setsql
=
TDSetSql
()
self
.
rowNum
=
10
self
.
ts
=
1537146000000
self
.
binary_str
=
'taosdata'
self
.
nchar_str
=
'涛思数据'
self
.
column_dict
=
{
'ts'
:
'timestamp'
,
'col1'
:
'tinyint'
,
'col2'
:
'smallint'
,
'col3'
:
'int'
,
'col4'
:
'bigint'
,
'col5'
:
'tinyint unsigned'
,
'col6'
:
'smallint unsigned'
,
'col7'
:
'int unsigned'
,
'col8'
:
'bigint unsigned'
,
'col9'
:
'float'
,
'col10'
:
'double'
,
}
self
.
error_topic
=
[
'avg'
,
'count'
,
'spread'
,
'stddev'
,
'sum'
,
'hyperloglog'
]
def
insert_data
(
self
,
column_dict
,
tbname
,
row_num
):
insert_sql
=
self
.
setsql
.
set_insertsql
(
column_dict
,
tbname
)
for
i
in
range
(
row_num
):
insert_list
=
[]
self
.
setsql
.
insert_values
(
column_dict
,
i
,
insert_sql
,
insert_list
,
self
.
ts
)
def
wrong_topic
(
self
):
tdSql
.
prepare
()
tdSql
.
execute
(
'use db'
)
stbname
=
f
'db.
{
tdCom
.
getLongName
(
5
,
"letters"
)
}
'
tag_dict
=
{
't0'
:
'int'
}
tag_values
=
[
f
'1'
]
tdSql
.
execute
(
self
.
setsql
.
set_create_stable_sql
(
stbname
,
self
.
column_dict
,
tag_dict
))
tdSql
.
execute
(
f
"create table
{
stbname
}
_tb1 using
{
stbname
}
tags(
{
tag_values
[
0
]
}
)"
)
self
.
insert_data
(
self
.
column_dict
,
f
'
{
stbname
}
_tb1'
,
self
.
rowNum
)
for
column
in
self
.
column_dict
.
keys
():
for
func
in
self
.
error_topic
:
if
func
.
lower
()
!=
'count'
and
column
.
lower
()
!=
'ts'
:
tdSql
.
error
(
f
'create topic tpn as select
{
func
}
(
{
column
}
) from
{
stbname
}
'
)
elif
func
.
lower
()
==
'count'
:
tdSql
.
error
(
f
'create topic tpn as select
{
func
}
(*) from
{
stbname
}
'
)
for
column
in
self
.
column_dict
.
keys
():
if
column
.
lower
()
!=
'ts'
:
tdSql
.
error
(
f
'create topic tpn as select apercentile(
{
column
}
,50) from
{
stbname
}
'
)
tdSql
.
error
(
f
'create topic tpn as select leastquares(
{
column
}
,1,1) from
{
stbname
}
_tb1'
)
tdSql
.
error
(
f
'create topic tpn as select HISTOGRAM(
{
column
}
,user_input,[1,3,5,7],0) from
{
stbname
}
'
)
tdSql
.
error
(
f
'create topic tpn as select percentile(
{
column
}
,1) from
{
stbname
}
_tb1'
)
pass
def
run
(
self
):
self
.
wrong_topic
()
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
\ No newline at end of file
tests/system-test/fulltest.sh
浏览文件 @
177fe40e
...
...
@@ -2,7 +2,7 @@
set
-e
set
-x
#
python3 ./test.py -f 0-others/taosShell.py
python3 ./test.py
-f
0-others/taosShell.py
python3 ./test.py
-f
0-others/taosShellError.py
python3 ./test.py
-f
0-others/taosShellNetChk.py
python3 ./test.py
-f
0-others/telemetry.py
...
...
@@ -18,7 +18,7 @@ python3 ./test.py -f 0-others/sysinfo.py
python3 ./test.py
-f
0-others/user_control.py
python3 ./test.py
-f
0-others/fsync.py
python3 ./test.py
-f
0-others/compatibility.py
python3 ./test.py
-f
1-insert/alter_database.py
python3 ./test.py
-f
1-insert/influxdb_line_taosc_insert.py
python3 ./test.py
-f
1-insert/opentsdb_telnet_line_taosc_insert.py
python3 ./test.py
-f
1-insert/opentsdb_json_taosc_insert.py
...
...
@@ -279,7 +279,7 @@ python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_stop_
python3 test.py
-f
6-cluster/vnode/4dnode1mnode_basic_replica3_vgroups.py
-N
4
-M
1
# python3 test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_vgroups_stopOne.py -N 4 -M 1
python3 ./test.py
-f
7-tmq/create_wrong_topic.py
python3 ./test.py
-f
7-tmq/dropDbR3ConflictTransaction.py
-N
3
python3 ./test.py
-f
7-tmq/basic5.py
python3 ./test.py
-f
7-tmq/subscribeDb.py
...
...
tools/shell/src/shellTire.c
浏览文件 @
177fe40e
...
...
@@ -243,8 +243,8 @@ void enumAllWords(STireNode** nodes, char* prefix, SMatch* match) {
continue
;
}
else
{
// combine word string
memset
(
word
,
0
,
sizeof
(
word
));
str
cpy
(
word
,
prefix
);
memset
(
word
,
0
,
tListLen
(
word
));
str
ncpy
(
word
,
prefix
,
len
);
word
[
len
]
=
FIRST_ASCII
+
i
;
// append current char
// chain middle node
...
...
@@ -315,8 +315,7 @@ void matchPrefixFromTree(STire* tire, char* prefix, SMatch* match) {
}
}
// return
return
;
taosMemoryFree
(
root
);
}
SMatch
*
matchPrefix
(
STire
*
tire
,
char
*
prefix
,
SMatch
*
match
)
{
...
...
taosws-rs
@
7a94ffab
Subproject commit 7a94ffab45f08e16f09b3f430fe75d717054adb6
utils/test/c/sml_test.c
浏览文件 @
177fe40e
...
...
@@ -119,7 +119,7 @@ int smlProcess_json1_Test() {
"
\"
dc
\"
:
\"
lga
\"
"
" }"
" }"
"]"
};
"]"
,
};
pRes
=
taos_schemaless_insert
(
taos
,
(
char
**
)
sql
,
sizeof
(
sql
)
/
sizeof
(
sql
[
0
]),
TSDB_SML_JSON_PROTOCOL
,
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
...
...
@@ -159,7 +159,7 @@ int smlProcess_json2_Test() {
" },"
"
\"
id
\"
:
\"
d1001
\"
"
" }"
"}"
};
"}"
,
};
pRes
=
taos_schemaless_insert
(
taos
,
(
char
**
)
sql
,
sizeof
(
sql
)
/
sizeof
(
sql
[
0
]),
TSDB_SML_JSON_PROTOCOL
,
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
...
...
@@ -227,7 +227,7 @@ int smlProcess_json3_Test() {
" },"
"
\"
id
\"
:
\"
d1001
\"
"
" }"
"}"
};
"}"
,
};
pRes
=
taos_schemaless_insert
(
taos
,
(
char
**
)
sql
,
sizeof
(
sql
)
/
sizeof
(
sql
[
0
]),
TSDB_SML_JSON_PROTOCOL
,
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
...
...
@@ -286,7 +286,7 @@ int smlProcess_json4_Test() {
"
\"
t9
\"
: false,"
"
\"
id
\"
:
\"
d1001
\"
"
" }"
"}"
};
"}"
,
};
pRes
=
taos_schemaless_insert
(
taos
,
(
char
**
)
sql
,
sizeof
(
sql
)
/
sizeof
(
sql
[
0
]),
TSDB_SML_JSON_PROTOCOL
,
TSDB_SML_TIMESTAMP_NANO_SECONDS
);
printf
(
"%s result:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
...
...
utils/test/c/tmqSim.c
浏览文件 @
177fe40e
...
...
@@ -155,7 +155,7 @@ static void printHelp() {
printf
(
"%s%s
\n
"
,
indent
,
"-l"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
"run duration unit is minutes, default is "
,
g_stConfInfo
.
runDurationMinutes
);
printf
(
"%s%s%s
%d
\n
"
,
indent
,
indent
,
"run duration unit is minutes, default is "
,
g_stConfInfo
.
runDurationMinutes
);
printf
(
"%s%s
\n
"
,
indent
,
"-p"
);
printf
(
"%s%s%s
\n
"
,
indent
,
indent
,
"producer thread number, default is 0"
);
printf
(
"%s%s
\n
"
,
indent
,
"-b"
);
...
...
@@ -238,7 +238,7 @@ void saveConfigToLogFile() {
taosFprintfFile
(
g_fp
,
"%s:%s, "
,
g_stConfInfo
.
stThreads
[
i
].
key
[
k
],
g_stConfInfo
.
stThreads
[
i
].
value
[
k
]);
}
taosFprintfFile
(
g_fp
,
"
\n
"
);
taosFprintfFile
(
g_fp
,
" expect rows: %
d
\n
"
,
g_stConfInfo
.
stThreads
[
i
].
expectMsgCnt
);
taosFprintfFile
(
g_fp
,
" expect rows: %
"
PRIx64
"
\n
"
,
g_stConfInfo
.
stThreads
[
i
].
expectMsgCnt
);
}
char
tmpString
[
128
];
...
...
@@ -263,11 +263,11 @@ void parseArgument(int32_t argc, char* argv[]) {
printHelp
();
exit
(
0
);
}
else
if
(
strcmp
(
argv
[
i
],
"-d"
)
==
0
)
{
strcpy
(
g_stConfInfo
.
dbName
,
argv
[
++
i
]
);
tstrncpy
(
g_stConfInfo
.
dbName
,
argv
[
++
i
],
sizeof
(
g_stConfInfo
.
dbName
)
);
}
else
if
(
strcmp
(
argv
[
i
],
"-w"
)
==
0
)
{
strcpy
(
g_stConfInfo
.
cdbName
,
argv
[
++
i
]
);
tstrncpy
(
g_stConfInfo
.
cdbName
,
argv
[
++
i
],
sizeof
(
g_stConfInfo
.
cdbName
)
);
}
else
if
(
strcmp
(
argv
[
i
],
"-c"
)
==
0
)
{
strcpy
(
configDir
,
argv
[
++
i
]
);
tstrncpy
(
configDir
,
argv
[
++
i
],
PATH_MAX
);
}
else
if
(
strcmp
(
argv
[
i
],
"-g"
)
==
0
)
{
g_stConfInfo
.
showMsgFlag
=
atol
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-r"
)
==
0
)
{
...
...
@@ -279,9 +279,9 @@ void parseArgument(int32_t argc, char* argv[]) {
}
else
if
(
strcmp
(
argv
[
i
],
"-e"
)
==
0
)
{
g_stConfInfo
.
useSnapshot
=
atol
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-t"
)
==
0
)
{
char
tmpBuf
[
56
]
;
strcpy
(
tmpBuf
,
argv
[
++
i
]
);
sprintf
(
g_stConfInfo
.
topic
,
"`%s`"
,
tmpBuf
);
char
tmpBuf
[
56
]
=
{
0
}
;
tstrncpy
(
tmpBuf
,
argv
[
++
i
],
sizeof
(
tmpBuf
)
);
sprintf
(
g_stConfInfo
.
topic
,
"`%s`"
,
tmpBuf
);
}
else
if
(
strcmp
(
argv
[
i
],
"-x"
)
==
0
)
{
g_stConfInfo
.
numOfThread
=
atol
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-l"
)
==
0
)
{
...
...
@@ -294,6 +294,10 @@ void parseArgument(int32_t argc, char* argv[]) {
g_stConfInfo
.
producerRate
=
atol
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-n"
)
==
0
)
{
g_stConfInfo
.
payloadLen
=
atol
(
argv
[
++
i
]);
if
(
g_stConfInfo
.
payloadLen
<=
0
||
g_stConfInfo
.
payloadLen
>
1024
*
1024
*
1024
){
pError
(
"%s calloc size is too large: %s %s"
,
GREEN
,
argv
[
++
i
],
NC
);
exit
(
-
1
);
}
}
else
{
pError
(
"%s unknow para: %s %s"
,
GREEN
,
argv
[
++
i
],
NC
);
exit
(
-
1
);
...
...
@@ -354,8 +358,8 @@ void ltrim(char* str) {
int
queryDB
(
TAOS
*
taos
,
char
*
command
)
{
int
retryCnt
=
10
;
int
code
;
TAOS_RES
*
pRes
;
int
code
=
0
;
TAOS_RES
*
pRes
=
NULL
;
while
(
retryCnt
--
)
{
pRes
=
taos_query
(
taos
,
command
);
...
...
@@ -363,10 +367,11 @@ int queryDB(TAOS* taos, char* command) {
if
(
code
!=
0
)
{
taosSsleep
(
1
);
taos_free_result
(
pRes
);
pRes
=
NULL
;
continue
;
}
taos_free_result
(
pRes
);
return
0
;
return
0
;
}
pError
(
"failed to reason:%s, sql: %s"
,
tstrerror
(
code
),
command
);
...
...
@@ -418,7 +423,7 @@ int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) {
char
sqlStr
[
1100
]
=
{
0
};
if
(
strlen
(
buf
)
>
1024
)
{
taosFprintfFile
(
g_fp
,
"The length of one row[%d] is overflow 1024
\n
"
,
strlen
(
buf
));
taosFprintfFile
(
g_fp
,
"The length of one row[%d] is overflow 1024
\n
"
,
(
int
)
strlen
(
buf
));
taosCloseFile
(
&
g_fp
);
return
-
1
;
}
...
...
@@ -592,7 +597,7 @@ static int32_t data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
int32_t
vgroupId
=
tmq_get_vgroup_id
(
msg
);
const
char
*
dbName
=
tmq_get_db_name
(
msg
);
taosFprintfFile
(
g_fp
,
"consumerId: %d, msg index:%
"
PRId64
"
\n
"
,
pInfo
->
consumerId
,
msgIndex
);
taosFprintfFile
(
g_fp
,
"consumerId: %d, msg index:%
d
\n
"
,
pInfo
->
consumerId
,
msgIndex
);
taosFprintfFile
(
g_fp
,
"dbName: %s, topic: %s, vgroupId: %d
\n
"
,
dbName
!=
NULL
?
dbName
:
"invalid table"
,
tmq_get_topic_name
(
msg
),
vgroupId
);
...
...
@@ -644,7 +649,7 @@ static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
int32_t
vgroupId
=
tmq_get_vgroup_id
(
msg
);
const
char
*
dbName
=
tmq_get_db_name
(
msg
);
taosFprintfFile
(
g_fp
,
"consumerId: %d, msg index:%
"
PRId64
"
\n
"
,
pInfo
->
consumerId
,
msgIndex
);
taosFprintfFile
(
g_fp
,
"consumerId: %d, msg index:%
d
\n
"
,
pInfo
->
consumerId
,
msgIndex
);
taosFprintfFile
(
g_fp
,
"dbName: %s, topic: %s, vgroupId: %d
\n
"
,
dbName
!=
NULL
?
dbName
:
"invalid table"
,
tmq_get_topic_name
(
msg
),
vgroupId
);
...
...
@@ -960,7 +965,7 @@ void parseConsumeInfo() {
ltrim
(
pstr
);
char
*
ret
=
strchr
(
pstr
,
ch
);
memcpy
(
g_stConfInfo
.
stThreads
[
i
].
key
[
g_stConfInfo
.
stThreads
[
i
].
numOfKey
],
pstr
,
ret
-
pstr
);
strcpy
(
g_stConfInfo
.
stThreads
[
i
].
value
[
g_stConfInfo
.
stThreads
[
i
].
numOfKey
],
ret
+
1
);
tstrncpy
(
g_stConfInfo
.
stThreads
[
i
].
value
[
g_stConfInfo
.
stThreads
[
i
].
numOfKey
],
ret
+
1
,
sizeof
(
g_stConfInfo
.
stThreads
[
i
].
value
[
g_stConfInfo
.
stThreads
[
i
].
numOfKey
])
);
// printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
// g_stConfInfo.value[g_stConfInfo.numOfKey]);
g_stConfInfo
.
stThreads
[
i
].
numOfKey
++
;
...
...
@@ -1268,25 +1273,26 @@ void* ombProduceThreadFunc(void* param) {
for
(
int
i
=
0
;
i
<
batchPerTblTimes
;
++
i
)
{
uint32_t
msgsOfSql
=
g_stConfInfo
.
batchSize
;
if
((
i
==
batchPerTblTimes
-
1
)
&&
(
0
!=
remainder
))
{
msgsOfSql
=
remainder
;
msgsOfSql
=
remainder
;
}
int
len
=
0
;
len
+=
snprintf
(
sqlBuf
+
len
,
MAX_SQL_LEN
-
len
,
"insert into %s values "
,
ctbName
);
for
(
int
j
=
0
;
j
<
msgsOfSql
;
j
++
)
{
int64_t
timeStamp
=
taosGetTimestampNs
();
len
+=
snprintf
(
sqlBuf
+
len
,
MAX_SQL_LEN
-
len
,
"(%"
PRId64
",
\"
%s
\"
)"
,
timeStamp
,
g_payload
);
int64_t
timeStamp
=
taosGetTimestampNs
();
len
+=
snprintf
(
sqlBuf
+
len
,
MAX_SQL_LEN
-
len
,
"(%"
PRId64
",
\"
%s
\"
)"
,
timeStamp
,
g_payload
);
sendMsgs
++
;
pInfo
->
totalProduceMsgs
++
;
}
totalMsgLen
+=
len
;
totalMsgLen
+=
len
;
pInfo
->
totalMsgsLen
+=
len
;
int64_t
affectedRows
=
queryDbExec
(
pInfo
->
taos
,
sqlBuf
,
INSERT_TYPE
);
int64_t
affectedRows
=
queryDbExec
(
pInfo
->
taos
,
sqlBuf
,
INSERT_TYPE
);
if
(
affectedRows
<
0
)
{
taos_close
(
pInfo
->
taos
);
pInfo
->
taos
=
NULL
;
return
NULL
;
pInfo
->
taos
=
NULL
;
taosMemoryFree
(
sqlBuf
);
return
NULL
;
}
affectedRowsTotal
+=
affectedRows
;
...
...
@@ -1322,6 +1328,7 @@ void* ombProduceThreadFunc(void* param) {
printf
(
"affectedRowsTotal: %"
PRId64
"
\n
"
,
affectedRowsTotal
);
taos_close
(
pInfo
->
taos
);
pInfo
->
taos
=
NULL
;
taosMemoryFree
(
sqlBuf
);
return
NULL
;
}
...
...
utils/test/c/tmq_taosx_ci.c
浏览文件 @
177fe40e
...
...
@@ -663,7 +663,7 @@ void initLogFile() {
int
main
(
int
argc
,
char
*
argv
[])
{
for
(
int32_t
i
=
1
;
i
<
argc
;
i
++
)
{
if
(
strcmp
(
argv
[
i
],
"-c"
)
==
0
){
strcpy
(
g_conf
.
dir
,
argv
[
++
i
]
);
tstrncpy
(
g_conf
.
dir
,
argv
[
++
i
],
sizeof
(
g_conf
.
dir
)
);
}
else
if
(
strcmp
(
argv
[
i
],
"-s"
)
==
0
){
g_conf
.
snapShot
=
true
;
}
else
if
(
strcmp
(
argv
[
i
],
"-d"
)
==
0
){
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录