Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
7a39255e
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
7a39255e
编写于
12月 21, 2021
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into feature/vnode
上级
897b5c59
e35e4c39
变更
21
隐藏空白更改
内联
并排
Showing
21 changed file
with
255 addition
and
424 deletion
+255
-424
include/util/taoserror.h
include/util/taoserror.h
+4
-0
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+26
-21
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+11
-10
source/client/src/clientMain.c
source/client/src/clientMain.c
+3
-8
source/client/src/clientMsgHandler.c
source/client/src/clientMsgHandler.c
+15
-354
source/client/src/tscEnv.c
source/client/src/tscEnv.c
+2
-5
source/dnode/mnode/impl/src/mndAcct.c
source/dnode/mnode/impl/src/mndAcct.c
+1
-1
source/dnode/mnode/impl/src/mndCluster.c
source/dnode/mnode/impl/src/mndCluster.c
+1
-1
source/dnode/mnode/impl/src/mndMnode.c
source/dnode/mnode/impl/src/mndMnode.c
+7
-3
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+9
-5
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+6
-2
source/dnode/mnode/impl/src/mndUser.c
source/dnode/mnode/impl/src/mndUser.c
+6
-2
source/libs/index/inc/index_tfile.h
source/libs/index/inc/index_tfile.h
+3
-3
source/libs/index/src/index.c
source/libs/index/src/index.c
+7
-3
source/libs/index/src/index_cache.c
source/libs/index/src/index_cache.c
+1
-1
source/libs/index/src/index_tfile.c
source/libs/index/src/index_tfile.c
+60
-4
source/libs/index/test/indexTests.cc
source/libs/index/test/indexTests.cc
+8
-1
source/libs/parser/inc/astToMsg.h
source/libs/parser/inc/astToMsg.h
+1
-0
source/libs/parser/src/astToMsg.c
source/libs/parser/src/astToMsg.c
+44
-0
source/libs/parser/src/astValidate.c
source/libs/parser/src/astValidate.c
+36
-0
source/util/src/terror.c
source/util/src/terror.c
+4
-0
未找到文件。
include/util/taoserror.h
浏览文件 @
7a39255e
...
...
@@ -230,6 +230,10 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_INVALID_FUNC_CODE TAOS_DEF_ERROR_CODE(0, 0x03C5)
#define TSDB_CODE_MND_INVALID_FUNC_BUFSIZE TAOS_DEF_ERROR_CODE(0, 0x03C6)
// mnode-trans
#define TSDB_CODE_MND_TRANS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D0)
#define TSDB_CODE_MND_TRANS_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D1)
// dnode
#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400)
#define TSDB_CODE_DND_EXITING TAOS_DEF_ERROR_CODE(0, 0x0401)
...
...
source/client/inc/clientInt.h
浏览文件 @
7a39255e
...
...
@@ -86,26 +86,32 @@ typedef struct STscObj {
SAppInstInfo
*
pAppInfo
;
}
STscObj
;
typedef
struct
S
Client
ResultInfo
{
const
char
*
pMsg
;
typedef
struct
S
Req
ResultInfo
{
const
char
*
p
Rsp
Msg
;
const
char
*
pData
;
TAOS_FIELD
*
fields
;
int32_t
numOfCols
;
int32_t
numOfRows
;
int32_t
current
;
uint32_t
numOfCols
;
int32_t
*
length
;
TAOS_ROW
row
;
char
**
pCol
;
}
SClientResultInfo
;
typedef
struct
SReqBody
{
tsem_t
rspSem
;
// not used now
void
*
fp
;
void
*
param
;
int32_t
paramLen
;
int64_t
execId
;
// showId/queryId
SClientResultInfo
*
pResInfo
;
}
SRequestBody
;
uint32_t
numOfRows
;
uint32_t
current
;
}
SReqResultInfo
;
typedef
struct
SReqMsg
{
void
*
pMsg
;
uint32_t
len
;
}
SReqMsgInfo
;
typedef
struct
SRequestSendRecvBody
{
tsem_t
rspSem
;
// not used now
void
*
fp
;
int64_t
execId
;
// showId/queryId
SReqMsgInfo
requestMsg
;
SReqResultInfo
resInfo
;
}
SRequestSendRecvBody
;
#define ERROR_MSG_BUF_DEFAULT_SIZE 512
...
...
@@ -115,7 +121,7 @@ typedef struct SRequestObj {
STscObj
*
pTscObj
;
SQueryExecMetric
metric
;
char
*
sqlstr
;
// sql string
SRequestBody
body
;
SRequest
SendRecv
Body
body
;
int64_t
self
;
char
*
msgBuf
;
int32_t
code
;
...
...
@@ -123,11 +129,10 @@ typedef struct SRequestObj {
}
SRequestObj
;
typedef
struct
SRequestMsgBody
{
int32_t
msgType
;
void
*
pData
;
int32_t
msgLen
;
uint64_t
requestId
;
uint64_t
requestObjRefId
;
int32_t
msgType
;
SReqMsgInfo
msgInfo
;
uint64_t
requestId
;
uint64_t
requestObjRefId
;
}
SRequestMsgBody
;
extern
SAppInfo
appInfo
;
...
...
@@ -158,7 +163,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
TAOS_RES
*
taos_query_l
(
TAOS
*
taos
,
const
char
*
sql
,
int
sqlLen
);
void
*
doFetchRow
(
SRequestObj
*
pRequest
);
void
setResultDataPtr
(
S
Client
ResultInfo
*
pResultInfo
,
TAOS_FIELD
*
pFields
,
int32_t
numOfCols
,
int32_t
numOfRows
);
void
setResultDataPtr
(
S
Req
ResultInfo
*
pResultInfo
,
TAOS_FIELD
*
pFields
,
int32_t
numOfCols
,
int32_t
numOfRows
);
#ifdef __cplusplus
}
...
...
source/client/src/clientImpl.c
浏览文件 @
7a39255e
...
...
@@ -155,8 +155,7 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
code
=
qParseQuerySql
(
pRequest
->
sqlstr
,
sqlLen
,
pRequest
->
requestId
,
&
type
,
&
output
,
&
outputLen
,
pRequest
->
msgBuf
,
ERROR_MSG_BUF_DEFAULT_SIZE
);
if
(
type
==
TSDB_SQL_CREATE_USER
||
type
==
TSDB_SQL_SHOW
||
type
==
TSDB_SQL_DROP_USER
||
type
==
TSDB_SQL_DROP_ACCT
||
type
==
TSDB_SQL_CREATE_DB
||
type
==
TSDB_SQL_CREATE_ACCT
)
{
pRequest
->
type
=
type
;
pRequest
->
body
.
param
=
output
;
pRequest
->
body
.
paramLen
=
outputLen
;
pRequest
->
body
.
requestMsg
=
(
SReqMsgInfo
){.
pMsg
=
output
,
.
len
=
outputLen
};
SRequestMsgBody
body
=
{
0
};
buildRequestMsgFp
[
type
](
pRequest
,
&
body
);
...
...
@@ -165,6 +164,8 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
sendMsgToServer
(
pTscObj
->
pTransporter
,
&
pTscObj
->
pAppInfo
->
mgmtEp
.
epSet
,
&
body
,
&
transporterId
);
tsem_wait
(
&
pRequest
->
body
.
rspSem
);
destroyRequestMsgBody
(
&
body
);
}
else
{
assert
(
0
);
...
...
@@ -255,7 +256,7 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con
static
int32_t
buildConnectMsg
(
SRequestObj
*
pRequest
,
SRequestMsgBody
*
pMsgBody
)
{
pMsgBody
->
msgType
=
TSDB_MSG_TYPE_CONNECT
;
pMsgBody
->
msg
Len
=
sizeof
(
SConnectMsg
);
pMsgBody
->
msg
Info
.
len
=
sizeof
(
SConnectMsg
);
pMsgBody
->
requestObjRefId
=
pRequest
->
self
;
SConnectMsg
*
pConnect
=
calloc
(
1
,
sizeof
(
SConnectMsg
));
...
...
@@ -279,28 +280,28 @@ static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody)
pConnect
->
startTime
=
htobe64
(
appInfo
.
startTime
);
tstrncpy
(
pConnect
->
app
,
appInfo
.
appName
,
tListLen
(
pConnect
->
app
));
pMsgBody
->
pData
=
pConnect
;
pMsgBody
->
msgInfo
.
pMsg
=
pConnect
;
return
0
;
}
static
void
destroyRequestMsgBody
(
SRequestMsgBody
*
pMsgBody
)
{
assert
(
pMsgBody
!=
NULL
);
tfree
(
pMsgBody
->
pData
);
tfree
(
pMsgBody
->
msgInfo
.
pMsg
);
}
int32_t
sendMsgToServer
(
void
*
pTransporter
,
SEpSet
*
epSet
,
const
SRequestMsgBody
*
pBody
,
int64_t
*
pTransporterId
)
{
char
*
pMsg
=
rpcMallocCont
(
pBody
->
msg
L
en
);
char
*
pMsg
=
rpcMallocCont
(
pBody
->
msg
Info
.
l
en
);
if
(
NULL
==
pMsg
)
{
tscError
(
"0x%"
PRIx64
" msg:%s malloc failed"
,
pBody
->
requestId
,
taosMsg
[
pBody
->
msgType
]);
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
-
1
;
}
memcpy
(
pMsg
,
pBody
->
pData
,
pBody
->
msgL
en
);
memcpy
(
pMsg
,
pBody
->
msgInfo
.
pMsg
,
pBody
->
msgInfo
.
l
en
);
SRpcMsg
rpcMsg
=
{
.
msgType
=
pBody
->
msgType
,
.
pCont
=
pMsg
,
.
contLen
=
pBody
->
msg
L
en
,
.
contLen
=
pBody
->
msg
Info
.
l
en
,
.
ahandle
=
(
void
*
)
pBody
->
requestObjRefId
,
.
handle
=
NULL
,
.
code
=
0
...
...
@@ -388,7 +389,7 @@ TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, c
void
*
doFetchRow
(
SRequestObj
*
pRequest
)
{
assert
(
pRequest
!=
NULL
);
S
ClientResultInfo
*
pResultInfo
=
pRequest
->
body
.
pR
esInfo
;
S
ReqResultInfo
*
pResultInfo
=
&
pRequest
->
body
.
r
esInfo
;
if
(
pResultInfo
->
pData
==
NULL
||
pResultInfo
->
current
>=
pResultInfo
->
numOfRows
)
{
pRequest
->
type
=
TSDB_SQL_RETRIEVE_MNODE
;
...
...
@@ -421,7 +422,7 @@ void* doFetchRow(SRequestObj* pRequest) {
return
pResultInfo
->
row
;
}
void
setResultDataPtr
(
S
Client
ResultInfo
*
pResultInfo
,
TAOS_FIELD
*
pFields
,
int32_t
numOfCols
,
int32_t
numOfRows
)
{
void
setResultDataPtr
(
S
Req
ResultInfo
*
pResultInfo
,
TAOS_FIELD
*
pFields
,
int32_t
numOfCols
,
int32_t
numOfRows
)
{
assert
(
numOfCols
>
0
&&
pFields
!=
NULL
&&
pResultInfo
!=
NULL
);
if
(
numOfRows
==
0
)
{
return
;
...
...
source/client/src/clientMain.c
浏览文件 @
7a39255e
...
...
@@ -115,12 +115,7 @@ int taos_field_count(TAOS_RES *res) {
}
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
SClientResultInfo
*
pResInfo
=
pRequest
->
body
.
pResInfo
;
if
(
pResInfo
==
NULL
)
{
return
0
;
}
SReqResultInfo
*
pResInfo
=
&
pRequest
->
body
.
resInfo
;
return
pResInfo
->
numOfCols
;
}
...
...
@@ -133,7 +128,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
return
NULL
;
}
S
ClientResultInfo
*
pResInfo
=
((
SRequestObj
*
)
res
)
->
body
.
pResInfo
;
S
ReqResultInfo
*
pResInfo
=
&
(((
SRequestObj
*
)
res
)
->
body
.
resInfo
)
;
return
pResInfo
->
fields
;
}
...
...
@@ -248,7 +243,7 @@ int* taos_fetch_lengths(TAOS_RES *res) {
return
NULL
;
}
return
((
SRequestObj
*
)
res
)
->
body
.
pResInfo
->
length
;
return
((
SRequestObj
*
)
res
)
->
body
.
resInfo
.
length
;
}
const
char
*
taos_data_type
(
int
type
)
{
...
...
source/client/src/clientMsgHandler.c
浏览文件 @
7a39255e
...
...
@@ -13,11 +13,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <catalog.h>
#include <tname.h>
#include "os.h"
#include "catalog.h"
#include "tname.h"
#include "clientInt.h"
#include "clientLog.h"
#include "os.h"
#include "tmsgtype.h"
#include "trpc.h"
...
...
@@ -29,16 +29,6 @@ void tscProcessActivityTimer(void *handle, void *tmrId);
static int32_t extractSTableQueryVgroupId(STableMetaInfo* pTableMetaInfo);
static int32_t minMsgSize() { return tsRpcHeadSize + 100; }
static int32_t getWaitingTimeInterval(int32_t count) {
int32_t initial = 100; // 100 ms by default
if (count <= 1) {
return 0;
}
return initial * ((2u)<<(count - 2));
}
static int32_t vgIdCompare(const void *lhs, const void *rhs) {
int32_t left = *(int32_t *)lhs;
int32_t right = *(int32_t *)rhs;
...
...
@@ -298,36 +288,6 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
taosReleaseRef(tscRefId, rid);
}
int tscSendMsgToServer(SSqlObj *pSql) {
STscObj* pObj = pSql->pTscObj;
SSqlCmd* pCmd = &pSql->cmd;
char *pMsg = rpcMallocCont(pCmd->payloadLen);
if (NULL == pMsg) {
tscError("0x%"PRIx64" msg:%s malloc failed", pSql->self, taosMsg[pSql->cmd.msgType]);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
// set the mgmt ip list
if (pSql->cmd.command >= TSDB_SQL_MGMT) {
tscDumpMgmtEpSet(pSql);
}
memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
SRpcMsg rpcMsg = {
.msgType = pSql->cmd.msgType,
.pCont = pMsg,
.contLen = pSql->cmd.payloadLen,
.ahandle = (void*)pSql->self,
.handle = NULL,
.code = 0
};
rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid);
return TSDB_CODE_SUCCESS;
}
// handle three situation
// 1. epset retry, only return last failure ep
// 2. no epset retry, like 'taos -h invalidFqdn', return invalidFqdn
...
...
@@ -354,176 +314,6 @@ void tscSetFqdnErrorMsg(SSqlObj* pSql, SRpcEpSet* pEpSet) {
}
}
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle;
SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle);
if (pSql == NULL) {
rpcFreeCont(rpcMsg->pCont);
return;
}
assert(pSql->self == handle);
STscObj *pObj = pSql->pTscObj;
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
pSql->rpcRid = -1;
if (pObj->signature != pObj) {
tscDebug("0x%"PRIx64" DB connection is closed, cmd:%d pObj:%p signature:%p", pSql->self, pCmd->command, pObj, pObj->signature);
taosRemoveRef(tscObjRef, handle);
taosReleaseRef(tscObjRef, handle);
rpcFreeCont(rpcMsg->pCont);
return;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
tscDebug("0x%"PRIx64" sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
pSql->self, pCmd->command, pQueryInfo->type, pObj, pObj->signature);
taosRemoveRef(tscObjRef, handle);
taosReleaseRef(tscObjRef, handle);
rpcFreeCont(rpcMsg->pCont);
return;
}
if (pEpSet) {
if (!tscEpSetIsEqual(&pSql->epSet, pEpSet)) {
if (pCmd->command < TSDB_SQL_MGMT) {
tscUpdateVgroupInfo(pSql, pEpSet);
} else {
tscUpdateMgmtEpSet(pSql, pEpSet);
}
}
}
int32_t cmd = pCmd->command;
// set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
if (cmd == TSDB_SQL_INSERT && rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
pSql->cmd.insertParam.schemaAttached = 1;
}
// single table query error need to be handled here.
if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_UPDATE_TAG_VAL) &&
(((rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID)) ||
rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_APP_NOT_READY)) {
// 1. super table subquery
// 2. nest queries are all not updated the tablemeta and retry parse the sql after cleanup local tablemeta/vgroup id buffer
if ((TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY |
TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) &&
!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) ||
(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_NEST_SUBQUERY)) || (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY) && pQueryInfo->distinct)) {
// do nothing in case of super table subquery
} else {
pSql->retry += 1;
tscWarn("0x%" PRIx64 " it shall renew table meta, code:%s, retry:%d", pSql->self, tstrerror(rpcMsg->code), pSql->retry);
pSql->res.code = rpcMsg->code; // keep the previous error code
if (pSql->retry > pSql->maxRetry) {
tscError("0x%" PRIx64 " max retry %d reached, give up", pSql->self, pSql->maxRetry);
} else {
// wait for a little bit moment and then retry
// todo do not sleep in rpc callback thread, add this process into queue to process
if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
int32_t duration = getWaitingTimeInterval(pSql->retry);
taosMsleep(duration);
}
pSql->retryReason = rpcMsg->code;
rpcMsg->code = tscRenewTableMeta(pSql, 0);
// if there is an error occurring, proceed to the following error handling procedure.
if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, handle);
rpcFreeCont(rpcMsg->pCont);
return;
}
}
}
}
pRes->rspLen = 0;
if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
tscDebug("0x%"PRIx64" query is cancelled, code:%s", pSql->self, tstrerror(pRes->code));
} else {
pRes->code = rpcMsg->code;
}
if (pRes->code == TSDB_CODE_SUCCESS) {
tscDebug("0x%"PRIx64" reset retry counter to be 0 due to success rsp, old:%d", pSql->self, pSql->retry);
pSql->retry = 0;
}
if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
assert(rpcMsg->msgType == pCmd->msgType + 1);
pRes->code = rpcMsg->code;
pRes->rspType = rpcMsg->msgType;
pRes->rspLen = rpcMsg->contLen;
if (pRes->rspLen > 0 && rpcMsg->pCont) {
char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
if (tmp == NULL) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
} else {
pRes->pRsp = tmp;
memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
}
} else {
tfree(pRes->pRsp);
}
/*
* There is not response callback function for submit response.
* The actual inserted number of points is the first number.
*/
if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL) {
SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
pMsg->code = htonl(pMsg->code);
pMsg->numOfRows = htonl(pMsg->numOfRows);
pMsg->affectedRows = htonl(pMsg->affectedRows);
pMsg->failedRows = htonl(pMsg->failedRows);
pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);
pRes->numOfRows += pMsg->affectedRows;
tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql->self, sqlCmd[pCmd->command],
tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen);
} else {
tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s rspLen:%d", pSql->self, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen);
}
}
if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) {
rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
}
bool shouldFree = tscShouldBeFreed(pSql);
if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
if (rpcMsg->code != TSDB_CODE_SUCCESS) {
pRes->code = rpcMsg->code;
}
rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code;
if (rpcMsg->code == TSDB_CODE_RPC_FQDN_ERROR) {
tscAllocPayload(pCmd, TSDB_FQDN_LEN + 64);
tscSetFqdnErrorMsg(pSql, pEpSet);
}
(*pSql->fp)(pSql->param, pSql, rpcMsg->code);
}
if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it
tscDebug("0x%"PRIx64" sqlObj is automatically freed", pSql->self);
taosRemoveRef(tscObjRef, handle);
}
taosReleaseRef(tscObjRef, handle);
rpcFreeCont(rpcMsg->pCont);
}
int doBuildAndSendMsg(SSqlObj *pSql) {
SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res;
...
...
@@ -2987,51 +2777,6 @@ int32_t tscGetUdfFromNode(SSqlObj *pSql, SQueryInfo* pQueryInfo) {
return code;
}
static void freeElem(void* p) {
tfree(*(char**)p);
}
/**
* retrieve table meta from mnode, and then update the local table meta hashmap.
* @param pSql sql object
* @param tableIndex table index
* @return status code
*/
int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
SSqlCmd* pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex);
char name[TSDB_TABLE_FNAME_LEN] = {0};
int32_t code = tNameExtractFullName(&pTableMetaInfo->name, name);
if (code != TSDB_CODE_SUCCESS) {
tscError("0x%"PRIx64" failed to generate the table full name", pSql->self);
return TSDB_CODE_TSC_INVALID_OPERATION;
}
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
if (pTableMeta) {
tscDebug("0x%"PRIx64" update table meta:%s, old meta numOfTags:%d, numOfCols:%d, uid:%" PRIu64, pSql->self, name,
tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid);
}
// remove stored tableMeta info in hash table
tscResetSqlCmd(pCmd, true, pSql->self);
SArray* pNameList = taosArrayInit(1, POINTER_BYTES);
SArray* vgroupList = taosArrayInit(1, POINTER_BYTES);
char* n = strdup(name);
taosArrayPush(pNameList, &n);
code = getMultiTableMetaFromMnode(pSql, pNameList, vgroupList, NULL, tscTableMetaCallBack, true);
taosArrayDestroyEx(pNameList, freeElem);
taosArrayDestroyEx(vgroupList, freeElem);
return code;
}
static bool allVgroupInfoRetrieved(SQueryInfo* pQueryInfo) {
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
...
...
@@ -3044,58 +2789,11 @@ static bool allVgroupInfoRetrieved(SQueryInfo* pQueryInfo) {
return true;
}
int tscGetSTableVgroupInfo(SSqlObj *pSql, SQueryInfo* pQueryInfo) {
int32_t code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
if (allVgroupInfoRetrieved(pQueryInfo)) {
return TSDB_CODE_SUCCESS;
}
SSqlObj *pNew = calloc(1, sizeof(SSqlObj));
pNew->pTscObj = pSql->pTscObj;
pNew->signature = pNew;
pNew->cmd.command = TSDB_SQL_STABLEVGROUP;
// TODO TEST IT
SQueryInfo *pNewQueryInfo = tscGetQueryInfoS(&pNew->cmd);
if (pNewQueryInfo == NULL) {
tscFreeSqlObj(pNew);
return code;
}
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i);
STableMeta* pTableMeta = tscTableMetaDup(pMInfo->pTableMeta);
tscAddTableMetaInfo(pNewQueryInfo, &pMInfo->name, pTableMeta, NULL, pMInfo->tagColList, pMInfo->pVgroupTables);
}
if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) {
tscFreeSqlObj(pNew);
return code;
}
pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
registerSqlObj(pNew);
tscDebug("0x%"PRIx64" svgroupRid from %" PRId64 " to %" PRId64 , pSql->self, pSql->svgroupRid, pNew->self);
pSql->svgroupRid = pNew->self;
tscDebug("0x%"PRIx64" new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql->self, pNew, pNewQueryInfo->numOfTables);
pNew->fp = tscTableMetaCallBack;
pNew->param = (void *)pSql->self;
code = tscBuildAndSendRequest(pNew, NULL);
if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;
}
return code;
}
#endif
int32_t
buildConnectMsg
(
SRequestObj
*
pRequest
,
SRequestMsgBody
*
pMsgBody
)
{
pMsgBody
->
msgType
=
TSDB_MSG_TYPE_CONNECT
;
pMsgBody
->
msg
Len
=
sizeof
(
SConnectMsg
);
pMsgBody
->
msg
Info
.
len
=
sizeof
(
SConnectMsg
);
pMsgBody
->
requestObjRefId
=
pRequest
->
self
;
SConnectMsg
*
pConnect
=
calloc
(
1
,
sizeof
(
SConnectMsg
));
...
...
@@ -3119,7 +2817,7 @@ int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) {
pConnect
->
startTime
=
htobe64
(
appInfo
.
startTime
);
tstrncpy
(
pConnect
->
app
,
appInfo
.
appName
,
tListLen
(
pConnect
->
app
));
pMsgBody
->
pData
=
pConnect
;
pMsgBody
->
msgInfo
.
pMsg
=
pConnect
;
return
0
;
}
...
...
@@ -3160,17 +2858,14 @@ int processConnectRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) {
pTscObj
->
pAppInfo
->
clusterId
=
pConnect
->
clusterId
;
atomic_add_fetch_64
(
&
pTscObj
->
pAppInfo
->
numOfConns
,
1
);
pRequest
->
body
.
pResInfo
=
calloc
(
1
,
sizeof
(
SClientResultInfo
));
pRequest
->
body
.
pResInfo
->
pMsg
=
pMsg
;
pRequest
->
body
.
resInfo
.
pRspMsg
=
pMsg
;
tscDebug
(
"0x%"
PRIx64
" clusterId:%d, totalConn:%"
PRId64
,
pRequest
->
requestId
,
pConnect
->
clusterId
,
pTscObj
->
pAppInfo
->
numOfConns
);
return
0
;
}
int32_t
doBuildMsgSupp
(
SRequestObj
*
pRequest
,
SRequestMsgBody
*
pMsgBody
)
{
pMsgBody
->
requestObjRefId
=
pRequest
->
self
;
pMsgBody
->
msgLen
=
pRequest
->
body
.
paramLen
;
pMsgBody
->
pData
=
pRequest
->
body
.
param
;
pMsgBody
->
msgInfo
=
pRequest
->
body
.
requestMsg
;
switch
(
pRequest
->
type
)
{
case
TSDB_SQL_CREATE_USER
:
...
...
@@ -3188,7 +2883,7 @@ int32_t doBuildMsgSupp(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) {
case
TSDB_SQL_CREATE_DB
:
{
pMsgBody
->
msgType
=
TSDB_MSG_TYPE_CREATE_DB
;
SCreateDbMsg
*
pCreateMsg
=
pRequest
->
body
.
param
;
SCreateDbMsg
*
pCreateMsg
=
pRequest
->
body
.
requestMsg
.
pMsg
;
SName
name
=
{
0
};
int32_t
ret
=
tNameSetDbName
(
&
name
,
pRequest
->
pTscObj
->
acctId
,
pCreateMsg
->
db
,
strnlen
(
pCreateMsg
->
db
,
tListLen
(
pCreateMsg
->
db
)));
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -3205,36 +2900,6 @@ int32_t doBuildMsgSupp(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) {
}
}
STableMeta
*
createTableMetaFromMsg
(
STableMetaMsg
*
pTableMetaMsg
)
{
assert
(
pTableMetaMsg
!=
NULL
&&
pTableMetaMsg
->
numOfColumns
>=
2
);
size_t
schemaSize
=
(
pTableMetaMsg
->
numOfColumns
+
pTableMetaMsg
->
numOfTags
)
*
sizeof
(
SSchema
);
STableMeta
*
pTableMeta
=
calloc
(
1
,
sizeof
(
STableMeta
)
+
schemaSize
);
pTableMeta
->
tableType
=
pTableMetaMsg
->
tableType
;
pTableMeta
->
vgId
=
pTableMetaMsg
->
vgId
;
pTableMeta
->
suid
=
pTableMetaMsg
->
suid
;
pTableMeta
->
uid
=
pTableMetaMsg
->
tuid
;
pTableMeta
->
tableInfo
=
(
STableComInfo
)
{
.
numOfTags
=
pTableMetaMsg
->
numOfTags
,
.
precision
=
pTableMetaMsg
->
precision
,
.
numOfColumns
=
pTableMetaMsg
->
numOfColumns
,
};
pTableMeta
->
sversion
=
pTableMetaMsg
->
sversion
;
pTableMeta
->
tversion
=
pTableMetaMsg
->
tversion
;
memcpy
(
pTableMeta
->
schema
,
pTableMetaMsg
->
pSchema
,
schemaSize
);
int32_t
numOfTotalCols
=
pTableMeta
->
tableInfo
.
numOfColumns
;
for
(
int32_t
i
=
0
;
i
<
numOfTotalCols
;
++
i
)
{
pTableMeta
->
tableInfo
.
rowSize
+=
pTableMeta
->
schema
[
i
].
bytes
;
}
return
pTableMeta
;
}
int32_t
processShowRsp
(
SRequestObj
*
pRequest
,
const
char
*
pMsg
,
int32_t
msgLen
)
{
SShowRsp
*
pShow
=
(
SShowRsp
*
)
pMsg
;
pShow
->
showId
=
htonl
(
pShow
->
showId
);
...
...
@@ -3257,12 +2922,8 @@ int32_t processShowRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen)
pFields
[
i
].
bytes
=
pSchema
[
i
].
bytes
;
}
if
(
pRequest
->
body
.
pResInfo
==
NULL
)
{
pRequest
->
body
.
pResInfo
=
calloc
(
1
,
sizeof
(
SClientResultInfo
));
}
pRequest
->
body
.
pResInfo
->
pMsg
=
pMsg
;
SClientResultInfo
*
pResInfo
=
pRequest
->
body
.
pResInfo
;
pRequest
->
body
.
resInfo
.
pRspMsg
=
pMsg
;
SReqResultInfo
*
pResInfo
=
&
pRequest
->
body
.
resInfo
;
pResInfo
->
fields
=
pFields
;
pResInfo
->
numOfCols
=
pMetaMsg
->
numOfColumns
;
...
...
@@ -3276,27 +2937,27 @@ int32_t processShowRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen)
int
buildRetrieveMnodeMsg
(
SRequestObj
*
pRequest
,
SRequestMsgBody
*
pMsgBody
)
{
pMsgBody
->
msgType
=
TSDB_MSG_TYPE_SHOW_RETRIEVE
;
pMsgBody
->
msg
L
en
=
sizeof
(
SRetrieveTableMsg
);
pMsgBody
->
msg
Info
.
l
en
=
sizeof
(
SRetrieveTableMsg
);
pMsgBody
->
requestObjRefId
=
pRequest
->
self
;
SRetrieveTableMsg
*
pRetrieveMsg
=
calloc
(
1
,
sizeof
(
SRetrieveTableMsg
));
pRetrieveMsg
->
showId
=
htonl
(
pRequest
->
body
.
execId
);
pMsgBody
->
pData
=
pRetrieveMsg
;
pMsgBody
->
msgInfo
.
pMsg
=
pRetrieveMsg
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
processRetrieveMnodeRsp
(
SRequestObj
*
pRequest
,
const
char
*
pMsg
,
int32_t
msgLen
)
{
assert
(
msgLen
>=
sizeof
(
SRetrieveTableRsp
));
tfree
(
pRequest
->
body
.
pResInfo
->
pMsg
);
pRequest
->
body
.
pResInfo
->
pMsg
=
pMsg
;
tfree
(
pRequest
->
body
.
resInfo
.
pRs
pMsg
);
pRequest
->
body
.
resInfo
.
pRs
pMsg
=
pMsg
;
SRetrieveTableRsp
*
pRetrieve
=
(
SRetrieveTableRsp
*
)
pMsg
;
pRetrieve
->
numOfRows
=
htonl
(
pRetrieve
->
numOfRows
);
pRetrieve
->
precision
=
htons
(
pRetrieve
->
precision
);
S
ClientResultInfo
*
pResInfo
=
pRequest
->
body
.
pR
esInfo
;
S
ReqResultInfo
*
pResInfo
=
&
pRequest
->
body
.
r
esInfo
;
pResInfo
->
numOfRows
=
pRetrieve
->
numOfRows
;
pResInfo
->
pData
=
pRetrieve
->
data
;
// todo fix this in async model
...
...
source/client/src/tscEnv.c
浏览文件 @
7a39255e
...
...
@@ -170,7 +170,7 @@ void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t ty
pRequest
->
type
=
type
;
pRequest
->
pTscObj
=
pObj
;
pRequest
->
body
.
fp
=
fp
;
pRequest
->
body
.
param
=
param
;
// pRequest->body.requestMsg.
= param;
pRequest
->
msgBuf
=
calloc
(
1
,
ERROR_MSG_BUF_DEFAULT_SIZE
);
tsem_init
(
&
pRequest
->
body
.
rspSem
,
0
,
0
);
...
...
@@ -188,10 +188,7 @@ static void doDestroyRequest(void* p) {
tfree
(
pRequest
->
sqlstr
);
tfree
(
pRequest
->
pInfo
);
if
(
pRequest
->
body
.
pResInfo
!=
NULL
)
{
tfree
(
pRequest
->
body
.
pResInfo
->
pMsg
);
tfree
(
pRequest
->
body
.
pResInfo
);
}
tfree
(
pRequest
->
body
.
resInfo
.
pRspMsg
);
deregisterRequest
(
pRequest
);
tfree
(
pRequest
);
...
...
source/dnode/mnode/impl/src/mndAcct.c
浏览文件 @
7a39255e
...
...
@@ -71,7 +71,7 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) {
}
static
SSdbRaw
*
mndAcctActionEncode
(
SAcctObj
*
pAcct
)
{
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_ACCT
,
TSDB_ACCT_VER_NUMBER
,
sizeof
(
SAcctObj
));
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_ACCT
,
TSDB_ACCT_VER_NUMBER
,
sizeof
(
SAcctObj
)
+
TSDB_ACCT_RESERVE_SIZE
);
if
(
pRaw
==
NULL
)
return
NULL
;
int32_t
dataPos
=
0
;
...
...
source/dnode/mnode/impl/src/mndCluster.c
浏览文件 @
7a39255e
...
...
@@ -63,7 +63,7 @@ int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len) {
}
static
SSdbRaw
*
mndClusterActionEncode
(
SClusterObj
*
pCluster
)
{
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_CLUSTER
,
TSDB_CLUSTER_VER_NUMBE
,
sizeof
(
SClusterObj
));
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_CLUSTER
,
TSDB_CLUSTER_VER_NUMBE
,
sizeof
(
SClusterObj
)
+
TSDB_CLUSTER_RESERVE_SIZE
);
if
(
pRaw
==
NULL
)
return
NULL
;
int32_t
dataPos
=
0
;
...
...
source/dnode/mnode/impl/src/mndMnode.c
浏览文件 @
7a39255e
...
...
@@ -61,8 +61,12 @@ int32_t mndInitMnode(SMnode *pMnode) {
void
mndCleanupMnode
(
SMnode
*
pMnode
)
{}
static
SMnodeObj
*
mndAcquireMnode
(
SMnode
*
pMnode
,
int32_t
mnodeId
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
return
sdbAcquire
(
pSdb
,
SDB_MNODE
,
&
mnodeId
);
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SMnodeObj
*
pObj
=
sdbAcquire
(
pSdb
,
SDB_MNODE
,
&
mnodeId
);
if
(
pObj
==
NULL
)
{
terrno
=
TSDB_CODE_MND_MNODE_NOT_EXIST
;
}
return
pObj
;
}
static
void
mndReleaseMnode
(
SMnode
*
pMnode
,
SMnodeObj
*
pMnodeObj
)
{
...
...
@@ -98,7 +102,7 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
}
static
SSdbRaw
*
mndMnodeActionEncode
(
SMnodeObj
*
pMnodeObj
)
{
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_MNODE
,
TSDB_MNODE_VER_NUMBER
,
sizeof
(
SMnodeObj
));
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_MNODE
,
TSDB_MNODE_VER_NUMBER
,
sizeof
(
SMnodeObj
)
+
TSDB_MNODE_RESERVE_SIZE
);
if
(
pRaw
==
NULL
)
return
NULL
;
int32_t
dataPos
=
0
;
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
7a39255e
...
...
@@ -23,7 +23,7 @@
#include "mndUser.h"
#include "tname.h"
#define TSDB_STB_VER_NUM 1
#define TSDB_STB_VER_NUM
BER
1
#define TSDB_STB_RESERVE_SIZE 64
static
SSdbRaw
*
mndStbActionEncode
(
SStbObj
*
pStb
);
...
...
@@ -70,7 +70,7 @@ void mndCleanupStb(SMnode *pMnode) {}
static
SSdbRaw
*
mndStbActionEncode
(
SStbObj
*
pStb
)
{
int32_t
size
=
sizeof
(
SStbObj
)
+
(
pStb
->
numOfColumns
+
pStb
->
numOfTags
)
*
sizeof
(
SSchema
)
+
TSDB_STB_RESERVE_SIZE
;
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_STB
,
TSDB_STB_VER_NUM
,
size
);
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_STB
,
TSDB_STB_VER_NUM
BER
,
size
);
if
(
pRaw
==
NULL
)
return
NULL
;
int32_t
dataPos
=
0
;
...
...
@@ -103,7 +103,7 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
int8_t
sver
=
0
;
if
(
sdbGetRawSoftVer
(
pRaw
,
&
sver
)
!=
0
)
return
NULL
;
if
(
sver
!=
TSDB_STB_VER_NUM
)
{
if
(
sver
!=
TSDB_STB_VER_NUM
BER
)
{
mError
(
"failed to decode stable since %s"
,
terrstr
());
terrno
=
TSDB_CODE_SDB_INVALID_DATA_VER
;
return
NULL
;
...
...
@@ -176,8 +176,12 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOldStb, SStbObj *pNewStb
}
SStbObj
*
mndAcquireStb
(
SMnode
*
pMnode
,
char
*
stbName
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
return
sdbAcquire
(
pSdb
,
SDB_STB
,
stbName
);
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SStbObj
*
pStb
=
sdbAcquire
(
pSdb
,
SDB_STB
,
stbName
);
if
(
pStb
==
NULL
)
{
terrno
=
TSDB_CODE_MND_STB_NOT_EXIST
;
}
return
pStb
;
}
void
mndReleaseStb
(
SMnode
*
pMnode
,
SStbObj
*
pStb
)
{
...
...
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
7a39255e
...
...
@@ -317,8 +317,12 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOldTrans, STrans *pNewT
}
STrans
*
mndAcquireTrans
(
SMnode
*
pMnode
,
int32_t
transId
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
return
sdbAcquire
(
pSdb
,
SDB_TRANS
,
&
transId
);
SSdb
*
pSdb
=
pMnode
->
pSdb
;
STrans
*
pTrans
=
sdbAcquire
(
pSdb
,
SDB_TRANS
,
&
transId
);
if
(
pTrans
==
NULL
)
{
terrno
=
TSDB_CODE_MND_TRANS_NOT_EXIST
;
}
return
pTrans
;
}
void
mndReleaseTrans
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
...
...
source/dnode/mnode/impl/src/mndUser.c
浏览文件 @
7a39255e
...
...
@@ -175,8 +175,12 @@ static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOldUser, SUserObj *pNe
}
SUserObj
*
mndAcquireUser
(
SMnode
*
pMnode
,
char
*
userName
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
return
sdbAcquire
(
pSdb
,
SDB_USER
,
userName
);
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SUserObj
*
pUser
=
sdbAcquire
(
pSdb
,
SDB_USER
,
userName
);
if
(
pUser
==
NULL
)
{
terrno
=
TSDB_CODE_MND_DB_NOT_EXIST
;
}
return
pUser
;
}
void
mndReleaseUser
(
SMnode
*
pMnode
,
SUserObj
*
pUser
)
{
...
...
source/libs/index/inc/index_tfile.h
浏览文件 @
7a39255e
...
...
@@ -59,7 +59,7 @@ typedef struct TFileReader {
typedef
struct
IndexTFile
{
char
*
path
;
TFile
Reader
*
tb
;
TFile
Cache
*
cache
;
TFileWriter
*
tw
;
}
IndexTFile
;
...
...
@@ -79,14 +79,14 @@ typedef struct TFileReaderOpt {
}
TFileReaderOpt
;
// tfile cache
TFileCache
*
tfileCacheCreate
();
TFileCache
*
tfileCacheCreate
(
const
char
*
path
);
void
tfileCacheDestroy
(
TFileCache
*
tcache
);
TFileReader
*
tfileCacheGet
(
TFileCache
*
tcache
,
TFileCacheKey
*
key
);
void
tfileCachePut
(
TFileCache
*
tcache
,
TFileCacheKey
*
key
,
TFileReader
*
reader
);
TFileWriter
*
tfileWriterCreate
(
const
char
*
suid
,
const
char
*
colName
);
IndexTFile
*
indexTFileCreate
();
IndexTFile
*
indexTFileCreate
(
const
char
*
path
);
int
indexTFilePut
(
void
*
tfile
,
SIndexTerm
*
term
,
uint64_t
uid
);
...
...
source/libs/index/src/index.c
浏览文件 @
7a39255e
...
...
@@ -333,13 +333,17 @@ static int indexMergeFinalResults(SArray *interResults, EIndexOperatorType oType
//refactor, merge interResults into fResults by oType
SArray
*
first
=
taosArrayGetP
(
interResults
,
0
);
taosArraySort
(
first
,
uidCompare
);
taosArrayRemoveDuplicate
(
first
,
uidCompare
,
NULL
);
if
(
oType
==
MUST
)
{
// just one column index, enhance later
taosArrayAddAll
(
fResults
,
first
);
}
else
if
(
oType
==
SHOULD
)
{
// just one column index, enhance later
taosArrayAddAll
(
fResults
,
first
);
// tag1 condistion || tag2 condition
}
else
if
(
oType
==
NOT
)
{
// just one column index, enhance later
taosArrayAddAll
(
fResults
,
first
);
// not use currently
}
return
0
;
...
...
source/libs/index/src/index_cache.c
浏览文件 @
7a39255e
...
...
@@ -147,7 +147,7 @@ int indexCacheSearch(void *cache, SIndexTermQuery *query, int16_t colId, int32_t
char
*
buf
=
calloc
(
1
,
keyLen
);
if
(
qtype
==
QUERY_TERM
)
{
}
else
if
(
qtype
==
QUERY_PREFIX
)
{
}
else
if
(
qtype
==
QUERY_SUFFIX
)
{
...
...
source/libs/index/src/index_tfile.c
浏览文件 @
7a39255e
...
...
@@ -13,12 +13,42 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <sys/types.h>
#include <dirent.h>
#include "index_tfile.h"
#include "index_fst.h"
#include "index_util.h"
// tfile name suid-colId-version.tindex
static
int
tfileGetFileList
(
const
char
*
path
,
SArray
*
result
)
{
DIR
*
dir
=
opendir
(
path
);
if
(
NULL
==
dir
)
{
return
-
1
;
}
struct
dirent
*
entry
;
while
((
entry
=
readdir
(
dir
))
!=
NULL
)
{
size_t
len
=
strlen
(
entry
->
d_name
);
char
*
buf
=
calloc
(
1
,
len
+
1
);
memcpy
(
buf
,
entry
->
d_name
,
len
);
taosArrayPush
(
result
,
&
buf
);
}
closedir
(
dir
);
return
0
;
}
static
int
tfileCompare
(
const
void
*
a
,
const
void
*
b
)
{
const
char
*
aName
=
*
(
char
**
)
a
;
const
char
*
bName
=
*
(
char
**
)
b
;
size_t
aLen
=
strlen
(
aName
);
size_t
bLen
=
strlen
(
bName
);
return
strncmp
(
aName
,
bName
,
aLen
>
bLen
?
aLen
:
bLen
);
}
static
int
tfileParseFileName
(
const
char
*
filename
,
uint64_t
*
suid
,
int
*
colId
,
int
*
version
)
{
if
(
3
==
sscanf
(
filename
,
"%"
PRIu64
"-%d-%d.tindex"
,
suid
,
colId
,
version
))
{
// read suid & colid & version success
return
0
;
}
return
-
1
;
}
static
void
tfileSerialCacheKey
(
TFileCacheKey
*
key
,
char
*
buf
)
{
SERIALIZE_MEM_TO_BUF
(
buf
,
key
,
suid
);
SERIALIZE_VAR_TO_BUF
(
buf
,
'_'
,
char
);
...
...
@@ -29,12 +59,28 @@ static void tfileSerialCacheKey(TFileCacheKey *key, char *buf) {
SERIALIZE_STR_MEM_TO_BUF
(
buf
,
key
,
colName
,
key
->
nColName
);
}
TFileCache
*
tfileCacheCreate
()
{
TFileCache
*
tfileCacheCreate
(
const
char
*
path
)
{
TFileCache
*
tcache
=
calloc
(
1
,
sizeof
(
TFileCache
));
if
(
tcache
==
NULL
)
{
return
NULL
;
}
tcache
->
tableCache
=
taosHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
tcache
->
capacity
=
64
;
SArray
*
files
=
taosArrayInit
(
4
,
sizeof
(
void
*
));
tfileGetFileList
(
path
,
files
);
taosArraySort
(
files
,
tfileCompare
);
for
(
size_t
i
=
0
;
i
<
taosArrayGetSize
(
files
);
i
++
)
{
char
*
file
=
taosArrayGetP
(
files
,
i
);
uint64_t
suid
;
int
colId
,
version
;
if
(
0
!=
tfileParseFileName
(
file
,
&
suid
,
&
colId
,
&
version
))
{
// invalid file, just skip
continue
;
}
free
((
void
*
)
file
);
}
taosArrayDestroy
(
files
);
return
tcache
;
}
void
tfileCacheDestroy
(
TFileCache
*
tcache
)
{
...
...
@@ -59,8 +105,11 @@ void tfileCachePut(TFileCache *tcache, TFileCacheKey *key, TFileReader *reader)
IndexTFile
*
indexTFileCreate
()
{
IndexTFile
*
indexTFileCreate
(
const
char
*
path
)
{
IndexTFile
*
tfile
=
calloc
(
1
,
sizeof
(
IndexTFile
));
tfile
->
cache
=
tfileCacheCreate
(
path
);
return
tfile
;
}
void
IndexTFileDestroy
(
IndexTFile
*
tfile
)
{
...
...
@@ -69,8 +118,15 @@ void IndexTFileDestroy(IndexTFile *tfile) {
int
indexTFileSearch
(
void
*
tfile
,
SIndexTermQuery
*
query
,
SArray
*
result
)
{
IndexTFile
*
ptfile
=
(
IndexTFile
*
)
tfile
;
IndexTFile
*
pTfile
=
(
IndexTFile
*
)
tfile
;
SIndexTerm
*
term
=
query
->
term
;
TFileCacheKey
key
=
{.
suid
=
term
->
suid
,
.
colType
=
term
->
colType
,
.
version
=
0
,
.
colName
=
term
->
colName
,
.
nColName
=
term
->
nColName
};
TFileReader
*
reader
=
tfileCacheGet
(
pTfile
->
cache
,
&
key
);
return
0
;
}
int
indexTFilePut
(
void
*
tfile
,
SIndexTerm
*
term
,
uint64_t
uid
)
{
...
...
source/libs/index/test/indexTests.cc
浏览文件 @
7a39255e
...
...
@@ -26,6 +26,7 @@
class
FstWriter
{
public:
FstWriter
()
{
_wc
=
writerCtxCreate
(
TFile
,
"/tmp/tindex"
,
false
,
0
);
_b
=
fstBuilderCreate
(
NULL
,
0
);
}
bool
Put
(
const
std
::
string
&
key
,
uint64_t
val
)
{
...
...
@@ -37,15 +38,19 @@ class FstWriter {
~
FstWriter
()
{
fstBuilderFinish
(
_b
);
fstBuilderDestroy
(
_b
);
writerCtxDestroy
(
_wc
);
}
private:
FstBuilder
*
_b
;
WriterCtx
*
_wc
;
};
class
FstReadMemory
{
public:
FstReadMemory
(
size_t
size
)
{
_w
=
fstCountingWriterCreate
(
NULL
);
_wc
=
writerCtxCreate
(
TFile
,
"/tmp/tindex"
,
true
,
0
);
_w
=
fstCountingWriterCreate
(
_wc
);
_size
=
size
;
memset
((
void
*
)
&
_s
,
0
,
sizeof
(
_s
));
}
...
...
@@ -94,12 +99,14 @@ class FstReadMemory {
fstCountingWriterDestroy
(
_w
);
fstDestroy
(
_fst
);
fstSliceDestroy
(
&
_s
);
writerCtxDestroy
(
_wc
);
}
private:
FstCountingWriter
*
_w
;
Fst
*
_fst
;
FstSlice
_s
;
WriterCtx
*
_wc
;
size_t
_size
;
};
...
...
source/libs/parser/inc/astToMsg.h
浏览文件 @
7a39255e
...
...
@@ -5,6 +5,7 @@
#include "taosmsg.h"
SCreateUserMsg
*
buildUserManipulationMsg
(
SSqlInfo
*
pInfo
,
int32_t
*
outputLen
,
int64_t
id
,
char
*
msgBuf
,
int32_t
msgLen
);
SCreateAcctMsg
*
buildAcctManipulationMsg
(
SSqlInfo
*
pInfo
,
int32_t
*
outputLen
,
int64_t
id
,
char
*
msgBuf
,
int32_t
msgLen
);
SDropUserMsg
*
buildDropUserMsg
(
SSqlInfo
*
pInfo
,
int32_t
*
outputLen
,
int64_t
id
,
char
*
msgBuf
,
int32_t
msgLen
);
SShowMsg
*
buildShowMsg
(
SShowInfo
*
pShowInfo
,
int64_t
id
,
char
*
msgBuf
,
int32_t
msgLen
);
SCreateDbMsg
*
buildCreateDbMsg
(
SCreateDbInfo
*
pCreateDbInfo
,
char
*
msgBuf
,
int32_t
msgLen
);
...
...
source/libs/parser/src/astToMsg.c
浏览文件 @
7a39255e
...
...
@@ -24,6 +24,50 @@ SCreateUserMsg* buildUserManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, in
return
pMsg
;
}
SCreateAcctMsg
*
buildAcctManipulationMsg
(
SSqlInfo
*
pInfo
,
int32_t
*
outputLen
,
int64_t
id
,
char
*
msgBuf
,
int32_t
msgLen
)
{
SCreateAcctMsg
*
pMsg
=
(
SCreateAcctMsg
*
)
calloc
(
1
,
sizeof
(
SCreateAcctMsg
));
if
(
pMsg
==
NULL
)
{
// tscError("0x%" PRIx64 " failed to malloc for query msg", id);
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
NULL
;
}
SCreateAcctMsg
*
pCreateMsg
=
(
SCreateAcctMsg
*
)
calloc
(
1
,
sizeof
(
SCreateAcctMsg
));
SToken
*
pName
=
&
pInfo
->
pMiscInfo
->
user
.
user
;
SToken
*
pPwd
=
&
pInfo
->
pMiscInfo
->
user
.
passwd
;
strncpy
(
pCreateMsg
->
user
,
pName
->
z
,
pName
->
n
);
strncpy
(
pCreateMsg
->
pass
,
pPwd
->
z
,
pPwd
->
n
);
SCreateAcctInfo
*
pAcctOpt
=
&
pInfo
->
pMiscInfo
->
acctOpt
;
pCreateMsg
->
maxUsers
=
htonl
(
pAcctOpt
->
maxUsers
);
pCreateMsg
->
maxDbs
=
htonl
(
pAcctOpt
->
maxDbs
);
pCreateMsg
->
maxTimeSeries
=
htonl
(
pAcctOpt
->
maxTimeSeries
);
pCreateMsg
->
maxStreams
=
htonl
(
pAcctOpt
->
maxStreams
);
// pCreateMsg->maxPointsPerSecond = htonl(pAcctOpt->maxPointsPerSecond);
pCreateMsg
->
maxStorage
=
htobe64
(
pAcctOpt
->
maxStorage
);
// pCreateMsg->maxQueryTime = htobe64(pAcctOpt->maxQueryTime);
// pCreateMsg->maxConnections = htonl(pAcctOpt->maxConnections);
if
(
pAcctOpt
->
stat
.
n
==
0
)
{
pCreateMsg
->
accessState
=
-
1
;
}
else
{
if
(
pAcctOpt
->
stat
.
z
[
0
]
==
'r'
&&
pAcctOpt
->
stat
.
n
==
1
)
{
pCreateMsg
->
accessState
=
TSDB_VN_READ_ACCCESS
;
}
else
if
(
pAcctOpt
->
stat
.
z
[
0
]
==
'w'
&&
pAcctOpt
->
stat
.
n
==
1
)
{
pCreateMsg
->
accessState
=
TSDB_VN_WRITE_ACCCESS
;
}
else
if
(
strncmp
(
pAcctOpt
->
stat
.
z
,
"all"
,
3
)
==
0
&&
pAcctOpt
->
stat
.
n
==
3
)
{
pCreateMsg
->
accessState
=
TSDB_VN_ALL_ACCCESS
;
}
else
if
(
strncmp
(
pAcctOpt
->
stat
.
z
,
"no"
,
2
)
==
0
&&
pAcctOpt
->
stat
.
n
==
2
)
{
pCreateMsg
->
accessState
=
0
;
}
}
*
outputLen
=
sizeof
(
SCreateAcctMsg
);
return
pMsg
;
}
SDropUserMsg
*
buildDropUserMsg
(
SSqlInfo
*
pInfo
,
int32_t
*
msgLen
,
int64_t
id
,
char
*
msgBuf
,
int32_t
msgBufLen
)
{
SToken
*
pName
=
taosArrayGet
(
pInfo
->
pMiscInfo
->
a
,
0
);
if
(
pName
->
n
>=
TSDB_USER_LEN
)
{
...
...
source/libs/parser/src/astValidate.c
浏览文件 @
7a39255e
...
...
@@ -4228,6 +4228,42 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, int64_t id, void** output, in
break
;
}
case
TSDB_SQL_CREATE_ACCT
:
case
TSDB_SQL_ALTER_ACCT
:
{
const
char
*
msg1
=
"invalid state option, available options[no, r, w, all]"
;
const
char
*
msg2
=
"invalid user/account name"
;
const
char
*
msg3
=
"name too long"
;
SToken
*
pName
=
&
pInfo
->
pMiscInfo
->
user
.
user
;
SToken
*
pPwd
=
&
pInfo
->
pMiscInfo
->
user
.
passwd
;
if
(
parserValidatePassword
(
pPwd
,
pMsgBuf
)
!=
TSDB_CODE_SUCCESS
)
{
return
TSDB_CODE_TSC_INVALID_OPERATION
;
}
if
(
pName
->
n
>=
TSDB_USER_LEN
)
{
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg3
);
}
if
(
parserValidateNameToken
(
pName
)
!=
TSDB_CODE_SUCCESS
)
{
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg2
);
}
SCreateAcctInfo
*
pAcctOpt
=
&
pInfo
->
pMiscInfo
->
acctOpt
;
if
(
pAcctOpt
->
stat
.
n
>
0
)
{
if
(
pAcctOpt
->
stat
.
z
[
0
]
==
'r'
&&
pAcctOpt
->
stat
.
n
==
1
)
{
}
else
if
(
pAcctOpt
->
stat
.
z
[
0
]
==
'w'
&&
pAcctOpt
->
stat
.
n
==
1
)
{
}
else
if
(
strncmp
(
pAcctOpt
->
stat
.
z
,
"all"
,
3
)
==
0
&&
pAcctOpt
->
stat
.
n
==
3
)
{
}
else
if
(
strncmp
(
pAcctOpt
->
stat
.
z
,
"no"
,
2
)
==
0
&&
pAcctOpt
->
stat
.
n
==
2
)
{
}
else
{
return
buildInvalidOperationMsg
(
pMsgBuf
,
msg1
);
}
}
*
output
=
buildAcctManipulationMsg
(
pInfo
,
outputLen
,
id
,
msgBuf
,
msgBufLen
);
break
;
}
case
TSDB_SQL_DROP_ACCT
:
case
TSDB_SQL_DROP_USER
:
{
*
output
=
buildDropUserMsg
(
pInfo
,
outputLen
,
id
,
msgBuf
,
msgBufLen
);
...
...
source/util/src/terror.c
浏览文件 @
7a39255e
...
...
@@ -240,6 +240,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_COMMENT, "Invalid func comment"
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_INVALID_FUNC_CODE
,
"Invalid func code"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_INVALID_FUNC_BUFSIZE
,
"Invalid func bufSize"
)
// mnode-trans
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_TRANS_ALREADY_EXIST
,
"Transaction already exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_TRANS_NOT_EXIST
,
"Transaction not exists"
)
// dnode
TAOS_DEFINE_ERROR
(
TSDB_CODE_DND_ACTION_IN_PROGRESS
,
"Action in progress"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_DND_EXITING
,
"Dnode is exiting"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录