diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 5cf027591faa22aeb9e36887e8e8e112b871742a..af250bff03f0ebd0dcf7f339bcf515cd6492e6d5 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -174,6 +174,7 @@ typedef enum _mgmt_table { typedef struct SBuildTableMetaInput { int32_t vgId; + char* dbName; char* tableFullName; } SBuildTableMetaInput; @@ -355,9 +356,9 @@ typedef struct SEpSet { } SEpSet; static FORCE_INLINE int taosEncodeSEpSet(void** buf, const SEpSet* pEp) { - if(buf == NULL) return sizeof(SEpSet); + if (buf == NULL) return sizeof(SEpSet); memcpy(buf, pEp, sizeof(SEpSet)); - //TODO: endian conversion + // TODO: endian conversion return sizeof(SEpSet); } @@ -776,6 +777,7 @@ typedef struct { typedef struct { SMsgHead header; + char dbFname[TSDB_DB_FNAME_LEN]; char tableFname[TSDB_TABLE_FNAME_LEN]; } STableInfoMsg; @@ -810,6 +812,7 @@ typedef struct { typedef struct { char tbFname[TSDB_TABLE_FNAME_LEN]; // table full name char stbFname[TSDB_TABLE_FNAME_LEN]; + char dbFname[TSDB_DB_FNAME_LEN]; int32_t numOfTags; int32_t numOfColumns; int8_t precision; @@ -1122,10 +1125,10 @@ typedef struct STaskDropRsp { } STaskDropRsp; typedef struct { - int8_t igExists; - char* name; - char* physicalPlan; - char* logicalPlan; + int8_t igExists; + char* name; + char* physicalPlan; + char* logicalPlan; } SCMCreateTopicReq; static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateTopicReq* pReq) { @@ -1161,8 +1164,8 @@ static FORCE_INLINE void* tDeserializeSCMCreateTopicRsp(void* buf, SCMCreateTopi } typedef struct { - char* topicName; - char* consumerGroup; + char* topicName; + char* consumerGroup; int64_t consumerId; } SCMSubscribeReq; @@ -1183,7 +1186,7 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq typedef struct { int32_t vgId; - SEpSet pEpSet; + SEpSet pEpSet; } SCMSubscribeRsp; static FORCE_INLINE int tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribeRsp* pRsp) { @@ -1252,9 +1255,9 @@ typedef struct SVCreateTbReq { char* name; uint32_t ttl; uint32_t keep; -#define TD_SUPER_TABLE 0 -#define TD_CHILD_TABLE 1 -#define TD_NORMAL_TABLE 2 +#define TD_SUPER_TABLE TSDB_SUPER_TABLE +#define TD_CHILD_TABLE TSDB_CHILD_TABLE +#define TD_NORMAL_TABLE TSDB_NORMAL_TABLE uint8_t type; union { struct { @@ -1282,8 +1285,10 @@ typedef struct { int tmsgSVCreateTbReqEncode(SMsgEncoder* pCoder, SVCreateTbReq* pReq); int tmsgSVCreateTbReqDecode(SMsgDecoder* pCoder, SVCreateTbReq* pReq); -int tSerializeSVCreateTbReq(void** buf, const SVCreateTbReq* pReq); +int tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq); void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq); +int tSVCreateTbBatchReqSerialize(void** buf, SVCreateTbBatchReq* pReq); +void* tSVCreateTbBatchReqDeserialize(void* buf, SVCreateTbBatchReq* pReq); typedef struct SVCreateTbRsp { } SVCreateTbRsp; diff --git a/include/common/tmsgtype.h b/include/common/tmsgtype.h index 8e7ad87a0ac9184e18a6e62837bbf821192b1ec0..ebbf99b942199d2a1b247e8159eb2364b39e9830 100644 --- a/include/common/tmsgtype.h +++ b/include/common/tmsgtype.h @@ -40,8 +40,9 @@ enum { // the SQL below is for mgmt node TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_DB, "create-db" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_STABLE, "create-stable" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_TABLE, "create-table" ) - TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_FUNCTION, "create-function" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_FUNCTION, "create-function" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_DB, "drop-db" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_TABLE, "drop-table" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_FUNCTION, "drop-function" ) diff --git a/include/dnode/vnode/meta/meta.h b/include/dnode/vnode/meta/meta.h index d5871072704534c3f52e66dbb69210c573747160..86ebb643a4f392b3c7d165f989422aa23be7b10c 100644 --- a/include/dnode/vnode/meta/meta.h +++ b/include/dnode/vnode/meta/meta.h @@ -25,20 +25,20 @@ extern "C" { #endif +#define META_SUPER_TABLE TD_SUPER_TABLE +#define META_CHILD_TABLE TD_CHILD_TABLE +#define META_NORMAL_TABLE TD_NORMAL_TABLE + // Types exported typedef struct SMeta SMeta; -#define META_SUPER_TABLE 0 -#define META_CHILD_TABLE 1 -#define META_NORMAL_TABLE 2 - typedef struct SMetaCfg { /// LRU cache size uint64_t lruSize; } SMetaCfg; typedef struct { - int32_t nCols; + uint32_t nCols; SSchema *pSchema; } SSchemaWrapper; diff --git a/include/libs/parser/parsenodes.h b/include/libs/parser/parsenodes.h index 041adbb582494d2284056da814bc19574f0c4b6f..b326ac032ca50ee5df1b2a263e2b282fd379ba48 100644 --- a/include/libs/parser/parsenodes.h +++ b/include/libs/parser/parsenodes.h @@ -27,7 +27,7 @@ extern "C" { #include "tname.h" #include "tvariant.h" -/* +/** * The first field of a node of any type is guaranteed to be the int16_t. * Hence the type of any node can be gotten by casting it to SQueryNode. */ @@ -157,7 +157,7 @@ typedef struct SVgDataBlocks { typedef struct SInsertStmtInfo { int16_t nodeType; SArray* pDataBlocks; // data block for each vgroup, SArray. - int8_t schemaAttache; // denote if submit block is built with table schema or not + int8_t schemaAttache; // denote if submit block is built with table schema or not uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert uint32_t insertType; // insert data from [file|sql statement| bound statement] const char* sql; // current sql statement position diff --git a/include/util/freelist.h b/include/util/freelist.h new file mode 100644 index 0000000000000000000000000000000000000000..497a6d58c3f0791a90765f0682f5474ca8a2b6cf --- /dev/null +++ b/include/util/freelist.h @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_UTIL_FREELIST_H_ +#define _TD_UTIL_FREELIST_H_ + +#include "os.h" +#include "tlist.h" + +#ifdef __cplusplus +extern "C" { +#endif + +struct SFreeListNode { + TD_SLIST_NODE(SFreeListNode); + char payload[]; +}; + +typedef TD_SLIST(SFreeListNode) SFreeList; + +#define TFL_MALLOC(SIZE, LIST) \ + ({ \ + void *ptr = malloc((SIZE) + sizeof(struct SFreeListNode)); \ + if (ptr) { \ + TD_SLIST_PUSH((LIST), (struct SFreeListNode *)ptr); \ + ptr = ((struct SFreeListNode *)ptr)->payload; \ + } \ + ptr; \ + }) + +#define tFreeListInit(pFL) TD_SLIST_INIT(pFL) + +static FORCE_INLINE void tFreeListClear(SFreeList *pFL) { + struct SFreeListNode *pNode; + for (;;) { + pNode = TD_SLIST_HEAD(pFL); + if (pNode == NULL) break; + TD_SLIST_POP(pFL); + free(pNode); + } +} + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_UTIL_FREELIST_H_*/ \ No newline at end of file diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 050e7639196095f67cbee9cab79d668ee54bad84..b0b2c57ee462ae6960e08de517edde1931e7aba3 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -202,7 +202,7 @@ int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQuery, SQueryDag** pDag) { } int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) { - if (TSDB_SQL_INSERT == pRequest->type) { + if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) { return scheduleExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob, &pRequest->affectedRows); } return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob); diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 97692d71b465bc5cec74ce80de6004066fefe27f..7a87acfc36433d530e94aac7b885b4909ae38446 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -435,42 +435,14 @@ TEST(testCase, connect_Test) { // taos_close(pConn); //} -TEST(testCase, show_table_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - taos_free_result(pRes); - - pRes = taos_query(pConn, "show tables"); - if (taos_errno(pRes) != 0) { - printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - ASSERT_TRUE(false); - } - - TAOS_ROW pRow = NULL; - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); - - char str[512] = {0}; - while((pRow = taos_fetch_row(pRes)) != NULL) { - int32_t code = taos_print_row(str, pRow, pFields, numOfFields); - printf("%s\n", str); - } - - taos_free_result(pRes); - taos_close(pConn); -} - -//TEST(testCase, create_multiple_tables) { +//TEST(testCase, show_table_Test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // assert(pConn != NULL); // // TAOS_RES* pRes = taos_query(pConn, "use abc1"); // taos_free_result(pRes); // -// pRes = taos_query(pConn, "create table t_2 using st1 tags(1) t_3 using st2 tags(2)"); +// pRes = taos_query(pConn, "show tables"); // if (taos_errno(pRes) != 0) { // printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes)); // taos_free_result(pRes); @@ -490,3 +462,31 @@ TEST(testCase, show_table_Test) { // taos_free_result(pRes); // taos_close(pConn); //} + +TEST(testCase, create_multiple_tables) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table t_2 using st1 tags(1) t_3 using st1 tags(2)"); + if (taos_errno(pRes) != 0) { + printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + TAOS_ROW pRow = NULL; + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_free_result(pRes); + taos_close(pConn); +} diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index b81143ee62b6a355b6ddf4a9a59b63733c62ac32..a18a472dba3f231473a7bb6aa07aafed9f2801e6 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -98,7 +98,7 @@ int tmsgSVCreateTbReqDecode(SMsgDecoder *pCoder, SVCreateTbReq *pReq) { return 0; } -int tSerializeSVCreateTbReq(void **buf, const SVCreateTbReq *pReq) { +int tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { int tlen = 0; tlen += taosEncodeFixedU64(buf, pReq->ver); @@ -193,6 +193,33 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { return buf; } +int tSVCreateTbBatchReqSerialize(void **buf, SVCreateTbBatchReq *pReq) { + int tlen = 0; + + tlen += taosEncodeFixedU64(buf, pReq->ver); + tlen += taosEncodeFixedU32(buf, taosArrayGetSize(pReq->pArray)); + for (size_t i = 0; i < taosArrayGetSize(pReq->pArray); i++) { + SVCreateTbReq *pCreateTbReq = taosArrayGet(pReq->pArray, i); + tlen += tSerializeSVCreateTbReq(buf, pCreateTbReq); + } + + return tlen; +} + +void *tSVCreateTbBatchReqDeserialize(void *buf, SVCreateTbBatchReq *pReq) { + uint32_t nsize = 0; + + buf = taosDecodeFixedU64(buf, &pReq->ver); + buf = taosDecodeFixedU32(buf, &nsize); + for (size_t i = 0; i < nsize; i++) { + SVCreateTbReq req; + buf = tDeserializeSVCreateTbReq(buf, &req); + taosArrayPush(pReq->pArray, &req); + } + + return buf; +} + /* ------------------------ STATIC METHODS ------------------------ */ static int tmsgStartEncode(SMsgEncoder *pME) { struct SMEListNode *pNode = (struct SMEListNode *)malloc(sizeof(*pNode)); diff --git a/source/dnode/vnode/impl/src/vnodeQuery.c b/source/dnode/vnode/impl/src/vnodeQuery.c index daa1e964dea80b11f12bb3a861a69c6437563f37..cbc4d75e8bd034e23f02221d41e6f0f4d543bf27 100644 --- a/source/dnode/vnode/impl/src/vnodeQuery.c +++ b/source/dnode/vnode/impl/src/vnodeQuery.c @@ -17,7 +17,7 @@ #include "vnodeDef.h" static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg); -static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NULL, &pVnode->pQuery); } @@ -43,7 +43,7 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { return qWorkerProcessShowMsg(pVnode, pVnode->pQuery, pMsg); case TDMT_VND_SHOW_TABLES_FETCH: return vnodeGetTableList(pVnode, pMsg); -// return qWorkerProcessShowFetchMsg(pVnode->pMeta, pVnode->pQuery, pMsg); + // return qWorkerProcessShowFetchMsg(pVnode->pMeta, pVnode->pQuery, pMsg); case TDMT_VND_TABLE_META: return vnodeGetTableMeta(pVnode, pMsg, pRsp); default: @@ -60,18 +60,21 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int32_t nCols; int32_t nTagCols; SSchemaWrapper *pSW; - STableMetaMsg * pTbMetaMsg; + STableMetaMsg * pTbMetaMsg = NULL; SSchema * pTagSchema; + SRpcMsg rpcMsg; + int msgLen = 0; + int32_t code = TSDB_CODE_VND_APP_ERROR; pTbCfg = metaGetTbInfoByName(pVnode->pMeta, pReq->tableFname, &uid); if (pTbCfg == NULL) { - return -1; + goto _exit; } if (pTbCfg->type == META_CHILD_TABLE) { pStbCfg = metaGetTbInfoByUid(pVnode->pMeta, pTbCfg->ctbCfg.suid); if (pStbCfg == NULL) { - return -1; + goto _exit; } pSW = metaGetTableSchema(pVnode->pMeta, pTbCfg->ctbCfg.suid, 0, true); @@ -91,12 +94,13 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { pTagSchema = NULL; } - int msgLen = sizeof(STableMetaMsg) + sizeof(SSchema) * (nCols + nTagCols); + msgLen = sizeof(STableMetaMsg) + sizeof(SSchema) * (nCols + nTagCols); pTbMetaMsg = (STableMetaMsg *)rpcMallocCont(msgLen); if (pTbMetaMsg == NULL) { - return -1; + goto _exit; } + memcpy(pTbMetaMsg->dbFname, pReq->dbFname, sizeof(pTbMetaMsg->dbFname)); strcpy(pTbMetaMsg->tbFname, pTbCfg->name); if (pTbCfg->type == META_CHILD_TABLE) { strcpy(pTbMetaMsg->stbFname, pStbCfg->name); @@ -119,13 +123,15 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { pSch->bytes = htonl(pSch->bytes); } - SRpcMsg rpcMsg = { - .handle = pMsg->handle, - .ahandle = pMsg->ahandle, - .pCont = pTbMetaMsg, - .contLen = msgLen, - .code = 0, - }; + code = 0; + +_exit: + + rpcMsg.handle = pMsg->handle; + rpcMsg.ahandle = pMsg->ahandle; + rpcMsg.pCont = pTbMetaMsg; + rpcMsg.contLen = msgLen; + rpcMsg.code = code; rpcSendResponse(&rpcMsg); @@ -138,10 +144,10 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { * @param pRsp */ static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) { - SMTbCursor* pCur = metaOpenTbCursor(pVnode->pMeta); - SArray* pArray = taosArrayInit(10, POINTER_BYTES); + SMTbCursor *pCur = metaOpenTbCursor(pVnode->pMeta); + SArray * pArray = taosArrayInit(10, POINTER_BYTES); - char* name = NULL; + char * name = NULL; int32_t totalLen = 0; while ((name = metaTbCursorNext(pCur)) != NULL) { taosArrayPush(pArray, &name); @@ -150,18 +156,19 @@ static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) { metaCloseTbCursor(pCur); - int32_t rowLen = (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE) + 8 + 2 + (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE) + 8 + 4; - int32_t numOfTables = (int32_t) taosArrayGetSize(pArray); + int32_t rowLen = + (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE) + 8 + 2 + (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE) + 8 + 4; + int32_t numOfTables = (int32_t)taosArrayGetSize(pArray); int32_t payloadLen = rowLen * numOfTables; -// SVShowTablesFetchReq *pFetchReq = pMsg->pCont; + // SVShowTablesFetchReq *pFetchReq = pMsg->pCont; SVShowTablesFetchRsp *pFetchRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp) + payloadLen); memset(pFetchRsp, 0, sizeof(struct SVShowTablesFetchRsp) + payloadLen); - char* p = pFetchRsp->data; - for(int32_t i = 0; i < numOfTables; ++i) { - char* n = taosArrayGetP(pArray, i); + char *p = pFetchRsp->data; + for (int32_t i = 0; i < numOfTables; ++i) { + char *n = taosArrayGetP(pArray, i); STR_TO_VARSTR(p, n); p += (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE); @@ -171,11 +178,11 @@ static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) { pFetchRsp->precision = 0; SRpcMsg rpcMsg = { - .handle = pMsg->handle, + .handle = pMsg->handle, .ahandle = pMsg->ahandle, - .pCont = pFetchRsp, + .pCont = pFetchRsp, .contLen = sizeof(SVShowTablesFetchRsp) + payloadLen, - .code = 0, + .code = 0, }; rpcSendResponse(&rpcMsg); diff --git a/source/dnode/vnode/impl/src/vnodeWrite.c b/source/dnode/vnode/impl/src/vnodeWrite.c index 3b1442a02caea4314dc99a35f3de3cfd8b821715..88a73ca17436b8473305a78324665868bb562224 100644 --- a/source/dnode/vnode/impl/src/vnodeWrite.c +++ b/source/dnode/vnode/impl/src/vnodeWrite.c @@ -27,7 +27,7 @@ int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) { } int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { - SRpcMsg * pMsg; + SRpcMsg *pMsg; for (int i = 0; i < taosArrayGetSize(pMsgs); i++) { pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i); @@ -50,8 +50,9 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { } int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { - SVCreateTbReq vCreateTbReq; - void * ptr = vnodeMalloc(pVnode, pMsg->contLen); + SVCreateTbReq vCreateTbReq; + SVCreateTbBatchReq vCreateTbBatchReq; + void * ptr = vnodeMalloc(pVnode, pMsg->contLen); if (ptr == NULL) { // TODO: handle error } @@ -68,7 +69,6 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { switch (pMsg->msgType) { case TDMT_VND_CREATE_STB: - case TDMT_VND_CREATE_TABLE: tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq); if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) { // TODO: handle error @@ -76,6 +76,15 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { // TODO: maybe need to clear the requst struct break; + case TDMT_VND_CREATE_TABLE: + tSVCreateTbBatchReqDeserialize(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbBatchReq); + for (int i = 0; i < taosArrayGetSize(vCreateTbBatchReq.pArray); i++) { + SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i); + if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) { + // TODO: handle error + } + } + case TDMT_VND_DROP_STB: case TDMT_VND_DROP_TABLE: // if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) { diff --git a/source/dnode/vnode/meta/inc/metaTbCfg.h b/source/dnode/vnode/meta/inc/metaTbCfg.h index b4ee095967a313dce7078176f3d2b94e0585dc09..b7b3924d1487520ded604f2e709c530f448edcce 100644 --- a/source/dnode/vnode/meta/inc/metaTbCfg.h +++ b/source/dnode/vnode/meta/inc/metaTbCfg.h @@ -22,10 +22,6 @@ extern "C" { #endif -#define META_SUPER_TABLE TD_SUPER_TABLE -#define META_CHILD_TABLE TD_CHILD_TABLE -#define META_NORMAL_TABLE TD_NORMAL_TABLE - int metaValidateTbCfg(SMeta *pMeta, const STbCfg *); size_t metaEncodeTbObjFromTbOptions(const STbCfg *, void *pBuf, size_t bsize); diff --git a/source/dnode/vnode/meta/src/metaBDBImpl.c b/source/dnode/vnode/meta/src/metaBDBImpl.c index af8af6a05268fc1532526cdf03509a5d2e8c97a6..4254ad0acde0fcdda9027b2ab5aca76a5bf3ad71 100644 --- a/source/dnode/vnode/meta/src/metaBDBImpl.c +++ b/source/dnode/vnode/meta/src/metaBDBImpl.c @@ -23,6 +23,7 @@ typedef struct { tb_uid_t uid; int32_t sver; + int32_t padding; } SSchemaKey; struct SMetaDB { @@ -55,6 +56,8 @@ static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT * static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg); static void * metaDecodeTbInfo(void *buf, STbCfg *pTbCfg); static void metaClearTbCfg(STbCfg *pTbCfg); +static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW); +static void * metaDecodeSchema(void *buf, SSchemaWrapper *pSW); #define BDB_PERR(info, code) fprintf(stderr, info " reason: %s", db_strerror(code)) @@ -169,18 +172,13 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) { pBuf = buf; memset(&key, 0, sizeof(key)); memset(&value, 0, sizeof(key)); - SSchemaKey schemaKey = {uid, 0 /*TODO*/}; + SSchemaKey schemaKey = {uid, 0 /*TODO*/, 0}; key.data = &schemaKey; key.size = sizeof(schemaKey); - taosEncodeFixedU32(&pBuf, ncols); - for (size_t i = 0; i < ncols; i++) { - taosEncodeFixedI8(&pBuf, pSchema[i].type); - taosEncodeFixedI32(&pBuf, pSchema[i].colId); - taosEncodeFixedI32(&pBuf, pSchema[i].bytes); - taosEncodeString(&pBuf, pSchema[i].name); - } + SSchemaWrapper sw = {.nCols = ncols, .pSchema = pSchema}; + metaEncodeSchema(&pBuf, &sw); value.data = buf; value.size = POINTER_DISTANCE(pBuf, buf); @@ -197,6 +195,38 @@ int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) { } /* ------------------------ STATIC METHODS ------------------------ */ +static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW) { + int tlen = 0; + SSchema *pSchema; + + tlen += taosEncodeFixedU32(buf, pSW->nCols); + for (int i = 0; i < pSW->nCols; i++) { + pSchema = pSW->pSchema + i; + tlen += taosEncodeFixedI8(buf, pSchema->type); + tlen += taosEncodeFixedI32(buf, pSchema->colId); + tlen += taosEncodeFixedI32(buf, pSchema->bytes); + tlen += taosEncodeString(buf, pSchema->name); + } + + return tlen; +} + +static void *metaDecodeSchema(void *buf, SSchemaWrapper *pSW) { + SSchema *pSchema; + + buf = taosDecodeFixedU32(buf, &pSW->nCols); + pSW->pSchema = (SSchema *)malloc(sizeof(SSchema) * pSW->nCols); + for (int i = 0; i < pSW->nCols; i++) { + pSchema = pSW->pSchema + i; + buf = taosDecodeFixedI8(buf, &pSchema->type); + buf = taosDecodeFixedI32(buf, &pSchema->colId); + buf = taosDecodeFixedI32(buf, &pSchema->bytes); + buf = taosDecodeStringTo(buf, pSchema->name); + } + + return buf; +} + static SMetaDB *metaNewDB() { SMetaDB *pDB = NULL; pDB = (SMetaDB *)calloc(1, sizeof(*pDB)); @@ -376,15 +406,8 @@ static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg) { tsize += taosEncodeFixedU8(buf, pTbCfg->type); if (pTbCfg->type == META_SUPER_TABLE) { - tsize += taosEncodeVariantU32(buf, pTbCfg->stbCfg.nTagCols); - for (uint32_t i = 0; i < pTbCfg->stbCfg.nTagCols; i++) { - tsize += taosEncodeFixedI8(buf, pTbCfg->stbCfg.pTagSchema[i].type); - tsize += taosEncodeFixedI32(buf, pTbCfg->stbCfg.pTagSchema[i].colId); - tsize += taosEncodeFixedI32(buf, pTbCfg->stbCfg.pTagSchema[i].bytes); - tsize += taosEncodeString(buf, pTbCfg->stbCfg.pTagSchema[i].name); - } - - // tsize += tdEncodeSchema(buf, pTbCfg->stbCfg.pTagSchema); + SSchemaWrapper sw = {.nCols = pTbCfg->stbCfg.nTagCols, .pSchema = pTbCfg->stbCfg.pTagSchema}; + tsize += metaEncodeSchema(buf, &sw); } else if (pTbCfg->type == META_CHILD_TABLE) { tsize += taosEncodeFixedU64(buf, pTbCfg->ctbCfg.suid); tsize += tdEncodeKVRow(buf, pTbCfg->ctbCfg.pTag); @@ -403,14 +426,10 @@ static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg) { buf = taosDecodeFixedU8(buf, &(pTbCfg->type)); if (pTbCfg->type == META_SUPER_TABLE) { - buf = taosDecodeVariantU32(buf, &(pTbCfg->stbCfg.nTagCols)); - pTbCfg->stbCfg.pTagSchema = (SSchema *)malloc(sizeof(SSchema) * pTbCfg->stbCfg.nTagCols); - for (uint32_t i = 0; i < pTbCfg->stbCfg.nTagCols; i++) { - buf = taosDecodeFixedI8(buf, &(pTbCfg->stbCfg.pTagSchema[i].type)); - buf = taosDecodeFixedI32(buf, &pTbCfg->stbCfg.pTagSchema[i].colId); - buf = taosDecodeFixedI32(buf, &pTbCfg->stbCfg.pTagSchema[i].bytes); - buf = taosDecodeStringTo(buf, pTbCfg->stbCfg.pTagSchema[i].name); - } + SSchemaWrapper sw; + buf = metaDecodeSchema(buf, &sw); + pTbCfg->stbCfg.nTagCols = sw.nCols; + pTbCfg->stbCfg.pTagSchema = sw.pSchema; } else if (pTbCfg->type == META_CHILD_TABLE) { buf = taosDecodeFixedU64(buf, &(pTbCfg->ctbCfg.suid)); buf = tdDecodeKVRow(buf, &(pTbCfg->ctbCfg.pTag)); @@ -496,7 +515,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo int ret; void * pBuf; SSchema * pSchema; - SSchemaKey schemaKey = {uid, sver}; + SSchemaKey schemaKey = {uid, sver, 0}; DBT key = {0}; DBT value = {0}; @@ -507,38 +526,14 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo // Query ret = pDB->pSchemaDB->get(pDB->pSchemaDB, NULL, &key, &value, 0); if (ret != 0) { + printf("failed to query schema DB since %s================\n", db_strerror(ret)); return NULL; } // Decode the schema pBuf = value.data; - taosDecodeFixedI32(&pBuf, &nCols); - if (isinline) { - pSW = (SSchemaWrapper *)malloc(sizeof(*pSW) + sizeof(SSchema) * nCols); - if (pSW == NULL) { - return NULL; - } - pSW->pSchema = POINTER_SHIFT(pSW, sizeof(*pSW)); - } else { - pSW = (SSchemaWrapper *)malloc(sizeof(*pSW)); - if (pSW == NULL) { - return NULL; - } - - pSW->pSchema = (SSchema *)malloc(sizeof(SSchema) * nCols); - if (pSW->pSchema == NULL) { - free(pSW); - return NULL; - } - } - - for (int i = 0; i < nCols; i++) { - pSchema = pSW->pSchema + i; - taosDecodeFixedI8(&pBuf, &(pSchema->type)); - taosDecodeFixedI32(&pBuf, &(pSchema->colId)); - taosDecodeFixedI32(&pBuf, &(pSchema->bytes)); - taosDecodeStringTo(&pBuf, pSchema->name); - } + pSW = malloc(sizeof(*pSW)); + metaDecodeSchema(pBuf, pSW); return pSW; } diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 68ff1b8557f3276f800f0bb33ea21bcbcab12444..236264873e2b62e5ba3696cccdec0fd7db1b6c1a 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -161,7 +161,7 @@ int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SE char tbFullName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(pTableName, tbFullName); - SBuildTableMetaInput bInput = {.vgId = 0, .tableFullName = tbFullName}; + SBuildTableMetaInput bInput = {.vgId = 0, .dbName = NULL, .tableFullName = tbFullName}; char *msg = NULL; SEpSet *pVnodeEpSet = NULL; int32_t msgLen = 0; @@ -194,10 +194,10 @@ int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SE CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } - char tbFullName[TSDB_TABLE_FNAME_LEN]; - tNameExtractFullName(pTableName, tbFullName); + char dbFullName[TSDB_DB_FNAME_LEN]; + tNameGetFullDbName(pTableName, dbFullName); - SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .tableFullName = tbFullName}; + SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbName = dbFullName, .tableFullName = pTableName->tname}; char *msg = NULL; SEpSet *pVnodeEpSet = NULL; int32_t msgLen = 0; @@ -355,19 +355,19 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out if (output->metaNum != 1 && output->metaNum != 2) { ctgError("invalid table meta number[%d] got from meta rsp", output->metaNum); - CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } if (NULL == output->tbMeta) { ctgError("no valid table meta got from meta rsp"); - CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } if (NULL == pCatalog->tableCache.cache) { pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (NULL == pCatalog->tableCache.cache) { ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } } @@ -375,19 +375,19 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out pCatalog->tableCache.stableCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK); if (NULL == pCatalog->tableCache.stableCache) { ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } } if (output->metaNum == 2) { if (taosHashPut(pCatalog->tableCache.cache, output->ctbFname, strlen(output->ctbFname), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) { ctgError("push ctable[%s] to table cache failed", output->ctbFname); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } if (TSDB_SUPER_TABLE != output->tbMeta->tableType) { ctgError("table type[%d] error, expected:%d", output->tbMeta->tableType, TSDB_SUPER_TABLE); - CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } } @@ -398,26 +398,23 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) { CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock); ctgError("push table[%s] to table cache failed", output->tbFname); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname)); if (taosHashPut(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &tbMeta, POINTER_BYTES) != 0) { CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock); ctgError("push suid[%"PRIu64"] to stable cache failed", output->tbMeta->suid); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock); } else { if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) { ctgError("push table[%s] to table cache failed", output->tbFname); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } } -_return: - tfree(output->tbMeta); - CTG_RET(code); } diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index 5979d3a147c26cbd154f0a79c92be9004d40314e..1d8a48dfcbb5670c6a997afb4364b7f3341977b8 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -557,6 +557,8 @@ void *ctgTestGetCtableMetaThread(void *param) { assert(0); } + tfree(tbMeta); + if (ctgTestEnableSleep) { usleep(rand()%5); } @@ -592,6 +594,8 @@ void *ctgTestSetCtableMetaThread(void *param) { } } + tfree(output.tbMeta); + return NULL; } @@ -944,7 +948,6 @@ TEST(dbVgroup, getSetDbVgroupCase) { catalogDestroy(); } -#endif TEST(multiThread, getSetDbVgroupCase) { struct SCatalog* pCtg = NULL; @@ -996,6 +999,9 @@ TEST(multiThread, getSetDbVgroupCase) { catalogDestroy(); } +#endif + + TEST(multiThread, ctableMeta) { struct SCatalog* pCtg = NULL; void *mockPointer = (void *)0x1; @@ -1024,8 +1030,9 @@ TEST(multiThread, ctableMeta) { pthread_attr_init(&thattr); pthread_t thread1, thread2; - pthread_create(&(thread1), &thattr, ctgTestGetCtableMetaThread, pCtg); pthread_create(&(thread1), &thattr, ctgTestSetCtableMetaThread, pCtg); + sleep(1); + pthread_create(&(thread1), &thattr, ctgTestGetCtableMetaThread, pCtg); while (true) { if (ctgTestDeadLoop) { diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index 048c9e804e6114017f2f91ed7bb2e70c50b52841..6e1256d857ee0d4accbf67912354e94d1e681df5 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -108,8 +108,17 @@ void iterateValueDestroy(IterateValue* iv, bool destroy); extern void* indexQhandle; +typedef struct TFileCacheKey { + uint64_t suid; + uint8_t colType; + char* colName; + int32_t nColName; +} ICacheKey; + int indexFlushCacheTFile(SIndex* sIdx, void*); +int32_t indexSerialCacheKey(ICacheKey* key, char* buf); + #define indexFatal(...) \ do { \ if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("index FATAL ", 255, __VA_ARGS__); } \ diff --git a/source/libs/index/inc/index_cache.h b/source/libs/index/inc/index_cache.h index 12b66bca2cab9ff4b658f7aeaacd9268facca6a3..805137ccaf8e95885b77e1d440372d8141bf77e7 100644 --- a/source/libs/index/inc/index_cache.h +++ b/source/libs/index/inc/index_cache.h @@ -42,6 +42,7 @@ typedef struct IndexCache { int32_t version; int32_t nTerm; int8_t type; + uint64_t suid; pthread_mutex_t mtx; } IndexCache; @@ -58,7 +59,7 @@ typedef struct CacheTerm { } CacheTerm; // -IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type); +IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type); void indexCacheDestroy(void* cache); diff --git a/source/libs/index/inc/index_tfile.h b/source/libs/index/inc/index_tfile.h index 4928e01a6322ae487c00580d935ba37d9f0c00fd..4618a39197d1d87b1e9d50f895578e9d2dd8fb85 100644 --- a/source/libs/index/inc/index_tfile.h +++ b/source/libs/index/inc/index_tfile.h @@ -49,13 +49,6 @@ typedef struct TFileValue { int32_t offset; } TFileValue; -typedef struct TFileCacheKey { - uint64_t suid; - uint8_t colType; - char* colName; - int32_t nColName; -} TFileCacheKey; - // table cache // refactor to LRU cache later typedef struct TFileCache { @@ -103,10 +96,10 @@ typedef struct TFileReaderOpt { // tfile cache, manage tindex reader TFileCache* tfileCacheCreate(const char* path); void tfileCacheDestroy(TFileCache* tcache); -TFileReader* tfileCacheGet(TFileCache* tcache, TFileCacheKey* key); -void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader); +TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key); +void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader); -TFileReader* tfileGetReaderByCol(IndexTFile* tf, char* colName); +TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName); TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const char* colName); TFileReader* tfileReaderCreate(WriterCtx* ctx); @@ -124,6 +117,7 @@ int tfileWriterFinish(TFileWriter* tw); // IndexTFile* indexTFileCreate(const char* path); +void indexTFileDestroy(IndexTFile* tfile); int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid); int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result); diff --git a/source/libs/index/inc/index_util.h b/source/libs/index/inc/index_util.h index 21c5ca155b3bb252429be2dd6c30a344694b168a..adeb52bb8c989332ce8284d24d2369685c147323 100644 --- a/source/libs/index/inc/index_util.h +++ b/source/libs/index/inc/index_util.h @@ -34,7 +34,7 @@ extern "C" { #define SERIALIZE_VAR_TO_BUF(buf, var, type) \ do { \ type c = var; \ - assert(sizeof(var) == sizeof(type)); \ + assert(sizeof(type) == sizeof(c)); \ memcpy((void*)buf, (void*)&c, sizeof(c)); \ buf += sizeof(c); \ } while (0) diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 5167196031eb822b5540fa3c7b224e64ac859b0a..9c7320b301058a0c91e59d28dc676f9da29f385b 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -17,6 +17,7 @@ #include "indexInt.h" #include "index_cache.h" #include "index_tfile.h" +#include "index_util.h" #include "tdef.h" #include "tsched.h" @@ -102,6 +103,7 @@ void indexClose(SIndex* sIdx) { } taosHashCleanup(sIdx->colObj); pthread_mutex_destroy(&sIdx->mtx); + indexTFileDestroy(sIdx->tindex); #endif free(sIdx->path); free(sIdx); @@ -130,18 +132,28 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { // TODO(yihao): reduce the lock range pthread_mutex_lock(&index->mtx); for (int i = 0; i < taosArrayGetSize(fVals); i++) { - SIndexTerm* p = taosArrayGetP(fVals, i); - IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName); + SIndexTerm* p = taosArrayGetP(fVals, i); + + char buf[128] = {0}; + ICacheKey key = {.suid = p->suid, .colName = p->colName}; + int32_t sz = indexSerialCacheKey(&key, buf); + + IndexCache** cache = taosHashGet(index->colObj, buf, sz); if (cache == NULL) { - IndexCache* pCache = indexCacheCreate(index, p->colName, p->colType); - taosHashPut(index->colObj, p->colName, p->nColName, &pCache, sizeof(void*)); + IndexCache* pCache = indexCacheCreate(index, p->suid, p->colName, p->colType); + taosHashPut(index->colObj, buf, sz, &pCache, sizeof(void*)); } } pthread_mutex_unlock(&index->mtx); for (int i = 0; i < taosArrayGetSize(fVals); i++) { - SIndexTerm* p = taosArrayGetP(fVals, i); - IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName); + SIndexTerm* p = taosArrayGetP(fVals, i); + + char buf[128] = {0}; + ICacheKey key = {.suid = p->suid, .colName = p->colName}; + int32_t sz = indexSerialCacheKey(&key, buf); + + IndexCache** cache = taosHashGet(index->colObj, buf, sz); assert(*cache != NULL); int ret = indexCachePut(*cache, p, uid); if (ret != 0) { return ret; } @@ -296,7 +308,12 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result // Get col info IndexCache* cache = NULL; pthread_mutex_lock(&sIdx->mtx); - IndexCache** pCache = taosHashGet(sIdx->colObj, colName, nColName); + + char buf[128] = {0}; + ICacheKey key = {.suid = term->suid, .colName = term->colName}; + int32_t sz = indexSerialCacheKey(&key, buf); + + IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz); if (pCache == NULL) { pthread_mutex_unlock(&sIdx->mtx); return -1; @@ -360,6 +377,7 @@ static void indexMergeSameKey(SArray* result, TFileValue* tv) { if (sz > 0) { // TODO(yihao): remove duplicate tableid TFileValue* lv = taosArrayGetP(result, sz - 1); + // indexError("merge colVal: %s", lv->colVal); if (strcmp(lv->colVal, tv->colVal) == 0) { taosArrayAddAll(lv->tableId, tv->tableId); tfileValueDestroy(tv); @@ -368,6 +386,7 @@ static void indexMergeSameKey(SArray* result, TFileValue* tv) { } } else { taosArrayPush(result, &tv); + // indexError("merge colVal: %s", tv->colVal); } } static void indexDestroyTempResult(SArray* result) { @@ -383,10 +402,12 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) { indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid); IndexCache* pCache = (IndexCache*)cache; - TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->colName); + TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName); + if (pReader == NULL) { indexWarn("empty pReader found"); } // handle flush Iterate* cacheIter = indexCacheIteratorCreate(pCache); Iterate* tfileIter = tfileIteratorCreate(pReader); + if (tfileIter == NULL) { indexWarn("empty tfile reader iterator"); } SArray* result = taosArrayInit(1024, sizeof(void*)); @@ -459,14 +480,14 @@ void iterateValueDestroy(IterateValue* value, bool destroy) { } else { if (value->val != NULL) { taosArrayClear(value->val); } } - // free(value->colVal); + free(value->colVal); value->colVal = NULL; } static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { int32_t version = CACHE_VERSION(cache); uint8_t colType = cache->type; - TFileWriter* tw = tfileWriterOpen(sIdx->path, sIdx->suid, version, cache->colName, colType); + TFileWriter* tw = tfileWriterOpen(sIdx->path, cache->suid, version, cache->colName, colType); if (tw == NULL) { indexError("failed to open file to write"); return -1; @@ -479,14 +500,13 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { } tfileWriterClose(tw); - TFileReader* reader = tfileReaderOpen(sIdx->path, sIdx->suid, version, cache->colName); + TFileReader* reader = tfileReaderOpen(sIdx->path, cache->suid, version, cache->colName); + + char buf[128] = {0}; + TFileHeader* header = &reader->header; + ICacheKey key = { + .suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName), .colType = header->colType}; - char buf[128] = {0}; - TFileHeader* header = &reader->header; - TFileCacheKey key = {.suid = header->suid, - .colName = header->colName, - .nColName = strlen(header->colName), - .colType = header->colType}; pthread_mutex_lock(&sIdx->mtx); IndexTFile* ifile = (IndexTFile*)sIdx->tindex; @@ -497,3 +517,13 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { END: tfileWriterClose(tw); } + +int32_t indexSerialCacheKey(ICacheKey* key, char* buf) { + char* p = buf; + SERIALIZE_MEM_TO_BUF(buf, key, suid); + SERIALIZE_VAR_TO_BUF(buf, '_', char); + // SERIALIZE_MEM_TO_BUF(buf, key, colType); + // SERIALIZE_VAR_TO_BUF(buf, '_', char); + SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName); + return buf - p; +} diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index 0e46445a00db0a2bcc4c40b03b4b6ea95fa64e52..b4c533e998647c6540da8eef35604a4f05922377 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -20,7 +20,7 @@ #define MAX_INDEX_KEY_LEN 256 // test only, change later -#define MEM_TERM_LIMIT 10000 * 10 +#define MEM_TERM_LIMIT 10 * 10000 // ref index_cache.h:22 //#define CACHE_KEY_LEN(p) \ // (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + @@ -40,7 +40,7 @@ static bool indexCacheIteratorNext(Iterate* itera); static IterateValue* indexCacheIteratorGetValue(Iterate* iter); -IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) { +IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type) { IndexCache* cache = calloc(1, sizeof(IndexCache)); if (cache == NULL) { indexError("failed to create index cache"); @@ -53,7 +53,7 @@ IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) { cache->type = type; cache->index = idx; cache->version = 0; - + cache->suid = suid; pthread_mutex_init(&cache->mtx, NULL); indexCacheRef(cache); return cache; @@ -150,6 +150,7 @@ Iterate* indexCacheIteratorCreate(IndexCache* cache) { MemTable* tbl = cache->imm; iiter->val.val = taosArrayInit(1, sizeof(uint64_t)); + iiter->val.colVal = NULL; iiter->iter = tbl != NULL ? tSkipListCreateIter(tbl->mem) : NULL; iiter->next = indexCacheIteratorNext; iiter->getValue = indexCacheIteratorGetValue; @@ -353,6 +354,9 @@ static bool indexCacheIteratorNext(Iterate* itera) { SSkipListIterator* iter = itera->iter; if (iter == NULL) { return false; } IterateValue* iv = &itera->val; + if (iv->colVal != NULL && iv->val != NULL) { + // indexError("value in cache: colVal: %s, size: %d", iv->colVal, (int)taosArrayGetSize(iv->val)); + } iterateValueDestroy(iv, false); bool next = tSkipListIterNext(iter); diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index 04a08dafd2641f7fd1f91d660d8c4ff4013abeb2..bfaeeaaa33425c8c51f832e8c2512f4906cedffa 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -319,7 +319,7 @@ void fstStateSetCommInput(FstState* s, uint8_t inp) { assert(s->state == OneTransNext || s->state == OneTrans); uint8_t val; - COMMON_INDEX(inp, 0x111111, val); + COMMON_INDEX(inp, 0b111111, val); s->val = (s->val & fstStateDict[s->state].val) | val; } @@ -369,7 +369,7 @@ uint8_t fstStateInput(FstState* s, FstNode* node) { bool null = false; uint8_t inp = fstStateCommInput(s, &null); uint8_t* data = fstSliceData(slice, NULL); - return null == false ? inp : data[-1]; + return null == false ? inp : data[node->start - 1]; } uint8_t fstStateInputForAnyTrans(FstState* s, FstNode* node, uint64_t i) { assert(s->state == AnyTrans); @@ -1062,6 +1062,7 @@ Output fstEmptyFinalOutput(Fst* fst, bool* null) { } else { *null = true; } + fstNodeDestroy(node); return res; } @@ -1286,6 +1287,7 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb StreamWithStateResult* result = swsResultCreate(&slice, fOutput, tState); free(buf); fstSliceDestroy(&slice); + taosArrayDestroy(nodes); return result; } free(buf); diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index fc4f8593a105f5898fb977a8de0110bf83e5d6e5..95c713fb0a5a328c50761776db76725da2fa0229 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -51,7 +51,6 @@ static void tfileDestroyFileName(void* elem); static int tfileCompare(const void* a, const void* b); static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version); static void tfileGenFileName(char* filename, uint64_t suid, int colId, int version); -static void tfileSerialCacheKey(TFileCacheKey* key, char* buf); TFileCache* tfileCacheCreate(const char* path) { TFileCache* tcache = calloc(1, sizeof(TFileCache)); @@ -80,18 +79,18 @@ TFileCache* tfileCacheCreate(const char* path) { goto End; } - char buf[128] = {0}; - TFileReader* reader = tfileReaderCreate(wc); - TFileHeader* header = &reader->header; - TFileCacheKey key = {.suid = header->suid, - .colName = header->colName, - .nColName = strlen(header->colName), - .colType = header->colType}; - tfileSerialCacheKey(&key, buf); - + char buf[128] = {0}; + TFileReader* reader = tfileReaderCreate(wc); + TFileHeader* header = &reader->header; + ICacheKey key = {.suid = header->suid, + .colName = header->colName, + .nColName = strlen(header->colName), + .colType = header->colType}; + + int32_t sz = indexSerialCacheKey(&key, buf); + assert(sz < sizeof(buf)); + taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*)); tfileReaderRef(reader); - // indexTable - taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*)); } taosArrayDestroyEx(files, tfileDestroyFileName); return tcache; @@ -117,30 +116,30 @@ void tfileCacheDestroy(TFileCache* tcache) { free(tcache); } -TFileReader* tfileCacheGet(TFileCache* tcache, TFileCacheKey* key) { - char buf[128] = {0}; - tfileSerialCacheKey(key, buf); - - TFileReader** reader = taosHashGet(tcache->tableCache, buf, strlen(buf)); +TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) { + char buf[128] = {0}; + int32_t sz = indexSerialCacheKey(key, buf); + assert(sz < sizeof(buf)); + TFileReader** reader = taosHashGet(tcache->tableCache, buf, sz); if (reader == NULL) { return NULL; } tfileReaderRef(*reader); return *reader; } -void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader) { - char buf[128] = {0}; - tfileSerialCacheKey(key, buf); +void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) { + char buf[128] = {0}; + int32_t sz = indexSerialCacheKey(key, buf); // remove last version index reader - TFileReader** p = taosHashGet(tcache->tableCache, buf, strlen(buf)); + TFileReader** p = taosHashGet(tcache->tableCache, buf, sz); if (p != NULL) { TFileReader* oldReader = *p; - taosHashRemove(tcache->tableCache, buf, strlen(buf)); + taosHashRemove(tcache->tableCache, buf, sz); oldReader->remove = true; tfileReaderUnRef(oldReader); } + taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*)); tfileReaderRef(reader); - taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*)); return; } TFileReader* tfileReaderCreate(WriterCtx* ctx) { @@ -230,8 +229,6 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const c TFileReader* reader = tfileReaderCreate(wc); return reader; - - // tfileSerialCacheKey(&key, buf); } TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) { // char pathBuf[128] = {0}; @@ -325,15 +322,19 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) { tfileWriterClose(tw); return -1; } - // write fst + + // write data indexError("--------Begin----------------"); for (size_t i = 0; i < sz; i++) { // TODO, fst batch write later TFileValue* v = taosArrayGetP((SArray*)data, i); - if (tfileWriteData(tw, v) == 0) { - // + if (tfileWriteData(tw, v) != 0) { + indexError("failed to write data: %s, offset: %d len: %d", v->colVal, v->offset, + (int)taosArrayGetSize(v->tableId)); + } else { + indexInfo("success to write data: %s, offset: %d len: %d", v->colVal, v->offset, + (int)taosArrayGetSize(v->tableId)); } - indexError("data: %s, offset: %d len: %d", v->colVal, v->offset, (int)taosArrayGetSize(v->tableId)); } indexError("--------End----------------"); fstBuilderFinish(tw->fb); @@ -359,7 +360,7 @@ IndexTFile* indexTFileCreate(const char* path) { tfile->cache = tfileCacheCreate(path); return tfile; } -void IndexTFileDestroy(IndexTFile* tfile) { +void indexTFileDestroy(IndexTFile* tfile) { tfileCacheDestroy(tfile->cache); free(tfile); } @@ -369,9 +370,8 @@ int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) { if (tfile == NULL) { return ret; } IndexTFile* pTfile = (IndexTFile*)tfile; - SIndexTerm* term = query->term; - TFileCacheKey key = { - .suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName}; + SIndexTerm* term = query->term; + ICacheKey key = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName}; TFileReader* reader = tfileCacheGet(pTfile->cache, &key); if (reader == NULL) { return 0; } @@ -385,8 +385,10 @@ int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) { } static bool tfileIteratorNext(Iterate* iiter) { IterateValue* iv = &iiter->val; + if (iv->colVal != NULL && iv->val != NULL) { + // indexError("value in fst: colVal: %s, size: %d", iv->colVal, (int)taosArrayGetSize(iv->val)); + } iterateValueDestroy(iv, false); - // SArray* tblIds = iv->val; char* colVal = NULL; uint64_t offset = 0; @@ -406,14 +408,14 @@ static bool tfileIteratorNext(Iterate* iiter) { if (tfileReaderLoadTableIds(tIter->rdr, offset, iv->val) != 0) { return false; } iv->colVal = colVal; - + return true; // std::string key(ch, sz); } static IterateValue* tifileIterateGetValue(Iterate* iter) { return &iter->val; } static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) { - TFileFstIter* tIter = calloc(1, sizeof(Iterate)); + TFileFstIter* tIter = calloc(1, sizeof(TFileFstIter)); if (tIter == NULL) { return NULL; } tIter->ctx = automCtxCreate(NULL, AUTOMATION_ALWAYS); @@ -435,6 +437,7 @@ Iterate* tfileIteratorCreate(TFileReader* reader) { iter->next = tfileIteratorNext; iter->getValue = tifileIterateGetValue; iter->val.val = taosArrayInit(1, sizeof(uint64_t)); + iter->val.colVal = NULL; return iter; } void tfileIteratorDestroy(Iterate* iter) { @@ -447,13 +450,14 @@ void tfileIteratorDestroy(Iterate* iter) { streamWithStateDestroy(tIter->st); fstStreamBuilderDestroy(tIter->fb); automCtxDestroy(tIter->ctx); + free(tIter); free(iter); } -TFileReader* tfileGetReaderByCol(IndexTFile* tf, char* colName) { +TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName) { if (tf == NULL) { return NULL; } - TFileCacheKey key = {.suid = 0, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)}; + ICacheKey key = {.suid = suid, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)}; return tfileCacheGet(tf->cache, &key); } @@ -480,7 +484,7 @@ static int tfileValueCompare(const void* a, const void* b, const void* param) { TFileValue* tfileValueCreate(char* val) { TFileValue* tf = calloc(1, sizeof(TFileValue)); if (tf == NULL) { return NULL; } - tf->colVal = val; + tf->colVal = tstrdup(val); tf->tableId = taosArrayInit(32, sizeof(uint64_t)); return tf; } @@ -491,6 +495,7 @@ int tfileValuePush(TFileValue* tf, uint64_t val) { } void tfileValueDestroy(TFileValue* tf) { taosArrayDestroy(tf->tableId); + free(tf->colVal); free(tf); } static void tfileSerialTableIdsToBuf(char* buf, SArray* ids) { @@ -648,10 +653,3 @@ static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, } return -1; } -static void tfileSerialCacheKey(TFileCacheKey* key, char* buf) { - // SERIALIZE_MEM_TO_BUF(buf, key, suid); - // SERIALIZE_VAR_TO_BUF(buf, '_', char); - // SERIALIZE_MEM_TO_BUF(buf, key, colType); - // SERIALIZE_VAR_TO_BUF(buf, '_', char); - SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName); -} diff --git a/source/libs/index/test/fstTest.cc b/source/libs/index/test/fstTest.cc index da974ce6c4be500370eefe4b112f507b5e7912e6..3d978c05a5725e8f6551e1b3e15a0bffc2353663 100644 --- a/source/libs/index/test/fstTest.cc +++ b/source/libs/index/test/fstTest.cc @@ -24,8 +24,13 @@ class FstWriter { _b = fstBuilderCreate(_wc, 0); } bool Put(const std::string& key, uint64_t val) { + // char buf[128] = {0}; + // int len = 0; + // taosMbsToUcs4(key.c_str(), key.size(), buf, 128, &len); + // FstSlice skey = fstSliceCreate((uint8_t*)buf, len); FstSlice skey = fstSliceCreate((uint8_t*)key.c_str(), key.size()); bool ok = fstBuilderInsert(_b, skey, val); + fstSliceDestroy(&skey); return ok; } @@ -61,6 +66,11 @@ class FstReadMemory { return _fst != NULL; } bool Get(const std::string& key, uint64_t* val) { + // char buf[128] = {0}; + // int len = 0; + // taosMbsToUcs4(key.c_str(), key.size(), buf, 128, &len); + // FstSlice skey = fstSliceCreate((uint8_t*)buf, len); + FstSlice skey = fstSliceCreate((uint8_t*)key.c_str(), key.size()); bool ok = fstGet(_fst, &skey, val); fstSliceDestroy(&skey); @@ -135,15 +145,109 @@ int Performance_fstWriteRecords(FstWriter* b) { } return L * M * N; } +void Performance_fstReadRecords(FstReadMemory* m) { + std::string str("aa"); + for (int i = 0; i < M; i++) { + str[0] = 'a' + i; + str.resize(2); + for (int j = 0; j < N; j++) { + str[1] = 'a' + j; + str.resize(2); + for (int k = 0; k < L; k++) { + str.push_back('a'); + uint64_t val, cost; + if (m->GetWithTimeCostUs(str, &val, &cost)) { + printf("succes to get kv(%s, %" PRId64 "), cost: %" PRId64 "\n", str.c_str(), val, cost); + } else { + printf("failed to get key: %s\n", str.c_str()); + } + } + } + } +} + +void checkMillonWriteAndReadOfFst() { + tfInit(); + FstWriter* fw = new FstWriter; + Performance_fstWriteRecords(fw); + delete fw; + FstReadMemory* fr = new FstReadMemory(1024 * 64 * 1024); + + if (fr->init()) { printf("success to init fst read"); } + + Performance_fstReadRecords(fr); + tfCleanup(); + delete fr; +} +void checkFstLongTerm() { + tfInit(); + FstWriter* fw = new FstWriter; + // Performance_fstWriteRecords(fw); + + fw->Put("A B", 1); + fw->Put("C", 2); + fw->Put("a", 3); + delete fw; + + FstReadMemory* m = new FstReadMemory(1024 * 64); + if (m->init() == false) { + std::cout << "init readMemory failed" << std::endl; + delete m; + return; + } + { + uint64_t val = 0; + if (m->Get("A B", &val)) { + std::cout << "success to Get: " << val << std::endl; + } else { + std::cout << "failed to Get:" << val << std::endl; + } + } + { + uint64_t val = 0; + if (m->Get("C", &val)) { + std::cout << "success to Get: " << val << std::endl; + } else { + std::cout << "failed to Get:" << val << std::endl; + } + } + { + uint64_t val = 0; + if (m->Get("a", &val)) { + std::cout << "success to Get: " << val << std::endl; + } else { + std::cout << "failed to Get:" << val << std::endl; + } + } + + // prefix search + // std::vector result; + + // AutomationCtx* ctx = automCtxCreate((void*)"ab", AUTOMATION_ALWAYS); + // m->Search(ctx, result); + // std::cout << "size: " << result.size() << std::endl; + // assert(result.size() == count); + // for (int i = 0; i < result.size(); i++) { + // assert(result[i] == i); // check result + //} + tfCleanup(); + // free(ctx); + // delete m; +} void checkFstCheckIterator() { tfInit(); FstWriter* fw = new FstWriter; int64_t s = taosGetTimestampUs(); int count = 2; - Performance_fstWriteRecords(fw); + // Performance_fstWriteRecords(fw); int64_t e = taosGetTimestampUs(); std::cout << "insert data count : " << count << "elapas time: " << e - s << std::endl; + + fw->Put("Hello world", 1); + fw->Put("hello world", 2); + fw->Put("hello worle", 3); + fw->Put("hello worlf", 4); delete fw; FstReadMemory* m = new FstReadMemory(1024 * 64); @@ -171,7 +275,7 @@ void checkFstCheckIterator() { void fst_get(Fst* fst) { for (int i = 0; i < 10000; i++) { - std::string term = "Hello"; + std::string term = "Hello World"; FstSlice key = fstSliceCreate((uint8_t*)term.c_str(), term.size()); uint64_t offset = 0; bool ret = fstGet(fst, &key, &offset); @@ -189,7 +293,7 @@ void validateTFile(char* arg) { std::thread threads[NUM_OF_THREAD]; // std::vector threads; - TFileReader* reader = tfileReaderOpen(arg, 0, 295868, "tag1"); + TFileReader* reader = tfileReaderOpen(arg, 0, 999992, "tag1"); for (int i = 0; i < NUM_OF_THREAD; i++) { threads[i] = std::thread(fst_get, reader->fst); @@ -203,9 +307,12 @@ void validateTFile(char* arg) { tfCleanup(); } int main(int argc, char* argv[]) { - if (argc > 1) { validateTFile(argv[1]); } + // tool to check all kind of fst test + // if (argc > 1) { validateTFile(argv[1]); } // checkFstCheckIterator(); + // checkFstLongTerm(); // checkFstPrefixSearch(); + checkMillonWriteAndReadOfFst(); return 1; } diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index 080becccf183cd374ed833895c02b1f5a949803f..bdfb86ce1714ab3b309091718c1b72676b8120fd 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -457,7 +457,10 @@ TEST_F(IndexTFileEnv, test_tfile_write) { // taosArrayPush(data, &v4); fObj->Put(data); - for (size_t i = 0; i < taosArrayGetSize(data); i++) { destroyTFileValue(taosArrayGetP(data, i)); } + for (size_t i = 0; i < taosArrayGetSize(data); i++) { + // data + destroyTFileValue(taosArrayGetP(data, i)); + } taosArrayDestroy(data); std::string colName("voltage"); @@ -470,6 +473,7 @@ TEST_F(IndexTFileEnv, test_tfile_write) { fObj->Get(&query, result); assert(taosArrayGetSize(result) == 200); indexTermDestroy(term); + taosArrayDestroy(result); // tfileWriterDestroy(twrite); } @@ -477,7 +481,7 @@ class CacheObj { public: CacheObj() { // TODO - cache = indexCacheCreate(NULL, "voltage", TSDB_DATA_TYPE_BINARY); + cache = indexCacheCreate(NULL, 0, "voltage", TSDB_DATA_TYPE_BINARY); } int Put(SIndexTerm* term, int16_t colId, int32_t version, uint64_t uid) { int ret = indexCachePut(cache, term, uid); @@ -534,6 +538,7 @@ TEST_F(IndexCacheEnv, cache_test) { SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); coj->Put(term, colId, version++, suid++); + indexTermDestroy(term); // indexTermDestry(term); } { @@ -541,24 +546,28 @@ TEST_F(IndexCacheEnv, cache_test) { SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); coj->Put(term, colId, version++, suid++); + indexTermDestroy(term); } { std::string colVal("v2"); SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); coj->Put(term, colId, version++, suid++); + indexTermDestroy(term); } { std::string colVal("v3"); SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); coj->Put(term, colId, version++, suid++); + indexTermDestroy(term); } { std::string colVal("v3"); SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); coj->Put(term, colId, version++, suid++); + indexTermDestroy(term); } coj->Debug(); std::cout << "--------first----------" << std::endl; @@ -567,12 +576,14 @@ TEST_F(IndexCacheEnv, cache_test) { SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); coj->Put(term, othColId, version++, suid++); + indexTermDestroy(term); } { std::string colVal("v4"); SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); coj->Put(term, othColId, version++, suid++); + indexTermDestroy(term); } coj->Debug(); std::cout << "--------second----------" << std::endl; @@ -583,6 +594,7 @@ TEST_F(IndexCacheEnv, cache_test) { SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); coj->Put(term, colId, version++, suid++); + indexTermDestroy(term); } } coj->Debug(); @@ -598,6 +610,9 @@ TEST_F(IndexCacheEnv, cache_test) { coj->Get(&query, colId, 10000, ret, &valType); std::cout << "size : " << taosArrayGetSize(ret) << std::endl; assert(taosArrayGetSize(ret) == 4); + taosArrayDestroy(ret); + + indexTermDestroy(term); } { std::string colVal("v2"); @@ -609,6 +624,9 @@ TEST_F(IndexCacheEnv, cache_test) { coj->Get(&query, colId, 10000, ret, &valType); assert(taosArrayGetSize(ret) == 1); + taosArrayDestroy(ret); + + indexTermDestroy(term); } } class IndexObj { @@ -678,13 +696,16 @@ class IndexObj { SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t)); if (Search(mq, result) == 0) { std::cout << "search one successfully" << std::endl; } - return taosArrayGetSize(result); + int sz = taosArrayGetSize(result); + indexMultiTermQueryDestroy(mq); + taosArrayDestroy(result); + return sz; // assert(taosArrayGetSize(result) == targetSize); } void PutOne(const std::string& colName, const std::string& colVal) { + SIndexMultiTerm* terms = indexMultiTermCreate(); SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); - SIndexMultiTerm* terms = indexMultiTermCreate(); indexMultiTermAdd(terms, term); Put(terms, 10); indexMultiTermDestroy(terms); @@ -783,18 +804,21 @@ TEST_F(IndexEnv2, testIndexOpen) { index->Search(mq, result); std::cout << "target size: " << taosArrayGetSize(result) << std::endl; assert(taosArrayGetSize(result) == 400); + taosArrayDestroy(result); + indexMultiTermQueryDestroy(mq); } } TEST_F(IndexEnv2, testIndex_TrigeFlush) { - std::string path = "/tmp/test"; + std::string path = "/tmp/test1"; if (index->Init(path) != 0) { // r std::cout << "failed to init" << std::endl; } int numOfTable = 100 * 10000; - index->WriteMillonData("tag1", "Hello", numOfTable); - int target = index->SearchOne("tag1", "Hello"); + index->WriteMillonData("tag1", "Hello Wolrd", numOfTable); + int target = index->SearchOne("tag1", "Hello Wolrd"); + std::cout << "Get Index: " << target << std::endl; assert(numOfTable == target); } @@ -821,14 +845,6 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) { threads[i].join(); } } -TEST_F(IndexEnv2, testIndex_multi_thread_write) { - std::string path = "/tmp"; - if (index->Init(path) != 0) {} -} -TEST_F(IndexEnv2, testIndex_multi_thread_read) { - std::string path = "/tmp"; - if (index->Init(path) != 0) {} -} TEST_F(IndexEnv2, testIndex_restart) { std::string path = "/tmp"; diff --git a/source/libs/parser/inc/parserInt.h b/source/libs/parser/inc/parserInt.h index 4bbe6ab9078f5e7b43f3cc7f84e39150cf467936..346bd0cbe4c13e7f22a97b8725ccdddc05de9656 100644 --- a/source/libs/parser/inc/parserInt.h +++ b/source/libs/parser/inc/parserInt.h @@ -68,7 +68,9 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pSqlInfo, SQ * @param type * @return */ -int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStmtInfo* pDcl, char* msgBuf, int32_t msgBufLen); +SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen); + +SInsertStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen); /** * Evaluate the numeric and timestamp arithmetic expression in the WHERE clause. diff --git a/source/libs/parser/src/dCDAstProcess.c b/source/libs/parser/src/dCDAstProcess.c index fd99cb6f669ab944b5bd042cf293fb14ed5c5165..76e26c159b6d1c127c2ea02f55d918f3be1c04c0 100644 --- a/source/libs/parser/src/dCDAstProcess.c +++ b/source/libs/parser/src/dCDAstProcess.c @@ -35,7 +35,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx* pCtx, void** ou SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq)); SArray* array = NULL; - SName name = {0}; + SName name = {0}; tNameSetDbName(&name, pCtx->acctId, pCtx->db, strlen(pCtx->db)); char dbFname[TSDB_DB_FNAME_LEN] = {0}; @@ -48,7 +48,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx* pCtx, void** ou pEpSet->numOfEps = info->numOfEps; pEpSet->inUse = info->inUse; - for(int32_t i = 0; i < pEpSet->numOfEps; ++i) { + for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { strncpy(pEpSet->fqdn[i], info->epAddr[i].fqdn, tListLen(pEpSet->fqdn[i])); pEpSet->port[i] = info->epAddr[i].port; } @@ -190,7 +190,7 @@ static int32_t doCheckDbOptions(SCreateDbMsg* pCreate, SMsgBuf* pMsgBuf) { val = htonl(pCreate->numOfVgroups); if (val < TSDB_MIN_VNODES_PER_DB || val > TSDB_MAX_VNODES_PER_DB) { snprintf(msg, tListLen(msg), "invalid number of vgroups for DB:%d valid range: [%d, %d]", val, - TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB); + TSDB_MIN_VNODES_PER_DB, TSDB_MAX_VNODES_PER_DB); } return TSDB_CODE_SUCCESS; @@ -321,8 +321,12 @@ int32_t doCheckForCreateTable(SSqlInfo* pInfo, SMsgBuf* pMsgBuf) { return TSDB_CODE_SUCCESS; } -int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* pMsgBuf, char** pOutput, int32_t* len, - SEpSet* pEpSet) { +typedef struct SVgroupTablesBatch { + SVCreateTbBatchReq req; + SVgroupInfo info; +} SVgroupTablesBatch; + +int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* pMsgBuf, char** pOutput, int32_t* len) { const char* msg1 = "invalid table name"; const char* msg2 = "tags number not matched"; const char* msg3 = "tag value too long"; @@ -330,17 +334,14 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo; + SHashObj* pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + // super table name, create table by using dst - int32_t numOfTables = (int32_t)taosArrayGetSize(pCreateTable->childTableInfo); + size_t numOfTables = taosArrayGetSize(pCreateTable->childTableInfo); for (int32_t j = 0; j < numOfTables; ++j) { SCreatedTableInfo* pCreateTableInfo = taosArrayGet(pCreateTable->childTableInfo, j); SToken* pSTableNameToken = &pCreateTableInfo->stbName; - - char buf[TSDB_TABLE_FNAME_LEN]; - SToken sTblToken; - sTblToken.z = buf; - int32_t code = parserValidateNameToken(pSTableNameToken); if (code != TSDB_CODE_SUCCESS) { return buildInvalidOperationMsg(pMsgBuf, msg1); @@ -357,7 +358,11 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p size_t numOfInputTag = taosArrayGetSize(pValList); STableMeta* pSuperTableMeta = NULL; - catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &name, &pSuperTableMeta); + code = catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &name, &pSuperTableMeta); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + assert(pSuperTableMeta != NULL); // too long tag values will return invalid sql, not be truncated automatically @@ -460,12 +465,13 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p for (int32_t i = 0; i < numOfInputTag; ++i) { SSchema* pSchema = &pTagSchema[i]; - SToken* pItem = taosArrayGet(pValList, i); - char tmpTokenBuf[TSDB_MAX_TAGS_LEN] = {0}; + char* endPtr = NULL; + char tmpTokenBuf[TSDB_MAX_TAGS_LEN] = {0}; + SKvParam param = {.builder = &kvRowBuilder, .schema = pSchema}; - char* endPtr = NULL; + SToken* pItem = taosArrayGet(pValList, i); code = parseValueToken(&endPtr, pItem, pSchema, tinfo.precision, tmpTokenBuf, KvRowAppend, ¶m, pMsgBuf); if (code != TSDB_CODE_SUCCESS) { @@ -478,7 +484,7 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder); tdDestroyKVRowBuilder(&kvRowBuilder); if (row == NULL) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; + return TSDB_CODE_QRY_OUT_OF_MEMORY; } tdSortKVRowByColIdx(row); @@ -489,40 +495,73 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p return code; } + SVgroupInfo info = {0}; + catalogGetTableHashVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &tableName, &info); + struct SVCreateTbReq req = {0}; req.type = TD_CHILD_TABLE; req.name = strdup(tNameGetTableName(&tableName)); req.ctbCfg.suid = pSuperTableMeta->suid; req.ctbCfg.pTag = row; - int32_t serLen = sizeof(SMsgHead) + tSerializeSVCreateTbReq(NULL, &req); - char* buf1 = calloc(1, serLen); - *pOutput = buf1; - buf1 += sizeof(SMsgHead); - tSerializeSVCreateTbReq((void*)&buf1, &req); - *len = serLen; + SVgroupTablesBatch* pTableBatch = taosHashGet(pVgroupHashmap, &info.vgId, sizeof(info.vgId)); + if (pTableBatch == NULL) { + SVgroupTablesBatch tBatch = {0}; + tBatch.info = info; - SVgroupInfo info = {0}; - catalogGetTableHashVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &tableName, &info); + tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq)); + taosArrayPush(tBatch.req.pArray, &req); - pEpSet->inUse = info.inUse; - pEpSet->numOfEps = info.numOfEps; - for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { - pEpSet->port[i] = info.epAddr[i].port; - tstrncpy(pEpSet->fqdn[i], info.epAddr[i].fqdn, tListLen(pEpSet->fqdn[i])); + taosHashPut(pVgroupHashmap, &info.vgId, sizeof(info.vgId), &tBatch, sizeof(tBatch)); + } else { // add to the correct vgroup + assert(info.vgId == pTableBatch->info.vgId); + taosArrayPush(pTableBatch->req.pArray, &req); } - - ((SMsgHead*)(*pOutput))->vgId = htonl(info.vgId); - ((SMsgHead*)(*pOutput))->contLen = htonl(serLen); } + // TODO: serialize and + SArray* pBufArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*)); + + SVgroupTablesBatch* pTbBatch = NULL; + do { + pTbBatch = taosHashIterate(pVgroupHashmap, pTbBatch); + if (pTbBatch == NULL) break; + + int tlen = sizeof(SMsgHead) + tSVCreateTbBatchReqSerialize(NULL, &(pTbBatch->req)); + void* buf = malloc(tlen); + if (buf == NULL) { + // TODO: handle error + } + + ((SMsgHead*)buf)->vgId = htonl(pTbBatch->info.vgId); + ((SMsgHead*)buf)->contLen = htonl(tlen); + + void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + tSVCreateTbBatchReqSerialize(&pBuf, &(pTbBatch->req)); + + SVgDataBlocks* pVgData = calloc(1, sizeof(SVgDataBlocks)); + pVgData->vg = pTbBatch->info; + pVgData->pData = buf; + pVgData->size = tlen; + pVgData->numOfTables = (int32_t) taosArrayGetSize(pTbBatch->req.pArray); + + taosArrayPush(pBufArray, &pVgData); + } while (true); + + SInsertStmtInfo* pStmtInfo = calloc(1, sizeof(SInsertStmtInfo)); + pStmtInfo->nodeType = TSDB_SQL_CREATE_TABLE; + pStmtInfo->pDataBlocks = pBufArray; + *pOutput = pStmtInfo; + *len = sizeof(SInsertStmtInfo); + return TSDB_CODE_SUCCESS; } -int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStmtInfo* pDcl, char* msgBuf, - int32_t msgBufLen) { +SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen) { int32_t code = 0; + SDclStmtInfo* pDcl = calloc(1, sizeof(SDclStmtInfo)); + SMsgBuf m = {.buf = msgBuf, .len = msgBufLen}; SMsgBuf* pMsgBuf = &m; @@ -539,21 +578,25 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm SToken* pPwd = &pUser->passwd; if (pName->n >= TSDB_USER_LEN) { - return buildInvalidOperationMsg(pMsgBuf, msg3); + code = buildInvalidOperationMsg(pMsgBuf, msg3); + goto _error; } if (parserValidateIdToken(pName) != TSDB_CODE_SUCCESS) { - return buildInvalidOperationMsg(pMsgBuf, msg2); + code = buildInvalidOperationMsg(pMsgBuf, msg2); + goto _error; } if (pInfo->type == TSDB_SQL_CREATE_USER) { if (parserValidatePassword(pPwd, pMsgBuf) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_OPERATION; + code = TSDB_CODE_TSC_INVALID_OPERATION; + goto _error; } } else { if (pUser->type == TSDB_ALTER_USER_PASSWD) { if (parserValidatePassword(pPwd, pMsgBuf) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_OPERATION; + code = TSDB_CODE_TSC_INVALID_OPERATION; + goto _error; } } else if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) { assert(pPwd->type == TSDB_DATA_TYPE_NULL); @@ -564,10 +607,12 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm } else if (strncasecmp(pPrivilege->z, "normal", 4) == 0 && pPrivilege->n == 4) { // pCmd->count = 2; } else { - return buildInvalidOperationMsg(pMsgBuf, msg4); + code = buildInvalidOperationMsg(pMsgBuf, msg4); + goto _error; } } else { - return buildInvalidOperationMsg(pMsgBuf, msg1); + code = buildInvalidOperationMsg(pMsgBuf, msg1); + goto _error; } } @@ -586,15 +631,18 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm SToken* pPwd = &pInfo->pMiscInfo->user.passwd; if (parserValidatePassword(pPwd, pMsgBuf) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_OPERATION; + code = TSDB_CODE_TSC_INVALID_OPERATION; + goto _error; } if (pName->n >= TSDB_USER_LEN) { - return buildInvalidOperationMsg(pMsgBuf, msg3); + code = buildInvalidOperationMsg(pMsgBuf, msg3); + goto _error; } if (parserValidateNameToken(pName) != TSDB_CODE_SUCCESS) { - return buildInvalidOperationMsg(pMsgBuf, msg2); + code = buildInvalidOperationMsg(pMsgBuf, msg2); + goto _error; } SCreateAcctInfo* pAcctOpt = &pInfo->pMiscInfo->acctOpt; @@ -604,7 +652,8 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) { } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) { } else { - return buildInvalidOperationMsg(pMsgBuf, msg1); + code = buildInvalidOperationMsg(pMsgBuf, msg1); + goto _error; } } @@ -623,7 +672,11 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm case TSDB_SQL_SHOW: { SShowInfo* pShowInfo = &pInfo->pMiscInfo->showOpt; code = setShowInfo(pShowInfo, pCtx, (void**)&pDcl->pMsg, &pDcl->msgLen, &pDcl->epSet, &pDcl->pExtension, pMsgBuf); - pDcl->msgType = (pShowInfo->showType == TSDB_MGMT_TABLE_TABLE)? TDMT_VND_SHOW_TABLES:TDMT_MND_SHOW; + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + pDcl->msgType = (pShowInfo->showType == TSDB_MGMT_TABLE_TABLE) ? TDMT_VND_SHOW_TABLES : TDMT_MND_SHOW; break; } @@ -632,13 +685,15 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm SToken* pToken = taosArrayGet(pInfo->pMiscInfo->a, 0); if (parserValidateNameToken(pToken) != TSDB_CODE_SUCCESS) { - return buildInvalidOperationMsg(pMsgBuf, msg); + code = buildInvalidOperationMsg(pMsgBuf, msg); + goto _error; } SName n = {0}; int32_t ret = tNameSetDbName(&n, pCtx->acctId, pToken->z, pToken->n); if (ret != TSDB_CODE_SUCCESS) { - return buildInvalidOperationMsg(pMsgBuf, msg); + code = buildInvalidOperationMsg(pMsgBuf, msg); + goto _error; } SUseDbMsg* pUseDbMsg = (SUseDbMsg*)calloc(1, sizeof(SUseDbMsg)); @@ -657,19 +712,22 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm SCreateDbInfo* pCreateDB = &(pInfo->pMiscInfo->dbOpt); if (pCreateDB->dbname.n >= TSDB_DB_NAME_LEN) { - return buildInvalidOperationMsg(pMsgBuf, msg2); + code = buildInvalidOperationMsg(pMsgBuf, msg2); + goto _error; } char buf[TSDB_DB_NAME_LEN] = {0}; SToken token = taosTokenDup(&pCreateDB->dbname, buf, tListLen(buf)); if (parserValidateNameToken(&token) != TSDB_CODE_SUCCESS) { - return buildInvalidOperationMsg(pMsgBuf, msg1); + code = buildInvalidOperationMsg(pMsgBuf, msg1); + goto _error; } SCreateDbMsg* pCreateMsg = buildCreateDbMsg(pCreateDB, pCtx, pMsgBuf); if (doCheckDbOptions(pCreateMsg, pMsgBuf) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_OPERATION; + code = TSDB_CODE_TSC_INVALID_OPERATION; + goto _error; } pDcl->pMsg = (char*)pCreateMsg; @@ -687,7 +745,8 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm SName name = {0}; code = tNameSetDbName(&name, pCtx->acctId, dbName->z, dbName->n); if (code != TSDB_CODE_SUCCESS) { - return buildInvalidOperationMsg(pMsgBuf, msg1); + code = buildInvalidOperationMsg(pMsgBuf, msg1); + goto _error; } SDropDbMsg* pDropDbMsg = (SDropDbMsg*)calloc(1, sizeof(SDropDbMsg)); @@ -699,7 +758,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm pDcl->msgType = TDMT_MND_DROP_DB; pDcl->msgLen = sizeof(SDropDbMsg); pDcl->pMsg = (char*)pDropDbMsg; - return TSDB_CODE_SUCCESS; + break; } case TSDB_SQL_CREATE_TABLE: { @@ -707,14 +766,16 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm if (pCreateTable->type == TSQL_CREATE_TABLE || pCreateTable->type == TSQL_CREATE_STABLE) { if ((code = doCheckForCreateTable(pInfo, pMsgBuf)) != TSDB_CODE_SUCCESS) { - return code; + terrno = code; + goto _error; } + pDcl->pMsg = (char*)buildCreateTableMsg(pCreateTable, &pDcl->msgLen, pCtx, pMsgBuf); pDcl->msgType = (pCreateTable->type == TSQL_CREATE_TABLE) ? TDMT_VND_CREATE_TABLE : TDMT_MND_CREATE_STB; } else if (pCreateTable->type == TSQL_CREATE_CTABLE) { - if ((code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, &pDcl->pMsg, &pDcl->msgLen, &pDcl->epSet)) != + if ((code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, &pDcl->pMsg, &pDcl->msgLen)) != TSDB_CODE_SUCCESS) { - return code; + goto _error; } pDcl->msgType = TDMT_VND_CREATE_TABLE; @@ -729,7 +790,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm case TSDB_SQL_DROP_TABLE: { pDcl->pMsg = (char*)buildDropStableMsg(pInfo, &pDcl->msgLen, pCtx, pMsgBuf); if (pDcl->pMsg == NULL) { - code = terrno; + goto _error; } pDcl->msgType = TDMT_MND_DROP_STB; @@ -739,7 +800,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm case TSDB_SQL_CREATE_DNODE: { pDcl->pMsg = (char*)buildCreateDnodeMsg(pInfo, &pDcl->msgLen, pMsgBuf); if (pDcl->pMsg == NULL) { - code = terrno; + goto _error; } pDcl->msgType = TDMT_MND_CREATE_DNODE; @@ -749,7 +810,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm case TSDB_SQL_DROP_DNODE: { pDcl->pMsg = (char*)buildDropDnodeMsg(pInfo, &pDcl->msgLen, pMsgBuf); if (pDcl->pMsg == NULL) { - code = terrno; + goto _error; } pDcl->msgType = TDMT_MND_DROP_DNODE; @@ -760,5 +821,29 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm break; } - return code; + return pDcl; + + _error: + terrno = code; + tfree(pDcl); + return NULL; } + +SInsertStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen) { + SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo; + assert(pCreateTable->type == TSQL_CREATE_CTABLE); + + SMsgBuf m = {.buf = msgBuf, .len = msgBufLen}; + SMsgBuf* pMsgBuf = &m; + + SInsertStmtInfo* pInsertStmt = NULL; + + int32_t msgLen = 0; + int32_t code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, (char**) &pInsertStmt, &msgLen); + if (code != TSDB_CODE_SUCCESS) { + tfree(pInsertStmt); + return NULL; + } + + return pInsertStmt; +} \ No newline at end of file diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 710cf4b5d0091f9801e6bf0f49f0aea6d0b908ad..1b4d05808c055c8e567354b1ce2798de613f4540 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -32,7 +32,7 @@ bool isInsertSql(const char* pStr, size_t length) { } bool qIsDdlQuery(const SQueryNode* pQuery) { - return TSDB_SQL_INSERT != pQuery->type && TSDB_SQL_SELECT != pQuery->type; + return TSDB_SQL_INSERT != pQuery->type && TSDB_SQL_SELECT != pQuery->type && TSDB_SQL_CREATE_TABLE != pQuery->type; } int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) { @@ -44,16 +44,29 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) { } if (!isDqlSqlStatement(&info)) { - SDclStmtInfo* pDcl = calloc(1, sizeof(SDclStmtInfo)); - if (NULL == pDcl) { - terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; // set correct error code. - return terrno; + bool toVnode = false; + if (info.type == TSDB_SQL_CREATE_TABLE) { + SCreateTableSql* pCreateSql = info.pCreateTableInfo; + if (pCreateSql->type == TSQL_CREATE_CTABLE || pCreateSql->type == TSQL_CREATE_TABLE) { + toVnode = true; + } } - pDcl->nodeType = info.type; - int32_t code = qParserValidateDclSqlNode(&info, &pCxt->ctx, pDcl, pCxt->pMsg, pCxt->msgLen); - if (code == TSDB_CODE_SUCCESS) { + if (toVnode) { + SInsertStmtInfo *pInsertInfo = qParserValidateCreateTbSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen); + if (pInsertInfo == NULL) { + return terrno; + } + + *pQuery = (SQueryNode*) pInsertInfo; + } else { + SDclStmtInfo* pDcl = qParserValidateDclSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen); + if (pDcl == NULL) { + return terrno; + } + *pQuery = (SQueryNode*)pDcl; + pDcl->nodeType = info.type; } } else { SQueryStmtInfo* pQueryInfo = calloc(1, sizeof(SQueryStmtInfo)); diff --git a/source/libs/parser/test/parserTests.cpp b/source/libs/parser/test/parserTests.cpp index a67a9a8be815cb2867181160079414c237a371bc..fe430c5f5e53b63ce8be36d896603200f894baf8 100644 --- a/source/libs/parser/test/parserTests.cpp +++ b/source/libs/parser/test/parserTests.cpp @@ -714,10 +714,9 @@ TEST(testCase, show_user_Test) { SSqlInfo info1 = doGenerateAST(sql1); ASSERT_EQ(info1.valid, true); - SDclStmtInfo output; SParseBasicCtx ct= {.requestId = 1, .acctId = 1, .db = "abc", .pTransporter = NULL}; - int32_t code = qParserValidateDclSqlNode(&info1, &ct, &output, msg, buf.len); - ASSERT_EQ(code, 0); + SDclStmtInfo* output = qParserValidateDclSqlNode(&info1, &ct, msg, buf.len); + ASSERT_NE(output, nullptr); // convert the show command to be the select query // select name, privilege, create_time, account from information_schema.users; @@ -735,10 +734,9 @@ TEST(testCase, create_user_Test) { ASSERT_EQ(info1.valid, true); ASSERT_EQ(isDclSqlStatement(&info1), true); - SDclStmtInfo output; SParseBasicCtx ct= {.requestId = 1, .acctId = 1, .db = "abc"}; - int32_t code = qParserValidateDclSqlNode(&info1, &ct, &output, msg, buf.len); - ASSERT_EQ(code, 0); + SDclStmtInfo* output = qParserValidateDclSqlNode(&info1, &ct, msg, buf.len); + ASSERT_NE(output, nullptr); destroySqlInfo(&info1); } \ No newline at end of file diff --git a/source/libs/planner/inc/plannerInt.h b/source/libs/planner/inc/plannerInt.h index 35c6d59ffe1214f68f1b9930d18684baed769d81..a68102ea6e677818e3945bb6fa46fab66e51bac1 100644 --- a/source/libs/planner/inc/plannerInt.h +++ b/source/libs/planner/inc/plannerInt.h @@ -40,7 +40,7 @@ extern "C" { #define QNODE_SESSIONWINDOW 12 #define QNODE_STATEWINDOW 13 #define QNODE_FILL 14 -#define QNODE_INSERT 15 +#define QNODE_MODIFY 15 typedef struct SQueryDistPlanNodeInfo { bool stableQuery; // super table query or not diff --git a/source/libs/planner/src/logicPlan.c b/source/libs/planner/src/logicPlan.c index 136073aa6030945962b6740bfd3f74d55b30fc78..5f95f86d4ac853f0a3cb237fb1dda4842a66c421 100644 --- a/source/libs/planner/src/logicPlan.c +++ b/source/libs/planner/src/logicPlan.c @@ -37,15 +37,19 @@ int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode) { return 0; } -int32_t createInsertPlan(const SInsertStmtInfo* pInsert, SQueryPlanNode** pQueryPlan) { +static int32_t createInsertPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) { + SInsertStmtInfo* pInsert = (SInsertStmtInfo*)pNode; + *pQueryPlan = calloc(1, sizeof(SQueryPlanNode)); SArray* blocks = taosArrayInit(taosArrayGetSize(pInsert->pDataBlocks), POINTER_BYTES); if (NULL == *pQueryPlan || NULL == blocks) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } - (*pQueryPlan)->info.type = QNODE_INSERT; + + (*pQueryPlan)->info.type = QNODE_MODIFY; taosArrayAddAll(blocks, pInsert->pDataBlocks); (*pQueryPlan)->pExtInfo = blocks; + return TSDB_CODE_SUCCESS; } @@ -62,13 +66,14 @@ int32_t createQueryPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) { case TSDB_SQL_SELECT: { return createSelectPlan((const SQueryStmtInfo*)pNode, pQueryPlan); } + case TSDB_SQL_INSERT: - return createInsertPlan((const SInsertStmtInfo*)pNode, pQueryPlan); + case TSDB_SQL_CREATE_TABLE: + return createInsertPlan(pNode, pQueryPlan); + default: return TSDB_CODE_FAILED; } - - return TSDB_CODE_SUCCESS; } int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql) { diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 22a1beaa35b84444a050e9443d207ccf51109a1a..5f96d23d4e9db4d09fde91716defd5876b7181c6 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -34,7 +34,7 @@ static const char* gOpName[] = { #undef INCLUDE_AS_NAME }; -static void* vailidPointer(void* p) { +static void* validPointer(void* p) { if (NULL == p) { THROW(TSDB_CODE_TSC_OUT_OF_MEMORY); } @@ -76,7 +76,7 @@ int32_t dsinkNameToDsinkType(const char* name) { } static SDataSink* initDataSink(int32_t type, int32_t size) { - SDataSink* sink = (SDataSink*)vailidPointer(calloc(1, size)); + SDataSink* sink = (SDataSink*)validPointer(calloc(1, size)); sink->info.type = type; sink->info.name = dsinkTypeToDsinkName(type); return sink; @@ -121,7 +121,7 @@ static bool cloneExprArray(SArray** dst, SArray* src) { } static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t size) { - SPhyNode* node = (SPhyNode*)vailidPointer(calloc(1, size)); + SPhyNode* node = (SPhyNode*)validPointer(calloc(1, size)); node->info.type = type; node->info.name = opTypeToOpName(type); if (!cloneExprArray(&node->pTargets, pPlanNode->pExpr) || !toDataBlockSchema(pPlanNode, &(node->targetSchema))) { @@ -184,7 +184,7 @@ static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTable } static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) { - SSubplan* subplan = vailidPointer(calloc(1, sizeof(SSubplan))); + SSubplan* subplan = validPointer(calloc(1, sizeof(SSubplan))); subplan->id = pCxt->nextId; ++(pCxt->nextId.subplanId); subplan->type = type; @@ -192,15 +192,15 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) { if (NULL != pCxt->pCurrentSubplan) { subplan->level = pCxt->pCurrentSubplan->level + 1; if (NULL == pCxt->pCurrentSubplan->pChildern) { - pCxt->pCurrentSubplan->pChildern = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); + pCxt->pCurrentSubplan->pChildern = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); } taosArrayPush(pCxt->pCurrentSubplan->pChildern, &subplan); - subplan->pParents = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); + subplan->pParents = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); taosArrayPush(subplan->pParents, &pCxt->pCurrentSubplan); } SArray* currentLevel; if (subplan->level >= taosArrayGetSize(pCxt->pDag->pSubplans)) { - currentLevel = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); + currentLevel = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); taosArrayPush(pCxt->pDag->pSubplans, ¤tLevel); } else { currentLevel = taosArrayGetP(pCxt->pDag->pSubplans, subplan->level); @@ -272,7 +272,7 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { case QNODE_TABLESCAN: node = createTableScanNode(pCxt, pPlanNode); break; - case QNODE_INSERT: + case QNODE_MODIFY: // Insert is not an operator in a physical plan. break; default: @@ -306,7 +306,7 @@ static void splitInsertSubplan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { } static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) { - if (QNODE_INSERT == pRoot->info.type) { + if (QNODE_MODIFY == pRoot->info.type) { splitInsertSubplan(pCxt, pRoot); } else { SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE); @@ -321,12 +321,12 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD TRY(TSDB_MAX_TAG_CONDITIONS) { SPlanContext context = { .pCatalog = pCatalog, - .pDag = vailidPointer(calloc(1, sizeof(SQueryDag))), + .pDag = validPointer(calloc(1, sizeof(SQueryDag))), .pCurrentSubplan = NULL, .nextId = {0} // todo queryid }; *pDag = context.pDag; - context.pDag->pSubplans = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); + context.pDag->pSubplans = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); createSubplanByLevel(&context, pQueryNode); } CATCH(code) { CLEANUP_EXECUTE(); diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 117297b9ffa90b93d56be5a872950a4f0ac9d1ff..b50eb2c92d4b6875d012e4146b8a85c990e2ab94 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -42,6 +42,11 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3 bMsg->header.vgId = htonl(bInput->vgId); + if (bInput->dbName) { + strncpy(bMsg->dbFname, bInput->dbName, sizeof(bMsg->dbFname)); + bMsg->dbFname[sizeof(bMsg->dbFname) - 1] = 0; + } + strncpy(bMsg->tableFname, bInput->tableFullName, sizeof(bMsg->tableFname)); bMsg->tableFname[sizeof(bMsg->tableFname) - 1] = 0; @@ -243,9 +248,14 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) { if (pMetaMsg->tableType == TSDB_CHILD_TABLE) { pOut->metaNum = 2; - - memcpy(pOut->ctbFname, pMetaMsg->tbFname, sizeof(pOut->ctbFname)); - memcpy(pOut->tbFname, pMetaMsg->stbFname, sizeof(pOut->tbFname)); + + if (pMetaMsg->dbFname[0]) { + snprintf(pOut->ctbFname, "%s.%s", pMetaMsg->dbFname, pMetaMsg->tbFname); + snprintf(pOut->tbFname, "%s.%s", pMetaMsg->dbFname, pMetaMsg->stbFname); + } else { + memcpy(pOut->ctbFname, pMetaMsg->tbFname, sizeof(pOut->ctbFname)); + memcpy(pOut->tbFname, pMetaMsg->stbFname, sizeof(pOut->tbFname)); + } pOut->ctbMeta.vgId = pMetaMsg->vgId; pOut->ctbMeta.tableType = pMetaMsg->tableType; @@ -256,7 +266,11 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) { } else { pOut->metaNum = 1; - memcpy(pOut->tbFname, pMetaMsg->tbFname, sizeof(pOut->tbFname)); + if (pMetaMsg->dbFname[0]) { + snprintf(pOut->tbFname, sizeof(pOut->tbFname), "%s.%s", pMetaMsg->dbFname, pMetaMsg->tbFname); + } else { + memcpy(pOut->tbFname, pMetaMsg->tbFname, sizeof(pOut->tbFname)); + } code = queryCreateTableMetaFromMsg(pMetaMsg, false, &pOut->tbMeta); } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 3f1799507df67520d966ac527d0da03b0b75ad8f..20eb94c2ff65333fda5834a77359188b9ee2cb2f 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -664,7 +664,7 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { } SSubQueryMsg *pMsg = msg; - + pMsg->sId = htobe64(schMgmt.sId); pMsg->queryId = htobe64(job->queryId); pMsg->taskId = htobe64(task->taskId); diff --git a/source/util/test/CMakeLists.txt b/source/util/test/CMakeLists.txt index 79aaa1beb0a48856814f310c1824a695b1ee0082..bfc3906f79e97d00b7d2282d79628055e61e23f6 100644 --- a/source/util/test/CMakeLists.txt +++ b/source/util/test/CMakeLists.txt @@ -33,4 +33,12 @@ ENDIF() INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc) +# freelistTest +add_executable(freelistTest "") +target_sources(freelistTest + PRIVATE + "freelistTest.cpp" +) +target_link_libraries(freelistTest os util gtest gtest_main) + diff --git a/source/util/test/freelistTest.cpp b/source/util/test/freelistTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..7a4e8be5b7f908fdccfe524c2ec2729a6090ec90 --- /dev/null +++ b/source/util/test/freelistTest.cpp @@ -0,0 +1,16 @@ +#include "gtest/gtest.h" + +#include "freelist.h" + +TEST(TD_UTIL_FREELIST_TEST, simple_test) { + SFreeList fl; + + tFreeListInit(&fl); + + for (size_t i = 0; i < 1000; i++) { + void *ptr = TFL_MALLOC(1024, &fl); + GTEST_ASSERT_NE(ptr, nullptr); + } + + tFreeListClear(&fl); +} \ No newline at end of file