Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
957550c4
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
957550c4
编写于
9月 03, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'refact/tsdb_optimize' of github.com:taosdata/tdengine into refact/tsdb_optimize
上级
4ace5bef
d9d7c818
变更
43
展开全部
隐藏空白更改
内联
并排
Showing
43 changed file
with
799 addition
and
581 deletion
+799
-581
CMakeLists.txt
CMakeLists.txt
+1
-1
cmake/taostools_CMakeLists.txt.in
cmake/taostools_CMakeLists.txt.in
+1
-1
include/common/tdatablock.h
include/common/tdatablock.h
+6
-5
include/common/tmsg.h
include/common/tmsg.h
+4
-0
include/util/tdef.h
include/util/tdef.h
+6
-0
include/util/tutil.h
include/util/tutil.h
+3
-0
source/client/src/clientSml.c
source/client/src/clientSml.c
+2
-1
source/common/src/systable.c
source/common/src/systable.c
+1
-1
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+63
-24
source/common/src/tmsg.c
source/common/src/tmsg.c
+8
-0
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
+5
-1
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+4
-2
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+11
-1
source/dnode/mnode/impl/src/mndVgroup.c
source/dnode/mnode/impl/src/mndVgroup.c
+2
-0
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+2
-0
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+3
-2
source/dnode/vnode/src/meta/metaQuery.c
source/dnode/vnode/src/meta/metaQuery.c
+12
-1
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+80
-1
source/dnode/vnode/src/tq/tqPush.c
source/dnode/vnode/src/tq/tqPush.c
+18
-13
source/dnode/vnode/src/vnd/vnodeCfg.c
source/dnode/vnode/src/vnd/vnodeCfg.c
+9
-1
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+3
-2
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+366
-348
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+5
-1
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+20
-21
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+82
-101
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+37
-37
source/libs/stream/src/streamDispatch.c
source/libs/stream/src/streamDispatch.c
+38
-16
tests/script/tsim/valgrind/checkError6.sim
tests/script/tsim/valgrind/checkError6.sim
+2
-0
utils/CMakeLists.txt
utils/CMakeLists.txt
+4
-0
utils/test/c/CMakeLists.txt
utils/test/c/CMakeLists.txt
+1
-0
utils/test/c/createTable.c
utils/test/c/createTable.c
+0
-0
utils/test/c/sdbDump.c
utils/test/c/sdbDump.c
+0
-0
utils/test/c/sml_test.c
utils/test/c/sml_test.c
+0
-0
utils/test/c/tmqDemo.c
utils/test/c/tmqDemo.c
+0
-0
utils/test/c/tmqSim.c
utils/test/c/tmqSim.c
+0
-0
utils/test/c/tmq_taosx_ci.c
utils/test/c/tmq_taosx_ci.c
+0
-0
utils/tsim/CMakeLists.txt
utils/tsim/CMakeLists.txt
+0
-0
utils/tsim/inc/simInt.h
utils/tsim/inc/simInt.h
+0
-0
utils/tsim/inc/simParse.h
utils/tsim/inc/simParse.h
+0
-0
utils/tsim/src/simExe.c
utils/tsim/src/simExe.c
+0
-0
utils/tsim/src/simMain.c
utils/tsim/src/simMain.c
+0
-0
utils/tsim/src/simParse.c
utils/tsim/src/simParse.c
+0
-0
utils/tsim/src/simSystem.c
utils/tsim/src/simSystem.c
+0
-0
未找到文件。
CMakeLists.txt
浏览文件 @
957550c4
...
...
@@ -34,7 +34,7 @@ endif(${BUILD_TEST})
add_subdirectory
(
source
)
add_subdirectory
(
tools
)
add_subdirectory
(
test
s
)
add_subdirectory
(
util
s
)
add_subdirectory
(
examples/c
)
# docs
...
...
cmake/taostools_CMakeLists.txt.in
浏览文件 @
957550c4
...
...
@@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG
f169c0f
GIT_TAG
a4d9b92
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
...
...
include/common/tdatablock.h
浏览文件 @
957550c4
...
...
@@ -184,7 +184,8 @@ static FORCE_INLINE void colDataAppendDouble(SColumnInfoData* pColumnInfoData, u
int32_t
getJsonValueLen
(
const
char
*
data
);
int32_t
colDataAppend
(
SColumnInfoData
*
pColumnInfoData
,
uint32_t
currentRow
,
const
char
*
pData
,
bool
isNull
);
int32_t
colDataAppendNItems
(
SColumnInfoData
*
pColumnInfoData
,
uint32_t
currentRow
,
const
char
*
pData
,
uint32_t
numOfRows
);
int32_t
colDataAppendNItems
(
SColumnInfoData
*
pColumnInfoData
,
uint32_t
currentRow
,
const
char
*
pData
,
uint32_t
numOfRows
);
int32_t
colDataMergeCol
(
SColumnInfoData
*
pColumnInfoData
,
int32_t
numOfRow1
,
int32_t
*
capacity
,
const
SColumnInfoData
*
pSource
,
int32_t
numOfRow2
);
int32_t
colDataAssign
(
SColumnInfoData
*
pColumnInfoData
,
const
SColumnInfoData
*
pSource
,
int32_t
numOfRows
,
...
...
@@ -225,15 +226,16 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
int32_t
blockDataTrimFirstNRows
(
SSDataBlock
*
pBlock
,
size_t
n
);
int32_t
blockDataKeepFirstNRows
(
SSDataBlock
*
pBlock
,
size_t
n
);
int32_t
assignOneDataBlock
(
SSDataBlock
*
dst
,
const
SSDataBlock
*
src
);
int32_t
copyDataBlock
(
SSDataBlock
*
dst
,
const
SSDataBlock
*
src
);
int32_t
assignOneDataBlock
(
SSDataBlock
*
dst
,
const
SSDataBlock
*
src
);
int32_t
copyDataBlock
(
SSDataBlock
*
dst
,
const
SSDataBlock
*
src
);
SSDataBlock
*
createDataBlock
();
void
*
blockDataDestroy
(
SSDataBlock
*
pBlock
);
void
blockDataFreeRes
(
SSDataBlock
*
pBlock
);
SSDataBlock
*
createOneDataBlock
(
const
SSDataBlock
*
pDataBlock
,
bool
copyData
);
SSDataBlock
*
createSpecialDataBlock
(
EStreamType
type
);
int32_t
blockDataAppendColInfo
(
SSDataBlock
*
pBlock
,
SColumnInfoData
*
pColInfoData
);
int32_t
blockDataAppendColInfo
(
SSDataBlock
*
pBlock
,
SColumnInfoData
*
pColInfoData
);
SColumnInfoData
createColumnInfoData
(
int16_t
type
,
int32_t
bytes
,
int16_t
colId
);
SColumnInfoData
*
bdGetColumnInfoData
(
const
SSDataBlock
*
pBlock
,
int32_t
index
);
...
...
@@ -249,7 +251,6 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf);
int32_t
buildSubmitReqFromDataBlock
(
SSubmitReq
**
pReq
,
const
SSDataBlock
*
pDataBlocks
,
STSchema
*
pTSchema
,
int32_t
vgId
,
tb_uid_t
suid
);
char
*
buildCtbNameByGroupId
(
const
char
*
stbName
,
uint64_t
groupId
);
static
FORCE_INLINE
int32_t
blockGetEncodeSize
(
const
SSDataBlock
*
pBlock
)
{
...
...
include/common/tmsg.h
浏览文件 @
957550c4
...
...
@@ -785,6 +785,8 @@ typedef struct {
int32_t
walRollPeriod
;
int64_t
walSegmentSize
;
int32_t
sstTrigger
;
int16_t
hashPrefix
;
int16_t
hashSuffix
;
}
SCreateDbReq
;
int32_t
tSerializeSCreateDbReq
(
void
*
buf
,
int32_t
bufLen
,
SCreateDbReq
*
pReq
);
...
...
@@ -1194,6 +1196,8 @@ typedef struct {
int32_t
walRollPeriod
;
int64_t
walSegmentSize
;
int16_t
sstTrigger
;
int16_t
hashPrefix
;
int16_t
hashSuffix
;
}
SCreateVnodeReq
;
int32_t
tSerializeSCreateVnodeReq
(
void
*
buf
,
int32_t
bufLen
,
SCreateVnodeReq
*
pReq
);
...
...
include/util/tdef.h
浏览文件 @
957550c4
...
...
@@ -362,6 +362,12 @@ typedef enum ELogicConditionType {
#define TSDB_MIN_SST_TRIGGER 1
#define TSDB_MAX_SST_TRIGGER 128
#define TSDB_DEFAULT_SST_TRIGGER 8
#define TSDB_MIN_HASH_PREFIX 0
#define TSDB_MAX_HASH_PREFIX 128
#define TSDB_DEFAULT_HASH_PREFIX 0
#define TSDB_MIN_HASH_SUFFIX 0
#define TSDB_MAX_HASH_SUFFIX 128
#define TSDB_DEFAULT_HASH_SUFFIX 0
#define TSDB_DB_MIN_WAL_RETENTION_PERIOD -1
#define TSDB_REP_DEF_DB_WAL_RET_PERIOD 0
...
...
include/util/tutil.h
浏览文件 @
957550c4
...
...
@@ -20,6 +20,7 @@
#include "tcrc32c.h"
#include "tdef.h"
#include "tmd5.h"
#include "thash.h"
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -68,6 +69,8 @@ static FORCE_INLINE void taosEncryptPass_c(uint8_t *inBuf, size_t len, char *tar
memcpy
(
target
,
buf
,
TSDB_PASSWORD_LEN
);
}
#define taosGetTbHashVal(tbname, tblen, method, prefix, suffix) MurmurHash3_32((tbname), (tblen))
#ifdef __cplusplus
}
#endif
...
...
source/client/src/clientSml.c
浏览文件 @
957550c4
...
...
@@ -547,6 +547,8 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
goto
end
;
}
needCheckMeta
=
true
;
taosHashCleanup
(
hashTmp
);
hashTmp
=
NULL
;
}
else
{
uError
(
"SML:0x%"
PRIx64
" load table meta error: %s"
,
info
->
id
,
tstrerror
(
code
));
goto
end
;
...
...
@@ -576,7 +578,6 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
sTableData
->
tableMeta
=
pTableMeta
;
tableMetaSml
=
(
SSmlSTableMeta
**
)
taosHashIterate
(
info
->
superTables
,
tableMetaSml
);
taosHashCleanup
(
hashTmp
);
}
return
0
;
...
...
source/common/src/systable.c
浏览文件 @
957550c4
...
...
@@ -90,7 +90,7 @@ static const SSysDbTableSchema userDBSchema[] = {
{.
name
=
"minrows"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
,
.
sysInfo
=
true
},
{.
name
=
"maxrows"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
,
.
sysInfo
=
true
},
{.
name
=
"comp"
,
.
bytes
=
1
,
.
type
=
TSDB_DATA_TYPE_TINYINT
,
.
sysInfo
=
true
},
{.
name
=
"precision"
,
.
bytes
=
2
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
,
.
sysInfo
=
tru
e
},
{.
name
=
"precision"
,
.
bytes
=
2
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
,
.
sysInfo
=
fals
e
},
{.
name
=
"status"
,
.
bytes
=
10
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
,
.
sysInfo
=
false
},
{.
name
=
"retentions"
,
.
bytes
=
60
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
,
.
sysInfo
=
true
},
{.
name
=
"single_stable"
,
.
bytes
=
1
,
.
type
=
TSDB_DATA_TYPE_BOOL
,
.
sysInfo
=
true
},
...
...
source/common/src/tdatablock.c
浏览文件 @
957550c4
...
...
@@ -140,7 +140,8 @@ int32_t colDataReserve(SColumnInfoData* pColumnInfoData, size_t newSize) {
return
TSDB_CODE_SUCCESS
;
}
static
void
doCopyNItems
(
struct
SColumnInfoData
*
pColumnInfoData
,
int32_t
currentRow
,
const
char
*
pData
,
int32_t
itemLen
,
int32_t
numOfRows
)
{
static
void
doCopyNItems
(
struct
SColumnInfoData
*
pColumnInfoData
,
int32_t
currentRow
,
const
char
*
pData
,
int32_t
itemLen
,
int32_t
numOfRows
)
{
ASSERT
(
pColumnInfoData
->
info
.
bytes
>=
itemLen
);
size_t
start
=
1
;
...
...
@@ -148,21 +149,23 @@ static void doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t curren
memcpy
(
pColumnInfoData
->
pData
,
pData
,
itemLen
);
int32_t
t
=
0
;
int32_t
count
=
log
(
numOfRows
)
/
log
(
2
);
while
(
t
<
count
)
{
int32_t
count
=
log
(
numOfRows
)
/
log
(
2
);
while
(
t
<
count
)
{
int32_t
xlen
=
1
<<
t
;
memcpy
(
pColumnInfoData
->
pData
+
start
*
itemLen
+
pColumnInfoData
->
varmeta
.
length
,
pColumnInfoData
->
pData
,
xlen
*
itemLen
);
memcpy
(
pColumnInfoData
->
pData
+
start
*
itemLen
+
pColumnInfoData
->
varmeta
.
length
,
pColumnInfoData
->
pData
,
xlen
*
itemLen
);
t
+=
1
;
start
+=
xlen
;
}
// the tail part
if
(
numOfRows
>
start
)
{
memcpy
(
pColumnInfoData
->
pData
+
start
*
itemLen
+
currentRow
*
itemLen
,
pColumnInfoData
->
pData
,
(
numOfRows
-
start
)
*
itemLen
);
memcpy
(
pColumnInfoData
->
pData
+
start
*
itemLen
+
currentRow
*
itemLen
,
pColumnInfoData
->
pData
,
(
numOfRows
-
start
)
*
itemLen
);
}
if
(
IS_VAR_DATA_TYPE
(
pColumnInfoData
->
info
.
type
))
{
for
(
int32_t
i
=
0
;
i
<
numOfRows
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfRows
;
++
i
)
{
pColumnInfoData
->
varmeta
.
offset
[
i
+
currentRow
]
=
pColumnInfoData
->
varmeta
.
length
+
i
*
itemLen
;
}
...
...
@@ -170,7 +173,8 @@ static void doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t curren
}
}
int32_t
colDataAppendNItems
(
SColumnInfoData
*
pColumnInfoData
,
uint32_t
currentRow
,
const
char
*
pData
,
uint32_t
numOfRows
)
{
int32_t
colDataAppendNItems
(
SColumnInfoData
*
pColumnInfoData
,
uint32_t
currentRow
,
const
char
*
pData
,
uint32_t
numOfRows
)
{
ASSERT
(
pData
!=
NULL
&&
pColumnInfoData
!=
NULL
);
int32_t
len
=
pColumnInfoData
->
info
.
bytes
;
...
...
@@ -278,7 +282,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int
}
else
{
if
(
finalNumOfRows
>
*
capacity
||
(
numOfRow1
==
0
&&
pColumnInfoData
->
info
.
bytes
!=
0
))
{
// all data may be null, when the pColumnInfoData->info.type == 0, bytes == 0;
// ASSERT(finalNumOfRows * pColumnInfoData->info.bytes);
// ASSERT(finalNumOfRows * pColumnInfoData->info.bytes);
char
*
tmp
=
taosMemoryRealloc
(
pColumnInfoData
->
pData
,
finalNumOfRows
*
pColumnInfoData
->
info
.
bytes
);
if
(
tmp
==
NULL
)
{
return
TSDB_CODE_VND_OUT_OF_MEMORY
;
...
...
@@ -557,7 +561,7 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) {
}
int32_t
blockDataFromBuf
(
SSDataBlock
*
pBlock
,
const
char
*
buf
)
{
int32_t
numOfRows
=
*
(
int32_t
*
)
buf
;
int32_t
numOfRows
=
*
(
int32_t
*
)
buf
;
blockDataEnsureCapacity
(
pBlock
,
numOfRows
);
pBlock
->
info
.
rows
=
numOfRows
;
...
...
@@ -676,7 +680,8 @@ size_t blockDataGetRowSize(SSDataBlock* pBlock) {
* @return
*/
size_t
blockDataGetSerialMetaSize
(
uint32_t
numOfCols
)
{
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length |
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
// length |
return
sizeof
(
int32_t
)
+
sizeof
(
int32_t
)
+
sizeof
(
int32_t
)
+
sizeof
(
int32_t
)
+
sizeof
(
int32_t
)
+
sizeof
(
uint64_t
)
+
numOfCols
*
(
sizeof
(
int8_t
)
+
sizeof
(
int32_t
))
+
numOfCols
*
sizeof
(
int32_t
);
}
...
...
@@ -1302,6 +1307,40 @@ int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
return
TSDB_CODE_SUCCESS
;
}
SSDataBlock
*
createSpecialDataBlock
(
EStreamType
type
)
{
SSDataBlock
*
pBlock
=
taosMemoryCalloc
(
1
,
sizeof
(
SSDataBlock
));
pBlock
->
info
.
hasVarCol
=
false
;
pBlock
->
info
.
groupId
=
0
;
pBlock
->
info
.
rows
=
0
;
pBlock
->
info
.
type
=
type
;
pBlock
->
info
.
rowSize
=
sizeof
(
TSKEY
)
+
sizeof
(
TSKEY
)
+
sizeof
(
uint64_t
)
+
sizeof
(
uint64_t
)
+
sizeof
(
TSKEY
)
+
sizeof
(
TSKEY
);
pBlock
->
info
.
watermark
=
INT64_MIN
;
pBlock
->
pDataBlock
=
taosArrayInit
(
6
,
sizeof
(
SColumnInfoData
));
SColumnInfoData
infoData
=
{
0
};
infoData
.
info
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
infoData
.
info
.
bytes
=
sizeof
(
TSKEY
);
// window start ts
taosArrayPush
(
pBlock
->
pDataBlock
,
&
infoData
);
// window end ts
taosArrayPush
(
pBlock
->
pDataBlock
,
&
infoData
);
infoData
.
info
.
type
=
TSDB_DATA_TYPE_UBIGINT
;
infoData
.
info
.
bytes
=
sizeof
(
uint64_t
);
// uid
taosArrayPush
(
pBlock
->
pDataBlock
,
&
infoData
);
// group id
taosArrayPush
(
pBlock
->
pDataBlock
,
&
infoData
);
// calculate start ts
taosArrayPush
(
pBlock
->
pDataBlock
,
&
infoData
);
// calculate end ts
taosArrayPush
(
pBlock
->
pDataBlock
,
&
infoData
);
return
pBlock
;
}
SSDataBlock
*
createOneDataBlock
(
const
SSDataBlock
*
pDataBlock
,
bool
copyData
)
{
if
(
pDataBlock
==
NULL
)
{
return
NULL
;
...
...
@@ -1426,7 +1465,7 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) {
}
void
colDataDestroy
(
SColumnInfoData
*
pColData
)
{
if
(
!
pColData
)
return
;
if
(
!
pColData
)
return
;
if
(
IS_VAR_DATA_TYPE
(
pColData
->
info
.
type
))
{
taosMemoryFreeClear
(
pColData
->
varmeta
.
offset
);
}
else
{
...
...
@@ -1693,7 +1732,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
}
struct
tm
ptm
=
{
0
};
taosLocalTime
(
&
tt
,
&
ptm
);
size_t
pos
=
strftime
(
buf
,
35
,
"%Y-%m-%d %H:%M:%S"
,
&
ptm
);
size_t
pos
=
strftime
(
buf
,
35
,
"%Y-%m-%d %H:%M:%S"
,
&
ptm
);
if
(
precision
==
TSDB_TIME_PRECISION_NANO
)
{
sprintf
(
buf
+
pos
,
".%09d"
,
ms
);
...
...
@@ -1847,20 +1886,20 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
break
;
case
TSDB_DATA_TYPE_VARCHAR
:
{
memset
(
pBuf
,
0
,
sizeof
(
pBuf
));
char
*
pData
=
colDataGetVarData
(
pColInfoData
,
j
);
char
*
pData
=
colDataGetVarData
(
pColInfoData
,
j
);
int32_t
dataSize
=
TMIN
(
sizeof
(
pBuf
),
varDataLen
(
pData
));
memcpy
(
pBuf
,
varDataVal
(
pData
),
dataSize
);
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
" %15s |"
,
pBuf
);
if
(
len
>=
size
-
1
)
return
dumpBuf
;
}
break
;
}
break
;
case
TSDB_DATA_TYPE_NCHAR
:
{
char
*
pData
=
colDataGetVarData
(
pColInfoData
,
j
);
char
*
pData
=
colDataGetVarData
(
pColInfoData
,
j
);
int32_t
dataSize
=
TMIN
(
sizeof
(
pBuf
),
varDataLen
(
pData
));
memset
(
pBuf
,
0
,
sizeof
(
pBuf
));
taosUcs4ToMbs
((
TdUcs4
*
)
varDataVal
(
pData
),
dataSize
,
pBuf
);
taosUcs4ToMbs
((
TdUcs4
*
)
varDataVal
(
pData
),
dataSize
,
pBuf
);
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
" %15s |"
,
pBuf
);
if
(
len
>=
size
-
1
)
return
dumpBuf
;
}
break
;
}
break
;
}
}
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
"
\n
"
);
...
...
@@ -1877,7 +1916,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
* @param pDataBlocks
* @param vgId
* @param suid
*
*
*/
int32_t
buildSubmitReqFromDataBlock
(
SSubmitReq
**
pReq
,
const
SSDataBlock
*
pDataBlock
,
STSchema
*
pTSchema
,
int32_t
vgId
,
tb_uid_t
suid
)
{
...
...
@@ -1904,8 +1943,8 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB
tdSRowInit
(
&
rb
,
pTSchema
->
version
);
for
(
int32_t
i
=
0
;
i
<
sz
;
++
i
)
{
int32_t
colNum
=
taosArrayGetSize
(
pDataBlock
->
pDataBlock
);
int32_t
rows
=
pDataBlock
->
info
.
rows
;
int32_t
colNum
=
taosArrayGetSize
(
pDataBlock
->
pDataBlock
);
int32_t
rows
=
pDataBlock
->
info
.
rows
;
// int32_t rowSize = pDataBlock->info.rowSize;
// int64_t groupId = pDataBlock->info.groupId;
...
...
@@ -1926,7 +1965,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB
msgLen
+=
sizeof
(
SSubmitBlk
);
int32_t
dataLen
=
0
;
for
(
int32_t
j
=
0
;
j
<
rows
;
++
j
)
{
// iterate by row
for
(
int32_t
j
=
0
;
j
<
rows
;
++
j
)
{
// iterate by row
tdSRowResetBuf
(
&
rb
,
POINTER_SHIFT
(
pDataBuf
,
msgLen
+
dataLen
));
// set row buf
bool
isStartKey
=
false
;
int32_t
offset
=
0
;
...
...
@@ -2089,7 +2128,7 @@ void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_
// flag segment.
// the inital bit is for column info
int32_t
*
flagSegment
=
(
int32_t
*
)
data
;
*
flagSegment
=
(
1
<<
31
);
*
flagSegment
=
(
1
<<
31
);
data
+=
sizeof
(
int32_t
);
...
...
@@ -2149,7 +2188,7 @@ void blockEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_
const
char
*
blockDecode
(
SSDataBlock
*
pBlock
,
const
char
*
pData
)
{
const
char
*
pStart
=
pData
;
int32_t
version
=
*
(
int32_t
*
)
pStart
;
int32_t
version
=
*
(
int32_t
*
)
pStart
;
pStart
+=
sizeof
(
int32_t
);
ASSERT
(
version
==
1
);
...
...
@@ -2158,7 +2197,7 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) {
pStart
+=
sizeof
(
int32_t
);
// total rows sizeof(int32_t)
int32_t
numOfRows
=
*
(
int32_t
*
)
pStart
;
int32_t
numOfRows
=
*
(
int32_t
*
)
pStart
;
pStart
+=
sizeof
(
int32_t
);
// total columns sizeof(int32_t)
...
...
source/common/src/tmsg.c
浏览文件 @
957550c4
...
...
@@ -2027,6 +2027,8 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) {
if
(
tEncodeI32
(
&
encoder
,
pReq
->
walRollPeriod
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pReq
->
walSegmentSize
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
sstTrigger
)
<
0
)
return
-
1
;
if
(
tEncodeI16
(
&
encoder
,
pReq
->
hashPrefix
)
<
0
)
return
-
1
;
if
(
tEncodeI16
(
&
encoder
,
pReq
->
hashSuffix
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
ignoreExist
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
numOfRetensions
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
pReq
->
numOfRetensions
;
++
i
)
{
...
...
@@ -2074,6 +2076,8 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq)
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
walRollPeriod
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
walSegmentSize
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
sstTrigger
)
<
0
)
return
-
1
;
if
(
tDecodeI16
(
&
decoder
,
&
pReq
->
hashPrefix
)
<
0
)
return
-
1
;
if
(
tDecodeI16
(
&
decoder
,
&
pReq
->
hashSuffix
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
ignoreExist
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
numOfRetensions
)
<
0
)
return
-
1
;
pReq
->
pRetensions
=
taosArrayInit
(
pReq
->
numOfRetensions
,
sizeof
(
SRetention
));
...
...
@@ -3769,6 +3773,8 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
if
(
tEncodeI32
(
&
encoder
,
pReq
->
walRollPeriod
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pReq
->
walSegmentSize
)
<
0
)
return
-
1
;
if
(
tEncodeI16
(
&
encoder
,
pReq
->
sstTrigger
)
<
0
)
return
-
1
;
if
(
tEncodeI16
(
&
encoder
,
pReq
->
hashPrefix
)
<
0
)
return
-
1
;
if
(
tEncodeI16
(
&
encoder
,
pReq
->
hashSuffix
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
...
...
@@ -3842,6 +3848,8 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
walRollPeriod
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
pReq
->
walSegmentSize
)
<
0
)
return
-
1
;
if
(
tDecodeI16
(
&
decoder
,
&
pReq
->
sstTrigger
)
<
0
)
return
-
1
;
if
(
tDecodeI16
(
&
decoder
,
&
pReq
->
hashPrefix
)
<
0
)
return
-
1
;
if
(
tDecodeI16
(
&
decoder
,
&
pReq
->
hashSuffix
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
tDecoderClear
(
&
decoder
);
...
...
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
浏览文件 @
957550c4
...
...
@@ -171,6 +171,8 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg
->
hashBegin
=
pCreate
->
hashBegin
;
pCfg
->
hashEnd
=
pCreate
->
hashEnd
;
pCfg
->
hashMethod
=
pCreate
->
hashMethod
;
pCfg
->
hashPrefix
=
pCreate
->
hashPrefix
;
pCfg
->
hashSuffix
=
pCreate
->
hashSuffix
;
pCfg
->
standby
=
pCfg
->
standby
;
pCfg
->
syncCfg
.
myIndex
=
pCreate
->
selfIndex
;
...
...
@@ -220,9 +222,11 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return
-
1
;
}
d
Debug
(
"vgId:%d, start to create vnode, tsma:%d standby:%d cacheLast:%d cacheLastSize:%d sstTrigger:%d"
,
d
Info
(
"vgId:%d, start to create vnode, tsma:%d standby:%d cacheLast:%d cacheLastSize:%d sstTrigger:%d"
,
createReq
.
vgId
,
createReq
.
isTsma
,
createReq
.
standby
,
createReq
.
cacheLast
,
createReq
.
cacheLastSize
,
createReq
.
sstTrigger
);
dInfo
(
"vgId:%d, hashMethod:%d begin:%u end:%u prefix:%d surfix:%d"
,
createReq
.
vgId
,
createReq
.
hashMethod
,
createReq
.
hashBegin
,
createReq
.
hashEnd
,
createReq
.
hashPrefix
,
createReq
.
hashSuffix
);
vmGenerateVnodeCfg
(
&
createReq
,
&
vnodeCfg
);
if
(
vmTsmaAdjustDays
(
&
vnodeCfg
,
&
createReq
)
<
0
)
{
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
957550c4
...
...
@@ -305,13 +305,15 @@ typedef struct {
int8_t
hashMethod
;
// default is 1
int8_t
cacheLast
;
int8_t
schemaless
;
int16_t
hashPrefix
;
int16_t
hashSuffix
;
int16_t
sstTrigger
;
int32_t
numOfRetensions
;
SArray
*
pRetensions
;
int32_t
walRetentionPeriod
;
int64_t
walRetentionSize
;
int32_t
walRollPeriod
;
int64_t
walRetentionSize
;
int64_t
walSegmentSize
;
int16_t
sstTrigger
;
}
SDbCfg
;
typedef
struct
{
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
957550c4
...
...
@@ -30,7 +30,7 @@
#include "systable.h"
#define DB_VER_NUMBER 1
#define DB_RESERVE_SIZE
62
#define DB_RESERVE_SIZE
58
static
SSdbRaw
*
mndDbActionEncode
(
SDbObj
*
pDb
);
static
SSdbRow
*
mndDbActionDecode
(
SSdbRaw
*
pRaw
);
...
...
@@ -125,6 +125,8 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) {
SDB_SET_INT32
(
pRaw
,
dataPos
,
pDb
->
cfg
.
walRollPeriod
,
_OVER
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pDb
->
cfg
.
walSegmentSize
,
_OVER
)
SDB_SET_INT16
(
pRaw
,
dataPos
,
pDb
->
cfg
.
sstTrigger
,
_OVER
)
SDB_SET_INT16
(
pRaw
,
dataPos
,
pDb
->
cfg
.
hashPrefix
,
_OVER
)
SDB_SET_INT16
(
pRaw
,
dataPos
,
pDb
->
cfg
.
hashSuffix
,
_OVER
)
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
DB_RESERVE_SIZE
,
_OVER
)
SDB_SET_DATALEN
(
pRaw
,
dataPos
,
_OVER
)
...
...
@@ -209,6 +211,8 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pDb
->
cfg
.
walRollPeriod
,
_OVER
)
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pDb
->
cfg
.
walSegmentSize
,
_OVER
)
SDB_GET_INT16
(
pRaw
,
dataPos
,
&
pDb
->
cfg
.
sstTrigger
,
_OVER
)
SDB_GET_INT16
(
pRaw
,
dataPos
,
&
pDb
->
cfg
.
hashPrefix
,
_OVER
)
SDB_GET_INT16
(
pRaw
,
dataPos
,
&
pDb
->
cfg
.
hashSuffix
,
_OVER
)
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
DB_RESERVE_SIZE
,
_OVER
)
taosInitRWLatch
(
&
pDb
->
lock
);
...
...
@@ -334,6 +338,8 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
if
(
pCfg
->
walRollPeriod
<
TSDB_DB_MIN_WAL_ROLL_PERIOD
)
return
-
1
;
if
(
pCfg
->
walSegmentSize
<
TSDB_DB_MIN_WAL_SEGMENT_SIZE
)
return
-
1
;
if
(
pCfg
->
sstTrigger
<
TSDB_MIN_SST_TRIGGER
||
pCfg
->
sstTrigger
>
TSDB_MAX_SST_TRIGGER
)
return
-
1
;
if
(
pCfg
->
hashPrefix
<
TSDB_MIN_HASH_PREFIX
||
pCfg
->
hashPrefix
>
TSDB_MAX_HASH_PREFIX
)
return
-
1
;
if
(
pCfg
->
hashSuffix
<
TSDB_MIN_HASH_SUFFIX
||
pCfg
->
hashSuffix
>
TSDB_MAX_HASH_SUFFIX
)
return
-
1
;
terrno
=
0
;
return
terrno
;
...
...
@@ -368,6 +374,8 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
if
(
pCfg
->
walRollPeriod
<
0
)
pCfg
->
walRollPeriod
=
TSDB_REPS_DEF_DB_WAL_ROLL_PERIOD
;
if
(
pCfg
->
walSegmentSize
<
0
)
pCfg
->
walSegmentSize
=
TSDB_DEFAULT_DB_WAL_SEGMENT_SIZE
;
if
(
pCfg
->
sstTrigger
<=
0
)
pCfg
->
sstTrigger
=
TSDB_DEFAULT_SST_TRIGGER
;
if
(
pCfg
->
hashPrefix
<
0
)
pCfg
->
hashPrefix
=
TSDB_DEFAULT_HASH_PREFIX
;
if
(
pCfg
->
hashSuffix
<
0
)
pCfg
->
hashSuffix
=
TSDB_DEFAULT_HASH_SUFFIX
;
}
static
int32_t
mndSetCreateDbRedoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroups
)
{
...
...
@@ -485,6 +493,8 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
.
walRollPeriod
=
pCreate
->
walRollPeriod
,
.
walSegmentSize
=
pCreate
->
walSegmentSize
,
.
sstTrigger
=
pCreate
->
sstTrigger
,
.
hashPrefix
=
pCreate
->
hashPrefix
,
.
hashSuffix
=
pCreate
->
hashSuffix
,
};
dbObj
.
cfg
.
numOfRetensions
=
pCreate
->
numOfRetensions
;
...
...
source/dnode/mnode/impl/src/mndVgroup.c
浏览文件 @
957550c4
...
...
@@ -235,6 +235,8 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
createReq
.
walRollPeriod
=
pDb
->
cfg
.
walRollPeriod
;
createReq
.
walSegmentSize
=
pDb
->
cfg
.
walSegmentSize
;
createReq
.
sstTrigger
=
pDb
->
cfg
.
sstTrigger
;
createReq
.
hashPrefix
=
pDb
->
cfg
.
hashPrefix
;
createReq
.
hashSuffix
=
pDb
->
cfg
.
hashSuffix
;
for
(
int32_t
v
=
0
;
v
<
pVgroup
->
replica
;
++
v
)
{
SReplica
*
pReplica
=
&
createReq
.
replicas
[
v
];
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
957550c4
...
...
@@ -289,6 +289,8 @@ struct SVnodeCfg {
uint32_t
hashBegin
;
uint32_t
hashEnd
;
int16_t
sstTrigger
;
int16_t
hashPrefix
;
int16_t
hashSuffix
;
};
typedef
struct
{
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
957550c4
...
...
@@ -103,7 +103,7 @@ int metaCommit(SMeta* pMeta);
int
metaCreateSTable
(
SMeta
*
pMeta
,
int64_t
version
,
SVCreateStbReq
*
pReq
);
int
metaAlterSTable
(
SMeta
*
pMeta
,
int64_t
version
,
SVCreateStbReq
*
pReq
);
int
metaDropSTable
(
SMeta
*
pMeta
,
int64_t
verison
,
SVDropStbReq
*
pReq
,
SArray
*
tbUidList
);
int
metaCreateTable
(
SMeta
*
pMeta
,
int64_t
version
,
SVCreateTbReq
*
pReq
,
STableMetaRsp
**
pMetaRsp
);
int
metaCreateTable
(
SMeta
*
pMeta
,
int64_t
version
,
SVCreateTbReq
*
pReq
,
STableMetaRsp
**
pMetaRsp
);
int
metaDropTable
(
SMeta
*
pMeta
,
int64_t
version
,
SVDropTbReq
*
pReq
,
SArray
*
tbUids
);
int
metaTtlDropTable
(
SMeta
*
pMeta
,
int64_t
ttl
,
SArray
*
tbUids
);
int
metaAlterTable
(
SMeta
*
pMeta
,
int64_t
version
,
SVAlterTbReq
*
pReq
,
STableMetaRsp
*
pMetaRsp
);
...
...
@@ -174,7 +174,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
// tq-stream
int32_t
tqProcessTaskDeployReq
(
STQ
*
pTq
,
int64_t
version
,
char
*
msg
,
int32_t
msgLen
);
int32_t
tqProcessTaskDropReq
(
STQ
*
pTq
,
int64_t
version
,
char
*
msg
,
int32_t
msgLen
);
int32_t
tqProcessStreamTrigger
(
STQ
*
pTq
,
SSubmitReq
*
data
,
int64_t
ver
);
int32_t
tqProcessSubmitReq
(
STQ
*
pTq
,
SSubmitReq
*
data
,
int64_t
ver
);
int32_t
tqProcessDelReq
(
STQ
*
pTq
,
void
*
pReq
,
int32_t
len
,
int64_t
ver
);
int32_t
tqProcessTaskRunReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
int32_t
tqProcessTaskDispatchReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
,
bool
exec
);
int32_t
tqProcessTaskRecoverReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
...
...
source/dnode/vnode/src/meta/metaQuery.c
浏览文件 @
957550c4
...
...
@@ -129,10 +129,16 @@ _err:
bool
metaIsTableExist
(
SMeta
*
pMeta
,
tb_uid_t
uid
)
{
// query uid.idx
metaRLock
(
pMeta
);
if
(
tdbTbGet
(
pMeta
->
pUidIdx
,
&
uid
,
sizeof
(
uid
),
NULL
,
NULL
)
<
0
)
{
metaULock
(
pMeta
);
return
false
;
}
metaULock
(
pMeta
);
return
true
;
}
...
...
@@ -182,9 +188,14 @@ tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) {
}
int
metaGetTableNameByUid
(
void
*
meta
,
uint64_t
uid
,
char
*
tbName
)
{
int
code
=
0
;
SMetaReader
mr
=
{
0
};
metaReaderInit
(
&
mr
,
(
SMeta
*
)
meta
,
0
);
metaGetTableEntryByUid
(
&
mr
,
uid
);
code
=
metaGetTableEntryByUid
(
&
mr
,
uid
);
if
(
code
<
0
)
{
metaReaderClear
(
&
mr
);
return
-
1
;
}
STR_TO_VARSTR
(
tbName
,
mr
.
me
.
name
);
metaReaderClear
(
&
mr
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
957550c4
...
...
@@ -816,7 +816,86 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msg
return
streamMetaAddSerializedTask
(
pTq
->
pStreamMeta
,
version
,
msg
,
msgLen
);
}
int32_t
tqProcessStreamTrigger
(
STQ
*
pTq
,
SSubmitReq
*
pReq
,
int64_t
ver
)
{
int32_t
tqProcessDelReq
(
STQ
*
pTq
,
void
*
pReq
,
int32_t
len
,
int64_t
ver
)
{
bool
failed
=
false
;
SDecoder
*
pCoder
=
&
(
SDecoder
){
0
};
SDeleteRes
*
pRes
=
&
(
SDeleteRes
){
0
};
pRes
->
uidList
=
taosArrayInit
(
0
,
sizeof
(
tb_uid_t
));
if
(
pRes
->
uidList
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
failed
=
true
;
}
tDecoderInit
(
pCoder
,
pReq
,
len
);
tDecodeDeleteRes
(
pCoder
,
pRes
);
tDecoderClear
(
pCoder
);
int32_t
sz
=
taosArrayGetSize
(
pRes
->
uidList
);
if
(
sz
==
0
)
{
taosArrayDestroy
(
pRes
->
uidList
);
return
0
;
}
SSDataBlock
*
pDelBlock
=
createSpecialDataBlock
(
STREAM_DELETE_DATA
);
blockDataEnsureCapacity
(
pDelBlock
,
sz
);
pDelBlock
->
info
.
rows
=
sz
;
pDelBlock
->
info
.
version
=
ver
;
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
// start key column
SColumnInfoData
*
pStartCol
=
taosArrayGet
(
pDelBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
colDataAppend
(
pStartCol
,
i
,
(
const
char
*
)
&
pRes
->
skey
,
false
);
// end key column
SColumnInfoData
*
pEndCol
=
taosArrayGet
(
pDelBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
colDataAppend
(
pEndCol
,
i
,
(
const
char
*
)
&
pRes
->
ekey
,
false
);
// uid column
SColumnInfoData
*
pUidCol
=
taosArrayGet
(
pDelBlock
->
pDataBlock
,
UID_COLUMN_INDEX
);
int64_t
*
pUid
=
taosArrayGet
(
pRes
->
uidList
,
i
);
colDataAppend
(
pUidCol
,
i
,
(
const
char
*
)
pUid
,
false
);
colDataAppendNULL
(
taosArrayGet
(
pDelBlock
->
pDataBlock
,
GROUPID_COLUMN_INDEX
),
i
);
colDataAppendNULL
(
taosArrayGet
(
pDelBlock
->
pDataBlock
,
CALCULATE_START_TS_COLUMN_INDEX
),
i
);
colDataAppendNULL
(
taosArrayGet
(
pDelBlock
->
pDataBlock
,
CALCULATE_END_TS_COLUMN_INDEX
),
i
);
}
taosArrayDestroy
(
pRes
->
uidList
);
void
*
pIter
=
NULL
;
while
(
1
)
{
pIter
=
taosHashIterate
(
pTq
->
pStreamMeta
->
pTasks
,
pIter
);
if
(
pIter
==
NULL
)
break
;
SStreamTask
*
pTask
=
*
(
SStreamTask
**
)
pIter
;
if
(
pTask
->
taskLevel
!=
TASK_LEVEL__SOURCE
)
continue
;
qDebug
(
"delete req enqueue stream task: %d, ver: %"
PRId64
,
pTask
->
taskId
,
ver
);
SStreamDataBlock
*
pStreamBlock
=
taosAllocateQitem
(
sizeof
(
SStreamDataBlock
),
DEF_QITEM
);
pStreamBlock
->
type
=
STREAM_INPUT__DATA_BLOCK
;
pStreamBlock
->
blocks
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
SSDataBlock
block
=
{
0
};
assignOneDataBlock
(
&
block
,
pDelBlock
);
block
.
info
.
type
=
STREAM_DELETE_DATA
;
taosArrayPush
(
pStreamBlock
->
blocks
,
&
block
);
if
(
!
failed
)
{
if
(
streamTaskInput
(
pTask
,
(
SStreamQueueItem
*
)
pStreamBlock
)
<
0
)
{
qError
(
"stream task input del failed, task id %d"
,
pTask
->
taskId
);
continue
;
}
if
(
streamSchedExec
(
pTask
)
<
0
)
{
qError
(
"stream task launch failed, task id %d"
,
pTask
->
taskId
);
continue
;
}
}
else
{
streamTaskInputFail
(
pTask
);
}
}
blockDataDestroy
(
pDelBlock
);
return
0
;
}
int32_t
tqProcessSubmitReq
(
STQ
*
pTq
,
SSubmitReq
*
pReq
,
int64_t
ver
)
{
void
*
pIter
=
NULL
;
bool
failed
=
false
;
SStreamDataSubmit
*
pSubmit
=
NULL
;
...
...
source/dnode/vnode/src/tq/tqPush.c
浏览文件 @
957550c4
...
...
@@ -213,20 +213,25 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
#endif
int
tqPushMsg
(
STQ
*
pTq
,
void
*
msg
,
int32_t
msgLen
,
tmsg_t
msgType
,
int64_t
ver
)
{
if
(
vnodeIsRoleLeader
(
pTq
->
pVnode
)
&&
msgType
==
TDMT_VND_SUBMIT
)
{
if
(
taosHashGetSize
(
pTq
->
pStreamMeta
->
pTasks
)
==
0
)
return
0
;
void
*
data
=
taosMemoryMalloc
(
msgLen
);
if
(
data
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
tqError
(
"failed to copy data for stream since out of memory"
);
return
-
1
;
if
(
vnodeIsRoleLeader
(
pTq
->
pVnode
))
{
if
(
msgType
==
TDMT_VND_SUBMIT
)
{
if
(
taosHashGetSize
(
pTq
->
pStreamMeta
->
pTasks
)
==
0
)
return
0
;
void
*
data
=
taosMemoryMalloc
(
msgLen
);
if
(
data
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
tqError
(
"failed to copy data for stream since out of memory"
);
return
-
1
;
}
memcpy
(
data
,
msg
,
msgLen
);
SSubmitReq
*
pReq
=
(
SSubmitReq
*
)
data
;
pReq
->
version
=
ver
;
tqProcessSubmitReq
(
pTq
,
data
,
ver
);
}
if
(
msgType
==
TDMT_VND_DELETE
)
{
tqProcessDelReq
(
pTq
,
POINTER_SHIFT
(
msg
,
sizeof
(
SMsgHead
)),
msgLen
-
sizeof
(
SMsgHead
),
ver
);
}
memcpy
(
data
,
msg
,
msgLen
);
SSubmitReq
*
pReq
=
(
SSubmitReq
*
)
data
;
pReq
->
version
=
ver
;
tqProcessStreamTrigger
(
pTq
,
data
,
ver
);
}
return
0
;
...
...
source/dnode/vnode/src/vnd/vnodeCfg.c
浏览文件 @
957550c4
...
...
@@ -14,6 +14,7 @@
*/
#include "vnd.h"
#include "tutil.h"
const
SVnodeCfg
vnodeCfgDefault
=
{.
vgId
=
-
1
,
.
dbname
=
""
,
...
...
@@ -110,6 +111,8 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
if
(
tjsonAddIntegerToObject
(
pJson
,
"hashBegin"
,
pCfg
->
hashBegin
)
<
0
)
return
-
1
;
if
(
tjsonAddIntegerToObject
(
pJson
,
"hashEnd"
,
pCfg
->
hashEnd
)
<
0
)
return
-
1
;
if
(
tjsonAddIntegerToObject
(
pJson
,
"hashMethod"
,
pCfg
->
hashMethod
)
<
0
)
return
-
1
;
if
(
tjsonAddIntegerToObject
(
pJson
,
"hashPrefix"
,
pCfg
->
hashPrefix
)
<
0
)
return
-
1
;
if
(
tjsonAddIntegerToObject
(
pJson
,
"hashSuffix"
,
pCfg
->
hashSuffix
)
<
0
)
return
-
1
;
if
(
tjsonAddIntegerToObject
(
pJson
,
"syncCfg.replicaNum"
,
pCfg
->
syncCfg
.
replicaNum
)
<
0
)
return
-
1
;
if
(
tjsonAddIntegerToObject
(
pJson
,
"syncCfg.myIndex"
,
pCfg
->
syncCfg
.
myIndex
)
<
0
)
return
-
1
;
...
...
@@ -214,6 +217,10 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
if
(
code
<
0
)
return
-
1
;
tjsonGetNumberValue
(
pJson
,
"hashMethod"
,
pCfg
->
hashMethod
,
code
);
if
(
code
<
0
)
return
-
1
;
tjsonGetNumberValue
(
pJson
,
"hashPrefix"
,
pCfg
->
hashPrefix
,
code
);
if
(
code
<
0
)
return
-
1
;
tjsonGetNumberValue
(
pJson
,
"hashSuffix"
,
pCfg
->
hashSuffix
,
code
);
if
(
code
<
0
)
return
-
1
;
tjsonGetNumberValue
(
pJson
,
"syncCfg.replicaNum"
,
pCfg
->
syncCfg
.
replicaNum
,
code
);
if
(
code
<
0
)
return
-
1
;
...
...
@@ -250,7 +257,8 @@ int vnodeValidateTableHash(SVnode *pVnode, char *tableFName) {
switch
(
pVnode
->
config
.
hashMethod
)
{
default:
hashValue
=
MurmurHash3_32
(
tableFName
,
strlen
(
tableFName
));
hashValue
=
taosGetTbHashVal
(
tableFName
,
strlen
(
tableFName
),
pVnode
->
config
.
hashMethod
,
pVnode
->
config
.
hashPrefix
,
pVnode
->
config
.
hashSuffix
);
break
;
}
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
957550c4
...
...
@@ -371,7 +371,7 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
if
(
NULL
==
pMetaRsp
)
{
return
;
}
strcpy
(
pMetaRsp
->
dbFName
,
pVnode
->
config
.
dbname
);
pMetaRsp
->
dbId
=
pVnode
->
config
.
dbId
;
pMetaRsp
->
vgId
=
TD_VID
(
pVnode
);
...
...
@@ -527,7 +527,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR
cRsp
.
code
=
TSDB_CODE_SUCCESS
;
tdFetchTbUidList
(
pVnode
->
pSma
,
&
pStore
,
pCreateReq
->
ctb
.
suid
,
pCreateReq
->
uid
);
taosArrayPush
(
tbUids
,
&
pCreateReq
->
uid
);
vnodeUpdateMetaRsp
(
pVnode
,
cRsp
.
pMeta
);
vnodeUpdateMetaRsp
(
pVnode
,
cRsp
.
pMeta
);
}
taosArrayPush
(
rsp
.
pArray
,
&
cRsp
);
...
...
@@ -1107,6 +1107,7 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq
tDecoderInit
(
pCoder
,
pReq
,
len
);
tDecodeDeleteRes
(
pCoder
,
pRes
);
ASSERT
(
taosArrayGetSize
(
pRes
->
uidList
)
==
0
||
(
pRes
->
skey
!=
0
&&
pRes
->
ekey
!=
0
));
for
(
int32_t
iUid
=
0
;
iUid
<
taosArrayGetSize
(
pRes
->
uidList
);
iUid
++
)
{
code
=
tsdbDeleteTableData
(
pVnode
->
pTsdb
,
version
,
pRes
->
suid
,
*
(
uint64_t
*
)
taosArrayGet
(
pRes
->
uidList
,
iUid
),
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
957550c4
此差异已折叠。
点击以展开。
source/libs/executor/src/executor.c
浏览文件 @
957550c4
...
...
@@ -52,7 +52,11 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
// TODO: if a block was set but not consumed,
// prevent setting a different type of block
pInfo
->
validBlockIndex
=
0
;
taosArrayClear
(
pInfo
->
pBlockLists
);
if
(
pInfo
->
blockType
==
STREAM_INPUT__DATA_BLOCK
)
{
taosArrayClearP
(
pInfo
->
pBlockLists
,
taosMemoryFree
);
}
else
{
taosArrayClear
(
pInfo
->
pBlockLists
);
}
if
(
type
==
STREAM_INPUT__MERGED_SUBMIT
)
{
// ASSERT(numOfBlocks > 1);
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
957550c4
...
...
@@ -956,9 +956,7 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) {
pTableScanInfo
->
currentGroupId
=
-
1
;
}
static
void
freeArray
(
void
*
array
)
{
taosArrayDestroy
(
array
);
}
static
void
freeArray
(
void
*
array
)
{
taosArrayDestroy
(
array
);
}
static
void
resetTableScanOperator
(
SOperatorInfo
*
pTableScanOp
)
{
STableScanInfo
*
pTableScanInfo
=
pTableScanOp
->
info
;
...
...
@@ -972,15 +970,16 @@ static void resetTableScanOperator(SOperatorInfo* pTableScanOp) {
resetTableScanInfo
(
pTableScanOp
->
info
,
&
win
);
}
static
SSDataBlock
*
readPreVersionData
(
SOperatorInfo
*
pTableScanOp
,
uint64_t
tbUid
,
TSKEY
startTs
,
TSKEY
endTs
,
int64_t
maxVersion
)
{
static
SSDataBlock
*
readPreVersionData
(
SOperatorInfo
*
pTableScanOp
,
uint64_t
tbUid
,
TSKEY
startTs
,
TSKEY
endTs
,
int64_t
maxVersion
)
{
SArray
*
gpTbls
=
pTableScanOp
->
pTaskInfo
->
tableqinfoList
.
pGroupList
;
taosArrayClear
(
gpTbls
);
STableKeyInfo
tblInfo
=
{.
uid
=
tbUid
,
.
groupId
=
0
};
SArray
*
tbls
=
taosArrayInit
(
1
,
sizeof
(
STableKeyInfo
));
SArray
*
tbls
=
taosArrayInit
(
1
,
sizeof
(
STableKeyInfo
));
taosArrayPush
(
tbls
,
&
tblInfo
);
taosArrayPush
(
gpTbls
,
&
tbls
);
STimeWindow
win
=
{.
skey
=
startTs
,
.
ekey
=
endTs
};
STimeWindow
win
=
{.
skey
=
startTs
,
.
ekey
=
endTs
};
STableScanInfo
*
pTableScanInfo
=
pTableScanOp
->
info
;
pTableScanInfo
->
cond
.
startVersion
=
-
1
;
pTableScanInfo
->
cond
.
endVersion
=
maxVersion
;
...
...
@@ -1069,8 +1068,8 @@ static STimeWindow getSlidingWindow(TSKEY* tsCol, SInterval* pInterval, SDataBlo
if
(
hasGroup
)
{
(
*
pRowIndex
)
+=
1
;
}
else
{
(
*
pRowIndex
)
+=
getNumOfRowsInTimeWindow
(
pDataBlockInfo
,
tsCol
,
*
pRowIndex
,
endWin
.
ekey
,
binarySearchForKey
,
NULL
,
TSDB_ORDER_ASC
);
(
*
pRowIndex
)
+=
getNumOfRowsInTimeWindow
(
pDataBlockInfo
,
tsCol
,
*
pRowIndex
,
endWin
.
ekey
,
binarySearchForKey
,
NULL
,
TSDB_ORDER_ASC
);
}
do
{
preWin
=
endWin
;
...
...
@@ -1111,8 +1110,8 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32
for
(
int32_t
j
=
0
;
j
<
pInfo
->
pTableScanOp
->
exprSupp
.
numOfExprs
;
j
++
)
{
SColumnInfoData
*
pSrcCol
=
taosArrayGet
(
tmpBlock
->
pDataBlock
,
j
);
SColumnInfoData
*
pDestCol
=
taosArrayGet
(
pResult
->
pDataBlock
,
j
);
bool
isNull
=
colDataIsNull
(
pSrcCol
,
tmpBlock
->
info
.
rows
,
i
,
NULL
);
char
*
pSrcData
=
colDataGetData
(
pSrcCol
,
i
);
bool
isNull
=
colDataIsNull
(
pSrcCol
,
tmpBlock
->
info
.
rows
,
i
,
NULL
);
char
*
pSrcData
=
colDataGetData
(
pSrcCol
,
i
);
colDataAppend
(
pDestCol
,
pResult
->
info
.
rows
,
pSrcData
,
isNull
);
}
pResult
->
info
.
rows
++
;
...
...
@@ -1153,7 +1152,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
SColumnInfoData
*
pDestCalStartTsCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
CALCULATE_START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pDestCalEndTsCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
CALCULATE_END_TS_COLUMN_INDEX
);
int32_t
dummy
=
0
;
int64_t
version
=
pSrcBlock
->
info
.
version
-
1
;
int64_t
version
=
pSrcBlock
->
info
.
version
-
1
;
for
(
int32_t
i
=
0
;
i
<
pSrcBlock
->
info
.
rows
;
i
++
)
{
uint64_t
groupId
=
getGroupIdByData
(
pInfo
,
uidCol
[
i
],
startData
[
i
],
version
);
// gap must be 0.
...
...
@@ -1202,15 +1201,15 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
SColumnInfoData
*
pGpCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
GROUPID_COLUMN_INDEX
);
SColumnInfoData
*
pCalStartTsCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
CALCULATE_START_TS_COLUMN_INDEX
);
SColumnInfoData
*
pCalEndTsCol
=
taosArrayGet
(
pDestBlock
->
pDataBlock
,
CALCULATE_END_TS_COLUMN_INDEX
);
int64_t
version
=
pSrcBlock
->
info
.
version
-
1
;
int64_t
version
=
pSrcBlock
->
info
.
version
-
1
;
for
(
int32_t
i
=
0
;
i
<
rows
;)
{
uint64_t
srcUid
=
srcUidData
[
i
];
uint64_t
groupId
=
getGroupIdByData
(
pInfo
,
srcUid
,
tsCol
[
i
],
version
);
uint64_t
srcGpId
=
srcGp
[
i
];
TSKEY
calStartTs
=
tsCol
[
i
];
TSKEY
calStartTs
=
tsCol
[
i
];
colDataAppend
(
pCalStartTsCol
,
pDestBlock
->
info
.
rows
,
(
const
char
*
)(
&
calStartTs
),
false
);
STimeWindow
win
=
getSlidingWindow
(
tsCol
,
&
pInfo
->
interval
,
&
pSrcBlock
->
info
,
&
i
,
pInfo
->
partitionSup
.
needCalc
);
TSKEY
calEndTs
=
tsCol
[
i
-
1
];
TSKEY
calEndTs
=
tsCol
[
i
-
1
];
colDataAppend
(
pCalEndTsCol
,
pDestBlock
->
info
.
rows
,
(
const
char
*
)(
&
calEndTs
),
false
);
colDataAppend
(
pDeUidCol
,
pDestBlock
->
info
.
rows
,
(
const
char
*
)(
&
srcUid
),
false
);
colDataAppend
(
pStartTsCol
,
pDestBlock
->
info
.
rows
,
(
const
char
*
)(
&
win
.
skey
),
false
);
...
...
@@ -1278,10 +1277,10 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
bool
closedWin
=
isClosed
&&
isSignleIntervalWindow
(
pInfo
)
&&
isDeletedWindow
(
&
win
,
pBlock
->
info
.
groupId
,
pInfo
->
windowSup
.
pIntervalAggSup
);
if
((
update
||
closedWin
)
&&
out
)
{
uint64_t
gpId
=
closedWin
&&
pInfo
->
partitionSup
.
needCalc
?
calGroupIdByData
(
&
pInfo
->
partitionSup
,
pInfo
->
pPartScalarSup
,
pBlock
,
rowId
)
:
0
;
appendOneRow
(
pInfo
->
pUpdateDataRes
,
tsCol
+
rowId
,
tsCol
+
rowId
,
&
pBlock
->
info
.
uid
,
&
gpId
);
uint64_t
gpId
=
closedWin
&&
pInfo
->
partitionSup
.
needCalc
?
calGroupIdByData
(
&
pInfo
->
partitionSup
,
pInfo
->
pPartScalarSup
,
pBlock
,
rowId
)
:
0
;
appendOneRow
(
pInfo
->
pUpdateDataRes
,
tsCol
+
rowId
,
tsCol
+
rowId
,
&
pBlock
->
info
.
uid
,
&
gpId
);
}
}
if
(
out
&&
pInfo
->
pUpdateDataRes
->
info
.
rows
>
0
)
{
...
...
@@ -1516,6 +1515,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
generateScanRange
(
pInfo
,
pBlock
,
pInfo
->
pUpdateRes
);
prepareRangeScan
(
pInfo
,
pInfo
->
pUpdateRes
,
&
pInfo
->
updateResIndex
);
copyDataBlock
(
pInfo
->
pDeleteDataRes
,
pInfo
->
pUpdateRes
);
pInfo
->
pDeleteDataRes
->
info
.
type
=
STREAM_DELETE_DATA
;
pInfo
->
scanMode
=
STREAM_SCAN_FROM_DATAREADER_RANGE
;
return
pInfo
->
pDeleteDataRes
;
}
break
;
...
...
@@ -1916,7 +1916,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
ASSERT
(
pHandle
->
tqReader
);
pInfo
->
tqReader
=
pHandle
->
tqReader
;
}
pInfo
->
pUpdateInfo
=
NULL
;
pInfo
->
pTableScanOp
=
pTableScanOp
;
pInfo
->
interval
=
pTSInfo
->
pdInfo
.
interval
;
...
...
@@ -1948,8 +1948,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo
->
pUpdateRes
=
createSpecialDataBlock
(
STREAM_CLEAR
);
pInfo
->
pCondition
=
pScanPhyNode
->
node
.
pConditions
;
pInfo
->
scanMode
=
STREAM_SCAN_FROM_READERHANDLE
;
pInfo
->
windowSup
=
(
SWindowSupporter
){.
pStreamAggSup
=
NULL
,
.
gap
=
-
1
,
.
parentType
=
QUERY_NODE_PHYSICAL_PLAN
};
pInfo
->
windowSup
=
(
SWindowSupporter
){.
pStreamAggSup
=
NULL
,
.
gap
=
-
1
,
.
parentType
=
QUERY_NODE_PHYSICAL_PLAN
};
pInfo
->
groupId
=
0
;
pInfo
->
pPullDataRes
=
createSpecialDataBlock
(
STREAM_RETRIEVE
);
pInfo
->
pStreamScanOp
=
pOperator
;
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
957550c4
此差异已折叠。
点击以展开。
source/libs/function/src/builtinsimpl.c
浏览文件 @
957550c4
...
...
@@ -76,11 +76,11 @@ typedef struct STopBotResItem {
}
STopBotResItem
;
typedef
struct
STopBotRes
{
int32_t
maxSize
;
int16_t
type
;
int32_t
maxSize
;
int16_t
type
;
STuplePos
nullTuplePos
;
bool
nullTupleSaved
;
STuplePos
nullTuplePos
;
bool
nullTupleSaved
;
STopBotResItem
*
pItems
;
}
STopBotRes
;
...
...
@@ -223,14 +223,14 @@ typedef struct SMavgInfo {
}
SMavgInfo
;
typedef
struct
SSampleInfo
{
int32_t
samples
;
int32_t
totalPoints
;
int32_t
numSampled
;
uint8_t
colType
;
int16_t
colBytes
;
int32_t
samples
;
int32_t
totalPoints
;
int32_t
numSampled
;
uint8_t
colType
;
int16_t
colBytes
;
STuplePos
nullTuplePos
;
bool
nullTupleSaved
;
STuplePos
nullTuplePos
;
bool
nullTupleSaved
;
char
*
data
;
STuplePos
*
tuplePos
;
...
...
@@ -1147,7 +1147,7 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
}
static
STuplePos
saveTupleData
(
SqlFunctionCtx
*
pCtx
,
int32_t
rowIndex
,
const
SSDataBlock
*
pSrcBlock
);
static
int32_t
updateTupleData
(
SqlFunctionCtx
*
pCtx
,
int32_t
rowIndex
,
const
SSDataBlock
*
pSrcBlock
,
STuplePos
*
pPos
);
static
int32_t
updateTupleData
(
SqlFunctionCtx
*
pCtx
,
int32_t
rowIndex
,
const
SSDataBlock
*
pSrcBlock
,
STuplePos
*
pPos
);
static
const
char
*
loadTupleData
(
SqlFunctionCtx
*
pCtx
,
const
STuplePos
*
pPos
);
static
int32_t
findRowIndex
(
int32_t
start
,
int32_t
num
,
SColumnInfoData
*
pCol
,
const
char
*
tval
)
{
...
...
@@ -1357,8 +1357,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
numOfElems
+=
1
;
}
}
else
if
(
type
==
TSDB_DATA_TYPE_BIGINT
||
type
==
TSDB_DATA_TYPE_TIMESTAMP
)
{
}
else
if
(
type
==
TSDB_DATA_TYPE_BIGINT
||
type
==
TSDB_DATA_TYPE_TIMESTAMP
)
{
int64_t
*
pData
=
(
int64_t
*
)
pCol
->
pData
;
int64_t
*
val
=
(
int64_t
*
)
&
pBuf
->
v
;
...
...
@@ -1581,7 +1580,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
}
_min_max_over:
if
(
numOfElems
==
0
&&
pCtx
->
subsidiaries
.
num
>
0
&&
!
pBuf
->
nullTupleSaved
)
{
if
(
numOfElems
==
0
&&
pCtx
->
subsidiaries
.
num
>
0
&&
!
pBuf
->
nullTupleSaved
)
{
pBuf
->
nullTuplePos
=
saveTupleData
(
pCtx
,
pInput
->
startRowIndex
,
pCtx
->
pSrcBlock
);
pBuf
->
nullTupleSaved
=
true
;
}
...
...
@@ -1601,7 +1600,8 @@ int32_t maxFunction(SqlFunctionCtx* pCtx) {
}
static
void
setNullSelectivityValue
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
int32_t
rowIndex
);
static
void
setSelectivityValue
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
const
STuplePos
*
pTuplePos
,
int32_t
rowIndex
);
static
void
setSelectivityValue
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
const
STuplePos
*
pTuplePos
,
int32_t
rowIndex
);
int32_t
minmaxFunctionFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
)
{
SResultRowEntryInfo
*
pEntryInfo
=
GET_RES_INFO
(
pCtx
);
...
...
@@ -1651,7 +1651,7 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple
if
(
pCtx
->
saveHandle
.
pBuf
!=
NULL
)
{
if
(
pTuplePos
->
pageId
!=
-
1
)
{
int32_t
numOfCols
=
pCtx
->
subsidiaries
.
num
;
int32_t
numOfCols
=
pCtx
->
subsidiaries
.
num
;
const
char
*
p
=
loadTupleData
(
pCtx
,
pTuplePos
);
bool
*
nullList
=
(
bool
*
)
p
;
...
...
@@ -1660,7 +1660,7 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple
// todo set the offset value to optimize the performance.
for
(
int32_t
j
=
0
;
j
<
numOfCols
;
++
j
)
{
SqlFunctionCtx
*
pc
=
pCtx
->
subsidiaries
.
pCtx
[
j
];
int32_t
dstSlotId
=
pc
->
pExpr
->
base
.
resSchema
.
slotId
;
int32_t
dstSlotId
=
pc
->
pExpr
->
base
.
resSchema
.
slotId
;
SColumnInfoData
*
pDstCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
dstSlotId
);
ASSERT
(
pc
->
pExpr
->
base
.
resSchema
.
bytes
==
pDstCol
->
info
.
bytes
);
...
...
@@ -1701,7 +1701,7 @@ void appendSelectivityValue(SqlFunctionCtx* pCtx, int32_t rowIndex, int32_t pos)
char
*
pData
=
colDataGetData
(
pSrcCol
,
rowIndex
);
// append to dest col
int32_t
dstSlotId
=
pc
->
pExpr
->
base
.
resSchema
.
slotId
;
int32_t
dstSlotId
=
pc
->
pExpr
->
base
.
resSchema
.
slotId
;
SColumnInfoData
*
pDstCol
=
taosArrayGet
(
pCtx
->
pDstBlock
->
pDataBlock
,
dstSlotId
);
ASSERT
(
pc
->
pExpr
->
base
.
resSchema
.
bytes
==
pDstCol
->
info
.
bytes
);
...
...
@@ -1712,7 +1712,6 @@ void appendSelectivityValue(SqlFunctionCtx* pCtx, int32_t rowIndex, int32_t pos)
colDataAppend
(
pDstCol
,
pos
,
pData
,
false
);
}
}
}
void
replaceTupleData
(
STuplePos
*
pDestPos
,
STuplePos
*
pSourcePos
)
{
...
...
@@ -2590,8 +2589,8 @@ static void apercentileTransferInfo(SAPercentileInfo* pInput, SAPercentileInfo*
memcpy
(
pHisto
,
pInput
->
pHisto
,
sizeof
(
SHistogramInfo
)
+
sizeof
(
SHistBin
)
*
(
MAX_HISTOGRAM_BIN
+
1
));
pHisto
->
elems
=
(
SHistBin
*
)((
char
*
)
pHisto
+
sizeof
(
SHistogramInfo
));
qDebug
(
"%s merge histo, total:%"
PRId64
", entry:%d, %p"
,
__FUNCTION__
,
pHisto
->
numOfElems
,
pHisto
->
numOfEntries
,
pHisto
);
qDebug
(
"%s merge histo, total:%"
PRId64
", entry:%d, %p"
,
__FUNCTION__
,
pHisto
->
numOfElems
,
pHisto
->
numOfEntries
,
pHisto
);
}
else
{
pHisto
->
elems
=
(
SHistBin
*
)((
char
*
)
pHisto
+
sizeof
(
SHistogramInfo
));
qDebug
(
"%s input histogram, elem:%"
PRId64
", entry:%d, %p"
,
__FUNCTION__
,
pHisto
->
numOfElems
,
...
...
@@ -2601,8 +2600,8 @@ static void apercentileTransferInfo(SAPercentileInfo* pInput, SAPercentileInfo*
memcpy
(
pHisto
,
pRes
,
sizeof
(
SHistogramInfo
)
+
sizeof
(
SHistBin
)
*
MAX_HISTOGRAM_BIN
);
pHisto
->
elems
=
(
SHistBin
*
)((
char
*
)
pHisto
+
sizeof
(
SHistogramInfo
));
qDebug
(
"%s merge histo, total:%"
PRId64
", entry:%d, %p"
,
__FUNCTION__
,
pHisto
->
numOfElems
,
pHisto
->
numOfEntries
,
pHisto
);
qDebug
(
"%s merge histo, total:%"
PRId64
", entry:%d, %p"
,
__FUNCTION__
,
pHisto
->
numOfElems
,
pHisto
->
numOfEntries
,
pHisto
);
tHistogramDestroy
(
&
pRes
);
}
}
...
...
@@ -2629,8 +2628,8 @@ int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx) {
}
if
(
pInfo
->
algo
!=
APERCT_ALGO_TDIGEST
)
{
qDebug
(
"%s after merge, total:%d, numOfEntry:%d, %p"
,
__FUNCTION__
,
pInfo
->
pHisto
->
numOfElems
,
pInfo
->
pHisto
->
numOfEntries
,
pInfo
->
pHisto
);
qDebug
(
"%s after merge, total:%d, numOfEntry:%d, %p"
,
__FUNCTION__
,
pInfo
->
pHisto
->
numOfElems
,
pInfo
->
pHisto
->
numOfEntries
,
pInfo
->
pHisto
);
}
SET_VAL
(
pResInfo
,
1
,
1
);
...
...
@@ -2709,7 +2708,7 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx)
}
EFuncDataRequired
lastDynDataReq
(
void
*
pRes
,
STimeWindow
*
pTimeWindow
)
{
SResultRowEntryInfo
*
pEntry
=
(
SResultRowEntryInfo
*
)
pRes
;
SResultRowEntryInfo
*
pEntry
=
(
SResultRowEntryInfo
*
)
pRes
;
// not initialized yet, data is required
if
(
pEntry
==
NULL
)
{
...
...
@@ -2752,7 +2751,8 @@ static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowInde
return
*
(
TSKEY
*
)
colDataGetData
(
pTsColInfo
,
rowIndex
);
}
static
void
firstlastSaveTupleData
(
const
SSDataBlock
*
pSrcBlock
,
int32_t
rowIndex
,
SqlFunctionCtx
*
pCtx
,
SFirstLastRes
*
pInfo
)
{
static
void
firstlastSaveTupleData
(
const
SSDataBlock
*
pSrcBlock
,
int32_t
rowIndex
,
SqlFunctionCtx
*
pCtx
,
SFirstLastRes
*
pInfo
)
{
if
(
pCtx
->
subsidiaries
.
num
<=
0
)
{
return
;
}
...
...
@@ -3176,7 +3176,7 @@ bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
static
void
doSetPrevVal
(
SDiffInfo
*
pDiffInfo
,
int32_t
type
,
const
char
*
pv
)
{
switch
(
type
)
{
case
TSDB_DATA_TYPE_BOOL
:
pDiffInfo
->
prev
.
i64
=
*
(
bool
*
)
pv
?
1
:
0
;
pDiffInfo
->
prev
.
i64
=
*
(
bool
*
)
pv
?
1
:
0
;
break
;
case
TSDB_DATA_TYPE_TINYINT
:
pDiffInfo
->
prev
.
i64
=
*
(
int8_t
*
)
pv
;
...
...
@@ -3537,7 +3537,8 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
* |(n columns, one bit for each column)| src column #1| src column #2|
* +------------------------------------+--------------+--------------+
*/
void
*
serializeTupleData
(
const
SSDataBlock
*
pSrcBlock
,
int32_t
rowIndex
,
SSubsidiaryResInfo
*
pSubsidiaryies
,
char
*
buf
)
{
void
*
serializeTupleData
(
const
SSDataBlock
*
pSrcBlock
,
int32_t
rowIndex
,
SSubsidiaryResInfo
*
pSubsidiaryies
,
char
*
buf
)
{
char
*
nullList
=
buf
;
char
*
pStart
=
(
char
*
)(
nullList
+
sizeof
(
bool
)
*
pSubsidiaryies
->
num
);
...
...
@@ -3585,7 +3586,7 @@ static STuplePos doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf
}
}
p
=
(
STuplePos
)
{.
pageId
=
pHandle
->
currentPage
,
.
offset
=
pPage
->
num
};
p
=
(
STuplePos
){.
pageId
=
pHandle
->
currentPage
,
.
offset
=
pPage
->
num
};
memcpy
(
pPage
->
data
+
pPage
->
num
,
pBuf
,
length
);
pPage
->
num
+=
length
;
...
...
@@ -3621,7 +3622,6 @@ static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf
setBufPageDirty
(
pPage
,
true
);
releaseBufPage
(
pHandle
->
pBuf
,
pPage
);
}
else
{
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -3636,7 +3636,7 @@ static int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSD
static
char
*
doLoadTupleData
(
SSerializeDataHandle
*
pHandle
,
const
STuplePos
*
pPos
)
{
if
(
pHandle
->
pBuf
!=
NULL
)
{
SFilePage
*
pPage
=
getBufPage
(
pHandle
->
pBuf
,
pPos
->
pageId
);
char
*
p
=
pPage
->
data
+
pPos
->
offset
;
char
*
p
=
pPage
->
data
+
pPos
->
offset
;
releaseBufPage
(
pHandle
->
pBuf
,
pPage
);
return
p
;
}
else
{
...
...
@@ -3980,8 +3980,8 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) {
}
if
(
pCtx
->
end
.
key
==
INT64_MIN
)
{
pInfo
->
min
=
(
pInfo
->
min
>
ptsList
[
start
+
pInput
->
numOfRows
-
1
])
?
ptsList
[
start
+
pInput
->
numOfRows
-
1
]
:
pInfo
->
min
;
pInfo
->
min
=
(
pInfo
->
min
>
ptsList
[
start
+
pInput
->
numOfRows
-
1
])
?
ptsList
[
start
+
pInput
->
numOfRows
-
1
]
:
pInfo
->
min
;
}
else
{
pInfo
->
min
=
pCtx
->
end
.
key
;
}
...
...
@@ -3993,8 +3993,8 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) {
}
if
(
pCtx
->
end
.
key
==
INT64_MIN
)
{
pInfo
->
max
=
(
pInfo
->
max
<
ptsList
[
start
+
pInput
->
numOfRows
-
1
])
?
ptsList
[
start
+
pInput
->
numOfRows
-
1
]
:
pInfo
->
max
;
pInfo
->
max
=
(
pInfo
->
max
<
ptsList
[
start
+
pInput
->
numOfRows
-
1
])
?
ptsList
[
start
+
pInput
->
numOfRows
-
1
]
:
pInfo
->
max
;
}
else
{
pInfo
->
max
=
pCtx
->
end
.
key
+
1
;
}
...
...
source/libs/stream/src/streamDispatch.c
浏览文件 @
957550c4
...
...
@@ -243,6 +243,36 @@ FAIL:
return
0
;
}
int32_t
streamSearchAndAddBlock
(
SStreamTask
*
pTask
,
SStreamDispatchReq
*
pReqs
,
SSDataBlock
*
pDataBlock
,
int32_t
vgSz
,
int64_t
groupId
)
{
char
*
ctbName
=
buildCtbNameByGroupId
(
pTask
->
shuffleDispatcher
.
stbFullName
,
groupId
);
SArray
*
vgInfo
=
pTask
->
shuffleDispatcher
.
dbInfo
.
pVgroupInfos
;
// TODO: get hash function by hashMethod
uint32_t
hashValue
=
MurmurHash3_32
(
ctbName
,
strlen
(
ctbName
));
taosMemoryFree
(
ctbName
);
bool
found
=
false
;
// TODO: optimize search
int32_t
j
;
for
(
j
=
0
;
j
<
vgSz
;
j
++
)
{
SVgroupInfo
*
pVgInfo
=
taosArrayGet
(
vgInfo
,
j
);
ASSERT
(
pVgInfo
->
vgId
>
0
);
if
(
hashValue
>=
pVgInfo
->
hashBegin
&&
hashValue
<=
pVgInfo
->
hashEnd
)
{
if
(
streamAddBlockToDispatchMsg
(
pDataBlock
,
&
pReqs
[
j
])
<
0
)
{
return
-
1
;
}
if
(
pReqs
[
j
].
blockNum
==
0
)
{
atomic_add_fetch_32
(
&
pTask
->
shuffleDispatcher
.
waitingRspCnt
,
1
);
}
pReqs
[
j
].
blockNum
++
;
found
=
true
;
break
;
}
}
ASSERT
(
found
);
return
0
;
}
int32_t
streamDispatchAllBlocks
(
SStreamTask
*
pTask
,
const
SStreamDataBlock
*
pData
)
{
int32_t
code
=
-
1
;
int32_t
blockNum
=
taosArrayGetSize
(
pData
->
blocks
);
...
...
@@ -317,20 +347,10 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
for
(
int32_t
i
=
0
;
i
<
blockNum
;
i
++
)
{
SSDataBlock
*
pDataBlock
=
taosArrayGet
(
pData
->
blocks
,
i
);
char
*
ctbName
=
buildCtbNameByGroupId
(
pTask
->
shuffleDispatcher
.
stbFullName
,
pDataBlock
->
info
.
groupId
);
// TODO: get hash function by hashMethod
uint32_t
hashValue
=
MurmurHash3_32
(
ctbName
,
strlen
(
ctbName
));
taosMemoryFree
(
ctbName
);
bool
found
=
false
;
// TODO: optimize search
int32_t
j
;
for
(
j
=
0
;
j
<
vgSz
;
j
++
)
{
SVgroupInfo
*
pVgInfo
=
taosArrayGet
(
vgInfo
,
j
);
ASSERT
(
pVgInfo
->
vgId
>
0
);
if
(
hashValue
>=
pVgInfo
->
hashBegin
&&
hashValue
<=
pVgInfo
->
hashEnd
)
{
// TODO: do not use broadcast
if
(
pDataBlock
->
info
.
type
==
STREAM_DELETE_RESULT
)
{
for
(
int32_t
j
=
0
;
j
<
vgSz
;
j
++
)
{
if
(
streamAddBlockToDispatchMsg
(
pDataBlock
,
&
pReqs
[
j
])
<
0
)
{
goto
FAIL_SHUFFLE_DISPATCH
;
}
...
...
@@ -338,11 +358,13 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
atomic_add_fetch_32
(
&
pTask
->
shuffleDispatcher
.
waitingRspCnt
,
1
);
}
pReqs
[
j
].
blockNum
++
;
found
=
true
;
break
;
}
continue
;
}
if
(
streamSearchAndAddBlock
(
pTask
,
pReqs
,
pDataBlock
,
vgSz
,
pDataBlock
->
info
.
groupId
)
<
0
)
{
goto
FAIL_SHUFFLE_DISPATCH
;
}
ASSERT
(
found
);
}
for
(
int32_t
i
=
0
;
i
<
vgSz
;
i
++
)
{
...
...
tests/script/tsim/valgrind/checkError6.sim
浏览文件 @
957550c4
...
...
@@ -158,6 +158,8 @@ print =============== restart
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start -v
sleep 1000
sql select avg(tbcol) as c from stb
sql select avg(tbcol) as c from stb where ts <= 1601481840000
sql select avg(tbcol) as c from stb where tgcol < 5 and ts <= 1601481840000
...
...
utils/CMakeLists.txt
0 → 100644
浏览文件 @
957550c4
#ADD_SUBDIRECTORY(examples/c)
ADD_SUBDIRECTORY
(
tsim
)
ADD_SUBDIRECTORY
(
test/c
)
#ADD_SUBDIRECTORY(comparisonTest/tdengine)
test
s/test/c/CMakeLists.txt
→
util
s/test/c/CMakeLists.txt
浏览文件 @
957550c4
add_executable
(
tmq_demo tmqDemo.c
)
add_dependencies
(
tmq_demo taos
)
add_executable
(
tmq_sim tmqSim.c
)
add_executable
(
create_table createTable.c
)
add_executable
(
tmq_taosx_ci tmq_taosx_ci.c
)
...
...
test
s/test/c/createTable.c
→
util
s/test/c/createTable.c
浏览文件 @
957550c4
文件已移动
test
s/test/c/sdbDump.c
→
util
s/test/c/sdbDump.c
浏览文件 @
957550c4
文件已移动
test
s/test/c/sml_test.c
→
util
s/test/c/sml_test.c
浏览文件 @
957550c4
文件已移动
test
s/test/c/tmqDemo.c
→
util
s/test/c/tmqDemo.c
浏览文件 @
957550c4
文件已移动
test
s/test/c/tmqSim.c
→
util
s/test/c/tmqSim.c
浏览文件 @
957550c4
文件已移动
test
s/test/c/tmq_taosx_ci.c
→
util
s/test/c/tmq_taosx_ci.c
浏览文件 @
957550c4
文件已移动
test
s/tsim/CMakeLists.txt
→
util
s/tsim/CMakeLists.txt
浏览文件 @
957550c4
文件已移动
test
s/tsim/inc/simInt.h
→
util
s/tsim/inc/simInt.h
浏览文件 @
957550c4
文件已移动
test
s/tsim/inc/simParse.h
→
util
s/tsim/inc/simParse.h
浏览文件 @
957550c4
文件已移动
test
s/tsim/src/simExe.c
→
util
s/tsim/src/simExe.c
浏览文件 @
957550c4
文件已移动
test
s/tsim/src/simMain.c
→
util
s/tsim/src/simMain.c
浏览文件 @
957550c4
文件已移动
test
s/tsim/src/simParse.c
→
util
s/tsim/src/simParse.c
浏览文件 @
957550c4
文件已移动
test
s/tsim/src/simSystem.c
→
util
s/tsim/src/simSystem.c
浏览文件 @
957550c4
文件已移动
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录