Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d8479d7a
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d8479d7a
编写于
5月 20, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
差异文件
merge from 3.0
上级
7391a756
8dad69b4
变更
24
隐藏空白更改
内联
并排
Showing
24 changed file
with
546 addition
and
233 deletion
+546
-233
cmake/cmake.options
cmake/cmake.options
+12
-6
include/libs/qcom/query.h
include/libs/qcom/query.h
+5
-3
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+43
-50
source/client/test/clientTests.cpp
source/client/test/clientTests.cpp
+3
-4
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
+3
-1
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
+12
-15
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+2
-0
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+19
-24
source/dnode/vnode/src/tq/tqRead.c
source/dnode/vnode/src/tq/tqRead.c
+11
-0
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+8
-3
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+6
-2
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+39
-29
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+15
-17
source/libs/parser/src/parInsert.c
source/libs/parser/src/parInsert.c
+45
-30
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+15
-1
source/libs/qworker/src/qworkerMsg.c
source/libs/qworker/src/qworkerMsg.c
+13
-13
source/libs/tdb/src/db/tdbPCache.c
source/libs/tdb/src/db/tdbPCache.c
+36
-22
source/libs/tdb/src/db/tdbPager.c
source/libs/tdb/src/db/tdbPager.c
+12
-1
source/libs/tdb/src/inc/tdbInt.h
source/libs/tdb/src/inc/tdbInt.h
+7
-7
source/libs/tdb/test/tdbTest.cpp
source/libs/tdb/test/tdbTest.cpp
+5
-5
source/libs/transport/src/trans.c
source/libs/transport/src/trans.c
+2
-0
source/libs/transport/src/transComm.c
source/libs/transport/src/transComm.c
+1
-0
source/libs/transport/src/transSrv.c
source/libs/transport/src/transSrv.c
+2
-0
tests/script/tsim/insert/update0.sim
tests/script/tsim/insert/update0.sim
+230
-0
未找到文件。
cmake/cmake.options
浏览文件 @
d8479d7a
...
...
@@ -46,6 +46,18 @@ IF(${TD_WINDOWS})
ON
)
option(
BUILD_TEST
"If build unit tests using googletest"
OFF
)
ELSE ()
option(
BUILD_TEST
"If build unit tests using googletest"
ON
)
ENDIF ()
option(
...
...
@@ -54,12 +66,6 @@ option(
OFF
)
option(
BUILD_TEST
"If build unit tests using googletest"
ON
)
option(
BUILD_WITH_LEVELDB
"If build with leveldb"
...
...
include/libs/qcom/query.h
浏览文件 @
d8479d7a
...
...
@@ -182,8 +182,10 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE
#define SET_META_TYPE_BOTH_TABLE(t) (t) = META_TYPE_BOTH_TABLE
#define NEED_CLIENT_RM_TBLMETA_ERROR(_code) \
((_code) == TSDB_CODE_PAR_TABLE_NOT_EXIST || (_code) == TSDB_CODE_VND_TB_NOT_EXIST)
#define NEED_CLIENT_RM_TBLMETA_ERROR(_code) \
((_code) == TSDB_CODE_PAR_TABLE_NOT_EXIST || (_code) == TSDB_CODE_VND_TB_NOT_EXIST || \
(_code) == TSDB_CODE_PAR_INVALID_COLUMNS_NUM || (_code) == TSDB_CODE_PAR_INVALID_COLUMN || \
(_code) == TSDB_CODE_PAR_TAGS_NOT_MATCHED)
#define NEED_CLIENT_REFRESH_VG_ERROR(_code) \
((_code) == TSDB_CODE_VND_HASH_MISMATCH || (_code) == TSDB_CODE_VND_INVALID_VGROUP_ID)
#define NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_TDB_TABLE_RECREATED)
...
...
@@ -194,7 +196,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
#define NEED_SCHEDULER_RETRY_ERROR(_code) \
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
#define REQUEST_MAX_TRY_TIMES
5
#define REQUEST_MAX_TRY_TIMES
1
#define qFatal(...) \
do { \
...
...
source/client/src/clientImpl.c
浏览文件 @
d8479d7a
...
...
@@ -13,18 +13,18 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "cJSON.h"
#include "clientInt.h"
#include "clientLog.h"
#include "command.h"
#include "scheduler.h"
#include "tdatablock.h"
#include "tdataformat.h"
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
#include "tpagedbuf.h"
#include "tref.h"
#include "cJSON.h"
#include "tdataformat.h"
static
int32_t
initEpSetFromCfg
(
const
char
*
firstEp
,
const
char
*
secondEp
,
SCorEpSet
*
pEpSet
);
static
SMsgSendInfo
*
buildConnectMsg
(
SRequestObj
*
pRequest
);
...
...
@@ -189,7 +189,8 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC
setResSchemaInfo
(
&
pRequest
->
body
.
resInfo
,
(
*
pQuery
)
->
pResSchema
,
(
*
pQuery
)
->
numOfResCols
);
setResPrecision
(
&
pRequest
->
body
.
resInfo
,
(
*
pQuery
)
->
precision
);
}
}
if
(
TSDB_CODE_SUCCESS
==
code
||
NEED_CLIENT_HANDLE_ERROR
(
code
))
{
TSWAP
(
pRequest
->
dbList
,
(
*
pQuery
)
->
pDbList
);
TSWAP
(
pRequest
->
tableList
,
(
*
pQuery
)
->
pTableList
);
}
...
...
@@ -293,7 +294,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
SQueryResult
res
=
{.
code
=
0
,
.
numOfRows
=
0
,
.
msgSize
=
ERROR_MSG_BUF_DEFAULT_SIZE
,
.
msg
=
pRequest
->
msgBuf
};
int32_t
code
=
schedulerExecJob
(
pTransporter
,
pNodeList
,
pDag
,
&
pRequest
->
body
.
queryJob
,
pRequest
->
sqlstr
,
pRequest
->
metric
.
start
,
&
res
);
pRequest
->
metric
.
start
,
&
res
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
pRequest
->
body
.
queryJob
!=
0
)
{
schedulerFreeJob
(
pRequest
->
body
.
queryJob
);
...
...
@@ -483,7 +484,8 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
int32_t
retryNum
=
0
;
int32_t
code
=
0
;
while
(
retryNum
++
<
REQUEST_MAX_TRY_TIMES
)
{
do
{
destroyRequest
(
pRequest
);
pRequest
=
launchQuery
(
pTscObj
,
sql
,
sqlLen
);
if
(
pRequest
==
NULL
||
TSDB_CODE_SUCCESS
==
pRequest
->
code
||
!
NEED_CLIENT_HANDLE_ERROR
(
pRequest
->
code
))
{
break
;
...
...
@@ -494,9 +496,7 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
pRequest
->
code
=
code
;
break
;
}
destroyRequest
(
pRequest
);
}
}
while
(
retryNum
++
<
REQUEST_MAX_TRY_TIMES
);
return
pRequest
;
}
...
...
@@ -805,21 +805,20 @@ static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
return
TSDB_CODE_SUCCESS
;
}
static
char
*
parseTagDatatoJson
(
void
*
p
){
char
*
string
=
NULL
;
cJSON
*
json
=
cJSON_CreateObject
();
if
(
json
==
NULL
)
{
static
char
*
parseTagDatatoJson
(
void
*
p
)
{
char
*
string
=
NULL
;
cJSON
*
json
=
cJSON_CreateObject
();
if
(
json
==
NULL
)
{
goto
end
;
}
int16_t
nCols
=
kvRowNCols
(
p
);
char
tagJsonKey
[
256
]
=
{
0
};
char
tagJsonKey
[
256
]
=
{
0
};
for
(
int
j
=
0
;
j
<
nCols
;
++
j
)
{
SColIdx
*
pColIdx
=
kvRowColIdxAt
(
p
,
j
);
char
*
val
=
(
char
*
)(
kvRowColVal
(
p
,
pColIdx
));
if
(
j
==
0
){
if
(
*
val
==
TSDB_DATA_TYPE_NULL
)
{
SColIdx
*
pColIdx
=
kvRowColIdxAt
(
p
,
j
);
char
*
val
=
(
char
*
)(
kvRowColVal
(
p
,
pColIdx
));
if
(
j
==
0
)
{
if
(
*
val
==
TSDB_DATA_TYPE_NULL
)
{
string
=
taosMemoryCalloc
(
1
,
8
);
sprintf
(
varDataVal
(
string
),
"%s"
,
TSDB_DATA_NULL_STR_L
);
varDataSetLen
(
string
,
strlen
(
varDataVal
(
string
)));
...
...
@@ -834,19 +833,18 @@ static char* parseTagDatatoJson(void *p){
// json value
val
+=
varDataTLen
(
val
);
char
*
realData
=
POINTER_SHIFT
(
val
,
CHAR_BYTES
);
char
type
=
*
val
;
if
(
type
==
TSDB_DATA_TYPE_NULL
)
{
char
type
=
*
val
;
if
(
type
==
TSDB_DATA_TYPE_NULL
)
{
cJSON
*
value
=
cJSON_CreateNull
();
if
(
value
==
NULL
)
{
if
(
value
==
NULL
)
{
goto
end
;
}
cJSON_AddItemToObject
(
json
,
tagJsonKey
,
value
);
}
else
if
(
type
==
TSDB_DATA_TYPE_NCHAR
)
{
}
else
if
(
type
==
TSDB_DATA_TYPE_NCHAR
)
{
cJSON
*
value
=
NULL
;
if
(
varDataLen
(
realData
)
>
0
){
char
*
tagJsonValue
=
taosMemoryCalloc
(
varDataLen
(
realData
),
1
);
int32_t
length
=
taosUcs4ToMbs
((
TdUcs4
*
)
varDataVal
(
realData
),
varDataLen
(
realData
),
tagJsonValue
);
if
(
varDataLen
(
realData
)
>
0
)
{
char
*
tagJsonValue
=
taosMemoryCalloc
(
varDataLen
(
realData
),
1
);
int32_t
length
=
taosUcs4ToMbs
((
TdUcs4
*
)
varDataVal
(
realData
),
varDataLen
(
realData
),
tagJsonValue
);
if
(
length
<
0
)
{
tscError
(
"charset:%s to %s. val:%s convert json value failed."
,
DEFAULT_UNICODE_ENCODEC
,
tsCharset
,
val
);
taosMemoryFree
(
tagJsonValue
);
...
...
@@ -854,45 +852,41 @@ static char* parseTagDatatoJson(void *p){
}
value
=
cJSON_CreateString
(
tagJsonValue
);
taosMemoryFree
(
tagJsonValue
);
if
(
value
==
NULL
)
{
if
(
value
==
NULL
)
{
goto
end
;
}
}
else
if
(
varDataLen
(
realData
)
==
0
)
{
}
else
if
(
varDataLen
(
realData
)
==
0
)
{
value
=
cJSON_CreateString
(
""
);
}
else
{
}
else
{
ASSERT
(
0
);
}
cJSON_AddItemToObject
(
json
,
tagJsonKey
,
value
);
}
else
if
(
type
==
TSDB_DATA_TYPE_DOUBLE
)
{
}
else
if
(
type
==
TSDB_DATA_TYPE_DOUBLE
)
{
double
jsonVd
=
*
(
double
*
)(
realData
);
cJSON
*
value
=
cJSON_CreateNumber
(
jsonVd
);
if
(
value
==
NULL
)
{
if
(
value
==
NULL
)
{
goto
end
;
}
cJSON_AddItemToObject
(
json
,
tagJsonKey
,
value
);
// }else if(type == TSDB_DATA_TYPE_BIGINT){
// int64_t jsonVd = *(int64_t*)(realData);
// cJSON* value = cJSON_CreateNumber((double)jsonVd);
// if (value == NULL)
// {
// goto end;
// }
// cJSON_AddItemToObject(json, tagJsonKey, value);
}
else
if
(
type
==
TSDB_DATA_TYPE_BOOL
)
{
char
jsonVd
=
*
(
char
*
)(
realData
);
// }else if(type == TSDB_DATA_TYPE_BIGINT){
// int64_t jsonVd = *(int64_t*)(realData);
// cJSON* value = cJSON_CreateNumber((double)jsonVd);
// if (value == NULL)
// {
// goto end;
// }
// cJSON_AddItemToObject(json, tagJsonKey, value);
}
else
if
(
type
==
TSDB_DATA_TYPE_BOOL
)
{
char
jsonVd
=
*
(
char
*
)(
realData
);
cJSON
*
value
=
cJSON_CreateBool
(
jsonVd
);
if
(
value
==
NULL
)
{
if
(
value
==
NULL
)
{
goto
end
;
}
cJSON_AddItemToObject
(
json
,
tagJsonKey
,
value
);
}
else
{
}
else
{
ASSERT
(
0
);
}
}
string
=
cJSON_PrintUnformatted
(
json
);
end:
...
...
@@ -930,7 +924,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
pResultInfo
->
pCol
[
i
].
pData
=
pResultInfo
->
convertBuf
[
i
];
pResultInfo
->
row
[
i
]
=
pResultInfo
->
pCol
[
i
].
pData
;
}
else
if
(
type
==
TSDB_DATA_TYPE_JSON
&&
colLength
[
i
]
>
0
)
{
}
else
if
(
type
==
TSDB_DATA_TYPE_JSON
&&
colLength
[
i
]
>
0
)
{
char
*
p
=
taosMemoryRealloc
(
pResultInfo
->
convertBuf
[
i
],
colLength
[
i
]);
if
(
p
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -943,7 +937,6 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
if
(
pCol
->
offset
[
j
]
!=
-
1
)
{
char
*
pStart
=
pCol
->
offset
[
j
]
+
pCol
->
pData
;
int32_t
jsonInnerType
=
*
pStart
;
char
*
jsonInnerData
=
pStart
+
CHAR_BYTES
;
char
dst
[
TSDB_MAX_JSON_TAG_LEN
]
=
{
0
};
...
...
@@ -951,7 +944,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
sprintf
(
varDataVal
(
dst
),
"%s"
,
TSDB_DATA_NULL_STR_L
);
varDataSetLen
(
dst
,
strlen
(
varDataVal
(
dst
)));
}
else
if
(
jsonInnerType
==
TSDB_DATA_TYPE_JSON
)
{
char
*
jsonString
=
parseTagDatatoJson
(
jsonInnerData
);
char
*
jsonString
=
parseTagDatatoJson
(
jsonInnerData
);
STR_TO_VARSTR
(
dst
,
jsonString
);
taosMemoryFree
(
jsonString
);
}
else
if
(
jsonInnerType
==
TSDB_DATA_TYPE_NCHAR
)
{
// value -> "value"
...
...
source/client/test/clientTests.cpp
浏览文件 @
d8479d7a
...
...
@@ -567,6 +567,7 @@ TEST(testCase, insert_test) {
taos_free_result(pRes);
taos_close(pConn);
}
#endif
TEST
(
testCase
,
projection_query_tables
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
...
...
@@ -605,7 +606,7 @@ TEST(testCase, projection_query_tables) {
}
taos_free_result
(
pRes
);
for(int32_t i = 0; i < 100000
00
; i += 20) {
for
(
int32_t
i
=
0
;
i
<
100000
;
i
+=
20
)
{
char
sql
[
1024
]
=
{
0
};
sprintf
(
sql
,
"insert into tu values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
...
...
@@ -625,7 +626,7 @@ TEST(testCase, projection_query_tables) {
printf
(
"start to insert next table
\n
"
);
for(int32_t i = 0; i < 100000
00
; i += 20) {
for
(
int32_t
i
=
0
;
i
<
100000
;
i
+=
20
)
{
char
sql
[
1024
]
=
{
0
};
sprintf
(
sql
,
"insert into tu2 values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
...
...
@@ -692,8 +693,6 @@ TEST(testCase, projection_query_stables) {
taos_close
(
pConn
);
}
#endif
TEST
(
testCase
,
agg_query_tables
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
ASSERT_NE
(
pConn
,
nullptr
);
...
...
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
浏览文件 @
d8479d7a
...
...
@@ -126,7 +126,9 @@ int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return
mmPutRpcMsgToWorker
(
&
pMgmt
->
readWorker
,
pMsg
);
}
int32_t
mmPutRpcMsgToSyncQueue
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
)
{
return
mmPutRpcMsgToWorker
(
&
pMgmt
->
syncWorker
,
pMsg
);
}
int32_t
mmPutRpcMsgToSyncQueue
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
)
{
return
mmPutRpcMsgToWorker
(
&
pMgmt
->
syncWorker
,
pMsg
);
}
int32_t
mmStartWorker
(
SMnodeMgmt
*
pMgmt
)
{
SSingleWorkerCfg
qCfg
=
{
...
...
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
浏览文件 @
d8479d7a
...
...
@@ -70,11 +70,10 @@ static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
if
(
code
!=
0
)
{
if
(
terrno
!=
0
)
code
=
terrno
;
vmSendRsp
(
pMsg
,
code
);
dTrace
(
"msg:%p, is freed, code:0x%x"
,
pMsg
,
code
);
rpcFreeCont
(
pMsg
->
pCont
);
taosFreeQitem
(
pMsg
);
}
dTrace
(
"msg:%p, is freed, code:0x%x"
,
pMsg
,
code
);
rpcFreeCont
(
pMsg
->
pCont
);
taosFreeQitem
(
pMsg
);
}
static
void
vmProcessFetchQueue
(
SQueueInfo
*
pInfo
,
SRpcMsg
*
pMsg
)
{
...
...
@@ -85,16 +84,15 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
if
(
code
!=
0
)
{
if
(
terrno
!=
0
)
code
=
terrno
;
vmSendRsp
(
pMsg
,
code
);
dTrace
(
"msg:%p, is freed, code:0x%x"
,
pMsg
,
code
);
rpcFreeCont
(
pMsg
->
pCont
);
taosFreeQitem
(
pMsg
);
}
dTrace
(
"msg:%p, is freed, code:0x%x"
,
pMsg
,
code
);
rpcFreeCont
(
pMsg
->
pCont
);
taosFreeQitem
(
pMsg
);
}
static
void
vmProcessWriteQueue
(
SQueueInfo
*
pInfo
,
STaosQall
*
qall
,
int32_t
numOfMsgs
)
{
SVnodeObj
*
pVnode
=
pInfo
->
ahandle
;
SArray
*
pArray
=
taosArrayInit
(
numOfMsgs
,
sizeof
(
SRpcMsg
*
));
SArray
*
pArray
=
taosArrayInit
(
numOfMsgs
,
sizeof
(
SRpcMsg
*
));
if
(
pArray
==
NULL
)
{
dError
(
"failed to process %d msgs in write-queue since %s"
,
numOfMsgs
,
terrstr
());
return
;
...
...
@@ -216,16 +214,15 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
if
(
code
!=
0
)
{
if
(
terrno
!=
0
)
code
=
terrno
;
vmSendRsp
(
pMsg
,
code
);
dTrace
(
"msg:%p, is freed, code:0x%x"
,
pMsg
,
code
);
rpcFreeCont
(
pMsg
->
pCont
);
taosFreeQitem
(
pMsg
);
}
dTrace
(
"msg:%p, is freed, code:0x%x"
,
pMsg
,
code
);
rpcFreeCont
(
pMsg
->
pCont
);
taosFreeQitem
(
pMsg
);
}
}
static
int32_t
vmPutNodeMsgToQueue
(
SVnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
,
EQueueType
qtype
)
{
SRpcMsg
*
pRpc
=
pMsg
;
SRpcMsg
*
pRpc
=
pMsg
;
SMsgHead
*
pHead
=
pRpc
->
pCont
;
int32_t
code
=
0
;
...
...
@@ -304,7 +301,7 @@ int32_t vmPutNodeMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
static
int32_t
vmPutRpcMsgToQueue
(
SVnodeMgmt
*
pMgmt
,
SRpcMsg
*
pRpc
,
EQueueType
qtype
)
{
SMsgHead
*
pHead
=
pRpc
->
pCont
;
SMsgHead
*
pHead
=
pRpc
->
pCont
;
SVnodeObj
*
pVnode
=
vmAcquireVnode
(
pMgmt
,
pHead
->
vgId
);
if
(
pVnode
==
NULL
)
return
-
1
;
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
d8479d7a
...
...
@@ -126,6 +126,8 @@ STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta);
void
tqReadHandleSetColIdList
(
STqReadHandle
*
pReadHandle
,
SArray
*
pColIdList
);
int32_t
tqReadHandleSetTbUidList
(
STqReadHandle
*
pHandle
,
const
SArray
*
tbUidList
);
int32_t
tqReadHandleAddTbUidList
(
STqReadHandle
*
pHandle
,
const
SArray
*
tbUidList
);
int32_t
tqReadHandleRemoveTbUidList
(
STqReadHandle
*
pHandle
,
const
SArray
*
tbUidList
);
int32_t
tqReadHandleSetMsg
(
STqReadHandle
*
pHandle
,
SSubmitReq
*
pMsg
,
int64_t
ver
);
bool
tqNextDataBlock
(
STqReadHandle
*
pHandle
);
bool
tqNextDataBlockFilterOut
(
STqReadHandle
*
pHandle
,
SHashObj
*
filterOutUids
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
d8479d7a
...
...
@@ -128,7 +128,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
return
0
;
}
int32_t
tqPushMsgNew
(
STQ
*
pTq
,
void
*
msg
,
int32_t
msgLen
,
tmsg_t
msgType
,
int64_t
ver
)
{
int32_t
tqPushMsgNew
(
STQ
*
pTq
,
void
*
msg
,
int32_t
msgLen
,
tmsg_t
msgType
,
int64_t
ver
,
SRpcHandleInfo
handleInfo
)
{
if
(
msgType
!=
TDMT_VND_SUBMIT
)
return
0
;
void
*
pIter
=
NULL
;
STqExec
*
pExec
=
NULL
;
...
...
@@ -238,10 +238,9 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMqRspHead
));
tEncodeSMqDataBlkRsp
(
&
abuf
,
&
rsp
);
pMsg
->
pCont
=
buf
;
pMsg
->
contLen
=
tlen
;
pMsg
->
code
=
0
;
tmsgSendRsp
(
pMsg
);
SRpcMsg
resp
=
{.
info
=
handleInfo
,
.
pCont
=
buf
,
.
contLen
=
tlen
,
.
code
=
0
};
tmsgSendRsp
(
&
resp
);
atomic_store_ptr
(
&
pExec
->
pushHandle
.
handle
,
NULL
);
taosWUnLockLatch
(
&
pExec
->
pushHandle
.
lock
);
...
...
@@ -410,9 +409,9 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
pTopic
->
buffer
.
output
[
j
].
status
=
0
;
STqReadHandle
*
pReadHandle
=
tqInitSubmitMsgScanner
(
pTq
->
pVnode
->
pMeta
);
SReadHandle
handle
=
{
.
reader
=
pReadHandle
,
.
meta
=
pTq
->
pVnode
->
pMeta
,
.
pMsgCb
=
&
pTq
->
pVnode
->
msgCb
,
.
reader
=
pReadHandle
,
.
meta
=
pTq
->
pVnode
->
pMeta
,
.
pMsgCb
=
&
pTq
->
pVnode
->
msgCb
,
};
pTopic
->
buffer
.
output
[
j
].
pReadHandle
=
pReadHandle
;
pTopic
->
buffer
.
output
[
j
].
task
=
qCreateStreamExecTaskInfo
(
pTopic
->
qmsg
,
&
handle
);
...
...
@@ -666,10 +665,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMqRspHead
));
tEncodeSMqDataBlkRsp
(
&
abuf
,
&
rsp
);
pMsg
->
pCont
=
buf
;
pMsg
->
contLen
=
tlen
;
pMsg
->
code
=
0
;
tmsgSendRsp
(
pMsg
);
SRpcMsg
resp
=
{.
info
=
pMsg
->
info
,
.
pCont
=
buf
,
.
contLen
=
tlen
,
.
code
=
0
};
tmsgSendRsp
(
&
resp
);
tqDebug
(
"vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld"
,
TD_VID
(
pTq
->
pVnode
),
fetchOffset
,
consumerId
,
pReq
->
epoch
,
rsp
.
blockNum
,
rsp
.
reqOffset
,
rsp
.
rspOffset
);
...
...
@@ -848,12 +846,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
/*rsp.pBlockData = pRes;*/
/*taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);*/
pMsg->pCont = buf;
pMsg->contLen = msgLen;
pMsg->code = 0;
SRpcMsg resp = {.info = pMsg->info, pCont = buf, .contLen = msgLen, .code = 0};
tqDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", TD_VID(pTq->pVnode), fetchOffset,
pHead->msgType, consumerId, pReq->epoch);
tmsgSendRsp(
pMsg
);
tmsgSendRsp(
&resp
);
taosMemoryFree(pHead);
return 0;
} else {
...
...
@@ -881,10 +877,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqPollRspV2(&abuf, &rspV2);
pMsg->pCont = buf;
pMsg->contLen = tlen;
pMsg->code = 0;
tmsgSendRsp(pMsg);
SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
tmsgSendRsp(&resp);
tqDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", TD_VID(pTq->pVnode), fetchOffset, consumerId,
pReq->epoch);
/*}*/
...
...
@@ -1005,10 +1000,10 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
for
(
int32_t
i
=
0
;
i
<
parallel
;
i
++
)
{
STqReadHandle
*
pStreamReader
=
tqInitSubmitMsgScanner
(
pTq
->
pVnode
->
pMeta
);
SReadHandle
handle
=
{
.
reader
=
pStreamReader
,
.
meta
=
pTq
->
pVnode
->
pMeta
,
.
pMsgCb
=
&
pTq
->
pVnode
->
msgCb
,
.
vnode
=
pTq
->
pVnode
,
.
reader
=
pStreamReader
,
.
meta
=
pTq
->
pVnode
->
pMeta
,
.
pMsgCb
=
&
pTq
->
pVnode
->
msgCb
,
.
vnode
=
pTq
->
pVnode
,
};
pTask
->
exec
.
runners
[
i
].
inputHandle
=
pStreamReader
;
pTask
->
exec
.
runners
[
i
].
executor
=
qCreateStreamExecTaskInfo
(
pTask
->
exec
.
qmsg
,
&
handle
);
...
...
source/dnode/vnode/src/tq/tqRead.c
浏览文件 @
d8479d7a
...
...
@@ -231,3 +231,14 @@ int tqReadHandleAddTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) {
return
0
;
}
int
tqReadHandleRemoveTbUidList
(
STqReadHandle
*
pHandle
,
const
SArray
*
tbUidList
)
{
ASSERT
(
pHandle
->
tbIdHash
!=
NULL
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
tbUidList
);
i
++
)
{
int64_t
*
pKey
=
(
int64_t
*
)
taosArrayGet
(
tbUidList
,
i
);
taosHashRemove
(
pHandle
->
tbIdHash
,
pKey
,
sizeof
(
int64_t
));
}
return
0
;
}
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
d8479d7a
...
...
@@ -2076,8 +2076,14 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
}
#endif
if
(
TD_SUPPORT_UPDATE
(
pCfg
->
update
))
{
if
(
lastKeyAppend
!=
key
)
{
if
(
lastKeyAppend
!=
TSKEY_INITIAL_VAL
)
{
++
curRow
;
}
lastKeyAppend
=
key
;
}
// load data from file firstly
numOfRows
=
doCopyRowsFromFileBlock
(
pTsdbReadHandle
,
pTsdbReadHandle
->
outputCapacity
,
curRow
,
pos
,
pos
);
lastKeyAppend
=
key
;
if
(
rv1
!=
TD_ROW_SVER
(
row1
))
{
rv1
=
TD_ROW_SVER
(
row1
);
...
...
@@ -2087,7 +2093,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
}
// still assign data into current row
mergeTwoRowFromMem
(
pTsdbReadHandle
,
pTsdbReadHandle
->
outputCapacity
,
&
curRow
,
row1
,
row2
,
numOfCols
,
numOfRows
+=
mergeTwoRowFromMem
(
pTsdbReadHandle
,
pTsdbReadHandle
->
outputCapacity
,
&
curRow
,
row1
,
row2
,
numOfCols
,
pCheckInfo
->
tableId
,
pSchema1
,
pSchema2
,
pCfg
->
update
,
&
lastKeyAppend
);
if
(
cur
->
win
.
skey
==
TSKEY_INITIAL_VAL
)
{
...
...
@@ -2099,7 +2105,6 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
cur
->
mixBlock
=
true
;
moveToNextRowInMem
(
pCheckInfo
);
++
curRow
;
pos
+=
step
;
}
else
{
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
d8479d7a
...
...
@@ -199,8 +199,12 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
case TDMT_VND_TASK_PIPE_EXEC:
case TDMT_VND_TASK_MERGE_EXEC:
return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen, 0);
case TDMT_VND_STREAM_TRIGGER:
return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, 0);
case TDMT_VND_STREAM_TRIGGER:{
// refactor, avoid double free
int code = tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, 0);
pMsg->pCont = NULL;
return code;
}
#endif
case
TDMT_VND_QUERY_HEARTBEAT
:
return
qWorkerProcessHbMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
...
...
source/libs/executor/src/executor.c
浏览文件 @
d8479d7a
...
...
@@ -125,6 +125,33 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
return
pTaskInfo
;
}
static
SArray
*
filterQualifiedChildTables
(
const
SStreamBlockScanInfo
*
pScanInfo
,
const
SArray
*
tableIdList
)
{
SArray
*
qa
=
taosArrayInit
(
4
,
sizeof
(
tb_uid_t
));
// let's discard the tables those are not created according to the queried super table.
SMetaReader
mr
=
{
0
};
metaReaderInit
(
&
mr
,
pScanInfo
->
readHandle
.
meta
,
0
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
tableIdList
);
++
i
)
{
int64_t
*
id
=
(
int64_t
*
)
taosArrayGet
(
tableIdList
,
i
);
int32_t
code
=
metaGetTableEntryByUid
(
&
mr
,
*
id
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed to get table meta, uid:%"
PRIu64
" code:%s"
,
*
id
,
tstrerror
(
terrno
));
continue
;
}
ASSERT
(
mr
.
me
.
type
==
TSDB_CHILD_TABLE
);
if
(
mr
.
me
.
ctbEntry
.
suid
!=
pScanInfo
->
tableUid
)
{
continue
;
}
taosArrayPush
(
qa
,
id
);
}
metaReaderClear
(
&
mr
);
return
qa
;
}
int32_t
qUpdateQualifiedTableId
(
qTaskInfo_t
tinfo
,
const
SArray
*
tableIdList
,
bool
isAdd
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
...
...
@@ -134,41 +161,24 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
pInfo
=
pInfo
->
pDownstream
[
0
];
}
int32_t
code
=
0
;
SStreamBlockScanInfo
*
pScanInfo
=
pInfo
->
info
;
if
(
isAdd
)
{
SArray
*
qa
=
taosArrayInit
(
4
,
sizeof
(
tb_uid_t
));
SMetaReader
mr
=
{
0
};
metaReaderInit
(
&
mr
,
pScanInfo
->
readHandle
.
meta
,
0
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
tableIdList
);
++
i
)
{
int64_t
*
id
=
(
int64_t
*
)
taosArrayGet
(
tableIdList
,
i
);
int32_t
code
=
metaGetTableEntryByUid
(
&
mr
,
*
id
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed to get table meta, uid:%"
PRIu64
" code:%s"
,
*
id
,
tstrerror
(
terrno
));
continue
;
}
ASSERT
(
mr
.
me
.
type
==
TSDB_CHILD_TABLE
);
if
(
mr
.
me
.
ctbEntry
.
suid
!=
pScanInfo
->
tableUid
)
{
continue
;
}
if
(
isAdd
)
{
// add new table id
SArray
*
qa
=
filterQualifiedChildTables
(
pScanInfo
,
tableIdList
);
taosArrayPush
(
qa
,
id
);
}
qDebug
(
" %d qualified child tables added into stream scanner"
,
(
int32_t
)
taosArrayGetSize
(
qa
));
code
=
tqReadHandleAddTbUidList
(
pScanInfo
->
streamBlockReader
,
qa
);
taosArrayDestroy
(
qa
);
metaReaderClear
(
&
mr
);
}
else
{
// remove the table id in current list
SArray
*
qa
=
filterQualifiedChildTables
(
pScanInfo
,
tableIdList
);
qDebug
(
" %d qualified child tables added into stream scanner"
,
(
int32_t
)
taosArrayGetSize
(
qa
));
int32_t
code
=
tqReadHandleAddTbUidList
(
pScanInfo
->
streamBlockReader
,
qa
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
}
else
{
assert
(
0
);
qDebug
(
" %d remove child tables from the stream scanner"
,
(
int32_t
)
taosArrayGetSize
(
tableIdList
));
code
=
tqReadHandleAddTbUidList
(
pScanInfo
->
streamBlockReader
,
tableIdList
);
taosArrayDestroy
(
qa
);
}
return
TSDB_CODE_SUCCESS
;
return
code
;
}
int32_t
qGetQueriedTableSchemaVersion
(
qTaskInfo_t
tinfo
,
char
*
dbName
,
char
*
tableName
,
int32_t
*
sversion
,
int32_t
*
tversion
)
{
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
d8479d7a
...
...
@@ -2062,15 +2062,7 @@ void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* p
pAggInfo
->
groupId
=
groupId
;
}
/**
* For interval query of both super table and table, copy the data in ascending order, since the output results are
* ordered in SWindowResutl already. While handling the group by query for both table and super table,
* all group result are completed already.
*
* @param pQInfo
* @param result
*/
int32_t
doCopyToSDataBlock
(
SExecTaskInfo
*
taskInfo
,
SSDataBlock
*
pBlock
,
SExprInfo
*
pExprInfo
,
SDiskbasedBuf
*
pBuf
,
SGroupResInfo
*
pGroupResInfo
,
int32_t
doCopyToSDataBlock
(
SExecTaskInfo
*
pTaskInfo
,
SSDataBlock
*
pBlock
,
SExprInfo
*
pExprInfo
,
SDiskbasedBuf
*
pBuf
,
SGroupResInfo
*
pGroupResInfo
,
int32_t
*
rowCellOffset
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfExprs
)
{
int32_t
numOfRows
=
getNumOfTotalRes
(
pGroupResInfo
);
int32_t
start
=
pGroupResInfo
->
index
;
...
...
@@ -2087,6 +2079,15 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn
continue
;
}
if
(
pBlock
->
info
.
groupId
==
0
)
{
pBlock
->
info
.
groupId
=
pPos
->
groupId
;
}
else
{
// current value belongs to different group, it can't be packed into one datablock
if
(
pBlock
->
info
.
groupId
!=
pPos
->
groupId
)
{
break
;
}
}
if
(
pBlock
->
info
.
rows
+
pRow
->
numOfRows
>
pBlock
->
info
.
capacity
)
{
break
;
}
...
...
@@ -2100,9 +2101,8 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn
if
(
pCtx
[
j
].
fpSet
.
finalize
)
{
int32_t
code
=
pCtx
[
j
].
fpSet
.
finalize
(
&
pCtx
[
j
],
pBlock
);
if
(
TAOS_FAILED
(
code
))
{
qError
(
"%s build result data block error, code %s"
,
GET_TASKID
(
taskInfo
),
tstrerror
(
code
));
taskInfo
->
code
=
code
;
longjmp
(
taskInfo
->
env
,
code
);
qError
(
"%s build result data block error, code %s"
,
GET_TASKID
(
pTaskInfo
),
tstrerror
(
code
));
longjmp
(
pTaskInfo
->
env
,
code
);
}
}
else
if
(
strcmp
(
pCtx
[
j
].
pExpr
->
pExpr
->
_function
.
functionName
,
"_select_value"
)
==
0
)
{
// do nothing, todo refactor
...
...
@@ -2124,7 +2124,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn
}
}
// qDebug("QInfo:0x%"PRIx64" copy data to query buf completed", GET_TASKID(pRuntimeEnv)
);
qDebug
(
"%s result generated, rows:%d, groupId:%"
PRIu64
,
GET_TASKID
(
pTaskInfo
),
pBlock
->
info
.
rows
,
pBlock
->
info
.
groupId
);
blockDataUpdateTsWindow
(
pBlock
);
return
0
;
}
...
...
@@ -2145,10 +2145,9 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
return
;
}
// clear the existed group id
pBlock
->
info
.
groupId
=
0
;
doCopyToSDataBlock
(
pTaskInfo
,
pBlock
,
pExprInfo
,
pBuf
,
pGroupResInfo
,
rowCellOffset
,
pCtx
,
numOfExprs
);
// add condition (pBlock->info.rows >= 1) just to runtime happy
blockDataUpdateTsWindow
(
pBlock
);
}
static
void
updateNumOfRowsInResultRows
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
SResultRowInfo
*
pResultRowInfo
,
...
...
@@ -3656,7 +3655,6 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
doSetOperatorCompleted
(
pOperator
);
}
doSetOperatorCompleted
(
pOperator
);
return
(
blockDataGetNumOfRows
(
pInfo
->
pRes
)
!=
0
)
?
pInfo
->
pRes
:
NULL
;
}
...
...
source/libs/parser/src/parInsert.c
浏览文件 @
d8479d7a
...
...
@@ -41,6 +41,13 @@
sToken = tStrGetToken(pSql, &index, false); \
} while (0)
#define NEXT_VALID_TOKEN(pSql, sToken) \
do { \
sToken.n = tGetToken(pSql, &sToken.type); \
sToken.z = pSql; \
pSql += sToken.n; \
} while (TK_NK_SPACE == sToken.type)
typedef
struct
SInsertParseContext
{
SParseContext
*
pComCxt
;
// input
char
*
pSql
;
// input
...
...
@@ -482,9 +489,11 @@ static int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int
return
buildSyntaxErrMsg
(
pMsgBuf
,
"invalid bool data"
,
pToken
->
z
);
}
}
else
if
(
pToken
->
type
==
TK_NK_INTEGER
)
{
return
func
(
pMsgBuf
,
((
taosStr2Int64
(
pToken
->
z
,
NULL
,
10
)
==
0
)
?
&
FALSE_VALUE
:
&
TRUE_VALUE
),
pSchema
->
bytes
,
param
);
return
func
(
pMsgBuf
,
((
taosStr2Int64
(
pToken
->
z
,
NULL
,
10
)
==
0
)
?
&
FALSE_VALUE
:
&
TRUE_VALUE
),
pSchema
->
bytes
,
param
);
}
else
if
(
pToken
->
type
==
TK_NK_FLOAT
)
{
return
func
(
pMsgBuf
,
((
taosStr2Double
(
pToken
->
z
,
NULL
)
==
0
)
?
&
FALSE_VALUE
:
&
TRUE_VALUE
),
pSchema
->
bytes
,
param
);
return
func
(
pMsgBuf
,
((
taosStr2Double
(
pToken
->
z
,
NULL
)
==
0
)
?
&
FALSE_VALUE
:
&
TRUE_VALUE
),
pSchema
->
bytes
,
param
);
}
else
{
return
buildSyntaxErrMsg
(
pMsgBuf
,
"invalid bool data"
,
pToken
->
z
);
}
...
...
@@ -685,7 +694,7 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo*
isOrdered
=
false
;
}
if
(
index
<
0
)
{
return
buildSyntaxErrMsg
(
&
pCxt
->
msg
,
"invalid column/tag name"
,
sToken
.
z
);
return
generateSyntaxErrMsg
(
&
pCxt
->
msg
,
TSDB_CODE_PAR_INVALID_COLUMN
,
sToken
.
z
);
}
if
(
pColList
->
cols
[
index
].
valStat
==
VAL_STAT_HAS
)
{
return
buildSyntaxErrMsg
(
&
pCxt
->
msg
,
"duplicated column name"
,
sToken
.
z
);
...
...
@@ -895,8 +904,10 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SName* name, char* tb
return
buildSyntaxErrMsg
(
&
pCxt
->
msg
,
"( is expected"
,
sToken
.
z
);
}
CHECK_CODE
(
parseTagsClause
(
pCxt
,
pCxt
->
pTableMeta
->
schema
,
getTableInfo
(
pCxt
->
pTableMeta
).
precision
,
name
->
tname
));
NEXT_TOKEN
(
pCxt
->
pSql
,
sToken
);
if
(
TK_NK_RP
!=
sToken
.
type
)
{
NEXT_VALID_TOKEN
(
pCxt
->
pSql
,
sToken
);
if
(
TK_NK_COMMA
==
sToken
.
type
)
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msg
,
TSDB_CODE_PAR_TAGS_NOT_MATCHED
);
}
else
if
(
TK_NK_RP
!=
sToken
.
type
)
{
return
buildSyntaxErrMsg
(
&
pCxt
->
msg
,
") is expected"
,
sToken
.
z
);
}
...
...
@@ -996,8 +1007,10 @@ static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlo
pDataBlock
->
size
+=
extendedRowSize
;
// len;
}
NEXT_TOKEN
(
pCxt
->
pSql
,
sToken
);
if
(
TK_NK_RP
!=
sToken
.
type
)
{
NEXT_VALID_TOKEN
(
pCxt
->
pSql
,
sToken
);
if
(
TK_NK_COMMA
==
sToken
.
type
)
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msg
,
TSDB_CODE_PAR_INVALID_COLUMNS_NUM
);
}
else
if
(
TK_NK_RP
!=
sToken
.
type
)
{
return
buildSyntaxErrMsg
(
&
pCxt
->
msg
,
") expected"
,
sToken
.
z
);
}
...
...
@@ -1057,10 +1070,10 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) {
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
// [...];
static
int32_t
parseInsertBody
(
SInsertParseContext
*
pCxt
)
{
int32_t
tbNum
=
0
;
char
tbFName
[
TSDB_TABLE_FNAME_LEN
];
bool
autoCreateTbl
=
false
;
STableMeta
*
pMeta
=
NULL
;
int32_t
tbNum
=
0
;
char
tbFName
[
TSDB_TABLE_FNAME_LEN
];
bool
autoCreateTbl
=
false
;
STableMeta
*
pMeta
=
NULL
;
// for each table
while
(
1
)
{
...
...
@@ -1121,7 +1134,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
&
dataBuf
,
NULL
,
&
pCxt
->
createTblReq
));
pMeta
=
pCxt
->
pTableMeta
;
pCxt
->
pTableMeta
=
NULL
;
if
(
TK_NK_LP
==
sToken
.
type
)
{
// pSql -> field1_name, ...)
CHECK_CODE
(
parseBoundColumns
(
pCxt
,
&
dataBuf
->
boundColumnInfo
,
getTableColumnSchema
(
pMeta
)));
...
...
@@ -1160,7 +1173,8 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
memcpy
(
tags
,
&
pCxt
->
tags
,
sizeof
(
pCxt
->
tags
));
(
*
pCxt
->
pStmtCb
->
setInfoFn
)(
pCxt
->
pStmtCb
->
pStmt
,
pMeta
,
tags
,
tbFName
,
autoCreateTbl
,
pCxt
->
pVgroupsHashObj
,
pCxt
->
pTableBlockHashObj
);
(
*
pCxt
->
pStmtCb
->
setInfoFn
)(
pCxt
->
pStmtCb
->
pStmt
,
pMeta
,
tags
,
tbFName
,
autoCreateTbl
,
pCxt
->
pVgroupsHashObj
,
pCxt
->
pTableBlockHashObj
);
memset
(
&
pCxt
->
tags
,
0
,
sizeof
(
pCxt
->
tags
));
pCxt
->
pVgroupsHashObj
=
NULL
;
...
...
@@ -1231,14 +1245,14 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
context
.
pOutput
->
payloadType
=
PAYLOAD_TYPE_KV
;
int32_t
code
=
skipInsertInto
(
&
context
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
parseInsertBody
(
&
context
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
if
(
TSDB_CODE_SUCCESS
==
code
||
NEED_CLIENT_HANDLE_ERROR
(
code
)
)
{
SName
*
pTable
=
taosHashIterate
(
context
.
pTableNameHashObj
,
NULL
);
while
(
NULL
!=
pTable
)
{
taosArrayPush
((
*
pQuery
)
->
pTableList
,
pTable
);
...
...
@@ -1579,9 +1593,9 @@ typedef struct SmlExecTableHandle {
}
SmlExecTableHandle
;
typedef
struct
SmlExecHandle
{
SHashObj
*
pBlockHash
;
SmlExecTableHandle
tableExecHandle
;
SQuery
*
pQuery
;
SHashObj
*
pBlockHash
;
SmlExecTableHandle
tableExecHandle
;
SQuery
*
pQuery
;
}
SSmlExecHandle
;
static
void
smlDestroyTableHandle
(
void
*
pHandle
)
{
...
...
@@ -1673,9 +1687,9 @@ static int32_t smlBuildTagRow(SArray* cols, SKVRowBuilder* tagsBuilder, SParsedD
SSchema
*
pTagSchema
=
&
pSchema
[
tags
->
boundColumns
[
i
]
-
1
];
// colId starts with 1
param
.
schema
=
pTagSchema
;
SSmlKv
*
kv
=
taosArrayGetP
(
cols
,
i
);
if
(
IS_VAR_DATA_TYPE
(
kv
->
type
))
{
if
(
IS_VAR_DATA_TYPE
(
kv
->
type
))
{
KvRowAppend
(
msg
,
kv
->
value
,
kv
->
length
,
&
param
);
}
else
{
}
else
{
KvRowAppend
(
msg
,
&
(
kv
->
value
),
kv
->
length
,
&
param
);
}
}
...
...
@@ -1688,13 +1702,13 @@ static int32_t smlBuildTagRow(SArray* cols, SKVRowBuilder* tagsBuilder, SParsedD
return
TSDB_CODE_SUCCESS
;
}
int32_t
smlBindData
(
void
*
handle
,
SArray
*
tags
,
SArray
*
colsSchema
,
SArray
*
cols
,
bool
format
,
STableMeta
*
pTableMeta
,
char
*
tableName
,
char
*
msgBuf
,
int16_t
msgBufLen
)
{
int32_t
smlBindData
(
void
*
handle
,
SArray
*
tags
,
SArray
*
colsSchema
,
SArray
*
cols
,
bool
format
,
STableMeta
*
pTableMeta
,
char
*
tableName
,
char
*
msgBuf
,
int16_t
msgBufLen
)
{
SMsgBuf
pBuf
=
{.
buf
=
msgBuf
,
.
len
=
msgBufLen
};
SSmlExecHandle
*
smlHandle
=
(
SSmlExecHandle
*
)
handle
;
smlDestroyTableHandle
(
&
smlHandle
->
tableExecHandle
);
// free for each table
SSchema
*
pTagsSchema
=
getTableTagSchema
(
pTableMeta
);
smlDestroyTableHandle
(
&
smlHandle
->
tableExecHandle
);
// free for each table
SSchema
*
pTagsSchema
=
getTableTagSchema
(
pTableMeta
);
setBoundColumnInfo
(
&
smlHandle
->
tableExecHandle
.
tags
,
pTagsSchema
,
getNumOfTags
(
pTableMeta
));
int
ret
=
smlBoundColumnData
(
tags
,
&
smlHandle
->
tableExecHandle
.
tags
,
pTagsSchema
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -1702,7 +1716,8 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsSchema, SArray *cols
return
ret
;
}
SKVRow
row
=
NULL
;
ret
=
smlBuildTagRow
(
tags
,
&
smlHandle
->
tableExecHandle
.
tagsBuilder
,
&
smlHandle
->
tableExecHandle
.
tags
,
pTagsSchema
,
&
row
,
&
pBuf
);
ret
=
smlBuildTagRow
(
tags
,
&
smlHandle
->
tableExecHandle
.
tagsBuilder
,
&
smlHandle
->
tableExecHandle
.
tags
,
pTagsSchema
,
&
row
,
&
pBuf
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
return
ret
;
}
...
...
@@ -1733,7 +1748,7 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsSchema, SArray *cols
initRowBuilder
(
&
pDataBlock
->
rowBuilder
,
pDataBlock
->
pTableMeta
->
sversion
,
&
pDataBlock
->
boundColumnInfo
);
int32_t
rowNum
=
taosArrayGetSize
(
cols
);
if
(
rowNum
<=
0
)
{
if
(
rowNum
<=
0
)
{
return
buildInvalidOperationMsg
(
&
pBuf
,
"cols size <= 0"
);
}
ret
=
allocateMemForSize
(
pDataBlock
,
extendedRowSize
*
rowNum
);
...
...
@@ -1744,9 +1759,9 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsSchema, SArray *cols
for
(
int32_t
r
=
0
;
r
<
rowNum
;
++
r
)
{
STSRow
*
row
=
(
STSRow
*
)(
pDataBlock
->
pData
+
pDataBlock
->
size
);
// skip the SSubmitBlk header
tdSRowResetBuf
(
pBuilder
,
row
);
void
*
rowData
=
taosArrayGetP
(
cols
,
r
);
void
*
rowData
=
taosArrayGetP
(
cols
,
r
);
size_t
rowDataSize
=
0
;
if
(
format
)
{
if
(
format
)
{
rowDataSize
=
taosArrayGetSize
(
rowData
);
}
...
...
@@ -1781,9 +1796,9 @@ int32_t smlBindData(void *handle, SArray *tags, SArray *colsSchema, SArray *cols
kv
->
i
=
convertTimePrecision
(
kv
->
i
,
TSDB_TIME_PRECISION_NANO
,
pTableMeta
->
tableInfo
.
precision
);
}
if
(
IS_VAR_DATA_TYPE
(
kv
->
type
))
{
if
(
IS_VAR_DATA_TYPE
(
kv
->
type
))
{
MemRowAppend
(
&
pBuf
,
kv
->
value
,
colLen
,
&
param
);
}
else
{
}
else
{
MemRowAppend
(
&
pBuf
,
&
(
kv
->
value
),
colLen
,
&
param
);
}
}
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
d8479d7a
...
...
@@ -120,6 +120,20 @@ static int32_t getTableMeta(STranslateContext* pCxt, const char* pDbName, const
return
getTableMetaImpl
(
pCxt
,
toName
(
pCxt
->
pParseCxt
->
acctId
,
pDbName
,
pTableName
,
&
name
),
pMeta
);
}
static
int32_t
refreshGetTableMeta
(
STranslateContext
*
pCxt
,
const
char
*
pDbName
,
const
char
*
pTableName
,
STableMeta
**
pMeta
)
{
SParseContext
*
pParCxt
=
pCxt
->
pParseCxt
;
SName
name
;
toName
(
pCxt
->
pParseCxt
->
acctId
,
pDbName
,
pTableName
,
&
name
);
int32_t
code
=
catalogRefreshGetTableMeta
(
pParCxt
->
pCatalog
,
pParCxt
->
pTransporter
,
&
pParCxt
->
mgmtEpSet
,
&
name
,
pMeta
,
false
);
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
parserError
(
"catalogRefreshGetTableMeta error, code:%s, dbName:%s, tbName:%s"
,
tstrerror
(
code
),
pDbName
,
pTableName
);
}
return
code
;
}
static
int32_t
getTableDistVgInfo
(
STranslateContext
*
pCxt
,
const
SName
*
pName
,
SArray
**
pVgInfo
)
{
SParseContext
*
pParCxt
=
pCxt
->
pParseCxt
;
int32_t
code
=
collectUseDatabase
(
pName
,
pCxt
->
pDbs
);
...
...
@@ -3201,7 +3215,7 @@ static int32_t translateExplain(STranslateContext* pCxt, SExplainStmt* pStmt) {
}
static
int32_t
translateDescribe
(
STranslateContext
*
pCxt
,
SDescribeStmt
*
pStmt
)
{
return
g
etTableMeta
(
pCxt
,
pStmt
->
dbName
,
pStmt
->
tableName
,
&
pStmt
->
pMeta
);
return
refreshG
etTableMeta
(
pCxt
,
pStmt
->
dbName
,
pStmt
->
tableName
,
&
pStmt
->
pMeta
);
}
static
int32_t
translateKillConnection
(
STranslateContext
*
pCxt
,
SKillStmt
*
pStmt
)
{
...
...
source/libs/qworker/src/qworkerMsg.c
浏览文件 @
d8479d7a
...
...
@@ -47,7 +47,7 @@ int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code) {
SQueryTableRsp
rsp
=
{.
code
=
code
};
int32_t
contLen
=
tSerializeSQueryTableRsp
(
NULL
,
0
,
&
rsp
);
void
*
msg
=
rpcMallocCont
(
contLen
);
void
*
msg
=
rpcMallocCont
(
contLen
);
tSerializeSQueryTableRsp
(
msg
,
contLen
,
&
rsp
);
SRpcMsg
rpcRsp
=
{
...
...
@@ -85,7 +85,7 @@ int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execIn
SExplainRsp
rsp
=
{.
numOfPlans
=
num
,
.
subplanInfo
=
execInfo
};
int32_t
contLen
=
tSerializeSExplainRsp
(
NULL
,
0
,
&
rsp
);
void
*
pRsp
=
rpcMallocCont
(
contLen
);
void
*
pRsp
=
rpcMallocCont
(
contLen
);
tSerializeSExplainRsp
(
pRsp
,
contLen
,
&
rsp
);
SRpcMsg
rpcRsp
=
{
...
...
@@ -104,7 +104,7 @@ int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execIn
int32_t
qwBuildAndSendHbRsp
(
SRpcHandleInfo
*
pConn
,
SSchedulerHbRsp
*
pStatus
,
int32_t
code
)
{
int32_t
contLen
=
tSerializeSSchedulerHbRsp
(
NULL
,
0
,
pStatus
);
void
*
pRsp
=
rpcMallocCont
(
contLen
);
void
*
pRsp
=
rpcMallocCont
(
contLen
);
tSerializeSSchedulerHbRsp
(
pRsp
,
contLen
,
pStatus
);
SRpcMsg
rpcRsp
=
{
...
...
@@ -212,7 +212,7 @@ int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) {
showRsp
.
tableMeta
.
numOfColumns
=
cols
;
int32_t
bufLen
=
tSerializeSShowRsp
(
NULL
,
0
,
&
showRsp
);
void
*
pBuf
=
rpcMallocCont
(
bufLen
);
void
*
pBuf
=
rpcMallocCont
(
bufLen
);
tSerializeSShowRsp
(
pBuf
,
bufLen
,
&
showRsp
);
SRpcMsg
rpcMsg
=
{
...
...
@@ -341,7 +341,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t
code
=
0
;
SSubQueryMsg
*
msg
=
pMsg
->
pCont
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<=
sizeof
(
*
msg
))
{
QW_ELOG
(
"invalid query msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
...
...
@@ -361,7 +361,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int64_t
rId
=
msg
->
refId
;
SQWMsg
qwMsg
=
{.
node
=
node
,
.
msg
=
msg
->
msg
+
msg
->
sqlLen
,
.
msgLen
=
msg
->
phyLen
,
.
connInfo
=
pMsg
->
info
};
char
*
sql
=
strndup
(
msg
->
msg
,
msg
->
sqlLen
);
char
*
sql
=
strndup
(
msg
->
msg
,
msg
->
sqlLen
);
QW_SCH_TASK_DLOG
(
"processQuery start, node:%p, handle:%p, sql:%s"
,
node
,
pMsg
->
info
.
handle
,
sql
);
taosMemoryFreeClear
(
sql
);
...
...
@@ -378,8 +378,8 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
bool
queryDone
=
false
;
SQueryContinueReq
*
msg
=
(
SQueryContinueReq
*
)
pMsg
->
pCont
;
bool
needStop
=
false
;
SQWTaskCtx
*
handles
=
NULL
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
SQWTaskCtx
*
handles
=
NULL
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
QW_ELOG
(
"invalid cquery msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
...
...
@@ -407,7 +407,7 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
SResReadyReq
*
msg
=
pMsg
->
pCont
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
QW_ELOG
(
"invalid task ready msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
...
...
@@ -467,7 +467,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
}
SResFetchReq
*
msg
=
pMsg
->
pCont
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
QW_ELOG
(
"invalid fetch msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
...
...
@@ -505,7 +505,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
int32_t
code
=
0
;
STaskCancelReq
*
msg
=
pMsg
->
pCont
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
...
...
@@ -542,7 +542,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t
code
=
0
;
STaskDropReq
*
msg
=
pMsg
->
pCont
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
QW_ELOG
(
"invalid task drop msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
...
...
@@ -581,7 +581,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t
code
=
0
;
SSchedulerHbReq
req
=
{
0
};
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
if
(
NULL
==
pMsg
->
pCont
)
{
QW_ELOG
(
"invalid hb msg, msg:%p, msgLen:%d"
,
pMsg
->
pCont
,
pMsg
->
contLen
);
...
...
source/libs/tdb/src/db/tdbPCache.c
浏览文件 @
d8479d7a
...
...
@@ -14,6 +14,9 @@
*/
#include "tdbInt.h"
// #include <sys/types.h>
// #include <unistd.h>
struct
SPCache
{
int
szPage
;
int
nPages
;
...
...
@@ -32,7 +35,6 @@ static inline uint32_t tdbPCachePageHash(const SPgid *pPgid) {
uint32_t
*
t
=
(
uint32_t
*
)((
pPgid
)
->
fileid
);
return
(
uint32_t
)(
t
[
0
]
+
t
[
1
]
+
t
[
2
]
+
t
[
3
]
+
t
[
4
]
+
t
[
5
]
+
(
pPgid
)
->
pgno
);
}
#define PAGE_IS_PINNED(pPage) ((pPage)->pLruNext == NULL)
static
int
tdbPCacheOpenImpl
(
SPCache
*
pCache
);
static
SPage
*
tdbPCacheFetchImpl
(
SPCache
*
pCache
,
const
SPgid
*
pPgid
,
TXN
*
pTxn
);
...
...
@@ -80,16 +82,22 @@ int tdbPCacheClose(SPCache *pCache) {
SPage
*
tdbPCacheFetch
(
SPCache
*
pCache
,
const
SPgid
*
pPgid
,
TXN
*
pTxn
)
{
SPage
*
pPage
;
i32
nRef
;
tdbPCacheLock
(
pCache
);
pPage
=
tdbPCacheFetchImpl
(
pCache
,
pPgid
,
pTxn
);
if
(
pPage
)
{
tdbRefPage
(
pPage
);
nRef
=
tdbRefPage
(
pPage
);
}
ASSERT
(
pPage
);
tdbPCacheUnlock
(
pCache
);
// printf("thread %" PRId64 " fetch page %d pgno %d pPage %p nRef %d\n", taosGetSelfPthreadId(), pPage->id,
// TDB_PAGE_PGNO(pPage), pPage, nRef);
return
pPage
;
}
...
...
@@ -98,30 +106,31 @@ void tdbPCacheRelease(SPCache *pCache, SPage *pPage, TXN *pTxn) {
ASSERT
(
pTxn
);
nRef
=
tdbUnrefPage
(
pPage
);
ASSERT
(
nRef
>=
0
);
//
nRef = tdbUnrefPage(pPage);
//
ASSERT(nRef >= 0);
tdbPCacheLock
(
pCache
);
nRef
=
tdbUnrefPage
(
pPage
);
if
(
nRef
==
0
)
{
tdbPCacheLock
(
pCache
);
// test the nRef again to make sure
// it is safe th handle the page
nRef
=
tdbGetPageRef
(
pPage
);
if
(
nRef
==
0
)
{
if
(
pPage
->
isLocal
)
{
tdbPCacheUnpinPage
(
pCache
,
pPage
);
}
else
{
if
(
TDB_TXN_IS_WRITE
(
pTxn
))
{
// remove from hash
tdbPCacheRemovePageFromHash
(
pCache
,
pPage
);
}
tdbPageDestroy
(
pPage
,
pTxn
->
xFree
,
pTxn
->
xArg
);
// nRef = tdbGetPageRef(pPage);
// if (nRef == 0) {
if
(
pPage
->
isLocal
)
{
tdbPCacheUnpinPage
(
pCache
,
pPage
);
}
else
{
if
(
TDB_TXN_IS_WRITE
(
pTxn
))
{
// remove from hash
tdbPCacheRemovePageFromHash
(
pCache
,
pPage
);
}
}
tdbPCacheUnlock
(
pCache
);
tdbPageDestroy
(
pPage
,
pTxn
->
xFree
,
pTxn
->
xArg
);
}
// }
}
tdbPCacheUnlock
(
pCache
);
// printf("thread %" PRId64 " relas page %d pgno %d pPage %p nRef %d\n", taosGetSelfPthreadId(), pPage->id,
// TDB_PAGE_PGNO(pPage), pPage, nRef);
}
int
tdbPCacheGetPageSize
(
SPCache
*
pCache
)
{
return
pCache
->
szPage
;
}
...
...
@@ -223,6 +232,7 @@ static void tdbPCachePinPage(SPCache *pCache, SPage *pPage) {
pCache
->
nRecyclable
--
;
// printf("pin page %d pgno %d pPage %p\n", pPage->id, TDB_PAGE_PGNO(pPage), pPage);
tdbTrace
(
"pin page %d"
,
pPage
->
id
);
}
}
...
...
@@ -243,6 +253,7 @@ static void tdbPCacheUnpinPage(SPCache *pCache, SPage *pPage) {
pCache
->
nRecyclable
++
;
// printf("unpin page %d pgno %d pPage %p\n", pPage->id, TDB_PAGE_PGNO(pPage), pPage);
tdbTrace
(
"unpin page %d"
,
pPage
->
id
);
}
...
...
@@ -253,10 +264,12 @@ static void tdbPCacheRemovePageFromHash(SPCache *pCache, SPage *pPage) {
h
=
tdbPCachePageHash
(
&
(
pPage
->
pgid
));
for
(
ppPage
=
&
(
pCache
->
pgHash
[
h
%
pCache
->
nHash
]);
(
*
ppPage
)
&&
*
ppPage
!=
pPage
;
ppPage
=
&
((
*
ppPage
)
->
pHashNext
))
;
ASSERT
(
*
ppPage
==
pPage
);
*
ppPage
=
pPage
->
pHashNext
;
pCache
->
nPage
--
;
if
(
*
ppPage
)
{
*
ppPage
=
pPage
->
pHashNext
;
pCache
->
nPage
--
;
// printf("rmv page %d to hash, pgno %d, pPage %p\n", pPage->id, TDB_PAGE_PGNO(pPage), pPage);
}
tdbTrace
(
"remove page %d to hash"
,
pPage
->
id
);
}
...
...
@@ -271,6 +284,7 @@ static void tdbPCacheAddPageToHash(SPCache *pCache, SPage *pPage) {
pCache
->
nPage
++
;
// printf("add page %d to hash, pgno %d, pPage %p\n", pPage->id, TDB_PAGE_PGNO(pPage), pPage);
tdbTrace
(
"add page %d to hash"
,
pPage
->
id
);
}
...
...
source/libs/tdb/src/db/tdbPager.c
浏览文件 @
d8479d7a
...
...
@@ -265,6 +265,7 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa
pgid
.
pgno
=
pgno
;
pPage
=
tdbPCacheFetch
(
pPager
->
pCache
,
&
pgid
,
pTxn
);
if
(
pPage
==
NULL
)
{
ASSERT
(
0
);
return
-
1
;
}
...
...
@@ -272,10 +273,14 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa
if
(
!
TDB_PAGE_INITIALIZED
(
pPage
))
{
ret
=
tdbPagerInitPage
(
pPager
,
pPage
,
initPage
,
arg
,
loadPage
);
if
(
ret
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
}
// printf("thread %" PRId64 " pager fetch page %d pgno %d ppage %p\n", taosGetSelfPthreadId(), pPage->id,
// TDB_PAGE_PGNO(pPage), pPage);
ASSERT
(
TDB_PAGE_INITIALIZED
(
pPage
));
ASSERT
(
pPage
->
pPager
==
pPager
);
...
...
@@ -284,7 +289,11 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa
return
0
;
}
void
tdbPagerReturnPage
(
SPager
*
pPager
,
SPage
*
pPage
,
TXN
*
pTxn
)
{
tdbPCacheRelease
(
pPager
->
pCache
,
pPage
,
pTxn
);
}
void
tdbPagerReturnPage
(
SPager
*
pPager
,
SPage
*
pPage
,
TXN
*
pTxn
)
{
tdbPCacheRelease
(
pPager
->
pCache
,
pPage
,
pTxn
);
// printf("thread %" PRId64 " pager retun page %d pgno %d ppage %p\n", taosGetSelfPthreadId(), pPage->id,
// TDB_PAGE_PGNO(pPage), pPage);
}
static
int
tdbPagerAllocFreePage
(
SPager
*
pPager
,
SPgno
*
ppgno
)
{
// TODO: Allocate a page from the free list
...
...
@@ -352,6 +361,7 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
ret
=
(
*
initPage
)(
pPage
,
arg
,
init
);
if
(
ret
<
0
)
{
ASSERT
(
0
);
TDB_UNLOCK_PAGE
(
pPage
);
return
-
1
;
}
...
...
@@ -370,6 +380,7 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
}
}
}
else
{
ASSERT
(
0
);
return
-
1
;
}
...
...
source/libs/tdb/src/inc/tdbInt.h
浏览文件 @
d8479d7a
...
...
@@ -275,15 +275,15 @@ static inline i32 tdbUnrefPage(SPage *pPage) {
#define P_LOCK_FAIL -1
static
inline
int
tdbTryLockPage
(
tdb_spinlock_t
*
pLock
)
{
int
ret
;
if
(
tdbSpinlockTrylock
(
pLock
)
==
0
)
{
ret
=
P_LOCK_SUCC
;
}
else
if
(
errno
==
EBUSY
)
{
ret
=
P_LOCK_BUSY
;
int
ret
=
tdbSpinlockTrylock
(
pLock
)
;
if
(
ret
==
0
)
{
ret
urn
P_LOCK_SUCC
;
}
else
if
(
ret
==
EBUSY
)
{
ret
urn
P_LOCK_BUSY
;
}
else
{
ret
=
P_LOCK_FAIL
;
ASSERT
(
0
);
return
P_LOCK_FAIL
;
}
return
ret
;
}
#define TDB_INIT_PAGE_LOCK(pPage) tdbSpinlockInit(&((pPage)->lock), 0)
...
...
source/libs/tdb/test/tdbTest.cpp
浏览文件 @
d8479d7a
...
...
@@ -486,18 +486,18 @@ TEST(tdb_test, DISABLED_simple_upsert1) {
tdbClose
(
pEnv
);
}
TEST
(
tdb_test
,
DISABLED_
multi_thread_query
)
{
TEST
(
tdb_test
,
multi_thread_query
)
{
int
ret
;
TDB
*
pEnv
;
TTB
*
pDb
;
tdb_cmpr_fn_t
compFunc
;
int
nData
=
100000
;
int
nData
=
100000
0
;
TXN
txn
;
taosRemoveDir
(
"tdb"
);
// Open Env
ret
=
tdbOpen
(
"tdb"
,
512
,
1
,
&
pEnv
);
ret
=
tdbOpen
(
"tdb"
,
4096
,
10
,
&
pEnv
);
GTEST_ASSERT_EQ
(
ret
,
0
);
// Create a database
...
...
@@ -507,7 +507,7 @@ TEST(tdb_test, DISABLED_multi_thread_query) {
char
key
[
64
];
char
val
[
64
];
int64_t
poolLimit
=
4096
;
// 1M pool limit
int64_t
poolLimit
=
4096
*
20
;
// 1M pool limit
int64_t
txnid
=
0
;
SPoolMem
*
pPool
;
...
...
@@ -600,7 +600,7 @@ TEST(tdb_test, DISABLED_multi_thread_query) {
GTEST_ASSERT_EQ
(
ret
,
0
);
}
TEST
(
tdb_test
,
multi_thread1
)
{
TEST
(
tdb_test
,
DISABLED_
multi_thread1
)
{
#if 0
int ret;
TDB *pDb;
...
...
source/libs/transport/src/trans.c
浏览文件 @
d8479d7a
...
...
@@ -94,7 +94,9 @@ void rpcFreeCont(void* cont) {
if
(
cont
==
NULL
)
{
return
;
}
taosMemoryFree
((
char
*
)
cont
-
TRANS_MSG_OVERHEAD
);
tTrace
(
"free mem: %p"
,
(
char
*
)
cont
-
TRANS_MSG_OVERHEAD
);
}
void
*
rpcReallocCont
(
void
*
ptr
,
int
contLen
)
{
if
(
ptr
==
NULL
)
{
...
...
source/libs/transport/src/transComm.c
浏览文件 @
d8479d7a
...
...
@@ -133,6 +133,7 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
}
else
{
p
->
cap
=
p
->
total
;
p
->
buf
=
taosMemoryRealloc
(
p
->
buf
,
p
->
cap
);
tTrace
(
"internal malloc mem: %p, size: %d"
,
p
->
buf
,
p
->
cap
);
uvBuf
->
base
=
p
->
buf
+
p
->
len
;
uvBuf
->
len
=
p
->
cap
-
p
->
len
;
...
...
source/libs/transport/src/transSrv.c
浏览文件 @
d8479d7a
...
...
@@ -469,6 +469,8 @@ static void uvStartSendResp(SSrvMsg* smsg) {
if
(
pConn
->
broken
==
true
)
{
// persist by
transFreeMsg
(
smsg
->
msg
.
pCont
);
taosMemoryFree
(
smsg
);
transUnrefSrvHandle
(
pConn
);
return
;
}
...
...
tests/script/tsim/insert/update0.sim
0 → 100644
浏览文件 @
d8479d7a
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print =============== create database
sql create database d0 keep 365000d,365000d,365000d
sql use d0
print =============== create super table and register rsma
sql create table if not exists stb (ts timestamp, c1 int) tags (city binary(20),district binary(20)) rollup(min) file_factor 0.1 delay 2;
sql show stables
if $rows != 1 then
return -1
endi
print =============== create child table
sql create table ct1 using stb tags("BeiJing", "ChaoYang")
sql create table ct2 using stb tags("BeiJing", "HaiDian")
sql show tables
if $rows != 2 then
return -1
endi
print =============== step3-1 insert records into ct1
sql insert into ct1 values('2022-05-03 16:59:00.010', 10);
sql insert into ct1 values('2022-05-03 16:59:00.011', 11);
sql insert into ct1 values('2022-05-03 16:59:00.016', 16);
sql insert into ct1 values('2022-05-03 16:59:00.016', 17);
sql insert into ct1 values('2022-05-03 16:59:00.020', 20);
sql insert into ct1 values('2022-05-03 16:59:00.016', 18);
sql insert into ct1 values('2022-05-03 16:59:00.021', 21);
sql insert into ct1 values('2022-05-03 16:59:00.022', 22);
print =============== step3-1 query records of ct1 from memory
sql select * from ct1;
print $data00 $data01
print $data10 $data11
print $data20 $data21
print $data30 $data31
print $data40 $data41
print $data50 $data51
if $rows != 6 then
print rows $rows != 6
return -1
endi
if $data01 != 10 then
print data01 $data01 != 10
return -1
endi
if $data21 != 18 then
print data21 $data21 != 18
return -1
endi
if $data51 != 22 then
print data51 $data51 != 22
return -1
endi
print =============== step3-1 insert records into ct2
sql insert into ct2 values('2022-03-02 16:59:00.010', 1),('2022-03-02 16:59:00.010',11),('2022-04-01 16:59:00.011',2),('2022-04-01 16:59:00.011',5),('2022-03-06 16:59:00.013',7);
sql insert into ct2 values('2022-03-02 16:59:00.010', 3),('2022-03-02 16:59:00.010',33),('2022-04-01 16:59:00.011',4),('2022-04-01 16:59:00.011',6),('2022-03-06 16:59:00.013',8);
sql insert into ct2 values('2022-03-02 16:59:00.010', 103),('2022-03-02 16:59:00.010',303),('2022-04-01 16:59:00.011',40),('2022-04-01 16:59:00.011',60),('2022-03-06 16:59:00.013',80);
print =============== step3-1 query records of ct2 from memory
sql select * from ct2;
print $data00 $data01
print $data10 $data11
print $data20 $data21
if $rows != 3 then
print rows $rows != 3
return -1
endi
if $data01 != 103 then
print data01 $data01 != 103
return -1
endi
if $data11 != 80 then
print data11 $data11 != 80
return -1
endi
if $data21 != 40 then
print data21 $data21 != 40
return -1
endi
#==================== reboot to trigger commit data to file
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
print =============== step3-2 query records of ct1 from file
sql select * from ct1;
print $data00 $data01
print $data10 $data11
print $data20 $data21
print $data30 $data31
print $data40 $data41
print $data50 $data51
if $rows != 6 then
print rows $rows != 6
return -1
endi
if $data01 != 10 then
print data01 $data01 != 10
return -1
endi
if $data21 != 18 then
print data21 $data21 != 18
return -1
endi
if $data51 != 22 then
print data51 $data51 != 22
return -1
endi
print =============== step3-2 query records of ct2 from file
sql select * from ct2;
print $data00 $data01
print $data10 $data11
print $data20 $data21
if $rows != 3 then
print rows $rows != 3
return -1
endi
if $data01 != 103 then
print data01 $data01 != 103
return -1
endi
if $data11 != 80 then
print data11 $data11 != 80
return -1
endi
if $data21 != 40 then
print data21 $data21 != 40
return -1
endi
print =============== step3-3 query records of ct1 from memory and file(merge)
sql insert into ct1 values('2022-05-03 16:59:00.010', 100);
sql insert into ct1 values('2022-05-03 16:59:00.022', 200);
sql insert into ct1 values('2022-05-03 16:59:00.016', 160);
sql select * from ct1;
print $data00 $data01
print $data10 $data11
print $data20 $data21
print $data30 $data31
print $data40 $data41
print $data50 $data51
if $rows != 6 then
print rows $rows != 6
return -1
endi
if $data01 != 100 then
print data01 $data01 != 100
return -1
endi
if $data21 != 160 then
print data21 $data21 != 160
return -1
endi
if $data51 != 200 then
print data51 $data51 != 200
return -1
endi
print =============== step3-3 query records of ct2 from memory and file(merge)
sql insert into ct2(ts) values('2022-04-02 16:59:00.016');
sql insert into ct2 values('2022-03-06 16:59:00.013', NULL);
sql insert into ct2 values('2022-03-01 16:59:00.016', 10);
sql insert into ct2(ts) values('2022-04-01 16:59:00.011');
sql select * from ct2;
print $data00 $data01
print $data10 $data11
print $data20 $data21
print $data30 $data31
print $data40 $data41
if $rows != 5 then
print rows $rows != 5
return -1
endi
if $data01 != 10 then
print data01 $data01 != 10
return -1
endi
if $data11 != 103 then
print data11 $data11 != 103
return -1
endi
if $data21 != NULL then
print data21 $data21 != NULL
return -1
endi
if $data31 != 40 then
print data31 $data31 != 40
return -1
endi
if $data41 != NULL then
print data41 $data41 != NULL
return -1
endi
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录