diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index ec528f5827953d300e62b30a054da6b87eb239e1..611a273353f777d86fce6e498bedce2df2a599f9 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -1498,14 +1498,8 @@ void taosCfgDynamicOptions(const char *option, const char *value) { if (strcasecmp(option, "keepTimeOffset") == 0) { int32_t newKeepTimeOffset = atoi(value); - if (newKeepTimeOffset < 0 || newKeepTimeOffset > 23) { - uError("failed to set keepTimeOffset from %d to %d. Valid range: [0, 23]", tsKeepTimeOffset, newKeepTimeOffset); - return; - } - uInfo("keepTimeOffset set from %d to %d", tsKeepTimeOffset, newKeepTimeOffset); tsKeepTimeOffset = newKeepTimeOffset; - return; } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index bb92bfb4c7607aab971d576252f3cfc295aef2f5..100513b93249574a7a139ca7dbc6a0a61fbf22b1 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -70,6 +70,8 @@ static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter); static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter); +static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pInMCfgReq, int32_t opLen, int32_t *pOutValue); + int32_t mndInitDnode(SMnode *pMnode) { SSdbTable table = { .sdbType = SDB_DNODE, @@ -947,7 +949,7 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) { goto _OVER; } - mInfo("dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s", + mInfo("dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s", dropReq.dnodeId, dropReq.fqdn, dropReq.port, dropReq.force?"true":"false", dropReq.unsafe?"true":"false"); if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE) != 0) { goto _OVER; @@ -987,7 +989,7 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) { int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id); bool isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs()); - + if (isonline && force) { terrno = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE; mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, terrstr(), @@ -1060,6 +1062,20 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { strcpy(dcfgReq.config, "monitor"); snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag); + } else if (strncasecmp(cfgReq.config, "keeptimeoffset", 14) == 0) { + int32_t optLen = strlen("keeptimeoffset"); + int32_t flag = -1; + int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag); + if (code < 0) return code; + + if (flag < 0 || flag > 23) { + mError("dnode:%d, failed to config keepTimeOffset since value:%d. Valid range: [0, 23]", cfgReq.dnodeId, flag); + terrno = TSDB_CODE_INVALID_CFG; + return -1; + } + + strcpy(dcfgReq.config, "keeptimeoffset"); + snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag); #ifdef TD_ENTERPRISE } else if (strncasecmp(cfgReq.config, "activeCode", 10) == 0 || strncasecmp(cfgReq.config, "cActiveCode", 11) == 0) { int8_t opt = strncasecmp(cfgReq.config, "a", 1) == 0 ? DND_ACTIVE_CODE : DND_CONN_ACTIVE_CODE; @@ -1292,3 +1308,28 @@ static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetch(pSdb, pIter); } + +// get int32_t value from 'SMCfgDnodeReq' +static int32_t mndMCfgGetValInt32(SMCfgDnodeReq *pMCfgReq, int32_t opLen, int32_t *pOutValue) { + terrno = 0; + if (' ' != pMCfgReq->config[opLen] && 0 != pMCfgReq->config[opLen]) { + goto _err; + } + + if (' ' == pMCfgReq->config[opLen]) { + // 'key value' + if (strlen(pMCfgReq->value) != 0) goto _err; + *pOutValue = atoi(pMCfgReq->config + opLen + 1); + } else { + // 'key' 'value' + if (strlen(pMCfgReq->value) == 0) goto _err; + *pOutValue = atoi(pMCfgReq->value); + } + + return 0; + +_err: + mError("dnode:%d, failed to config keeptimeoffset since invalid conf:%s", pMCfgReq->dnodeId, pMCfgReq->config); + terrno = TSDB_CODE_INVALID_CFG; + return -1; +} diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 162e75d783112b86196f6261f023d8dcb15e5f84..d6537ef9926cd01bffd8f2ed985e9d3c7fff7d7f 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -1738,6 +1738,7 @@ static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbNa SSchema *pSrcSchema = &pStb->pColumns[i]; memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN); pSchema->type = pSrcSchema->type; + pSchema->flags = pSrcSchema->flags; pSchema->colId = pSrcSchema->colId; pSchema->bytes = pSrcSchema->bytes; } @@ -1788,6 +1789,7 @@ static int32_t mndBuildStbCfgImp(SDbObj *pDb, SStbObj *pStb, const char *tbName, SSchema *pSrcSchema = &pStb->pColumns[i]; memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN); pSchema->type = pSrcSchema->type; + pSchema->flags = pSrcSchema->flags; pSchema->colId = pSrcSchema->colId; pSchema->bytes = pSrcSchema->bytes; } @@ -1797,6 +1799,7 @@ static int32_t mndBuildStbCfgImp(SDbObj *pDb, SStbObj *pStb, const char *tbName, SSchema *pSrcSchema = &pStb->pTags[i]; memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN); pSchema->type = pSrcSchema->type; + pSchema->flags = pSrcSchema->flags; pSchema->colId = pSrcSchema->colId; pSchema->bytes = pSrcSchema->bytes; } diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index ecda1d596aff2ff5632e1e333ca63422867a2ac1..54352d0a53216febb87163bb41f4849a886500b9 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -615,6 +615,31 @@ void appendTableOptions(char* buf, int32_t* len, SDbCfgInfo* pDbCfg, STableCfg* if (pCfg->ttl > 0) { *len += sprintf(buf + VARSTR_HEADER_SIZE + *len, " TTL %d", pCfg->ttl); } + + if (TSDB_SUPER_TABLE == pCfg->tableType || TSDB_NORMAL_TABLE == pCfg->tableType) { + int32_t nSma = 0; + for (int32_t i = 0; i < pCfg->numOfColumns; ++i) { + if (IS_BSMA_ON(pCfg->pSchemas + i)) { + ++nSma; + } + } + + if (nSma < pCfg->numOfColumns) { + bool smaOn = false; + *len += sprintf(buf + VARSTR_HEADER_SIZE + *len, " SMA("); + for (int32_t i = 0; i < pCfg->numOfColumns; ++i) { + if (IS_BSMA_ON(pCfg->pSchemas + i)) { + if (smaOn) { + *len += sprintf(buf + VARSTR_HEADER_SIZE + *len, ",`%s`", (pCfg->pSchemas + i)->name); + } else { + smaOn = true; + *len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "`%s`", (pCfg->pSchemas + i)->name); + } + } + } + *len += sprintf(buf + VARSTR_HEADER_SIZE + *len, ")"); + } + } } static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, SDbCfgInfo* pDbCfg, char* tbName, STableCfg* pCfg) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index c4111ded9259ce9238862b53c874433422ef7f33..51da73d4d7b12d1eb84cded9c9921706cbc1f302 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -4036,7 +4036,7 @@ bool isEqualStateKey(SStateWindowInfo* pWin, char* pKeyData) { bool compareStateKey(void* data, void* key) { if (!data || !key) { - return true; + return false; } SStateKeys* stateKey = (SStateKeys*)key; stateKey->pData = (char*)key + sizeof(SStateKeys); @@ -4062,7 +4062,13 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &pCurWin->winInfo.sessionWin.win)) { code = TSDB_CODE_FAILED; releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)pCurWin->winInfo.pOutputBuf, &pAggSup->pSessionAPI->stateStore); - pCurWin->winInfo.pOutputBuf = taosMemoryMalloc(size); + pCurWin->winInfo.pOutputBuf = taosMemoryCalloc(1, size); + pCurWin->pStateKey = + (SStateKeys*)((char*)pCurWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize)); + pCurWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys); + pCurWin->pStateKey->type = pAggSup->stateKeyType; + pCurWin->pStateKey->pData = (char*)pCurWin->pStateKey + sizeof(SStateKeys); + pCurWin->pStateKey->isNull = false; } if (code == TSDB_CODE_SUCCESS) { @@ -4076,11 +4082,19 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, } pNextWin->winInfo.sessionWin = pCurWin->winInfo.sessionWin; - pNextWin->winInfo.pOutputBuf = NULL; - SStreamStateCur* pCur = pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pCurWin->winInfo.sessionWin); - code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->winInfo.sessionWin, NULL, 0); + SStreamStateCur* pCur = pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pNextWin->winInfo.sessionWin); + int32_t nextSize = pAggSup->resultRowSize; + code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->winInfo.sessionWin, &pNextWin->winInfo.pOutputBuf, &nextSize); if (code != TSDB_CODE_SUCCESS) { SET_SESSION_WIN_INVALID(pNextWin->winInfo); + } else { + pNextWin->pStateKey = + (SStateKeys*)((char*)pNextWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize)); + pNextWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys); + pNextWin->pStateKey->type = pAggSup->stateKeyType; + pNextWin->pStateKey->pData = (char*)pNextWin->pStateKey + sizeof(SStateKeys); + pNextWin->pStateKey->isNull = false; + pNextWin->winInfo.isOutput = true; } pAggSup->stateStore.streamStateFreeCur(pCur); } @@ -4156,6 +4170,9 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl SStateWindowInfo curWin = {0}; SStateWindowInfo nextWin = {0}; setStateOutputBuf(pAggSup, tsCols[i], groupId, pKeyData, &curWin, &nextWin); + if (IS_VALID_SESSION_WIN(nextWin.winInfo)) { + releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)nextWin.winInfo.pOutputBuf, &pAPI->stateStore); + } setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo); winRows = updateStateWindowInfo(&curWin, &nextWin, tsCols, groupId, pKeyColInfo, rows, i, &allEqual, pAggSup->pResultRows, pSeUpdated, pStDeleted); @@ -4346,9 +4363,19 @@ void streamStateReloadState(SOperatorInfo* pOperator) { for (int32_t i = 0; i < num; i++) { SStateWindowInfo curInfo = {0}; SStateWindowInfo nextInfo = {0}; + SStateWindowInfo dummy = {0}; setStateOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, NULL, &curInfo, &nextInfo); if (compareStateKey(curInfo.pStateKey,nextInfo.pStateKey)) { compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pStUpdated, pInfo->pStDeleted); + saveResult(curInfo.winInfo, pInfo->pStUpdated); + } + + if (IS_VALID_SESSION_WIN(curInfo.winInfo)) { + releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)curInfo.winInfo.pOutputBuf, &pAggSup->pSessionAPI->stateStore); + } + + if (IS_VALID_SESSION_WIN(nextInfo.winInfo)) { + releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)nextInfo.winInfo.pOutputBuf, &pAggSup->pSessionAPI->stateStore); } } taosMemoryFree(pBuf);