Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
c0a16094
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
c0a16094
编写于
12月 30, 2022
作者:
A
Alex Duan
浏览文件
操作
浏览文件
下载
差异文件
fix: merge from 3.0
上级
642cd078
2f9734d5
变更
23
显示空白变更内容
内联
并排
Showing
23 changed file
with
339 addition
and
78 deletion
+339
-78
include/libs/nodes/cmdnodes.h
include/libs/nodes/cmdnodes.h
+1
-0
include/os/osMemory.h
include/os/osMemory.h
+5
-1
include/os/osSystem.h
include/os/osSystem.h
+30
-3
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+8
-8
source/libs/catalog/src/ctgDbg.c
source/libs/catalog/src/ctgDbg.c
+0
-1
source/libs/catalog/src/ctgRemote.c
source/libs/catalog/src/ctgRemote.c
+16
-6
source/libs/command/src/command.c
source/libs/command/src/command.c
+0
-1
source/libs/command/src/explain.c
source/libs/command/src/explain.c
+0
-1
source/libs/parser/inc/parAst.h
source/libs/parser/inc/parAst.h
+1
-1
source/libs/parser/inc/sql.y
source/libs/parser/inc/sql.y
+7
-1
source/libs/parser/src/parAstCreater.c
source/libs/parser/src/parAstCreater.c
+5
-6
source/libs/parser/src/parAstParser.c
source/libs/parser/src/parAstParser.c
+6
-1
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+146
-11
source/libs/parser/test/mockCatalogService.cpp
source/libs/parser/test/mockCatalogService.cpp
+1
-1
source/libs/parser/test/parInitialCTest.cpp
source/libs/parser/test/parInitialCTest.cpp
+25
-8
source/libs/planner/test/planOtherTest.cpp
source/libs/planner/test/planOtherTest.cpp
+3
-3
source/libs/qcom/src/queryUtil.c
source/libs/qcom/src/queryUtil.c
+6
-7
source/libs/qworker/inc/qwInt.h
source/libs/qworker/inc/qwInt.h
+8
-8
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+0
-1
source/libs/scheduler/inc/schInt.h
source/libs/scheduler/inc/schInt.h
+8
-8
source/os/CMakeLists.txt
source/os/CMakeLists.txt
+1
-1
source/os/test/CMakeLists.txt
source/os/test/CMakeLists.txt
+24
-0
source/os/test/osTests.cpp
source/os/test/osTests.cpp
+38
-0
未找到文件。
include/libs/nodes/cmdnodes.h
浏览文件 @
c0a16094
...
@@ -407,6 +407,7 @@ typedef struct SCreateStreamStmt {
...
@@ -407,6 +407,7 @@ typedef struct SCreateStreamStmt {
SNode
*
pQuery
;
SNode
*
pQuery
;
SNodeList
*
pTags
;
SNodeList
*
pTags
;
SNode
*
pSubtable
;
SNode
*
pSubtable
;
SNodeList
*
pCols
;
}
SCreateStreamStmt
;
}
SCreateStreamStmt
;
typedef
struct
SDropStreamStmt
{
typedef
struct
SDropStreamStmt
{
...
...
include/os/osMemory.h
浏览文件 @
c0a16094
...
@@ -22,12 +22,16 @@ extern "C" {
...
@@ -22,12 +22,16 @@ extern "C" {
// If the error is in a third-party library, place this header file under the third-party library header file.
// If the error is in a third-party library, place this header file under the third-party library header file.
// When you want to use this feature, you should find or add the same function in the following sectio
// When you want to use this feature, you should find or add the same function in the following sectio
#if !defined(WINDOWS)
#ifndef ALLOW_FORBID_FUNC
#ifndef ALLOW_FORBID_FUNC
#define malloc MALLOC_FUNC_TAOS_FORBID
#define malloc MALLOC_FUNC_TAOS_FORBID
#define calloc CALLOC_FUNC_TAOS_FORBID
#define calloc CALLOC_FUNC_TAOS_FORBID
#define realloc REALLOC_FUNC_TAOS_FORBID
#define realloc REALLOC_FUNC_TAOS_FORBID
#define free FREE_FUNC_TAOS_FORBID
#define free FREE_FUNC_TAOS_FORBID
#endif
#endif // ifndef ALLOW_FORBID_FUNC
#endif // if !defined(WINDOWS)
void
*
taosMemoryMalloc
(
int64_t
size
);
void
*
taosMemoryMalloc
(
int64_t
size
);
void
*
taosMemoryCalloc
(
int64_t
num
,
int64_t
size
);
void
*
taosMemoryCalloc
(
int64_t
num
,
int64_t
size
);
...
...
include/os/osSystem.h
浏览文件 @
c0a16094
...
@@ -62,10 +62,37 @@ void taosResetTerminalMode();
...
@@ -62,10 +62,37 @@ void taosResetTerminalMode();
taosMemoryFree(strings); \
taosMemoryFree(strings); \
}
}
#else
#else
#include <windows.h>
#include <dbghelp.h>
#define STACKSIZE 64
#define taosPrintTrace(flags, level, dflag) \
#define taosPrintTrace(flags, level, dflag) \
{ \
{ \
taosPrintLog(flags, level, dflag, \
unsigned int i; \
"backtrace not implemented on windows, so detailed stack information cannot be printed"); \
void* stack[STACKSIZE]; \
unsigned short frames; \
SYMBOL_INFO* symbol; \
HANDLE process; \
\
process = GetCurrentProcess(); \
\
SymInitialize(process, NULL, TRUE); \
\
frames = CaptureStackBackTrace(0, STACKSIZE, stack, NULL); \
symbol = (SYMBOL_INFO*)calloc(sizeof(SYMBOL_INFO) + 256 * sizeof(char), 1); \
if (symbol != NULL) { \
symbol->MaxNameLen = 255; \
symbol->SizeOfStruct = sizeof(SYMBOL_INFO); \
\
if (frames > 0) { \
taosPrintLog(flags, level, dflag, "obtained %d stack frames", frames); \
for (i = 0; i < frames; i++) { \
SymFromAddr(process, (DWORD64)(stack[i]), 0, symbol); \
taosPrintLog(flags, level, dflag, "frame:%i: %s - 0x%0X", frames - i - 1, symbol->Name, symbol->Address); \
} \
} \
free(symbol); \
} \
}
}
#endif
#endif
...
...
source/libs/catalog/inc/catalogInt.h
浏览文件 @
c0a16094
...
@@ -582,34 +582,34 @@ typedef struct SCtgOperation {
...
@@ -582,34 +582,34 @@ typedef struct SCtgOperation {
#define CTG_LOCK(type, _lock) \
#define CTG_LOCK(type, _lock) \
do { \
do { \
if (CTG_READ == (type)) { \
if (CTG_READ == (type)) { \
assert(atomic_load_32((_lock)) >= 0);
\
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before read lock");
\
CTG_LOCK_DEBUG("CTG RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
CTG_LOCK_DEBUG("CTG RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \
taosRLockLatch(_lock); \
CTG_LOCK_DEBUG("CTG RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
CTG_LOCK_DEBUG("CTG RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) > 0);
\
ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value after read lock");
\
} else { \
} else { \
assert(atomic_load_32((_lock)) >= 0);
\
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before write lock");
\
CTG_LOCK_DEBUG("CTG WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
CTG_LOCK_DEBUG("CTG WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \
taosWLockLatch(_lock); \
CTG_LOCK_DEBUG("CTG WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
CTG_LOCK_DEBUG("CTG WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY);
\
ASSERTS(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value after write lock");
\
} \
} \
} while (0)
} while (0)
#define CTG_UNLOCK(type, _lock) \
#define CTG_UNLOCK(type, _lock) \
do { \
do { \
if (CTG_READ == (type)) { \
if (CTG_READ == (type)) { \
assert(atomic_load_32((_lock)) > 0);
\
ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value before read unlock");
\
CTG_LOCK_DEBUG("CTG RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
CTG_LOCK_DEBUG("CTG RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \
taosRUnLockLatch(_lock); \
CTG_LOCK_DEBUG("CTG RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
CTG_LOCK_DEBUG("CTG RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0);
\
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after read unlock");
\
} else { \
} else { \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY);
\
ASSERTS(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value before write unlock");
\
CTG_LOCK_DEBUG("CTG WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
CTG_LOCK_DEBUG("CTG WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \
taosWUnLockLatch(_lock); \
CTG_LOCK_DEBUG("CTG WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
CTG_LOCK_DEBUG("CTG WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0);
\
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after write unlock");
\
} \
} \
} while (0)
} while (0)
...
...
source/libs/catalog/src/ctgDbg.c
浏览文件 @
c0a16094
...
@@ -22,7 +22,6 @@ extern SCatalogMgmt gCtgMgmt;
...
@@ -22,7 +22,6 @@ extern SCatalogMgmt gCtgMgmt;
SCtgDebug
gCTGDebug
=
{
0
};
SCtgDebug
gCTGDebug
=
{
0
};
void
ctgdUserCallback
(
SMetaData
*
pResult
,
void
*
param
,
int32_t
code
)
{
void
ctgdUserCallback
(
SMetaData
*
pResult
,
void
*
param
,
int32_t
code
)
{
ASSERT
(
*
(
int32_t
*
)
param
==
1
);
taosMemoryFree
(
param
);
taosMemoryFree
(
param
);
qDebug
(
"async call result: %s"
,
tstrerror
(
code
));
qDebug
(
"async call result: %s"
,
tstrerror
(
code
));
...
...
source/libs/catalog/src/ctgRemote.c
浏览文件 @
c0a16094
...
@@ -40,7 +40,9 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
...
@@ -40,7 +40,9 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
msgNum
=
taosArrayGetSize
(
batchRsp
.
pRsps
);
msgNum
=
taosArrayGetSize
(
batchRsp
.
pRsps
);
}
}
ASSERT
(
taskNum
==
msgNum
||
0
==
msgNum
);
if
(
ASSERTS
(
taskNum
==
msgNum
||
0
==
msgNum
,
"taskNum %d mis-match msgNum %d"
,
taskNum
,
msgNum
))
{
msgNum
=
0
;
}
ctgDebug
(
"QID:0x%"
PRIx64
" ctg got batch %d rsp %s"
,
pJob
->
queryId
,
cbParam
->
batchId
,
ctgDebug
(
"QID:0x%"
PRIx64
" ctg got batch %d rsp %s"
,
pJob
->
queryId
,
cbParam
->
batchId
,
TMSG_INFO
(
cbParam
->
reqType
+
1
));
TMSG_INFO
(
cbParam
->
reqType
+
1
));
...
@@ -58,11 +60,19 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
...
@@ -58,11 +60,19 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
if
(
msgNum
>
0
)
{
if
(
msgNum
>
0
)
{
pRsp
=
taosArrayGet
(
batchRsp
.
pRsps
,
i
);
pRsp
=
taosArrayGet
(
batchRsp
.
pRsps
,
i
);
if
(
ASSERTS
(
pRsp
->
msgIdx
==
*
msgIdx
,
"rsp msgIdx %d mis-match msgIdx %d"
,
pRsp
->
msgIdx
,
*
msgIdx
))
{
pRsp
=
&
rsp
;
pRsp
->
msgIdx
=
*
msgIdx
;
pRsp
->
reqType
=
-
1
;
pRsp
->
rspCode
=
0
;
taskMsg
.
msgType
=
-
1
;
taskMsg
.
pData
=
NULL
;
taskMsg
.
len
=
0
;
}
else
{
taskMsg
.
msgType
=
pRsp
->
reqType
;
taskMsg
.
msgType
=
pRsp
->
reqType
;
taskMsg
.
pData
=
pRsp
->
msg
;
taskMsg
.
pData
=
pRsp
->
msg
;
taskMsg
.
len
=
pRsp
->
msgLen
;
taskMsg
.
len
=
pRsp
->
msgLen
;
}
ASSERT
(
pRsp
->
msgIdx
==
*
msgIdx
);
}
else
{
}
else
{
pRsp
=
&
rsp
;
pRsp
=
&
rsp
;
pRsp
->
msgIdx
=
*
msgIdx
;
pRsp
->
msgIdx
=
*
msgIdx
;
...
...
source/libs/command/src/command.c
浏览文件 @
c0a16094
...
@@ -41,7 +41,6 @@ static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRe
...
@@ -41,7 +41,6 @@ static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRe
(
*
pRsp
)
->
numOfCols
=
htonl
(
numOfCols
);
(
*
pRsp
)
->
numOfCols
=
htonl
(
numOfCols
);
int32_t
len
=
blockEncode
(
pBlock
,
(
*
pRsp
)
->
data
,
numOfCols
);
int32_t
len
=
blockEncode
(
pBlock
,
(
*
pRsp
)
->
data
,
numOfCols
);
ASSERT
(
len
==
rspSize
-
sizeof
(
SRetrieveTableRsp
));
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
...
source/libs/command/src/explain.c
浏览文件 @
c0a16094
...
@@ -1666,7 +1666,6 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
...
@@ -1666,7 +1666,6 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
rsp
->
numOfRows
=
htobe64
((
int64_t
)
rowNum
);
rsp
->
numOfRows
=
htobe64
((
int64_t
)
rowNum
);
int32_t
len
=
blockEncode
(
pBlock
,
rsp
->
data
,
taosArrayGetSize
(
pBlock
->
pDataBlock
));
int32_t
len
=
blockEncode
(
pBlock
,
rsp
->
data
,
taosArrayGetSize
(
pBlock
->
pDataBlock
));
ASSERT
(
len
==
rspSize
-
sizeof
(
SRetrieveTableRsp
));
rsp
->
compLen
=
htonl
(
len
);
rsp
->
compLen
=
htonl
(
len
);
...
...
source/libs/parser/inc/parAst.h
浏览文件 @
c0a16094
...
@@ -217,7 +217,7 @@ SNode* createCreateFunctionStmt(SAstCreateContext* pCxt, bool ignoreExists, bool
...
@@ -217,7 +217,7 @@ SNode* createCreateFunctionStmt(SAstCreateContext* pCxt, bool ignoreExists, bool
SNode
*
createDropFunctionStmt
(
SAstCreateContext
*
pCxt
,
bool
ignoreNotExists
,
const
SToken
*
pFuncName
);
SNode
*
createDropFunctionStmt
(
SAstCreateContext
*
pCxt
,
bool
ignoreNotExists
,
const
SToken
*
pFuncName
);
SNode
*
createStreamOptions
(
SAstCreateContext
*
pCxt
);
SNode
*
createStreamOptions
(
SAstCreateContext
*
pCxt
);
SNode
*
createCreateStreamStmt
(
SAstCreateContext
*
pCxt
,
bool
ignoreExists
,
const
SToken
*
pStreamName
,
SNode
*
pRealTable
,
SNode
*
createCreateStreamStmt
(
SAstCreateContext
*
pCxt
,
bool
ignoreExists
,
const
SToken
*
pStreamName
,
SNode
*
pRealTable
,
SNode
*
pOptions
,
SNodeList
*
pTags
,
SNode
*
pSubtable
,
SNode
*
pQuery
);
SNode
*
pOptions
,
SNodeList
*
pTags
,
SNode
*
pSubtable
,
SNode
*
pQuery
,
SNodeList
*
pCols
);
SNode
*
createDropStreamStmt
(
SAstCreateContext
*
pCxt
,
bool
ignoreNotExists
,
const
SToken
*
pStreamName
);
SNode
*
createDropStreamStmt
(
SAstCreateContext
*
pCxt
,
bool
ignoreNotExists
,
const
SToken
*
pStreamName
);
SNode
*
createKillStmt
(
SAstCreateContext
*
pCxt
,
ENodeType
type
,
const
SToken
*
pId
);
SNode
*
createKillStmt
(
SAstCreateContext
*
pCxt
,
ENodeType
type
,
const
SToken
*
pId
);
SNode
*
createKillQueryStmt
(
SAstCreateContext
*
pCxt
,
const
SToken
*
pQueryId
);
SNode
*
createKillQueryStmt
(
SAstCreateContext
*
pCxt
,
const
SToken
*
pQueryId
);
...
...
source/libs/parser/inc/sql.y
浏览文件 @
c0a16094
...
@@ -544,9 +544,15 @@ bufsize_opt(A) ::= BUFSIZE NK_INTEGER(B).
...
@@ -544,9 +544,15 @@ bufsize_opt(A) ::= BUFSIZE NK_INTEGER(B).
/************************************************ create/drop stream **************************************************/
/************************************************ create/drop stream **************************************************/
cmd ::= CREATE STREAM not_exists_opt(E) stream_name(A) stream_options(B) INTO
cmd ::= CREATE STREAM not_exists_opt(E) stream_name(A) stream_options(B) INTO
full_table_name(C) tags_def_opt(F) subtable_opt(G) AS query_or_subquery(D). { pCxt->pRootNode = createCreateStreamStmt(pCxt, E, &A, C, B, F, G, D); }
full_table_name(C) col_list_opt(H) tags_def_opt(F) subtable_opt(G)
AS query_or_subquery(D). { pCxt->pRootNode = createCreateStreamStmt(pCxt, E, &A, C, B, F, G, D, H); }
cmd ::= DROP STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createDropStreamStmt(pCxt, A, &B); }
cmd ::= DROP STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createDropStreamStmt(pCxt, A, &B); }
%type col_list_opt { SNodeList* }
%destructor col_list_opt { nodesDestroyList($$); }
col_list_opt(A) ::= . { A = NULL; }
col_list_opt(A) ::= NK_LP col_name_list(B) NK_RP. { A = B; }
stream_options(A) ::= . { A = createStreamOptions(pCxt); }
stream_options(A) ::= . { A = createStreamOptions(pCxt); }
stream_options(A) ::= stream_options(B) TRIGGER AT_ONCE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_AT_ONCE; A = B; }
stream_options(A) ::= stream_options(B) TRIGGER AT_ONCE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_AT_ONCE; A = B; }
stream_options(A) ::= stream_options(B) TRIGGER WINDOW_CLOSE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_WINDOW_CLOSE; A = B; }
stream_options(A) ::= stream_options(B) TRIGGER WINDOW_CLOSE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_WINDOW_CLOSE; A = B; }
...
...
source/libs/parser/src/parAstCreater.c
浏览文件 @
c0a16094
...
@@ -1777,21 +1777,20 @@ SNode* createStreamOptions(SAstCreateContext* pCxt) {
...
@@ -1777,21 +1777,20 @@ SNode* createStreamOptions(SAstCreateContext* pCxt) {
}
}
SNode
*
createCreateStreamStmt
(
SAstCreateContext
*
pCxt
,
bool
ignoreExists
,
const
SToken
*
pStreamName
,
SNode
*
pRealTable
,
SNode
*
createCreateStreamStmt
(
SAstCreateContext
*
pCxt
,
bool
ignoreExists
,
const
SToken
*
pStreamName
,
SNode
*
pRealTable
,
SNode
*
pOptions
,
SNodeList
*
pTags
,
SNode
*
pSubtable
,
SNode
*
pQuery
)
{
SNode
*
pOptions
,
SNodeList
*
pTags
,
SNode
*
pSubtable
,
SNode
*
pQuery
,
SNodeList
*
pCols
)
{
CHECK_PARSER_STATUS
(
pCxt
);
CHECK_PARSER_STATUS
(
pCxt
);
SCreateStreamStmt
*
pStmt
=
(
SCreateStreamStmt
*
)
nodesMakeNode
(
QUERY_NODE_CREATE_STREAM_STMT
);
SCreateStreamStmt
*
pStmt
=
(
SCreateStreamStmt
*
)
nodesMakeNode
(
QUERY_NODE_CREATE_STREAM_STMT
);
CHECK_OUT_OF_MEM
(
pStmt
);
CHECK_OUT_OF_MEM
(
pStmt
);
COPY_STRING_FORM_ID_TOKEN
(
pStmt
->
streamName
,
pStreamName
);
COPY_STRING_FORM_ID_TOKEN
(
pStmt
->
streamName
,
pStreamName
);
if
(
NULL
!=
pRealTable
)
{
strcpy
(
pStmt
->
targetDbName
,
((
SRealTableNode
*
)
pRealTable
)
->
table
.
dbName
);
strcpy
(
pStmt
->
targetDbName
,
((
SRealTableNode
*
)
pRealTable
)
->
table
.
dbName
);
strcpy
(
pStmt
->
targetTabName
,
((
SRealTableNode
*
)
pRealTable
)
->
table
.
tableName
);
strcpy
(
pStmt
->
targetTabName
,
((
SRealTableNode
*
)
pRealTable
)
->
table
.
tableName
);
nodesDestroyNode
(
pRealTable
);
nodesDestroyNode
(
pRealTable
);
}
pStmt
->
ignoreExists
=
ignoreExists
;
pStmt
->
ignoreExists
=
ignoreExists
;
pStmt
->
pOptions
=
(
SStreamOptions
*
)
pOptions
;
pStmt
->
pOptions
=
(
SStreamOptions
*
)
pOptions
;
pStmt
->
pQuery
=
pQuery
;
pStmt
->
pQuery
=
pQuery
;
pStmt
->
pTags
=
pTags
;
pStmt
->
pTags
=
pTags
;
pStmt
->
pSubtable
=
pSubtable
;
pStmt
->
pSubtable
=
pSubtable
;
pStmt
->
pCols
=
pCols
;
return
(
SNode
*
)
pStmt
;
return
(
SNode
*
)
pStmt
;
}
}
...
...
source/libs/parser/src/parAstParser.c
浏览文件 @
c0a16094
...
@@ -355,7 +355,12 @@ static int32_t collectMetaKeyFromDescribe(SCollectMetaKeyCxt* pCxt, SDescribeStm
...
@@ -355,7 +355,12 @@ static int32_t collectMetaKeyFromDescribe(SCollectMetaKeyCxt* pCxt, SDescribeStm
}
}
static
int32_t
collectMetaKeyFromCreateStream
(
SCollectMetaKeyCxt
*
pCxt
,
SCreateStreamStmt
*
pStmt
)
{
static
int32_t
collectMetaKeyFromCreateStream
(
SCollectMetaKeyCxt
*
pCxt
,
SCreateStreamStmt
*
pStmt
)
{
return
collectMetaKeyFromQuery
(
pCxt
,
pStmt
->
pQuery
);
int32_t
code
=
reserveTableMetaInCache
(
pCxt
->
pParseCxt
->
acctId
,
pStmt
->
targetDbName
,
pStmt
->
targetTabName
,
pCxt
->
pMetaCache
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
collectMetaKeyFromQuery
(
pCxt
,
pStmt
->
pQuery
);
}
return
code
;
}
}
static
int32_t
collectMetaKeyFromShowDnodes
(
SCollectMetaKeyCxt
*
pCxt
,
SShowStmt
*
pStmt
)
{
static
int32_t
collectMetaKeyFromShowDnodes
(
SCollectMetaKeyCxt
*
pCxt
,
SShowStmt
*
pStmt
)
{
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
c0a16094
...
@@ -352,7 +352,7 @@ static int32_t getTableMetaImpl(STranslateContext* pCxt, const SName* pName, STa
...
@@ -352,7 +352,7 @@ static int32_t getTableMetaImpl(STranslateContext* pCxt, const SName* pName, STa
code
=
catalogGetTableMeta
(
pParCxt
->
pCatalog
,
&
conn
,
pName
,
pMeta
);
code
=
catalogGetTableMeta
(
pParCxt
->
pCatalog
,
&
conn
,
pName
,
pMeta
);
}
}
}
}
if
(
TSDB_CODE_SUCCESS
!=
code
&&
TSDB_CODE_
TSC_INVALID_TABLE_NAME
!=
code
)
{
if
(
TSDB_CODE_SUCCESS
!=
code
&&
TSDB_CODE_
PAR_TABLE_NOT_EXIST
!=
code
)
{
parserError
(
"0x%"
PRIx64
" catalogGetTableMeta error, code:%s, dbName:%s, tbName:%s"
,
pCxt
->
pParseCxt
->
requestId
,
parserError
(
"0x%"
PRIx64
" catalogGetTableMeta error, code:%s, dbName:%s, tbName:%s"
,
pCxt
->
pParseCxt
->
requestId
,
tstrerror
(
code
),
pName
->
dbname
,
pName
->
tname
);
tstrerror
(
code
),
pName
->
dbname
,
pName
->
tname
);
}
}
...
@@ -4979,10 +4979,10 @@ static int32_t buildAlterSuperTableReq(STranslateContext* pCxt, SAlterTableStmt*
...
@@ -4979,10 +4979,10 @@ static int32_t buildAlterSuperTableReq(STranslateContext* pCxt, SAlterTableStmt*
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
static
SSchema
*
getColSchema
(
STableMeta
*
pTableMeta
,
const
char
*
pColName
)
{
static
const
SSchema
*
getColSchema
(
const
STableMeta
*
pTableMeta
,
const
char
*
pColName
)
{
int32_t
numOfFields
=
getNumOfTags
(
pTableMeta
)
+
getNumOfColumns
(
pTableMeta
);
int32_t
numOfFields
=
getNumOfTags
(
pTableMeta
)
+
getNumOfColumns
(
pTableMeta
);
for
(
int32_t
i
=
0
;
i
<
numOfFields
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfFields
;
++
i
)
{
SSchema
*
pSchema
=
pTableMeta
->
schema
+
i
;
const
SSchema
*
pSchema
=
pTableMeta
->
schema
+
i
;
if
(
0
==
strcmp
(
pColName
,
pSchema
->
name
))
{
if
(
0
==
strcmp
(
pColName
,
pSchema
->
name
))
{
return
pSchema
;
return
pSchema
;
}
}
...
@@ -5002,7 +5002,8 @@ static SSchema* getTagSchema(STableMeta* pTableMeta, const char* pTagName) {
...
@@ -5002,7 +5002,8 @@ static SSchema* getTagSchema(STableMeta* pTableMeta, const char* pTagName) {
return
NULL
;
return
NULL
;
}
}
static
int32_t
checkAlterSuperTableBySchema
(
STranslateContext
*
pCxt
,
SAlterTableStmt
*
pStmt
,
STableMeta
*
pTableMeta
)
{
static
int32_t
checkAlterSuperTableBySchema
(
STranslateContext
*
pCxt
,
SAlterTableStmt
*
pStmt
,
const
STableMeta
*
pTableMeta
)
{
SSchema
*
pTagsSchema
=
getTableTagSchema
(
pTableMeta
);
SSchema
*
pTagsSchema
=
getTableTagSchema
(
pTableMeta
);
if
(
getNumOfTags
(
pTableMeta
)
==
1
&&
pTagsSchema
->
type
==
TSDB_DATA_TYPE_JSON
&&
if
(
getNumOfTags
(
pTableMeta
)
==
1
&&
pTagsSchema
->
type
==
TSDB_DATA_TYPE_JSON
&&
(
pStmt
->
alterType
==
TSDB_ALTER_TABLE_ADD_TAG
||
pStmt
->
alterType
==
TSDB_ALTER_TABLE_DROP_TAG
||
(
pStmt
->
alterType
==
TSDB_ALTER_TABLE_ADD_TAG
||
pStmt
->
alterType
==
TSDB_ALTER_TABLE_DROP_TAG
||
...
@@ -5021,7 +5022,7 @@ static int32_t checkAlterSuperTableBySchema(STranslateContext* pCxt, SAlterTable
...
@@ -5021,7 +5022,7 @@ static int32_t checkAlterSuperTableBySchema(STranslateContext* pCxt, SAlterTable
return
generateSyntaxErrMsgExt
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_ALTER_TABLE
,
"Table is not super table"
);
return
generateSyntaxErrMsgExt
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_ALTER_TABLE
,
"Table is not super table"
);
}
}
SSchema
*
pSchema
=
getColSchema
(
pTableMeta
,
pStmt
->
colName
);
const
SSchema
*
pSchema
=
getColSchema
(
pTableMeta
,
pStmt
->
colName
);
if
(
NULL
==
pSchema
)
{
if
(
NULL
==
pSchema
)
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_COLUMN
,
pStmt
->
colName
);
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_COLUMN
,
pStmt
->
colName
);
}
else
if
(
!
IS_VAR_DATA_TYPE
(
pSchema
->
type
)
||
pSchema
->
type
!=
pStmt
->
dataType
.
type
||
}
else
if
(
!
IS_VAR_DATA_TYPE
(
pSchema
->
type
)
||
pSchema
->
type
!=
pStmt
->
dataType
.
type
||
...
@@ -5721,6 +5722,139 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm
...
@@ -5721,6 +5722,139 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
static
int32_t
adjustDataTypeOfProjections
(
STranslateContext
*
pCxt
,
const
STableMeta
*
pMeta
,
SNodeList
*
pProjections
)
{
if
(
getNumOfColumns
(
pMeta
)
!=
LIST_LENGTH
(
pProjections
))
{
return
generateSyntaxErrMsgExt
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_COLUMNS_NUM
,
"Illegal number of columns"
);
}
SSchema
*
pSchemas
=
getTableColumnSchema
(
pMeta
);
int32_t
index
=
0
;
SNode
*
pProj
=
NULL
;
FOREACH
(
pProj
,
pProjections
)
{
SSchema
*
pSchema
=
pSchemas
+
index
;
SDataType
dt
=
{.
type
=
pSchema
->
type
,
.
bytes
=
pSchema
->
bytes
};
if
(
!
dataTypeEqual
(
&
dt
,
&
((
SExprNode
*
)
pProj
)
->
resType
))
{
SNode
*
pFunc
=
NULL
;
int32_t
code
=
createCastFunc
(
pCxt
,
pProj
,
dt
,
&
pFunc
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
return
code
;
}
REPLACE_NODE
(
pFunc
);
}
}
return
TSDB_CODE_SUCCESS
;
}
typedef
struct
SProjColPos
{
int32_t
colId
;
SNode
*
pProj
;
}
SProjColPos
;
static
int32_t
projColPosCompar
(
const
void
*
l
,
const
void
*
r
)
{
return
((
SProjColPos
*
)
l
)
->
colId
<
((
SProjColPos
*
)
r
)
->
colId
;
}
static
void
projColPosDelete
(
void
*
p
)
{
taosMemoryFree
(((
SProjColPos
*
)
p
)
->
pProj
);
}
static
int32_t
addProjToProjColPos
(
STranslateContext
*
pCxt
,
const
SSchema
*
pSchema
,
SNode
*
pProj
,
SArray
*
pProjColPos
)
{
SNode
*
pNewProj
=
nodesCloneNode
(
pProj
);
if
(
NULL
==
pNewProj
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
int32_t
code
=
TSDB_CODE_SUCCESS
;
SDataType
dt
=
{.
type
=
pSchema
->
type
,
.
bytes
=
pSchema
->
bytes
};
if
(
!
dataTypeEqual
(
&
dt
,
&
((
SExprNode
*
)
pNewProj
)
->
resType
))
{
SNode
*
pFunc
=
NULL
;
code
=
createCastFunc
(
pCxt
,
pNewProj
,
dt
,
&
pFunc
);
pNewProj
=
pFunc
;
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
SProjColPos
pos
=
{.
colId
=
pSchema
->
colId
,
.
pProj
=
pNewProj
};
code
=
(
NULL
==
taosArrayPush
(
pProjColPos
,
&
pos
)
?
TSDB_CODE_OUT_OF_MEMORY
:
TSDB_CODE_SUCCESS
);
}
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
nodesDestroyNode
(
pNewProj
);
}
return
code
;
}
static
int32_t
adjustOrderOfProjection
(
STranslateContext
*
pCxt
,
SNodeList
*
pCols
,
const
STableMeta
*
pMeta
,
SNodeList
**
pProjections
)
{
if
(
LIST_LENGTH
(
pCols
)
!=
LIST_LENGTH
(
*
pProjections
))
{
return
generateSyntaxErrMsgExt
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_COLUMNS_NUM
,
"Illegal number of columns"
);
}
SArray
*
pProjColPos
=
taosArrayInit
(
LIST_LENGTH
(
pCols
),
sizeof
(
SProjColPos
));
if
(
NULL
==
pProjColPos
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
int32_t
code
=
TSDB_CODE_SUCCESS
;
SNode
*
pCol
=
NULL
;
SNode
*
pProj
=
NULL
;
FORBOTH
(
pCol
,
pCols
,
pProj
,
*
pProjections
)
{
const
SSchema
*
pSchema
=
getColSchema
(
pMeta
,
((
SColumnNode
*
)
pCol
)
->
colName
);
code
=
addProjToProjColPos
(
pCxt
,
pSchema
,
pProj
,
pProjColPos
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
break
;
}
}
SNodeList
*
pNewProjections
=
NULL
;
if
(
TSDB_CODE_SUCCESS
==
code
)
{
taosArraySort
(
pProjColPos
,
projColPosCompar
);
int32_t
num
=
taosArrayGetSize
(
pProjColPos
);
pNewProjections
=
nodesMakeList
();
if
(
NULL
==
pNewProjections
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
for
(
int32_t
i
=
0
;
TSDB_CODE_SUCCESS
==
code
&&
i
<
num
;
++
i
)
{
SProjColPos
*
pPos
=
taosArrayGet
(
pProjColPos
,
i
);
code
=
nodesListStrictAppend
(
pNewProjections
,
pPos
->
pProj
);
pPos
->
pProj
=
NULL
;
}
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
taosArrayDestroy
(
pProjColPos
);
nodesDestroyList
(
*
pProjections
);
*
pProjections
=
pNewProjections
;
}
else
{
taosArrayDestroyEx
(
pProjColPos
,
projColPosDelete
);
nodesDestroyList
(
pNewProjections
);
}
return
code
;
}
static
int32_t
adjustStreamQueryForExistTableImpl
(
STranslateContext
*
pCxt
,
SCreateStreamStmt
*
pStmt
,
const
STableMeta
*
pMeta
)
{
SSelectStmt
*
pSelect
=
(
SSelectStmt
*
)
pStmt
->
pQuery
;
if
(
NULL
==
pStmt
->
pCols
)
{
return
adjustDataTypeOfProjections
(
pCxt
,
pMeta
,
pSelect
->
pProjectionList
);
}
return
adjustOrderOfProjection
(
pCxt
,
pStmt
->
pCols
,
pMeta
,
&
pSelect
->
pProjectionList
);
}
static
int32_t
adjustStreamQueryForExistTable
(
STranslateContext
*
pCxt
,
SCreateStreamStmt
*
pStmt
,
SCMCreateStreamReq
*
pReq
)
{
STableMeta
*
pMeta
=
NULL
;
int32_t
code
=
getTableMeta
(
pCxt
,
pStmt
->
targetDbName
,
pStmt
->
targetTabName
,
&
pMeta
);
if
(
TSDB_CODE_PAR_TABLE_NOT_EXIST
==
code
)
{
if
(
NULL
!=
pStmt
->
pCols
)
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_TABLE_NOT_EXIST
,
pStmt
->
targetTabName
);
}
pReq
->
createStb
=
STREAM_CREATE_STABLE_TRUE
;
return
TSDB_CODE_SUCCESS
;
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
adjustStreamQueryForExistTableImpl
(
pCxt
,
pStmt
,
pMeta
);
}
return
code
;
}
static
int32_t
buildCreateStreamQuery
(
STranslateContext
*
pCxt
,
SCreateStreamStmt
*
pStmt
,
SCMCreateStreamReq
*
pReq
)
{
static
int32_t
buildCreateStreamQuery
(
STranslateContext
*
pCxt
,
SCreateStreamStmt
*
pStmt
,
SCMCreateStreamReq
*
pReq
)
{
pCxt
->
createStream
=
true
;
pCxt
->
createStream
=
true
;
int32_t
code
=
addWstartTsToCreateStreamQuery
(
pCxt
,
pStmt
->
pQuery
);
int32_t
code
=
addWstartTsToCreateStreamQuery
(
pCxt
,
pStmt
->
pQuery
);
...
@@ -5733,6 +5867,9 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt
...
@@ -5733,6 +5867,9 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt
if
(
TSDB_CODE_SUCCESS
==
code
)
{
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
checkStreamQuery
(
pCxt
,
pStmt
);
code
=
checkStreamQuery
(
pCxt
,
pStmt
);
}
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
adjustStreamQueryForExistTable
(
pCxt
,
pStmt
,
pReq
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
if
(
TSDB_CODE_SUCCESS
==
code
)
{
getSourceDatabase
(
pStmt
->
pQuery
,
pCxt
->
pParseCxt
->
acctId
,
pReq
->
sourceDB
);
getSourceDatabase
(
pStmt
->
pQuery
,
pCxt
->
pParseCxt
->
acctId
,
pReq
->
sourceDB
);
code
=
nodesNodeToString
(
pStmt
->
pQuery
,
false
,
&
pReq
->
ast
,
NULL
);
code
=
nodesNodeToString
(
pStmt
->
pQuery
,
false
,
&
pReq
->
ast
,
NULL
);
...
@@ -5771,8 +5908,6 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt*
...
@@ -5771,8 +5908,6 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt*
pReq
->
numOfTags
=
LIST_LENGTH
(
pStmt
->
pTags
);
pReq
->
numOfTags
=
LIST_LENGTH
(
pStmt
->
pTags
);
}
}
pReq
->
createStb
=
STREAM_CREATE_STABLE_TRUE
;
return
code
;
return
code
;
}
}
...
@@ -7375,12 +7510,12 @@ static int32_t buildAddColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, S
...
@@ -7375,12 +7510,12 @@ static int32_t buildAddColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt, S
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
static
int32_t
buildDropColReq
(
STranslateContext
*
pCxt
,
SAlterTableStmt
*
pStmt
,
STableMeta
*
pTableMeta
,
static
int32_t
buildDropColReq
(
STranslateContext
*
pCxt
,
SAlterTableStmt
*
pStmt
,
const
STableMeta
*
pTableMeta
,
SVAlterTbReq
*
pReq
)
{
SVAlterTbReq
*
pReq
)
{
if
(
2
==
getNumOfColumns
(
pTableMeta
))
{
if
(
2
==
getNumOfColumns
(
pTableMeta
))
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_DROP_COL
);
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_DROP_COL
);
}
}
SSchema
*
pSchema
=
getColSchema
(
pTableMeta
,
pStmt
->
colName
);
const
SSchema
*
pSchema
=
getColSchema
(
pTableMeta
,
pStmt
->
colName
);
if
(
NULL
==
pSchema
)
{
if
(
NULL
==
pSchema
)
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_COLUMN
,
pStmt
->
colName
);
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_COLUMN
,
pStmt
->
colName
);
}
else
if
(
PRIMARYKEY_TIMESTAMP_COL_ID
==
pSchema
->
colId
)
{
}
else
if
(
PRIMARYKEY_TIMESTAMP_COL_ID
==
pSchema
->
colId
)
{
...
@@ -7396,11 +7531,11 @@ static int32_t buildDropColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt,
...
@@ -7396,11 +7531,11 @@ static int32_t buildDropColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt,
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
static
int32_t
buildUpdateColReq
(
STranslateContext
*
pCxt
,
SAlterTableStmt
*
pStmt
,
STableMeta
*
pTableMeta
,
static
int32_t
buildUpdateColReq
(
STranslateContext
*
pCxt
,
SAlterTableStmt
*
pStmt
,
const
STableMeta
*
pTableMeta
,
SVAlterTbReq
*
pReq
)
{
SVAlterTbReq
*
pReq
)
{
pReq
->
colModBytes
=
calcTypeBytes
(
pStmt
->
dataType
);
pReq
->
colModBytes
=
calcTypeBytes
(
pStmt
->
dataType
);
pReq
->
colModType
=
pStmt
->
dataType
.
type
;
pReq
->
colModType
=
pStmt
->
dataType
.
type
;
SSchema
*
pSchema
=
getColSchema
(
pTableMeta
,
pStmt
->
colName
);
const
SSchema
*
pSchema
=
getColSchema
(
pTableMeta
,
pStmt
->
colName
);
if
(
NULL
==
pSchema
)
{
if
(
NULL
==
pSchema
)
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_COLUMN
,
pStmt
->
colName
);
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_COLUMN
,
pStmt
->
colName
);
}
else
if
(
!
IS_VAR_DATA_TYPE
(
pSchema
->
type
)
||
pSchema
->
type
!=
pStmt
->
dataType
.
type
||
}
else
if
(
!
IS_VAR_DATA_TYPE
(
pSchema
->
type
)
||
pSchema
->
type
!=
pStmt
->
dataType
.
type
||
...
...
source/libs/parser/test/mockCatalogService.cpp
浏览文件 @
c0a16094
...
@@ -427,7 +427,7 @@ class MockCatalogServiceImpl {
...
@@ -427,7 +427,7 @@ class MockCatalogServiceImpl {
int32_t
copyTableSchemaMeta
(
const
string
&
db
,
const
string
&
tbname
,
std
::
unique_ptr
<
STableMeta
>*
dst
)
const
{
int32_t
copyTableSchemaMeta
(
const
string
&
db
,
const
string
&
tbname
,
std
::
unique_ptr
<
STableMeta
>*
dst
)
const
{
STableMeta
*
src
=
getTableSchemaMeta
(
db
,
tbname
);
STableMeta
*
src
=
getTableSchemaMeta
(
db
,
tbname
);
if
(
nullptr
==
src
)
{
if
(
nullptr
==
src
)
{
return
TSDB_CODE_
TSC_INVALID_TABLE_NAME
;
return
TSDB_CODE_
PAR_TABLE_NOT_EXIST
;
}
}
int32_t
len
=
sizeof
(
STableMeta
)
+
sizeof
(
SSchema
)
*
(
src
->
tableInfo
.
numOfTags
+
src
->
tableInfo
.
numOfColumns
);
int32_t
len
=
sizeof
(
STableMeta
)
+
sizeof
(
SSchema
)
*
(
src
->
tableInfo
.
numOfTags
+
src
->
tableInfo
.
numOfColumns
);
dst
->
reset
((
STableMeta
*
)
taosMemoryCalloc
(
1
,
len
));
dst
->
reset
((
STableMeta
*
)
taosMemoryCalloc
(
1
,
len
));
...
...
source/libs/parser/test/parInitialCTest.cpp
浏览文件 @
c0a16094
...
@@ -755,14 +755,23 @@ TEST_F(ParserInitialCTest, createStream) {
...
@@ -755,14 +755,23 @@ TEST_F(ParserInitialCTest, createStream) {
};
};
auto
setCreateStreamReq
=
[
&
](
const
char
*
pStream
,
const
char
*
pSrcDb
,
const
char
*
pSql
,
const
char
*
pDstStb
,
auto
setCreateStreamReq
=
[
&
](
const
char
*
pStream
,
const
char
*
pSrcDb
,
const
char
*
pSql
,
const
char
*
pDstStb
,
int8_t
igExists
=
0
,
int8_t
triggerType
=
STREAM_TRIGGER_AT_ONCE
,
int64_t
maxDelay
=
0
,
int8_t
createStb
=
STREAM_CREATE_STABLE_TRUE
,
int8_t
igExists
=
0
)
{
int64_t
watermark
=
0
,
int8_t
igExpired
=
STREAM_DEFAULT_IGNORE_EXPIRED
,
int8_t
fillHistory
=
STREAM_DEFAULT_FILL_HISTORY
)
{
snprintf
(
expect
.
name
,
sizeof
(
expect
.
name
),
"0.%s"
,
pStream
);
snprintf
(
expect
.
name
,
sizeof
(
expect
.
name
),
"0.%s"
,
pStream
);
snprintf
(
expect
.
sourceDB
,
sizeof
(
expect
.
sourceDB
),
"0.%s"
,
pSrcDb
);
snprintf
(
expect
.
sourceDB
,
sizeof
(
expect
.
sourceDB
),
"0.%s"
,
pSrcDb
);
snprintf
(
expect
.
targetStbFullName
,
sizeof
(
expect
.
targetStbFullName
),
"0.test.%s"
,
pDstStb
);
snprintf
(
expect
.
targetStbFullName
,
sizeof
(
expect
.
targetStbFullName
),
"0.test.%s"
,
pDstStb
);
expect
.
igExists
=
igExists
;
expect
.
igExists
=
igExists
;
expect
.
sql
=
strdup
(
pSql
);
expect
.
sql
=
strdup
(
pSql
);
expect
.
createStb
=
createStb
;
expect
.
triggerType
=
STREAM_TRIGGER_AT_ONCE
;
expect
.
maxDelay
=
0
;
expect
.
watermark
=
0
;
expect
.
fillHistory
=
STREAM_DEFAULT_FILL_HISTORY
;
expect
.
igExpired
=
STREAM_DEFAULT_IGNORE_EXPIRED
;
};
auto
setStreamOptions
=
[
&
](
int8_t
triggerType
=
STREAM_TRIGGER_AT_ONCE
,
int64_t
maxDelay
=
0
,
int64_t
watermark
=
0
,
int8_t
igExpired
=
STREAM_DEFAULT_IGNORE_EXPIRED
,
int8_t
fillHistory
=
STREAM_DEFAULT_FILL_HISTORY
)
{
expect
.
triggerType
=
triggerType
;
expect
.
triggerType
=
triggerType
;
expect
.
maxDelay
=
maxDelay
;
expect
.
maxDelay
=
maxDelay
;
expect
.
watermark
=
watermark
;
expect
.
watermark
=
watermark
;
...
@@ -813,19 +822,22 @@ TEST_F(ParserInitialCTest, createStream) {
...
@@ -813,19 +822,22 @@ TEST_F(ParserInitialCTest, createStream) {
ASSERT_EQ
(
pField
->
flags
,
pExpectField
->
flags
);
ASSERT_EQ
(
pField
->
flags
,
pExpectField
->
flags
);
}
}
}
}
ASSERT_EQ
(
req
.
checkpointFreq
,
expect
.
checkpointFreq
);
ASSERT_EQ
(
req
.
createStb
,
expect
.
createStb
);
tFreeSCMCreateStreamReq
(
&
req
);
tFreeSCMCreateStreamReq
(
&
req
);
});
});
setCreateStreamReq
(
"s1"
,
"test"
,
"create stream s1 into st
1 as select count(*) from t1 interval(10s)"
,
"st1
"
);
setCreateStreamReq
(
"s1"
,
"test"
,
"create stream s1 into st
3 as select count(*) from t1 interval(10s)"
,
"st3
"
);
run
(
"CREATE STREAM s1 INTO st
1
AS SELECT COUNT(*) FROM t1 INTERVAL(10S)"
);
run
(
"CREATE STREAM s1 INTO st
3
AS SELECT COUNT(*) FROM t1 INTERVAL(10S)"
);
clearCreateStreamReq
();
clearCreateStreamReq
();
setCreateStreamReq
(
setCreateStreamReq
(
"s1"
,
"test"
,
"s1"
,
"test"
,
"create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired 0 fill_history 1 into st
1
"
"create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired 0 fill_history 1 into st
3
"
"as select count(*) from t1 interval(10s)"
,
"as select count(*) from t1 interval(10s)"
,
"st1"
,
1
,
STREAM_TRIGGER_MAX_DELAY
,
20
*
MILLISECOND_PER_SECOND
,
10
*
MILLISECOND_PER_SECOND
,
0
,
1
);
"st3"
,
1
,
1
);
run
(
"CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED 0 FILL_HISTORY 1 INTO st1 AS "
setStreamOptions
(
STREAM_TRIGGER_MAX_DELAY
,
20
*
MILLISECOND_PER_SECOND
,
10
*
MILLISECOND_PER_SECOND
,
0
,
1
);
run
(
"CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED 0 FILL_HISTORY 1 INTO st3 AS "
"SELECT COUNT(*) "
"SELECT COUNT(*) "
"FROM t1 INTERVAL(10S)"
);
"FROM t1 INTERVAL(10S)"
);
clearCreateStreamReq
();
clearCreateStreamReq
();
...
@@ -839,6 +851,11 @@ TEST_F(ParserInitialCTest, createStream) {
...
@@ -839,6 +851,11 @@ TEST_F(ParserInitialCTest, createStream) {
run
(
"CREATE STREAM s1 INTO st3 TAGS(tname VARCHAR(10), id INT) SUBTABLE(CONCAT('new-', tname)) "
run
(
"CREATE STREAM s1 INTO st3 TAGS(tname VARCHAR(10), id INT) SUBTABLE(CONCAT('new-', tname)) "
"AS SELECT _WSTART wstart, COUNT(*) cnt FROM st1 PARTITION BY TBNAME tname, tag1 id INTERVAL(10S)"
);
"AS SELECT _WSTART wstart, COUNT(*) cnt FROM st1 PARTITION BY TBNAME tname, tag1 id INTERVAL(10S)"
);
clearCreateStreamReq
();
clearCreateStreamReq
();
setCreateStreamReq
(
"s1"
,
"test"
,
"create stream s1 into st1 as select max(c1), c2 from t1 interval(10s)"
,
"st1"
,
STREAM_CREATE_STABLE_FALSE
);
run
(
"CREATE STREAM s1 INTO st1 AS SELECT MAX(c1), c2 FROM t1 INTERVAL(10S)"
);
clearCreateStreamReq
();
}
}
TEST_F
(
ParserInitialCTest
,
createStreamSemanticCheck
)
{
TEST_F
(
ParserInitialCTest
,
createStreamSemanticCheck
)
{
...
...
source/libs/planner/test/planOtherTest.cpp
浏览文件 @
c0a16094
...
@@ -30,7 +30,7 @@ TEST_F(PlanOtherTest, createTopic) {
...
@@ -30,7 +30,7 @@ TEST_F(PlanOtherTest, createTopic) {
TEST_F
(
PlanOtherTest
,
createStream
)
{
TEST_F
(
PlanOtherTest
,
createStream
)
{
useDb
(
"root"
,
"test"
);
useDb
(
"root"
,
"test"
);
run
(
"create stream if not exists s1 trigger window_close watermark 10s into st
1
as select count(*) from t1 "
run
(
"create stream if not exists s1 trigger window_close watermark 10s into st
3
as select count(*) from t1 "
"interval(10s)"
);
"interval(10s)"
);
run
(
"CREATE STREAM s1 INTO st3 TAGS(tname VARCHAR(10), id INT) SUBTABLE(CONCAT('new-', tname)) "
run
(
"CREATE STREAM s1 INTO st3 TAGS(tname VARCHAR(10), id INT) SUBTABLE(CONCAT('new-', tname)) "
...
@@ -43,9 +43,9 @@ TEST_F(PlanOtherTest, createStream) {
...
@@ -43,9 +43,9 @@ TEST_F(PlanOtherTest, createStream) {
TEST_F
(
PlanOtherTest
,
createStreamUseSTable
)
{
TEST_F
(
PlanOtherTest
,
createStreamUseSTable
)
{
useDb
(
"root"
,
"test"
);
useDb
(
"root"
,
"test"
);
run
(
"CREATE STREAM IF NOT EXISTS s1 into st
1
as SELECT COUNT(*) FROM st1 INTERVAL(10s)"
);
run
(
"CREATE STREAM IF NOT EXISTS s1 into st
3
as SELECT COUNT(*) FROM st1 INTERVAL(10s)"
);
run
(
"CREATE STREAM IF NOT EXISTS s1 into st
1
as SELECT COUNT(*) FROM st1 PARTITION BY TBNAME INTERVAL(10s)"
);
run
(
"CREATE STREAM IF NOT EXISTS s1 into st
3
as SELECT COUNT(*) FROM st1 PARTITION BY TBNAME INTERVAL(10s)"
);
}
}
TEST_F
(
PlanOtherTest
,
createSmaIndex
)
{
TEST_F
(
PlanOtherTest
,
createSmaIndex
)
{
...
...
source/libs/qcom/src/queryUtil.c
浏览文件 @
c0a16094
...
@@ -116,8 +116,6 @@ int32_t cleanupTaskQueue() {
...
@@ -116,8 +116,6 @@ int32_t cleanupTaskQueue() {
}
}
static
void
execHelper
(
struct
SSchedMsg
*
pSchedMsg
)
{
static
void
execHelper
(
struct
SSchedMsg
*
pSchedMsg
)
{
assert
(
pSchedMsg
!=
NULL
&&
pSchedMsg
->
ahandle
!=
NULL
);
__async_exec_fn_t
execFn
=
(
__async_exec_fn_t
)
pSchedMsg
->
ahandle
;
__async_exec_fn_t
execFn
=
(
__async_exec_fn_t
)
pSchedMsg
->
ahandle
;
int32_t
code
=
execFn
(
pSchedMsg
->
thandle
);
int32_t
code
=
execFn
(
pSchedMsg
->
thandle
);
if
(
code
!=
0
&&
pSchedMsg
->
msg
!=
NULL
)
{
if
(
code
!=
0
&&
pSchedMsg
->
msg
!=
NULL
)
{
...
@@ -126,8 +124,6 @@ static void execHelper(struct SSchedMsg* pSchedMsg) {
...
@@ -126,8 +124,6 @@ static void execHelper(struct SSchedMsg* pSchedMsg) {
}
}
int32_t
taosAsyncExec
(
__async_exec_fn_t
execFn
,
void
*
execParam
,
int32_t
*
code
)
{
int32_t
taosAsyncExec
(
__async_exec_fn_t
execFn
,
void
*
execParam
,
int32_t
*
code
)
{
assert
(
execFn
!=
NULL
);
SSchedMsg
schedMsg
=
{
0
};
SSchedMsg
schedMsg
=
{
0
};
schedMsg
.
fp
=
execHelper
;
schedMsg
.
fp
=
execHelper
;
schedMsg
.
ahandle
=
execFn
;
schedMsg
.
ahandle
=
execFn
;
...
@@ -138,7 +134,10 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code)
...
@@ -138,7 +134,10 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code)
}
}
void
destroySendMsgInfo
(
SMsgSendInfo
*
pMsgBody
)
{
void
destroySendMsgInfo
(
SMsgSendInfo
*
pMsgBody
)
{
assert
(
pMsgBody
!=
NULL
);
if
(
NULL
==
pMsgBody
)
{
return
;
}
taosMemoryFreeClear
(
pMsgBody
->
target
.
dbFName
);
taosMemoryFreeClear
(
pMsgBody
->
target
.
dbFName
);
taosMemoryFreeClear
(
pMsgBody
->
msgInfo
.
pData
);
taosMemoryFreeClear
(
pMsgBody
->
msgInfo
.
pData
);
if
(
pMsgBody
->
paramFreeFp
)
{
if
(
pMsgBody
->
paramFreeFp
)
{
...
@@ -394,7 +393,7 @@ char* parseTagDatatoJson(void* p) {
...
@@ -394,7 +393,7 @@ char* parseTagDatatoJson(void* p) {
}
else
if
(
pTagVal
->
nData
==
0
)
{
}
else
if
(
pTagVal
->
nData
==
0
)
{
value
=
cJSON_CreateString
(
""
);
value
=
cJSON_CreateString
(
""
);
}
else
{
}
else
{
ASSERT
(
0
)
;
goto
end
;
}
}
cJSON_AddItemToObject
(
json
,
tagJsonKey
,
value
);
cJSON_AddItemToObject
(
json
,
tagJsonKey
,
value
);
...
@@ -413,7 +412,7 @@ char* parseTagDatatoJson(void* p) {
...
@@ -413,7 +412,7 @@ char* parseTagDatatoJson(void* p) {
}
}
cJSON_AddItemToObject
(
json
,
tagJsonKey
,
value
);
cJSON_AddItemToObject
(
json
,
tagJsonKey
,
value
);
}
else
{
}
else
{
ASSERT
(
0
)
;
goto
end
;
}
}
}
}
string
=
cJSON_PrintUnformatted
(
json
);
string
=
cJSON_PrintUnformatted
(
json
);
...
...
source/libs/qworker/inc/qwInt.h
浏览文件 @
c0a16094
...
@@ -316,34 +316,34 @@ typedef struct SQWorkerMgmt {
...
@@ -316,34 +316,34 @@ typedef struct SQWorkerMgmt {
#define QW_LOCK(type, _lock) \
#define QW_LOCK(type, _lock) \
do { \
do { \
if (QW_READ == (type)) { \
if (QW_READ == (type)) { \
assert(atomic_load_32((_lock)) >= 0);
\
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before read lock");
\
QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \
taosRLockLatch(_lock); \
QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
QW_LOCK_DEBUG("QW RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) > 0);
\
ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value after read lock");
\
} else { \
} else { \
assert(atomic_load_32((_lock)) >= 0);
\
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before write lock");
\
QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \
taosWLockLatch(_lock); \
QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
QW_LOCK_DEBUG("QW WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY);
\
ASSERTS(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value after write lock");
\
} \
} \
} while (0)
} while (0)
#define QW_UNLOCK(type, _lock) \
#define QW_UNLOCK(type, _lock) \
do { \
do { \
if (QW_READ == (type)) { \
if (QW_READ == (type)) { \
assert(atomic_load_32((_lock)) > 0);
\
ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value before read unlock");
\
QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \
taosRUnLockLatch(_lock); \
QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
QW_LOCK_DEBUG("QW RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0);
\
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after read unlock");
\
} else { \
} else { \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY);
\
ASSERTS(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value before write unlock");
\
QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \
taosWUnLockLatch(_lock); \
QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
QW_LOCK_DEBUG("QW WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0);
\
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after write unlock");
\
} \
} \
} while (0)
} while (0)
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
c0a16094
...
@@ -147,7 +147,6 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
...
@@ -147,7 +147,6 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
size_t
numOfResBlock
=
taosArrayGetSize
(
pResList
);
size_t
numOfResBlock
=
taosArrayGetSize
(
pResList
);
for
(
int32_t
j
=
0
;
j
<
numOfResBlock
;
++
j
)
{
for
(
int32_t
j
=
0
;
j
<
numOfResBlock
;
++
j
)
{
SSDataBlock
*
pRes
=
taosArrayGetP
(
pResList
,
j
);
SSDataBlock
*
pRes
=
taosArrayGetP
(
pResList
,
j
);
ASSERT
(
pRes
->
info
.
rows
>
0
);
SInputData
inputData
=
{.
pData
=
pRes
};
SInputData
inputData
=
{.
pData
=
pRes
};
code
=
dsPutDataBlock
(
sinkHandle
,
&
inputData
,
&
qcontinue
);
code
=
dsPutDataBlock
(
sinkHandle
,
&
inputData
,
&
qcontinue
);
...
...
source/libs/scheduler/inc/schInt.h
浏览文件 @
c0a16094
...
@@ -478,34 +478,34 @@ extern SSchedulerMgmt schMgmt;
...
@@ -478,34 +478,34 @@ extern SSchedulerMgmt schMgmt;
#define SCH_LOCK(type, _lock) \
#define SCH_LOCK(type, _lock) \
do { \
do { \
if (SCH_READ == (type)) { \
if (SCH_READ == (type)) { \
assert(atomic_load_32(_lock) >= 0);
\
ASSERTS(atomic_load_32(_lock) >= 0, "invalid lock value before read lock");
\
SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \
taosRLockLatch(_lock); \
SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32(_lock) > 0);
\
ASSERTS(atomic_load_32(_lock) > 0, "invalid lock value after read lock");
\
} else { \
} else { \
assert(atomic_load_32(_lock) >= 0);
\
ASSERTS(atomic_load_32(_lock) >= 0, "invalid lock value before write lock");
\
SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \
taosWLockLatch(_lock); \
SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32(_lock) == TD_RWLATCH_WRITE_FLAG_COPY);
\
ASSERTS(atomic_load_32(_lock) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value after write lock");
\
} \
} \
} while (0)
} while (0)
#define SCH_UNLOCK(type, _lock) \
#define SCH_UNLOCK(type, _lock) \
do { \
do { \
if (SCH_READ == (type)) { \
if (SCH_READ == (type)) { \
assert(atomic_load_32((_lock)) > 0);
\
ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value before read unlock");
\
SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \
taosRUnLockLatch(_lock); \
SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0);
\
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after read unlock");
\
} else { \
} else { \
assert(atomic_load_32((_lock)) & TD_RWLATCH_WRITE_FLAG_COPY);
\
ASSERTS(atomic_load_32((_lock)) & TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value before write unlock");
\
SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \
taosWUnLockLatch(_lock); \
SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0);
\
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after write unlock");
\
} \
} \
} while (0)
} while (0)
...
...
source/os/CMakeLists.txt
浏览文件 @
c0a16094
...
@@ -41,7 +41,7 @@ target_link_libraries(
...
@@ -41,7 +41,7 @@ target_link_libraries(
)
)
if
(
TD_WINDOWS
)
if
(
TD_WINDOWS
)
target_link_libraries
(
target_link_libraries
(
os PUBLIC ws2_32 iconv msvcregex wcwidth winmm crashdump
os PUBLIC ws2_32 iconv msvcregex wcwidth winmm crashdump
dbghelp
)
)
elseif
(
TD_DARWIN_64
)
elseif
(
TD_DARWIN_64
)
find_library
(
CORE_FOUNDATION_FRAMEWORK CoreFoundation
)
find_library
(
CORE_FOUNDATION_FRAMEWORK CoreFoundation
)
...
...
source/os/test/CMakeLists.txt
0 → 100644
浏览文件 @
c0a16094
CMAKE_MINIMUM_REQUIRED
(
VERSION 2.8...3.20
)
PROJECT
(
TDengine
)
FIND_PATH
(
HEADER_GTEST_INCLUDE_DIR gtest.h /usr/include/gtest /usr/local/include/gtest
)
FIND_LIBRARY
(
LIB_GTEST_STATIC_DIR libgtest.a /usr/lib/ /usr/local/lib /usr/lib64
)
FIND_LIBRARY
(
LIB_GTEST_SHARED_DIR libgtest.so /usr/lib/ /usr/local/lib /usr/lib64
)
IF
(
HEADER_GTEST_INCLUDE_DIR
AND
(
LIB_GTEST_STATIC_DIR OR LIB_GTEST_SHARED_DIR
))
MESSAGE
(
STATUS
"gTest library found, build os test"
)
INCLUDE_DIRECTORIES
(
${
HEADER_GTEST_INCLUDE_DIR
}
)
AUX_SOURCE_DIRECTORY
(
${
CMAKE_CURRENT_SOURCE_DIR
}
SOURCE_LIST
)
ENDIF
()
INCLUDE_DIRECTORIES
(
${
TD_SOURCE_DIR
}
/src/util/inc
)
# osTests
add_executable
(
osTests
"osTests.cpp"
)
target_link_libraries
(
osTests os util gtest_main
)
add_test
(
NAME osTests
COMMAND osTests
)
\ No newline at end of file
source/os/test/osTests.cpp
浏览文件 @
c0a16094
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <iostream>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#pragma GCC diagnostic ignored "-Wsign-compare"
#pragma GCC diagnostic ignored "-Wformat"
#pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
#pragma GCC diagnostic ignored "-Wpointer-arith"
#include "os.h"
TEST
(
osTest
,
osSystem
)
{
const
char
*
flags
=
"UTL FATAL "
;
ELogLevel
level
=
DEBUG_FATAL
;
int32_t
dflag
=
255
;
// tsLogEmbedded ? 255 : uDebugFlag
taosPrintTrace
(
flags
,
level
,
dflag
);
}
#pragma GCC diagnostic pop
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录