Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
2bf5b040
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1191
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
2bf5b040
编写于
11月 03, 2022
作者:
D
dapan1121
提交者:
GitHub
11月 03, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #17851 from taosdata/enh/TD-19956
enh: optimize submit response message
上级
695a99a6
f00d6933
变更
22
隐藏空白更改
内联
并排
Showing
22 changed file
with
362 addition
and
77 deletion
+362
-77
include/common/tmsg.h
include/common/tmsg.h
+0
-1
include/libs/catalog/catalog.h
include/libs/catalog/catalog.h
+8
-6
include/libs/qcom/query.h
include/libs/qcom/query.h
+1
-1
include/util/taoserror.h
include/util/taoserror.h
+1
-1
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+1
-0
source/client/inc/clientStmt.h
source/client/inc/clientStmt.h
+13
-0
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+9
-1
source/client/src/clientMsgHandler.c
source/client/src/clientMsgHandler.c
+19
-3
source/client/src/clientStmt.c
source/client/src/clientStmt.c
+29
-8
source/common/src/tmsg.c
source/common/src/tmsg.c
+5
-3
source/dnode/vnode/src/tsdb/tsdbMemTable.c
source/dnode/vnode/src/tsdb/tsdbMemTable.c
+8
-0
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+12
-12
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+4
-1
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+10
-12
source/libs/catalog/src/ctgAsync.c
source/libs/catalog/src/ctgAsync.c
+8
-4
source/libs/catalog/src/ctgCache.c
source/libs/catalog/src/ctgCache.c
+41
-3
source/libs/catalog/src/ctgDbg.c
source/libs/catalog/src/ctgDbg.c
+97
-9
source/libs/catalog/test/catalogTests.cpp
source/libs/catalog/test/catalogTests.cpp
+91
-10
source/libs/command/inc/commandInt.h
source/libs/command/inc/commandInt.h
+1
-0
source/libs/command/src/command.c
source/libs/command/src/command.c
+2
-0
source/libs/scheduler/src/schRemote.c
source/libs/scheduler/src/schRemote.c
+1
-1
source/util/src/terror.c
source/util/src/terror.c
+1
-1
未找到文件。
include/common/tmsg.h
浏览文件 @
2bf5b040
...
@@ -297,7 +297,6 @@ typedef struct {
...
@@ -297,7 +297,6 @@ typedef struct {
typedef
struct
{
typedef
struct
{
int32_t
code
;
int32_t
code
;
int8_t
hashMeta
;
int64_t
uid
;
int64_t
uid
;
char
*
tblFName
;
char
*
tblFName
;
int32_t
numOfRows
;
int32_t
numOfRows
;
...
...
include/libs/catalog/catalog.h
浏览文件 @
2bf5b040
...
@@ -203,13 +203,11 @@ int32_t catalogUpdateTableMeta(SCatalog* pCatalog, STableMetaRsp* rspMsg);
...
@@ -203,13 +203,11 @@ int32_t catalogUpdateTableMeta(SCatalog* pCatalog, STableMetaRsp* rspMsg);
int32_t
catalogUpdateTableMeta
(
SCatalog
*
pCatalog
,
STableMetaRsp
*
rspMsg
);
int32_t
catalogUpdateTableMeta
(
SCatalog
*
pCatalog
,
STableMetaRsp
*
rspMsg
);
int32_t
catalogGetCachedTableMeta
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
);
int32_t
catalogGetCachedTableMeta
(
SCatalog
*
pCtg
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
);
int32_t
catalogGetCachedSTableMeta
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
SName
*
pTableName
,
int32_t
catalogGetCachedSTableMeta
(
SCatalog
*
pCtg
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
);
STableMeta
**
pTableMeta
);
int32_t
catalogGetCachedTableHashVgroup
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
SName
*
pTableName
,
int32_t
catalogGetCachedTableHashVgroup
(
SCatalog
*
pCtg
,
const
SName
*
pTableName
,
SVgroupInfo
*
pVgroup
,
bool
*
exists
);
SVgroupInfo
*
pVgroup
,
bool
*
exists
);
/**
/**
* Force refresh DB's local cached vgroup info.
* Force refresh DB's local cached vgroup info.
...
@@ -309,7 +307,7 @@ int32_t catalogGetUdfInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char* f
...
@@ -309,7 +307,7 @@ int32_t catalogGetUdfInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char* f
int32_t
catalogChkAuth
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
char
*
user
,
const
char
*
dbFName
,
AUTH_TYPE
type
,
int32_t
catalogChkAuth
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
char
*
user
,
const
char
*
dbFName
,
AUTH_TYPE
type
,
bool
*
pass
);
bool
*
pass
);
int32_t
catalogChkAuthFromCache
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
char
*
user
,
const
char
*
dbFName
,
AUTH_TYPE
type
,
int32_t
catalogChkAuthFromCache
(
SCatalog
*
pCtg
,
const
char
*
user
,
const
char
*
dbFName
,
AUTH_TYPE
type
,
bool
*
pass
,
bool
*
exists
);
bool
*
pass
,
bool
*
exists
);
int32_t
catalogUpdateUserAuthInfo
(
SCatalog
*
pCtg
,
SGetUserAuthRsp
*
pAuth
);
int32_t
catalogUpdateUserAuthInfo
(
SCatalog
*
pCtg
,
SGetUserAuthRsp
*
pAuth
);
...
@@ -326,6 +324,10 @@ SMetaData* catalogCloneMetaData(SMetaData* pData);
...
@@ -326,6 +324,10 @@ SMetaData* catalogCloneMetaData(SMetaData* pData);
void
catalogFreeMetaData
(
SMetaData
*
pData
);
void
catalogFreeMetaData
(
SMetaData
*
pData
);
int32_t
ctgdEnableDebug
(
char
*
option
,
bool
enable
);
int32_t
ctgdHandleDbgCommand
(
char
*
command
);
/**
/**
* Destroy catalog and relase all resources
* Destroy catalog and relase all resources
*/
*/
...
...
include/libs/qcom/query.h
浏览文件 @
2bf5b040
...
@@ -248,7 +248,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
...
@@ -248,7 +248,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
(_code) == TSDB_CODE_PAR_INVALID_DROP_COL || ((_code) == TSDB_CODE_TDB_INVALID_TABLE_ID))
(_code) == TSDB_CODE_PAR_INVALID_DROP_COL || ((_code) == TSDB_CODE_TDB_INVALID_TABLE_ID))
#define NEED_CLIENT_REFRESH_VG_ERROR(_code) \
#define NEED_CLIENT_REFRESH_VG_ERROR(_code) \
((_code) == TSDB_CODE_VND_HASH_MISMATCH || (_code) == TSDB_CODE_VND_INVALID_VGROUP_ID)
((_code) == TSDB_CODE_VND_HASH_MISMATCH || (_code) == TSDB_CODE_VND_INVALID_VGROUP_ID)
#define NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_TDB_
TABLE_RECREATED
)
#define NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_TDB_
INVALID_TABLE_SCHEMA_VER
)
#define NEED_CLIENT_HANDLE_ERROR(_code) \
#define NEED_CLIENT_HANDLE_ERROR(_code) \
(NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \
(NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \
NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code))
NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code))
...
...
include/util/taoserror.h
浏览文件 @
2bf5b040
...
@@ -340,7 +340,7 @@ int32_t* taosGetErrno();
...
@@ -340,7 +340,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TDB_TABLE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0618)
#define TSDB_CODE_TDB_TABLE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0618)
#define TSDB_CODE_TDB_STB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0619)
#define TSDB_CODE_TDB_STB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0619)
#define TSDB_CODE_TDB_STB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x061A)
#define TSDB_CODE_TDB_STB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x061A)
#define TSDB_CODE_TDB_
TABLE_RECREATED
TAOS_DEF_ERROR_CODE(0, 0x061B)
#define TSDB_CODE_TDB_
INVALID_TABLE_SCHEMA_VER
TAOS_DEF_ERROR_CODE(0, 0x061B)
#define TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR TAOS_DEF_ERROR_CODE(0, 0x061C)
#define TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR TAOS_DEF_ERROR_CODE(0, 0x061C)
// query
// query
...
...
source/client/inc/clientInt.h
浏览文件 @
2bf5b040
...
@@ -318,6 +318,7 @@ void* createTscObj(const char* user, const char* auth, const char* db, int32_
...
@@ -318,6 +318,7 @@ void* createTscObj(const char* user, const char* auth, const char* db, int32_
void
destroyTscObj
(
void
*
pObj
);
void
destroyTscObj
(
void
*
pObj
);
STscObj
*
acquireTscObj
(
int64_t
rid
);
STscObj
*
acquireTscObj
(
int64_t
rid
);
int32_t
releaseTscObj
(
int64_t
rid
);
int32_t
releaseTscObj
(
int64_t
rid
);
void
destroyAppInst
(
SAppInstInfo
*
pAppInfo
);
uint64_t
generateRequestId
();
uint64_t
generateRequestId
();
...
...
source/client/inc/clientStmt.h
浏览文件 @
2bf5b040
...
@@ -39,6 +39,7 @@ typedef enum {
...
@@ -39,6 +39,7 @@ typedef enum {
STMT_BIND_COL
,
STMT_BIND_COL
,
STMT_ADD_BATCH
,
STMT_ADD_BATCH
,
STMT_EXECUTE
,
STMT_EXECUTE
,
STMT_MAX
,
}
STMT_STATUS
;
}
STMT_STATUS
;
typedef
struct
SStmtTableCache
{
typedef
struct
SStmtTableCache
{
...
@@ -94,12 +95,18 @@ typedef struct STscStmt {
...
@@ -94,12 +95,18 @@ typedef struct STscStmt {
STscObj
*
taos
;
STscObj
*
taos
;
SCatalog
*
pCatalog
;
SCatalog
*
pCatalog
;
int32_t
affectedRows
;
int32_t
affectedRows
;
uint32_t
seqId
;
uint32_t
seqIds
[
STMT_MAX
];
SStmtSQLInfo
sql
;
SStmtSQLInfo
sql
;
SStmtExecInfo
exec
;
SStmtExecInfo
exec
;
SStmtBindInfo
bInfo
;
SStmtBindInfo
bInfo
;
}
STscStmt
;
}
STscStmt
;
extern
char
*
gStmtStatusStr
[];
#define STMT_LOG_SEQ(n) do { (pStmt)->seqId++; (pStmt)->seqIds[n]++; STMT_DLOG("the %dth:%d %s", (pStmt)->seqIds[n], (pStmt)->seqId, gStmtStatusStr[n]); } while (0)
#define STMT_STATUS_NE(S) (pStmt->sql.status != STMT_##S)
#define STMT_STATUS_NE(S) (pStmt->sql.status != STMT_##S)
#define STMT_STATUS_EQ(S) (pStmt->sql.status == STMT_##S)
#define STMT_STATUS_EQ(S) (pStmt->sql.status == STMT_##S)
...
@@ -128,6 +135,12 @@ typedef struct STscStmt {
...
@@ -128,6 +135,12 @@ typedef struct STscStmt {
} \
} \
} while (0)
} while (0)
#define STMT_ELOG(param, ...) qError("stmt:%p " param, pStmt, __VA_ARGS__)
#define STMT_DLOG(param, ...) qDebug("stmt:%p " param, pStmt, __VA_ARGS__)
#define STMT_ELOG_E(param) qError("stmt:%p " param, pStmt)
#define STMT_DLOG_E(param) qDebug("stmt:%p " param, pStmt)
TAOS_STMT
*
stmtInit
(
STscObj
*
taos
);
TAOS_STMT
*
stmtInit
(
STscObj
*
taos
);
int
stmtClose
(
TAOS_STMT
*
stmt
);
int
stmtClose
(
TAOS_STMT
*
stmt
);
int
stmtExec
(
TAOS_STMT
*
stmt
);
int
stmtExec
(
TAOS_STMT
*
stmt
);
...
...
source/client/src/clientImpl.c
浏览文件 @
2bf5b040
...
@@ -133,6 +133,12 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas
...
@@ -133,6 +133,12 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas
taosThreadMutexInit
(
&
p
->
qnodeMutex
,
NULL
);
taosThreadMutexInit
(
&
p
->
qnodeMutex
,
NULL
);
p
->
pTransporter
=
openTransporter
(
user
,
secretEncrypt
,
tsNumOfCores
);
p
->
pTransporter
=
openTransporter
(
user
,
secretEncrypt
,
tsNumOfCores
);
p
->
pAppHbMgr
=
appHbMgrInit
(
p
,
key
);
p
->
pAppHbMgr
=
appHbMgrInit
(
p
,
key
);
if
(
NULL
==
p
->
pAppHbMgr
)
{
destroyAppInst
(
p
);
taosThreadMutexUnlock
(
&
appInfo
.
mutex
);
taosMemoryFreeClear
(
key
);
return
NULL
;
}
taosHashPut
(
appInfo
.
pInstMap
,
key
,
strlen
(
key
),
&
p
,
POINTER_BYTES
);
taosHashPut
(
appInfo
.
pInstMap
,
key
,
strlen
(
key
),
&
p
,
POINTER_BYTES
);
p
->
instKey
=
key
;
p
->
instKey
=
key
;
key
=
NULL
;
key
=
NULL
;
...
@@ -1266,7 +1272,9 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
...
@@ -1266,7 +1272,9 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
pMsgSendInfo
->
requestObjRefId
=
pRequest
->
self
;
pMsgSendInfo
->
requestObjRefId
=
pRequest
->
self
;
pMsgSendInfo
->
requestId
=
pRequest
->
requestId
;
pMsgSendInfo
->
requestId
=
pRequest
->
requestId
;
pMsgSendInfo
->
fp
=
getMsgRspHandle
(
pMsgSendInfo
->
msgType
);
pMsgSendInfo
->
fp
=
getMsgRspHandle
(
pMsgSendInfo
->
msgType
);
pMsgSendInfo
->
param
=
pRequest
;
pMsgSendInfo
->
param
=
taosMemoryCalloc
(
1
,
sizeof
(
pRequest
->
self
));
*
(
int64_t
*
)
pMsgSendInfo
->
param
=
pRequest
->
self
;
SConnectReq
connectReq
=
{
0
};
SConnectReq
connectReq
=
{
0
};
STscObj
*
pObj
=
pRequest
->
pTscObj
;
STscObj
*
pObj
=
pRequest
->
pTscObj
;
...
...
source/client/src/clientMsgHandler.c
浏览文件 @
2bf5b040
...
@@ -45,8 +45,13 @@ int32_t genericRspCallback(void* param, SDataBuf* pMsg, int32_t code) {
...
@@ -45,8 +45,13 @@ int32_t genericRspCallback(void* param, SDataBuf* pMsg, int32_t code) {
}
}
int32_t
processConnectRsp
(
void
*
param
,
SDataBuf
*
pMsg
,
int32_t
code
)
{
int32_t
processConnectRsp
(
void
*
param
,
SDataBuf
*
pMsg
,
int32_t
code
)
{
SRequestObj
*
pRequest
=
param
;
SRequestObj
*
pRequest
=
acquireRequest
(
*
(
int64_t
*
)
param
);
if
(
NULL
==
pRequest
)
{
setErrno
(
pRequest
,
TSDB_CODE_TSC_DISCONNECTED
);
tsem_post
(
&
pRequest
->
body
.
rspSem
);
goto
End
;
}
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
setErrno
(
pRequest
,
code
);
setErrno
(
pRequest
,
code
);
tsem_post
(
&
pRequest
->
body
.
rspSem
);
tsem_post
(
&
pRequest
->
body
.
rspSem
);
...
@@ -55,6 +60,12 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
...
@@ -55,6 +60,12 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
STscObj
*
pTscObj
=
pRequest
->
pTscObj
;
if
(
NULL
==
pTscObj
->
pAppInfo
||
NULL
==
pTscObj
->
pAppInfo
->
pAppHbMgr
)
{
setErrno
(
pRequest
,
TSDB_CODE_TSC_DISCONNECTED
);
tsem_post
(
&
pRequest
->
body
.
rspSem
);
goto
End
;
}
SConnectRsp
connectRsp
=
{
0
};
SConnectRsp
connectRsp
=
{
0
};
if
(
tDeserializeSConnectRsp
(
pMsg
->
pData
,
pMsg
->
len
,
&
connectRsp
)
!=
0
)
{
if
(
tDeserializeSConnectRsp
(
pMsg
->
pData
,
pMsg
->
len
,
&
connectRsp
)
!=
0
)
{
code
=
TSDB_CODE_TSC_INVALID_VERSION
;
code
=
TSDB_CODE_TSC_INVALID_VERSION
;
...
@@ -115,10 +126,15 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
...
@@ -115,10 +126,15 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
tscDebug
(
"0x%"
PRIx64
" clusterId:%"
PRId64
", totalConn:%"
PRId64
,
pRequest
->
requestId
,
connectRsp
.
clusterId
,
tscDebug
(
"0x%"
PRIx64
" clusterId:%"
PRId64
", totalConn:%"
PRId64
,
pRequest
->
requestId
,
connectRsp
.
clusterId
,
pTscObj
->
pAppInfo
->
numOfConns
);
pTscObj
->
pAppInfo
->
numOfConns
);
tsem_post
(
&
pRequest
->
body
.
rspSem
);
tsem_post
(
&
pRequest
->
body
.
rspSem
);
End:
End:
if
(
pRequest
)
{
releaseRequest
(
pRequest
->
self
);
}
taosMemoryFree
(
param
);
taosMemoryFree
(
pMsg
->
pEpSet
);
taosMemoryFree
(
pMsg
->
pEpSet
);
taosMemoryFree
(
pMsg
->
pData
);
taosMemoryFree
(
pMsg
->
pData
);
return
code
;
return
code
;
...
...
source/client/src/clientStmt.c
浏览文件 @
2bf5b040
...
@@ -5,6 +5,8 @@
...
@@ -5,6 +5,8 @@
#include "clientStmt.h"
#include "clientStmt.h"
char
*
gStmtStatusStr
[]
=
{
"unknown"
,
"init"
,
"prepare"
,
"settbname"
,
"settags"
,
"fetchFields"
,
"bind"
,
"bindCol"
,
"addBatch"
,
"exec"
};
static
int32_t
stmtCreateRequest
(
STscStmt
*
pStmt
)
{
static
int32_t
stmtCreateRequest
(
STscStmt
*
pStmt
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
...
@@ -21,6 +23,10 @@ static int32_t stmtCreateRequest(STscStmt* pStmt) {
...
@@ -21,6 +23,10 @@ static int32_t stmtCreateRequest(STscStmt* pStmt) {
int32_t
stmtSwitchStatus
(
STscStmt
*
pStmt
,
STMT_STATUS
newStatus
)
{
int32_t
stmtSwitchStatus
(
STscStmt
*
pStmt
,
STMT_STATUS
newStatus
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
if
(
newStatus
>=
STMT_INIT
&&
newStatus
<
STMT_MAX
)
{
STMT_LOG_SEQ
(
newStatus
);
}
switch
(
newStatus
)
{
switch
(
newStatus
)
{
case
STMT_PREPARE
:
case
STMT_PREPARE
:
break
;
break
;
...
@@ -528,13 +534,17 @@ TAOS_STMT* stmtInit(STscObj* taos) {
...
@@ -528,13 +534,17 @@ TAOS_STMT* stmtInit(STscObj* taos) {
pStmt
->
bInfo
.
needParse
=
true
;
pStmt
->
bInfo
.
needParse
=
true
;
pStmt
->
sql
.
status
=
STMT_INIT
;
pStmt
->
sql
.
status
=
STMT_INIT
;
STMT_LOG_SEQ
(
STMT_INIT
);
tscDebug
(
"stmt:%p initialized"
,
pStmt
);
return
pStmt
;
return
pStmt
;
}
}
int
stmtPrepare
(
TAOS_STMT
*
stmt
,
const
char
*
sql
,
unsigned
long
length
)
{
int
stmtPrepare
(
TAOS_STMT
*
stmt
,
const
char
*
sql
,
unsigned
long
length
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
tscDebug
(
"stmt
start to prepare"
);
STMT_DLOG_E
(
"
start to prepare"
);
if
(
pStmt
->
sql
.
status
>=
STMT_PREPARE
)
{
if
(
pStmt
->
sql
.
status
>=
STMT_PREPARE
)
{
STMT_ERR_RET
(
stmtResetStmt
(
pStmt
));
STMT_ERR_RET
(
stmtResetStmt
(
pStmt
));
...
@@ -555,7 +565,7 @@ int stmtPrepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
...
@@ -555,7 +565,7 @@ int stmtPrepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
int
stmtSetTbName
(
TAOS_STMT
*
stmt
,
const
char
*
tbName
)
{
int
stmtSetTbName
(
TAOS_STMT
*
stmt
,
const
char
*
tbName
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
tscDebug
(
"stmt
start to set tbName: %s"
,
tbName
);
STMT_DLOG
(
"
start to set tbName: %s"
,
tbName
);
STMT_ERR_RET
(
stmtSwitchStatus
(
pStmt
,
STMT_SETTBNAME
));
STMT_ERR_RET
(
stmtSwitchStatus
(
pStmt
,
STMT_SETTBNAME
));
...
@@ -587,7 +597,7 @@ int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) {
...
@@ -587,7 +597,7 @@ int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) {
int
stmtSetTbTags
(
TAOS_STMT
*
stmt
,
TAOS_MULTI_BIND
*
tags
)
{
int
stmtSetTbTags
(
TAOS_STMT
*
stmt
,
TAOS_MULTI_BIND
*
tags
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
tscDebug
(
"stmt
start to set tbTags"
);
STMT_DLOG_E
(
"
start to set tbTags"
);
STMT_ERR_RET
(
stmtSwitchStatus
(
pStmt
,
STMT_SETTAGS
));
STMT_ERR_RET
(
stmtSwitchStatus
(
pStmt
,
STMT_SETTAGS
));
...
@@ -649,7 +659,7 @@ int stmtFetchColFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields
...
@@ -649,7 +659,7 @@ int stmtFetchColFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields
int
stmtBindBatch
(
TAOS_STMT
*
stmt
,
TAOS_MULTI_BIND
*
bind
,
int32_t
colIdx
)
{
int
stmtBindBatch
(
TAOS_STMT
*
stmt
,
TAOS_MULTI_BIND
*
bind
,
int32_t
colIdx
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
tscDebug
(
"start to bind stmt data, colIdx: %d"
,
colIdx
);
STMT_DLOG
(
"start to bind stmt data, colIdx: %d"
,
colIdx
);
STMT_ERR_RET
(
stmtSwitchStatus
(
pStmt
,
STMT_BIND
));
STMT_ERR_RET
(
stmtSwitchStatus
(
pStmt
,
STMT_BIND
));
...
@@ -743,7 +753,7 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
...
@@ -743,7 +753,7 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
int
stmtAddBatch
(
TAOS_STMT
*
stmt
)
{
int
stmtAddBatch
(
TAOS_STMT
*
stmt
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
tscDebug
(
"stmt
start to add batch"
);
STMT_DLOG_E
(
"
start to add batch"
);
STMT_ERR_RET
(
stmtSwitchStatus
(
pStmt
,
STMT_ADD_BATCH
));
STMT_ERR_RET
(
stmtSwitchStatus
(
pStmt
,
STMT_ADD_BATCH
));
...
@@ -756,8 +766,7 @@ int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) {
...
@@ -756,8 +766,7 @@ int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) {
tscDebug
(
"stmt start to update tbUid, blockNum: %d"
,
pRsp
->
nBlocks
);
tscDebug
(
"stmt start to update tbUid, blockNum: %d"
,
pRsp
->
nBlocks
);
if
(
pRsp
->
nBlocks
<=
0
)
{
if
(
pRsp
->
nBlocks
<=
0
)
{
tscError
(
"invalid submit resp block number %d"
,
pRsp
->
nBlocks
);
return
TSDB_CODE_SUCCESS
;
STMT_ERR_RET
(
TSDB_CODE_TSC_APP_ERROR
);
}
}
size_t
keyLen
=
0
;
size_t
keyLen
=
0
;
...
@@ -810,7 +819,7 @@ int stmtExec(TAOS_STMT* stmt) {
...
@@ -810,7 +819,7 @@ int stmtExec(TAOS_STMT* stmt) {
SSubmitRsp
*
pRsp
=
NULL
;
SSubmitRsp
*
pRsp
=
NULL
;
bool
autoCreateTbl
=
pStmt
->
exec
.
autoCreateTbl
;
bool
autoCreateTbl
=
pStmt
->
exec
.
autoCreateTbl
;
tscDebug
(
"stmt
start to exec"
);
STMT_DLOG_E
(
"
start to exec"
);
STMT_ERR_RET
(
stmtSwitchStatus
(
pStmt
,
STMT_EXECUTE
));
STMT_ERR_RET
(
stmtSwitchStatus
(
pStmt
,
STMT_EXECUTE
));
...
@@ -885,6 +894,8 @@ int stmtAffectedRowsOnce(TAOS_STMT* stmt) { return ((STscStmt*)stmt)->exec.affec
...
@@ -885,6 +894,8 @@ int stmtAffectedRowsOnce(TAOS_STMT* stmt) { return ((STscStmt*)stmt)->exec.affec
int
stmtIsInsert
(
TAOS_STMT
*
stmt
,
int
*
insert
)
{
int
stmtIsInsert
(
TAOS_STMT
*
stmt
,
int
*
insert
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
STMT_DLOG_E
(
"start is insert"
);
if
(
pStmt
->
sql
.
type
)
{
if
(
pStmt
->
sql
.
type
)
{
*
insert
=
(
STMT_TYPE_INSERT
==
pStmt
->
sql
.
type
||
STMT_TYPE_MULTI_INSERT
==
pStmt
->
sql
.
type
);
*
insert
=
(
STMT_TYPE_INSERT
==
pStmt
->
sql
.
type
||
STMT_TYPE_MULTI_INSERT
==
pStmt
->
sql
.
type
);
}
else
{
}
else
{
...
@@ -897,6 +908,8 @@ int stmtIsInsert(TAOS_STMT* stmt, int* insert) {
...
@@ -897,6 +908,8 @@ int stmtIsInsert(TAOS_STMT* stmt, int* insert) {
int
stmtGetTagFields
(
TAOS_STMT
*
stmt
,
int
*
nums
,
TAOS_FIELD_E
**
fields
)
{
int
stmtGetTagFields
(
TAOS_STMT
*
stmt
,
int
*
nums
,
TAOS_FIELD_E
**
fields
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
STMT_DLOG_E
(
"start to get tag fields"
);
if
(
STMT_TYPE_QUERY
==
pStmt
->
sql
.
type
)
{
if
(
STMT_TYPE_QUERY
==
pStmt
->
sql
.
type
)
{
STMT_RET
(
TSDB_CODE_TSC_STMT_API_ERROR
);
STMT_RET
(
TSDB_CODE_TSC_STMT_API_ERROR
);
}
}
...
@@ -927,6 +940,8 @@ int stmtGetTagFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
...
@@ -927,6 +940,8 @@ int stmtGetTagFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
int
stmtGetColFields
(
TAOS_STMT
*
stmt
,
int
*
nums
,
TAOS_FIELD_E
**
fields
)
{
int
stmtGetColFields
(
TAOS_STMT
*
stmt
,
int
*
nums
,
TAOS_FIELD_E
**
fields
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
STMT_DLOG_E
(
"start to get col fields"
);
if
(
STMT_TYPE_QUERY
==
pStmt
->
sql
.
type
)
{
if
(
STMT_TYPE_QUERY
==
pStmt
->
sql
.
type
)
{
STMT_RET
(
TSDB_CODE_TSC_STMT_API_ERROR
);
STMT_RET
(
TSDB_CODE_TSC_STMT_API_ERROR
);
}
}
...
@@ -957,6 +972,8 @@ int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
...
@@ -957,6 +972,8 @@ int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
int
stmtGetParamNum
(
TAOS_STMT
*
stmt
,
int
*
nums
)
{
int
stmtGetParamNum
(
TAOS_STMT
*
stmt
,
int
*
nums
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
STMT_DLOG_E
(
"start to get param num"
);
STMT_ERR_RET
(
stmtSwitchStatus
(
pStmt
,
STMT_FETCH_FIELDS
));
STMT_ERR_RET
(
stmtSwitchStatus
(
pStmt
,
STMT_FETCH_FIELDS
));
if
(
pStmt
->
bInfo
.
needParse
&&
pStmt
->
sql
.
runTimes
&&
pStmt
->
sql
.
type
>
0
&&
if
(
pStmt
->
bInfo
.
needParse
&&
pStmt
->
sql
.
runTimes
&&
pStmt
->
sql
.
type
>
0
&&
...
@@ -986,6 +1003,8 @@ int stmtGetParamNum(TAOS_STMT* stmt, int* nums) {
...
@@ -986,6 +1003,8 @@ int stmtGetParamNum(TAOS_STMT* stmt, int* nums) {
int
stmtGetParam
(
TAOS_STMT
*
stmt
,
int
idx
,
int
*
type
,
int
*
bytes
)
{
int
stmtGetParam
(
TAOS_STMT
*
stmt
,
int
idx
,
int
*
type
,
int
*
bytes
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
STMT_DLOG_E
(
"start to get param"
);
if
(
STMT_TYPE_QUERY
==
pStmt
->
sql
.
type
)
{
if
(
STMT_TYPE_QUERY
==
pStmt
->
sql
.
type
)
{
STMT_RET
(
TSDB_CODE_TSC_STMT_API_ERROR
);
STMT_RET
(
TSDB_CODE_TSC_STMT_API_ERROR
);
}
}
...
@@ -1028,6 +1047,8 @@ int stmtGetParam(TAOS_STMT* stmt, int idx, int* type, int* bytes) {
...
@@ -1028,6 +1047,8 @@ int stmtGetParam(TAOS_STMT* stmt, int idx, int* type, int* bytes) {
TAOS_RES
*
stmtUseResult
(
TAOS_STMT
*
stmt
)
{
TAOS_RES
*
stmtUseResult
(
TAOS_STMT
*
stmt
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
STMT_DLOG_E
(
"start to use result"
);
if
(
STMT_TYPE_QUERY
!=
pStmt
->
sql
.
type
)
{
if
(
STMT_TYPE_QUERY
!=
pStmt
->
sql
.
type
)
{
tscError
(
"useResult only for query statement"
);
tscError
(
"useResult only for query statement"
);
return
NULL
;
return
NULL
;
...
...
source/common/src/tmsg.c
浏览文件 @
2bf5b040
...
@@ -5432,9 +5432,12 @@ static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBl
...
@@ -5432,9 +5432,12 @@ static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBl
if
(
tStartEncode
(
pEncoder
)
<
0
)
return
-
1
;
if
(
tStartEncode
(
pEncoder
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pBlock
->
code
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pBlock
->
code
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pBlock
->
hashMeta
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pBlock
->
uid
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pBlock
->
uid
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pBlock
->
tblFName
)
<
0
)
return
-
1
;
if
(
pBlock
->
tblFName
)
{
if
(
tEncodeCStr
(
pEncoder
,
pBlock
->
tblFName
)
<
0
)
return
-
1
;
}
else
{
if
(
tEncodeCStr
(
pEncoder
,
""
)
<
0
)
return
-
1
;
}
if
(
tEncodeI32v
(
pEncoder
,
pBlock
->
numOfRows
)
<
0
)
return
-
1
;
if
(
tEncodeI32v
(
pEncoder
,
pBlock
->
numOfRows
)
<
0
)
return
-
1
;
if
(
tEncodeI32v
(
pEncoder
,
pBlock
->
affectedRows
)
<
0
)
return
-
1
;
if
(
tEncodeI32v
(
pEncoder
,
pBlock
->
affectedRows
)
<
0
)
return
-
1
;
if
(
tEncodeI64v
(
pEncoder
,
pBlock
->
sver
)
<
0
)
return
-
1
;
if
(
tEncodeI64v
(
pEncoder
,
pBlock
->
sver
)
<
0
)
return
-
1
;
...
@@ -5451,7 +5454,6 @@ static int32_t tDecodeSSubmitBlkRsp(SDecoder *pDecoder, SSubmitBlkRsp *pBlock) {
...
@@ -5451,7 +5454,6 @@ static int32_t tDecodeSSubmitBlkRsp(SDecoder *pDecoder, SSubmitBlkRsp *pBlock) {
if
(
tStartDecode
(
pDecoder
)
<
0
)
return
-
1
;
if
(
tStartDecode
(
pDecoder
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pBlock
->
code
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pBlock
->
code
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pBlock
->
hashMeta
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pBlock
->
uid
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pBlock
->
uid
)
<
0
)
return
-
1
;
pBlock
->
tblFName
=
taosMemoryCalloc
(
TSDB_TABLE_FNAME_LEN
,
1
);
pBlock
->
tblFName
=
taosMemoryCalloc
(
TSDB_TABLE_FNAME_LEN
,
1
);
if
(
NULL
==
pBlock
->
tblFName
)
return
-
1
;
if
(
NULL
==
pBlock
->
tblFName
)
return
-
1
;
...
...
source/dnode/vnode/src/tsdb/tsdbMemTable.c
浏览文件 @
2bf5b040
...
@@ -116,6 +116,13 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitMsgIter *pMsgI
...
@@ -116,6 +116,13 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitMsgIter *pMsgI
if
(
info
.
suid
)
{
if
(
info
.
suid
)
{
metaGetInfo
(
pTsdb
->
pVnode
->
pMeta
,
info
.
suid
,
&
info
);
metaGetInfo
(
pTsdb
->
pVnode
->
pMeta
,
info
.
suid
,
&
info
);
}
}
if
(
pMsgIter
->
sversion
!=
info
.
skmVer
)
{
tsdbError
(
"vgId:%d, req sver:%d, skmVer:%d suid:%"
PRId64
" uid:%"
PRId64
,
TD_VID
(
pTsdb
->
pVnode
),
pMsgIter
->
sversion
,
info
.
skmVer
,
suid
,
uid
);
code
=
TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER
;
goto
_err
;
}
pRsp
->
sver
=
info
.
skmVer
;
pRsp
->
sver
=
info
.
skmVer
;
// create/get STbData to op
// create/get STbData to op
...
@@ -133,6 +140,7 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitMsgIter *pMsgI
...
@@ -133,6 +140,7 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitMsgIter *pMsgI
return
code
;
return
code
;
_err:
_err:
terrno
=
code
;
return
code
;
return
code
;
}
}
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
2bf5b040
...
@@ -861,6 +861,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
...
@@ -861,6 +861,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
SEncoder
encoder
=
{
0
};
SEncoder
encoder
=
{
0
};
SArray
*
newTbUids
=
NULL
;
SArray
*
newTbUids
=
NULL
;
SVStatis
statis
=
{
0
};
SVStatis
statis
=
{
0
};
bool
tbCreated
=
false
;
terrno
=
TSDB_CODE_SUCCESS
;
terrno
=
TSDB_CODE_SUCCESS
;
pRsp
->
code
=
0
;
pRsp
->
code
=
0
;
...
@@ -894,11 +895,10 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
...
@@ -894,11 +895,10 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
if
(
pBlock
==
NULL
)
break
;
if
(
pBlock
==
NULL
)
break
;
SSubmitBlkRsp
submitBlkRsp
=
{
0
};
SSubmitBlkRsp
submitBlkRsp
=
{
0
};
tbCreated
=
false
;
// create table for auto create table mode
// create table for auto create table mode
if
(
msgIter
.
schemaLen
>
0
)
{
if
(
msgIter
.
schemaLen
>
0
)
{
submitBlkRsp
.
hashMeta
=
1
;
tDecoderInit
(
&
decoder
,
pBlock
->
data
,
msgIter
.
schemaLen
);
tDecoderInit
(
&
decoder
,
pBlock
->
data
,
msgIter
.
schemaLen
);
if
(
tDecodeSVCreateTbReq
(
&
decoder
,
&
createTbReq
)
<
0
)
{
if
(
tDecodeSVCreateTbReq
(
&
decoder
,
&
createTbReq
)
<
0
)
{
pRsp
->
code
=
TSDB_CODE_INVALID_MSG
;
pRsp
->
code
=
TSDB_CODE_INVALID_MSG
;
...
@@ -935,12 +935,13 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
...
@@ -935,12 +935,13 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
}
}
taosArrayPush
(
newTbUids
,
&
createTbReq
.
uid
);
taosArrayPush
(
newTbUids
,
&
createTbReq
.
uid
);
}
submitBlkRsp
.
uid
=
createTbReq
.
uid
;
submitBlkRsp
.
tblFName
=
taosMemoryMalloc
(
strlen
(
pVnode
->
config
.
dbname
)
+
strlen
(
createTbReq
.
name
)
+
2
);
sprintf
(
submitBlkRsp
.
tblFName
,
"%s.%s"
,
pVnode
->
config
.
dbname
,
createTbReq
.
name
);
submitBlkRsp
.
uid
=
createTbReq
.
uid
;
submitBlkRsp
.
tblFName
=
taosMemoryMalloc
(
strlen
(
pVnode
->
config
.
dbname
)
+
strlen
(
createTbReq
.
name
)
+
2
);
sprintf
(
submitBlkRsp
.
tblFName
,
"%s.%s"
,
pVnode
->
config
.
dbname
,
createTbReq
.
name
);
tbCreated
=
true
;
}
msgIter
.
uid
=
createTbReq
.
uid
;
msgIter
.
uid
=
createTbReq
.
uid
;
if
(
createTbReq
.
type
==
TSDB_CHILD_TABLE
)
{
if
(
createTbReq
.
type
==
TSDB_CHILD_TABLE
)
{
msgIter
.
suid
=
createTbReq
.
ctb
.
suid
;
msgIter
.
suid
=
createTbReq
.
ctb
.
suid
;
...
@@ -953,10 +954,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
...
@@ -953,10 +954,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
#endif
#endif
tDecoderClear
(
&
decoder
);
tDecoderClear
(
&
decoder
);
taosArrayDestroy
(
createTbReq
.
ctb
.
tagName
);
taosArrayDestroy
(
createTbReq
.
ctb
.
tagName
);
}
else
{
}
submitBlkRsp
.
tblFName
=
taosMemoryMalloc
(
TSDB_TABLE_FNAME_LEN
);
sprintf
(
submitBlkRsp
.
tblFName
,
"%s."
,
pVnode
->
config
.
dbname
);
}
if
(
tsdbInsertTableData
(
pVnode
->
pTsdb
,
version
,
&
msgIter
,
pBlock
,
&
submitBlkRsp
)
<
0
)
{
if
(
tsdbInsertTableData
(
pVnode
->
pTsdb
,
version
,
&
msgIter
,
pBlock
,
&
submitBlkRsp
)
<
0
)
{
submitBlkRsp
.
code
=
terrno
;
submitBlkRsp
.
code
=
terrno
;
...
@@ -964,7 +962,9 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
...
@@ -964,7 +962,9 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
submitRsp
.
numOfRows
+=
submitBlkRsp
.
numOfRows
;
submitRsp
.
numOfRows
+=
submitBlkRsp
.
numOfRows
;
submitRsp
.
affectedRows
+=
submitBlkRsp
.
affectedRows
;
submitRsp
.
affectedRows
+=
submitBlkRsp
.
affectedRows
;
taosArrayPush
(
submitRsp
.
pArray
,
&
submitBlkRsp
);
if
(
tbCreated
||
submitBlkRsp
.
code
)
{
taosArrayPush
(
submitRsp
.
pArray
,
&
submitBlkRsp
);
}
}
}
if
(
taosArrayGetSize
(
newTbUids
)
>
0
)
{
if
(
taosArrayGetSize
(
newTbUids
)
>
0
)
{
...
...
source/libs/catalog/inc/catalogInt.h
浏览文件 @
2bf5b040
...
@@ -33,6 +33,7 @@ extern "C" {
...
@@ -33,6 +33,7 @@ extern "C" {
#define CTG_DEFAULT_MAX_RETRY_TIMES 3
#define CTG_DEFAULT_MAX_RETRY_TIMES 3
#define CTG_DEFAULT_BATCH_NUM 64
#define CTG_DEFAULT_BATCH_NUM 64
#define CTG_DEFAULT_FETCH_NUM 8
#define CTG_DEFAULT_FETCH_NUM 8
#define CTG_MAX_COMMAND_LEN 512
#define CTG_RENT_SLOT_SECOND 1.5
#define CTG_RENT_SLOT_SECOND 1.5
...
@@ -223,6 +224,7 @@ typedef struct SCtgUserAuth {
...
@@ -223,6 +224,7 @@ typedef struct SCtgUserAuth {
typedef
struct
SCatalog
{
typedef
struct
SCatalog
{
uint64_t
clusterId
;
uint64_t
clusterId
;
bool
stopUpdate
;
SHashObj
*
userCache
;
// key:user, value:SCtgUserAuth
SHashObj
*
userCache
;
// key:user, value:SCtgUserAuth
SHashObj
*
dbCache
;
// key:dbname, value:SCtgDBCache
SHashObj
*
dbCache
;
// key:dbname, value:SCtgDBCache
SCtgRentMgmt
dbRent
;
SCtgRentMgmt
dbRent
;
...
@@ -671,7 +673,7 @@ void ctgdShowClusterCache(SCatalog* pCtg);
...
@@ -671,7 +673,7 @@ void ctgdShowClusterCache(SCatalog* pCtg);
int32_t
ctgdShowCacheInfo
(
void
);
int32_t
ctgdShowCacheInfo
(
void
);
int32_t
ctgRemoveTbMetaFromCache
(
SCatalog
*
pCtg
,
SName
*
pTableName
,
bool
syncReq
);
int32_t
ctgRemoveTbMetaFromCache
(
SCatalog
*
pCtg
,
SName
*
pTableName
,
bool
syncReq
);
int32_t
ctgGetTbMetaFromCache
(
SCatalog
*
pCtg
,
S
RequestConnInfo
*
pConn
,
S
CtgTbMetaCtx
*
ctx
,
STableMeta
**
pTableMeta
);
int32_t
ctgGetTbMetaFromCache
(
SCatalog
*
pCtg
,
SCtgTbMetaCtx
*
ctx
,
STableMeta
**
pTableMeta
);
int32_t
ctgGetTbMetasFromCache
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
SCtgTbMetasCtx
*
ctx
,
int32_t
dbIdx
,
int32_t
ctgGetTbMetasFromCache
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
SCtgTbMetasCtx
*
ctx
,
int32_t
dbIdx
,
int32_t
*
fetchIdx
,
int32_t
baseResIdx
,
SArray
*
pList
);
int32_t
*
fetchIdx
,
int32_t
baseResIdx
,
SArray
*
pList
);
...
@@ -786,6 +788,7 @@ void ctgFreeTbCacheImpl(SCtgTbCache* pCache);
...
@@ -786,6 +788,7 @@ void ctgFreeTbCacheImpl(SCtgTbCache* pCache);
int32_t
ctgRemoveTbMeta
(
SCatalog
*
pCtg
,
SName
*
pTableName
);
int32_t
ctgRemoveTbMeta
(
SCatalog
*
pCtg
,
SName
*
pTableName
);
int32_t
ctgGetTbHashVgroup
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
SName
*
pTableName
,
SVgroupInfo
*
pVgroup
,
bool
*
exists
);
int32_t
ctgGetTbHashVgroup
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
SName
*
pTableName
,
SVgroupInfo
*
pVgroup
,
bool
*
exists
);
SName
*
ctgGetFetchName
(
SArray
*
pNames
,
SCtgFetch
*
pFetch
);
SName
*
ctgGetFetchName
(
SArray
*
pNames
,
SCtgFetch
*
pFetch
);
int32_t
ctgdGetOneHandle
(
SCatalog
**
pHandle
);
extern
SCatalogMgmt
gCtgMgmt
;
extern
SCatalogMgmt
gCtgMgmt
;
extern
SCtgDebug
gCTGDebug
;
extern
SCtgDebug
gCTGDebug
;
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
2bf5b040
...
@@ -202,7 +202,7 @@ int32_t ctgGetTbMeta(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbMetaCtx* ctx
...
@@ -202,7 +202,7 @@ int32_t ctgGetTbMeta(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbMetaCtx* ctx
int32_t
code
=
0
;
int32_t
code
=
0
;
STableMetaOutput
*
output
=
NULL
;
STableMetaOutput
*
output
=
NULL
;
CTG_ERR_RET
(
ctgGetTbMetaFromCache
(
pCtg
,
pConn
,
ctx
,
pTableMeta
));
CTG_ERR_RET
(
ctgGetTbMetaFromCache
(
pCtg
,
ctx
,
pTableMeta
));
if
(
*
pTableMeta
||
(
ctx
->
flag
&
CTG_FLAG_ONLY_CACHE
))
{
if
(
*
pTableMeta
||
(
ctx
->
flag
&
CTG_FLAG_ONLY_CACHE
))
{
goto
_return
;
goto
_return
;
}
}
...
@@ -959,14 +959,14 @@ int32_t catalogGetTableMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SName
...
@@ -959,14 +959,14 @@ int32_t catalogGetTableMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SName
CTG_API_LEAVE
(
ctgGetTbMeta
(
pCtg
,
pConn
,
&
ctx
,
pTableMeta
));
CTG_API_LEAVE
(
ctgGetTbMeta
(
pCtg
,
pConn
,
&
ctx
,
pTableMeta
));
}
}
int32_t
catalogGetCachedTableMeta
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
)
{
int32_t
catalogGetCachedTableMeta
(
SCatalog
*
pCtg
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
)
{
CTG_API_ENTER
();
CTG_API_ENTER
();
SCtgTbMetaCtx
ctx
=
{
0
};
SCtgTbMetaCtx
ctx
=
{
0
};
ctx
.
pName
=
(
SName
*
)
pTableName
;
ctx
.
pName
=
(
SName
*
)
pTableName
;
ctx
.
flag
=
CTG_FLAG_UNKNOWN_STB
|
CTG_FLAG_ONLY_CACHE
;
ctx
.
flag
=
CTG_FLAG_UNKNOWN_STB
|
CTG_FLAG_ONLY_CACHE
;
CTG_API_LEAVE
(
ctgGetTbMeta
(
pCtg
,
pConn
,
&
ctx
,
pTableMeta
));
CTG_API_LEAVE
(
ctgGetTbMeta
(
pCtg
,
NULL
,
&
ctx
,
pTableMeta
));
}
}
...
@@ -981,15 +981,14 @@ int32_t catalogGetSTableMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SNam
...
@@ -981,15 +981,14 @@ int32_t catalogGetSTableMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SNam
CTG_API_LEAVE
(
ctgGetTbMeta
(
pCtg
,
pConn
,
&
ctx
,
pTableMeta
));
CTG_API_LEAVE
(
ctgGetTbMeta
(
pCtg
,
pConn
,
&
ctx
,
pTableMeta
));
}
}
int32_t
catalogGetCachedSTableMeta
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
SName
*
pTableName
,
int32_t
catalogGetCachedSTableMeta
(
SCatalog
*
pCtg
,
const
SName
*
pTableName
,
STableMeta
**
pTableMeta
)
{
STableMeta
**
pTableMeta
)
{
CTG_API_ENTER
();
CTG_API_ENTER
();
SCtgTbMetaCtx
ctx
=
{
0
};
SCtgTbMetaCtx
ctx
=
{
0
};
ctx
.
pName
=
(
SName
*
)
pTableName
;
ctx
.
pName
=
(
SName
*
)
pTableName
;
ctx
.
flag
=
CTG_FLAG_STB
|
CTG_FLAG_ONLY_CACHE
;
ctx
.
flag
=
CTG_FLAG_STB
|
CTG_FLAG_ONLY_CACHE
;
CTG_API_LEAVE
(
ctgGetTbMeta
(
pCtg
,
pConn
,
&
ctx
,
pTableMeta
));
CTG_API_LEAVE
(
ctgGetTbMeta
(
pCtg
,
NULL
,
&
ctx
,
pTableMeta
));
}
}
...
@@ -1114,11 +1113,10 @@ int32_t catalogGetTableHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const
...
@@ -1114,11 +1113,10 @@ int32_t catalogGetTableHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const
CTG_API_LEAVE
(
ctgGetTbHashVgroup
(
pCtg
,
pConn
,
pTableName
,
pVgroup
,
NULL
));
CTG_API_LEAVE
(
ctgGetTbHashVgroup
(
pCtg
,
pConn
,
pTableName
,
pVgroup
,
NULL
));
}
}
int32_t
catalogGetCachedTableHashVgroup
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
SName
*
pTableName
,
int32_t
catalogGetCachedTableHashVgroup
(
SCatalog
*
pCtg
,
const
SName
*
pTableName
,
SVgroupInfo
*
pVgroup
,
bool
*
exists
)
{
SVgroupInfo
*
pVgroup
,
bool
*
exists
)
{
CTG_API_ENTER
();
CTG_API_ENTER
();
CTG_API_LEAVE
(
ctgGetTbHashVgroup
(
pCtg
,
pConn
,
pTableName
,
pVgroup
,
exists
));
CTG_API_LEAVE
(
ctgGetTbHashVgroup
(
pCtg
,
NULL
,
pTableName
,
pVgroup
,
exists
));
}
}
#if 0
#if 0
...
@@ -1387,16 +1385,16 @@ _return:
...
@@ -1387,16 +1385,16 @@ _return:
CTG_API_LEAVE
(
code
);
CTG_API_LEAVE
(
code
);
}
}
int32_t
catalogChkAuthFromCache
(
SCatalog
*
pCtg
,
SRequestConnInfo
*
pConn
,
const
char
*
user
,
const
char
*
dbFName
,
AUTH_TYPE
type
,
int32_t
catalogChkAuthFromCache
(
SCatalog
*
pCtg
,
const
char
*
user
,
const
char
*
dbFName
,
AUTH_TYPE
type
,
bool
*
pass
,
bool
*
exists
)
{
bool
*
pass
,
bool
*
exists
)
{
CTG_API_ENTER
();
CTG_API_ENTER
();
if
(
NULL
==
pCtg
||
NULL
==
pConn
||
NULL
==
user
||
NULL
==
dbFName
||
NULL
==
pass
||
NULL
==
exists
)
{
if
(
NULL
==
pCtg
||
NULL
==
user
||
NULL
==
dbFName
||
NULL
==
pass
||
NULL
==
exists
)
{
CTG_API_LEAVE
(
TSDB_CODE_CTG_INVALID_INPUT
);
CTG_API_LEAVE
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
}
int32_t
code
=
0
;
int32_t
code
=
0
;
CTG_ERR_JRET
(
ctgChkAuth
(
pCtg
,
pConn
,
user
,
dbFName
,
type
,
pass
,
exists
));
CTG_ERR_JRET
(
ctgChkAuth
(
pCtg
,
NULL
,
user
,
dbFName
,
type
,
pass
,
exists
));
_return:
_return:
...
...
source/libs/catalog/src/ctgAsync.c
浏览文件 @
2bf5b040
...
@@ -1204,11 +1204,15 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
...
@@ -1204,11 +1204,15 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
stbCtx
.
flag
=
flag
;
stbCtx
.
flag
=
flag
;
stbCtx
.
pName
=
&
stbName
;
stbCtx
.
pName
=
&
stbName
;
taosMemoryFreeClear
(
pOut
->
tbMeta
)
;
STableMeta
*
stbMeta
=
NULL
;
CTG_ERR_JRET
(
ctgReadTbMetaFromCache
(
pCtg
,
&
stbCtx
,
&
pOut
->
tbMeta
)
);
ctgReadTbMetaFromCache
(
pCtg
,
&
stbCtx
,
&
stbMeta
);
if
(
pOut
->
tbMeta
)
{
if
(
stbMeta
&&
stbMeta
->
sversion
>=
pOut
->
tbMeta
->
sversion
)
{
ctgDebug
(
"use cached stb meta, tbName:%s"
,
tNameGetTableName
(
pName
));
ctgDebug
(
"use cached stb meta, tbName:%s"
,
tNameGetTableName
(
pName
));
exist
=
1
;
exist
=
1
;
}
else
{
ctgDebug
(
"need to get/update stb meta, tbName:%s"
,
tNameGetTableName
(
pName
));
taosMemoryFreeClear
(
pOut
->
tbMeta
);
taosMemoryFreeClear
(
stbMeta
);
}
}
}
}
...
@@ -1641,7 +1645,7 @@ int32_t ctgLaunchGetTbMetaTask(SCtgTask* pTask) {
...
@@ -1641,7 +1645,7 @@ int32_t ctgLaunchGetTbMetaTask(SCtgTask* pTask) {
pMsgCtx
->
pBatchs
=
pJob
->
pBatchs
;
pMsgCtx
->
pBatchs
=
pJob
->
pBatchs
;
}
}
CTG_ERR_RET
(
ctgGetTbMetaFromCache
(
pCtg
,
pConn
,
(
SCtgTbMetaCtx
*
)
pTask
->
taskCtx
,
(
STableMeta
**
)
&
pTask
->
res
));
CTG_ERR_RET
(
ctgGetTbMetaFromCache
(
pCtg
,
(
SCtgTbMetaCtx
*
)
pTask
->
taskCtx
,
(
STableMeta
**
)
&
pTask
->
res
));
if
(
pTask
->
res
)
{
if
(
pTask
->
res
)
{
CTG_ERR_RET
(
ctgHandleTaskEnd
(
pTask
,
0
));
CTG_ERR_RET
(
ctgHandleTaskEnd
(
pTask
,
0
));
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
...
...
source/libs/catalog/src/ctgCache.c
浏览文件 @
2bf5b040
...
@@ -248,6 +248,8 @@ int32_t ctgAcquireStbMetaFromCache(SCatalog *pCtg, char *dbFName, uint64_t suid,
...
@@ -248,6 +248,8 @@ int32_t ctgAcquireStbMetaFromCache(SCatalog *pCtg, char *dbFName, uint64_t suid,
goto
_return
;
goto
_return
;
}
}
taosHashRelease
(
dbCache
->
stbCache
,
stName
);
CTG_LOCK
(
CTG_READ
,
&
pCache
->
metaLock
);
CTG_LOCK
(
CTG_READ
,
&
pCache
->
metaLock
);
if
(
NULL
==
pCache
->
pMeta
)
{
if
(
NULL
==
pCache
->
pMeta
)
{
ctgDebug
(
"stb 0x%"
PRIx64
" meta not in cache, dbFName:%s"
,
suid
,
dbFName
);
ctgDebug
(
"stb 0x%"
PRIx64
" meta not in cache, dbFName:%s"
,
suid
,
dbFName
);
...
@@ -1550,7 +1552,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
...
@@ -1550,7 +1552,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
char
*
dbFName
=
msg
->
dbFName
;
char
*
dbFName
=
msg
->
dbFName
;
SCatalog
*
pCtg
=
msg
->
pCtg
;
SCatalog
*
pCtg
=
msg
->
pCtg
;
if
(
NULL
==
dbInfo
->
vgHash
)
{
if
(
pCtg
->
stopUpdate
||
NULL
==
dbInfo
->
vgHash
)
{
goto
_return
;
goto
_return
;
}
}
...
@@ -1620,6 +1622,10 @@ int32_t ctgOpDropDbCache(SCtgCacheOperation *operation) {
...
@@ -1620,6 +1622,10 @@ int32_t ctgOpDropDbCache(SCtgCacheOperation *operation) {
SCtgDropDBMsg
*
msg
=
operation
->
data
;
SCtgDropDBMsg
*
msg
=
operation
->
data
;
SCatalog
*
pCtg
=
msg
->
pCtg
;
SCatalog
*
pCtg
=
msg
->
pCtg
;
if
(
pCtg
->
stopUpdate
)
{
goto
_return
;
}
SCtgDBCache
*
dbCache
=
NULL
;
SCtgDBCache
*
dbCache
=
NULL
;
ctgGetDBCache
(
msg
->
pCtg
,
msg
->
dbFName
,
&
dbCache
);
ctgGetDBCache
(
msg
->
pCtg
,
msg
->
dbFName
,
&
dbCache
);
if
(
NULL
==
dbCache
)
{
if
(
NULL
==
dbCache
)
{
...
@@ -1646,6 +1652,10 @@ int32_t ctgOpDropDbVgroup(SCtgCacheOperation *operation) {
...
@@ -1646,6 +1652,10 @@ int32_t ctgOpDropDbVgroup(SCtgCacheOperation *operation) {
SCtgDropDbVgroupMsg
*
msg
=
operation
->
data
;
SCtgDropDbVgroupMsg
*
msg
=
operation
->
data
;
SCatalog
*
pCtg
=
msg
->
pCtg
;
SCatalog
*
pCtg
=
msg
->
pCtg
;
if
(
pCtg
->
stopUpdate
)
{
goto
_return
;
}
SCtgDBCache
*
dbCache
=
NULL
;
SCtgDBCache
*
dbCache
=
NULL
;
ctgGetDBCache
(
msg
->
pCtg
,
msg
->
dbFName
,
&
dbCache
);
ctgGetDBCache
(
msg
->
pCtg
,
msg
->
dbFName
,
&
dbCache
);
if
(
NULL
==
dbCache
)
{
if
(
NULL
==
dbCache
)
{
...
@@ -1675,6 +1685,10 @@ int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) {
...
@@ -1675,6 +1685,10 @@ int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) {
STableMetaOutput
*
pMeta
=
msg
->
pMeta
;
STableMetaOutput
*
pMeta
=
msg
->
pMeta
;
SCtgDBCache
*
dbCache
=
NULL
;
SCtgDBCache
*
dbCache
=
NULL
;
if
(
pCtg
->
stopUpdate
)
{
goto
_return
;
}
if
((
!
CTG_IS_META_CTABLE
(
pMeta
->
metaType
))
&&
NULL
==
pMeta
->
tbMeta
)
{
if
((
!
CTG_IS_META_CTABLE
(
pMeta
->
metaType
))
&&
NULL
==
pMeta
->
tbMeta
)
{
ctgError
(
"no valid tbmeta got from meta rsp, dbFName:%s, tbName:%s"
,
pMeta
->
dbFName
,
pMeta
->
tbName
);
ctgError
(
"no valid tbmeta got from meta rsp, dbFName:%s, tbName:%s"
,
pMeta
->
dbFName
,
pMeta
->
tbName
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
CTG_ERR_JRET
(
TSDB_CODE_CTG_INTERNAL_ERROR
);
...
@@ -1723,6 +1737,10 @@ int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
...
@@ -1723,6 +1737,10 @@ int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
SCtgDropStbMetaMsg
*
msg
=
operation
->
data
;
SCtgDropStbMetaMsg
*
msg
=
operation
->
data
;
SCatalog
*
pCtg
=
msg
->
pCtg
;
SCatalog
*
pCtg
=
msg
->
pCtg
;
if
(
pCtg
->
stopUpdate
)
{
goto
_return
;
}
SCtgDBCache
*
dbCache
=
NULL
;
SCtgDBCache
*
dbCache
=
NULL
;
ctgGetDBCache
(
pCtg
,
msg
->
dbFName
,
&
dbCache
);
ctgGetDBCache
(
pCtg
,
msg
->
dbFName
,
&
dbCache
);
if
(
NULL
==
dbCache
)
{
if
(
NULL
==
dbCache
)
{
...
@@ -1776,6 +1794,10 @@ int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
...
@@ -1776,6 +1794,10 @@ int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
SCtgDropTblMetaMsg
*
msg
=
operation
->
data
;
SCtgDropTblMetaMsg
*
msg
=
operation
->
data
;
SCatalog
*
pCtg
=
msg
->
pCtg
;
SCatalog
*
pCtg
=
msg
->
pCtg
;
if
(
pCtg
->
stopUpdate
)
{
goto
_return
;
}
SCtgDBCache
*
dbCache
=
NULL
;
SCtgDBCache
*
dbCache
=
NULL
;
ctgGetDBCache
(
pCtg
,
msg
->
dbFName
,
&
dbCache
);
ctgGetDBCache
(
pCtg
,
msg
->
dbFName
,
&
dbCache
);
if
(
NULL
==
dbCache
)
{
if
(
NULL
==
dbCache
)
{
...
@@ -1819,6 +1841,10 @@ int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) {
...
@@ -1819,6 +1841,10 @@ int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) {
SCtgUpdateUserMsg
*
msg
=
operation
->
data
;
SCtgUpdateUserMsg
*
msg
=
operation
->
data
;
SCatalog
*
pCtg
=
msg
->
pCtg
;
SCatalog
*
pCtg
=
msg
->
pCtg
;
if
(
pCtg
->
stopUpdate
)
{
goto
_return
;
}
SCtgUserAuth
*
pUser
=
(
SCtgUserAuth
*
)
taosHashGet
(
pCtg
->
userCache
,
msg
->
userAuth
.
user
,
strlen
(
msg
->
userAuth
.
user
));
SCtgUserAuth
*
pUser
=
(
SCtgUserAuth
*
)
taosHashGet
(
pCtg
->
userCache
,
msg
->
userAuth
.
user
,
strlen
(
msg
->
userAuth
.
user
));
if
(
NULL
==
pUser
)
{
if
(
NULL
==
pUser
)
{
SCtgUserAuth
userAuth
=
{
0
};
SCtgUserAuth
userAuth
=
{
0
};
...
@@ -1872,8 +1898,12 @@ int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation) {
...
@@ -1872,8 +1898,12 @@ int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation) {
int32_t
code
=
0
;
int32_t
code
=
0
;
SCtgUpdateEpsetMsg
*
msg
=
operation
->
data
;
SCtgUpdateEpsetMsg
*
msg
=
operation
->
data
;
SCatalog
*
pCtg
=
msg
->
pCtg
;
SCatalog
*
pCtg
=
msg
->
pCtg
;
SCtgDBCache
*
dbCache
=
NULL
;
SCtgDBCache
*
dbCache
=
NULL
;
if
(
pCtg
->
stopUpdate
)
{
goto
_return
;
}
CTG_ERR_JRET
(
ctgGetDBCache
(
pCtg
,
msg
->
dbFName
,
&
dbCache
));
CTG_ERR_JRET
(
ctgGetDBCache
(
pCtg
,
msg
->
dbFName
,
&
dbCache
));
if
(
NULL
==
dbCache
)
{
if
(
NULL
==
dbCache
)
{
ctgDebug
(
"db %s not exist, ignore epset update"
,
msg
->
dbFName
);
ctgDebug
(
"db %s not exist, ignore epset update"
,
msg
->
dbFName
);
...
@@ -1920,6 +1950,10 @@ int32_t ctgOpUpdateTbIndex(SCtgCacheOperation *operation) {
...
@@ -1920,6 +1950,10 @@ int32_t ctgOpUpdateTbIndex(SCtgCacheOperation *operation) {
STableIndex
*
pIndex
=
msg
->
pIndex
;
STableIndex
*
pIndex
=
msg
->
pIndex
;
SCtgDBCache
*
dbCache
=
NULL
;
SCtgDBCache
*
dbCache
=
NULL
;
if
(
pCtg
->
stopUpdate
)
{
goto
_return
;
}
CTG_ERR_JRET
(
ctgGetAddDBCache
(
pCtg
,
pIndex
->
dbFName
,
0
,
&
dbCache
));
CTG_ERR_JRET
(
ctgGetAddDBCache
(
pCtg
,
pIndex
->
dbFName
,
0
,
&
dbCache
));
CTG_ERR_JRET
(
ctgWriteTbIndexToCache
(
pCtg
,
dbCache
,
pIndex
->
dbFName
,
pIndex
->
tbName
,
&
pIndex
));
CTG_ERR_JRET
(
ctgWriteTbIndexToCache
(
pCtg
,
dbCache
,
pIndex
->
dbFName
,
pIndex
->
tbName
,
&
pIndex
));
...
@@ -1942,6 +1976,10 @@ int32_t ctgOpDropTbIndex(SCtgCacheOperation *operation) {
...
@@ -1942,6 +1976,10 @@ int32_t ctgOpDropTbIndex(SCtgCacheOperation *operation) {
SCatalog
*
pCtg
=
msg
->
pCtg
;
SCatalog
*
pCtg
=
msg
->
pCtg
;
SCtgDBCache
*
dbCache
=
NULL
;
SCtgDBCache
*
dbCache
=
NULL
;
if
(
pCtg
->
stopUpdate
)
{
goto
_return
;
}
CTG_ERR_JRET
(
ctgGetDBCache
(
pCtg
,
msg
->
dbFName
,
&
dbCache
));
CTG_ERR_JRET
(
ctgGetDBCache
(
pCtg
,
msg
->
dbFName
,
&
dbCache
));
if
(
NULL
==
dbCache
)
{
if
(
NULL
==
dbCache
)
{
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
...
@@ -2154,7 +2192,7 @@ int32_t ctgStartUpdateThread() {
...
@@ -2154,7 +2192,7 @@ int32_t ctgStartUpdateThread() {
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
ctgGetTbMetaFromCache
(
SCatalog
*
pCtg
,
S
RequestConnInfo
*
pConn
,
S
CtgTbMetaCtx
*
ctx
,
STableMeta
**
pTableMeta
)
{
int32_t
ctgGetTbMetaFromCache
(
SCatalog
*
pCtg
,
SCtgTbMetaCtx
*
ctx
,
STableMeta
**
pTableMeta
)
{
if
(
IS_SYS_DBNAME
(
ctx
->
pName
->
dbname
))
{
if
(
IS_SYS_DBNAME
(
ctx
->
pName
->
dbname
))
{
CTG_FLAG_SET_SYS_DB
(
ctx
->
flag
);
CTG_FLAG_SET_SYS_DB
(
ctx
->
flag
);
}
}
...
...
source/libs/catalog/src/ctgDbg.c
浏览文件 @
2bf5b040
...
@@ -226,28 +226,45 @@ _return:
...
@@ -226,28 +226,45 @@ _return:
CTG_RET
(
code
);
CTG_RET
(
code
);
}
}
int32_t
ctgdEnableDebug
(
char
*
option
)
{
int32_t
ctgdEnableDebug
(
char
*
option
,
bool
enable
)
{
if
(
0
==
strcasecmp
(
option
,
"lock"
))
{
if
(
0
==
strcasecmp
(
option
,
"lock"
))
{
gCTGDebug
.
lockEnable
=
tru
e
;
gCTGDebug
.
lockEnable
=
enabl
e
;
qDebug
(
"
lock debug enabled"
);
qDebug
(
"
catalog lock debug set to %d"
,
enable
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
if
(
0
==
strcasecmp
(
option
,
"cache"
))
{
if
(
0
==
strcasecmp
(
option
,
"cache"
))
{
gCTGDebug
.
cacheEnable
=
tru
e
;
gCTGDebug
.
cacheEnable
=
enabl
e
;
qDebug
(
"ca
che debug enabled"
);
qDebug
(
"ca
talog cache debug set to %d"
,
enable
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
if
(
0
==
strcasecmp
(
option
,
"api"
))
{
if
(
0
==
strcasecmp
(
option
,
"api"
))
{
gCTGDebug
.
apiEnable
=
tru
e
;
gCTGDebug
.
apiEnable
=
enabl
e
;
qDebug
(
"
api debug enabled"
);
qDebug
(
"
catalog api debug set to %d"
,
enable
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
if
(
0
==
strcasecmp
(
option
,
"meta"
))
{
if
(
0
==
strcasecmp
(
option
,
"meta"
))
{
gCTGDebug
.
metaEnable
=
true
;
gCTGDebug
.
metaEnable
=
enable
;
qDebug
(
"api debug enabled"
);
qDebug
(
"catalog meta debug set to %d"
,
enable
);
return
TSDB_CODE_SUCCESS
;
}
if
(
0
==
strcasecmp
(
option
,
"stopUpdate"
))
{
SCatalog
*
pCtg
=
NULL
;
void
*
pIter
=
taosHashIterate
(
gCtgMgmt
.
pCluster
,
NULL
);
while
(
pIter
)
{
pCtg
=
*
(
SCatalog
**
)
pIter
;
pCtg
->
stopUpdate
=
enable
;
pIter
=
taosHashIterate
(
gCtgMgmt
.
pCluster
,
pIter
);
}
qDebug
(
"catalog stopUpdate set to %d"
,
enable
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -256,6 +273,77 @@ int32_t ctgdEnableDebug(char *option) {
...
@@ -256,6 +273,77 @@ int32_t ctgdEnableDebug(char *option) {
return
TSDB_CODE_CTG_INTERNAL_ERROR
;
return
TSDB_CODE_CTG_INTERNAL_ERROR
;
}
}
int32_t
ctgdHandleDbgCommand
(
char
*
command
)
{
if
(
NULL
==
command
)
{
CTG_RET
(
TSDB_CODE_INVALID_PARA
);
}
if
(
strlen
(
command
)
>
CTG_MAX_COMMAND_LEN
)
{
CTG_RET
(
TSDB_CODE_INVALID_PARA
);
}
char
*
dup
=
strdup
(
command
);
char
*
option
=
NULL
;
char
*
param
=
NULL
;
int32_t
i
=
0
;
bool
newItem
=
true
;
while
(
*
(
dup
+
i
))
{
if
(
isspace
(
*
(
dup
+
i
)))
{
*
(
dup
+
i
)
=
0
;
++
i
;
newItem
=
true
;
continue
;
}
if
(
!
newItem
)
{
++
i
;
continue
;
}
newItem
=
false
;
if
(
NULL
==
option
)
{
option
=
dup
+
i
;
++
i
;
continue
;
}
if
(
NULL
==
param
)
{
param
=
dup
+
i
;
++
i
;
continue
;
}
taosMemoryFree
(
dup
);
CTG_RET
(
TSDB_CODE_INVALID_PARA
);
}
bool
enable
=
atoi
(
param
);
int32_t
code
=
ctgdEnableDebug
(
option
,
enable
);
taosMemoryFree
(
dup
);
CTG_RET
(
code
);
}
int32_t
ctgdGetOneHandle
(
SCatalog
**
pHandle
)
{
SCatalog
*
pCtg
=
NULL
;
void
*
pIter
=
taosHashIterate
(
gCtgMgmt
.
pCluster
,
NULL
);
while
(
pIter
)
{
pCtg
=
*
(
SCatalog
**
)
pIter
;
taosHashCancelIterate
(
gCtgMgmt
.
pCluster
,
pIter
);
break
;
}
*
pHandle
=
pCtg
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgdGetStatNum
(
char
*
option
,
void
*
res
)
{
int32_t
ctgdGetStatNum
(
char
*
option
,
void
*
res
)
{
if
(
0
==
strcasecmp
(
option
,
"runtime.numOfOpDequeue"
))
{
if
(
0
==
strcasecmp
(
option
,
"runtime.numOfOpDequeue"
))
{
*
(
uint64_t
*
)
res
=
atomic_load_64
(
&
gCtgMgmt
.
stat
.
runtime
.
numOfOpDequeue
);
*
(
uint64_t
*
)
res
=
atomic_load_64
(
&
gCtgMgmt
.
stat
.
runtime
.
numOfOpDequeue
);
...
...
source/libs/catalog/test/catalogTests.cpp
浏览文件 @
2bf5b040
...
@@ -41,7 +41,6 @@
...
@@ -41,7 +41,6 @@
namespace
{
namespace
{
extern
"C"
int32_t
ctgdGetClusterCacheNum
(
struct
SCatalog
*
pCatalog
,
int32_t
type
);
extern
"C"
int32_t
ctgdGetClusterCacheNum
(
struct
SCatalog
*
pCatalog
,
int32_t
type
);
extern
"C"
int32_t
ctgdEnableDebug
(
char
*
option
);
extern
"C"
int32_t
ctgdGetStatNum
(
char
*
option
,
void
*
res
);
extern
"C"
int32_t
ctgdGetStatNum
(
char
*
option
,
void
*
res
);
void
ctgTestSetRspTableMeta
();
void
ctgTestSetRspTableMeta
();
...
@@ -49,6 +48,8 @@ void ctgTestSetRspCTableMeta();
...
@@ -49,6 +48,8 @@ void ctgTestSetRspCTableMeta();
void
ctgTestSetRspSTableMeta
();
void
ctgTestSetRspSTableMeta
();
void
ctgTestSetRspMultiSTableMeta
();
void
ctgTestSetRspMultiSTableMeta
();
extern
int32_t
clientConnRefPool
;
enum
{
enum
{
CTGT_RSP_VGINFO
=
1
,
CTGT_RSP_VGINFO
=
1
,
CTGT_RSP_TBMETA
,
CTGT_RSP_TBMETA
,
...
@@ -151,10 +152,10 @@ void ctgTestInitLogFile() {
...
@@ -151,10 +152,10 @@ void ctgTestInitLogFile() {
qDebugFlag
=
159
;
qDebugFlag
=
159
;
strcpy
(
tsLogDir
,
TD_LOG_DIR_PATH
);
strcpy
(
tsLogDir
,
TD_LOG_DIR_PATH
);
ctgdEnableDebug
(
"api"
);
ctgdEnableDebug
(
"api"
,
true
);
ctgdEnableDebug
(
"meta"
);
ctgdEnableDebug
(
"meta"
,
true
);
ctgdEnableDebug
(
"cache"
);
ctgdEnableDebug
(
"cache"
,
true
);
ctgdEnableDebug
(
"lock"
);
ctgdEnableDebug
(
"lock"
,
true
);
if
(
taosInitLog
(
defaultLogFileNamePrefix
,
maxLogFileNum
)
<
0
)
{
if
(
taosInitLog
(
defaultLogFileNamePrefix
,
maxLogFileNum
)
<
0
)
{
printf
(
"failed to open log file in directory:%s
\n
"
,
tsLogDir
);
printf
(
"failed to open log file in directory:%s
\n
"
,
tsLogDir
);
...
@@ -1204,6 +1205,34 @@ void *ctgTestSetCtableMetaThread(void *param) {
...
@@ -1204,6 +1205,34 @@ void *ctgTestSetCtableMetaThread(void *param) {
}
}
void
ctgTestFetchRows
(
TAOS_RES
*
result
,
int32_t
*
rows
)
{
TAOS_ROW
row
;
int
num_fields
=
taos_num_fields
(
result
);
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
result
);
char
temp
[
256
];
// fetch the records row by row
while
((
row
=
taos_fetch_row
(
result
)))
{
(
*
rows
)
++
;
memset
(
temp
,
0
,
sizeof
(
temp
));
taos_print_row
(
temp
,
row
,
fields
,
num_fields
);
printf
(
"
\t
[%s]
\n
"
,
temp
);
}
}
void
ctgTestExecQuery
(
TAOS
*
taos
,
char
*
sql
,
bool
fetch
,
int32_t
*
rows
)
{
TAOS_RES
*
result
=
taos_query
(
taos
,
sql
);
int
code
=
taos_errno
(
result
);
ASSERT_EQ
(
code
,
0
);
if
(
fetch
)
{
ctgTestFetchRows
(
result
,
rows
);
}
taos_free_result
(
result
);
}
TEST
(
tableMeta
,
normalTable
)
{
TEST
(
tableMeta
,
normalTable
)
{
struct
SCatalog
*
pCtg
=
NULL
;
struct
SCatalog
*
pCtg
=
NULL
;
SVgroupInfo
vgInfo
=
{
0
};
SVgroupInfo
vgInfo
=
{
0
};
...
@@ -1245,7 +1274,7 @@ TEST(tableMeta, normalTable) {
...
@@ -1245,7 +1274,7 @@ TEST(tableMeta, normalTable) {
memset
(
&
vgInfo
,
0
,
sizeof
(
vgInfo
));
memset
(
&
vgInfo
,
0
,
sizeof
(
vgInfo
));
bool
exists
=
false
;
bool
exists
=
false
;
code
=
catalogGetCachedTableHashVgroup
(
pCtg
,
mockPointer
,
&
n
,
&
vgInfo
,
&
exists
);
code
=
catalogGetCachedTableHashVgroup
(
pCtg
,
&
n
,
&
vgInfo
,
&
exists
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
vgInfo
.
vgId
,
8
);
ASSERT_EQ
(
vgInfo
.
vgId
,
8
);
ASSERT_EQ
(
vgInfo
.
epSet
.
numOfEps
,
3
);
ASSERT_EQ
(
vgInfo
.
epSet
.
numOfEps
,
3
);
...
@@ -1292,7 +1321,7 @@ TEST(tableMeta, normalTable) {
...
@@ -1292,7 +1321,7 @@ TEST(tableMeta, normalTable) {
taosMemoryFree
(
tableMeta
);
taosMemoryFree
(
tableMeta
);
tableMeta
=
NULL
;
tableMeta
=
NULL
;
catalogGetCachedTableMeta
(
pCtg
,
mockPointer
,
&
n
,
&
tableMeta
);
catalogGetCachedTableMeta
(
pCtg
,
&
n
,
&
tableMeta
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
tableMeta
->
vgId
,
8
);
ASSERT_EQ
(
tableMeta
->
vgId
,
8
);
ASSERT_EQ
(
tableMeta
->
tableType
,
TSDB_NORMAL_TABLE
);
ASSERT_EQ
(
tableMeta
->
tableType
,
TSDB_NORMAL_TABLE
);
...
@@ -1500,7 +1529,7 @@ TEST(tableMeta, superTableCase) {
...
@@ -1500,7 +1529,7 @@ TEST(tableMeta, superTableCase) {
}
}
tableMeta
=
NULL
;
tableMeta
=
NULL
;
code
=
catalogGetCachedSTableMeta
(
pCtg
,
mockPointer
,
&
n
,
&
tableMeta
);
code
=
catalogGetCachedSTableMeta
(
pCtg
,
&
n
,
&
tableMeta
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
tableMeta
->
vgId
,
0
);
ASSERT_EQ
(
tableMeta
->
vgId
,
0
);
ASSERT_EQ
(
tableMeta
->
tableType
,
TSDB_SUPER_TABLE
);
ASSERT_EQ
(
tableMeta
->
tableType
,
TSDB_SUPER_TABLE
);
...
@@ -2772,7 +2801,7 @@ TEST(apiTest, catalogChkAuth_test) {
...
@@ -2772,7 +2801,7 @@ TEST(apiTest, catalogChkAuth_test) {
bool
pass
=
false
;
bool
pass
=
false
;
bool
exists
=
false
;
bool
exists
=
false
;
code
=
catalogChkAuthFromCache
(
pCtg
,
mockPointer
,
ctgTestUsername
,
ctgTestDbname
,
AUTH_TYPE_READ
,
&
pass
,
&
exists
);
code
=
catalogChkAuthFromCache
(
pCtg
,
ctgTestUsername
,
ctgTestDbname
,
AUTH_TYPE_READ
,
&
pass
,
&
exists
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
exists
,
false
);
ASSERT_EQ
(
exists
,
false
);
...
@@ -2790,7 +2819,7 @@ TEST(apiTest, catalogChkAuth_test) {
...
@@ -2790,7 +2819,7 @@ TEST(apiTest, catalogChkAuth_test) {
}
}
}
}
code
=
catalogChkAuthFromCache
(
pCtg
,
mockPointer
,
ctgTestUsername
,
ctgTestDbname
,
AUTH_TYPE_READ
,
&
pass
,
&
exists
);
code
=
catalogChkAuthFromCache
(
pCtg
,
ctgTestUsername
,
ctgTestDbname
,
AUTH_TYPE_READ
,
&
pass
,
&
exists
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
pass
,
true
);
ASSERT_EQ
(
pass
,
true
);
ASSERT_EQ
(
exists
,
true
);
ASSERT_EQ
(
exists
,
true
);
...
@@ -3063,6 +3092,58 @@ TEST(apiTest, catalogGetDnodeList_test) {
...
@@ -3063,6 +3092,58 @@ TEST(apiTest, catalogGetDnodeList_test) {
catalogDestroy
();
catalogDestroy
();
}
}
#ifdef INTEGRATION_TEST
TEST
(
intTest
,
autoCreateTableTest
)
{
struct
SCatalog
*
pCtg
=
NULL
;
TAOS
*
taos
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
ASSERT_TRUE
(
NULL
!=
taos
);
ctgdEnableDebug
(
"api"
,
true
);
ctgdEnableDebug
(
"meta"
,
true
);
ctgdEnableDebug
(
"cache"
,
true
);
ctgdEnableDebug
(
"lock"
,
true
);
ctgTestExecQuery
(
taos
,
"drop database if exists db1"
,
false
,
NULL
);
ctgTestExecQuery
(
taos
,
"create database db1"
,
false
,
NULL
);
ctgTestExecQuery
(
taos
,
"create stable db1.st1 (ts timestamp, f1 int) tags(tg1 int)"
,
false
,
NULL
);
ctgTestExecQuery
(
taos
,
"insert into db1.tb1 using db1.st1 tags(1) values(now, 1)"
,
false
,
NULL
);
ctgdGetOneHandle
(
&
pCtg
);
while
(
true
)
{
uint32_t
n
=
ctgdGetClusterCacheNum
(
pCtg
,
CTG_DBG_META_NUM
);
if
(
2
!=
n
)
{
taosMsleep
(
50
);
}
else
{
break
;
}
}
uint64_t
n
=
0
,
m
=
0
;
ctgdGetStatNum
(
"runtime.numOfOpDequeue"
,
(
void
*
)
&
n
);
ctgTestExecQuery
(
taos
,
"insert into db1.tb1 using db1.st1 tags(1) values(now, 2)"
,
false
,
NULL
);
ctgTestExecQuery
(
taos
,
"insert into db1.tb1 values(now, 3)"
,
false
,
NULL
);
taosMsleep
(
1000
);
ctgdGetStatNum
(
"runtime.numOfOpDequeue"
,
(
void
*
)
&
m
);
ASSERT_EQ
(
n
,
m
);
ctgdEnableDebug
(
"stopUpdate"
,
true
);
ctgTestExecQuery
(
taos
,
"alter table db1.st1 add column f2 double"
,
false
,
NULL
);
ctgdEnableDebug
(
"stopUpdate"
,
false
);
ctgTestExecQuery
(
taos
,
"insert into db1.tb1 (ts, f1) values(now, 4)"
,
false
,
NULL
);
taos_close
(
taos
);
}
#endif
int
main
(
int
argc
,
char
**
argv
)
{
int
main
(
int
argc
,
char
**
argv
)
{
testing
::
InitGoogleTest
(
&
argc
,
argv
);
testing
::
InitGoogleTest
(
&
argc
,
argv
);
...
...
source/libs/command/inc/commandInt.h
浏览文件 @
2bf5b040
...
@@ -96,6 +96,7 @@ extern "C" {
...
@@ -96,6 +96,7 @@ extern "C" {
#define COMMAND_RESET_LOG "resetLog"
#define COMMAND_RESET_LOG "resetLog"
#define COMMAND_SCHEDULE_POLICY "schedulePolicy"
#define COMMAND_SCHEDULE_POLICY "schedulePolicy"
#define COMMAND_ENABLE_RESCHEDULE "enableReSchedule"
#define COMMAND_ENABLE_RESCHEDULE "enableReSchedule"
#define COMMAND_CATALOG_DEBUG "catalogDebug"
typedef
struct
SExplainGroup
{
typedef
struct
SExplainGroup
{
int32_t
nodeNum
;
int32_t
nodeNum
;
...
...
source/libs/command/src/command.c
浏览文件 @
2bf5b040
...
@@ -571,6 +571,8 @@ static int32_t execAlterCmd(char* cmd, char* value, bool* processed) {
...
@@ -571,6 +571,8 @@ static int32_t execAlterCmd(char* cmd, char* value, bool* processed) {
code
=
schedulerUpdatePolicy
(
atoi
(
value
));
code
=
schedulerUpdatePolicy
(
atoi
(
value
));
}
else
if
(
0
==
strcasecmp
(
cmd
,
COMMAND_ENABLE_RESCHEDULE
))
{
}
else
if
(
0
==
strcasecmp
(
cmd
,
COMMAND_ENABLE_RESCHEDULE
))
{
code
=
schedulerEnableReSchedule
(
atoi
(
value
));
code
=
schedulerEnableReSchedule
(
atoi
(
value
));
}
else
if
(
0
==
strcasecmp
(
cmd
,
COMMAND_CATALOG_DEBUG
))
{
code
=
ctgdHandleDbgCommand
(
value
);
}
else
{
}
else
{
goto
_return
;
goto
_return
;
}
}
...
...
source/libs/scheduler/src/schRemote.c
浏览文件 @
2bf5b040
...
@@ -280,7 +280,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
...
@@ -280,7 +280,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
}
}
atomic_add_fetch_32
(
&
pJob
->
resNumOfRows
,
rsp
->
affectedRows
);
atomic_add_fetch_32
(
&
pJob
->
resNumOfRows
,
rsp
->
affectedRows
);
SCH_TASK_DLOG
(
"submit succeed, affectedRows:%d
"
,
rsp
->
affectedRow
s
);
SCH_TASK_DLOG
(
"submit succeed, affectedRows:%d
, blocks:%d"
,
rsp
->
affectedRows
,
rsp
->
nBlock
s
);
SCH_LOCK
(
SCH_WRITE
,
&
pJob
->
resLock
);
SCH_LOCK
(
SCH_WRITE
,
&
pJob
->
resLock
);
if
(
pJob
->
execRes
.
res
)
{
if
(
pJob
->
execRes
.
res
)
{
...
...
source/util/src/terror.c
浏览文件 @
2bf5b040
...
@@ -340,7 +340,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_CACHE_LAST_ROW, "TSDB no cache last ro
...
@@ -340,7 +340,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_CACHE_LAST_ROW, "TSDB no cache last ro
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_TABLE_NOT_EXIST
,
"Table not exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_TABLE_NOT_EXIST
,
"Table not exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_STB_ALREADY_EXIST
,
"Stable already exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_STB_ALREADY_EXIST
,
"Stable already exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_STB_NOT_EXIST
,
"Stable not exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_STB_NOT_EXIST
,
"Stable not exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_
TABLE_RECREATED
,
"Table re-create
d"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_
INVALID_TABLE_SCHEMA_VER
,
"Table schema is ol
d"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR
,
"TDB env open error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR
,
"TDB env open error"
)
// query
// query
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录