Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
192060eb
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
192060eb
编写于
4月 02, 2022
作者:
C
Cary Xu
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into feature/TD-11463-3.0
上级
11653686
6561851b
变更
26
显示空白变更内容
内联
并排
Showing
26 changed file
with
1122 addition
and
678 deletion
+1122
-678
include/common/tcommon.h
include/common/tcommon.h
+13
-23
include/libs/function/function.h
include/libs/function/function.h
+6
-5
include/libs/nodes/querynodes.h
include/libs/nodes/querynodes.h
+0
-1
include/util/tdef.h
include/util/tdef.h
+1
-1
source/dnode/mgmt/main/inc/dnd.h
source/dnode/mgmt/main/inc/dnd.h
+22
-13
source/dnode/mgmt/main/inc/dndInt.h
source/dnode/mgmt/main/inc/dndInt.h
+9
-11
source/dnode/mgmt/main/src/dndEnv.c
source/dnode/mgmt/main/src/dndEnv.c
+0
-37
source/dnode/mgmt/main/src/dndInt.c
source/dnode/mgmt/main/src/dndInt.c
+15
-1
source/dnode/mgmt/main/src/dndMsg.c
source/dnode/mgmt/main/src/dndMsg.c
+26
-3
source/dnode/vnode/inc/tsdb.h
source/dnode/vnode/inc/tsdb.h
+2
-0
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+51
-41
source/libs/command/src/command.c
source/libs/command/src/command.c
+11
-2
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+13
-10
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+96
-438
source/libs/function/inc/builtinsimpl.h
source/libs/function/inc/builtinsimpl.h
+5
-0
source/libs/function/src/builtins.c
source/libs/function/src/builtins.c
+4
-3
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+215
-11
source/libs/function/src/taggfunction.c
source/libs/function/src/taggfunction.c
+8
-28
source/libs/index/src/indexFst.c
source/libs/index/src/indexFst.c
+1
-2
source/libs/index/test/fstTest.cc
source/libs/index/test/fstTest.cc
+68
-5
source/libs/parser/inc/parInsertData.h
source/libs/parser/inc/parInsertData.h
+4
-3
source/libs/parser/src/parInsert.c
source/libs/parser/src/parInsert.c
+12
-7
source/libs/parser/src/parInsertData.c
source/libs/parser/src/parInsertData.c
+40
-31
source/libs/parser/test/parserInsertTest.cpp
source/libs/parser/test/parserInsertTest.cpp
+13
-2
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+1
-0
tests/script/tsim/db/create_all_options.sim
tests/script/tsim/db/create_all_options.sim
+486
-0
未找到文件。
include/common/tcommon.h
浏览文件 @
192060eb
...
...
@@ -193,20 +193,19 @@ typedef struct SColumn {
uint8_t
scale
;
}
SColumn
;
typedef
struct
SLimit
{
int64_t
limit
;
int64_t
offset
;
}
SLimit
;
typedef
struct
SOrder
{
uint32_t
order
;
SColumn
col
;
}
SOrder
;
typedef
struct
SGroupbyExpr
{
SArray
*
columnInfo
;
// SArray<SColIndex>, group by columns information
bool
groupbyTag
;
// group by tag or column
}
SGroupbyExpr
;
typedef
struct
STableBlockDistInfo
{
uint16_t
rowSize
;
uint16_t
numOfFiles
;
uint32_t
numOfTables
;
uint64_t
totalSize
;
uint64_t
totalRows
;
int32_t
maxRows
;
int32_t
minRows
;
int32_t
firstSeekTimeUs
;
uint32_t
numOfRowsInMemTable
;
uint32_t
numOfSmallBlocks
;
SArray
*
dataBlockInfos
;
}
STableBlockDistInfo
;
enum
{
FUNC_PARAM_TYPE_VALUE
=
0x1
,
...
...
@@ -241,15 +240,6 @@ typedef struct SExprInfo {
struct
tExprNode
*
pExpr
;
}
SExprInfo
;
typedef
struct
SStateWindow
{
SColumn
col
;
}
SStateWindow
;
typedef
struct
SSessionWindow
{
int64_t
gap
;
// gap between two session window(in microseconds)
SColumn
col
;
}
SSessionWindow
;
#define QUERY_ASC_FORWARD_STEP 1
#define QUERY_DESC_FORWARD_STEP -1
...
...
include/libs/function/function.h
浏览文件 @
192060eb
...
...
@@ -52,6 +52,11 @@ typedef struct SFuncExecFuncs {
FExecFinalize
finalize
;
}
SFuncExecFuncs
;
typedef
struct
SFileBlockInfo
{
int32_t
numBlocksOfStep
;
}
SFileBlockInfo
;
#define TSDB_BLOCK_DIST_STEP_ROWS 8
#define MAX_INTERVAL_TIME_WINDOW 1000000 // maximum allowed time windows in final results
#define FUNCTION_TYPE_SCALAR 1
...
...
@@ -101,10 +106,6 @@ typedef struct SFuncExecFuncs {
#define FUNCTION_DERIVATIVE 32
#define FUNCTION_BLKINFO 33
#define FUNCTION_HISTOGRAM 34
#define FUNCTION_HLL 35
#define FUNCTION_MODE 36
#define FUNCTION_SAMPLE 37
#define FUNCTION_COV 38
...
...
include/libs/nodes/querynodes.h
浏览文件 @
192060eb
...
...
@@ -277,7 +277,6 @@ typedef struct SVnodeModifOpStmt {
ENodeType
nodeType
;
ENodeType
sqlNodeType
;
SArray
*
pDataBlocks
;
// data block for each vgroup, SArray<SVgDataBlocks*>.
int8_t
schemaAttache
;
// denote if submit block is built with table schema or not
uint8_t
payloadType
;
// EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
uint32_t
insertType
;
// insert data from [file|sql statement| bound statement]
const
char
*
sql
;
// current sql statement position
...
...
include/util/tdef.h
浏览文件 @
192060eb
...
...
@@ -307,7 +307,7 @@ typedef enum ELogicConditionType {
#define TSDB_MAX_TOTAL_BLOCKS 10000
#define TSDB_DEFAULT_TOTAL_BLOCKS 6
#define TSDB_MIN_DAYS_PER_FILE
(1 * 1440)
// unit minute
#define TSDB_MIN_DAYS_PER_FILE
60
// unit minute
#define TSDB_MAX_DAYS_PER_FILE (3650 * 1440)
#define TSDB_DEFAULT_DAYS_PER_FILE (10 * 1440)
...
...
source/dnode/mgmt/main/inc/dnd.h
浏览文件 @
192060eb
...
...
@@ -134,33 +134,42 @@ typedef struct SDnode {
SMgmtWrapper
wrappers
[
NODE_MAX
];
}
SDnode
;
const
char
*
dndNodeLogStr
(
ENodeType
ntype
);
const
char
*
dndNodeProcStr
(
ENodeType
ntype
);
const
char
*
dndEventStr
(
EDndEvent
ev
);
// dndFile.h
int32_t
dndReadFile
(
SMgmtWrapper
*
pWrapper
,
bool
*
pDeployed
);
int32_t
dndWriteFile
(
SMgmtWrapper
*
pWrapper
,
bool
deployed
);
// dndInt.h
EDndStatus
dndGetStatus
(
SDnode
*
pDnode
);
void
dndSetStatus
(
SDnode
*
pDnode
,
EDndStatus
stat
);
void
dndSetMsgHandle
(
SMgmtWrapper
*
pWrapper
,
tmsg_t
msgType
,
NodeMsgFp
nodeMsgFp
,
int8_t
vgId
);
void
dndReportStartup
(
SDnode
*
pDnode
,
const
char
*
pName
,
const
char
*
pDesc
);
SMgmtWrapper
*
dndAcquireWrapper
(
SDnode
*
pDnode
,
ENodeType
nodeType
);
int32_t
dndMarkWrapper
(
SMgmtWrapper
*
pWrapper
);
void
dndReleaseWrapper
(
SMgmtWrapper
*
pWrapper
);
// dndMonitor.h
void
dndSendMonitorReport
(
SDnode
*
pDnode
);
// dndMsg.h
void
dndReportStartup
(
SDnode
*
pDnode
,
const
char
*
pName
,
const
char
*
pDesc
);
int32_t
dndProcessNodeMsg
(
SDnode
*
pDnode
,
SNodeMsg
*
pMsg
);
// dndStr.h
const
char
*
dndStatStr
(
EDndStatus
stat
);
const
char
*
dndNodeLogStr
(
ENodeType
ntype
);
const
char
*
dndNodeProcStr
(
ENodeType
ntype
);
const
char
*
dndEventStr
(
EDndEvent
ev
);
// dndTransport.h
int32_t
dndInitServer
(
SDnode
*
pDnode
);
void
dndCleanupServer
(
SDnode
*
pDnode
);
int32_t
dndInitClient
(
SDnode
*
pDnode
);
void
dndCleanupClient
(
SDnode
*
pDnode
);
int32_t
dndProcessNodeMsg
(
SDnode
*
pDnode
,
SNodeMsg
*
pMsg
);
int32_t
dndSendReqToMnode
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pMsg
);
int32_t
dndSendReq
(
SMgmtWrapper
*
pWrapper
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
);
void
dndSendRsp
(
SMgmtWrapper
*
pWrapper
,
const
SRpcMsg
*
pRsp
);
void
dndRegisterBrokenLinkArg
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pMsg
);
SMsgCb
dndCreateMsgcb
(
SMgmtWrapper
*
pWrapper
);
int32_t
dndReadFile
(
SMgmtWrapper
*
pWrapper
,
bool
*
pDeployed
);
int32_t
dndWriteFile
(
SMgmtWrapper
*
pWrapper
,
bool
deployed
);
SMgmtWrapper
*
dndAcquireWrapper
(
SDnode
*
pDnode
,
ENodeType
nodeType
);
int32_t
dndMarkWrapper
(
SMgmtWrapper
*
pWrapper
);
void
dndReleaseWrapper
(
SMgmtWrapper
*
pWrapper
);
#ifdef __cplusplus
}
#endif
...
...
source/dnode/mgmt/main/inc/dndInt.h
浏览文件 @
192060eb
...
...
@@ -29,26 +29,24 @@
extern
"C"
{
#endif
// dnd
Int.c
// dnd
Env.h
int32_t
dndInit
();
void
dndCleanup
();
const
char
*
dndStatStr
(
EDndStatus
stat
);
void
dndGetStartup
(
SDnode
*
pDnode
,
SStartupReq
*
pStartup
);
void
dndProcessStartupReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
);
// dndMsg.c
void
dndProcessRpcMsg
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
// dndExec.
c
// dndExec.
h
int32_t
dndOpenNode
(
SMgmtWrapper
*
pWrapper
);
void
dndCloseNode
(
SMgmtWrapper
*
pWrapper
);
int32_t
dndRun
(
SDnode
*
pDnode
);
// dnd
Obj
.c
// dnd
Int
.c
SDnode
*
dndCreate
(
const
SDnodeOpt
*
pOption
);
void
dndClose
(
SDnode
*
pDnode
);
void
dndHandleEvent
(
SDnode
*
pDnode
,
EDndEvent
event
);
// dndMsg.c
void
dndProcessRpcMsg
(
SMgmtWrapper
*
pWrapper
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
void
dndProcessStartupReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
);
// dndTransport.c
int32_t
dndInitMsgHandle
(
SDnode
*
pDnode
);
void
dndSendRpcRsp
(
SMgmtWrapper
*
pWrapper
,
const
SRpcMsg
*
pRsp
);
...
...
source/dnode/mgmt/main/src/dndEnv.c
浏览文件 @
192060eb
...
...
@@ -57,40 +57,3 @@ void dndCleanup() {
taosStopCacheRefreshWorker
();
dInfo
(
"dnode env is cleaned up"
);
}
void
dndSetMsgHandle
(
SMgmtWrapper
*
pWrapper
,
tmsg_t
msgType
,
NodeMsgFp
nodeMsgFp
,
int8_t
vgId
)
{
pWrapper
->
msgFps
[
TMSG_INDEX
(
msgType
)]
=
nodeMsgFp
;
pWrapper
->
msgVgIds
[
TMSG_INDEX
(
msgType
)]
=
vgId
;
}
EDndStatus
dndGetStatus
(
SDnode
*
pDnode
)
{
return
pDnode
->
status
;
}
void
dndSetStatus
(
SDnode
*
pDnode
,
EDndStatus
status
)
{
if
(
pDnode
->
status
!=
status
)
{
dDebug
(
"dnode status set from %s to %s"
,
dndStatStr
(
pDnode
->
status
),
dndStatStr
(
status
));
pDnode
->
status
=
status
;
}
}
void
dndReportStartup
(
SDnode
*
pDnode
,
const
char
*
pName
,
const
char
*
pDesc
)
{
SStartupReq
*
pStartup
=
&
pDnode
->
startup
;
tstrncpy
(
pStartup
->
name
,
pName
,
TSDB_STEP_NAME_LEN
);
tstrncpy
(
pStartup
->
desc
,
pDesc
,
TSDB_STEP_DESC_LEN
);
pStartup
->
finished
=
0
;
}
void
dndGetStartup
(
SDnode
*
pDnode
,
SStartupReq
*
pStartup
)
{
memcpy
(
pStartup
,
&
pDnode
->
startup
,
sizeof
(
SStartupReq
));
pStartup
->
finished
=
(
dndGetStatus
(
pDnode
)
==
DND_STAT_RUNNING
);
}
void
dndProcessStartupReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pReq
)
{
dDebug
(
"startup req is received"
);
SStartupReq
*
pStartup
=
rpcMallocCont
(
sizeof
(
SStartupReq
));
dndGetStartup
(
pDnode
,
pStartup
);
dDebug
(
"startup req is sent, step:%s desc:%s finished:%d"
,
pStartup
->
name
,
pStartup
->
desc
,
pStartup
->
finished
);
SRpcMsg
rpcRsp
=
{
.
handle
=
pReq
->
handle
,
.
pCont
=
pStartup
,
.
contLen
=
sizeof
(
SStartupReq
),
.
ahandle
=
pReq
->
ahandle
};
rpcSendResponse
(
&
rpcRsp
);
}
source/dnode/mgmt/main/src/dndInt.c
浏览文件 @
192060eb
...
...
@@ -189,3 +189,17 @@ void dndReleaseWrapper(SMgmtWrapper *pWrapper) {
taosRUnLockLatch
(
&
pWrapper
->
latch
);
dTrace
(
"node:%s, is released, refCount:%d"
,
pWrapper
->
name
,
refCount
);
}
void
dndSetMsgHandle
(
SMgmtWrapper
*
pWrapper
,
tmsg_t
msgType
,
NodeMsgFp
nodeMsgFp
,
int8_t
vgId
)
{
pWrapper
->
msgFps
[
TMSG_INDEX
(
msgType
)]
=
nodeMsgFp
;
pWrapper
->
msgVgIds
[
TMSG_INDEX
(
msgType
)]
=
vgId
;
}
EDndStatus
dndGetStatus
(
SDnode
*
pDnode
)
{
return
pDnode
->
status
;
}
void
dndSetStatus
(
SDnode
*
pDnode
,
EDndStatus
status
)
{
if
(
pDnode
->
status
!=
status
)
{
dDebug
(
"dnode status set from %s to %s"
,
dndStatStr
(
pDnode
->
status
),
dndStatStr
(
status
));
pDnode
->
status
=
status
;
}
}
source/dnode/mgmt/main/src/dndMsg.c
浏览文件 @
192060eb
...
...
@@ -66,8 +66,8 @@ void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
dTrace
(
"msg:%p, is created, handle:%p app:%p user:%s"
,
pMsg
,
pRpc
->
handle
,
pRpc
->
ahandle
,
pMsg
->
user
);
code
=
(
*
msgFp
)(
pWrapper
,
pMsg
);
}
else
if
(
pWrapper
->
procType
==
PROC_PARENT
)
{
dTrace
(
"msg:%p, is created and put into child queue, handle:%p app:%p user:%s"
,
pMsg
,
pRpc
->
handle
,
p
Rpc
->
ahandle
,
p
Msg
->
user
);
dTrace
(
"msg:%p, is created and put into child queue, handle:%p app:%p user:%s"
,
pMsg
,
pRpc
->
handle
,
pRpc
->
ahandle
,
pMsg
->
user
);
code
=
taosProcPutToChildQ
(
pWrapper
->
pProc
,
pMsg
,
sizeof
(
SNodeMsg
),
pRpc
->
pCont
,
pRpc
->
contLen
,
PROC_REQ
);
}
else
{
dTrace
(
"msg:%p, should not processed in child process, handle:%p app:%p user:%s"
,
pMsg
,
pRpc
->
handle
,
pRpc
->
ahandle
,
...
...
@@ -172,3 +172,26 @@ int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg) {
return
-
1
;
}
}
void
dndReportStartup
(
SDnode
*
pDnode
,
const
char
*
pName
,
const
char
*
pDesc
)
{
SStartupReq
*
pStartup
=
&
pDnode
->
startup
;
tstrncpy
(
pStartup
->
name
,
pName
,
TSDB_STEP_NAME_LEN
);
tstrncpy
(
pStartup
->
desc
,
pDesc
,
TSDB_STEP_DESC_LEN
);
pStartup
->
finished
=
0
;
}
void
dndGetStartup
(
SDnode
*
pDnode
,
SStartupReq
*
pStartup
)
{
memcpy
(
pStartup
,
&
pDnode
->
startup
,
sizeof
(
SStartupReq
));
pStartup
->
finished
=
(
dndGetStatus
(
pDnode
)
==
DND_STAT_RUNNING
);
}
void
dndProcessStartupReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pReq
)
{
dDebug
(
"startup req is received"
);
SStartupReq
*
pStartup
=
rpcMallocCont
(
sizeof
(
SStartupReq
));
dndGetStartup
(
pDnode
,
pStartup
);
dDebug
(
"startup req is sent, step:%s desc:%s finished:%d"
,
pStartup
->
name
,
pStartup
->
desc
,
pStartup
->
finished
);
SRpcMsg
rpcRsp
=
{
.
handle
=
pReq
->
handle
,
.
pCont
=
pStartup
,
.
contLen
=
sizeof
(
SStartupReq
),
.
ahandle
=
pReq
->
ahandle
};
rpcSendResponse
(
&
rpcRsp
);
}
source/dnode/vnode/inc/tsdb.h
浏览文件 @
192060eb
...
...
@@ -172,6 +172,8 @@ tsdbReaderT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo
tsdbReaderT
tsdbQueryCacheLast
(
STsdb
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupList
,
uint64_t
qId
,
void
*
pMemRef
);
int32_t
tsdbGetFileBlocksDistInfo
(
tsdbReaderT
*
queryHandle
,
STableBlockDistInfo
*
pTableBlockInfo
);
bool
isTsdbCacheLastRow
(
tsdbReaderT
*
pTsdbReadHandle
);
/**
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
192060eb
...
...
@@ -14,7 +14,7 @@
*/
#include "tsdbDef.h"
#include
<tdatablock.h>
#include
"tdatablock.h"
#include "os.h"
#include "talgo.h"
#include "tcompare.h"
...
...
@@ -31,6 +31,7 @@
#include "tlosertree.h"
#include "tsdbDef.h"
#include "tmsg.h"
#include "tsdbCommit.h"
#define EXTRA_BYTES 2
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
...
...
@@ -209,34 +210,34 @@ static SArray* getDefaultLoadColumns(STsdbReadHandle* pTsdbReadHandle, bool load
return
pLocalIdList
;
}
//int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) {
// STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle;
//
// int64_t rows = 0;
// STsdbMemTable* pMemTable = pTsdbReadHandle->pMemTable;
// if (pMemTable == NULL) { return rows; }
//
//// STableData* pMem = NULL;
//// STableData* pIMem = NULL;
//
//// SMemTable* pMemT = pMemRef->snapshot.mem;
//// SMemTable* pIMemT = pMemRef->snapshot.imem;
//
// size_t size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
// for (int32_t i = 0; i < size; ++i) {
// STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
//
//// if (pMemT && pCheckInfo->tableId < pMemT->maxTables) {
//// pMem = pMemT->tData[pCheckInfo->tableId];
//// rows += (pMem && pMem->uid == pCheckInfo->tableId) ? pMem->numOfRows : 0;
//// }
//// if (pIMemT && pCheckInfo->tableId < pIMemT->maxTables) {
//// pIMem = pIMemT->tData[pCheckInfo->tableId];
//// rows += (pIMem && pIMem->uid == pCheckInfo->tableId) ? pIMem->numOfRows : 0;
//// }
int64_t
tsdbGetNumOfRowsInMemTable
(
tsdbReaderT
*
pHandle
)
{
STsdbReadHandle
*
pTsdbReadHandle
=
(
STsdbReadHandle
*
)
pHandle
;
int64_t
rows
=
0
;
STsdbMemTable
*
pMemTable
=
NULL
;
//pTsdbReadHandle->pMemTable;
if
(
pMemTable
==
NULL
)
{
return
rows
;
}
// STableData* pMem = NULL;
// STableData* pIMem = NULL;
// SMemTable* pMemT = pMemRef->snapshot.mem;
// SMemTable* pIMemT = pMemRef->snapshot.imem;
size_t
size
=
taosArrayGetSize
(
pTsdbReadHandle
->
pTableCheckInfo
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
STableCheckInfo
*
pCheckInfo
=
taosArrayGet
(
pTsdbReadHandle
->
pTableCheckInfo
,
i
);
// if (pMemT && pCheckInfo->tableId < pMemT->maxTables) {
// pMem = pMemT->tData[pCheckInfo->tableId];
// rows += (pMem && pMem->uid == pCheckInfo->tableId) ? pMem->numOfRows : 0;
// }
// return rows;
//}
// if (pIMemT && pCheckInfo->tableId < pIMemT->maxTables) {
// pIMem = pIMemT->tData[pCheckInfo->tableId];
// rows += (pIMem && pIMem->uid == pCheckInfo->tableId) ? pIMem->numOfRows : 0;
// }
}
return
rows
;
}
static
SArray
*
createCheckInfoFromTableGroup
(
STsdbReadHandle
*
pTsdbReadHandle
,
STableGroupInfo
*
pGroupList
)
{
size_t
numOfGroup
=
taosArrayGetSize
(
pGroupList
->
pGroupList
);
...
...
@@ -2261,12 +2262,13 @@ static void moveToNextDataBlockInCurrentFile(STsdbReadHandle* pTsdbReadHandle) {
cur
->
mixBlock
=
false
;
cur
->
blockCompleted
=
false
;
}
#if 0
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDist* pTableBlockInfo) {
int32_t
tsdbGetFileBlocksDistInfo
(
tsdbReaderT
*
queryHandle
,
STableBlockDist
Info
*
pTableBlockInfo
)
{
STsdbReadHandle
*
pTsdbReadHandle
=
(
STsdbReadHandle
*
)
queryHandle
;
pTableBlockInfo
->
totalSize
=
0
;
pTableBlockInfo
->
totalRows
=
0
;
STsdbFS
*
pFileHandle
=
REPO_FS
(
pTsdbReadHandle
->
pTsdb
);
// find the start data block in file
...
...
@@ -2284,9 +2286,11 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDist* pTa
int32_t
code
=
TSDB_CODE_SUCCESS
;
int32_t
numOfBlocks
=
0
;
int32_t
numOfTables
=
(
int32_t
)
taosArrayGetSize
(
pTsdbReadHandle
->
pTableCheckInfo
);
int defaultRows = TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock);
int
defaultRows
=
4096
;
//
TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock);
STimeWindow
win
=
TSWINDOW_INITIALIZER
;
bool
ascTraverse
=
ASCENDING_TRAVERSE
(
pTsdbReadHandle
->
order
);
while
(
true
)
{
numOfBlocks
=
0
;
tsdbRLockFS
(
REPO_FS
(
pTsdbReadHandle
->
pTsdb
));
...
...
@@ -2299,8 +2303,7 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDist* pTa
tsdbGetFidKeyRange
(
pCfg
->
daysPerFile
,
pCfg
->
precision
,
pTsdbReadHandle
->
pFileGroup
->
fid
,
&
win
.
skey
,
&
win
.
ekey
);
// current file are not overlapped with query time window, ignore remain files
if ((ASCENDING_TRAVERSE(pTsdbReadHandle->order) && win.skey > pTsdbReadHandle->window.ekey) ||
(!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && win.ekey < pTsdbReadHandle->window.ekey)) {
if
((
ascTraverse
&&
win
.
skey
>
pTsdbReadHandle
->
window
.
ekey
)
||
(
!
ascTraverse
&&
win
.
ekey
<
pTsdbReadHandle
->
window
.
ekey
))
{
tsdbUnLockFS
(
REPO_FS
(
pTsdbReadHandle
->
pTsdb
));
tsdbDebug
(
"%p remain files are not qualified for qrange:%"
PRId64
"-%"
PRId64
", ignore, %s"
,
pTsdbReadHandle
,
pTsdbReadHandle
->
window
.
skey
,
pTsdbReadHandle
->
window
.
ekey
,
pTsdbReadHandle
->
idStr
);
...
...
@@ -2342,19 +2345,26 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDist* pTa
int32_t
numOfRows
=
pBlock
[
j
].
numOfRows
;
pTableBlockInfo
->
totalRows
+=
numOfRows
;
if (numOfRows > pTableBlockInfo->maxRows) pTableBlockInfo->maxRows = numOfRows;
if (numOfRows < pTableBlockInfo->minRows) pTableBlockInfo->minRows = numOfRows;
if (numOfRows < defaultRows) pTableBlockInfo->numOfSmallBlocks+=1;
int32_t stepIndex = (numOfRows-1)/TSDB_BLOCK_DIST_STEP_ROWS;
SFileBlockInfo *blockInfo = (SFileBlockInfo*)taosArrayGet(pTableBlockInfo->dataBlockInfos, stepIndex);
blockInfo->numBlocksOfStep++;
if
(
numOfRows
>
pTableBlockInfo
->
maxRows
)
{
pTableBlockInfo
->
maxRows
=
numOfRows
;
}
if
(
numOfRows
<
pTableBlockInfo
->
minRows
)
{
pTableBlockInfo
->
minRows
=
numOfRows
;
}
if
(
numOfRows
<
defaultRows
)
{
pTableBlockInfo
->
numOfSmallBlocks
+=
1
;
}
// int32_t stepIndex = (numOfRows-1)/TSDB_BLOCK_DIST_STEP_ROWS;
// SFileBlockInfo *blockInfo = (SFileBlockInfo*)taosArrayGet(pTableBlockInfo->dataBlockInfos, stepIndex);
// blockInfo->numBlocksOfStep++;
}
}
}
return
code
;
}
#endif
static
int32_t
getDataBlocksInFiles
(
STsdbReadHandle
*
pTsdbReadHandle
,
bool
*
exists
)
{
STsdbFS
*
pFileHandle
=
REPO_FS
(
pTsdbReadHandle
->
pTsdb
);
...
...
source/libs/command/src/command.c
浏览文件 @
192060eb
...
...
@@ -16,7 +16,16 @@
#include "command.h"
#include "tdatablock.h"
// #define SET_VARSTR(pData, val, pOffset)
static
int32_t
getSchemaBytes
(
const
SSchema
*
pSchema
)
{
switch
(
pSchema
->
type
)
{
case
TSDB_DATA_TYPE_BINARY
:
return
(
pSchema
->
bytes
-
VARSTR_HEADER_SIZE
);
case
TSDB_DATA_TYPE_NCHAR
:
return
(
pSchema
->
bytes
-
VARSTR_HEADER_SIZE
)
/
TSDB_NCHAR_SIZE
;
default:
return
pSchema
->
bytes
;
}
}
static
void
buildRspData
(
const
STableMeta
*
pMeta
,
char
*
pData
)
{
int32_t
*
pColSizes
=
(
int32_t
*
)
pData
;
...
...
@@ -50,7 +59,7 @@ static void buildRspData(const STableMeta* pMeta, char* pData) {
// Length
pData
+=
BitmapLen
(
numOfRows
);
for
(
int32_t
i
=
0
;
i
<
numOfRows
;
++
i
)
{
*
(
int32_t
*
)
pData
=
pMeta
->
schema
[
i
].
bytes
;
*
(
int32_t
*
)
pData
=
getSchemaBytes
(
pMeta
->
schema
+
i
)
;
pData
+=
sizeof
(
int32_t
);
}
pColSizes
[
2
]
=
sizeof
(
int32_t
)
*
numOfRows
;
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
192060eb
...
...
@@ -128,6 +128,11 @@ typedef struct {
int64_t
sumRunTimes
;
}
SOperatorProfResult
;
typedef
struct
SLimit
{
int64_t
limit
;
int64_t
offset
;
}
SLimit
;
typedef
struct
STaskCostInfo
{
int64_t
created
;
int64_t
start
;
...
...
@@ -163,6 +168,11 @@ typedef struct SOperatorCostInfo {
uint64_t
execCost
;
}
SOperatorCostInfo
;
typedef
struct
SOrder
{
uint32_t
order
;
SColumn
col
;
}
SOrder
;
// The basic query information extracted from the SQueryInfo tree to support the
// execution of query in a data node.
typedef
struct
STaskAttr
{
...
...
@@ -196,7 +206,6 @@ typedef struct STaskAttr {
STimeWindow
window
;
SInterval
interval
;
SSessionWindow
sw
;
int16_t
precision
;
int16_t
numOfOutput
;
int16_t
fillType
;
...
...
@@ -206,13 +215,8 @@ typedef struct STaskAttr {
int32_t
intermediateResultRowSize
;
// intermediate result row size, in case of top-k query.
int32_t
maxTableColumnWidth
;
int32_t
tagLen
;
// tag value length of current query
SGroupbyExpr
*
pGroupbyExpr
;
SExprInfo
*
pExpr1
;
SExprInfo
*
pExpr2
;
int32_t
numOfExpr2
;
SExprInfo
*
pExpr3
;
int32_t
numOfExpr3
;
SColumnInfo
*
tableCols
;
SColumnInfo
*
tagColList
;
...
...
@@ -220,8 +224,6 @@ typedef struct STaskAttr {
int64_t
*
fillVal
;
SSingleColumnFilterInfo
*
pFilterInfo
;
// SFilterInfo *pFilters;
void
*
tsdb
;
STableGroupInfo
tableGroupInfo
;
// table <tid, last_key> list SArray<STableKeyInfo>
int32_t
vgId
;
...
...
@@ -384,7 +386,7 @@ typedef struct SExchangeInfo {
}
SExchangeInfo
;
typedef
struct
STableScanInfo
{
void
*
pTsdbReadHandle
;
void
*
dataReader
;
int32_t
numOfBlocks
;
// extract basic running information.
int32_t
numOfSkipped
;
int32_t
numOfBlockStatis
;
...
...
@@ -644,12 +646,13 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SOperatorInfo
*
createSessionAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
int64_t
gap
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createGroupOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
SArray
*
pGroupColList
,
SExecTaskInfo
*
pTaskInfo
,
const
STableGroupInfo
*
pTableGroupInfo
);
SOperatorInfo
*
createDataBlockInfoScanOperator
(
void
*
dataReader
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createFillOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExpr
,
int32_t
numOfCols
,
SInterval
*
pInterval
,
SSDataBlock
*
pResBlock
,
int32_t
fillType
,
char
*
fillVal
,
bool
multigroupResult
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStatewindowOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExpr
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createDistinctOperatorInfo
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
downstream
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
);
SOperatorInfo
*
createTableBlockInfoScanOperator
(
void
*
pTsdbReadHandle
,
STaskRuntimeEnv
*
pRuntimeEnv
);
SOperatorInfo
*
createTableSeqScanOperatorInfo
(
void
*
pTsdbReadHandle
,
STaskRuntimeEnv
*
pRuntimeEnv
);
SOperatorInfo
*
createAllTimeIntervalOperatorInfo
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SOperatorInfo
*
downstream
,
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
192060eb
...
...
@@ -249,7 +249,6 @@ static void operatorDummyCloseFn(void* param, int32_t numOfCols) {}
static
int32_t
doCopyToSDataBlock
(
SDiskbasedBuf
*
pBuf
,
SGroupResInfo
*
pGroupResInfo
,
int32_t
orderType
,
SSDataBlock
*
pBlock
,
int32_t
rowCapacity
,
int32_t
*
rowCellOffset
);
static
int32_t
getGroupbyColumnIndex
(
SGroupbyExpr
*
pGroupbyExpr
,
SSDataBlock
*
pDataBlock
);
static
int32_t
setGroupResultOutputBuf_rv
(
SOptrBasicInfo
*
binfo
,
int32_t
numOfCols
,
char
*
pData
,
int16_t
type
,
int16_t
bytes
,
int32_t
groupId
,
SDiskbasedBuf
*
pBuf
,
SExecTaskInfo
*
pTaskInfo
,
SAggSupporter
*
pAggSup
);
...
...
@@ -287,44 +286,6 @@ static int compareRowData(const void* a, const void* b, const void* userData) {
return
(
in1
!=
NULL
&&
in2
!=
NULL
)
?
supporter
->
comFunc
(
in1
,
in2
)
:
0
;
}
static
void
sortGroupResByOrderList
(
SGroupResInfo
*
pGroupResInfo
,
STaskRuntimeEnv
*
pRuntimeEnv
,
SSDataBlock
*
pDataBlock
)
{
SArray
*
columnOrderList
=
getOrderCheckColumns
(
pRuntimeEnv
->
pQueryAttr
);
size_t
size
=
taosArrayGetSize
(
columnOrderList
);
taosArrayDestroy
(
columnOrderList
);
if
(
size
<=
0
)
{
return
;
}
int32_t
orderId
=
pRuntimeEnv
->
pQueryAttr
->
order
.
col
.
colId
;
if
(
orderId
<=
0
)
{
return
;
}
bool
found
=
false
;
int16_t
dataOffset
=
0
;
for
(
int32_t
j
=
0
;
j
<
pDataBlock
->
info
.
numOfCols
;
++
j
)
{
SColumnInfoData
*
pColInfoData
=
(
SColumnInfoData
*
)
taosArrayGet
(
pDataBlock
->
pDataBlock
,
j
);
if
(
orderId
==
j
)
{
found
=
true
;
break
;
}
dataOffset
+=
pColInfoData
->
info
.
bytes
;
}
if
(
found
==
false
)
{
return
;
}
int16_t
type
=
pRuntimeEnv
->
pQueryAttr
->
pExpr1
[
orderId
].
base
.
resSchema
.
type
;
SRowCompSupporter
support
=
{.
pRuntimeEnv
=
pRuntimeEnv
,
.
dataOffset
=
dataOffset
,
.
comFunc
=
getComparFunc
(
type
,
0
)};
taosArraySortPWithExt
(
pGroupResInfo
->
pRows
,
compareRowData
,
&
support
);
}
// setup the output buffer for each operator
SSDataBlock
*
createOutputBuf_rv1
(
SDataBlockDescNode
*
pNode
)
{
int32_t
numOfCols
=
LIST_LENGTH
(
pNode
->
pSlots
);
...
...
@@ -375,17 +336,6 @@ static bool isSelectivityWithTagsQuery(SqlFunctionCtx* pCtx, int32_t numOfOutput
// return (numOfSelectivity > 0 && hasTags);
}
static
bool
isProjQuery
(
STaskAttr
*
pQueryAttr
)
{
for
(
int32_t
i
=
0
;
i
<
pQueryAttr
->
numOfOutput
;
++
i
)
{
int32_t
functId
=
getExprFunctionId
(
&
pQueryAttr
->
pExpr1
[
i
]);
if
(
functId
!=
FUNCTION_PRJ
&&
functId
!=
FUNCTION_TAGPRJ
)
{
return
false
;
}
}
return
true
;
}
static
bool
hasNull
(
SColumn
*
pColumn
,
SColumnDataAgg
*
pStatis
)
{
if
(
TSDB_COL_IS_TAG
(
pColumn
->
flag
)
||
TSDB_COL_IS_UD_COL
(
pColumn
->
flag
)
||
pColumn
->
colId
==
PRIMARYKEY_TIMESTAMP_COL_ID
)
{
...
...
@@ -1411,15 +1361,10 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo,
}
static
bool
setTimeWindowInterpolationStartTs
(
SOperatorInfo
*
pOperatorInfo
,
SqlFunctionCtx
*
pCtx
,
int32_t
pos
,
int32_t
numOfRows
,
SArray
*
pDataBlock
,
const
TSKEY
*
tsCols
,
STimeWindow
*
win
)
{
STaskRuntimeEnv
*
pRuntimeEnv
=
pOperatorInfo
->
pRuntimeEnv
;
STaskAttr
*
pQueryAttr
=
pRuntimeEnv
->
pQueryAttr
;
bool
ascQuery
=
QUERY_IS_ASC_QUERY
(
pQueryAttr
);
int32_t
numOfRows
,
SArray
*
pDataBlock
,
const
TSKEY
*
tsCols
,
STimeWindow
*
win
)
{
bool
ascQuery
=
true
;
TSKEY
curTs
=
tsCols
[
pos
];
TSKEY
lastTs
=
*
(
TSKEY
*
)
pRuntimeEnv
->
prevRow
[
0
];
TSKEY
lastTs
=
0
;
//
*(TSKEY*)pRuntimeEnv->prevRow[0];
// lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed.
// start exactly from this point, no need to do interpolation
...
...
@@ -1434,27 +1379,24 @@ static bool setTimeWindowInterpolationStartTs(SOperatorInfo* pOperatorInfo, SqlF
return
true
;
}
int32_t
step
=
GET_FORWARD_DIRECTION_FACTOR
(
pQueryAttr
->
order
.
order
);
int32_t
step
=
1
;
//
GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
TSKEY
prevTs
=
((
pos
==
0
&&
ascQuery
)
||
(
pos
==
(
numOfRows
-
1
)
&&
!
ascQuery
))
?
lastTs
:
tsCols
[
pos
-
step
];
doTimeWindowInterpolation
(
pOperatorInfo
,
pOperatorInfo
->
info
,
pDataBlock
,
prevTs
,
pos
-
step
,
curTs
,
pos
,
key
,
RESULT_ROW_START_INTERP
);
doTimeWindowInterpolation
(
pOperatorInfo
,
pOperatorInfo
->
info
,
pDataBlock
,
prevTs
,
pos
-
step
,
curTs
,
pos
,
key
,
RESULT_ROW_START_INTERP
);
return
true
;
}
static
bool
setTimeWindowInterpolationEndTs
(
SOperatorInfo
*
pOperatorInfo
,
SqlFunctionCtx
*
pCtx
,
int32_t
endRowIndex
,
SArray
*
pDataBlock
,
const
TSKEY
*
tsCols
,
TSKEY
blockEkey
,
STimeWindow
*
win
)
{
STaskRuntimeEnv
*
pRuntimeEnv
=
pOperatorInfo
->
pRuntimeEnv
;
STaskAttr
*
pQueryAttr
=
pRuntimeEnv
->
pQueryAttr
;
int32_t
order
=
TSDB_ORDER_ASC
;
int32_t
numOfOutput
=
pOperatorInfo
->
numOfOutput
;
TSKEY
actualEndKey
=
tsCols
[
endRowIndex
];
TSKEY
key
=
QUERY_IS_ASC_QUERY
(
pQueryAttr
)
?
win
->
ekey
:
win
->
skey
;
TSKEY
key
=
order
?
win
->
ekey
:
win
->
skey
;
// not ended in current data block, do not invoke interpolation
if
((
key
>
blockEkey
&&
QUERY_IS_ASC_QUERY
(
pQueryAttr
))
||
(
key
<
blockEkey
&&
!
QUERY_IS_ASC_QUERY
(
pQueryAttr
)
))
{
if
((
key
>
blockEkey
/*&& QUERY_IS_ASC_QUERY(pQueryAttr)*/
)
||
(
key
<
blockEkey
/*&& !QUERY_IS_ASC_QUERY(pQueryAttr)*/
))
{
setNotInterpoWindowKey
(
pCtx
,
numOfOutput
,
RESULT_ROW_END_INTERP
);
return
false
;
}
...
...
@@ -1465,7 +1407,7 @@ static bool setTimeWindowInterpolationEndTs(SOperatorInfo* pOperatorInfo, SqlFun
return
true
;
}
int32_t
step
=
GET_FORWARD_DIRECTION_FACTOR
(
pQueryAttr
->
order
.
order
);
int32_t
step
=
GET_FORWARD_DIRECTION_FACTOR
(
order
);
int32_t
nextRowIndex
=
endRowIndex
+
step
;
assert
(
nextRowIndex
>=
0
);
...
...
@@ -1667,10 +1609,9 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
STaskRuntimeEnv
*
pRuntimeEnv
=
pOperatorInfo
->
pRuntimeEnv
;
int32_t
numOfOutput
=
pOperatorInfo
->
numOfOutput
;
STaskAttr
*
pQueryAttr
=
pRuntimeEnv
->
pQueryAttr
;
int32_t
step
=
GET_FORWARD_DIRECTION_FACTOR
(
pQueryAttr
->
order
.
order
);
bool
ascQuery
=
QUERY_IS_ASC_QUERY
(
pQueryAttr
)
;
int32_t
step
=
1
;
//
GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
bool
ascQuery
=
true
;
TSKEY
*
tsCols
=
NULL
;
if
(
pSDataBlock
->
pDataBlock
!=
NULL
)
{
...
...
@@ -1683,7 +1624,7 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
int32_t
startPos
=
ascQuery
?
0
:
(
pSDataBlock
->
info
.
rows
-
1
);
TSKEY
ts
=
getStartTsKey
(
&
pSDataBlock
->
info
.
window
,
tsCols
,
pSDataBlock
->
info
.
rows
,
ascQuery
);
STimeWindow
win
=
getCurrentActiveTimeWindow
(
pResultRowInfo
,
ts
,
pQueryAttr
);
STimeWindow
win
=
{
0
};
//
getCurrentActiveTimeWindow(pResultRowInfo, ts, pQueryAttr);
bool
masterScan
=
IS_MAIN_SCAN
(
pRuntimeEnv
);
SResultRow
*
pResult
=
NULL
;
...
...
@@ -1699,7 +1640,7 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
longjmp
(
pRuntimeEnv
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
TSKEY
ekey
=
reviseWindowEkey
(
pQueryAttr
,
&
win
);
TSKEY
ekey
=
0
;
//
reviseWindowEkey(pQueryAttr, &win);
// forwardStep = getNumOfRowsInTimeWindow(pRuntimeEnv, &pSDataBlock->info, tsCols, startPos, ekey,
// binarySearchForKey, true);
...
...
@@ -1713,31 +1654,31 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
// startPos = getNextQualifiedWindow(pQueryAttr, &win, &pSDataBlock->info, tsCols, binarySearchForKey,
// prevEndPos);
if
(
startPos
<
0
)
{
if
((
ascQuery
&&
win
.
skey
<=
pQueryAttr
->
window
.
ekey
)
||
((
!
ascQuery
)
&&
win
.
ekey
>=
pQueryAttr
->
window
.
ekey
))
{
int32_t
code
=
setResultOutputBufByKey
(
pRuntimeEnv
,
pResultRowInfo
,
pSDataBlock
->
info
.
uid
,
&
win
,
masterScan
,
&
pResult
,
tableGroupId
,
pInfo
->
binfo
.
pCtx
,
numOfOutput
,
pInfo
->
binfo
.
rowCellInfoOffset
);
if
(
code
!=
TSDB_CODE_SUCCESS
||
pResult
==
NULL
)
{
longjmp
(
pRuntimeEnv
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
startPos
=
pSDataBlock
->
info
.
rows
-
1
;
//
if ((ascQuery && win.skey <= pQueryAttr->window.ekey) || ((!ascQuery) && win.ekey >= pQueryAttr->window.ekey)) {
//
int32_t code =
//
setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult,
//
tableGroupId, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset);
//
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
//
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
//
}
//
//
startPos = pSDataBlock->info.rows - 1;
// window start(end) key interpolation
// doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &win, startPos,
// forwardStep); doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, ascQuery ? &win : &preWin, startPos,
// forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput);
}
//
}
break
;
}
setResultRowInterpo
(
pResult
,
RESULT_ROW_END_INTERP
);
}
if
(
pQueryAttr
->
timeWindowInterpo
)
{
int32_t
rowIndex
=
ascQuery
?
(
pSDataBlock
->
info
.
rows
-
1
)
:
0
;
//
if (pQueryAttr->timeWindowInterpo) {
//
int32_t rowIndex = ascQuery ? (pSDataBlock->info.rows - 1) : 0;
// saveDataBlockLastRow(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, rowIndex);
}
//
}
// updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey);
}
...
...
@@ -2023,28 +1964,6 @@ static int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo* binfo, int32_t numOfCo
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
getGroupbyColumnIndex
(
SGroupbyExpr
*
pGroupbyExpr
,
SSDataBlock
*
pDataBlock
)
{
size_t
num
=
taosArrayGetSize
(
pGroupbyExpr
->
columnInfo
);
for
(
int32_t
k
=
0
;
k
<
num
;
++
k
)
{
SColIndex
*
pColIndex
=
taosArrayGet
(
pGroupbyExpr
->
columnInfo
,
k
);
if
(
TSDB_COL_IS_TAG
(
pColIndex
->
flag
))
{
continue
;
}
int32_t
colId
=
pColIndex
->
colId
;
for
(
int32_t
i
=
0
;
i
<
pDataBlock
->
info
.
numOfCols
;
++
i
)
{
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
i
);
if
(
pColInfo
->
info
.
colId
==
colId
)
{
return
i
;
}
}
}
assert
(
0
);
return
-
1
;
}
static
bool
functionNeedToExecute
(
SqlFunctionCtx
*
pCtx
)
{
struct
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
...
...
@@ -2264,64 +2183,6 @@ static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
return
NULL
;
}
static
int32_t
setupQueryRuntimeEnv
(
STaskRuntimeEnv
*
pRuntimeEnv
,
int32_t
numOfTables
,
SArray
*
pOperator
,
void
*
merger
)
{
// qDebug("QInfo:0x%"PRIx64" setup runtime env", GET_TASKID(pRuntimeEnv));
STaskAttr
*
pQueryAttr
=
pRuntimeEnv
->
pQueryAttr
;
pRuntimeEnv
->
prevGroupId
=
INT32_MIN
;
pRuntimeEnv
->
pQueryAttr
=
pQueryAttr
;
pRuntimeEnv
->
pResultRowHashTable
=
taosHashInit
(
numOfTables
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_NO_LOCK
);
pRuntimeEnv
->
pResultRowListSet
=
taosHashInit
(
numOfTables
*
10
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
pRuntimeEnv
->
keyBuf
=
taosMemoryMalloc
(
pQueryAttr
->
maxTableColumnWidth
+
sizeof
(
int64_t
)
+
POINTER_BYTES
);
// pRuntimeEnv->pool = initResultRowPool(getResultRowSize(pRuntimeEnv));
pRuntimeEnv
->
pResultRowArrayList
=
taosArrayInit
(
numOfTables
,
sizeof
(
SResultRowCell
));
pRuntimeEnv
->
prevRow
=
taosMemoryMalloc
(
POINTER_BYTES
*
pQueryAttr
->
numOfCols
+
pQueryAttr
->
srcRowSize
);
pRuntimeEnv
->
tagVal
=
taosMemoryMalloc
(
pQueryAttr
->
tagLen
);
// NOTE: pTableCheckInfo need to update the query time range and the lastKey info
pRuntimeEnv
->
pTableRetrieveTsMap
=
taosHashInit
(
numOfTables
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
false
,
HASH_NO_LOCK
);
// pRuntimeEnv->scalarSup = createScalarFuncSupport(pQueryAttr->numOfOutput);
if
(
pRuntimeEnv
->
scalarSup
==
NULL
||
pRuntimeEnv
->
pResultRowHashTable
==
NULL
||
pRuntimeEnv
->
keyBuf
==
NULL
||
pRuntimeEnv
->
prevRow
==
NULL
||
pRuntimeEnv
->
tagVal
==
NULL
)
{
goto
_clean
;
}
if
(
pQueryAttr
->
numOfCols
)
{
char
*
start
=
POINTER_BYTES
*
pQueryAttr
->
numOfCols
+
(
char
*
)
pRuntimeEnv
->
prevRow
;
pRuntimeEnv
->
prevRow
[
0
]
=
start
;
for
(
int32_t
i
=
1
;
i
<
pQueryAttr
->
numOfCols
;
++
i
)
{
pRuntimeEnv
->
prevRow
[
i
]
=
pRuntimeEnv
->
prevRow
[
i
-
1
]
+
pQueryAttr
->
tableCols
[
i
-
1
].
bytes
;
}
if
(
pQueryAttr
->
tableCols
[
0
].
type
==
TSDB_DATA_TYPE_TIMESTAMP
)
{
*
(
int64_t
*
)
pRuntimeEnv
->
prevRow
[
0
]
=
INT64_MIN
;
}
}
// qDebug("QInfo:0x%"PRIx64" init runtime environment completed", GET_TASKID(pRuntimeEnv));
// group by normal column, sliding window query, interval query are handled by interval query processor
// interval (down sampling operation)
return
TSDB_CODE_SUCCESS
;
_clean:
// destroyScalarFuncSupport(pRuntimeEnv->scalarSup, pRuntimeEnv->pQueryAttr->numOfOutput);
taosMemoryFreeClear
(
pRuntimeEnv
->
pResultRowHashTable
);
taosMemoryFreeClear
(
pRuntimeEnv
->
keyBuf
);
taosMemoryFreeClear
(
pRuntimeEnv
->
prevRow
);
taosMemoryFreeClear
(
pRuntimeEnv
->
tagVal
);
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
static
void
doFreeQueryHandle
(
STaskRuntimeEnv
*
pRuntimeEnv
)
{
STaskAttr
*
pQueryAttr
=
pRuntimeEnv
->
pQueryAttr
;
...
...
@@ -2400,17 +2261,6 @@ void setTaskKilled(SExecTaskInfo* pTaskInfo) { pTaskInfo->code = TSDB_CODE_TSC_Q
// return false;
//}
static
bool
isFirstLastRowQuery
(
STaskAttr
*
pQueryAttr
)
{
for
(
int32_t
i
=
0
;
i
<
pQueryAttr
->
numOfOutput
;
++
i
)
{
int32_t
functionID
=
getExprFunctionId
(
&
pQueryAttr
->
pExpr1
[
i
]);
if
(
functionID
==
FUNCTION_LAST_ROW
)
{
return
true
;
}
}
return
false
;
}
static
bool
isCachedLastQuery
(
STaskAttr
*
pQueryAttr
)
{
for
(
int32_t
i
=
0
;
i
<
pQueryAttr
->
numOfOutput
;
++
i
)
{
int32_t
functionId
=
getExprFunctionId
(
&
pQueryAttr
->
pExpr1
[
i
]);
...
...
@@ -2493,18 +2343,6 @@ static bool onlyOneQueryType(STaskAttr* pQueryAttr, int32_t functId, int32_t fun
return
true
;
}
static
bool
onlyFirstQuery
(
STaskAttr
*
pQueryAttr
)
{
return
onlyOneQueryType
(
pQueryAttr
,
FUNCTION_FIRST
,
FUNCTION_FIRST_DST
);
}
static
bool
onlyLastQuery
(
STaskAttr
*
pQueryAttr
)
{
return
onlyOneQueryType
(
pQueryAttr
,
FUNCTION_LAST
,
FUNCTION_LAST_DST
);
}
static
bool
notContainSessionOrStateWindow
(
STaskAttr
*
pQueryAttr
)
{
return
!
(
pQueryAttr
->
sw
.
gap
>
0
||
pQueryAttr
->
stateWindow
);
}
static
int32_t
updateBlockLoadStatus
(
STaskAttr
*
pQuery
,
int32_t
status
)
{
bool
hasFirstLastFunc
=
false
;
bool
hasOtherFunc
=
false
;
...
...
@@ -3006,7 +2844,7 @@ int32_t loadDataBlock(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo,
*
status
=
BLK_DATA_ALL_NEEDED
;
SArray
*
pCols
=
tsdbRetrieveDataBlock
(
pTableScanInfo
->
pTsdbReadHandle
,
NULL
);
SArray
*
pCols
=
tsdbRetrieveDataBlock
(
pTableScanInfo
->
dataReader
,
NULL
);
if
(
pCols
==
NULL
)
{
return
terrno
;
}
...
...
@@ -3899,76 +3737,6 @@ int32_t setTimestampListJoinInfo(STaskRuntimeEnv* pRuntimeEnv, SVariant* pTag, S
return
0
;
}
// TODO refactor: this funciton should be merged with setparamForStableStddevColumnData function.
void
setParamForStableStddev
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
SExprInfo
*
pExprInfo
)
{
#if 0
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t numOfExprs = pQueryAttr->numOfOutput;
for(int32_t i = 0; i < numOfExprs; ++i) {
SExprInfo* pExprInfo1 = &(pExprInfo[i]);
if (pExprInfo1->base.functionId != FUNCTION_STDDEV_DST) {
continue;
}
SExprBasicInfo* pExpr = &pExprInfo1->base;
pCtx[i].param[0].arr = NULL;
pCtx[i].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int
// TODO use hash to speedup this loop
int32_t numOfGroup = (int32_t)taosArrayGetSize(pRuntimeEnv->prevResult);
for (int32_t j = 0; j < numOfGroup; ++j) {
SInterResult* p = taosArrayGet(pRuntimeEnv->prevResult, j);
if (pQueryAttr->tagLen == 0 || memcmp(p->tags, pRuntimeEnv->tagVal, pQueryAttr->tagLen) == 0) {
int32_t numOfCols = (int32_t)taosArrayGetSize(p->pResult);
for (int32_t k = 0; k < numOfCols; ++k) {
SStddevInterResult* pres = taosArrayGet(p->pResult, k);
if (pres->info.colId == pExpr->colInfo.colId) {
pCtx[i].param[0].arr = pres->pResult;
break;
}
}
}
}
}
#endif
}
void
setParamForStableStddevByColData
(
STaskRuntimeEnv
*
pRuntimeEnv
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
SExprInfo
*
pExpr
,
char
*
val
,
int16_t
bytes
)
{
STaskAttr
*
pQueryAttr
=
pRuntimeEnv
->
pQueryAttr
;
#if 0
int32_t numOfExprs = pQueryAttr->numOfOutput;
for(int32_t i = 0; i < numOfExprs; ++i) {
SExprBasicInfo* pExpr1 = &pExpr[i].base;
if (pExpr1->functionId != FUNCTION_STDDEV_DST) {
continue;
}
pCtx[i].param[0].arr = NULL;
pCtx[i].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int
// TODO use hash to speedup this loop
int32_t numOfGroup = (int32_t)taosArrayGetSize(pRuntimeEnv->prevResult);
for (int32_t j = 0; j < numOfGroup; ++j) {
SInterResult* p = taosArrayGet(pRuntimeEnv->prevResult, j);
if (bytes == 0 || memcmp(p->tags, val, bytes) == 0) {
int32_t numOfCols = (int32_t)taosArrayGetSize(p->pResult);
for (int32_t k = 0; k < numOfCols; ++k) {
SStddevInterResult* pres = taosArrayGet(p->pResult, k);
if (pres->info.colId == pExpr1->colInfo.colId) {
pCtx[i].param[0].arr = pres->pResult;
break;
}
}
}
}
}
#endif
}
/*
* There are two cases to handle:
*
...
...
@@ -4679,13 +4447,13 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
*
newgroup
=
false
;
while
(
tsdbNextDataBlock
(
pTableScanInfo
->
pTsdbReadHandle
))
{
while
(
tsdbNextDataBlock
(
pTableScanInfo
->
dataReader
))
{
if
(
isTaskKilled
(
pOperator
->
pTaskInfo
))
{
longjmp
(
pOperator
->
pTaskInfo
->
env
,
TSDB_CODE_TSC_QUERY_CANCELLED
);
}
pTableScanInfo
->
numOfBlocks
+=
1
;
tsdbRetrieveDataBlockInfo
(
pTableScanInfo
->
pTsdbReadHandle
,
&
pBlock
->
info
);
tsdbRetrieveDataBlockInfo
(
pTableScanInfo
->
dataReader
,
&
pBlock
->
info
);
// todo opt
// if (pTableGroupInfo->numOfTables > 1 || (pRuntimeEnv->current == NULL && pTableGroupInfo->numOfTables == 1)) {
...
...
@@ -4722,7 +4490,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) {
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
// The read handle is not initialized yet, since no qualified tables exists
if
(
pTableScanInfo
->
pTsdbReadHandle
==
NULL
)
{
if
(
pTableScanInfo
->
dataReader
==
NULL
)
{
return
NULL
;
}
...
...
@@ -4750,11 +4518,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) {
setTaskStatus
(
pTaskInfo
,
TASK_NOT_COMPLETED
);
pTableScanInfo
->
scanFlag
=
REPEAT_SCAN
;
// if (pTaskInfo->pTsBuf) {
// bool ret = tsBufNextPos(pRuntimeEnv->pTsBuf);
// assert(ret);
// }
//
if
(
pResultRowInfo
->
size
>
0
)
{
pResultRowInfo
->
curPos
=
0
;
}
...
...
@@ -4790,44 +4553,44 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator, bool* newgroup) {
STableScanInfo
*
pTableScanInfo
=
pOperator
->
info
;
*
newgroup
=
false
;
#if 0
STableBlockDist tableBlockDist = {0};
tableBlockDist.numOfTables
= (int32_t)pOperator->pRuntimeEnv->tableqinfoGroupInfo.numOfTables;
STableBlockDist
Info
tableBlockDist
=
{
0
};
tableBlockDist
.
numOfTables
=
1
;
// TODO set the correct number of tables
int32_t
numRowSteps
=
TSDB_DEFAULT_MAX_ROW_FBLOCK
/
TSDB_BLOCK_DIST_STEP_ROWS
;
if
(
TSDB_DEFAULT_MAX_ROW_FBLOCK
%
TSDB_BLOCK_DIST_STEP_ROWS
!=
0
)
{
++
numRowSteps
;
}
tableBlockDist
.
dataBlockInfos
=
taosArrayInit
(
numRowSteps
,
sizeof
(
SFileBlockInfo
));
taosArraySetSize
(
tableBlockDist
.
dataBlockInfos
,
numRowSteps
);
tableBlockDist
.
maxRows
=
INT_MIN
;
tableBlockDist
.
minRows
=
INT_MAX
;
tsdbGetFileBlocksDistInfo(pTableScanInfo->
pTsdbReadHandle
, &tableBlockDist);
tableBlockDist.numOfRowsInMemTable = (int32_t) tsdbGetNumOfRowsInMemTable(pTableScanInfo->
pTsdbReadHandle
);
tsdbGetFileBlocksDistInfo
(
pTableScanInfo
->
dataReader
,
&
tableBlockDist
);
tableBlockDist
.
numOfRowsInMemTable
=
(
int32_t
)
tsdbGetNumOfRowsInMemTable
(
pTableScanInfo
->
dataReader
);
SSDataBlock
*
pBlock
=
&
pTableScanInfo
->
block
;
pBlock
->
info
.
rows
=
1
;
pBlock
->
info
.
numOfCols
=
1
;
SBufferWriter bw = tbufInitWriter(NULL, false);
blockDistInfoToBinary(&tableBlockDist, &bw);
//
SBufferWriter bw = tbufInitWriter(NULL, false);
//
blockDistInfoToBinary(&tableBlockDist, &bw);
SColumnInfoData
*
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
0
);
int32_t len = (int32_t) tbufTell(&bw);
pColInfo->pData = taosMemoryMalloc(len + sizeof(int32_t));
*(int32_t*) pColInfo->pData = len;
memcpy(pColInfo->pData + sizeof(int32_t), tbufGetData(&bw, false), len);
tbufCloseWriter(&bw);
// int32_t len = (int32_t) tbufTell(&bw);
// pColInfo->pData = taosMemoryMalloc(len + sizeof(int32_t));
// *(int32_t*) pColInfo->pData = len;
// memcpy(pColInfo->pData + sizeof(int32_t), tbufGetData(&bw, false), len);
//
// tbufCloseWriter(&bw);
SArray* g = GET_TABLEGROUP(pOperator->pRuntimeEnv
, 0);
pOperator->pRuntimeEnv->current = taosArrayGetP(g, 0);
// SArray* g = GET_TABLEGROUP(pOperator->
, 0);
//
pOperator->pRuntimeEnv->current = taosArrayGetP(g, 0);
pOperator
->
status
=
OP_EXEC_DONE
;
return
pBlock
;
#endif
}
static
void
doClearBufferedBlocks
(
SStreamBlockScanInfo
*
pInfo
)
{
...
...
@@ -5472,7 +5235,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
}
pInfo
->
pFilterNode
=
pCondition
;
pInfo
->
pTsdbReadHandle
=
pTsdbReadHandle
;
pInfo
->
dataReader
=
pTsdbReadHandle
;
pInfo
->
times
=
repeatTime
;
pInfo
->
reverseTimes
=
reverseTime
;
pInfo
->
order
=
order
;
...
...
@@ -5494,7 +5257,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
SOperatorInfo
*
createTableSeqScanOperatorInfo
(
void
*
pTsdbReadHandle
,
STaskRuntimeEnv
*
pRuntimeEnv
)
{
STableScanInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
STableScanInfo
));
pInfo
->
pTsdbReadHandle
=
pTsdbReadHandle
;
pInfo
->
dataReader
=
pTsdbReadHandle
;
pInfo
->
times
=
1
;
pInfo
->
reverseTimes
=
0
;
pInfo
->
order
=
pRuntimeEnv
->
pQueryAttr
->
order
.
order
;
...
...
@@ -5515,32 +5278,42 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim
return
pOperator
;
}
SOperatorInfo
*
create
TableBlockInfoScanOperator
(
void
*
pTsdbReadHandle
,
STaskRuntimeEnv
*
pRuntimeEnv
)
{
SOperatorInfo
*
create
DataBlockInfoScanOperator
(
void
*
dataReader
,
SExecTaskInfo
*
pTaskInfo
)
{
STableScanInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
STableScanInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
pTaskInfo
->
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_error
;
}
pInfo
->
pTsdbReadHandle
=
pTsdbReadHandle
;
pInfo
->
dataReader
=
dataReader
;
pInfo
->
block
.
pDataBlock
=
taosArrayInit
(
1
,
sizeof
(
SColumnInfoData
));
SColumnInfoData
infoData
=
{
{
0
}
};
SColumnInfoData
infoData
=
{
0
};
infoData
.
info
.
type
=
TSDB_DATA_TYPE_BINARY
;
infoData
.
info
.
bytes
=
1024
;
infoData
.
info
.
colId
=
0
;
taosArrayPush
(
pInfo
->
block
.
pDataBlock
,
&
infoData
);
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
pOperator
->
name
=
"TableBlockInfoScanOperator"
;
pOperator
->
name
=
"DataBlockInfoScanOperator"
;
// pOperator->operatorType = OP_TableBlockInfoScan;
pOperator
->
blockingOptr
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
// pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols;
pOperator
->
_openFn
=
operatorDummyOpenFn
;
pOperator
->
getNextFn
=
doBlockInfoScan
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
return
pOperator
;
_error:
taosMemoryFreeClear
(
pInfo
);
taosMemoryFreeClear
(
pOperator
);
return
NULL
;
}
SOperatorInfo
*
createStreamScanOperatorInfo
(
void
*
streamReadHandle
,
SSDataBlock
*
pResBlock
,
SArray
*
pColList
,
SArray
*
pTableIdList
,
SExecTaskInfo
*
pTaskInfo
)
{
SOperatorInfo
*
createStreamScanOperatorInfo
(
void
*
streamReadHandle
,
SSDataBlock
*
pResBlock
,
SArray
*
pColList
,
SArray
*
pTableIdList
,
SExecTaskInfo
*
pTaskInfo
)
{
SStreamBlockScanInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamBlockScanInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
...
...
@@ -5589,6 +5362,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
pOperator
->
getNextFn
=
doStreamBlockScan
;
pOperator
->
closeFn
=
operatorDummyCloseFn
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
return
pOperator
;
}
...
...
@@ -5922,80 +5696,6 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB
return
pOperator
;
}
SArray
*
getOrderCheckColumns
(
STaskAttr
*
pQuery
)
{
int32_t
numOfCols
=
(
pQuery
->
pGroupbyExpr
==
NULL
)
?
0
:
taosArrayGetSize
(
pQuery
->
pGroupbyExpr
->
columnInfo
);
SArray
*
pOrderColumns
=
NULL
;
if
(
numOfCols
>
0
)
{
pOrderColumns
=
taosArrayDup
(
pQuery
->
pGroupbyExpr
->
columnInfo
);
}
else
{
pOrderColumns
=
taosArrayInit
(
4
,
sizeof
(
SColIndex
));
}
if
(
pQuery
->
interval
.
interval
>
0
)
{
if
(
pOrderColumns
==
NULL
)
{
pOrderColumns
=
taosArrayInit
(
1
,
sizeof
(
SColIndex
));
}
SColIndex
colIndex
=
{.
colIndex
=
0
,
.
colId
=
0
,
.
flag
=
TSDB_COL_NORMAL
};
taosArrayPush
(
pOrderColumns
,
&
colIndex
);
}
{
numOfCols
=
(
int32_t
)
taosArrayGetSize
(
pOrderColumns
);
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColIndex
*
index
=
taosArrayGet
(
pOrderColumns
,
i
);
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutput
;
++
j
)
{
SExprBasicInfo
*
pExpr
=
&
pQuery
->
pExpr1
[
j
].
base
;
int32_t
functionId
=
getExprFunctionId
(
&
pQuery
->
pExpr1
[
j
]);
if
(
index
->
colId
==
pExpr
->
pParam
[
0
].
pCol
->
colId
&&
(
functionId
==
FUNCTION_PRJ
||
functionId
==
FUNCTION_TAG
||
functionId
==
FUNCTION_TS
))
{
index
->
colIndex
=
j
;
index
->
colId
=
pExpr
->
resSchema
.
colId
;
}
}
}
}
return
pOrderColumns
;
}
SArray
*
getResultGroupCheckColumns
(
STaskAttr
*
pQuery
)
{
int32_t
numOfCols
=
(
pQuery
->
pGroupbyExpr
==
NULL
)
?
0
:
taosArrayGetSize
(
pQuery
->
pGroupbyExpr
->
columnInfo
);
SArray
*
pOrderColumns
=
NULL
;
if
(
numOfCols
>
0
)
{
pOrderColumns
=
taosArrayDup
(
pQuery
->
pGroupbyExpr
->
columnInfo
);
}
else
{
pOrderColumns
=
taosArrayInit
(
4
,
sizeof
(
SColIndex
));
}
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SColIndex
*
index
=
taosArrayGet
(
pOrderColumns
,
i
);
bool
found
=
false
;
for
(
int32_t
j
=
0
;
j
<
pQuery
->
numOfOutput
;
++
j
)
{
SExprBasicInfo
*
pExpr
=
&
pQuery
->
pExpr1
[
j
].
base
;
int32_t
functionId
=
getExprFunctionId
(
&
pQuery
->
pExpr1
[
j
]);
// FUNCTION_TAG_DUMMY function needs to be ignored
// if (index->colId == pExpr->pColumns->info.colId &&
// ((TSDB_COL_IS_TAG(pExpr->pColumns->flag) && functionId == FUNCTION_TAG) ||
// (TSDB_COL_IS_NORMAL_COL(pExpr->pColumns->flag) && functionId == FUNCTION_PRJ))) {
// index->colIndex = j;
// index->colId = pExpr->resSchema.colId;
// found = true;
// break;
// }
}
assert
(
found
&&
index
->
colIndex
>=
0
&&
index
->
colIndex
<
pQuery
->
numOfOutput
);
}
return
pOrderColumns
;
}
static
int32_t
doInitAggInfoSup
(
SAggSupporter
*
pAggSup
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
const
char
*
pKey
);
static
void
cleanupAggSup
(
SAggSupporter
*
pAggSup
);
...
...
@@ -7044,9 +6744,6 @@ static SSDataBlock* doAllSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgr
return
pIntervalInfo
->
binfo
.
pRes
;
}
STaskAttr
*
pQueryAttr
=
pRuntimeEnv
->
pQueryAttr
;
int32_t
order
=
pQueryAttr
->
order
.
order
;
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
while
(
1
)
{
...
...
@@ -7062,14 +6759,14 @@ static SSDataBlock* doAllSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgr
STableQueryInfo
*
pTableQueryInfo
=
pRuntimeEnv
->
current
;
// setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
setInputDataBlock
(
pOperator
,
pIntervalInfo
->
binfo
.
pCtx
,
pBlock
,
pQueryAttr
->
order
.
order
);
//
setInputDataBlock(pOperator, pIntervalInfo->binfo.pCtx, pBlock, pQueryAttr->order.order);
setIntervalQueryRange
(
pRuntimeEnv
,
pBlock
->
info
.
window
.
skey
);
hashAllIntervalAgg
(
pOperator
,
&
pTableQueryInfo
->
resInfo
,
pBlock
,
pTableQueryInfo
->
groupIndex
);
}
pOperator
->
status
=
OP_RES_TO_RETURN
;
pQueryAttr
->
order
.
order
=
order
;
// TODO : restore the order
//
pQueryAttr->order.order = order; // TODO : restore the order
doCloseAllTimeWindow
(
pRuntimeEnv
);
setTaskStatus
(
pOperator
->
pTaskInfo
,
TASK_COMPLETED
);
...
...
@@ -8153,7 +7850,6 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator, bool* newgroup) {
int32_t functionId = getExprFunctionId(&pOperator->pExpr[0]);
if (functionId == FUNCTION_TID_TAG) { // return the tags & table Id
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
assert(pQueryAttr->numOfOutput == 1);
SExprInfo* pExprInfo = &pOperator->pExpr[0];
...
...
@@ -8936,8 +8632,7 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
return
pList
;
}
int32_t
doCreateTableGroup
(
void
*
metaHandle
,
int32_t
tableType
,
uint64_t
tableUid
,
STableGroupInfo
*
pGroupInfo
,
uint64_t
queryId
,
uint64_t
taskId
)
{
int32_t
doCreateTableGroup
(
void
*
metaHandle
,
int32_t
tableType
,
uint64_t
tableUid
,
STableGroupInfo
*
pGroupInfo
,
uint64_t
queryId
,
uint64_t
taskId
)
{
int32_t
code
=
0
;
if
(
tableType
==
TSDB_SUPER_TABLE
)
{
code
=
tsdbQuerySTableByTagCond
(
metaHandle
,
tableUid
,
0
,
NULL
,
0
,
0
,
NULL
,
pGroupInfo
,
NULL
,
0
,
queryId
,
taskId
);
...
...
@@ -9041,43 +8736,6 @@ static int32_t updateOutputBufForTopBotQuery(SQueriedTableInfo* pTableInfo, SCol
return
TSDB_CODE_SUCCESS
;
}
static
void
doUpdateExprColumnIndex
(
STaskAttr
*
pQueryAttr
)
{
assert
(
pQueryAttr
->
pExpr1
!=
NULL
&&
pQueryAttr
!=
NULL
);
for
(
int32_t
k
=
0
;
k
<
pQueryAttr
->
numOfOutput
;
++
k
)
{
SExprBasicInfo
*
pSqlExprMsg
=
&
pQueryAttr
->
pExpr1
[
k
].
base
;
// if (pSqlExprMsg->functionId == FUNCTION_ARITHM) {
// continue;
// }
// todo opt performance
SColIndex
*
pColIndex
=
NULL
;
/*&pSqlExprMsg->colInfo;*/
if
(
TSDB_COL_IS_NORMAL_COL
(
pColIndex
->
flag
))
{
int32_t
f
=
0
;
for
(
f
=
0
;
f
<
pQueryAttr
->
numOfCols
;
++
f
)
{
if
(
pColIndex
->
colId
==
pQueryAttr
->
tableCols
[
f
].
colId
)
{
pColIndex
->
colIndex
=
f
;
break
;
}
}
assert
(
f
<
pQueryAttr
->
numOfCols
);
}
else
if
(
pColIndex
->
colId
<=
TSDB_UD_COLUMN_INDEX
)
{
// do nothing for user-defined constant value result columns
}
else
{
int32_t
f
=
0
;
for
(
f
=
0
;
f
<
pQueryAttr
->
numOfTags
;
++
f
)
{
if
(
pColIndex
->
colId
==
pQueryAttr
->
tagColList
[
f
].
colId
)
{
pColIndex
->
colIndex
=
f
;
break
;
}
}
assert
(
f
<
pQueryAttr
->
numOfTags
||
pColIndex
->
colId
==
TSDB_TBNAME_COLUMN_INDEX
);
}
}
}
void
setResultBufSize
(
STaskAttr
*
pQueryAttr
,
SResultInfo
*
pResultInfo
)
{
const
int32_t
DEFAULT_RESULT_MSG_SIZE
=
1024
*
(
1024
+
512
);
...
...
@@ -9087,16 +8745,16 @@ void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo) {
const
float
THRESHOLD_RATIO
=
0
.
85
f
;
if
(
isProjQuery
(
pQueryAttr
))
{
int32_t
numOfRes
=
DEFAULT_RESULT_MSG_SIZE
/
pQueryAttr
->
resultRowSize
;
if
(
numOfRes
<
MIN_ROWS_FOR_PRJ_QUERY
)
{
numOfRes
=
MIN_ROWS_FOR_PRJ_QUERY
;
}
pResultInfo
->
capacity
=
numOfRes
;
}
else
{
// in case of non-prj query, a smaller output buffer will be used.
pResultInfo
->
capacity
=
DEFAULT_MIN_ROWS
;
}
//
if (isProjQuery(pQueryAttr)) {
//
int32_t numOfRes = DEFAULT_RESULT_MSG_SIZE / pQueryAttr->resultRowSize;
//
if (numOfRes < MIN_ROWS_FOR_PRJ_QUERY) {
//
numOfRes = MIN_ROWS_FOR_PRJ_QUERY;
//
}
//
//
pResultInfo->capacity = numOfRes;
//
} else { // in case of non-prj query, a smaller output buffer will be used.
//
pResultInfo->capacity = DEFAULT_MIN_ROWS;
//
}
pResultInfo
->
threshold
=
(
int32_t
)(
pResultInfo
->
capacity
*
THRESHOLD_RATIO
);
pResultInfo
->
totalRows
=
0
;
...
...
source/libs/function/inc/builtinsimpl.h
浏览文件 @
192060eb
...
...
@@ -38,9 +38,14 @@ void minFunction(SqlFunctionCtx* pCtx);
void
maxFunction
(
SqlFunctionCtx
*
pCtx
);
bool
getStddevFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
stddevFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
void
stddevFunction
(
SqlFunctionCtx
*
pCtx
);
void
stddevFinalize
(
SqlFunctionCtx
*
pCtx
);
bool
getPercentileFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
percentileFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
void
percentileFunction
(
SqlFunctionCtx
*
pCtx
);
bool
getFirstLastFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
void
firstFunction
(
SqlFunctionCtx
*
pCtx
);
void
lastFunction
(
SqlFunctionCtx
*
pCtx
);
...
...
source/libs/function/src/builtins.c
浏览文件 @
192060eb
...
...
@@ -68,9 +68,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.
classification
=
FUNC_MGT_AGG_FUNC
,
.
checkFunc
=
stubCheckAndGetResultType
,
.
getEnvFunc
=
getStddevFuncEnv
,
.
initFunc
=
max
FunctionSetup
,
.
processFunc
=
max
Function
,
.
finalizeFunc
=
function
Finalize
.
initFunc
=
stddev
FunctionSetup
,
.
processFunc
=
stddev
Function
,
.
finalizeFunc
=
stddev
Finalize
},
{
.
name
=
"percentile"
,
...
...
@@ -434,6 +434,7 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
break
;
}
case
FUNCTION_TYPE_STDDEV
:
case
FUNCTION_TYPE_SIN
:
case
FUNCTION_TYPE_COS
:
case
FUNCTION_TYPE_TAN
:
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
192060eb
...
...
@@ -14,6 +14,7 @@
*/
#include "builtinsimpl.h"
#include "tpercentile.h"
#include "querynodes.h"
#include "taggfunction.h"
#include "tdatablock.h"
...
...
@@ -453,6 +454,7 @@ bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
}
typedef
struct
SStddevRes
{
double
result
;
int64_t
count
;
union
{
double
quadraticDSum
;
int64_t
quadraticISum
;};
union
{
double
dsum
;
int64_t
isum
;};
...
...
@@ -463,23 +465,64 @@ bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
return
true
;
}
bool
stddevFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
)
{
if
(
!
functionSetup
(
pCtx
,
pResultInfo
))
{
return
false
;
}
SStddevRes
*
pRes
=
GET_ROWCELL_INTERBUF
(
pResultInfo
);
memset
(
pRes
,
0
,
sizeof
(
SStddevRes
));
return
true
;
}
void
stddevFunction
(
SqlFunctionCtx
*
pCtx
)
{
int32_t
numOfElem
=
0
;
// Only the pre-computing information loaded and actual data does not loaded
SInputColumnInfoData
*
pInput
=
&
pCtx
->
input
;
SColumnDataAgg
*
pAgg
=
pInput
->
pColumnDataAgg
[
0
];
SColumnDataAgg
*
pAgg
=
pInput
->
pColumnDataAgg
[
0
];
int32_t
type
=
pInput
->
pData
[
0
]
->
info
.
type
;
SStddevRes
*
pStddevRes
=
GET_ROWCELL_INTERBUF
(
GET_RES_INFO
(
pCtx
));
// } else {
// computing based on the true data block
// computing based on the true data block
SColumnInfoData
*
pCol
=
pInput
->
pData
[
0
];
int32_t
start
=
pInput
->
startRowIndex
;
int32_t
numOfRows
=
pInput
->
numOfRows
;
switch
(
type
)
{
switch
(
type
)
{
case
TSDB_DATA_TYPE_TINYINT
:
{
int8_t
*
plist
=
(
int8_t
*
)
pCol
->
pData
;
for
(
int32_t
i
=
start
;
i
<
numOfRows
+
pInput
->
startRowIndex
;
++
i
)
{
if
(
pCol
->
hasNull
&&
colDataIsNull_f
(
pCol
->
nullbitmap
,
i
))
{
continue
;
}
numOfElem
+=
1
;
pStddevRes
->
count
+=
1
;
pStddevRes
->
isum
+=
plist
[
i
];
pStddevRes
->
quadraticISum
+=
plist
[
i
]
*
plist
[
i
];
}
break
;
}
case
TSDB_DATA_TYPE_SMALLINT
:
{
int16_t
*
plist
=
(
int16_t
*
)
pCol
->
pData
;
for
(
int32_t
i
=
start
;
i
<
numOfRows
+
pInput
->
startRowIndex
;
++
i
)
{
if
(
pCol
->
hasNull
&&
colDataIsNull_f
(
pCol
->
nullbitmap
,
i
))
{
continue
;
}
numOfElem
+=
1
;
pStddevRes
->
count
+=
1
;
pStddevRes
->
isum
+=
plist
[
i
];
pStddevRes
->
quadraticISum
+=
plist
[
i
]
*
plist
[
i
];
}
break
;
}
case
TSDB_DATA_TYPE_INT
:
{
int32_t
*
plist
=
(
int32_t
*
)
pCol
->
pData
;
for
(
int32_t
i
=
start
;
i
<
numOfRows
+
pInput
->
startRowIndex
;
++
i
)
{
...
...
@@ -487,14 +530,64 @@ void stddevFunction(SqlFunctionCtx* pCtx) {
continue
;
}
numOfElem
+=
1
;
pStddevRes
->
count
+=
1
;
pStddevRes
->
isum
+=
plist
[
i
];
pStddevRes
->
quadraticISum
+=
plist
[
i
]
*
plist
[
i
];
}
break
;
}
case
TSDB_DATA_TYPE_BIGINT
:
{
int64_t
*
plist
=
(
int64_t
*
)
pCol
->
pData
;
for
(
int32_t
i
=
start
;
i
<
numOfRows
+
pInput
->
startRowIndex
;
++
i
)
{
if
(
pCol
->
hasNull
&&
colDataIsNull_f
(
pCol
->
nullbitmap
,
i
))
{
continue
;
}
numOfElem
+=
1
;
pStddevRes
->
count
+=
1
;
pStddevRes
->
isum
+=
plist
[
i
];
pStddevRes
->
quadraticISum
+=
plist
[
i
]
*
plist
[
i
];
}
break
;
}
case
TSDB_DATA_TYPE_FLOAT
:
{
float
*
plist
=
(
float
*
)
pCol
->
pData
;
for
(
int32_t
i
=
start
;
i
<
numOfRows
+
pInput
->
startRowIndex
;
++
i
)
{
if
(
pCol
->
hasNull
&&
colDataIsNull_f
(
pCol
->
nullbitmap
,
i
))
{
continue
;
}
numOfElem
+=
1
;
pStddevRes
->
count
+=
1
;
pStddevRes
->
isum
+=
plist
[
i
];
pStddevRes
->
quadraticISum
+=
plist
[
i
]
*
plist
[
i
];
}
break
;
}
case
TSDB_DATA_TYPE_DOUBLE
:
{
double
*
plist
=
(
double
*
)
pCol
->
pData
;
for
(
int32_t
i
=
start
;
i
<
numOfRows
+
pInput
->
startRowIndex
;
++
i
)
{
if
(
pCol
->
hasNull
&&
colDataIsNull_f
(
pCol
->
nullbitmap
,
i
))
{
continue
;
}
numOfElem
+=
1
;
pStddevRes
->
count
+=
1
;
pStddevRes
->
isum
+=
plist
[
i
];
pStddevRes
->
quadraticISum
+=
plist
[
i
]
*
plist
[
i
];
}
break
;
}
default:
break
;
}
// data in the check operation are all null, not output
SET_VAL
(
GET_RES_INFO
(
pCtx
),
numOfElem
,
1
);
}
...
...
@@ -503,11 +596,122 @@ void stddevFinalize(SqlFunctionCtx* pCtx) {
functionFinalize
(
pCtx
);
SStddevRes
*
pStddevRes
=
GET_ROWCELL_INTERBUF
(
GET_RES_INFO
(
pCtx
));
double
res
=
pStddevRes
->
quadraticISum
/
pStddevRes
->
count
-
(
pStddevRes
->
isum
/
pStddevRes
->
count
)
*
(
pStddevRes
->
isum
/
pStddevRes
->
count
);
double
avg
=
pStddevRes
->
isum
/
((
double
)
pStddevRes
->
count
);
pStddevRes
->
result
=
sqrt
(
pStddevRes
->
quadraticISum
/
((
double
)
pStddevRes
->
count
)
-
avg
*
avg
);
}
typedef
struct
SPercentileInfo
{
tMemBucket
*
pMemBucket
;
int32_t
stage
;
double
minval
;
double
maxval
;
int64_t
numOfElems
;
}
SPercentileInfo
;
bool
getPercentileFuncEnv
(
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
)
{
pEnv
->
calcMemSize
=
sizeof
(
SPercentileInfo
);
return
true
;
}
bool
percentileFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
)
{
if
(
!
functionSetup
(
pCtx
,
pResultInfo
))
{
return
false
;
}
// in the first round, get the min-max value of all involved data
SPercentileInfo
*
pInfo
=
GET_ROWCELL_INTERBUF
(
pResultInfo
);
SET_DOUBLE_VAL
(
&
pInfo
->
minval
,
DBL_MAX
);
SET_DOUBLE_VAL
(
&
pInfo
->
maxval
,
-
DBL_MAX
);
pInfo
->
numOfElems
=
0
;
return
true
;
}
void
percentileFunction
(
SqlFunctionCtx
*
pCtx
)
{
int32_t
notNullElems
=
0
;
#if 0
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) {
pInfo->stage += 1;
// all data are null, set it completed
if (pInfo->numOfElems == 0) {
pResInfo->complete = true;
return;
} else {
pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, pInfo->minval, pInfo->maxval);
}
}
// the first stage, only acquire the min/max value
if (pInfo->stage == 0) {
if (pCtx->preAggVals.isSet) {
double tmin = 0.0, tmax = 0.0;
if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) {
tmin = (double)GET_INT64_VAL(&pCtx->preAggVals.statis.min);
tmax = (double)GET_INT64_VAL(&pCtx->preAggVals.statis.max);
} else if (IS_FLOAT_TYPE(pCtx->inputType)) {
tmin = GET_DOUBLE_VAL(&pCtx->preAggVals.statis.min);
tmax = GET_DOUBLE_VAL(&pCtx->preAggVals.statis.max);
} else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) {
tmin = (double)GET_UINT64_VAL(&pCtx->preAggVals.statis.min);
tmax = (double)GET_UINT64_VAL(&pCtx->preAggVals.statis.max);
} else {
assert(true);
}
if (GET_DOUBLE_VAL(&pInfo->minval) > tmin) {
SET_DOUBLE_VAL(&pInfo->minval, tmin);
}
if (GET_DOUBLE_VAL(&pInfo->maxval) < tmax) {
SET_DOUBLE_VAL(&pInfo->maxval, tmax);
}
pInfo->numOfElems += (pCtx->size - pCtx->preAggVals.statis.numOfNull);
} else {
for (int32_t i = 0; i < pCtx->size; ++i) {
char *data = GET_INPUT_DATA(pCtx, i);
if (pCtx->hasNull && isNull(data, pCtx->inputType)) {
continue;
}
double v = 0;
GET_TYPED_DATA(v, double, pCtx->inputType, data);
if (v < GET_DOUBLE_VAL(&pInfo->minval)) {
SET_DOUBLE_VAL(&pInfo->minval, v);
}
if (v > GET_DOUBLE_VAL(&pInfo->maxval)) {
SET_DOUBLE_VAL(&pInfo->maxval, v);
}
pInfo->numOfElems += 1;
}
}
return;
}
// the second stage, calculate the true percentile value
for (int32_t i = 0; i < pCtx->size; ++i) {
char *data = GET_INPUT_DATA(pCtx, i);
if (pCtx->hasNull && isNull(data, pCtx->inputType)) {
continue;
}
notNullElems += 1;
tMemBucketPut(pInfo->pMemBucket, data, 1);
}
SET_VAL(pCtx, notNullElems, 1);
pResInfo->hasResult = DATA_SET_FLAG;
#endif
}
bool
getFirstLastFuncEnv
(
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
)
{
SColumnNode
*
pNode
=
nodesListGetNode
(
pFunc
->
pParameterList
,
0
);
...
...
source/libs/function/src/taggfunction.c
浏览文件 @
192060eb
...
...
@@ -176,26 +176,6 @@ typedef struct SResPair {
double
avg
;
}
SResPair
;
#define TSDB_BLOCK_DIST_STEP_ROWS 16
typedef
struct
STableBlockDist
{
uint16_t
rowSize
;
uint16_t
numOfFiles
;
uint32_t
numOfTables
;
uint64_t
totalSize
;
uint64_t
totalRows
;
int32_t
maxRows
;
int32_t
minRows
;
int32_t
firstSeekTimeUs
;
uint32_t
numOfRowsInMemTable
;
uint32_t
numOfSmallBlocks
;
SArray
*
dataBlockInfos
;
}
STableBlockDist
;
typedef
struct
SFileBlockInfo
{
int32_t
numBlocksOfStep
;
}
SFileBlockInfo
;
void
cleanupResultRowEntry
(
struct
SResultRowEntryInfo
*
pCell
)
{
pCell
->
initialized
=
false
;
}
...
...
@@ -3984,7 +3964,7 @@ static void irate_function(SqlFunctionCtx *pCtx) {
}
}
static
void
blockDistInfoFromBinary
(
const
char
*
data
,
int32_t
len
,
STableBlockDist
*
pDist
)
{
static
void
blockDistInfoFromBinary
(
const
char
*
data
,
int32_t
len
,
STableBlockDist
Info
*
pDist
)
{
SBufferReader
br
=
tbufInitReader
(
data
,
len
,
false
);
pDist
->
numOfTables
=
tbufReadUint32
(
&
br
);
...
...
@@ -4024,7 +4004,7 @@ static void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDi
static
void
blockInfo_func
(
SqlFunctionCtx
*
pCtx
)
{
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
STableBlockDist
*
pDist
=
(
STableBlockDist
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
STableBlockDist
Info
*
pDist
=
(
STableBlockDistInfo
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
int32_t
len
=
*
(
int32_t
*
)
pCtx
->
pInput
;
blockDistInfoFromBinary
((
char
*
)
pCtx
->
pInput
+
sizeof
(
int32_t
),
len
,
pDist
);
...
...
@@ -4036,8 +4016,8 @@ static void blockInfo_func(SqlFunctionCtx* pCtx) {
//pResInfo->hasResult = DATA_SET_FLAG;
}
static
void
mergeTableBlockDist
(
SResultRowEntryInfo
*
pResInfo
,
const
STableBlockDist
*
pSrc
)
{
STableBlockDist
*
pDist
=
(
STableBlockDist
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
static
void
mergeTableBlockDist
(
SResultRowEntryInfo
*
pResInfo
,
const
STableBlockDist
Info
*
pSrc
)
{
STableBlockDist
Info
*
pDist
=
(
STableBlockDistInfo
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
assert
(
pDist
!=
NULL
&&
pSrc
!=
NULL
);
pDist
->
numOfTables
+=
pSrc
->
numOfTables
;
...
...
@@ -4071,7 +4051,7 @@ static void mergeTableBlockDist(SResultRowEntryInfo* pResInfo, const STableBlock
}
void
block_func_merge
(
SqlFunctionCtx
*
pCtx
)
{
STableBlockDist
info
=
{
0
};
STableBlockDist
Info
info
=
{
0
};
int32_t
len
=
*
(
int32_t
*
)
pCtx
->
pInput
;
blockDistInfoFromBinary
(((
char
*
)
pCtx
->
pInput
)
+
sizeof
(
int32_t
),
len
,
&
info
);
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
...
...
@@ -4082,7 +4062,7 @@ void block_func_merge(SqlFunctionCtx* pCtx) {
//pResInfo->hasResult = DATA_SET_FLAG;
}
void
getPercentiles
(
STableBlockDist
*
pTableBlockDist
,
int64_t
totalBlocks
,
int32_t
numOfPercents
,
void
getPercentiles
(
STableBlockDist
Info
*
pTableBlockDist
,
int64_t
totalBlocks
,
int32_t
numOfPercents
,
double
*
percents
,
int32_t
*
percentiles
)
{
if
(
totalBlocks
==
0
)
{
for
(
int32_t
i
=
0
;
i
<
numOfPercents
;
++
i
)
{
...
...
@@ -4117,7 +4097,7 @@ void getPercentiles(STableBlockDist *pTableBlockDist, int64_t totalBlocks, int32
}
}
void
generateBlockDistResult
(
STableBlockDist
*
pTableBlockDist
,
char
*
result
)
{
void
generateBlockDistResult
(
STableBlockDist
Info
*
pTableBlockDist
,
char
*
result
)
{
if
(
pTableBlockDist
==
NULL
)
{
return
;
}
...
...
@@ -4178,7 +4158,7 @@ void generateBlockDistResult(STableBlockDist *pTableBlockDist, char* result) {
void
blockinfo_func_finalizer
(
SqlFunctionCtx
*
pCtx
)
{
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
pCtx
);
STableBlockDist
*
pDist
=
(
STableBlockDist
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
STableBlockDist
Info
*
pDist
=
(
STableBlockDistInfo
*
)
GET_ROWCELL_INTERBUF
(
pResInfo
);
pDist
->
rowSize
=
(
uint16_t
)
pCtx
->
param
[
0
].
i
;
generateBlockDistResult
(
pDist
,
pCtx
->
pOutput
);
...
...
source/libs/index/src/indexFst.c
浏览文件 @
192060eb
...
...
@@ -1251,7 +1251,6 @@ bool streamWithStateSeekMin(StreamWithState* sws, FstBoundWithData* min) {
taosArrayPush
(
sws
->
stack
,
&
s
);
out
+=
trn
.
out
;
node
=
fstGetNode
(
sws
->
fst
,
trn
.
addr
);
fstNodeDestroy
(
node
);
}
else
{
// This is a little tricky. We're in this case if the
// given bound is not a prefix of any key in the FST.
...
...
@@ -1349,7 +1348,7 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb
for
(
uint32_t
i
=
0
;
i
<
isz
;
i
++
)
{
buf
[
i
]
=
*
(
uint8_t
*
)
taosArrayGet
(
sws
->
inp
,
i
);
}
FstSlice
slice
=
fstSliceCreate
(
buf
,
taosArrayGetSize
(
sws
->
inp
)
);
FstSlice
slice
=
fstSliceCreate
(
buf
,
isz
);
if
(
fstBoundWithDataExceededBy
(
sws
->
endAt
,
&
slice
))
{
taosArrayDestroyEx
(
sws
->
stack
,
streamStateDestroy
);
sws
->
stack
=
(
SArray
*
)
taosArrayInit
(
256
,
sizeof
(
StreamState
));
...
...
source/libs/index/test/fstTest.cc
浏览文件 @
192060eb
...
...
@@ -510,6 +510,68 @@ void checkFstCheckIteratorRange2() {
}
delete
m
;
}
void
checkFstCheckIteratorRange3
()
{
FstWriter
*
fw
=
new
FstWriter
;
int64_t
s
=
taosGetTimestampUs
();
int
count
=
2
;
// Performance_fstWriteRecords(fw);
int64_t
e
=
taosGetTimestampUs
();
std
::
cout
<<
"insert data count : "
<<
count
<<
"elapas time: "
<<
e
-
s
<<
std
::
endl
;
fw
->
Put
(
"ab"
,
1
);
fw
->
Put
(
"b"
,
2
);
fw
->
Put
(
"cdd"
,
3
);
fw
->
Put
(
"cde"
,
3
);
fw
->
Put
(
"ddd"
,
4
);
fw
->
Put
(
"ed"
,
5
);
delete
fw
;
FstReadMemory
*
m
=
new
FstReadMemory
(
1024
*
64
);
if
(
m
->
init
()
==
false
)
{
std
::
cout
<<
"init readMemory failed"
<<
std
::
endl
;
delete
m
;
return
;
}
{
// range search
std
::
vector
<
uint64_t
>
result
;
AutomationCtx
*
ctx
=
automCtxCreate
((
void
*
)
"he"
,
AUTOMATION_ALWAYS
);
// [b, e)
m
->
SearchRange
(
ctx
,
"b"
,
GE
,
""
,
(
RangeType
)
10
,
result
);
assert
(
result
.
size
()
==
5
);
automCtxDestroy
(
ctx
);
}
{
// range search
std
::
vector
<
uint64_t
>
result
;
AutomationCtx
*
ctx
=
automCtxCreate
((
void
*
)
"he"
,
AUTOMATION_ALWAYS
);
// [b, e)
m
->
SearchRange
(
ctx
,
""
,
(
RangeType
)
20
,
"ab"
,
LE
,
result
);
assert
(
result
.
size
()
==
1
);
automCtxDestroy
(
ctx
);
// taosMemoryFree(ctx);
}
{
// range search
std
::
vector
<
uint64_t
>
result
;
AutomationCtx
*
ctx
=
automCtxCreate
((
void
*
)
"he"
,
AUTOMATION_ALWAYS
);
// [b, e)
m
->
SearchRange
(
ctx
,
""
,
(
RangeType
)
30
,
"ab"
,
LT
,
result
);
assert
(
result
.
size
()
==
0
);
automCtxDestroy
(
ctx
);
}
{
// range search
std
::
vector
<
uint64_t
>
result
;
AutomationCtx
*
ctx
=
automCtxCreate
((
void
*
)
"he"
,
AUTOMATION_ALWAYS
);
// [b, e)
m
->
SearchRange
(
ctx
,
"ed"
,
GT
,
"ed"
,
(
RangeType
)
40
,
result
);
assert
(
result
.
size
()
==
0
);
automCtxDestroy
(
ctx
);
}
delete
m
;
}
void
fst_get
(
Fst
*
fst
)
{
for
(
int
i
=
0
;
i
<
10000
;
i
++
)
{
...
...
@@ -573,11 +635,12 @@ int main(int argc, char* argv[]) {
// path suid colName ver
// iterTFileReader(argv[1], argv[2], argv[3], argv[4]);
//}
checkFstCheckIterator1
();
checkFstCheckIterator2
();
checkFstCheckIteratorPrefix
();
checkFstCheckIteratorRange1
();
checkFstCheckIteratorRange2
();
// checkFstCheckIterator1();
// checkFstCheckIterator2();
// checkFstCheckIteratorPrefix();
// checkFstCheckIteratorRange1();
// checkFstCheckIteratorRange2();
checkFstCheckIteratorRange3
();
// checkFstLongTerm();
// checkFstPrefixSearch();
...
...
source/libs/parser/inc/parInsertData.h
浏览文件 @
192060eb
...
...
@@ -77,7 +77,7 @@ typedef struct STableDataBlocks {
STableMeta
*
pTableMeta
;
// the tableMeta of current table, the table meta will be used during submit, keep a ref to avoid to be removed from cache
char
*
pData
;
bool
cloned
;
int32_t
createTbReqLen
;
SParsedDataColInfo
boundColumnInfo
;
SRowBuilder
rowBuilder
;
}
STableDataBlocks
;
...
...
@@ -118,6 +118,7 @@ static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, STableDataBlocks*
pBlocks
->
suid
=
(
TSDB_NORMAL_TABLE
==
dataBuf
->
pTableMeta
->
tableType
?
dataBuf
->
pTableMeta
->
uid
:
dataBuf
->
pTableMeta
->
suid
);
pBlocks
->
uid
=
dataBuf
->
pTableMeta
->
uid
;
pBlocks
->
sversion
=
dataBuf
->
pTableMeta
->
sversion
;
pBlocks
->
schemaLen
=
dataBuf
->
createTbReqLen
;
if
(
pBlocks
->
numOfRows
+
numOfRows
>=
INT16_MAX
)
{
return
TSDB_CODE_TSC_INVALID_OPERATION
;
...
...
@@ -136,7 +137,7 @@ void destroyBlockHashmap(SHashObj* pDataBlockHash);
int
initRowBuilder
(
SRowBuilder
*
pBuilder
,
int16_t
schemaVer
,
SParsedDataColInfo
*
pColInfo
);
int32_t
allocateMemIfNeed
(
STableDataBlocks
*
pDataBlock
,
int32_t
rowSize
,
int32_t
*
numOfRows
);
int32_t
getDataBlockFromList
(
SHashObj
*
pHashList
,
int64_t
id
,
int32_t
size
,
int32_t
startOffset
,
int32_t
rowSize
,
const
STableMeta
*
pTableMeta
,
STableDataBlocks
**
dataBlocks
,
SArray
*
pBlockList
);
int32_t
mergeTableDataBlocks
(
SHashObj
*
pHashObj
,
int8_t
schemaAttached
,
uint8_t
payloadType
,
SArray
**
pVgDataBlocks
);
const
STableMeta
*
pTableMeta
,
STableDataBlocks
**
dataBlocks
,
SArray
*
pBlockList
,
SVCreateTbReq
*
pCreateTbReq
);
int32_t
mergeTableDataBlocks
(
SHashObj
*
pHashObj
,
uint8_t
payloadType
,
SArray
**
pVgDataBlocks
);
#endif // TDENGINE_DATABLOCKMGT_H
source/libs/parser/src/parInsert.c
浏览文件 @
192060eb
...
...
@@ -753,7 +753,7 @@ static int32_t buildCreateTbReq(SInsertParseContext* pCxt, const SName* pName, S
}
// pSql -> tag1_value, ...)
static
int32_t
parseTagsClause
(
SInsertParseContext
*
pCxt
,
SSchema
*
p
Tags
Schema
,
uint8_t
precision
,
const
SName
*
pName
)
{
static
int32_t
parseTagsClause
(
SInsertParseContext
*
pCxt
,
SSchema
*
pSchema
,
uint8_t
precision
,
const
SName
*
pName
)
{
if
(
tdInitKVRowBuilder
(
&
pCxt
->
tagsBuilder
)
<
0
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
...
...
@@ -763,9 +763,9 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pTagsSchema,
char
tmpTokenBuf
[
TSDB_MAX_BYTES_PER_ROW
]
=
{
0
};
// used for deleting Escape character: \\, \', \"
for
(
int
i
=
0
;
i
<
pCxt
->
tags
.
numOfBound
;
++
i
)
{
NEXT_TOKEN_WITH_PREV
(
pCxt
->
pSql
,
sToken
);
SSchema
*
p
Schema
=
&
pTagsSchema
[
pCxt
->
tags
.
boundColumns
[
i
]];
param
.
schema
=
pSchema
;
CHECK_CODE
(
parseValueToken
(
&
pCxt
->
pSql
,
&
sToken
,
pSchema
,
precision
,
tmpTokenBuf
,
KvRowAppend
,
&
param
,
&
pCxt
->
msg
));
SSchema
*
p
TagSchema
=
&
pSchema
[
pCxt
->
tags
.
boundColumns
[
i
]
-
1
];
// colId starts with 1
param
.
schema
=
p
Tag
Schema
;
CHECK_CODE
(
parseValueToken
(
&
pCxt
->
pSql
,
&
sToken
,
p
Tag
Schema
,
precision
,
tmpTokenBuf
,
KvRowAppend
,
&
param
,
&
pCxt
->
msg
));
}
SKVRow
row
=
tdGetKVRowFromBuilder
(
&
pCxt
->
tagsBuilder
);
...
...
@@ -791,6 +791,7 @@ static int32_t storeTableMeta(SHashObj* pHash, const char* pName, int32_t len, S
if
(
TSDB_CODE_SUCCESS
!=
cloneTableMeta
(
pMeta
,
&
pBackup
))
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
pBackup
->
uid
=
tGenIdPI64
();
return
taosHashPut
(
pHash
,
pName
,
len
,
&
pBackup
,
POINTER_BYTES
);
}
...
...
@@ -833,7 +834,11 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken)
if
(
TK_NK_LP
!=
sToken
.
type
)
{
return
buildSyntaxErrMsg
(
&
pCxt
->
msg
,
"( is expected"
,
sToken
.
z
);
}
CHECK_CODE
(
parseTagsClause
(
pCxt
,
pTagsSchema
,
getTableInfo
(
pCxt
->
pTableMeta
).
precision
,
&
name
));
CHECK_CODE
(
parseTagsClause
(
pCxt
,
pCxt
->
pTableMeta
->
schema
,
getTableInfo
(
pCxt
->
pTableMeta
).
precision
,
&
name
));
NEXT_TOKEN
(
pCxt
->
pSql
,
sToken
);
if
(
TK_NK_RP
!=
sToken
.
type
)
{
return
buildSyntaxErrMsg
(
&
pCxt
->
msg
,
") is expected"
,
sToken
.
z
);
}
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1015,7 +1020,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
STableDataBlocks
*
dataBuf
=
NULL
;
CHECK_CODE
(
getDataBlockFromList
(
pCxt
->
pTableBlockHashObj
,
pCxt
->
pTableMeta
->
uid
,
TSDB_DEFAULT_PAYLOAD_SIZE
,
sizeof
(
SSubmitBlk
),
getTableInfo
(
pCxt
->
pTableMeta
).
rowSize
,
pCxt
->
pTableMeta
,
&
dataBuf
,
NULL
));
sizeof
(
SSubmitBlk
),
getTableInfo
(
pCxt
->
pTableMeta
).
rowSize
,
pCxt
->
pTableMeta
,
&
dataBuf
,
NULL
,
&
pCxt
->
createTblReq
));
if
(
TK_NK_LP
==
sToken
.
type
)
{
// pSql -> field1_name, ...)
...
...
@@ -1046,7 +1051,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
}
// merge according to vgId
if
(
!
TSDB_QUERY_HAS_TYPE
(
pCxt
->
pOutput
->
insertType
,
TSDB_QUERY_TYPE_STMT_INSERT
)
&&
taosHashGetSize
(
pCxt
->
pTableBlockHashObj
)
>
0
)
{
CHECK_CODE
(
mergeTableDataBlocks
(
pCxt
->
pTableBlockHashObj
,
pCxt
->
pOutput
->
schemaAttache
,
pCxt
->
pOutput
->
payloadType
,
&
pCxt
->
pVgDataBlocks
));
CHECK_CODE
(
mergeTableDataBlocks
(
pCxt
->
pTableBlockHashObj
,
pCxt
->
pOutput
->
payloadType
,
&
pCxt
->
pVgDataBlocks
));
}
return
buildOutput
(
pCxt
);
}
...
...
source/libs/parser/src/parInsertData.c
浏览文件 @
192060eb
...
...
@@ -149,8 +149,28 @@ static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t star
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
buildCreateTbMsg
(
STableDataBlocks
*
pBlocks
,
SVCreateTbReq
*
pCreateTbReq
)
{
int32_t
len
=
tSerializeSVCreateTbReq
(
NULL
,
pCreateTbReq
);
if
(
pBlocks
->
nAllocSize
-
pBlocks
->
size
<
len
)
{
pBlocks
->
nAllocSize
+=
len
+
pBlocks
->
rowSize
;
char
*
pTmp
=
taosMemoryRealloc
(
pBlocks
->
pData
,
pBlocks
->
nAllocSize
);
if
(
pTmp
!=
NULL
)
{
pBlocks
->
pData
=
pTmp
;
memset
(
pBlocks
->
pData
+
pBlocks
->
size
,
0
,
pBlocks
->
nAllocSize
-
pBlocks
->
size
);
}
else
{
pBlocks
->
nAllocSize
-=
len
+
pBlocks
->
rowSize
;
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
}
char
*
pBuf
=
pBlocks
->
pData
+
pBlocks
->
size
;
tSerializeSVCreateTbReq
((
void
**
)
&
pBuf
,
pCreateTbReq
);
pBlocks
->
size
+=
len
;
pBlocks
->
createTbReqLen
=
len
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
getDataBlockFromList
(
SHashObj
*
pHashList
,
int64_t
id
,
int32_t
size
,
int32_t
startOffset
,
int32_t
rowSize
,
const
STableMeta
*
pTableMeta
,
STableDataBlocks
**
dataBlocks
,
SArray
*
pBlockList
)
{
const
STableMeta
*
pTableMeta
,
STableDataBlocks
**
dataBlocks
,
SArray
*
pBlockList
,
SVCreateTbReq
*
pCreateTbReq
)
{
*
dataBlocks
=
NULL
;
STableDataBlocks
**
t1
=
(
STableDataBlocks
**
)
taosHashGet
(
pHashList
,
(
const
char
*
)
&
id
,
sizeof
(
id
));
if
(
t1
!=
NULL
)
{
...
...
@@ -163,6 +183,13 @@ int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int3
return
ret
;
}
if
(
NULL
!=
pCreateTbReq
&&
NULL
!=
pCreateTbReq
->
ctbCfg
.
pTag
)
{
ret
=
buildCreateTbMsg
(
*
dataBlocks
,
pCreateTbReq
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
return
ret
;
}
}
taosHashPut
(
pHashList
,
(
const
char
*
)
&
id
,
sizeof
(
int64_t
),
(
char
*
)
dataBlocks
,
POINTER_BYTES
);
if
(
pBlockList
)
{
taosArrayPush
(
pBlockList
,
dataBlocks
);
...
...
@@ -294,7 +321,7 @@ int sortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlkKey
int32_t
extendedRowSize
=
getExtendedRowSize
(
dataBuf
);
SBlockKeyTuple
*
pBlkKeyTuple
=
pBlkKeyInfo
->
pKeyTuple
;
char
*
pBlockData
=
pBlocks
->
data
;
char
*
pBlockData
=
pBlocks
->
data
+
pBlocks
->
schemaLen
;
int
n
=
0
;
while
(
n
<
nRows
)
{
pBlkKeyTuple
->
skey
=
TD_ROW_KEY
((
STSRow
*
)
pBlockData
);
...
...
@@ -340,44 +367,26 @@ int sortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlkKey
}
// Erase the empty space reserved for binary data
static
int
trimDataBlock
(
void
*
pDataBlock
,
STableDataBlocks
*
pTableDataBlock
,
SBlockKeyTuple
*
blkKeyTuple
,
int8_t
schemaAttached
,
bool
isRawPayload
)
{
static
int
trimDataBlock
(
void
*
pDataBlock
,
STableDataBlocks
*
pTableDataBlock
,
SBlockKeyTuple
*
blkKeyTuple
,
bool
isRawPayload
)
{
// TODO: optimize this function, handle the case while binary is not presented
STableMeta
*
pTableMeta
=
pTableDataBlock
->
pTableMeta
;
STableComInfo
tinfo
=
getTableInfo
(
pTableMeta
);
SSchema
*
pSchema
=
getTableColumnSchema
(
pTableMeta
);
int32_t
nonDataLen
=
sizeof
(
SSubmitBlk
)
+
pTableDataBlock
->
createTbReqLen
;
SSubmitBlk
*
pBlock
=
pDataBlock
;
memcpy
(
pDataBlock
,
pTableDataBlock
->
pData
,
sizeof
(
SSubmitBlk
)
);
pDataBlock
=
(
char
*
)
pDataBlock
+
sizeof
(
SSubmitBlk
)
;
memcpy
(
pDataBlock
,
pTableDataBlock
->
pData
,
nonDataLen
);
pDataBlock
=
(
char
*
)
pDataBlock
+
nonDataLen
;
int32_t
flen
=
0
;
// original total length of row
// schema needs to be included into the submit data block
if
(
schemaAttached
)
{
int32_t
numOfCols
=
getNumOfColumns
(
pTableDataBlock
->
pTableMeta
);
for
(
int32_t
j
=
0
;
j
<
numOfCols
;
++
j
)
{
STColumn
*
pCol
=
(
STColumn
*
)
pDataBlock
;
pCol
->
colId
=
htons
(
pSchema
[
j
].
colId
);
pCol
->
type
=
pSchema
[
j
].
type
;
pCol
->
bytes
=
htons
(
pSchema
[
j
].
bytes
);
pCol
->
offset
=
0
;
pDataBlock
=
(
char
*
)
pDataBlock
+
sizeof
(
STColumn
);
flen
+=
TYPE_BYTES
[
pSchema
[
j
].
type
];
}
int32_t
schemaSize
=
sizeof
(
STColumn
)
*
numOfCols
;
pBlock
->
schemaLen
=
schemaSize
;
}
else
{
if
(
isRawPayload
)
{
for
(
int32_t
j
=
0
;
j
<
tinfo
.
numOfColumns
;
++
j
)
{
flen
+=
TYPE_BYTES
[
pSchema
[
j
].
type
];
}
}
pBlock
->
schemaLen
=
0
;
}
pBlock
->
schemaLen
=
pTableDataBlock
->
createTbReqLen
;
char
*
p
=
pTableDataBlock
->
pData
+
sizeof
(
SSubmitBlk
)
;
char
*
p
=
pTableDataBlock
->
pData
+
nonDataLen
;
pBlock
->
dataLen
=
0
;
int32_t
numOfRows
=
pBlock
->
numOfRows
;
...
...
@@ -414,7 +423,7 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SB
return
pBlock
->
dataLen
+
pBlock
->
schemaLen
;
}
int32_t
mergeTableDataBlocks
(
SHashObj
*
pHashObj
,
int8_t
schemaAttached
,
uint8_t
payloadType
,
SArray
**
pVgDataBlocks
)
{
int32_t
mergeTableDataBlocks
(
SHashObj
*
pHashObj
,
uint8_t
payloadType
,
SArray
**
pVgDataBlocks
)
{
const
int
INSERT_HEAD_SIZE
=
sizeof
(
SSubmitReq
);
int
code
=
0
;
bool
isRawPayload
=
IS_RAW_PAYLOAD
(
payloadType
);
...
...
@@ -429,7 +438,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t
if
(
pBlocks
->
numOfRows
>
0
)
{
STableDataBlocks
*
dataBuf
=
NULL
;
int32_t
ret
=
getDataBlockFromList
(
pVnodeDataBlockHashList
,
pOneTableBlock
->
vgId
,
TSDB_PAYLOAD_SIZE
,
INSERT_HEAD_SIZE
,
0
,
pOneTableBlock
->
pTableMeta
,
&
dataBuf
,
pVnodeDataBlockList
);
INSERT_HEAD_SIZE
,
0
,
pOneTableBlock
->
pTableMeta
,
&
dataBuf
,
pVnodeDataBlockList
,
NULL
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
taosHashCleanup
(
pVnodeDataBlockHashList
);
destroyBlockArrayList
(
pVnodeDataBlockList
);
...
...
@@ -474,7 +483,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t
sizeof
(
STColumn
)
*
getNumOfColumns
(
pOneTableBlock
->
pTableMeta
);
// erase the empty space reserved for binary data
int32_t
finalLen
=
trimDataBlock
(
dataBuf
->
pData
+
dataBuf
->
size
,
pOneTableBlock
,
blkKeyInfo
.
pKeyTuple
,
schemaAttached
,
isRawPayload
);
int32_t
finalLen
=
trimDataBlock
(
dataBuf
->
pData
+
dataBuf
->
size
,
pOneTableBlock
,
blkKeyInfo
.
pKeyTuple
,
isRawPayload
);
assert
(
finalLen
<=
len
);
dataBuf
->
size
+=
(
finalLen
+
sizeof
(
SSubmitBlk
));
...
...
source/libs/parser/test/parserInsertTest.cpp
浏览文件 @
192060eb
...
...
@@ -62,7 +62,7 @@ protected:
void
dumpReslut
()
{
SVnodeModifOpStmt
*
pStmt
=
getVnodeModifStmt
(
res_
);
size_t
num
=
taosArrayGetSize
(
pStmt
->
pDataBlocks
);
cout
<<
"
schemaAttache:"
<<
(
int32_t
)
pStmt
->
schemaAttache
<<
",
payloadType:"
<<
(
int32_t
)
pStmt
->
payloadType
<<
", insertType:"
<<
pStmt
->
insertType
<<
", numOfVgs:"
<<
num
<<
endl
;
cout
<<
"payloadType:"
<<
(
int32_t
)
pStmt
->
payloadType
<<
", insertType:"
<<
pStmt
->
insertType
<<
", numOfVgs:"
<<
num
<<
endl
;
for
(
size_t
i
=
0
;
i
<
num
;
++
i
)
{
SVgDataBlocks
*
vg
=
(
SVgDataBlocks
*
)
taosArrayGetP
(
pStmt
->
pDataBlocks
,
i
);
cout
<<
"vgId:"
<<
vg
->
vg
.
vgId
<<
", numOfTables:"
<<
vg
->
numOfTables
<<
", dataSize:"
<<
vg
->
size
<<
endl
;
...
...
@@ -81,7 +81,6 @@ protected:
void
checkReslut
(
int32_t
numOfTables
,
int16_t
numOfRows1
,
int16_t
numOfRows2
=
-
1
)
{
SVnodeModifOpStmt
*
pStmt
=
getVnodeModifStmt
(
res_
);
ASSERT_EQ
(
pStmt
->
schemaAttache
,
0
);
ASSERT_EQ
(
pStmt
->
payloadType
,
PAYLOAD_TYPE_KV
);
ASSERT_EQ
(
pStmt
->
insertType
,
TSDB_QUERY_TYPE_INSERT
);
size_t
num
=
taosArrayGetSize
(
pStmt
->
pDataBlocks
);
...
...
@@ -168,6 +167,18 @@ TEST_F(InsertTest, multiTableMultiRowTest) {
checkReslut
(
2
,
3
,
2
);
}
// INSERT INTO
// tb1_name USING st1_name [(tag1_name, ...)] TAGS (tag1_value, ...) VALUES (field1_value, ...)
// tb2_name USING st2_name [(tag1_name, ...)] TAGS (tag1_value, ...) VALUES (field1_value, ...)
TEST_F
(
InsertTest
,
autoCreateTableTest
)
{
setDatabase
(
"root"
,
"test"
);
bind
(
"insert into st1s1 using st1 tags(1, 'wxy') values (now, 1,
\"
beijing
\"
)(now+1s, 2,
\"
shanghai
\"
)(now+2s, 3,
\"
guangzhou
\"
)"
);
ASSERT_EQ
(
run
(),
TSDB_CODE_SUCCESS
);
dumpReslut
();
checkReslut
(
1
,
3
);
}
TEST_F
(
InsertTest
,
toleranceTest
)
{
setDatabase
(
"root"
,
"test"
);
...
...
tests/script/jenkins/basic.txt
浏览文件 @
192060eb
...
...
@@ -5,6 +5,7 @@
./test.sh -f tsim/user/basic1.sim
# ---- db
./test.sh -f tsim/db/create_all_options.sim
./test.sh -f tsim/db/alter_option.sim
./test.sh -f tsim/db/basic1.sim
./test.sh -f tsim/db/basic2.sim
...
...
tests/script/tsim/db/create_all_options.sim
0 → 100644
浏览文件 @
192060eb
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
$loop_cnt = 0
check_dnode_ready:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05
if $data00 != 1 then
return -1
endi
if $data04 != ready then
goto check_dnode_ready
endi
sql connect
sql create dnode $hostname port 7200
sql create dnode $hostname port 7300
$loop_cnt = 0
check_dnode_ready_1:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> rows: $rows
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
print ===> $data20 $data21 $data22 $data23 $data24 $data25
if $data00 != 1 then
return -1
endi
if $data01 != localhost:7100 then
return -1
endi
if $data04 != ready then
goto check_dnode_ready_1
endi
if $data14 != ready then
goto check_dnode_ready_1
endi
if $data24 != ready then
goto check_dnode_ready_1
endi
print ============= create database with all options
#database_option: {
# | BLOCKS value [3~1000, default: 6]
# | CACHE value [default: 16]
# | CACHELAST value [0, 1, 2, 3, default: 0]
# | COMP [0 | 1 | 2, default: 2]
# | DAYS value [60m ~ min(3650d,keep), default: 10d, unit may be minut/hour/day]
# | FSYNC value [0 ~ 180000 ms, default: 3000]
# | MAXROWS value [200~10000, default: 4096]
# | MINROWS value [10~1000, default: 100]
# | KEEP value [max(1d ~ 365000d), default: 1d, unit may be minut/hour/day]
# | PRECISION ['ms' | 'us' | 'ns', default: ms]
# | QUORUM value [1 | 2, default: 1]
# | REPLICA value [1 | 3, default: 1]
# | TTL value [1d ~ , default: 1]
# | WAL value [1 | 2, default: 1]
# | VGROUPS value [default: 2]
# | SINGLE_STABLE [0 | 1, default: ]
# | STREAM_MODE [0 | 1, default: ]
#
#$data0_db : name
#$data1_db : create_time
#$data2_db : vgroups
#$data3_db : ntables
#$data4_db : replica
#$data5_db : quorum
#$data6_db : days
#$data7_db : keep
#$data8_db : cache
#$data9_db : blocks
#$data10_db : minrows
#$data11_db : maxrows
#$data12_db : wal
#$data13_db : fsync
#$data14_db : comp
#$data15_db : cachelast
#$data16_db : precision
print ====> create database db, with default
sql create database db
sql show databases
print rows: $rows
print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
if $rows != 2 then
return -1
endi
if $data0_db != db then # name
return -1
endi
if $data2_db != 2 then # vgroups
return -1
endi
if $data3_db != 0 then # ntables
return -1
endi
if $data4_db != 1 then # replica
return -1
endi
if $data5_db != 1 then # quorum
return -1
endi
if $data6_db != 14400 then # days
return -1
endi
if $data7_db != 5256000,5256000,5256000 then # keep
return -1
endi
if $data8_db != 16 then # cache
return -1
endi
if $data9_db != 6 then # blocks
return -1
endi
if $data10_db != 100 then # minrows
return -1
endi
if $data11_db != 4096 then # maxrows
return -1
endi
if $data12_db != 1 then # wal
return -1
endi
if $data13_db != 3000 then # fsync
return -1
endi
if $data14_db != 2 then # comp
return -1
endi
if $data15_db != 0 then # cachelast
return -1
endi
if $data16_db != ms then # precision
return -1
endi
sql drop database db
print ====> BLOCKS value [3~1000, default: 6]
sql create database db BLOCKS 3
sql show databases
print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
if $data9_db != 3 then
return -1
endi
sql drop database db
sql create database db BLOCKS 1000
sql show databases
print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
if $data9_db != 1000 then
return -1
endi
sql drop database db
sql_error create database db BLOCKS 2
sql_error create database db BLOCKS 0
sql_error create database db BLOCKS -1
print ====> CACHE value [default: 16]
sql create database db CACHE 1
sql show databases
print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
if $data8_db != 1 then
return -1
endi
sql drop database db
sql create database db CACHE 128
sql show databases
print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
if $data8_db != 128 then
return -1
endi
sql drop database db
print ====> CACHELAST value [0, 1, 2, 3, default: 0]
sql create database db CACHELAST 1
sql show databases
print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
if $data15_db != 1 then
return -1
endi
sql drop database db
sql create database db CACHELAST 2
sql show databases
print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
if $data15_db != 2 then
return -1
endi
sql drop database db
sql create database db CACHELAST 3
sql show databases
print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
if $data15_db != 3 then
return -1
endi
sql drop database db
sql_error create database db CACHELAST 4
sql_error create database db CACHELAST -1
print ====> COMP [0 | 1 | 2, default: 2]
sql create database db COMP 1
sql show databases
print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
if $data14_db != 1 then
return -1
endi
sql drop database db
sql create database db COMP 0
sql show databases
print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
if $data14_db != 0 then
return -1
endi
sql drop database db
sql_error create database db COMP 3
sql_error create database db COMP -1
#print ====> DAYS value [60m ~ min(3650d,keep), default: 10d, unit may be minut/hour/day]
#print ====> KEEP value [max(1d ~ 365000d), default: 1d, unit may be minut/hour/day]
#sql create database db DAYS 60m KEEP 60m
#sql show databases
#print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
#if $data6_db != 60 then
# return -1
#endi
#if $data7_db != 60,60,60 then
# return -1
#endi
#sql drop database db
#sql create database db DAYS 60m KEEP 1d
#sql show databases
#print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
#if $data6_db != 60 then
# return -1
#endi
#if $data7_db != 1440,1440,1440 then
# return -1
#endi
#sql create database db DAYS 3650d KEEP 365000d
#sql show databases
#print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
#if $data6_db != 5256000 then
# return -1
#endi
#if $data7_db != 525600000,525600000,525600000 then
# return -1
#endi
#sql drop database db
#sql_error create database db DAYS -59m
#sql_error create database db DAYS 59m
#sql_error create database db DAYS 5256001m
#sql_error create database db DAYS 3651d
#sql_error create database db KEEP -59m
#sql_error create database db KEEP 14399m
#sql_error create database db KEEP 525600001m
#sql_error create database db KEEP 365001d
print ====> FSYNC value [0 ~ 180000 ms, default: 3000]
sql create database db FSYNC 0
sql show databases
print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
if $data13_db != 0 then
return -1
endi
sql drop database db
sql create database db FSYNC 180000
sql show databases
print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
if $data13_db != 180000 then
return -1
endi
sql drop database db
sql_error create database db FSYNC 180001
sql_error create database db FSYNC -1
print ====> MAXROWS value [200~10000, default: 4096], MINROWS value [10~1000, default: 100]
sql create database db MAXROWS 10000 MINROWS 1000
sql show databases
print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
if $data10_db != 1000 then
return -1
endi
if $data11_db != 10000 then
return -1
endi
sql drop database db
sql create database db MAXROWS 200 MINROWS 10
sql show databases
print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
if $data10_db != 10 then
return -1
endi
if $data11_db != 200 then
return -1
endi
sql drop database db
sql_error create database db MAXROWS -1
sql_error create database db MAXROWS 0
sql_error create database db MAXROWS 199
sql_error create database db MAXROWS 10001
sql_error create database db MINROWS -1
sql_error create database db MINROWS 0
sql_error create database db MINROWS 9
sql_error create database db MINROWS 1001
sql_error create database db MAXROWS 500 MINROWS 1000
print ====> PRECISION ['ms' | 'us' | 'ns', default: ms]
sql create database db PRECISION 'us'
sql show databases
print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
if $data16_db != us then
return -1
endi
sql drop database db
sql create database db PRECISION 'ns'
sql show databases
print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
if $data16_db != ns then
return -1
endi
sql drop database db
sql_error create database db PRECISION 'as'
sql_error create database db PRECISION -1
print ====> QUORUM value [1 | 2, default: 1]
#sql create database db QUORUM 2
#sql show databases
#print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
#if $data5_db != 2 then
# return -1
#endi
#sql drop database db
sql create database db QUORUM 1
sql show databases
print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
if $data5_db != 1 then
return -1
endi
sql drop database db
sql_error create database db QUORUM 3
sql_error create database db QUORUM 0
sql_error create database db QUORUM -1
print ====> REPLICA value [1 | 3, default: 1]
sql create database db REPLICA 3
sql show databases
print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
if $data4_db != 3 then
return -1
endi
sql drop database db
sql create database db REPLICA 1
sql show databases
print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
if $data4_db != 1 then
return -1
endi
sql drop database db
sql_error create database db REPLICA 2
sql_error create database db REPLICA 0
sql_error create database db REPLICA -1
sql_error create database db REPLICA 4
print ====> TTL value [1d ~ , default: 1]
sql create database db TTL 1
sql show databases
print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
#if $dataXX_db != 1 then
# return -1
#endi
sql drop database db
sql create database db TTL 10
sql show databases
print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
#if $dataXX_db != 10 then
# return -1
#endi
sql drop database db
sql_error create database db TTL 0
sql_error create database db TTL -1
print ====> WAL value [1 | 2, default: 1]
sql create database db WAL 2
sql show databases
print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
if $data12_db != 2 then
return -1
endi
sql drop database db
sql create database db WAL 1
sql show databases
print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
if $data12_db != 1 then
return -1
endi
sql drop database db
sql_error create database db WAL 3
sql_error create database db WAL -1
sql_error create database db WAL 0
print ====> VGROUPS value [1~4096, default: 2]
sql create database db VGROUPS 1
sql show databases
print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
if $data2_db != 1 then
return -1
endi
sql drop database db
sql create database db VGROUPS 16
sql show databases
print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
if $data2_db != 16 then
return -1
endi
sql drop database db
sql_error create database db VGROUPS 4097
sql_error create database db VGROUPS -1
sql_error create database db VGROUPS 0
print ====> SINGLE_STABLE [0 | 1, default: ]
sql create database db SINGLE_STABLE 1
sql show databases
print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
#if $dataXXXX_db != 1 then
# return -1
#endi
sql drop database db
sql create database db SINGLE_STABLE 0
sql show databases
print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
#if $dataXXXX_db != 0 then
# return -1
#endi
sql drop database db
sql_error create database db SINGLE_STABLE 2
sql_error create database db SINGLE_STABLE -1
print ====> STREAM_MODE [0 | 1, default: ]
sql create database db STREAM_MODE 1
sql show databases
print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
#if $dataXXX_db != 1 then
# return -1
#endi
sql drop database db
sql create database db STREAM_MODE 0
sql show databases
print $data0_db $data1_db $data2_db $data3_db $data4_db $data5_db $data6_db $data7_db $data8_db $data9_db $data10_db $data11_db $data12_db $data13_db $data14_db $data15_db $data16_db $data17_db
#if $dataXXX_db != 0 then
# return -1
#endi
sql drop database db
sql_error create database db STREAM_MODE 2
sql_error create database db STREAM_MODE -1
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录