Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f9158b3e
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
f9158b3e
编写于
5月 19, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/qnode
上级
fb944e85
e0f53dfc
变更
40
展开全部
隐藏空白更改
内联
并排
Showing
40 changed file
with
1258 addition
and
1151 deletion
+1258
-1151
example/src/tmq.c
example/src/tmq.c
+2
-2
include/libs/nodes/querynodes.h
include/libs/nodes/querynodes.h
+1
-0
include/libs/qcom/query.h
include/libs/qcom/query.h
+15
-15
include/util/taoserror.h
include/util/taoserror.h
+1
-0
source/common/src/trow.c
source/common/src/trow.c
+1
-1
source/dnode/mgmt/mgmt_bnode/src/bmWorker.c
source/dnode/mgmt/mgmt_bnode/src/bmWorker.c
+1
-1
source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
+2
-3
source/dnode/mgmt/mgmt_dnode/src/dmWorker.c
source/dnode/mgmt/mgmt_dnode/src/dmWorker.c
+1
-1
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
+1
-1
source/dnode/mgmt/mgmt_snode/src/smWorker.c
source/dnode/mgmt/mgmt_snode/src/smWorker.c
+1
-1
source/dnode/mgmt/node_mgmt/src/dmProc.c
source/dnode/mgmt/node_mgmt/src/dmProc.c
+1
-1
source/dnode/mnode/impl/inc/mndVgroup.h
source/dnode/mnode/impl/inc/mndVgroup.h
+1
-1
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+17
-40
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+11
-1
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+4
-4
source/dnode/mnode/impl/src/mndVgroup.c
source/dnode/mnode/impl/src/mndVgroup.c
+3
-5
source/dnode/mnode/impl/src/mnode.c
source/dnode/mnode/impl/src/mnode.c
+1
-1
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+1
-0
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+3
-0
source/dnode/vnode/src/tsdb/tsdbReadImpl.c
source/dnode/vnode/src/tsdb/tsdbReadImpl.c
+2
-2
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+1
-1
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+1
-1
source/libs/parser/inc/sql.y
source/libs/parser/inc/sql.y
+1
-0
source/libs/parser/src/parAstCreater.c
source/libs/parser/src/parAstCreater.c
+6
-3
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+18
-2
source/libs/parser/src/parUtil.c
source/libs/parser/src/parUtil.c
+2
-0
source/libs/parser/src/sql.c
source/libs/parser/src/sql.c
+886
-880
source/libs/parser/test/parSelectTest.cpp
source/libs/parser/test/parSelectTest.cpp
+6
-0
source/libs/planner/inc/planInt.h
source/libs/planner/inc/planInt.h
+7
-6
source/libs/planner/src/planLogicCreater.c
source/libs/planner/src/planLogicCreater.c
+4
-8
source/libs/planner/src/planOptimizer.c
source/libs/planner/src/planOptimizer.c
+50
-26
source/libs/planner/src/planSpliter.c
source/libs/planner/src/planSpliter.c
+71
-37
source/libs/planner/test/planJoinTest.cpp
source/libs/planner/test/planJoinTest.cpp
+8
-0
source/libs/planner/test/planSetOpTest.cpp
source/libs/planner/test/planSetOpTest.cpp
+24
-2
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+51
-53
source/libs/tdb/inc/tdb.h
source/libs/tdb/inc/tdb.h
+28
-28
source/libs/tdb/src/db/tdbDb.c
source/libs/tdb/src/db/tdbDb.c
+11
-11
source/libs/tdb/src/db/tdbTable.c
source/libs/tdb/src/db/tdbTable.c
+3
-3
source/libs/tdb/src/inc/tdbInt.h
source/libs/tdb/src/inc/tdbInt.h
+2
-2
tests/script/tsim/tstream/basic1.sim
tests/script/tsim/tstream/basic1.sim
+8
-8
未找到文件。
example/src/tmq.c
浏览文件 @
f9158b3e
...
...
@@ -106,8 +106,8 @@ int32_t create_topic() {
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create topic topic_ctb_column as abc1"
);
/*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/
/*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/
pRes
=
taos_query
(
pConn
,
"create topic topic_ctb_column as select ts, c1, c2, c3 from st1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create topic topic_ctb_column, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
...
...
include/libs/nodes/querynodes.h
浏览文件 @
f9158b3e
...
...
@@ -248,6 +248,7 @@ typedef struct SSetOperator {
SNode
*
pRight
;
SNodeList
*
pOrderByList
;
// SOrderByExprNode
SNode
*
pLimit
;
char
stmtName
[
TSDB_TABLE_NAME_LEN
];
}
SSetOperator
;
typedef
enum
ESqlClause
{
...
...
include/libs/qcom/query.h
浏览文件 @
f9158b3e
...
...
@@ -220,23 +220,23 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
taosPrintLog("QRY ", DEBUG_INFO, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qDebug(...)
\
do {
\
if (qDebugFlag & DEBUG_DEBUG) {
\
taosPrintLog("QRY ", DEBUG_DEBUG,
tsLogEmbedded ? 255 :
qDebugFlag, __VA_ARGS__); \
}
\
#define qDebug(...) \
do { \
if (qDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qTrace(...)
\
do {
\
if (qDebugFlag & DEBUG_TRACE) {
\
taosPrintLog("QRY ", DEBUG_TRACE,
tsLogEmbedded ? 255 :
qDebugFlag, __VA_ARGS__); \
}
\
#define qTrace(...) \
do { \
if (qDebugFlag & DEBUG_TRACE) { \
taosPrintLog("QRY ", DEBUG_TRACE, qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define qDebugL(...)
\
do {
\
if (qDebugFlag & DEBUG_DEBUG) {
\
taosPrintLongString("QRY ", DEBUG_DEBUG,
tsLogEmbedded ? 255 :
qDebugFlag, __VA_ARGS__); \
}
\
#define qDebugL(...) \
do { \
if (qDebugFlag & DEBUG_DEBUG) { \
taosPrintLongString("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define QRY_ERR_RET(c) \
...
...
include/util/taoserror.h
浏览文件 @
f9158b3e
...
...
@@ -646,6 +646,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_INVALID_ALTER_TABLE TAOS_DEF_ERROR_CODE(0, 0x2649)
#define TSDB_CODE_PAR_CANNOT_DROP_PRIMARY_KEY TAOS_DEF_ERROR_CODE(0, 0x264A)
#define TSDB_CODE_PAR_INVALID_MODIFY_COL TAOS_DEF_ERROR_CODE(0, 0x264B)
#define TSDB_CODE_PAR_INVALID_TBNAME TAOS_DEF_ERROR_CODE(0, 0x264C)
//planner
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
...
...
source/common/src/trow.c
浏览文件 @
f9158b3e
...
...
@@ -1063,7 +1063,7 @@ bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t colType, int32_t fl
int32_t
tdGetColDataOfRow
(
SCellVal
*
pVal
,
SDataCol
*
pCol
,
int32_t
row
,
int8_t
bitmapMode
)
{
if
(
isAllRowsNone
(
pCol
))
{
pVal
->
valType
=
TD_VTYPE_N
ULL
;
pVal
->
valType
=
TD_VTYPE_N
ONE
;
#ifdef TD_SUPPORT_READ2
pVal
->
val
=
(
void
*
)
getNullValue
(
pCol
->
type
);
#else
...
...
source/dnode/mgmt/mgmt_bnode/src/bmWorker.c
浏览文件 @
f9158b3e
...
...
@@ -38,9 +38,9 @@ static void bmSendErrorRsps(STaosQall *qall, int32_t numOfMsgs, int32_t code) {
static
inline
void
bmSendRsp
(
SRpcMsg
*
pMsg
,
int32_t
code
)
{
SRpcMsg
rsp
=
{
.
code
=
code
,
.
info
=
pMsg
->
info
,
.
pCont
=
pMsg
->
info
.
rsp
,
.
contLen
=
pMsg
->
info
.
rspLen
,
.
info
=
pMsg
->
info
,
};
tmsgSendRsp
(
&
rsp
);
}
...
...
source/dnode/mgmt/mgmt_dnode/src/dmHandle.c
浏览文件 @
f9158b3e
...
...
@@ -116,8 +116,7 @@ static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
SServerStatusRsp
statusRsp
=
{
0
};
SMonMloadInfo
minfo
=
{
0
};
dmGetMnodeLoads
(
pMgmt
,
&
minfo
);
if
(
minfo
.
isMnode
&&
minfo
.
load
.
syncState
!=
TAOS_SYNC_STATE_LEADER
&&
minfo
.
load
.
syncState
!=
TAOS_SYNC_STATE_CANDIDATE
)
{
if
(
minfo
.
isMnode
&&
minfo
.
load
.
syncState
==
TAOS_SYNC_STATE_ERROR
)
{
pStatus
->
statusCode
=
TSDB_SRV_STATUS_SERVICE_DEGRADED
;
snprintf
(
pStatus
->
details
,
sizeof
(
pStatus
->
details
),
"mnode sync state is %s"
,
syncStr
(
minfo
.
load
.
syncState
));
return
;
...
...
@@ -127,7 +126,7 @@ static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
dmGetVnodeLoads
(
pMgmt
,
&
vinfo
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
vinfo
.
pVloads
);
++
i
)
{
SVnodeLoad
*
pLoad
=
taosArrayGet
(
vinfo
.
pVloads
,
i
);
if
(
pLoad
->
syncState
!=
TAOS_SYNC_STATE_LEADER
&&
pLoad
->
syncState
!=
TAOS_SYNC_STATE_FOLLOWE
R
)
{
if
(
pLoad
->
syncState
==
TAOS_SYNC_STATE_ERRO
R
)
{
pStatus
->
statusCode
=
TSDB_SRV_STATUS_SERVICE_DEGRADED
;
snprintf
(
pStatus
->
details
,
sizeof
(
pStatus
->
details
),
"vnode:%d sync state is %s"
,
pLoad
->
vgId
,
syncStr
(
pLoad
->
syncState
));
...
...
source/dnode/mgmt/mgmt_dnode/src/dmWorker.c
浏览文件 @
f9158b3e
...
...
@@ -153,9 +153,9 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
if
(
code
!=
0
&&
terrno
!=
0
)
code
=
terrno
;
SRpcMsg
rsp
=
{
.
code
=
code
,
.
info
=
pMsg
->
info
,
.
pCont
=
pMsg
->
info
.
rsp
,
.
contLen
=
pMsg
->
info
.
rspLen
,
.
info
=
pMsg
->
info
,
};
rpcSendResponse
(
&
rsp
);
}
...
...
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
浏览文件 @
f9158b3e
...
...
@@ -19,9 +19,9 @@
static
inline
void
mmSendRsp
(
SRpcMsg
*
pMsg
,
int32_t
code
)
{
SRpcMsg
rsp
=
{
.
code
=
code
,
.
info
=
pMsg
->
info
,
.
pCont
=
pMsg
->
info
.
rsp
,
.
contLen
=
pMsg
->
info
.
rspLen
,
.
info
=
pMsg
->
info
,
};
tmsgSendRsp
(
&
rsp
);
}
...
...
source/dnode/mgmt/mgmt_snode/src/smWorker.c
浏览文件 @
f9158b3e
...
...
@@ -19,9 +19,9 @@
static
inline
void
smSendRsp
(
SRpcMsg
*
pMsg
,
int32_t
code
)
{
SRpcMsg
rsp
=
{
.
code
=
code
,
.
info
=
pMsg
->
info
,
.
pCont
=
pMsg
->
info
.
rsp
,
.
contLen
=
pMsg
->
info
.
rspLen
,
.
info
=
pMsg
->
info
,
};
tmsgSendRsp
(
&
rsp
);
}
...
...
source/dnode/mgmt/node_mgmt/src/dmProc.c
浏览文件 @
f9158b3e
...
...
@@ -433,7 +433,7 @@ void dmCloseProcRpcHandles(SProc *proc) {
SRpcHandleInfo
*
pInfo
=
taosHashIterate
(
proc
->
hash
,
NULL
);
while
(
pInfo
!=
NULL
)
{
dError
(
"node:%s, the child process dies and send an offline rsp to handle:%p"
,
proc
->
name
,
pInfo
->
handle
);
SRpcMsg
rpcMsg
=
{.
info
=
*
pInfo
,
.
code
=
TSDB_CODE_NODE_OFFLINE
};
SRpcMsg
rpcMsg
=
{.
code
=
TSDB_CODE_NODE_OFFLINE
,
.
info
=
*
pInfo
};
rpcSendResponse
(
&
rpcMsg
);
pInfo
=
taosHashIterate
(
proc
->
hash
,
pInfo
);
}
...
...
source/dnode/mnode/impl/inc/mndVgroup.h
浏览文件 @
f9158b3e
...
...
@@ -37,7 +37,7 @@ int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray
void
*
mndBuildCreateVnodeReq
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
int32_t
*
pContLen
);
void
*
mndBuildDropVnodeReq
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
int32_t
*
pContLen
);
void
*
mndBuildAlterVnodeReq
(
SMnode
*
pMnode
,
SD
nodeObj
*
pDnode
,
SD
bObj
*
pDb
,
SVgObj
*
pVgroup
,
int32_t
*
pContLen
);
void
*
mndBuildAlterVnodeReq
(
SMnode
*
pMnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
int32_t
*
pContLen
);
#ifdef __cplusplus
}
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
f9158b3e
...
...
@@ -261,8 +261,7 @@ void mndReleaseDb(SMnode *pMnode, SDbObj *pDb) {
sdbRelease
(
pSdb
,
pDb
);
}
static
int32_t
mndAddCreateVnodeAction
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
SVnodeGid
*
pVgid
,
bool
isRedo
)
{
static
int32_t
mndAddCreateVnodeAction
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
SVnodeGid
*
pVgid
)
{
STransAction
action
=
{
0
};
SDnodeObj
*
pDnode
=
mndAcquireDnode
(
pMnode
,
pVgid
->
dnodeId
);
...
...
@@ -279,48 +278,29 @@ static int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *p
action
.
msgType
=
TDMT_DND_CREATE_VNODE
;
action
.
acceptableCode
=
TSDB_CODE_NODE_ALREADY_DEPLOYED
;
if
(
isRedo
)
{
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
taosMemoryFree
(
pReq
);
return
-
1
;
}
}
else
{
if
(
mndTransAppendUndoAction
(
pTrans
,
&
action
)
!=
0
)
{
taosMemoryFree
(
pReq
);
return
-
1
;
}
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
taosMemoryFree
(
pReq
);
return
-
1
;
}
return
0
;
}
static
int32_t
mndAddAlterVnodeAction
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
SVnodeGid
*
pVgid
,
bool
isRedo
)
{
static
int32_t
mndAddAlterVnodeAction
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
)
{
STransAction
action
=
{
0
};
SDnodeObj
*
pDnode
=
mndAcquireDnode
(
pMnode
,
pVgid
->
dnodeId
);
if
(
pDnode
==
NULL
)
return
-
1
;
action
.
epSet
=
mndGetDnodeEpset
(
pDnode
);
mndReleaseDnode
(
pMnode
,
pDnode
);
action
.
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgroup
);
int32_t
contLen
=
0
;
void
*
pReq
=
mndBuildAlterVnodeReq
(
pMnode
,
pD
node
,
pD
b
,
pVgroup
,
&
contLen
);
void
*
pReq
=
mndBuildAlterVnodeReq
(
pMnode
,
pDb
,
pVgroup
,
&
contLen
);
if
(
pReq
==
NULL
)
return
-
1
;
action
.
pCont
=
pReq
;
action
.
contLen
=
contLen
;
action
.
msgType
=
TDMT_VND_ALTER_VNODE
;
if
(
isRedo
)
{
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
taosMemoryFree
(
pReq
);
return
-
1
;
}
}
else
{
if
(
mndTransAppendUndoAction
(
pTrans
,
&
action
)
!=
0
)
{
taosMemoryFree
(
pReq
);
return
-
1
;
}
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
taosMemoryFree
(
pReq
);
return
-
1
;
}
return
0
;
...
...
@@ -487,7 +467,7 @@ static int32_t mndSetCreateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
for
(
int32_t
vn
=
0
;
vn
<
pVgroup
->
replica
;
++
vn
)
{
SVnodeGid
*
pVgid
=
pVgroup
->
vnodeGid
+
vn
;
if
(
mndAddCreateVnodeAction
(
pMnode
,
pTrans
,
pDb
,
pVgroup
,
pVgid
,
true
)
!=
0
)
{
if
(
mndAddCreateVnodeAction
(
pMnode
,
pTrans
,
pDb
,
pVgroup
,
pVgid
)
!=
0
)
{
return
-
1
;
}
}
...
...
@@ -726,11 +706,8 @@ static int32_t mndSetAlterDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *p
static
int32_t
mndBuildAlterVgroupAction
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
SArray
*
pArray
)
{
if
(
pVgroup
->
replica
<=
0
||
pVgroup
->
replica
==
pDb
->
cfg
.
replications
)
{
for
(
int32_t
vn
=
0
;
vn
<
pVgroup
->
replica
;
++
vn
)
{
SVnodeGid
*
pVgid
=
pVgroup
->
vnodeGid
+
vn
;
if
(
mndAddAlterVnodeAction
(
pMnode
,
pTrans
,
pDb
,
pVgroup
,
pVgid
,
true
)
!=
0
)
{
return
-
1
;
}
if
(
mndAddAlterVnodeAction
(
pMnode
,
pTrans
,
pDb
,
pVgroup
)
!=
0
)
{
return
-
1
;
}
}
else
{
SVgObj
newVgroup
=
{
0
};
...
...
@@ -744,9 +721,9 @@ static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj
return
-
1
;
}
newVgroup
.
replica
=
pDb
->
cfg
.
replications
;
if
(
mndAddAlterVnodeAction
(
pMnode
,
pTrans
,
pDb
,
&
newVgroup
,
&
newVgroup
.
vnodeGid
[
0
],
true
)
!=
0
)
return
-
1
;
if
(
mndAddCreateVnodeAction
(
pMnode
,
pTrans
,
pDb
,
&
newVgroup
,
&
newVgroup
.
vnodeGid
[
1
]
,
true
)
!=
0
)
return
-
1
;
if
(
mndAddCreateVnodeAction
(
pMnode
,
pTrans
,
pDb
,
&
newVgroup
,
&
newVgroup
.
vnodeGid
[
2
]
,
true
)
!=
0
)
return
-
1
;
if
(
mndAddAlterVnodeAction
(
pMnode
,
pTrans
,
pDb
,
&
newVgroup
)
!=
0
)
return
-
1
;
if
(
mndAddCreateVnodeAction
(
pMnode
,
pTrans
,
pDb
,
&
newVgroup
,
&
newVgroup
.
vnodeGid
[
1
])
!=
0
)
return
-
1
;
if
(
mndAddCreateVnodeAction
(
pMnode
,
pTrans
,
pDb
,
&
newVgroup
,
&
newVgroup
.
vnodeGid
[
2
])
!=
0
)
return
-
1
;
}
else
{
mInfo
(
"db:%s, vgId:%d, will remove 2 vnodes"
,
pVgroup
->
dbName
,
pVgroup
->
vgId
);
...
...
@@ -757,7 +734,7 @@ static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj
return
-
1
;
}
newVgroup
.
replica
=
pDb
->
cfg
.
replications
;
if
(
mndAddAlterVnodeAction
(
pMnode
,
pTrans
,
pDb
,
&
newVgroup
,
&
newVgroup
.
vnodeGid
[
0
],
true
)
!=
0
)
return
-
1
;
if
(
mndAddAlterVnodeAction
(
pMnode
,
pTrans
,
pDb
,
&
newVgroup
)
!=
0
)
return
-
1
;
if
(
mndAddDropVnodeAction
(
pMnode
,
pTrans
,
pDb
,
&
newVgroup
,
&
del1
,
true
)
!=
0
)
return
-
1
;
if
(
mndAddDropVnodeAction
(
pMnode
,
pTrans
,
pDb
,
&
newVgroup
,
&
del2
,
true
)
!=
0
)
return
-
1
;
}
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
f9158b3e
...
...
@@ -206,7 +206,7 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
static
int32_t
mndDoRebalance
(
SMnode
*
pMnode
,
const
SMqRebInputObj
*
pInput
,
SMqRebOutputObj
*
pOutput
)
{
int32_t
totalVgNum
=
pOutput
->
pSub
->
vgNum
;
mInfo
(
"mq rebalance subscription: %s, vgNum: %d"
,
pOutput
->
pSub
->
key
,
pOutput
->
pSub
->
vgNum
);
mInfo
(
"mq rebalance
:
subscription: %s, vgNum: %d"
,
pOutput
->
pSub
->
key
,
pOutput
->
pSub
->
vgNum
);
// 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg
SHashObj
*
pHash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
false
,
HASH_NO_LOCK
);
...
...
@@ -231,6 +231,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.
pVgEp
=
pVgEp
,
};
taosHashPut
(
pHash
,
&
pVgEp
->
vgId
,
sizeof
(
int32_t
),
&
outputVg
,
sizeof
(
SMqRebOutputVg
));
mInfo
(
"mq rebalance: remove vg %d from consumer %ld"
,
pVgEp
->
vgId
,
consumerId
);
}
taosHashRemove
(
pOutput
->
pSub
->
consumerHash
,
&
consumerId
,
sizeof
(
int64_t
));
// put into removed
...
...
@@ -250,6 +251,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.
pVgEp
=
pVgEp
,
};
taosHashPut
(
pHash
,
&
pVgEp
->
vgId
,
sizeof
(
int32_t
),
&
rebOutput
,
sizeof
(
SMqRebOutputVg
));
mInfo
(
"mq rebalance: remove vg %d from unassigned"
,
pVgEp
->
vgId
);
}
}
...
...
@@ -263,6 +265,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
minVgCnt
=
totalVgNum
/
afterRebConsumerNum
;
imbConsumerNum
=
totalVgNum
%
afterRebConsumerNum
;
}
mInfo
(
"mq rebalance: %d consumer after rebalance, at least %d vg each, %d consumer has more vg"
,
afterRebConsumerNum
,
minVgCnt
,
imbConsumerNum
);
// 4. first scan: remove consumer more than wanted, put to remove hash
int32_t
imbCnt
=
0
;
...
...
@@ -290,6 +294,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.
pVgEp
=
pVgEp
,
};
taosHashPut
(
pHash
,
&
pVgEp
->
vgId
,
sizeof
(
int32_t
),
&
outputVg
,
sizeof
(
SMqRebOutputVg
));
mInfo
(
"mq rebalance: remove vg %d from consumer %ld (first scan)"
,
pVgEp
->
vgId
,
pConsumerEp
->
consumerId
);
}
imbCnt
++
;
}
...
...
@@ -303,6 +308,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.
pVgEp
=
pVgEp
,
};
taosHashPut
(
pHash
,
&
pVgEp
->
vgId
,
sizeof
(
int32_t
),
&
outputVg
,
sizeof
(
SMqRebOutputVg
));
mInfo
(
"mq rebalance: remove vg %d from consumer %ld (first scan)"
,
pVgEp
->
vgId
,
pConsumerEp
->
consumerId
);
}
}
}
...
...
@@ -319,6 +325,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
newConsumerEp
.
vgs
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
taosHashPut
(
pOutput
->
pSub
->
consumerHash
,
&
consumerId
,
sizeof
(
int64_t
),
&
newConsumerEp
,
sizeof
(
SMqConsumerEp
));
taosArrayPush
(
pOutput
->
newConsumers
,
&
consumerId
);
mInfo
(
"mq rebalance: add new consumer %ld"
,
consumerId
);
}
}
...
...
@@ -343,6 +350,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
taosArrayPush
(
pConsumerEp
->
vgs
,
&
pRebVg
->
pVgEp
);
pRebVg
->
newConsumerId
=
pConsumerEp
->
consumerId
;
taosArrayPush
(
pOutput
->
rebVgs
,
pRebVg
);
mInfo
(
"mq rebalance: add vg %d to consumer %ld (second scan)"
,
pRebVg
->
pVgEp
->
vgId
,
pConsumerEp
->
consumerId
);
}
}
...
...
@@ -360,6 +368,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
taosArrayPush
(
pConsumerEp
->
vgs
,
&
pRebVg
->
pVgEp
);
pRebVg
->
newConsumerId
=
pConsumerEp
->
consumerId
;
taosArrayPush
(
pOutput
->
rebVgs
,
pRebVg
);
mInfo
(
"mq rebalance: add vg %d to consumer %ld (second scan)"
,
pRebVg
->
pVgEp
->
vgId
,
pConsumerEp
->
consumerId
);
}
}
else
{
// if all consumer is removed, put all vg into unassigned
...
...
@@ -372,6 +381,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
ASSERT
(
pRebOutput
->
newConsumerId
==
-
1
);
taosArrayPush
(
pOutput
->
pSub
->
unassignedVgs
,
&
pRebOutput
->
pVgEp
);
taosArrayPush
(
pOutput
->
rebVgs
,
pRebOutput
);
mInfo
(
"mq rebalance: unassign vg %d (second scan)"
,
pRebOutput
->
pVgEp
->
vgId
);
}
}
...
...
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
f9158b3e
...
...
@@ -842,13 +842,12 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
}
taosMemoryFree
(
pTrans
->
rpcRsp
);
mDebug
(
"trans:%d, send rsp, code:0x%x stage:%d app:%p"
,
pTrans
->
id
,
code
&
0xFFFF
,
pTrans
->
stage
,
pTrans
->
rpcInfo
.
ahandle
);
mDebug
(
"trans:%d, send rsp, code:0x%x stage:%d app:%p"
,
pTrans
->
id
,
code
,
pTrans
->
stage
,
pTrans
->
rpcInfo
.
ahandle
);
SRpcMsg
rspMsg
=
{
.
info
=
pTrans
->
rpcInfo
,
.
code
=
code
,
.
pCont
=
rpcCont
,
.
contLen
=
pTrans
->
rpcRspLen
,
.
info
=
pTrans
->
rpcInfo
,
};
tmsgSendRsp
(
&
rspMsg
);
pTrans
->
rpcInfo
.
handle
=
NULL
;
...
...
@@ -986,7 +985,8 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr
memcpy
(
rpcMsg
.
pCont
,
pAction
->
pCont
,
pAction
->
contLen
);
if
(
tmsgSendReq
(
&
pAction
->
epSet
,
&
rpcMsg
)
==
0
)
{
mDebug
(
"trans:%d, action:%d is sent"
,
pTrans
->
id
,
action
);
mDebug
(
"trans:%d, action:%d is sent to %s:%u"
,
pTrans
->
id
,
action
,
pAction
->
epSet
.
eps
[
pAction
->
epSet
.
inUse
].
fqdn
,
pAction
->
epSet
.
eps
[
pAction
->
epSet
.
inUse
].
port
);
pAction
->
msgSent
=
1
;
pAction
->
msgReceived
=
0
;
pAction
->
errCode
=
0
;
...
...
source/dnode/mnode/impl/src/mndVgroup.c
浏览文件 @
f9158b3e
...
...
@@ -256,7 +256,7 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
return
pReq
;
}
void
*
mndBuildAlterVnodeReq
(
SMnode
*
pMnode
,
SD
nodeObj
*
pDnode
,
SD
bObj
*
pDb
,
SVgObj
*
pVgroup
,
int32_t
*
pContLen
)
{
void
*
mndBuildAlterVnodeReq
(
SMnode
*
pMnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
,
int32_t
*
pContLen
)
{
SAlterVnodeReq
alterReq
=
{
0
};
alterReq
.
vgVersion
=
pVgroup
->
version
;
alterReq
.
buffer
=
pDb
->
cfg
.
buffer
;
...
...
@@ -285,16 +285,14 @@ void *mndBuildAlterVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgO
pReplica
->
port
=
pVgidDnode
->
port
;
memcpy
(
pReplica
->
fqdn
,
pVgidDnode
->
fqdn
,
TSDB_FQDN_LEN
);
mndReleaseDnode
(
pMnode
,
pVgidDnode
);
if
(
pDnode
->
id
==
pVgid
->
dnodeId
)
{
alterReq
.
selfIndex
=
v
;
}
}
#if 0
if (alterReq.selfIndex == -1) {
terrno = TSDB_CODE_MND_APP_ERROR;
return NULL;
}
#endif
int32_t
contLen
=
tSerializeSAlterVnodeReq
(
NULL
,
0
,
&
alterReq
);
if
(
contLen
<
0
)
{
...
...
source/dnode/mnode/impl/src/mnode.c
浏览文件 @
f9158b3e
...
...
@@ -489,7 +489,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
tstrncpy
(
desc
.
status
,
"ready"
,
sizeof
(
desc
.
status
));
pClusterInfo
->
vgroups_alive
++
;
}
if
(
pVgid
->
role
==
TAOS_SYNC_STATE_LEADER
||
pVgid
->
role
==
TAOS_SYNC_STATE_CANDIDATE
)
{
if
(
pVgid
->
role
!=
TAOS_SYNC_STATE_ERROR
)
{
pClusterInfo
->
vnodes_alive
++
;
}
pClusterInfo
->
vnodes_total
++
;
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
f9158b3e
...
...
@@ -179,6 +179,7 @@ struct STQ {
SHashObj
*
pStreamTasks
;
SVnode
*
pVnode
;
SWal
*
pWal
;
// TDB* pTdb;
};
typedef
struct
{
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
f9158b3e
...
...
@@ -32,6 +32,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
pTq
->
path
=
strdup
(
path
);
pTq
->
pVnode
=
pVnode
;
pTq
->
pWal
=
pWal
;
/*if (tdbOpen(path, 4096, 1, &pTq->pTdb) < 0) {*/
/*ASSERT(0);*/
/*}*/
#if 0
pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer,
...
...
source/dnode/vnode/src/tsdb/tsdbReadImpl.c
浏览文件 @
f9158b3e
...
...
@@ -330,12 +330,12 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo,
ASSERT
(
pReadh
->
pDCols
[
0
]
->
bitmapMode
!=
0
);
}
if
(
mergeBitmap
&&
!
tdDataColsIsBitmapI
(
pReadh
->
pDCols
[
0
]))
{
for
(
int
i
=
0
;
i
<
numOfColsIds
;
++
i
)
{
SDataCol
*
pDataCol
=
pReadh
->
pDCols
[
0
]
->
cols
+
i
;
if
(
pDataCol
->
bitmap
)
{
if
(
pDataCol
->
len
>
0
&&
pDataCol
->
bitmap
)
{
ASSERT
(
pDataCol
->
colId
!=
PRIMARYKEY_TIMESTAMP_COL_ID
);
ASSERT
(
pDataCol
->
pBitmap
);
tdMergeBitmap
(
pDataCol
->
pBitmap
,
pReadh
->
pDCols
[
0
]
->
numOfRows
,
pDataCol
->
pBitmap
);
tdDataColsSetBitmapI
(
pReadh
->
pDCols
[
0
]);
}
...
...
source/libs/executor/src/executor.c
浏览文件 @
f9158b3e
...
...
@@ -106,7 +106,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
pMsg->contentLen = pMsg->contentLen;
#endif
qDebugL
(
"stream task string %s"
,
(
const
char
*
)
msg
);
/*qDebugL("stream task string %s", (const char*)msg);*/
struct
SSubplan
*
plan
=
NULL
;
int32_t
code
=
qStringToSubplan
(
msg
,
&
plan
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
f9158b3e
...
...
@@ -5311,4 +5311,4 @@ int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, size_t ke
pCatchSup
->
pWindowHashTable
=
taosHashInit
(
10000
,
hashFn
,
true
,
HASH_NO_LOCK
);;
return
TSDB_CODE_SUCCESS
;
}
\ No newline at end of file
source/libs/parser/inc/sql.y
浏览文件 @
f9158b3e
...
...
@@ -621,6 +621,7 @@ column_reference(A) ::= table_name(B) NK_DOT column_name(C).
pseudo_column(A) ::= ROWTS(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
pseudo_column(A) ::= TBNAME(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
pseudo_column(A) ::= table_name(B) NK_DOT TBNAME(C). { A = createRawExprNodeExt(pCxt, &B, &C, createFunctionNode(pCxt, &C, createNodeList(pCxt, createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &B)))); }
pseudo_column(A) ::= QSTARTTS(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
pseudo_column(A) ::= QENDTS(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
pseudo_column(A) ::= WSTARTTS(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
...
...
source/libs/parser/src/parAstCreater.c
浏览文件 @
f9158b3e
...
...
@@ -155,9 +155,9 @@ static bool checkPort(SAstCreateContext* pCxt, const SToken* pPortToken, int32_t
return
TSDB_CODE_SUCCESS
==
pCxt
->
errCode
;
}
static
bool
checkDbName
(
SAstCreateContext
*
pCxt
,
SToken
*
pDbName
,
bool
query
)
{
static
bool
checkDbName
(
SAstCreateContext
*
pCxt
,
SToken
*
pDbName
,
bool
demandDb
)
{
if
(
NULL
==
pDbName
)
{
if
(
query
&&
NULL
==
pCxt
->
pQueryCxt
->
db
)
{
if
(
demandDb
&&
NULL
==
pCxt
->
pQueryCxt
->
db
)
{
pCxt
->
errCode
=
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_DB_NOT_SPECIFIED
);
}
}
else
{
...
...
@@ -457,6 +457,8 @@ SNode* createTempTableNode(SAstCreateContext* pCxt, SNode* pSubquery, const STok
}
if
(
QUERY_NODE_SELECT_STMT
==
nodeType
(
pSubquery
))
{
strcpy
(((
SSelectStmt
*
)
pSubquery
)
->
stmtName
,
tempTable
->
table
.
tableAlias
);
}
else
if
(
QUERY_NODE_SET_OPERATOR
==
nodeType
(
pSubquery
))
{
strcpy
(((
SSetOperator
*
)
pSubquery
)
->
stmtName
,
tempTable
->
table
.
tableAlias
);
}
return
(
SNode
*
)
tempTable
;
}
...
...
@@ -637,6 +639,7 @@ SNode* createSetOperator(SAstCreateContext* pCxt, ESetOperatorType type, SNode*
setOp
->
opType
=
type
;
setOp
->
pLeft
=
pLeft
;
setOp
->
pRight
=
pRight
;
sprintf
(
setOp
->
stmtName
,
"%p"
,
setOp
);
return
(
SNode
*
)
setOp
;
}
...
...
@@ -1140,7 +1143,7 @@ SNode* createAlterDnodeStmt(SAstCreateContext* pCxt, const SToken* pDnode, const
SNode
*
createCreateIndexStmt
(
SAstCreateContext
*
pCxt
,
EIndexType
type
,
bool
ignoreExists
,
SToken
*
pIndexName
,
SToken
*
pTableName
,
SNodeList
*
pCols
,
SNode
*
pOptions
)
{
if
(
!
checkIndexName
(
pCxt
,
pIndexName
)
||
!
checkTableName
(
pCxt
,
pTableName
))
{
if
(
!
checkIndexName
(
pCxt
,
pIndexName
)
||
!
checkTableName
(
pCxt
,
pTableName
)
||
!
checkDbName
(
pCxt
,
NULL
,
true
)
)
{
return
NULL
;
}
SCreateIndexStmt
*
pStmt
=
nodesMakeNode
(
QUERY_NODE_CREATE_INDEX_STMT
);
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
f9158b3e
...
...
@@ -766,6 +766,20 @@ static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc)
pCxt
->
pCurrStmt
->
hasRepeatScanFuncs
=
true
;
}
}
if
(
TSDB_CODE_SUCCESS
==
pCxt
->
errCode
&&
fmIsScanPseudoColumnFunc
(
pFunc
->
funcId
))
{
if
(
0
==
LIST_LENGTH
(
pFunc
->
pParameterList
))
{
if
(
QUERY_NODE_REAL_TABLE
!=
nodeType
(
pCxt
->
pCurrStmt
->
pFromTable
))
{
return
generateDealNodeErrMsg
(
pCxt
,
TSDB_CODE_PAR_INVALID_TBNAME
);
}
}
else
{
SValueNode
*
pVal
=
nodesListGetNode
(
pFunc
->
pParameterList
,
0
);
STableNode
*
pTable
=
NULL
;
pCxt
->
errCode
=
findTable
(
pCxt
,
pVal
->
literal
,
&
pTable
);
if
(
TSDB_CODE_SUCCESS
==
pCxt
->
errCode
&&
(
NULL
==
pTable
||
QUERY_NODE_REAL_TABLE
!=
nodeType
(
pTable
)))
{
return
generateDealNodeErrMsg
(
pCxt
,
TSDB_CODE_PAR_INVALID_TBNAME
);
}
}
}
return
TSDB_CODE_SUCCESS
==
pCxt
->
errCode
?
DEAL_RES_CONTINUE
:
DEAL_RES_ERROR
;
}
...
...
@@ -1758,12 +1772,13 @@ static int32_t translateSelect(STranslateContext* pCxt, SSelectStmt* pSelect) {
return
code
;
}
static
SNode
*
createSetOperProject
(
SNode
*
pNode
)
{
static
SNode
*
createSetOperProject
(
const
char
*
pTableAlias
,
SNode
*
pNode
)
{
SColumnNode
*
pCol
=
nodesMakeNode
(
QUERY_NODE_COLUMN
);
if
(
NULL
==
pCol
)
{
return
NULL
;
}
pCol
->
node
.
resType
=
((
SExprNode
*
)
pNode
)
->
resType
;
strcpy
(
pCol
->
tableAlias
,
pTableAlias
);
strcpy
(
pCol
->
colName
,
((
SExprNode
*
)
pNode
)
->
aliasName
);
strcpy
(
pCol
->
node
.
aliasName
,
pCol
->
colName
);
return
(
SNode
*
)
pCol
;
...
...
@@ -1817,7 +1832,8 @@ static int32_t translateSetOperatorImpl(STranslateContext* pCxt, SSetOperator* p
}
strcpy
(
pRightExpr
->
aliasName
,
pLeftExpr
->
aliasName
);
pRightExpr
->
aliasName
[
strlen
(
pLeftExpr
->
aliasName
)]
=
'\0'
;
if
(
TSDB_CODE_SUCCESS
!=
nodesListMakeStrictAppend
(
&
pSetOperator
->
pProjectionList
,
createSetOperProject
(
pLeft
)))
{
if
(
TSDB_CODE_SUCCESS
!=
nodesListMakeStrictAppend
(
&
pSetOperator
->
pProjectionList
,
createSetOperProject
(
pSetOperator
->
stmtName
,
pLeft
)))
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
...
...
source/libs/parser/src/parUtil.c
浏览文件 @
f9158b3e
...
...
@@ -158,6 +158,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return
"Primary timestamp column cannot be dropped"
;
case
TSDB_CODE_PAR_INVALID_MODIFY_COL
:
return
"Only binary/nchar column length could be modified"
;
case
TSDB_CODE_PAR_INVALID_TBNAME
:
return
"Invalid tbname pseudo column"
;
case
TSDB_CODE_OUT_OF_MEMORY
:
return
"Out of memory"
;
default:
...
...
source/libs/parser/src/sql.c
浏览文件 @
f9158b3e
此差异已折叠。
点击以展开。
source/libs/parser/test/parSelectTest.cpp
浏览文件 @
f9158b3e
...
...
@@ -70,6 +70,12 @@ TEST_F(ParserSelectTest, pseudoColumn) {
run
(
"SELECT _WSTARTTS, _WENDTS, COUNT(*) FROM t1 INTERVAL(10s)"
);
}
TEST_F
(
ParserSelectTest
,
pseudoColumnSemanticCheck
)
{
useDb
(
"root"
,
"test"
);
run
(
"SELECT TBNAME FROM (SELECT * FROM st1s1)"
,
TSDB_CODE_PAR_INVALID_TBNAME
,
PARSER_STAGE_TRANSLATE
);
}
TEST_F
(
ParserSelectTest
,
multiResFunc
)
{
useDb
(
"root"
,
"test"
);
...
...
source/libs/planner/inc/planInt.h
浏览文件 @
f9158b3e
...
...
@@ -27,12 +27,13 @@ extern "C" {
#define QUERY_POLICY_HYBRID 2
#define QUERY_POLICY_QNODE 3
#define planFatal(param, ...) qFatal("PLAN: " param, __VA_ARGS__)
#define planError(param, ...) qError("PLAN: " param, __VA_ARGS__)
#define planWarn(param, ...) qWarn("PLAN: " param, __VA_ARGS__)
#define planInfo(param, ...) qInfo("PLAN: " param, __VA_ARGS__)
#define planDebug(param, ...) qDebug("PLAN: " param, __VA_ARGS__)
#define planTrace(param, ...) qTrace("PLAN: " param, __VA_ARGS__)
#define planFatal(param, ...) qFatal("PLAN: " param, __VA_ARGS__)
#define planError(param, ...) qError("PLAN: " param, __VA_ARGS__)
#define planWarn(param, ...) qWarn("PLAN: " param, __VA_ARGS__)
#define planInfo(param, ...) qInfo("PLAN: " param, __VA_ARGS__)
#define planDebug(param, ...) qDebug("PLAN: " param, __VA_ARGS__)
#define planDebugL(param, ...) qDebugL("PLAN: " param, __VA_ARGS__)
#define planTrace(param, ...) qTrace("PLAN: " param, __VA_ARGS__)
int32_t
generateUsageErrMsg
(
char
*
pBuf
,
int32_t
len
,
int32_t
errCode
,
...);
...
...
source/libs/planner/src/planLogicCreater.c
浏览文件 @
f9158b3e
...
...
@@ -310,12 +310,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
static
int32_t
createSubqueryLogicNode
(
SLogicPlanContext
*
pCxt
,
SSelectStmt
*
pSelect
,
STempTableNode
*
pTable
,
SLogicNode
**
pLogicNode
)
{
int32_t
code
=
createQueryLogicNode
(
pCxt
,
pTable
->
pSubquery
,
pLogicNode
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
SNode
*
pNode
;
FOREACH
(
pNode
,
(
*
pLogicNode
)
->
pTargets
)
{
strcpy
(((
SColumnNode
*
)
pNode
)
->
tableAlias
,
pTable
->
table
.
tableAlias
);
}
}
return
code
;
return
createQueryLogicNode
(
pCxt
,
pTable
->
pSubquery
,
pLogicNode
);
}
static
int32_t
createJoinLogicNode
(
SLogicPlanContext
*
pCxt
,
SSelectStmt
*
pSelect
,
SJoinTableNode
*
pJoinTable
,
...
...
@@ -879,7 +874,8 @@ static int32_t createSetOpProjectLogicNode(SLogicPlanContext* pCxt, SSetOperator
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
createColumnByProjections
(
pCxt
,
NULL
,
pSetOperator
->
pProjectionList
,
&
pProject
->
node
.
pTargets
);
code
=
createColumnByProjections
(
pCxt
,
pSetOperator
->
stmtName
,
pSetOperator
->
pProjectionList
,
&
pProject
->
node
.
pTargets
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
...
...
@@ -933,7 +929,7 @@ static int32_t createSetOpLogicNode(SLogicPlanContext* pCxt, SSetOperator* pSetO
code
=
createSetOpAggLogicNode
(
pCxt
,
pSetOperator
,
&
pSetOp
);
break
;
default:
code
=
-
1
;
code
=
TSDB_CODE_FAILED
;
break
;
}
...
...
source/libs/planner/src/planOptimizer.c
浏览文件 @
f9158b3e
...
...
@@ -598,39 +598,63 @@ static bool cpdIsPrimaryKeyEqualCond(SJoinLogicNode* pJoin, SNode* pCond) {
return
false
;
}
static
int32_t
cpdCheckOpCond
(
SOptimizeContext
*
pCxt
,
SJoinLogicNode
*
pJoin
,
SNode
*
pOnCond
)
{
if
(
!
cpdIsPrimaryKeyEqualCond
(
pJoin
,
pOnCond
))
{
return
generateUsageErrMsg
(
pCxt
->
pPlanCxt
->
pMsg
,
pCxt
->
pPlanCxt
->
msgLen
,
TSDB_CODE_PLAN_EXPECTED_TS_EQUAL
);
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
cpdCheckLogicCond
(
SOptimizeContext
*
pCxt
,
SJoinLogicNode
*
pJoin
,
SLogicConditionNode
*
pOnCond
)
{
if
(
LOGIC_COND_TYPE_AND
!=
pOnCond
->
condType
)
{
return
generateUsageErrMsg
(
pCxt
->
pPlanCxt
->
pMsg
,
pCxt
->
pPlanCxt
->
msgLen
,
TSDB_CODE_PLAN_EXPECTED_TS_EQUAL
);
}
bool
hasPrimaryKeyEqualCond
=
false
;
SNode
*
pCond
=
NULL
;
FOREACH
(
pCond
,
pOnCond
->
pParameterList
)
{
if
(
cpdIsPrimaryKeyEqualCond
(
pJoin
,
pCond
))
{
hasPrimaryKeyEqualCond
=
true
;
static
bool
cpdContainPrimaryKeyEqualCond
(
SJoinLogicNode
*
pJoin
,
SNode
*
pCond
)
{
if
(
QUERY_NODE_LOGIC_CONDITION
==
nodeType
(
pCond
))
{
SLogicConditionNode
*
pLogicCond
=
(
SLogicConditionNode
*
)
pCond
;
if
(
LOGIC_COND_TYPE_AND
!=
pLogicCond
->
condType
)
{
return
false
;
}
}
if
(
!
hasPrimaryKeyEqualCond
)
{
return
generateUsageErrMsg
(
pCxt
->
pPlanCxt
->
pMsg
,
pCxt
->
pPlanCxt
->
msgLen
,
TSDB_CODE_PLAN_EXPECTED_TS_EQUAL
);
}
return
TSDB_CODE_SUCCESS
;
}
bool
hasPrimaryKeyEqualCond
=
false
;
SNode
*
pCond
=
NULL
;
FOREACH
(
pCond
,
pLogicCond
->
pParameterList
)
{
if
(
cpdContainPrimaryKeyEqualCond
(
pJoin
,
pCond
))
{
hasPrimaryKeyEqualCond
=
true
;
break
;
}
}
return
hasPrimaryKeyEqualCond
;
}
else
{
return
cpdIsPrimaryKeyEqualCond
(
pJoin
,
pCond
);
}
}
// static int32_t cpdCheckOpCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNode* pOnCond) {
// if (!cpdIsPrimaryKeyEqualCond(pJoin, pOnCond)) {
// return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL);
// }
// return TSDB_CODE_SUCCESS;
// }
// static int32_t cpdCheckLogicCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SLogicConditionNode* pOnCond) {
// if (LOGIC_COND_TYPE_AND != pOnCond->condType) {
// return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL);
// }
// bool hasPrimaryKeyEqualCond = false;
// SNode* pCond = NULL;
// FOREACH(pCond, pOnCond->pParameterList) {
// if (cpdIsPrimaryKeyEqualCond(pJoin, pCond)) {
// hasPrimaryKeyEqualCond = true;
// }
// }
// if (!hasPrimaryKeyEqualCond) {
// return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL);
// }
// return TSDB_CODE_SUCCESS;
// }
static
int32_t
cpdCheckJoinOnCond
(
SOptimizeContext
*
pCxt
,
SJoinLogicNode
*
pJoin
)
{
if
(
NULL
==
pJoin
->
pOnConditions
)
{
return
generateUsageErrMsg
(
pCxt
->
pPlanCxt
->
pMsg
,
pCxt
->
pPlanCxt
->
msgLen
,
TSDB_CODE_PLAN_NOT_SUPPORT_CROSS_JOIN
);
}
if
(
QUERY_NODE_LOGIC_CONDITION
==
nodeType
(
pJoin
->
pOnConditions
))
{
return
cpdCheckLogicCond
(
pCxt
,
pJoin
,
(
SLogicConditionNode
*
)
pJoin
->
pOnConditions
);
}
else
{
return
cpdCheckOpCond
(
pCxt
,
pJoin
,
pJoin
->
pOnConditions
);
if
(
!
cpdContainPrimaryKeyEqualCond
(
pJoin
,
pJoin
->
pOnConditions
))
{
return
generateUsageErrMsg
(
pCxt
->
pPlanCxt
->
pMsg
,
pCxt
->
pPlanCxt
->
msgLen
,
TSDB_CODE_PLAN_EXPECTED_TS_EQUAL
);
}
return
TSDB_CODE_SUCCESS
;
// if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOnConditions)) {
// return cpdCheckLogicCond(pCxt, pJoin, (SLogicConditionNode*)pJoin->pOnConditions);
// } else {
// return cpdCheckOpCond(pCxt, pJoin, pJoin->pOnConditions);
// }
}
static
int32_t
cpdPushJoinCondition
(
SOptimizeContext
*
pCxt
,
SJoinLogicNode
*
pJoin
)
{
...
...
source/libs/planner/src/planSpliter.c
浏览文件 @
f9158b3e
...
...
@@ -204,6 +204,75 @@ static int32_t ctjSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
return
code
;
}
static
bool
unionIsChildSubplan
(
SLogicNode
*
pLogicNode
,
int32_t
groupId
)
{
if
(
QUERY_NODE_LOGIC_PLAN_EXCHANGE
==
nodeType
(
pLogicNode
))
{
return
((
SExchangeLogicNode
*
)
pLogicNode
)
->
srcGroupId
==
groupId
;
}
SNode
*
pChild
;
FOREACH
(
pChild
,
pLogicNode
->
pChildren
)
{
bool
isChild
=
unionIsChildSubplan
((
SLogicNode
*
)
pChild
,
groupId
);
if
(
isChild
)
{
return
isChild
;
}
}
return
false
;
}
static
int32_t
unionMountSubplan
(
SLogicSubplan
*
pParent
,
SNodeList
*
pChildren
)
{
SNode
*
pChild
=
NULL
;
WHERE_EACH
(
pChild
,
pChildren
)
{
if
(
unionIsChildSubplan
(
pParent
->
pNode
,
((
SLogicSubplan
*
)
pChild
)
->
id
.
groupId
))
{
int32_t
code
=
nodesListMakeAppend
(
&
pParent
->
pChildren
,
pChild
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
REPLACE_NODE
(
NULL
);
ERASE_NODE
(
pChildren
);
continue
;
}
else
{
return
code
;
}
}
WHERE_NEXT
;
}
return
TSDB_CODE_SUCCESS
;
}
static
SLogicSubplan
*
unionCreateSubplan
(
SSplitContext
*
pCxt
,
SLogicNode
*
pNode
)
{
SLogicSubplan
*
pSubplan
=
nodesMakeNode
(
QUERY_NODE_LOGIC_SUBPLAN
);
if
(
NULL
==
pSubplan
)
{
return
NULL
;
}
pSubplan
->
id
.
groupId
=
pCxt
->
groupId
;
pSubplan
->
subplanType
=
SUBPLAN_TYPE_SCAN
;
pSubplan
->
pNode
=
pNode
;
return
pSubplan
;
}
static
int32_t
unionSplitSubplan
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pUnionSubplan
,
SLogicNode
*
pSplitNode
)
{
SNodeList
*
pSubplanChildren
=
pUnionSubplan
->
pChildren
;
pUnionSubplan
->
pChildren
=
NULL
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
SNode
*
pChild
=
NULL
;
FOREACH
(
pChild
,
pSplitNode
->
pChildren
)
{
SLogicSubplan
*
pNewSubplan
=
unionCreateSubplan
(
pCxt
,
(
SLogicNode
*
)
pChild
);
code
=
nodesListMakeStrictAppend
(
&
pUnionSubplan
->
pChildren
,
pNewSubplan
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
REPLACE_NODE
(
NULL
);
code
=
unionMountSubplan
(
pNewSubplan
,
pSubplanChildren
);
}
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
break
;
}
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
nodesDestroyList
(
pSubplanChildren
);
DESTORY_LIST
(
pSplitNode
->
pChildren
);
}
return
code
;
}
static
SLogicNode
*
uaMatchByNode
(
SLogicNode
*
pNode
)
{
if
(
QUERY_NODE_LOGIC_PLAN_PROJECT
==
nodeType
(
pNode
)
&&
LIST_LENGTH
(
pNode
->
pChildren
)
>
1
)
{
return
pNode
;
...
...
@@ -227,17 +296,6 @@ static bool uaFindSplitNode(SLogicSubplan* pSubplan, SUaInfo* pInfo) {
return
NULL
!=
pSplitNode
;
}
static
SLogicSubplan
*
uaCreateSubplan
(
SSplitContext
*
pCxt
,
SLogicNode
*
pNode
)
{
SLogicSubplan
*
pSubplan
=
nodesMakeNode
(
QUERY_NODE_LOGIC_SUBPLAN
);
if
(
NULL
==
pSubplan
)
{
return
NULL
;
}
pSubplan
->
id
.
groupId
=
pCxt
->
groupId
;
pSubplan
->
subplanType
=
SUBPLAN_TYPE_SCAN
;
pSubplan
->
pNode
=
pNode
;
return
pSubplan
;
}
static
int32_t
uaCreateExchangeNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SProjectLogicNode
*
pProject
)
{
SExchangeLogicNode
*
pExchange
=
nodesMakeNode
(
QUERY_NODE_LOGIC_PLAN_EXCHANGE
);
if
(
NULL
==
pExchange
)
{
...
...
@@ -276,20 +334,8 @@ static int32_t uaSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
code
=
TSDB_CODE_SUCCESS
;
SNode
*
pChild
=
NULL
;
FOREACH
(
pChild
,
info
.
pProject
->
node
.
pChildren
)
{
code
=
nodesListMakeStrictAppend
(
&
info
.
pSubplan
->
pChildren
,
uaCreateSubplan
(
pCxt
,
(
SLogicNode
*
)
pChild
));
if
(
TSDB_CODE_SUCCESS
==
code
)
{
REPLACE_NODE
(
NULL
);
}
else
{
break
;
}
}
int32_t
code
=
unionSplitSubplan
(
pCxt
,
info
.
pSubplan
,
(
SLogicNode
*
)
info
.
pProject
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
nodesClearList
(
info
.
pProject
->
node
.
pChildren
);
info
.
pProject
->
node
.
pChildren
=
NULL
;
code
=
uaCreateExchangeNode
(
pCxt
,
info
.
pSubplan
,
info
.
pProject
);
}
++
(
pCxt
->
groupId
);
...
...
@@ -343,20 +389,8 @@ static int32_t unSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
code
=
TSDB_CODE_SUCCESS
;
SNode
*
pChild
=
NULL
;
FOREACH
(
pChild
,
info
.
pAgg
->
node
.
pChildren
)
{
code
=
nodesListMakeStrictAppend
(
&
info
.
pSubplan
->
pChildren
,
uaCreateSubplan
(
pCxt
,
(
SLogicNode
*
)
pChild
));
if
(
TSDB_CODE_SUCCESS
==
code
)
{
REPLACE_NODE
(
NULL
);
}
else
{
break
;
}
}
int32_t
code
=
unionSplitSubplan
(
pCxt
,
info
.
pSubplan
,
(
SLogicNode
*
)
info
.
pAgg
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
nodesClearList
(
info
.
pAgg
->
node
.
pChildren
);
info
.
pAgg
->
node
.
pChildren
=
NULL
;
code
=
unCreateExchangeNode
(
pCxt
,
info
.
pSubplan
,
info
.
pAgg
);
}
++
(
pCxt
->
groupId
);
...
...
source/libs/planner/test/planJoinTest.cpp
浏览文件 @
f9158b3e
...
...
@@ -30,6 +30,14 @@ TEST_F(PlanJoinTest, basic) {
run
(
"SELECT t1.c1, t2.c1 FROM st1s1 t1 JOIN st1s2 t2 ON t1.ts = t2.ts"
);
}
TEST_F
(
PlanJoinTest
,
complex
)
{
useDb
(
"root"
,
"test"
);
run
(
"SELECT t1.c1, t2.c2 FROM st1s1 t1, st1s2 t2 "
"WHERE t1.ts = t2.ts AND t1.c1 BETWEEN -10 AND 10 AND t2.c1 BETWEEN -100 AND 100 AND "
"(t1.c2 LIKE 'nchar%' OR t1.c1 = 0 OR t2.c2 LIKE 'nchar%' OR t2.c1 = 0)"
);
}
TEST_F
(
PlanJoinTest
,
withWhere
)
{
useDb
(
"root"
,
"test"
);
...
...
source/libs/planner/test/planSetOpTest.cpp
浏览文件 @
f9158b3e
...
...
@@ -23,11 +23,33 @@ class PlanSetOpTest : public PlannerTestBase {};
TEST_F
(
PlanSetOpTest
,
unionAll
)
{
useDb
(
"root"
,
"test"
);
run
(
"select c1, c2 from t1 where c1 > 10 union all select c1, c2 from t1 where c1 > 20"
);
run
(
"SELECT c1, c2 FROM t1 WHERE c1 > 10 UNION ALL SELECT c1, c2 FROM t1 WHERE c1 > 20"
);
}
TEST_F
(
PlanSetOpTest
,
unionAllSubquery
)
{
useDb
(
"root"
,
"test"
);
run
(
"SELECT * FROM (SELECT c1, c2 FROM t1 UNION ALL SELECT c1, c2 FROM t1)"
);
}
TEST_F
(
PlanSetOpTest
,
union
)
{
useDb
(
"root"
,
"test"
);
run
(
"select c1, c2 from t1 where c1 > 10 union select c1, c2 from t1 where c1 > 20"
);
run
(
"SELECT c1, c2 FROM t1 WHERE c1 > 10 UNION SELECT c1, c2 FROM t1 WHERE c1 > 20"
);
}
TEST_F
(
PlanSetOpTest
,
unionContainJoin
)
{
useDb
(
"root"
,
"test"
);
run
(
"SELECT t1.c1 FROM st1s1 t1 join st1s2 t2 on t1.ts = t2.ts "
"WHERE t1.c1 IS NOT NULL GROUP BY t1.c1 HAVING t1.c1 IS NOT NULL "
"UNION "
"SELECT t1.c1 FROM st1s1 t1 join st1s2 t2 on t1.ts = t2.ts "
"WHERE t1.c1 IS NOT NULL GROUP BY t1.c1 HAVING t1.c1 IS NOT NULL"
);
}
TEST_F
(
PlanSetOpTest
,
unionSubquery
)
{
useDb
(
"root"
,
"test"
);
run
(
"SELECT * FROM (SELECT c1, c2 FROM t1 UNION SELECT c1, c2 FROM t1)"
);
}
source/libs/qworker/src/qworker.c
浏览文件 @
f9158b3e
...
...
@@ -9,11 +9,11 @@
#include "tmsg.h"
#include "tname.h"
SQWDebug
gQWDebug
=
{.
statusEnable
=
true
,
.
dumpEnable
=
true
};
SQWDebug
gQWDebug
=
{.
statusEnable
=
true
,
.
dumpEnable
=
true
};
SQWorkerMgmt
gQwMgmt
=
{
.
lock
=
0
,
.
qwRef
=
-
1
,
.
qwNum
=
0
,
.
lock
=
0
,
.
qwRef
=
-
1
,
.
qwNum
=
0
,
};
int32_t
qwDbgValidateStatus
(
QW_FPARAMS_DEF
,
int8_t
oriStatus
,
int8_t
newStatus
,
bool
*
ignore
)
{
...
...
@@ -110,9 +110,9 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt) {
QW_LOCK
(
QW_READ
,
&
mgmt
->
schLock
);
QW_DUMP
(
"total remain schduler num:%d"
,
taosHashGetSize
(
mgmt
->
schHash
));
/*QW_DUMP("total remain schduler num:%d", taosHashGetSize(mgmt->schHash));*/
void
*
key
=
NULL
;
void
*
key
=
NULL
;
size_t
keyLen
=
0
;
int32_t
i
=
0
;
SQWSchStatus
*
sch
=
NULL
;
...
...
@@ -127,7 +127,7 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt) {
QW_UNLOCK
(
QW_READ
,
&
mgmt
->
schLock
);
QW_DUMP
(
"total remain ctx num:%d"
,
taosHashGetSize
(
mgmt
->
ctxHash
));
/*QW_DUMP("total remain ctx num:%d", taosHashGetSize(mgmt->ctxHash));*/
}
char
*
qwPhaseStr
(
int32_t
phase
)
{
...
...
@@ -462,7 +462,7 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
}
int32_t
qwDropTaskStatus
(
QW_FPARAMS_DEF
)
{
SQWSchStatus
*
sch
=
NULL
;
SQWSchStatus
*
sch
=
NULL
;
SQWTaskStatus
*
task
=
NULL
;
int32_t
code
=
0
;
...
...
@@ -499,7 +499,7 @@ _return:
}
int32_t
qwUpdateTaskStatus
(
QW_FPARAMS_DEF
,
int8_t
status
)
{
SQWSchStatus
*
sch
=
NULL
;
SQWSchStatus
*
sch
=
NULL
;
SQWTaskStatus
*
task
=
NULL
;
int32_t
code
=
0
;
...
...
@@ -550,11 +550,11 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
int32_t
qwExecTask
(
QW_FPARAMS_DEF
,
SQWTaskCtx
*
ctx
,
bool
*
queryEnd
)
{
int32_t
code
=
0
;
bool
qcontinue
=
true
;
SSDataBlock
*
pRes
=
NULL
;
SSDataBlock
*
pRes
=
NULL
;
uint64_t
useconds
=
0
;
int32_t
i
=
0
;
int32_t
execNum
=
0
;
qTaskInfo_t
*
taskHandle
=
&
ctx
->
taskHandle
;
qTaskInfo_t
*
taskHandle
=
&
ctx
->
taskHandle
;
DataSinkHandle
sinkHandle
=
ctx
->
sinkHandle
;
while
(
true
)
{
...
...
@@ -632,7 +632,7 @@ int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo)
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
void
*
key
=
NULL
;
void
*
key
=
NULL
;
size_t
keyLen
=
0
;
int32_t
i
=
0
;
STaskStatus
status
=
{
0
};
...
...
@@ -719,8 +719,8 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
}
int32_t
qwHandlePrePhaseEvents
(
QW_FPARAMS_DEF
,
int8_t
phase
,
SQWPhaseInput
*
input
,
SQWPhaseOutput
*
output
)
{
int32_t
code
=
0
;
SQWTaskCtx
*
ctx
=
NULL
;
int32_t
code
=
0
;
SQWTaskCtx
*
ctx
=
NULL
;
SRpcHandleInfo
*
dropConnection
=
NULL
;
SRpcHandleInfo
*
cancelConnection
=
NULL
;
...
...
@@ -925,13 +925,13 @@ _return:
}
int32_t
qwProcessQuery
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
,
int8_t
taskType
,
int8_t
explain
)
{
int32_t
code
=
0
;
bool
queryRsped
=
false
;
SSubplan
*
plan
=
NULL
;
SQWPhaseInput
input
=
{
0
};
qTaskInfo_t
pTaskInfo
=
NULL
;
DataSinkHandle
sinkHandle
=
NULL
;
SQWTaskCtx
*
ctx
=
NULL
;
int32_t
code
=
0
;
bool
queryRsped
=
false
;
SSubplan
*
plan
=
NULL
;
SQWPhaseInput
input
=
{
0
};
qTaskInfo_t
pTaskInfo
=
NULL
;
DataSinkHandle
sinkHandle
=
NULL
;
SQWTaskCtx
*
ctx
=
NULL
;
QW_ERR_JRET
(
qwRegisterQueryBrokenLinkArg
(
QW_FPARAMS
(),
&
qwMsg
->
connInfo
));
...
...
@@ -944,7 +944,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
ctx
->
ctrlConnInfo
=
qwMsg
->
connInfo
;
QW_TASK_DLOGL
(
"subplan json string, len:%d, %s"
,
qwMsg
->
msgLen
,
qwMsg
->
msg
);
/*QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);*/
code
=
qStringToSubplan
(
qwMsg
->
msg
,
&
plan
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
...
...
@@ -1055,10 +1055,10 @@ _return:
}
int32_t
qwProcessCQuery
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
)
{
SQWTaskCtx
*
ctx
=
NULL
;
SQWTaskCtx
*
ctx
=
NULL
;
int32_t
code
=
0
;
SQWPhaseInput
input
=
{
0
};
void
*
rsp
=
NULL
;
void
*
rsp
=
NULL
;
int32_t
dataLen
=
0
;
bool
queryEnd
=
false
;
...
...
@@ -1138,8 +1138,8 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
int32_t
code
=
0
;
int32_t
dataLen
=
0
;
bool
locked
=
false
;
SQWTaskCtx
*
ctx
=
NULL
;
void
*
rsp
=
NULL
;
SQWTaskCtx
*
ctx
=
NULL
;
void
*
rsp
=
NULL
;
SQWPhaseInput
input
=
{
0
};
QW_ERR_JRET
(
qwHandlePrePhaseEvents
(
QW_FPARAMS
(),
QW_PHASE_PRE_FETCH
,
&
input
,
NULL
));
...
...
@@ -1274,7 +1274,7 @@ _return:
int32_t
qwProcessHbLinkBroken
(
SQWorker
*
mgmt
,
SQWMsg
*
qwMsg
,
SSchedulerHbReq
*
req
)
{
int32_t
code
=
0
;
SSchedulerHbRsp
rsp
=
{
0
};
SQWSchStatus
*
sch
=
NULL
;
SQWSchStatus
*
sch
=
NULL
;
QW_ERR_RET
(
qwAcquireAddScheduler
(
mgmt
,
req
->
sId
,
QW_READ
,
&
sch
));
...
...
@@ -1300,7 +1300,7 @@ int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *re
int32_t
qwProcessHb
(
SQWorker
*
mgmt
,
SQWMsg
*
qwMsg
,
SSchedulerHbReq
*
req
)
{
int32_t
code
=
0
;
SSchedulerHbRsp
rsp
=
{
0
};
SQWSchStatus
*
sch
=
NULL
;
SQWSchStatus
*
sch
=
NULL
;
if
(
qwMsg
->
code
)
{
QW_RET
(
qwProcessHbLinkBroken
(
mgmt
,
qwMsg
,
req
));
...
...
@@ -1338,28 +1338,28 @@ _return:
qwMsg
->
connInfo
.
handle
=
NULL
;
}
QW_DLOG
(
"hb rsp send, handle:%p, code:%x - %s"
,
qwMsg
->
connInfo
.
handle
,
code
,
tstrerror
(
code
));
/*QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));*/
QW_RET
(
TSDB_CODE_SUCCESS
);
}
void
qwProcessHbTimerEvent
(
void
*
param
,
void
*
tmrId
)
{
SQWHbParam
*
hbParam
=
(
SQWHbParam
*
)
param
;
SQWHbParam
*
hbParam
=
(
SQWHbParam
*
)
param
;
if
(
hbParam
->
qwrId
!=
atomic_load_32
(
&
gQwMgmt
.
qwRef
))
{
return
;
}
int64_t
refId
=
hbParam
->
refId
;
int64_t
refId
=
hbParam
->
refId
;
SQWorker
*
mgmt
=
qwAcquire
(
refId
);
if
(
NULL
==
mgmt
)
{
QW_DLOG
(
"qwAcquire %"
PRIx64
"failed"
,
refId
);
taosMemoryFree
(
param
);
return
;
}
SQWSchStatus
*
sch
=
NULL
;
int32_t
taskNum
=
0
;
SQWHbInfo
*
rspList
=
NULL
;
SQWHbInfo
*
rspList
=
NULL
;
int32_t
code
=
0
;
qwDbgDumpMgmtInfo
(
mgmt
);
...
...
@@ -1383,7 +1383,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
return
;
}
void
*
key
=
NULL
;
void
*
key
=
NULL
;
size_t
keyLen
=
0
;
int32_t
i
=
0
;
...
...
@@ -1413,29 +1413,27 @@ _return:
for
(
int32_t
j
=
0
;
j
<
i
;
++
j
)
{
qwBuildAndSendHbRsp
(
&
rspList
[
j
].
connInfo
,
&
rspList
[
j
].
rsp
,
code
);
QW_DLOG
(
"hb rsp send, handle:%p, code:%x - %s, taskNum:%d"
,
rspList
[
j
].
connInfo
.
handle
,
code
,
tstrerror
(
code
),
(
rspList
[
j
].
rsp
.
taskStatus
?
(
int32_t
)
taosArrayGetSize
(
rspList
[
j
].
rsp
.
taskStatus
)
:
0
));
/*QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code),*/
/*(rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));*/
tFreeSSchedulerHbRsp
(
&
rspList
[
j
].
rsp
);
}
taosMemoryFreeClear
(
rspList
);
taosTmrReset
(
qwProcessHbTimerEvent
,
QW_DEFAULT_HEARTBEAT_MSEC
,
param
,
mgmt
->
timer
,
&
mgmt
->
hbTimer
);
qwRelease
(
refId
);
qwRelease
(
refId
);
}
void
qwCloseRef
(
void
)
{
taosWLockLatch
(
&
gQwMgmt
.
lock
);
if
(
atomic_load_32
(
&
gQwMgmt
.
qwNum
)
<=
0
&&
gQwMgmt
.
qwRef
>=
0
)
{
taosCloseRef
(
gQwMgmt
.
qwRef
);
gQwMgmt
.
qwRef
=
-
1
;
gQwMgmt
.
qwRef
=
-
1
;
}
taosWUnLockLatch
(
&
gQwMgmt
.
lock
);
}
void
qwDestroySchStatus
(
SQWSchStatus
*
pStatus
)
{
taosHashCleanup
(
pStatus
->
tasksHash
);
}
void
qwDestroySchStatus
(
SQWSchStatus
*
pStatus
)
{
taosHashCleanup
(
pStatus
->
tasksHash
);
}
void
qwDestroyImpl
(
void
*
pMgmt
)
{
SQWorker
*
mgmt
=
(
SQWorker
*
)
pMgmt
;
...
...
@@ -1454,12 +1452,12 @@ void qwDestroyImpl(void *pMgmt) {
SQWSchStatus
*
sch
=
(
SQWSchStatus
*
)
pIter
;
qwDestroySchStatus
(
sch
);
pIter
=
taosHashIterate
(
mgmt
->
schHash
,
pIter
);
}
}
taosHashCleanup
(
mgmt
->
schHash
);
taosMemoryFree
(
mgmt
);
atomic_sub_fetch_32
(
&
gQwMgmt
.
qwNum
,
1
);
atomic_sub_fetch_32
(
&
gQwMgmt
.
qwNum
,
1
);
qwCloseRef
();
}
...
...
@@ -1467,7 +1465,7 @@ void qwDestroyImpl(void *pMgmt) {
int32_t
qwOpenRef
(
void
)
{
taosWLockLatch
(
&
gQwMgmt
.
lock
);
if
(
gQwMgmt
.
qwRef
<
0
)
{
gQwMgmt
.
qwRef
=
taosOpenRef
(
100
,
qwDestroyImpl
);
gQwMgmt
.
qwRef
=
taosOpenRef
(
100
,
qwDestroyImpl
);
if
(
gQwMgmt
.
qwRef
<
0
)
{
taosWUnLockLatch
(
&
gQwMgmt
.
lock
);
qError
(
"init qworker ref failed"
);
...
...
@@ -1475,14 +1473,14 @@ int32_t qwOpenRef(void) {
}
}
taosWUnLockLatch
(
&
gQwMgmt
.
lock
);
return
TSDB_CODE_SUCCESS
;
}
void
qwSetHbParam
(
int64_t
refId
,
SQWHbParam
**
pParam
)
{
int32_t
paramIdx
=
0
;
int32_t
newParamIdx
=
0
;
while
(
true
)
{
paramIdx
=
atomic_load_32
(
&
gQwMgmt
.
paramIdx
);
if
(
paramIdx
==
tListLen
(
gQwMgmt
.
param
))
{
...
...
@@ -1490,7 +1488,7 @@ void qwSetHbParam(int64_t refId, SQWHbParam **pParam) {
}
else
{
newParamIdx
=
paramIdx
+
1
;
}
if
(
paramIdx
==
atomic_val_compare_exchange_32
(
&
gQwMgmt
.
paramIdx
,
paramIdx
,
newParamIdx
))
{
break
;
}
...
...
@@ -1577,12 +1575,12 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
SQWHbParam
*
param
=
NULL
;
qwSetHbParam
(
mgmt
->
refId
,
&
param
);
mgmt
->
hbTimer
=
taosTmrStart
(
qwProcessHbTimerEvent
,
QW_DEFAULT_HEARTBEAT_MSEC
,
(
void
*
)
param
,
mgmt
->
timer
);
mgmt
->
hbTimer
=
taosTmrStart
(
qwProcessHbTimerEvent
,
QW_DEFAULT_HEARTBEAT_MSEC
,
(
void
*
)
param
,
mgmt
->
timer
);
if
(
NULL
==
mgmt
->
hbTimer
)
{
qError
(
"start hb timer failed"
);
QW_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
*
qWorkerMgmt
=
mgmt
;
qDebug
(
"qworker initialized for node, type:%d, id:%d, handle:%p"
,
mgmt
->
nodeType
,
mgmt
->
nodeId
,
mgmt
);
...
...
@@ -1599,9 +1597,9 @@ _return:
taosTmrCleanUp
(
mgmt
->
timer
);
taosMemoryFreeClear
(
mgmt
);
atomic_sub_fetch_32
(
&
gQwMgmt
.
qwNum
,
1
);
atomic_sub_fetch_32
(
&
gQwMgmt
.
qwNum
,
1
);
}
QW_RET
(
code
);
}
...
...
@@ -1678,7 +1676,7 @@ int32_t qwUpdateSchLastAccess(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64
}
int32_t
qwGetTaskStatus
(
SQWorker
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
,
int8_t
*
taskStatus
)
{
SQWSchStatus
*
sch
=
NULL
;
SQWSchStatus
*
sch
=
NULL
;
SQWTaskStatus
*
task
=
NULL
;
int32_t
code
=
0
;
...
...
@@ -1705,7 +1703,7 @@ int32_t qwGetTaskStatus(SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId
}
int32_t
qwCancelTask
(
SQWorker
*
mgmt
,
uint64_t
sId
,
uint64_t
qId
,
uint64_t
tId
)
{
SQWSchStatus
*
sch
=
NULL
;
SQWSchStatus
*
sch
=
NULL
;
SQWTaskStatus
*
task
=
NULL
;
int32_t
code
=
0
;
...
...
source/libs/tdb/inc/tdb.h
浏览文件 @
f9158b3e
...
...
@@ -22,7 +22,7 @@
extern
"C"
{
#endif
typedef
int
(
*
tdb_cmpr_fn_t
)(
const
void
*
pKey1
,
int
kLen1
,
const
void
*
pKey2
,
in
t
kLen2
);
typedef
int
(
*
tdb_cmpr_fn_t
)(
const
void
*
pKey1
,
int
32_t
kLen1
,
const
void
*
pKey2
,
int32_
t
kLen2
);
// exposed types
typedef
struct
STDB
TDB
;
...
...
@@ -31,42 +31,42 @@ typedef struct STBC TBC;
typedef
struct
STxn
TXN
;
// TDB
int
tdbOpen
(
const
char
*
rootDir
,
int
szPage
,
int
pages
,
TDB
**
ppDb
);
int
tdbClose
(
TDB
*
pDb
);
int
tdbBegin
(
TDB
*
pDb
,
TXN
*
pTxn
);
int
tdbCommit
(
TDB
*
pDb
,
TXN
*
pTxn
);
int
32_t
tdbOpen
(
const
char
*
dbname
,
int
szPage
,
int
pages
,
TDB
**
ppDb
);
int
32_t
tdbClose
(
TDB
*
pDb
);
int
32_t
tdbBegin
(
TDB
*
pDb
,
TXN
*
pTxn
);
int
32_t
tdbCommit
(
TDB
*
pDb
,
TXN
*
pTxn
);
// TTB
int
tdbTbOpen
(
const
char
*
f
name
,
int
keyLen
,
int
valLen
,
tdb_cmpr_fn_t
keyCmprFn
,
TDB
*
pEnv
,
TTB
**
ppTb
);
int
tdbTbClose
(
TTB
*
pTb
);
int
tdbTbDrop
(
TTB
*
pTb
);
int
tdbTbInsert
(
TTB
*
pTb
,
const
void
*
pKey
,
int
keyLen
,
const
void
*
pVal
,
int
valLen
,
TXN
*
pTxn
);
int
tdbTbDelete
(
TTB
*
pTb
,
const
void
*
pKey
,
int
kLen
,
TXN
*
pTxn
);
int
tdbTbUpsert
(
TTB
*
pTb
,
const
void
*
pKey
,
int
kLen
,
const
void
*
pVal
,
int
vLen
,
TXN
*
pTxn
);
int
tdbTbGet
(
TTB
*
pTb
,
const
void
*
pKey
,
int
kLen
,
void
**
ppVal
,
int
*
vLen
);
int
tdbTbPGet
(
TTB
*
pTb
,
const
void
*
pKey
,
int
kLen
,
void
**
ppKey
,
int
*
pkLen
,
void
**
ppVal
,
int
*
vLen
);
int
32_t
tdbTbOpen
(
const
char
*
tb
name
,
int
keyLen
,
int
valLen
,
tdb_cmpr_fn_t
keyCmprFn
,
TDB
*
pEnv
,
TTB
**
ppTb
);
int
32_t
tdbTbClose
(
TTB
*
pTb
);
int
32_t
tdbTbDrop
(
TTB
*
pTb
);
int
32_t
tdbTbInsert
(
TTB
*
pTb
,
const
void
*
pKey
,
int
keyLen
,
const
void
*
pVal
,
int
valLen
,
TXN
*
pTxn
);
int
32_t
tdbTbDelete
(
TTB
*
pTb
,
const
void
*
pKey
,
int
kLen
,
TXN
*
pTxn
);
int
32_t
tdbTbUpsert
(
TTB
*
pTb
,
const
void
*
pKey
,
int
kLen
,
const
void
*
pVal
,
int
vLen
,
TXN
*
pTxn
);
int
32_t
tdbTbGet
(
TTB
*
pTb
,
const
void
*
pKey
,
int
kLen
,
void
**
ppVal
,
int
*
vLen
);
int
32_t
tdbTbPGet
(
TTB
*
pTb
,
const
void
*
pKey
,
int
kLen
,
void
**
ppKey
,
int
*
pkLen
,
void
**
ppVal
,
int
*
vLen
);
// TBC
int
tdbTbcOpen
(
TTB
*
pTb
,
TBC
**
ppTbc
,
TXN
*
pTxn
);
int
tdbTbcClose
(
TBC
*
pTbc
);
int
tdbTbcIsValid
(
TBC
*
pTbc
);
int
tdbTbcMoveTo
(
TBC
*
pTbc
,
const
void
*
pKey
,
int
kLen
,
int
*
c
);
int
tdbTbcMoveToFirst
(
TBC
*
pTbc
);
int
tdbTbcMoveToLast
(
TBC
*
pTbc
);
int
tdbTbcMoveToNext
(
TBC
*
pTbc
);
int
tdbTbcMoveToPrev
(
TBC
*
pTbc
);
int
tdbTbcGet
(
TBC
*
pTbc
,
const
void
**
ppKey
,
int
*
pkLen
,
const
void
**
ppVal
,
int
*
pvLen
);
int
tdbTbcDelete
(
TBC
*
pTbc
);
int
tdbTbcNext
(
TBC
*
pTbc
,
void
**
ppKey
,
int
*
kLen
,
void
**
ppVal
,
int
*
vLen
);
int
tdbTbcUpsert
(
TBC
*
pTbc
,
const
void
*
pKey
,
int
nKey
,
const
void
*
pData
,
int
nData
,
int
insert
);
int
32_t
tdbTbcOpen
(
TTB
*
pTb
,
TBC
**
ppTbc
,
TXN
*
pTxn
);
int
32_t
tdbTbcClose
(
TBC
*
pTbc
);
int
32_t
tdbTbcIsValid
(
TBC
*
pTbc
);
int
32_t
tdbTbcMoveTo
(
TBC
*
pTbc
,
const
void
*
pKey
,
int
kLen
,
int
*
c
);
int
32_t
tdbTbcMoveToFirst
(
TBC
*
pTbc
);
int
32_t
tdbTbcMoveToLast
(
TBC
*
pTbc
);
int
32_t
tdbTbcMoveToNext
(
TBC
*
pTbc
);
int
32_t
tdbTbcMoveToPrev
(
TBC
*
pTbc
);
int
32_t
tdbTbcGet
(
TBC
*
pTbc
,
const
void
**
ppKey
,
int
*
pkLen
,
const
void
**
ppVal
,
int
*
pvLen
);
int
32_t
tdbTbcDelete
(
TBC
*
pTbc
);
int
32_t
tdbTbcNext
(
TBC
*
pTbc
,
void
**
ppKey
,
int
*
kLen
,
void
**
ppVal
,
int
*
vLen
);
int
32_t
tdbTbcUpsert
(
TBC
*
pTbc
,
const
void
*
pKey
,
int
nKey
,
const
void
*
pData
,
int
nData
,
int
insert
);
// TXN
#define TDB_TXN_WRITE 0x1
#define TDB_TXN_READ_UNCOMMITTED 0x2
int
tdbTxnOpen
(
TXN
*
pTxn
,
int64_t
txnid
,
void
*
(
*
xMalloc
)(
void
*
,
size_t
),
void
(
*
xFree
)(
void
*
,
void
*
),
void
*
xArg
,
int
flags
);
int
tdbTxnClose
(
TXN
*
pTxn
);
int
32_t
tdbTxnOpen
(
TXN
*
pTxn
,
int64_t
txnid
,
void
*
(
*
xMalloc
)(
void
*
,
size_t
),
void
(
*
xFree
)(
void
*
,
void
*
)
,
void
*
xArg
,
int
flags
);
int
32_t
tdbTxnClose
(
TXN
*
pTxn
);
// other
void
tdbFree
(
void
*
);
...
...
source/libs/tdb/src/db/tdbDb.c
浏览文件 @
f9158b3e
...
...
@@ -15,7 +15,7 @@
#include "tdbInt.h"
int
tdbOpen
(
const
char
*
rootDir
,
int
szPage
,
in
t
pages
,
TDB
**
ppDb
)
{
int
32_t
tdbOpen
(
const
char
*
dbname
,
int32_t
szPage
,
int32_
t
pages
,
TDB
**
ppDb
)
{
TDB
*
pDb
;
int
dsize
;
int
zsize
;
...
...
@@ -25,7 +25,7 @@ int tdbOpen(const char *rootDir, int szPage, int pages, TDB **ppDb) {
*
ppDb
=
NULL
;
dsize
=
strlen
(
rootDir
);
dsize
=
strlen
(
dbname
);
zsize
=
sizeof
(
*
pDb
)
+
dsize
*
2
+
strlen
(
TDB_JOURNAL_NAME
)
+
3
;
pPtr
=
(
uint8_t
*
)
tdbOsCalloc
(
1
,
zsize
);
...
...
@@ -36,16 +36,16 @@ int tdbOpen(const char *rootDir, int szPage, int pages, TDB **ppDb) {
pDb
=
(
TDB
*
)
pPtr
;
pPtr
+=
sizeof
(
*
pDb
);
// pDb->rootDir
pDb
->
rootDir
=
pPtr
;
memcpy
(
pDb
->
rootDir
,
rootDir
,
dsize
);
pDb
->
rootDir
[
dsize
]
=
'\0'
;
pDb
->
dbName
=
pPtr
;
memcpy
(
pDb
->
dbName
,
dbname
,
dsize
);
pDb
->
dbName
[
dsize
]
=
'\0'
;
pPtr
=
pPtr
+
dsize
+
1
;
// pDb->jfname
pDb
->
j
fn
ame
=
pPtr
;
memcpy
(
pDb
->
j
fname
,
rootDir
,
dsize
);
pDb
->
j
fn
ame
[
dsize
]
=
'/'
;
memcpy
(
pDb
->
j
fn
ame
+
dsize
+
1
,
TDB_JOURNAL_NAME
,
strlen
(
TDB_JOURNAL_NAME
));
pDb
->
j
fn
ame
[
dsize
+
1
+
strlen
(
TDB_JOURNAL_NAME
)]
=
'\0'
;
pDb
->
j
nN
ame
=
pPtr
;
memcpy
(
pDb
->
j
nName
,
dbname
,
dsize
);
pDb
->
j
nN
ame
[
dsize
]
=
'/'
;
memcpy
(
pDb
->
j
nN
ame
+
dsize
+
1
,
TDB_JOURNAL_NAME
,
strlen
(
TDB_JOURNAL_NAME
));
pDb
->
j
nN
ame
[
dsize
+
1
+
strlen
(
TDB_JOURNAL_NAME
)]
=
'\0'
;
pDb
->
jfd
=
-
1
;
...
...
@@ -62,7 +62,7 @@ int tdbOpen(const char *rootDir, int szPage, int pages, TDB **ppDb) {
}
memset
(
pDb
->
pgrHash
,
0
,
tsize
);
mkdir
(
rootDir
,
0755
);
mkdir
(
dbname
,
0755
);
*
ppDb
=
pDb
;
return
0
;
...
...
source/libs/tdb/src/db/tdbTable.c
浏览文件 @
f9158b3e
...
...
@@ -24,7 +24,7 @@ struct STBC {
SBTC
btc
;
};
int
tdbTbOpen
(
const
char
*
f
name
,
int
keyLen
,
int
valLen
,
tdb_cmpr_fn_t
keyCmprFn
,
TDB
*
pEnv
,
TTB
**
ppTb
)
{
int
tdbTbOpen
(
const
char
*
tb
name
,
int
keyLen
,
int
valLen
,
tdb_cmpr_fn_t
keyCmprFn
,
TDB
*
pEnv
,
TTB
**
ppTb
)
{
TTB
*
pTb
;
SPager
*
pPager
;
int
ret
;
...
...
@@ -42,9 +42,9 @@ int tdbTbOpen(const char *fname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn
// pTb->pEnv
pTb
->
pEnv
=
pEnv
;
pPager
=
tdbEnvGetPager
(
pEnv
,
f
name
);
pPager
=
tdbEnvGetPager
(
pEnv
,
tb
name
);
if
(
pPager
==
NULL
)
{
snprintf
(
fFullName
,
TDB_FILENAME_LEN
,
"%s/%s"
,
pEnv
->
rootDir
,
f
name
);
snprintf
(
fFullName
,
TDB_FILENAME_LEN
,
"%s/%s"
,
pEnv
->
dbName
,
tb
name
);
ret
=
tdbPagerOpen
(
pEnv
->
pCache
,
fFullName
,
&
pPager
);
if
(
ret
<
0
)
{
return
-
1
;
...
...
source/libs/tdb/src/inc/tdbInt.h
浏览文件 @
f9158b3e
...
...
@@ -335,8 +335,8 @@ static inline SCell *tdbPageGetCell(SPage *pPage, int idx) {
}
struct
STDB
{
char
*
rootDir
;
char
*
j
fn
ame
;
char
*
dbName
;
char
*
j
nN
ame
;
int
jfd
;
SPCache
*
pCache
;
SPager
*
pgrList
;
...
...
tests/script/tsim/tstream/basic1.sim
浏览文件 @
f9158b3e
...
...
@@ -24,7 +24,7 @@ sql insert into t1 values(1648791233002,3,2,3,2.1);
sql insert into t1 values(1648791243003,4,2,3,3.1);
sql insert into t1 values(1648791213004,4,2,3,4.1);
sleep 1000
sql select
_wstartts
, c1, c2 ,c3 ,c4, c5 from streamt;
sql select
`_wstartts`
, c1, c2 ,c3 ,c4, c5 from streamt;
if $rows != 4 then
print ======$rows
...
...
@@ -137,7 +137,7 @@ endi
sql insert into t1 values(1648791223001,12,14,13,11.1);
sleep 500
sql select
_wstartts
, c1, c2 ,c3 ,c4, c5 from streamt;
sql select
`_wstartts`
, c1, c2 ,c3 ,c4, c5 from streamt;
if $rows != 4 then
print ======$rows
...
...
@@ -250,7 +250,7 @@ endi
sql insert into t1 values(1648791223002,12,14,13,11.1);
sleep 100
sql select
_wstartts
, c1, c2 ,c3 ,c4, c5 from streamt;
sql select
`_wstartts`
, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 2 then
...
...
@@ -280,7 +280,7 @@ endi
sql insert into t1 values(1648791223003,12,14,13,11.1);
sleep 100
sql select
_wstartts
, c1, c2 ,c3 ,c4, c5 from streamt;
sql select
`_wstartts`
, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 3 then
...
...
@@ -312,7 +312,7 @@ sql insert into t1 values(1648791223001,1,1,1,1.1);
sql insert into t1 values(1648791223002,2,2,2,2.1);
sql insert into t1 values(1648791223003,3,3,3,3.1);
sleep 100
sql select
_wstartts
, c1, c2 ,c3 ,c4, c5 from streamt;
sql select
`_wstartts`
, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 3 then
...
...
@@ -344,7 +344,7 @@ sql insert into t1 values(1648791233003,3,2,3,2.1);
sql insert into t1 values(1648791233002,5,6,7,8.1);
sql insert into t1 values(1648791233002,3,2,3,2.1);
sleep 100
sql select
_wstartts
, c1, c2 ,c3 ,c4, c5 from streamt;
sql select
`_wstartts`
, c1, c2 ,c3 ,c4, c5 from streamt;
# row 2
if $data21 != 2 then
...
...
@@ -374,7 +374,7 @@ endi
sql insert into t1 values(1648791213004,4,2,3,4.1) (1648791213006,5,4,7,9.1) (1648791213004,40,20,30,40.1) (1648791213005,4,2,3,4.1);
sleep 100
sql select
_wstartts
, c1, c2 ,c3 ,c4, c5 from streamt;
sql select
`_wstartts`
, c1, c2 ,c3 ,c4, c5 from streamt;
# row 0
if $data01 != 4 then
...
...
@@ -404,7 +404,7 @@ endi
sql insert into t1 values(1648791223004,4,2,3,4.1) (1648791233006,5,4,7,9.1) (1648791223004,40,20,30,40.1) (1648791233005,4,2,3,4.1);
sleep 100
sql select
_wstartts
, c1, c2 ,c3 ,c4, c5 from streamt;
sql select
`_wstartts`
, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 4 then
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录