diff --git a/cmake/bdb_CMakeLists.txt.in.bak b/cmake/bdb_CMakeLists.txt.in similarity index 100% rename from cmake/bdb_CMakeLists.txt.in.bak rename to cmake/bdb_CMakeLists.txt.in diff --git a/cmake/cmake.options b/cmake/cmake.options index a60f5c728296c4a0d8906dba3b2bdbb60537efde..d83ab49fd5fa6e987fb8a3a7e82c770c2387fd78 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -78,6 +78,12 @@ option( OFF ) +option( + BUILD_WITH_BDB + "If build with BDB" + OFF +) + option( BUILD_WITH_LUCENE "If build with lucene" diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 14a85ee4f6f028b3af52f8e78c0029fc34a1223a..97bfcfb8c039c05a4d997fb2b6adadb2a00d0b75 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -78,9 +78,9 @@ if(${BUILD_WITH_UV}) endif(${BUILD_WITH_UV}) # bdb -#if(${BUILD_WITH_BDB}) - #cat("${TD_SUPPORT_DIR}/bdb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) -#endif(${BUILD_WITH_BDB}) +if(${BUILD_WITH_BDB}) + cat("${TD_SUPPORT_DIR}/bdb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) +endif(${BUILD_WITH_BDB}) # sqlite if(${BUILD_WITH_SQLITE}) diff --git a/contrib/test/CMakeLists.txt b/contrib/test/CMakeLists.txt index 740488b39b1efa7de141196983aa6a46dda9f9a9..eacaeb9524be5dde7a231cfd7090f8dfe45f61ae 100644 --- a/contrib/test/CMakeLists.txt +++ b/contrib/test/CMakeLists.txt @@ -7,9 +7,9 @@ if(${BUILD_WITH_LUCENE}) add_subdirectory(lucene) endif(${BUILD_WITH_LUCENE}) -#if(${BUILD_WITH_BDB}) - #add_subdirectory(bdb) -#endif(${BUILD_WITH_BDB}) +if(${BUILD_WITH_BDB}) + add_subdirectory(bdb) +endif(${BUILD_WITH_BDB}) if(${BUILD_WITH_SQLITE}) add_subdirectory(sqlite) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 1fde647603f0db1f1ee8a128ff837e9188616cbb..3e03a51852c7ccb517bc6102e08364b6ee0232e1 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1673,6 +1673,7 @@ typedef struct SVDropStbReq { int32_t tEncodeSVDropStbReq(SEncoder* pCoder, const SVDropStbReq* pReq); int32_t tDecodeSVDropStbReq(SDecoder* pCoder, SVDropStbReq* pReq); +// TDMT_VND_CREATE_TABLE ============== #define TD_CREATE_IF_NOT_EXISTS 0x1 typedef struct SVCreateTbReq { int32_t flags; @@ -1762,6 +1763,43 @@ typedef struct { int32_t tEncodeSVDropTbBatchRsp(SEncoder* pCoder, const SVDropTbBatchRsp* pRsp); int32_t tDecodeSVDropTbBatchRsp(SDecoder* pCoder, SVDropTbBatchRsp* pRsp); +// TDMT_VND_ALTER_TABLE ===================== +typedef struct { + const char* tbName; + int8_t action; + const char* colName; + // TSDB_ALTER_TABLE_ADD_COLUMN + int8_t type; + int8_t flags; + int32_t bytes; + // TSDB_ALTER_TABLE_DROP_COLUMN + // TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES + int32_t colModBytes; + // TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME + const char* colNewName; + // TSDB_ALTER_TABLE_UPDATE_TAG_VAL + const char* tagName; + int8_t isNull; + uint32_t nTagVal; + const uint8_t* pTagVal; + // TSDB_ALTER_TABLE_UPDATE_OPTIONS + int8_t updateTTL; + int32_t newTTL; + int8_t updateComment; + const char* newComment; +} SVAlterTbReq; + +int32_t tEncodeSVAlterTbReq(SEncoder* pEncoder, const SVAlterTbReq* pReq); +int32_t tDecodeSVAlterTbReq(SDecoder* pDecoder, SVAlterTbReq* pReq); + +typedef struct { + int32_t code; +} SVAlterTbRsp; + +int32_t tEncodeSVAlterTbRsp(SEncoder* pEncoder, const SVAlterTbRsp* pRsp); +int32_t tDecodeSVAlterTbRsp(SDecoder* pDecoder, SVAlterTbRsp* pRsp); +// ====================== + typedef struct { SMsgHead head; int64_t uid; diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index ca10465ed40962c5138839004f3301f4f9f5ce18..551e0fc7b84b5847755ff2ca1d3edeab2cb87d2d 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -144,6 +144,7 @@ int32_t syncInit(); void syncCleanUp(); int64_t syncOpen(const SSyncInfo* pSyncInfo); void syncStart(int64_t rid); +void syncStartStandBy(int64_t rid); void syncStop(int64_t rid); int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); ESyncState syncGetMyRole(int64_t rid); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 96fb210e26c1c1bbc88646c4e04a780d3c08b482..32bda084ed5ab1ddac400354264f18545d88e001 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -323,6 +323,9 @@ int32_t* taosGetErrno(); #define TSDB_CODE_VND_SMA_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0516) #define TSDB_CODE_VND_HASH_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x0517) #define TSDB_CODE_VND_TABLE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0518) +#define TSDB_CODE_VND_INVALID_TABLE_ACTION TAOS_DEF_ERROR_CODE(0, 0x0519) +#define TSDB_CODE_VND_COL_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051a) +#define TSDB_CODE_VND_TABLE_COL_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051b) // tsdb #define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) diff --git a/include/util/tencode.h b/include/util/tencode.h index 2a43d7934f41091587d17fb2f60a397b8d705473..e49429f865067f61cae6ae766ea9b0f6fca216dc 100644 --- a/include/util/tencode.h +++ b/include/util/tencode.h @@ -18,7 +18,6 @@ #include "tcoding.h" #include "tlist.h" -// #include "tfreelist.h" #ifdef __cplusplus extern "C" { diff --git a/include/util/tfreelist.h b/include/util/tfreelist.h deleted file mode 100644 index e9b5ca5fcaa0e26c45cd01d809ce9a4d7def42a9..0000000000000000000000000000000000000000 --- a/include/util/tfreelist.h +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_UTIL_FREELIST_H_ -#define _TD_UTIL_FREELIST_H_ - -#include "tlist.h" - -#ifdef __cplusplus -extern "C" { -#endif - -struct SFreeListNode { - TD_SLIST_NODE(SFreeListNode); - char payload[]; -}; - -typedef TD_SLIST(SFreeListNode) SFreeList; - -#define TFL_MALLOC(PTR, TYPE, SIZE, LIST) \ - do { \ - void *ptr = taosMemoryMalloc((SIZE) + sizeof(struct SFreeListNode)); \ - if (ptr) { \ - TD_SLIST_PUSH((LIST), (struct SFreeListNode *)ptr); \ - ptr = ((struct SFreeListNode *)ptr)->payload; \ - (PTR) = (TYPE)(ptr); \ - }else{ \ - (PTR) = NULL; \ - } \ - }while(0); - -#define tFreeListInit(pFL) TD_SLIST_INIT(pFL) - -static FORCE_INLINE void tFreeListClear(SFreeList *pFL) { - struct SFreeListNode *pNode; - for (;;) { - pNode = TD_SLIST_HEAD(pFL); - if (pNode == NULL) break; - TD_SLIST_POP(pFL); - taosMemoryFree(pNode); - } -} - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_UTIL_FREELIST_H_*/ \ No newline at end of file diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 666e9e0ffac0bb3f92c592d5d237051b80e16268..100ec42899ea88dcecc3aa6365507a8a4d0c53da 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -518,8 +518,9 @@ STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __t if (pRequest->code != TSDB_CODE_SUCCESS) { const char* errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code); - printf("failed to connect to server, reason: %s\n\n", errorMsg); + fprintf(stderr,"failed to connect to server, reason: %s\n\n", errorMsg); + terrno = pRequest->code; destroyRequest(pRequest); taos_close(pTscObj); pTscObj = NULL; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index b954edcbfa9c2bee8bb7528794d12182451a6812..52edd79f3bacefe10cd02e0b139d87a95fabc827 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4163,3 +4163,114 @@ void tFreeSSubmitRsp(SSubmitRsp *pRsp) { taosMemoryFree(pRsp); } + +int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) { + if (tStartEncode(pEncoder) < 0) return -1; + + if (tEncodeCStr(pEncoder, pReq->tbName) < 0) return -1; + if (tEncodeI8(pEncoder, pReq->action) < 0) return -1; + switch (pReq->action) { + case TSDB_ALTER_TABLE_ADD_COLUMN: + if (tEncodeCStr(pEncoder, pReq->colName) < 0) return -1; + if (tEncodeI8(pEncoder, pReq->type) < 0) return -1; + if (tEncodeI8(pEncoder, pReq->flags) < 0) return -1; + if (tEncodeI32v(pEncoder, pReq->bytes) < 0) return -1; + break; + case TSDB_ALTER_TABLE_DROP_COLUMN: + if (tEncodeCStr(pEncoder, pReq->colName) < 0) return -1; + break; + case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: + if (tEncodeCStr(pEncoder, pReq->colName) < 0) return -1; + if (tEncodeI32v(pEncoder, pReq->colModBytes) < 0) return -1; + break; + case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: + if (tEncodeCStr(pEncoder, pReq->colName) < 0) return -1; + if (tEncodeCStr(pEncoder, pReq->colNewName) < 0) return -1; + break; + case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: + if (tEncodeCStr(pEncoder, pReq->tagName) < 0) return -1; + if (tEncodeI8(pEncoder, pReq->isNull) < 0) return -1; + if (!pReq->isNull) { + if (tEncodeBinary(pEncoder, pReq->pTagVal, pReq->nTagVal) < 0) return -1; + } + break; + case TSDB_ALTER_TABLE_UPDATE_OPTIONS: + if (tEncodeI8(pEncoder, pReq->updateTTL) < 0) return -1; + if (pReq->updateTTL) { + if (tEncodeI32v(pEncoder, pReq->newTTL) < 0) return -1; + } + if (tEncodeI8(pEncoder, pReq->updateComment) < 0) return -1; + if (pReq->updateComment) { + if (tEncodeCStr(pEncoder, pReq->newComment) < 0) return -1; + } + break; + default: + break; + } + + tEndEncode(pEncoder); + return 0; +} + +int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) { + if (tStartDecode(pDecoder) < 0) return -1; + + if (tDecodeCStr(pDecoder, &pReq->tbName) < 0) return -1; + if (tDecodeI8(pDecoder, &pReq->action) < 0) return -1; + if (tDecodeCStr(pDecoder, &pReq->colName) < 0) return -1; + switch (pReq->action) { + case TSDB_ALTER_TABLE_ADD_COLUMN: + if (tDecodeCStr(pDecoder, &pReq->colName) < 0) return -1; + if (tDecodeI8(pDecoder, &pReq->type) < 0) return -1; + if (tDecodeI8(pDecoder, &pReq->flags) < 0) return -1; + if (tDecodeI32v(pDecoder, &pReq->bytes) < 0) return -1; + break; + case TSDB_ALTER_TABLE_DROP_COLUMN: + if (tDecodeCStr(pDecoder, &pReq->colName) < 0) return -1; + break; + case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: + if (tDecodeCStr(pDecoder, &pReq->colName) < 0) return -1; + if (tDecodeI32v(pDecoder, &pReq->colModBytes) < 0) return -1; + break; + case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: + if (tDecodeCStr(pDecoder, &pReq->colName) < 0) return -1; + if (tDecodeCStr(pDecoder, &pReq->colNewName) < 0) return -1; + break; + case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: + if (tDecodeCStr(pDecoder, &pReq->tagName) < 0) return -1; + if (tDecodeI8(pDecoder, &pReq->isNull) < 0) return -1; + if (!pReq->isNull) { + if (tDecodeBinary(pDecoder, &pReq->pTagVal, &pReq->nTagVal) < 0) return -1; + } + break; + case TSDB_ALTER_TABLE_UPDATE_OPTIONS: + if (tDecodeI8(pDecoder, &pReq->updateTTL) < 0) return -1; + if (pReq->updateTTL) { + if (tDecodeI32v(pDecoder, &pReq->newTTL) < 0) return -1; + } + if (tDecodeI8(pDecoder, &pReq->updateComment) < 0) return -1; + if (pReq->updateComment) { + if (tDecodeCStr(pDecoder, &pReq->newComment) < 0) return -1; + } + break; + default: + break; + } + + tEndDecode(pDecoder); + return 0; +} + +int32_t tEncodeSVAlterTbRsp(SEncoder *pEncoder, const SVAlterTbRsp *pRsp) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->code) < 0) return -1; + tEndEncode(pEncoder); + return 0; +} + +int32_t tDecodeSVAlterTbRsp(SDecoder *pDecoder, SVAlterTbRsp *pRsp) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->code) < 0) return -1; + tEndDecode(pDecoder); + return 0; +} diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index d6e99c0899b24e6b312960ba2fa86f2b070e5b9c..d8309850b3c3f71b59960b644c2db758d20d4f68 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -192,7 +192,8 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO rpcFreeCont(originalRpcMsg.pCont); // if leader, send response - if (pMsg->rpcMsg.handle != NULL && pMsg->rpcMsg.ahandle != NULL) { + //if (pMsg->rpcMsg.handle != NULL && pMsg->rpcMsg.ahandle != NULL) { + if (pMsg->rpcMsg.handle != NULL) { rsp.ahandle = pMsg->rpcMsg.ahandle; rsp.handle = pMsg->rpcMsg.handle; rsp.refId = pMsg->rpcMsg.refId; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index e02d629c128b0ca2057ca1ca527dbdb55b04e533..5a834f4d333b84dddcc15768980820224156f68c 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -383,9 +383,10 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt req.suid = pStb->uid; req.rollup = pStb->ast1Len > 0 ? 1 : 0; req.schema.nCols = pStb->numOfColumns; - req.schema.sver = 0; + req.schema.sver = pStb->version; req.schema.pSchema = pStb->pColumns; req.schemaTag.nCols = pStb->numOfTags; + req.schemaTag.nCols = 0; req.schemaTag.pSchema = pStb->pTags; if (req.rollup) { diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index a2be393eb2b8d9149f863e1f58b5dcfa179f2d5b..4431a2c48bf81ff9f4d46e377774ce44be4b3bc3 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -189,6 +189,7 @@ struct SMetaEntry { struct { int64_t ctime; int32_t ttlDays; + int32_t ncid; // next column id SSchemaWrapper schema; } ntbEntry; struct { diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index c9d1a0e06e040f1d5b58a6b719a98262ff911bf1..2018f0a68ca956937c2cf1ab30034c616dd38624 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -79,9 +79,11 @@ int metaClose(SMeta* pMeta); int metaBegin(SMeta* pMeta); int metaCommit(SMeta* pMeta); int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); +int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq); int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq); int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq); +int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq); SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline); STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver); int metaGetTableEntryByName(SMetaReader* pReader, const char* name); @@ -96,7 +98,7 @@ int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg); int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); // tsdb -int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg *pKeepCfg); +int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg); int tsdbClose(STsdb** pTsdb); int tsdbBegin(STsdb* pTsdb); int tsdbCommit(STsdb* pTsdb); @@ -181,7 +183,7 @@ typedef enum { TSDB_TYPE_RSMA_L2 = 4, // RSMA Level 2 } ETsdbType; -struct STsdbKeepCfg{ +struct STsdbKeepCfg { int8_t precision; // precision always be used with below keep cfgs int32_t days; int32_t keep0; diff --git a/source/dnode/vnode/src/meta/metaEntry.c b/source/dnode/vnode/src/meta/metaEntry.c index dd36906b19862cd00c5b7d1de90b8aa4e2bd4b45..2bc0d7517db42084ef5aeb22785b189427ae8a8c 100644 --- a/source/dnode/vnode/src/meta/metaEntry.c +++ b/source/dnode/vnode/src/meta/metaEntry.c @@ -34,6 +34,7 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) { } else if (pME->type == TSDB_NORMAL_TABLE) { if (tEncodeI64(pCoder, pME->ntbEntry.ctime) < 0) return -1; if (tEncodeI32(pCoder, pME->ntbEntry.ttlDays) < 0) return -1; + if (tEncodeI32v(pCoder, pME->ntbEntry.ncid) < 0) return -1; if (tEncodeSSchemaWrapper(pCoder, &pME->ntbEntry.schema) < 0) return -1; } else if (pME->type == TSDB_TSMA_TABLE) { if (tEncodeTSma(pCoder, pME->smaEntry.tsma) < 0) return -1; @@ -65,10 +66,11 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) { } else if (pME->type == TSDB_NORMAL_TABLE) { if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1; if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1; + if (tDecodeI32v(pCoder, &pME->ntbEntry.ncid) < 0) return -1; if (tDecodeSSchemaWrapper(pCoder, &pME->ntbEntry.schema) < 0) return -1; } else if (pME->type == TSDB_TSMA_TABLE) { if (tDecodeTSma(pCoder, pME->smaEntry.tsma) < 0) return -1; - } else { + } else { ASSERT(0); } diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index d666bd22c1ead949a1a3e4f4e466051fb34d8530..e61064fe6701d9d6abc3665b2ae87d0cb2eda9f3 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -131,6 +131,75 @@ _err: return -1; } +int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { + SMetaEntry oStbEntry = {0}; + SMetaEntry nStbEntry = {0}; + TDBC *pUidIdxc = NULL; + TDBC *pTbDbc = NULL; + const void *pData; + int nData; + int64_t oversion; + SDecoder dc = {0}; + int32_t ret; + int32_t c; + + tdbDbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn); + ret = tdbDbcMoveTo(pUidIdxc, &pReq->suid, sizeof(tb_uid_t), &c); + if (ret < 0 || c) { + ASSERT(0); + return -1; + } + + ret = tdbDbcGet(pUidIdxc, NULL, NULL, &pData, &nData); + if (ret < 0) { + ASSERT(0); + return -1; + } + + oversion = *(int64_t *)pData; + + tdbDbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn); + ret = tdbDbcMoveTo(pTbDbc, &((STbDbKey){.uid = pReq->suid, .version = oversion}), sizeof(STbDbKey), &c); + ASSERT(ret == 0 && c == 0); + + ret = tdbDbcGet(pTbDbc, NULL, NULL, &pData, &nData); + ASSERT(ret == 0); + + tDecoderInit(&dc, pData, nData); + metaDecodeEntry(&dc, &oStbEntry); + + nStbEntry.version = version; + nStbEntry.type = TSDB_SUPER_TABLE; + nStbEntry.uid = pReq->suid; + nStbEntry.name = pReq->name; + nStbEntry.stbEntry.schema = pReq->schema; + nStbEntry.stbEntry.schemaTag = pReq->schemaTag; + + metaWLock(pMeta); + // compare two entry + if (oStbEntry.stbEntry.schema.sver != pReq->schema.sver) { + if (oStbEntry.stbEntry.schema.nCols != pReq->schema.nCols) { + metaSaveToSkmDb(pMeta, &nStbEntry); + } + } + + // if (oStbEntry.stbEntry.schemaTag.sver != pReq->schemaTag.sver) { + // // change tag schema + // } + + // update table.db + metaSaveToTbDb(pMeta, &nStbEntry); + + // update uid index + tdbDbcUpsert(pUidIdxc, &pReq->suid, sizeof(tb_uid_t), &version, sizeof(version), 0); + + metaULock(pMeta); + tDecoderClear(&dc); + tdbDbcClose(pTbDbc); + tdbDbcClose(pUidIdxc); + return 0; +} + int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) { SMetaEntry me = {0}; SMetaReader mr = {0}; @@ -171,6 +240,7 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) { me.ntbEntry.ctime = pReq->ctime; me.ntbEntry.ttlDays = pReq->ttl; me.ntbEntry.schema = pReq->ntb.schema; + me.ntbEntry.ncid = me.ntbEntry.schema.pSchema[me.ntbEntry.schema.nCols - 1].colId + 1; } if (metaHandleEntry(pMeta, &me) < 0) goto _err; @@ -305,6 +375,170 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) { return 0; } +static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) { + void *pVal = NULL; + int nVal = 0; + const void *pData = NULL; + int nData = 0; + int ret = 0; + tb_uid_t uid; + int64_t oversion; + SSchema *pColumn = NULL; + SMetaEntry entry = {0}; + SSchemaWrapper *pSchema; + int c; + + // search name index + ret = tdbDbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal); + if (ret < 0) { + terrno = TSDB_CODE_VND_TABLE_NOT_EXIST; + return -1; + } + + uid = *(tb_uid_t *)pVal; + tdbFree(pVal); + pVal = NULL; + + // search uid index + TDBC *pUidIdxc = NULL; + + tdbDbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn); + tdbDbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c); + ASSERT(c == 0); + + tdbDbcGet(pUidIdxc, NULL, NULL, &pData, &nData); + oversion = *(int64_t *)pData; + + // search table.db + TDBC *pTbDbc = NULL; + + tdbDbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn); + tdbDbcMoveTo(pTbDbc, &((STbDbKey){.uid = uid, .version = oversion}), sizeof(STbDbKey), &c); + ASSERT(c == 0); + tdbDbcGet(pTbDbc, NULL, NULL, &pData, &nData); + + // get table entry + SDecoder dc = {0}; + tDecoderInit(&dc, pData, nData); + metaDecodeEntry(&dc, &entry); + + if (entry.type != TSDB_NORMAL_TABLE) { + terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; + goto _err; + } + + // search the column to add/drop/update + pSchema = &entry.ntbEntry.schema; + int32_t iCol = 0; + for (;;) { + pColumn = NULL; + + if (iCol >= pSchema->nCols) break; + pColumn = &pSchema->pSchema[iCol]; + + if (strcmp(pColumn->name, pAlterTbReq->colName) == 0) break; + iCol++; + } + + entry.version = version; + int tlen; + switch (pAlterTbReq->action) { + case TSDB_ALTER_TABLE_ADD_COLUMN: + if (pColumn) { + terrno = TSDB_CODE_VND_COL_ALREADY_EXISTS; + goto _err; + } + pSchema->sver++; + pSchema->nCols++; + pSchema->pSchema = + taosMemoryRealloc(entry.ntbEntry.schema.pSchema, sizeof(SSchema) * entry.ntbEntry.schema.nCols); + pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].bytes = pAlterTbReq->bytes; + pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].type = pAlterTbReq->type; + pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].flags = pAlterTbReq->flags; + pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].colId = entry.ntbEntry.ncid++; + strcpy(pSchema->pSchema[entry.ntbEntry.schema.nCols - 1].name, pAlterTbReq->colName); + break; + case TSDB_ALTER_TABLE_DROP_COLUMN: + if (pColumn == NULL) { + terrno = TSDB_CODE_VND_TABLE_COL_NOT_EXISTS; + goto _err; + } + if (pColumn->colId == 0) { + terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; + goto _err; + } + pSchema->sver++; + pSchema->nCols--; + tlen = (pSchema->nCols - iCol - 1) * sizeof(SSchema); + if (tlen) { + memmove(pColumn, pColumn + 1, tlen); + } + break; + case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: + if (pColumn == NULL) { + terrno = TSDB_CODE_VND_TABLE_COL_NOT_EXISTS; + goto _err; + } + if (!IS_VAR_DATA_TYPE(pColumn->type) || pColumn->bytes <= pAlterTbReq->bytes) { + terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; + goto _err; + } + pSchema->sver++; + pColumn->bytes = pAlterTbReq->bytes; + break; + case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: + if (pColumn == NULL) { + terrno = TSDB_CODE_VND_TABLE_COL_NOT_EXISTS; + goto _err; + } + pSchema->sver++; + strcpy(pColumn->name, pAlterTbReq->colNewName); + break; + } + + entry.version = version; + + tDecoderClear(&dc); + tdbDbcClose(pTbDbc); + tdbDbcClose(pUidIdxc); + return 0; + +_err: + tDecoderClear(&dc); + tdbDbcClose(pTbDbc); + tdbDbcClose(pUidIdxc); + return -1; +} + +static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) { + // TODO + return 0; +} + +static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) { + // TODO + ASSERT(0); + return 0; +} + +int metaAlterTable(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq) { + switch (pReq->action) { + case TSDB_ALTER_TABLE_ADD_COLUMN: + case TSDB_ALTER_TABLE_DROP_COLUMN: + case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: + case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: + return metaAlterTableColumn(pMeta, version, pReq); + case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: + return metaUpdateTableTagVal(pMeta, version, pReq); + case TSDB_ALTER_TABLE_UPDATE_OPTIONS: + return metaUpdateTableOptions(pMeta, version, pReq); + default: + terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; + return -1; + break; + } +} + static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) { STbDbKey tbDbKey; void *pKey = NULL; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 996d789e248778906c1b1bc046c7d1c6a58cabfe..8fbd1e24e1307ac36dab8c7c555ff8f217cfbbc4 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -91,7 +91,7 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p // TODO set to real sversion *pUid = 0; - int32_t sversion = 0; + int32_t sversion = 1; if (pHandle->sver != sversion || pHandle->cachedSchemaUid != pHandle->msgIter.suid) { pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion); diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index d180799e583e9a69a44210b1fc6f9e4322ed6cee..76d5c3cb3a9e869a62ae0df3222d1e66ff3ac646 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -465,7 +465,7 @@ static int tsdbCreateCommitIters(SCommitH *pCommith) { pTbData = (STbData *)pNode->pData; pCommitIter = pCommith->iters + i; - pTSchema = metaGetTbTSchema(REPO_META(pRepo), pTbData->uid, 0); // TODO: schema version + pTSchema = metaGetTbTSchema(REPO_META(pRepo), pTbData->uid, 1); // TODO: schema version if (pTSchema) { pCommitIter->pIter = tSkipListCreateIter(pTbData->pData); @@ -912,7 +912,7 @@ static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) { while (bidx < nBlocks) { if (!pTSchema && !tsdbCommitIsSameFile(pCommith, bidx)) { // Set commit table - pTSchema = metaGetTbTSchema(REPO_META(pTsdb), pIdx->uid, 0); // TODO: schema version + pTSchema = metaGetTbTSchema(REPO_META(pTsdb), pIdx->uid, 1); // TODO: schema version if (!pTSchema) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 927babc26c4195c268f45a4d44e6862f5652efad..55fe8a3945260caf44ec0f5aab55fd213989d3cd 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -490,7 +490,7 @@ tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableG STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, 0); - pTsdbReadHandle->pSchema = metaGetTbTSchema(pVnode->pMeta, pCheckInfo->tableId, 0); + pTsdbReadHandle->pSchema = metaGetTbTSchema(pVnode->pMeta, pCheckInfo->tableId, 1); int32_t numOfCols = taosArrayGetSize(pTsdbReadHandle->suppInfo.defaultLoadColumn); int16_t* ids = pTsdbReadHandle->suppInfo.defaultLoadColumn->pData; @@ -1271,7 +1271,6 @@ _error: static int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo); static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end); -static void moveDataToFront(STsdbReadHandle* pTsdbReadHandle, int32_t numOfRows, int32_t numOfCols); static void doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle); static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SDataBlockInfo* pBlockInfo, int32_t endPos); @@ -1301,7 +1300,7 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* if ((ascScan && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) || (!ascScan && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey))) { // do not load file block into buffer - int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1; + int32_t step = ascScan ? 1 : -1; TSKEY maxKey = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? (binfo.window.skey - step) : (binfo.window.ekey - step); @@ -1618,7 +1617,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa if (pSchema1 == NULL) { // pSchema1 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row1)); // TODO: use the real schemaVersion - pSchema1 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, 0); + pSchema1 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, 1); } #ifdef TD_DEBUG_PRINT_ROW @@ -1637,7 +1636,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa if (pSchema2 == NULL) { // pSchema2 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row2)); // TODO: use the real schemaVersion - pSchema2 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, 0); + pSchema2 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, 1); } if (isRow2DataRow) { numOfColsOfRow2 = schemaNCols(pSchema2); @@ -1790,22 +1789,6 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa #endif } -static void moveDataToFront(STsdbReadHandle* pTsdbReadHandle, int32_t numOfRows, int32_t numOfCols) { - if (numOfRows == 0 || ASCENDING_TRAVERSE(pTsdbReadHandle->order)) { - return; - } - - // if the buffer is not full in case of descending order query, move the data in the front of the buffer - if (numOfRows < pTsdbReadHandle->outputCapacity) { - int32_t emptySize = pTsdbReadHandle->outputCapacity - numOfRows; - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i); - memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes, - numOfRows * pColInfo->info.bytes); - } - } -} - static void getQualifiedRowsPos(STsdbReadHandle* pTsdbReadHandle, int32_t startPos, int32_t endPos, int32_t numOfExisted, int32_t* start, int32_t* end) { *start = -1; @@ -1891,9 +1874,6 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa cur->lastKey = tsArray[endPos] + step; cur->blockCompleted = true; - // if the buffer is not full in case of descending order query, move the data in the front of the buffer - moveDataToFront(pTsdbReadHandle, numOfRows, numOfCols); - // The value of pos may be -1 or pBlockInfo->rows, and it is invalid in both cases. pos = endPos + step; updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos); @@ -1944,18 +1924,18 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->keyFirst && tsArray[pBlock->numOfRows - 1] == pBlock->keyLast); + bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order); + int32_t step = ascScan ? 1 : -1; + // for search the endPos, so the order needs to reverse - int32_t order = (pTsdbReadHandle->order == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC; + int32_t order = ascScan ? TSDB_ORDER_DESC : TSDB_ORDER_ASC; - int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1; int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle)); - - STable* pTable = NULL; int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo); + STimeWindow* pWin = &blockInfo.window; tsdbDebug("%p uid:%" PRIu64 " start merge data block, file block range:%" PRIu64 "-%" PRIu64 - " rows:%d, start:%d, end:%d, %s", - pTsdbReadHandle, pCheckInfo->tableId, blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, + " rows:%d, start:%d, end:%d, %s", pTsdbReadHandle, pCheckInfo->tableId, pWin->skey, pWin->ekey, blockInfo.rows, cur->pos, endPos, pTsdbReadHandle->idStr); // compared with the data from in-memory buffer, to generate the correct timestamp array list @@ -1986,20 +1966,16 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf } TSKEY key = TD_ROW_KEY(row1); - if ((key > pTsdbReadHandle->window.ekey && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) || - (key < pTsdbReadHandle->window.ekey && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) { + if ((key > pTsdbReadHandle->window.ekey && ascScan) || (key < pTsdbReadHandle->window.ekey && !ascScan)) { break; } - if (((pos > endPos || tsArray[pos] > pTsdbReadHandle->window.ekey) && - ASCENDING_TRAVERSE(pTsdbReadHandle->order)) || - ((pos < endPos || tsArray[pos] < pTsdbReadHandle->window.ekey) && - !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) { + if (((pos > endPos || tsArray[pos] > pTsdbReadHandle->window.ekey) && ascScan) || + ((pos < endPos || tsArray[pos] < pTsdbReadHandle->window.ekey) && !ascScan)) { break; } - if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) || - (key > tsArray[pos] && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) { + if ((key < tsArray[pos] && ascScan) || (key > tsArray[pos] && !ascScan)) { if (rv1 != TD_ROW_SVER(row1)) { // pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1)); rv1 = TD_ROW_SVER(row1); @@ -2054,23 +2030,19 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf } #endif if (TD_SUPPORT_UPDATE(pCfg->update)) { - if (lastKeyAppend != key) { - lastKeyAppend = key; - ++curRow; - } numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, pos, pos); + lastKeyAppend = key; if (rv1 != TD_ROW_SVER(row1)) { - // pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1)); rv1 = TD_ROW_SVER(row1); } if (row2 && rv2 != TD_ROW_SVER(row2)) { - // pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2)); rv2 = TD_ROW_SVER(row2); } - numOfRows += - mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols, - pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend); + + // still assign data into current row + mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols, + pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend); if (cur->win.skey == TSKEY_INITIAL_VAL) { cur->win.skey = key; @@ -2081,12 +2053,13 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf cur->mixBlock = true; moveToNextRowInMem(pCheckInfo); + ++curRow; + pos += step; } else { moveToNextRowInMem(pCheckInfo); } - } else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) || - (key < tsArray[pos] && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) { + } else if ((key > tsArray[pos] && ascScan) || (key < tsArray[pos] && !ascScan)) { if (cur->win.skey == TSKEY_INITIAL_VAL) { cur->win.skey = tsArray[pos]; } @@ -2112,17 +2085,17 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf int32_t qstart = 0, qend = 0; getQualifiedRowsPos(pTsdbReadHandle, pos, end, numOfRows, &qstart, &qend); - if ((lastKeyAppend != TSKEY_INITIAL_VAL) && - (lastKeyAppend != (ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[qstart] : tsArray[qend]))) { + if ((lastKeyAppend != TSKEY_INITIAL_VAL) && (lastKeyAppend != (ascScan ? tsArray[qstart] : tsArray[qend]))) { ++curRow; } + numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, qstart, qend); pos += (qend - qstart + 1) * step; if (numOfRows > 0) { curRow = numOfRows - 1; } - cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[qend] : tsArray[qstart]; + cur->win.ekey = ascScan ? tsArray[qend] : tsArray[qstart]; cur->lastKey = cur->win.ekey + step; lastKeyAppend = cur->win.ekey; } @@ -2134,10 +2107,8 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf * copy them all to result buffer, since it may be overlapped with file data block. */ if (node == NULL || - ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) > pTsdbReadHandle->window.ekey) && - ASCENDING_TRAVERSE(pTsdbReadHandle->order)) || - ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) < pTsdbReadHandle->window.ekey) && - !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) { + ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) > pTsdbReadHandle->window.ekey) && ascScan) || + ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) < pTsdbReadHandle->window.ekey) && !ascScan)) { // no data in cache or data in cache is greater than the ekey of time window, load data from file block if (cur->win.skey == TSKEY_INITIAL_VAL) { cur->win.skey = tsArray[pos]; @@ -2149,22 +2120,20 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, start, end); pos += (end - start + 1) * step; - cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[end] : tsArray[start]; + cur->win.ekey = ascScan ? tsArray[end] : tsArray[start]; cur->lastKey = cur->win.ekey + step; cur->mixBlock = true; } } } - cur->blockCompleted = - (((pos > endPos || cur->lastKey > pTsdbReadHandle->window.ekey) && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) || - ((pos < endPos || cur->lastKey < pTsdbReadHandle->window.ekey) && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))); + cur->blockCompleted = (((pos > endPos || cur->lastKey > pTsdbReadHandle->window.ekey) && ascScan) || + ((pos < endPos || cur->lastKey < pTsdbReadHandle->window.ekey) && !ascScan)); - if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)) { + if (!ascScan) { TSWAP(cur->win.skey, cur->win.ekey); } - moveDataToFront(pTsdbReadHandle, numOfRows, numOfCols); updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos); doCheckGeneratedBlockRange(pTsdbReadHandle); @@ -2755,7 +2724,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int win->ekey = key; if (rv != TD_ROW_SVER(row)) { - pSchema = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), pCheckInfo->tableId, 0); + pSchema = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), pCheckInfo->tableId, 1); rv = TD_ROW_SVER(row); } numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, &curRows, row, NULL, numOfCols, pCheckInfo->tableId, @@ -3877,7 +3846,7 @@ int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const ch // NOTE: not add ref count for super table SArray* res = taosArrayInit(8, sizeof(STableKeyInfo)); - SSchemaWrapper* pTagSchema = metaGetTableSchema(pMeta, uid, 0, true); + SSchemaWrapper* pTagSchema = metaGetTableSchema(pMeta, uid, 1, true); // no tags and tbname condition, all child tables of this stable are involved if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) { diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index e878668654c022fdee5c3572011172478115bd13..158951311000e1f536655ca10a6ba8fdf2ece472 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -2084,7 +2084,7 @@ static int32_t tsdbExecuteRSma(STsdb *pTsdb, const void *pMsg, int32_t inputType if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { // TODO: use the proper schema instead of 0, and cache STSchema in cache - STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, suid, 0); + STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, suid, 1); if (!pTSchema) { terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; return TSDB_CODE_FAILED; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index f638ac056a2c4fc2954aa78596f70d45c9e51c53..ac49a0b0786fe9e5ffc37d501df8b1de5e528bea 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -16,10 +16,10 @@ #include "vnd.h" static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp); -static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp); +static int vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp); -static int vnodeProcessAlterTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp); +static int vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp); @@ -73,7 +73,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg if (vnodeProcessCreateStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; break; case TDMT_VND_ALTER_STB: - if (vnodeProcessAlterStbReq(pVnode, pReq, len, pRsp) < 0) goto _err; + if (vnodeProcessAlterStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; break; case TDMT_VND_DROP_STB: if (vnodeProcessDropStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; @@ -82,7 +82,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg if (vnodeProcessCreateTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; break; case TDMT_VND_ALTER_TABLE: - if (vnodeProcessAlterTbReq(pVnode, pReq, len, pRsp) < 0) goto _err; + if (vnodeProcessAlterTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; break; case TDMT_VND_DROP_TABLE: if (vnodeProcessDropTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err; @@ -397,20 +397,32 @@ _exit: return rcode; } -static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp) { - // ASSERT(0); -#if 0 - SVCreateTbReq vAlterTbReq = {0}; - vTrace("vgId:%d, process alter stb req", TD_VID(pVnode)); - tDeserializeSVCreateTbReq(pReq, &vAlterTbReq); - // TODO: to encapsule a free API - taosMemoryFree(vAlterTbReq.stbCfg.pSchema); - taosMemoryFree(vAlterTbReq.stbCfg.pTagSchema); - if (vAlterTbReq.stbCfg.pRSmaParam) { - taosMemoryFree(vAlterTbReq.stbCfg.pRSmaParam); +static int vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { + SVCreateStbReq req = {0}; + SDecoder dc = {0}; + + pRsp->msgType = TDMT_VND_ALTER_STB_RSP; + pRsp->code = TSDB_CODE_SUCCESS; + pRsp->pCont = NULL; + pRsp->contLen = 0; + + tDecoderInit(&dc, pReq, len); + + // decode req + if (tDecodeSVCreateStbReq(&dc, &req) < 0) { + terrno = TSDB_CODE_INVALID_MSG; + tDecoderClear(&dc); + return -1; } - taosMemoryFree(vAlterTbReq.name); -#endif + + if (metaAlterSTable(pVnode->pMeta, version, &req) < 0) { + pRsp->code = terrno; + tDecoderClear(&dc); + return -1; + } + + tDecoderClear(&dc); + return 0; } @@ -443,9 +455,32 @@ _exit: return 0; } -static int vnodeProcessAlterTbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp) { - // TODO - ASSERT(0); +static int vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { + SVAlterTbReq vAlterTbReq = {0}; + SDecoder dc = {0}; + + pRsp->msgType = TDMT_VND_ALTER_TABLE_RSP; + pRsp->pCont = NULL; + pRsp->contLen = 0; + pRsp->code = TSDB_CODE_SUCCESS; + + tDecoderInit(&dc, pReq, len); + + // decode + if (tDecodeSVAlterTbReq(&dc, &vAlterTbReq) < 0) { + pRsp->code = TSDB_CODE_INVALID_MSG; + tDecoderClear(&dc); + return -1; + } + + // process + if (metaAlterTable(pVnode->pMeta, version, &vAlterTbReq) < 0) { + pRsp->code = terrno; + tDecoderClear(&dc); + return -1; + } + + tDecoderClear(&dc); return 0; } @@ -513,7 +548,7 @@ static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSub if (pSchema) { taosMemoryFreeClear(pSchema); } - pSchema = metaGetTbTSchema(pMeta, msgIter->suid, 0); // TODO: use the real schema + pSchema = metaGetTbTSchema(pMeta, msgIter->suid, 1); // TODO: use the real schema if (pSchema) { suid = msgIter->suid; } @@ -656,7 +691,7 @@ _exit: static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) { SVCreateTSmaReq req = {0}; - SDecoder coder; + SDecoder coder; pRsp->msgType = TDMT_VND_CREATE_SMA_RSP; pRsp->code = TSDB_CODE_SUCCESS; @@ -670,7 +705,7 @@ static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq pRsp->code = terrno; goto _err; } - + if (metaCreateTSma(pVnode->pMeta, version, &req) < 0) { pRsp->code = terrno; goto _err; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index d08345a02f3f59d9626ee447d7905719a80b15d0..0ab172f03a3f0cfd07eedcd37e5336e43701057d 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -13,7 +13,6 @@ * along with this program. If not, see . */ -#include #include "filter.h" #include "function.h" #include "functionMgt.h" @@ -1601,9 +1600,6 @@ void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p) { } } -static SColumnInfo* doGetTagColumnInfoById(SColumnInfo* pTagColList, int32_t numOfTags, int16_t colId); -static void doSetTagValueInParam(void* pTable, int32_t tagColId, SVariant* tag, int16_t type, int16_t bytes); - static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) { SqlFunctionCtx* pCtx = pTableScanInfo->pCtx; uint32_t status = BLK_DATA_NOT_LOAD; @@ -1771,100 +1767,6 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc return TSDB_CODE_SUCCESS; } -/* - * set tag value in SqlFunctionCtx - * e.g.,tag information into input buffer - */ -static void doSetTagValueInParam(void* pTable, int32_t tagColId, SVariant* tag, int16_t type, int16_t bytes) { - taosVariantDestroy(tag); - - char* val = NULL; - // if (tagColId == TSDB_TBNAME_COLUMN_INDEX) { - // val = tsdbGetTableName(pTable); - // assert(val != NULL); - // } else { - // val = tsdbGetTableTagVal(pTable, tagColId, type, bytes); - // } - - if (val == NULL || isNull(val, type)) { - tag->nType = TSDB_DATA_TYPE_NULL; - return; - } - - if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { - int32_t maxLen = bytes - VARSTR_HEADER_SIZE; - int32_t len = (varDataLen(val) > maxLen) ? maxLen : varDataLen(val); - taosVariantCreateFromBinary(tag, varDataVal(val), len, type); - // taosVariantCreateFromBinary(tag, varDataVal(val), varDataLen(val), type); - } else { - taosVariantCreateFromBinary(tag, val, bytes, type); - } -} - -static SColumnInfo* doGetTagColumnInfoById(SColumnInfo* pTagColList, int32_t numOfTags, int16_t colId) { - assert(pTagColList != NULL && numOfTags > 0); - - for (int32_t i = 0; i < numOfTags; ++i) { - if (pTagColList[i].colId == colId) { - return &pTagColList[i]; - } - } - - return NULL; -} - -void setTagValue(SOperatorInfo* pOperatorInfo, void* pTable, SqlFunctionCtx* pCtx, int32_t numOfOutput) { - SExprInfo* pExpr = pOperatorInfo->pExpr; - SExprInfo* pExprInfo = &pExpr[0]; - int32_t functionId = getExprFunctionId(pExprInfo); -#if 0 - if (pQueryAttr->numOfOutput == 1 && functionId == FUNCTION_TS_COMP && pQueryAttr->stableQuery) { - assert(pExprInfo->base.numOfParams == 1); - - // int16_t tagColId = (int16_t)pExprInfo->base.param[0].i; - int16_t tagColId = -1; - SColumnInfo* pColInfo = doGetTagColumnInfoById(pQueryAttr->tagColList, pQueryAttr->numOfTags, tagColId); - - doSetTagValueInParam(pTable, tagColId, &pCtx[0].tag, pColInfo->type, pColInfo->bytes); - - } else { - // set tag value, by which the results are aggregated. - int32_t offset = 0; - memset(pRuntimeEnv->tagVal, 0, pQueryAttr->tagLen); - - for (int32_t idx = 0; idx < numOfOutput; ++idx) { - SExprInfo* pLocalExprInfo = &pExpr[idx]; - - // ts_comp column required the tag value for join filter - if (!TSDB_COL_IS_TAG(pLocalExprInfo->base.pParam[0].pCol->flag)) { - continue; - } - - // todo use tag column index to optimize performance - doSetTagValueInParam(pTable, pLocalExprInfo->base.pParam[0].pCol->colId, &pCtx[idx].tag, - pLocalExprInfo->base.resSchema.type, pLocalExprInfo->base.resSchema.bytes); - - if (IS_NUMERIC_TYPE(pLocalExprInfo->base.resSchema.type) || - pLocalExprInfo->base.resSchema.type == TSDB_DATA_TYPE_BOOL || - pLocalExprInfo->base.resSchema.type == TSDB_DATA_TYPE_TIMESTAMP) { - memcpy(pRuntimeEnv->tagVal + offset, &pCtx[idx].tag.i, pLocalExprInfo->base.resSchema.bytes); - } else { - if (pCtx[idx].tag.pz != NULL) { - memcpy(pRuntimeEnv->tagVal + offset, pCtx[idx].tag.pz, pCtx[idx].tag.nLen); - } - } - - offset += pLocalExprInfo->base.resSchema.bytes; - } - } - - // set the tsBuf start position before check each data block - if (pRuntimeEnv->pTsBuf != NULL) { - setCtxTagForJoin(pRuntimeEnv, &pCtx[0], pExprInfo, pTable); - } -#endif -} - void copyToSDataBlock(SSDataBlock* pBlock, int32_t* offset, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pResBuf) { pBlock->info.rows = 0; @@ -4038,12 +3940,6 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { if (pProjectInfo->existDataBlock) { // TODO refactor SSDataBlock* pBlock = pProjectInfo->existDataBlock; pProjectInfo->existDataBlock = NULL; - *newgroup = true; - - // todo dynamic set tags - // if (pTableQueryInfo != NULL) { - // setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfExprs); - // } // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC); @@ -4084,13 +3980,6 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { } } - // todo set tags - - // STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; - // if (pTableQueryInfo != NULL) { - // setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfExprs); - // } - // the pDataBlock are always the same one, no need to call this again int32_t code = getTableScanInfo(pOperator->pDownstream[0], &order, &scanFlag); @@ -4430,10 +4319,6 @@ void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) { doDestroyBasicInfo(pInfo, numOfOutput); } -void destroyMergeJoinOperator(void* param, int32_t numOfOutput) { - SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param; -} - void destroyAggOperatorInfo(void* param, int32_t numOfOutput) { SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); @@ -4778,7 +4663,6 @@ static SArray* extractColumnInfo(SNodeList* pNodeList); static SArray* createSortInfo(SNodeList* pNodeList); static SArray* extractPartitionColInfo(SNodeList* pNodeList); -static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode); SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) { @@ -5447,150 +5331,3 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo return TSDB_CODE_SUCCESS; } - -static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { - SJoinOperatorInfo* pJoinInfo = pOperator->info; - - SSDataBlock* pRes = pJoinInfo->pRes; - blockDataCleanup(pRes); - blockDataEnsureCapacity(pRes, 4096); - - int32_t nrows = 0; - - while (1) { - if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) { - SOperatorInfo* ds1 = pOperator->pDownstream[0]; - publishOperatorProfEvent(ds1, QUERY_PROF_BEFORE_OPERATOR_EXEC); - pJoinInfo->pLeft = ds1->fpSet.getNextFn(ds1); - publishOperatorProfEvent(ds1, QUERY_PROF_AFTER_OPERATOR_EXEC); - - pJoinInfo->leftPos = 0; - if (pJoinInfo->pLeft == NULL) { - setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); - break; - } - } - - if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) { - SOperatorInfo* ds2 = pOperator->pDownstream[1]; - publishOperatorProfEvent(ds2, QUERY_PROF_BEFORE_OPERATOR_EXEC); - pJoinInfo->pRight = ds2->fpSet.getNextFn(ds2); - publishOperatorProfEvent(ds2, QUERY_PROF_AFTER_OPERATOR_EXEC); - - pJoinInfo->rightPos = 0; - if (pJoinInfo->pRight == NULL) { - setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); - break; - } - } - - SColumnInfoData* pLeftCol = taosArrayGet(pJoinInfo->pLeft->pDataBlock, pJoinInfo->leftCol.slotId); - char* pLeftVal = colDataGetData(pLeftCol, pJoinInfo->leftPos); - - SColumnInfoData* pRightCol = taosArrayGet(pJoinInfo->pRight->pDataBlock, pJoinInfo->rightCol.slotId); - char* pRightVal = colDataGetData(pRightCol, pJoinInfo->rightPos); - - // only the timestamp match support for ordinary table - ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); - if (*(int64_t*)pLeftVal == *(int64_t*)pRightVal) { - for (int32_t i = 0; i < pOperator->numOfExprs; ++i) { - SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i); - - SExprInfo* pExprInfo = &pOperator->pExpr[i]; - - int32_t blockId = pExprInfo->base.pParam[0].pCol->dataBlockId; - int32_t slotId = pExprInfo->base.pParam[0].pCol->slotId; - int32_t rowIndex = -1; - - SColumnInfoData* pSrc = NULL; - if (pJoinInfo->pLeft->info.blockId == blockId) { - pSrc = taosArrayGet(pJoinInfo->pLeft->pDataBlock, slotId); - rowIndex = pJoinInfo->leftPos; - } else { - pSrc = taosArrayGet(pJoinInfo->pRight->pDataBlock, slotId); - rowIndex = pJoinInfo->rightPos; - } - - if (colDataIsNull_s(pSrc, rowIndex)) { - colDataAppendNULL(pDst, nrows); - } else { - char* p = colDataGetData(pSrc, rowIndex); - colDataAppend(pDst, nrows, p, false); - } - } - - pJoinInfo->leftPos += 1; - pJoinInfo->rightPos += 1; - - nrows += 1; - } else if (*(int64_t*)pLeftVal < *(int64_t*)pRightVal) { - pJoinInfo->leftPos += 1; - - if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) { - continue; - } - } else if (*(int64_t*)pLeftVal > *(int64_t*)pRightVal) { - pJoinInfo->rightPos += 1; - if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) { - continue; - } - } - - // the pDataBlock are always the same one, no need to call this again - pRes->info.rows = nrows; - if (pRes->info.rows >= pOperator->resultInfo.threshold) { - break; - } - } - - return (pRes->info.rows > 0) ? pRes : NULL; -} - -SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, - int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, - SExecTaskInfo* pTaskInfo) { - SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - if (pOperator == NULL || pInfo == NULL) { - goto _error; - } - - initResultSizeInfo(pOperator, 4096); - - pInfo->pRes = pResBlock; - pOperator->name = "MergeJoinOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_JOIN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->pExpr = pExprInfo; - pOperator->numOfExprs = numOfCols; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; - - SOperatorNode* pNode = (SOperatorNode*)pOnCondition; - setJoinColumnInfo(&pInfo->leftCol, (SColumnNode*)pNode->pLeft); - setJoinColumnInfo(&pInfo->rightCol, (SColumnNode*)pNode->pRight); - - pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL, NULL, NULL); - int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - return pOperator; - -_error: - taosMemoryFree(pInfo); - taosMemoryFree(pOperator); - pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; - return NULL; -} - -void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) { - pColumn->slotId = pColumnNode->slotId; - pColumn->type = pColumnNode->node.resType.type; - pColumn->bytes = pColumnNode->node.resType.bytes; - pColumn->precision = pColumnNode->node.resType.precision; - pColumn->scale = pColumnNode->node.resType.scale; -} diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c new file mode 100644 index 0000000000000000000000000000000000000000..d7d6d963463bb400f940119d2192b63ddb7de16a --- /dev/null +++ b/source/libs/executor/src/joinoperator.c @@ -0,0 +1,199 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "function.h" +#include "os.h" +#include "querynodes.h" +#include "tdatablock.h" +#include "tmsg.h" +#include "executorimpl.h" +#include "tcompare.h" +#include "thash.h" +#include "ttypes.h" + +static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode); +static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator); +static void destroyMergeJoinOperator(void* param, int32_t numOfOutput); +static void extractTimeCondition(SJoinOperatorInfo *Info, SLogicConditionNode* pLogicConditionNode); + +SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, + int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, + SExecTaskInfo* pTaskInfo) { + SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL || pInfo == NULL) { + goto _error; + } + + initResultSizeInfo(pOperator, 4096); + + pInfo->pRes = pResBlock; + pOperator->name = "MergeJoinOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_JOIN; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->pExpr = pExprInfo; + pOperator->numOfExprs = numOfCols; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; + + if (nodeType(pOnCondition) == QUERY_NODE_OPERATOR) { + SOperatorNode* pNode = (SOperatorNode*)pOnCondition; + setJoinColumnInfo(&pInfo->leftCol, (SColumnNode*)pNode->pLeft); + setJoinColumnInfo(&pInfo->rightCol, (SColumnNode*)pNode->pRight); + } else if (nodeType(pOnCondition) == QUERY_NODE_LOGIC_CONDITION) { + extractTimeCondition(pInfo, (SLogicConditionNode*) pOnCondition); + } + + pOperator->fpSet = + createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL, NULL, NULL); + int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + return pOperator; + + _error: + taosMemoryFree(pInfo); + taosMemoryFree(pOperator); + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + return NULL; +} + +void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) { + pColumn->slotId = pColumnNode->slotId; + pColumn->type = pColumnNode->node.resType.type; + pColumn->bytes = pColumnNode->node.resType.bytes; + pColumn->precision = pColumnNode->node.resType.precision; + pColumn->scale = pColumnNode->node.resType.scale; +} + +void destroyMergeJoinOperator(void* param, int32_t numOfOutput) { + SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param; +} + +SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { + SJoinOperatorInfo* pJoinInfo = pOperator->info; + + SSDataBlock* pRes = pJoinInfo->pRes; + blockDataCleanup(pRes); + blockDataEnsureCapacity(pRes, 4096); + + int32_t nrows = 0; + + while (1) { + // todo extract method + if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) { + SOperatorInfo* ds1 = pOperator->pDownstream[0]; + publishOperatorProfEvent(ds1, QUERY_PROF_BEFORE_OPERATOR_EXEC); + pJoinInfo->pLeft = ds1->fpSet.getNextFn(ds1); + publishOperatorProfEvent(ds1, QUERY_PROF_AFTER_OPERATOR_EXEC); + + pJoinInfo->leftPos = 0; + if (pJoinInfo->pLeft == NULL) { + setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); + break; + } + } + + if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) { + SOperatorInfo* ds2 = pOperator->pDownstream[1]; + publishOperatorProfEvent(ds2, QUERY_PROF_BEFORE_OPERATOR_EXEC); + pJoinInfo->pRight = ds2->fpSet.getNextFn(ds2); + publishOperatorProfEvent(ds2, QUERY_PROF_AFTER_OPERATOR_EXEC); + + pJoinInfo->rightPos = 0; + if (pJoinInfo->pRight == NULL) { + setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); + break; + } + } + + SColumnInfoData* pLeftCol = taosArrayGet(pJoinInfo->pLeft->pDataBlock, pJoinInfo->leftCol.slotId); + char* pLeftVal = colDataGetData(pLeftCol, pJoinInfo->leftPos); + + SColumnInfoData* pRightCol = taosArrayGet(pJoinInfo->pRight->pDataBlock, pJoinInfo->rightCol.slotId); + char* pRightVal = colDataGetData(pRightCol, pJoinInfo->rightPos); + + // only the timestamp match support for ordinary table + ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); + if (*(int64_t*)pLeftVal == *(int64_t*)pRightVal) { + for (int32_t i = 0; i < pOperator->numOfExprs; ++i) { + SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i); + + SExprInfo* pExprInfo = &pOperator->pExpr[i]; + + int32_t blockId = pExprInfo->base.pParam[0].pCol->dataBlockId; + int32_t slotId = pExprInfo->base.pParam[0].pCol->slotId; + int32_t rowIndex = -1; + + SColumnInfoData* pSrc = NULL; + if (pJoinInfo->pLeft->info.blockId == blockId) { + pSrc = taosArrayGet(pJoinInfo->pLeft->pDataBlock, slotId); + rowIndex = pJoinInfo->leftPos; + } else { + pSrc = taosArrayGet(pJoinInfo->pRight->pDataBlock, slotId); + rowIndex = pJoinInfo->rightPos; + } + + if (colDataIsNull_s(pSrc, rowIndex)) { + colDataAppendNULL(pDst, nrows); + } else { + char* p = colDataGetData(pSrc, rowIndex); + colDataAppend(pDst, nrows, p, false); + } + } + + pJoinInfo->leftPos += 1; + pJoinInfo->rightPos += 1; + + nrows += 1; + } else if (*(int64_t*)pLeftVal < *(int64_t*)pRightVal) { + pJoinInfo->leftPos += 1; + + if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) { + continue; + } + } else if (*(int64_t*)pLeftVal > *(int64_t*)pRightVal) { + pJoinInfo->rightPos += 1; + if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) { + continue; + } + } + + // the pDataBlock are always the same one, no need to call this again + pRes->info.rows = nrows; + if (pRes->info.rows >= pOperator->resultInfo.threshold) { + break; + } + } + + return (pRes->info.rows > 0) ? pRes : NULL; +} + +static void extractTimeCondition(SJoinOperatorInfo *pInfo, SLogicConditionNode* pLogicConditionNode) { + int32_t len = LIST_LENGTH(pLogicConditionNode->pParameterList); + + for(int32_t i = 0; i < len; ++i) { + SNode* pNode = nodesListGetNode(pLogicConditionNode->pParameterList, i); + if (nodeType(pNode) == QUERY_NODE_OPERATOR) { + SOperatorNode* pn1 = (SOperatorNode*)pNode; + setJoinColumnInfo(&pInfo->leftCol, (SColumnNode*)pn1->pLeft); + setJoinColumnInfo(&pInfo->rightCol, (SColumnNode*)pn1->pRight); + break; + } + } +} diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 10dc482462f84a4dcdd7c53cb4bd164baa74f6af..79e675e2df33047dbcec027f97ad930a3a7b4ad2 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -773,7 +773,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { break; } - // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfExprs); // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true); STableQueryInfo* pTableQueryInfo = pInfo->pCurrent; @@ -1062,8 +1061,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { // The timewindows that overlaps the timestamps of the input pBlock need to be recalculated and return to the // caller. Note that all the time window are not close till now. - - // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfExprs); // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true); if (pInfo->invertible) { @@ -1377,7 +1374,6 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) { break; } - // setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfExprs); // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pSliceInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true); // hashAllIntervalAgg(pOperator, &pSliceInfo->binfo.resultRowInfo, pBlock, 0); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 57dfcaedddcff22135d575dfa728d85cfe6a9852..763ccbf7a02a9fd3649c3e0466304aa4ccc46db0 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -584,6 +584,37 @@ static int32_t jsonToLogicProjectNode(const SJson* pJson, void* pObj) { return code; } +static const char* jkExchangeLogicPlanSrcGroupId = "SrcGroupId"; +static const char* jkExchangeLogicPlanSrcPrecision = "Precision"; + +static int32_t logicExchangeNodeToJson(const void* pObj, SJson* pJson) { + const SExchangeLogicNode* pNode = (const SExchangeLogicNode*)pObj; + + int32_t code = logicPlanNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkExchangeLogicPlanSrcGroupId, pNode->srcGroupId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkExchangeLogicPlanSrcPrecision, pNode->precision); + } + + return code; +} + +static int32_t jsonToLogicExchangeNode(const SJson* pJson, void* pObj) { + SExchangeLogicNode* pNode = (SExchangeLogicNode*)pObj; + + int32_t code = jsonToLogicPlanNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkExchangeLogicPlanSrcGroupId, &pNode->srcGroupId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetUTinyIntValue(pJson, jkExchangeLogicPlanSrcPrecision, &pNode->precision); + } + + return code; +} + static const char* jkFillLogicPlanMode = "Mode"; static const char* jkFillLogicPlanWStartTs = "WStartTs"; static const char* jkFillLogicPlanValues = "Values"; @@ -2987,6 +3018,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { return logicProjectNodeToJson(pObj, pJson); case QUERY_NODE_LOGIC_PLAN_VNODE_MODIF: break; + case QUERY_NODE_LOGIC_PLAN_EXCHANGE: + return logicExchangeNodeToJson(pObj, pJson); case QUERY_NODE_LOGIC_PLAN_FILL: return logicFillNodeToJson(pObj, pJson); case QUERY_NODE_LOGIC_PLAN_SORT: @@ -3083,6 +3116,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { return jsonToLogicScanNode(pJson, pObj); case QUERY_NODE_LOGIC_PLAN_PROJECT: return jsonToLogicProjectNode(pJson, pObj); + case QUERY_NODE_LOGIC_PLAN_EXCHANGE: + return jsonToLogicExchangeNode(pJson, pObj); case QUERY_NODE_LOGIC_PLAN_FILL: return jsonToLogicFillNode(pJson, pObj); case QUERY_NODE_LOGIC_PLAN_SORT: diff --git a/source/libs/parser/src/parAuthenticator.c b/source/libs/parser/src/parAuthenticator.c index ad1dca68579111b9494a41255537f604dd76f5d8..250e7910d69847a130fa4f0b2132b3dcb99da8e7 100644 --- a/source/libs/parser/src/parAuthenticator.c +++ b/source/libs/parser/src/parAuthenticator.c @@ -14,6 +14,7 @@ */ #include "catalog.h" +#include "cmdnodes.h" #include "parInt.h" typedef struct SAuthCxt { @@ -65,8 +66,8 @@ static int32_t authSetOperator(SAuthCxt* pCxt, SSetOperator* pSetOper) { return code; } -static int32_t authDropUser(SAuthCxt* pCxt, SDropUserReq* pStmt) { - if (!pCxt->pParseCxt->isSuperUser || 0 == strcmp(pStmt->user, TSDB_DEFAULT_USER)) { +static int32_t authDropUser(SAuthCxt* pCxt, SDropUserStmt* pStmt) { + if (!pCxt->pParseCxt->isSuperUser || 0 == strcmp(pStmt->useName, TSDB_DEFAULT_USER)) { return TSDB_CODE_PAR_PERMISSION_DENIED; } return TSDB_CODE_SUCCESS; @@ -92,7 +93,7 @@ static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) { case QUERY_NODE_ALTER_USER_STMT: break; case QUERY_NODE_DROP_USER_STMT: { - return authDropUser(pCxt, (SDropUserReq*)pStmt); + return authDropUser(pCxt, (SDropUserStmt*)pStmt); } case QUERY_NODE_USE_DATABASE_STMT: case QUERY_NODE_CREATE_DNODE_STMT: diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 1639abceba53198f8ec60434f116c9f6c648d2ae..9773b69ee75992b2405cb17d09ce34f727d4b7ce 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3725,7 +3725,7 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt* req.type = TD_NORMAL_TABLE; req.name = strdup(pStmt->tableName); req.ntb.schema.nCols = LIST_LENGTH(pStmt->pCols); - req.ntb.schema.sver = 0; + req.ntb.schema.sver = 1; req.ntb.schema.pSchema = taosMemoryCalloc(req.ntb.schema.nCols, sizeof(SSchema)); if (NULL == req.name || NULL == req.ntb.schema.pSchema) { destroyCreateTbReq(&req); diff --git a/source/libs/parser/test/parSelectTest.cpp b/source/libs/parser/test/parSelectTest.cpp index 821f480b2003e7752599812f22fb664eb7f33493..47424d313840a78427cf908f9f929cc7994be383 100644 --- a/source/libs/parser/test/parSelectTest.cpp +++ b/source/libs/parser/test/parSelectTest.cpp @@ -235,11 +235,17 @@ TEST_F(ParserSelectTest, semanticError) { TEST_F(ParserSelectTest, setOperator) { useDb("root", "test"); - // run("SELECT * FROM t1 UNION ALL SELECT * FROM t1"); + run("SELECT * FROM t1 UNION ALL SELECT * FROM t1"); - // run("(SELECT * FROM t1) UNION ALL (SELECT * FROM t1)"); + run("(SELECT * FROM t1) UNION ALL (SELECT * FROM t1)"); run("SELECT c1 FROM (SELECT c1 FROM t1 UNION ALL SELECT c1 FROM t1)"); } +TEST_F(ParserSelectTest, informationSchema) { + useDb("root", "test"); + + run("SELECT * FROM information_schema.user_databases WHERE name = 'information_schema'"); +} + } // namespace ParserTest diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index edac2a879fa9c8d6ec10cf532872ef70c82dfba8..e38c180ac6cd852dc46d6a3e2e954e76831e2480 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -392,7 +392,8 @@ static int32_t cpdCalcTimeRange(SScanLogicNode* pScan, SNode** pPrimaryKeyCond, } static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode* pScan) { - if (NULL == pScan->node.pConditions || OPTIMIZE_FLAG_TEST_MASK(pScan->node.optimizedFlag, OPTIMIZE_FLAG_CPD)) { + if (NULL == pScan->node.pConditions || OPTIMIZE_FLAG_TEST_MASK(pScan->node.optimizedFlag, OPTIMIZE_FLAG_CPD) || + TSDB_SYSTEM_TABLE == pScan->pMeta->tableType) { return TSDB_CODE_SUCCESS; } diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 1266e8ae4ba1062d74cf7a113c445046070298b1..54bc24e8bb1233ab5adb9bdafad14568d83bdd7c 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -303,7 +303,7 @@ static SLogicNode* unMatchByNode(SLogicNode* pNode) { } SNode* pChild; FOREACH(pChild, pNode->pChildren) { - SLogicNode* pSplitNode = uaMatchByNode((SLogicNode*)pChild); + SLogicNode* pSplitNode = unMatchByNode((SLogicNode*)pChild); if (NULL != pSplitNode) { return pSplitNode; } @@ -318,7 +318,7 @@ static int32_t unCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan } pExchange->srcGroupId = pCxt->groupId; // pExchange->precision = pScan->pMeta->tableInfo.precision; - pExchange->node.pTargets = nodesCloneList(pAgg->node.pTargets); + pExchange->node.pTargets = nodesCloneList(pAgg->pGroupKeys); if (NULL == pExchange->node.pTargets) { return TSDB_CODE_OUT_OF_MEMORY; } diff --git a/source/libs/planner/test/planOtherTest.cpp b/source/libs/planner/test/planOtherTest.cpp index b70cb4d19ae99de436914d95e4192cf297b2435d..67c09d706e34ea44ab0c4070d9bbb665a15dded1 100644 --- a/source/libs/planner/test/planOtherTest.cpp +++ b/source/libs/planner/test/planOtherTest.cpp @@ -47,4 +47,10 @@ TEST_F(PlanOtherTest, explain) { run("explain analyze SELECT * FROM t1"); run("explain analyze verbose true ratio 0.01 SELECT * FROM t1"); -} \ No newline at end of file +} + +TEST_F(PlanOtherTest, show) { + useDb("root", "test"); + + run("SHOW DATABASES"); +} diff --git a/source/libs/planner/test/planStateTest.cpp b/source/libs/planner/test/planStateTest.cpp index 83c9621916a3a080d578d29cf1a1abd6f0dc94d9..9ff035e1480c5ccbf17c7ab889976c4a739d951f 100644 --- a/source/libs/planner/test/planStateTest.cpp +++ b/source/libs/planner/test/planStateTest.cpp @@ -23,13 +23,13 @@ class PlanStateTest : public PlannerTestBase {}; TEST_F(PlanStateTest, basic) { useDb("root", "test"); - run("select count(*) from t1 state_window(c1)"); + run("SELECT COUNT(*) FROM t1 STATE_WINDOW(c1)"); } TEST_F(PlanStateTest, stateExpr) { useDb("root", "test"); - run("select count(*) from t1 state_window(c1 + 10)"); + run("SELECT COUNT(*) FROM t1 STATE_WINDOW(c1 + 10)"); } TEST_F(PlanStateTest, selectFunc) { diff --git a/source/libs/planner/test/planSubqueryTest.cpp b/source/libs/planner/test/planSubqueryTest.cpp index 11e5e980526c2129d2bd5a2676a47edcb619e32e..6a7cb91bb931fc5d0d22e7cefd5c2156fcf4c74a 100644 --- a/source/libs/planner/test/planSubqueryTest.cpp +++ b/source/libs/planner/test/planSubqueryTest.cpp @@ -25,11 +25,9 @@ TEST_F(PlanSubqeuryTest, basic) { if (0 == g_skipSql) { run("SELECT * FROM (SELECT * FROM t1)"); - - run("SELECT LAST(c1) FROM (SELECT * FROM t1)"); } - run("SELECT c1 FROM (SELECT c1 FROM t1 UNION ALL SELECT c1 FROM t1)"); + run("SELECT LAST(c1) FROM (SELECT * FROM t1)"); } TEST_F(PlanSubqeuryTest, doubleGroupBy) { @@ -39,3 +37,11 @@ TEST_F(PlanSubqeuryTest, doubleGroupBy) { "SELECT c1 + c3 a, c1 + COUNT(*) b FROM t1 WHERE c2 = 'abc' GROUP BY c1, c3) " "WHERE a > 100 GROUP BY b"); } + +TEST_F(PlanSubqeuryTest, withSetOperator) { + useDb("root", "test"); + + run("SELECT c1 FROM (SELECT c1 FROM t1 UNION ALL SELECT c1 FROM t1)"); + + run("SELECT c1 FROM (SELECT c1 FROM t1 UNION SELECT c1 FROM t1)"); +} diff --git a/source/libs/planner/test/planSysTbTest.cpp b/source/libs/planner/test/planSysTbTest.cpp index fff6bfcca437887b28e70d59327a6873de13730d..e5c30030b38a9cf2444a640b9aedf940e4dd5337 100644 --- a/source/libs/planner/test/planSysTbTest.cpp +++ b/source/libs/planner/test/planSysTbTest.cpp @@ -27,8 +27,8 @@ TEST_F(PlanSysTableTest, show) { run("show stables"); } -TEST_F(PlanSysTableTest, information) { +TEST_F(PlanSysTableTest, informationSchema) { useDb("root", "information_schema"); - run("show tables"); + run("SELECT * FROM information_schema.user_databases WHERE name = 'information_schema'"); } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 717958c033c491304ed9c7ceba909a49b1f03e3f..6a25b23c6b1d2669b599d134b041eb00071e57ee 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -812,13 +812,11 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu } if (ctx->rspCode) { - QW_TASK_ELOG("task already failed at phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode, - tstrerror(ctx->rspCode)); + QW_TASK_ELOG("task already failed at phase %s, code:%s", qwPhaseStr(phase), tstrerror(ctx->rspCode)); QW_ERR_JRET(ctx->rspCode); } _return: - if (ctx) { QW_UPDATE_RSP_CODE(ctx, code); @@ -836,7 +834,11 @@ _return: QW_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", cancelConnection->handle, code, tstrerror(code)); } - QW_TASK_DLOG("end to handle event at phase %s, code:%x - %s", qwPhaseStr(phase), code, tstrerror(code)); + if (code != TSDB_CODE_SUCCESS) { + QW_TASK_ELOG("end to handle event at phase %s, code:%s", qwPhaseStr(phase), tstrerror(code)); + } else { + QW_TASK_DLOG("end to handle event at phase %s, code:%s", qwPhaseStr(phase), tstrerror(code)); + } QW_RET(code); } diff --git a/source/libs/sync/inc/syncIndexMgr.h b/source/libs/sync/inc/syncIndexMgr.h index 63f24b104fe47615750e6363207d93d19e9400b7..0a6e2428fee2b073aa35c035ca3e4cb312c3aa25 100644 --- a/source/libs/sync/inc/syncIndexMgr.h +++ b/source/libs/sync/inc/syncIndexMgr.h @@ -35,6 +35,7 @@ typedef struct SSyncIndexMgr { } SSyncIndexMgr; SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode); +void syncIndexMgrUpdate(SSyncIndexMgr *pSyncIndexMgr, SSyncNode *pSyncNode); void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr); void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr); void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncIndex index); diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 8a21eea7b7c9ce2254ce0962c8a0b67489f06e8c..9b655fb0fae7f699aeccca4bc4047f9ce8008e56 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -247,6 +247,7 @@ typedef struct SSyncNode { // open/close -------------- SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo); void syncNodeStart(SSyncNode* pSyncNode); +void syncNodeStartStandBy(SSyncNode* pSyncNode); void syncNodeClose(SSyncNode* pSyncNode); // ping -------------- @@ -271,7 +272,7 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S cJSON* syncNode2Json(const SSyncNode* pSyncNode); char* syncNode2Str(const SSyncNode* pSyncNode); char* syncNode2SimpleStr(const SSyncNode* pSyncNode); -void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg *newConfig); +void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig); SSyncNode* syncNodeAcquire(int64_t rid); void syncNodeRelease(SSyncNode* pNode); diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index 159af1610e72a7b96f772e11e1589ce9b3e4498f..1b08d3f7a150822cd22758276b0b8fc667829f69 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -62,7 +62,6 @@ bool syncUtilUserPreCommit(tmsg_t msgType); bool syncUtilUserCommit(tmsg_t msgType); bool syncUtilUserRollback(tmsg_t msgType); - #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index aed19d042e54195ab98d5bd86a38bd8f5fa2be03..1a5d418e7545122b48b15e1b83c4a2bef3d9a860 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -15,11 +15,11 @@ #include "syncAppendEntries.h" #include "syncInt.h" +#include "syncRaftCfg.h" #include "syncRaftLog.h" #include "syncRaftStore.h" #include "syncUtil.h" #include "syncVoteMgr.h" -#include "syncRaftCfg.h" // TLA+ Spec // HandleAppendEntriesRequest(i, j, m) == @@ -200,7 +200,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SSyncRaftEntry* pRollBackEntry = logStoreGetEntry(ths->pLogStore, index); assert(pRollBackEntry != NULL); - //if (pRollBackEntry->msgType != TDMT_VND_SYNC_NOOP) { + // if (pRollBackEntry->msgType != TDMT_VND_SYNC_NOOP) { if (syncUtilUserRollback(pRollBackEntry->msgType)) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg); @@ -229,7 +229,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); if (ths->pFsm != NULL) { - //if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + // if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pAppendEntry->index; @@ -261,7 +261,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); if (ths->pFsm != NULL) { - //if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + // if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pAppendEntry->index; @@ -324,7 +324,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); - //if (ths->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + // if (ths->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pEntry->index; @@ -338,10 +338,15 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // config change if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) { SSyncCfg newSyncCfg; - int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg); + int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg); ASSERT(ret == 0); syncNodeUpdateConfig(ths, &newSyncCfg); + if (ths->state == TAOS_SYNC_STATE_LEADER) { + syncNodeBecomeLeader(ths); + } else { + syncNodeBecomeFollower(ths); + } } rpcFreeCont(rpcMsg.pCont); diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 620f0e9cd20d8f4738d9bf3cfdf7a985f7493b65..0f17cf267e8e8a54d6020f12b47bafb594c92434 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -16,10 +16,10 @@ #include "syncCommit.h" #include "syncIndexMgr.h" #include "syncInt.h" +#include "syncRaftCfg.h" #include "syncRaftLog.h" #include "syncRaftStore.h" #include "syncUtil.h" -#include "syncRaftCfg.h" // \* Leader i advances its commitIndex. // \* This is done as a separate step from handling AppendEntries responses, @@ -102,7 +102,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); - //if (pSyncNode->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + // if (pSyncNode->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { if (pSyncNode->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pEntry->index; @@ -114,12 +114,17 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { } // config change - if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) { - SSyncCfg newSyncCfg; - int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg); - ASSERT(ret == 0); - - syncNodeUpdateConfig(pSyncNode, &newSyncCfg); + if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) { + SSyncCfg newSyncCfg; + int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg); + ASSERT(ret == 0); + + syncNodeUpdateConfig(pSyncNode, &newSyncCfg); + if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { + syncNodeBecomeLeader(pSyncNode); + } else { + syncNodeBecomeFollower(pSyncNode); + } } rpcFreeCont(rpcMsg.pCont); diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index a3d1717c3bcabb22f910a821755c046fd7dcd6b0..8b7411c563baa6ae1621a08c7d56a55b0680dd4e 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -15,6 +15,7 @@ #include "syncIO.h" #include +#include "os.h" #include "syncMessage.h" #include "syncUtil.h" #include "tglobal.h" @@ -198,6 +199,7 @@ static int32_t syncIOStartInternal(SSyncIO *io) { { SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); + snprintf(rpcInit.localFqdn, sizeof(rpcInit.localFqdn), "%s", "127.0.0.1"); rpcInit.localPort = io->myAddr.eps[0].port; rpcInit.label = "SYNC-IO-SERVER"; rpcInit.numOfThreads = 1; diff --git a/source/libs/sync/src/syncIndexMgr.c b/source/libs/sync/src/syncIndexMgr.c index d33075054a888a1b2903fe230f6503e6c19aa580..5809cedb9038758744d20b8e6ee2270bd0720e47 100644 --- a/source/libs/sync/src/syncIndexMgr.c +++ b/source/libs/sync/src/syncIndexMgr.c @@ -31,6 +31,13 @@ SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode) { return pSyncIndexMgr; } +void syncIndexMgrUpdate(SSyncIndexMgr *pSyncIndexMgr, SSyncNode *pSyncNode) { + pSyncIndexMgr->replicas = &(pSyncNode->replicasId); + pSyncIndexMgr->replicaNum = pSyncNode->replicaNum; + pSyncIndexMgr->pSyncNode = pSyncNode; + syncIndexMgrClear(pSyncIndexMgr); +} + void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr) { if (pSyncIndexMgr != NULL) { taosMemoryFree(pSyncIndexMgr); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index da23d8415bdc6f482f3db915487d8029954e96e2..469682a7b5e72fc180d8a2417e02a11e8e0bf381 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -103,6 +103,16 @@ void syncStart(int64_t rid) { taosReleaseRef(tsNodeRefId, pSyncNode->rid); } +void syncStartStandBy(int64_t rid) { + SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + if (pSyncNode == NULL) { + return; + } + syncNodeStartStandBy(pSyncNode); + + taosReleaseRef(tsNodeRefId, pSyncNode->rid); +} + void syncStop(int64_t rid) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { @@ -116,7 +126,7 @@ void syncStop(int64_t rid) { int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { int32_t ret = 0; - char *configChange = syncCfg2Str((SSyncCfg*)pSyncCfg); + char* configChange = syncCfg2Str((SSyncCfg*)pSyncCfg); SRpcMsg rpcMsg = {0}; rpcMsg.msgType = TDMT_VND_SYNC_CONFIG_CHANGE; rpcMsg.noResp = 1; @@ -188,10 +198,9 @@ void syncGetEpSet(int64_t rid, SEpSet* pEpSet) { (pEpSet->numOfEps)++; sInfo("syncGetEpSet index:%d %s:%d", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); - } pEpSet->inUse = pSyncNode->pRaftCfg->cfg.myIndex; - + sInfo("syncGetEpSet pEpSet->inUse:%d ", pEpSet->inUse); taosReleaseRef(tsNodeRefId, pSyncNode->rid); @@ -524,6 +533,17 @@ void syncNodeStart(SSyncNode* pSyncNode) { assert(ret == 0); } +void syncNodeStartStandBy(SSyncNode* pSyncNode) { + // state change + pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; + syncNodeStopHeartbeatTimer(pSyncNode); + + // reset elect timer, long enough + int32_t electMS = TIMER_MAX_MS; + int32_t ret = syncNodeRestartElectTimer(pSyncNode, electMS); + ASSERT(ret == 0); +} + void syncNodeClose(SSyncNode* pSyncNode) { int32_t ret; assert(pSyncNode != NULL); @@ -858,7 +878,7 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { return s; } -void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg *newConfig) { +void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) { pSyncNode->pRaftCfg->cfg = *newConfig; int32_t ret = raftCfgPersist(pSyncNode->pRaftCfg); ASSERT(ret == 0); @@ -885,6 +905,11 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg *newConfig) { for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]); } + + syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode); + syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode); + + syncNodeLog2("==syncNodeUpdateConfig==", pSyncNode); } SSyncNode* syncNodeAcquire(int64_t rid) { @@ -1245,7 +1270,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) { syncEntry2OriginalRpc(pEntry, &rpcMsg); if (ths->pFsm != NULL) { - //if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + // if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pEntry->index; @@ -1267,7 +1292,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) { syncEntry2OriginalRpc(pEntry, &rpcMsg); if (ths->pFsm != NULL) { - //if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { + // if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pEntry->index; diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 8aeb9c4856d70294fbe27a908781419ee294e6dc..07a9397a580332f427ab3b206359de3ec0accf40 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -58,14 +58,15 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { syncMeta.term = pEntry->term; code = walWriteWithSyncInfo(pWal, pEntry->index, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen); if (code != 0) { - int32_t err = terrno; - const char *errStr = tstrerror(err); - int32_t linuxErr = errno; - const char *linuxErrMsg = strerror(errno); - sError("walWriteWithSyncInfo error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg); + int32_t err = terrno; + const char* errStr = tstrerror(err); + int32_t linuxErr = errno; + const char* linuxErrMsg = strerror(errno); + sError("walWriteWithSyncInfo error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, + linuxErrMsg); ASSERT(0); - } - //assert(code == 0); + } + // assert(code == 0); walFsync(pWal, true); return code; @@ -77,16 +78,17 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { if (index >= SYNC_INDEX_BEGIN && index <= logStoreLastIndex(pLogStore)) { SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); - int32_t code = walReadWithHandle(pWalHandle, index); + int32_t code = walReadWithHandle(pWalHandle, index); if (code != 0) { - int32_t err = terrno; - const char *errStr = tstrerror(err); - int32_t linuxErr = errno; - const char *linuxErrMsg = strerror(errno); - sError("walReadWithHandle error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg); + int32_t err = terrno; + const char* errStr = tstrerror(err); + int32_t linuxErr = errno; + const char* linuxErrMsg = strerror(errno); + sError("walReadWithHandle error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, + linuxErrMsg); ASSERT(0); - } - //assert(walReadWithHandle(pWalHandle, index) == 0); + } + // assert(walReadWithHandle(pWalHandle, index) == 0); SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen); assert(pEntry != NULL); @@ -112,16 +114,17 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; - //assert(walRollback(pWal, fromIndex) == 0); + // assert(walRollback(pWal, fromIndex) == 0); int32_t code = walRollback(pWal, fromIndex); if (code != 0) { - int32_t err = terrno; - const char *errStr = tstrerror(err); - int32_t linuxErr = errno; - const char *linuxErrMsg = strerror(errno); - sError("walRollback error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg); + int32_t err = terrno; + const char* errStr = tstrerror(err); + int32_t linuxErr = errno; + const char* linuxErrMsg = strerror(errno); + sError("walRollback error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, + linuxErrMsg); ASSERT(0); - } + } return 0; // to avoid compiler error } @@ -145,16 +148,16 @@ SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) { int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; - //assert(walCommit(pWal, index) == 0); + // assert(walCommit(pWal, index) == 0); int32_t code = walCommit(pWal, index); if (code != 0) { - int32_t err = terrno; - const char *errStr = tstrerror(err); - int32_t linuxErr = errno; - const char *linuxErrMsg = strerror(errno); + int32_t err = terrno; + const char* errStr = tstrerror(err); + int32_t linuxErr = errno; + const char* linuxErrMsg = strerror(errno); sError("walCommit error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg); ASSERT(0); - } + } return 0; // to avoid compiler error } diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index 8afe9ff2a724d031531bbf5fd5c867673a00c46b..cfbdf0e96101535e22dc6dc3609bf0c584719d71 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -37,6 +37,7 @@ add_executable(syncRaftCfgTest "") add_executable(syncRespMgrTest "") add_executable(syncSnapshotTest "") add_executable(syncApplyMsgTest "") +add_executable(syncConfigChangeTest "") target_sources(syncTest @@ -195,6 +196,10 @@ target_sources(syncApplyMsgTest PRIVATE "syncApplyMsgTest.cpp" ) +target_sources(syncConfigChangeTest + PRIVATE + "syncConfigChangeTest.cpp" +) target_include_directories(syncTest @@ -392,6 +397,11 @@ target_include_directories(syncApplyMsgTest "${TD_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncConfigChangeTest + PUBLIC + "${TD_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries(syncTest @@ -550,6 +560,10 @@ target_link_libraries(syncApplyMsgTest sync gtest_main ) +target_link_libraries(syncConfigChangeTest + sync + gtest_main +) enable_testing() diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..9a2d9a6b3461f46c6af744b2fa503ca9ce46a6b8 --- /dev/null +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -0,0 +1,259 @@ +#include +#include +#include "os.h" +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncUtil.h" +#include "wal.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +uint16_t gPorts[] = {7010, 7110, 7210, 7310, 7410}; +const char* gDir = "./syncReplicateTest"; +int32_t gVgId = 1234; +SyncIndex gSnapshotLastApplyIndex; + +void init() { + int code = walInit(); + assert(code == 0); + + code = syncInit(); + assert(code == 0); + + sprintf(tsTempDir, "%s", "."); +} + +void cleanup() { walCleanUp(); } + +void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { + SyncIndex beginIndex = SYNC_INDEX_INVALID; + if (pFsm->FpGetSnapshot != NULL) { + SSnapshot snapshot; + pFsm->FpGetSnapshot(pFsm, &snapshot); + beginIndex = snapshot.lastApplyIndex; + } + + if (cbMeta.index > beginIndex) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", + pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); + } else { + sTrace("==callback== ==CommitCb== do not apply again %ld", cbMeta.index); + } +} + +void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index, + cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); +} + +void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", + pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); +} + +int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { + pSnapshot->data = NULL; + pSnapshot->lastApplyIndex = gSnapshotLastApplyIndex; + pSnapshot->lastApplyTerm = 100; + return 0; +} + +SSyncFSM* createFsm() { + SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM)); + pFsm->FpCommitCb = CommitCb; + pFsm->FpPreCommitCb = PreCommitCb; + pFsm->FpRollBackCb = RollBackCb; + pFsm->FpGetSnapshot = GetSnapshotCb; + return pFsm; +} + +SWal* createWal(char* path, int32_t vgId) { + SWalCfg walCfg; + memset(&walCfg, 0, sizeof(SWalCfg)); + walCfg.vgId = vgId; + walCfg.fsyncPeriod = 1000; + walCfg.retentionPeriod = 1000; + walCfg.rollPeriod = 1000; + walCfg.retentionSize = 1000; + walCfg.segSize = 1000; + walCfg.level = TAOS_WAL_FSYNC; + SWal* pWal = walOpen(path, &walCfg); + assert(pWal != NULL); + return pWal; +} + +int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* pWal, char* path, bool isStandBy) { + SSyncInfo syncInfo; + syncInfo.vgId = vgId; + syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.FpSendMsg = syncIOSendMsg; + syncInfo.queue = gSyncIO->pMsgQ; + syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.pFsm = createFsm(); + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); + syncInfo.pWal = pWal; + + SSyncCfg* pCfg = &syncInfo.syncCfg; + + if (isStandBy) { + pCfg->myIndex = 0; + pCfg->replicaNum = 1; + pCfg->nodeInfo[0].nodePort = gPorts[myIndex]; + taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + + } else { + pCfg->myIndex = myIndex; + pCfg->replicaNum = replicaNum; + + for (int i = 0; i < replicaNum; ++i) { + pCfg->nodeInfo[i].nodePort = gPorts[i]; + taosGetFqdn(pCfg->nodeInfo[i].nodeFqdn); + // snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1"); + } + } + + int64_t rid = syncOpen(&syncInfo); + assert(rid > 0); + + SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid); + assert(pSyncNode != NULL); + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; + gSyncIO->pSyncNode = pSyncNode; + syncNodeRelease(pSyncNode); + + return rid; +} + +void configChange(int64_t rid, int32_t replicaNum, int32_t myIndex) { + SSyncCfg syncCfg; + + syncCfg.myIndex = myIndex; + syncCfg.replicaNum = replicaNum; + + for (int i = 0; i < replicaNum; ++i) { + syncCfg.nodeInfo[i].nodePort = gPorts[i]; + taosGetFqdn(syncCfg.nodeInfo[i].nodeFqdn); + } + + syncReconfig(rid, &syncCfg); +} + +void usage(char* exe) { + printf("usage: %s replicaNum myIndex lastApplyIndex writeRecordNum isStandBy isConfigChange \n", exe); +} + +SRpcMsg* createRpcMsg(int i, int count, int myIndex) { + SRpcMsg* pMsg = (SRpcMsg*)taosMemoryMalloc(sizeof(SRpcMsg)); + memset(pMsg, 0, sizeof(SRpcMsg)); + pMsg->msgType = 9999; + pMsg->contLen = 256; + pMsg->pCont = rpcMallocCont(pMsg->contLen); + snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-%ld", myIndex, i, count, taosGetTimestampMs()); + return pMsg; +} + +int main(int argc, char** argv) { + tsAsyncLog = 0; + sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE; + if (argc != 7) { + usage(argv[0]); + exit(-1); + } + + int32_t replicaNum = atoi(argv[1]); + int32_t myIndex = atoi(argv[2]); + int32_t lastApplyIndex = atoi(argv[3]); + int32_t writeRecordNum = atoi(argv[4]); + bool isStandBy = atoi(argv[5]); + bool isConfigChange = atoi(argv[6]); + gSnapshotLastApplyIndex = lastApplyIndex; + + if (!isStandBy) { + assert(replicaNum >= 1 && replicaNum <= 5); + assert(myIndex >= 0 && myIndex < replicaNum); + assert(lastApplyIndex >= -1); + assert(writeRecordNum >= 0); + } + + init(); + int32_t ret = syncIOStart((char*)"127.0.0.1", gPorts[myIndex]); + assert(ret == 0); + + char walPath[128]; + snprintf(walPath, sizeof(walPath), "%s_wal_replica%d_index%d", gDir, replicaNum, myIndex); + SWal* pWal = createWal(walPath, gVgId); + + int64_t rid = createSyncNode(replicaNum, myIndex, gVgId, pWal, (char*)gDir, isStandBy); + assert(rid > 0); + + if (isStandBy) { + syncStartStandBy(rid); + } else { + syncStart(rid); + } + + SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid); + assert(pSyncNode != NULL); + + if (isConfigChange) { + configChange(rid, 3, myIndex); + } + + //--------------------------- + int32_t alreadySend = 0; + while (1) { + char* s = syncNode2SimpleStr(pSyncNode); + + if (alreadySend < writeRecordNum) { + SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex); + int32_t ret = syncPropose(rid, pRpcMsg, false); + if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) { + sTrace("%s value%d write not leader", s, alreadySend); + } else { + assert(ret == 0); + sTrace("%s value%d write ok", s, alreadySend); + } + alreadySend++; + + rpcFreeCont(pRpcMsg->pCont); + taosMemoryFree(pRpcMsg); + } else { + sTrace("%s", s); + } + + taosMsleep(1000); + taosMemoryFree(s); + taosMsleep(1000); + } + + syncNodeRelease(pSyncNode); + syncStop(rid); + walClose(pWal); + syncIOStop(); + cleanup(); + return 0; +} diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index fc840691b6a62753b4c3950d9c5df81c472c1b86..72ab250709718e5013911220b4801e79c487eb88 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -137,7 +137,9 @@ static void destroySmsg(SSrvMsg* smsg); // check whether already read complete packet static SSrvConn* createConn(void* hThrd); static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/); -static int reallocConnRefHandle(SSrvConn* conn); +static void destroyConnRegArg(SSrvConn* conn); + +static int reallocConnRefHandle(SSrvConn* conn); static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd); static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd); @@ -429,6 +431,8 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { if (smsg->type == Release) { pHead->msgType = 0; pConn->status = ConnNormal; + + destroyConnRegArg(pConn); transUnrefSrvHandle(pConn); } else { pHead->msgType = pMsg->msgType; @@ -800,6 +804,12 @@ static void destroyConn(SSrvConn* conn, bool clear) { // uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb); } } +static void destroyConnRegArg(SSrvConn* conn) { + if (conn->regArg.init == 1) { + transFreeMsg(conn->regArg.msg.pCont); + conn->regArg.init = 0; + } +} static int reallocConnRefHandle(SSrvConn* conn) { uvReleaseExHandle(conn->refId); uvRemoveExHandle(conn->refId); @@ -827,16 +837,9 @@ static void uvDestroyConn(uv_handle_t* handle) { // uv_timer_stop(&conn->pTimer); transQueueDestroy(&conn->srvMsgs); - if (conn->regArg.init == 1) { - transFreeMsg(conn->regArg.msg.pCont); - conn->regArg.init = 0; - } QUEUE_REMOVE(&conn->queue); taosMemoryFree(conn->pTcp); - if (conn->regArg.init == 1) { - transFreeMsg(conn->regArg.msg.pCont); - conn->regArg.init = 0; - } + destroyConnRegArg(conn); taosMemoryFree(conn); if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 8a2a60d3e2f451277a59fd19a826b57dac6c6d05..b5e64242e4e50179352debf1989a97f70108eef5 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -322,6 +322,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_TB_NOT_EXIST, "Table not exists") TAOS_DEFINE_ERROR(TSDB_CODE_VND_SMA_NOT_EXIST, "SMA not exists") TAOS_DEFINE_ERROR(TSDB_CODE_VND_HASH_MISMATCH, "Hash value mismatch") TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_NOT_EXIST, "Table does not exists") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TABLE_ACTION, "Invalid table action") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_ALREADY_EXISTS, "Table column already exists") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_COL_NOT_EXISTS, "Table column not exists") + // tsdb TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID") diff --git a/source/util/test/CMakeLists.txt b/source/util/test/CMakeLists.txt index 0d16a2129af3b00f1fc078eb0af60c16e5abd5a4..c1053ce7de25170611ca7d5c243805ca82bacb2a 100644 --- a/source/util/test/CMakeLists.txt +++ b/source/util/test/CMakeLists.txt @@ -33,13 +33,13 @@ ENDIF() INCLUDE_DIRECTORIES(${TD_SOURCE_DIR}/src/util/inc) -# freelistTest -add_executable(freelistTest "") -target_sources(freelistTest - PRIVATE - "freelistTest.cpp" -) -target_link_libraries(freelistTest os util gtest gtest_main) +# # freelistTest +# add_executable(freelistTest "") +# target_sources(freelistTest +# PRIVATE +# "freelistTest.cpp" +# ) +# target_link_libraries(freelistTest os util gtest gtest_main) # # encodeTest # add_executable(encodeTest "encodeTest.cpp") diff --git a/source/util/test/freelistTest.cpp b/source/util/test/freelistTest.cpp deleted file mode 100644 index a445a16ad360fae3f8bede7bbf28c1c311e21656..0000000000000000000000000000000000000000 --- a/source/util/test/freelistTest.cpp +++ /dev/null @@ -1,17 +0,0 @@ -#include - -#include "tfreelist.h" - -TEST(TD_UTIL_FREELIST_TEST, simple_test) { - SFreeList fl; - - tFreeListInit(&fl); - - for (size_t i = 0; i < 1000; i++) { - void *ptr = NULL; - TFL_MALLOC(ptr, void*, 1024, &fl); - GTEST_ASSERT_NE(ptr, nullptr); - } - - tFreeListClear(&fl); -}