未验证 提交 f5a447b2 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #9546 from taosdata/feature/3.0_liaohj

Feature/3.0 liaohj
...@@ -174,6 +174,7 @@ typedef enum _mgmt_table { ...@@ -174,6 +174,7 @@ typedef enum _mgmt_table {
typedef struct SBuildTableMetaInput { typedef struct SBuildTableMetaInput {
int32_t vgId; int32_t vgId;
char* dbName;
char* tableFullName; char* tableFullName;
} SBuildTableMetaInput; } SBuildTableMetaInput;
...@@ -355,9 +356,9 @@ typedef struct SEpSet { ...@@ -355,9 +356,9 @@ typedef struct SEpSet {
} SEpSet; } SEpSet;
static FORCE_INLINE int taosEncodeSEpSet(void** buf, const SEpSet* pEp) { static FORCE_INLINE int taosEncodeSEpSet(void** buf, const SEpSet* pEp) {
if(buf == NULL) return sizeof(SEpSet); if (buf == NULL) return sizeof(SEpSet);
memcpy(buf, pEp, sizeof(SEpSet)); memcpy(buf, pEp, sizeof(SEpSet));
//TODO: endian conversion // TODO: endian conversion
return sizeof(SEpSet); return sizeof(SEpSet);
} }
...@@ -776,6 +777,7 @@ typedef struct { ...@@ -776,6 +777,7 @@ typedef struct {
typedef struct { typedef struct {
SMsgHead header; SMsgHead header;
char dbFname[TSDB_DB_FNAME_LEN];
char tableFname[TSDB_TABLE_FNAME_LEN]; char tableFname[TSDB_TABLE_FNAME_LEN];
} STableInfoMsg; } STableInfoMsg;
...@@ -810,6 +812,7 @@ typedef struct { ...@@ -810,6 +812,7 @@ typedef struct {
typedef struct { typedef struct {
char tbFname[TSDB_TABLE_FNAME_LEN]; // table full name char tbFname[TSDB_TABLE_FNAME_LEN]; // table full name
char stbFname[TSDB_TABLE_FNAME_LEN]; char stbFname[TSDB_TABLE_FNAME_LEN];
char dbFname[TSDB_DB_FNAME_LEN];
int32_t numOfTags; int32_t numOfTags;
int32_t numOfColumns; int32_t numOfColumns;
int8_t precision; int8_t precision;
...@@ -1122,10 +1125,10 @@ typedef struct STaskDropRsp { ...@@ -1122,10 +1125,10 @@ typedef struct STaskDropRsp {
} STaskDropRsp; } STaskDropRsp;
typedef struct { typedef struct {
int8_t igExists; int8_t igExists;
char* name; char* name;
char* physicalPlan; char* physicalPlan;
char* logicalPlan; char* logicalPlan;
} SCMCreateTopicReq; } SCMCreateTopicReq;
static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateTopicReq* pReq) { static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateTopicReq* pReq) {
...@@ -1161,8 +1164,8 @@ static FORCE_INLINE void* tDeserializeSCMCreateTopicRsp(void* buf, SCMCreateTopi ...@@ -1161,8 +1164,8 @@ static FORCE_INLINE void* tDeserializeSCMCreateTopicRsp(void* buf, SCMCreateTopi
} }
typedef struct { typedef struct {
char* topicName; char* topicName;
char* consumerGroup; char* consumerGroup;
int64_t consumerId; int64_t consumerId;
} SCMSubscribeReq; } SCMSubscribeReq;
...@@ -1183,7 +1186,7 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq ...@@ -1183,7 +1186,7 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
SEpSet pEpSet; SEpSet pEpSet;
} SCMSubscribeRsp; } SCMSubscribeRsp;
static FORCE_INLINE int tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribeRsp* pRsp) { static FORCE_INLINE int tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribeRsp* pRsp) {
...@@ -1252,9 +1255,9 @@ typedef struct SVCreateTbReq { ...@@ -1252,9 +1255,9 @@ typedef struct SVCreateTbReq {
char* name; char* name;
uint32_t ttl; uint32_t ttl;
uint32_t keep; uint32_t keep;
#define TD_SUPER_TABLE 0 #define TD_SUPER_TABLE TSDB_SUPER_TABLE
#define TD_CHILD_TABLE 1 #define TD_CHILD_TABLE TSDB_CHILD_TABLE
#define TD_NORMAL_TABLE 2 #define TD_NORMAL_TABLE TSDB_NORMAL_TABLE
uint8_t type; uint8_t type;
union { union {
struct { struct {
...@@ -1282,8 +1285,10 @@ typedef struct { ...@@ -1282,8 +1285,10 @@ typedef struct {
int tmsgSVCreateTbReqEncode(SMsgEncoder* pCoder, SVCreateTbReq* pReq); int tmsgSVCreateTbReqEncode(SMsgEncoder* pCoder, SVCreateTbReq* pReq);
int tmsgSVCreateTbReqDecode(SMsgDecoder* pCoder, SVCreateTbReq* pReq); int tmsgSVCreateTbReqDecode(SMsgDecoder* pCoder, SVCreateTbReq* pReq);
int tSerializeSVCreateTbReq(void** buf, const SVCreateTbReq* pReq); int tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq);
void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq); void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq);
int tSVCreateTbBatchReqSerialize(void** buf, SVCreateTbBatchReq* pReq);
void* tSVCreateTbBatchReqDeserialize(void* buf, SVCreateTbBatchReq* pReq);
typedef struct SVCreateTbRsp { typedef struct SVCreateTbRsp {
} SVCreateTbRsp; } SVCreateTbRsp;
......
...@@ -40,8 +40,9 @@ enum { ...@@ -40,8 +40,9 @@ enum {
// the SQL below is for mgmt node // the SQL below is for mgmt node
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_DB, "create-db" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_DB, "create-db" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_STABLE, "create-stable" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_TABLE, "create-table" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_TABLE, "create-table" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_FUNCTION, "create-function" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_FUNCTION, "create-function" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_DB, "drop-db" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_DB, "drop-db" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_TABLE, "drop-table" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_TABLE, "drop-table" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_FUNCTION, "drop-function" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_FUNCTION, "drop-function" )
......
...@@ -25,20 +25,20 @@ ...@@ -25,20 +25,20 @@
extern "C" { extern "C" {
#endif #endif
#define META_SUPER_TABLE TD_SUPER_TABLE
#define META_CHILD_TABLE TD_CHILD_TABLE
#define META_NORMAL_TABLE TD_NORMAL_TABLE
// Types exported // Types exported
typedef struct SMeta SMeta; typedef struct SMeta SMeta;
#define META_SUPER_TABLE 0
#define META_CHILD_TABLE 1
#define META_NORMAL_TABLE 2
typedef struct SMetaCfg { typedef struct SMetaCfg {
/// LRU cache size /// LRU cache size
uint64_t lruSize; uint64_t lruSize;
} SMetaCfg; } SMetaCfg;
typedef struct { typedef struct {
int32_t nCols; uint32_t nCols;
SSchema *pSchema; SSchema *pSchema;
} SSchemaWrapper; } SSchemaWrapper;
......
...@@ -27,7 +27,7 @@ extern "C" { ...@@ -27,7 +27,7 @@ extern "C" {
#include "tname.h" #include "tname.h"
#include "tvariant.h" #include "tvariant.h"
/* /**
* The first field of a node of any type is guaranteed to be the int16_t. * The first field of a node of any type is guaranteed to be the int16_t.
* Hence the type of any node can be gotten by casting it to SQueryNode. * Hence the type of any node can be gotten by casting it to SQueryNode.
*/ */
...@@ -157,7 +157,7 @@ typedef struct SVgDataBlocks { ...@@ -157,7 +157,7 @@ typedef struct SVgDataBlocks {
typedef struct SInsertStmtInfo { typedef struct SInsertStmtInfo {
int16_t nodeType; int16_t nodeType;
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>. SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
int8_t schemaAttache; // denote if submit block is built with table schema or not int8_t schemaAttache; // denote if submit block is built with table schema or not
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
uint32_t insertType; // insert data from [file|sql statement| bound statement] uint32_t insertType; // insert data from [file|sql statement| bound statement]
const char* sql; // current sql statement position const char* sql; // current sql statement position
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_UTIL_FREELIST_H_
#define _TD_UTIL_FREELIST_H_
#include "os.h"
#include "tlist.h"
#ifdef __cplusplus
extern "C" {
#endif
struct SFreeListNode {
TD_SLIST_NODE(SFreeListNode);
char payload[];
};
typedef TD_SLIST(SFreeListNode) SFreeList;
#define TFL_MALLOC(SIZE, LIST) \
({ \
void *ptr = malloc((SIZE) + sizeof(struct SFreeListNode)); \
if (ptr) { \
TD_SLIST_PUSH((LIST), (struct SFreeListNode *)ptr); \
ptr = ((struct SFreeListNode *)ptr)->payload; \
} \
ptr; \
})
#define tFreeListInit(pFL) TD_SLIST_INIT(pFL)
static FORCE_INLINE void tFreeListClear(SFreeList *pFL) {
struct SFreeListNode *pNode;
for (;;) {
pNode = TD_SLIST_HEAD(pFL);
if (pNode == NULL) break;
TD_SLIST_POP(pFL);
free(pNode);
}
}
#ifdef __cplusplus
}
#endif
#endif /*_TD_UTIL_FREELIST_H_*/
\ No newline at end of file
...@@ -202,7 +202,7 @@ int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQuery, SQueryDag** pDag) { ...@@ -202,7 +202,7 @@ int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQuery, SQueryDag** pDag) {
} }
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) { int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) {
if (TSDB_SQL_INSERT == pRequest->type) { if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
return scheduleExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob, &pRequest->affectedRows); return scheduleExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob, &pRequest->affectedRows);
} }
return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob); return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob);
......
...@@ -435,42 +435,14 @@ TEST(testCase, connect_Test) { ...@@ -435,42 +435,14 @@ TEST(testCase, connect_Test) {
// taos_close(pConn); // taos_close(pConn);
//} //}
TEST(testCase, show_table_Test) { //TEST(testCase, show_table_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes);
pRes = taos_query(pConn, "show tables");
if (taos_errno(pRes) != 0) {
printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
ASSERT_TRUE(false);
}
TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes);
char str[512] = {0};
while((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str);
}
taos_free_result(pRes);
taos_close(pConn);
}
//TEST(testCase, create_multiple_tables) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL); // assert(pConn != NULL);
// //
// TAOS_RES* pRes = taos_query(pConn, "use abc1"); // TAOS_RES* pRes = taos_query(pConn, "use abc1");
// taos_free_result(pRes); // taos_free_result(pRes);
// //
// pRes = taos_query(pConn, "create table t_2 using st1 tags(1) t_3 using st2 tags(2)"); // pRes = taos_query(pConn, "show tables");
// if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
// printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes)); // printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes); // taos_free_result(pRes);
...@@ -490,3 +462,31 @@ TEST(testCase, show_table_Test) { ...@@ -490,3 +462,31 @@ TEST(testCase, show_table_Test) {
// taos_free_result(pRes); // taos_free_result(pRes);
// taos_close(pConn); // taos_close(pConn);
//} //}
TEST(testCase, create_multiple_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes);
pRes = taos_query(pConn, "create table t_2 using st1 tags(1) t_3 using st1 tags(2)");
if (taos_errno(pRes) != 0) {
printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
ASSERT_TRUE(false);
}
TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes);
char str[512] = {0};
while((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str);
}
taos_free_result(pRes);
taos_close(pConn);
}
...@@ -98,7 +98,7 @@ int tmsgSVCreateTbReqDecode(SMsgDecoder *pCoder, SVCreateTbReq *pReq) { ...@@ -98,7 +98,7 @@ int tmsgSVCreateTbReqDecode(SMsgDecoder *pCoder, SVCreateTbReq *pReq) {
return 0; return 0;
} }
int tSerializeSVCreateTbReq(void **buf, const SVCreateTbReq *pReq) { int tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
int tlen = 0; int tlen = 0;
tlen += taosEncodeFixedU64(buf, pReq->ver); tlen += taosEncodeFixedU64(buf, pReq->ver);
...@@ -193,6 +193,33 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { ...@@ -193,6 +193,33 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
return buf; return buf;
} }
int tSVCreateTbBatchReqSerialize(void **buf, SVCreateTbBatchReq *pReq) {
int tlen = 0;
tlen += taosEncodeFixedU64(buf, pReq->ver);
tlen += taosEncodeFixedU32(buf, taosArrayGetSize(pReq->pArray));
for (size_t i = 0; i < taosArrayGetSize(pReq->pArray); i++) {
SVCreateTbReq *pCreateTbReq = taosArrayGet(pReq->pArray, i);
tlen += tSerializeSVCreateTbReq(buf, pCreateTbReq);
}
return tlen;
}
void *tSVCreateTbBatchReqDeserialize(void *buf, SVCreateTbBatchReq *pReq) {
uint32_t nsize = 0;
buf = taosDecodeFixedU64(buf, &pReq->ver);
buf = taosDecodeFixedU32(buf, &nsize);
for (size_t i = 0; i < nsize; i++) {
SVCreateTbReq req;
buf = tDeserializeSVCreateTbReq(buf, &req);
taosArrayPush(pReq->pArray, &req);
}
return buf;
}
/* ------------------------ STATIC METHODS ------------------------ */ /* ------------------------ STATIC METHODS ------------------------ */
static int tmsgStartEncode(SMsgEncoder *pME) { static int tmsgStartEncode(SMsgEncoder *pME) {
struct SMEListNode *pNode = (struct SMEListNode *)malloc(sizeof(*pNode)); struct SMEListNode *pNode = (struct SMEListNode *)malloc(sizeof(*pNode));
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
#include "vnodeDef.h" #include "vnodeDef.h"
static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg); static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg);
static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NULL, &pVnode->pQuery); } int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NULL, &pVnode->pQuery); }
...@@ -43,7 +43,7 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -43,7 +43,7 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
return qWorkerProcessShowMsg(pVnode, pVnode->pQuery, pMsg); return qWorkerProcessShowMsg(pVnode, pVnode->pQuery, pMsg);
case TDMT_VND_SHOW_TABLES_FETCH: case TDMT_VND_SHOW_TABLES_FETCH:
return vnodeGetTableList(pVnode, pMsg); return vnodeGetTableList(pVnode, pMsg);
// return qWorkerProcessShowFetchMsg(pVnode->pMeta, pVnode->pQuery, pMsg); // return qWorkerProcessShowFetchMsg(pVnode->pMeta, pVnode->pQuery, pMsg);
case TDMT_VND_TABLE_META: case TDMT_VND_TABLE_META:
return vnodeGetTableMeta(pVnode, pMsg, pRsp); return vnodeGetTableMeta(pVnode, pMsg, pRsp);
default: default:
...@@ -60,18 +60,21 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -60,18 +60,21 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
int32_t nCols; int32_t nCols;
int32_t nTagCols; int32_t nTagCols;
SSchemaWrapper *pSW; SSchemaWrapper *pSW;
STableMetaMsg * pTbMetaMsg; STableMetaMsg * pTbMetaMsg = NULL;
SSchema * pTagSchema; SSchema * pTagSchema;
SRpcMsg rpcMsg;
int msgLen = 0;
int32_t code = TSDB_CODE_VND_APP_ERROR;
pTbCfg = metaGetTbInfoByName(pVnode->pMeta, pReq->tableFname, &uid); pTbCfg = metaGetTbInfoByName(pVnode->pMeta, pReq->tableFname, &uid);
if (pTbCfg == NULL) { if (pTbCfg == NULL) {
return -1; goto _exit;
} }
if (pTbCfg->type == META_CHILD_TABLE) { if (pTbCfg->type == META_CHILD_TABLE) {
pStbCfg = metaGetTbInfoByUid(pVnode->pMeta, pTbCfg->ctbCfg.suid); pStbCfg = metaGetTbInfoByUid(pVnode->pMeta, pTbCfg->ctbCfg.suid);
if (pStbCfg == NULL) { if (pStbCfg == NULL) {
return -1; goto _exit;
} }
pSW = metaGetTableSchema(pVnode->pMeta, pTbCfg->ctbCfg.suid, 0, true); pSW = metaGetTableSchema(pVnode->pMeta, pTbCfg->ctbCfg.suid, 0, true);
...@@ -91,12 +94,13 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -91,12 +94,13 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
pTagSchema = NULL; pTagSchema = NULL;
} }
int msgLen = sizeof(STableMetaMsg) + sizeof(SSchema) * (nCols + nTagCols); msgLen = sizeof(STableMetaMsg) + sizeof(SSchema) * (nCols + nTagCols);
pTbMetaMsg = (STableMetaMsg *)rpcMallocCont(msgLen); pTbMetaMsg = (STableMetaMsg *)rpcMallocCont(msgLen);
if (pTbMetaMsg == NULL) { if (pTbMetaMsg == NULL) {
return -1; goto _exit;
} }
memcpy(pTbMetaMsg->dbFname, pReq->dbFname, sizeof(pTbMetaMsg->dbFname));
strcpy(pTbMetaMsg->tbFname, pTbCfg->name); strcpy(pTbMetaMsg->tbFname, pTbCfg->name);
if (pTbCfg->type == META_CHILD_TABLE) { if (pTbCfg->type == META_CHILD_TABLE) {
strcpy(pTbMetaMsg->stbFname, pStbCfg->name); strcpy(pTbMetaMsg->stbFname, pStbCfg->name);
...@@ -119,13 +123,15 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -119,13 +123,15 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
pSch->bytes = htonl(pSch->bytes); pSch->bytes = htonl(pSch->bytes);
} }
SRpcMsg rpcMsg = { code = 0;
.handle = pMsg->handle,
.ahandle = pMsg->ahandle, _exit:
.pCont = pTbMetaMsg,
.contLen = msgLen, rpcMsg.handle = pMsg->handle;
.code = 0, rpcMsg.ahandle = pMsg->ahandle;
}; rpcMsg.pCont = pTbMetaMsg;
rpcMsg.contLen = msgLen;
rpcMsg.code = code;
rpcSendResponse(&rpcMsg); rpcSendResponse(&rpcMsg);
...@@ -138,10 +144,10 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -138,10 +144,10 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
* @param pRsp * @param pRsp
*/ */
static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) { static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) {
SMTbCursor* pCur = metaOpenTbCursor(pVnode->pMeta); SMTbCursor *pCur = metaOpenTbCursor(pVnode->pMeta);
SArray* pArray = taosArrayInit(10, POINTER_BYTES); SArray * pArray = taosArrayInit(10, POINTER_BYTES);
char* name = NULL; char * name = NULL;
int32_t totalLen = 0; int32_t totalLen = 0;
while ((name = metaTbCursorNext(pCur)) != NULL) { while ((name = metaTbCursorNext(pCur)) != NULL) {
taosArrayPush(pArray, &name); taosArrayPush(pArray, &name);
...@@ -150,18 +156,19 @@ static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -150,18 +156,19 @@ static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) {
metaCloseTbCursor(pCur); metaCloseTbCursor(pCur);
int32_t rowLen = (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE) + 8 + 2 + (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE) + 8 + 4; int32_t rowLen =
int32_t numOfTables = (int32_t) taosArrayGetSize(pArray); (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE) + 8 + 2 + (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE) + 8 + 4;
int32_t numOfTables = (int32_t)taosArrayGetSize(pArray);
int32_t payloadLen = rowLen * numOfTables; int32_t payloadLen = rowLen * numOfTables;
// SVShowTablesFetchReq *pFetchReq = pMsg->pCont; // SVShowTablesFetchReq *pFetchReq = pMsg->pCont;
SVShowTablesFetchRsp *pFetchRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp) + payloadLen); SVShowTablesFetchRsp *pFetchRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp) + payloadLen);
memset(pFetchRsp, 0, sizeof(struct SVShowTablesFetchRsp) + payloadLen); memset(pFetchRsp, 0, sizeof(struct SVShowTablesFetchRsp) + payloadLen);
char* p = pFetchRsp->data; char *p = pFetchRsp->data;
for(int32_t i = 0; i < numOfTables; ++i) { for (int32_t i = 0; i < numOfTables; ++i) {
char* n = taosArrayGetP(pArray, i); char *n = taosArrayGetP(pArray, i);
STR_TO_VARSTR(p, n); STR_TO_VARSTR(p, n);
p += (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE); p += (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE);
...@@ -171,11 +178,11 @@ static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -171,11 +178,11 @@ static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) {
pFetchRsp->precision = 0; pFetchRsp->precision = 0;
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.handle = pMsg->handle, .handle = pMsg->handle,
.ahandle = pMsg->ahandle, .ahandle = pMsg->ahandle,
.pCont = pFetchRsp, .pCont = pFetchRsp,
.contLen = sizeof(SVShowTablesFetchRsp) + payloadLen, .contLen = sizeof(SVShowTablesFetchRsp) + payloadLen,
.code = 0, .code = 0,
}; };
rpcSendResponse(&rpcMsg); rpcSendResponse(&rpcMsg);
......
...@@ -27,7 +27,7 @@ int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -27,7 +27,7 @@ int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) {
} }
int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
SRpcMsg * pMsg; SRpcMsg *pMsg;
for (int i = 0; i < taosArrayGetSize(pMsgs); i++) { for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i); pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i);
...@@ -50,8 +50,9 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { ...@@ -50,8 +50,9 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
} }
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
SVCreateTbReq vCreateTbReq; SVCreateTbReq vCreateTbReq;
void * ptr = vnodeMalloc(pVnode, pMsg->contLen); SVCreateTbBatchReq vCreateTbBatchReq;
void * ptr = vnodeMalloc(pVnode, pMsg->contLen);
if (ptr == NULL) { if (ptr == NULL) {
// TODO: handle error // TODO: handle error
} }
...@@ -68,7 +69,6 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -68,7 +69,6 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_VND_CREATE_STB: case TDMT_VND_CREATE_STB:
case TDMT_VND_CREATE_TABLE:
tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq); tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq);
if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) { if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) {
// TODO: handle error // TODO: handle error
...@@ -76,6 +76,15 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -76,6 +76,15 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// TODO: maybe need to clear the requst struct // TODO: maybe need to clear the requst struct
break; break;
case TDMT_VND_CREATE_TABLE:
tSVCreateTbBatchReqDeserialize(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbBatchReq);
for (int i = 0; i < taosArrayGetSize(vCreateTbBatchReq.pArray); i++) {
SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i);
if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) {
// TODO: handle error
}
}
case TDMT_VND_DROP_STB: case TDMT_VND_DROP_STB:
case TDMT_VND_DROP_TABLE: case TDMT_VND_DROP_TABLE:
// if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) { // if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) {
......
...@@ -22,10 +22,6 @@ ...@@ -22,10 +22,6 @@
extern "C" { extern "C" {
#endif #endif
#define META_SUPER_TABLE TD_SUPER_TABLE
#define META_CHILD_TABLE TD_CHILD_TABLE
#define META_NORMAL_TABLE TD_NORMAL_TABLE
int metaValidateTbCfg(SMeta *pMeta, const STbCfg *); int metaValidateTbCfg(SMeta *pMeta, const STbCfg *);
size_t metaEncodeTbObjFromTbOptions(const STbCfg *, void *pBuf, size_t bsize); size_t metaEncodeTbObjFromTbOptions(const STbCfg *, void *pBuf, size_t bsize);
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
typedef struct { typedef struct {
tb_uid_t uid; tb_uid_t uid;
int32_t sver; int32_t sver;
int32_t padding;
} SSchemaKey; } SSchemaKey;
struct SMetaDB { struct SMetaDB {
...@@ -55,6 +56,8 @@ static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT * ...@@ -55,6 +56,8 @@ static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *
static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg); static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg);
static void * metaDecodeTbInfo(void *buf, STbCfg *pTbCfg); static void * metaDecodeTbInfo(void *buf, STbCfg *pTbCfg);
static void metaClearTbCfg(STbCfg *pTbCfg); static void metaClearTbCfg(STbCfg *pTbCfg);
static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW);
static void * metaDecodeSchema(void *buf, SSchemaWrapper *pSW);
#define BDB_PERR(info, code) fprintf(stderr, info " reason: %s", db_strerror(code)) #define BDB_PERR(info, code) fprintf(stderr, info " reason: %s", db_strerror(code))
...@@ -169,18 +172,13 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) { ...@@ -169,18 +172,13 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
pBuf = buf; pBuf = buf;
memset(&key, 0, sizeof(key)); memset(&key, 0, sizeof(key));
memset(&value, 0, sizeof(key)); memset(&value, 0, sizeof(key));
SSchemaKey schemaKey = {uid, 0 /*TODO*/}; SSchemaKey schemaKey = {uid, 0 /*TODO*/, 0};
key.data = &schemaKey; key.data = &schemaKey;
key.size = sizeof(schemaKey); key.size = sizeof(schemaKey);
taosEncodeFixedU32(&pBuf, ncols); SSchemaWrapper sw = {.nCols = ncols, .pSchema = pSchema};
for (size_t i = 0; i < ncols; i++) { metaEncodeSchema(&pBuf, &sw);
taosEncodeFixedI8(&pBuf, pSchema[i].type);
taosEncodeFixedI32(&pBuf, pSchema[i].colId);
taosEncodeFixedI32(&pBuf, pSchema[i].bytes);
taosEncodeString(&pBuf, pSchema[i].name);
}
value.data = buf; value.data = buf;
value.size = POINTER_DISTANCE(pBuf, buf); value.size = POINTER_DISTANCE(pBuf, buf);
...@@ -197,6 +195,38 @@ int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) { ...@@ -197,6 +195,38 @@ int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) {
} }
/* ------------------------ STATIC METHODS ------------------------ */ /* ------------------------ STATIC METHODS ------------------------ */
static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW) {
int tlen = 0;
SSchema *pSchema;
tlen += taosEncodeFixedU32(buf, pSW->nCols);
for (int i = 0; i < pSW->nCols; i++) {
pSchema = pSW->pSchema + i;
tlen += taosEncodeFixedI8(buf, pSchema->type);
tlen += taosEncodeFixedI32(buf, pSchema->colId);
tlen += taosEncodeFixedI32(buf, pSchema->bytes);
tlen += taosEncodeString(buf, pSchema->name);
}
return tlen;
}
static void *metaDecodeSchema(void *buf, SSchemaWrapper *pSW) {
SSchema *pSchema;
buf = taosDecodeFixedU32(buf, &pSW->nCols);
pSW->pSchema = (SSchema *)malloc(sizeof(SSchema) * pSW->nCols);
for (int i = 0; i < pSW->nCols; i++) {
pSchema = pSW->pSchema + i;
buf = taosDecodeFixedI8(buf, &pSchema->type);
buf = taosDecodeFixedI32(buf, &pSchema->colId);
buf = taosDecodeFixedI32(buf, &pSchema->bytes);
buf = taosDecodeStringTo(buf, pSchema->name);
}
return buf;
}
static SMetaDB *metaNewDB() { static SMetaDB *metaNewDB() {
SMetaDB *pDB = NULL; SMetaDB *pDB = NULL;
pDB = (SMetaDB *)calloc(1, sizeof(*pDB)); pDB = (SMetaDB *)calloc(1, sizeof(*pDB));
...@@ -376,15 +406,8 @@ static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg) { ...@@ -376,15 +406,8 @@ static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg) {
tsize += taosEncodeFixedU8(buf, pTbCfg->type); tsize += taosEncodeFixedU8(buf, pTbCfg->type);
if (pTbCfg->type == META_SUPER_TABLE) { if (pTbCfg->type == META_SUPER_TABLE) {
tsize += taosEncodeVariantU32(buf, pTbCfg->stbCfg.nTagCols); SSchemaWrapper sw = {.nCols = pTbCfg->stbCfg.nTagCols, .pSchema = pTbCfg->stbCfg.pTagSchema};
for (uint32_t i = 0; i < pTbCfg->stbCfg.nTagCols; i++) { tsize += metaEncodeSchema(buf, &sw);
tsize += taosEncodeFixedI8(buf, pTbCfg->stbCfg.pTagSchema[i].type);
tsize += taosEncodeFixedI32(buf, pTbCfg->stbCfg.pTagSchema[i].colId);
tsize += taosEncodeFixedI32(buf, pTbCfg->stbCfg.pTagSchema[i].bytes);
tsize += taosEncodeString(buf, pTbCfg->stbCfg.pTagSchema[i].name);
}
// tsize += tdEncodeSchema(buf, pTbCfg->stbCfg.pTagSchema);
} else if (pTbCfg->type == META_CHILD_TABLE) { } else if (pTbCfg->type == META_CHILD_TABLE) {
tsize += taosEncodeFixedU64(buf, pTbCfg->ctbCfg.suid); tsize += taosEncodeFixedU64(buf, pTbCfg->ctbCfg.suid);
tsize += tdEncodeKVRow(buf, pTbCfg->ctbCfg.pTag); tsize += tdEncodeKVRow(buf, pTbCfg->ctbCfg.pTag);
...@@ -403,14 +426,10 @@ static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg) { ...@@ -403,14 +426,10 @@ static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg) {
buf = taosDecodeFixedU8(buf, &(pTbCfg->type)); buf = taosDecodeFixedU8(buf, &(pTbCfg->type));
if (pTbCfg->type == META_SUPER_TABLE) { if (pTbCfg->type == META_SUPER_TABLE) {
buf = taosDecodeVariantU32(buf, &(pTbCfg->stbCfg.nTagCols)); SSchemaWrapper sw;
pTbCfg->stbCfg.pTagSchema = (SSchema *)malloc(sizeof(SSchema) * pTbCfg->stbCfg.nTagCols); buf = metaDecodeSchema(buf, &sw);
for (uint32_t i = 0; i < pTbCfg->stbCfg.nTagCols; i++) { pTbCfg->stbCfg.nTagCols = sw.nCols;
buf = taosDecodeFixedI8(buf, &(pTbCfg->stbCfg.pTagSchema[i].type)); pTbCfg->stbCfg.pTagSchema = sw.pSchema;
buf = taosDecodeFixedI32(buf, &pTbCfg->stbCfg.pTagSchema[i].colId);
buf = taosDecodeFixedI32(buf, &pTbCfg->stbCfg.pTagSchema[i].bytes);
buf = taosDecodeStringTo(buf, pTbCfg->stbCfg.pTagSchema[i].name);
}
} else if (pTbCfg->type == META_CHILD_TABLE) { } else if (pTbCfg->type == META_CHILD_TABLE) {
buf = taosDecodeFixedU64(buf, &(pTbCfg->ctbCfg.suid)); buf = taosDecodeFixedU64(buf, &(pTbCfg->ctbCfg.suid));
buf = tdDecodeKVRow(buf, &(pTbCfg->ctbCfg.pTag)); buf = tdDecodeKVRow(buf, &(pTbCfg->ctbCfg.pTag));
...@@ -496,7 +515,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo ...@@ -496,7 +515,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
int ret; int ret;
void * pBuf; void * pBuf;
SSchema * pSchema; SSchema * pSchema;
SSchemaKey schemaKey = {uid, sver}; SSchemaKey schemaKey = {uid, sver, 0};
DBT key = {0}; DBT key = {0};
DBT value = {0}; DBT value = {0};
...@@ -507,38 +526,14 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo ...@@ -507,38 +526,14 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
// Query // Query
ret = pDB->pSchemaDB->get(pDB->pSchemaDB, NULL, &key, &value, 0); ret = pDB->pSchemaDB->get(pDB->pSchemaDB, NULL, &key, &value, 0);
if (ret != 0) { if (ret != 0) {
printf("failed to query schema DB since %s================\n", db_strerror(ret));
return NULL; return NULL;
} }
// Decode the schema // Decode the schema
pBuf = value.data; pBuf = value.data;
taosDecodeFixedI32(&pBuf, &nCols); pSW = malloc(sizeof(*pSW));
if (isinline) { metaDecodeSchema(pBuf, pSW);
pSW = (SSchemaWrapper *)malloc(sizeof(*pSW) + sizeof(SSchema) * nCols);
if (pSW == NULL) {
return NULL;
}
pSW->pSchema = POINTER_SHIFT(pSW, sizeof(*pSW));
} else {
pSW = (SSchemaWrapper *)malloc(sizeof(*pSW));
if (pSW == NULL) {
return NULL;
}
pSW->pSchema = (SSchema *)malloc(sizeof(SSchema) * nCols);
if (pSW->pSchema == NULL) {
free(pSW);
return NULL;
}
}
for (int i = 0; i < nCols; i++) {
pSchema = pSW->pSchema + i;
taosDecodeFixedI8(&pBuf, &(pSchema->type));
taosDecodeFixedI32(&pBuf, &(pSchema->colId));
taosDecodeFixedI32(&pBuf, &(pSchema->bytes));
taosDecodeStringTo(&pBuf, pSchema->name);
}
return pSW; return pSW;
} }
......
...@@ -161,7 +161,7 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE ...@@ -161,7 +161,7 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE
char tbFullName[TSDB_TABLE_FNAME_LEN]; char tbFullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pTableName, tbFullName); tNameExtractFullName(pTableName, tbFullName);
SBuildTableMetaInput bInput = {.vgId = 0, .tableFullName = tbFullName}; SBuildTableMetaInput bInput = {.vgId = 0, .dbName = NULL, .tableFullName = tbFullName};
char *msg = NULL; char *msg = NULL;
SEpSet *pVnodeEpSet = NULL; SEpSet *pVnodeEpSet = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
...@@ -194,10 +194,10 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SE ...@@ -194,10 +194,10 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SE
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
} }
char tbFullName[TSDB_TABLE_FNAME_LEN]; char dbFullName[TSDB_DB_FNAME_LEN];
tNameExtractFullName(pTableName, tbFullName); tNameGetFullDbName(pTableName, dbFullName);
SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .tableFullName = tbFullName}; SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbName = dbFullName, .tableFullName = pTableName->tname};
char *msg = NULL; char *msg = NULL;
SEpSet *pVnodeEpSet = NULL; SEpSet *pVnodeEpSet = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
...@@ -355,19 +355,19 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out ...@@ -355,19 +355,19 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
if (output->metaNum != 1 && output->metaNum != 2) { if (output->metaNum != 1 && output->metaNum != 2) {
ctgError("invalid table meta number[%d] got from meta rsp", output->metaNum); ctgError("invalid table meta number[%d] got from meta rsp", output->metaNum);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
if (NULL == output->tbMeta) { if (NULL == output->tbMeta) {
ctgError("no valid table meta got from meta rsp"); ctgError("no valid table meta got from meta rsp");
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
if (NULL == pCatalog->tableCache.cache) { if (NULL == pCatalog->tableCache.cache) {
pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == pCatalog->tableCache.cache) { if (NULL == pCatalog->tableCache.cache) {
ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
} }
} }
...@@ -375,19 +375,19 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out ...@@ -375,19 +375,19 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
pCatalog->tableCache.stableCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK); pCatalog->tableCache.stableCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
if (NULL == pCatalog->tableCache.stableCache) { if (NULL == pCatalog->tableCache.stableCache) {
ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
} }
} }
if (output->metaNum == 2) { if (output->metaNum == 2) {
if (taosHashPut(pCatalog->tableCache.cache, output->ctbFname, strlen(output->ctbFname), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) { if (taosHashPut(pCatalog->tableCache.cache, output->ctbFname, strlen(output->ctbFname), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) {
ctgError("push ctable[%s] to table cache failed", output->ctbFname); ctgError("push ctable[%s] to table cache failed", output->ctbFname);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
} }
if (TSDB_SUPER_TABLE != output->tbMeta->tableType) { if (TSDB_SUPER_TABLE != output->tbMeta->tableType) {
ctgError("table type[%d] error, expected:%d", output->tbMeta->tableType, TSDB_SUPER_TABLE); ctgError("table type[%d] error, expected:%d", output->tbMeta->tableType, TSDB_SUPER_TABLE);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
} }
...@@ -398,26 +398,23 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out ...@@ -398,26 +398,23 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) { if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) {
CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock); CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
ctgError("push table[%s] to table cache failed", output->tbFname); ctgError("push table[%s] to table cache failed", output->tbFname);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
} }
STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname)); STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname));
if (taosHashPut(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &tbMeta, POINTER_BYTES) != 0) { if (taosHashPut(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &tbMeta, POINTER_BYTES) != 0) {
CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock); CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
ctgError("push suid[%"PRIu64"] to stable cache failed", output->tbMeta->suid); ctgError("push suid[%"PRIu64"] to stable cache failed", output->tbMeta->suid);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
} }
CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock); CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
} else { } else {
if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) { if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) {
ctgError("push table[%s] to table cache failed", output->tbFname); ctgError("push table[%s] to table cache failed", output->tbFname);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
} }
} }
_return:
tfree(output->tbMeta);
CTG_RET(code); CTG_RET(code);
} }
......
...@@ -557,6 +557,8 @@ void *ctgTestGetCtableMetaThread(void *param) { ...@@ -557,6 +557,8 @@ void *ctgTestGetCtableMetaThread(void *param) {
assert(0); assert(0);
} }
tfree(tbMeta);
if (ctgTestEnableSleep) { if (ctgTestEnableSleep) {
usleep(rand()%5); usleep(rand()%5);
} }
...@@ -592,6 +594,8 @@ void *ctgTestSetCtableMetaThread(void *param) { ...@@ -592,6 +594,8 @@ void *ctgTestSetCtableMetaThread(void *param) {
} }
} }
tfree(output.tbMeta);
return NULL; return NULL;
} }
...@@ -944,7 +948,6 @@ TEST(dbVgroup, getSetDbVgroupCase) { ...@@ -944,7 +948,6 @@ TEST(dbVgroup, getSetDbVgroupCase) {
catalogDestroy(); catalogDestroy();
} }
#endif
TEST(multiThread, getSetDbVgroupCase) { TEST(multiThread, getSetDbVgroupCase) {
struct SCatalog* pCtg = NULL; struct SCatalog* pCtg = NULL;
...@@ -996,6 +999,9 @@ TEST(multiThread, getSetDbVgroupCase) { ...@@ -996,6 +999,9 @@ TEST(multiThread, getSetDbVgroupCase) {
catalogDestroy(); catalogDestroy();
} }
#endif
TEST(multiThread, ctableMeta) { TEST(multiThread, ctableMeta) {
struct SCatalog* pCtg = NULL; struct SCatalog* pCtg = NULL;
void *mockPointer = (void *)0x1; void *mockPointer = (void *)0x1;
...@@ -1024,8 +1030,9 @@ TEST(multiThread, ctableMeta) { ...@@ -1024,8 +1030,9 @@ TEST(multiThread, ctableMeta) {
pthread_attr_init(&thattr); pthread_attr_init(&thattr);
pthread_t thread1, thread2; pthread_t thread1, thread2;
pthread_create(&(thread1), &thattr, ctgTestGetCtableMetaThread, pCtg);
pthread_create(&(thread1), &thattr, ctgTestSetCtableMetaThread, pCtg); pthread_create(&(thread1), &thattr, ctgTestSetCtableMetaThread, pCtg);
sleep(1);
pthread_create(&(thread1), &thattr, ctgTestGetCtableMetaThread, pCtg);
while (true) { while (true) {
if (ctgTestDeadLoop) { if (ctgTestDeadLoop) {
......
...@@ -68,7 +68,9 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pSqlInfo, SQ ...@@ -68,7 +68,9 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pSqlInfo, SQ
* @param type * @param type
* @return * @return
*/ */
int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStmtInfo* pDcl, char* msgBuf, int32_t msgBufLen); SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen);
SInsertStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen);
/** /**
* Evaluate the numeric and timestamp arithmetic expression in the WHERE clause. * Evaluate the numeric and timestamp arithmetic expression in the WHERE clause.
......
...@@ -35,7 +35,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx* pCtx, void** ou ...@@ -35,7 +35,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx* pCtx, void** ou
SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq)); SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq));
SArray* array = NULL; SArray* array = NULL;
SName name = {0}; SName name = {0};
tNameSetDbName(&name, pCtx->acctId, pCtx->db, strlen(pCtx->db)); tNameSetDbName(&name, pCtx->acctId, pCtx->db, strlen(pCtx->db));
char dbFname[TSDB_DB_FNAME_LEN] = {0}; char dbFname[TSDB_DB_FNAME_LEN] = {0};
...@@ -48,7 +48,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx* pCtx, void** ou ...@@ -48,7 +48,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx* pCtx, void** ou
pEpSet->numOfEps = info->numOfEps; pEpSet->numOfEps = info->numOfEps;
pEpSet->inUse = info->inUse; pEpSet->inUse = info->inUse;
for(int32_t i = 0; i < pEpSet->numOfEps; ++i) { for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
strncpy(pEpSet->fqdn[i], info->epAddr[i].fqdn, tListLen(pEpSet->fqdn[i])); strncpy(pEpSet->fqdn[i], info->epAddr[i].fqdn, tListLen(pEpSet->fqdn[i]));
pEpSet->port[i] = info->epAddr[i].port; pEpSet->port[i] = info->epAddr[i].port;
} }
...@@ -190,7 +190,7 @@ static int32_t doCheckDbOptions(SCreateDbMsg* pCreate, SMsgBuf* pMsgBuf) { ...@@ -190,7 +190,7 @@ static int32_t doCheckDbOptions(SCreateDbMsg* pCreate, SMsgBuf* pMsgBuf) {
val = htonl(pCreate->numOfVgroups); val = htonl(pCreate->numOfVgroups);
if (val < TSDB_MIN_VNODES_PER_DB || val > TSDB_MAX_VNODES_PER_DB) { if (val < TSDB_MIN_VNODES_PER_DB || val > TSDB_MAX_VNODES_PER_DB) {
snprintf(msg, tListLen(msg), "invalid number of vgroups for DB:%d valid range: [%d, %d]", val, snprintf(msg, tListLen(msg), "invalid number of vgroups for DB:%d valid range: [%d, %d]", val,
TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB); TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -321,8 +321,12 @@ int32_t doCheckForCreateTable(SSqlInfo* pInfo, SMsgBuf* pMsgBuf) { ...@@ -321,8 +321,12 @@ int32_t doCheckForCreateTable(SSqlInfo* pInfo, SMsgBuf* pMsgBuf) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* pMsgBuf, char** pOutput, int32_t* len, typedef struct SVgroupTablesBatch {
SEpSet* pEpSet) { SVCreateTbBatchReq req;
SVgroupInfo info;
} SVgroupTablesBatch;
int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* pMsgBuf, char** pOutput, int32_t* len) {
const char* msg1 = "invalid table name"; const char* msg1 = "invalid table name";
const char* msg2 = "tags number not matched"; const char* msg2 = "tags number not matched";
const char* msg3 = "tag value too long"; const char* msg3 = "tag value too long";
...@@ -330,17 +334,14 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p ...@@ -330,17 +334,14 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo; SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo;
SHashObj* pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
// super table name, create table by using dst // super table name, create table by using dst
int32_t numOfTables = (int32_t)taosArrayGetSize(pCreateTable->childTableInfo); size_t numOfTables = taosArrayGetSize(pCreateTable->childTableInfo);
for (int32_t j = 0; j < numOfTables; ++j) { for (int32_t j = 0; j < numOfTables; ++j) {
SCreatedTableInfo* pCreateTableInfo = taosArrayGet(pCreateTable->childTableInfo, j); SCreatedTableInfo* pCreateTableInfo = taosArrayGet(pCreateTable->childTableInfo, j);
SToken* pSTableNameToken = &pCreateTableInfo->stbName; SToken* pSTableNameToken = &pCreateTableInfo->stbName;
char buf[TSDB_TABLE_FNAME_LEN];
SToken sTblToken;
sTblToken.z = buf;
int32_t code = parserValidateNameToken(pSTableNameToken); int32_t code = parserValidateNameToken(pSTableNameToken);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg1); return buildInvalidOperationMsg(pMsgBuf, msg1);
...@@ -357,7 +358,11 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p ...@@ -357,7 +358,11 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
size_t numOfInputTag = taosArrayGetSize(pValList); size_t numOfInputTag = taosArrayGetSize(pValList);
STableMeta* pSuperTableMeta = NULL; STableMeta* pSuperTableMeta = NULL;
catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &name, &pSuperTableMeta); code = catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &name, &pSuperTableMeta);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
assert(pSuperTableMeta != NULL); assert(pSuperTableMeta != NULL);
// too long tag values will return invalid sql, not be truncated automatically // too long tag values will return invalid sql, not be truncated automatically
...@@ -460,12 +465,13 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p ...@@ -460,12 +465,13 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
for (int32_t i = 0; i < numOfInputTag; ++i) { for (int32_t i = 0; i < numOfInputTag; ++i) {
SSchema* pSchema = &pTagSchema[i]; SSchema* pSchema = &pTagSchema[i];
SToken* pItem = taosArrayGet(pValList, i);
char tmpTokenBuf[TSDB_MAX_TAGS_LEN] = {0}; char* endPtr = NULL;
char tmpTokenBuf[TSDB_MAX_TAGS_LEN] = {0};
SKvParam param = {.builder = &kvRowBuilder, .schema = pSchema}; SKvParam param = {.builder = &kvRowBuilder, .schema = pSchema};
char* endPtr = NULL; SToken* pItem = taosArrayGet(pValList, i);
code = parseValueToken(&endPtr, pItem, pSchema, tinfo.precision, tmpTokenBuf, KvRowAppend, &param, pMsgBuf); code = parseValueToken(&endPtr, pItem, pSchema, tinfo.precision, tmpTokenBuf, KvRowAppend, &param, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -478,7 +484,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p ...@@ -478,7 +484,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder); SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder);
tdDestroyKVRowBuilder(&kvRowBuilder); tdDestroyKVRowBuilder(&kvRowBuilder);
if (row == NULL) { if (row == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
tdSortKVRowByColIdx(row); tdSortKVRowByColIdx(row);
...@@ -489,40 +495,73 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p ...@@ -489,40 +495,73 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
return code; return code;
} }
SVgroupInfo info = {0};
catalogGetTableHashVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &tableName, &info);
struct SVCreateTbReq req = {0}; struct SVCreateTbReq req = {0};
req.type = TD_CHILD_TABLE; req.type = TD_CHILD_TABLE;
req.name = strdup(tNameGetTableName(&tableName)); req.name = strdup(tNameGetTableName(&tableName));
req.ctbCfg.suid = pSuperTableMeta->suid; req.ctbCfg.suid = pSuperTableMeta->suid;
req.ctbCfg.pTag = row; req.ctbCfg.pTag = row;
int32_t serLen = sizeof(SMsgHead) + tSerializeSVCreateTbReq(NULL, &req); SVgroupTablesBatch* pTableBatch = taosHashGet(pVgroupHashmap, &info.vgId, sizeof(info.vgId));
char* buf1 = calloc(1, serLen); if (pTableBatch == NULL) {
*pOutput = buf1; SVgroupTablesBatch tBatch = {0};
buf1 += sizeof(SMsgHead); tBatch.info = info;
tSerializeSVCreateTbReq((void*)&buf1, &req);
*len = serLen;
SVgroupInfo info = {0}; tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
catalogGetTableHashVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &tableName, &info); taosArrayPush(tBatch.req.pArray, &req);
pEpSet->inUse = info.inUse; taosHashPut(pVgroupHashmap, &info.vgId, sizeof(info.vgId), &tBatch, sizeof(tBatch));
pEpSet->numOfEps = info.numOfEps; } else { // add to the correct vgroup
for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { assert(info.vgId == pTableBatch->info.vgId);
pEpSet->port[i] = info.epAddr[i].port; taosArrayPush(pTableBatch->req.pArray, &req);
tstrncpy(pEpSet->fqdn[i], info.epAddr[i].fqdn, tListLen(pEpSet->fqdn[i]));
} }
((SMsgHead*)(*pOutput))->vgId = htonl(info.vgId);
((SMsgHead*)(*pOutput))->contLen = htonl(serLen);
} }
// TODO: serialize and
SArray* pBufArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*));
SVgroupTablesBatch* pTbBatch = NULL;
do {
pTbBatch = taosHashIterate(pVgroupHashmap, pTbBatch);
if (pTbBatch == NULL) break;
int tlen = sizeof(SMsgHead) + tSVCreateTbBatchReqSerialize(NULL, &(pTbBatch->req));
void* buf = malloc(tlen);
if (buf == NULL) {
// TODO: handle error
}
((SMsgHead*)buf)->vgId = htonl(pTbBatch->info.vgId);
((SMsgHead*)buf)->contLen = htonl(tlen);
void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tSVCreateTbBatchReqSerialize(&pBuf, &(pTbBatch->req));
SVgDataBlocks* pVgData = calloc(1, sizeof(SVgDataBlocks));
pVgData->vg = pTbBatch->info;
pVgData->pData = buf;
pVgData->size = tlen;
pVgData->numOfTables = (int32_t) taosArrayGetSize(pTbBatch->req.pArray);
taosArrayPush(pBufArray, &pVgData);
} while (true);
SInsertStmtInfo* pStmtInfo = calloc(1, sizeof(SInsertStmtInfo));
pStmtInfo->nodeType = TSDB_SQL_CREATE_TABLE;
pStmtInfo->pDataBlocks = pBufArray;
*pOutput = pStmtInfo;
*len = sizeof(SInsertStmtInfo);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStmtInfo* pDcl, char* msgBuf, SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen) {
int32_t msgBufLen) {
int32_t code = 0; int32_t code = 0;
SDclStmtInfo* pDcl = calloc(1, sizeof(SDclStmtInfo));
SMsgBuf m = {.buf = msgBuf, .len = msgBufLen}; SMsgBuf m = {.buf = msgBuf, .len = msgBufLen};
SMsgBuf* pMsgBuf = &m; SMsgBuf* pMsgBuf = &m;
...@@ -539,21 +578,25 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm ...@@ -539,21 +578,25 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
SToken* pPwd = &pUser->passwd; SToken* pPwd = &pUser->passwd;
if (pName->n >= TSDB_USER_LEN) { if (pName->n >= TSDB_USER_LEN) {
return buildInvalidOperationMsg(pMsgBuf, msg3); code = buildInvalidOperationMsg(pMsgBuf, msg3);
goto _error;
} }
if (parserValidateIdToken(pName) != TSDB_CODE_SUCCESS) { if (parserValidateIdToken(pName) != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg2); code = buildInvalidOperationMsg(pMsgBuf, msg2);
goto _error;
} }
if (pInfo->type == TSDB_SQL_CREATE_USER) { if (pInfo->type == TSDB_SQL_CREATE_USER) {
if (parserValidatePassword(pPwd, pMsgBuf) != TSDB_CODE_SUCCESS) { if (parserValidatePassword(pPwd, pMsgBuf) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION; code = TSDB_CODE_TSC_INVALID_OPERATION;
goto _error;
} }
} else { } else {
if (pUser->type == TSDB_ALTER_USER_PASSWD) { if (pUser->type == TSDB_ALTER_USER_PASSWD) {
if (parserValidatePassword(pPwd, pMsgBuf) != TSDB_CODE_SUCCESS) { if (parserValidatePassword(pPwd, pMsgBuf) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION; code = TSDB_CODE_TSC_INVALID_OPERATION;
goto _error;
} }
} else if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) { } else if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
assert(pPwd->type == TSDB_DATA_TYPE_NULL); assert(pPwd->type == TSDB_DATA_TYPE_NULL);
...@@ -564,10 +607,12 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm ...@@ -564,10 +607,12 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
} else if (strncasecmp(pPrivilege->z, "normal", 4) == 0 && pPrivilege->n == 4) { } else if (strncasecmp(pPrivilege->z, "normal", 4) == 0 && pPrivilege->n == 4) {
// pCmd->count = 2; // pCmd->count = 2;
} else { } else {
return buildInvalidOperationMsg(pMsgBuf, msg4); code = buildInvalidOperationMsg(pMsgBuf, msg4);
goto _error;
} }
} else { } else {
return buildInvalidOperationMsg(pMsgBuf, msg1); code = buildInvalidOperationMsg(pMsgBuf, msg1);
goto _error;
} }
} }
...@@ -586,15 +631,18 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm ...@@ -586,15 +631,18 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
SToken* pPwd = &pInfo->pMiscInfo->user.passwd; SToken* pPwd = &pInfo->pMiscInfo->user.passwd;
if (parserValidatePassword(pPwd, pMsgBuf) != TSDB_CODE_SUCCESS) { if (parserValidatePassword(pPwd, pMsgBuf) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION; code = TSDB_CODE_TSC_INVALID_OPERATION;
goto _error;
} }
if (pName->n >= TSDB_USER_LEN) { if (pName->n >= TSDB_USER_LEN) {
return buildInvalidOperationMsg(pMsgBuf, msg3); code = buildInvalidOperationMsg(pMsgBuf, msg3);
goto _error;
} }
if (parserValidateNameToken(pName) != TSDB_CODE_SUCCESS) { if (parserValidateNameToken(pName) != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg2); code = buildInvalidOperationMsg(pMsgBuf, msg2);
goto _error;
} }
SCreateAcctInfo* pAcctOpt = &pInfo->pMiscInfo->acctOpt; SCreateAcctInfo* pAcctOpt = &pInfo->pMiscInfo->acctOpt;
...@@ -604,7 +652,8 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm ...@@ -604,7 +652,8 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
} else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) { } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) {
} else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) { } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) {
} else { } else {
return buildInvalidOperationMsg(pMsgBuf, msg1); code = buildInvalidOperationMsg(pMsgBuf, msg1);
goto _error;
} }
} }
...@@ -623,7 +672,11 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm ...@@ -623,7 +672,11 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
case TSDB_SQL_SHOW: { case TSDB_SQL_SHOW: {
SShowInfo* pShowInfo = &pInfo->pMiscInfo->showOpt; SShowInfo* pShowInfo = &pInfo->pMiscInfo->showOpt;
code = setShowInfo(pShowInfo, pCtx, (void**)&pDcl->pMsg, &pDcl->msgLen, &pDcl->epSet, &pDcl->pExtension, pMsgBuf); code = setShowInfo(pShowInfo, pCtx, (void**)&pDcl->pMsg, &pDcl->msgLen, &pDcl->epSet, &pDcl->pExtension, pMsgBuf);
pDcl->msgType = (pShowInfo->showType == TSDB_MGMT_TABLE_TABLE)? TDMT_VND_SHOW_TABLES:TDMT_MND_SHOW; if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pDcl->msgType = (pShowInfo->showType == TSDB_MGMT_TABLE_TABLE) ? TDMT_VND_SHOW_TABLES : TDMT_MND_SHOW;
break; break;
} }
...@@ -632,13 +685,15 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm ...@@ -632,13 +685,15 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
SToken* pToken = taosArrayGet(pInfo->pMiscInfo->a, 0); SToken* pToken = taosArrayGet(pInfo->pMiscInfo->a, 0);
if (parserValidateNameToken(pToken) != TSDB_CODE_SUCCESS) { if (parserValidateNameToken(pToken) != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg); code = buildInvalidOperationMsg(pMsgBuf, msg);
goto _error;
} }
SName n = {0}; SName n = {0};
int32_t ret = tNameSetDbName(&n, pCtx->acctId, pToken->z, pToken->n); int32_t ret = tNameSetDbName(&n, pCtx->acctId, pToken->z, pToken->n);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg); code = buildInvalidOperationMsg(pMsgBuf, msg);
goto _error;
} }
SUseDbMsg* pUseDbMsg = (SUseDbMsg*)calloc(1, sizeof(SUseDbMsg)); SUseDbMsg* pUseDbMsg = (SUseDbMsg*)calloc(1, sizeof(SUseDbMsg));
...@@ -657,19 +712,22 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm ...@@ -657,19 +712,22 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
SCreateDbInfo* pCreateDB = &(pInfo->pMiscInfo->dbOpt); SCreateDbInfo* pCreateDB = &(pInfo->pMiscInfo->dbOpt);
if (pCreateDB->dbname.n >= TSDB_DB_NAME_LEN) { if (pCreateDB->dbname.n >= TSDB_DB_NAME_LEN) {
return buildInvalidOperationMsg(pMsgBuf, msg2); code = buildInvalidOperationMsg(pMsgBuf, msg2);
goto _error;
} }
char buf[TSDB_DB_NAME_LEN] = {0}; char buf[TSDB_DB_NAME_LEN] = {0};
SToken token = taosTokenDup(&pCreateDB->dbname, buf, tListLen(buf)); SToken token = taosTokenDup(&pCreateDB->dbname, buf, tListLen(buf));
if (parserValidateNameToken(&token) != TSDB_CODE_SUCCESS) { if (parserValidateNameToken(&token) != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg1); code = buildInvalidOperationMsg(pMsgBuf, msg1);
goto _error;
} }
SCreateDbMsg* pCreateMsg = buildCreateDbMsg(pCreateDB, pCtx, pMsgBuf); SCreateDbMsg* pCreateMsg = buildCreateDbMsg(pCreateDB, pCtx, pMsgBuf);
if (doCheckDbOptions(pCreateMsg, pMsgBuf) != TSDB_CODE_SUCCESS) { if (doCheckDbOptions(pCreateMsg, pMsgBuf) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION; code = TSDB_CODE_TSC_INVALID_OPERATION;
goto _error;
} }
pDcl->pMsg = (char*)pCreateMsg; pDcl->pMsg = (char*)pCreateMsg;
...@@ -687,7 +745,8 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm ...@@ -687,7 +745,8 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
SName name = {0}; SName name = {0};
code = tNameSetDbName(&name, pCtx->acctId, dbName->z, dbName->n); code = tNameSetDbName(&name, pCtx->acctId, dbName->z, dbName->n);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg1); code = buildInvalidOperationMsg(pMsgBuf, msg1);
goto _error;
} }
SDropDbMsg* pDropDbMsg = (SDropDbMsg*)calloc(1, sizeof(SDropDbMsg)); SDropDbMsg* pDropDbMsg = (SDropDbMsg*)calloc(1, sizeof(SDropDbMsg));
...@@ -699,7 +758,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm ...@@ -699,7 +758,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
pDcl->msgType = TDMT_MND_DROP_DB; pDcl->msgType = TDMT_MND_DROP_DB;
pDcl->msgLen = sizeof(SDropDbMsg); pDcl->msgLen = sizeof(SDropDbMsg);
pDcl->pMsg = (char*)pDropDbMsg; pDcl->pMsg = (char*)pDropDbMsg;
return TSDB_CODE_SUCCESS; break;
} }
case TSDB_SQL_CREATE_TABLE: { case TSDB_SQL_CREATE_TABLE: {
...@@ -707,14 +766,16 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm ...@@ -707,14 +766,16 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
if (pCreateTable->type == TSQL_CREATE_TABLE || pCreateTable->type == TSQL_CREATE_STABLE) { if (pCreateTable->type == TSQL_CREATE_TABLE || pCreateTable->type == TSQL_CREATE_STABLE) {
if ((code = doCheckForCreateTable(pInfo, pMsgBuf)) != TSDB_CODE_SUCCESS) { if ((code = doCheckForCreateTable(pInfo, pMsgBuf)) != TSDB_CODE_SUCCESS) {
return code; terrno = code;
goto _error;
} }
pDcl->pMsg = (char*)buildCreateTableMsg(pCreateTable, &pDcl->msgLen, pCtx, pMsgBuf); pDcl->pMsg = (char*)buildCreateTableMsg(pCreateTable, &pDcl->msgLen, pCtx, pMsgBuf);
pDcl->msgType = (pCreateTable->type == TSQL_CREATE_TABLE) ? TDMT_VND_CREATE_TABLE : TDMT_MND_CREATE_STB; pDcl->msgType = (pCreateTable->type == TSQL_CREATE_TABLE) ? TDMT_VND_CREATE_TABLE : TDMT_MND_CREATE_STB;
} else if (pCreateTable->type == TSQL_CREATE_CTABLE) { } else if (pCreateTable->type == TSQL_CREATE_CTABLE) {
if ((code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, &pDcl->pMsg, &pDcl->msgLen, &pDcl->epSet)) != if ((code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, &pDcl->pMsg, &pDcl->msgLen)) !=
TSDB_CODE_SUCCESS) { TSDB_CODE_SUCCESS) {
return code; goto _error;
} }
pDcl->msgType = TDMT_VND_CREATE_TABLE; pDcl->msgType = TDMT_VND_CREATE_TABLE;
...@@ -729,7 +790,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm ...@@ -729,7 +790,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
case TSDB_SQL_DROP_TABLE: { case TSDB_SQL_DROP_TABLE: {
pDcl->pMsg = (char*)buildDropStableMsg(pInfo, &pDcl->msgLen, pCtx, pMsgBuf); pDcl->pMsg = (char*)buildDropStableMsg(pInfo, &pDcl->msgLen, pCtx, pMsgBuf);
if (pDcl->pMsg == NULL) { if (pDcl->pMsg == NULL) {
code = terrno; goto _error;
} }
pDcl->msgType = TDMT_MND_DROP_STB; pDcl->msgType = TDMT_MND_DROP_STB;
...@@ -739,7 +800,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm ...@@ -739,7 +800,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
case TSDB_SQL_CREATE_DNODE: { case TSDB_SQL_CREATE_DNODE: {
pDcl->pMsg = (char*)buildCreateDnodeMsg(pInfo, &pDcl->msgLen, pMsgBuf); pDcl->pMsg = (char*)buildCreateDnodeMsg(pInfo, &pDcl->msgLen, pMsgBuf);
if (pDcl->pMsg == NULL) { if (pDcl->pMsg == NULL) {
code = terrno; goto _error;
} }
pDcl->msgType = TDMT_MND_CREATE_DNODE; pDcl->msgType = TDMT_MND_CREATE_DNODE;
...@@ -749,7 +810,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm ...@@ -749,7 +810,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
case TSDB_SQL_DROP_DNODE: { case TSDB_SQL_DROP_DNODE: {
pDcl->pMsg = (char*)buildDropDnodeMsg(pInfo, &pDcl->msgLen, pMsgBuf); pDcl->pMsg = (char*)buildDropDnodeMsg(pInfo, &pDcl->msgLen, pMsgBuf);
if (pDcl->pMsg == NULL) { if (pDcl->pMsg == NULL) {
code = terrno; goto _error;
} }
pDcl->msgType = TDMT_MND_DROP_DNODE; pDcl->msgType = TDMT_MND_DROP_DNODE;
...@@ -760,5 +821,29 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm ...@@ -760,5 +821,29 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
break; break;
} }
return code; return pDcl;
_error:
terrno = code;
tfree(pDcl);
return NULL;
} }
SInsertStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen) {
SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo;
assert(pCreateTable->type == TSQL_CREATE_CTABLE);
SMsgBuf m = {.buf = msgBuf, .len = msgBufLen};
SMsgBuf* pMsgBuf = &m;
SInsertStmtInfo* pInsertStmt = NULL;
int32_t msgLen = 0;
int32_t code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, (char**) &pInsertStmt, &msgLen);
if (code != TSDB_CODE_SUCCESS) {
tfree(pInsertStmt);
return NULL;
}
return pInsertStmt;
}
\ No newline at end of file
...@@ -32,7 +32,7 @@ bool isInsertSql(const char* pStr, size_t length) { ...@@ -32,7 +32,7 @@ bool isInsertSql(const char* pStr, size_t length) {
} }
bool qIsDdlQuery(const SQueryNode* pQuery) { bool qIsDdlQuery(const SQueryNode* pQuery) {
return TSDB_SQL_INSERT != pQuery->type && TSDB_SQL_SELECT != pQuery->type; return TSDB_SQL_INSERT != pQuery->type && TSDB_SQL_SELECT != pQuery->type && TSDB_SQL_CREATE_TABLE != pQuery->type;
} }
int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) { int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
...@@ -44,16 +44,29 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) { ...@@ -44,16 +44,29 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
} }
if (!isDqlSqlStatement(&info)) { if (!isDqlSqlStatement(&info)) {
SDclStmtInfo* pDcl = calloc(1, sizeof(SDclStmtInfo)); bool toVnode = false;
if (NULL == pDcl) { if (info.type == TSDB_SQL_CREATE_TABLE) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; // set correct error code. SCreateTableSql* pCreateSql = info.pCreateTableInfo;
return terrno; if (pCreateSql->type == TSQL_CREATE_CTABLE || pCreateSql->type == TSQL_CREATE_TABLE) {
toVnode = true;
}
} }
pDcl->nodeType = info.type; if (toVnode) {
int32_t code = qParserValidateDclSqlNode(&info, &pCxt->ctx, pDcl, pCxt->pMsg, pCxt->msgLen); SInsertStmtInfo *pInsertInfo = qParserValidateCreateTbSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen);
if (code == TSDB_CODE_SUCCESS) { if (pInsertInfo == NULL) {
return terrno;
}
*pQuery = (SQueryNode*) pInsertInfo;
} else {
SDclStmtInfo* pDcl = qParserValidateDclSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen);
if (pDcl == NULL) {
return terrno;
}
*pQuery = (SQueryNode*)pDcl; *pQuery = (SQueryNode*)pDcl;
pDcl->nodeType = info.type;
} }
} else { } else {
SQueryStmtInfo* pQueryInfo = calloc(1, sizeof(SQueryStmtInfo)); SQueryStmtInfo* pQueryInfo = calloc(1, sizeof(SQueryStmtInfo));
......
...@@ -714,10 +714,9 @@ TEST(testCase, show_user_Test) { ...@@ -714,10 +714,9 @@ TEST(testCase, show_user_Test) {
SSqlInfo info1 = doGenerateAST(sql1); SSqlInfo info1 = doGenerateAST(sql1);
ASSERT_EQ(info1.valid, true); ASSERT_EQ(info1.valid, true);
SDclStmtInfo output;
SParseBasicCtx ct= {.requestId = 1, .acctId = 1, .db = "abc", .pTransporter = NULL}; SParseBasicCtx ct= {.requestId = 1, .acctId = 1, .db = "abc", .pTransporter = NULL};
int32_t code = qParserValidateDclSqlNode(&info1, &ct, &output, msg, buf.len); SDclStmtInfo* output = qParserValidateDclSqlNode(&info1, &ct, msg, buf.len);
ASSERT_EQ(code, 0); ASSERT_NE(output, nullptr);
// convert the show command to be the select query // convert the show command to be the select query
// select name, privilege, create_time, account from information_schema.users; // select name, privilege, create_time, account from information_schema.users;
...@@ -735,10 +734,9 @@ TEST(testCase, create_user_Test) { ...@@ -735,10 +734,9 @@ TEST(testCase, create_user_Test) {
ASSERT_EQ(info1.valid, true); ASSERT_EQ(info1.valid, true);
ASSERT_EQ(isDclSqlStatement(&info1), true); ASSERT_EQ(isDclSqlStatement(&info1), true);
SDclStmtInfo output;
SParseBasicCtx ct= {.requestId = 1, .acctId = 1, .db = "abc"}; SParseBasicCtx ct= {.requestId = 1, .acctId = 1, .db = "abc"};
int32_t code = qParserValidateDclSqlNode(&info1, &ct, &output, msg, buf.len); SDclStmtInfo* output = qParserValidateDclSqlNode(&info1, &ct, msg, buf.len);
ASSERT_EQ(code, 0); ASSERT_NE(output, nullptr);
destroySqlInfo(&info1); destroySqlInfo(&info1);
} }
\ No newline at end of file
...@@ -40,7 +40,7 @@ extern "C" { ...@@ -40,7 +40,7 @@ extern "C" {
#define QNODE_SESSIONWINDOW 12 #define QNODE_SESSIONWINDOW 12
#define QNODE_STATEWINDOW 13 #define QNODE_STATEWINDOW 13
#define QNODE_FILL 14 #define QNODE_FILL 14
#define QNODE_INSERT 15 #define QNODE_MODIFY 15
typedef struct SQueryDistPlanNodeInfo { typedef struct SQueryDistPlanNodeInfo {
bool stableQuery; // super table query or not bool stableQuery; // super table query or not
......
...@@ -37,15 +37,19 @@ int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode) { ...@@ -37,15 +37,19 @@ int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode) {
return 0; return 0;
} }
int32_t createInsertPlan(const SInsertStmtInfo* pInsert, SQueryPlanNode** pQueryPlan) { static int32_t createInsertPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) {
SInsertStmtInfo* pInsert = (SInsertStmtInfo*)pNode;
*pQueryPlan = calloc(1, sizeof(SQueryPlanNode)); *pQueryPlan = calloc(1, sizeof(SQueryPlanNode));
SArray* blocks = taosArrayInit(taosArrayGetSize(pInsert->pDataBlocks), POINTER_BYTES); SArray* blocks = taosArrayInit(taosArrayGetSize(pInsert->pDataBlocks), POINTER_BYTES);
if (NULL == *pQueryPlan || NULL == blocks) { if (NULL == *pQueryPlan || NULL == blocks) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
(*pQueryPlan)->info.type = QNODE_INSERT;
(*pQueryPlan)->info.type = QNODE_MODIFY;
taosArrayAddAll(blocks, pInsert->pDataBlocks); taosArrayAddAll(blocks, pInsert->pDataBlocks);
(*pQueryPlan)->pExtInfo = blocks; (*pQueryPlan)->pExtInfo = blocks;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -62,13 +66,14 @@ int32_t createQueryPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) { ...@@ -62,13 +66,14 @@ int32_t createQueryPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) {
case TSDB_SQL_SELECT: { case TSDB_SQL_SELECT: {
return createSelectPlan((const SQueryStmtInfo*)pNode, pQueryPlan); return createSelectPlan((const SQueryStmtInfo*)pNode, pQueryPlan);
} }
case TSDB_SQL_INSERT: case TSDB_SQL_INSERT:
return createInsertPlan((const SInsertStmtInfo*)pNode, pQueryPlan); case TSDB_SQL_CREATE_TABLE:
return createInsertPlan(pNode, pQueryPlan);
default: default:
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
return TSDB_CODE_SUCCESS;
} }
int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql) { int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql) {
......
...@@ -34,7 +34,7 @@ static const char* gOpName[] = { ...@@ -34,7 +34,7 @@ static const char* gOpName[] = {
#undef INCLUDE_AS_NAME #undef INCLUDE_AS_NAME
}; };
static void* vailidPointer(void* p) { static void* validPointer(void* p) {
if (NULL == p) { if (NULL == p) {
THROW(TSDB_CODE_TSC_OUT_OF_MEMORY); THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
} }
...@@ -76,7 +76,7 @@ int32_t dsinkNameToDsinkType(const char* name) { ...@@ -76,7 +76,7 @@ int32_t dsinkNameToDsinkType(const char* name) {
} }
static SDataSink* initDataSink(int32_t type, int32_t size) { static SDataSink* initDataSink(int32_t type, int32_t size) {
SDataSink* sink = (SDataSink*)vailidPointer(calloc(1, size)); SDataSink* sink = (SDataSink*)validPointer(calloc(1, size));
sink->info.type = type; sink->info.type = type;
sink->info.name = dsinkTypeToDsinkName(type); sink->info.name = dsinkTypeToDsinkName(type);
return sink; return sink;
...@@ -121,7 +121,7 @@ static bool cloneExprArray(SArray** dst, SArray* src) { ...@@ -121,7 +121,7 @@ static bool cloneExprArray(SArray** dst, SArray* src) {
} }
static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t size) { static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t size) {
SPhyNode* node = (SPhyNode*)vailidPointer(calloc(1, size)); SPhyNode* node = (SPhyNode*)validPointer(calloc(1, size));
node->info.type = type; node->info.type = type;
node->info.name = opTypeToOpName(type); node->info.name = opTypeToOpName(type);
if (!cloneExprArray(&node->pTargets, pPlanNode->pExpr) || !toDataBlockSchema(pPlanNode, &(node->targetSchema))) { if (!cloneExprArray(&node->pTargets, pPlanNode->pExpr) || !toDataBlockSchema(pPlanNode, &(node->targetSchema))) {
...@@ -184,7 +184,7 @@ static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTable ...@@ -184,7 +184,7 @@ static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTable
} }
static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) { static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
SSubplan* subplan = vailidPointer(calloc(1, sizeof(SSubplan))); SSubplan* subplan = validPointer(calloc(1, sizeof(SSubplan)));
subplan->id = pCxt->nextId; subplan->id = pCxt->nextId;
++(pCxt->nextId.subplanId); ++(pCxt->nextId.subplanId);
subplan->type = type; subplan->type = type;
...@@ -192,15 +192,15 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) { ...@@ -192,15 +192,15 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
if (NULL != pCxt->pCurrentSubplan) { if (NULL != pCxt->pCurrentSubplan) {
subplan->level = pCxt->pCurrentSubplan->level + 1; subplan->level = pCxt->pCurrentSubplan->level + 1;
if (NULL == pCxt->pCurrentSubplan->pChildern) { if (NULL == pCxt->pCurrentSubplan->pChildern) {
pCxt->pCurrentSubplan->pChildern = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); pCxt->pCurrentSubplan->pChildern = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
} }
taosArrayPush(pCxt->pCurrentSubplan->pChildern, &subplan); taosArrayPush(pCxt->pCurrentSubplan->pChildern, &subplan);
subplan->pParents = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); subplan->pParents = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
taosArrayPush(subplan->pParents, &pCxt->pCurrentSubplan); taosArrayPush(subplan->pParents, &pCxt->pCurrentSubplan);
} }
SArray* currentLevel; SArray* currentLevel;
if (subplan->level >= taosArrayGetSize(pCxt->pDag->pSubplans)) { if (subplan->level >= taosArrayGetSize(pCxt->pDag->pSubplans)) {
currentLevel = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); currentLevel = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
taosArrayPush(pCxt->pDag->pSubplans, &currentLevel); taosArrayPush(pCxt->pDag->pSubplans, &currentLevel);
} else { } else {
currentLevel = taosArrayGetP(pCxt->pDag->pSubplans, subplan->level); currentLevel = taosArrayGetP(pCxt->pDag->pSubplans, subplan->level);
...@@ -272,7 +272,7 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { ...@@ -272,7 +272,7 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
case QNODE_TABLESCAN: case QNODE_TABLESCAN:
node = createTableScanNode(pCxt, pPlanNode); node = createTableScanNode(pCxt, pPlanNode);
break; break;
case QNODE_INSERT: case QNODE_MODIFY:
// Insert is not an operator in a physical plan. // Insert is not an operator in a physical plan.
break; break;
default: default:
...@@ -306,7 +306,7 @@ static void splitInsertSubplan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { ...@@ -306,7 +306,7 @@ static void splitInsertSubplan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
} }
static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) { static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
if (QNODE_INSERT == pRoot->info.type) { if (QNODE_MODIFY == pRoot->info.type) {
splitInsertSubplan(pCxt, pRoot); splitInsertSubplan(pCxt, pRoot);
} else { } else {
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE); SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE);
...@@ -321,12 +321,12 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD ...@@ -321,12 +321,12 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
TRY(TSDB_MAX_TAG_CONDITIONS) { TRY(TSDB_MAX_TAG_CONDITIONS) {
SPlanContext context = { SPlanContext context = {
.pCatalog = pCatalog, .pCatalog = pCatalog,
.pDag = vailidPointer(calloc(1, sizeof(SQueryDag))), .pDag = validPointer(calloc(1, sizeof(SQueryDag))),
.pCurrentSubplan = NULL, .pCurrentSubplan = NULL,
.nextId = {0} // todo queryid .nextId = {0} // todo queryid
}; };
*pDag = context.pDag; *pDag = context.pDag;
context.pDag->pSubplans = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); context.pDag->pSubplans = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
createSubplanByLevel(&context, pQueryNode); createSubplanByLevel(&context, pQueryNode);
} CATCH(code) { } CATCH(code) {
CLEANUP_EXECUTE(); CLEANUP_EXECUTE();
......
...@@ -42,6 +42,11 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3 ...@@ -42,6 +42,11 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3
bMsg->header.vgId = htonl(bInput->vgId); bMsg->header.vgId = htonl(bInput->vgId);
if (bInput->dbName) {
strncpy(bMsg->dbFname, bInput->dbName, sizeof(bMsg->dbFname));
bMsg->dbFname[sizeof(bMsg->dbFname) - 1] = 0;
}
strncpy(bMsg->tableFname, bInput->tableFullName, sizeof(bMsg->tableFname)); strncpy(bMsg->tableFname, bInput->tableFullName, sizeof(bMsg->tableFname));
bMsg->tableFname[sizeof(bMsg->tableFname) - 1] = 0; bMsg->tableFname[sizeof(bMsg->tableFname) - 1] = 0;
...@@ -243,9 +248,14 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) { ...@@ -243,9 +248,14 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
if (pMetaMsg->tableType == TSDB_CHILD_TABLE) { if (pMetaMsg->tableType == TSDB_CHILD_TABLE) {
pOut->metaNum = 2; pOut->metaNum = 2;
memcpy(pOut->ctbFname, pMetaMsg->tbFname, sizeof(pOut->ctbFname)); if (pMetaMsg->dbFname[0]) {
memcpy(pOut->tbFname, pMetaMsg->stbFname, sizeof(pOut->tbFname)); snprintf(pOut->ctbFname, "%s.%s", pMetaMsg->dbFname, pMetaMsg->tbFname);
snprintf(pOut->tbFname, "%s.%s", pMetaMsg->dbFname, pMetaMsg->stbFname);
} else {
memcpy(pOut->ctbFname, pMetaMsg->tbFname, sizeof(pOut->ctbFname));
memcpy(pOut->tbFname, pMetaMsg->stbFname, sizeof(pOut->tbFname));
}
pOut->ctbMeta.vgId = pMetaMsg->vgId; pOut->ctbMeta.vgId = pMetaMsg->vgId;
pOut->ctbMeta.tableType = pMetaMsg->tableType; pOut->ctbMeta.tableType = pMetaMsg->tableType;
...@@ -256,7 +266,11 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) { ...@@ -256,7 +266,11 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) {
} else { } else {
pOut->metaNum = 1; pOut->metaNum = 1;
memcpy(pOut->tbFname, pMetaMsg->tbFname, sizeof(pOut->tbFname)); if (pMetaMsg->dbFname[0]) {
snprintf(pOut->tbFname, sizeof(pOut->tbFname), "%s.%s", pMetaMsg->dbFname, pMetaMsg->tbFname);
} else {
memcpy(pOut->tbFname, pMetaMsg->tbFname, sizeof(pOut->tbFname));
}
code = queryCreateTableMetaFromMsg(pMetaMsg, false, &pOut->tbMeta); code = queryCreateTableMetaFromMsg(pMetaMsg, false, &pOut->tbMeta);
} }
......
...@@ -664,7 +664,7 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { ...@@ -664,7 +664,7 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
} }
SSubQueryMsg *pMsg = msg; SSubQueryMsg *pMsg = msg;
pMsg->sId = htobe64(schMgmt.sId); pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(job->queryId); pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId); pMsg->taskId = htobe64(task->taskId);
......
...@@ -33,4 +33,12 @@ ENDIF() ...@@ -33,4 +33,12 @@ ENDIF()
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
# freelistTest
add_executable(freelistTest "")
target_sources(freelistTest
PRIVATE
"freelistTest.cpp"
)
target_link_libraries(freelistTest os util gtest gtest_main)
#include "gtest/gtest.h"
#include "freelist.h"
TEST(TD_UTIL_FREELIST_TEST, simple_test) {
SFreeList fl;
tFreeListInit(&fl);
for (size_t i = 0; i < 1000; i++) {
void *ptr = TFL_MALLOC(1024, &fl);
GTEST_ASSERT_NE(ptr, nullptr);
}
tFreeListClear(&fl);
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册