Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
cb7d0c22
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
cb7d0c22
编写于
6月 14, 2022
作者:
C
Cary Xu
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat: tsma insert and query
上级
b6af6292
变更
17
隐藏空白更改
内联
并排
Showing
17 changed file
with
304 addition
and
391 deletion
+304
-391
include/common/tmsg.h
include/common/tmsg.h
+4
-3
include/util/taoserror.h
include/util/taoserror.h
+9
-6
source/common/src/tmsg.c
source/common/src/tmsg.c
+20
-8
source/dnode/vnode/CMakeLists.txt
source/dnode/vnode/CMakeLists.txt
+1
-1
source/dnode/vnode/src/inc/sma.h
source/dnode/vnode/src/inc/sma.h
+12
-22
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+3
-2
source/dnode/vnode/src/meta/metaEntry.c
source/dnode/vnode/src/meta/metaEntry.c
+1
-1
source/dnode/vnode/src/sma/sma.c
source/dnode/vnode/src/sma/sma.c
+206
-0
source/dnode/vnode/src/sma/smaEnv.c
source/dnode/vnode/src/sma/smaEnv.c
+22
-153
source/dnode/vnode/src/sma/smaOpen.c
source/dnode/vnode/src/sma/smaOpen.c
+3
-1
source/dnode/vnode/src/sma/smaRollup.c
source/dnode/vnode/src/sma/smaRollup.c
+1
-1
source/dnode/vnode/src/sma/smaTimeRange.c
source/dnode/vnode/src/sma/smaTimeRange.c
+16
-16
source/dnode/vnode/src/sma/smaTimeRange2.c
source/dnode/vnode/src/sma/smaTimeRange2.c
+0
-170
source/dnode/vnode/src/tq/tqSink.c
source/dnode/vnode/src/tq/tqSink.c
+1
-4
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+1
-1
source/dnode/vnode/test/tsdbSmaTest.cpp
source/dnode/vnode/test/tsdbSmaTest.cpp
+1
-1
source/util/src/terror.c
source/util/src/terror.c
+3
-1
未找到文件。
include/common/tmsg.h
浏览文件 @
cb7d0c22
...
...
@@ -2427,6 +2427,7 @@ typedef struct {
static
FORCE_INLINE
void
tDestroyTSma
(
STSma
*
pSma
)
{
if
(
pSma
)
{
taosMemoryFreeClear
(
pSma
->
dstTbName
);
taosMemoryFreeClear
(
pSma
->
expr
);
taosMemoryFreeClear
(
pSma
->
tagsFilter
);
}
...
...
@@ -2455,7 +2456,7 @@ int32_t tEncodeSVCreateTSmaReq(SEncoder* pCoder, const SVCreateTSmaReq* pReq);
int32_t
tDecodeSVCreateTSmaReq
(
SDecoder
*
pCoder
,
SVCreateTSmaReq
*
pReq
);
int32_t
tEncodeTSma
(
SEncoder
*
pCoder
,
const
STSma
*
pSma
);
int32_t
tDecodeTSma
(
SDecoder
*
pCoder
,
STSma
*
pSma
);
int32_t
tDecodeTSma
(
SDecoder
*
pCoder
,
STSma
*
pSma
,
bool
deepCopy
);
static
int32_t
tEncodeTSmaWrapper
(
SEncoder
*
pEncoder
,
const
STSmaWrapper
*
pReq
)
{
if
(
tEncodeI32
(
pEncoder
,
pReq
->
number
)
<
0
)
return
-
1
;
...
...
@@ -2465,10 +2466,10 @@ static int32_t tEncodeTSmaWrapper(SEncoder* pEncoder, const STSmaWrapper* pReq)
return
0
;
}
static
int32_t
tDecodeTSmaWrapper
(
SDecoder
*
pDecoder
,
STSmaWrapper
*
pReq
)
{
static
int32_t
tDecodeTSmaWrapper
(
SDecoder
*
pDecoder
,
STSmaWrapper
*
pReq
,
bool
deepCopy
)
{
if
(
tDecodeI32
(
pDecoder
,
&
pReq
->
number
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
pReq
->
number
;
++
i
)
{
tDecodeTSma
(
pDecoder
,
pReq
->
tSma
+
i
);
tDecodeTSma
(
pDecoder
,
pReq
->
tSma
+
i
,
deepCopy
);
}
return
0
;
}
...
...
include/util/taoserror.h
浏览文件 @
cb7d0c22
...
...
@@ -684,12 +684,15 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SML_INVALID_DB_CONF TAOS_DEF_ERROR_CODE(0, 0x3003)
//tsma
#define TSDB_CODE_TSMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x3100)
#define TSDB_CODE_TSMA_NO_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x3101)
#define TSDB_CODE_TSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3102)
#define TSDB_CODE_TSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3103)
#define TSDB_CODE_TSMA_NO_INDEX_IN_CACHE TAOS_DEF_ERROR_CODE(0, 0x3104)
#define TSDB_CODE_TSMA_RM_SKEY_IN_HASH TAOS_DEF_ERROR_CODE(0, 0x3105)
#define TSDB_CODE_TSMA_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x3100)
#define TSDB_CODE_TSMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x3101)
#define TSDB_CODE_TSMA_NO_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x3102)
#define TSDB_CODE_TSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3103)
#define TSDB_CODE_TSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3104)
#define TSDB_CODE_TSMA_INVALID_PTR TAOS_DEF_ERROR_CODE(0, 0x3105)
#define TSDB_CODE_TSMA_INVALID_PARA TAOS_DEF_ERROR_CODE(0, 0x3106)
#define TSDB_CODE_TSMA_NO_INDEX_IN_CACHE TAOS_DEF_ERROR_CODE(0, 0x3107)
//rsma
#define TSDB_CODE_RSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3150)
...
...
source/common/src/tmsg.c
浏览文件 @
cb7d0c22
...
...
@@ -2653,7 +2653,7 @@ int32_t tSerializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) {
}
int32_t
numOfIndex
=
taosArrayGetSize
(
pRsp
->
pIndexRsp
);
if
(
tEncodeI32
(
&
encoder
,
numOfIndex
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
numOfIndex
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
numOfIndex
;
++
i
)
{
STableIndexRsp
*
pIndexRsp
=
taosArrayGet
(
pRsp
->
pIndexRsp
,
i
);
if
(
tEncodeCStr
(
&
encoder
,
pIndexRsp
->
tbName
)
<
0
)
return
-
1
;
...
...
@@ -2738,7 +2738,7 @@ int32_t tDeserializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) {
}
taosArrayPush
(
pRsp
->
pIndexRsp
,
&
tableIndexRsp
);
}
tEndDecode
(
&
decoder
);
tDecoderClear
(
&
decoder
);
...
...
@@ -4000,7 +4000,7 @@ int32_t tEncodeTSma(SEncoder *pCoder, const STSma *pSma) {
return
0
;
}
int32_t
tDecodeTSma
(
SDecoder
*
pCoder
,
STSma
*
pSma
)
{
int32_t
tDecodeTSma
(
SDecoder
*
pCoder
,
STSma
*
pSma
,
bool
deepCopy
)
{
if
(
tDecodeI8
(
pCoder
,
&
pSma
->
version
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pCoder
,
&
pSma
->
intervalUnit
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pCoder
,
&
pSma
->
slidingUnit
)
<
0
)
return
-
1
;
...
...
@@ -4012,17 +4012,30 @@ int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) {
if
(
tDecodeI64
(
pCoder
,
&
pSma
->
indexUid
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pCoder
,
&
pSma
->
tableUid
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pCoder
,
&
pSma
->
dstTbUid
)
<
0
)
return
-
1
;
if
(
tDecodeCStr
(
pCoder
,
&
pSma
->
dstTbName
)
<
0
)
return
-
1
;
if
(
deepCopy
)
{
if
(
tDecodeCStrAlloc
(
pCoder
,
&
pSma
->
dstTbName
)
<
0
)
return
-
1
;
}
else
{
if
(
tDecodeCStr
(
pCoder
,
&
pSma
->
dstTbName
)
<
0
)
return
-
1
;
}
if
(
tDecodeI64
(
pCoder
,
&
pSma
->
interval
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pCoder
,
&
pSma
->
offset
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pCoder
,
&
pSma
->
sliding
)
<
0
)
return
-
1
;
if
(
pSma
->
exprLen
>
0
)
{
if
(
tDecodeCStr
(
pCoder
,
&
pSma
->
expr
)
<
0
)
return
-
1
;
if
(
deepCopy
)
{
if
(
tDecodeCStrAlloc
(
pCoder
,
&
pSma
->
expr
)
<
0
)
return
-
1
;
}
else
{
if
(
tDecodeCStr
(
pCoder
,
&
pSma
->
expr
)
<
0
)
return
-
1
;
}
}
else
{
pSma
->
expr
=
NULL
;
}
if
(
pSma
->
tagsFilterLen
>
0
)
{
if
(
tDecodeCStr
(
pCoder
,
&
pSma
->
tagsFilter
)
<
0
)
return
-
1
;
if
(
deepCopy
)
{
if
(
tDecodeCStrAlloc
(
pCoder
,
&
pSma
->
tagsFilter
)
<
0
)
return
-
1
;
}
else
{
if
(
tDecodeCStr
(
pCoder
,
&
pSma
->
tagsFilter
)
<
0
)
return
-
1
;
}
}
else
{
pSma
->
tagsFilter
=
NULL
;
}
...
...
@@ -4045,7 +4058,7 @@ int32_t tEncodeSVCreateTSmaReq(SEncoder *pCoder, const SVCreateTSmaReq *pReq) {
int32_t
tDecodeSVCreateTSmaReq
(
SDecoder
*
pCoder
,
SVCreateTSmaReq
*
pReq
)
{
if
(
tStartDecode
(
pCoder
)
<
0
)
return
-
1
;
tDecodeTSma
(
pCoder
,
pReq
);
tDecodeTSma
(
pCoder
,
pReq
,
false
);
tEndDecode
(
pCoder
);
return
0
;
...
...
@@ -4879,4 +4892,3 @@ int32_t tDecodeSTqOffset(SDecoder *pDecoder, STqOffset *pOffset) {
if
(
tDecodeCStrTo
(
pDecoder
,
pOffset
->
subKey
)
<
0
)
return
-
1
;
return
0
;
}
source/dnode/vnode/CMakeLists.txt
浏览文件 @
cb7d0c22
...
...
@@ -31,7 +31,7 @@ target_sources(
"src/sma/smaEnv.c"
"src/sma/smaOpen.c"
"src/sma/smaRollup.c"
"src/sma/smaTimeRange
2
.c"
"src/sma/smaTimeRange.c"
# tsdb
"src/tsdb/tsdbCommit.c"
...
...
source/dnode/vnode/src/inc/sma.h
浏览文件 @
cb7d0c22
...
...
@@ -38,8 +38,6 @@ typedef struct SSmaStatItem SSmaStatItem;
typedef
struct
SSmaKey
SSmaKey
;
typedef
struct
SRSmaInfo
SRSmaInfo
;
#define SMA_IVLD_FID INT_MIN
struct
SSmaEnv
{
TdThreadRwlock
lock
;
int8_t
type
;
...
...
@@ -49,45 +47,38 @@ struct SSmaEnv {
#define SMA_ENV_LOCK(env) ((env)->lock)
#define SMA_ENV_TYPE(env) ((env)->type)
#define SMA_ENV_STAT(env) ((env)->pStat)
#define SMA_ENV_STAT_ITEM
S(env) ((env)->pStat->smaStatItems
)
#define SMA_ENV_STAT_ITEM
(env) ((env)->pStat->tsmaStatItem
)
struct
SSmaStatItem
{
int8_t
state
;
// ETsdbSmaStat
STSma
*
pTSma
;
// cache schema
int8_t
state
;
// ETsdbSmaStat
STSma
*
pTSma
;
// cache schema
STSchema
*
pTSchema
;
};
struct
SSmaStat
{
union
{
S
HashObj
*
smaStatItems
;
// key: indexUid, value: SSmaStatItem for tsma
SHashObj
*
rsmaInfoHash
;
// key: stbUid, value: SRSmaInfo;
S
SmaStatItem
tsmaStatItem
;
SHashObj
*
rsmaInfoHash
;
// key: stbUid, value: SRSmaInfo;
};
T_REF_DECLARE
()
};
#define SMA_STAT_ITEM
S(s) ((s)->smaStatItems
)
#define SMA_STAT_ITEM
(s) ((s)->tsmaStatItem
)
#define SMA_STAT_INFO_HASH(s) ((s)->rsmaInfoHash)
void
tdDestroySmaEnv
(
SSmaEnv
*
pSmaEnv
);
void
*
tdFreeSmaEnv
(
SSmaEnv
*
pSmaEnv
);
#if 0
int32_t tbGetTSmaStatus(SSma *pSma, STSma *param, void *result);
int32_t tbRemoveTSmaData(SSma *pSma, STSma *param, STimeWindow *pWin);
#endif
int32_t
tdInitSma
(
SSma
*
pSma
);
int32_t
tdDropTSma
(
SSma
*
pSma
,
char
*
pMsg
);
int32_t
tdDropTSmaData
(
SSma
*
pSma
,
int64_t
indexUid
);
int32_t
tdInsertRSmaData
(
SSma
*
pSma
,
char
*
msg
);
int32_t
tdRefSmaStat
(
SSma
*
pSma
,
SSmaStat
*
pStat
);
int32_t
tdUnRefSmaStat
(
SSma
*
pSma
,
SSmaStat
*
pStat
);
int32_t
tdCheckAndInitSmaEnv
(
SSma
*
pSma
,
int8_t
smaType
,
bool
onlyCheck
);
int32_t
tdCheckAndInitSmaEnv
(
SSma
*
pSma
,
int8_t
smaType
);
int32_t
tdLockSma
(
SSma
*
pSma
);
int32_t
tdUnLockSma
(
SSma
*
pSma
);
static
FORCE_INLINE
int16_t
tdTSmaAdd
(
SSma
*
pSma
,
int16_t
n
)
{
return
atomic_add_fetch_16
(
&
SMA_TSMA_NUM
(
pSma
),
n
);
}
static
FORCE_INLINE
int16_t
tdTSmaSub
(
SSma
*
pSma
,
int16_t
n
)
{
return
atomic_sub_fetch_16
(
&
SMA_TSMA_NUM
(
pSma
),
n
);
}
static
FORCE_INLINE
int32_t
tdRLockSmaEnv
(
SSmaEnv
*
pEnv
)
{
int
code
=
taosThreadRwlockRdlock
(
&
(
pEnv
->
lock
));
if
(
code
!=
0
)
{
...
...
@@ -160,11 +151,10 @@ static FORCE_INLINE void tdSmaStatSetDropped(SSmaStatItem *pStatItem) {
}
}
static
int32_t
tdInitSmaStat
(
SSmaStat
**
pSmaStat
,
int8_t
smaType
);
void
*
tdFreeSmaStatItem
(
SSmaStatItem
*
pSmaStatItem
);
static
int32_t
tdDestroySmaState
(
SSmaStat
*
pSmaStat
,
int8_t
smaType
);
static
SSmaEnv
*
tdNewSmaEnv
(
const
SSma
*
pSma
,
int8_t
smaType
,
const
char
*
path
,
SDiskID
did
);
static
int32_t
tdInitSmaEnv
(
SSma
*
pSma
,
int8_t
smaType
,
const
char
*
path
,
SDiskID
did
,
SSmaEnv
**
pEnv
);
static
int32_t
tdInitSmaStat
(
SSmaStat
**
pSmaStat
,
int8_t
smaType
);
void
*
tdFreeSmaStatItem
(
SSmaStatItem
*
pSmaStatItem
);
static
int32_t
tdDestroySmaState
(
SSmaStat
*
pSmaStat
,
int8_t
smaType
);
void
*
tdFreeSmaState
(
SSmaStat
*
pSmaStat
,
int8_t
smaType
);
void
*
tdFreeRSmaInfo
(
SRSmaInfo
*
pInfo
);
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
cb7d0c22
...
...
@@ -147,6 +147,9 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg);
int32_t
tqProcessTaskDispatchRsp
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
int32_t
tqProcessTaskRecoverRsp
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
SSubmitReq
*
tdBlockToSubmit
(
const
SArray
*
pBlocks
,
const
STSchema
*
pSchema
,
bool
createTb
,
int64_t
suid
,
const
char
*
stbFullName
,
int32_t
vgId
);
// sma
int32_t
smaOpen
(
SVnode
*
pVnode
);
int32_t
smaClose
(
SSma
*
pSma
);
...
...
@@ -245,7 +248,6 @@ struct STbUidStore {
};
struct
SSma
{
int16_t
nTSma
;
bool
locked
;
TdThreadMutex
mutex
;
SVnode
*
pVnode
;
...
...
@@ -261,7 +263,6 @@ struct SSma {
#define SMA_META(s) ((s)->pVnode->pMeta)
#define SMA_VID(s) TD_VID((s)->pVnode)
#define SMA_TFS(s) ((s)->pVnode->pTfs)
#define SMA_TSMA_NUM(s) ((s)->nTSma)
#define SMA_TSMA_ENV(s) ((s)->pTSmaEnv)
#define SMA_RSMA_ENV(s) ((s)->pRSmaEnv)
#define SMA_RSMA_TSDB0(s) ((s)->pVnode->pTsdb)
...
...
source/dnode/vnode/src/meta/metaEntry.c
浏览文件 @
cb7d0c22
...
...
@@ -75,7 +75,7 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
if
(
tDecodeTSma
(
pCoder
,
pME
->
smaEntry
.
tsma
)
<
0
)
return
-
1
;
if
(
tDecodeTSma
(
pCoder
,
pME
->
smaEntry
.
tsma
,
true
)
<
0
)
return
-
1
;
}
else
{
ASSERT
(
0
);
}
...
...
source/dnode/vnode/src/sma/sma.c
浏览文件 @
cb7d0c22
...
...
@@ -44,3 +44,209 @@ int32_t smaGetTSmaDays(SVnodeCfg* pCfg, void* pCont, uint32_t contLen, int32_t*
smaDebug
(
"vgId:%d, get tsma days %d"
,
pCfg
->
vgId
,
*
days
);
return
code
;
}
#if 0
/**
* @brief TODO: Assume that the final generated result it less than 3M
*
* @param pReq
* @param pDataBlocks
* @param vgId
* @param suid // TODO: check with Liao whether suid response is reasonable
*
* TODO: colId should be set
*/
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
tb_uid_t suid, const char* stbName, bool isCreateCtb) {
int32_t sz = taosArrayGetSize(pDataBlocks);
int32_t bufSize = sizeof(SSubmitReq);
for (int32_t i = 0; i < sz; ++i) {
SDataBlockInfo* pBlkInfo = &((SSDataBlock*)taosArrayGet(pDataBlocks, i))->info;
bufSize += pBlkInfo->rows * (TD_ROW_HEAD_LEN + pBlkInfo->rowSize + BitmapLen(pBlkInfo->numOfCols));
bufSize += sizeof(SSubmitBlk);
}
*pReq = taosMemoryCalloc(1, bufSize);
if (!(*pReq)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
void* pDataBuf = *pReq;
SArray* pTagArray = NULL;
int32_t msgLen = sizeof(SSubmitReq);
int32_t numOfBlks = 0;
int32_t schemaLen = 0;
SRowBuilder rb = {0};
tdSRowInit(&rb, pTSchema->version);
for (int32_t i = 0; i < sz; ++i) {
SSDataBlock* pDataBlock = taosArrayGet(pDataBlocks, i);
SDataBlockInfo* pDataBlkInfo = &pDataBlock->info;
int32_t colNum = pDataBlkInfo->numOfCols;
int32_t rows = pDataBlkInfo->rows;
int32_t rowSize = pDataBlkInfo->rowSize;
int64_t groupId = pDataBlkInfo->groupId;
if (rb.nCols != colNum) {
tdSRowSetTpInfo(&rb, colNum, pTSchema->flen);
}
if(isCreateCtb) {
SMetaReader mr = {0};
const char* ctbName = buildCtbNameByGroupId(stbName, pDataBlock->info.groupId);
if (metaGetTableEntryByName(&mr, ctbName) != 0) {
smaDebug("vgId:%d, no tsma ctb %s exists", vgId, ctbName);
}
SVCreateTbReq ctbReq = {0};
ctbReq.name = ctbName;
ctbReq.type = TSDB_CHILD_TABLE;
ctbReq.ctb.suid = suid;
STagVal tagVal = {.cid = colNum + PRIMARYKEY_TIMESTAMP_COL_ID,
.type = TSDB_DATA_TYPE_BIGINT,
.i64 = groupId};
STag* pTag = NULL;
if(!pTagArray) {
pTagArray = taosArrayInit(1, sizeof(STagVal));
if (!pTagArray) goto _err;
}
taosArrayClear(pTagArray);
taosArrayPush(pTagArray, &tagVal);
tTagNew(pTagArray, 1, false, &pTag);
if (pTag == NULL) {
tdDestroySVCreateTbReq(&ctbReq);
goto _err;
}
ctbReq.ctb.pTag = (uint8_t*)pTag;
int32_t code;
tEncodeSize(tEncodeSVCreateTbReq, &ctbReq, schemaLen, code);
tdDestroySVCreateTbReq(&ctbReq);
if (code < 0) {
goto _err;
}
}
SSubmitBlk* pSubmitBlk = POINTER_SHIFT(pDataBuf, msgLen);
pSubmitBlk->suid = suid;
pSubmitBlk->uid = groupId;
pSubmitBlk->numOfRows = rows;
msgLen += sizeof(SSubmitBlk);
int32_t dataLen = 0;
for (int32_t j = 0; j < rows; ++j) { // iterate by row
tdSRowResetBuf(&rb, POINTER_SHIFT(pDataBuf, msgLen)); // set row buf
bool isStartKey = false;
int32_t offset = 0;
for (int32_t k = 0; k < colNum; ++k) { // iterate by column
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
STColumn* pCol = &pTSchema->columns[k];
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
switch (pColInfoData->info.type) {
case TSDB_DATA_TYPE_TIMESTAMP:
if (!isStartKey) {
isStartKey = true;
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true,
offset, k);
} else {
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var,
true, offset, k);
}
break;
case TSDB_DATA_TYPE_NCHAR: {
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_NCHAR, TD_VTYPE_NORM, var, true,
offset, k);
break;
}
case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_VARCHAR, TD_VTYPE_NORM, var, true,
offset, k);
break;
}
case TSDB_DATA_TYPE_VARBINARY:
case TSDB_DATA_TYPE_DECIMAL:
case TSDB_DATA_TYPE_BLOB:
case TSDB_DATA_TYPE_JSON:
case TSDB_DATA_TYPE_MEDIUMBLOB:
uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
TASSERT(0);
break;
default:
if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
if (pCol->type == pColInfoData->info.type) {
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NORM, var, true, offset,
k);
} else {
char tv[8] = {0};
if (pColInfoData->info.type == TSDB_DATA_TYPE_FLOAT) {
float v = 0;
GET_TYPED_DATA(v, float, pColInfoData->info.type, var);
SET_TYPED_DATA(&tv, pCol->type, v);
} else if (pColInfoData->info.type == TSDB_DATA_TYPE_DOUBLE) {
double v = 0;
GET_TYPED_DATA(v, double, pColInfoData->info.type, var);
SET_TYPED_DATA(&tv, pCol->type, v);
} else if (IS_SIGNED_NUMERIC_TYPE(pColInfoData->info.type)) {
int64_t v = 0;
GET_TYPED_DATA(v, int64_t, pColInfoData->info.type, var);
SET_TYPED_DATA(&tv, pCol->type, v);
} else {
uint64_t v = 0;
GET_TYPED_DATA(v, uint64_t, pColInfoData->info.type, var);
SET_TYPED_DATA(&tv, pCol->type, v);
}
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NORM, tv, true, offset,
k);
}
} else {
uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
TASSERT(0);
}
break;
}
offset += TYPE_BYTES[pCol->type]; // sum/avg would convert to int64_t/uint64_t/double during aggregation
}
dataLen += TD_ROW_LEN(rb.pBuf);
#ifdef TD_DEBUG_PRINT_ROW
tdSRowPrint(rb.pBuf, pTSchema, __func__);
#endif
}
++numOfBlks;
pSubmitBlk->dataLen = dataLen;
msgLen += pSubmitBlk->dataLen;
}
(*pReq)->length = msgLen;
(*pReq)->header.vgId = htonl(vgId);
(*pReq)->header.contLen = htonl(msgLen);
(*pReq)->length = (*pReq)->header.contLen;
(*pReq)->numOfBlocks = htonl(numOfBlks);
SSubmitBlk* blk = (SSubmitBlk*)((*pReq) + 1);
while (numOfBlks--) {
int32_t dataLen = blk->dataLen;
blk->uid = htobe64(blk->uid);
blk->suid = htobe64(blk->suid);
blk->padding = htonl(blk->padding);
blk->sversion = htonl(blk->sversion);
blk->dataLen = htonl(blk->dataLen);
blk->schemaLen = htonl(blk->schemaLen);
blk->numOfRows = htons(blk->numOfRows);
blk = (SSubmitBlk*)(blk->data + dataLen);
}
return TSDB_CODE_SUCCESS;
_err:
taosMemoryFreeClear(*pReq);
taosArrayDestroy(pTagArray);
return TSDB_CODE_FAILED;
}
#endif
source/dnode/vnode/src/sma/smaEnv.c
浏览文件 @
cb7d0c22
...
...
@@ -17,123 +17,17 @@
typedef
struct
SSmaStat
SSmaStat
;
static
const
char
*
TSDB_SMA_DNAME
[]
=
{
""
,
// TSDB_SMA_TYPE_BLOCK
"tsma"
,
// TSDB_SMA_TYPE_TIME_RANGE
"rsma"
,
// TSDB_SMA_TYPE_ROLLUP
};
#define SMA_TEST_INDEX_NAME "smaTestIndexName" // TODO: just for test
#define SMA_TEST_INDEX_UID 2000000001 // TODO: just for test
#define SMA_STATE_HASH_SLOT 4
#define RSMA_TASK_INFO_HASH_SLOT 8
typedef
struct
SPoolMem
{
int64_t
size
;
struct
SPoolMem
*
prev
;
struct
SPoolMem
*
next
;
}
SPoolMem
;
// declaration of static functions
// insert data
static
void
tdGetSmaDir
(
int32_t
vgId
,
ETsdbSmaType
smaType
,
char
dirName
[]);
// Pool Memory
static
SPoolMem
*
openPool
();
static
void
clearPool
(
SPoolMem
*
pPool
);
static
void
closePool
(
SPoolMem
*
pPool
);
static
void
*
poolMalloc
(
void
*
arg
,
size_t
size
);
static
void
poolFree
(
void
*
arg
,
void
*
ptr
);
static
int32_t
tdInitSmaStat
(
SSmaStat
**
pSmaStat
,
int8_t
smaType
);
static
SSmaEnv
*
tdNewSmaEnv
(
const
SSma
*
pSma
,
int8_t
smaType
,
const
char
*
path
);
static
int32_t
tdInitSmaEnv
(
SSma
*
pSma
,
int8_t
smaType
,
const
char
*
path
,
SSmaEnv
**
pEnv
);
// implementation
static
SPoolMem
*
openPool
()
{
SPoolMem
*
pPool
=
(
SPoolMem
*
)
taosMemoryMalloc
(
sizeof
(
*
pPool
));
pPool
->
prev
=
pPool
->
next
=
pPool
;
pPool
->
size
=
0
;
return
pPool
;
}
static
void
clearPool
(
SPoolMem
*
pPool
)
{
if
(
!
pPool
)
return
;
SPoolMem
*
pMem
;
do
{
pMem
=
pPool
->
next
;
if
(
pMem
==
pPool
)
break
;
pMem
->
next
->
prev
=
pMem
->
prev
;
pMem
->
prev
->
next
=
pMem
->
next
;
pPool
->
size
-=
pMem
->
size
;
taosMemoryFree
(
pMem
);
}
while
(
1
);
assert
(
pPool
->
size
==
0
);
}
static
void
closePool
(
SPoolMem
*
pPool
)
{
if
(
pPool
)
{
clearPool
(
pPool
);
taosMemoryFree
(
pPool
);
}
}
static
void
*
poolMalloc
(
void
*
arg
,
size_t
size
)
{
void
*
ptr
=
NULL
;
SPoolMem
*
pPool
=
(
SPoolMem
*
)
arg
;
SPoolMem
*
pMem
;
pMem
=
(
SPoolMem
*
)
taosMemoryMalloc
(
sizeof
(
*
pMem
)
+
size
);
if
(
!
pMem
)
{
assert
(
0
);
}
pMem
->
size
=
sizeof
(
*
pMem
)
+
size
;
pMem
->
next
=
pPool
->
next
;
pMem
->
prev
=
pPool
;
pPool
->
next
->
prev
=
pMem
;
pPool
->
next
=
pMem
;
pPool
->
size
+=
pMem
->
size
;
ptr
=
(
void
*
)(
&
pMem
[
1
]);
return
ptr
;
}
static
void
poolFree
(
void
*
arg
,
void
*
ptr
)
{
SPoolMem
*
pPool
=
(
SPoolMem
*
)
arg
;
SPoolMem
*
pMem
;
pMem
=
&
(((
SPoolMem
*
)
ptr
)[
-
1
]);
pMem
->
next
->
prev
=
pMem
->
prev
;
pMem
->
prev
->
next
=
pMem
->
next
;
pPool
->
size
-=
pMem
->
size
;
taosMemoryFree
(
pMem
);
}
int32_t
tdInitSma
(
SSma
*
pSma
)
{
int32_t
numOfTSma
=
taosArrayGetSize
(
metaGetSmaTbUids
(
SMA_META
(
pSma
)));
if
(
numOfTSma
>
0
)
{
atomic_store_16
(
&
SMA_TSMA_NUM
(
pSma
),
(
int16_t
)
numOfTSma
);
}
return
TSDB_CODE_SUCCESS
;
}
static
void
tdGetSmaDir
(
int32_t
vgId
,
ETsdbSmaType
smaType
,
char
dirName
[])
{
snprintf
(
dirName
,
TSDB_FILENAME_LEN
,
"vnode%svnode%d%s%s"
,
TD_DIRSEP
,
vgId
,
TD_DIRSEP
,
TSDB_SMA_DNAME
[
smaType
]);
}
static
SSmaEnv
*
tdNewSmaEnv
(
const
SSma
*
pSma
,
int8_t
smaType
,
const
char
*
path
,
SDiskID
did
)
{
static
SSmaEnv
*
tdNewSmaEnv
(
const
SSma
*
pSma
,
int8_t
smaType
,
const
char
*
path
)
{
SSmaEnv
*
pEnv
=
NULL
;
pEnv
=
(
SSmaEnv
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SSmaEnv
));
...
...
@@ -156,18 +50,17 @@ static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path,
return
NULL
;
}
return
pEnv
;
}
static
int32_t
tdInitSmaEnv
(
SSma
*
pSma
,
int8_t
smaType
,
const
char
*
path
,
S
DiskID
did
,
S
SmaEnv
**
pEnv
)
{
static
int32_t
tdInitSmaEnv
(
SSma
*
pSma
,
int8_t
smaType
,
const
char
*
path
,
SSmaEnv
**
pEnv
)
{
if
(
!
pEnv
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
return
TSDB_CODE_FAILED
;
}
if
(
!
(
*
pEnv
))
{
if
(
!
(
*
pEnv
=
tdNewSmaEnv
(
pSma
,
smaType
,
path
,
did
)))
{
if
(
!
(
*
pEnv
=
tdNewSmaEnv
(
pSma
,
smaType
,
path
)))
{
return
TSDB_CODE_FAILED
;
}
}
...
...
@@ -183,15 +76,16 @@ static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SDiskI
*/
void
tdDestroySmaEnv
(
SSmaEnv
*
pSmaEnv
)
{
if
(
pSmaEnv
)
{
tdDestroySmaState
(
pSmaEnv
->
pStat
,
SMA_ENV_TYPE
(
pSmaEnv
));
taosMemoryFreeClear
(
pSmaEnv
->
pStat
);
pSmaEnv
->
pStat
=
tdFreeSmaState
(
pSmaEnv
->
pStat
,
SMA_ENV_TYPE
(
pSmaEnv
));
taosThreadRwlockDestroy
(
&
(
pSmaEnv
->
lock
));
}
}
void
*
tdFreeSmaEnv
(
SSmaEnv
*
pSmaEnv
)
{
tdDestroySmaEnv
(
pSmaEnv
);
taosMemoryFreeClear
(
pSmaEnv
);
if
(
pSmaEnv
)
{
tdDestroySmaEnv
(
pSmaEnv
);
taosMemoryFreeClear
(
pSmaEnv
);
}
return
NULL
;
}
...
...
@@ -239,13 +133,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType) {
return
TSDB_CODE_FAILED
;
}
}
else
if
(
smaType
==
TSDB_SMA_TYPE_TIME_RANGE
)
{
SMA_STAT_ITEMS
(
*
pSmaStat
)
=
taosHashInit
(
SMA_STATE_HASH_SLOT
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
if
(
!
SMA_STAT_ITEMS
(
*
pSmaStat
))
{
taosMemoryFreeClear
(
*
pSmaStat
);
return
TSDB_CODE_FAILED
;
}
// TODO
}
else
{
ASSERT
(
0
);
}
...
...
@@ -262,6 +150,12 @@ void *tdFreeSmaStatItem(SSmaStatItem *pSmaStatItem) {
return
NULL
;
}
void
*
tdFreeSmaState
(
SSmaStat
*
pSmaStat
,
int8_t
smaType
)
{
tdDestroySmaState
(
pSmaStat
,
smaType
);
taosMemoryFreeClear
(
pSmaStat
);
return
NULL
;
}
/**
* @brief Release resources allocated for its member fields, not including itself.
*
...
...
@@ -270,16 +164,10 @@ void *tdFreeSmaStatItem(SSmaStatItem *pSmaStatItem) {
*/
int32_t
tdDestroySmaState
(
SSmaStat
*
pSmaStat
,
int8_t
smaType
)
{
if
(
pSmaStat
)
{
// TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
if
(
smaType
==
TSDB_SMA_TYPE_TIME_RANGE
)
{
void
*
item
=
taosHashIterate
(
SMA_STAT_ITEMS
(
pSmaStat
),
NULL
);
while
(
item
)
{
SSmaStatItem
*
pItem
=
*
(
SSmaStatItem
**
)
item
;
tdFreeSmaStatItem
(
pItem
);
item
=
taosHashIterate
(
SMA_STAT_ITEMS
(
pSmaStat
),
item
);
}
taosHashCleanup
(
SMA_STAT_ITEMS
(
pSmaStat
));
tdFreeSmaStatItem
(
&
pSmaStat
->
tsmaStatItem
);
}
else
if
(
smaType
==
TSDB_SMA_TYPE_ROLLUP
)
{
// TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
void
*
infoHash
=
taosHashIterate
(
SMA_STAT_INFO_HASH
(
pSmaStat
),
NULL
);
while
(
infoHash
)
{
SRSmaInfo
*
pInfoHash
=
*
(
SRSmaInfo
**
)
infoHash
;
...
...
@@ -317,7 +205,7 @@ int32_t tdUnLockSma(SSma *pSma) {
return
0
;
}
int32_t
tdCheckAndInitSmaEnv
(
SSma
*
pSma
,
int8_t
smaType
,
bool
onlyCheck
)
{
int32_t
tdCheckAndInitSmaEnv
(
SSma
*
pSma
,
int8_t
smaType
)
{
SSmaEnv
*
pEnv
=
NULL
;
// return if already init
...
...
@@ -344,26 +232,7 @@ int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType, bool onlyCheck) {
if
(
!
pEnv
)
{
char
rname
[
TSDB_FILENAME_LEN
]
=
{
0
};
SDiskID
did
=
{
0
};
if
(
tfsAllocDisk
(
SMA_TFS
(
pSma
),
TFS_PRIMARY_LEVEL
,
&
did
)
<
0
)
{
tdUnLockSma
(
pSma
);
return
TSDB_CODE_FAILED
;
}
if
(
did
.
level
<
0
||
did
.
id
<
0
)
{
tdUnLockSma
(
pSma
);
smaError
(
"vgId:%d, init sma env failed since invalid did(%d,%d)"
,
SMA_VID
(
pSma
),
did
.
level
,
did
.
id
);
return
TSDB_CODE_FAILED
;
}
tdGetSmaDir
(
SMA_VID
(
pSma
),
smaType
,
rname
);
if
(
tfsMkdirRecurAt
(
SMA_TFS
(
pSma
),
rname
,
did
)
<
0
)
{
tdUnLockSma
(
pSma
);
return
TSDB_CODE_FAILED
;
}
if
(
tdInitSmaEnv
(
pSma
,
smaType
,
rname
,
did
,
&
pEnv
)
<
0
)
{
if
(
tdInitSmaEnv
(
pSma
,
smaType
,
rname
,
&
pEnv
)
<
0
)
{
tdUnLockSma
(
pSma
);
return
TSDB_CODE_FAILED
;
}
...
...
source/dnode/vnode/src/sma/smaOpen.c
浏览文件 @
cb7d0c22
...
...
@@ -132,7 +132,9 @@ int32_t smaClose(SSma *pSma) {
if
SMA_RSMA_TSDB0
(
pSma
)
tsdbClose
(
&
SMA_RSMA_TSDB0
(
pSma
));
if
SMA_RSMA_TSDB1
(
pSma
)
tsdbClose
(
&
SMA_RSMA_TSDB1
(
pSma
));
if
SMA_RSMA_TSDB2
(
pSma
)
tsdbClose
(
&
SMA_RSMA_TSDB2
(
pSma
));
taosMemoryFree
(
pSma
);
// SMA_TSMA_ENV(pSma) = tdFreeSmaEnv(SMA_TSMA_ENV(pSma));
// SMA_RSMA_ENV(pSma) = tdFreeSmaEnv(SMA_RSMA_ENV(pSma));
taosMemoryFreeClear
(
pSma
);
}
return
0
;
}
\ No newline at end of file
source/dnode/vnode/src/sma/smaRollup.c
浏览文件 @
cb7d0c22
...
...
@@ -181,7 +181,7 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
return
TSDB_CODE_SUCCESS
;
}
if
(
tdCheckAndInitSmaEnv
(
pSma
,
TSDB_SMA_TYPE_ROLLUP
,
false
)
!=
TSDB_CODE_SUCCESS
)
{
if
(
tdCheckAndInitSmaEnv
(
pSma
,
TSDB_SMA_TYPE_ROLLUP
)
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
TSDB_CODE_TDB_INIT_FAILED
;
return
TSDB_CODE_FAILED
;
}
...
...
source/dnode/vnode/src/sma/smaTimeRange.c
浏览文件 @
cb7d0c22
...
...
@@ -142,7 +142,6 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
ASSERT
(
pItem
);
if
(
!
pItem
->
pTSma
)
{
// cache smaMeta
STSma
*
pTSma
=
metaGetSmaInfoByIndex
(
SMA_META
(
pSma
),
indexUid
);
if
(
!
pTSma
)
{
terrno
=
TSDB_CODE_TSMA_NO_INDEX_IN_META
;
...
...
@@ -150,27 +149,28 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
return
TSDB_CODE_FAILED
;
}
pItem
->
pTSma
=
pTSma
;
pItem
->
pTSchema
=
metaGetTbTSchema
(
SMA_META
(
pSma
),
pTSma
->
dstTbUid
,
-
1
);
ASSERT
(
pItem
->
pTSchema
);
// TODO
}
STSma
*
pTSma
=
pItem
->
pTSma
;
ASSERT
(
pItem
->
pTSma
->
indexUid
==
indexUid
)
;
ASSERT
(
pTSma
->
indexUid
==
indexUid
)
;
SSubmitReq
*
pSubmitReq
=
NULL
;
SMetaReader
mr
=
{
0
};
pSubmitReq
=
tdBlockToSubmit
((
const
SArray
*
)
msg
,
pItem
->
pTSchema
,
true
,
pItem
->
pTSma
->
dstTbUid
,
pItem
->
pTSma
->
dstTbName
,
pItem
->
pTSma
->
dstVgId
);
const
char
*
dbName
=
"testDb"
;
if
(
metaGetTableEntryByName
(
&
mr
,
dbName
)
!=
0
)
{
smaDebug
(
"vgId:%d, tsma no table testTb exists for smaIndex %"
PRIi64
" since %s"
,
SMA_VID
(
pSma
),
indexUid
,
tstrerror
(
terrno
));
SVCreateStbReq
pReq
=
{
0
};
pReq
.
name
=
dbName
;
pReq
.
suid
=
pTSma
->
dstTbUid
;
pReq
.
schemaRow
=
pCfg
->
schemaRow
;
pReq
.
schemaTag
=
pCfg
->
schemaTag
;
}
ASSERT
(
pSubmitReq
);
// TODO
SSubmitReq
*
pSubmitReq
=
NULL
;
buildSubmitReqFromDataBlock
(
&
pSubmitReq
,
(
const
SArray
*
)
msg
,
NULL
,
pItem
->
pTSma
->
dstVgId
,
pItem
->
pTSma
->
dstTbUid
);
ASSERT
(
!
strncasecmp
(
"td.tsma.rst.tb"
,
pItem
->
pTSma
->
dstTbName
,
14
));
SRpcMsg
submitReqMsg
=
{
.
msgType
=
TDMT_VND_SUBMIT
,
.
pCont
=
pSubmitReq
,
.
contLen
=
ntohl
(
pSubmitReq
->
length
),
};
ASSERT
(
tmsgPutToQueue
(
&
pSma
->
pVnode
->
msgCb
,
WRITE_QUEUE
,
&
submitReqMsg
)
==
0
);
tdUnRefSmaStat
(
pSma
,
pStat
);
...
...
source/dnode/vnode/src/sma/smaTimeRange2.c
已删除
100644 → 0
浏览文件 @
b6af6292
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sma.h"
#include "tsdb.h"
typedef
STsdbCfg
STSmaKeepCfg
;
#undef _TEST_SMA_PRINT_DEBUG_LOG_
#define SMA_STORAGE_MINUTES_MAX 86400
#define SMA_STORAGE_MINUTES_DAY 1440
#define SMA_STORAGE_MINUTES_MIN 1440
#define SMA_STORAGE_TSDB_MINUTES 86400
#define SMA_STORAGE_TSDB_TIMES 10
#define SMA_STORAGE_SPLIT_FACTOR 14400 // least records in tsma file TODO: the feasible value?
#define SMA_KEY_LEN 16 // TSKEY+groupId 8+8
#define SMA_DROP_EXPIRED_TIME 10 // default is 10 seconds
#define SMA_STATE_ITEM_HASH_SLOT 32
// static func
/**
* @brief Judge the tsma file split days
*
* @param pCfg
* @param pCont
* @param contLen
* @param days unit is minute
* @return int32_t
*/
int32_t
tdProcessTSmaGetDaysImpl
(
SVnodeCfg
*
pCfg
,
void
*
pCont
,
uint32_t
contLen
,
int32_t
*
days
)
{
SDecoder
coder
=
{
0
};
tDecoderInit
(
&
coder
,
pCont
,
contLen
);
STSma
tsma
=
{
0
};
if
(
tDecodeSVCreateTSmaReq
(
&
coder
,
&
tsma
)
<
0
)
{
terrno
=
TSDB_CODE_MSG_DECODE_ERROR
;
goto
_err
;
}
STsdbCfg
*
pTsdbCfg
=
&
pCfg
->
tsdbCfg
;
int64_t
sInterval
=
convertTimeFromPrecisionToUnit
(
tsma
.
interval
,
pTsdbCfg
->
precision
,
TIME_UNIT_SECOND
);
if
(
sInterval
<=
0
)
{
*
days
=
pTsdbCfg
->
days
;
return
0
;
}
int64_t
records
=
pTsdbCfg
->
days
*
60
/
sInterval
;
if
(
records
>=
SMA_STORAGE_SPLIT_FACTOR
)
{
*
days
=
pTsdbCfg
->
days
;
}
else
{
int64_t
mInterval
=
convertTimeFromPrecisionToUnit
(
tsma
.
interval
,
pTsdbCfg
->
precision
,
TIME_UNIT_MINUTE
);
int64_t
daysPerFile
=
mInterval
*
SMA_STORAGE_MINUTES_DAY
*
2
;
if
(
daysPerFile
>
SMA_STORAGE_MINUTES_MAX
)
{
*
days
=
SMA_STORAGE_MINUTES_MAX
;
}
else
{
*
days
=
(
int32_t
)
daysPerFile
;
}
if
(
*
days
<
pTsdbCfg
->
days
)
{
*
days
=
pTsdbCfg
->
days
;
}
}
tDecoderClear
(
&
coder
);
return
0
;
_err:
tDecoderClear
(
&
coder
);
return
-
1
;
}
// read data
// implementation
/**
* @brief Insert/Update Time-range-wise SMA data.
* - If interval < SMA_STORAGE_SPLIT_HOURS(e.g. 24), save the SMA data as a part of DFileSet to e.g.
* v3f1900.tsma.${sma_index_name}. The days is the same with that for TS data files.
* - If interval >= SMA_STORAGE_SPLIT_HOURS, save the SMA data to e.g. vnode3/tsma/v3f632.tsma.${sma_index_name}. The
* days is 30 times of the interval, and the minimum days is SMA_STORAGE_TSDB_DAYS(30d).
* - The destination file of one data block for some interval is determined by its start TS key.
*
* @param pSma
* @param msg
* @return int32_t
*/
int32_t
tdProcessTSmaInsertImpl
(
SSma
*
pSma
,
int64_t
indexUid
,
const
char
*
msg
)
{
STsdbCfg
*
pCfg
=
SMA_TSDB_CFG
(
pSma
);
const
SArray
*
pDataBlocks
=
(
const
SArray
*
)
msg
;
// TODO: destroy SSDataBlocks(msg)
// For super table aggregation, the sma data is stored in vgroup calculated from the hash value of stable name. Thus
// the sma data would arrive ahead of the update-expired-window msg.
if
(
tdCheckAndInitSmaEnv
(
pSma
,
TSDB_SMA_TYPE_TIME_RANGE
,
false
)
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
TSDB_CODE_TDB_INIT_FAILED
;
return
TSDB_CODE_FAILED
;
}
if
(
!
pDataBlocks
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
smaWarn
(
"vgId:%d, insert tsma data failed since pDataBlocks is NULL"
,
SMA_VID
(
pSma
));
return
terrno
;
}
if
(
taosArrayGetSize
(
pDataBlocks
)
<=
0
)
{
terrno
=
TSDB_CODE_INVALID_PARA
;
smaWarn
(
"vgId:%d, insert tsma data failed since pDataBlocks is empty"
,
SMA_VID
(
pSma
));
return
TSDB_CODE_FAILED
;
}
SSmaEnv
*
pEnv
=
SMA_TSMA_ENV
(
pSma
);
SSmaStat
*
pStat
=
SMA_ENV_STAT
(
pEnv
);
SSmaStatItem
*
pItem
=
NULL
;
tdRefSmaStat
(
pSma
,
pStat
);
if
(
pStat
&&
SMA_STAT_ITEMS
(
pStat
))
{
pItem
=
taosHashGet
(
SMA_STAT_ITEMS
(
pStat
),
&
indexUid
,
sizeof
(
indexUid
));
}
if
(
!
pItem
||
!
(
pItem
=
*
(
SSmaStatItem
**
)
pItem
)
||
tdSmaStatIsDropped
(
pItem
))
{
terrno
=
TSDB_CODE_TSMA_INVALID_STAT
;
tdUnRefSmaStat
(
pSma
,
pStat
);
return
TSDB_CODE_FAILED
;
}
STSma
*
pTSma
=
pItem
->
pTSma
;
tdUnRefSmaStat
(
pSma
,
pStat
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
tdProcessTSmaCreateImpl
(
SSma
*
pSma
,
int64_t
version
,
const
char
*
pMsg
)
{
SSmaCfg
*
pCfg
=
(
SSmaCfg
*
)
pMsg
;
if
(
metaCreateTSma
(
SMA_META
(
pSma
),
version
,
pCfg
)
<
0
)
{
return
-
1
;
}
if
(
TD_VID
(
pSma
->
pVnode
)
==
pCfg
->
dstVgId
)
{
// create stable to save tsma result in dstVgId
SVCreateStbReq
pReq
=
{
0
};
pReq
.
name
=
pCfg
->
dstTbName
;
pReq
.
suid
=
pCfg
->
dstTbUid
;
pReq
.
schemaRow
=
pCfg
->
schemaRow
;
pReq
.
schemaTag
=
pCfg
->
schemaTag
;
if
(
metaCreateSTable
(
SMA_META
(
pSma
),
version
,
&
pReq
)
<
0
)
{
return
-
1
;
}
}
tdTSmaAdd
(
pSma
,
1
);
return
0
;
}
\ No newline at end of file
source/dnode/vnode/src/tq/tqSink.c
浏览文件 @
cb7d0c22
...
...
@@ -15,10 +15,7 @@
#include "tq.h"
static
SSubmitReq
*
tdBlockToSubmit
(
const
SArray
*
pBlocks
,
const
STSchema
*
pSchema
,
bool
createTb
,
int64_t
suid
,
const
char
*
stbFullName
,
int32_t
vgId
);
static
SSubmitReq
*
tdBlockToSubmit
(
const
SArray
*
pBlocks
,
const
STSchema
*
pTSchema
,
bool
createTb
,
int64_t
suid
,
SSubmitReq
*
tdBlockToSubmit
(
const
SArray
*
pBlocks
,
const
STSchema
*
pTSchema
,
bool
createTb
,
int64_t
suid
,
const
char
*
stbFullName
,
int32_t
vgId
)
{
SSubmitReq
*
ret
=
NULL
;
SArray
*
tagArray
=
taosArrayInit
(
1
,
sizeof
(
STagVal
));
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
cb7d0c22
...
...
@@ -284,7 +284,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRp
void
smaHandleRes
(
void
*
pVnode
,
int64_t
smaId
,
const
SArray
*
data
)
{
// TODO
//
blockDebugShowData(data, __func__);
blockDebugShowData
(
data
,
__func__
);
tdProcessTSmaInsert
(((
SVnode
*
)
pVnode
)
->
pSma
,
smaId
,
(
const
char
*
)
data
);
}
...
...
source/dnode/vnode/test/tsdbSmaTest.cpp
浏览文件 @
cb7d0c22
...
...
@@ -121,7 +121,7 @@ TEST(testCase, tSma_Meta_Encode_Decode_Test) {
// decode
STSmaWrapper
dstTSmaWrapper
=
{
0
};
void
*
result
=
tDecodeTSmaWrapper
(
pSW
,
&
dstTSmaWrapper
);
void
*
result
=
tDecodeTSmaWrapper
(
pSW
,
&
dstTSmaWrapper
,
false
);
EXPECT_NE
(
result
,
nullptr
);
EXPECT_EQ
(
tSmaWrapper
.
number
,
dstTSmaWrapper
.
number
);
...
...
source/util/src/terror.c
浏览文件 @
cb7d0c22
...
...
@@ -559,8 +559,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_ALREADY_EXIST, "Tsma already exists
TAOS_DEFINE_ERROR
(
TSDB_CODE_TSMA_NO_INDEX_IN_META
,
"No tsma index in meta"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TSMA_INVALID_ENV
,
"Invalid tsma env"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TSMA_INVALID_STAT
,
"Invalid tsma state"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TSMA_INVALID_PTR
,
"Invalid tsma pointer"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TSMA_INVALID_PARA
,
"Invalid tsma parameters"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TSMA_NO_INDEX_IN_CACHE
,
"No tsma index in cache"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TSMA_RM_SKEY_IN_HASH
,
"Rm tsma skey in cache"
)
//rsma
TAOS_DEFINE_ERROR
(
TSDB_CODE_RSMA_INVALID_ENV
,
"Invalid rsma env"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录