diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 1810c49a36d3f86a1685af00acbe4a837ee8d1dc..c59d5184a9460413d3c1bccf32749f5a004f3b0d 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -2678,7 +2678,7 @@ int32_t mndAddIndexImpl(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pSt if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER; if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) goto _OVER; - if (mndSetAlterStbRedoActions2(pMnode, pTrans, pDb, pStb, alterOriData, alterOriDataLen) != 0) goto _OVER; + if (mndSetAlterStbRedoActions2(pMnode, pTrans, pDb, pStb, sql, len) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; return code; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index b91692498e521bb86a70420118db563329609d7d..3cfa0d89f3da3605cef2b6c83511e54499c29f36 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -129,7 +129,6 @@ int metaGetTableEntryByName(SMetaReader* pReader, const char* name); int metaAlterCache(SMeta* pMeta, int32_t nPage); int metaAddIndexToSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); -} int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int64_t metaGetTimeSeriesNum(SMeta* pMeta); diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index c5f21b252d12215dc758480894509562218a1e7a..b066522f4e0d50a81ea5b2de5e0877a87068d271 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -390,8 +390,166 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { return 0; } int metaAddIndexToSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { - // impl later + SMetaEntry oStbEntry = {0}; + SMetaEntry nStbEntry = {0}; + + TBC *pUidIdxc = NULL; + TBC *pTbDbc = NULL; + void *pData; + int nData; + int64_t oversion; + SDecoder dc = {0}; + int32_t ret; + int32_t c = -2; + tb_uid_t suid = pReq->suid; + + // get super table + if (tdbTbGet(pMeta->pUidIdx, &suid, sizeof(tb_uid_t), &pData, &nData) != 0) { + ret = -1; + goto _err; + } + + // tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, NULL); + // ret = tdbTbcMoveTo(pUidIdxc, &pReq->suid, sizeof(tb_uid_t), &c); + // if (ret < 0 || c) { + // tdbTbcClose(pUidIdxc); + + // terrno = TSDB_CODE_TDB_STB_NOT_EXIST; + // return -1; + //} + + // ret = tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData); + // if (ret < 0) { + // tdbTbcClose(pUidIdxc); + + // terrno = TSDB_CODE_TDB_STB_NOT_EXIST; + // return -1; + //} + + // oversion = ((SUidIdxVal *)pData)[0].version; + + // tdbTbcOpen(pMeta->pTbDb, &pTbDbc, NULL); + // ret = tdbTbcMoveTo(pTbDbc, &((STbDbKey){.uid = pReq->suid, .version = oversion}), sizeof(STbDbKey), &c); + // if (!(ret == 0 && c == 0)) { + // metaError("meta/table: invalide ret: %" PRId32 " or c: %" PRId32 "alter stb failed.", ret, c); + // return -1; + // } + // ret = tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData); + // if (ret < 0) { + // tdbTbcClose(pTbDbc); + + // terrno = TSDB_CODE_TDB_STB_NOT_EXIST; + // return -1; + //} + + oStbEntry.pBuf = taosMemoryMalloc(nData); + memcpy(oStbEntry.pBuf, pData, nData); + tDecoderInit(&dc, oStbEntry.pBuf, nData); + metaDecodeEntry(&dc, &oStbEntry); + + nStbEntry.version = version; + nStbEntry.type = TSDB_SUPER_TABLE; + nStbEntry.uid = pReq->suid; + nStbEntry.name = pReq->name; + nStbEntry.stbEntry.schemaRow = pReq->schemaRow; + nStbEntry.stbEntry.schemaTag = pReq->schemaTag; + + metaWLock(pMeta); + // compare two entry + if (oStbEntry.stbEntry.schemaRow.version != pReq->schemaRow.version) { + metaSaveToSkmDb(pMeta, &nStbEntry); + } + + // update table.db + metaSaveToTbDb(pMeta, &nStbEntry); + + // update uid index + metaUpdateUidIdx(pMeta, &nStbEntry); + + // metaStatsCacheDrop(pMeta, nStbEntry.uid); + + metaULock(pMeta); + + // Get target schema info + SSchemaWrapper *pTagSchema = &nStbEntry.stbEntry.schemaTag; + if (pTagSchema->nCols == 1 && pTagSchema->pSchema[0].type == TSDB_DATA_TYPE_JSON) { + terrno = TSDB_CODE_VND_COL_ALREADY_EXISTS; + goto _err; + } + + char *tagName = "tag_col"; + SSchema *pCol = NULL; + int32_t iCol = 0; + for (;;) { + pCol = NULL; + if (iCol >= pTagSchema->nCols) break; + pCol = &pTagSchema->pSchema[iCol]; + if (strcmp(pCol->name, tagName) == 0) break; + iCol++; + } + if (iCol == 0) { + terrno = TSDB_CODE_VND_COL_ALREADY_EXISTS; + goto _err; + } + if (pCol == NULL) { + terrno = TSDB_CODE_VND_COL_NOT_EXISTS; + goto _err; + } + + /* + * iterator all pTdDbc by uid and version + */ + TBC *pCtbIdxc = NULL; + tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, NULL); + int rc = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c); + if (rc < 0) { + tdbTbcClose(pCtbIdxc); + goto _err; + } + for (;;) { + void *pKey, *pVal; + int nKey, nVal; + rc = tdbTbcNext(pCtbIdxc, &pKey, &nKey, &pVal, &nVal); + if (rc < 0) break; + if (((SCtbIdxKey *)pKey)->suid != suid) { + tdbFree(pKey); + tdbFree(pVal); + continue; + } + STagIdxKey *pTagIdxKey = NULL; + int32_t nTagIdxKey; + + const void *pTagData = NULL; + int32_t nTagData = 0; + + SCtbIdxKey *table = (SCtbIdxKey *)pKey; + STagVal tagVal = {.cid = pCol->colId}; + tTagGet((const STag *)pVal, &tagVal); + if (IS_VAR_DATA_TYPE(pCol->type)) { + pTagData = tagVal.pData; + nTagData = (int32_t)tagVal.nData; + } else { + pTagData = &(tagVal.i64); + nTagData = tDataTypes[pCol->type].bytes; + } + + if (metaCreateTagIdxKey(suid, pCol->colId, pTagData, nTagData, pCol->type, table->uid, &pTagIdxKey, &nTagIdxKey) < + 0) { + metaDestroyTagIdxKey(pTagIdxKey); + goto _err; + } + tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn); + metaDestroyTagIdxKey(pTagIdxKey); + } + + if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf); + tDecoderClear(&dc); + tdbTbcClose(pTbDbc); + tdbTbcClose(pUidIdxc); + return TSDB_CODE_SUCCESS; +_err: + return TSDB_CODE_VND_COL_ALREADY_EXISTS; } int metaDropIndexFromSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { // impl later @@ -1292,7 +1450,7 @@ static int metaAddTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTb goto _err; } - // Get targe schema info + // Get target schema info SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag; if (pTagSchema->nCols == 1 && pTagSchema->pSchema[0].type == TSDB_DATA_TYPE_JSON) { terrno = TSDB_CODE_VND_COL_ALREADY_EXISTS; @@ -1352,7 +1510,7 @@ static int metaAddTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTb pTagData = &(tagVal.i64); nTagData = tDataTypes[pCol->type].bytes; } - if (metaCreateTagIdxKey(uid, pCol->colId, pTagData, nTagData, pCol->type, uid, &pTagIdxKey, &nTagIdxKey) < 0) { + if (metaCreateTagIdxKey(suid, pCol->colId, pTagData, nTagData, pCol->type, uid, &pTagIdxKey, &nTagIdxKey) < 0) { metaDestroyTagIdxKey(pTagIdxKey); goto _err; } diff --git a/source/libs/index/src/indexCache.c b/source/libs/index/src/indexCache.c index 0bb454571adebae9695ae07cf7ff6af4efd9e95f..ffca6ad97fb9575b68730904ccc910dfbd3ce9c5 100644 --- a/source/libs/index/src/indexCache.c +++ b/source/libs/index/src/indexCache.c @@ -22,8 +22,8 @@ #define MAX_INDEX_KEY_LEN 256 // test only, change later #define MEM_TERM_LIMIT 10 * 10000 -#define MEM_THRESHOLD 8 * 512 * 1024 // 8M -#define MEM_SIGNAL_QUIT MEM_THRESHOLD * 20 +#define MEM_THRESHOLD 128 * 1024 * 1024 // 8M +#define MEM_SIGNAL_QUIT MEM_THRESHOLD * 5 #define MEM_ESTIMATE_RADIO 1.5 static void idxMemRef(MemTable* tbl);