Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d6dd087f
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1191
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d6dd087f
编写于
2月 25, 2023
作者:
X
Xiaoyu Wang
浏览文件
操作
浏览文件
下载
差异文件
merge main
上级
320ad8d1
097723f4
变更
39
展开全部
隐藏空白更改
内联
并排
Showing
39 changed file
with
577 addition
and
523 deletion
+577
-523
examples/c/tmq.c
examples/c/tmq.c
+1
-1
include/common/systable.h
include/common/systable.h
+3
-3
include/libs/qcom/query.h
include/libs/qcom/query.h
+5
-3
include/libs/wal/wal.h
include/libs/wal/wal.h
+3
-2
source/client/src/clientHb.c
source/client/src/clientHb.c
+54
-24
source/client/src/clientMsgHandler.c
source/client/src/clientMsgHandler.c
+16
-0
source/client/src/clientTmq.c
source/client/src/clientTmq.c
+81
-81
source/dnode/mnode/impl/inc/mndTopic.h
source/dnode/mnode/impl/inc/mndTopic.h
+3
-8
source/dnode/mnode/impl/src/mndConsumer.c
source/dnode/mnode/impl/src/mndConsumer.c
+6
-3
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+18
-6
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+1
-0
source/dnode/mnode/impl/src/mndTopic.c
source/dnode/mnode/impl/src/mndTopic.c
+70
-38
source/dnode/mnode/impl/src/mndUser.c
source/dnode/mnode/impl/src/mndUser.c
+6
-2
source/dnode/mnode/sdb/src/sdbFile.c
source/dnode/mnode/sdb/src/sdbFile.c
+1
-1
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+91
-77
source/dnode/vnode/src/tq/tqRead.c
source/dnode/vnode/src/tq/tqRead.c
+7
-5
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+1
-10
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+7
-6
source/libs/executor/src/filloperator.c
source/libs/executor/src/filloperator.c
+1
-0
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+1
-4
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+0
-1
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+3
-3
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+16
-6
source/libs/scalar/src/filter.c
source/libs/scalar/src/filter.c
+47
-67
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+26
-84
source/libs/tdb/src/db/tdbBtree.c
source/libs/tdb/src/db/tdbBtree.c
+15
-14
source/libs/tdb/src/inc/tdbInt.h
source/libs/tdb/src/inc/tdbInt.h
+2
-0
source/libs/transport/inc/transComm.h
source/libs/transport/inc/transComm.h
+1
-0
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+29
-25
source/libs/wal/src/walRead.c
source/libs/wal/src/walRead.c
+2
-10
source/libs/wal/src/walRef.c
source/libs/wal/src/walRef.c
+16
-18
source/libs/wal/src/walWrite.c
source/libs/wal/src/walWrite.c
+16
-12
source/libs/wal/test/walMetaTest.cpp
source/libs/wal/test/walMetaTest.cpp
+3
-3
source/os/src/osSemaphore.c
source/os/src/osSemaphore.c
+2
-1
source/util/src/tarray.c
source/util/src/tarray.c
+5
-3
source/util/src/tcompare.c
source/util/src/tcompare.c
+1
-1
source/util/src/terror.c
source/util/src/terror.c
+1
-1
tests/script/tsim/catalog/alterInCurrent.sim
tests/script/tsim/catalog/alterInCurrent.sim
+15
-0
tests/system-test/7-tmq/tmq_taosx.py
tests/system-test/7-tmq/tmq_taosx.py
+1
-0
未找到文件。
examples/c/tmq.c
浏览文件 @
d6dd087f
...
...
@@ -61,7 +61,7 @@ static int32_t init_env() {
printf
(
"create database
\n
"
);
pRes
=
taos_query
(
pConn
,
"drop topic topicname"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in drop t
mqdb
, reason:%s
\n
"
,
taos_errstr
(
pRes
));
printf
(
"error in drop t
opicname
, reason:%s
\n
"
,
taos_errstr
(
pRes
));
}
taos_free_result
(
pRes
);
...
...
include/common/systable.h
浏览文件 @
d6dd087f
...
...
@@ -12,6 +12,9 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_SYSTABLE_H
#define TDENGINE_SYSTABLE_H
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -19,9 +22,6 @@ extern "C" {
#include "os.h"
#ifndef TDENGINE_SYSTABLE_H
#define TDENGINE_SYSTABLE_H
#define TSDB_INFORMATION_SCHEMA_DB "information_schema"
#define TSDB_INS_TABLE_DNODES "ins_dnodes"
#define TSDB_INS_TABLE_MNODES "ins_mnodes"
...
...
include/libs/qcom/query.h
浏览文件 @
d6dd087f
...
...
@@ -26,6 +26,7 @@ extern "C" {
#include "tlog.h"
#include "tmsg.h"
#include "tmsgcb.h"
#include "systable.h"
typedef
enum
{
JOB_TASK_STATUS_NULL
=
0
,
...
...
@@ -302,9 +303,10 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define REQUEST_TOTAL_EXEC_TIMES 2
#define IS_SYS_DBNAME(_dbname) \
(((*(_dbname) == 'i') && (0 == strcmp(_dbname, TSDB_INFORMATION_SCHEMA_DB))) || \
((*(_dbname) == 'p') && (0 == strcmp(_dbname, TSDB_PERFORMANCE_SCHEMA_DB))))
#define IS_INFORMATION_SCHEMA_DB(_name) ((*(_name) == 'i') && (0 == strcmp(_name, TSDB_INFORMATION_SCHEMA_DB)))
#define IS_PERFORMANCE_SCHEMA_DB(_name) ((*(_name) == 'p') && (0 == strcmp(_name, TSDB_PERFORMANCE_SCHEMA_DB)))
#define IS_SYS_DBNAME(_dbname) (IS_INFORMATION_SCHEMA_DB(_dbname) || IS_PERFORMANCE_SCHEMA_DB(_dbname))
#define qFatal(...) \
do { \
...
...
include/libs/wal/wal.h
浏览文件 @
d6dd087f
...
...
@@ -66,6 +66,7 @@ typedef struct {
int64_t
commitVer
;
int64_t
appliedVer
;
int64_t
lastVer
;
int64_t
logRetention
;
}
SWalVer
;
#pragma pack(push, 1)
...
...
@@ -126,7 +127,7 @@ typedef struct SWal {
typedef
struct
{
int64_t
refId
;
int64_t
refVer
;
int64_t
refFile
;
//
int64_t refFile;
SWal
*
pWal
;
}
SWalRef
;
...
...
@@ -180,7 +181,7 @@ void walFsync(SWal *, bool force);
int32_t
walCommit
(
SWal
*
,
int64_t
ver
);
int32_t
walRollback
(
SWal
*
,
int64_t
ver
);
// notify that previous logs can be pruned safely
int32_t
walBeginSnapshot
(
SWal
*
,
int64_t
ver
);
int32_t
walBeginSnapshot
(
SWal
*
,
int64_t
ver
,
int64_t
logRetention
);
int32_t
walEndSnapshot
(
SWal
*
);
int32_t
walRestoreFromSnapshot
(
SWal
*
,
int64_t
ver
);
// for tq
...
...
source/client/src/clientHb.c
浏览文件 @
d6dd087f
...
...
@@ -49,6 +49,48 @@ static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SC
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
hbGenerateVgInfoFromRsp
(
SDBVgInfo
**
pInfo
,
SUseDbRsp
*
rsp
)
{
int32_t
code
=
0
;
SDBVgInfo
*
vgInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SDBVgInfo
));
if
(
NULL
==
vgInfo
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
return
code
;
}
vgInfo
->
vgVersion
=
rsp
->
vgVersion
;
vgInfo
->
stateTs
=
rsp
->
stateTs
;
vgInfo
->
hashMethod
=
rsp
->
hashMethod
;
vgInfo
->
hashPrefix
=
rsp
->
hashPrefix
;
vgInfo
->
hashSuffix
=
rsp
->
hashSuffix
;
vgInfo
->
vgHash
=
taosHashInit
(
rsp
->
vgNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
vgInfo
->
vgHash
)
{
taosMemoryFree
(
vgInfo
);
tscError
(
"hash init[%d] failed"
,
rsp
->
vgNum
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_return
;
}
for
(
int32_t
j
=
0
;
j
<
rsp
->
vgNum
;
++
j
)
{
SVgroupInfo
*
pInfo
=
taosArrayGet
(
rsp
->
pVgroupInfos
,
j
);
if
(
taosHashPut
(
vgInfo
->
vgHash
,
&
pInfo
->
vgId
,
sizeof
(
int32_t
),
pInfo
,
sizeof
(
SVgroupInfo
))
!=
0
)
{
tscError
(
"hash push failed, errno:%d"
,
errno
);
taosHashCleanup
(
vgInfo
->
vgHash
);
taosMemoryFree
(
vgInfo
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_return
;
}
}
_return:
if
(
code
)
{
taosHashCleanup
(
vgInfo
->
vgHash
);
taosMemoryFreeClear
(
vgInfo
);
}
*
pInfo
=
vgInfo
;
return
code
;
}
static
int32_t
hbProcessDBInfoRsp
(
void
*
value
,
int32_t
valueLen
,
struct
SCatalog
*
pCatalog
)
{
int32_t
code
=
0
;
...
...
@@ -67,37 +109,22 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
if
(
rsp
->
vgVersion
<
0
)
{
code
=
catalogRemoveDB
(
pCatalog
,
rsp
->
db
,
rsp
->
uid
);
}
else
{
SDBVgInfo
*
vgInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SDBVgInfo
))
;
if
(
NULL
==
vgInfo
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
SDBVgInfo
*
vgInfo
=
NULL
;
code
=
hbGenerateVgInfoFromRsp
(
&
vgInfo
,
rsp
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
goto
_return
;
}
vgInfo
->
vgVersion
=
rsp
->
vgVersion
;
vgInfo
->
stateTs
=
rsp
->
stateTs
;
vgInfo
->
hashMethod
=
rsp
->
hashMethod
;
vgInfo
->
hashPrefix
=
rsp
->
hashPrefix
;
vgInfo
->
hashSuffix
=
rsp
->
hashSuffix
;
vgInfo
->
vgHash
=
taosHashInit
(
rsp
->
vgNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
vgInfo
->
vgHash
)
{
taosMemoryFree
(
vgInfo
);
tscError
(
"hash init[%d] failed"
,
rsp
->
vgNum
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_return
;
}
catalogUpdateDBVgInfo
(
pCatalog
,
rsp
->
db
,
rsp
->
uid
,
vgInfo
);
for
(
int32_t
j
=
0
;
j
<
rsp
->
vgNum
;
++
j
)
{
SVgroupInfo
*
pInfo
=
taosArrayGet
(
rsp
->
pVgroupInfos
,
j
);
if
(
taosHashPut
(
vgInfo
->
vgHash
,
&
pInfo
->
vgId
,
sizeof
(
int32_t
),
pInfo
,
sizeof
(
SVgroupInfo
))
!=
0
)
{
tscError
(
"hash push failed, errno:%d"
,
errno
);
taosHashCleanup
(
vgInfo
->
vgHash
);
taosMemoryFree
(
vgInfo
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
if
(
IS_SYS_DBNAME
(
rsp
->
db
))
{
code
=
hbGenerateVgInfoFromRsp
(
&
vgInfo
,
rsp
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
goto
_return
;
}
}
catalogUpdateDBVgInfo
(
pCatalog
,
rsp
->
db
,
rsp
->
uid
,
vgInfo
);
catalogUpdateDBVgInfo
(
pCatalog
,
(
rsp
->
db
[
0
]
==
'i'
)
?
TSDB_PERFORMANCE_SCHEMA_DB
:
TSDB_INFORMATION_SCHEMA_DB
,
rsp
->
uid
,
vgInfo
);
}
}
if
(
code
)
{
...
...
@@ -491,6 +518,9 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
for
(
int32_t
i
=
0
;
i
<
dbNum
;
++
i
)
{
SDbVgVersion
*
db
=
&
dbs
[
i
];
tscDebug
(
"the %dth expired dbFName:%s, dbId:%"
PRId64
", vgVersion:%d, numOfTable:%d, startTs:%"
PRId64
,
i
,
db
->
dbFName
,
db
->
dbId
,
db
->
vgVersion
,
db
->
numOfTable
,
db
->
stateTs
);
db
->
dbId
=
htobe64
(
db
->
dbId
);
db
->
vgVersion
=
htonl
(
db
->
vgVersion
);
db
->
numOfTable
=
htonl
(
db
->
numOfTable
);
...
...
source/client/src/clientMsgHandler.c
浏览文件 @
d6dd087f
...
...
@@ -162,6 +162,22 @@ int32_t processCreateDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
taosMemoryFree
(
pMsg
->
pEpSet
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
setErrno
(
pRequest
,
code
);
}
else
{
struct
SCatalog
*
pCatalog
=
NULL
;
int32_t
code
=
catalogGetHandle
(
pRequest
->
pTscObj
->
pAppInfo
->
clusterId
,
&
pCatalog
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
SRequestConnInfo
conn
=
{.
pTrans
=
pTscObj
->
pAppInfo
->
pTransporter
,
.
requestId
=
pRequest
->
requestId
,
.
requestObjRefId
=
pRequest
->
self
,
.
mgmtEps
=
getEpSet_s
(
&
pTscObj
->
pAppInfo
->
mgmtEp
)};
char
dbFName
[
TSDB_DB_FNAME_LEN
];
snprintf
(
dbFName
,
sizeof
(
dbFName
)
-
1
,
"%d.%s"
,
pTscObj
->
acctId
,
TSDB_INFORMATION_SCHEMA_DB
);
catalogRefreshDBVgInfo
(
pCatalog
,
&
conn
,
dbFName
);
snprintf
(
dbFName
,
sizeof
(
dbFName
)
-
1
,
"%d.%s"
,
pTscObj
->
acctId
,
TSDB_PERFORMANCE_SCHEMA_DB
);
catalogRefreshDBVgInfo
(
pCatalog
,
&
conn
,
dbFName
);
}
}
if
(
pRequest
->
body
.
queryFp
)
{
...
...
source/client/src/clientTmq.c
浏览文件 @
d6dd087f
...
...
@@ -823,7 +823,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
int64_t
*
pRefId
=
taosMemoryMalloc
(
sizeof
(
int64_t
));
*
pRefId
=
pTmq
->
refId
;
tscDebug
(
"consumer:0x%"
PRIx64
"
next
retrieve ep from mnode in 1s"
,
pTmq
->
consumerId
);
tscDebug
(
"consumer:0x%"
PRIx64
" retrieve ep from mnode in 1s"
,
pTmq
->
consumerId
);
taosTmrReset
(
tmqAssignAskEpTask
,
1000
,
pRefId
,
tmqMgmt
.
timer
,
&
pTmq
->
epTimer
);
}
else
if
(
*
pTaskType
==
TMQ_DELAYED_TASK__COMMIT
)
{
tmqCommitInner
(
pTmq
,
NULL
,
1
,
1
,
pTmq
->
commitCb
,
pTmq
->
commitCbUserParam
);
...
...
@@ -831,7 +831,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
int64_t
*
pRefId
=
taosMemoryMalloc
(
sizeof
(
int64_t
));
*
pRefId
=
pTmq
->
refId
;
tscDebug
(
"consumer:0x%"
PRIx64
"
next commit to mnode
in %.2fs"
,
pTmq
->
consumerId
,
tscDebug
(
"consumer:0x%"
PRIx64
"
commit to vnode(s)
in %.2fs"
,
pTmq
->
consumerId
,
pTmq
->
autoCommitInterval
/
1000
.
0
);
taosTmrReset
(
tmqAssignDelayedCommitTask
,
pTmq
->
autoCommitInterval
,
pRefId
,
tmqMgmt
.
timer
,
&
pTmq
->
commitTimer
);
}
else
if
(
*
pTaskType
==
TMQ_DELAYED_TASK__REPORT
)
{
...
...
@@ -1575,25 +1575,20 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
}
void
tmqBuildConsumeReqImpl
(
SMqPollReq
*
pReq
,
tmq_t
*
tmq
,
int64_t
timeout
,
SMqClientTopic
*
pTopic
,
SMqClientVg
*
pVg
)
{
/*strcpy(pReq->topic, pTopic->topicName);*/
/*strcpy(pReq->cgroup, tmq->groupId);*/
int32_t
groupLen
=
strlen
(
tmq
->
groupId
);
memcpy
(
pReq
->
subKey
,
tmq
->
groupId
,
groupLen
);
pReq
->
subKey
[
groupLen
]
=
TMQ_SEPARATOR
;
strcpy
(
pReq
->
subKey
+
groupLen
+
1
,
pTopic
->
topicName
);
pReq
->
withTbName
=
tmq
->
withTbName
;
pReq
->
timeout
=
timeout
;
pReq
->
consumerId
=
tmq
->
consumerId
;
pReq
->
timeout
=
timeout
;
pReq
->
epoch
=
tmq
->
epoch
;
/*pReq->currentOffset = reqOffset;*/
pReq
->
reqOffset
=
pVg
->
currentOffset
;
pReq
->
reqId
=
generateRequestId
();
pReq
->
useSnapshot
=
tmq
->
useSnapshot
;
pReq
->
head
.
vgId
=
pVg
->
vgId
;
pReq
->
useSnapshot
=
tmq
->
useSnapshot
;
pReq
->
reqId
=
generateRequestId
();
}
SMqMetaRspObj
*
tmqBuildMetaRspFromWrapper
(
SMqPollRspWrapper
*
pWrapper
)
{
...
...
@@ -1643,6 +1638,76 @@ SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
return
pRspObj
;
}
static
int32_t
handleErrorBeforePoll
(
SMqClientVg
*
pVg
,
tmq_t
*
pTmq
)
{
atomic_store_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
);
tsem_post
(
&
pTmq
->
rspSem
);
return
-
1
;
}
static
int32_t
doTmqPollImpl
(
tmq_t
*
pTmq
,
SMqClientTopic
*
pTopic
,
SMqClientVg
*
pVg
,
int64_t
timeout
)
{
SMqPollReq
req
=
{
0
};
tmqBuildConsumeReqImpl
(
&
req
,
pTmq
,
timeout
,
pTopic
,
pVg
);
int32_t
msgSize
=
tSerializeSMqPollReq
(
NULL
,
0
,
&
req
);
if
(
msgSize
<
0
)
{
return
handleErrorBeforePoll
(
pVg
,
pTmq
);
}
char
*
msg
=
taosMemoryCalloc
(
1
,
msgSize
);
if
(
NULL
==
msg
)
{
return
handleErrorBeforePoll
(
pVg
,
pTmq
);
}
if
(
tSerializeSMqPollReq
(
msg
,
msgSize
,
&
req
)
<
0
)
{
taosMemoryFree
(
msg
);
return
handleErrorBeforePoll
(
pVg
,
pTmq
);
}
SMqPollCbParam
*
pParam
=
taosMemoryMalloc
(
sizeof
(
SMqPollCbParam
));
if
(
pParam
==
NULL
)
{
taosMemoryFree
(
msg
);
return
handleErrorBeforePoll
(
pVg
,
pTmq
);
}
pParam
->
refId
=
pTmq
->
refId
;
pParam
->
epoch
=
pTmq
->
epoch
;
pParam
->
pVg
=
pVg
;
// pVg may be released,fix it
pParam
->
pTopic
=
pTopic
;
pParam
->
vgId
=
pVg
->
vgId
;
SMsgSendInfo
*
sendInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SMsgSendInfo
));
if
(
sendInfo
==
NULL
)
{
taosMemoryFree
(
pParam
);
taosMemoryFree
(
msg
);
return
handleErrorBeforePoll
(
pVg
,
pTmq
);
}
sendInfo
->
msgInfo
=
(
SDataBuf
){
.
pData
=
msg
,
.
len
=
msgSize
,
.
handle
=
NULL
,
};
sendInfo
->
requestId
=
req
.
reqId
;
sendInfo
->
requestObjRefId
=
0
;
sendInfo
->
param
=
pParam
;
sendInfo
->
fp
=
tmqPollCb
;
sendInfo
->
msgType
=
TDMT_VND_TMQ_CONSUME
;
int64_t
transporterId
=
0
;
char
offsetFormatBuf
[
80
];
tFormatOffset
(
offsetFormatBuf
,
tListLen
(
offsetFormatBuf
),
&
pVg
->
currentOffset
);
tscDebug
(
"consumer:0x%"
PRIx64
" send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:0x%"
PRIx64
,
pTmq
->
consumerId
,
pTopic
->
topicName
,
pVg
->
vgId
,
pTmq
->
epoch
,
offsetFormatBuf
,
req
.
reqId
);
asyncSendMsgToServer
(
pTmq
->
pTscObj
->
pAppInfo
->
pTransporter
,
&
pVg
->
epSet
,
&
transporterId
,
sendInfo
);
pVg
->
pollCnt
++
;
pTmq
->
pollCnt
++
;
return
TSDB_CODE_SUCCESS
;
}
// broadcast the poll request to all related vnodes
int32_t
tmqPollImpl
(
tmq_t
*
tmq
,
int64_t
timeout
)
{
int32_t
numOfTopics
=
taosArrayGetSize
(
tmq
->
clientTopics
);
...
...
@@ -1650,7 +1715,9 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
for
(
int
i
=
0
;
i
<
numOfTopics
;
i
++
)
{
SMqClientTopic
*
pTopic
=
taosArrayGet
(
tmq
->
clientTopics
,
i
);
for
(
int
j
=
0
;
j
<
taosArrayGetSize
(
pTopic
->
vgs
);
j
++
)
{
int32_t
numOfVg
=
taosArrayGetSize
(
pTopic
->
vgs
);
for
(
int
j
=
0
;
j
<
numOfVg
;
j
++
)
{
SMqClientVg
*
pVg
=
taosArrayGet
(
pTopic
->
vgs
,
j
);
int32_t
vgStatus
=
atomic_val_compare_exchange_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
,
TMQ_VG_STATUS__WAIT
);
if
(
vgStatus
==
TMQ_VG_STATUS__WAIT
)
{
...
...
@@ -1669,77 +1736,10 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
}
atomic_store_32
(
&
pVg
->
vgSkipCnt
,
0
);
SMqPollReq
req
=
{
0
};
tmqBuildConsumeReqImpl
(
&
req
,
tmq
,
timeout
,
pTopic
,
pVg
);
int32_t
msgSize
=
tSerializeSMqPollReq
(
NULL
,
0
,
&
req
);
if
(
msgSize
<
0
)
{
atomic_store_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
);
tsem_post
(
&
tmq
->
rspSem
);
return
-
1
;
}
char
*
msg
=
taosMemoryCalloc
(
1
,
msgSize
);
if
(
NULL
==
msg
)
{
atomic_store_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
);
tsem_post
(
&
tmq
->
rspSem
);
return
-
1
;
}
if
(
tSerializeSMqPollReq
(
msg
,
msgSize
,
&
req
)
<
0
)
{
taosMemoryFree
(
msg
);
atomic_store_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
);
tsem_post
(
&
tmq
->
rspSem
);
return
-
1
;
}
SMqPollCbParam
*
pParam
=
taosMemoryMalloc
(
sizeof
(
SMqPollCbParam
));
if
(
pParam
==
NULL
)
{
taosMemoryFree
(
msg
);
atomic_store_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
);
tsem_post
(
&
tmq
->
rspSem
);
return
-
1
;
}
pParam
->
refId
=
tmq
->
refId
;
pParam
->
epoch
=
tmq
->
epoch
;
pParam
->
pVg
=
pVg
;
pParam
->
pTopic
=
pTopic
;
pParam
->
vgId
=
pVg
->
vgId
;
SMsgSendInfo
*
sendInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SMsgSendInfo
));
if
(
sendInfo
==
NULL
)
{
taosMemoryFree
(
msg
);
taosMemoryFree
(
pParam
);
atomic_store_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
);
tsem_post
(
&
tmq
->
rspSem
);
return
-
1
;
int32_t
code
=
doTmqPollImpl
(
tmq
,
pTopic
,
pVg
,
timeout
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
sendInfo
->
msgInfo
=
(
SDataBuf
){
.
pData
=
msg
,
.
len
=
msgSize
,
.
handle
=
NULL
,
};
sendInfo
->
requestId
=
req
.
reqId
;
sendInfo
->
requestObjRefId
=
0
;
sendInfo
->
param
=
pParam
;
sendInfo
->
fp
=
tmqPollCb
;
sendInfo
->
msgType
=
TDMT_VND_TMQ_CONSUME
;
int64_t
transporterId
=
0
;
char
offsetFormatBuf
[
80
];
tFormatOffset
(
offsetFormatBuf
,
80
,
&
pVg
->
currentOffset
);
tscDebug
(
"consumer:0x%"
PRIx64
", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:0x%"
PRIx64
,
tmq
->
consumerId
,
pTopic
->
topicName
,
pVg
->
vgId
,
tmq
->
epoch
,
offsetFormatBuf
,
req
.
reqId
);
asyncSendMsgToServer
(
tmq
->
pTscObj
->
pAppInfo
->
pTransporter
,
&
pVg
->
epSet
,
&
transporterId
,
sendInfo
);
pVg
->
pollCnt
++
;
tmq
->
pollCnt
++
;
}
}
...
...
source/dnode/mnode/impl/inc/mndTopic.h
浏览文件 @
d6dd087f
...
...
@@ -27,14 +27,9 @@ void mndCleanupTopic(SMnode *pMnode);
SMqTopicObj
*
mndAcquireTopic
(
SMnode
*
pMnode
,
const
char
*
topicName
);
void
mndReleaseTopic
(
SMnode
*
pMnode
,
SMqTopicObj
*
pTopic
);
SSdbRaw
*
mndTopicActionEncode
(
SMqTopicObj
*
pTopic
);
SSdbRow
*
mndTopicActionDecode
(
SSdbRaw
*
pRaw
);
int32_t
mndDropTopicByDB
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
);
int32_t
mndCheckTopicExist
(
SMnode
*
pMnode
,
SDbObj
*
pDb
);
const
char
*
mndTopicGetShowName
(
const
char
topic
[
TSDB_TOPIC_FNAME_LEN
]);
int32_t
mndDropTopicByDB
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
);
bool
mndTopicExistsForDb
(
SMnode
*
pMnode
,
SDbObj
*
pDb
);
const
char
*
mndTopicGetShowName
(
const
char
topic
[
TSDB_TOPIC_FNAME_LEN
]);
int32_t
mndSetTopicCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SMqTopicObj
*
pTopic
);
...
...
source/dnode/mnode/impl/src/mndConsumer.c
浏览文件 @
d6dd087f
...
...
@@ -259,8 +259,9 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
int32_t
hbStatus
=
atomic_add_fetch_32
(
&
pConsumer
->
hbStatus
,
1
);
int32_t
status
=
atomic_load_32
(
&
pConsumer
->
status
);
mDebug
(
"check for consumer:0x%"
PRIx64
" status:%d(%s), sub-time:%"
PRId64
", uptime:%"
PRId64
,
pConsumer
->
consumerId
,
status
,
mndConsumerStatusName
(
status
),
pConsumer
->
subscribeTime
,
pConsumer
->
upTime
);
mDebug
(
"check for consumer:0x%"
PRIx64
" status:%d(%s), sub-time:%"
PRId64
", uptime:%"
PRId64
", hbstatus:%d"
,
pConsumer
->
consumerId
,
status
,
mndConsumerStatusName
(
status
),
pConsumer
->
subscribeTime
,
pConsumer
->
upTime
,
hbStatus
);
if
(
status
==
MQ_CONSUMER_STATUS__READY
)
{
if
(
hbStatus
>
MND_CONSUMER_LOST_HB_CNT
)
{
...
...
@@ -272,6 +273,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
.
pCont
=
pLostMsg
,
.
contLen
=
sizeof
(
SMqConsumerLostMsg
),
};
tmsgPutToQueue
(
&
pMnode
->
msgCb
,
WRITE_QUEUE
,
&
rpcMsg
);
}
}
else
if
(
status
==
MQ_CONSUMER_STATUS__LOST_REBD
)
{
...
...
@@ -285,6 +287,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
.
pCont
=
pClearMsg
,
.
contLen
=
sizeof
(
SMqConsumerClearMsg
),
};
tmsgPutToQueue
(
&
pMnode
->
msgCb
,
WRITE_QUEUE
,
&
rpcMsg
);
}
}
else
if
(
status
==
MQ_CONSUMER_STATUS__LOST
)
{
...
...
@@ -664,7 +667,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
}
else
{
char
*
oldTopic
=
taosArrayGetP
(
pConsumerOld
->
currentTopics
,
i
);
char
*
newTopic
=
taosArrayGetP
(
newSub
,
j
);
int
comp
=
compareLenPrefixedStr
(
oldTopic
,
newTopic
);
int
comp
=
strcmp
(
oldTopic
,
newTopic
);
if
(
comp
==
0
)
{
i
++
;
j
++
;
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
d6dd087f
...
...
@@ -1039,13 +1039,23 @@ static int32_t mndBuildDropDbRsp(SDbObj *pDb, int32_t *pRspLen, void **ppRsp, bo
static
int32_t
mndDropDb
(
SMnode
*
pMnode
,
SRpcMsg
*
pReq
,
SDbObj
*
pDb
)
{
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_DB
,
pReq
,
"drop-db"
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
if
(
pTrans
==
NULL
)
{
goto
_OVER
;
}
mInfo
(
"trans:%d start to drop db:%s"
,
pTrans
->
id
,
pDb
->
name
);
mInfo
(
"trans:%d, used to drop db:%s"
,
pTrans
->
id
,
pDb
->
name
);
mndTransSetDbName
(
pTrans
,
pDb
->
name
,
NULL
);
if
(
mndTrancCheckConflict
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
if
(
mndCheckTopicExist
(
pMnode
,
pDb
)
<
0
)
goto
_OVER
;
if
(
mndTrancCheckConflict
(
pMnode
,
pTrans
)
!=
0
)
{
goto
_OVER
;
}
if
(
mndTopicExistsForDb
(
pMnode
,
pDb
))
{
terrno
=
TSDB_CODE_MND_TOPIC_MUST_BE_DELETED
;
goto
_OVER
;
}
if
(
mndSetDropDbRedoLogs
(
pMnode
,
pTrans
,
pDb
)
!=
0
)
goto
_OVER
;
if
(
mndSetDropDbCommitLogs
(
pMnode
,
pTrans
,
pDb
)
!=
0
)
goto
_OVER
;
...
...
@@ -1097,10 +1107,12 @@ static int32_t mndProcessDropDbReq(SRpcMsg *pReq) {
}
code
=
mndDropDb
(
pMnode
,
pReq
,
pDb
);
if
(
code
==
0
)
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
if
(
code
==
TSDB_CODE_SUCCESS
)
{
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
}
_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mError
(
"db:%s, failed to drop since %s"
,
dropReq
.
db
,
terrstr
());
}
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
d6dd087f
...
...
@@ -122,6 +122,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
SMsgHead
*
pMsgHead
=
(
SMsgHead
*
)
buf
;
pMsgHead
->
contLen
=
htonl
(
tlen
);
...
...
source/dnode/mnode/impl/src/mndTopic.c
浏览文件 @
d6dd087f
...
...
@@ -31,6 +31,9 @@
#define MND_TOPIC_VER_NUMBER 2
#define MND_TOPIC_RESERVE_SIZE 64
SSdbRaw
*
mndTopicActionEncode
(
SMqTopicObj
*
pTopic
);
SSdbRow
*
mndTopicActionDecode
(
SSdbRaw
*
pRaw
);
static
int32_t
mndTopicActionInsert
(
SSdb
*
pSdb
,
SMqTopicObj
*
pTopic
);
static
int32_t
mndTopicActionDelete
(
SSdb
*
pSdb
,
SMqTopicObj
*
pTopic
);
static
int32_t
mndTopicActionUpdate
(
SSdb
*
pSdb
,
SMqTopicObj
*
pOldTopic
,
SMqTopicObj
*
pNewTopic
);
...
...
@@ -79,6 +82,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
if
(
pTopic
->
physicalPlan
)
{
physicalPlanLen
=
strlen
(
pTopic
->
physicalPlan
)
+
1
;
}
int32_t
schemaLen
=
0
;
if
(
pTopic
->
schema
.
nCols
)
{
schemaLen
=
taosEncodeSSchemaWrapper
(
NULL
,
&
pTopic
->
schema
);
...
...
@@ -88,7 +92,9 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
int32_t
size
=
sizeof
(
SMqTopicObj
)
+
physicalPlanLen
+
pTopic
->
sqlLen
+
pTopic
->
astLen
+
schemaLen
+
ntbColLen
+
MND_TOPIC_RESERVE_SIZE
;
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_TOPIC
,
MND_TOPIC_VER_NUMBER
,
size
);
if
(
pRaw
==
NULL
)
goto
TOPIC_ENCODE_OVER
;
if
(
pRaw
==
NULL
)
{
goto
TOPIC_ENCODE_OVER
;
}
int32_t
dataPos
=
0
;
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
name
,
TSDB_TOPIC_FNAME_LEN
,
TOPIC_ENCODE_OVER
);
...
...
@@ -106,6 +112,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_INT32
(
pRaw
,
dataPos
,
pTopic
->
sqlLen
,
TOPIC_ENCODE_OVER
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
sql
,
pTopic
->
sqlLen
,
TOPIC_ENCODE_OVER
);
SDB_SET_INT32
(
pRaw
,
dataPos
,
pTopic
->
astLen
,
TOPIC_ENCODE_OVER
);
if
(
pTopic
->
astLen
)
{
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
ast
,
pTopic
->
astLen
,
TOPIC_ENCODE_OVER
);
}
...
...
@@ -123,6 +130,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
taosEncodeSSchemaWrapper
(
&
aswBuf
,
&
pTopic
->
schema
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
swBuf
,
schemaLen
,
TOPIC_ENCODE_OVER
);
}
SDB_SET_INT64
(
pRaw
,
dataPos
,
pTopic
->
ntbUid
,
TOPIC_ENCODE_OVER
);
if
(
pTopic
->
ntbUid
!=
0
)
{
int32_t
sz
=
taosArrayGetSize
(
pTopic
->
ntbColIds
);
...
...
@@ -132,6 +140,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_INT16
(
pRaw
,
dataPos
,
colId
,
TOPIC_ENCODE_OVER
);
}
}
SDB_SET_INT64
(
pRaw
,
dataPos
,
pTopic
->
ctbStbUid
,
TOPIC_ENCODE_OVER
);
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
MND_TOPIC_RESERVE_SIZE
,
TOPIC_ENCODE_OVER
);
...
...
@@ -247,10 +256,9 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT16
(
pRaw
,
dataPos
,
&
colId
,
TOPIC_DECODE_OVER
);
taosArrayPush
(
pTopic
->
ntbColIds
,
&
colId
);
}
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pTopic
->
ctbStbUid
,
TOPIC_DECODE_OVER
);
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pTopic
->
ctbStbUid
,
TOPIC_DECODE_OVER
);
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
MND_TOPIC_RESERVE_SIZE
,
TOPIC_DECODE_OVER
);
terrno
=
TSDB_CODE_SUCCESS
;
TOPIC_DECODE_OVER:
...
...
@@ -266,12 +274,12 @@ TOPIC_DECODE_OVER:
}
static
int32_t
mndTopicActionInsert
(
SSdb
*
pSdb
,
SMqTopicObj
*
pTopic
)
{
mTrace
(
"topic:%s
,
perform insert action"
,
pTopic
->
name
);
mTrace
(
"topic:%s perform insert action"
,
pTopic
->
name
);
return
0
;
}
static
int32_t
mndTopicActionDelete
(
SSdb
*
pSdb
,
SMqTopicObj
*
pTopic
)
{
mTrace
(
"topic:%s
,
perform delete action"
,
pTopic
->
name
);
mTrace
(
"topic:%s perform delete action"
,
pTopic
->
name
);
taosMemoryFreeClear
(
pTopic
->
sql
);
taosMemoryFreeClear
(
pTopic
->
ast
);
taosMemoryFreeClear
(
pTopic
->
physicalPlan
);
...
...
@@ -281,7 +289,7 @@ static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) {
}
static
int32_t
mndTopicActionUpdate
(
SSdb
*
pSdb
,
SMqTopicObj
*
pOldTopic
,
SMqTopicObj
*
pNewTopic
)
{
mTrace
(
"topic:%s
,
perform update action"
,
pOldTopic
->
name
);
mTrace
(
"topic:%s perform update action"
,
pOldTopic
->
name
);
atomic_exchange_64
(
&
pOldTopic
->
updateTime
,
pNewTopic
->
updateTime
);
atomic_exchange_32
(
&
pOldTopic
->
version
,
pNewTopic
->
version
);
...
...
@@ -364,7 +372,8 @@ static int32_t extractTopicTbInfo(SNode *pAst, SMqTopicObj *pTopic) {
static
int32_t
mndCreateTopic
(
SMnode
*
pMnode
,
SRpcMsg
*
pReq
,
SCMCreateTopicReq
*
pCreate
,
SDbObj
*
pDb
,
const
char
*
userName
)
{
mInfo
(
"topic:%s to create"
,
pCreate
->
name
);
mInfo
(
"start to create topic:%s"
,
pCreate
->
name
);
SMqTopicObj
topicObj
=
{
0
};
tstrncpy
(
topicObj
.
name
,
pCreate
->
name
,
TSDB_TOPIC_FNAME_LEN
);
tstrncpy
(
topicObj
.
db
,
pDb
->
name
,
TSDB_DB_FNAME_LEN
);
...
...
@@ -383,19 +392,18 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
topicObj
.
sqlLen
=
strlen
(
pCreate
->
sql
)
+
1
;
topicObj
.
subType
=
pCreate
->
subType
;
topicObj
.
withMeta
=
pCreate
->
withMeta
;
if
(
topicObj
.
withMeta
)
{
if
(
topicObj
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
if
(
pCreate
->
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
if
(
pCreate
->
withMeta
)
{
terrno
=
TSDB_CODE_MND_INVALID_TOPIC_OPTION
;
mError
(
"topic:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
return
-
1
;
}
}
if
(
pCreate
->
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
topicObj
.
ast
=
taosStrdup
(
pCreate
->
ast
);
topicObj
.
astLen
=
strlen
(
pCreate
->
ast
)
+
1
;
qDebugL
(
"
ast %s"
,
topicObj
.
ast
);
qDebugL
(
"
topic:%s ast %s"
,
topicObj
.
name
,
topicObj
.
ast
);
SNode
*
pAst
=
NULL
;
if
(
nodesStringToNode
(
pCreate
->
ast
,
&
pAst
)
!=
0
)
{
...
...
@@ -409,18 +417,20 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
SPlanContext
cxt
=
{.
pAstRoot
=
pAst
,
.
topicQuery
=
true
};
if
(
qCreateQueryPlan
(
&
cxt
,
&
pPlan
,
NULL
)
!=
0
)
{
mError
(
"
topic:%s, failed to create
since %s"
,
pCreate
->
name
,
terrstr
());
mError
(
"
failed to create topic:%s
since %s"
,
pCreate
->
name
,
terrstr
());
taosMemoryFree
(
topicObj
.
ast
);
taosMemoryFree
(
topicObj
.
sql
);
return
-
1
;
}
int64_t
ntbUid
;
topicObj
.
ntbColIds
=
taosArrayInit
(
0
,
sizeof
(
int16_t
));
if
(
topicObj
.
ntbColIds
==
NULL
)
{
taosMemoryFree
(
topicObj
.
ast
);
taosMemoryFree
(
topicObj
.
sql
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
extractTopicTbInfo
(
pAst
,
&
topicObj
);
if
(
topicObj
.
ntbUid
==
0
)
{
...
...
@@ -449,6 +459,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
terrno
=
TSDB_CODE_MND_STB_NOT_EXIST
;
return
-
1
;
}
topicObj
.
stbUid
=
pStb
->
uid
;
mndReleaseStb
(
pMnode
,
pStb
);
}
...
...
@@ -467,7 +478,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
taosMemoryFreeClear
(
topicObj
.
physicalPlan
);
return
-
1
;
}
mInfo
(
"trans:%d, used to create topic:%s"
,
pTrans
->
id
,
pCreate
->
name
);
mInfo
(
"trans:%d to create topic:%s"
,
pTrans
->
id
,
pCreate
->
name
);
SSdbRaw
*
pCommitRaw
=
mndTopicActionEncode
(
&
topicObj
);
if
(
pCommitRaw
==
NULL
||
mndTransAppendCommitlog
(
pTrans
,
pCommitRaw
)
!=
0
)
{
...
...
@@ -476,6 +488,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
mndTransDrop
(
pTrans
);
return
-
1
;
}
(
void
)
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_READY
);
if
(
topicObj
.
ntbUid
!=
0
)
{
...
...
@@ -544,7 +557,11 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
taosMemoryFreeClear
(
topicObj
.
sql
);
taosMemoryFreeClear
(
topicObj
.
ast
);
taosArrayDestroy
(
topicObj
.
ntbColIds
);
if
(
topicObj
.
schema
.
nCols
)
taosMemoryFreeClear
(
topicObj
.
schema
.
pSchema
);
if
(
topicObj
.
schema
.
nCols
)
{
taosMemoryFreeClear
(
topicObj
.
schema
.
pSchema
);
}
mndTransDrop
(
pTrans
);
return
TSDB_CODE_ACTION_IN_PROGRESS
;
}
...
...
@@ -589,11 +606,13 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
}
code
=
mndCreateTopic
(
pMnode
,
pReq
,
&
createTopicReq
,
pDb
,
pReq
->
info
.
conn
.
user
);
if
(
code
==
0
)
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
if
(
code
==
0
)
{
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
}
_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mError
(
"
topic:%s, failed to create
since %s"
,
createTopicReq
.
name
,
terrstr
());
mError
(
"
failed to create topic:%s
since %s"
,
createTopicReq
.
name
,
terrstr
());
}
mndReleaseTopic
(
pMnode
,
pTopic
);
...
...
@@ -605,13 +624,18 @@ _OVER:
static
int32_t
mndDropTopic
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SRpcMsg
*
pReq
,
SMqTopicObj
*
pTopic
)
{
int32_t
code
=
-
1
;
if
(
mndUserRemoveTopic
(
pMnode
,
pTrans
,
pTopic
->
name
)
!=
0
)
goto
_OVER
;
if
(
mndUserRemoveTopic
(
pMnode
,
pTrans
,
pTopic
->
name
)
!=
0
)
{
goto
_OVER
;
}
SSdbRaw
*
pCommitRaw
=
mndTopicActionEncode
(
pTopic
);
if
(
pCommitRaw
==
NULL
||
mndTransAppendCommitlog
(
pTrans
,
pCommitRaw
)
!=
0
)
goto
_OVER
;
(
void
)
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_DROPPED
);
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
_OVER
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
goto
_OVER
;
}
code
=
0
;
_OVER:
...
...
@@ -650,7 +674,9 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
SMqConsumerObj
*
pConsumer
;
while
(
1
)
{
pIter
=
sdbFetch
(
pSdb
,
SDB_CONSUMER
,
pIter
,
(
void
**
)
&
pConsumer
);
if
(
pIter
==
NULL
)
break
;
if
(
pIter
==
NULL
)
{
break
;
}
if
(
pConsumer
->
status
==
MQ_CONSUMER_STATUS__LOST_REBD
)
continue
;
...
...
@@ -661,8 +687,8 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
mndReleaseConsumer
(
pMnode
,
pConsumer
);
mndReleaseTopic
(
pMnode
,
pTopic
);
terrno
=
TSDB_CODE_MND_TOPIC_SUBSCRIBED
;
mError
(
"topic:%s, failed to drop since subscribed by consumer:
%"
PRId64
", in consumer group %s"
,
dropReq
.
name
,
pConsumer
->
consumerId
,
pConsumer
->
cgroup
);
mError
(
"topic:%s, failed to drop since subscribed by consumer:
0x%"
PRIx64
", in consumer group %s"
,
dropReq
.
name
,
pConsumer
->
consumerId
,
pConsumer
->
cgroup
);
return
-
1
;
}
}
...
...
@@ -768,6 +794,8 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
}
static
int32_t
mndGetNumOfTopics
(
SMnode
*
pMnode
,
char
*
dbName
,
int32_t
*
pNumOfTopics
)
{
*
pNumOfTopics
=
0
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SDbObj
*
pDb
=
mndAcquireDb
(
pMnode
,
dbName
);
if
(
pDb
==
NULL
)
{
...
...
@@ -780,7 +808,9 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo
while
(
1
)
{
SMqTopicObj
*
pTopic
=
NULL
;
pIter
=
sdbFetch
(
pSdb
,
SDB_TOPIC
,
pIter
,
(
void
**
)
&
pTopic
);
if
(
pIter
==
NULL
)
break
;
if
(
pIter
==
NULL
)
{
break
;
}
if
(
pTopic
->
dbUid
==
pDb
->
uid
)
{
numOfTopics
++
;
...
...
@@ -808,11 +838,10 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
SName
n
;
int32_t
cols
=
0
;
char
topicName
[
TSDB_TOPIC_NAME_LEN
+
VARSTR_HEADER_SIZE
+
5
]
=
{
0
};
tstrncpy
(
varDataVal
(
topicName
),
mndGetDbStr
(
pTopic
->
name
),
sizeof
(
topicName
)
-
2
);
/*tNameFromString(&n, pTopic->name, T_NAME_ACCT | T_NAME_DB);*/
/*tNameGetDbName(&n, varDataVal(topicName));*/
varDataSetLen
(
topicName
,
strlen
(
varDataVal
(
topicName
)));
char
topicName
[
TSDB_TOPIC_NAME_LEN
+
VARSTR_HEADER_SIZE
+
5
]
=
{
0
};
const
char
*
pName
=
mndGetDbStr
(
pTopic
->
name
);
STR_TO_VARSTR
(
topicName
,
pName
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
numOfRows
,
(
const
char
*
)
topicName
,
false
);
...
...
@@ -820,6 +849,7 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
tNameFromString
(
&
n
,
pTopic
->
db
,
T_NAME_ACCT
|
T_NAME_DB
);
tNameGetDbName
(
&
n
,
varDataVal
(
dbName
));
varDataSetLen
(
dbName
,
strlen
(
varDataVal
(
dbName
)));
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
numOfRows
,
(
const
char
*
)
dbName
,
false
);
...
...
@@ -827,8 +857,8 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
colDataSetVal
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pTopic
->
createTime
,
false
);
char
sql
[
TSDB_SHOW_SQL_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
tstrncpy
(
&
sql
[
VARSTR_HEADER_SIZE
],
pTopic
->
sql
,
TSDB_SHOW_SQL_LEN
);
varDataSetLen
(
sql
,
strlen
(
&
sql
[
VARSTR_HEADER_SIZE
]));
STR_TO_VARSTR
(
sql
,
pTopic
->
sql
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
numOfRows
,
(
const
char
*
)
sql
,
false
);
...
...
@@ -863,24 +893,26 @@ static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) {
sdbCancelFetch
(
pSdb
,
pIter
);
}
int32_t
mndCheckTopicExist
(
SMnode
*
pMnode
,
SDbObj
*
pDb
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
bool
mndTopicExistsForDb
(
SMnode
*
pMnode
,
SDbObj
*
pDb
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
void
*
pIter
=
NULL
;
SMqTopicObj
*
pTopic
=
NULL
;
while
(
1
)
{
pIter
=
sdbFetch
(
pSdb
,
SDB_TOPIC
,
pIter
,
(
void
**
)
&
pTopic
);
if
(
pIter
==
NULL
)
break
;
if
(
pIter
==
NULL
)
{
break
;
}
if
(
pTopic
->
dbUid
==
pDb
->
uid
)
{
sdbRelease
(
pSdb
,
pTopic
);
terrno
=
TSDB_CODE_MND_TOPIC_MUST_BE_DELETED
;
return
-
1
;
return
true
;
}
sdbRelease
(
pSdb
,
pTopic
);
}
return
0
;
return
false
;
}
#if 0
...
...
source/dnode/mnode/impl/src/mndUser.c
浏览文件 @
d6dd087f
...
...
@@ -1062,10 +1062,14 @@ int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic) {
while
(
1
)
{
pIter
=
sdbFetch
(
pSdb
,
SDB_USER
,
pIter
,
(
void
**
)
&
pUser
);
if
(
pIter
==
NULL
)
break
;
if
(
pIter
==
NULL
)
{
break
;
}
code
=
-
1
;
if
(
mndUserDupObj
(
pUser
,
&
newUser
)
!=
0
)
break
;
if
(
mndUserDupObj
(
pUser
,
&
newUser
)
!=
0
)
{
break
;
}
bool
inTopic
=
(
taosHashGet
(
newUser
.
topics
,
topic
,
len
)
!=
NULL
);
if
(
inTopic
)
{
...
...
source/dnode/mnode/sdb/src/sdbFile.c
浏览文件 @
d6dd087f
...
...
@@ -472,7 +472,7 @@ int32_t sdbWriteFile(SSdb *pSdb, int32_t delta) {
taosThreadMutexLock
(
&
pSdb
->
filelock
);
if
(
pSdb
->
pWal
!=
NULL
)
{
// code = walBeginSnapshot(pSdb->pWal, pSdb->applyIndex);
// code = walBeginSnapshot(pSdb->pWal, pSdb->applyIndex
, 0
);
if
(
pSdb
->
sync
==
0
)
{
code
=
0
;
}
else
{
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
d6dd087f
此差异已折叠。
点击以展开。
source/dnode/vnode/src/tq/tqRead.c
浏览文件 @
d6dd087f
...
...
@@ -355,11 +355,13 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
int32_t tqReaderSetDataMsg(STqReader* pReader, const SSubmitReq* pMsg, int64_t ver) {
pReader->pMsg = pMsg;
if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1;
while (true) {
if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) return -1;
if (pReader->pBlock == NULL) break;
}
// if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1;
// while (true) {
// if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) return -1;
// tqDebug("submitnext vgId:%d, block:%p, dataLen:%d, len:%d, uid:%"PRId64, pReader->pWalReader->pWal->cfg.vgId, pReader->pBlock, pReader->msgIter.dataLen,
// pReader->msgIter.len, pReader->msgIter.uid);
// if (pReader->pBlock == NULL) break;
// }
if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1;
pReader->ver = ver;
...
...
source/libs/executor/src/executor.c
浏览文件 @
d6dd087f
...
...
@@ -110,7 +110,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
pOperator
->
status
=
OP_NOT_OPENED
;
SStreamScanInfo
*
pInfo
=
pOperator
->
info
;
qDebug
(
"stream set total blocks:%d, task id:%s"
PRIx64
,
(
int32_t
)
numOfBlocks
,
id
);
ASSERT
(
pInfo
->
validBlockIndex
==
0
);
ASSERT
(
taosArrayGetSize
(
pInfo
->
pBlockLists
)
==
0
);
...
...
@@ -1064,18 +1064,9 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
STableScanInfo
*
pTSInfo
=
pInfo
->
pTableScanOp
->
info
;
tsdbReaderClose
(
pTSInfo
->
base
.
dataReader
);
pTSInfo
->
base
.
dataReader
=
NULL
;
#if 0
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) &&
pInfo->tqReader->pWalReader->curVersion != pOffset->version) {
qError("prepare scan ver %" PRId64 " actual ver %" PRId64 ", last %" PRId64, pOffset->version,
pInfo->tqReader->pWalReader->curVersion, pTaskInfo->streamInfo.lastStatus.version);
ASSERT(0);
}
#endif
if
(
tqSeekVer
(
pInfo
->
tqReader
,
pOffset
->
version
+
1
)
<
0
)
{
return
-
1
;
}
ASSERT
(
pInfo
->
tqReader
->
pWalReader
->
curVersion
==
pOffset
->
version
+
1
);
}
else
if
(
pOffset
->
type
==
TMQ_OFFSET__SNAPSHOT_DATA
)
{
/*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/
int64_t
uid
=
pOffset
->
uid
;
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
d6dd087f
...
...
@@ -1197,13 +1197,14 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
}
if
(
pBlock
->
info
.
rows
+
pRow
->
numOfRows
>
pBlock
->
info
.
capacity
)
{
releaseBufPage
(
pBuf
,
page
);
if
(
pBlock
->
info
.
rows
<=
0
||
pRow
->
numOfRows
>
pBlock
->
info
.
capacity
)
{
q
Error
(
"error in copy data to ssdatablock, existed rows in block:%d, rows in pRow:%d, capacity:%d, %s"
,
pBlock
->
info
.
rows
,
pRow
->
numOfRows
,
pBlock
->
info
.
capacity
,
GET_TASKID
(
pTaskInfo
));
T_LONG_JMP
(
pTaskInfo
->
env
,
TSDB_CODE_APP_ERROR
);
// expand the result datablock capacity
if
(
pRow
->
numOfRows
>
pBlock
->
info
.
capacity
)
{
blockDataEnsureCapacity
(
pBlock
,
pRow
->
numOfRows
);
q
Debug
(
"datablock capacity not sufficient, expand to requried:%d, current capacity:%d, %s"
,
pRow
->
numOfRows
,
pBlock
->
info
.
capacity
,
GET_TASKID
(
pTaskInfo
));
// todo set the pOperator->resultInfo size
}
else
{
releaseBufPage
(
pBuf
,
page
);
break
;
}
}
...
...
source/libs/executor/src/filloperator.c
浏览文件 @
d6dd087f
...
...
@@ -924,6 +924,7 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo*
static
void
keepResultInDiscBuf
(
SOperatorInfo
*
pOperator
,
uint64_t
groupId
,
SResultRowData
*
pRow
,
int32_t
len
)
{
SWinKey
key
=
{.
groupId
=
groupId
,
.
ts
=
pRow
->
key
};
int32_t
code
=
streamStateFillPut
(
pOperator
->
pTaskInfo
->
streamInfo
.
pState
,
&
key
,
pRow
->
pRowVal
,
len
);
qDebug
(
"===stream===fill operator save key ts:%"
PRId64
" group id:%"
PRIu64
" code:%d"
,
key
.
ts
,
key
.
groupId
,
code
);
ASSERT
(
code
==
TSDB_CODE_SUCCESS
);
}
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
d6dd087f
...
...
@@ -1117,11 +1117,8 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if
(
pInfo
->
scalarSup
.
pExprInfo
!=
NULL
)
{
p
TaskInfo
->
code
=
p
rojectApplyFunctions
(
pInfo
->
scalarSup
.
pExprInfo
,
pBlock
,
pBlock
,
pInfo
->
scalarSup
.
pCtx
,
projectApplyFunctions
(
pInfo
->
scalarSup
.
pExprInfo
,
pBlock
,
pBlock
,
pInfo
->
scalarSup
.
pCtx
,
pInfo
->
scalarSup
.
numOfExprs
,
NULL
);
if
(
pTaskInfo
->
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
pTaskInfo
->
code
);
}
}
taosHashClear
(
pInfo
->
pPartitions
);
doStreamHashPartitionImpl
(
pInfo
,
pBlock
);
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
d6dd087f
...
...
@@ -1623,7 +1623,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
tqOffsetResetToLog
(
&
pTaskInfo
->
streamInfo
.
lastStatus
,
pTaskInfo
->
streamInfo
.
snapshotVer
);
return
NULL
;
}
ASSERT
(
pInfo
->
tqReader
->
pWalReader
->
curVersion
==
pTaskInfo
->
streamInfo
.
snapshotVer
+
1
);
}
else
{
return
NULL
;
}
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
d6dd087f
...
...
@@ -1502,16 +1502,16 @@ static void deleteIntervalDiscBuf(SStreamState* pState, SHashObj* pPullDataMap,
code
=
streamStateGetKVByCur
(
pCur
,
&
tmpKey
,
NULL
,
0
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
STimeWindow
tw
=
getFinalTimeWindow
(
tmpKey
.
ts
,
pInterval
);
qDebug
(
"===stream===error stream state first key:%"
PRId64
"-%"
PRId64
",%"
PRI
d
64
",mark %"
PRId64
,
tw
.
skey
,
qDebug
(
"===stream===error stream state first key:%"
PRId64
"-%"
PRId64
",%"
PRI
u
64
",mark %"
PRId64
,
tw
.
skey
,
tw
.
ekey
,
tmpKey
.
groupId
,
mark
);
}
else
{
STimeWindow
tw
=
getFinalTimeWindow
(
key
->
ts
,
pInterval
);
qDebug
(
"===stream===stream state first key:%"
PRId64
"-%"
PRId64
",%"
PRI
d
64
",mark %"
PRId64
,
tw
.
skey
,
tw
.
ekey
,
qDebug
(
"===stream===stream state first key:%"
PRId64
"-%"
PRId64
",%"
PRI
u
64
",mark %"
PRId64
,
tw
.
skey
,
tw
.
ekey
,
key
->
groupId
,
mark
);
}
}
else
{
STimeWindow
tw
=
getFinalTimeWindow
(
key
->
ts
,
pInterval
);
qDebug
(
"===stream===stream state first key:%"
PRId64
"-%"
PRId64
",%"
PRI
d
64
",mark %"
PRId64
,
tw
.
skey
,
tw
.
ekey
,
qDebug
(
"===stream===stream state first key:%"
PRId64
"-%"
PRId64
",%"
PRI
u
64
",mark %"
PRId64
,
tw
.
skey
,
tw
.
ekey
,
key
->
groupId
,
mark
);
}
streamStateFreeCur
(
pCur
);
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
d6dd087f
...
...
@@ -5092,14 +5092,24 @@ static int32_t checkAlterSuperTableBySchema(STranslateContext* pCxt, SAlterTable
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_MODIFY_COL
);
}
if
(
TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES
==
pStmt
->
alterType
&&
pTableMeta
->
tableInfo
.
rowSize
+
calcTypeBytes
(
pStmt
->
dataType
)
-
pSchema
->
bytes
>
TSDB_MAX_BYTES_PER_ROW
)
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_ROW_LENGTH
,
TSDB_MAX_BYTES_PER_ROW
);
if
(
TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES
==
pStmt
->
alterType
)
{
if
(
calcTypeBytes
(
pStmt
->
dataType
)
>
TSDB_MAX_FIELD_LEN
)
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN
);
}
if
(
pTableMeta
->
tableInfo
.
rowSize
+
calcTypeBytes
(
pStmt
->
dataType
)
-
pSchema
->
bytes
>
TSDB_MAX_BYTES_PER_ROW
)
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_ROW_LENGTH
,
TSDB_MAX_BYTES_PER_ROW
);
}
}
if
(
TSDB_ALTER_TABLE_UPDATE_TAG_BYTES
==
pStmt
->
alterType
&&
tagsLen
+
calcTypeBytes
(
pStmt
->
dataType
)
-
pSchema
->
bytes
>
TSDB_MAX_TAGS_LEN
)
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_TAGS_LENGTH
,
TSDB_MAX_TAGS_LEN
);
if
(
TSDB_ALTER_TABLE_UPDATE_TAG_BYTES
==
pStmt
->
alterType
)
{
if
(
calcTypeBytes
(
pStmt
->
dataType
)
>
TSDB_MAX_FIELD_LEN
)
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN
);
}
if
(
tagsLen
+
calcTypeBytes
(
pStmt
->
dataType
)
-
pSchema
->
bytes
>
TSDB_MAX_TAGS_LEN
)
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_TAGS_LENGTH
,
TSDB_MAX_TAGS_LEN
);
}
}
}
...
...
source/libs/scalar/src/filter.c
浏览文件 @
d6dd087f
...
...
@@ -15,7 +15,7 @@
#include <tlog.h>
#include "os.h"
#include "thash.h"
//#include "queryLog.h"
//
#include "queryLog.h"
#include "filter.h"
#include "filterInt.h"
#include "functionMgt.h"
...
...
@@ -123,36 +123,16 @@ int8_t filterGetRangeCompFuncFromOptrs(uint8_t optr, uint8_t optr2) {
return
-
1
;
}
__compar_fn_t
gDataCompare
[]
=
{
compareInt32Val
,
compareInt8Val
,
compareInt16Val
,
compareInt64Val
,
compareFloatVal
,
compareDoubleVal
,
compareLenPrefixedStr
,
comparestrPatternMatch
,
compareChkInString
,
comparewcsPatternMatch
,
compareLenPrefixedWStr
,
compareUint8Val
,
compareUint16Val
,
compareUint32Val
,
compareUint64Val
,
setChkInBytes1
,
setChkInBytes2
,
setChkInBytes4
,
setChkInBytes8
,
comparestrRegexMatch
,
comparestrRegexNMatch
,
setChkNotInBytes1
,
setChkNotInBytes2
,
setChkNotInBytes4
,
setChkNotInBytes8
,
compareChkNotInString
,
comparestrPatternNMatch
,
comparewcsPatternNMatch
,
comparewcsRegexMatch
,
comparewcsRegexNMatch
,};
__compar_fn_t
gDataCompare
[]
=
{
compareInt32Val
,
compareInt8Val
,
compareInt16Val
,
compareInt64Val
,
compareFloatVal
,
compareDoubleVal
,
compareLenPrefixedStr
,
comparestrPatternMatch
,
compareChkInString
,
comparewcsPatternMatch
,
compareLenPrefixedWStr
,
compareUint8Val
,
compareUint16Val
,
compareUint32Val
,
compareUint64Val
,
setChkInBytes1
,
setChkInBytes2
,
setChkInBytes4
,
setChkInBytes8
,
comparestrRegexMatch
,
comparestrRegexNMatch
,
setChkNotInBytes1
,
setChkNotInBytes2
,
setChkNotInBytes4
,
setChkNotInBytes8
,
compareChkNotInString
,
comparestrPatternNMatch
,
comparewcsPatternNMatch
,
comparewcsRegexMatch
,
comparewcsRegexNMatch
,
};
__compar_fn_t
gInt8SignCompare
[]
=
{
compareInt8Val
,
compareInt8Int16
,
compareInt8Int32
,
compareInt8Int64
,
compareInt8Float
,
compareInt8Double
};
...
...
@@ -341,7 +321,7 @@ __compar_fn_t filterGetCompFuncEx(int32_t lType, int32_t rType, int32_t optr) {
if
(
TSDB_DATA_TYPE_NULL
==
rType
||
TSDB_DATA_TYPE_JSON
==
rType
)
{
return
NULL
;
}
switch
(
lType
)
{
case
TSDB_DATA_TYPE_TINYINT
:
{
if
(
IS_SIGNED_NUMERIC_TYPE
(
rType
)
||
IS_FLOAT_TYPE
(
rType
))
{
...
...
@@ -519,7 +499,7 @@ int32_t filterReuseRangeCtx(SFilterRangeCtx *ctx, int32_t type, int32_t options)
int32_t
filterConvertRange
(
SFilterRangeCtx
*
cur
,
SFilterRange
*
ra
,
bool
*
notNull
)
{
int64_t
tmp
=
0
;
if
(
!
FILTER_GET_FLAG
(
ra
->
sflag
,
RANGE_FLG_NULL
))
{
int32_t
sr
=
cur
->
pCompareFunc
(
&
ra
->
s
,
getDataMin
(
cur
->
type
,
&
tmp
));
if
(
sr
==
0
)
{
...
...
@@ -704,7 +684,7 @@ int32_t filterAddRangeImpl(void *h, SFilterRange *ra, int32_t optr) {
int32_t
filterAddRange
(
void
*
h
,
SFilterRange
*
ra
,
int32_t
optr
)
{
SFilterRangeCtx
*
ctx
=
(
SFilterRangeCtx
*
)
h
;
int64_t
tmp
=
0
;
int64_t
tmp
=
0
;
if
(
FILTER_GET_FLAG
(
ra
->
sflag
,
RANGE_FLG_NULL
))
{
SIMPLE_COPY_VALUES
(
&
ra
->
s
,
getDataMin
(
ctx
->
type
,
&
tmp
));
...
...
@@ -991,7 +971,7 @@ int32_t filterAddField(SFilterInfo *info, void *desc, void **data, int32_t type,
bool
freeIfExists
,
int16_t
*
srcFlag
)
{
int32_t
idx
=
-
1
;
uint32_t
*
num
;
bool
sameBuf
=
false
;
bool
sameBuf
=
false
;
num
=
&
info
->
fields
[
type
].
num
;
...
...
@@ -1251,13 +1231,13 @@ int32_t filterAddUnitFromUnit(SFilterInfo *dst, SFilterInfo *src, SFilterUnit *u
SFilterField
*
t
=
FILTER_UNIT_LEFT_FIELD
(
src
,
u
);
if
(
u
->
right
.
type
==
FLD_TYPE_VALUE
)
{
void
*
data
=
FILTER_UNIT_VAL_DATA
(
src
,
u
);
void
*
data
=
FILTER_UNIT_VAL_DATA
(
src
,
u
);
SFilterField
*
rField
=
FILTER_UNIT_RIGHT_FIELD
(
src
,
u
);
if
(
IS_VAR_DATA_TYPE
(
type
))
{
if
(
FILTER_UNIT_OPTR
(
u
)
==
OP_TYPE_IN
)
{
filterAddField
(
dst
,
NULL
,
&
data
,
FLD_TYPE_VALUE
,
&
right
,
POINTER_BYTES
,
false
,
&
rField
->
flag
);
// POINTER_BYTES should be sizeof(SHashObj), but POINTER_BYTES is also right.
filterAddField
(
dst
,
NULL
,
&
data
,
FLD_TYPE_VALUE
,
&
right
,
POINTER_BYTES
,
false
,
&
rField
->
flag
);
// POINTER_BYTES should be sizeof(SHashObj), but POINTER_BYTES is also right.
t
=
FILTER_GET_FIELD
(
dst
,
right
);
FILTER_SET_FLAG
(
t
->
flag
,
FLD_DATA_IS_HASH
);
...
...
@@ -3769,31 +3749,31 @@ EDealRes fltReviseRewriter(SNode **pNode, void *pContext) {
return
DEAL_RES_CONTINUE
;
}
/*
if (!FILTER_GET_FLAG(stat->info->options, FLT_OPTION_TIMESTAMP)) {
return DEAL_RES_CONTINUE;
}
/*
if (!FILTER_GET_FLAG(stat->info->options, FLT_OPTION_TIMESTAMP)) {
return DEAL_RES_CONTINUE;
}
if (TSDB_DATA_TYPE_BINARY != valueNode->node.resType.type && TSDB_DATA_TYPE_NCHAR != valueNode->node.resType.type) {
return DEAL_RES_CONTINUE;
}
if (TSDB_DATA_TYPE_BINARY != valueNode->node.resType.type && TSDB_DATA_TYPE_NCHAR !=
valueNode->node.resType.type) {
return DEAL_RES_CONTINUE;
}
if (stat->precision < 0) {
int32_t code = fltAddValueNodeToConverList(stat, valueNode);
if (code) {
stat->code = code;
return DEAL_RES_ERROR;
}
if (stat->precision < 0) {
int32_t code = fltAddValueNodeToConverList(stat, valueNode);
if (code) {
stat->code = code;
return DEAL_RES_ERROR;
}
return DEAL_RES_CONTINUE;
}
return DEAL_RES_CONTINUE;
}
int32_t code = sclConvertToTsValueNode(stat->precision, valueNode);
if (code) {
stat->code = code;
return DEAL_RES_ERROR;
}
*/
int32_t code = sclConvertToTsValueNode(stat->precision, valueNode);
if (code) {
stat->code = code;
return DEAL_RES_ERROR;
}
*/
return
DEAL_RES_CONTINUE
;
}
...
...
@@ -3939,7 +3919,7 @@ EDealRes fltReviseRewriter(SNode **pNode, void *pContext) {
stat
->
scalarMode
=
true
;
return
DEAL_RES_CONTINUE
;
}
int32_t
type
=
vectorGetConvertType
(
refNode
->
node
.
resType
.
type
,
listNode
->
node
.
resType
.
type
);
int32_t
type
=
vectorGetConvertType
(
refNode
->
node
.
resType
.
type
,
listNode
->
node
.
resType
.
type
);
if
(
0
!=
type
&&
type
!=
refNode
->
node
.
resType
.
type
)
{
stat
->
scalarMode
=
true
;
return
DEAL_RES_CONTINUE
;
...
...
@@ -3963,14 +3943,14 @@ int32_t fltReviseNodes(SFilterInfo *pInfo, SNode **pNode, SFltTreeStat *pStat) {
FLT_ERR_JRET
(
pStat
->
code
);
/*
int32_t nodeNum = taosArrayGetSize(pStat->nodeList);
for (int32_t i = 0; i < nodeNum; ++i) {
SValueNode *valueNode = *(SValueNode **)taosArrayGet(pStat->nodeList, i);
/*
int32_t nodeNum = taosArrayGetSize(pStat->nodeList);
for (int32_t i = 0; i < nodeNum; ++i) {
SValueNode *valueNode = *(SValueNode **)taosArrayGet(pStat->nodeList, i);
FLT_ERR_JRET(sclConvertToTsValueNode(pStat->precision, valueNode));
}
*/
FLT_ERR_JRET(sclConvertToTsValueNode(pStat->precision, valueNode));
}
*/
_return:
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
d6dd087f
...
...
@@ -270,86 +270,38 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
return
-
1
;
}
SyncIndex
beginIndex
=
pSyncNode
->
pLogStore
->
syncLogBeginIndex
(
pSyncNode
->
pLogStore
);
SyncIndex
endIndex
=
pSyncNode
->
pLogStore
->
syncLogEndIndex
(
pSyncNode
->
pLogStore
);
bool
isEmpty
=
pSyncNode
->
pLogStore
->
syncLogIsEmpty
(
pSyncNode
->
pLogStore
);
if
(
isEmpty
||
!
(
lastApplyIndex
>=
beginIndex
&&
lastApplyIndex
<=
endIndex
))
{
sNTrace
(
pSyncNode
,
"new-snapshot-index:%"
PRId64
", empty:%d, do not delete wal"
,
lastApplyIndex
,
isEmpty
);
syncNodeRelease
(
pSyncNode
);
return
0
;
}
int32_t
code
=
0
;
int64_t
logRetention
=
0
;
if
(
syncNodeIsMnode
(
pSyncNode
))
{
// mnode
int64_t
logRetention
=
SYNC_MNODE_LOG_RETENTION
;
SyncIndex
beginIndex
=
pSyncNode
->
pLogStore
->
syncLogBeginIndex
(
pSyncNode
->
pLogStore
);
SyncIndex
endIndex
=
pSyncNode
->
pLogStore
->
syncLogEndIndex
(
pSyncNode
->
pLogStore
);
int64_t
logNum
=
endIndex
-
beginIndex
;
bool
isEmpty
=
pSyncNode
->
pLogStore
->
syncLogIsEmpty
(
pSyncNode
->
pLogStore
);
if
(
isEmpty
||
(
!
isEmpty
&&
logNum
<
logRetention
))
{
sNTrace
(
pSyncNode
,
"new-snapshot-index:%"
PRId64
", log-num:%"
PRId64
", empty:%d, do not delete wal"
,
lastApplyIndex
,
logNum
,
isEmpty
);
syncNodeRelease
(
pSyncNode
);
return
0
;
}
goto
_DEL_WAL
;
logRetention
=
SYNC_MNODE_LOG_RETENTION
;
}
else
{
SyncIndex
beginIndex
=
pSyncNode
->
pLogStore
->
syncLogBeginIndex
(
pSyncNode
->
pLogStore
);
SyncIndex
endIndex
=
pSyncNode
->
pLogStore
->
syncLogEndIndex
(
pSyncNode
->
pLogStore
);
bool
isEmpty
=
pSyncNode
->
pLogStore
->
syncLogIsEmpty
(
pSyncNode
->
pLogStore
);
if
(
isEmpty
||
!
(
lastApplyIndex
>=
beginIndex
&&
lastApplyIndex
<=
endIndex
))
{
sNTrace
(
pSyncNode
,
"new-snapshot-index:%"
PRId64
", empty:%d, do not delete wal"
,
lastApplyIndex
,
isEmpty
);
syncNodeRelease
(
pSyncNode
);
return
0
;
}
// vnode
if
(
pSyncNode
->
replicaNum
>
1
)
{
// multi replicas
logRetention
=
SYNC_VNODE_LOG_RETENTION
;
}
}
lastApplyIndex
=
TMAX
(
lastApplyIndex
-
SYNC_VNODE_LOG_RETENTION
,
beginIndex
-
1
);
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
pSyncNode
->
minMatchIndex
=
syncMinMatchIndex
(
pSyncNode
);
for
(
int32_t
i
=
0
;
i
<
pSyncNode
->
peersNum
;
++
i
)
{
int64_t
matchIndex
=
syncIndexMgrGetIndex
(
pSyncNode
->
pMatchIndex
,
&
(
pSyncNode
->
peersId
[
i
]));
if
(
lastApplyIndex
>
matchIndex
)
{
sNTrace
(
pSyncNode
,
"new-snapshot-index:%"
PRId64
" is greater than match-index:%"
PRId64
" of dnode:%d, do not delete wal"
,
lastApplyIndex
,
matchIndex
,
DID
(
&
pSyncNode
->
peersId
[
i
]));
syncNodeRelease
(
pSyncNode
);
return
0
;
}
}
}
else
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_FOLLOWER
)
{
if
(
lastApplyIndex
>
pSyncNode
->
minMatchIndex
)
{
sNTrace
(
pSyncNode
,
"new-snapshot-index:%"
PRId64
" is greater than min-match-index:%"
PRId64
", do not delete wal"
,
lastApplyIndex
,
pSyncNode
->
minMatchIndex
);
syncNodeRelease
(
pSyncNode
);
return
0
;
}
}
else
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_CANDIDATE
)
{
sNTrace
(
pSyncNode
,
"new-snapshot-index:%"
PRId64
" candidate, do not delete wal"
,
lastApplyIndex
);
syncNodeRelease
(
pSyncNode
);
return
0
;
}
else
{
sNTrace
(
pSyncNode
,
"new-snapshot-index:%"
PRId64
" unknown state, do not delete wal"
,
lastApplyIndex
);
syncNodeRelease
(
pSyncNode
);
return
0
;
}
goto
_DEL_WAL
;
}
else
{
// one replica
goto
_DEL_WAL
;
if
(
pSyncNode
->
replicaNum
>
1
)
{
if
(
pSyncNode
->
state
!=
TAOS_SYNC_STATE_LEADER
&&
pSyncNode
->
state
!=
TAOS_SYNC_STATE_FOLLOWER
)
{
sNTrace
(
pSyncNode
,
"new-snapshot-index:%"
PRId64
" candidate or unknown state, do not delete wal"
,
lastApplyIndex
);
syncNodeRelease
(
pSyncNode
);
return
0
;
}
logRetention
=
TMAX
(
logRetention
,
lastApplyIndex
-
pSyncNode
->
minMatchIndex
);
}
_DEL_WAL:
...
...
@@ -366,7 +318,7 @@ _DEL_WAL:
atomic_store_64
(
&
pSyncNode
->
snapshottingIndex
,
lastApplyIndex
);
pSyncNode
->
snapshottingTime
=
taosGetTimestampMs
();
code
=
walBeginSnapshot
(
pData
->
pWal
,
lastApplyIndex
);
code
=
walBeginSnapshot
(
pData
->
pWal
,
lastApplyIndex
,
logRetention
);
if
(
code
==
0
)
{
sNTrace
(
pSyncNode
,
"wal snapshot begin, index:%"
PRId64
", last apply index:%"
PRId64
,
pSyncNode
->
snapshottingIndex
,
lastApplyIndex
);
...
...
@@ -2141,24 +2093,19 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
if
(
timerLogicClock
==
msgLogicClock
)
{
if
(
tsNow
>
pData
->
execTime
)
{
#if 0
sTrace(
"vgId:%d, hbDataRid:%ld, EXECUTE this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, "
"---------",
pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime);
#endif
pData
->
execTime
+=
pSyncTimer
->
timerMS
;
SRpcMsg
rpcMsg
=
{
0
};
(
void
)
syncBuildHeartbeat
(
&
rpcMsg
,
pSyncNode
->
vgId
);
pSyncNode
->
minMatchIndex
=
syncMinMatchIndex
(
pSyncNode
);
SyncHeartbeat
*
pSyncMsg
=
rpcMsg
.
pCont
;
pSyncMsg
->
srcId
=
pSyncNode
->
myRaftId
;
pSyncMsg
->
destId
=
pData
->
destId
;
pSyncMsg
->
term
=
raftStoreGetTerm
(
pSyncNode
);
pSyncMsg
->
commitIndex
=
pSyncNode
->
commitIndex
;
pSyncMsg
->
minMatchIndex
=
syncMinMatchIndex
(
pSyncNode
)
;
pSyncMsg
->
minMatchIndex
=
pSyncNode
->
minMatchIndex
;
pSyncMsg
->
privateTerm
=
0
;
pSyncMsg
->
timeStamp
=
tsNow
;
...
...
@@ -2170,11 +2117,6 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
syncLogSendHeartbeat
(
pSyncNode
,
pSyncMsg
,
false
,
timerElapsed
,
pData
->
execTime
);
syncNodeSendHeartbeat
(
pSyncNode
,
&
pSyncMsg
->
destId
,
&
rpcMsg
);
}
else
{
#if 0
sTrace(
"vgId:%d, hbDataRid:%ld, pass this step-------- heartbeat tsNow:%ld, exec:%ld, tsNow-exec:%ld, ---------",
pSyncNode->vgId, hbDataRid, tsNow, pData->execTime, tsNow - pData->execTime);
#endif
}
if
(
syncIsInit
())
{
...
...
source/libs/tdb/src/db/tdbBtree.c
浏览文件 @
d6dd087f
...
...
@@ -260,7 +260,7 @@ int tdbBtreeDelete(SBTree *pBt, const void *pKey, int kLen, TXN *pTxn) {
}
int
tdbBtreeUpsert
(
SBTree
*
pBt
,
const
void
*
pKey
,
int
nKey
,
const
void
*
pData
,
int
nData
,
TXN
*
pTxn
)
{
SBTC
btc
;
SBTC
btc
=
{
0
}
;
int
c
;
int
ret
;
...
...
@@ -272,10 +272,17 @@ int tdbBtreeUpsert(SBTree *pBt, const void *pKey, int nKey, const void *pData, i
ret
=
tdbBtcMoveTo
(
&
btc
,
pKey
,
nKey
,
&
c
);
if
(
ret
<
0
)
{
tdbError
(
"tdb/btree-upsert: btc move to failed with ret: %d."
,
ret
);
if
(
TDB_CELLDECODER_FREE_KEY
(
&
btc
.
coder
))
{
tdbFree
(
btc
.
coder
.
pKey
);
}
tdbBtcClose
(
&
btc
);
return
-
1
;
}
if
(
TDB_CELLDECODER_FREE_KEY
(
&
btc
.
coder
))
{
tdbFree
(
btc
.
coder
.
pKey
);
}
if
(
btc
.
idx
==
-
1
)
{
btc
.
idx
=
0
;
c
=
1
;
...
...
@@ -1442,15 +1449,19 @@ static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pD
// Clear the state of decoder
if
(
TDB_CELLDECODER_FREE_VAL
(
pDecoder
))
{
tdbFree
(
pDecoder
->
pVal
);
TDB_CELLDECODER_CLZ_FREE_VAL
(
pDecoder
);
// tdbTrace("tdb btc decoder val set nil: %p/0x%x ", pDecoder, pDecoder->freeKV);
}
if
(
TDB_CELLDECODER_FREE_KEY
(
pDecoder
))
{
tdbFree
(
pDecoder
->
pKey
);
TDB_CELLDECODER_CLZ_FREE_KEY
(
pDecoder
);
// tdbTrace("tdb btc decoder key set nil: %p/0x%x ", pDecoder, pDecoder->freeKV);
}
pDecoder
->
kLen
=
-
1
;
pDecoder
->
pKey
=
NULL
;
pDecoder
->
vLen
=
-
1
;
pDecoder
->
pVal
=
NULL
;
pDecoder
->
pgno
=
0
;
TDB_CELLDECODER_SET_FREE_NIL
(
pDecoder
);
// tdbTrace("tdb btc decoder set nil: %p/0x%x ", pDecoder, pDecoder->freeKV);
// 1. Decode header part
if
(
!
leaf
)
{
...
...
@@ -2270,10 +2281,6 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
}
else
{
lidx
=
lidx
+
1
;
}
if
(
TDB_CELLDECODER_FREE_KEY
(
&
pBtc
->
coder
))
{
tdbFree
((
void
*
)
pTKey
);
}
// compare last cell
if
(
lidx
<=
ridx
)
{
pBtc
->
idx
=
ridx
;
...
...
@@ -2284,9 +2291,6 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
}
else
{
ridx
=
ridx
-
1
;
}
if
(
TDB_CELLDECODER_FREE_KEY
(
&
pBtc
->
coder
))
{
tdbFree
((
void
*
)
pTKey
);
}
}
// binary search
...
...
@@ -2297,9 +2301,6 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
pBtc
->
idx
=
(
lidx
+
ridx
)
>>
1
;
tdbBtcGet
(
pBtc
,
&
pTKey
,
&
tkLen
,
NULL
,
NULL
);
c
=
pBt
->
kcmpr
(
pKey
,
kLen
,
pTKey
,
tkLen
);
if
(
TDB_CELLDECODER_FREE_KEY
(
&
pBtc
->
coder
))
{
tdbFree
((
void
*
)
pTKey
);
}
if
(
c
<
0
)
{
// pKey < cd.pKey
ridx
=
pBtc
->
idx
-
1
;
...
...
source/libs/tdb/src/inc/tdbInt.h
浏览文件 @
d6dd087f
...
...
@@ -122,6 +122,8 @@ typedef struct SBtInfo {
#define TDB_CELLD_F_VAL 0x2
#define TDB_CELLDECODER_SET_FREE_NIL(pCellDecoder) ((pCellDecoder)->freeKV = TDB_CELLD_F_NIL)
#define TDB_CELLDECODER_CLZ_FREE_KEY(pCellDecoder) ((pCellDecoder)->freeKV &= ~TDB_CELLD_F_KEY)
#define TDB_CELLDECODER_CLZ_FREE_VAL(pCellDecoder) ((pCellDecoder)->freeKV &= ~TDB_CELLD_F_VAL)
#define TDB_CELLDECODER_SET_FREE_KEY(pCellDecoder) ((pCellDecoder)->freeKV |= TDB_CELLD_F_KEY)
#define TDB_CELLDECODER_SET_FREE_VAL(pCellDecoder) ((pCellDecoder)->freeKV |= TDB_CELLD_F_VAL)
...
...
source/libs/transport/inc/transComm.h
浏览文件 @
d6dd087f
...
...
@@ -22,6 +22,7 @@ extern "C" {
#include "os.h"
#include "taoserror.h"
#include "theap.h"
#include "tmisce.h"
#include "transLog.h"
#include "transportInt.h"
#include "trpc.h"
...
...
source/libs/transport/src/transCli.c
浏览文件 @
d6dd087f
...
...
@@ -11,7 +11,6 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "transComm.h"
typedef
struct
SConnList
{
...
...
@@ -224,9 +223,13 @@ static void cliWalkCb(uv_handle_t* handle, void* arg);
} while (0);
// snprintf may cause performance problem
#define CONN_CONSTRUCT_HASH_KEY(key, ip, port) \
do { \
snprintf(key, sizeof(key), "%s:%d", ip, (int)port); \
#define CONN_CONSTRUCT_HASH_KEY(key, ip, port) \
do { \
char* p = key; \
int32_t len = strlen(ip); \
if (p != NULL) memcpy(p, ip, len); \
p[len] = ':'; \
titoa(port, 10, &p[len + 1]); \
} while (0)
#define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para))
...
...
@@ -664,7 +667,7 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
static
void
cliAllocRecvBufferCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
)
{
SCliConn
*
conn
=
handle
->
data
;
SConnBuffer
*
pBuf
=
&
conn
->
readBuf
;
t
Debug
(
"%s conn %p alloc read buf"
,
CONN_GET_INST_LABEL
(
conn
),
conn
);
t
Trace
(
"%s conn %p alloc read buf"
,
CONN_GET_INST_LABEL
(
conn
),
conn
);
transAllocBuffer
(
pBuf
,
buf
);
}
static
void
cliRecvCb
(
uv_stream_t
*
handle
,
ssize_t
nread
,
const
uv_buf_t
*
buf
)
{
...
...
@@ -677,7 +680,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
if
(
nread
>
0
)
{
pBuf
->
len
+=
nread
;
while
(
transReadComplete
(
pBuf
))
{
t
Debug
(
"%s conn %p read complete"
,
CONN_GET_INST_LABEL
(
conn
),
conn
);
t
Trace
(
"%s conn %p read complete"
,
CONN_GET_INST_LABEL
(
conn
),
conn
);
if
(
pBuf
->
invalid
)
{
cliHandleExcept
(
conn
);
break
;
...
...
@@ -1949,11 +1952,13 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
STransConnCtx
*
pCtx
=
pMsg
->
ctx
;
STraceId
*
trace
=
&
pMsg
->
msg
.
info
.
traceId
;
char
tbuf
[
256
]
=
{
0
};
EPSET_DEBUG_STR
(
&
pCtx
->
epSet
,
tbuf
);
tGDebug
(
"%s retry on next node,use:%s, step: %d,timeout:%"
PRId64
""
,
transLabel
(
pThrd
->
pTransInst
),
tbuf
,
pCtx
->
retryStep
,
pCtx
->
retryNextInterval
);
if
(
rpcDebugFlag
&
DEBUG_DEBUG
)
{
STraceId
*
trace
=
&
pMsg
->
msg
.
info
.
traceId
;
char
tbuf
[
256
]
=
{
0
};
EPSET_DEBUG_STR
(
&
pCtx
->
epSet
,
tbuf
);
tGDebug
(
"%s retry on next node,use:%s, step: %d,timeout:%"
PRId64
""
,
transLabel
(
pThrd
->
pTransInst
),
tbuf
,
pCtx
->
retryStep
,
pCtx
->
retryNextInterval
);
}
STaskArg
*
arg
=
taosMemoryMalloc
(
sizeof
(
STaskArg
));
arg
->
param1
=
pMsg
;
...
...
@@ -1990,7 +1995,7 @@ FORCE_INLINE bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) {
pResp
->
pCont
=
buf
;
pResp
->
contLen
=
len
;
*
dst
=
epset
;
epsetAssign
(
dst
,
&
epset
)
;
return
true
;
}
bool
cliResetEpset
(
STransConnCtx
*
pCtx
,
STransMsg
*
pResp
,
bool
hasEpSet
)
{
...
...
@@ -2015,7 +2020,7 @@ bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) {
}
else
{
if
(
!
transEpSetIsEqual
(
&
pCtx
->
epSet
,
&
epSet
))
{
tDebug
(
"epset not equal, retry new epset"
);
pCtx
->
epSet
=
epSet
;
epsetAssign
(
&
pCtx
->
epSet
,
&
epSet
)
;
noDelay
=
false
;
}
else
{
if
(
pCtx
->
epsetRetryCnt
>=
pCtx
->
epSet
.
numOfEps
)
{
...
...
@@ -2040,7 +2045,7 @@ bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) {
}
else
{
if
(
!
transEpSetIsEqual
(
&
pCtx
->
epSet
,
&
epSet
))
{
tDebug
(
"epset not equal, retry new epset"
);
pCtx
->
epSet
=
epSet
;
epsetAssign
(
&
pCtx
->
epSet
,
&
epSet
)
;
noDelay
=
false
;
}
else
{
if
(
pCtx
->
epsetRetryCnt
>=
pCtx
->
epSet
.
numOfEps
)
{
...
...
@@ -2130,10 +2135,6 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
if
(
pCtx
->
retryNextInterval
>=
pCtx
->
retryMaxInterval
)
{
pCtx
->
retryNextInterval
=
pCtx
->
retryMaxInterval
;
}
// if (-1 != pCtx->retryMaxTimeout && taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) {
// return false;
// }
}
else
{
pCtx
->
retryNextInterval
=
0
;
pCtx
->
epsetRetryCnt
++
;
...
...
@@ -2181,9 +2182,11 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
STraceId
*
trace
=
&
pResp
->
info
.
traceId
;
bool
hasEpSet
=
cliTryExtractEpSet
(
pResp
,
&
pCtx
->
epSet
);
if
(
hasEpSet
)
{
char
tbuf
[
256
]
=
{
0
};
EPSET_DEBUG_STR
(
&
pCtx
->
epSet
,
tbuf
);
tGTrace
(
"%s conn %p extract epset from msg"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
);
if
(
rpcDebugFlag
&
DEBUG_TRACE
)
{
char
tbuf
[
256
]
=
{
0
};
EPSET_DEBUG_STR
(
&
pCtx
->
epSet
,
tbuf
);
tGTrace
(
"%s conn %p extract epset from msg"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
);
}
}
if
(
pCtx
->
pSem
!=
NULL
)
{
...
...
@@ -2310,8 +2313,9 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
TRACE_SET_MSGID
(
&
pReq
->
info
.
traceId
,
tGenIdPI64
());
STransConnCtx
*
pCtx
=
taosMemoryCalloc
(
1
,
sizeof
(
STransConnCtx
));
pCtx
->
epSet
=
*
pEpSet
;
pCtx
->
origEpSet
=
*
pEpSet
;
epsetAssign
(
&
pCtx
->
epSet
,
pEpSet
);
epsetAssign
(
&
pCtx
->
origEpSet
,
pEpSet
);
pCtx
->
ahandle
=
pReq
->
info
.
ahandle
;
pCtx
->
msgType
=
pReq
->
msgType
;
...
...
@@ -2356,8 +2360,8 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs
TRACE_SET_MSGID
(
&
pReq
->
info
.
traceId
,
tGenIdPI64
());
STransConnCtx
*
pCtx
=
taosMemoryCalloc
(
1
,
sizeof
(
STransConnCtx
));
pCtx
->
epSet
=
*
pEpSet
;
pCtx
->
origEpSet
=
*
pEpSet
;
epsetAssign
(
&
pCtx
->
epSet
,
pEpSet
)
;
epsetAssign
(
&
pCtx
->
origEpSet
,
pEpSet
)
;
pCtx
->
ahandle
=
pReq
->
info
.
ahandle
;
pCtx
->
msgType
=
pReq
->
msgType
;
pCtx
->
pSem
=
sem
;
...
...
source/libs/wal/src/walRead.c
浏览文件 @
d6dd087f
...
...
@@ -96,7 +96,7 @@ int32_t walNextValidMsg(SWalReader *pReader) {
if
(
walSkipFetchBodyNew
(
pReader
)
<
0
)
{
return
-
1
;
}
fetchVer
++
;
fetchVer
=
pReader
->
curVersion
;
}
}
pReader
->
curStopped
=
1
;
...
...
@@ -142,7 +142,7 @@ static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int
}
static
int32_t
walReadChangeFile
(
SWalReader
*
pReader
,
int64_t
fileFirstVer
)
{
char
fnameStr
[
WAL_FILE_LEN
];
char
fnameStr
[
WAL_FILE_LEN
]
=
{
0
}
;
taosCloseFile
(
&
pReader
->
pIdxFile
);
taosCloseFile
(
&
pReader
->
pLogFile
);
...
...
@@ -299,14 +299,6 @@ static int32_t walFetchBodyNew(SWalReader *pReader) {
return
-
1
;
}
if
(
pReadHead
->
version
!=
ver
)
{
wError
(
"vgId:%d, wal fetch body error:%"
PRId64
", read request index:%"
PRId64
,
pReader
->
pWal
->
cfg
.
vgId
,
pReader
->
pHead
->
head
.
version
,
ver
);
pReader
->
curInvalid
=
1
;
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
return
-
1
;
}
if
(
walValidBodyCksum
(
pReader
->
pHead
)
!=
0
)
{
wError
(
"vgId:%d, wal fetch body error:%"
PRId64
", since body checksum not passed"
,
pReader
->
pWal
->
cfg
.
vgId
,
ver
);
pReader
->
curInvalid
=
1
;
...
...
source/libs/wal/src/walRef.c
浏览文件 @
d6dd087f
...
...
@@ -26,7 +26,7 @@ SWalRef *walOpenRef(SWal *pWal) {
}
pRef
->
refId
=
tGenIdPI64
();
pRef
->
refVer
=
-
1
;
pRef
->
refFile
=
-
1
;
//
pRef->refFile = -1;
pRef
->
pWal
=
pWal
;
taosHashPut
(
pWal
->
pRefHash
,
&
pRef
->
refId
,
sizeof
(
int64_t
),
&
pRef
,
sizeof
(
void
*
));
return
pRef
;
...
...
@@ -58,11 +58,11 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) {
pRef
->
refVer
=
ver
;
// bsearch in fileSet
SWalFileInfo
tmpInfo
;
tmpInfo
.
firstVer
=
ver
;
SWalFileInfo
*
pRet
=
taosArraySearch
(
pWal
->
fileInfoSet
,
&
tmpInfo
,
compareWalFileInfo
,
TD_LE
);
/
*A(pRet != NULL);*/
pRef
->
refFile
=
pRet
->
firstVer
;
//
SWalFileInfo tmpInfo;
//
tmpInfo.firstVer = ver;
//
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
/
/ ASSERT(pRet != NULL);
//
pRef->refFile = pRet->firstVer;
taosThreadMutexUnlock
(
&
pWal
->
mutex
);
}
...
...
@@ -73,7 +73,7 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) {
#if 1
void
walUnrefVer
(
SWalRef
*
pRef
)
{
pRef
->
refId
=
-
1
;
pRef
->
refFile
=
-
1
;
//
pRef->refFile = -1;
}
#endif
...
...
@@ -85,20 +85,18 @@ SWalRef *walRefFirstVer(SWal *pWal, SWalRef *pRef) {
}
}
taosThreadMutexLock
(
&
pWal
->
mutex
);
int64_t
ver
=
walGetFirstVer
(
pWal
);
wDebug
(
"vgId:%d, wal ref version %"
PRId64
" for first"
,
pWal
->
cfg
.
vgId
,
ver
);
pRef
->
refVer
=
ver
;
// bsearch in fileSet
SWalFileInfo
tmpInfo
;
tmpInfo
.
firstVer
=
ver
;
SWalFileInfo
*
pRet
=
taosArraySearch
(
pWal
->
fileInfoSet
,
&
tmpInfo
,
compareWalFileInfo
,
TD_LE
);
ASSERT
(
pRet
!=
NULL
);
pRef
->
refFile
=
pRet
->
firstVer
;
//
SWalFileInfo tmpInfo;
//
tmpInfo.firstVer = ver;
//
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
//
ASSERT(pRet != NULL);
//
pRef->refFile = pRet->firstVer;
taosThreadMutexUnlock
(
&
pWal
->
mutex
);
wDebug
(
"vgId:%d, wal ref version %"
PRId64
" for first"
,
pWal
->
cfg
.
vgId
,
ver
);
return
pRef
;
}
...
...
@@ -119,8 +117,8 @@ SWalRef *walRefCommittedVer(SWal *pWal) {
SWalFileInfo
tmpInfo
;
tmpInfo
.
firstVer
=
ver
;
SWalFileInfo
*
pRet
=
taosArraySearch
(
pWal
->
fileInfoSet
,
&
tmpInfo
,
compareWalFileInfo
,
TD_LE
);
/*A(pRet != NULL);*/
pRef
->
refFile
=
pRet
->
firstVer
;
ASSERT
(
pRet
!=
NULL
);
//
pRef->refFile = pRet->firstVer;
taosThreadMutexUnlock
(
&
pWal
->
mutex
);
return
pRef
;
...
...
source/libs/wal/src/walWrite.c
浏览文件 @
d6dd087f
...
...
@@ -247,21 +247,23 @@ static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) {
return
0
;
}
int32_t
walBeginSnapshot
(
SWal
*
pWal
,
int64_t
ver
)
{
int32_t
walBeginSnapshot
(
SWal
*
pWal
,
int64_t
ver
,
int64_t
logRetention
)
{
taosThreadMutexLock
(
&
pWal
->
mutex
);
ASSERT
(
logRetention
>=
0
);
pWal
->
vers
.
verInSnapshotting
=
ver
;
wDebug
(
"vgId:%d, wal begin snapshot for version %"
PRId64
", first ver %"
PRId64
", last ver %"
PRId64
,
pWal
->
cfg
.
vgId
,
ver
,
pWal
->
vers
.
firstVer
,
pWal
->
vers
.
lastVer
);
pWal
->
vers
.
logRetention
=
logRetention
;
wDebug
(
"vgId:%d, wal begin snapshot for version %"
PRId64
", log retention %"
PRId64
" first ver %"
PRId64
", last ver %"
PRId64
,
pWal
->
cfg
.
vgId
,
ver
,
pWal
->
vers
.
logRetention
,
pWal
->
vers
.
firstVer
,
pWal
->
vers
.
lastVer
);
// check file rolling
if
(
pWal
->
cfg
.
retentionPeriod
==
0
)
{
if
(
walGetLastFileSize
(
pWal
)
!=
0
)
{
if
(
walRollImpl
(
pWal
)
<
0
)
{
wError
(
"vgId:%d, failed to roll wal files since %s"
,
pWal
->
cfg
.
vgId
,
terrstr
());
goto
_err
;
}
if
(
walGetLastFileSize
(
pWal
)
!=
0
)
{
if
(
walRollImpl
(
pWal
)
<
0
)
{
wError
(
"vgId:%d, failed to roll wal files since %s"
,
pWal
->
cfg
.
vgId
,
terrstr
());
goto
_err
;
}
}
taosThreadMutexUnlock
(
&
pWal
->
mutex
);
return
0
;
...
...
@@ -275,8 +277,9 @@ int32_t walEndSnapshot(SWal *pWal) {
taosThreadMutexLock
(
&
pWal
->
mutex
);
int64_t
ver
=
pWal
->
vers
.
verInSnapshotting
;
wDebug
(
"vgId:%d, wal end snapshot for version %"
PRId64
", first ver %"
PRId64
", last ver %"
PRId64
,
pWal
->
cfg
.
vgId
,
ver
,
pWal
->
vers
.
firstVer
,
pWal
->
vers
.
lastVer
);
wDebug
(
"vgId:%d, wal end snapshot for version %"
PRId64
", log retention %"
PRId64
" first ver %"
PRId64
", last ver %"
PRId64
,
pWal
->
cfg
.
vgId
,
ver
,
pWal
->
vers
.
logRetention
,
pWal
->
vers
.
firstVer
,
pWal
->
vers
.
lastVer
);
if
(
ver
==
-
1
)
{
code
=
-
1
;
...
...
@@ -286,6 +289,7 @@ int32_t walEndSnapshot(SWal *pWal) {
pWal
->
vers
.
snapshotVer
=
ver
;
int
ts
=
taosGetTimestampSec
();
ver
=
TMAX
(
ver
-
pWal
->
vers
.
logRetention
,
pWal
->
vers
.
firstVer
-
1
);
void
*
pIter
=
NULL
;
while
(
1
)
{
pIter
=
taosHashIterate
(
pWal
->
pRefHash
,
pIter
);
...
...
source/libs/wal/test/walMetaTest.cpp
浏览文件 @
d6dd087f
...
...
@@ -264,7 +264,7 @@ TEST_F(WalCleanEnv, rollbackMultiFile) {
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
pWal
->
vers
.
lastVer
,
i
);
if
(
i
==
5
)
{
walBeginSnapshot
(
pWal
,
i
);
walBeginSnapshot
(
pWal
,
i
,
0
);
walEndSnapshot
(
pWal
);
}
}
...
...
@@ -301,7 +301,7 @@ TEST_F(WalCleanDeleteEnv, roll) {
ASSERT_EQ
(
pWal
->
vers
.
commitVer
,
i
);
}
walBeginSnapshot
(
pWal
,
i
-
1
);
walBeginSnapshot
(
pWal
,
i
-
1
,
0
);
ASSERT_EQ
(
pWal
->
vers
.
verInSnapshotting
,
i
-
1
);
walEndSnapshot
(
pWal
);
ASSERT_EQ
(
pWal
->
vers
.
snapshotVer
,
i
-
1
);
...
...
@@ -317,7 +317,7 @@ TEST_F(WalCleanDeleteEnv, roll) {
ASSERT_EQ
(
pWal
->
vers
.
commitVer
,
i
);
}
code
=
walBeginSnapshot
(
pWal
,
i
-
1
);
code
=
walBeginSnapshot
(
pWal
,
i
-
1
,
0
);
ASSERT_EQ
(
code
,
0
);
code
=
walEndSnapshot
(
pWal
);
ASSERT_EQ
(
code
,
0
);
...
...
source/os/src/osSemaphore.c
浏览文件 @
d6dd087f
...
...
@@ -132,7 +132,8 @@ int tsem_wait(tsem_t *psem) {
int
tsem_timewait
(
tsem_t
*
psem
,
int64_t
milis
)
{
if
(
psem
==
NULL
||
*
psem
==
NULL
)
return
-
1
;
dispatch_semaphore_wait
(
*
psem
,
milis
*
1000
*
1000
);
dispatch_time_t
time
=
dispatch_time
(
DISPATCH_TIME_NOW
,
(
int64_t
)(
milis
*
USEC_PER_SEC
));
dispatch_semaphore_wait
(
*
psem
,
time
);
return
0
;
}
...
...
source/util/src/tarray.c
浏览文件 @
d6dd087f
...
...
@@ -139,7 +139,8 @@ void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)
}
taosArraySet
(
pArray
,
pos
+
1
,
p2
);
pos
+=
1
;
memset
(
TARRAY_GET_ELEM
(
pArray
,
i
),
0
,
pArray
->
elemSize
);
pos
+=
1
;
}
else
{
pos
+=
1
;
}
...
...
@@ -171,13 +172,14 @@ void taosArrayRemoveDuplicateP(SArray* pArray, __compar_fn_t comparFn, void (*fp
// do nothing
}
else
{
if
(
pos
+
1
!=
i
)
{
void
*
p
=
taosArrayGet
(
pArray
,
pos
+
1
);
void
*
p
=
taosArrayGet
P
(
pArray
,
pos
+
1
);
if
(
fp
!=
NULL
)
{
fp
(
p
);
}
taosArraySet
(
pArray
,
pos
+
1
,
p2
);
pos
+=
1
;
memset
(
TARRAY_GET_ELEM
(
pArray
,
i
),
0
,
pArray
->
elemSize
);
pos
+=
1
;
}
else
{
pos
+=
1
;
}
...
...
source/util/src/tcompare.c
浏览文件 @
d6dd087f
...
...
@@ -1235,7 +1235,7 @@ int32_t taosArrayCompareString(const void *a, const void *b) {
const
char
*
x
=
*
(
const
char
**
)
a
;
const
char
*
y
=
*
(
const
char
**
)
b
;
return
compareLenPrefixedStr
(
x
,
y
);
return
strcmp
(
x
,
y
);
}
int32_t
comparestrPatternMatch
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
...
...
source/util/src/terror.c
浏览文件 @
d6dd087f
...
...
@@ -523,7 +523,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_ROW_LENGTH, "Row length exceeds
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_COLUMNS_NUM
,
"Illegal number of columns"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_TOO_MANY_COLUMNS
,
"Too many columns"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_FIRST_COLUMN
,
"First column must be timestamp"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN
,
"Invalid binary/nchar column length"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN
,
"Invalid binary/nchar column
/tag
length"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_TAGS_NUM
,
"Invalid number of tag columns"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_PERMISSION_DENIED
,
"Permission denied"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_STREAM_QUERY
,
"Invalid stream query"
)
...
...
tests/script/tsim/catalog/alterInCurrent.sim
浏览文件 @
d6dd087f
...
...
@@ -67,4 +67,19 @@ sql insert into t1 values (1591060628000, 1);
sql alter table st1 drop tag t2;
sql create table t2 using st1 tags(2);
print ======== drop tag in super table
sql create database if not exists aaa;
sql select table_name, db_name from information_schema.ins_tables t where t.db_name like 'aaa';
if $rows != 0 then
return -1
endi
sql drop database if exists foo;
sql create database if not exists foo;
sql create table foo.t(ts timestamp,name varchar(20));
sql create table foo.xt(ts timestamp,name varchar(20));
sql select table_name, db_name from information_schema.ins_tables t where t.db_name like 'foo';
if $rows != 2 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
tests/system-test/7-tmq/tmq_taosx.py
浏览文件 @
d6dd087f
...
...
@@ -200,6 +200,7 @@ class TDTestCase:
tdSql
.
checkData
(
1
,
1
,
1
)
tdSql
.
checkData
(
1
,
2
,
'{"k1":1,"k2":"hello"}'
)
time
.
sleep
(
10
)
tdSql
.
query
(
"select * from information_schema.ins_tables where table_name = 'stt4'"
)
uid1
=
tdSql
.
getData
(
0
,
5
)
uid2
=
tdSql
.
getData
(
1
,
5
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录