Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
c06b5bbd
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
c06b5bbd
编写于
4月 26, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into feature/vnode_refact1
上级
2ccdaf0c
6c6b0055
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
139 addition
and
110 deletion
+139
-110
include/common/tmsg.h
include/common/tmsg.h
+1
-48
include/libs/wal/wal.h
include/libs/wal/wal.h
+0
-2
source/client/src/tmq.c
source/client/src/tmq.c
+39
-29
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+2
-31
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+1
-0
tests/script/tsim/db/taosdlog.sim
tests/script/tsim/db/taosdlog.sim
+31
-0
tests/system-test/0-others/taosdlog.py
tests/system-test/0-others/taosdlog.py
+65
-0
未找到文件。
include/common/tmsg.h
浏览文件 @
c06b5bbd
...
...
@@ -2383,7 +2383,7 @@ typedef struct {
int32_t
epoch
;
uint64_t
reqId
;
int64_t
consumerId
;
int64_t
blocking
Time
;
int64_t
wait
Time
;
int64_t
currentOffset
;
}
SMqPollReqV2
;
...
...
@@ -2400,53 +2400,6 @@ typedef struct {
SSchemaWrapper
schema
;
}
SMqSubTopicEp
;
typedef
struct
{
SMqRspHead
head
;
int64_t
reqOffset
;
int64_t
rspOffset
;
int32_t
skipLogNum
;
int32_t
dataLen
;
SArray
*
blockPos
;
// beginning pos for each SRetrieveTableRsp
void
*
blockData
;
// serialized batched SRetrieveTableRsp
}
SMqPollRspV2
;
static
FORCE_INLINE
int32_t
tEncodeSMqPollRspV2
(
void
**
buf
,
const
SMqPollRspV2
*
pRsp
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pRsp
->
reqOffset
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pRsp
->
rspOffset
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pRsp
->
skipLogNum
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pRsp
->
dataLen
);
if
(
pRsp
->
dataLen
!=
0
)
{
int32_t
sz
=
taosArrayGetSize
(
pRsp
->
blockPos
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
int32_t
blockPos
=
*
(
int32_t
*
)
taosArrayGet
(
pRsp
->
blockPos
,
i
);
tlen
+=
taosEncodeFixedI32
(
buf
,
blockPos
);
}
tlen
+=
taosEncodeBinary
(
buf
,
pRsp
->
blockData
,
pRsp
->
dataLen
);
}
return
tlen
;
}
static
FORCE_INLINE
void
*
tDecodeSMqPollRspV2
(
const
void
*
buf
,
SMqPollRspV2
*
pRsp
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
reqOffset
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
rspOffset
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pRsp
->
skipLogNum
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pRsp
->
dataLen
);
if
(
pRsp
->
dataLen
!=
0
)
{
int32_t
sz
;
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pRsp
->
blockPos
=
taosArrayInit
(
sz
,
sizeof
(
int32_t
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
int32_t
blockPos
;
buf
=
taosDecodeFixedI32
(
buf
,
&
blockPos
);
taosArrayPush
(
pRsp
->
blockPos
,
&
blockPos
);
}
buf
=
taosDecodeBinary
(
buf
,
&
pRsp
->
blockData
,
pRsp
->
dataLen
);
}
return
(
void
*
)
buf
;
}
typedef
struct
{
SMqRspHead
head
;
int64_t
reqOffset
;
...
...
include/libs/wal/wal.h
浏览文件 @
c06b5bbd
...
...
@@ -72,8 +72,6 @@ extern "C" {
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_MAGIC 0xFAFBFCFDULL
#define WAL_CUR_FAILED 1
#pragma pack(push, 1)
typedef
enum
{
TAOS_WAL_NOLOG
=
0
,
...
...
source/client/src/tmq.c
浏览文件 @
c06b5bbd
...
...
@@ -68,7 +68,7 @@ struct tmq_conf_t {
char
*
user
;
char
*
pass
;
char
*
db
;
tmq_commit_cb
*
commit
_c
b
;
tmq_commit_cb
*
commit
C
b
;
};
struct
tmq_t
{
...
...
@@ -115,8 +115,8 @@ enum {
enum
{
TMQ_CONSUMER_STATUS__INIT
=
0
,
TMQ_CONSUMER_STATUS__SUBSCRIBED
,
TMQ_CONSUMER_STATUS__READY
,
TMQ_CONSUMER_STATUS__NO_TOPIC
,
};
enum
{
...
...
@@ -300,6 +300,7 @@ void tmqAssignDelayedHbTask(void* param, void* tmrId) {
int8_t
*
pTaskType
=
taosAllocateQitem
(
sizeof
(
int8_t
));
*
pTaskType
=
TMQ_DELAYED_TASK__HB
;
taosWriteQitem
(
tmq
->
delayedTask
,
pTaskType
);
tsem_post
(
&
tmq
->
rspSem
);
}
void
tmqAssignDelayedCommitTask
(
void
*
param
,
void
*
tmrId
)
{
...
...
@@ -307,6 +308,7 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
int8_t
*
pTaskType
=
taosAllocateQitem
(
sizeof
(
int8_t
));
*
pTaskType
=
TMQ_DELAYED_TASK__COMMIT
;
taosWriteQitem
(
tmq
->
delayedTask
,
pTaskType
);
tsem_post
(
&
tmq
->
rspSem
);
}
void
tmqAssignDelayedReportTask
(
void
*
param
,
void
*
tmrId
)
{
...
...
@@ -314,6 +316,7 @@ void tmqAssignDelayedReportTask(void* param, void* tmrId) {
int8_t
*
pTaskType
=
taosAllocateQitem
(
sizeof
(
int8_t
));
*
pTaskType
=
TMQ_DELAYED_TASK__REPORT
;
taosWriteQitem
(
tmq
->
delayedTask
,
pTaskType
);
tsem_post
(
&
tmq
->
rspSem
);
}
int32_t
tmqHandleAllDelayedTask
(
tmq_t
*
tmq
)
{
...
...
@@ -364,7 +367,6 @@ int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqSubscribeCbParam
*
pParam
=
(
SMqSubscribeCbParam
*
)
param
;
pParam
->
rspErr
=
code
;
tmq_t
*
tmq
=
pParam
->
tmq
;
atomic_store_8
(
&
tmq
->
status
,
TMQ_CONSUMER_STATUS__SUBSCRIBED
);
tsem_post
(
&
pParam
->
rspSem
);
return
0
;
}
...
...
@@ -475,7 +477,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
strcpy
(
pTmq
->
groupId
,
conf
->
groupId
);
pTmq
->
autoCommit
=
conf
->
autoCommit
;
pTmq
->
autoCommitInterval
=
conf
->
autoCommitInterval
;
pTmq
->
commit_cb
=
conf
->
commit
_c
b
;
pTmq
->
commit_cb
=
conf
->
commit
C
b
;
pTmq
->
resetOffsetCfg
=
conf
->
resetOffset
;
// assign consumerId
...
...
@@ -686,7 +688,7 @@ FAIL:
void
tmq_conf_set_offset_commit_cb
(
tmq_conf_t
*
conf
,
tmq_commit_cb
*
cb
)
{
//
conf
->
commit
_c
b
=
cb
;
conf
->
commit
C
b
=
cb
;
}
TAOS_RES
*
tmq_create_stream
(
TAOS
*
taos
,
const
char
*
streamName
,
const
char
*
tbName
,
const
char
*
sql
)
{
...
...
@@ -798,7 +800,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
// do not write into queue since updating epoch reset
tscWarn
(
"msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d"
,
pParam
->
vgId
,
msgEpoch
,
tmqEpoch
);
/*tsem_post(&tmq->rspSem);*/
tsem_post
(
&
tmq
->
rspSem
);
return
0
;
}
...
...
@@ -843,14 +845,14 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
pRspWrapper
->
msg
.
reqOffset
,
pRspWrapper
->
msg
.
rspOffset
);
taosWriteQitem
(
tmq
->
mqueue
,
pRspWrapper
);
/*tsem_post(&tmq->rspSem);*/
tsem_post
(
&
tmq
->
rspSem
);
return
0
;
CREATE_MSG_FAIL:
if
(
pParam
->
epoch
==
tmq
->
epoch
)
{
atomic_store_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
);
}
/*tsem_post(&tmq->rspSem);*/
tsem_post
(
&
tmq
->
rspSem
);
return
-
1
;
}
...
...
@@ -927,6 +929,12 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
if
(
tmq
->
clientTopics
)
taosArrayDestroy
(
tmq
->
clientTopics
);
taosHashCleanup
(
pHash
);
tmq
->
clientTopics
=
newTopics
;
if
(
taosArrayGetSize
(
tmq
->
clientTopics
)
==
0
)
atomic_store_8
(
&
tmq
->
status
,
TMQ_CONSUMER_STATUS__NO_TOPIC
);
else
atomic_store_8
(
&
tmq
->
status
,
TMQ_CONSUMER_STATUS__READY
);
atomic_store_32
(
&
tmq
->
epoch
,
epoch
);
return
set
;
}
...
...
@@ -955,9 +963,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
tDecodeSMqAskEpRsp
(
POINTER_SHIFT
(
pMsg
->
pData
,
sizeof
(
SMqRspHead
)),
&
rsp
);
/*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/
/*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/
if
(
tmqUpdateEp
(
tmq
,
head
->
epoch
,
&
rsp
))
{
atomic_store_8
(
&
tmq
->
status
,
TMQ_CONSUMER_STATUS__READY
);
}
tmqUpdateEp
(
tmq
,
head
->
epoch
,
&
rsp
);
tDeleteSMqAskEpRsp
(
&
rsp
);
}
else
{
SMqAskEpRspWrapper
*
pWrapper
=
taosAllocateQitem
(
sizeof
(
SMqAskEpRspWrapper
));
...
...
@@ -972,7 +978,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
tDecodeSMqAskEpRsp
(
POINTER_SHIFT
(
pMsg
->
pData
,
sizeof
(
SMqRspHead
)),
&
pWrapper
->
msg
);
taosWriteQitem
(
tmq
->
mqueue
,
pWrapper
);
/*tsem_post(&tmq->rspSem);*/
tsem_post
(
&
tmq
->
rspSem
);
taosMemoryFree
(
pParam
);
}
...
...
@@ -1076,7 +1082,7 @@ tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
return
TMQ_RESP_ERR__FAIL
;
}
SMqPollReqV2
*
tmqBuildConsumeReqImpl
(
tmq_t
*
tmq
,
int64_t
blocking
Time
,
SMqClientTopic
*
pTopic
,
SMqClientVg
*
pVg
)
{
SMqPollReqV2
*
tmqBuildConsumeReqImpl
(
tmq_t
*
tmq
,
int64_t
wait
Time
,
SMqClientTopic
*
pTopic
,
SMqClientVg
*
pVg
)
{
int64_t
reqOffset
;
if
(
pVg
->
currentOffset
>=
0
)
{
reqOffset
=
pVg
->
currentOffset
;
...
...
@@ -1101,7 +1107,7 @@ SMqPollReqV2* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClient
pReq
->
subKey
[
tlen
]
=
TMQ_SEPARATOR
;
strcpy
(
pReq
->
subKey
+
tlen
+
1
,
pTopic
->
topicName
);
pReq
->
blockingTime
=
blocking
Time
;
pReq
->
waitTime
=
wait
Time
;
pReq
->
consumerId
=
tmq
->
consumerId
;
pReq
->
epoch
=
tmq
->
epoch
;
pReq
->
currentOffset
=
reqOffset
;
...
...
@@ -1130,7 +1136,7 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
return
pRspObj
;
}
int32_t
tmqPollImpl
(
tmq_t
*
tmq
,
int64_t
blocking
Time
)
{
int32_t
tmqPollImpl
(
tmq_t
*
tmq
,
int64_t
wait
Time
)
{
/*printf("call poll\n");*/
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
tmq
->
clientTopics
);
i
++
)
{
SMqClientTopic
*
pTopic
=
taosArrayGet
(
tmq
->
clientTopics
,
i
);
...
...
@@ -1151,17 +1157,17 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
#endif
}
atomic_store_32
(
&
pVg
->
vgSkipCnt
,
0
);
SMqPollReqV2
*
pReq
=
tmqBuildConsumeReqImpl
(
tmq
,
blocking
Time
,
pTopic
,
pVg
);
SMqPollReqV2
*
pReq
=
tmqBuildConsumeReqImpl
(
tmq
,
wait
Time
,
pTopic
,
pVg
);
if
(
pReq
==
NULL
)
{
atomic_store_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
);
/*tsem_post(&tmq->rspSem);*/
tsem_post
(
&
tmq
->
rspSem
);
return
-
1
;
}
SMqPollCbParam
*
pParam
=
taosMemoryMalloc
(
sizeof
(
SMqPollCbParam
));
if
(
pParam
==
NULL
)
{
taosMemoryFree
(
pReq
);
atomic_store_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
);
/*tsem_post(&tmq->rspSem);*/
tsem_post
(
&
tmq
->
rspSem
);
return
-
1
;
}
pParam
->
tmq
=
tmq
;
...
...
@@ -1176,7 +1182,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
taosMemoryFree
(
pReq
);
taosMemoryFree
(
pParam
);
atomic_store_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
);
/*tsem_post(&tmq->rspSem);*/
tsem_post
(
&
tmq
->
rspSem
);
return
-
1
;
}
...
...
@@ -1222,7 +1228,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset)
return
0
;
}
SMqRspObj
*
tmqHandleAllRsp
(
tmq_t
*
tmq
,
int64_t
blocking
Time
,
bool
pollIfReset
)
{
SMqRspObj
*
tmqHandleAllRsp
(
tmq_t
*
tmq
,
int64_t
wait
Time
,
bool
pollIfReset
)
{
while
(
1
)
{
SMqRspWrapper
*
rspWrapper
=
NULL
;
taosGetQitem
(
tmq
->
qall
,
(
void
**
)
&
rspWrapper
);
...
...
@@ -1261,37 +1267,41 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) {
taosFreeQitem
(
rspWrapper
);
if
(
pollIfReset
&&
reset
)
{
tscDebug
(
"consumer %ld reset and repoll"
,
tmq
->
consumerId
);
tmqPollImpl
(
tmq
,
blocking
Time
);
tmqPollImpl
(
tmq
,
wait
Time
);
}
}
}
}
TAOS_RES
*
tmq_consumer_poll
(
tmq_t
*
tmq
,
int64_t
blocking
_time
)
{
TAOS_RES
*
tmq_consumer_poll
(
tmq_t
*
tmq
,
int64_t
wait
_time
)
{
SMqRspObj
*
rspObj
;
int64_t
startTime
=
taosGetTimestampMs
();
rspObj
=
tmqHandleAllRsp
(
tmq
,
blocking
_time
,
false
);
rspObj
=
tmqHandleAllRsp
(
tmq
,
wait
_time
,
false
);
if
(
rspObj
)
{
return
(
TAOS_RES
*
)
rspObj
;
}
if
(
atomic_load_8
(
&
tmq
->
status
)
!=
TMQ_CONSUMER_STATUS__READY
)
{
return
NULL
;
}
while
(
1
)
{
tmqHandleAllDelayedTask
(
tmq
);
tmqPollImpl
(
tmq
,
blocking_time
);
/*tsem_wait(&tmq->rspSem);*/
tmqPollImpl
(
tmq
,
wait_time
);
rspObj
=
tmqHandleAllRsp
(
tmq
,
blocking
_time
,
false
);
rspObj
=
tmqHandleAllRsp
(
tmq
,
wait
_time
,
false
);
if
(
rspObj
)
{
return
(
TAOS_RES
*
)
rspObj
;
}
if
(
blocking
_time
!=
0
)
{
if
(
wait
_time
!=
0
)
{
int64_t
endTime
=
taosGetTimestampMs
();
if
(
endTime
-
startTime
>
blocking_time
)
{
int64_t
leftTime
=
endTime
-
startTime
;
if
(
leftTime
>
wait_time
)
{
tscDebug
(
"consumer %ld (epoch %d) timeout, no rsp"
,
tmq
->
consumerId
,
tmq
->
epoch
);
return
NULL
;
}
tsem_timewait
(
&
tmq
->
rspSem
,
leftTime
*
1000
);
}
}
}
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
c06b5bbd
...
...
@@ -78,7 +78,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
SMqDataBlkRsp
rsp
=
{
0
};
rsp
.
reqOffset
=
pExec
->
pushHandle
.
reqOffset
;
rsp
.
blockData
=
taosArrayInit
(
0
,
sizeof
(
int32_t
));
rsp
.
blockData
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
rsp
.
blockDataLen
=
taosArrayInit
(
0
,
sizeof
(
int32_t
));
if
(
pExec
->
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
...
...
@@ -210,35 +210,6 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
tmsgPutToQueue
(
&
pTq
->
pVnode
->
msgCb
,
FETCH_QUEUE
,
&
req
);
#if 0
void* pIter = taosHashIterate(pTq->tqPushMgr->pHash, NULL);
while (pIter != NULL) {
STqPusher* pusher = *(STqPusher**)pIter;
if (pusher->type == TQ_PUSHER_TYPE__STREAM) {
STqStreamPusher* streamPusher = (STqStreamPusher*)pusher;
// repack
STqStreamToken* token = taosMemoryMalloc(sizeof(STqStreamToken));
if (token == NULL) {
taosHashCancelIterate(pTq->tqPushMgr->pHash, pIter);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
token->type = TQ_STREAM_TOKEN__DATA;
token->data = msg;
// set input
// exec
}
// send msg to ep
}
// iterate hash
// process all msg
// if waiting
// memcpy and send msg to fetch thread
// TODO: add reference
// if handle waiting, launch query and response to consumer
//
// if no waiting handle, return
#endif
return
0
;
}
...
...
@@ -377,7 +348,7 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
int32_t
tqProcessPollReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
,
int32_t
workerId
)
{
SMqPollReqV2
*
pReq
=
pMsg
->
pCont
;
int64_t
consumerId
=
pReq
->
consumerId
;
int64_t
waitTime
=
pReq
->
blocking
Time
;
int64_t
waitTime
=
pReq
->
wait
Time
;
int32_t
reqEpoch
=
pReq
->
epoch
;
int64_t
fetchOffset
;
...
...
tests/script/jenkins/basic.txt
浏览文件 @
c06b5bbd
...
...
@@ -13,6 +13,7 @@
./test.sh -f tsim/db/basic6.sim
./test.sh -f tsim/db/basic7.sim
./test.sh -f tsim/db/error1.sim
./test.sh -f tsim/db/taosdlog.sim
# ---- dnode
./test.sh -f tsim/dnode/basic1.sim
...
...
tests/script/tsim/db/taosdlog.sim
0 → 100644
浏览文件 @
c06b5bbd
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system rm -rf ../../sim/dnode1/log
system sh/exec.sh -n dnode1 -s start
sql connect
print =============== create database
sql create database d1 vgroups 2
sql show databases
if $rows != 3 then
return -1
endi
print =============== restart
system sh/exec.sh -n dnode1 -s stop -x SIGKILL
sleep 2000
system rm -rf ../../sim/dnode1/log
system sh/exec.sh -n dnode1 -s start
sleep 2000
print =============== show databases
sql create database d2 vgroups 6
sql show databases
if $rows != 4 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
tests/system-test/0-others/taosdlog.py
0 → 100644
浏览文件 @
c06b5bbd
import
taos
import
sys
import
time
import
os
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
class
TDTestCase
:
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
())
def
getBuildPath
(
self
):
selfPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
if
(
"community"
in
selfPath
):
projPath
=
selfPath
[:
selfPath
.
find
(
"community"
)]
else
:
projPath
=
selfPath
[:
selfPath
.
find
(
"tests"
)]
for
root
,
dirs
,
files
in
os
.
walk
(
projPath
):
if
(
"taosd"
in
files
):
rootRealPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
root
))
if
(
"packaging"
not
in
rootRealPath
):
buildPath
=
root
[:
len
(
root
)
-
len
(
"/build/bin"
)]
break
return
buildPath
def
run
(
self
):
# sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql
.
prepare
()
# time.sleep(2)
tdSql
.
query
(
"create user testpy pass 'testpy'"
)
buildPath
=
self
.
getBuildPath
()
if
(
buildPath
==
""
):
tdLog
.
exit
(
"taosd not found!"
)
else
:
tdLog
.
info
(
"taosd found in %s"
%
buildPath
)
logPath
=
buildPath
+
"/../sim/dnode1/log"
tdLog
.
info
(
"log path: %s"
%
logPath
)
tdDnodes
.
stop
(
1
)
time
.
sleep
(
2
)
tdSql
.
error
(
"show databases"
)
os
.
system
(
"rm -rf %s"
%
logPath
)
if
os
.
path
.
exists
(
logPath
)
==
True
:
tdLog
.
exit
(
"log pat still exist!"
)
tdDnodes
.
start
(
1
)
time
.
sleep
(
2
)
if
os
.
path
.
exists
(
logPath
)
!=
True
:
tdLog
.
exit
(
"log pat is not generated!"
)
tdSql
.
query
(
"show databases"
)
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录