Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d66fdb94
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d66fdb94
编写于
4月 28, 2022
作者:
X
Xiaoyu Wang
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/3.0_wxy
上级
7f218009
91780376
变更
43
显示空白变更内容
内联
并排
Showing
43 changed file
with
342 addition
and
198 deletion
+342
-198
contrib/CMakeLists.txt
contrib/CMakeLists.txt
+2
-2
include/client/taos.h
include/client/taos.h
+5
-4
include/common/tmsg.h
include/common/tmsg.h
+7
-3
include/common/trow.h
include/common/trow.h
+18
-3
include/os/os.h
include/os/os.h
+1
-0
include/os/osMath.h
include/os/osMath.h
+13
-19
include/util/taoserror.h
include/util/taoserror.h
+1
-0
include/util/tencode.h
include/util/tencode.h
+39
-18
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+0
-2
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+12
-5
source/client/src/taos.def
source/client/src/taos.def
+1
-0
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+7
-3
source/common/src/tmsg.c
source/common/src/tmsg.c
+46
-9
source/common/src/trow.c
source/common/src/trow.c
+1
-1
source/common/src/ttypes.c
source/common/src/ttypes.c
+6
-6
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+3
-1
source/dnode/mnode/impl/src/mndUser.c
source/dnode/mnode/impl/src/mndUser.c
+2
-2
source/dnode/vnode/src/inc/meta.h
source/dnode/vnode/src/inc/meta.h
+6
-2
source/dnode/vnode/src/inc/tsdbSma.h
source/dnode/vnode/src/inc/tsdbSma.h
+6
-14
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+10
-1
source/dnode/vnode/src/meta/metaTable.c
source/dnode/vnode/src/meta/metaTable.c
+6
-2
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+7
-9
source/dnode/vnode/src/tsdb/tsdbSma.c
source/dnode/vnode/src/tsdb/tsdbSma.c
+20
-27
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+11
-5
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+20
-12
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+1
-1
source/libs/parser/src/parInsert.c
source/libs/parser/src/parInsert.c
+1
-2
source/libs/parser/src/parInsertData.c
source/libs/parser/src/parInsertData.c
+3
-2
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+10
-9
source/libs/planner/src/planLogicCreater.c
source/libs/planner/src/planLogicCreater.c
+3
-3
source/libs/planner/src/planOptimizer.c
source/libs/planner/src/planOptimizer.c
+1
-1
source/libs/planner/src/planPhysiCreater.c
source/libs/planner/src/planPhysiCreater.c
+1
-1
source/libs/planner/src/planSpliter.c
source/libs/planner/src/planSpliter.c
+2
-3
source/libs/scalar/src/sclfunc.c
source/libs/scalar/src/sclfunc.c
+12
-2
source/libs/transport/src/trans.c
source/libs/transport/src/trans.c
+4
-0
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+0
-1
source/libs/transport/src/transSrv.c
source/libs/transport/src/transSrv.c
+12
-8
source/os/src/osSemaphore.c
source/os/src/osSemaphore.c
+6
-0
source/util/src/terror.c
source/util/src/terror.c
+1
-0
source/util/test/encodeTest.cpp
source/util/test/encodeTest.cpp
+4
-4
tests/system-test/0-others/taosShell.py
tests/system-test/0-others/taosShell.py
+28
-10
tests/system-test/fulltest.sh
tests/system-test/fulltest.sh
+3
-0
tools/shell/inc/shellInt.h
tools/shell/inc/shellInt.h
+0
-1
未找到文件。
contrib/CMakeLists.txt
浏览文件 @
d66fdb94
...
...
@@ -220,7 +220,7 @@ endif(${BUILD_WITH_NURAFT})
if
(
${
BUILD_PTHREAD
}
)
set
(
CMAKE_BUILD_TYPE release
)
add_definitions
(
-DPTW32_STATIC_LIB
)
add_subdirectory
(
pthread
EXCLUDE_FROM_ALL
)
add_subdirectory
(
pthread
)
set_target_properties
(
libpthreadVC3 PROPERTIES OUTPUT_NAME pthread
)
add_library
(
pthread STATIC IMPORTED GLOBAL
)
SET_PROPERTY
(
TARGET pthread PROPERTY IMPORTED_LOCATION
${
LIBRARY_OUTPUT_PATH
}
/pthread.lib
)
...
...
include/client/taos.h
浏览文件 @
d66fdb94
...
...
@@ -121,6 +121,7 @@ typedef struct setConfRet {
DLL_EXPORT
void
taos_cleanup
(
void
);
DLL_EXPORT
int
taos_options
(
TSDB_OPTION
option
,
const
void
*
arg
,
...);
DLL_EXPORT
setConfRet
taos_set_config
(
const
char
*
config
);
DLL_EXPORT
int
taos_init
(
void
);
DLL_EXPORT
TAOS
*
taos_connect
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
pass
,
const
char
*
db
,
uint16_t
port
);
DLL_EXPORT
TAOS
*
taos_connect_l
(
const
char
*
ip
,
int
ipLen
,
const
char
*
user
,
int
userLen
,
const
char
*
pass
,
int
passLen
,
const
char
*
db
,
int
dbLen
,
uint16_t
port
);
...
...
include/common/tmsg.h
浏览文件 @
d66fdb94
...
...
@@ -1521,8 +1521,8 @@ typedef struct {
char
*
qmsg2
;
// pAst2:qmsg2:SRetention2 => trigger aggr task2
}
SRSmaParam
;
int
tEncodeSRSmaParam
(
SCoder
*
pCoder
,
const
SRSmaParam
*
pRSmaParam
);
int
tDecodeSRSmaParam
(
SCoder
*
pCoder
,
SRSmaParam
*
pRSmaParam
);
int
32_t
tEncodeSRSmaParam
(
SCoder
*
pCoder
,
const
SRSmaParam
*
pRSmaParam
);
int
32_t
tDecodeSRSmaParam
(
SCoder
*
pCoder
,
SRSmaParam
*
pRSmaParam
);
typedef
struct
SVCreateStbReq
{
const
char
*
name
;
...
...
@@ -1538,6 +1538,10 @@ int tDecodeSVCreateStbReq(SCoder* pCoder, SVCreateStbReq* pReq);
typedef
struct
SVDropStbReq
{
// data
#ifdef WINDOWS
size_t
avoidCompilationErrors
;
#endif
}
SVDropStbReq
;
typedef
struct
SVCreateStbRsp
{
...
...
@@ -2117,7 +2121,7 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SCoder* pDecoder, SSchemaWrapp
if
(
tDecodeI32v
(
pDecoder
,
&
pSW
->
nCols
)
<
0
)
return
-
1
;
if
(
tDecodeI32v
(
pDecoder
,
&
pSW
->
sver
)
<
0
)
return
-
1
;
pSW
->
pSchema
=
(
SSchema
*
)
TCODER_MALLOC
(
pDecoder
,
sizeof
(
SSchema
)
*
pSW
->
nCols
);
pSW
->
pSchema
=
(
SSchema
*
)
tCoderMalloc
(
pDecoder
,
sizeof
(
SSchema
)
*
pSW
->
nCols
);
if
(
pSW
->
pSchema
==
NULL
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
pSW
->
nCols
;
i
++
)
{
if
(
tDecodeSSchema
(
pDecoder
,
&
pSW
->
pSchema
[
i
])
<
0
)
return
-
1
;
...
...
include/common/trow.h
浏览文件 @
d66fdb94
...
...
@@ -214,6 +214,14 @@ STSRow *tdRowDup(STSRow *row);
static
FORCE_INLINE
SKvRowIdx
*
tdKvRowColIdxAt
(
STSRow
*
pRow
,
col_id_t
idx
)
{
return
(
SKvRowIdx
*
)
TD_ROW_COL_IDX
(
pRow
)
+
idx
;
}
static
FORCE_INLINE
int16_t
tdKvRowColIdAt
(
STSRow
*
pRow
,
col_id_t
idx
)
{
ASSERT
(
idx
>=
0
);
if
(
idx
==
0
)
{
return
PRIMARYKEY_TIMESTAMP_COL_ID
;
}
return
((
SKvRowIdx
*
)
TD_ROW_COL_IDX
(
pRow
)
+
idx
-
1
)
->
colId
;
}
static
FORCE_INLINE
void
*
tdKVRowColVal
(
STSRow
*
pRow
,
SKvRowIdx
*
pIdx
)
{
return
POINTER_SHIFT
(
pRow
,
pIdx
->
offset
);
}
#define TD_ROW_OFFSET(p) ((p)->toffset); // During ParseInsert when without STSchema, how to get the offset for STpRow?
...
...
@@ -668,7 +676,7 @@ static int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) {
case
TD_ROW_KV
:
#ifdef TD_SUPPORT_BITMAP
pBuilder
->
pBitmap
=
tdGetBitmapAddrKv
(
pBuilder
->
pBuf
,
pBuilder
->
nBoundCols
);
memset
(
pBuilder
->
pBitmap
,
TD_VTYPE_NONE_BYTE_II
,
pBuilder
->
nBitmaps
);
memset
(
pBuilder
->
pBitmap
,
TD_VTYPE_NONE_BYTE_II
,
pBuilder
->
nB
oundB
itmaps
);
#endif
len
=
TD_ROW_HEAD_LEN
+
TD_ROW_NCOLS_LEN
+
(
pBuilder
->
nBoundCols
-
1
)
*
sizeof
(
SKvRowIdx
)
+
pBuilder
->
nBoundBitmaps
;
// add
...
...
@@ -1092,7 +1100,7 @@ static FORCE_INLINE bool tdGetKvRowValOfColEx(STSRowIter *pIter, col_id_t colId,
STSRow
*
pRow
=
pIter
->
pRow
;
SKvRowIdx
*
pKvIdx
=
NULL
;
bool
colFound
=
false
;
col_id_t
kvNCols
=
tdRowGetNCols
(
pRow
);
col_id_t
kvNCols
=
tdRowGetNCols
(
pRow
)
-
1
;
while
(
*
nIdx
<
kvNCols
)
{
pKvIdx
=
(
SKvRowIdx
*
)
POINTER_SHIFT
(
TD_ROW_COL_IDX
(
pRow
),
*
nIdx
*
sizeof
(
SKvRowIdx
));
if
(
pKvIdx
->
colId
==
colId
)
{
...
...
@@ -1108,7 +1116,14 @@ static FORCE_INLINE bool tdGetKvRowValOfColEx(STSRowIter *pIter, col_id_t colId,
}
}
if
(
!
colFound
)
return
false
;
if
(
!
colFound
)
{
if
(
colId
<=
pIter
->
maxColId
)
{
pVal
->
valType
=
TD_VTYPE_NONE
;
return
true
;
}
else
{
return
false
;
}
}
#ifdef TD_SUPPORT_BITMAP
int16_t
colIdx
=
-
1
;
...
...
include/os/os.h
浏览文件 @
d66fdb94
...
...
@@ -59,6 +59,7 @@ extern "C" {
#include <winsock.h>
#endif
#define __typeof(a) auto
#endif
...
...
include/os/osMath.h
浏览文件 @
d66fdb94
...
...
@@ -23,27 +23,21 @@ extern "C" {
#define TPOW2(x) ((x) * (x))
#define TABS(x) ((x) > 0 ? (x) : -(x))
#ifdef WINDOWS
#define TSWAP(a, b, c) \
#define TSWAP(a, b) \
do { \
c __tmp = (c)
(a); \
(a) = (c)(b);
\
__typeof(a) __tmp =
(a); \
(a) = (b);
\
(b) = __tmp; \
} while (0)
#ifdef WINDOWS
#define TMAX(a, b) (((a) > (b)) ? (a) : (b))
#define TMIN(a, b) (((a) < (b)) ? (a) : (b))
#define TRANGE(aa, bb, cc) ((aa) = TMAX((aa), (bb)),(aa) = TMIN((aa), (cc)))
#else
#define TSWAP(a, b, c) \
do { \
__typeof(a) __tmp = (a); \
(a) = (b); \
(b) = __tmp; \
} while (0)
#define TMAX(a, b) \
({ \
__typeof(a) __a = (a); \
...
...
@@ -51,7 +45,7 @@ extern "C" {
(__a > __b) ? __a : __b; \
})
#define TMIN(a, b) \
#define TMIN(a, b) \
({ \
__typeof(a) __a = (a); \
__typeof(b) __b = (b); \
...
...
include/util/taoserror.h
浏览文件 @
d66fdb94
...
...
@@ -62,6 +62,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_APP_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0014)
#define TSDB_CODE_RPC_FQDN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0015)
#define TSDB_CODE_RPC_INVALID_VERSION TAOS_DEF_ERROR_CODE(0, 0x0016)
#define TSDB_CODE_RPC_PORT_EADDRINUSE TAOS_DEF_ERROR_CODE(0, 0x0017)
//common & util
#define TSDB_CODE_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0100)
...
...
include/util/tencode.h
浏览文件 @
d66fdb94
...
...
@@ -79,31 +79,52 @@ typedef struct {
#define TD_CODER_CURRENT(CODER) ((CODER)->data + (CODER)->pos)
#define TD_CODER_MOVE_POS(CODER, MOVE) ((CODER)->pos += (MOVE))
#define TD_CODER_CHECK_CAPACITY_FAILED(CODER, EXPSIZE) (((CODER)->size - (CODER)->pos) < (EXPSIZE))
#define TCODER_MALLOC(PCODER, SIZE) \
({ \
void* ptr = NULL; \
SCoderMem* pMem = (SCoderMem*)taosMemoryMalloc(sizeof(*pMem) + (SIZE)); \
if (pMem) { \
pMem->next = (PCODER)->mList; \
(PCODER)->mList = pMem; \
ptr = (void*)&pMem[1]; \
} \
ptr; \
})
// #define TCODER_MALLOC(PCODER, SIZE) \
// ({ \
// void* ptr = NULL; \
// SCoderMem* pMem = (SCoderMem*)taosMemoryMalloc(sizeof(*pMem) + (SIZE)); \
// if (pMem) { \
// pMem->next = (PCODER)->mList; \
// (PCODER)->mList = pMem; \
// ptr = (void*)&pMem[1]; \
// } \
// ptr; \
// })
static
FORCE_INLINE
void
*
tCoderMalloc
(
SCoder
*
pCoder
,
int32_t
size
)
{
void
*
ptr
=
NULL
;
SCoderMem
*
pMem
=
(
SCoderMem
*
)
taosMemoryMalloc
(
sizeof
(
SCoderMem
*
)
+
size
);
if
(
pMem
)
{
pMem
->
next
=
pCoder
->
mList
;
pCoder
->
mList
=
pMem
;
ptr
=
(
void
*
)
&
pMem
[
1
];
}
return
ptr
;
}
#define tEncodeSize(E, S, SIZE
)
\
({
\
#define tEncodeSize(E, S, SIZE
, RET)
\
do{
\
SCoder coder = {0}; \
int ret = 0; \
tCoderInit(&coder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); \
if ((E)(&coder, S) == 0) { \
SIZE = coder.pos; \
} else { \
ret
= -1; \
RET
= -1; \
} \
tCoderClear(&coder); \
ret; \
})
}while(0)
// #define tEncodeSize(E, S, SIZE) \
// ({ \
// SCoder coder = {0}; \
// int ret = 0; \
// tCoderInit(&coder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); \
// if ((E)(&coder, S) == 0) { \
// SIZE = coder.pos; \
// } else { \
// ret = -1; \
// } \
// tCoderClear(&coder); \
// ret; \
// })
void
tCoderInit
(
SCoder
*
pCoder
,
td_endian_t
endian
,
uint8_t
*
data
,
int32_t
size
,
td_coder_t
type
);
void
tCoderClear
(
SCoder
*
pCoder
);
...
...
source/client/inc/clientInt.h
浏览文件 @
d66fdb94
...
...
@@ -254,8 +254,6 @@ extern int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t
int
genericRspCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
);
SMsgSendInfo
*
buildMsgInfoImpl
(
SRequestObj
*
pReqObj
);
int
taos_init
();
void
*
createTscObj
(
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
SAppInstInfo
*
pAppInfo
);
void
destroyTscObj
(
void
*
pObj
);
STscObj
*
acquireTscObj
(
int64_t
rid
);
...
...
source/client/src/clientImpl.c
浏览文件 @
d66fdb94
...
...
@@ -188,8 +188,8 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC
setResPrecision
(
&
pRequest
->
body
.
resInfo
,
(
*
pQuery
)
->
precision
);
}
TSWAP
(
pRequest
->
dbList
,
(
*
pQuery
)
->
pDbList
,
SArray
*
);
TSWAP
(
pRequest
->
tableList
,
(
*
pQuery
)
->
pTableList
,
SArray
*
);
TSWAP
(
pRequest
->
dbList
,
(
*
pQuery
)
->
pDbList
);
TSWAP
(
pRequest
->
tableList
,
(
*
pQuery
)
->
pTableList
);
}
return
code
;
...
...
@@ -358,8 +358,15 @@ SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
SQuery
*
pQuery
=
NULL
;
int32_t
code
=
buildRequest
(
pTscObj
,
sql
,
sqlLen
,
&
pRequest
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
code
;
return
NULL
;
}
code
=
parseSql
(
pRequest
,
false
,
&
pQuery
,
NULL
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pRequest
->
code
=
code
;
return
pRequest
;
}
return
launchQueryImpl
(
pRequest
,
pQuery
,
code
,
false
);
...
...
@@ -410,7 +417,7 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
while
(
retryNum
++
<
REQUEST_MAX_TRY_TIMES
)
{
pRequest
=
launchQuery
(
pTscObj
,
sql
,
sqlLen
);
if
(
TSDB_CODE_SUCCESS
==
pRequest
->
code
||
!
NEED_CLIENT_HANDLE_ERROR
(
pRequest
->
code
))
{
if
(
pRequest
==
NULL
||
TSDB_CODE_SUCCESS
==
pRequest
->
code
||
!
NEED_CLIENT_HANDLE_ERROR
(
pRequest
->
code
))
{
break
;
}
...
...
source/client/src/taos.def
浏览文件 @
d66fdb94
taos_cleanup
taos_options
taos_set_config
taos_init
taos_connect
taos_connect_l
taos_connect_auth
...
...
source/common/src/tdatablock.c
浏览文件 @
d66fdb94
...
...
@@ -225,13 +225,17 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, co
// Handle the bitmap
char
*
p
=
taosMemoryRealloc
(
pColumnInfoData
->
varmeta
.
offset
,
sizeof
(
int32_t
)
*
(
numOfRow1
+
numOfRow2
));
if
(
p
==
NULL
)
{
// TODO
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pColumnInfoData
->
varmeta
.
offset
=
(
int32_t
*
)
p
;
for
(
int32_t
i
=
0
;
i
<
numOfRow2
;
++
i
)
{
if
(
pSource
->
varmeta
.
offset
[
i
]
==
-
1
)
{
pColumnInfoData
->
varmeta
.
offset
[
i
+
numOfRow1
]
=
-
1
;
}
else
{
pColumnInfoData
->
varmeta
.
offset
[
i
+
numOfRow1
]
=
pSource
->
varmeta
.
offset
[
i
]
+
pColumnInfoData
->
varmeta
.
length
;
}
}
// copy data
uint32_t
len
=
pSource
->
varmeta
.
length
;
...
...
@@ -239,7 +243,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, co
if
(
pColumnInfoData
->
varmeta
.
allocLen
<
len
+
oldLen
)
{
char
*
tmp
=
taosMemoryRealloc
(
pColumnInfoData
->
pData
,
len
+
oldLen
);
if
(
tmp
==
NULL
)
{
return
TSDB_CODE_
VND_
OUT_OF_MEMORY
;
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pColumnInfoData
->
pData
=
tmp
;
...
...
source/common/src/tmsg.c
浏览文件 @
d66fdb94
...
...
@@ -3231,7 +3231,7 @@ int32_t tEncodeSMqCMCommitOffsetReq(SCoder *encoder, const SMqCMCommitOffsetReq
int32_t
tDecodeSMqCMCommitOffsetReq
(
SCoder
*
decoder
,
SMqCMCommitOffsetReq
*
pReq
)
{
if
(
tStartDecode
(
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
decoder
,
&
pReq
->
num
)
<
0
)
return
-
1
;
pReq
->
offsets
=
(
SMqOffset
*
)
TCODER_MALLOC
(
decoder
,
sizeof
(
SMqOffset
)
*
pReq
->
num
);
pReq
->
offsets
=
(
SMqOffset
*
)
tCoderMalloc
(
decoder
,
sizeof
(
SMqOffset
)
*
pReq
->
num
);
if
(
pReq
->
offsets
==
NULL
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
pReq
->
num
;
i
++
)
{
tDecodeSMqOffset
(
decoder
,
&
pReq
->
offsets
[
i
]);
...
...
@@ -3514,7 +3514,7 @@ int tDecodeSVCreateTbBatchRsp(SCoder *pCoder, SVCreateTbBatchRsp *pRsp) {
if
(
tStartDecode
(
pCoder
)
<
0
)
return
-
1
;
if
(
tDecodeI32v
(
pCoder
,
&
pRsp
->
nRsps
)
<
0
)
return
-
1
;
pRsp
->
pRsps
=
(
SVCreateTbRsp
*
)
TCODER_MALLOC
(
pCoder
,
sizeof
(
*
pRsp
->
pRsps
)
*
pRsp
->
nRsps
);
pRsp
->
pRsps
=
(
SVCreateTbRsp
*
)
tCoderMalloc
(
pCoder
,
sizeof
(
*
pRsp
->
pRsps
)
*
pRsp
->
nRsps
);
for
(
int32_t
i
=
0
;
i
<
pRsp
->
nRsps
;
i
++
)
{
if
(
tDecodeSVCreateTbRsp
(
pCoder
,
pRsp
->
pRsps
+
i
)
<
0
)
return
-
1
;
}
...
...
@@ -3621,6 +3621,43 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
taosMemoryFreeClear
(
pReq
->
ast
);
}
int32_t
tEncodeSRSmaParam
(
SCoder
*
pCoder
,
const
SRSmaParam
*
pRSmaParam
)
{
if
(
tEncodeFloat
(
pCoder
,
pRSmaParam
->
xFilesFactor
)
<
0
)
return
-
1
;
if
(
tEncodeI32v
(
pCoder
,
pRSmaParam
->
delay
)
<
0
)
return
-
1
;
if
(
tEncodeI32v
(
pCoder
,
pRSmaParam
->
qmsg1Len
)
<
0
)
return
-
1
;
if
(
tEncodeI32v
(
pCoder
,
pRSmaParam
->
qmsg2Len
)
<
0
)
return
-
1
;
if
(
pRSmaParam
->
qmsg1Len
>
0
)
{
if
(
tEncodeBinary
(
pCoder
,
pRSmaParam
->
qmsg1
,
(
uint64_t
)
pRSmaParam
->
qmsg1Len
)
<
0
)
// qmsg1Len contains len of '\0'
return
-
1
;
}
if
(
pRSmaParam
->
qmsg2Len
>
0
)
{
if
(
tEncodeBinary
(
pCoder
,
pRSmaParam
->
qmsg2
,
(
uint64_t
)
pRSmaParam
->
qmsg2Len
)
<
0
)
// qmsg2Len contains len of '\0'
return
-
1
;
}
return
0
;
}
int32_t
tDecodeSRSmaParam
(
SCoder
*
pCoder
,
SRSmaParam
*
pRSmaParam
)
{
if
(
tDecodeFloat
(
pCoder
,
&
pRSmaParam
->
xFilesFactor
)
<
0
)
return
-
1
;
if
(
tDecodeI32v
(
pCoder
,
&
pRSmaParam
->
delay
)
<
0
)
return
-
1
;
if
(
tDecodeI32v
(
pCoder
,
&
pRSmaParam
->
qmsg1Len
)
<
0
)
return
-
1
;
if
(
tDecodeI32v
(
pCoder
,
&
pRSmaParam
->
qmsg2Len
)
<
0
)
return
-
1
;
if
(
pRSmaParam
->
qmsg1Len
>
0
)
{
uint64_t
len
;
if
(
tDecodeBinaryAlloc
(
pCoder
,
(
void
**
)
&
pRSmaParam
->
qmsg1
,
&
len
)
<
0
)
return
-
1
;
// qmsg1Len contains len of '\0'
}
else
{
pRSmaParam
->
qmsg1
=
NULL
;
}
if
(
pRSmaParam
->
qmsg2Len
>
0
)
{
uint64_t
len
;
if
(
tDecodeBinaryAlloc
(
pCoder
,
(
void
**
)
&
pRSmaParam
->
qmsg2
,
&
len
)
<
0
)
return
-
1
;
// qmsg2Len contains len of '\0'
}
else
{
pRSmaParam
->
qmsg2
=
NULL
;
}
return
0
;
}
int
tEncodeSVCreateStbReq
(
SCoder
*
pCoder
,
const
SVCreateStbReq
*
pReq
)
{
if
(
tStartEncode
(
pCoder
)
<
0
)
return
-
1
;
...
...
@@ -3629,9 +3666,9 @@ int tEncodeSVCreateStbReq(SCoder *pCoder, const SVCreateStbReq *pReq) {
if
(
tEncodeI8
(
pCoder
,
pReq
->
rollup
)
<
0
)
return
-
1
;
if
(
tEncodeSSchemaWrapper
(
pCoder
,
&
pReq
->
schema
)
<
0
)
return
-
1
;
if
(
tEncodeSSchemaWrapper
(
pCoder
,
&
pReq
->
schemaTag
)
<
0
)
return
-
1
;
//
if (pReq->rollup) {
// if (tEncodeSRSmaParam(pCoder,
pReq->pRSmaParam) < 0) return -1;
//
}
if
(
pReq
->
rollup
)
{
if
(
tEncodeSRSmaParam
(
pCoder
,
&
pReq
->
pRSmaParam
)
<
0
)
return
-
1
;
}
tEndEncode
(
pCoder
);
return
0
;
...
...
@@ -3645,9 +3682,9 @@ int tDecodeSVCreateStbReq(SCoder *pCoder, SVCreateStbReq *pReq) {
if
(
tDecodeI8
(
pCoder
,
&
pReq
->
rollup
)
<
0
)
return
-
1
;
if
(
tDecodeSSchemaWrapper
(
pCoder
,
&
pReq
->
schema
)
<
0
)
return
-
1
;
if
(
tDecodeSSchemaWrapper
(
pCoder
,
&
pReq
->
schemaTag
)
<
0
)
return
-
1
;
//
if (pReq->rollup) {
// if (tDecodeSRSmaParam(pCoder,
pReq->pRSmaParam) < 0) return -1;
//
}
if
(
pReq
->
rollup
)
{
if
(
tDecodeSRSmaParam
(
pCoder
,
&
pReq
->
pRSmaParam
)
<
0
)
return
-
1
;
}
tEndDecode
(
pCoder
);
return
0
;
...
...
@@ -3743,7 +3780,7 @@ int tDecodeSVCreateTbBatchReq(SCoder *pCoder, SVCreateTbBatchReq *pReq) {
if
(
tStartDecode
(
pCoder
)
<
0
)
return
-
1
;
if
(
tDecodeI32v
(
pCoder
,
&
pReq
->
nReqs
)
<
0
)
return
-
1
;
pReq
->
pReqs
=
(
SVCreateTbReq
*
)
TCODER_MALLOC
(
pCoder
,
sizeof
(
SVCreateTbReq
)
*
pReq
->
nReqs
);
pReq
->
pReqs
=
(
SVCreateTbReq
*
)
tCoderMalloc
(
pCoder
,
sizeof
(
SVCreateTbReq
)
*
pReq
->
nReqs
);
if
(
pReq
->
pReqs
==
NULL
)
return
-
1
;
for
(
int
iReq
=
0
;
iReq
<
pReq
->
nReqs
;
iReq
++
)
{
if
(
tDecodeSVCreateTbReq
(
pCoder
,
pReq
->
pReqs
+
iReq
)
<
0
)
return
-
1
;
...
...
source/common/src/trow.c
浏览文件 @
d66fdb94
...
...
@@ -220,7 +220,7 @@ static uint8_t tdGetMergedBitmapByte(uint8_t byte) {
}
/**
* @brief Merge bitmap from 2 bits to 1 bit
s
, and the memory buffer should be guaranteed by the invoker.
* @brief Merge bitmap from 2 bits to 1 bit, and the memory buffer should be guaranteed by the invoker.
*
* @param srcBitmap
* @param nBits
...
...
source/common/src/ttypes.c
浏览文件 @
d66fdb94
...
...
@@ -651,35 +651,35 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void *buf
switch
(
type
)
{
case
TSDB_DATA_TYPE_INT
:
case
TSDB_DATA_TYPE_UINT
:
{
TSWAP
(
*
(
int32_t
*
)(
pLeft
),
*
(
int32_t
*
)(
pRight
)
,
int32_t
);
TSWAP
(
*
(
int32_t
*
)(
pLeft
),
*
(
int32_t
*
)(
pRight
));
break
;
}
case
TSDB_DATA_TYPE_BIGINT
:
case
TSDB_DATA_TYPE_UBIGINT
:
case
TSDB_DATA_TYPE_TIMESTAMP
:
{
TSWAP
(
*
(
int64_t
*
)(
pLeft
),
*
(
int64_t
*
)(
pRight
)
,
int64_t
);
TSWAP
(
*
(
int64_t
*
)(
pLeft
),
*
(
int64_t
*
)(
pRight
));
break
;
}
case
TSDB_DATA_TYPE_DOUBLE
:
{
TSWAP
(
*
(
double
*
)(
pLeft
),
*
(
double
*
)(
pRight
)
,
double
);
TSWAP
(
*
(
double
*
)(
pLeft
),
*
(
double
*
)(
pRight
));
break
;
}
case
TSDB_DATA_TYPE_SMALLINT
:
case
TSDB_DATA_TYPE_USMALLINT
:
{
TSWAP
(
*
(
int16_t
*
)(
pLeft
),
*
(
int16_t
*
)(
pRight
)
,
int16_t
);
TSWAP
(
*
(
int16_t
*
)(
pLeft
),
*
(
int16_t
*
)(
pRight
));
break
;
}
case
TSDB_DATA_TYPE_FLOAT
:
{
TSWAP
(
*
(
float
*
)(
pLeft
),
*
(
float
*
)(
pRight
)
,
float
);
TSWAP
(
*
(
float
*
)(
pLeft
),
*
(
float
*
)(
pRight
));
break
;
}
case
TSDB_DATA_TYPE_BOOL
:
case
TSDB_DATA_TYPE_TINYINT
:
case
TSDB_DATA_TYPE_UTINYINT
:
{
TSWAP
(
*
(
int8_t
*
)(
pLeft
),
*
(
int8_t
*
)(
pRight
)
,
int8_t
);
TSWAP
(
*
(
int8_t
*
)(
pLeft
),
*
(
int8_t
*
)(
pRight
));
break
;
}
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
d66fdb94
...
...
@@ -395,7 +395,9 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
}
}
// get length
if
(
tEncodeSize
(
tEncodeSVCreateStbReq
,
&
req
,
contLen
)
<
0
)
{
int32_t
ret
=
0
;
tEncodeSize
(
tEncodeSVCreateStbReq
,
&
req
,
contLen
,
ret
);
if
(
ret
<
0
)
{
return
NULL
;
}
...
...
source/dnode/mnode/impl/src/mndUser.c
浏览文件 @
d66fdb94
...
...
@@ -230,8 +230,8 @@ static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOld, SUserObj *pNew) {
memcpy
(
pOld
->
pass
,
pNew
->
pass
,
TSDB_PASSWORD_LEN
);
pOld
->
updateTime
=
pNew
->
updateTime
;
TSWAP
(
pOld
->
readDbs
,
pNew
->
readDbs
,
(
void
*
)
);
TSWAP
(
pOld
->
writeDbs
,
pNew
->
writeDbs
,
(
void
*
)
);
TSWAP
(
pOld
->
readDbs
,
pNew
->
readDbs
);
TSWAP
(
pOld
->
writeDbs
,
pNew
->
writeDbs
);
return
0
;
}
...
...
source/dnode/vnode/src/inc/meta.h
浏览文件 @
d66fdb94
...
...
@@ -77,21 +77,25 @@ typedef struct {
tb_uid_t
uid
;
}
STbDbKey
;
typedef
struct
__attribute__
((
__packed__
))
{
#pragma pack(push, 1)
typedef
struct
{
tb_uid_t
uid
;
int32_t
sver
;
}
SSkmDbKey
;
#pragma pack(pop)
typedef
struct
{
tb_uid_t
suid
;
tb_uid_t
uid
;
}
SCtbIdxKey
;
typedef
struct
__attribute__
((
__packed__
))
{
#pragma pack(push, 1)
typedef
struct
{
tb_uid_t
suid
;
int16_t
cid
;
char
data
[];
}
STagIdxKey
;
#pragma pack(pop)
typedef
struct
{
int64_t
dtime
;
...
...
source/dnode/vnode/src/inc/tsdbSma.h
浏览文件 @
d66fdb94
...
...
@@ -22,13 +22,13 @@
extern
"C"
{
#endif
typedef
int32_t
(
*
__tb_ddl_fn_t
)(
void
*
ahandle
,
void
**
result
,
void
*
p1
,
void
*
p2
);
//
typedef int32_t (*__tb_ddl_fn_t)(void *ahandle, void **result, void *p1, void *p2);
struct
STbDdlH
{
void
*
ahandle
;
void
*
result
;
__tb_ddl_fn_t
fp
;
};
//
struct STbDdlH {
//
void *ahandle;
//
void *result;
//
__tb_ddl_fn_t fp;
//
};
static
FORCE_INLINE
int32_t
tsdbUidStoreInit
(
STbUidStore
**
pStore
)
{
ASSERT
(
*
pStore
==
NULL
);
...
...
@@ -40,14 +40,6 @@ static FORCE_INLINE int32_t tsdbUidStoreInit(STbUidStore **pStore) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
tsdbUidStorePut
(
STbUidStore
*
pStore
,
tb_uid_t
suid
,
tb_uid_t
*
uid
);
void
tsdbUidStoreDestory
(
STbUidStore
*
pStore
);
void
*
tsdbUidStoreFree
(
STbUidStore
*
pStore
);
int32_t
tsdbRegisterRSma
(
STsdb
*
pTsdb
,
SMeta
*
pMeta
,
SVCreateTbReq
*
pReq
);
int32_t
tsdbFetchTbUidList
(
void
*
pTsdb
,
void
**
result
,
void
*
suid
,
void
*
uid
);
int32_t
tsdbUpdateTbUidList
(
STsdb
*
pTsdb
,
STbUidStore
*
pUidStore
);
int32_t
tsdbTriggerRSma
(
STsdb
*
pTsdb
,
SMeta
*
pMeta
,
void
*
pMsg
,
int32_t
inputType
);
#ifdef __cplusplus
}
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
d66fdb94
...
...
@@ -107,6 +107,15 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
int32_t
tqProcessStreamTrigger
(
STQ
*
pTq
,
void
*
data
,
int32_t
dataLen
,
int32_t
workerId
);
int32_t
tqProcessPollReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
,
int32_t
workerId
);
// sma
int32_t
tsdbRegisterRSma
(
STsdb
*
pTsdb
,
SMeta
*
pMeta
,
SVCreateStbReq
*
pReq
);
int32_t
tsdbFetchTbUidList
(
STsdb
*
pTsdb
,
STbUidStore
**
ppStore
,
tb_uid_t
suid
,
tb_uid_t
uid
);
int32_t
tsdbUpdateTbUidList
(
STsdb
*
pTsdb
,
STbUidStore
*
pUidStore
);
void
tsdbUidStoreDestory
(
STbUidStore
*
pStore
);
void
*
tsdbUidStoreFree
(
STbUidStore
*
pStore
);
int32_t
tsdbTriggerRSma
(
STsdb
*
pTsdb
,
SMeta
*
pMeta
,
void
*
pMsg
,
int32_t
inputType
);
typedef
struct
{
int8_t
streamType
;
// sma or other
int8_t
dstType
;
...
...
@@ -162,7 +171,7 @@ struct STbUidStore {
#define TD_VID(PVNODE) (PVNODE)->config.vgId
typedef
struct
STbDdlH
STbDdlH
;
//
typedef struct STbDdlH STbDdlH;
// sma
void
smaHandleRes
(
void
*
pVnode
,
int64_t
smaId
,
const
SArray
*
data
);
...
...
source/dnode/vnode/src/meta/metaTable.c
浏览文件 @
d66fdb94
...
...
@@ -158,7 +158,9 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
pKey
=
&
tbDbKey
;
kLen
=
sizeof
(
tbDbKey
);
if
(
tEncodeSize
(
metaEncodeEntry
,
pME
,
vLen
)
<
0
)
{
int32_t
ret
=
0
;
tEncodeSize
(
metaEncodeEntry
,
pME
,
vLen
,
ret
);
if
(
ret
<
0
)
{
goto
_err
;
}
...
...
@@ -250,7 +252,9 @@ static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
skmDbKey
.
sver
=
pSW
->
sver
;
// encode schema
if
(
tEncodeSize
(
tEncodeSSchemaWrapper
,
pSW
,
vLen
)
<
0
)
return
-
1
;
int32_t
ret
=
0
;
tEncodeSize
(
tEncodeSSchemaWrapper
,
pSW
,
vLen
,
ret
);
if
(
ret
<
0
)
return
-
1
;
pVal
=
taosMemoryMalloc
(
vLen
);
if
(
pVal
==
NULL
)
{
rcode
=
-
1
;
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
d66fdb94
...
...
@@ -454,7 +454,7 @@ void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond* pCond) {
if
(
emptyQueryTimewindow
(
pTsdbReadHandle
))
{
if
(
pCond
->
order
!=
pTsdbReadHandle
->
order
)
{
pTsdbReadHandle
->
order
=
pCond
->
order
;
TSWAP
(
pTsdbReadHandle
->
window
.
skey
,
pTsdbReadHandle
->
window
.
ekey
,
int64_t
);
TSWAP
(
pTsdbReadHandle
->
window
.
skey
,
pTsdbReadHandle
->
window
.
ekey
);
}
return
;
...
...
@@ -924,7 +924,7 @@ static bool hasMoreDataInCache(STsdbReadHandle* pHandle) {
pHandle
->
cur
.
mixBlock
=
true
;
if
(
!
ASCENDING_TRAVERSE
(
pHandle
->
order
))
{
TSWAP
(
win
->
skey
,
win
->
ekey
,
TSKEY
);
TSWAP
(
win
->
skey
,
win
->
ekey
);
}
return
true
;
...
...
@@ -1203,7 +1203,7 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
// update the last key value
pCheckInfo
->
lastKey
=
cur
->
win
.
ekey
+
step
;
if
(
!
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
))
{
TSWAP
(
cur
->
win
.
skey
,
cur
->
win
.
ekey
,
TSKEY
);
TSWAP
(
cur
->
win
.
skey
,
cur
->
win
.
ekey
);
}
cur
->
mixBlock
=
true
;
...
...
@@ -1519,8 +1519,7 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
}
else
if
(
isRow1DataRow
)
{
colIdOfRow1
=
pSchema1
->
columns
[
j
].
colId
;
}
else
{
SKvRowIdx
*
pColIdx
=
tdKvRowColIdxAt
(
row1
,
j
);
colIdOfRow1
=
pColIdx
->
colId
;
colIdOfRow1
=
tdKvRowColIdAt
(
row1
,
j
);
}
int32_t
colIdOfRow2
;
...
...
@@ -1529,8 +1528,7 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
}
else
if
(
isRow2DataRow
)
{
colIdOfRow2
=
pSchema2
->
columns
[
k
].
colId
;
}
else
{
SKvRowIdx
*
pColIdx
=
tdKvRowColIdxAt
(
row2
,
k
);
colIdOfRow2
=
pColIdx
->
colId
;
colIdOfRow2
=
tdKvRowColIdAt
(
row2
,
k
);
}
if
(
colIdOfRow1
==
colIdOfRow2
)
{
...
...
@@ -1701,7 +1699,7 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa
int32_t
end
=
endPos
;
if
(
!
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
))
{
TSWAP
(
start
,
end
,
int32_t
);
TSWAP
(
start
,
end
);
}
assert
(
pTsdbReadHandle
->
outputCapacity
>=
(
end
-
start
+
1
));
...
...
@@ -1932,7 +1930,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
((
pos
<
endPos
||
cur
->
lastKey
<
pTsdbReadHandle
->
window
.
ekey
)
&&
!
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
)));
if
(
!
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
))
{
TSWAP
(
cur
->
win
.
skey
,
cur
->
win
.
ekey
,
TSKEY
);
TSWAP
(
cur
->
win
.
skey
,
cur
->
win
.
ekey
);
}
moveDataToFront
(
pTsdbReadHandle
,
numOfRows
,
numOfCols
);
...
...
source/dnode/vnode/src/tsdb/tsdbSma.c
浏览文件 @
d66fdb94
...
...
@@ -173,6 +173,7 @@ static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[])
static
int32_t
tsdbInsertTSmaDataImpl
(
STsdb
*
pTsdb
,
int64_t
indexUid
,
const
char
*
msg
);
static
int32_t
tsdbInsertRSmaDataImpl
(
STsdb
*
pTsdb
,
const
char
*
msg
);
static
FORCE_INLINE
int32_t
tsdbUidStorePut
(
STbUidStore
*
pStore
,
tb_uid_t
suid
,
tb_uid_t
*
uid
);
static
FORCE_INLINE
int32_t
tsdbUpdateTbUidListImpl
(
STsdb
*
pTsdb
,
tb_uid_t
*
suid
,
SArray
*
tbUids
);
// mgmt interface
static
int32_t
tsdbDropTSmaDataImpl
(
STsdb
*
pTsdb
,
int64_t
indexUid
);
...
...
@@ -1692,18 +1693,16 @@ int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg) {
* @param pReq
* @return int32_t
*/
int32_t
tsdbRegisterRSma
(
STsdb
*
pTsdb
,
SMeta
*
pMeta
,
SVCreateTbReq
*
pReq
)
{
#if 0
SRSmaParam *param = pReq->stbCfg.pRSmaParam;
if (!param) {
tsdbDebug("vgId:%d return directly since no rollup for stable %s %" PRIi64, REPO_ID(pTsdb), pReq->name,
pReq->stbCfg.suid);
int32_t
tsdbRegisterRSma
(
STsdb
*
pTsdb
,
SMeta
*
pMeta
,
SVCreateStbReq
*
pReq
)
{
if
(
!
pReq
->
rollup
)
{
tsdbDebug
(
"vgId:%d return directly since no rollup for stable %s %"
PRIi64
,
REPO_ID
(
pTsdb
),
pReq
->
name
,
pReq
->
suid
);
return
TSDB_CODE_SUCCESS
;
}
SRSmaParam
*
param
=
&
pReq
->
pRSmaParam
;
if
((
param
->
qmsg1Len
==
0
)
&&
(
param
->
qmsg2Len
==
0
))
{
tsdbWarn("vgId:%d no qmsg1/qmsg2 for rollup stable %s %" PRIi64, REPO_ID(pTsdb), pReq->name, pReq->s
tbCfg.s
uid);
tsdbWarn
(
"vgId:%d no qmsg1/qmsg2 for rollup stable %s %"
PRIi64
,
REPO_ID
(
pTsdb
),
pReq
->
name
,
pReq
->
suid
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1716,9 +1715,9 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) {
SSmaStat
*
pStat
=
SMA_ENV_STAT
(
pEnv
);
SRSmaInfo
*
pRSmaInfo
=
NULL
;
pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), &pReq->s
tbCfg.s
uid, sizeof(tb_uid_t));
pRSmaInfo
=
taosHashGet
(
SMA_STAT_INFO_HASH
(
pStat
),
&
pReq
->
suid
,
sizeof
(
tb_uid_t
));
if
(
pRSmaInfo
)
{
tsdbWarn("vgId:%d rsma info already exists for stb: %s, %" PRIi64, REPO_ID(pTsdb), pReq->name, pReq->s
tbCfg.s
uid);
tsdbWarn
(
"vgId:%d rsma info already exists for stb: %s, %"
PRIi64
,
REPO_ID
(
pTsdb
),
pReq
->
name
,
pReq
->
suid
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1758,14 +1757,13 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) {
}
}
if (taosHashPut(SMA_STAT_INFO_HASH(pStat), &pReq->s
tbCfg.s
uid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) !=
if
(
taosHashPut
(
SMA_STAT_INFO_HASH
(
pStat
),
&
pReq
->
suid
,
sizeof
(
tb_uid_t
),
&
pRSmaInfo
,
sizeof
(
pRSmaInfo
))
!=
TSDB_CODE_SUCCESS
)
{
return
TSDB_CODE_FAILED
;
}
else
{
tsdbDebug("vgId:%d register rsma info succeed for suid:%" PRIi64, REPO_ID(pTsdb), pReq->stbCfg.
suid);
tsdbDebug
(
"vgId:%d register rsma info succeed for suid:%"
PRIi64
,
REPO_ID
(
pTsdb
),
pReq
->
suid
);
}
#endif
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1777,7 +1775,7 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) {
* @param uid
* @return int32_t
*/
int32_t
tsdbUidStorePut
(
STbUidStore
*
pStore
,
tb_uid_t
suid
,
tb_uid_t
*
uid
)
{
static
int32_t
tsdbUidStorePut
(
STbUidStore
*
pStore
,
tb_uid_t
suid
,
tb_uid_t
*
uid
)
{
// prefer to store suid/uids in array
if
((
suid
==
pStore
->
suid
)
||
(
pStore
->
suid
==
0
))
{
if
(
pStore
->
suid
==
0
)
{
...
...
@@ -1833,6 +1831,7 @@ void tsdbUidStoreDestory(STbUidStore *pStore) {
if
(
pStore
)
{
if
(
pStore
->
uidHash
)
{
if
(
pStore
->
tbUids
)
{
// When pStore->tbUids not NULL, the pStore->uidHash has k/v; otherwise pStore->uidHash only has keys.
void
*
pIter
=
taosHashIterate
(
pStore
->
uidHash
,
NULL
);
while
(
pIter
)
{
SArray
*
arr
=
*
(
SArray
**
)
pIter
;
...
...
@@ -1847,8 +1846,10 @@ void tsdbUidStoreDestory(STbUidStore *pStore) {
}
void
*
tsdbUidStoreFree
(
STbUidStore
*
pStore
)
{
if
(
pStore
)
{
tsdbUidStoreDestory
(
pStore
);
taosMemoryFree
(
pStore
);
}
return
NULL
;
}
...
...
@@ -1861,7 +1862,7 @@ void *tsdbUidStoreFree(STbUidStore *pStore) {
* @param uid
* @return int32_t
*/
int32_t
tsdbFetchTbUidList
(
void
*
pTsdb
,
void
**
ppStore
,
void
*
suid
,
void
*
uid
)
{
int32_t
tsdbFetchTbUidList
(
STsdb
*
pTsdb
,
STbUidStore
**
ppStore
,
tb_uid_t
suid
,
tb_uid_t
uid
)
{
SSmaEnv
*
pEnv
=
REPO_RSMA_ENV
((
STsdb
*
)
pTsdb
);
// only applicable to rollup SMA ctables
...
...
@@ -1877,7 +1878,7 @@ int32_t tsdbFetchTbUidList(void *pTsdb, void **ppStore, void *suid, void *uid) {
}
// info cached when create rsma stable and return directly for non-rsma ctables
if
(
!
taosHashGet
(
infoHash
,
suid
,
sizeof
(
tb_uid_t
)))
{
if
(
!
taosHashGet
(
infoHash
,
&
suid
,
sizeof
(
tb_uid_t
)))
{
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1887,7 +1888,7 @@ int32_t tsdbFetchTbUidList(void *pTsdb, void **ppStore, void *suid, void *uid) {
}
}
if
(
tsdbUidStorePut
(
*
ppStore
,
*
(
tb_uid_t
*
)
suid
,
(
tb_uid_t
*
)
uid
)
!=
0
)
{
if
(
tsdbUidStorePut
(
*
ppStore
,
suid
,
&
uid
)
!=
0
)
{
*
ppStore
=
tsdbUidStoreFree
(
*
ppStore
);
return
TSDB_CODE_FAILED
;
}
...
...
@@ -1935,12 +1936,10 @@ static FORCE_INLINE int32_t tsdbUpdateTbUidListImpl(STsdb *pTsdb, tb_uid_t *suid
int32_t
tsdbUpdateTbUidList
(
STsdb
*
pTsdb
,
STbUidStore
*
pStore
)
{
if
(
!
pStore
||
(
taosArrayGetSize
(
pStore
->
tbUids
)
==
0
))
{
tsdbDebug
(
"vgId:%d no need to update tbUids since empty uidStore"
,
REPO_ID
(
pTsdb
));
tsdbUidStoreFree
(
pStore
);
return
TSDB_CODE_SUCCESS
;
}
if
(
tsdbUpdateTbUidListImpl
(
pTsdb
,
&
pStore
->
suid
,
pStore
->
tbUids
)
!=
TSDB_CODE_SUCCESS
)
{
tsdbUidStoreFree
(
pStore
);
return
TSDB_CODE_FAILED
;
}
...
...
@@ -1951,15 +1950,11 @@ int32_t tsdbUpdateTbUidList(STsdb *pTsdb, STbUidStore *pStore) {
if
(
tsdbUpdateTbUidListImpl
(
pTsdb
,
pTbSuid
,
pTbUids
)
!=
TSDB_CODE_SUCCESS
)
{
taosHashCancelIterate
(
pStore
->
uidHash
,
pIter
);
tsdbUidStoreFree
(
pStore
);
return
TSDB_CODE_FAILED
;
}
pIter
=
taosHashIterate
(
pStore
->
uidHash
,
pIter
);
}
tsdbUidStoreFree
(
pStore
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1971,8 +1966,6 @@ static int32_t tsdbFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
STSRow
*
row
=
NULL
;
terrno
=
TSDB_CODE_SUCCESS
;
// pMsg->length = htonl(pMsg->length);
// pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
if
(
tInitSubmitMsgIterEx
(
pMsg
,
&
msgIter
)
<
0
)
return
-
1
;
while
(
true
)
{
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
d66fdb94
...
...
@@ -300,13 +300,13 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq,
goto
_err
;
}
// tsdbRegisterRSma(pVnode->pTsdb, pVnode->pMeta, &vCreateTbReq);
if
(
metaCreateSTable
(
pVnode
->
pMeta
,
version
,
&
req
)
<
0
)
{
pRsp
->
code
=
terrno
;
goto
_err
;
}
tsdbRegisterRSma
(
pVnode
->
pTsdb
,
pVnode
->
pMeta
,
&
req
);
tCoderClear
(
&
coder
);
return
0
;
...
...
@@ -323,6 +323,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
SVCreateTbBatchRsp
rsp
=
{
0
};
SVCreateTbRsp
cRsp
=
{
0
};
char
tbName
[
TSDB_TABLE_FNAME_LEN
];
STbUidStore
*
pStore
=
NULL
;
pRsp
->
msgType
=
TDMT_VND_CREATE_TABLE_RSP
;
pRsp
->
code
=
TSDB_CODE_SUCCESS
;
...
...
@@ -361,6 +362,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
cRsp
.
code
=
terrno
;
}
else
{
cRsp
.
code
=
TSDB_CODE_SUCCESS
;
tsdbFetchTbUidList
(
pVnode
->
pTsdb
,
&
pStore
,
pCreateReq
->
ctb
.
suid
,
pCreateReq
->
uid
);
}
taosArrayPush
(
rsp
.
pArray
,
&
cRsp
);
...
...
@@ -368,8 +370,12 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
tCoderClear
(
&
coder
);
tsdbUpdateTbUidList
(
pVnode
->
pTsdb
,
pStore
);
tsdbUidStoreFree
(
pStore
);
// prepare rsp
tEncodeSize
(
tEncodeSVCreateTbBatchRsp
,
&
rsp
,
pRsp
->
contLen
);
int32_t
ret
=
0
;
tEncodeSize
(
tEncodeSVCreateTbBatchRsp
,
&
rsp
,
pRsp
->
contLen
,
ret
);
pRsp
->
pCont
=
rpcMallocCont
(
pRsp
->
contLen
);
if
(
pRsp
->
pCont
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -425,7 +431,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
SSubmitRsp
rsp
=
{
0
};
pRsp
->
code
=
0
;
tsdbTriggerRSma
(
pVnode
->
pTsdb
,
pVnode
->
pMeta
,
pReq
,
STREAM_DATA_TYPE_SUBMIT_BLOCK
);
// handle the request
if
(
tsdbInsertData
(
pVnode
->
pTsdb
,
version
,
pSubmitReq
,
&
rsp
)
<
0
)
{
pRsp
->
code
=
terrno
;
...
...
@@ -434,7 +440,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
// pRsp->msgType = TDMT_VND_SUBMIT_RSP;
// vnodeProcessSubmitReq(pVnode, ptr, pRsp);
// tsdbTriggerRSma(pVnode->pTsdb, pVnode->pMeta, p
tr
, STREAM_DATA_TYPE_SUBMIT_BLOCK);
// tsdbTriggerRSma(pVnode->pTsdb, pVnode->pMeta, p
Req
, STREAM_DATA_TYPE_SUBMIT_BLOCK);
// encode the response (TODO)
pRsp
->
pCont
=
rpcMallocCont
(
sizeof
(
SSubmitRsp
));
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
d66fdb94
...
...
@@ -2100,7 +2100,7 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
//
// pQueryAttr->order.order = TSDB_ORDER_ASC;
// if (pQueryAttr->window.skey > pQueryAttr->window.ekey) {
// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey
, TSKEY
);
// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
// }
//
// pQueryAttr->needReverseScan = false;
...
...
@@ -2110,7 +2110,7 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
// if (pQueryAttr->groupbyColumn && pQueryAttr->order.order == TSDB_ORDER_DESC) {
// pQueryAttr->order.order = TSDB_ORDER_ASC;
// if (pQueryAttr->window.skey > pQueryAttr->window.ekey) {
// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey
, TSKEY
);
// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
// }
//
// pQueryAttr->needReverseScan = false;
...
...
@@ -2135,7 +2135,7 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
// //qDebug(msg, pQInfo->qId, "only-first", pQueryAttr->order.order, TSDB_ORDER_ASC, pQueryAttr->window.skey,
//// pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey);
//
// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey
, TSKEY
);
// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
// doUpdateLastKey(pQueryAttr);
// }
//
...
...
@@ -2146,7 +2146,7 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
// //qDebug(msg, pQInfo->qId, "only-last", pQueryAttr->order.order, TSDB_ORDER_DESC, pQueryAttr->window.skey,
//// pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey);
//
// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey
, TSKEY
);
// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
// doUpdateLastKey(pQueryAttr);
// }
//
...
...
@@ -2162,7 +2162,7 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
//// pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey,
/// pQueryAttr->window.skey);
//
// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey
, TSKEY
);
// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
// doUpdateLastKey(pQueryAttr);
// }
//
...
...
@@ -2174,7 +2174,7 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
//// pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey,
/// pQueryAttr->window.skey);
//
// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey
, TSKEY
);
// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey);
// doUpdateLastKey(pQueryAttr);
// }
//
...
...
@@ -2673,7 +2673,7 @@ static void updateTableQueryInfoForReverseScan(STableQueryInfo* pTableQueryInfo)
return
;
}
// TSWAP(pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey
, TSKEY
);
// TSWAP(pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey);
// pTableQueryInfo->lastKey = pTableQueryInfo->win.skey;
// SWITCH_ORDER(pTableQueryInfo->cur.order);
...
...
@@ -4994,13 +4994,21 @@ static int32_t handleLimitOffset(SOperatorInfo* pOperator, SSDataBlock* pBlock)
pProjectInfo
->
curOffset
=
0
;
}
if
(
pRes
->
info
.
rows
>=
pOperator
->
resultInfo
.
threshold
)
{
// check for the limitation in each group
if
(
pProjectInfo
->
limit
.
limit
>
0
&&
pProjectInfo
->
curOutput
+
pRes
->
info
.
rows
>=
pProjectInfo
->
limit
.
limit
)
{
pRes
->
info
.
rows
=
(
int32_t
)(
pProjectInfo
->
limit
.
limit
-
pProjectInfo
->
curOutput
);
if
(
pProjectInfo
->
slimit
.
limit
==
-
1
||
pProjectInfo
->
slimit
.
limit
<=
pProjectInfo
->
curGroupOutput
)
{
pOperator
->
status
=
OP_EXEC_DONE
;
}
return
PROJECT_RETRIEVE_DONE
;
}
// todo optimize performance
// If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
// they may not belong to the same group the limit/offset value is not valid in this case.
if
(
pRes
->
info
.
rows
>=
pOperator
->
resultInfo
.
threshold
||
pProjectInfo
->
slimit
.
offset
!=
-
1
||
pProjectInfo
->
slimit
.
limit
!=
-
1
)
{
return
PROJECT_RETRIEVE_DONE
;
}
else
{
// not full enough, continue to accumulate the output data in the buffer.
return
PROJECT_RETRIEVE_CONTINUE
;
...
...
@@ -6652,7 +6660,7 @@ static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableS
//todo work around a problem, remove it later
if
((
pCond
->
order
==
TSDB_ORDER_ASC
&&
pCond
->
twindow
.
skey
>
pCond
->
twindow
.
ekey
)
||
(
pCond
->
order
==
TSDB_ORDER_DESC
&&
pCond
->
twindow
.
skey
<
pCond
->
twindow
.
ekey
))
{
TSWAP
(
pCond
->
twindow
.
skey
,
pCond
->
twindow
.
ekey
,
int64_t
);
TSWAP
(
pCond
->
twindow
.
skey
,
pCond
->
twindow
.
ekey
);
}
#endif
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
d66fdb94
...
...
@@ -266,7 +266,7 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction
// setupQueryRangeForReverseScan(pTableScanInfo);
STimeWindow
*
pTWindow
=
&
pTableScanInfo
->
cond
.
twindow
;
TSWAP
(
pTWindow
->
skey
,
pTWindow
->
ekey
,
int64_t
);
TSWAP
(
pTWindow
->
skey
,
pTWindow
->
ekey
);
pTableScanInfo
->
cond
.
order
=
TSDB_ORDER_DESC
;
}
...
...
source/libs/parser/src/parInsert.c
浏览文件 @
d66fdb94
...
...
@@ -307,7 +307,7 @@ static int32_t buildOutput(SInsertParseContext* pCxt) {
taosHashGetDup
(
pCxt
->
pVgroupsHashObj
,
(
const
char
*
)
&
src
->
vgId
,
sizeof
(
src
->
vgId
),
&
dst
->
vg
);
dst
->
numOfTables
=
src
->
numOfTables
;
dst
->
size
=
src
->
size
;
TSWAP
(
dst
->
pData
,
src
->
pData
,
char
*
);
TSWAP
(
dst
->
pData
,
src
->
pData
);
buildMsgHeader
(
src
,
dst
);
taosArrayPush
(
pCxt
->
pOutput
->
pDataBlocks
,
&
dst
);
}
...
...
@@ -1069,7 +1069,6 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
if
(
TSDB_QUERY_HAS_TYPE
(
pCxt
->
pOutput
->
insertType
,
TSDB_QUERY_TYPE_STMT_INSERT
)
&&
tbNum
>
0
)
{
return
buildInvalidOperationMsg
(
&
pCxt
->
msg
,
"single table allowed in one stmt"
);
;
}
destroyInsertParseContextForTable
(
pCxt
);
...
...
source/libs/parser/src/parInsertData.c
浏览文件 @
d66fdb94
...
...
@@ -161,7 +161,8 @@ int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq)
char
*
pBuf
;
int32_t
len
;
tEncodeSize
(
tEncodeSVCreateTbReq
,
pCreateTbReq
,
len
);
int32_t
ret
=
0
;
tEncodeSize
(
tEncodeSVCreateTbReq
,
pCreateTbReq
,
len
,
ret
);
if
(
pBlocks
->
nAllocSize
-
pBlocks
->
size
<
len
)
{
pBlocks
->
nAllocSize
+=
len
+
pBlocks
->
rowSize
;
char
*
pTmp
=
taosMemoryRealloc
(
pBlocks
->
pData
,
pBlocks
->
nAllocSize
);
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
d66fdb94
...
...
@@ -540,8 +540,8 @@ static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
return
generateDealNodeErrMsg
(
pCxt
,
TSDB_CODE_PAR_WRONG_VALUE_TYPE
,
((
SExprNode
*
)(
pOp
->
pRight
))
->
aliasName
);
}
if
((
TSDB_DATA_TYPE_TIMESTAMP
==
ldt
.
type
&&
TSDB_DATA_TYPE_TIMESTAMP
==
rdt
.
type
)
||
(
TSDB_DATA_TYPE_TIMESTAMP
==
ldt
.
type
&&
IS_VAR_DATA_TYPE
(
rdt
.
type
))
||
(
TSDB_DATA_TYPE_TIMESTAMP
==
rdt
.
type
&&
IS_VAR_DATA_TYPE
(
ldt
.
type
)))
{
(
TSDB_DATA_TYPE_TIMESTAMP
==
ldt
.
type
&&
(
IS_VAR_DATA_TYPE
(
rdt
.
type
)
||
IS_FLOAT_TYPE
(
rdt
.
type
)
))
||
(
TSDB_DATA_TYPE_TIMESTAMP
==
rdt
.
type
&&
(
IS_VAR_DATA_TYPE
(
ldt
.
type
)
||
IS_FLOAT_TYPE
(
ldt
.
type
)
)))
{
return
generateDealNodeErrMsg
(
pCxt
,
TSDB_CODE_PAR_WRONG_VALUE_TYPE
,
((
SExprNode
*
)(
pOp
->
pRight
))
->
aliasName
);
}
...
...
@@ -1973,10 +1973,10 @@ static int32_t buildSampleAst(STranslateContext* pCxt, SSampleAstInfo* pInfo, ch
}
strcpy
(
pTable
->
table
.
dbName
,
pInfo
->
pDbName
);
strcpy
(
pTable
->
table
.
tableName
,
pInfo
->
pTableName
);
TSWAP
(
pTable
->
pMeta
,
pInfo
->
pRollupTableMeta
,
STableMeta
*
);
TSWAP
(
pTable
->
pMeta
,
pInfo
->
pRollupTableMeta
);
pSelect
->
pFromTable
=
(
SNode
*
)
pTable
;
TSWAP
(
pSelect
->
pProjectionList
,
pInfo
->
pFuncs
,
SNodeList
*
);
TSWAP
(
pSelect
->
pProjectionList
,
pInfo
->
pFuncs
);
SFunctionNode
*
pFunc
=
nodesMakeNode
(
QUERY_NODE_FUNCTION
);
if
(
NULL
==
pSelect
->
pProjectionList
||
NULL
==
pFunc
)
{
nodesDestroyNode
(
pSelect
);
...
...
@@ -1993,9 +1993,9 @@ static int32_t buildSampleAst(STranslateContext* pCxt, SSampleAstInfo* pInfo, ch
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pSelect
->
pWindow
=
(
SNode
*
)
pInterval
;
TSWAP
(
pInterval
->
pInterval
,
pInfo
->
pInterval
,
SNode
*
);
TSWAP
(
pInterval
->
pOffset
,
pInfo
->
pOffset
,
SNode
*
);
TSWAP
(
pInterval
->
pSliding
,
pInfo
->
pSliding
,
SNode
*
);
TSWAP
(
pInterval
->
pInterval
,
pInfo
->
pInterval
);
TSWAP
(
pInterval
->
pOffset
,
pInfo
->
pOffset
);
TSWAP
(
pInterval
->
pSliding
,
pInfo
->
pSliding
);
pInterval
->
pCol
=
nodesMakeNode
(
QUERY_NODE_COLUMN
);
if
(
NULL
==
pInterval
->
pCol
)
{
nodesDestroyNode
(
pSelect
);
...
...
@@ -3184,7 +3184,8 @@ static int32_t serializeVgroupTablesBatch(SVgroupTablesBatch* pTbBatch, SArray*
int
tlen
;
SCoder
coder
=
{
0
};
tEncodeSize
(
tEncodeSVCreateTbBatchReq
,
&
pTbBatch
->
req
,
tlen
);
int32_t
ret
=
0
;
tEncodeSize
(
tEncodeSVCreateTbBatchReq
,
&
pTbBatch
->
req
,
tlen
,
ret
);
tlen
+=
sizeof
(
SMsgHead
);
//+ tSerializeSVCreateTbBatchReq(NULL, &(pTbBatch->req));
void
*
buf
=
taosMemoryMalloc
(
tlen
);
if
(
NULL
==
buf
)
{
...
...
@@ -3598,7 +3599,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
default:
pQuery
->
execMode
=
QUERY_EXEC_MODE_RPC
;
if
(
NULL
!=
pCxt
->
pCmdMsg
)
{
TSWAP
(
pQuery
->
pCmdMsg
,
pCxt
->
pCmdMsg
,
SCmdMsgInfo
*
);
TSWAP
(
pQuery
->
pCmdMsg
,
pCxt
->
pCmdMsg
);
pQuery
->
msgType
=
pQuery
->
pCmdMsg
->
msgType
;
}
break
;
...
...
source/libs/planner/src/planLogicCreater.c
浏览文件 @
d66fdb94
...
...
@@ -251,8 +251,8 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
return
TSDB_CODE_OUT_OF_MEMORY
;
}
TSWAP
(
pScan
->
pMeta
,
pRealTable
->
pMeta
,
STableMeta
*
);
TSWAP
(
pScan
->
pVgroupList
,
pRealTable
->
pVgroupList
,
SVgroupsInfo
*
);
TSWAP
(
pScan
->
pMeta
,
pRealTable
->
pMeta
);
TSWAP
(
pScan
->
pVgroupList
,
pRealTable
->
pVgroupList
);
pScan
->
scanSeq
[
0
]
=
1
;
pScan
->
scanSeq
[
1
]
=
0
;
pScan
->
scanRange
=
TSWINDOW_INITIALIZER
;
...
...
@@ -954,7 +954,7 @@ static int32_t createVnodeModifLogicNode(SLogicPlanContext* pCxt, SVnodeModifOpS
if
(
NULL
==
pModif
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
TSWAP
(
pModif
->
pDataBlocks
,
pStmt
->
pDataBlocks
,
SArray
*
);
TSWAP
(
pModif
->
pDataBlocks
,
pStmt
->
pDataBlocks
);
pModif
->
msgType
=
getMsgType
(
pStmt
->
sqlNodeType
);
*
pLogicNode
=
(
SLogicNode
*
)
pModif
;
return
TSDB_CODE_SUCCESS
;
...
...
source/libs/planner/src/planOptimizer.c
浏览文件 @
d66fdb94
...
...
@@ -246,7 +246,7 @@ static int32_t cpdMergeConds(SNode** pDst, SNodeList** pSrc) {
static
int32_t
cpdCondAppend
(
SNode
**
pCond
,
SNode
**
pAdditionalCond
)
{
if
(
NULL
==
*
pCond
)
{
TSWAP
(
*
pCond
,
*
pAdditionalCond
,
SNode
*
);
TSWAP
(
*
pCond
,
*
pAdditionalCond
);
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/planner/src/planPhysiCreater.c
浏览文件 @
d66fdb94
...
...
@@ -1097,7 +1097,7 @@ static int32_t createDataInserter(SPhysiPlanContext* pCxt, SVgDataBlocks* pBlock
pInserter
->
numOfTables
=
pBlocks
->
numOfTables
;
pInserter
->
size
=
pBlocks
->
size
;
TSWAP
(
pInserter
->
pData
,
pBlocks
->
pData
,
char
*
);
TSWAP
(
pInserter
->
pData
,
pBlocks
->
pData
);
*
pSink
=
(
SDataSinkNode
*
)
pInserter
;
return
TSDB_CODE_SUCCESS
;
...
...
source/libs/planner/src/planSpliter.c
浏览文件 @
d66fdb94
...
...
@@ -65,7 +65,7 @@ static SLogicSubplan* splCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode*
pSubplan
->
id
.
groupId
=
pCxt
->
groupId
;
pSubplan
->
subplanType
=
SUBPLAN_TYPE_SCAN
;
pSubplan
->
pNode
=
(
SLogicNode
*
)
nodesCloneNode
(
pScan
);
TSWAP
(
pSubplan
->
pVgroupList
,
((
SScanLogicNode
*
)
pSubplan
->
pNode
)
->
pVgroupList
,
SVgroupsInfo
*
);
TSWAP
(
pSubplan
->
pVgroupList
,
((
SScanLogicNode
*
)
pSubplan
->
pNode
)
->
pVgroupList
);
SPLIT_FLAG_SET_MASK
(
pSubplan
->
splitFlag
,
flag
);
return
pSubplan
;
}
...
...
@@ -406,8 +406,7 @@ int32_t splitLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SLogicSubplan
}
if
(
QUERY_NODE_LOGIC_PLAN_VNODE_MODIF
==
nodeType
(
pLogicNode
))
{
pSubplan
->
subplanType
=
SUBPLAN_TYPE_MODIFY
;
TSWAP
(((
SVnodeModifLogicNode
*
)
pLogicNode
)
->
pDataBlocks
,
((
SVnodeModifLogicNode
*
)
pSubplan
->
pNode
)
->
pDataBlocks
,
SArray
*
);
TSWAP
(((
SVnodeModifLogicNode
*
)
pLogicNode
)
->
pDataBlocks
,
((
SVnodeModifLogicNode
*
)
pSubplan
->
pNode
)
->
pDataBlocks
);
}
else
{
pSubplan
->
subplanType
=
SUBPLAN_TYPE_SCAN
;
}
...
...
source/libs/scalar/src/sclfunc.c
浏览文件 @
d66fdb94
...
...
@@ -133,7 +133,12 @@ static int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SS
colDataAppendNULL
(
pOutputData
,
i
);
continue
;
}
out
[
i
]
=
valFn
(
getValueFn
(
pInputData
->
pData
,
i
));
double
result
=
valFn
(
getValueFn
(
pInputData
->
pData
,
i
));
if
(
isinf
(
result
)
||
isnan
(
result
))
{
colDataAppendNULL
(
pOutputData
,
i
);
}
else
{
out
[
i
]
=
result
;
}
}
pOutput
->
numOfRows
=
pInput
->
numOfRows
;
...
...
@@ -162,7 +167,12 @@ static int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, S
colDataAppendNULL
(
pOutputData
,
i
);
continue
;
}
out
[
i
]
=
valFn
(
getValueFn
[
0
](
pInputData
[
0
]
->
pData
,
i
),
getValueFn
[
1
](
pInputData
[
1
]
->
pData
,
0
));
double
result
=
valFn
(
getValueFn
[
0
](
pInputData
[
0
]
->
pData
,
i
),
getValueFn
[
1
](
pInputData
[
1
]
->
pData
,
0
));
if
(
isinf
(
result
)
||
isnan
(
result
))
{
colDataAppendNULL
(
pOutputData
,
i
);
}
else
{
out
[
i
]
=
result
;
}
}
pOutput
->
numOfRows
=
pInput
->
numOfRows
;
...
...
source/libs/transport/src/trans.c
浏览文件 @
d66fdb94
...
...
@@ -49,6 +49,10 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc
->
connType
=
pInit
->
connType
;
pRpc
->
idleTime
=
pInit
->
idleTime
;
pRpc
->
tcphandle
=
(
*
taosInitHandle
[
pRpc
->
connType
])(
0
,
pInit
->
localPort
,
pRpc
->
label
,
pRpc
->
numOfThreads
,
NULL
,
pRpc
);
if
(
pRpc
->
tcphandle
==
NULL
)
{
taosMemoryFree
(
pRpc
);
return
NULL
;
}
pRpc
->
parent
=
pInit
->
parent
;
if
(
pInit
->
user
)
{
memcpy
(
pRpc
->
user
,
pInit
->
user
,
strlen
(
pInit
->
user
));
...
...
source/libs/transport/src/transCli.c
浏览文件 @
d66fdb94
...
...
@@ -912,7 +912,6 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
cliDestroy
((
uv_handle_t
*
)
pConn
->
stream
);
return
-
1
;
}
}
else
if
(
pCtx
->
retryCount
<
TRANS_RETRY_COUNT_LIMIT
)
{
if
(
pResp
->
contLen
==
0
)
{
pEpSet
->
inUse
=
(
pEpSet
->
inUse
++
)
%
pEpSet
->
numOfEps
;
...
...
source/libs/transport/src/transSrv.c
浏览文件 @
d66fdb94
...
...
@@ -93,6 +93,8 @@ typedef struct SServerObj {
uint32_t
ip
;
uint32_t
port
;
uv_async_t
*
pAcceptAsync
;
// just to quit from from accept thread
bool
inited
;
}
SServerObj
;
// handle
...
...
@@ -143,7 +145,7 @@ static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleR
static
int32_t
exHandlesMgt
;
void
uvInitE
xHandleMgt
();
void
uvInitE
nv
();
void
uvOpenExHandleMgt
(
int
size
);
void
uvCloseExHandleMgt
();
int64_t
uvAddExHandle
(
void
*
p
);
...
...
@@ -716,6 +718,7 @@ static bool addHandleToAcceptloop(void* arg) {
}
if
((
err
=
uv_listen
((
uv_stream_t
*
)
&
srv
->
server
,
512
,
uvOnAcceptCb
))
!=
0
)
{
tError
(
"failed to listen: %s"
,
uv_err_name
(
err
));
terrno
=
TSDB_CODE_RPC_PORT_EADDRINUSE
;
return
false
;
}
return
true
;
...
...
@@ -800,7 +803,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
srv
->
port
=
port
;
uv_loop_init
(
srv
->
loop
);
taosThreadOnce
(
&
transModuleInit
,
uvInitE
xHandleMgt
);
taosThreadOnce
(
&
transModuleInit
,
uvInitE
nv
);
transSrvInst
++
;
for
(
int
i
=
0
;
i
<
srv
->
numOfThreads
;
i
++
)
{
...
...
@@ -844,15 +847,15 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
goto
End
;
// clear all resource later
}
srv
->
inited
=
true
;
return
srv
;
End:
transCloseServer
(
srv
);
return
NULL
;
}
void
uvInitE
xHandleMgt
()
{
// init exhandle mgt
void
uvInitE
nv
()
{
uv_os_setenv
(
"UV_TCP_SINGLE_ACCEPT"
,
"1"
);
uvOpenExHandleMgt
(
10000
);
}
void
uvOpenExHandleMgt
(
int
size
)
{
...
...
@@ -958,9 +961,10 @@ void transCloseServer(void* arg) {
SServerObj
*
srv
=
arg
;
tDebug
(
"send quit msg to accept thread"
);
if
(
srv
->
inited
)
{
uv_async_send
(
srv
->
pAcceptAsync
);
taosThreadJoin
(
srv
->
thread
,
NULL
);
}
SRV_RELEASE_UV
(
srv
->
loop
);
for
(
int
i
=
0
;
i
<
srv
->
numOfThreads
;
i
++
)
{
...
...
source/os/src/osSemaphore.c
浏览文件 @
d66fdb94
...
...
@@ -67,6 +67,12 @@ int32_t tsem_wait(tsem_t* sem) {
return
ret
;
}
int32_t
tsem_timewait
(
tsem_t
*
sem
,
int64_t
nanosecs
)
{
int
ret
=
0
;
return
ret
;
}
#elif defined(_TD_DARWIN_64)
/*
...
...
source/util/src/terror.c
浏览文件 @
d66fdb94
...
...
@@ -68,6 +68,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_TIME_STAMP, "Client and server's t
TAOS_DEFINE_ERROR
(
TSDB_CODE_APP_NOT_READY
,
"Database not ready"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_RPC_FQDN_ERROR
,
"Unable to resolve FQDN"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_RPC_INVALID_VERSION
,
"Invalid app version"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_RPC_PORT_EADDRINUSE
,
"port already in use"
)
//common & util
TAOS_DEFINE_ERROR
(
TSDB_CODE_OUT_OF_MEMORY
,
"Out of Memory"
)
...
...
source/util/test/encodeTest.cpp
浏览文件 @
d66fdb94
...
...
@@ -230,7 +230,7 @@ static int32_t tSStructA_v1_decode(SCoder *pCoder, SStructA_v1 *pSAV1) {
const
char
*
tstr
;
uint64_t
len
;
if
(
tDecodeCStrAndLen
(
pCoder
,
&
tstr
,
&
len
)
<
0
)
return
-
1
;
pSAV1
->
A_c
=
(
char
*
)
TCODER_MALLOC
(
pCoder
,
len
+
1
);
pSAV1
->
A_c
=
(
char
*
)
tCoderMalloc
(
pCoder
,
len
+
1
);
memcpy
(
pSAV1
->
A_c
,
tstr
,
len
+
1
);
tEndDecode
(
pCoder
);
...
...
@@ -269,7 +269,7 @@ static int32_t tSStructA_v2_decode(SCoder *pCoder, SStructA_v2 *pSAV2) {
const
char
*
tstr
;
uint64_t
len
;
if
(
tDecodeCStrAndLen
(
pCoder
,
&
tstr
,
&
len
)
<
0
)
return
-
1
;
pSAV2
->
A_c
=
(
char
*
)
TCODER_MALLOC
(
pCoder
,
len
+
1
);
pSAV2
->
A_c
=
(
char
*
)
tCoderMalloc
(
pCoder
,
len
+
1
);
memcpy
(
pSAV2
->
A_c
,
tstr
,
len
+
1
);
// ------------------------NEW FIELDS DECODE-------------------------------
...
...
@@ -305,7 +305,7 @@ static int32_t tSFinalReq_v1_encode(SCoder *pCoder, const SFinalReq_v1 *ps1) {
static
int32_t
tSFinalReq_v1_decode
(
SCoder
*
pCoder
,
SFinalReq_v1
*
ps1
)
{
if
(
tStartDecode
(
pCoder
)
<
0
)
return
-
1
;
ps1
->
pA
=
(
SStructA_v1
*
)
TCODER_MALLOC
(
pCoder
,
sizeof
(
*
(
ps1
->
pA
)));
ps1
->
pA
=
(
SStructA_v1
*
)
tCoderMalloc
(
pCoder
,
sizeof
(
*
(
ps1
->
pA
)));
if
(
tSStructA_v1_decode
(
pCoder
,
ps1
->
pA
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pCoder
,
&
ps1
->
v_a
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pCoder
,
&
ps1
->
v_b
)
<
0
)
return
-
1
;
...
...
@@ -339,7 +339,7 @@ static int32_t tSFinalReq_v2_encode(SCoder *pCoder, const SFinalReq_v2 *ps2) {
static
int32_t
tSFinalReq_v2_decode
(
SCoder
*
pCoder
,
SFinalReq_v2
*
ps2
)
{
if
(
tStartDecode
(
pCoder
)
<
0
)
return
-
1
;
ps2
->
pA
=
(
SStructA_v2
*
)
TCODER_MALLOC
(
pCoder
,
sizeof
(
*
(
ps2
->
pA
)));
ps2
->
pA
=
(
SStructA_v2
*
)
tCoderMalloc
(
pCoder
,
sizeof
(
*
(
ps2
->
pA
)));
if
(
tSStructA_v2_decode
(
pCoder
,
ps2
->
pA
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pCoder
,
&
ps2
->
v_a
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pCoder
,
&
ps2
->
v_b
)
<
0
)
return
-
1
;
...
...
tests/system-test/0-others/taosShell.py
浏览文件 @
d66fdb94
...
...
@@ -70,11 +70,12 @@ class TDTestCase:
# 'serverPort': 7080, 'firstEp': 'trd02:7080'}
hostname
=
socket
.
gethostname
()
serverPort
=
'7080'
clientCfgDict
=
{
'serverPort'
:
''
,
'firstEp'
:
''
,
'secondEp'
:
''
}
rpcDebugFlagVal
=
'143'
clientCfgDict
=
{
'serverPort'
:
''
,
'firstEp'
:
''
,
'secondEp'
:
''
,
'rpcDebugFlag'
:
'135'
}
clientCfgDict
[
"serverPort"
]
=
serverPort
clientCfgDict
[
"firstEp"
]
=
hostname
+
':'
+
serverPort
clientCfgDict
[
"secondEp"
]
=
hostname
+
':'
+
serverPort
clientCfgDict
[
"rpcDebugFlag"
]
=
rpcDebugFlagVal
updatecfgDict
=
{
'clientCfg'
:
{},
'serverPort'
:
''
,
'firstEp'
:
''
,
'secondEp'
:
''
}
updatecfgDict
[
"clientCfg"
]
=
clientCfgDict
...
...
@@ -109,8 +110,8 @@ class TDTestCase:
# time.sleep(2)
tdSql
.
query
(
"create user testpy pass 'testpy'"
)
hostname
=
socket
.
gethostname
()
tdLog
.
info
(
"hostname: %s"
%
hostname
)
#
hostname = socket.gethostname()
#
tdLog.info ("hostname: %s" % hostname)
buildPath
=
self
.
getBuildPath
()
if
(
buildPath
==
""
):
...
...
@@ -126,8 +127,9 @@ class TDTestCase:
keyDict
=
{
'h'
:
''
,
'P'
:
'6030'
,
'p'
:
'testpy'
,
'u'
:
'testpy'
,
'a'
:
''
,
'A'
:
''
,
'c'
:
''
,
'C'
:
''
,
's'
:
''
,
'r'
:
''
,
'f'
:
''
,
\
'k'
:
''
,
't'
:
''
,
'n'
:
''
,
'l'
:
'1024'
,
'N'
:
'100'
,
'V'
:
''
,
'd'
:
'db'
,
'w'
:
'30'
,
'-help'
:
''
,
'-usage'
:
''
,
'?'
:
''
}
keyDict
[
'h'
]
=
hostname
keyDict
[
'h'
]
=
self
.
hostname
keyDict
[
'c'
]
=
cfgPath
keyDict
[
'P'
]
=
self
.
serverPort
tdLog
.
printNoPrefix
(
"================================ parameter: -h"
)
newDbName
=
"dbh"
...
...
@@ -312,9 +314,25 @@ class TDTestCase:
if
retCode
!=
"TAOS_OK"
:
tdLog
.
exit
(
"taos -C fail"
)
print
(
"-C return content:
\n
"
,
retVal
)
#print ("-C return content:\n ", retVal)
totalCfgItem
=
{
"firstEp"
:[
''
,
''
,
''
],
}
for
line
in
retVal
.
splitlines
():
strList
=
line
.
split
()
if
(
len
(
strList
)
>
2
):
totalCfgItem
[
strList
[
1
]]
=
strList
#print ("dict content:\n ", totalCfgItem)
firstEp
=
keyDict
[
"h"
]
+
':'
+
keyDict
[
'P'
]
if
(
totalCfgItem
[
"firstEp"
][
2
]
!=
firstEp
)
and
(
totalCfgItem
[
"firstEp"
][
0
]
!=
'cfg_file'
):
tdLog
.
exit
(
"taos -C return firstEp error!"
)
if
(
totalCfgItem
[
"rpcDebugFlag"
][
2
]
!=
self
.
rpcDebugFlagVal
)
and
(
totalCfgItem
[
"rpcDebugFlag"
][
0
]
!=
'cfg_file'
):
tdLog
.
exit
(
"taos -C return rpcDebugFlag error!"
)
count
=
os
.
cpu_count
()
if
(
totalCfgItem
[
"numOfCores"
][
2
]
!=
count
)
and
(
totalCfgItem
[
"numOfCores"
][
0
]
!=
'default'
):
tdLog
.
exit
(
"taos -C return numOfCores error!"
)
def
stop
(
self
):
tdSql
.
close
()
...
...
tests/system-test/fulltest.sh
浏览文件 @
d66fdb94
#!/bin/bash
set
-e
#python3 ./test.py -f 0-others/taosShell.py
#python3 ./test.py -f 2-query/between.py
#python3 ./test.py -f 2-query/distinct.py
python3 ./test.py
-f
2-query/varchar.py
...
...
tools/shell/inc/shellInt.h
浏览文件 @
d66fdb94
...
...
@@ -111,6 +111,5 @@ void shellTestNetWork();
// shellMain.c
extern
SShellObj
shell
;
extern
void
taos_init
();
#endif
/*_TD_SHELL_INT_H_*/
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录