Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
eb86ecfa
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
eb86ecfa
编写于
6月 23, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into fix/dnode
上级
ef00464f
89fe5df6
变更
20
隐藏空白更改
内联
并排
Showing
20 changed file
with
662 addition
and
175 deletion
+662
-175
examples/c/tmq.c
examples/c/tmq.c
+11
-3
include/client/taos.h
include/client/taos.h
+1
-1
include/util/taoserror.h
include/util/taoserror.h
+1
-0
source/client/src/clientMain.c
source/client/src/clientMain.c
+2
-0
source/client/src/tmq.c
source/client/src/tmq.c
+13
-15
source/common/src/ttime.c
source/common/src/ttime.c
+44
-14
source/dnode/vnode/CMakeLists.txt
source/dnode/vnode/CMakeLists.txt
+1
-0
source/dnode/vnode/src/inc/sma.h
source/dnode/vnode/src/inc/sma.h
+88
-32
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+0
-1
source/dnode/vnode/src/meta/metaQuery.c
source/dnode/vnode/src/meta/metaQuery.c
+2
-2
source/dnode/vnode/src/sma/smaEnv.c
source/dnode/vnode/src/sma/smaEnv.c
+34
-17
source/dnode/vnode/src/sma/smaOpen.c
source/dnode/vnode/src/sma/smaOpen.c
+13
-0
source/dnode/vnode/src/sma/smaRollup.c
source/dnode/vnode/src/sma/smaRollup.c
+183
-67
source/dnode/vnode/src/sma/smaTimeRange.c
source/dnode/vnode/src/sma/smaTimeRange.c
+2
-2
source/dnode/vnode/src/sma/smaUtil.c
source/dnode/vnode/src/sma/smaUtil.c
+238
-0
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+6
-3
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-1
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+18
-15
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+2
-1
source/util/src/terror.c
source/util/src/terror.c
+2
-1
未找到文件。
examples/c/tmq.c
浏览文件 @
eb86ecfa
...
...
@@ -26,6 +26,14 @@ static void msg_process(TAOS_RES* msg) {
printf
(
"topic: %s
\n
"
,
tmq_get_topic_name
(
msg
));
printf
(
"db: %s
\n
"
,
tmq_get_db_name
(
msg
));
printf
(
"vg: %d
\n
"
,
tmq_get_vgroup_id
(
msg
));
if
(
tmq_get_res_type
(
msg
)
==
TMQ_RES_TABLE_META
)
{
void
*
meta
;
int32_t
metaLen
;
tmq_get_raw_meta
(
msg
,
&
meta
,
&
metaLen
);
printf
(
"meta, len is %d
\n
"
,
metaLen
);
return
;
}
while
(
1
)
{
TAOS_ROW
row
=
taos_fetch_row
(
msg
);
if
(
row
==
NULL
)
break
;
...
...
@@ -129,8 +137,8 @@ int32_t create_topic() {
}
taos_free_result
(
pRes
);
/*pRes = taos_query(pConn, "create topic topic_ctb_column as database abc1");*/
pRes
=
taos_query
(
pConn
,
"create topic topic_ctb_column as select ts, c1, c2, c3 from st1"
);
pRes
=
taos_query
(
pConn
,
"create topic topic_ctb_column with meta as database abc1"
);
/*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create topic topic_ctb_column, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
...
...
@@ -191,7 +199,7 @@ tmq_t* build_consumer() {
tmq_conf_set
(
conf
,
"msg.with.table.name"
,
"true"
);
tmq_conf_set
(
conf
,
"enable.auto.commit"
,
"true"
);
tmq_conf_set
(
conf
,
"experiment.use.snapshot"
,
"
tru
e"
);
tmq_conf_set
(
conf
,
"experiment.use.snapshot"
,
"
fals
e"
);
tmq_conf_set_auto_commit_cb
(
conf
,
tmq_commit_cb_print
,
NULL
);
tmq_t
*
tmq
=
tmq_consumer_new
(
conf
,
NULL
,
0
);
...
...
include/client/taos.h
浏览文件 @
eb86ecfa
...
...
@@ -261,7 +261,7 @@ enum tmq_res_t {
typedef
enum
tmq_res_t
tmq_res_t
;
DLL_EXPORT
tmq_res_t
tmq_get_res_type
(
TAOS_RES
*
res
);
DLL_EXPORT
int32_t
tmq_get_raw_meta
(
TAOS_RES
*
res
,
const
void
**
raw_meta
,
int32_t
*
raw_meta_len
);
DLL_EXPORT
int32_t
tmq_get_raw_meta
(
TAOS_RES
*
res
,
void
**
raw_meta
,
int32_t
*
raw_meta_len
);
DLL_EXPORT
const
char
*
tmq_get_topic_name
(
TAOS_RES
*
res
);
DLL_EXPORT
const
char
*
tmq_get_db_name
(
TAOS_RES
*
res
);
DLL_EXPORT
int32_t
tmq_get_vgroup_id
(
TAOS_RES
*
res
);
...
...
include/util/taoserror.h
浏览文件 @
eb86ecfa
...
...
@@ -71,6 +71,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE TAOS_DEF_ERROR_CODE(0, 0x0029)
#define TSDB_CODE_INVALID_TIMESTAMP TAOS_DEF_ERROR_CODE(0, 0x0030)
#define TSDB_CODE_MSG_DECODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0031)
#define TSDB_CODE_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0032)
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0040)
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0041)
...
...
source/client/src/clientMain.c
浏览文件 @
eb86ecfa
...
...
@@ -290,6 +290,8 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
pResultInfo
->
current
+=
1
;
return
pResultInfo
->
row
;
}
}
else
if
(
TD_RES_TMQ_META
(
res
))
{
return
NULL
;
}
else
{
// assert to avoid un-initialization error
ASSERT
(
0
);
...
...
source/client/src/tmq.c
浏览文件 @
eb86ecfa
...
...
@@ -1160,8 +1160,6 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
// handle meta rsp
int8_t
rspType
=
((
SMqRspHead
*
)
pMsg
->
pData
)
->
mqMsgType
;
if
(
rspType
==
TMQ_MSG_TYPE__POLL_META_RSP
)
{
}
SMqPollRspWrapper
*
pRspWrapper
=
taosAllocateQitem
(
sizeof
(
SMqPollRspWrapper
),
DEF_QITEM
);
if
(
pRspWrapper
==
NULL
)
{
...
...
@@ -1174,19 +1172,19 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
pRspWrapper
->
vgHandle
=
pVg
;
pRspWrapper
->
topicHandle
=
pTopic
;
memcpy
(
&
pRspWrapper
->
dataRsp
,
pMsg
->
pData
,
sizeof
(
SMqRspHead
));
if
(
rspType
==
TMQ_MSG_TYPE__POLL_RSP
)
{
memcpy
(
&
pRspWrapper
->
dataRsp
,
pMsg
->
pData
,
sizeof
(
SMqRspHead
));
tDecodeSMqDataBlkRsp
(
POINTER_SHIFT
(
pMsg
->
pData
,
sizeof
(
SMqRspHead
)),
&
pRspWrapper
->
dataRsp
);
}
else
{
ASSERT
(
rspType
==
TMQ_MSG_TYPE__POLL_META_RSP
);
memcpy
(
&
pRspWrapper
->
metaRsp
,
pMsg
->
pData
,
sizeof
(
SMqRspHead
));
tDecodeSMqMetaRsp
(
POINTER_SHIFT
(
pMsg
->
pData
,
sizeof
(
SMqRspHead
)),
&
pRspWrapper
->
metaRsp
);
}
taosMemoryFree
(
pMsg
->
pData
);
tscDebug
(
"consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld"
,
tmq
->
consumerId
,
pVg
->
vgId
,
pRspWrapper
->
dataRsp
.
reqOffset
,
pRspWrapper
->
dataRsp
.
rspOffset
);
tscDebug
(
"consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld
, type %d
"
,
tmq
->
consumerId
,
pVg
->
vgId
,
pRspWrapper
->
dataRsp
.
reqOffset
,
pRspWrapper
->
dataRsp
.
rspOffset
,
rspType
);
taosWriteQitem
(
tmq
->
mqueue
,
pRspWrapper
);
tsem_post
(
&
tmq
->
rspSem
);
...
...
@@ -1558,7 +1556,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic*
SMqMetaRspObj
*
tmqBuildMetaRspFromWrapper
(
SMqPollRspWrapper
*
pWrapper
)
{
SMqMetaRspObj
*
pRspObj
=
taosMemoryCalloc
(
1
,
sizeof
(
SMqMetaRspObj
));
pRspObj
->
resType
=
RES_TYPE__TMQ
;
pRspObj
->
resType
=
RES_TYPE__TMQ
_META
;
tstrncpy
(
pRspObj
->
topic
,
pWrapper
->
topicHandle
->
topicName
,
TSDB_TOPIC_FNAME_LEN
);
tstrncpy
(
pRspObj
->
db
,
pWrapper
->
topicHandle
->
db
,
TSDB_DB_FNAME_LEN
);
pRspObj
->
vgId
=
pWrapper
->
vgHandle
->
vgId
;
...
...
@@ -1676,7 +1674,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset)
return
0
;
}
SMqRspObj
*
tmqHandleAllRsp
(
tmq_t
*
tmq
,
int64_t
timeout
,
bool
pollIfReset
)
{
void
*
tmqHandleAllRsp
(
tmq_t
*
tmq
,
int64_t
timeout
,
bool
pollIfReset
)
{
while
(
1
)
{
SMqRspWrapper
*
rspWrapper
=
NULL
;
taosGetQitem
(
tmq
->
qall
,
(
void
**
)
&
rspWrapper
);
...
...
@@ -1716,18 +1714,18 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
}
else
if
(
rspWrapper
->
tmqRspType
==
TMQ_MSG_TYPE__POLL_META_RSP
)
{
SMqPollRspWrapper
*
pollRspWrapper
=
(
SMqPollRspWrapper
*
)
rspWrapper
;
int32_t
consumerEpoch
=
atomic_load_32
(
&
tmq
->
epoch
);
if
(
pollRspWrapper
->
da
taRsp
.
head
.
epoch
==
consumerEpoch
)
{
if
(
pollRspWrapper
->
me
taRsp
.
head
.
epoch
==
consumerEpoch
)
{
SMqClientVg
*
pVg
=
pollRspWrapper
->
vgHandle
;
/*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
pVg
->
currentOffset
=
pollRspWrapper
->
da
taRsp
.
rspOffset
;
pVg
->
currentOffset
=
pollRspWrapper
->
me
taRsp
.
rspOffset
;
atomic_store_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
);
// build rsp
SMq
RspObj
*
pRsp
=
tmqBuild
RspFromWrapper
(
pollRspWrapper
);
SMq
MetaRspObj
*
pRsp
=
tmqBuildMeta
RspFromWrapper
(
pollRspWrapper
);
taosFreeQitem
(
pollRspWrapper
);
return
pRsp
;
}
else
{
tscDebug
(
"msg discard since epoch mismatch: msg epoch %d, consumer epoch %d
\n
"
,
pollRspWrapper
->
da
taRsp
.
head
.
epoch
,
consumerEpoch
);
pollRspWrapper
->
me
taRsp
.
head
.
epoch
,
consumerEpoch
);
taosFreeQitem
(
pollRspWrapper
);
}
}
else
{
...
...
@@ -1744,8 +1742,8 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
}
TAOS_RES
*
tmq_consumer_poll
(
tmq_t
*
tmq
,
int64_t
timeout
)
{
SMqRspObj
*
rspObj
;
int64_t
startTime
=
taosGetTimestampMs
();
void
*
rspObj
;
int64_t
startTime
=
taosGetTimestampMs
();
#if 0
tmqHandleAllDelayedTask(tmq);
...
...
@@ -1873,7 +1871,7 @@ const char* tmq_get_table_name(TAOS_RES* res) {
return
NULL
;
}
int32_t
tmq_get_raw_meta
(
TAOS_RES
*
res
,
const
void
**
raw_meta
,
int32_t
*
raw_meta_len
)
{
int32_t
tmq_get_raw_meta
(
TAOS_RES
*
res
,
void
**
raw_meta
,
int32_t
*
raw_meta_len
)
{
if
(
TD_RES_TMQ_META
(
res
))
{
SMqMetaRspObj
*
pMetaRspObj
=
(
SMqMetaRspObj
*
)
res
;
*
raw_meta
=
pMetaRspObj
->
metaRsp
.
metaRsp
;
...
...
source/common/src/ttime.c
浏览文件 @
eb86ecfa
...
...
@@ -76,22 +76,28 @@ void deltaToUtcInitOnce() {
static
int64_t
parseFraction
(
char
*
str
,
char
**
end
,
int32_t
timePrec
);
static
int32_t
parseTimeWithTz
(
const
char
*
timestr
,
int64_t
*
time
,
int32_t
timePrec
,
char
delim
);
static
int32_t
parseLocaltime
(
char
*
timestr
,
int32_t
len
,
int64_t
*
utime
,
int32_t
timePrec
);
static
int32_t
parseLocaltimeDst
(
char
*
timestr
,
int32_t
len
,
int64_t
*
utime
,
int32_t
timePrec
);
static
int32_t
parseLocaltime
(
char
*
timestr
,
int32_t
len
,
int64_t
*
utime
,
int32_t
timePrec
,
char
delim
);
static
int32_t
parseLocaltimeDst
(
char
*
timestr
,
int32_t
len
,
int64_t
*
utime
,
int32_t
timePrec
,
char
delim
);
static
char
*
forwardToTimeStringEnd
(
char
*
str
);
static
bool
checkTzPresent
(
const
char
*
str
,
int32_t
len
);
static
int32_t
(
*
parseLocaltimeFp
[])(
char
*
timestr
,
int32_t
len
,
int64_t
*
utime
,
int32_t
timePrec
)
=
{
parseLocaltime
,
parseLocaltimeDst
};
static
int32_t
(
*
parseLocaltimeFp
[])(
char
*
timestr
,
int32_t
len
,
int64_t
*
utime
,
int32_t
timePrec
,
char
delim
)
=
{
parseLocaltime
,
parseLocaltimeDst
};
int32_t
taosParseTime
(
const
char
*
timestr
,
int64_t
*
utime
,
int32_t
len
,
int32_t
timePrec
,
int8_t
day_light
)
{
/* parse datatime string in with tz */
if
(
strnchr
(
timestr
,
'T'
,
len
,
false
)
!=
NULL
)
{
return
parseTimeWithTz
(
timestr
,
utime
,
timePrec
,
'T'
);
}
else
if
(
checkTzPresent
(
timestr
,
len
))
{
return
parseTimeWithTz
(
timestr
,
utime
,
timePrec
,
0
);
if
(
checkTzPresent
(
timestr
,
len
))
{
return
parseTimeWithTz
(
timestr
,
utime
,
timePrec
,
'T'
);
}
else
{
return
(
*
parseLocaltimeFp
[
day_light
])((
char
*
)
timestr
,
len
,
utime
,
timePrec
,
'T'
);
}
}
else
{
return
(
*
parseLocaltimeFp
[
day_light
])((
char
*
)
timestr
,
len
,
utime
,
timePrec
);
if
(
checkTzPresent
(
timestr
,
len
))
{
return
parseTimeWithTz
(
timestr
,
utime
,
timePrec
,
0
);
}
else
{
return
(
*
parseLocaltimeFp
[
day_light
])((
char
*
)
timestr
,
len
,
utime
,
timePrec
,
0
);
}
}
}
...
...
@@ -333,13 +339,25 @@ static FORCE_INLINE bool validateTm(struct tm* pTm) {
return
true
;
}
int32_t
parseLocaltime
(
char
*
timestr
,
int32_t
len
,
int64_t
*
time
,
int32_t
timePrec
)
{
int32_t
parseLocaltime
(
char
*
timestr
,
int32_t
len
,
int64_t
*
time
,
int32_t
timePrec
,
char
delim
)
{
*
time
=
0
;
struct
tm
tm
=
{
0
};
char
*
str
=
taosStrpTime
(
timestr
,
"%Y-%m-%d %H:%M:%S"
,
&
tm
);
char
*
str
;
if
(
delim
==
'T'
)
{
str
=
taosStrpTime
(
timestr
,
"%Y-%m-%dT%H:%M:%S"
,
&
tm
);
}
else
if
(
delim
==
0
)
{
str
=
taosStrpTime
(
timestr
,
"%Y-%m-%d %H:%M:%S"
,
&
tm
);
}
else
{
str
=
NULL
;
}
if
(
str
==
NULL
||
(((
str
-
timestr
)
<
len
)
&&
(
*
str
!=
'.'
))
||
!
validateTm
(
&
tm
))
{
return
-
1
;
//if parse failed, try "%Y-%m-%d" format
str
=
taosStrpTime
(
timestr
,
"%Y-%m-%d"
,
&
tm
);
if
(
str
==
NULL
||
(((
str
-
timestr
)
<
len
)
&&
(
*
str
!=
'.'
))
||
!
validateTm
(
&
tm
))
{
return
-
1
;
}
}
#ifdef _MSC_VER
...
...
@@ -367,14 +385,26 @@ int32_t parseLocaltime(char* timestr, int32_t len, int64_t* time, int32_t timePr
return
0
;
}
int32_t
parseLocaltimeDst
(
char
*
timestr
,
int32_t
len
,
int64_t
*
time
,
int32_t
timePrec
)
{
int32_t
parseLocaltimeDst
(
char
*
timestr
,
int32_t
len
,
int64_t
*
time
,
int32_t
timePrec
,
char
delim
)
{
*
time
=
0
;
struct
tm
tm
=
{
0
};
tm
.
tm_isdst
=
-
1
;
char
*
str
=
taosStrpTime
(
timestr
,
"%Y-%m-%d %H:%M:%S"
,
&
tm
);
char
*
str
;
if
(
delim
==
'T'
)
{
str
=
taosStrpTime
(
timestr
,
"%Y-%m-%dT%H:%M:%S"
,
&
tm
);
}
else
if
(
delim
==
0
)
{
str
=
taosStrpTime
(
timestr
,
"%Y-%m-%d %H:%M:%S"
,
&
tm
);
}
else
{
str
=
NULL
;
}
if
(
str
==
NULL
||
(((
str
-
timestr
)
<
len
)
&&
(
*
str
!=
'.'
))
||
!
validateTm
(
&
tm
))
{
return
-
1
;
//if parse failed, try "%Y-%m-%d" format
str
=
taosStrpTime
(
timestr
,
"%Y-%m-%d"
,
&
tm
);
if
(
str
==
NULL
||
(((
str
-
timestr
)
<
len
)
&&
(
*
str
!=
'.'
))
||
!
validateTm
(
&
tm
))
{
return
-
1
;
}
}
/* mktime will be affected by TZ, set by using taos_options */
...
...
source/dnode/vnode/CMakeLists.txt
浏览文件 @
eb86ecfa
...
...
@@ -29,6 +29,7 @@ target_sources(
# sma
"src/sma/sma.c"
"src/sma/smaEnv.c"
"src/sma/smaUtil.c"
"src/sma/smaOpen.c"
"src/sma/smaRollup.c"
"src/sma/smaTimeRange.c"
...
...
source/dnode/vnode/src/inc/sma.h
浏览文件 @
eb86ecfa
...
...
@@ -34,7 +34,8 @@ extern "C" {
typedef
struct
SSmaEnv
SSmaEnv
;
typedef
struct
SSmaStat
SSmaStat
;
typedef
struct
SSmaStatItem
SSmaStatItem
;
typedef
struct
STSmaStat
STSmaStat
;
typedef
struct
SRSmaStat
SRSmaStat
;
typedef
struct
SSmaKey
SSmaKey
;
typedef
struct
SRSmaInfo
SRSmaInfo
;
typedef
struct
SRSmaInfoItem
SRSmaInfoItem
;
...
...
@@ -45,26 +46,38 @@ struct SSmaEnv {
SSmaStat
*
pStat
;
};
#define SMA_ENV_LOCK(env) ((env)->lock)
#define SMA_ENV_TYPE(env) ((env)->type)
#define SMA_ENV_STAT(env) ((env)->pStat)
#define SMA_ENV_STAT_ITEM(env) ((env)->pStat->tsmaStatItem)
#define SMA_ENV_LOCK(env) ((env)->lock)
#define SMA_ENV_TYPE(env) ((env)->type)
#define SMA_ENV_STAT(env) ((env)->pStat)
struct
S
SmaStatItem
{
struct
S
TSmaStat
{
int8_t
state
;
// ETsdbSmaStat
STSma
*
pTSma
;
// cache schema
STSchema
*
pTSchema
;
};
struct
SRSmaStat
{
SSma
*
pSma
;
void
*
tmrHandle
;
tmr_h
tmrId
;
int8_t
tmrStat
;
int32_t
tmrSeconds
;
SHashObj
*
rsmaInfoHash
;
// key: stbUid, value: SRSmaInfo;
};
struct
SSmaStat
{
union
{
S
SmaStatItem
tsmaStatItem
;
S
HashObj
*
rsmaInfoHash
;
// key: stbUid, value: SRSmaInfo;
S
TSmaStat
tsmaStat
;
// time-range-wise sma
S
RSmaStat
rsmaStat
;
// rollup sma
};
T_REF_DECLARE
()
};
#define SMA_STAT_ITEM(s) ((s)->tsmaStatItem)
#define SMA_STAT_INFO_HASH(s) ((s)->rsmaInfoHash)
#define SMA_TSMA_STAT(s) (&(s)->tsmaStat)
#define SMA_RSMA_STAT(s) (&(s)->rsmaStat)
#define SMA_RSMA_INFO_HASH(s) ((s)->rsmaStat.rsmaInfoHash)
#define SMA_RSMA_TMR_HANDLE(s) ((s)->rsmaStat.tmrHandle)
#define SMA_RSMA_TMR_STAT(s) ((s)->rsmaStat.tmrStat)
#define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash)
void
tdDestroySmaEnv
(
SSmaEnv
*
pSmaEnv
);
void
*
tdFreeSmaEnv
(
SSmaEnv
*
pSmaEnv
);
...
...
@@ -107,53 +120,51 @@ static FORCE_INLINE int32_t tdUnLockSmaEnv(SSmaEnv *pEnv) {
return
0
;
}
static
FORCE_INLINE
int8_t
tdSmaStat
(
S
SmaStatItem
*
pStatItem
)
{
if
(
p
StatItem
)
{
return
atomic_load_8
(
&
p
StatItem
->
state
);
static
FORCE_INLINE
int8_t
tdSmaStat
(
S
TSmaStat
*
pTStat
)
{
if
(
p
TStat
)
{
return
atomic_load_8
(
&
p
TStat
->
state
);
}
return
TSDB_SMA_STAT_UNKNOWN
;
}
static
FORCE_INLINE
bool
tdSmaStatIsOK
(
S
SmaStatItem
*
pStatItem
,
int8_t
*
state
)
{
if
(
!
p
StatItem
)
{
static
FORCE_INLINE
bool
tdSmaStatIsOK
(
S
TSmaStat
*
pTStat
,
int8_t
*
state
)
{
if
(
!
p
TStat
)
{
return
false
;
}
if
(
state
)
{
*
state
=
atomic_load_8
(
&
p
StatItem
->
state
);
*
state
=
atomic_load_8
(
&
p
TStat
->
state
);
return
*
state
==
TSDB_SMA_STAT_OK
;
}
return
atomic_load_8
(
&
p
StatItem
->
state
)
==
TSDB_SMA_STAT_OK
;
return
atomic_load_8
(
&
p
TStat
->
state
)
==
TSDB_SMA_STAT_OK
;
}
static
FORCE_INLINE
bool
tdSmaStatIsExpired
(
S
SmaStatItem
*
pStatItem
)
{
return
p
StatItem
?
(
atomic_load_8
(
&
pStatItem
->
state
)
&
TSDB_SMA_STAT_EXPIRED
)
:
true
;
static
FORCE_INLINE
bool
tdSmaStatIsExpired
(
S
TSmaStat
*
pTStat
)
{
return
p
TStat
?
(
atomic_load_8
(
&
pTStat
->
state
)
&
TSDB_SMA_STAT_EXPIRED
)
:
true
;
}
static
FORCE_INLINE
bool
tdSmaStatIsDropped
(
S
SmaStatItem
*
pStatItem
)
{
return
p
StatItem
?
(
atomic_load_8
(
&
pStatItem
->
state
)
&
TSDB_SMA_STAT_DROPPED
)
:
true
;
static
FORCE_INLINE
bool
tdSmaStatIsDropped
(
S
TSmaStat
*
pTStat
)
{
return
p
TStat
?
(
atomic_load_8
(
&
pTStat
->
state
)
&
TSDB_SMA_STAT_DROPPED
)
:
true
;
}
static
FORCE_INLINE
void
tdSmaStatSetOK
(
S
SmaStatItem
*
pStatItem
)
{
if
(
p
StatItem
)
{
atomic_store_8
(
&
p
StatItem
->
state
,
TSDB_SMA_STAT_OK
);
static
FORCE_INLINE
void
tdSmaStatSetOK
(
S
TSmaStat
*
pTStat
)
{
if
(
p
TStat
)
{
atomic_store_8
(
&
p
TStat
->
state
,
TSDB_SMA_STAT_OK
);
}
}
static
FORCE_INLINE
void
tdSmaStatSetExpired
(
S
SmaStatItem
*
pStatItem
)
{
if
(
p
StatItem
)
{
atomic_or_fetch_8
(
&
p
StatItem
->
state
,
TSDB_SMA_STAT_EXPIRED
);
static
FORCE_INLINE
void
tdSmaStatSetExpired
(
S
TSmaStat
*
pTStat
)
{
if
(
p
TStat
)
{
atomic_or_fetch_8
(
&
p
TStat
->
state
,
TSDB_SMA_STAT_EXPIRED
);
}
}
static
FORCE_INLINE
void
tdSmaStatSetDropped
(
S
SmaStatItem
*
pStatItem
)
{
if
(
p
StatItem
)
{
atomic_or_fetch_8
(
&
p
StatItem
->
state
,
TSDB_SMA_STAT_DROPPED
);
static
FORCE_INLINE
void
tdSmaStatSetDropped
(
S
TSmaStat
*
pTStat
)
{
if
(
p
TStat
)
{
atomic_or_fetch_8
(
&
p
TStat
->
state
,
TSDB_SMA_STAT_DROPPED
);
}
}
static
int32_t
tdInitSmaStat
(
SSmaStat
**
pSmaStat
,
int8_t
smaType
);
void
*
tdFreeSmaStatItem
(
SSmaStatItem
*
pSmaStatItem
);
static
int32_t
tdDestroySmaState
(
SSmaStat
*
pSmaStat
,
int8_t
smaType
);
void
*
tdFreeSmaState
(
SSmaStat
*
pSmaStat
,
int8_t
smaType
);
...
...
@@ -163,6 +174,51 @@ int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg);
int32_t
tdProcessTSmaInsertImpl
(
SSma
*
pSma
,
int64_t
indexUid
,
const
char
*
msg
);
int32_t
tdProcessTSmaGetDaysImpl
(
SVnodeCfg
*
pCfg
,
void
*
pCont
,
uint32_t
contLen
,
int32_t
*
days
);
typedef
struct
STFInfo
STFInfo
;
typedef
struct
STFile
STFile
;
struct
STFInfo
{
uint32_t
magic
;
uint32_t
ftype
;
uint32_t
fver
;
uint64_t
fsize
;
};
struct
STFile
{
STFInfo
info
;
STfsFile
f
;
TdFilePtr
pFile
;
uint8_t
state
;
};
#define TD_FILE_F(tf) (&((tf)->f))
#define TD_FILE_PFILE(tf) ((tf)->pFile)
#define TD_FILE_OPENED(tf) (TD_FILE_PFILE(tf) != NULL)
#define TD_FILE_FULL_NAME(tf) (TD_FILE_F(tf)->aname)
#define TD_FILE_REL_NAME(tf) (TD_FILE_F(tf)->rname)
#define TD_FILE_OPENED(tf) (TD_FILE_PFILE(tf) != NULL)
#define TD_FILE_CLOSED(tf) (!TD_FILE_OPENED(tf))
#define TD_FILE_SET_CLOSED(f) (TD_FILE_PFILE(f) = NULL)
#define TD_FILE_STATE(tf) ((tf)->state)
#define TD_FILE_SET_STATE(tf, s) ((tf)->state = (s))
#define TD_FILE_DID(tf) (TD_FILE_F(tf)->did)
#define TD_FILE_IS_OK(tf) (TD_FILE_STATE(tf) == TD_FILE_STATE_OK)
#define TD_FILE_IS_BAD(tf) (TD_FILE_STATE(tf) == TD_FILE_STATE_BAD)
int32_t
tdInitTFile
(
STFile
*
pTFile
,
STfs
*
pTfs
,
const
char
*
fname
);
int32_t
tdCreateTFile
(
STFile
*
pTFile
,
STfs
*
pTfs
,
bool
updateHeader
,
int8_t
fType
);
int32_t
tdOpenTFile
(
STFile
*
pTFile
,
int
flags
);
int64_t
tdReadTFile
(
STFile
*
pTFile
,
void
*
buf
,
int64_t
nbyte
);
int64_t
tdSeekTFile
(
STFile
*
pTFile
,
int64_t
offset
,
int
whence
);
int64_t
tdWriteTFile
(
STFile
*
pTFile
,
void
*
buf
,
int64_t
nbyte
);
int64_t
tdAppendTFile
(
STFile
*
pTFile
,
void
*
buf
,
int64_t
nbyte
,
int64_t
*
offset
);
int32_t
tdRemoveTFile
(
STFile
*
pTFile
);
int32_t
tdLoadTFileHeader
(
STFile
*
pTFile
,
STFInfo
*
pInfo
);
int32_t
tdUpdateTFileHeader
(
STFile
*
pTFile
);
void
tdUpdateTFileMagic
(
STFile
*
pTFile
,
void
*
pCksm
);
void
tdCloseTFile
(
STFile
*
pTFile
);
void
tdGetVndFileName
(
int32_t
vid
,
const
char
*
dname
,
const
char
*
fname
,
char
*
outputName
);
#ifdef __cplusplus
}
#endif
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
eb86ecfa
...
...
@@ -247,7 +247,6 @@ struct SVnode {
struct
STbUidStore
{
tb_uid_t
suid
;
tb_uid_t
uid
;
// TODO: just for debugging, remove when uid provided in SSDataBlock
SArray
*
tbUids
;
SHashObj
*
uidHash
;
};
...
...
source/dnode/vnode/src/meta/metaQuery.c
浏览文件 @
eb86ecfa
...
...
@@ -264,8 +264,8 @@ struct SMCtbCursor {
SMCtbCursor
*
metaOpenCtbCursor
(
SMeta
*
pMeta
,
tb_uid_t
uid
)
{
SMCtbCursor
*
pCtbCur
=
NULL
;
SCtbIdxKey
ctbIdxKey
;
int
ret
;
int
c
;
int
ret
=
0
;
int
c
=
0
;
pCtbCur
=
(
SMCtbCursor
*
)
taosMemoryCalloc
(
1
,
sizeof
(
*
pCtbCur
));
if
(
pCtbCur
==
NULL
)
{
...
...
source/dnode/vnode/src/sma/smaEnv.c
浏览文件 @
eb86ecfa
...
...
@@ -21,9 +21,10 @@ typedef struct SSmaStat SSmaStat;
// declaration of static functions
static
int32_t
tdInitSmaStat
(
SSmaStat
**
pSmaStat
,
int8_t
smaType
);
static
int32_t
tdInitSmaStat
(
SSmaStat
**
pSmaStat
,
int8_t
smaType
,
const
SSma
*
pSma
);
static
SSmaEnv
*
tdNewSmaEnv
(
const
SSma
*
pSma
,
int8_t
smaType
,
const
char
*
path
);
static
int32_t
tdInitSmaEnv
(
SSma
*
pSma
,
int8_t
smaType
,
const
char
*
path
,
SSmaEnv
**
pEnv
);
static
void
*
tdFreeTSmaStat
(
STSmaStat
*
pStat
);
// implementation
...
...
@@ -45,7 +46,7 @@ static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path)
return
NULL
;
}
if
(
tdInitSmaStat
(
&
SMA_ENV_STAT
(
pEnv
),
smaType
)
!=
TSDB_CODE_SUCCESS
)
{
if
(
tdInitSmaStat
(
&
SMA_ENV_STAT
(
pEnv
),
smaType
,
pSma
)
!=
TSDB_CODE_SUCCESS
)
{
tdFreeSmaEnv
(
pEnv
);
return
NULL
;
}
...
...
@@ -105,7 +106,7 @@ int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) {
return
0
;
}
static
int32_t
tdInitSmaStat
(
SSmaStat
**
pSmaStat
,
int8_t
smaType
)
{
static
int32_t
tdInitSmaStat
(
SSmaStat
**
pSmaStat
,
int8_t
smaType
,
const
SSma
*
pSma
)
{
ASSERT
(
pSmaStat
!=
NULL
);
if
(
*
pSmaStat
)
{
// no lock
...
...
@@ -125,10 +126,23 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType) {
}
if
(
smaType
==
TSDB_SMA_TYPE_ROLLUP
)
{
SMA_STAT_INFO_HASH
(
*
pSmaStat
)
=
taosHashInit
(
RSMA_TASK_INFO_HASH_SLOT
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
HASH_ENTRY_LOCK
);
SMA_RSMA_STAT
(
*
pSmaStat
)
->
pSma
=
(
SSma
*
)
pSma
;
// init timer
SMA_RSMA_TMR_HANDLE
(
*
pSmaStat
)
=
taosTmrInit
(
10000
,
100
,
10000
,
"RSMA_G"
);
if
(
!
SMA_RSMA_TMR_HANDLE
(
*
pSmaStat
))
{
taosMemoryFreeClear
(
*
pSmaStat
);
return
TSDB_CODE_FAILED
;
}
atomic_store_8
(
&
SMA_RSMA_TMR_STAT
(
*
pSmaStat
),
TASK_TRIGGER_STATUS__ACTIVE
);
if
(
!
SMA_STAT_INFO_HASH
(
*
pSmaStat
))
{
// init hash
SMA_RSMA_INFO_HASH
(
*
pSmaStat
)
=
taosHashInit
(
RSMA_TASK_INFO_HASH_SLOT
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
HASH_ENTRY_LOCK
);
if
(
!
SMA_RSMA_INFO_HASH
(
*
pSmaStat
))
{
if
(
SMA_RSMA_TMR_HANDLE
(
*
pSmaStat
))
{
taosTmrCleanUp
(
SMA_RSMA_TMR_HANDLE
(
*
pSmaStat
));
}
taosMemoryFreeClear
(
*
pSmaStat
);
return
TSDB_CODE_FAILED
;
}
...
...
@@ -141,16 +155,16 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType) {
return
TSDB_CODE_SUCCESS
;
}
void
*
tdFreeSmaStatItem
(
SSmaStatItem
*
pSmaStatItem
)
{
if
(
pS
maStatItem
)
{
tDestroyTSma
(
pS
maStatItem
->
pTSma
);
taosMemoryFreeClear
(
pS
maStatItem
->
pTSma
);
taosMemoryFreeClear
(
pS
maStatItem
);
static
void
*
tdFreeTSmaStat
(
STSmaStat
*
pStat
)
{
if
(
pS
tat
)
{
tDestroyTSma
(
pS
tat
->
pTSma
);
taosMemoryFreeClear
(
pS
tat
->
pTSma
);
taosMemoryFreeClear
(
pS
tat
);
}
return
NULL
;
}
void
*
tdFreeSmaState
(
SSmaStat
*
pSmaStat
,
int8_t
smaType
)
{
void
*
tdFreeSmaState
(
SSmaStat
*
pSmaStat
,
int8_t
smaType
)
{
tdDestroySmaState
(
pSmaStat
,
smaType
);
taosMemoryFreeClear
(
pSmaStat
);
return
NULL
;
...
...
@@ -165,16 +179,19 @@ void* tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) {
int32_t
tdDestroySmaState
(
SSmaStat
*
pSmaStat
,
int8_t
smaType
)
{
if
(
pSmaStat
)
{
if
(
smaType
==
TSDB_SMA_TYPE_TIME_RANGE
)
{
tdFree
SmaStatItem
(
&
pSmaStat
->
tsmaStatItem
);
tdFree
TSmaStat
(
&
pSmaStat
->
tsmaStat
);
}
else
if
(
smaType
==
TSDB_SMA_TYPE_ROLLUP
)
{
if
(
SMA_RSMA_TMR_HANDLE
(
pSmaStat
))
{
taosTmrCleanUp
(
SMA_RSMA_TMR_HANDLE
(
pSmaStat
));
}
// TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
void
*
infoHash
=
taosHashIterate
(
SMA_
STAT
_INFO_HASH
(
pSmaStat
),
NULL
);
void
*
infoHash
=
taosHashIterate
(
SMA_
RSMA
_INFO_HASH
(
pSmaStat
),
NULL
);
while
(
infoHash
)
{
SRSmaInfo
*
pInfoHash
=
*
(
SRSmaInfo
**
)
infoHash
;
tdFreeRSmaInfo
(
pInfoHash
);
infoHash
=
taosHashIterate
(
SMA_
STAT
_INFO_HASH
(
pSmaStat
),
infoHash
);
infoHash
=
taosHashIterate
(
SMA_
RSMA
_INFO_HASH
(
pSmaStat
),
infoHash
);
}
taosHashCleanup
(
SMA_
STAT
_INFO_HASH
(
pSmaStat
));
taosHashCleanup
(
SMA_
RSMA
_INFO_HASH
(
pSmaStat
));
}
else
{
ASSERT
(
0
);
}
...
...
@@ -273,4 +290,4 @@ void smaTimerCleanUp(void *timer, int8_t *initFlag) {
taosTmrCleanUp
(
timer
);
atomic_store_8
(
initFlag
,
0
);
}
}
}
\ No newline at end of file
source/dnode/vnode/src/sma/smaOpen.c
浏览文件 @
eb86ecfa
...
...
@@ -137,4 +137,17 @@ int32_t smaClose(SSma *pSma) {
taosMemoryFreeClear
(
pSma
);
}
return
0
;
}
/**
* @brief rsma env restore
*
* @param pSma
* @return int32_t
*/
int32_t
smaRestore
(
SSma
*
pSma
)
{
if
(
!
pSma
)
return
0
;
// iterate all stables to restore the rsma env
return
TSDB_CODE_SUCCESS
;
}
\ No newline at end of file
source/dnode/vnode/src/sma/smaRollup.c
浏览文件 @
eb86ecfa
...
...
@@ -16,35 +16,17 @@
#include "sma.h"
#include "tstream.h"
static
FORCE_INLINE
int32_t
tdUidStorePut
(
STbUidStore
*
pStore
,
tb_uid_t
suid
,
tb_uid_t
*
uid
);
static
FORCE_INLINE
int32_t
tdUpdateTbUidListImpl
(
SSma
*
pSma
,
tb_uid_t
*
suid
,
SArray
*
tbUids
);
static
FORCE_INLINE
int32_t
tdExecuteRSmaImpl
(
SSma
*
pSma
,
const
void
*
pMsg
,
int32_t
inputType
,
SRSmaInfoItem
*
rsmaItem
,
tb_uid_t
suid
,
int8_t
level
);
#define SET_RSMA_INFO_ITEM_PARAMS(__idx, __level) \
if (param->qmsg[__idx]) { \
pRSmaInfo->items[__idx].pRsmaInfo = pRSmaInfo; \
pRSmaInfo->items[__idx].taskInfo = qCreateStreamExecTaskInfo(param->qmsg[0], &handle); \
if (!pRSmaInfo->items[__idx].taskInfo) { \
goto _err; \
} \
pRSmaInfo->items[__idx].triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE; \
if (param->maxdelay[__idx] < 1) { \
int64_t msInterval = \
convertTimeFromPrecisionToUnit(pRetention[__level].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND); \
pRSmaInfo->items[__idx].maxDelay = msInterval; \
} else { \
pRSmaInfo->items[__idx].maxDelay = param->maxdelay[__idx]; \
} \
if (pRSmaInfo->items[__idx].maxDelay > TSDB_MAX_ROLLUP_MAX_DELAY) { \
pRSmaInfo->items[__idx].maxDelay = TSDB_MAX_ROLLUP_MAX_DELAY; \
} \
pRSmaInfo->items[__idx].level = TSDB_RETENTION_L##__level; \
pRSmaInfo->items[__idx].tmrHandle = taosTmrInit(10000, 100, 10000, "RSMA"); \
if (!pRSmaInfo->items[__idx].tmrHandle) { \
goto _err; \
} \
}
typedef
enum
{
TD_QTASK_TMP_FILE
=
0
,
TD_QTASK_CUR_FILE
}
TD_QTASK_FILE_T
;
static
const
char
*
tdQTaskInfoFname
[]
=
{
"qtaskinfo.t"
,
"qtaskinfo"
};
static
int32_t
tdUidStorePut
(
STbUidStore
*
pStore
,
tb_uid_t
suid
,
tb_uid_t
*
uid
);
static
int32_t
tdUpdateTbUidListImpl
(
SSma
*
pSma
,
tb_uid_t
*
suid
,
SArray
*
tbUids
);
static
int32_t
tdSetRSmaInfoItemParams
(
SSma
*
pSma
,
SRSmaParam
*
param
,
SRSmaInfo
*
pRSmaInfo
,
SReadHandle
*
handle
,
int8_t
idx
);
static
int32_t
tdExecuteRSmaImpl
(
SSma
*
pSma
,
const
void
*
pMsg
,
int32_t
inputType
,
SRSmaInfoItem
*
rsmaItem
,
tb_uid_t
suid
,
int8_t
level
);
static
void
tdRSmaFetchTrigger
(
void
*
param
,
void
*
tmrId
);
static
void
tdRSmaPersistTrigger
(
void
*
param
,
void
*
tmrId
);
struct
SRSmaInfoItem
{
SRSmaInfo
*
pRsmaInfo
;
...
...
@@ -56,14 +38,6 @@ struct SRSmaInfoItem {
int8_t
triggerStatus
;
// TASK_TRIGGER_STATUS__IN_ACTIVE/TASK_TRIGGER_STATUS__ACTIVE
int32_t
maxDelay
;
};
typedef
struct
{
int64_t
suid
;
SRSmaInfoItem
*
pItem
;
SSma
*
pSma
;
STSchema
*
pTSchema
;
}
SRSmaTriggerParam
;
struct
SRSmaInfo
{
STSchema
*
pTSchema
;
SSma
*
pSma
;
...
...
@@ -81,7 +55,7 @@ static FORCE_INLINE void tdFreeTaskHandle(qTaskInfo_t *taskHandle) {
void
*
tdFreeRSmaInfo
(
SRSmaInfo
*
pInfo
)
{
if
(
pInfo
)
{
for
(
int32_t
i
=
0
;
i
<
TSDB_RETENTION_
MAX
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
TSDB_RETENTION_
L2
;
++
i
)
{
SRSmaInfoItem
*
pItem
=
&
pInfo
->
items
[
i
];
if
(
pItem
->
taskInfo
)
{
tdFreeTaskHandle
(
pItem
->
taskInfo
);
...
...
@@ -118,7 +92,7 @@ static FORCE_INLINE int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SA
return
TSDB_CODE_FAILED
;
}
pRSmaInfo
=
taosHashGet
(
SMA_
STAT
_INFO_HASH
(
pStat
),
suid
,
sizeof
(
tb_uid_t
));
pRSmaInfo
=
taosHashGet
(
SMA_
RSMA
_INFO_HASH
(
pStat
),
suid
,
sizeof
(
tb_uid_t
));
if
(
!
pRSmaInfo
||
!
(
pRSmaInfo
=
*
(
SRSmaInfo
**
)
pRSmaInfo
))
{
smaError
(
"vgId:%d, failed to get rsma info for uid:%"
PRIi64
,
SMA_VID
(
pSma
),
*
suid
);
terrno
=
TSDB_CODE_RSMA_INVALID_STAT
;
...
...
@@ -187,7 +161,7 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
SSmaStat
*
pStat
=
SMA_ENV_STAT
(
pEnv
);
SHashObj
*
infoHash
=
NULL
;
if
(
!
pStat
||
!
(
infoHash
=
SMA_
STAT
_INFO_HASH
(
pStat
)))
{
if
(
!
pStat
||
!
(
infoHash
=
SMA_
RSMA
_INFO_HASH
(
pStat
)))
{
terrno
=
TSDB_CODE_RSMA_INVALID_STAT
;
return
TSDB_CODE_FAILED
;
}
...
...
@@ -213,6 +187,40 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
tdSetRSmaInfoItemParams
(
SSma
*
pSma
,
SRSmaParam
*
param
,
SRSmaInfo
*
pRSmaInfo
,
SReadHandle
*
pReadHandle
,
int8_t
idx
)
{
SRetention
*
pRetention
=
SMA_RETENTION
(
pSma
);
STsdbCfg
*
pTsdbCfg
=
SMA_TSDB_CFG
(
pSma
);
if
(
param
->
qmsg
[
idx
])
{
SRSmaInfoItem
*
pItem
=
&
(
pRSmaInfo
->
items
[
idx
]);
pItem
->
pRsmaInfo
=
pRSmaInfo
;
pItem
->
taskInfo
=
qCreateStreamExecTaskInfo
(
param
->
qmsg
[
0
],
pReadHandle
);
if
(
!
pItem
->
taskInfo
)
{
goto
_err
;
}
pItem
->
triggerStatus
=
TASK_TRIGGER_STATUS__IN_ACTIVE
;
if
(
param
->
maxdelay
[
idx
]
<
TSDB_MIN_ROLLUP_MAX_DELAY
)
{
int64_t
msInterval
=
convertTimeFromPrecisionToUnit
(
pRetention
[
idx
+
1
].
freq
,
pTsdbCfg
->
precision
,
TIME_UNIT_MILLISECOND
);
pItem
->
maxDelay
=
(
int32_t
)
msInterval
;
}
else
{
pItem
->
maxDelay
=
(
int32_t
)
param
->
maxdelay
[
idx
];
}
if
(
pItem
->
maxDelay
>
TSDB_MAX_ROLLUP_MAX_DELAY
)
{
pItem
->
maxDelay
=
TSDB_MAX_ROLLUP_MAX_DELAY
;
}
pItem
->
level
=
(
idx
==
0
?
TSDB_RETENTION_L1
:
TSDB_RETENTION_L2
);
pItem
->
tmrHandle
=
taosTmrInit
(
10000
,
100
,
10000
,
"RSMA"
);
if
(
!
pItem
->
tmrHandle
)
{
goto
_err
;
}
}
return
TSDB_CODE_SUCCESS
;
_err:
return
TSDB_CODE_FAILED
;
}
/**
* @brief Check and init qTaskInfo_t, only applicable to stable with SRSmaParam.
*
...
...
@@ -246,7 +254,7 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
SSmaStat
*
pStat
=
SMA_ENV_STAT
(
pEnv
);
SRSmaInfo
*
pRSmaInfo
=
NULL
;
pRSmaInfo
=
taosHashGet
(
SMA_
STAT
_INFO_HASH
(
pStat
),
&
pReq
->
suid
,
sizeof
(
tb_uid_t
));
pRSmaInfo
=
taosHashGet
(
SMA_
RSMA
_INFO_HASH
(
pStat
),
&
pReq
->
suid
,
sizeof
(
tb_uid_t
));
if
(
pRSmaInfo
)
{
ASSERT
(
0
);
// TODO: free original pRSmaInfo is exists abnormally
smaWarn
(
"vgId:%d, rsma info already exists for stb: %s, %"
PRIi64
,
SMA_VID
(
pSma
),
pReq
->
name
,
pReq
->
suid
);
...
...
@@ -282,14 +290,14 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
pRSmaInfo
->
pSma
=
pSma
;
pRSmaInfo
->
suid
=
pReq
->
suid
;
SRetention
*
pRetention
=
SMA_RETENTION
(
pSma
);
STsdbCfg
*
pTsdbCfg
=
SMA_TSDB_CFG
(
pSma
);
SET_RSMA_INFO_ITEM_PARAMS
(
0
,
1
);
SET_RSMA_INFO_ITEM_PARAMS
(
1
,
2
);
if
(
tdSetRSmaInfoItemParams
(
pSma
,
param
,
pRSmaInfo
,
&
handle
,
0
)
<
0
)
{
goto
_err
;
}
if
(
tdSetRSmaInfoItemParams
(
pSma
,
param
,
pRSmaInfo
,
&
handle
,
1
)
<
0
)
{
goto
_err
;
}
if
(
taosHashPut
(
SMA_STAT_INFO_HASH
(
pStat
),
&
pReq
->
suid
,
sizeof
(
tb_uid_t
),
&
pRSmaInfo
,
sizeof
(
pRSmaInfo
))
!=
TSDB_CODE_SUCCESS
)
{
if
(
taosHashPut
(
SMA_RSMA_INFO_HASH
(
pStat
),
&
pReq
->
suid
,
sizeof
(
tb_uid_t
),
&
pRSmaInfo
,
sizeof
(
pRSmaInfo
))
<
0
)
{
goto
_err
;
}
else
{
smaDebug
(
"vgId:%d, register rsma info succeed for suid:%"
PRIi64
,
SMA_VID
(
pSma
),
pReq
->
suid
);
...
...
@@ -418,7 +426,6 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
if
(
!
pBlock
)
break
;
tdUidStorePut
(
pStore
,
msgIter
.
suid
,
NULL
);
pStore
->
uid
=
msgIter
.
uid
;
// TODO: remove, just for debugging
}
if
(
terrno
!=
TSDB_CODE_SUCCESS
)
return
-
1
;
...
...
@@ -439,8 +446,9 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType)
if
(
!
output
)
{
break
;
}
if
(
!
pResult
)
{
pResult
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
pResult
=
taosArrayInit
(
1
,
sizeof
(
SSDataBlock
));
if
(
!
pResult
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
TSDB_CODE_FAILED
;
...
...
@@ -451,7 +459,7 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType)
}
if
(
taosArrayGetSize
(
pResult
)
>
0
)
{
#if
0
#if
1
char
flag
[
10
]
=
{
0
};
snprintf
(
flag
,
10
,
"level %"
PRIi8
,
pItem
->
level
);
blockDebugShowData
(
pResult
,
flag
);
...
...
@@ -459,14 +467,12 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType)
STsdb
*
sinkTsdb
=
(
pItem
->
level
==
TSDB_RETENTION_L1
?
pSma
->
pRSmaTsdb1
:
pSma
->
pRSmaTsdb2
);
SSubmitReq
*
pReq
=
NULL
;
if
(
buildSubmitReqFromDataBlock
(
&
pReq
,
pResult
,
pRSmaInfo
->
pTSchema
,
SMA_VID
(
pSma
),
pRSmaInfo
->
suid
)
<
0
)
{
taosArrayDestroy
(
pResult
);
return
TSDB_CODE_FAILED
;
goto
_err
;
}
if
(
pReq
&&
tdProcessSubmitReq
(
sinkTsdb
,
INT64_MAX
,
pReq
)
<
0
)
{
taosArrayDestroy
(
pResult
);
taosMemoryFreeClear
(
pReq
);
return
TSDB_CODE_FAILED
;
goto
_err
;
}
taosMemoryFreeClear
(
pReq
);
...
...
@@ -479,7 +485,10 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType)
}
taosArrayDestroy
(
pResult
);
return
0
;
return
TSDB_CODE_SUCCESS
;
_err:
taosArrayDestroy
(
pResult
);
return
TSDB_CODE_FAILED
;
}
/**
...
...
@@ -488,13 +497,12 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType)
* @param param
* @param tmrId
*/
static
void
rsmaTriggerByTimer
(
void
*
param
,
void
*
tmrId
)
{
// SRSmaTriggerParam *pParam = (SRSmaTriggerParam *)param;
// SRSmaInfoItem *pItem = pParam->pItem;
static
void
tdRSmaFetchTrigger
(
void
*
param
,
void
*
tmrId
)
{
SRSmaInfoItem
*
pItem
=
param
;
if
(
atomic_load_8
(
&
pItem
->
triggerStatus
)
==
TASK_TRIGGER_STATUS__ACTIVE
)
{
smaTrace
(
"level %"
PRIi8
" status is active for tb suid:%"
PRIi64
,
pItem
->
level
,
pItem
->
pRsmaInfo
->
suid
);
smaWarn
(
"%s:%d THREAD:%"
PRIi64
" level %"
PRIi8
" status is active for tb suid:%"
PRIi64
,
__func__
,
__LINE__
,
taosGetSelfPthreadId
(),
pItem
->
level
,
pItem
->
pRsmaInfo
->
suid
);
SSDataBlock
dataBlock
=
{.
info
.
type
=
STREAM_GET_ALL
};
atomic_store_8
(
&
pItem
->
triggerStatus
,
TASK_TRIGGER_STATUS__IN_ACTIVE
);
...
...
@@ -502,10 +510,11 @@ static void rsmaTriggerByTimer(void *param, void *tmrId) {
tdFetchAndSubmitRSmaResult
(
pItem
,
STREAM_DATA_TYPE_SSDATA_BLOCK
);
}
else
{
smaTrace
(
"level %"
PRIi8
" status is inactive for tb suid:%"
PRIi64
,
pItem
->
level
,
pItem
->
pRsmaInfo
->
suid
);
smaWarn
(
"%s:%d THREAD:%"
PRIi64
" level %"
PRIi8
" status is inactive for tb suid:%"
PRIi64
,
__func__
,
__LINE__
,
taosGetSelfPthreadId
(),
pItem
->
level
,
pItem
->
pRsmaInfo
->
suid
);
}
// taosTmrReset(
rsmaTriggerByTim
er, pItem->maxDelay, pItem, pItem->tmrHandle, &pItem->tmrId);
// taosTmrReset(
tdRSmaFetchTrigg
er, pItem->maxDelay, pItem, pItem->tmrHandle, &pItem->tmrId);
}
static
FORCE_INLINE
int32_t
tdExecuteRSmaImpl
(
SSma
*
pSma
,
const
void
*
pMsg
,
int32_t
inputType
,
SRSmaInfoItem
*
pItem
,
...
...
@@ -518,16 +527,20 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3
smaDebug
(
"vgId:%d, execute rsma %"
PRIi8
" task for qTaskInfo:%p suid:%"
PRIu64
,
SMA_VID
(
pSma
),
level
,
pItem
->
taskInfo
,
suid
);
// inputType = STREAM_DATA_TYPE_SUBMIT_BLOCK(1)
if
(
qSetStreamInput
(
pItem
->
taskInfo
,
pMsg
,
inputType
,
true
)
<
0
)
{
if
(
qSetStreamInput
(
pItem
->
taskInfo
,
pMsg
,
inputType
,
true
)
<
0
)
{
// STREAM_DATA_TYPE_SUBMIT_BLOCK
smaError
(
"vgId:%d, rsma % "
PRIi8
" qSetStreamInput failed since %s"
,
SMA_VID
(
pSma
),
level
,
tstrerror
(
terrno
));
return
TSDB_CODE_FAILED
;
}
// SRSmaTriggerParam triggerParam = {.suid = suid, .pItem = pItem, .pSma = pSma, .pTSchema = pTSchema};
tdFetchAndSubmitRSmaResult
(
pItem
,
STREAM_DATA_TYPE_SUBMIT_BLOCK
);
atomic_store_8
(
&
pItem
->
triggerStatus
,
TASK_TRIGGER_STATUS__ACTIVE
);
taosTmrReset
(
rsmaTriggerByTimer
,
pItem
->
maxDelay
,
pItem
,
pItem
->
tmrHandle
,
&
pItem
->
tmrId
);
smaWarn
(
"%s:%d THREAD:%"
PRIi64
" process rsma insert"
,
__func__
,
__LINE__
,
taosGetSelfPthreadId
());
SSmaEnv
*
pEnv
=
SMA_RSMA_ENV
(
pSma
);
SRSmaStat
*
pStat
=
SMA_RSMA_STAT
(
pEnv
->
pStat
);
taosTmrStart
(
tdRSmaPersistTrigger
,
5000
,
pStat
,
pStat
->
tmrHandle
);
taosTmrReset
(
tdRSmaFetchTrigger
,
pItem
->
maxDelay
,
pItem
,
pItem
->
tmrHandle
,
&
pItem
->
tmrId
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -542,7 +555,7 @@ static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb
SSmaStat
*
pStat
=
SMA_ENV_STAT
(
pEnv
);
SRSmaInfo
*
pRSmaInfo
=
NULL
;
pRSmaInfo
=
taosHashGet
(
SMA_
STAT
_INFO_HASH
(
pStat
),
&
suid
,
sizeof
(
tb_uid_t
));
pRSmaInfo
=
taosHashGet
(
SMA_
RSMA
_INFO_HASH
(
pStat
),
&
suid
,
sizeof
(
tb_uid_t
));
if
(
!
pRSmaInfo
||
!
(
pRSmaInfo
=
*
(
SRSmaInfo
**
)
pRSmaInfo
))
{
smaDebug
(
"vgId:%d, return as no rsma info for suid:%"
PRIu64
,
SMA_VID
(
pSma
),
suid
);
...
...
@@ -594,3 +607,106 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
}
return
TSDB_CODE_SUCCESS
;
}
void
tdRSmaQTaskGetFName
(
int32_t
vid
,
int8_t
ftype
,
char
*
outputName
)
{
tdGetVndFileName
(
vid
,
"rsma"
,
tdQTaskInfoFname
[
ftype
],
outputName
);
}
static
void
*
tdRSmaPersistExec
(
void
*
param
)
{
setThreadName
(
"rsma-task-persist"
);
SRSmaStat
*
pRSmaStat
=
param
;
SSma
*
pSma
=
pRSmaStat
->
pSma
;
STfs
*
pTfs
=
pSma
->
pVnode
->
pTfs
;
int64_t
toffset
=
0
;
void
*
infoHash
=
taosHashIterate
(
RSMA_INFO_HASH
(
pRSmaStat
),
NULL
);
if
(
!
infoHash
)
{
goto
_end
;
}
STFile
tFile
=
{
0
};
int32_t
vid
=
2
;
char
qTaskInfoFName
[
TSDB_FILENAME_LEN
];
tdRSmaQTaskGetFName
(
vid
,
TD_QTASK_TMP_FILE
,
qTaskInfoFName
);
tdInitTFile
(
&
tFile
,
pTfs
,
qTaskInfoFName
);
tdCreateTFile
(
&
tFile
,
pTfs
,
true
,
-
1
);
while
(
infoHash
)
{
SRSmaInfo
*
pRSmaInfo
=
*
(
SRSmaInfo
**
)
infoHash
;
char
*
pOutput
=
NULL
;
int32_t
len
=
0
;
if
(
qSerializeTaskStatus
(
pRSmaInfo
->
items
[
0
].
taskInfo
,
&
pOutput
,
&
len
)
<
0
)
{
smaError
(
"serialize rsma task for table %"
PRIi64
" failed since %s"
,
pRSmaInfo
->
items
[
0
].
pRsmaInfo
->
suid
,
terrstr
(
terrno
));
}
else
{
smaWarn
(
"serialize rsma task for table %"
PRIi64
" success and len is %d"
,
pRSmaInfo
->
items
[
0
].
pRsmaInfo
->
suid
,
len
);
}
tdAppendTFile
(
&
tFile
,
&
len
,
sizeof
(
len
),
&
toffset
);
tdAppendTFile
(
&
tFile
,
pOutput
,
len
,
&
toffset
);
taosMemoryFree
(
pOutput
);
infoHash
=
taosHashIterate
(
RSMA_INFO_HASH
(
pRSmaStat
),
infoHash
);
}
_end:
if
(
tdUpdateTFileHeader
(
&
tFile
)
<
0
)
{
smaError
(
"vgId:%d, failed to update tfile %s header since %s"
,
vid
,
TD_FILE_FULL_NAME
(
&
tFile
),
tstrerror
(
terrno
));
tdCloseTFile
(
&
tFile
);
tdRemoveTFile
(
&
tFile
);
return
NULL
;
}
tdCloseTFile
(
&
tFile
);
char
newFName
[
TSDB_FILENAME_LEN
];
strncpy
(
newFName
,
TD_FILE_FULL_NAME
(
&
tFile
),
TSDB_FILENAME_LEN
);
char
*
pos
=
strstr
(
newFName
,
tdQTaskInfoFname
[
TD_QTASK_TMP_FILE
]);
strncpy
(
pos
,
tdQTaskInfoFname
[
TD_QTASK_CUR_FILE
],
TSDB_FILENAME_LEN
-
POINTER_DISTANCE
(
pos
,
newFName
));
taosRenameFile
(
TD_FILE_FULL_NAME
(
&
tFile
),
newFName
);
atomic_store_8
(
&
pRSmaStat
->
tmrStat
,
TASK_TRIGGER_STATUS__ACTIVE
);
return
NULL
;
_err:
atomic_store_8
(
&
pRSmaStat
->
tmrStat
,
TASK_TRIGGER_STATUS__ACTIVE
);
// remove the .tmp file
return
NULL
;
}
static
void
tdRSmaPersistTask
(
SRSmaStat
*
pRSmaStat
)
{
smaWarn
(
"%s:%d entry "
,
__func__
,
__LINE__
);
TdThread
threadId
;
TdThreadAttr
thAttr
;
taosThreadAttrInit
(
&
thAttr
);
taosThreadAttrSetDetachState
(
&
thAttr
,
PTHREAD_CREATE_DETACHED
);
if
(
taosThreadCreate
(
&
threadId
,
&
thAttr
,
tdRSmaPersistExec
,
pRSmaStat
)
!=
0
)
{
smaError
(
"failed to create thread to persist rsma qTaskInfo since %s"
,
strerror
(
errno
));
}
taosThreadAttrDestroy
(
&
thAttr
);
smaWarn
(
"%s:%d end "
,
__func__
,
__LINE__
);
}
/**
* @brief trigger to persist rsma qTaskInfo
*
* @param param
* @param tmrId
*/
static
void
tdRSmaPersistTrigger
(
void
*
param
,
void
*
tmrId
)
{
SRSmaStat
*
pRSmaStat
=
param
;
if
(
atomic_load_8
(
&
pRSmaStat
->
tmrStat
)
==
TASK_TRIGGER_STATUS__ACTIVE
)
{
smaWarn
(
"%s:%d THREAD:%"
PRIi64
" rsma persistence start since active"
,
__func__
,
__LINE__
,
taosGetSelfPthreadId
());
atomic_store_8
(
&
pRSmaStat
->
tmrStat
,
TASK_TRIGGER_STATUS__IN_ACTIVE
);
// execution
tdRSmaPersistTask
(
pRSmaStat
);
}
else
{
smaWarn
(
"%s:%d THREAD:%"
PRIi64
" rsma persistence not start since inactive"
,
__func__
,
__LINE__
,
taosGetSelfPthreadId
());
}
taosTmrReset
(
tdRSmaPersistTrigger
,
3600000
,
pRSmaStat
,
pRSmaStat
->
tmrHandle
,
&
pRSmaStat
->
tmrId
);
}
\ No newline at end of file
source/dnode/vnode/src/sma/smaTimeRange.c
浏览文件 @
eb86ecfa
...
...
@@ -129,7 +129,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
SSmaEnv
*
pEnv
=
SMA_TSMA_ENV
(
pSma
);
SSmaStat
*
pStat
=
NULL
;
S
SmaStatItem
*
pItem
=
NULL
;
S
TSmaStat
*
pItem
=
NULL
;
if
(
!
pEnv
||
!
(
pStat
=
SMA_ENV_STAT
(
pEnv
)))
{
terrno
=
TSDB_CODE_TSMA_INVALID_STAT
;
...
...
@@ -137,7 +137,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
}
tdRefSmaStat
(
pSma
,
pStat
);
pItem
=
&
pStat
->
tsmaStat
Item
;
pItem
=
&
pStat
->
tsmaStat
;
ASSERT
(
pItem
);
...
...
source/dnode/vnode/src/sma/smaUtil.c
0 → 100644
浏览文件 @
eb86ecfa
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sma.h"
#define TD_FILE_HEAD_SIZE 512
#define TD_FILE_STATE_OK 0
#define TD_FILE_STATE_BAD 1
#define TD_FILE_INIT_MAGIC 0xFFFFFFFF
static
int32_t
tdEncodeTFInfo
(
void
**
buf
,
STFInfo
*
pInfo
);
static
void
*
tdDecodeTFInfo
(
void
*
buf
,
STFInfo
*
pInfo
);
static
int32_t
tdEncodeTFInfo
(
void
**
buf
,
STFInfo
*
pInfo
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
magic
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
ftype
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pInfo
->
fver
);
tlen
+=
taosEncodeFixedU64
(
buf
,
pInfo
->
fsize
);
return
tlen
;
}
static
void
*
tdDecodeTFInfo
(
void
*
buf
,
STFInfo
*
pInfo
)
{
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
magic
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
ftype
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pInfo
->
fver
));
buf
=
taosDecodeFixedU64
(
buf
,
&
(
pInfo
->
fsize
));
return
buf
;
}
int64_t
tdWriteTFile
(
STFile
*
pTFile
,
void
*
buf
,
int64_t
nbyte
)
{
ASSERT
(
TD_FILE_OPENED
(
pTFile
));
int64_t
nwrite
=
taosWriteFile
(
pTFile
->
pFile
,
buf
,
nbyte
);
if
(
nwrite
<
nbyte
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
return
nwrite
;
}
int64_t
tdSeekTFile
(
STFile
*
pTFile
,
int64_t
offset
,
int
whence
)
{
ASSERT
(
TD_FILE_OPENED
(
pTFile
));
int64_t
loffset
=
taosLSeekFile
(
TD_FILE_PFILE
(
pTFile
),
offset
,
whence
);
if
(
loffset
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
return
loffset
;
}
int64_t
tdReadTFile
(
STFile
*
pTFile
,
void
*
buf
,
int64_t
nbyte
)
{
ASSERT
(
TD_FILE_OPENED
(
pTFile
));
int64_t
nread
=
taosReadFile
(
pTFile
->
pFile
,
buf
,
nbyte
);
if
(
nread
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
return
nread
;
}
int32_t
tdUpdateTFileHeader
(
STFile
*
pTFile
)
{
char
buf
[
TD_FILE_HEAD_SIZE
]
=
"
\0
"
;
if
(
tdSeekTFile
(
pTFile
,
0
,
SEEK_SET
)
<
0
)
{
return
-
1
;
}
void
*
ptr
=
buf
;
tdEncodeTFInfo
(
&
ptr
,
&
(
pTFile
->
info
));
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
buf
,
TD_FILE_HEAD_SIZE
);
if
(
tdWriteTFile
(
pTFile
,
buf
,
TD_FILE_HEAD_SIZE
)
<
0
)
{
return
-
1
;
}
return
0
;
}
int32_t
tdLoadTFileHeader
(
STFile
*
pTFile
,
STFInfo
*
pInfo
)
{
char
buf
[
TD_FILE_HEAD_SIZE
]
=
"
\0
"
;
uint32_t
_version
;
ASSERT
(
TD_FILE_OPENED
(
pTFile
));
if
(
tdSeekTFile
(
pTFile
,
0
,
SEEK_SET
)
<
0
)
{
return
-
1
;
}
if
(
tdReadTFile
(
pTFile
,
buf
,
TD_FILE_HEAD_SIZE
)
<
0
)
{
return
-
1
;
}
if
(
!
taosCheckChecksumWhole
((
uint8_t
*
)
buf
,
TD_FILE_HEAD_SIZE
))
{
terrno
=
TSDB_CODE_FILE_CORRUPTED
;
return
-
1
;
}
void
*
pBuf
=
buf
;
pBuf
=
tdDecodeTFInfo
(
pBuf
,
pInfo
);
return
0
;
}
void
tdUpdateTFileMagic
(
STFile
*
pTFile
,
void
*
pCksm
)
{
pTFile
->
info
.
magic
=
taosCalcChecksum
(
pTFile
->
info
.
magic
,
(
uint8_t
*
)(
pCksm
),
sizeof
(
TSCKSUM
));
}
int64_t
tdAppendTFile
(
STFile
*
pTFile
,
void
*
buf
,
int64_t
nbyte
,
int64_t
*
offset
)
{
ASSERT
(
TD_FILE_OPENED
(
pTFile
));
int64_t
toffset
;
if
((
toffset
=
tdSeekTFile
(
pTFile
,
0
,
SEEK_END
))
<
0
)
{
return
-
1
;
}
ASSERT
(
pTFile
->
info
.
fsize
==
toffset
);
if
(
offset
)
{
*
offset
=
toffset
;
}
if
(
tdWriteTFile
(
pTFile
,
buf
,
nbyte
)
<
0
)
{
return
-
1
;
}
pTFile
->
info
.
fsize
+=
nbyte
;
return
nbyte
;
}
int32_t
tdOpenTFile
(
STFile
*
pTFile
,
int
flags
)
{
ASSERT
(
!
TD_FILE_OPENED
(
pTFile
));
pTFile
->
pFile
=
taosOpenFile
(
TD_FILE_FULL_NAME
(
pTFile
),
flags
);
if
(
pTFile
->
pFile
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
return
0
;
}
void
tdCloseTFile
(
STFile
*
pTFile
)
{
if
(
TD_FILE_OPENED
(
pTFile
))
{
taosCloseFile
(
&
pTFile
->
pFile
);
TD_FILE_SET_CLOSED
(
pTFile
);
}
}
void
tdGetVndFileName
(
int32_t
vid
,
const
char
*
dname
,
const
char
*
fname
,
char
*
outputName
)
{
snprintf
(
outputName
,
TSDB_FILENAME_LEN
,
"vnode/vnode%d/%s/%s"
,
vid
,
dname
,
fname
);
}
int32_t
tdInitTFile
(
STFile
*
pTFile
,
STfs
*
pTfs
,
const
char
*
fname
)
{
char
fullname
[
TSDB_FILENAME_LEN
];
SDiskID
did
=
{
0
};
TD_FILE_SET_STATE
(
pTFile
,
TD_FILE_STATE_OK
);
TD_FILE_SET_CLOSED
(
pTFile
);
memset
(
&
(
pTFile
->
info
),
0
,
sizeof
(
pTFile
->
info
));
pTFile
->
info
.
magic
=
TD_FILE_INIT_MAGIC
;
if
(
tfsAllocDisk
(
pTfs
,
0
,
&
did
)
<
0
)
{
terrno
=
TSDB_CODE_NO_AVAIL_DISK
;
return
-
1
;
}
tfsInitFile
(
pTfs
,
&
(
pTFile
->
f
),
did
,
fname
);
return
0
;
}
int32_t
tdCreateTFile
(
STFile
*
pTFile
,
STfs
*
pTfs
,
bool
updateHeader
,
int8_t
fType
)
{
ASSERT
(
pTFile
->
info
.
fsize
==
0
&&
pTFile
->
info
.
magic
==
TD_FILE_INIT_MAGIC
);
pTFile
->
pFile
=
taosOpenFile
(
TD_FILE_FULL_NAME
(
pTFile
),
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_TRUNC
);
if
(
pTFile
->
pFile
==
NULL
)
{
if
(
errno
==
ENOENT
)
{
// Try to create directory recursively
char
*
s
=
strdup
(
TD_FILE_REL_NAME
(
pTFile
));
if
(
tfsMkdirRecurAt
(
pTfs
,
taosDirName
(
s
),
TD_FILE_DID
(
pTFile
))
<
0
)
{
taosMemoryFreeClear
(
s
);
return
-
1
;
}
taosMemoryFreeClear
(
s
);
pTFile
->
pFile
=
taosOpenFile
(
TD_FILE_FULL_NAME
(
pTFile
),
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_TRUNC
);
if
(
pTFile
->
pFile
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
}
else
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
}
if
(
!
updateHeader
)
{
return
0
;
}
pTFile
->
info
.
fsize
+=
TD_FILE_HEAD_SIZE
;
pTFile
->
info
.
fver
=
0
;
if
(
tdUpdateTFileHeader
(
pTFile
)
<
0
)
{
tdCloseTFile
(
pTFile
);
tdRemoveTFile
(
pTFile
);
return
-
1
;
}
return
0
;
}
int32_t
tdRemoveTFile
(
STFile
*
pTFile
)
{
return
tfsRemoveFile
(
TD_FILE_F
(
pTFile
));
}
\ No newline at end of file
source/dnode/vnode/src/tq/tq.c
浏览文件 @
eb86ecfa
...
...
@@ -252,15 +252,17 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
#if 1
if
(
pReq
->
useSnapshot
)
{
tqInfo
(
"retrieve using snapshot"
);
// TODO set ver into snapshot
int64_t
lastVer
=
walGetCommittedVer
(
pTq
->
pWal
);
if
(
rsp
.
reqOffset
<
lastVer
)
{
tqInfo
(
"retrieve using snapshot req offset %ld last ver %ld"
,
rsp
.
reqOffset
,
lastVer
);
tqScanSnapshot
(
pTq
,
&
pHandle
->
execHandle
,
&
rsp
,
workerId
);
if
(
rsp
.
blockNum
!=
0
)
{
rsp
.
withTbName
=
false
;
rsp
.
rspOffset
=
lastVer
;
tqInfo
(
"direct send by snapshot rsp offset %ld"
,
lastVer
);
tqInfo
(
"direct send by snapshot req offset %ld rsp offset %ld"
,
rsp
.
reqOffset
,
rsp
.
rspOffset
);
fetchOffset
=
lastVer
;
goto
SEND_RSP
;
}
}
...
...
@@ -304,7 +306,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
pHead
->
msgType
==
TDMT_VND_DROP_STB
||
pHead
->
msgType
==
TDMT_VND_CREATE_TABLE
||
pHead
->
msgType
==
TDMT_VND_ALTER_TABLE
||
pHead
->
msgType
==
TDMT_VND_DROP_TABLE
||
pHead
->
msgType
==
TDMT_VND_DROP_TTL_TABLE
);
// return
tqInfo
(
"fetch meta msg, ver: %ld, type: %d"
,
pHead
->
version
,
pHead
->
msgType
);
SMqMetaRsp
metaRsp
=
{
0
};
metaRsp
.
reqOffset
=
pReq
->
currentOffset
;
metaRsp
.
rspOffset
=
fetchOffset
;
...
...
@@ -387,6 +389,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle
->
epoch
=
-
1
;
pHandle
->
execHandle
.
subType
=
req
.
subType
;
pHandle
->
fetchMeta
=
req
.
withMeta
;
pHandle
->
pWalReader
=
walOpenReadHandle
(
pTq
->
pVnode
->
pWal
);
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
eb86ecfa
...
...
@@ -340,7 +340,7 @@ typedef struct SStreamBlockScanInfo {
SReadHandle
readHandle
;
uint64_t
tableUid
;
// queried super table uid
EStreamScanMode
scanMode
;
SOperatorInfo
*
p
OperatorDumy
;
SOperatorInfo
*
p
SnapshotReadOp
;
SInterval
interval
;
// if the upstream is an interval operator, the interval info is also kept here.
SArray
*
childIds
;
SessionWindowSupporter
sessionSup
;
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
eb86ecfa
...
...
@@ -337,7 +337,8 @@ void addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_
}
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
++
i
)
{
colDataAppend
(
pColInfoData
,
i
,
data
,
(
data
==
NULL
)
||
(
pColInfoData
->
info
.
type
==
TSDB_DATA_TYPE_JSON
&&
tTagIsJsonNull
(
data
)));
colDataAppend
(
pColInfoData
,
i
,
data
,
(
data
==
NULL
)
||
(
pColInfoData
->
info
.
type
==
TSDB_DATA_TYPE_JSON
&&
tTagIsJsonNull
(
data
)));
}
if
(
data
&&
(
pColInfoData
->
info
.
type
!=
TSDB_DATA_TYPE_JSON
)
&&
p
!=
NULL
&&
...
...
@@ -776,7 +777,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
if
(
!
needRead
)
{
return
false
;
}
STableScanInfo
*
pTableScanInfo
=
pInfo
->
p
OperatorDumy
->
info
;
STableScanInfo
*
pTableScanInfo
=
pInfo
->
p
SnapshotReadOp
->
info
;
pTableScanInfo
->
cond
.
twindows
[
0
]
=
win
;
pTableScanInfo
->
curTWinIdx
=
0
;
tsdbResetReadHandle
(
pTableScanInfo
->
dataReader
,
&
pTableScanInfo
->
cond
,
0
);
...
...
@@ -821,11 +822,11 @@ static uint64_t getGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_
static
SSDataBlock
*
doDataScan
(
SStreamBlockScanInfo
*
pInfo
)
{
while
(
1
)
{
SSDataBlock
*
pResult
=
NULL
;
pResult
=
doTableScan
(
pInfo
->
p
OperatorDumy
);
pResult
=
doTableScan
(
pInfo
->
p
SnapshotReadOp
);
if
(
pResult
==
NULL
)
{
if
(
prepareDataScan
(
pInfo
))
{
// scan next window data
pResult
=
doTableScan
(
pInfo
->
p
OperatorDumy
);
pResult
=
doTableScan
(
pInfo
->
p
SnapshotReadOp
);
}
}
if
(
!
pResult
)
{
...
...
@@ -860,11 +861,11 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa
ASSERT
(
pBlock
->
info
.
numOfCols
==
pUpdateBlock
->
info
.
numOfCols
);
int32_t
rowId
=
*
(
int32_t
*
)
taosArrayGet
(
pInfo
->
tsArray
,
pInfo
->
tsArrayIndex
);
pInfo
->
groupId
=
getGroupId
(
pInfo
->
p
OperatorDumy
,
pBlock
,
rowId
);
pInfo
->
groupId
=
getGroupId
(
pInfo
->
p
SnapshotReadOp
,
pBlock
,
rowId
);
int32_t
i
=
0
;
for
(;
i
<
size
;
i
++
)
{
rowId
=
*
(
int32_t
*
)
taosArrayGet
(
pInfo
->
tsArray
,
i
+
pInfo
->
tsArrayIndex
);
uint64_t
id
=
getGroupId
(
pInfo
->
p
OperatorDumy
,
pBlock
,
rowId
);
uint64_t
id
=
getGroupId
(
pInfo
->
p
SnapshotReadOp
,
pBlock
,
rowId
);
if
(
pInfo
->
groupId
!=
id
)
{
break
;
}
...
...
@@ -1063,7 +1064,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
return
(
pBlockInfo
->
rows
==
0
)
?
NULL
:
pInfo
->
pRes
;
}
else
if
(
pInfo
->
blockType
==
STREAM_DATA_TYPE_FROM_SNAPSHOT
)
{
SSDataBlock
*
pResult
=
doTableScan
(
pInfo
->
p
OperatorDumy
);
SSDataBlock
*
pResult
=
doTableScan
(
pInfo
->
p
SnapshotReadOp
);
if
(
pResult
)
{
return
pResult
->
info
.
rows
>
0
?
pResult
:
NULL
;
}
...
...
@@ -1135,7 +1136,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
}
else
{
pInfo
->
pUpdateInfo
=
NULL
;
}
pInfo
->
p
OperatorDumy
=
pTableScanDummy
;
pInfo
->
p
SnapshotReadOp
=
pTableScanDummy
;
pInfo
->
interval
=
pSTInfo
->
interval
;
pInfo
->
readHandle
=
*
pHandle
;
...
...
@@ -1557,7 +1558,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
return
NULL
;
}
int32_t
msgType
=
(
strcasecmp
(
name
,
TSDB_INS_TABLE_DNODE_VARIABLES
)
==
0
)
?
TDMT_DND_SYSTABLE_RETRIEVE
:
TDMT_MND_SYSTABLE_RETRIEVE
;
int32_t
msgType
=
(
strcasecmp
(
name
,
TSDB_INS_TABLE_DNODE_VARIABLES
)
==
0
)
?
TDMT_DND_SYSTABLE_RETRIEVE
:
TDMT_MND_SYSTABLE_RETRIEVE
;
pMsgSendInfo
->
param
=
pOperator
;
pMsgSendInfo
->
msgInfo
.
pData
=
buf1
;
...
...
@@ -1843,7 +1845,8 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
}
else
{
data
=
(
char
*
)
p
;
}
colDataAppend
(
pDst
,
count
,
data
,
(
data
==
NULL
)
||
(
pDst
->
info
.
type
==
TSDB_DATA_TYPE_JSON
&&
tTagIsJsonNull
(
data
)));
colDataAppend
(
pDst
,
count
,
data
,
(
data
==
NULL
)
||
(
pDst
->
info
.
type
==
TSDB_DATA_TYPE_JSON
&&
tTagIsJsonNull
(
data
)));
if
(
pDst
->
info
.
type
!=
TSDB_DATA_TYPE_JSON
&&
p
!=
NULL
&&
IS_VAR_DATA_TYPE
(((
const
STagVal
*
)
p
)
->
type
)
&&
data
!=
NULL
)
{
...
...
@@ -1896,11 +1899,11 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
goto
_error
;
}
pInfo
->
pTableList
=
pTableListInfo
;
pInfo
->
pColMatchInfo
=
colList
;
pInfo
->
pRes
=
createResDataBlock
(
pDescNode
);
pInfo
->
readHandle
=
*
pReadHandle
;
pInfo
->
curPos
=
0
;
pInfo
->
pTableList
=
pTableListInfo
;
pInfo
->
pColMatchInfo
=
colList
;
pInfo
->
pRes
=
createResDataBlock
(
pDescNode
);
pInfo
->
readHandle
=
*
pReadHandle
;
pInfo
->
curPos
=
0
;
pOperator
->
name
=
"TagScanOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN
;
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
eb86ecfa
...
...
@@ -2238,7 +2238,8 @@ static void clearUpdateDataBlock(SSDataBlock* pBlock) {
}
void
copyUpdateDataBlock
(
SSDataBlock
*
pDest
,
SSDataBlock
*
pSource
,
int32_t
tsColIndex
)
{
ASSERT
(
pDest
->
info
.
capacity
>=
pSource
->
info
.
rows
);
// ASSERT(pDest->info.capacity >= pSource->info.rows);
blockDataEnsureCapacity
(
pDest
,
pSource
->
info
.
rows
);
clearUpdateDataBlock
(
pDest
);
SColumnInfoData
*
pDestCol
=
taosArrayGet
(
pDest
->
pDataBlock
,
0
);
SColumnInfoData
*
pSourceCol
=
taosArrayGet
(
pSource
->
pDataBlock
,
tsColIndex
);
...
...
source/util/src/terror.c
浏览文件 @
eb86ecfa
...
...
@@ -76,6 +76,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NEED_RETRY, "Retry needed")
TAOS_DEFINE_ERROR
(
TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE
,
"Out of memory in rpc queue"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_INVALID_TIMESTAMP
,
"Invalid timestamp format"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MSG_DECODE_ERROR
,
"Msg decode error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_NO_AVAIL_DISK
,
"No available disk"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_REF_NO_MEMORY
,
"Ref out of memory"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_REF_FULL
,
"too many Ref Objs"
)
...
...
@@ -356,7 +357,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM, "No table data in memo
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_FILE_ALREADY_EXISTS
,
"File already exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_TABLE_RECONFIGURE
,
"Need to reconfigure table"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO
,
"Invalid information to create table"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_NO_AVAIL_DISK
,
"
N
o available disk"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_NO_AVAIL_DISK
,
"
TSDB n
o available disk"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_MESSED_MSG
,
"TSDB messed message"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_IVLD_TAG_VAL
,
"TSDB invalid tag value"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_NO_CACHE_LAST_ROW
,
"TSDB no cache last row data"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录